Skip to content

Commit

Permalink
Simplify code around SYS statistics gathering.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed May 16, 2015
1 parent b598aec commit 8049c4b
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 92 deletions.
51 changes: 18 additions & 33 deletions lib/packet_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ and the Eclipse Distribution License is available at

#ifdef WITH_BROKER
# include "mosquitto_broker.h"
# ifdef WITH_SYS_TREE
extern uint64_t g_bytes_received;
extern uint64_t g_bytes_sent;
extern unsigned long g_msgs_received;
extern unsigned long g_msgs_sent;
extern unsigned long g_pub_msgs_received;
extern unsigned long g_pub_msgs_sent;
# endif
# ifdef WITH_WEBSOCKETS
# include <libwebsockets.h>
# endif
Expand All @@ -39,6 +31,14 @@ and the Eclipse Distribution License is available at
#include "net_mosq.h"
#include "packet_mosq.h"
#include "read_handle.h"
#ifdef WITH_BROKER
# include "sys_tree.h"
#else
# define G_BYTES_RECEIVED_INC(A)
# define G_BYTES_SENT_INC(A)
# define G_MSGS_SENT_INC(A)
# define G_PUB_MSGS_SENT_INC(A)
#endif

void mosquitto__packet_cleanup(struct mosquitto__packet *packet)
{
Expand Down Expand Up @@ -239,9 +239,7 @@ int mosquitto__packet_write(struct mosquitto *mosq)
while(packet->to_process > 0){
write_length = mosquitto__net_write(mosq, &(packet->payload[packet->pos]), packet->to_process);
if(write_length > 0){
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
g_bytes_sent += write_length;
#endif
G_BYTES_SENT_INC(write_length);
packet->to_process -= write_length;
packet->pos += write_length;
}else{
Expand All @@ -263,15 +261,10 @@ int mosquitto__packet_write(struct mosquitto *mosq)
}
}

#ifdef WITH_BROKER
# ifdef WITH_SYS_TREE
g_msgs_sent++;
if(((packet->command)&0xF6) == PUBLISH){
g_pub_msgs_sent++;
}
# endif
#else
G_MSGS_SENT_INC(1);
if(((packet->command)&0xF6) == PUBLISH){
G_PUB_MSGS_SENT_INC(1);
#ifndef WITH_BROKER
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish){
/* This is a QoS=0 message */
Expand Down Expand Up @@ -315,8 +308,8 @@ int mosquitto__packet_write(struct mosquitto *mosq)
pthread_mutex_unlock(&mosq->callback_mutex);
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
return MOSQ_ERR_SUCCESS;
}
#endif
}

/* Free data and reset values */
pthread_mutex_lock(&mosq->out_packet_mutex);
Expand Down Expand Up @@ -376,9 +369,7 @@ int mosquitto__packet_read(struct mosquitto *mosq)
if(read_length == 1){
mosq->in_packet.command = byte;
#ifdef WITH_BROKER
# ifdef WITH_SYS_TREE
g_bytes_received++;
# endif
G_BYTES_RECEIVED_INC(1);
/* Clients must send CONNECT as their first command. */
if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
#endif
Expand Down Expand Up @@ -418,9 +409,7 @@ int mosquitto__packet_read(struct mosquitto *mosq)
*/
if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL;

#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
g_bytes_received++;
#endif
G_BYTES_RECEIVED_INC(1);
mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
mosq->in_packet.remaining_mult *= 128;
}else{
Expand Down Expand Up @@ -453,9 +442,7 @@ int mosquitto__packet_read(struct mosquitto *mosq)
while(mosq->in_packet.to_process>0){
read_length = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(read_length > 0){
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
g_bytes_received += read_length;
#endif
G_BYTES_RECEIVED_INC(read_length);
mosq->in_packet.to_process -= read_length;
mosq->in_packet.pos += read_length;
}else{
Expand Down Expand Up @@ -488,12 +475,10 @@ int mosquitto__packet_read(struct mosquitto *mosq)
/* All data for this packet is read. */
mosq->in_packet.pos = 0;
#ifdef WITH_BROKER
# ifdef WITH_SYS_TREE
g_msgs_received++;
G_MSGS_RECEIVED_INC(1);
if(((mosq->in_packet.command)&0xF5) == PUBLISH){
g_pub_msgs_received++;
G_PUB_MSGS_RECEIVED_INC(1);
}
# endif
rc = mqtt3_packet_handle(db, mosq);
#else
rc = mosquitto__packet_handle(mosq);
Expand Down
16 changes: 6 additions & 10 deletions lib/send_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ and the Eclipse Distribution License is available at
#include "util_mosq.h"

#ifdef WITH_BROKER
#include "mosquitto_broker.h"
# ifdef WITH_SYS_TREE
extern uint64_t g_pub_bytes_sent;
# endif
# include "mosquitto_broker.h"
# include "sys_tree.h"
#else
# define G_PUB_BYTES_SENT_INC(A)
#endif

int mosquitto__send_pingreq(struct mosquitto *mosq)
Expand Down Expand Up @@ -155,9 +155,7 @@ int mosquitto__send_publish(struct mosquitto *mosq, uint16_t mid, const char *to
mapped_topic = topic_temp;
}
mosquitto__log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen);
#ifdef WITH_SYS_TREE
g_pub_bytes_sent += payloadlen;
#endif
G_PUB_BYTES_SENT_INC(payloadlen);
rc = mosquitto__send_real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup);
mosquitto__free(mapped_topic);
return rc;
Expand All @@ -167,9 +165,7 @@ int mosquitto__send_publish(struct mosquitto *mosq, uint16_t mid, const char *to
}
#endif
mosquitto__log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
# ifdef WITH_SYS_TREE
g_pub_bytes_sent += payloadlen;
# endif
G_PUB_BYTES_SENT_INC(payloadlen);
#else
mosquitto__log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
#endif
Expand Down
12 changes: 3 additions & 9 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ and the Eclipse Distribution License is available at
#include "mosquitto_broker.h"
#include "memory_mosq.h"
#include "send_mosq.h"
#include "sys_tree.h"
#include "time_mosq.h"

static int max_inflight = 20;
static int max_queued = 100;
#ifdef WITH_SYS_TREE
extern unsigned long g_msgs_dropped;
#endif

int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db)
{
Expand Down Expand Up @@ -359,16 +357,12 @@ int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context,
"Outgoing messages are being dropped for client %s.",
context->id);
}
#ifdef WITH_SYS_TREE
g_msgs_dropped++;
#endif
G_MSGS_DROPPED_INC();
return 2;
}
}else{
if(max_queued > 0 && context->msg_count12 >= max_queued){
#ifdef WITH_SYS_TREE
g_msgs_dropped++;
#endif
G_MSGS_DROPPED_INC();
if(context->is_dropping == false){
context->is_dropping = true;
mosquitto__log_printf(NULL, MOSQ_LOG_NOTICE,
Expand Down
8 changes: 2 additions & 6 deletions src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ and the Eclipse Distribution License is available at
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "send_mosq.h"
#include "sys_tree.h"
#include "time_mosq.h"
#include "util_mosq.h"

Expand All @@ -53,9 +54,6 @@ extern bool flag_db_backup;
#endif
extern bool flag_tree_print;
extern int run;
#ifdef WITH_SYS_TREE
extern int g_clients_expired;
#endif

static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds);
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds);
Expand Down Expand Up @@ -296,9 +294,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
id = "<unknown>";
}
mosquitto__log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id);
#ifdef WITH_SYS_TREE
g_clients_expired++;
#endif
G_CLIENTS_EXPIRED_INC();
context->clean_session = true;
context->state = mosq_cs_expiring;
do_disconnect(db, context);
Expand Down
8 changes: 2 additions & 6 deletions src/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ static int tls_ex_index_context = -1;
static int tls_ex_index_listener = -1;
#endif

#ifdef WITH_SYS_TREE
extern unsigned int g_socket_connections;
#endif
#include "sys_tree.h"

int mqtt3_socket_accept(struct mosquitto_db *db, int listensock)
{
Expand All @@ -83,9 +81,7 @@ int mqtt3_socket_accept(struct mosquitto_db *db, int listensock)
new_sock = accept(listensock, NULL, 0);
if(new_sock == INVALID_SOCKET) return -1;

#ifdef WITH_SYS_TREE
g_socket_connections++;
#endif
G_SOCKET_CONNECTIONS_INC();

if(mosquitto__socket_nonblock(new_sock)){
return INVALID_SOCKET;
Expand Down
9 changes: 2 additions & 7 deletions src/read_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ and the Eclipse Distribution License is available at
#include "packet_mosq.h"
#include "read_handle.h"
#include "send_mosq.h"
#include "sys_tree.h"
#include "util_mosq.h"

#ifdef WITH_SYS_TREE
extern uint64_t g_pub_bytes_received;
#endif

int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context)
{
if(!context) return MOSQ_ERR_INVAL;
Expand Down Expand Up @@ -170,9 +167,7 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context)
}

payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
#ifdef WITH_SYS_TREE
g_pub_bytes_received += payloadlen;
#endif
G_PUB_BYTES_RECEIVED_INC(payloadlen);
if(context->listener && context->listener->mount_point){
len = strlen(context->listener->mount_point) + strlen(topic) + 1;
topic_mount = mosquitto__malloc(len+1);
Expand Down
11 changes: 3 additions & 8 deletions src/read_handle_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ and the Eclipse Distribution License is available at
#include <stdio.h>
#include <string.h>

#include "config.h>
#include "config.h"

#include "mosquitto_broker.h"
#include "mqtt3_protocol.h"
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "send_mosq.h"
#include "sys_tree.h"
#include "time_mosq.h"
#include "tls_mosq.h"
#include "util_mosq.h"
Expand All @@ -36,10 +37,6 @@ and the Eclipse Distribution License is available at
#include <libwebsockets.h>
#endif

#ifdef WITH_SYS_TREE
extern unsigned int g_connection_count;
#endif

static char *client_id_gen(struct mosquitto_db *db)
{
char *client_id;
Expand Down Expand Up @@ -101,9 +98,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
X509_NAME_ENTRY *name_entry;
#endif

#ifdef WITH_SYS_TREE
g_connection_count++;
#endif
G_CONNECTION_COUNT_INC();

/* Don't accept multiple CONNECT commands. */
if(context->state != mosq_cs_new){
Expand Down
18 changes: 5 additions & 13 deletions src/websockets.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,12 @@ POSSIBILITY OF SUCH DAMAGE.
#include "mosquitto_broker.h"
#include "mqtt3_protocol.h"
#include "memory_mosq.h"
#include "sys_tree.h"

#include <stdlib.h>
#include <errno.h>
#include <sys/stat.h>

#ifdef WITH_SYS_TREE
extern uint64_t g_bytes_received;
extern uint64_t g_bytes_sent;
extern unsigned long g_msgs_received;
extern unsigned long g_msgs_sent;
extern unsigned long g_pub_msgs_received;
extern unsigned long g_pub_msgs_sent;
#endif
extern struct mosquitto_db int_db;

static int callback_mqtt(struct libwebsocket_context *context,
Expand Down Expand Up @@ -258,9 +251,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
mosq = u->mosq;
pos = 0;
buf = (uint8_t *)in;
#ifdef WITH_SYS_TREE
g_bytes_received += len;
#endif
G_BYTES_RECEIVED_INC(len);
while(pos < len){
if(!mosq->in_packet.command){
mosq->in_packet.command = buf[pos];
Expand Down Expand Up @@ -315,10 +306,11 @@ static int callback_mqtt(struct libwebsocket_context *context,

/* All data for this packet is read. */
mosq->in_packet.pos = 0;

#ifdef WITH_SYS_TREE
g_msgs_received++;
G_MSGS_RECEIVED_INC();
if(((mosq->in_packet.command)&0xF5) == PUBLISH){
g_pub_msgs_received++;
G_PUB_MSGS_RECEIVED_INC();
}
#endif
rc = mqtt3_packet_handle(db, mosq);
Expand Down

0 comments on commit 8049c4b

Please sign in to comment.