Skip to content

Commit

Permalink
Merge pull request #295 from nokute78/head_escape_str
Browse files Browse the repository at this point in the history
in_head: add lines mode like $ head -n
  • Loading branch information
edsiper committed Jun 21, 2017
2 parents 28004a5 + 37eb78d commit a455d07
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 14 deletions.
10 changes: 10 additions & 0 deletions conf/in_head.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@
# Rename key Default: head
Key head

# Lines
# ====
# Lines to read. If sets, in_head works like 'head -n'
Lines 10

# Split_line
# ====
# If true, in_head splits lines into k-v pairs
Split_line true

[OUTPUT]
Name stdout
Match head.*
194 changes: 180 additions & 14 deletions plugins/in_head/in_head.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,101 @@

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>


#include "in_head.h"

/* cb_collect callback */
static int in_head_collect(struct flb_input_instance *i_ins,
struct flb_config *config, void *in_context)
#define BUF_SIZE_MAX 512
static int read_lines(struct flb_in_head_config *head_config)
{
struct flb_in_head_config *head_config = in_context;
int fd = -1;
int ret = -1;
int num_map = 1;
FILE *fp = NULL;
int i;
int index = 0;
int str_len;
char buf[BUF_SIZE_MAX] = {0};

int new_len = 0;
char *tmp;
char *ret_buf;

fp = fopen(head_config->filepath, "r");
if (fp == NULL) {
perror("fopen");
return -1;
}

for(i=0; i<head_config->lines; i++){
ret_buf = fgets(buf, BUF_SIZE_MAX-1, fp);
if (ret_buf == NULL) {
break;
}
str_len = strlen(buf);
if (head_config->buf_size < str_len + index + 1) {
/* buffer full. re-allocate new buffer */
new_len = head_config->buf_size + str_len + 1;
tmp = (char*)flb_malloc(new_len);
if (tmp == NULL) {
flb_error("failed to allocate buffer");
/* try to output partial data */
break;
}
/* copy and release old buffer */
strcpy(tmp, head_config->buf);
flb_free(head_config->buf);

head_config->buf_size = new_len;
head_config->buf = tmp;
}
strncat(&head_config->buf[index], buf, str_len);
head_config->buf_len += str_len;
index += str_len;
}
fclose(fp);
return 0;
}

static int read_bytes(struct flb_in_head_config *head_config)
{
int fd = -1;
/* open at every collect callback */
fd = open(head_config->filepath, O_RDONLY);
if (fd < 0) {
perror("open");
return -1;
}

head_config->buf_len = read(fd, head_config->buf, head_config->buf_size);
flb_trace("%s read_len=%d buf_size=%d", __FUNCTION__,
head_config->buf_len, head_config->buf_size);

close(fd);
if (head_config->buf_len < 0) {
perror("read");
goto collect_fin;
return -1;
}
else {
return 0;
}
}

static int single_value_per_record(struct flb_input_instance *i_ins,
struct flb_in_head_config *head_config)
{
int ret = -1;
int num_map = 1;

head_config->buf[0] = '\0'; /* clear buf */
head_config->buf_len = 0;

if (head_config->lines > 0) {
read_lines(head_config);
}
else {
read_bytes(head_config);
}

flb_trace("%s read_len=%d buf_size=%d", __FUNCTION__,
head_config->buf_len, head_config->buf_size);

if (head_config->add_path == FLB_TRUE) {
num_map++;
Expand Down Expand Up @@ -91,8 +155,92 @@ static int in_head_collect(struct flb_input_instance *i_ins,
flb_input_buf_write_end(i_ins);
flb_stats_update(in_head_plugin.stats_fd, 0, 1);

collect_fin:
close(fd);
return ret;

}

#define KEY_LEN_MAX 32
static int split_lines_per_record(struct flb_input_instance *i_ins,
struct flb_in_head_config *head_config)
{
FILE *fp = NULL;
int i;
size_t str_len;
size_t key_len;
int num_map = head_config->lines;
char *ret_buf;
char key_str[KEY_LEN_MAX] = {0};

fp = fopen(head_config->filepath, "r");
if (fp == NULL) {
perror("fopen");
return -1;
}

if (head_config->add_path == FLB_TRUE) {
num_map++;
}

/* Mark the start of a 'buffer write' operation */
flb_input_buf_write_start(i_ins);

msgpack_pack_array(&i_ins->mp_pck, 2);
flb_pack_time_now(&i_ins->mp_pck);
msgpack_pack_map(&i_ins->mp_pck, num_map);

if (head_config->add_path == FLB_TRUE) {
msgpack_pack_str(&i_ins->mp_pck, 4);
msgpack_pack_str_body(&i_ins->mp_pck, "path", 4);
msgpack_pack_str(&i_ins->mp_pck, head_config->path_len);
msgpack_pack_str_body(&i_ins->mp_pck,
head_config->filepath, head_config->path_len);
}

for(i=0; i<head_config->lines; i++){
ret_buf = fgets(head_config->buf, head_config->buf_size, fp);
if (ret_buf == NULL) {
head_config->buf[0] = '\0';
str_len = 0;
}
else {
str_len = strnlen(head_config->buf, head_config->buf_size-1);
head_config->buf[str_len-1] = '\0';/* chomp str */
}

key_len = snprintf(key_str, KEY_LEN_MAX, "line%d", i);
if (key_len > KEY_LEN_MAX) {
key_len = KEY_LEN_MAX;
}

msgpack_pack_str(&i_ins->mp_pck, key_len);
msgpack_pack_str_body(&i_ins->mp_pck, key_str, key_len);
msgpack_pack_str(&i_ins->mp_pck, str_len);
msgpack_pack_str_body(&i_ins->mp_pck,
head_config->buf, str_len);
}

flb_input_buf_write_end(i_ins);
flb_stats_update(in_head_plugin.stats_fd, 0, 1);

fclose(fp);
return 0;
}


/* cb_collect callback */
static int in_head_collect(struct flb_input_instance *i_ins,
struct flb_config *config, void *in_context)
{
struct flb_in_head_config *head_config = in_context;
int ret = -1;

if (head_config->lines > 0 && head_config->split_line) {
ret = split_lines_per_record(i_ins, head_config);
}
else {
ret = single_value_per_record(i_ins, head_config);
}

return ret;
}

Expand Down Expand Up @@ -146,6 +294,23 @@ static int in_head_config_read(struct flb_in_head_config *head_config,
head_config->interval_nsec = DEFAULT_INTERVAL_NSEC;
}

pval = flb_input_get_property("split_line", in);
if (pval != NULL && flb_utils_bool(pval)) {
head_config->split_line = FLB_TRUE;
head_config->lines = 10;
}
else {
head_config->split_line = FLB_FALSE;
}

pval = flb_input_get_property("lines", in);
if (pval != NULL && atoi(pval) >= 0) {
head_config->lines = atoi(pval);
}
else {
head_config->lines = 0; /* read bytes mode */
}

if (head_config->interval_sec <= 0 && head_config->interval_nsec <= 0) {
/* Illegal settings. Override them. */
head_config->interval_sec = DEFAULT_INTERVAL_SEC;
Expand Down Expand Up @@ -194,6 +359,7 @@ static int in_head_init(struct flb_input_instance *in,
head_config->buf = NULL;
head_config->buf_len = 0;
head_config->add_path = FLB_FALSE;
head_config->lines = 0;

/* Initialize head config */
ret = in_head_config_read(head_config, in);
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_head/in_head.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ struct flb_in_head_config {
char add_path; /* add path mode */
size_t path_len;

int lines; /* line num to read */
int split_line;

int interval_sec;
int interval_nsec;
};
Expand Down

0 comments on commit a455d07

Please sign in to comment.