Skip to content

Commit

Permalink
Add --repeat and --repeat-delay to mosquitto_pub.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Apr 11, 2019
1 parent 06c059e commit 4995436
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 2 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ Client features:
session without requiring a message to be received.
- Add --remove-retained to mosquitto_sub, which can be used to clear retained
messages on a broker.
- Add --repeat and --repeat-delay to mosquitto_pub, which can be used to
repeat single message publishes at a regular interval.
- -V now accepts `5, `311`, `31`, as well as `mqttv5` etc.
- Add TLS Engine support.
- Add support for ALPN on TLS connections. Closes #924.
Expand Down
37 changes: 37 additions & 0 deletions client/client_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ void init_config(struct mosq_config *cfg, int pub_or_sub)
cfg->keepalive = 60;
cfg->clean_session = true;
cfg->eol = true;
cfg->repeat_count = 1;
cfg->repeat_delay.tv_sec = 0;
cfg->repeat_delay.tv_usec = 0;
if(pub_or_sub == CLIENT_RR){
cfg->protocol_version = MQTT_PROTOCOL_V5;
cfg->msg_count = 1;
Expand Down Expand Up @@ -445,6 +448,7 @@ int cfg_add_topic(struct mosq_config *cfg, int type, char *topic, const char *ar
int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[])
{
int i;
float f;

for(i=1; i<argc; i++){
if(!strcmp(argv[i], "-A")){
Expand Down Expand Up @@ -812,6 +816,39 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
goto unknown_option;
}
cfg->remove_retained = true;
}else if(!strcmp(argv[i], "--repeat")){
if(pub_or_sub != CLIENT_PUB){
goto unknown_option;
}
if(i==argc-1){
fprintf(stderr, "Error: --repeat argument given but no count specified.\n\n");
return 1;
}else{
cfg->repeat_count = atoi(argv[i+1]);
if(cfg->repeat_count < 1){
fprintf(stderr, "Error: --repeat argument must be >0.\n\n");
return 1;
}
}
i++;
}else if(!strcmp(argv[i], "--repeat-delay")){
if(pub_or_sub != CLIENT_PUB){
goto unknown_option;
}
if(i==argc-1){
fprintf(stderr, "Error: --repeat-delay argument given but no time specified.\n\n");
return 1;
}else{
f = atof(argv[i+1]);
if(f < 0.0f){
fprintf(stderr, "Error: --repeat-delay argument must be >=0.0.\n\n");
return 1;
}
f *= 1.0e6;
cfg->repeat_delay.tv_sec = (int)f/1e6;
cfg->repeat_delay.tv_usec = (int)f%1000000;
}
i++;
}else if(!strcmp(argv[i], "--retain-as-published")){
if(pub_or_sub == CLIENT_PUB){
goto unknown_option;
Expand Down
3 changes: 3 additions & 0 deletions client/client_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and the Eclipse Distribution License is available at
#define CLIENT_CONFIG_H

#include <stdio.h>
#include <sys/time.h>

/* pub_client.c modes */
#define MSGMODE_NONE 0
Expand Down Expand Up @@ -47,6 +48,8 @@ struct mosq_config {
long msglen; /* pub, rr */
char *topic; /* pub, rr */
char *bind_address;
int repeat_count; /* pub */
struct timeval repeat_delay; /* pub */
#ifdef WITH_SRV
bool use_srv;
#endif
Expand Down
63 changes: 61 additions & 2 deletions client/pub_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,35 @@ static char *line_buf = NULL;
static int line_buf_len = 1024;
static bool connected = true;
static bool disconnect_sent = false;
static int publish_count = 0;
static bool ready_for_repeat = false;
static struct timeval next_publish_tv;

static void set_repeat_time(void)
{
gettimeofday(&next_publish_tv, NULL);
next_publish_tv.tv_sec += cfg.repeat_delay.tv_sec;
next_publish_tv.tv_usec += cfg.repeat_delay.tv_usec;

next_publish_tv.tv_sec += next_publish_tv.tv_usec/1e6;
next_publish_tv.tv_usec = next_publish_tv.tv_usec%1000000;
}

static int check_repeat_time(void)
{
struct timeval tv;

gettimeofday(&tv, NULL);

if(tv.tv_sec > next_publish_tv.tv_sec){
return 1;
}else if(tv.tv_sec == next_publish_tv.tv_sec
&& tv.tv_usec > next_publish_tv.tv_usec){

return 1;
}
return 0;
}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties)
{
Expand All @@ -57,6 +85,7 @@ void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mos

int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain)
{
ready_for_repeat = false;
if(cfg.protocol_version == MQTT_PROTOCOL_V5 && cfg.have_topic_alias && first_publish == false){
return mosquitto_publish_v5(mosq, mid, NULL, payloadlen, payload, qos, retain, cfg.publish_props);
}else{
Expand Down Expand Up @@ -134,11 +163,16 @@ void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_
if(reason_code > 127){
if(!cfg.quiet) fprintf(stderr, "Warning: Publish %d failed: %s.\n", mid, mosquitto_reason_string(reason_code));
}
publish_count++;

if(cfg.pub_mode == MSGMODE_STDIN_LINE){
if(mid == last_mid){
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
disconnect_sent = true;
}
}else if(publish_count < cfg.repeat_count){
ready_for_repeat = true;
set_repeat_time();
}else if(disconnect_sent == false){
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
disconnect_sent = true;
Expand All @@ -165,6 +199,11 @@ int pub_shared_loop(struct mosquitto *mosq)
char *buf2;
int buf_len_actual;
int mode;
int loop_delay = 1000;

if(cfg.repeat_count > 1 && (cfg.repeat_delay.tv_sec == 0 || cfg.repeat_delay.tv_usec != 0)){
loop_delay = cfg.repeat_delay.tv_usec / 2000;
}

mode = cfg.pub_mode;

Expand Down Expand Up @@ -226,7 +265,25 @@ int pub_shared_loop(struct mosquitto *mosq)
}
rc = MOSQ_ERR_SUCCESS;
}else{
rc = mosquitto_loop(mosq, -1, 1);
rc = mosquitto_loop(mosq, loop_delay, 1);
if(ready_for_repeat && check_repeat_time()){
rc = 0;
switch(cfg.pub_mode){
case MSGMODE_CMD:
case MSGMODE_FILE:
case MSGMODE_STDIN_FILE:
rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain);
break;
case MSGMODE_NULL:
rc = my_publish(mosq, &mid_sent, cfg.topic, 0, NULL, cfg.qos, cfg.retain);
break;
case MSGMODE_STDIN_LINE:
break;
}
if(rc){
fprintf(stderr, "Error sending repeat publish: %s", mosquitto_strerror(rc));
}
}
}
}while(rc == MOSQ_ERR_SUCCESS && connected);

Expand All @@ -252,7 +309,7 @@ void print_usage(void)
printf("mosquitto_pub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision);
printf("Usage: mosquitto_pub {[-h host] [-p port] [-u username [-P password]] -t topic | -L URL}\n");
printf(" {-f file | -l | -n | -m message}\n");
printf(" [-c] [-k keepalive] [-q qos] [-r]\n");
printf(" [-c] [-k keepalive] [-q qos] [-r] [--repeat N] [--repeat-delay time]\n");
#ifdef WITH_SRV
printf(" [-A bind_address] [-S]\n");
#else
Expand Down Expand Up @@ -307,6 +364,8 @@ void print_usage(void)
printf(" -V : specify the version of the MQTT protocol to use when connecting.\n");
printf(" Can be mqttv5, mqttv311 or mqttv31. Defaults to mqttv311.\n");
printf(" --help : display this message.\n");
printf(" --repeat : if publish mode is -f, -m, or -s, then repeat the publish N times.\n");
printf(" --repeat-delay : if using --repeat, wait time seconds between publishes. Defaults to 0.\n");
printf(" --quiet : don't print error messages.\n");
printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n");
printf(" unexpected disconnection. If not given and will-topic is set, a zero\n");
Expand Down
29 changes: 29 additions & 0 deletions man/mosquitto_pub.1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
<arg><option>-q</option> <replaceable>message-QoS</replaceable></arg>
<arg><option>--quiet</option></arg>
<arg><option>-r</option></arg>
<arg><option>--repeat</option> <replaceable>count</replaceable></arg>
<arg><option>--repeat-delay</option> <replaceable>seconds</replaceable></arg>
<arg><option>-S</option></arg>
<group choice='req'>
<arg choice='plain'><option>-f</option> <replaceable>file</replaceable></arg>
Expand Down Expand Up @@ -431,6 +433,33 @@
<para>If retain is given, the message will be retained as a "last known good" value on the broker. See <citerefentry><refentrytitle>mqtt</refentrytitle><manvolnum>7</manvolnum></citerefentry> for more information.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--repeat</option></term>
<listitem>
<para>If the publish mode is<option>-m</option>,
<option>-f</option>, or <option>-s</option> (i.e. the modes
where only a single message is sent), then
<option>--repeat</option> can be used to specify that the
message will be published multiple times.</para>
<para>See also <option>--repeat-delay</option>.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--repeat-delay</option></term>
<listitem>
<para>If using <option>--repeat</option>, then the default
behaviour is to publish repeated messages as soon as the
previous message is delivered. Use
<option>--repeat-delay</option> to specify the number of
seconds to wait after the previous message was delivered
before publishing the next. Does not need to be an integer
number of seconds.</para>
<para>Note that there is no guarantee as to the actual interval
between messages, this option simply defines the minimum
time from delivery of one message to the start of the
publish of the next.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-s</option></term>
<term><option>--stdin-file</option></term>
Expand Down

0 comments on commit 4995436

Please sign in to comment.