mosquitto_loop_misc returns MOSQ_ERR_KEEPALIVE, client gets disconnected, server reports socket errors #3013

Open
Shadocko opened this issue Mar 8, 2024 · 1 comment

Shadocko commented Mar 8, 2024

mosquitto version: 1.6.10
platform: CentOS 7.9

Hello,
I'm trying to use libmosquitto in an application with many open fds.
Because of issue #1299, the mosquitto_loop() function cannot be relied upon, so I implemented my own loop based on epoll(), mosquitto_loop_read(), mosquitto_loop_write() and mosquitto_loop_misc(), taking the answer to #2335 as inspiration.
The code looks like this:

   struct epoll_event ev;
   int epfd = epoll_create1(0);
   if (epfd < 0)
   {
      fprintf (stderr, "epfd < 0: %d\n", epfd);
      exit (-1);
   }

   int sock = mosquitto_socket(mosq);
   ev.data.fd = sock;
   ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
   int err = epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
   if (err != 0)
   {
      fprintf (stderr, "err != 0: %d\n", err);
      exit (-1);
   }

   while ( !stopAsked )
   {
      int result;
      if ( mosquitto_want_write(mosq) )
      {
         // Re-arm EPOLLOUT event...
         ev.data.fd = sock;
         ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
         err = epoll_ctl(epfd, EPOLL_CTL_MOD, ev.data.fd, &ev);
      }

      err = epoll_wait(epfd, &ev, 1, MQTT_POLL_INTERVAL_MS);

      if (err < 0)
      {
         fprintf (stderr, "epoll_wait returned %d, errno: %d %s\n", err, errno, strerror(errno));
      }
      else if (err > 0)
      {
         if (ev.events & EPOLLIN)
         {
            result = mosquitto_loop_read(mosq, 1);
            if ( result != MOSQ_ERR_SUCCESS && result != MOSQ_ERR_KEEPALIVE )
            {
               goto __handle_result;
            }
            else if ( result == MOSQ_ERR_KEEPALIVE )
            {
               fprintf (stderr, "mosquitto_loop_read() returned MOSQ_ERR_KEEPALIVE\n");
            }
         }
         if ( (ev.events & EPOLLOUT)
              && mosquitto_want_write(mosq) )
         {
            result = mosquitto_loop_write(mosq, 1);
            if ( result != MOSQ_ERR_SUCCESS && result != MOSQ_ERR_KEEPALIVE )
            {
               goto __handle_result;
            }
            else if ( result == MOSQ_ERR_KEEPALIVE )
            {
               fprintf (stderr, "mosquitto_loop_write() returned MOSQ_ERR_KEEPALIVE\n");
            }
         }
      }

      result = mosquitto_loop_misc(mosq);
      if ( result == MOSQ_ERR_KEEPALIVE )
      {
         fprintf (stderr, "mosquitto_loop_misc() returned MOSQ_ERR_KEEPALIVE\n");
      }

      __handle_result:
      switch (result)
      {
      case MOSQ_ERR_SUCCESS:
      case MOSQ_ERR_KEEPALIVE:
         break;

      case MOSQ_ERR_INVAL:
         fprintf (stderr, "mosquitto_loop: Invalid parameter!\n");
         exit (-1);

      case MOSQ_ERR_NOMEM:
         fprintf (stderr, "mosquitto_loop: Out of memory!\n");
         exit (-1);

      case MOSQ_ERR_NO_CONN:
         fprintf (stderr, "mosquitto_loop: No connection. Reconnecting...\n");
         // Stop watching old socket FD...
         ev.data.fd = sock;
         ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
         epoll_ctl(epfd, EPOLL_CTL_DEL, ev.data.fd, &ev);
         // (Re)connect to broker...
         result = mosquitto_connect(mosq, mqttSubHost, mqttSubPort, MQTT_CONNECT_KEEPALIVE);
         if (result != MOSQ_ERR_SUCCESS)
         {
            fprintf (stderr, "Could not connect to broker: %d (%d/%s)\n", result, errno, strerror(errno));
            exit (-1);
         }
         // Start watching new socket FD...
         ev.data.fd = sock = mosquitto_socket(mosq);
         ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
         err = epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
         if (err != 0)
         {
            fprintf (stderr, "Could not watch new socket, err != 0: %d, socket = %d\n", err, sock );
            exit (-1);
         }
         break;

      case MOSQ_ERR_CONN_LOST:
         fprintf (stderr, "mosquitto_loop: Connection lost. Reconnecting...\n");
         // Stop watching old socket FD...
         ev.data.fd = sock;
         ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
         epoll_ctl(epfd, EPOLL_CTL_DEL, ev.data.fd, &ev);
         // Reconnect...
         mosquitto_reconnect(mosq);
         // Start watching new socket FD...
         ev.data.fd = sock = mosquitto_socket(mosq);
         ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
         err = epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
         if (err != 0)
         {
            fprintf (stderr, "Could not watch new socket, err != 0: %d, socket = %d\n", err, sock );
            exit (-1);
         }
         break;

      case MOSQ_ERR_PROTOCOL:
         fprintf (stderr, "mosquitto_loop: Protocol error!\n");
         break;

      case MOSQ_ERR_ERRNO:
         fprintf (stderr, "mosquitto_loop: syscall error; %s (%d)\n", strerror(errno), errno);
         break;

      default:
         fprintf (stderr, "mosquitto_loop: unknown error %d!\n", result);
         break;
      }
   }

I'm encountering stability issues: mosquitto_loop_misc() returns MOSQ_ERR_KEEPALIVE. This is not a documented return value. It seems to come from mosquitto__check_keepalive(), but I don't know how to handle it.
My client eventually gets disconnected, with the following log on the broker side:

1709911264: Socket error on client XYZ, disconnecting.

Over a TLS connection it gets worse, and the client misses a lot of messages (it is a consumer-only client with many topic subscriptions, which are managed from another thread).

Is there something I'm doing wrong, or some way I can work around this issue?
Regards,

erkoln commented Mar 11, 2024

Hello,
Here is the complete source code for an epoll-based command-line subscriber exhibiting the issue:
https://gist.github.com/erkoln/90a4efbf7c3f94d8c76a9a355939e2a2
I also tested this on Ubuntu 22.04 using the version 2.0.11 .deb packages for both the mosquitto broker and library, as well as with mosquitto 2.0.18 built from the git repository.
The sample code uses a thread to service the MQTT connection, which is not necessary: calling consumer_loop() directly from main() makes no difference. I added the thread to test whether the problem was thread-related.
The problem seems to be related to the use of edge-triggered epoll (EPOLLET). The socket's epoll registration apparently needs to be re-armed in more cases, but I can't figure out which libmosquitto API I could base that decision on.
Removing EPOLLET everywhere works, but I'm afraid it would waste CPU cycles.
Any suggestion would be welcome.
Best regards,
