Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocket .NET 5.0 connection and publish issues #1197

Open
xeonfusion opened this issue Jul 17, 2021 · 7 comments
Open

WebSocket .NET 5.0 connection and publish issues #1197

xeonfusion opened this issue Jul 17, 2021 · 7 comments
Labels
question It is a question regarding the project

Comments

@xeonfusion
Copy link

Describe your question

There are issues with Managed client and WebSockets, it fails (or takes a long time more than 30-60 secs) to connect and publish to a test Mosquitto broker, with .NET 5.0 on Windows 10, and fails on Mac OS X. It does work sometimes if .NET standard framework or Mono is used on Mac OS X or Linux.

I suspect platform specific code issues with the ManagedClient code (messages getting enqueued without actually being published, Task completion or threading issues?). The examples for ManagedClient here: https://dzone.com/articles/mqtt-publishing-and-subscribing-messages-to-mqtt-b and https://github.com/chkr1011/MQTTnet/wiki/ManagedClient were having these issues with dotnet core.

I managed to get the code working only using a Task completion and Task.ContinueWith() approach as shown here (@chkr1011 is this an optimal approach?):

            CancellationTokenSource source = new CancellationTokenSource();
            CancellationToken token = source.Token;

            var mqttClient = new MqttFactory().CreateMqttClient();
            var managedClient = new ManagedMqttClient(mqttClient, new MqttNetLogger());

            try
            {
                var task = Task.Run(async () =>
                {
                    var connected = GetConnectedTask(managedClient);
                    await ConnectMQTTAsync(managedClient, token, m_MQTTUrl, m_MQTTclientId, m_MQTTuser, m_MQTTpassw);
                    await connected;

                });

                task.ContinueWith(antecedent => {
                    if (antecedent.Status == TaskStatus.RanToCompletion)
                    {
                        Task.Run(async () =>
                        {
                            await PublishMQTTAsync(managedClient, token, m_MQTTtopic, serializedJSON);
                            await managedClient.StopAsync();
                        });
                    }
                });

            }

            catch (Exception _Exception)
            {
                Console.WriteLine("Exception caught in process: {0}", _Exception.ToString());
            }
public static async Task ConnectMQTTAsync(ManagedMqttClient mqttClient, CancellationToken token, string mqtturl, string clientId, string mqttuser, string mqttpassw)
        {
            bool mqttSecure = true;

            var messageBuilder = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithCredentials(mqttuser, mqttpassw)
            .WithCommunicationTimeout(new TimeSpan(0, 0, 5))
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(1200))
            .WithWebSocketServer(mqtturl)
            .WithCleanSession();

            var options = mqttSecure
            ? messageBuilder
                .WithTls()
                .Build()
            : messageBuilder
                .Build();

            var managedOptions = new ManagedMqttClientOptionsBuilder()
              .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
              .WithClientOptions(options)
              .Build();

            await mqttClient.StartAsync(managedOptions);

        }

        Task GetConnectedTask(ManagedMqttClient managedClient)
        {
            TaskCompletionSource<bool> connected = new TaskCompletionSource<bool>();
            managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e =>
            {
                managedClient.ConnectedHandler = null;
                connected.SetResult(true);
            });
            return connected.Task;
        }

        public static async Task PublishMQTTAsync(ManagedMqttClient mqttClient, CancellationToken token, string topic, string payload, bool retainFlag = true, int qos = 1)
        {
            if (mqttClient.IsConnected)
            {
                await mqttClient.PublishAsync(new MqttApplicationMessageBuilder()
                 .WithTopic(topic)
                 .WithPayload(payload)
                 .WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
                 .WithRetainFlag(retainFlag)
                 .Build(), token);
            }
        }

Which project is your question related to?

  • ManagedClient
@xeonfusion xeonfusion added the question It is a question regarding the project label Jul 17, 2021
@lazydino
Copy link

I am experiencing the same kind of connection issues running a simple .net 5.0 console MQTT client against a mosquitto broker in linux via websockets. I am able to verify the websockets connectivity through a test client like https://hezhii.github.io/mqttjs-client/ , but the .net client would fail (connection time out) in 10 seconds or so. The managed client failed too.

Here is the options for the managed mqtt.net websockets client I use.

var options = new ManagedMqttClientOptionsBuilder()

            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithStorage(default(IManagedMqttClientStorage)) // TODO: need to implement.
            .WithClientOptions(new MQTTnet.Client.Options.MqttClientOptionsBuilder()
                .WithClientId(chatid)
                ////.WithCredentials("Your Secret Here")
                .WithKeepAlivePeriod(TimeSpan.FromHours(24))
                //.WithKeepAliveSendInterval(TimeSpan.FromSeconds(5))
                .WithCleanSession()
                .WithWebSocketServer(mosquittourl)
                .WithTls()
                .Build()
            )
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .Build();

My goal is to utilize the very same .NET 5 client code for realtime communications between web clients and desktops. One runs on destop or devices as IoT applications, and the other runs as a Blazor web app via webassembly in the browser. Would MQTT.net support this type of inter-operable scenario on the client side?

@fabioGulfMike
Copy link

fabioGulfMike commented Oct 18, 2021

I am experiencing a similar problem using this libs on Blazor Progressive Web Applications. I get stable connection with a broker (using VerneMQ on a local docker container) but it do not publish any message.

The code await mqttClient.PublishAsync(MqttMessage, CancellationToken.None); returns a success code, but using another MQTT client, the message do not reach the topic. The UseApplicationMessageReceivedHandler works perfectly!

Just for test, let me share my code (I used MudBlazor as web design):

Index.razor

<MudGrid Justify="Justify.FlexStart" Spacing="0">
      <MudItem xs="12">
          <MudGrid Justify="Justify.FlexStart" Spacing="0">
              <!-- Server -->
              <MudItem xs="6">
                  <MudPaper Class="pa-3 ma-3" Elevation="3">
                      <MudGrid Justify="Justify.FlexStart" Spacing="1" Style="align-items:center;">
                          <MudItem xs="1">
                              <MudIcon Icon="@Icons.Material.Filled.Circle" Size="Size.Small" Color="@ConnectionStatus" />
                          </MudItem>
                          <MudItem xs="7">
                              <MudTextField @bind-Value="model.Server" Label="Server" Variant="Variant.Outlined" Adornment="Adornment.Start" AdornmentIcon="@Icons.Material.Filled.Computer" AdornmentColor="Color.Warning" />
                          </MudItem>
                          <MudItem xs="4">
                              <MudButton FullWidth="true" Variant="Variant.Filled" Color="Color.Primary">@ButtonConnectText</MudButton>
                          </MudItem>
                          <MudSpacer />
                      </MudGrid>
                  </MudPaper>
              </MudItem> <!-- Server -->
              <!-- User/Password -->
              <MudItem xs="6">
                  <MudPaper Class="pa-3 ma-3" Elevation="3">
                      <MudGrid Justify="Justify.FlexStart" Spacing="1" Style="align-items:center;">
                          <MudItem xs="4">
                              <MudSwitch @bind-Checked="@Authenticated" Label="@(Authenticated ? "User" : "No User")" />
                          </MudItem>
                          <MudItem xs="4">
                              <MudTextField @bind-Value="model.Username" Label="User" Variant="Variant.Outlined" Disabled="@(!Authenticated)" />
                          </MudItem>
                          <MudItem xs="4">
                              <MudTextField @bind-Value="model.Password" Label="Password" Variant="Variant.Outlined" InputType="@PasswordInput" Adornment="Adornment.End" AdornmentIcon="@PasswordInputIcon" OnAdornmentClick="ShowPasswordButton" Disabled="@(!Authenticated)" />
                          </MudItem>
                      </MudGrid>
                  </MudPaper>
              </MudItem> <!-- User/Password -->
          </MudGrid>
      </MudItem>
  </MudGrid>

@code{
    MqttFactory factory = new MqttFactory();
    ManagedMqttClient mqttClient { get; set; }
    ManagedMqttClientOptions ManagedOptions;

    IMqttClientOptions clientOptions;
    MqttClientCredentials ClientCredentials;

    private async Task Connect()
    {
        var server = model.Server;
        var port = model.Port;

        Console.WriteLine($"--> Connecting to server {server}...");

        if (!string.IsNullOrEmpty(model.Username) && !string.IsNullOrEmpty(model.Password))
        {
            ClientCredentials = new MqttClientCredentials();
            ClientCredentials.Username = model.Username;
            ClientCredentials.Password = Encoding.ASCII.GetBytes(model.Password);

            clientOptions = new MqttClientOptionsBuilder()
                            .WithClientId("Client1")
                            .WithCredentials(ClientCredentials)
                            .WithCommunicationTimeout(new TimeSpan(0, 0, 5))
                            .WithKeepAlivePeriod(TimeSpan.FromSeconds(1200))
                            .WithCleanSession()
                            //.WithTls()
                            .WithWebSocketServer($"{server}:8080/mqtt")
                            .Build();
        }
        else
        {
            clientOptions = new MqttClientOptionsBuilder()
                .WithClientId("Client1")
                .WithCommunicationTimeout(new TimeSpan(0, 0, 5))
                .WithKeepAlivePeriod(TimeSpan.FromSeconds(1200))
                .WithCleanSession()
                .WithWebSocketServer($"{server}:8080/mqtt")
                .Build();
        }

        ManagedOptions = new ManagedMqttClientOptionsBuilder()
                        .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                        .WithClientOptions(clientOptions)
                        .Build();

        mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
        mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
        mqttClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(OnConnectingFailed);

        var topicFilter = new MqttTopicFilterBuilder()
        .WithTopic("YOUR_TOPIC")
        .Build();

        await mqttClient.SubscribeAsync(topicFilter);
        mqttClient.UseApplicationMessageReceivedHandler(ReceiveMessage);

        try
        {
            await mqttClient.StartAsync(ManagedOptions);
        }
        catch (Exception e)
        {
            Console.WriteLine($"--> Not connected. Error: {e.Message}");

            this.ButtonConnectText = "Connect";
        }
    }

    void OnConnected(MqttClientConnectedEventArgs obj)
    {
        ConnectionStatus = Color.Success;
        this.ButtonConnectText = "Disconnect";
        Console.WriteLine("--> Connected!");
        StateHasChanged();
    }

    void OnConnectingFailed(ManagedProcessFailedEventArgs obj)
    {
        ConnectionStatus = Color.Error;
        Console.WriteLine($"--> Not connected! Error: {obj.Exception}");
        this.ButtonConnectText = "Connect";
        StateHasChanged();
    }

    void OnDisconnected(MqttClientDisconnectedEventArgs obj)
    {
        ConnectionStatus = Color.Error;
        Console.WriteLine($"--> Not connected! Error: {obj.Reason}");
        this.ButtonConnectText = "Connect";
        StateHasChanged();
    }

    async Task PublishMessage(string message, string topic, bool retain = false, int qos = 0)
    {
        try
        {
            var MqttMessage = new MqttApplicationMessageBuilder()
                                    .WithTopic(topic)
                                    .WithPayload(message)
                                    .WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
                                    .WithRetainFlag(retain)
                                    .Build();

            if (mqttClient.IsConnected)
            {
                var RESULT = await mqttClient.PublishAsync(MqttMessage, CancellationToken.None);

                Console.WriteLine($"--> Publish: {RESULT.ReasonString} (code {RESULT.ReasonCode}) - {JsonConvert.SerializeObject(RESULT.UserProperties)}");
            }
        }
        catch (Exception e)
        {
            Console.WriteLine($"--> Error on publish {message} on '{topic}'. Error: {e.Message}");
        }
    }

    void ReceiveMessage(MqttApplicationMessageReceivedEventArgs e)
    {
        DeviceMessage = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
        Topic = e.ApplicationMessage.Topic;

        Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
        Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
        Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
        Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
        Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
        Console.WriteLine();
        StateHasChanged();
    }
    #endregion

    #region Server and User/Password Card

    public class ServerForm
    {
        [Required]
        public string Server { get; set; }
        public string Port { get; set; }
        public string Username { get; set; } = "";
        public string Password { get; set; } = "";
    }

    ServerForm model = new ServerForm();

    Color ConnectionStatus { get; set; }

    string ButtonConnectText { get; set; }

    bool Authenticated { get; set; }

    string PasswordInputIcon { get; set; } = Icons.Material.Filled.VisibilityOff;

    bool ShowPassword { get; set; }

    InputType PasswordInput { get; set; }

    void ShowPasswordButton()
    {
        if (ShowPassword)
        {
            ShowPassword = false;
            PasswordInputIcon = Icons.Material.Filled.VisibilityOff;
            PasswordInput = InputType.Password;
        }
        else
        {
            ShowPassword = true;
            PasswordInputIcon = Icons.Material.Filled.Visibility;
            PasswordInput = InputType.Text;
        }
    }

    #endregion

    #region Page Life Cicle

    protected override Task OnInitializedAsync()
    {
        ButtonConnectText = "Connect";
        Authenticated = false;
        ConnectionStatus = Color.Error;
        ShowPassword = false;
        PasswordInput = InputType.Password;

        var factory = new MqttFactory();
        mqttClient = (ManagedMqttClient)factory.CreateManagedMqttClient();

        return base.OnInitializedAsync();
    }

    async Task ConnectDisconnectButtonClick(string status)
    {
        if (status == "Disconnect")
        {
            await mqttClient.StopAsync();
        }
        else if (status == "Connect")
        {
            await this.Connect();
        }
    }

    #endregion
}

_Imports.razor

// Other namespaces
// ...
@using MudBlazor
@using MQTTnet;
@using MQTTnet.Client;
@using MQTTnet.Client.Connecting;
@using MQTTnet.Client.Disconnecting;
@using MQTTnet.Client.Options;
@using MQTTnet.Client.Publishing;
@using MQTTnet.Client.Subscribing;
@using MQTTnet.Exceptions;
@using MQTTnet.Extensions.ManagedClient;
@using MQTTnet.Packets;
@using MQTTnet.Protocol;
@using Newtonsoft.Json;

@sbroszat
Copy link

sbroszat commented Dec 9, 2021

I have the same bug as fabioGulfMike. I use MQTTnet with Websocket in a Blazor .net5.0 Webassambly app.
Subscribing works without any problems.
But publishing does not work, the queue is getting longer and longer, but no messages are arriving the broker.

When I use the debugger to slowly step through the Publish() code while return back into the Blazor Framework publishing works and the broker receives the messages. So in my opinion there is something wronge with the TryPublishQueuedMessageAsync.
Maybe its never called or the Task never get compute time in this single thread environment.

@iotechFabio
Copy link

@sbroszat, I changed to MQTT client implemented in JavaScript and used the JS.Interop libs to build my project. Faster and more reliable.

@mvanhil
Copy link

mvanhil commented Dec 12, 2021

I have the same issue and came upon this thread

@sbroszat
Copy link

@mvanhil if you mean Blazor: I changed also to the Paho JS MQTT Client with JS.Interop. Thanks for the hint @iotechFabio. Works without problems.

@mvanhil
Copy link

mvanhil commented Dec 23, 2021

I'm now using the default library (not the managed client) and all is working perfectly for now (just publishing).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question It is a question regarding the project
Projects
None yet
Development

No branches or pull requests

6 participants