Skip to content

Commit

Permalink
engine: new signal mechanism for tasks/threads and cleanups
Browse files Browse the repository at this point in the history
This patch implements signaling through the main event loop for the
tasks running under the engine, it allow each Task to indicate when
it finished.

Also a new routine exists for output plugins so when they finish,
they can return an OK or Error value, this is done through the event
loop (note that co-routines cannot return a value).

The changes introduced are affecting how routing is working at the
moment, so it can be considered work in process, some functionality
is broken as multiple destinations.

note: outpug plugins now requries
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Jul 22, 2016
1 parent 9c46d0b commit de8d73d
Show file tree
Hide file tree
Showing 13 changed files with 232 additions and 228 deletions.
28 changes: 28 additions & 0 deletions include/fluent-bit/flb_bits.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2016 Treasure Data Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_BITS_H
#define FLB_BITS_H

#define FLB_BITS_U64_SET(a, b) ((uint64_t) a << 32 | b)
#define FLB_BITS_U64_HIGH(val) ((uint64_t) val >> 32)
#define FLB_BITS_U64_LOW(val) ((uint64_t) val & 0xffffffff)
#define FLB_BITS_CLEAR(val, n) (val & ~(1 << n))

#endif
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_engine_task_map.h>

#ifdef FLB_HAVE_TLS
#include <fluent-bit/flb_io_tls.h>
Expand Down Expand Up @@ -111,6 +112,8 @@ struct flb_config {
int buffer_workers;
char *buffer_path;
#endif

struct flb_engine_task_map tasks_map[2048];
};

struct flb_config *flb_config_init();
Expand Down
20 changes: 15 additions & 5 deletions include/fluent-bit/flb_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#define FLB_ENGINE_H

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_bits.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_output.h>
Expand All @@ -30,11 +31,20 @@
#define FLB_ENGINE_EV_CUSTOM MK_EVENT_CUSTOM
#define FLB_ENGINE_EV_THREAD 1024

/* Engine signals */
#define FLB_ENGINE_STARTED 0x00110aa0 /* Notify Fluent Bit started */
#define FLB_ENGINE_STOP 0xdeadbeef /* Requested to stop Fluent Bit */
#define FLB_ENGINE_SHUTDOWN 0xdead0000 /* Started shutdown phase */
#define FLB_ENGINE_STATS 0xaabbccdd /* Collect stats */
/* Engine events: all engine events set the left 32 bits to '1' */
#define FLB_ENGINE_EV_STARTED FLB_BITS_U64_SET(1, 1) /* Engine started */
#define FLB_ENGINE_EV_STOP FLB_BITS_U64_SET(1, 2) /* Requested to stop */
#define FLB_ENGINE_EV_SHUTDOWN FLB_BITS_U64_SET(1, 3) /* Engine shutdown */
#define FLB_ENGINE_EV_STATS FLB_BITS_U64_SET(1, 4) /* Collect stats */

/* Similar to engine events, but used as return values */
#define FLB_ENGINE_STARTED FLB_BITS_U64_LOW(FLB_ENGINE_EV_STARTED)
#define FLB_ENGINE_STOP FLB_BITS_U64_LOW(FLB_ENGINE_EV_STOP)
#define FLB_ENGINE_SHUTDOWN FLB_BITS_U64_LOW(FLB_ENGINE_EV_SHUTDOWN)
#define FLB_ENGINE_STATS FLB_BITS_U64_LOW(FLB_ENGINE_EV_STATS)

/* Engine signals: Task, it only refer to the type */
#define FLB_ENGINE_TASK 2

int flb_engine_start(struct flb_config *config);
int flb_engine_flush(struct flb_config *config,
Expand Down
33 changes: 31 additions & 2 deletions include/fluent-bit/flb_engine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,21 @@ struct flb_thread;
#include <fluent-bit/flb_thread.h>
#include <fluent-bit/flb_input.h>

/* Task status */
#define FLB_ENGINE_TASK_NEW 0
#define FLB_ENGINE_TASK_RUNNING 1

/* Task signals */
//#define FLB_ENGINE_TASK_DONE FLB_ENGINE_MASK(FLB_ENGINE_TASK, 1)

struct flb_engine_task_route {
struct flb_output_instance *out;
struct mk_list _head;
};

/* A task takes a buffer and sync input and output instances to handle it */
struct flb_engine_task {
int id; /* task id */
int status; /* new task or running ? */
int deleted; /* should be deleted ? */
int users; /* number of users (threads) */
Expand All @@ -49,13 +54,14 @@ struct flb_engine_task {
struct mk_list threads; /* ref flb_input_instance->tasks */
struct mk_list routes; /* routes to dispatch data */
struct mk_list _head; /* link to input_instance */
struct flb_config *config; /* parent flb config */

#ifdef FLB_HAVE_FLUSH_PTHREADS
pthread_mutex_t mutex_threads;
#endif
};

/* If there is no active users, destroy the task context */
/* If there is no active users, mark the task as ready for destroy */
static inline int flb_engine_task_remove(struct flb_engine_task *task)
{
/* Handle task users */
Expand All @@ -69,13 +75,29 @@ static inline int flb_engine_task_remove(struct flb_engine_task *task)

static inline void flb_engine_task_destroy(struct flb_engine_task *task)
{
struct mk_list *tmp;
struct mk_list *head;
struct flb_engine_task_route *route;

if (task->dt) {
flb_input_dyntag_destroy(task->dt);
}

mk_list_del(&task->_head);
/* Release task_id */
task->config->tasks_map[task->id].id = 0;
task->config->tasks_map[task->id].task = NULL;

/* Remove routes */
mk_list_foreach_safe(head, tmp, &task->routes) {
route = mk_list_entry(head, struct flb_engine_task_route, _head);
mk_list_del(&route->_head);
free(route);
}

/* Unlink and release */
mk_list_del(&task->_head);
free(task->buf);
free(task->tag);
free(task);
}

Expand Down Expand Up @@ -104,4 +126,11 @@ static inline void flb_engine_task_add_thread(struct mk_list *head,
#endif
}

struct flb_engine_task *flb_engine_task_create(char *buf,
size_t size,
struct flb_input_instance *i_ins,
struct flb_input_dyntag *dt,
char *tag,
struct flb_config *config);

#endif
30 changes: 30 additions & 0 deletions include/fluent-bit/flb_engine_task_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2016 Treasure Data Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_ENGINE_TASK_MAP_H
#define FLB_ENGINE_TASK_MAP_H

#include <inttypes.h>

struct flb_engine_task_map {
uint8_t id;
void *task;
};

#endif
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#define FLB_FALSE 0
#define FLB_TRUE !FLB_FALSE

#define FLB_OK 1
#define FLB_ERROR 0

#define FLB_INLINE inline __attribute__((always_inline))

#if __GNUC__ >= 4
Expand Down
46 changes: 46 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
#endif

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_bits.h>
#include <fluent-bit/flb_io.h>
#include <fluent-bit/flb_stats.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_network.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_engine_task.h>

#include <unistd.h>

/* Output plugin masks */
#define FLB_OUTPUT_NET 32 /* output address may set host and port */

Expand Down Expand Up @@ -249,6 +252,49 @@ struct flb_thread *flb_output_thread(struct flb_engine_task *task,
}
#endif

/*
* This function is used by the output plugins to return. It's mandatory
* as it will take care to signal the event loop letting know the flush
* callback has done.
*
* The signal emmited indicate the 'Task' number that have finished plus
* a return value. The return value is either FLB_OK or FLB_ERROR.
*/
static inline int flb_output_return(int ret) {
int n;
uint32_t set;
uint64_t val;
struct flb_thread *th;
struct flb_engine_task *task;

th = (struct flb_thread *) pthread_getspecific(flb_thread_key);
task = th->task;
printf("finishing task=%p\n", task);
/*
* To compose the signal event the relevant info is:
*
* - Unique Task events id: 2 in this case
* - Return value: FLB_OK (0) or FLB_ERROR (1)
* - Task ID
*
* We put together the return value with the task_id on the 32 bits at right
*/
set = ((uint8_t) ret) << 31 | task->id;
val = FLB_BITS_U64_SET(2, set);
printf("check type=%lu ret=%lu task_id=%lu\n",
FLB_BITS_U64_HIGH(val), 1, 1);
n = write(task->config->ch_manager[1], &val, sizeof(uint64_t));
if (n == -1) {
perror("write");
return -1;
}

return 0;
}

#define FLB_OUTPUT_RETURN(x) \
return flb_output_return(x);

struct flb_output_instance *flb_output_new(struct flb_config *config,
char *output, void *data);

Expand Down
29 changes: 2 additions & 27 deletions plugins/out_stdout/stdout.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,40 +48,15 @@ int cb_stdout_flush(void *data, size_t bytes,
(void) out_context;
(void) config;

/* See: in_forward.rb of fluentd.
*
* message Entry {
* 1: long time
* 2: object record
* }
*
* message Forward {
* 1: string tag
* 2: list<Entry> entries
* 3: object option (optional)
* }
*
* message PackedForward {
* 1: string tag
* 2: raw entries # msgpack stream of Entry
* 3: object option (optional)
* }
*
* message Message {
* 1: string tag
* 2: long? time
* 3: object record
* 4: object option (optional)
* }
*/
msgpack_unpacked_init(&result);
while (msgpack_unpack_next(&result, data, bytes, &off)) {
printf("[%zd] %s: ", cnt++, tag);
msgpack_object_print(stdout, result.data);
printf("\n");
}
msgpack_unpacked_destroy(&result);
return bytes;

FLB_OUTPUT_RETURN(FLB_OK);
}

struct flb_output_plugin out_stdout_plugin = {
Expand Down
2 changes: 2 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ struct flb_config *flb_config_init()
mk_list_init(&config->inputs);
mk_list_init(&config->outputs);

memset(&config->tasks_map, '\0', sizeof(config->tasks_map));

/* Register plugins */
flb_register_plugins(config);

Expand Down
Loading

0 comments on commit de8d73d

Please sign in to comment.