Skip to content

Commit

Permalink
Avoid publishing in the task table unnecessarily. (#416)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertnishihara authored and pcmoritz committed Mar 30, 2017
1 parent 036b873 commit f1b48f2
Showing 1 changed file with 35 additions and 31 deletions.
66 changes: 35 additions & 31 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "redis_string.h"

#include "format/common_generated.h"
#include "task.h"

#include "common_protocol.h"

Expand Down Expand Up @@ -898,42 +899,45 @@ int TaskTableWrite(RedisModuleCtx *ctx,
}
RedisModule_CloseKey(key);

/* Build the PUBLISH topic and message for task table subscribers. The topic
* is a string in the format "TASK_PREFIX:<local scheduler ID>:<state>". The
* message is a serialized SubscribeToTasksReply flatbuffer object. */
RedisModuleString *publish_topic = RedisString_Format(
ctx, "%s%S:%S", TASK_PREFIX, local_scheduler_id, state);
if (state_value == TASK_STATUS_WAITING ||
state_value == TASK_STATUS_SCHEDULED) {
/* Build the PUBLISH topic and message for task table subscribers. The topic
* is a string in the format "TASK_PREFIX:<local scheduler ID>:<state>". The
* message is a serialized SubscribeToTasksReply flatbuffer object. */
RedisModuleString *publish_topic = RedisString_Format(
ctx, "%s%S:%S", TASK_PREFIX, local_scheduler_id, state);

/* Construct the flatbuffers object for the payload. */
flatbuffers::FlatBufferBuilder fbb;
/* Use the old task spec if the current one is NULL. */
RedisModuleString *task_spec_to_use;
if (task_spec != NULL) {
task_spec_to_use = task_spec;
} else {
task_spec_to_use = existing_task_spec;
}
/* Create the flatbuffers message. */
auto message =
CreateTaskReply(fbb, RedisStringToFlatbuf(fbb, task_id), state_value,
RedisStringToFlatbuf(fbb, local_scheduler_id),
RedisStringToFlatbuf(fbb, task_spec_to_use));
fbb.Finish(message);
/* Construct the flatbuffers object for the payload. */
flatbuffers::FlatBufferBuilder fbb;
/* Use the old task spec if the current one is NULL. */
RedisModuleString *task_spec_to_use;
if (task_spec != NULL) {
task_spec_to_use = task_spec;
} else {
task_spec_to_use = existing_task_spec;
}
/* Create the flatbuffers message. */
auto message =
CreateTaskReply(fbb, RedisStringToFlatbuf(fbb, task_id), state_value,
RedisStringToFlatbuf(fbb, local_scheduler_id),
RedisStringToFlatbuf(fbb, task_spec_to_use));
fbb.Finish(message);

RedisModuleString *publish_message = RedisModule_CreateString(
ctx, (const char *) fbb.GetBufferPointer(), fbb.GetSize());
RedisModuleString *publish_message = RedisModule_CreateString(
ctx, (const char *) fbb.GetBufferPointer(), fbb.GetSize());

RedisModuleCallReply *reply =
RedisModule_Call(ctx, "PUBLISH", "ss", publish_topic, publish_message);
RedisModuleCallReply *reply =
RedisModule_Call(ctx, "PUBLISH", "ss", publish_topic, publish_message);

RedisModule_FreeString(ctx, publish_message);
RedisModule_FreeString(ctx, publish_topic);
if (existing_task_spec != NULL) {
RedisModule_FreeString(ctx, existing_task_spec);
}
RedisModule_FreeString(ctx, publish_message);
RedisModule_FreeString(ctx, publish_topic);
if (existing_task_spec != NULL) {
RedisModule_FreeString(ctx, existing_task_spec);
}

if (reply == NULL) {
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
if (reply == NULL) {
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
}
}

RedisModule_ReplyWithSimpleString(ctx, "OK");
Expand Down

0 comments on commit f1b48f2

Please sign in to comment.