Skip to content

Commit

Permalink
pack: add to state last_byte position of processed record
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Aug 4, 2017
1 parent cc47a10 commit a517899
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_pack.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct flb_pack_state {
int multiple; /* support multiple jsons? */
int tokens_count; /* number of parsed tokens */
int tokens_size; /* array size of tokens */
int last_byte; /* last byte of a full msg */
jsmntok_t *tokens; /* tokens array */
jsmn_parser parser; /* parser state */
};
Expand Down
22 changes: 17 additions & 5 deletions src/flb_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ static inline int is_float(char *buf, int len)

/* Receive a tokenized JSON message and convert it to MsgPack */
static char *tokens_to_msgpack(char *js,
jsmntok_t *tokens, int arr_size, int *out_size)
jsmntok_t *tokens, int arr_size, int *out_size,
int *last_byte)
{
int i;
int flen;
Expand All @@ -104,6 +105,11 @@ static char *tokens_to_msgpack(char *js,
if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
break;
}

if (t->parent == -1) {
*last_byte = t->end;
}

flen = (t->end - t->start);

switch (t->type) {
Expand Down Expand Up @@ -183,7 +189,8 @@ int flb_pack_json(char *js, size_t len, char **buffer, int *size)
goto flb_pack_json_end;
}

buf = tokens_to_msgpack(js, state.tokens, state.tokens_count, &out);
int last;
buf = tokens_to_msgpack(js, state.tokens, state.tokens_count, &out, &last);
if (!buf) {
ret = -1;
goto flb_pack_json_end;
Expand All @@ -210,8 +217,9 @@ int flb_pack_state_init(struct flb_pack_state *s)
perror("calloc");
return -1;
}
s->tokens_size = size;
s->tokens_count = 0;
s->tokens_size = size;
s->tokens_count = 0;
s->last_byte = 0;

return 0;
}
Expand All @@ -237,6 +245,7 @@ int flb_pack_json_state(char *js, size_t len,
int ret;
int out;
int delim = 0;
int last = 0;
char *buf;
jsmntok_t *t;

Expand All @@ -257,6 +266,7 @@ int flb_pack_json_state(char *js, size_t len,

for (i = 1; i < state->tokens_size; i++) {
t = &state->tokens[i];

if (t->start < (state->tokens[i - 1]).start) {
break;
}
Expand All @@ -265,6 +275,7 @@ int flb_pack_json_state(char *js, size_t len,
found++;
delim = i;
}

}

if (found > 0) {
Expand All @@ -278,13 +289,14 @@ int flb_pack_json_state(char *js, size_t len,
return ret;
}

buf = tokens_to_msgpack(js, state->tokens, state->tokens_count, &out);
buf = tokens_to_msgpack(js, state->tokens, state->tokens_count, &out, &last);
if (!buf) {
return -1;
}

*size = out;
*buffer = buf;
state->last_byte = last;

return 0;
}
Expand Down

0 comments on commit a517899

Please sign in to comment.