Skip to content

Commit

Permalink
Client support for adding properties.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Oct 31, 2018
1 parent b462115 commit 12cba75
Show file tree
Hide file tree
Showing 30 changed files with 1,101 additions and 75 deletions.
2 changes: 1 addition & 1 deletion client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ include_directories(${mosquitto_SOURCE_DIR} ${mosquitto_SOURCE_DIR}/lib
${STDBOOL_H_PATH} ${STDINT_H_PATH})
link_directories(${mosquitto_BINARY_DIR}/lib)

set(shared_src client_shared.c client_shared.h)
set(shared_src client_shared.c client_shared.h client_props.c)

if (${WITH_SRV} STREQUAL ON)
add_definitions("-DWITH_SRV")
Expand Down
7 changes: 5 additions & 2 deletions client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ static_pub : pub_client.o client_shared.o ../lib/libmosquitto.a
static_sub : sub_client.o sub_client_output.o client_shared.o ../lib/libmosquitto.a
${CROSS_COMPILE}${CC} $^ -o mosquitto_sub ${CLIENT_LDFLAGS} -lssl -lcrypto -lpthread

mosquitto_pub : pub_client.o client_shared.o
mosquitto_pub : pub_client.o client_shared.o client_props.o
${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS}

mosquitto_sub : sub_client.o sub_client_output.o client_shared.o
mosquitto_sub : sub_client.o sub_client_output.o client_shared.o client_props.o
${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS}

pub_client.o : pub_client.c ../lib/libmosquitto.so.${SOVERSION}
Expand All @@ -32,6 +32,9 @@ sub_client_output.o : sub_client_output.c ../lib/libmosquitto.so.${SOVERSION}
client_shared.o : client_shared.c client_shared.h
${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS}

client_props.o : client_props.c client_shared.h
${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS}

../lib/libmosquitto.so.${SOVERSION} :
$(MAKE) -C ../lib

Expand Down
185 changes: 185 additions & 0 deletions client/client_props.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
Copyright (c) 2018 Roger Light <[email protected]>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http:https://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http:https://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/

#include "config.h"

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <unistd.h>
#include <strings.h>
#else
#include <process.h>
#include <winsock2.h>
#define snprintf sprintf_s
#define strncasecmp _strnicmp
#endif

#include "mosquitto.h"
#include "mqtt_protocol.h"
#include "client_shared.h"

enum prop_type
{
PROP_TYPE_BYTE,
PROP_TYPE_INT16,
PROP_TYPE_INT32,
PROP_TYPE_BINARY,
PROP_TYPE_STRING,
PROP_TYPE_STRING_PAIR
};

/* This parses property inputs. It should work for any command type, but is limited at the moment.
*
* Format:
*
* command property value
* command property key value
*
* Example:
*
* publish message-expiry-interval 32
* connect user-property key value
*/

int cfg_parse_property(struct mosq_config *cfg, int argc, char *argv[], int *idx)
{
char *cmdname = NULL, *propname = NULL;
char *key = NULL, *value = NULL;
int cmd, identifier, type;
mosquitto_property **proplist;
int rc;

/* idx now points to "command" */
if((*idx)+2 > argc-1){
/* Not enough args */
fprintf(stderr, "Error: --property argument given but not enough arguments specified.\n\n");
return MOSQ_ERR_INVAL;
}

cmdname = argv[*idx];
if(mosquitto_string_to_command(cmdname, &cmd)){
fprintf(stderr, "Error: Invalid command given in --property argument.\n\n");
return MOSQ_ERR_INVAL;
}

propname = argv[(*idx)+1];
if(mosquitto_string_to_property_info(propname, &identifier, &type)){
fprintf(stderr, "Error: Invalid property name given in --property argument.\n\n");
return MOSQ_ERR_INVAL;
}

if(identifier == MQTT_PROP_USER_PROPERTY){
if((*idx)+3 > argc-1){
/* Not enough args */
fprintf(stderr, "Error: --property argument given but not enough arguments specified.\n\n");
return MOSQ_ERR_INVAL;
}

key = argv[(*idx)+2];
value = argv[(*idx)+3];
(*idx) += 3;
}else{
value = argv[(*idx)+2];
(*idx) += 2;
}


switch(cmd){
case CMD_CONNECT:
proplist = &cfg->connect_props;
break;

case CMD_PUBLISH:
if(identifier == MQTT_PROP_SUBSCRIPTION_IDENTIFIER){
fprintf(stderr, "Error: %s property not supported for %s in --property argument.\n\n", propname, cmdname);
return MOSQ_ERR_INVAL;
}
proplist = &cfg->publish_props;
break;

case CMD_SUBSCRIBE:
if(identifier != MQTT_PROP_USER_PROPERTY){
fprintf(stderr, "Error: %s property not supported for %s in --property argument.\n\n", propname, cmdname);
return MOSQ_ERR_NOT_SUPPORTED;
}
proplist = &cfg->subscribe_props;
break;

case CMD_UNSUBSCRIBE:
proplist = &cfg->subscribe_props;
break;

case CMD_DISCONNECT:
proplist = &cfg->disconnect_props;
break;

case CMD_AUTH:
fprintf(stderr, "Error: %s property not supported for %s in --property argument.\n\n", propname, cmdname);
return MOSQ_ERR_NOT_SUPPORTED;

case CMD_WILL:
proplist = &cfg->will_props;
break;

case CMD_PUBACK:
case CMD_PUBREC:
case CMD_PUBREL:
case CMD_PUBCOMP:
case CMD_SUBACK:
case CMD_UNSUBACK:
fprintf(stderr, "Error: %s property not supported for %s in --property argument.\n\n", propname, cmdname);
return MOSQ_ERR_NOT_SUPPORTED;

default:
return MOSQ_ERR_INVAL;
}

switch(type){
case MQTT_PROP_TYPE_BYTE:
rc = mosquitto_property_add_byte(proplist, identifier, atoi(value));
break;
case MQTT_PROP_TYPE_INT16:
rc = mosquitto_property_add_int16(proplist, identifier, atoi(value));
break;
case MQTT_PROP_TYPE_INT32:
rc = mosquitto_property_add_int32(proplist, identifier, atoi(value));
break;
case MQTT_PROP_TYPE_VARINT:
rc = mosquitto_property_add_varint(proplist, identifier, atoi(value));
break;
case MQTT_PROP_TYPE_BINARY:
rc = mosquitto_property_add_binary(proplist, identifier, value, strlen(value));
break;
case MQTT_PROP_TYPE_STRING:
rc = mosquitto_property_add_string(proplist, identifier, value);
break;
case MQTT_PROP_TYPE_STRING_PAIR:
rc = mosquitto_property_add_string_pair(proplist, identifier, key, value);
break;
default:
return MOSQ_ERR_INVAL;
}
if(rc){
fprintf(stderr, "Error adding property %s %d\n", propname, type);
return rc;
}
return MOSQ_ERR_SUCCESS;
}

16 changes: 14 additions & 2 deletions client/client_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ void client_config_cleanup(struct mosq_config *cfg)
free(cfg->socks5_username);
free(cfg->socks5_password);
#endif
mosquitto_property_free_all(&cfg->connect_props);
mosquitto_property_free_all(&cfg->publish_props);
mosquitto_property_free_all(&cfg->subscribe_props);
mosquitto_property_free_all(&cfg->unsubscribe_props);
mosquitto_property_free_all(&cfg->disconnect_props);
mosquitto_property_free_all(&cfg->will_props);
}

int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[])
Expand Down Expand Up @@ -877,6 +883,12 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
goto unknown_option;
}
cfg->verbose = 1;
}else if(!strcmp(argv[i], "-y") || !strcmp(argv[i], "--property")){
i++;
if(cfg_parse_property(cfg, argc, argv, &i)){
return 1;
}
cfg->protocol_version = MQTT_PROTOCOL_V5;
}else{
goto unknown_option;
}
Expand Down Expand Up @@ -1006,10 +1018,10 @@ int client_connect(struct mosquitto *mosq, struct mosq_config *cfg)
if(cfg->use_srv){
rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address);
}else{
rc = mosquitto_connect_bind(mosq, cfg->host, port, cfg->keepalive, cfg->bind_address);
rc = mosquitto_connect_bind_with_properties(mosq, cfg->host, port, cfg->keepalive, cfg->bind_address, cfg->connect_props);
}
#else
rc = mosquitto_connect_bind(mosq, cfg->host, port, cfg->keepalive, cfg->bind_address);
rc = mosquitto_connect_bind_with_properties(mosq, cfg->host, port, cfg->keepalive, cfg->bind_address, cfg->connect_props);
#endif
if(rc>0){
if(!cfg->quiet){
Expand Down
8 changes: 8 additions & 0 deletions client/client_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ struct mosq_config {
char *socks5_username;
char *socks5_password;
#endif
mosquitto_property *connect_props;
mosquitto_property *publish_props;
mosquitto_property *subscribe_props;
mosquitto_property *unsubscribe_props;
mosquitto_property *disconnect_props;
mosquitto_property *will_props;
};

int client_config_load(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]);
Expand All @@ -100,4 +106,6 @@ int client_opts_set(struct mosquitto *mosq, struct mosq_config *cfg);
int client_id_generate(struct mosq_config *cfg, const char *id_base);
int client_connect(struct mosquitto *mosq, struct mosq_config *cfg);

int cfg_parse_property(struct mosq_config *cfg, int argc, char *argv[], int *idx);

#endif
13 changes: 8 additions & 5 deletions client/pub_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static char *username = NULL;
static char *password = NULL;
static bool disconnect_sent = false;
static bool quiet = false;
static struct mosq_config cfg;

void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
{
Expand All @@ -64,10 +65,10 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
case MSGMODE_CMD:
case MSGMODE_FILE:
case MSGMODE_STDIN_FILE:
rc = mosquitto_publish(mosq, &mid_sent, topic, msglen, message, qos, retain);
rc = mosquitto_publish_with_properties(mosq, &mid_sent, topic, msglen, message, qos, retain, cfg.publish_props);
break;
case MSGMODE_NULL:
rc = mosquitto_publish(mosq, &mid_sent, topic, 0, NULL, qos, retain);
rc = mosquitto_publish_with_properties(mosq, &mid_sent, topic, 0, NULL, qos, retain, cfg.publish_props);
break;
case MSGMODE_STDIN_LINE:
status = STATUS_CONNACK_RECVD;
Expand Down Expand Up @@ -230,6 +231,8 @@ void print_usage(void)
#ifdef WITH_SOCKS
printf(" [--proxy socks-url]\n");
#endif
printf(" [--property command identifier value]\n");
printf(" [-y command identifier value]\n");
printf(" mosquitto_pub --help\n\n");
printf(" -A : bind the outgoing socket to this host/ip address. Use to control which interface\n");
printf(" the client communicates over.\n");
Expand Down Expand Up @@ -258,6 +261,7 @@ void print_usage(void)
printf(" -u : provide a username\n");
printf(" -V : specify the version of the MQTT protocol to use when connecting.\n");
printf(" Can be mqttv5, mqttv311 or mqttv31. Defaults to mqttv311.\n");
printf(" -y,--property : Add MQTT v5 properties. See the documentation for more details.\n");
printf(" --help : display this message.\n");
printf(" --quiet : don't print error messages.\n");
printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n");
Expand Down Expand Up @@ -290,12 +294,11 @@ void print_usage(void)
printf(" socks5h:https://[username[:password]@]hostname[:port]\n");
printf(" Only \"none\" and \"username\" authentication is supported.\n");
#endif
printf("\nSee http:https://mosquitto.org/ for more information.\n\n");
printf("\nSee https:https://mosquitto.org/ for more information.\n\n");
}

int main(int argc, char *argv[])
{
struct mosq_config cfg;
struct mosquitto *mosq = NULL;
int rc;
int rc2;
Expand Down Expand Up @@ -413,7 +416,7 @@ int main(int argc, char *argv[])
buf_len_actual = strlen(buf);
if(buf[buf_len_actual-1] == '\n'){
buf[buf_len_actual-1] = '\0';
rc2 = mosquitto_publish(mosq, &mid_sent, topic, buf_len_actual-1, buf, qos, retain);
rc2 = mosquitto_publish_with_properties(mosq, &mid_sent, topic, buf_len_actual-1, buf, qos, retain, cfg.publish_props);
if(rc2){
if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
mosquitto_disconnect(mosq);
Expand Down
5 changes: 4 additions & 1 deletion client/sub_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flag
cfg = (struct mosq_config *)obj;

if(!result){
mosquitto_subscribe_multiple(mosq, NULL, cfg->topic_count, cfg->topics, cfg->qos);
mosquitto_subscribe_multiple(mosq, NULL, cfg->topic_count, cfg->topics, cfg->qos, cfg->subscribe_props);

for(i=0; i<cfg->unsub_topic_count; i++){
mosquitto_unsubscribe(mosq, NULL, cfg->unsub_topics[i]);
Expand Down Expand Up @@ -165,6 +165,8 @@ void print_usage(void)
#ifdef WITH_SOCKS
printf(" [--proxy socks-url]\n");
#endif
printf(" [-y command identifier value]\n");
printf(" [--property command identifier value]\n");
printf(" mosquitto_sub --help\n\n");
printf(" -A : bind the outgoing socket to this host/ip address. Use to control which interface\n");
printf(" the client communicates over.\n");
Expand Down Expand Up @@ -198,6 +200,7 @@ void print_usage(void)
#ifndef WIN32
printf(" -W : Specifies a timeout in seconds how long to process incoming MQTT messages.\n");
#endif
printf(" -y, --property : Add MQTT v5 properties. See the documentation for more details.\n");
printf(" --help : display this message.\n");
printf(" --quiet : don't print error messages.\n");
printf(" --retained-only : only handle messages with the retained flag set, and exit when the\n");
Expand Down
Loading

0 comments on commit 12cba75

Please sign in to comment.