參考原文：https://blog.csdn.net/chenlu5201314/article/details/94740765
參考原文：https://zendei.com/article/79696.html
非同步連線方法
/// <summary>
/// Creates the managed MQTT client on first use, builds its options for the given
/// project key, (re)registers the four client callbacks and starts the client.
/// </summary>
/// <param name="PROJECT_KEY">
/// Used as both the MQTT user name and password. It is also stored in
/// <c>reConnnectPROJECTKEY</c> so the disconnect callback can reconnect with it.
/// </param>
/// <returns>
/// A task that completes when the start attempt has finished. Failures are logged
/// through <see cref="Debug"/> and are not rethrown.
/// </returns>
private async Task ConnectMqttServerAsync(string PROJECT_KEY)
{
    try
    {
        if (client == null)
        {
            client = new MqttFactory().CreateManagedMqttClient();
        }

        // The client id only has hour resolution: every connection made within the
        // same hour reuses the same id.
        // NOTE(review): two application instances started in the same hour would share
        // a client id — confirm this is intended together with WithCleanSession(false).
        string timestamp = DateTime.Now.ToString("yyyyMMddHH");
        string clientID = "JAX_" + timestamp;

        options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(RECONNECT_TIME))
            .WithClientOptions(new MqttClientOptionsBuilder()
                .WithTcpServer(txtServerUri, portNotSsl)
                .WithCommunicationTimeout(TimeSpan.FromSeconds(DEFAULT_CONNECT_TIMEOUT))
                .WithCredentials(PROJECT_KEY, PROJECT_KEY)
                .WithClientId(clientID)
                .WithKeepAlivePeriod(TimeSpan.FromSeconds(DEFAULT_KEEPLIVE))
                .WithCleanSession(false)
                .WithProtocolVersion(MqttProtocolVersion.V311)
                .Build())
            .Build();

        Debug.WriteLine("嘗試連接SERVER");

        if (client.IsStarted)
        {
            Debug.WriteLine("stopAsync");

            // Detach every handler before stopping so that stopping the old session
            // cannot invoke the callbacks (in particular the reconnect logic in the
            // disconnected callback).
            client.ApplicationMessageProcessedHandler = null;
            client.ApplicationMessageReceivedHandler = null;
            client.ConnectedHandler = null;
            client.DisconnectedHandler = null;
            await client.StopAsync();
        }

        // Register the callbacks. The explicit Action<T> wrapper is kept on purpose so
        // the delegate constructor overload is selected unambiguously.
        Debug.WriteLine("注册事件");

        // Callback for processed (published) outgoing messages.
        client.ApplicationMessageProcessedHandler =
            new ApplicationMessageProcessedHandlerDelegate(
                new Action<ApplicationMessageProcessedEventArgs>(
                    ApplicationMessageProcessedHandlerMethod));

        // Callback for incoming messages.
        Debug.WriteLine("命令下发回调");
        client.ApplicationMessageReceivedHandler =
            new MqttApplicationMessageReceivedHandlerDelegate(
                new Action<MqttApplicationMessageReceivedEventArgs>(
                    MqttApplicationMessageReceived));

        // Callback for a successful connection.
        Debug.WriteLine("连接成功回调");
        client.ConnectedHandler =
            new MqttClientConnectedHandlerDelegate(
                new Action<MqttClientConnectedEventArgs>(
                    OnMqttClientConnected));

        // Callback for a lost connection.
        Debug.WriteLine("连接断开回调");
        client.DisconnectedHandler =
            new MqttClientDisconnectedHandlerDelegate(
                new Action<MqttClientDisconnectedEventArgs>(
                    OnMqttClientDisconnected));

        // Remember the key so the disconnected callback can reconnect with it.
        reConnnectPROJECTKEY = PROJECT_KEY;

        // Start the managed client with the options built above.
        await client.StartAsync(options);
    }
    catch (Exception ex)
    {
        // Include the cause; the original message gave no hint why connecting failed.
        Debug.WriteLine("connect to mqtt server fail: " + ex.Message + Environment.NewLine);
    }
}
實作上述四個Callback
/// <summary>
/// Callback for outgoing messages the client has finished processing.
/// Failed publishes are logged; successful ones are intentionally silent.
/// </summary>
/// <param name="e">Result of processing one queued application message.</param>
private void ApplicationMessageProcessedHandlerMethod(ApplicationMessageProcessedEventArgs e)
{
    try
    {
        if (e.HasFailed)
        {
            string reason = e.Exception != null ? e.Exception.Message : "unknown error";
            Debug.WriteLine("mqtt demo message publish fail: " + reason + Environment.NewLine);
        }
    }
    catch (Exception ex)
    {
        Debug.WriteLine("mqtt demo message publish error: " + ex.Message + Environment.NewLine);
    }
}

/// <summary>
/// Callback for a successful connection: marks the status as connected and
/// notifies listeners through <c>WhenConnectedNotify</c>.
/// </summary>
/// <param name="e">Connection event data (not used).</param>
private void OnMqttClientConnected(MqttClientConnectedEventArgs e)
{
    Debug.WriteLine("connect to mqtt server success " + Environment.NewLine);
    status = Status.connected;
    WhenConnectedNotify();
}

/// <summary>
/// Callback for a lost connection: marks the status as disconnected and schedules
/// a reconnect with the last used project key after a one second pause.
/// </summary>
/// <param name="e">Disconnection event data (not used).</param>
private void OnMqttClientDisconnected(MqttClientDisconnectedEventArgs e)
{
    status = Status.disconnected;
    Debug.WriteLine("mqtt server is disconnected" + Environment.NewLine);
    Debug.WriteLine("reconnect is starting" + Environment.NewLine);

    // NOTE(review): the client options already configure WithAutoReconnectDelay, so the
    // managed client presumably reconnects on its own — confirm this manual restart is
    // still wanted.
    //
    // The pause runs inside the background task (Task.Delay) instead of blocking the
    // thread that invoked this callback, and the catch lives inside the task so that
    // failures of the reconnect itself are actually observed.
    Task.Run(async () =>
    {
        try
        {
            await Task.Delay(1000);
            await ConnectMqttServerAsync(reConnnectPROJECTKEY);
        }
        catch (Exception ex)
        {
            Debug.WriteLine("mqtt demo error: " + ex.Message + Environment.NewLine);
        }
    });
}

/// <summary>
/// Callback for incoming messages: decodes the payload as UTF-8, publishes the same
/// text back to the topic it arrived on, then forwards the text to
/// <c>ReceiveMessageNotify</c>.
/// </summary>
/// <param name="e">The received application message.</param>
private void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
    // A message without a body may carry a null payload; treat it as empty text
    // instead of letting GetString throw and dropping the message.
    byte[] payload = e.ApplicationMessage.Payload ?? new byte[0];
    string msg = Encoding.UTF8.GetString(payload);

    var appMsg = new MqttApplicationMessage
    {
        Payload = Encoding.UTF8.GetBytes(msg),
        Topic = e.ApplicationMessage.Topic,
        QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce,
        Retain = false
    };

    // Uplink response.
    // NOTE(review): the response goes to the very topic the message was received on;
    // if this client stays subscribed to that topic the broker may deliver the echo
    // back to it again — confirm this cannot loop.
    try
    {
        client.PublishAsync(appMsg).Wait();
    }
    catch (Exception ex)
    {
        // A failed response must not prevent listeners from seeing the message.
        Debug.WriteLine("mqtt demo message publish error: " + ex.GetBaseException().Message + Environment.NewLine);
    }

    // Raise the message event with the decoded text.
    ReceiveMessageNotify(msg);
}
封裝方法給連線、斷線使用
/*
 * Connect/Disconnect
 */

/// <summary>
/// Starts connecting to the MQTT server on a background task and returns immediately.
/// </summary>
/// <param name="PROJECT_KEY">Key passed through to <c>ConnectMqttServerAsync</c>.</param>
public void connect(string PROJECT_KEY)
{
    Task.Run(async () =>
    {
        await ConnectMqttServerAsync(PROJECT_KEY);
    });
}

/// <summary>
/// Starts stopping the MQTT client on a background task and returns immediately.
/// </summary>
public void disconnect()
{
    Task.Run(async () =>
    {
        await DisconnectMqttServerAsync();
    });
}

/// <summary>
/// Stops the MQTT client if one has been created. Failures are logged, not rethrown,
/// because the only caller runs this fire-and-forget and would never observe them.
/// </summary>
/// <returns>A task that completes when the stop attempt has finished.</returns>
private async Task DisconnectMqttServerAsync()
{
    // disconnect() may be called before connect() ever created the client.
    if (client == null)
    {
        return;
    }

    try
    {
        // NOTE(review): OnMqttClientDisconnected schedules a reconnect whenever it runs.
        // If stopping the client raises the disconnected callback, this explicit
        // disconnect would be followed by a reconnect — confirm.
        await client.StopAsync();
    }
    catch (Exception ex)
    {
        Debug.WriteLine("disconnect from mqtt server fail: " + ex.Message + Environment.NewLine);
    }
}
訂閱主題
/*
 * Subscribe
 */

/// <summary>
/// Subscribes the connected client to the raw-data topic, adding one topic filter per
/// entry of <paramref name="sourceList"/>, and blocks until the subscribe call returns.
/// </summary>
/// <param name="sourceList">
/// Sources to subscribe for. Only the number of entries is used; a null or empty list
/// is logged and ignored.
/// </param>
public void subscribe(List<string> sourceList)
{
    if (sourceList == null || sourceList.Count == 0)
    {
        Debug.WriteLine("订阅主题不能为空!");
        return;
    }

    if (client == null || !client.IsConnected)
    {
        Debug.WriteLine("MQTT客户端尚未连接!");
        return;
    }

    // NOTE(review): every entry maps to the same fixed topic and the entry's own value
    // is never used — presumably a per-source topic was intended; confirm.
    const string rawDataTopic = "topic/rawdata";

    var topicFilters = new List<MqttTopicFilter>(sourceList.Count);
    for (int i = 0; i < sourceList.Count; i++)
    {
        topicFilters.Add(new MqttTopicFilterBuilder().WithTopic(rawDataTopic).Build());
    }

    // Subscribe to the topic(s).
    client.SubscribeAsync(topicFilters.ToArray()).Wait();

    // Report the topic that was actually subscribed.
    Debug.WriteLine($"topic : [{rawDataTopic}] is subscribe success" + Environment.NewLine);
}