Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use flatbuffers for some messages from Redis. #341

Merged
merged 14 commits into from
Mar 11, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Redo parsing of object table notifications with flatbuffers.
  • Loading branch information
robertnishihara committed Mar 8, 2017
commit 03bfe3c911fa2bdee5d981b7106b6be4df5be953
11 changes: 11 additions & 0 deletions src/common/format/common.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,14 @@ table TaskInfo {
}

root_type TaskInfo;

table SubscribeToNotificationsReply {
// The object ID of the object that the notification is about.
object_id: string;
// The size of the object.
object_size: long;
// The IDs of the managers that contain this object.
manager_ids: [string];
}

root_type SubscribeToNotificationsReply;
43 changes: 25 additions & 18 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

#include "redis_string.h"

#include "format/common_generated.h"

#include "common_protocol.h"

/**
* Various tables are maintained in redis:
*
Expand Down Expand Up @@ -329,48 +333,51 @@ bool PublishObjectNotification(RedisModuleCtx *ctx,
RedisModuleString *object_id,
RedisModuleString *data_size,
RedisModuleKey *key) {
/* Create a string formatted as "<object id> MANAGERS <size> <manager id1>
* <manager id2> ..." */
flatbuffers::FlatBufferBuilder fbb;

size_t object_id_size;
const char *object_id_str =
RedisModule_StringPtrLen(object_id, &object_id_size);

long long data_size_value;
if (RedisModule_StringToLongLong(data_size, &data_size_value) !=
REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx, "data_size must be integer");
}

RedisModuleString *manager_list = RedisString_Format(ctx, "%S ", object_id);

/* Append binary data size for this object. */
/* TODO(pcm): Replace by a formatted fix length version of the size. */
RedisModule_StringAppendBuffer(ctx, manager_list,
(const char *) &data_size_value,
sizeof(data_size_value));

RedisModule_StringAppendBuffer(ctx, manager_list, " MANAGERS",
strlen(" MANAGERS"));

std::vector<flatbuffers::Offset<flatbuffers::String>> manager_ids;
CHECK_ERROR(
RedisModule_ZsetFirstInScoreRange(key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1),
"Unable to initialize zset iterator");

/* Loop over the managers in the object table for this object ID. */
do {
RedisModuleString *curr = RedisModule_ZsetRangeCurrentElement(key, NULL);
RedisModule_StringAppendBuffer(ctx, manager_list, " ", 1);
size_t size;
const char *val = RedisModule_StringPtrLen(curr, &size);
RedisModule_StringAppendBuffer(ctx, manager_list, val, size);
manager_ids.push_back(fbb.CreateString(val, size));
} while (RedisModule_ZsetRangeNext(key));

auto message = CreateSubscribeToNotificationsReply(
fbb, fbb.CreateString(object_id_str, object_id_size), data_size_value,
fbb.CreateVector(manager_ids));
fbb.Finish(message);

//FREE SOME STRINGS!!

/* Publish the notification to the clients notification channel.
* TODO(rkn): These notifications could be batched together. */
RedisModuleString *channel_name =
RedisString_Format(ctx, "%s%S", OBJECT_CHANNEL_PREFIX, client_id);

RedisModuleString *payload =
RedisModule_CreateString(ctx, (const char *) fbb.GetBufferPointer(),
fbb.GetSize());

RedisModuleCallReply *reply;
reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name, manager_list);
reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name, payload);
RedisModule_FreeString(ctx, channel_name);
RedisModule_FreeString(ctx, manager_list);
RedisModule_FreeString(ctx, payload);
if (reply == NULL) {
return false;
}
Expand Down
102 changes: 20 additions & 82 deletions src/common/state/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ extern "C" {
#include "redis.h"
#include "io.h"

#include "format/common_generated.h"

#include "common_protocol.h"

#ifndef _WIN32
/* This function is actually not declared in standard POSIX, so declare it. */
extern int usleep(useconds_t usec);
Expand Down Expand Up @@ -502,81 +506,6 @@ void redis_object_table_lookup_callback(redisAsyncContext *c,
}
}

/**
* This will parse a payload string published on the object notification
* channel. The string must have the format:
*
* <object id> MANAGERS <manager id1> <manager id2> ...
*
* where there may be any positive number of manager IDs.
*
* @param db The db handle.
* @param payload The payload string.
* @param length The length of the string.
* @param manager_count This method will write the number of managers at this
* address.
* @param manager_vector This method will allocate an array of pointers to
* manager addresses and write the address of the array at this address.
* The caller is responsible for freeing this array.
* @return The object ID that the notification is about.
*/
ObjectID parse_subscribe_to_notifications_payload(
DBHandle *db,
char *payload,
int length,
int64_t *data_size,
int *manager_count,
const char ***manager_vector) {
long long data_size_value = 0;
int num_managers = (length - sizeof(ObjectID) - 1 - sizeof(data_size_value) -
1 - strlen("MANAGERS")) /
(1 + sizeof(DBClientID));

int64_t rval = sizeof(ObjectID) + 1 + sizeof(data_size_value) + 1 +
strlen("MANAGERS") + num_managers * (1 + sizeof(DBClientID));

CHECKM(length == rval,
"length mismatch: num_managers = %d, length = %d, rval = %" PRId64,
num_managers, length, rval);
CHECK(num_managers > 0);
ObjectID obj_id;
/* Track our current offset in the payload. */
int offset = 0;
/* Parse the object ID. */
memcpy(&obj_id.id, &payload[offset], sizeof(obj_id.id));
offset += sizeof(obj_id.id);
/* The next part of the payload is a space. */
const char *space_str = " ";
CHECK(memcmp(&payload[offset], space_str, strlen(space_str)) == 0);
offset += strlen(space_str);
/* The next part of the payload is binary data_size. */
memcpy(&data_size_value, &payload[offset], sizeof(data_size_value));
offset += sizeof(data_size_value);
/* The next part of the payload is the string " MANAGERS" with leading ' '. */
const char *managers_str = " MANAGERS";
CHECK(memcmp(&payload[offset], managers_str, strlen(managers_str)) == 0);
offset += strlen(managers_str);
/* Parse the managers. */
const char **managers = (const char **) malloc(num_managers * sizeof(char *));
for (int i = 0; i < num_managers; ++i) {
/* First there is a space. */
CHECK(memcmp(&payload[offset], " ", strlen(" ")) == 0);
offset += strlen(" ");
/* Get the manager ID. */
DBClientID manager_id;
memcpy(&manager_id.id, &payload[offset], sizeof(manager_id.id));
offset += sizeof(manager_id.id);
/* Write the address of the corresponding manager to the returned array. */
redis_get_cached_db_client(db, manager_id, &managers[i]);
}
CHECK(offset == length);
/* Return the manager array and the object ID. */
*manager_count = num_managers;
*manager_vector = managers;
*data_size = data_size_value;
return obj_id;
}

void object_table_redis_subscribe_to_notifications_callback(
redisAsyncContext *c,
void *r,
Expand All @@ -603,13 +532,22 @@ void object_table_redis_subscribe_to_notifications_callback(
message_type->str);

if (strcmp(message_type->str, "message") == 0) {
/* Handle an object notification. */
int64_t data_size = 0;
int manager_count;
const char **manager_vector;
ObjectID obj_id = parse_subscribe_to_notifications_payload(
db, reply->element[2]->str, reply->element[2]->len, &data_size,
&manager_count, &manager_vector);
/* We received an object notification. Parse the payload. */
auto message = flatbuffers::GetRoot<SubscribeToNotificationsReply>(
reply->element[2]->str);
/* Extract the object ID. */
ObjectID obj_id = from_flatbuf(message->object_id());
/* Extract the data size. */
int64_t data_size = message->object_size();
int manager_count = message->manager_ids()->size();
/* Construct the manager vector from the flatbuffers object. */
const char **manager_vector =
(const char **) malloc(manager_count * sizeof(char *));
for (int i = 0; i < manager_count; ++i) {
DBClientID manager_id = from_flatbuf(message->manager_ids()->Get(i));
redis_get_cached_db_client(db, manager_id, &manager_vector[i]);
}

/* Call the subscribe callback. */
ObjectTableSubscribeData *data =
(ObjectTableSubscribeData *) callback_data->data;
Expand Down