Skip to content

Commit

Permalink
input: chunk: fallback to filesystem storage and up/down new interfaces
Browse files Browse the repository at this point in the history
If an input plugin tries to register some data and it already exceeded it
memory buffer limit, this patch checks the plugin storage type, if is 'memory'
the plugin is paused, otherwise if 'filesystem' is used, it will store the data
ine the filesystem and put the data chunk 'down', meaning: keep a reference to
this context of data but remove any mmap() and open file descriptor associated.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Apr 17, 2019
1 parent 0086f53 commit c6e72cb
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 5 deletions.
5 changes: 4 additions & 1 deletion include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ int flb_input_chunk_release_lock(struct flb_input_chunk *ic);
int flb_input_chunk_get_tag(struct flb_input_chunk *ic,
char **tag_buf, int *tag_len);
ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic);
int flb_input_chunk_set_limits(struct flb_input_instance *in);
size_t flb_input_chunk_set_limits(struct flb_input_instance *in);
size_t flb_input_chunk_total_size(struct flb_input_instance *in);
struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in,
void *chunk);
int flb_input_chunk_set_up_down(struct flb_input_chunk *ic);
int flb_input_chunk_set_up(struct flb_input_chunk *ic);
int flb_input_chunk_down(struct flb_input_chunk *ic);

#endif
102 changes: 98 additions & 4 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in,
chunk = cio_chunk_open(storage->cio, storage->stream, name,
CIO_OPEN, FLB_INPUT_CHUNK_SIZE);
if (!chunk) {
flb_error("[input chunk] could not create chunk file");
flb_error("[input chunk] could not create chunk file: %s:%s",
storage->stream, name);
return NULL;
}

Expand Down Expand Up @@ -178,6 +179,11 @@ static struct flb_input_chunk *input_chunk_get(char *tag, int tag_len,
continue;
}

if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
ic = NULL;
continue;
}

if (cio_meta_cmp(ic->chunk, tag, tag_len) != 0) {
ic = NULL;
continue;
Expand Down Expand Up @@ -222,6 +228,12 @@ size_t flb_input_chunk_total_size(struct flb_input_instance *in)

mk_list_foreach(head, &in->chunks) {
ic = mk_list_entry(head, struct flb_input_chunk, _head);

/* Skip files who are 'down' */
if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
continue;
}

bytes = flb_input_chunk_get_size(ic);
if (bytes <= 0) {
continue;
Expand All @@ -236,8 +248,10 @@ size_t flb_input_chunk_total_size(struct flb_input_instance *in)
* Count and update the number of bytes being used by the instance. Also
* check if the instance is paused, if so, check if it can be resumed if
* is not longer over the limits.
*
* It always returns the number of bytes in use.
*/
int flb_input_chunk_set_limits(struct flb_input_instance *in)
size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
{
size_t total;

Expand All @@ -261,7 +275,7 @@ int flb_input_chunk_set_limits(struct flb_input_instance *in)
}
}

return 0;
return total;
}

/*
Expand All @@ -285,6 +299,60 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i)
return FLB_FALSE;
}

/*
* Validate if the chunk coming from the input plugin basd on config and
* resources usage must be 'up' or 'down' (applicable for filesystem storage
* type).
*
* FIXME: can we find a better name for this function ?
*/
int flb_input_chunk_set_up_down(struct flb_input_chunk *ic)
{
size_t total;
struct flb_input_instance *in;

in = ic->in;

/* Gather total number of enqueued bytes */
total = flb_input_chunk_total_size(in);

/* Register the total into the context variable */
in->mem_chunks_size = total;

if (flb_input_chunk_is_overlimit(in) == FLB_TRUE) {
if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
cio_chunk_down(ic->chunk);

/* Adjust new counters */
total = flb_input_chunk_total_size(ic->in);
in->mem_chunks_size = total;

return FLB_FALSE;
}
}

return FLB_TRUE;
}

int flb_input_chunk_down(struct flb_input_chunk *ic)
{
if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
return cio_chunk_down(ic->chunk);
}

return 0;
}

int flb_input_chunk_set_up(struct flb_input_chunk *ic)
{
if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
return cio_chunk_up(ic->chunk);
}

return 0;
}


/* Append a RAW MessagPack buffer to the input instance */
int flb_input_chunk_append_raw(struct flb_input_instance *in,
char *tag, size_t tag_len,
Expand All @@ -293,6 +361,8 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in,
int ret;
size_t size;
struct flb_input_chunk *ic;
struct flb_storage_input *si;

#ifdef FLB_HAVE_METRICS
int records;
#endif
Expand Down Expand Up @@ -383,8 +453,24 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in,

/* Update memory counters and adjust limits if any */
flb_input_chunk_set_limits(in);
flb_input_chunk_protect(in);

/*
* Check if we are overlimit and validate if is there any filesystem
* storage type asociated to this input instance, if so, unload the
* chunk content from memory to respect imposed limits.
*
* Calling cio_chunk_down() the memory map associated and the file
* descriptor will be released. At any later time, it must be bring up
* for I/O operations.
*/
si = (struct flb_storage_input *) in->storage;
if (flb_input_chunk_is_overlimit(in) == FLB_TRUE &&
si->type == CIO_STORE_FS) {
cio_chunk_down(ic->chunk);
return 0;
}

flb_input_chunk_protect(in);
return 0;
}

Expand All @@ -394,6 +480,14 @@ void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size)
int ret;
char *buf = NULL;

if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
ret = cio_chunk_up(ic->chunk);
if (ret == -1) {
flb_error("[input chunk] cannot load chunk content");
return NULL;
}
}

/*
* msgpack-c internal use a raw buffer for it operations, since we
* already appended data we just can take out the references to avoid
Expand Down

0 comments on commit c6e72cb

Please sign in to comment.