Skip to content

Commit

Permalink
Merge pull request #30 from mgdm/add-onpublish-callback
Browse files Browse the repository at this point in the history
Add onPublish callback
  • Loading branch information
mgdm committed Sep 1, 2015
2 parents ee76924 + f5ba54f commit c1fd3ea
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@
/tests/**/*.sh

/tests/certs

.idea/
CMakeLists.txt
2 changes: 2 additions & 0 deletions CREDITS
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
libmosquitto PHP binding
Michael Maclean
onPublish callback from Github user @acrazing

22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ This is the actual Mosquitto client.
1. [onLog](#onlog) - set the logging callback
1. [onSubscribe](#onsubscribe) - set the subscribe callback
1. [onMessage](#onmessage) - set the callback fired when a message is received
1. [onPublish](#onpublish) - set the callback fired when a message is published
1. [setMaxInFlightMessages](#setmaxinflightmessages) - set the number of QoS
1 and 2 messages that can be "in flight" at once
1. [setMessageRetry](#setmessageretry) - set the number of seconds to wait
Expand Down Expand Up @@ -296,6 +297,24 @@ The callback should take parameters of the form:
| message | Mosquitto\Message | A Message object containing the message data |


#### onPublish

Set the publish callback, This is called when a message is publish by the client itself.

**Warning**: this may be called before the method `Mosquitto\Client::publish` return the
message id, so, you need to create a queue to deal with the mid list

| Parameter | Type | Description |
| --- | --- | --- |
| callback | callback | The callback |

The callback should take parameters of the form:

| Parameter | Type | Description |
| --- | --- | --- |
| mid | int | the message id returned by `Mosquitto\Client::publish` |


#### setMaxInFlightMessages

Set the number of QoS 1 and 2 messages that can be “in flight” at one time. An
Expand Down Expand Up @@ -329,6 +348,9 @@ Publish a message on a given topic.
| qos | int | Integer value 0, 1 or 2 indicating the QoS for this message |
| retain | boolean | If true, make this message retained |

return `int` the message id in broker system
**Warning** the message id is not unique

#### subscribe

Subscribe to a topic.
Expand Down
65 changes: 65 additions & 0 deletions examples/testOnpublish.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php
class MQ {
public static $publish = array();
public static $receive = array();
public static function addPublish($mid, $msg) {
$msg->id = $mid;
self::$publish[$mid] = $msg;
}

public static function confirm($mid) {
if(array_key_exists($mid, self::$publish)) {
self::$publish[$mid]->state = true;
}
}

public static function addReceive($msg) {
$msg = Message::factory($msg, true);
self::$receive[$msg->id] = $msg;
}
}

class Message {
public $id;
public $state = false;
public $msg;
public static function factory(Mosquitto\Message $msg, $state = false) {
$message = new Message();
$message->state = $state;
$message->msg = $msg;
$message->id = $msg->mid;
return $message;
}
}
$client = new Mosquitto\Client('client.terminal.onpublish', false);

$client->onMessage(function($msg) {
print_r(array('receive', $msg));
MQ::addReceive($msg);
});

$client->onPublish(function($mid) {
MQ::confirm($mid);
print_r(array('comfirm publish', MQ::$publish[$mid]));
});
$client->onConnect(function($rc, $msg) {
print_r(array('rc' => $rc, 'message' => $msg));
});

$client->connect('localhost', 1883, 60);

sleep(1);


$client->subscribe('/test/publish', 1);
$msg = Message::factory(new Mosquitto\Message());
$msg->msg->topic = '/test/publish';
$msg->msg->payload = 'hello from on publish';
$msg->msg->qos = 1;
$mid = $client->publish($msg->msg->topic, $msg->msg->payload, $msg->msg->qos);
print_r(array('publish', $msg));
MQ::addPublish($mid, $msg);

sleep(1);

$client->loopForever();
88 changes: 80 additions & 8 deletions mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,40 @@ PHP_METHOD(Mosquitto_Client, onMessage)
}
/* }}} */

/* {{{ Mosquitto\Client::onPublish() */
PHP_METHOD(Mosquitto_Client, onPublish)
{
mosquitto_client_object *object;
zend_fcall_info publish_callback = empty_fcall_info;
zend_fcall_info_cache publish_callback_cache = empty_fcall_info_cache;

PHP_MOSQUITTO_ERROR_HANDLING();
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f!",
&publish_callback, &publish_callback_cache) == FAILURE) {

PHP_MOSQUITTO_RESTORE_ERRORS();
return;
}
PHP_MOSQUITTO_RESTORE_ERRORS();

object = (mosquitto_client_object *) mosquitto_client_object_get(getThis() TSRMLS_CC);

if (!ZEND_FCI_INITIALIZED(publish_callback)) {
zend_throw_exception(mosquitto_ce_exception, "Need a valid callback", 0 TSRMLS_CC);
}

object->publish_callback = publish_callback;
object->publish_callback_cache = publish_callback_cache;
Z_ADDREF_P(publish_callback.function_name);

if (publish_callback.object_ptr != NULL) {
Z_ADDREF_P(publish_callback.object_ptr);
}

mosquitto_publish_callback_set(object->client, php_mosquitto_publish_callback);
}
/* }}} */

/* {{{ Mosquitto\Client::getSocket() */
PHP_METHOD(Mosquitto_Client, getSocket)
{
Expand Down Expand Up @@ -820,12 +854,12 @@ PHP_MOSQUITTO_API void php_mosquitto_exit_loop(mosquitto_client_object *object)

static inline mosquitto_client_object *mosquitto_client_object_get(zval *zobj TSRMLS_DC)
{
mosquitto_client_object *pobj = zend_object_store_get_object(zobj TSRMLS_CC);
mosquitto_client_object *pobj = zend_object_store_get_object(zobj TSRMLS_CC);

if (pobj->client == NULL) {
php_error(E_ERROR, "Internal surface object missing in %s wrapper, you must call parent::__construct in extended classes", Z_OBJCE_P(zobj)->name);
}
return pobj;
if (pobj->client == NULL) {
php_error(E_ERROR, "Internal surface object missing in %s wrapper, you must call parent::__construct in extended classes", Z_OBJCE_P(zobj)->name);
}
return pobj;
}

static void mosquitto_client_object_destroy(void *object TSRMLS_DC)
Expand All @@ -845,6 +879,7 @@ static void mosquitto_client_object_destroy(void *object TSRMLS_DC)
PHP_MOSQUITTO_FREE_CALLBACK(connect);
PHP_MOSQUITTO_FREE_CALLBACK(subscribe);
PHP_MOSQUITTO_FREE_CALLBACK(unsubscribe);
PHP_MOSQUITTO_FREE_CALLBACK(publish);
PHP_MOSQUITTO_FREE_CALLBACK(message);
PHP_MOSQUITTO_FREE_CALLBACK(disconnect);
PHP_MOSQUITTO_FREE_CALLBACK(log);
Expand Down Expand Up @@ -1056,6 +1091,42 @@ PHP_MOSQUITTO_API void php_mosquitto_message_callback(struct mosquitto *mosq, vo
}
}


PHP_MOSQUITTO_API void php_mosquitto_publish_callback(struct mosquitto *mosq, void *client_obj, int mid)
{
mosquitto_client_object *object = (mosquitto_client_object *) client_obj;
zval *retval_ptr = NULL;
zval *mid_zval;
zval **params[1];
#ifdef ZTS
TSRMLS_D = object->TSRMLS_C;
#endif

if (!ZEND_FCI_INITIALIZED(object->publish_callback)) {
return;
}

MAKE_STD_ZVAL(mid_zval);
ZVAL_LONG(mid_zval, mid);
params[0] = &mid_zval;

object->publish_callback.params = params;
object->publish_callback.param_count = 1;
object->publish_callback.retval_ptr_ptr = &retval_ptr;

if (zend_call_function(&object->publish_callback, &object->publish_callback_cache TSRMLS_CC) == FAILURE) {
if (!EG(exception)) {
zend_throw_exception_ex(mosquitto_ce_exception, 0 TSRMLS_CC, "Failed to invoke publish callback %s()", Z_STRVAL_P(object->publish_callback.function_name));
}
}

zval_ptr_dtor(params[0]);

if (retval_ptr != NULL) {
zval_ptr_dtor(&retval_ptr);
}
}

PHP_MOSQUITTO_API void php_mosquitto_subscribe_callback(struct mosquitto *mosq, void *client_obj, int mid, int qos_count, const int *granted_qos)
{
mosquitto_client_object *object = (mosquitto_client_object *) client_obj;
Expand Down Expand Up @@ -1157,6 +1228,7 @@ const zend_function_entry mosquitto_client_methods[] = {
PHP_ME(Mosquitto_Client, onSubscribe, Mosquitto_Client_callback_args, ZEND_ACC_PUBLIC)
PHP_ME(Mosquitto_Client, onUnsubscribe, Mosquitto_Client_callback_args, ZEND_ACC_PUBLIC)
PHP_ME(Mosquitto_Client, onMessage, Mosquitto_Client_callback_args, ZEND_ACC_PUBLIC)
PHP_ME(Mosquitto_Client, onPublish, Mosquitto_Client_callback_args, ZEND_ACC_PUBLIC)
PHP_ME(Mosquitto_Client, getSocket, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Mosquitto_Client, setTlsCertificates, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Mosquitto_Client, setTlsInsecure, NULL, ZEND_ACC_PUBLIC)
Expand Down Expand Up @@ -1266,11 +1338,11 @@ PHP_MINFO_FUNCTION(mosquitto)

php_info_print_table_start();
php_info_print_table_header(2, "Mosquitto support", "enabled");
php_info_print_table_colspan_header(2,
php_info_print_table_colspan_header(2,
#ifdef COMPILE_DL_MOSQUITTO
"Compiled as dynamic module"
"Compiled as dynamic module"
#else
"Compiled as static module"
"Compiled as static module"
#endif
);
php_info_print_table_row(2, "libmosquitto version", tmp);
Expand Down
3 changes: 3 additions & 0 deletions php_mosquitto.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ typedef struct _mosquitto_client_object {
zend_fcall_info_cache unsubscribe_callback_cache;
zend_fcall_info message_callback;
zend_fcall_info_cache message_callback_cache;
zend_fcall_info publish_callback;
zend_fcall_info_cache publish_callback_cache;
zend_fcall_info disconnect_callback;
zend_fcall_info_cache disconnect_callback_cache;
zend_fcall_info log_callback;
Expand Down Expand Up @@ -163,6 +165,7 @@ PHP_MOSQUITTO_API void php_mosquitto_log_callback(struct mosquitto *mosq, void *
PHP_MOSQUITTO_API void php_mosquitto_message_callback(struct mosquitto *mosq, void *client_obj, const struct mosquitto_message *message);
PHP_MOSQUITTO_API void php_mosquitto_subscribe_callback(struct mosquitto *mosq, void *client_obj, int mid, int qos_count, const int *granted_qos);
PHP_MOSQUITTO_API void php_mosquitto_unsubscribe_callback(struct mosquitto *mosq, void *client_obj, int mid);
PHP_MOSQUITTO_API void php_mosquitto_publish_callback(struct mosquitto *mosq, void *client_obj, int mid);
PHP_MOSQUITTO_API void php_mosquitto_disconnect_callback(struct mosquitto *mosq, void *obj, int rc);

PHP_MOSQUITTO_API char *php_mosquitto_strerror_wrapper(int err);
Expand Down

0 comments on commit c1fd3ea

Please sign in to comment.