Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use flatbuffers for some messages from Redis. #341

Merged
merged 14 commits into from
Mar 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .travis/install-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ fi
if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then
sudo apt-get update
sudo apt-get install -y cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip
sudo pip install cloudpickle funcsigs colorama psutil redis tensorflow
sudo pip install cloudpickle funcsigs colorama psutil redis tensorflow flatbuffers
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
sudo apt-get update
sudo apt-get install -y cmake python-dev python-numpy build-essential autoconf curl libtool libboost-all-dev unzip
# Install miniconda.
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow
pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow flatbuffers
elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
# check that brew is installed
which -s brew
Expand All @@ -41,7 +41,7 @@ elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
fi
brew install cmake automake autoconf libtool boost
sudo easy_install pip
sudo pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow --ignore-installed six
sudo pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow flatbuffers --ignore-installed six
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
# check that brew is installed
which -s brew
Expand All @@ -57,7 +57,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
wget https://repo.continuum.io/miniconda/Miniconda3-latest-MacOSX-x86_64.sh -O miniconda.sh
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow
pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow flatbuffers
elif [[ "$LINT" == "1" ]]; then
sudo apt-get update
sudo apt-get install -y cmake build-essential autoconf curl libtool libboost-all-dev unzip
Expand Down
2 changes: 1 addition & 1 deletion doc/source/install-on-macosx.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ To install Ray, first install the following dependencies. We recommend using
brew update
brew install cmake automake autoconf libtool boost wget

pip install numpy cloudpickle funcsigs colorama psutil redis --ignore-installed six
pip install numpy cloudpickle funcsigs colorama psutil redis flatbuffers --ignore-installed six
```

If you are using Anaconda, you may also need to run the following.
Expand Down
2 changes: 1 addition & 1 deletion doc/source/install-on-ubuntu.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ To install Ray, first install the following dependencies. We recommend using
sudo apt-get update
sudo apt-get install -y cmake build-essential autoconf curl libtool libboost-all-dev unzip python-dev python-pip # If you're using Anaconda, then python-dev and python-pip are unnecessary.

pip install numpy cloudpickle funcsigs colorama psutil redis
pip install numpy cloudpickle funcsigs colorama psutil redis flatbuffers
```

If you are using Anaconda, you may also need to run the following.
Expand Down
1 change: 1 addition & 0 deletions docker/base-deps/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ RUN echo 'export PATH=/opt/conda/bin:$PATH' > /etc/profile.d/conda.sh \
&& rm /tmp/anaconda.sh
ENV PATH "/opt/conda/bin:$PATH"
RUN conda install -y libgcc
RUN pip install flatbuffers
RUN pip install --upgrade pip cloudpickle
86 changes: 59 additions & 27 deletions python/ray/common/redis_module/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import redis
import ray.services

# Import flatbuffer bindings.
from ray.core.generated.SubscribeToNotificationsReply import SubscribeToNotificationsReply
from ray.core.generated.TaskReply import TaskReply

OBJECT_INFO_PREFIX = "OI:"
OBJECT_LOCATION_PREFIX = "OL:"
OBJECT_SUBSCRIBE_PREFIX = "OS:"
Expand Down Expand Up @@ -142,6 +146,15 @@ def testObjectTableAddAndRemove(self):
self.assertEqual(set(response), set())

def testObjectTableSubscribeToNotifications(self):
# Define a helper method for checking the contents of object notifications.
def check_object_notification(notification_message, object_id, object_size, manager_ids):
notification_object = SubscribeToNotificationsReply.GetRootAsSubscribeToNotificationsReply(notification_message, 0)
self.assertEqual(notification_object.ObjectId(), object_id)
self.assertEqual(notification_object.ObjectSize(), object_size)
self.assertEqual(notification_object.ManagerIdsLength(), len(manager_ids))
for i in range(len(manager_ids)):
self.assertEqual(notification_object.ManagerIds(i), manager_ids[i])

data_size = 0xf1f0
p = self.redis.pubsub()
# Subscribe to an object ID.
Expand All @@ -151,24 +164,37 @@ def testObjectTableSubscribeToNotifications(self):
self.assertEqual(get_next_message(p)["data"], 1)
# Request a notification and receive the data.
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id1")
self.assertEqual(get_next_message(p)["data"], b"object_id1 %s MANAGERS manager_id2"\
%integerToAsciiHex(data_size, 8))
# Verify that the notification is correct.
check_object_notification(get_next_message(p)["data"],
b"object_id1",
data_size,
[b"manager_id2"])

# Request a notification for an object that isn't there. Then add the object
# and receive the data. Only the first call to RAY.OBJECT_TABLE_ADD should
# trigger notifications.
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id2", "object_id3")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id1")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id2")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id3")
self.assertEqual(get_next_message(p)["data"], b"object_id3 %s MANAGERS manager_id1"\
%integerToAsciiHex(data_size, 8))
# Verify that the notification is correct.
check_object_notification(get_next_message(p)["data"],
b"object_id3",
data_size,
[b"manager_id1"])
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id2", data_size, "hash1", "manager_id3")
self.assertEqual(get_next_message(p)["data"], b"object_id2 %s MANAGERS manager_id3"\
%integerToAsciiHex(data_size, 8))
# Verify that the notification is correct.
check_object_notification(get_next_message(p)["data"],
b"object_id2",
data_size,
[b"manager_id3"])
# Request notifications for object_id3 again.
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id3")
self.assertEqual(get_next_message(p)["data"], b"object_id3 %s MANAGERS manager_id1 manager_id2 manager_id3"\
%integerToAsciiHex(data_size, 8))
# Verify that the notification is correct.
check_object_notification(get_next_message(p)["data"],
b"object_id3",
data_size,
[b"manager_id1", b"manager_id2", b"manager_id3"])

def testResultTableAddAndLookup(self):
# Try looking up something in the result table before anything is added.
Expand Down Expand Up @@ -205,10 +231,6 @@ def testInvalidTaskTableAdd(self):
# Non-integer scheduling states should not be added.
self.redis.execute_command("RAY.TASK_TABLE_ADD", "task_id",
"invalid_state", "node_id", "task_spec")
with self.assertRaises(redis.ResponseError):
# Scheduling states with invalid width should not be added.
self.redis.execute_command("RAY.TASK_TABLE_ADD", "task_id", 101,
"node_id", "task_spec")
with self.assertRaises(redis.ResponseError):
# Should not be able to update a non-existent task.
self.redis.execute_command("RAY.TASK_TABLE_UPDATE", "task_id", 10,
Expand All @@ -219,39 +241,46 @@ def testTaskTableAddAndLookup(self):
TASK_STATUS_SCHEDULED = 2
TASK_STATUS_QUEUED = 4

def check_task_reply(message, task_args):
task_status, local_scheduler_id, task_spec = task_args
task_reply_object = TaskReply.GetRootAsTaskReply(message, 0)
self.assertEqual(task_reply_object.State(), task_status)
self.assertEqual(task_reply_object.LocalSchedulerId(), local_scheduler_id)
self.assertEqual(task_reply_object.TaskSpec(), task_spec)

# Check that task table adds, updates, and lookups work correctly.
task_args = [TASK_STATUS_WAITING, b"node_id", b"task_spec"]
response = self.redis.execute_command("RAY.TASK_TABLE_ADD", "task_id",
*task_args)
response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
self.assertEqual(response, task_args)
check_task_reply(response, task_args)

task_args[0] = TASK_STATUS_SCHEDULED
self.redis.execute_command("RAY.TASK_TABLE_UPDATE", "task_id", *task_args[:2])
response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
self.assertEqual(response, task_args)
check_task_reply(response, task_args)

# If the current value, test value, and set value are all the same, the
# update happens, and the response is still the same task.
task_args = [task_args[0]] + task_args
response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE",
"task_id",
*task_args[:3])
self.assertEqual(response, task_args[1:])
check_task_reply(response, task_args[1:])
# Check that the task entry is still the same.
get_response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
self.assertEqual(get_response, task_args[1:])
check_task_reply(get_response, task_args[1:])

# If the current value is the same as the test value, and the set value is
# different, the update happens, and the response is the entire task.
task_args[1] = TASK_STATUS_QUEUED
response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE",
"task_id",
*task_args[:3])
self.assertEqual(response, task_args[1:])
check_task_reply(response, task_args[1:])
# Check that the update happened.
get_response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
self.assertEqual(get_response, task_args[1:])
check_task_reply(get_response, task_args[1:])

# If the current value is no longer the same as the test value, the
# response is nil.
Expand All @@ -271,7 +300,7 @@ def testTaskTableAddAndLookup(self):
response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE",
"task_id",
*task_args[:3])
self.assertEqual(response, task_args[1:])
check_task_reply(response, task_args[1:])

# If the test value is a bitmask that does not match the current value, the
# update does not happen.
Expand All @@ -288,24 +317,27 @@ def testTaskTableAddAndLookup(self):

def testTaskTableSubscribe(self):
scheduling_state = 1
node_id = "node_id"
local_scheduler_id = "local_scheduler_id"
# Subscribe to the task table.
p = self.redis.pubsub()
p.psubscribe("{prefix}*:*".format(prefix=TASK_PREFIX))
p.psubscribe("{prefix}*:{state: >2}".format(prefix=TASK_PREFIX, state=scheduling_state))
p.psubscribe("{prefix}{node}:*".format(prefix=TASK_PREFIX, node=node_id))
task_args = [b"task_id", scheduling_state, node_id.encode("ascii"), b"task_spec"]
p.psubscribe("{prefix}*:{state}".format(prefix=TASK_PREFIX, state=scheduling_state))
p.psubscribe("{prefix}{local_scheduler_id}:*".format(prefix=TASK_PREFIX, local_scheduler_id=local_scheduler_id))
task_args = [b"task_id", scheduling_state, local_scheduler_id.encode("ascii"), b"task_spec"]
self.redis.execute_command("RAY.TASK_TABLE_ADD", *task_args)
# Receive the acknowledgement message.
self.assertEqual(get_next_message(p)["data"], 1)
self.assertEqual(get_next_message(p)["data"], 2)
self.assertEqual(get_next_message(p)["data"], 3)
# Receive the actual data.
for i in range(3):
message = get_next_message(p)["data"]
message = message.split()
message[1] = int(message[1])
self.assertEqual(message, task_args)
message = get_next_message(p)["data"]
# Check that the notification object is correct.
notification_object = TaskReply.GetRootAsTaskReply(message, 0)
self.assertEqual(notification_object.TaskId(), b"task_id")
self.assertEqual(notification_object.State(), scheduling_state)
self.assertEqual(notification_object.LocalSchedulerId(), local_scheduler_id.encode("ascii"))
self.assertEqual(notification_object.TaskSpec(), b"task_spec")

if __name__ == "__main__":
unittest.main(verbosity=2)
Empty file.
2 changes: 1 addition & 1 deletion python/ray/global_scheduler/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
TASK_STATUS_RUNNING = 8
TASK_STATUS_DONE = 16

# These constants are an implementation detail of ray_redis_module.c, so this
# These constants are an implementation detail of ray_redis_module.cc, so this
# must be kept in sync with that file.
DB_CLIENT_PREFIX = "CL:"
TASK_PREFIX = "TT:"
Expand Down
19 changes: 10 additions & 9 deletions python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
from ray.services import get_ip_address
from ray.services import get_port

# Import flatbuffer bindings.
from ray.core.generated.SubscribeToDBClientTableReply import SubscribeToDBClientTableReply

# These variables must be kept in sync with the C codebase.
# common/common.h
DB_CLIENT_ID_SIZE = 20
NIL_ID = b"\xff" * DB_CLIENT_ID_SIZE
# common/task.h
TASK_STATUS_LOST = 32
# common/redis_module/ray_redis_module.c
# common/redis_module/ray_redis_module.cc
TASK_PREFIX = "TT:"
DB_CLIENT_PREFIX = "CL:"
DB_CLIENT_TABLE_NAME = b"db_clients"
Expand Down Expand Up @@ -89,14 +92,12 @@ def read_message(self):

# Parse the message.
data = message["data"]
db_client_id = data[:DB_CLIENT_ID_SIZE]
data = data[DB_CLIENT_ID_SIZE + 1:]
data = data.split(b" ")
client_type, auxiliary_address, is_insertion = data
is_insertion = int(is_insertion)
if is_insertion != 1 and is_insertion != 0:
raise Exception("Expected 0 or 1 for insertion field, got {} instead".format(is_insertion))
is_insertion = bool(is_insertion)

notification_object = SubscribeToDBClientTableReply.GetRootAsSubscribeToDBClientTableReply(data, 0)
db_client_id = notification_object.DbClientId()
client_type = notification_object.ClientType()
auxiliary_address = notification_object.AuxAddress()
is_insertion = notification_object.IsInsertion()

return db_client_id, client_type, auxiliary_address, is_insertion

Expand Down
2 changes: 1 addition & 1 deletion python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address):
# must have run "CONFIG SET protected-mode no".
redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port))
# The client table prefix must be kept in sync with the file
# "src/common/redis_module/ray_redis_module.c" where it is defined.
# "src/common/redis_module/ray_redis_module.cc" where it is defined.
REDIS_CLIENT_TABLE_PREFIX = "CL:"
client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX))
# Filter to clients on the same node and do some basic checking.
Expand Down
2 changes: 1 addition & 1 deletion scripts/start_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def check_no_existing_redis_clients(node_ip_address, redis_address):
redis_ip_address, redis_port = redis_address.split(":")
redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port))
# The client table prefix must be kept in sync with the file
# "src/common/redis_module/ray_redis_module.c" where it is defined.
# "src/common/redis_module/ray_redis_module.cc" where it is defined.
REDIS_CLIENT_TABLE_PREFIX = "CL:"
client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX))
# Filter to clients on the same node and do some basic checking.
Expand Down
9 changes: 9 additions & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ add_custom_command(
COMMENT "Running flatc compiler on ${COMMON_FBS_SRC}"
VERBATIM)

# Generate Python bindings for the flatbuffers objects.
set(PYTHON_OUTPUT_DIR ${CMAKE_BINARY_DIR}/generated/)
add_custom_command(
TARGET gen_common_fbs
COMMAND ${FLATBUFFERS_COMPILER} -p -o ${PYTHON_OUTPUT_DIR} ${COMMON_FBS_SRC}
DEPENDS ${FBS_DEPENDS}
COMMENT "Running flatc compiler on ${COMMON_FBS_SRC}"
VERBATIM)

add_dependencies(gen_common_fbs flatbuffers_ep)

add_custom_target(
Expand Down
59 changes: 59 additions & 0 deletions src/common/format/common.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,62 @@ table TaskInfo {
}

root_type TaskInfo;

// Notification pushed to subscribers of an object table entry once the object
// is available (requested via RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS and
// triggered by the first RAY.OBJECT_TABLE_ADD for the object).
table SubscribeToNotificationsReply {
// The object ID of the object that the notification is about.
object_id: string;
// The size of the object.
object_size: long;
// The IDs of the managers that contain this object.
manager_ids: [string];
}

root_type SubscribeToNotificationsReply;

// Reply describing one task table entry. Returned by RAY.TASK_TABLE_GET and
// RAY.TASK_TABLE_TEST_AND_UPDATE, and published on task table notification
// channels.
table TaskReply {
// The task ID of the task that the message is about.
task_id: string;
// The state of the task. This is encoded as a bit mask of scheduling_state
// enum values in task.h.
state: long;
// A local scheduler ID.
local_scheduler_id: string;
// A string of bytes representing the task specification.
task_spec: string;
}

root_type TaskReply;

// Notification published on the db client table channel whenever a client is
// added to or deleted from the table (consumed by the monitor's read_message).
table SubscribeToDBClientTableReply {
// The db client ID of the client that the message is about.
db_client_id: string;
// The type of the client.
client_type: string;
// If the client is a local scheduler, this is the address of the plasma
// manager that the local scheduler is connected to. Otherwise, it is empty.
aux_address: string;
// True if the message is about the addition of a client and false if it is
// about the deletion of a client.
is_insertion: bool;
}

root_type SubscribeToDBClientTableReply;

// Load and resource information reported for a local scheduler.
// NOTE(review): no consumer of this message is visible in this diff; the field
// semantics below are as authored — confirm against the local scheduler code.
table LocalSchedulerInfoMessage {
// The db client ID of the client that the message is about.
db_client_id: string;
// The total number of workers that are connected to this local scheduler.
total_num_workers: long;
// The number of tasks queued in this local scheduler.
task_queue_length: long;
// The number of workers that are available and waiting for tasks.
available_workers: long;
// The resource vector of resources generally available to this local
// scheduler.
static_resources: [double];
// The resource vector of resources currently available to this local
// scheduler.
dynamic_resources: [double];
}

root_type LocalSchedulerInfoMessage;
Loading