diff --git a/lib/loop.c b/lib/loop.c index 048b8211ff..f3b75aab26 100644 --- a/lib/loop.c +++ b/lib/loop.c @@ -243,19 +243,28 @@ static int interruptible_sleep(struct mosquitto *mosq, unsigned long reconnect_d } -int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) +int mosquitto__loop_forever(struct mosquitto *mosq, int timeout, int max_packets, bool explicit_cancellation_points) { int run = 1; int rc; unsigned long reconnect_delay; int state; +#ifndef WIN32 + UNUSED(explicit_cancellation_points); +#endif + if(!mosq) return MOSQ_ERR_INVAL; mosq->reconnects = 0; while(run){ do{ +#ifdef WIN32 + if(explicit_cancellation_points) { + pthread_testcancel(); + } +#endif rc = mosquitto_loop(mosq, timeout, max_packets); }while(run && rc == MOSQ_ERR_SUCCESS); /* Quit after fatal errors. */ @@ -280,6 +289,11 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) return rc; } do{ +#ifdef WIN32 + if(explicit_cancellation_points) { + pthread_testcancel(); + } +#endif rc = MOSQ_ERR_SUCCESS; state = mosquitto__get_state(mosq); if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){ @@ -317,6 +331,12 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) } +int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) +{ + return mosquitto__loop_forever(mosq, timeout, max_packets, false); +} + + int mosquitto_loop_misc(struct mosquitto *mosq) { if(!mosq) return MOSQ_ERR_INVAL; diff --git a/lib/thread_mosq.c b/lib/thread_mosq.c index d32e26face..6c7b136c28 100644 --- a/lib/thread_mosq.c +++ b/lib/thread_mosq.c @@ -77,14 +77,19 @@ int mosquitto_loop_stop(struct mosquitto *mosq, bool force) } #ifdef WITH_THREADING +int mosquitto__loop_forever(struct mosquitto *mosq, int timeout, int max_packets, bool explicit_cancellation_points); + void *mosquitto__thread_main(void *obj) { struct mosquitto *mosq = obj; int state; #ifndef WIN32 + bool explicit_cancellation_points = false; struct timespec ts; ts.tv_sec = 0; ts.tv_nsec = 10000000; +#else + bool explicit_cancellation_points = true; #endif if(!mosq) return NULL; @@ -104,10 +109,10 @@ void *mosquitto__thread_main(void *obj) if(!mosq->keepalive){ /* Sleep for a day if keepalive disabled. */ - mosquitto_loop_forever(mosq, 1000*86400, 1); + mosquitto__loop_forever(mosq, 1000*86400, 1, explicit_cancellation_points); }else{ /* Sleep for our keepalive value. publish() etc. will wake us up. */ - mosquitto_loop_forever(mosq, mosq->keepalive*1000, 1); + mosquitto__loop_forever(mosq, mosq->keepalive*1000, 1, explicit_cancellation_points); } return obj;