MQTTNET sample
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
//client端
namespace MqttServerTest
{
public partial class Form1 : Form
{
static MqttFactory factory = new MqttFactory();
public Form1()
{
test();
}
public static async Task test() {
// Create a new MQTT client.
var mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", 1883) // Port is optional
.Build();
await mqttClient.ConnectAsync(options);
int count = 0;
var t=DateTime.Now;
var t1 = DateTime.Now;
while (true)
{
count++;
var message = new MqttApplicationMessageBuilder()
.WithTopic(count.ToString())
.WithPayload(t.ToString()+" "+t1.ToString())
.WithExactlyOnceQoS()
.WithRetainFlag()
.Build();
await mqttClient.PublishAsync(message);
//Thread.Sleep(1);
if (count == 100001)
{
t1 = DateTime.Now;
count = 0;
// break;
}
//else if (count == 20001)
//{
// break;
//}
}
mqttClient.ApplicationMessageReceived += (s, e) =>
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
Console.WriteLine();
};
Console.WriteLine("Press any key to exit.");
Console.ReadLine();
}
}
}
using MQTTnet;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
//server
namespace ConsoleApp4
{
class Program
{
static void Main(string[] args)
{
testAsync();
}
static bool st = true;
static DateTime ti = DateTime.Now;
private static async Task testAsync() {
var optionsBuilder = new MqttServerOptionsBuilder()
.WithConnectionBacklog(100)
.WithDefaultEndpointPort(1883);
var mqttServer = new MqttFactory().CreateMqttServer();
await mqttServer.StartAsync(optionsBuilder.Build());
var options = new MqttServerOptions();
options.ConnectionValidator = c =>
{
c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
};
// Extend the timestamp for all messages from clients.
options.ApplicationMessageInterceptor = context =>
{
if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#"))
{
// Replace the payload with the timestamp. But also extending a JSON
// based payload with the timestamp is a suitable use case.
context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
}
};
mqttServer.ApplicationMessageReceived += (s, e) =>
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
};
Console.WriteLine("Press any key to exit.");
Console.ReadLine();
await mqttServer.StopAsync();
}
}
}