參考原文：https://blog.csdn.net/chenlu5201314/article/details/94740765
參考原文：https://zendei.com/article/79696.html
非同步連線方法
/// <summary>
/// Creates the managed MQTT client on first use, builds its options, (re)registers the four
/// client callbacks and starts the client.
/// </summary>
/// <param name="PROJECT_KEY">
/// Passed as both arguments of <c>WithCredentials</c>; also stored in
/// <c>reConnnectPROJECTKEY</c> so the disconnect callback can reconnect with the same key.
/// </param>
/// <returns>A task that completes when the client has been started or a failure has been logged.</returns>
/// <remarks>Exceptions are written to the debug output and are not propagated to the caller.</remarks>
private async Task ConnectMqttServerAsync(string PROJECT_KEY)
{
    try
    {
        if (client == null)
        {
            client = new MqttFactory().CreateManagedMqttClient();
        }

        // The client id is derived from the current local date and hour.
        string timestamp = DateTime.Now.ToString("yyyyMMddHH");
        string clientID = "JAX_" + timestamp;

        options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(RECONNECT_TIME))
            .WithClientOptions(new MqttClientOptionsBuilder()
                .WithTcpServer(txtServerUri, portNotSsl)
                .WithCommunicationTimeout(TimeSpan.FromSeconds(DEFAULT_CONNECT_TIMEOUT))
                .WithCredentials(PROJECT_KEY, PROJECT_KEY)
                .WithClientId(clientID)
                .WithKeepAlivePeriod(TimeSpan.FromSeconds(DEFAULT_KEEPLIVE))
                .WithCleanSession(false)
                .WithProtocolVersion(MqttProtocolVersion.V311)
                .Build())
            .Build();

        Debug.WriteLine("嘗試連接SERVER");
        if (client.IsStarted)
        {
            Debug.WriteLine("stopAsync");
            // Detach every callback before stopping so that stopping the old session
            // does not invoke them (in particular the reconnecting disconnect callback).
            client.ApplicationMessageProcessedHandler = null;
            client.ApplicationMessageReceivedHandler = null;
            client.ConnectedHandler = null;
            client.DisconnectedHandler = null;
            await client.StopAsync();
        }

        Debug.WriteLine("注册事件");
        // Register the callbacks.
        // Callback for the result of a published message.
        client.ApplicationMessageProcessedHandler =
            new ApplicationMessageProcessedHandlerDelegate(
                new Action<ApplicationMessageProcessedEventArgs>(
                    ApplicationMessageProcessedHandlerMethod));
        Debug.WriteLine("命令下发回调");
        // Callback for incoming messages (commands sent down to this client).
        client.ApplicationMessageReceivedHandler =
            new MqttApplicationMessageReceivedHandlerDelegate(
                new Action<MqttApplicationMessageReceivedEventArgs>(
                    MqttApplicationMessageReceived));
        Debug.WriteLine("连接成功回调");
        // Callback for a successful connection.
        client.ConnectedHandler =
            new MqttClientConnectedHandlerDelegate(
                new Action<MqttClientConnectedEventArgs>(
                    OnMqttClientConnected));
        Debug.WriteLine("连接断开回调");
        // Callback for a lost connection.
        client.DisconnectedHandler =
            new MqttClientDisconnectedHandlerDelegate(
                new Action<MqttClientDisconnectedEventArgs>(
                    OnMqttClientDisconnected));

        // Remember the key so the disconnect callback can reconnect with it.
        reConnnectPROJECTKEY = PROJECT_KEY;

        // Start the managed client with the options built above.
        await client.StartAsync(options);
    }
    catch (Exception ex)
    {
        // Include the exception message; without it a failed connect gives no clue about the cause.
        Debug.WriteLine("connect to mqtt server fail: " + ex.Message + Environment.NewLine);
    }
}
實作上述四個Callback
/// <summary>
/// Callback invoked with the result of a processed (published) application message.
/// Neither the failed nor the succeeded outcome triggers any action at present.
/// </summary>
/// <param name="e">Processing result; only <c>HasFailed</c> and <c>HasSucceeded</c> are read.</param>
private void ApplicationMessageProcessedHandlerMethod(ApplicationMessageProcessedEventArgs e)
{
    try
    {
        if (e.HasFailed)
        {
            // Publish failed: nothing to do (diagnostic logging is disabled).
            return;
        }

        if (e.HasSucceeded)
        {
            // Publish succeeded: nothing to do (diagnostic logging is disabled).
        }
    }
    catch (Exception error)
    {
        Debug.WriteLine("mqtt demo message publish error: " + error.Message + Environment.NewLine);
    }
}
/// <summary>
/// Callback invoked when the client has connected: logs the event, sets
/// <c>status</c> to connected and then calls <c>WhenConnectedNotify</c>.
/// </summary>
/// <param name="e">Connection event data (not inspected).</param>
private void OnMqttClientConnected(MqttClientConnectedEventArgs e)
{
    string logLine = "connect to mqtt server success " + Environment.NewLine;
    Debug.WriteLine(logLine);

    // Update the status before notifying so the notification follows the state change.
    status = Status.connected;
    WhenConnectedNotify();
}
/// <summary>
/// Callback invoked when the client has disconnected: sets <c>status</c> to disconnected
/// and schedules a reconnect, one second later, with the key stored in
/// <c>reConnnectPROJECTKEY</c>.
/// </summary>
/// <param name="e">Disconnect event data (not inspected).</param>
private void OnMqttClientDisconnected(MqttClientDisconnectedEventArgs e)
{
    status = Status.disconnected;
    try
    {
        Debug.WriteLine("mqtt server is disconnected" + Environment.NewLine);
        Debug.WriteLine("reconnect is starting" + Environment.NewLine);

        // The one-second pause happens inside the scheduled task (Task.Delay) instead of
        // Thread.Sleep, so the thread that raised this callback is released immediately.
        Task.Run(async () =>
        {
            await Task.Delay(1000);
            await ConnectMqttServerAsync(reConnnectPROJECTKEY);
        });
    }
    catch (Exception ex)
    {
        Debug.WriteLine("mqtt demo error: " + ex.Message + Environment.NewLine);
    }
}
/// <summary>
/// Callback invoked for every received application message: decodes the payload as UTF-8,
/// publishes it back on the topic it arrived on, then passes the text to
/// <c>ReceiveMessageNotify</c>.
/// </summary>
/// <param name="e">Event data carrying the received application message.</param>
private void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
    // A message without a payload would make Encoding.GetString throw on null,
    // so a missing payload is treated as an empty string.
    byte[] rawPayload = e.ApplicationMessage.Payload;
    string msg = rawPayload == null ? string.Empty : Encoding.UTF8.GetString(rawPayload);

    var appMsg = new MqttApplicationMessage();
    appMsg.Payload = Encoding.UTF8.GetBytes(msg);
    appMsg.Topic = e.ApplicationMessage.Topic;
    appMsg.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
    appMsg.Retain = false;

    // Uplink response: echo the message back.
    // NOTE(review): the echo goes to the same topic the message arrived on; if this client
    // is subscribed to that topic the echo will presumably be delivered back here — confirm
    // this is intended.
    client.PublishAsync(appMsg).Wait();

    // Raise the message notification with the decoded text.
    ReceiveMessageNotify(msg);
}
封裝方法給連線、斷線使用
/*
* Connect/Disconnect
*/
/// <summary>
/// Starts a connection attempt on a thread-pool task and returns immediately;
/// the outcome is not reported to the caller.
/// </summary>
/// <param name="PROJECT_KEY">Key forwarded to <c>ConnectMqttServerAsync</c>.</param>
public void connect(string PROJECT_KEY)
{
    Func<Task> connectWork = () => ConnectMqttServerAsync(PROJECT_KEY);
    Task.Run(connectWork);
}
/// <summary>
/// Starts the disconnect on a thread-pool task and returns immediately;
/// the outcome is not reported to the caller.
/// </summary>
public void disconnect()
{
    Func<Task> disconnectWork = () => DisconnectMqttServerAsync();
    Task.Run(disconnectWork);
}
/// <summary>
/// Stops the managed MQTT client, if one has been created, and marks the status as disconnected.
/// </summary>
/// <returns>A task that completes when the client has been stopped.</returns>
private async Task DisconnectMqttServerAsync()
{
    // Nothing to stop when disconnect is requested before any connect created the client.
    if (client == null)
    {
        return;
    }

    // The disconnect callback schedules a reconnect, which would undo an explicit
    // disconnect. Detach it before stopping, as the connect path does before it
    // stops a running client.
    client.DisconnectedHandler = null;

    // Stop the client.
    await client.StopAsync();

    // The detached callback can no longer record the state change, so do it here.
    status = Status.disconnected;
}
訂閱主題
/*
* Subscribe
*/
/// <summary>
/// Subscribes the client to one topic filter per entry of <paramref name="sourceList"/>
/// and blocks until the subscribe call has completed.
/// </summary>
/// <param name="sourceList">
/// Sources to subscribe for. Nothing is subscribed when the list is null or empty.
/// </param>
public void subscribe(List<string> sourceList)
{
    if (sourceList == null || sourceList.Count == 0)
    {
        Debug.WriteLine("订阅主题不能为空!");
        return;
    }

    if (client == null || !client.IsConnected)
    {
        Debug.WriteLine("MQTT客户端尚未连接!");
        return;
    }

    // NOTE(review): every entry maps to this same fixed topic; the entries' own values
    // are not used when building the filters — confirm this is intended.
    const string topicName = "topic/rawdata";

    var listTopic = new List<MqttTopicFilter>(sourceList.Count);
    for (int i = 0; i < sourceList.Count; i++)
    {
        listTopic.Add(new MqttTopicFilterBuilder().WithTopic(topicName).Build());
    }

    // Subscribe to the topic filters.
    client.SubscribeAsync(listTopic.ToArray()).Wait();

    // Report the topic that was actually subscribed.
    Debug.WriteLine($"topic : [{topicName}] is subscribe success" + Environment.NewLine);
}