Inconsistent timing of messages clocked to the second (1110 ms) #2971

Open
PepeMax opened this issue Jan 3, 2024 · 1 comment


PepeMax commented Jan 3, 2024

Hello everyone,

I am encountering a strange issue when sending messages at a one-second interval. We need to send data to an MQTT broker every second, but we cannot achieve this: the message history in MQTT Explorer shows about 1110 ms between consecutive messages, a deviation of roughly 110 ms from the intended 1000 ms interval.

I tried to find a similar issue in the existing GitHub issues and found this one: link to the issue. I attempted to enable the set_tcp_nodelay option in the config.mk file of my broker, but it had no effect.
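For context, here is a minimal sketch of what a TCP_NODELAY option does at the socket level: it disables Nagle's algorithm on one specific socket. The helper names `set_nodelay` and `get_nodelay` are hypothetical and are not part of Mosquitto. Note that a broker-side setting only affects the broker's sockets, while the publishing client has its own socket. In the Mosquitto documentation `set_tcp_nodelay` is described as a mosquitto.conf option, and recent libmosquitto headers appear to offer a client-side `MOSQ_OPT_TCP_NODELAY` option, which is worth verifying in the installed mosquitto.h.

```c
#include <assert.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* Hypothetical helper: enable (1) or disable (0) TCP_NODELAY on a TCP socket.
 * Returns 0 on success, -1 on failure (errno is set by setsockopt). */
int set_nodelay(int fd, int enabled)
{
    int flag = enabled ? 1 : 0;

    assert(fd >= 0);
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
}

/* Hypothetical helper: read back the current TCP_NODELAY setting.
 * Returns 1 if set, 0 if not set, -1 on error. */
int get_nodelay(int fd)
{
    int flag = 0;
    socklen_t len = sizeof(flag);

    assert(fd >= 0);
    if (getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
        return -1;
    return flag ? 1 : 0;
}
```

Reading the option back with `get_nodelay` on the actual connection is one way to confirm that a setting took effect on the socket that carries the publishes.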

Third test with 1000 messages (set_tcp_nodelay=true), average interval between consecutive messages:

| Wireshark (ms) | MOSQUITTO Program (ms) | Processing Program (ms) |
|----------------|------------------------|-------------------------|
| 1110.45213     | 1000.05901             | 0.02202                 |

Fourth test with 1000 messages (set_tcp_nodelay=false), average interval between consecutive messages:

| Wireshark (ms) | MOSQUITTO Program (ms) | Processing Program (ms) |
|----------------|------------------------|-------------------------|
| 1110.43184     | 1000.07039             | 0.02803                 |
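As an illustration of how an average interval like the ones above can be derived from a list of timestamps, here is a small sketch. It assumes timestamps in milliseconds in ascending order; the function name `mean_interval_ms` is hypothetical, and this is not necessarily how the figures above were computed.

```c
#include <assert.h>
#include <stddef.h>

/* Mean gap between consecutive timestamps, in milliseconds.
 * The sum of consecutive differences telescopes to (last - first),
 * so the mean is (last - first) / (n - 1).
 * Returns 0.0 when fewer than two timestamps are given. */
double mean_interval_ms(const double *t_ms, size_t n)
{
    assert(t_ms != NULL || n == 0);
    if (n < 2)
        return 0.0;
    return (t_ms[n - 1] - t_ms[0]) / (double)(n - 1);
}
```

Applying the same function to the program's own timestamps and to the capture's timestamps gives two averages that can be compared directly.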

Here is my C code.

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <unistd.h>
#include <assert.h>

#include <mosquitto.h>
#include <client/client_shared.h>
#include <errno.h>

void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    // printf("%d\n", result);
    if (!result)
    {
        fprintf(stderr, "Connect ok\n");
        // mosquitto_subscribe(mosq, NULL, "#", 0);
    }
    else
    {
        fprintf(stderr, "Connect failed\n");
    }
}


void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    printf("%s\n", str);
}

void sleepMilliseconds(unsigned long milliseconds)
{
    struct timespec ts;
    ts.tv_sec = milliseconds / 1000;
    ts.tv_nsec = (milliseconds % 1000) * 1000000;
    nanosleep(&ts, NULL);
}

void print_usage(void)
{
    int major, minor, revision;

    mosquitto_lib_version(&major, &minor, &revision);
    printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n");
    printf("mosquitto_sub running on libmosquitto %d.%d.%d.\n\n", /*VERSION,*/ major, minor, revision);
    printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] -t topic ...\n");
    printf("                     [-C msg_count] [-T filter_out]\n");
#ifdef WITH_SRV
    printf("                     [-A bind_address] [-S]\n");
#else
    printf("                     [-A bind_address]\n");
#endif
    printf("                     [-i id] [-I id_prefix]\n");
    printf("                     [-d] [-N] [--quiet] [-v]\n");
    printf("                     [-u username [-P password]]\n");
    printf("                     [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]\n");
#ifdef WITH_TLS
    printf("                     [{--cafile file | --capath dir} [--cert file] [--key file]\n");
    printf("                      [--ciphers ciphers] [--insecure]]\n");
#ifdef WITH_TLS_PSK
    printf("                     [--psk hex-key --psk-identity identity [--ciphers ciphers]]\n");
#endif
#endif
#ifdef WITH_SOCKS
    printf("                     [--proxy socks-url]\n");
#endif
    printf("       mosquitto_sub --help\n\n");
    printf(" -A : bind the outgoing socket to this host/ip address. Use to control which interface\n");
    printf("      the client communicates over.\n");
    printf(" -c : disable 'clean session' (store subscription and pending messages when client disconnects).\n");
    printf(" -C : disconnect and exit after receiving the 'msg_count' messages.\n");
    printf(" -d : enable debug messages.\n");
    printf(" -h : mqtt host to connect to. Defaults to localhost.\n");
    printf(" -i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.\n");
    printf(" -I : define the client id as id_prefix appended with the process id. Useful for when the\n");
    printf("      broker is using the clientid_prefixes option.\n");
    printf(" -k : keep alive in seconds for this client. Defaults to 60.\n");
    printf(" -N : do not add an end of line character when printing the payload.\n");
    printf(" -p : network port to connect to. Defaults to 1883.\n");
    printf(" -P : provide a password (requires MQTT 3.1 broker)\n");
    printf(" -q : quality of service level to use for the subscription. Defaults to 0.\n");
    printf(" -R : do not print stale messages (those with retain set).\n");
#ifdef WITH_SRV
    printf(" -S : use SRV lookups to determine which host to connect to.\n");
#endif
    printf(" -t : mqtt topic to subscribe to. May be repeated multiple times.\n");
    printf(" -T : topic string to filter out of results. May be repeated.\n");
    printf(" -u : provide a username (requires MQTT 3.1 broker)\n");
    printf(" -v : print published messages verbosely.\n");
    printf(" -V : specify the version of the MQTT protocol to use when connecting.\n");
    printf("      Can be mqttv31 or mqttv311. Defaults to mqttv31.\n");
    printf(" --help : display this message.\n");
    printf(" --quiet : don't print error messages.\n");
    printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n");
    printf("                  unexpected disconnection. If not given and will-topic is set, a zero\n");
    printf("                  length message will be sent.\n");
    printf(" --will-qos : QoS level for the client Will.\n");
    printf(" --will-retain : if given, make the client Will retained.\n");
    printf(" --will-topic : the topic on which to publish the client Will.\n");
#ifdef WITH_TLS
    printf(" --cafile : path to a file containing trusted CA certificates to enable encrypted\n");
    printf("            certificate based communication.\n");
    printf(" --capath : path to a directory containing trusted CA certificates to enable encrypted\n");
    printf("            communication.\n");
    printf(" --cert : client certificate for authentication, if required by server.\n");
    printf(" --key : client private key for authentication, if required by server.\n");
    printf(" --ciphers : openssl compatible list of TLS ciphers to support.\n");
    printf(" --tls-version : TLS protocol version, can be one of tlsv1.2 tlsv1.1 or tlsv1.\n");
    printf("                 Defaults to tlsv1.2 if available.\n");
    printf(" --insecure : do not check that the server certificate hostname matches the remote\n");
    printf("              hostname. Using this option means that you cannot be sure that the\n");
    printf("              remote host is the server you wish to connect to and so is insecure.\n");
    printf("              Do not use this option in a production environment.\n");
#ifdef WITH_TLS_PSK
    printf(" --psk : pre-shared-key in hexadecimal (no leading 0x) to enable TLS-PSK mode.\n");
    printf(" --psk-identity : client identity string for TLS-PSK mode.\n");
#endif
#endif
#ifdef WITH_SOCKS
    printf(" --proxy : SOCKS5 proxy URL of the form:\n");
    printf("           socks5h://[username[:password]@]hostname[:port]\n");
    printf("           Only \"none\" and \"username\" authentication is supported.\n");
#endif
    printf("\nSee http://mosquitto.org/ for more information.\n\n");
}

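/* Print the current wall-clock time in milliseconds. */
void timestamp(void)
{
    struct timeval tv;
    gettimeofday(&tv, NULL);

    /* Convert in floating point so the sub-millisecond part is kept
       and tv_sec * 1000 cannot overflow an integer type. */
    double time_in_mill = (double)tv.tv_sec * 1000.0 + (double)tv.tv_usec / 1000.0;
    printf("%.3f\n", time_in_mill);
}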

int main(int argc, char *argv[])
{
    int rc, i;

    /* Mosquitto variables */
    char topic[20];
    bool clean_session = true;
    struct mosquitto *mosq = NULL;
    struct mosq_config cfg;

    rc = client_config_load(&cfg, CLIENT_SUB, argc, argv);
    if (rc)
    {
        client_config_cleanup(&cfg);
        if (rc == 2)
        {
            /* --help */
            print_usage();
        }
        else
        {
            fprintf(stderr, "\nUse 'mosquitto_sub --help' to see usage.\n");
        }
        return 1;
    }

    mosquitto_lib_init();

    if (client_id_generate(&cfg))
    {
        return 1;
    }

    mosq = mosquitto_new(cfg.id, true, &cfg);
    if (!mosq)
    {
        switch (errno)
        {
        case ENOMEM:
            if (!cfg.quiet)
                fprintf(stderr, "Error: Out of memory.\n");
            break;
        case EINVAL:
            if (!cfg.quiet)
                fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
            break;
        }
        mosquitto_lib_cleanup();
        return 1;
    }
    if (client_opts_set(mosq, &cfg))
    {
        return 1;
    }

    if (cfg.debug)
    {
        mosquitto_log_callback_set(mosq, my_log_callback);
    }
    mosquitto_connect_callback_set(mosq, my_connect_callback);

    rc = client_connect(mosq, &cfg);
    if (rc)
        return rc;

    /* Bounded copy: topic[] holds 20 bytes, so avoid overflowing it. */
    snprintf(topic, sizeof(topic), "%s", cfg.topics[0]);

    mosquitto_loop_start(mosq);

    for (i = 0; i < 1000; i++)
    {
        timestamp();
        mosquitto_publish(mosq, NULL, topic, strlen(topic), topic, 0, 0);
        timestamp();
        sleepMilliseconds(1000);
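    }

    /* Shut down cleanly before exiting. */
    mosquitto_disconnect(mosq);
    mosquitto_loop_stop(mosq, false);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    client_config_cleanup(&cfg);
    return 0;
}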

This is a basic program that prints the timestamp, sends the message "test" to the "/test" topic, prints the timestamp again, and then sleeps for 1000 ms. To complete my tests, I also captured network traffic using Wireshark with the filter tcp.port == 1883.
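Because the loop sleeps for a fixed 1000 ms after each publish, its period is 1000 ms plus whatever the publish and printing take. A common alternative is to sleep until an absolute deadline on a monotonic clock, sketched below. The names `deadline_advance`, `sleep_until` and `run_periodic` are hypothetical helpers, not libmosquitto functions. Since the program already measures about 1000.06 ms per iteration, this would only remove that small drift and is not expected to explain a 110 ms difference on its own.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <time.h>

#define NSEC_PER_SEC 1000000000L

/* Move an absolute deadline forward by period_ms, keeping tv_nsec in range. */
void deadline_advance(struct timespec *t, long period_ms)
{
    assert(t != NULL && period_ms >= 0);
    t->tv_sec  += period_ms / 1000;
    t->tv_nsec += (period_ms % 1000) * 1000000L;
    if (t->tv_nsec >= NSEC_PER_SEC) {
        t->tv_nsec -= NSEC_PER_SEC;
        t->tv_sec  += 1;
    }
}

/* Sleep until the absolute CLOCK_MONOTONIC time *t, retrying after signals.
 * Returns 0 on success or an error number from clock_nanosleep. */
int sleep_until(const struct timespec *t)
{
    int rc;

    do {
        rc = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, t, NULL);
    } while (rc == EINTR);
    return rc;
}

/* Call work(arg) count times, one call per period_ms, measured against
 * fixed deadlines so the time spent in work() does not accumulate. */
int run_periodic(int count, long period_ms, void (*work)(void *), void *arg)
{
    struct timespec next;

    if (clock_gettime(CLOCK_MONOTONIC, &next) != 0)
        return -1;
    for (int i = 0; i < count; i++) {
        work(arg);
        deadline_advance(&next, period_ms);
        if (sleep_until(&next) != 0)
            return -1;
    }
    return 0;
}
```

In the program above, `work` would be a small wrapper around the two `timestamp()` calls and `mosquitto_publish()`, called as `run_periodic(1000, 1000, work, mosq)`.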

3rd test:

Image
Image
On the second image, please disregard the 4585; it is my machine resetting its clock.

4th test:

Image
Image

image
image
The machine pings are good, and the issue persists even when I try to publish messages to the broker located locally on the machine with the IP address 192.6.1.50.

Do you have any idea why I am seeing these extra 110 ms? We have tried other technologies (Node.js and Node-RED) and do not encounter the issue with them; the messages are consistently sent approximately every 1000 ms.
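One possible diagnostic, offered as an assumption rather than a confirmed cause: the program reports about 1000 ms per iteration while the capture reports about 1110 ms, and a clock reset was mentioned above, so it may be worth checking whether the wall clock and the monotonic clock on the machine agree on how long a sleep lasts. The sketch below measures one sleep on both clocks; `timespec_diff_ms` and `measure_sleep` are hypothetical helper names.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Difference b - a in milliseconds. */
double timespec_diff_ms(struct timespec a, struct timespec b)
{
    return (double)(b.tv_sec - a.tv_sec) * 1000.0
         + (double)(b.tv_nsec - a.tv_nsec) / 1e6;
}

/* Sleep for sleep_ms and report the elapsed time as seen by the wall clock
 * (CLOCK_REALTIME) and by the monotonic clock (CLOCK_MONOTONIC).
 * Returns 0 on success, -1 if a clock read or the sleep fails. */
int measure_sleep(long sleep_ms, double *wall_ms, double *mono_ms)
{
    struct timespec w0, w1, m0, m1;
    struct timespec req = { .tv_sec  = sleep_ms / 1000,
                            .tv_nsec = (sleep_ms % 1000) * 1000000L };

    assert(sleep_ms >= 0 && wall_ms != NULL && mono_ms != NULL);
    if (clock_gettime(CLOCK_REALTIME, &w0) != 0 ||
        clock_gettime(CLOCK_MONOTONIC, &m0) != 0)
        return -1;
    if (nanosleep(&req, NULL) != 0)
        return -1;
    if (clock_gettime(CLOCK_REALTIME, &w1) != 0 ||
        clock_gettime(CLOCK_MONOTONIC, &m1) != 0)
        return -1;

    *wall_ms = timespec_diff_ms(w0, w1);
    *mono_ms = timespec_diff_ms(m0, m1);
    return 0;
}
```

If the two values differ noticeably over many iterations, the timestamps printed by the program and the capture timestamps may not be directly comparable.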

I'm not sure if you need more information or configuration files; I will try to provide them as soon as possible.

Thanks to everyone.

@dongqiceo

hh,you can fix that
