Skip to content

Commit

Permalink
Add mosquitto_subscribe_callback().
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Jan 26, 2016
1 parent bd906e6 commit 0a95c9a
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 47 deletions.
8 changes: 7 additions & 1 deletion examples/subscribe_simple/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@ include ../../config.mk

.PHONY: all

all : sub_single sub_multiple
all : sub_callback sub_single sub_multiple

sub_callback : callback.o
${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION}

sub_single : single.o
${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION}

sub_multiple : multiple.o
${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION}

callback.o : callback.c ../../lib/libmosquitto.so.${SOVERSION}
${CROSS_COMPILE}${CC} -c $< -o $@ -I../../lib ${CFLAGS}

single.o : single.c ../../lib/libmosquitto.so.${SOVERSION}
${CROSS_COMPILE}${CC} -c $< -o $@ -I../../lib ${CFLAGS}

Expand Down
34 changes: 34 additions & 0 deletions examples/subscribe_simple/callback.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include <stdlib.h>
#include <stdio.h>
#include "mosquitto.h"

int on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg)
{
printf("%s %s (%d)\n", msg->topic, (const char *)msg->payload, msg->payloadlen);
return 0;
}


int main(int argc, char *argv[])
{
int rc;

mosquitto_lib_init();

rc = mosquitto_subscribe_callback(
on_message,
"irc/#", 0, NULL,
"test.mosquitto.org", 1883,
NULL, 60, true,
NULL, NULL,
NULL, NULL);

if(rc){
printf("Error: %s\n", mosquitto_strerror(rc));
}

mosquitto_lib_cleanup();

return rc;
}

45 changes: 45 additions & 0 deletions lib/cpp/mosquittopp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,51 @@ int topic_matches_sub(const char *sub, const char *topic, bool *result)
return mosquitto_topic_matches_sub(sub, topic, result);
}

int subscribe_simple(
struct mosquitto_message **messages,
int msg_count,
const char *topic,
int qos,
bool retained,
const char *host,
int port,
const char *client_id,
int keepalive,
bool clean_session,
const char *username,
const char *password,
const struct libmosquitto_will *will,
const struct libmosquitto_tls *tls)
{
return mosquitto_subscribe_simple(messages, msg_count, topic, qos, retained,
host, port, client_id, keepalive, clean_session,
username, password,
will, tls);
}

mosqpp_EXPORT int subscribe_callback(
int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *),
const char *topic,
int qos,
void *userdata,
bool retained,
const char *host,
int port,
const char *client_id,
int keepalive,
bool clean_session,
const char *username,
const char *password,
const struct libmosquitto_will *will,
const struct libmosquitto_tls *tls)
{
return mosquitto_subscribe_callback(callback, topic, qos, userdata, retained,
host, port, client_id, keepalive, clean_session,
username, password,
will, tls);
}


mosquittopp::mosquittopp(const char *id, bool clean_session)
{
m_mosq = mosquitto_new(id, clean_session, this);
Expand Down
31 changes: 31 additions & 0 deletions lib/cpp/mosquittopp.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,37 @@ mosqpp_EXPORT int lib_version(int *major, int *minor, int *revision);
mosqpp_EXPORT int lib_init();
mosqpp_EXPORT int lib_cleanup();
mosqpp_EXPORT int topic_matches_sub(const char *sub, const char *topic, bool *result);
mosqpp_EXPORT int subscribe_simple(
struct mosquitto_message **messages,
int msg_count,
const char *topic,
int qos=0,
bool retained=true,
const char *host="localhost",
int port=1883,
const char *client_id=NULL,
int keepalive=60,
bool clean_session=true,
const char *username=NULL,
const char *password=NULL,
const struct libmosquitto_will *will=NULL,
const struct libmosquitto_tls *tls=NULL);

mosqpp_EXPORT int subscribe_callback(
int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *),
const char *topic,
int qos=0,
void *userdata=NULL,
bool retained=true,
const char *host="localhost",
int port=1883,
const char *client_id=NULL,
int keepalive=60,
bool clean_session=true,
const char *username=NULL,
const char *password=NULL,
const struct libmosquitto_will *will=NULL,
const struct libmosquitto_tls *tls=NULL);

/*
* Class: mosquittopp
Expand Down
135 changes: 89 additions & 46 deletions lib/helpers.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,55 +20,69 @@ and the Eclipse Distribution License is available at
#include "mosquitto.h"
#include "mosquitto_internal.h"

struct subscribe__userdata {
struct userdata__callback {
const char *topic;
int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *);
void *userdata;
int qos;
int rc;
};

struct userdata__simple {
struct mosquitto_message *messages;
int max_msg_count;
int message_count;
int qos;
int rc;
bool retained;
};


void on_connect(struct mosquitto *mosq, void *obj, int rc)
static void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
struct subscribe__userdata *userdata = obj;
struct userdata__callback *userdata = obj;

mosquitto_subscribe(mosq, NULL, userdata->topic, userdata->qos);
}


void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
static void on_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
struct subscribe__userdata *userdata = obj;
int rc;
struct userdata__callback *userdata = obj;

rc = userdata->callback(mosq, userdata->userdata, message);
if(rc){
mosquitto_disconnect(mosq);
}
}

static int on_message_simple(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
struct userdata__simple *userdata = obj;
int rc;

if(userdata->max_msg_count == 0){
return;
return 0;
}

/* Don't process stale retained messages if 'retained' was false */
if(!userdata->retained && message->retain){
return;
return 0;
}

userdata->max_msg_count--;

rc = mosquitto_message_copy(&userdata->messages[userdata->message_count], message);
if(rc){
userdata->rc = rc;
mosquitto_disconnect(mosq);
return;
return rc;
}
userdata->message_count++;
if(userdata->max_msg_count == 0){
mosquitto_disconnect(mosq);
}
return 0;
}



libmosq_EXPORT int mosquitto_subscribe_simple(
struct mosquitto_message **messages,
int msg_count,
Expand All @@ -85,8 +99,7 @@ libmosq_EXPORT int mosquitto_subscribe_simple(
const struct libmosquitto_will *will,
const struct libmosquitto_tls *tls)
{
struct mosquitto *mosq;
struct subscribe__userdata userdata;
struct userdata__simple userdata;
int rc;
int i;

Expand All @@ -96,84 +109,114 @@ libmosq_EXPORT int mosquitto_subscribe_simple(

*messages = NULL;

userdata.topic = topic;
userdata.qos = qos;
userdata.max_msg_count = msg_count;
userdata.retained = retained;
userdata.messages = calloc(sizeof(struct mosquitto_message), msg_count);
userdata.rc = 0;
if(!userdata.messages){
return MOSQ_ERR_NOMEM;
}
userdata.message_count = 0;
userdata.max_msg_count = msg_count;
userdata.retained = retained;

mosq = mosquitto_new(client_id, clean_session, &userdata);
if(!mosq){
rc = mosquitto_subscribe_callback(on_message_simple,
topic, qos,
&userdata,
host, port,
client_id, keepalive, clean_session,
username, password,
will, tls);

if(!rc && userdata.max_msg_count == 0){
*messages = userdata.messages;
return MOSQ_ERR_SUCCESS;
}else{
for(i=0; i<msg_count; i++){
mosquitto_message_free_contents(&userdata.messages[i]);
}
free(userdata.messages);
userdata.messages = NULL;
return rc;
}
}

libmosq_EXPORT int mosquitto_subscribe_callback(
int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *),
const char *topic,
int qos,
void *userdata,
const char *host,
int port,
const char *client_id,
int keepalive,
bool clean_session,
const char *username,
const char *password,
const struct libmosquitto_will *will,
const struct libmosquitto_tls *tls)
{
struct mosquitto *mosq;
struct userdata__callback cb_userdata;
int rc;

if(!callback || !topic){
return MOSQ_ERR_INVAL;
}

cb_userdata.topic = topic;
cb_userdata.qos = qos;
cb_userdata.rc = 0;
cb_userdata.userdata = userdata;
cb_userdata.callback = callback;

mosq = mosquitto_new(client_id, clean_session, &cb_userdata);
if(!mosq){
return MOSQ_ERR_NOMEM;
}

if(will){
rc = mosquitto_will_set(mosq, will->topic, will->payloadlen, will->payload, will->qos, will->retain);
if(rc){
free(userdata.messages);
userdata.messages = NULL;
mosquitto_destroy(mosq);
return rc;
}
}
if(username){
rc = mosquitto_username_pw_set(mosq, username, password);
if(rc){
free(userdata.messages);
userdata.messages = NULL;
mosquitto_destroy(mosq);
return rc;
}
}
if(tls){
rc = mosquitto_tls_set(mosq, tls->cafile, tls->capath, tls->certfile, tls->keyfile, tls->pw_callback);
if(rc){
free(userdata.messages);
userdata.messages = NULL;
mosquitto_destroy(mosq);
return rc;
}
rc = mosquitto_tls_opts_set(mosq, tls->cert_reqs, tls->tls_version, tls->ciphers);
if(rc){
free(userdata.messages);
userdata.messages = NULL;
mosquitto_destroy(mosq);
return rc;
}
}

mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_message_callback_set(mosq, on_message);
mosquitto_message_callback_set(mosq, on_message_callback);

rc = mosquitto_connect(mosq, host, port, keepalive);
if(rc){
free(userdata.messages);
userdata.messages = NULL;
mosquitto_destroy(mosq);
return rc;
}
rc = mosquitto_loop_forever(mosq, -1, 1);
mosquitto_destroy(mosq);
if(userdata.rc){
rc = userdata.rc;
}
if(!rc && userdata.max_msg_count == 0){
*messages = userdata.messages;
return MOSQ_ERR_SUCCESS;
}else{
for(i=0; i<msg_count; i++){
mosquitto_message_free_contents(&userdata.messages[i]);
}
free(userdata.messages);
userdata.messages = NULL;
return rc;
if(cb_userdata.rc){
rc = cb_userdata.rc;
}
//if(!rc && cb_userdata.max_msg_count == 0){
//return MOSQ_ERR_SUCCESS;
//}else{
//return rc;
//}
return MOSQ_ERR_SUCCESS;
}

1 change: 1 addition & 0 deletions lib/linker.version
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,6 @@ MOSQ_1.4 {
MOSQ_1.5 {
global:
mosquitto_subscribe_simple;
mosquitto_subscribe_callback;
mosquitto_message_free_contents;
} MOSQ_1.4;
Loading

0 comments on commit 0a95c9a

Please sign in to comment.