Ping requests not sent when using custom loop handler #2335

Closed
systelomeganebularat opened this issue Oct 4, 2021 · 2 comments
Labels
Component: libmosquitto · Status: Completed · Type: Question

Comments

@systelomeganebularat

I am developing an MQTT client application that uses a large number of file descriptors.
As described in issue #1299, this causes the MQTT library to return MOSQ_ERR_INVAL errors.

Following the advice in that issue, I am now trying to use my own event loop, but I have run into a problem: the client does not send PINGREQ packets, so the MQTT server disconnects it. The server log shows:

1632236557: Client mosq-dDa3ooaeKWKrdxSdQa has exceeded timeout, disconnecting.

A Wireshark capture confirms this.

The sample code below reproduces the issue: the on_disconnect callback is called before the message is published.
When I replace my custom loop with a call to mosquitto_loop_start(), everything works as expected.
This is with libmosquitto 1.6.9 on Ubuntu 20.04.

#include <mosquittopp.h>

#include <atomic>
#include <cassert>
#include <chrono>
#include <iostream>
#include <string>
#include <sys/epoll.h>
#include <thread>

static void on_connect(struct mosquitto*, void*, int rc)
{
	std::cerr << "on_connect " + std::to_string(rc) << std::endl;
}

static void on_disconnect(struct mosquitto*, void*, int rc)
{
	std::cerr << "on_disconnect " + std::to_string(rc) << std::endl;
}

static void on_publish(struct mosquitto*, void*, int mid)
{
	std::cerr << "on_publish " + std::to_string(mid) << std::endl;
}

static void on_message(struct mosquitto*, void*, const mosquitto_message*)
{
	std::cerr << "on_message" << std::endl;
}

static void on_subscribe(struct mosquitto*, void*, int, int, const int *)
{
	std::cerr << "on_subscribe" << std::endl;
}

int main()
{
	auto ret = mosquitto_lib_init();
	assert(ret == MOSQ_ERR_SUCCESS);

	struct mosquitto * mosq = mosquitto_new(NULL, true, NULL);
	assert(mosq != nullptr);

	mosquitto_connect_callback_set(mosq, on_connect);
	mosquitto_disconnect_callback_set(mosq, on_disconnect);
	mosquitto_publish_callback_set(mosq, on_publish);
	mosquitto_message_callback_set(mosq, on_message);
	mosquitto_subscribe_callback_set(mosq, on_subscribe);

	ret = mosquitto_connect(mosq, "127.0.0.1", 1883, 5);
	assert(ret == MOSQ_ERR_SUCCESS);

	/**
	 * Use our own thread with a polling logic.
	 */
	std::atomic_bool ended(false);
	auto worker = std::thread([mosq, &ended]
	{
		/**
		 * Tell the library we are using threads, but not mosquitto_loop_start.
		 */
		mosquitto_threaded_set(mosq, true);

		struct epoll_event ev;
		auto epfd = epoll_create1(0);
		assert(epfd >= 0);

		int sock = mosquitto_socket(mosq);
		ev.data.fd = sock;
		ev.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;
		auto err = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
		assert(err == 0);

		while (!ended)
		{
			auto err = epoll_wait(epfd, &ev, 1, 250);

			if (err > 0)
			{
				if (ev.events & EPOLLIN)
					mosquitto_loop_read(mosq, 1);
				if (ev.events & EPOLLOUT)
				{
					if (mosquitto_want_write(mosq))
						mosquitto_loop_write(mosq, 1);
				}
			}

			mosquitto_loop_misc(mosq);
		}
	});

	/**
	 * Wait for keepalive * 2 to see if Ping requests are working.
	 */
	std::this_thread::sleep_for(std::chrono::seconds{10});

	std::string msg = "test message";
	ret = mosquitto_publish(mosq, NULL, "test-topic", msg.size(), msg.c_str(), 2, false);
	std::cerr << "publish result=" + std::to_string(ret) << std::endl;

	std::this_thread::sleep_for(std::chrono::milliseconds{500});

	mosquitto_disconnect(mosq);

	ended = true;
	worker.join();

	mosquitto_lib_cleanup();
}
@ralight
Contributor

ralight commented Oct 6, 2021

The problem is that mosquitto_loop_misc() checks whether a PINGREQ needs to be sent and queues it if so, but it doesn't actually write the data. mosquitto_loop_write() does that, and the handling of the write events isn't correct.

The code uses EPOLLET. That's OK for the read case, because we read until EAGAIN occurs, but in the write case it causes problems. In edge-triggered mode, EPOLLOUT is only reported when the socket changes from not writable to writable, and we cannot guarantee that every write will fill the socket's send buffer so that write() returns EAGAIN; a two-byte PINGREQ, for example, normally will not. This means the EPOLLOUT event will not be delivered to us again unless we re-arm it, which is not being done.

Here is a modified version that works:

#include <mosquittopp.h>

#include <atomic>
#include <cassert>
#include <chrono>
#include <iostream>
#include <string>
#include <sys/epoll.h>
#include <thread>

static void on_connect(struct mosquitto*, void*, int rc)
{
	std::cerr << "on_connect " + std::to_string(rc) << std::endl;
}

static void on_disconnect(struct mosquitto*, void*, int rc)
{
	std::cerr << "on_disconnect " + std::to_string(rc) << std::endl;
}

static void on_publish(struct mosquitto*, void*, int mid)
{
	std::cerr << "on_publish " + std::to_string(mid) << std::endl;
}

static void on_message(struct mosquitto*, void*, const mosquitto_message*)
{
	std::cerr << "on_message" << std::endl;
}

static void on_subscribe(struct mosquitto*, void*, int, int, const int *)
{
	std::cerr << "on_subscribe" << std::endl;
}

int main()
{
	auto ret = mosquitto_lib_init();
	assert(ret == MOSQ_ERR_SUCCESS);

	struct mosquitto * mosq = mosquitto_new(NULL, true, NULL);
	assert(mosq != nullptr);

	mosquitto_connect_callback_set(mosq, on_connect);
	mosquitto_disconnect_callback_set(mosq, on_disconnect);
	mosquitto_publish_callback_set(mosq, on_publish);
	mosquitto_message_callback_set(mosq, on_message);
	mosquitto_subscribe_callback_set(mosq, on_subscribe);

	ret = mosquitto_connect(mosq, "127.0.0.1", 1883, 5);
	assert(ret == MOSQ_ERR_SUCCESS);

	/**
	 * Use our own thread with a polling logic.
	 */
	std::atomic_bool ended(false);
	auto worker = std::thread([mosq, &ended]
	{
		/**
		 * Tell the library we are using threads, but not mosquitto_loop_start.
		 */
		mosquitto_threaded_set(mosq, true);

		struct epoll_event ev;
		auto epfd = epoll_create1(0);
		assert(epfd >= 0);

		int sock = mosquitto_socket(mosq);
		ev.data.fd = sock;
		ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
		auto err = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
		assert(err == 0);

		while (!ended)
		{
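			if (mosquitto_want_write(mosq))
			{
				/* Re-arm EPOLLOUT. With EPOLLET the event is edge-triggered, so
				 * EPOLL_CTL_MOD is needed to make epoll re-check writability and
				 * report EPOLLOUT for the packet that is now queued. */
				ev.data.fd = mosquitto_socket(mosq);
				ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
				err = epoll_ctl(epfd, EPOLL_CTL_MOD, ev.data.fd, &ev);
			}

			int nfds = epoll_wait(epfd, &ev, 1, 250);

			if (nfds > 0)
			{
				if (ev.events & EPOLLIN)
					mosquitto_loop_read(mosq, 1);
				if (ev.events & EPOLLOUT)
					mosquitto_loop_write(mosq, 1);
			}

			/* Checks the keepalive and queues a PINGREQ if one is due. */
			mosquitto_loop_misc(mosq);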
		}
	});

	/**
	 * Wait for keepalive * 2 to see if Ping requests are working.
	 */
	std::this_thread::sleep_for(std::chrono::seconds{10});

	std::string msg = "test message";
	ret = mosquitto_publish(mosq, NULL, "test-topic", msg.size(), msg.c_str(), 2, false);
	std::cerr << "publish result=" + std::to_string(ret) << std::endl;

	std::this_thread::sleep_for(std::chrono::milliseconds{500});

	mosquitto_disconnect(mosq);

	ended = true;
	worker.join();

	mosquitto_lib_cleanup();
}

ralight added the Component: libmosquitto, Status: Completed and Type: Question labels Oct 6, 2021
@systelomeganebularat
Author

Thanks a lot for that thorough explanation.
