[C#]MQTTnet使用方法

參考原文：https://blog.csdn.net/chenlu5201314/article/details/94740765

參考原文：https://zendei.com/article/79696.html

非同步連線方法

/// <summary>
/// Builds the managed-client options, (re)registers the four callbacks and starts
/// the managed MQTT client. If the client is already started it is stopped first,
/// with its handlers detached, so the new options take effect.
/// </summary>
/// <param name="PROJECT_KEY">
/// Value used as both the MQTT user name and password; it is also remembered in
/// <c>reConnnectPROJECTKEY</c> for later reconnect attempts.
/// </param>
/// <returns>A task that completes once the client has been asked to start.</returns>
/// <remarks>
/// Exceptions are caught and written to the debug output; they are not propagated.
/// </remarks>
private async Task ConnectMqttServerAsync(string PROJECT_KEY)
        {
            try
            {
                // Create the managed client lazily and reuse it across reconnects.
                if (client == null)
                {
                    client = new MqttFactory().CreateManagedMqttClient();
                }

                // The client id is derived from the current date and hour.
                string timestamp = DateTime.Now.ToString("yyyyMMddHH");
                string clientID = "JAX_" + timestamp;

                var clientOptions = new MqttClientOptionsBuilder()
                    .WithTcpServer(txtServerUri, portNotSsl)
                    .WithCommunicationTimeout(TimeSpan.FromSeconds(DEFAULT_CONNECT_TIMEOUT))
                    .WithCredentials(PROJECT_KEY, PROJECT_KEY)
                    .WithClientId(clientID)
                    .WithKeepAlivePeriod(TimeSpan.FromSeconds(DEFAULT_KEEPLIVE))
                    .WithCleanSession(false)
                    .WithProtocolVersion(MqttProtocolVersion.V311)
                    .Build();

                options = new ManagedMqttClientOptionsBuilder()
                    .WithAutoReconnectDelay(TimeSpan.FromSeconds(RECONNECT_TIME))
                    .WithClientOptions(clientOptions)
                    .Build();

                Debug.WriteLine("嘗試連接SERVER");

                if (client.IsStarted)
                {
                    Debug.WriteLine("stopAsync");
                    // Detach every handler before stopping so that no callback
                    // (in particular the disconnected one) runs during the stop.
                    client.ApplicationMessageProcessedHandler = null;
                    client.ApplicationMessageReceivedHandler = null;
                    client.ConnectedHandler = null;
                    client.DisconnectedHandler = null;
                    await client.StopAsync();
                }

                Debug.WriteLine("注册事件");
                // Register the callbacks.
                // Invoked after a queued message has been processed (published or failed).
                client.ApplicationMessageProcessedHandler =
                    new ApplicationMessageProcessedHandlerDelegate(
                        new Action<ApplicationMessageProcessedEventArgs>(
                            ApplicationMessageProcessedHandlerMethod));

                Debug.WriteLine("命令下发回调");
                // Invoked when a message arrives from the server.
                client.ApplicationMessageReceivedHandler =
                    new MqttApplicationMessageReceivedHandlerDelegate(
                        new Action<MqttApplicationMessageReceivedEventArgs>(
                            MqttApplicationMessageReceived));

                Debug.WriteLine("连接成功回调");
                // Invoked when the connection has been established.
                client.ConnectedHandler =
                    new MqttClientConnectedHandlerDelegate(
                        new Action<MqttClientConnectedEventArgs>(
                            OnMqttClientConnected));

                Debug.WriteLine("连接断开回调");
                // Invoked when the connection has been lost.
                client.DisconnectedHandler =
                    new MqttClientDisconnectedHandlerDelegate(
                        new Action<MqttClientDisconnectedEventArgs>(
                            OnMqttClientDisconnected));

                // Remember the key so the disconnected callback can reconnect with it.
                reConnnectPROJECTKEY = PROJECT_KEY;

                // Start the managed client with the options built above.
                await client.StartAsync(options);
            }
            catch (Exception ex)
            {
                // Include the exception message; previously the reason was dropped.
                Debug.WriteLine("connect to mqtt server fail: " + ex.Message + Environment.NewLine);
            }
        }

實作上述四個Callback

        /// <summary>
        /// Callback invoked after the managed client has processed a queued message.
        /// A failed publish is written to the debug output together with the reported
        /// exception; a successful publish needs no action.
        /// </summary>
        /// <param name="e">Processing result for the message.</param>
        private void ApplicationMessageProcessedHandlerMethod(ApplicationMessageProcessedEventArgs e)
        {
            try
            {
                if (!e.HasFailed)
                {
                    // Nothing to do for a successfully processed message.
                    return;
                }

                // Report the failure instead of ignoring it silently.
                var failedMessage = e.ApplicationMessage;
                Debug.WriteLine(
                    "publish messageId " + failedMessage?.Id
                    + ", topic: " + failedMessage?.ApplicationMessage?.Topic
                    + " is published fail: " + e.Exception?.Message
                    + Environment.NewLine);
            }
            catch (Exception ex)
            {
                Debug.WriteLine("mqtt demo message publish error: " + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Callback invoked when the client has connected: logs the event, marks the
        /// status as connected and then raises the connected notification.
        /// </summary>
        /// <param name="e">Connection event data (not used).</param>
        private void OnMqttClientConnected(MqttClientConnectedEventArgs e)
        {
            string logLine = "connect to mqtt server success " + Environment.NewLine;
            Debug.WriteLine(logLine);

            // Update the status before notifying listeners.
            status = Status.connected;
            WhenConnectedNotify();
        }

        /// <summary>
        /// Callback invoked when the connection is lost: marks the status as
        /// disconnected and schedules a reconnect with the last used project key
        /// after a one-second delay.
        /// </summary>
        /// <param name="e">Disconnection event data (not used).</param>
        private void OnMqttClientDisconnected(MqttClientDisconnectedEventArgs e)
        {
            status = Status.disconnected;

            Debug.WriteLine("mqtt server is disconnected" + Environment.NewLine);
            Debug.WriteLine("reconnect is starting" + Environment.NewLine);

            // The delay runs inside the background task (Task.Delay) so the thread
            // delivering this callback is not blocked for a second by Thread.Sleep.
            Task.Run(async () =>
            {
                try
                {
                    await Task.Delay(1000);
                    await ConnectMqttServerAsync(reConnnectPROJECTKEY);
                }
                catch (Exception ex)
                {
                    // Caught here because a try/catch around Task.Run cannot see
                    // exceptions thrown inside the fire-and-forget task.
                    Debug.WriteLine("mqtt demo error: " + ex.Message + Environment.NewLine);
                }
            });
        }

        /// <summary>
        /// Callback invoked when a message arrives: decodes the payload as UTF-8 text,
        /// publishes the same text back to the topic it arrived on, and forwards the
        /// text to <c>ReceiveMessageNotify</c>.
        /// </summary>
        /// <param name="e">The received message.</param>
        private void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            // A message may carry no payload; treat that as empty text rather than
            // letting Encoding.UTF8.GetString throw on null.
            byte[] payload = e.ApplicationMessage.Payload ?? Array.Empty<byte>();
            string msg = Encoding.UTF8.GetString(payload);

            // Uplink response: echo the text to the originating topic.
            // NOTE(review): if this client is also subscribed to that topic, the echo
            // will presumably be delivered back to it — confirm this is intended.
            var appMsg = new MqttApplicationMessage
            {
                Payload = Encoding.UTF8.GetBytes(msg),
                Topic = e.ApplicationMessage.Topic,
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce,
                Retain = false
            };

            // Do not block the receive callback on the publish; just observe and log
            // a failure when it happens.
            client.PublishAsync(appMsg).ContinueWith(
                t => Debug.WriteLine(
                    "mqtt demo message publish error: "
                    + t.Exception?.GetBaseException().Message
                    + Environment.NewLine),
                TaskContinuationOptions.OnlyOnFaulted);

            // Raise the message event with the decoded text.
            ReceiveMessageNotify(msg);
        }

封裝方法給連線、斷線使用

    /*
     * Connect/Disconnect
     */

    /// <summary>
    /// Starts connecting to the MQTT server on a background task and returns
    /// immediately without waiting for the outcome.
    /// </summary>
    /// <param name="PROJECT_KEY">Key passed on to <c>ConnectMqttServerAsync</c>.</param>
    public void connect(string PROJECT_KEY)
    {
        Func<Task> connectJob = () => ConnectMqttServerAsync(PROJECT_KEY);
        Task.Run(connectJob);
    }
    /// <summary>
    /// Starts disconnecting from the MQTT server on a background task and returns
    /// immediately without waiting for the outcome.
    /// </summary>
    public void disconnect()
    {
        Func<Task> disconnectJob = () => DisconnectMqttServerAsync();
        Task.Run(disconnectJob);
    }

    /// <summary>
    /// Stops the managed MQTT client. Does nothing when no client has been created.
    /// </summary>
    /// <returns>A task that completes once the client has been asked to stop.</returns>
    private async Task DisconnectMqttServerAsync()
    {
        // Nothing to stop if connect was never called.
        if (client == null)
        {
            return;
        }

        // OnMqttClientDisconnected always schedules a reconnect, so detach it first;
        // otherwise a disconnect notification raised while stopping would undo this
        // deliberate disconnect. ConnectMqttServerAsync re-attaches it on connect.
        client.DisconnectedHandler = null;

        // Disconnect.
        await client.StopAsync();

        // The detached handler can no longer record the state change, so do it here.
        status = Status.disconnected;
    }

訂閱主題

    /*
     * Subscribe
     */

    /// <summary>
    /// Subscribes the connected client to every topic in <paramref name="sourceList"/>.
    /// Blank and duplicate entries are skipped. Nothing is subscribed when the list is
    /// null or empty, or when the client is missing or not connected.
    /// </summary>
    /// <param name="sourceList">Topics to subscribe to.</param>
    public void subscribe(List<string> sourceList)
    {
        if (sourceList == null || sourceList.Count == 0)
        {
            Debug.WriteLine("订阅主题不能为空!");
            return;
        }

        if (client == null || !client.IsConnected)
        {
            Debug.WriteLine("MQTT客户端尚未连接!");
            return;
        }

        // Build one filter per distinct, non-blank topic. Previously the loop ignored
        // the list contents and added the same hard-coded filter once per entry.
        var topics = new List<string>();
        var listTopic = new List<MqttTopicFilter>();
        foreach (string topic in sourceList)
        {
            if (string.IsNullOrWhiteSpace(topic) || topics.Contains(topic))
            {
                continue;
            }

            topics.Add(topic);
            listTopic.Add(new MqttTopicFilterBuilder().WithTopic(topic).Build());
        }

        if (listTopic.Count == 0)
        {
            Debug.WriteLine("订阅主题不能为空!");
            return;
        }

        // Subscribe to the topics. This method is synchronous, so it waits here for
        // the subscribe call to complete.
        client.SubscribeAsync(listTopic.ToArray()).Wait();

        // Report the topics that were actually subscribed.
        Debug.WriteLine($"topic : [{string.Join(", ", topics)}] is subscribe success" + Environment.NewLine);
    }

發佈留言