Commit

Merge pull request #18 from BramSrna/add_swarm_memory_schema
Add support for tables in swarm memory
BramSrna authored Mar 18, 2023
2 parents a49a581 + 1eca37d commit e4b933e
Showing 6 changed files with 289 additions and 178 deletions.
15 changes: 7 additions & 8 deletions swarm/message_types.py
@@ -12,11 +12,10 @@ class MessageTypes(Enum):
     EXECUTION_GROUP_TEARDOWN = 8
     TASK_OUTPUT = 9
     EXECUTION_GROUP_DELETION = 10
-    RESPOND_TO_READ = 11
-    NEW_SWARM_BOT_ID = 12
-    FORWARD_MESSAGE = 13
-    SYNC_INTERMEDIARIES = 14
-    BOT_TEARDOWN = 15
-    REQUEST_PATH_TO_BOT = 16
-    MSG_RESPONSE = 17
-    UPDATE_SWARM_MEMORY_VALUE = 18
+    NEW_SWARM_BOT_ID = 11
+    FORWARD_MESSAGE = 12
+    SYNC_INTERMEDIARIES = 13
+    BOT_TEARDOWN = 14
+    REQUEST_PATH_TO_BOT = 15
+    MSG_RESPONSE = 16
+    UPDATE_SWARM_MEMORY_VALUE = 17
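
With RESPOND_TO_READ removed, the remaining members shift down by one. A minimal sketch of the name-keyed registration pattern this enum is used with elsewhere in the diff (the trimmed-down enum and the handler table below are illustrative stand-ins, not repository code): dispatch is keyed on str(member), which depends on the member name rather than its integer value.

    from enum import Enum

    # Illustrative, trimmed-down stand-in for swarm/message_types.py.
    class MessageTypes(Enum):
        MSG_RESPONSE = 16
        UPDATE_SWARM_MEMORY_VALUE = 17

    handlers = {}

    def assign_msg_handler(msg_type, handler):
        # Mirrors the registration pattern in swarm_bot.py: the key is the
        # string form of the enum member, e.g. "MessageTypes.MSG_RESPONSE".
        handlers[msg_type] = handler

    assign_msg_handler(str(MessageTypes.MSG_RESPONSE), lambda msg: print("handled:", msg))
    handlers[str(MessageTypes.MSG_RESPONSE)]("pong")  # dispatch by name-based key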
51 changes: 23 additions & 28 deletions swarm/swarm_bot.py
@@ -6,7 +6,6 @@
 from swarm.executor_interface import ExecutorInterface
 from swarm.message_types import MessageTypes
 from swarm.swarm_task.task_scheduling_algorithms import simple_task_sort
-from swarm.swarm_task.swarm_task import SwarmTask
 from swarm.swarm_memory.swarm_memory_interface import SwarmMemoryInterface
 from swarm.swarm_task.task_execution_controller import TaskExecutionController

@@ -56,7 +55,6 @@ def __init__(self, additional_config_path: str = None, additional_config_dict: d
         self.assign_msg_handler(str(MessageTypes.DELETE_FROM_SWARM_MEMORY), self.handle_delete_from_swarm_memory_message)
         self.assign_msg_handler(str(MessageTypes.TASK_OUTPUT), self.handle_task_output_message)
         self.assign_msg_handler(str(MessageTypes.EXECUTION_GROUP_DELETION), self.handle_execution_group_deletion_message)
-        self.assign_msg_handler(str(MessageTypes.RESPOND_TO_READ), self.handle_respond_to_read_message)
         self.assign_msg_handler(str(MessageTypes.NEW_SWARM_BOT_ID), self.handle_new_swarm_bot_id_message)
         self.assign_msg_handler(str(MessageTypes.FORWARD_MESSAGE), self.handle_forward_message_message)
         self.assign_msg_handler(str(MessageTypes.SYNC_INTERMEDIARIES), self.handle_sync_intermediaries_message)
@@ -89,30 +87,29 @@ def receive_task_bundle(self, new_task_bundle, listener_bot_id=None):
         for i in range(len(tasks)):
             curr_task = tasks[i]
             self.write_to_swarm_memory(
-                curr_task.get_id(),
+                "TASK_QUEUE/" + str(curr_task.get_id()),
                 {
                     "TASK": curr_task,
                     "PARENT_BUNDLE_ID": new_task_bundle.get_id(),
                     "REQ_NUM_BOTS": new_task_bundle.get_req_num_bots(),
                     "INDEX_IN_BUNDLE": i,
                     "LISTENER_ID": listener_bot_id
-                },
-                SwarmTask.__name__
+                }
             )
         self.task_queue_has_values.set()
         return True

-    def read_from_swarm_memory(self, key_to_read):
-        return self.swarm_memory_interface.read_from_swarm_memory(key_to_read)
+    def read_from_swarm_memory(self, path_to_read):
+        return self.swarm_memory_interface.read_from_swarm_memory(path_to_read)

-    def write_to_swarm_memory(self, key_to_write, value_to_write, data_type):
-        return self.swarm_memory_interface.write_to_swarm_memory(key_to_write, value_to_write, data_type)
+    def write_to_swarm_memory(self, path_to_write, value_to_write):
+        return self.swarm_memory_interface.write_to_swarm_memory(path_to_write, value_to_write)

-    def update_swarm_memory(self, key_to_update, new_value):
-        return self.swarm_memory_interface.update_swarm_memory(key_to_update, new_value)
+    def update_swarm_memory(self, path_to_update, new_value):
+        return self.swarm_memory_interface.update_swarm_memory(path_to_update, new_value)

-    def pop_from_swarm_memory(self, key_to_pop):
-        return self.swarm_memory_interface.pop_from_swarm_memory(key_to_pop)
+    def pop_from_swarm_memory(self, path_to_pop):
+        return self.swarm_memory_interface.pop_from_swarm_memory(path_to_pop)

     def get_task_execution_history(self):
         return self.task_execution_history
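
A minimal sketch of the new call shape for the swarm-memory wrappers above, using a hypothetical in-memory stand-in for SwarmMemoryInterface (the stub class and sample values below are assumptions for illustration): keys become "/"-separated paths and the old data_type argument is gone.

    # Hypothetical stub standing in for SwarmMemoryInterface, used only to show
    # the new two-argument, path-based calls.
    class StubSwarmMemoryInterface:
        def __init__(self):
            self.store = {}

        def write_to_swarm_memory(self, path_to_write, value_to_write):
            self.store[path_to_write] = value_to_write

        def read_from_swarm_memory(self, path_to_read):
            return self.store.get(path_to_read)

        def pop_from_swarm_memory(self, path_to_pop):
            return self.store.pop(path_to_pop, None)

    memory = StubSwarmMemoryInterface()
    task_id = 42  # hypothetical task id
    memory.write_to_swarm_memory(
        "TASK_QUEUE/" + str(task_id),
        {"TASK": "task-object", "PARENT_BUNDLE_ID": 7, "INDEX_IN_BUNDLE": 0, "LISTENER_ID": None}
    )
    print(memory.read_from_swarm_memory("TASK_QUEUE/42"))  # the entry just written
    print(memory.pop_from_swarm_memory("TASK_QUEUE/42"))   # same entry, now removed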
@@ -125,20 +122,18 @@ def set_task_executor_status(self, new_status):
         self.run_task_executor.set()
         self.task_queue_has_values.set()

-    def get_task_bundle_queue(self):
-        return self.swarm_memory_interface.get_ids_of_contents_of_type(SwarmTask.__name__)
-
     def get_task_queue(self):
-        return self.swarm_memory_interface.get_ids_of_contents_of_type(SwarmTask.__name__)
+        return self.read_from_swarm_memory("TASK_QUEUE")

     def get_next_task_to_execute(self):
         next_task_info = None
-        task_queue = self.swarm_memory_interface.get_ids_of_contents_of_type(SwarmTask.__name__)
-        while ((next_task_info is None) and (len(task_queue) > 0)):
-            task_queue.sort(key=self.task_scheduling_algorithm)
-            next_task_id = task_queue.pop(0)
-            next_task_info = self.swarm_memory_interface.pop_from_swarm_memory(next_task_id)
-            task_queue = self.swarm_memory_interface.get_ids_of_contents_of_type(SwarmTask.__name__)
+        task_queue = self.get_task_queue()
+        if task_queue is not None:
+            task_ids = list(task_queue.keys())
+            task_ids.sort(key=self.task_scheduling_algorithm)
+            while ((next_task_info is None) and (len(task_queue) > 0)):
+                next_task_id = task_ids.pop(0)
+                next_task_info = self.pop_from_swarm_memory("TASK_QUEUE/" + str(next_task_id))

         return next_task_info
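
For clarity, a sketch of the new selection flow, under the assumption that get_task_queue() returns a dict keyed by task id; the placeholder tasks and the identity sort key below are illustrative, not the repository's real objects.

    # Placeholder task table shaped like the TASK_QUEUE entries written above.
    task_queue = {
        7: {"TASK": "task-7", "INDEX_IN_BUNDLE": 0},
        3: {"TASK": "task-3", "INDEX_IN_BUNDLE": 1},
    }

    def scheduling_key(task_id):
        # Stand-in for the configured task_scheduling_algorithm (e.g. simple_task_sort).
        return task_id

    next_task_info = None
    if task_queue is not None:
        task_ids = list(task_queue.keys())
        task_ids.sort(key=scheduling_key)
        while (next_task_info is None) and (len(task_ids) > 0):
            next_task_id = task_ids.pop(0)
            # Mirrors pop_from_swarm_memory("TASK_QUEUE/" + str(next_task_id)).
            next_task_info = task_queue.pop(next_task_id, None)

    print(next_task_info)  # entry for task id 3, the lowest under this sort key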

@@ -249,9 +244,9 @@ def get_num_jumps_to(self, target_bot_id):

     def handle_swarm_memory_object_location_message(self, message):
         msg_payload = message.get_message_payload()
-        data_type = msg_payload["DATA_TYPE"]
+        path = msg_payload["PATH"]
         self.swarm_memory_interface.handle_swarm_memory_object_location_message(message)
-        if data_type == SwarmTask.__name__:
+        if "TASK_QUEUE" in path:
             self.task_queue_has_values.set()

     def handle_request_swarm_memory_read_message(self, message):
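
A small sketch of the updated check in handle_swarm_memory_object_location_message above, assuming a payload carrying a PATH field (the other payload key shown here is hypothetical): any advertised path containing TASK_QUEUE flags that the task queue has values.

    # Hypothetical payload; only the "PATH" key is taken from the diff.
    msg_payload = {"PATH": "TASK_QUEUE/42", "LOCATION_BOT_ID": "bot-2"}

    path = msg_payload["PATH"]
    if "TASK_QUEUE" in path:
        # At this point SwarmBot calls self.task_queue_has_values.set().
        print("task queue entry advertised at:", path)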
@@ -306,9 +301,9 @@ def handle_respond_to_read_message(self, message):
         msg_payload = message.get_message_payload()
         object_key = msg_payload["OBJECT_KEY"]
         object_value = msg_payload["OBJECT_VALUE"]
-        data_type = msg_payload["DATA_TYPE"]
-        self.write_to_swarm_memory(object_key, object_value, data_type)
-        if data_type == SwarmTask.__name__:
+        table_id = msg_payload["TABLE_ID"]
+        self.write_to_swarm_memory(table_id, object_key, object_value)
+        if table_id == "TASK_QUEUE":
             self.task_queue_has_values.set()

     def handle_new_swarm_bot_id_message(self, message):
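
A sketch of how the handle_respond_to_read_message hunk above could fold a read response back into memory under the table scheme; the payload keys come from the diff, while joining TABLE_ID and OBJECT_KEY into a single "/" path is an assumption about how the three-argument write resolves.

    # Payload keys (TABLE_ID, OBJECT_KEY, OBJECT_VALUE) match the handler above;
    # the concrete values are hypothetical.
    msg_payload = {"TABLE_ID": "TASK_QUEUE", "OBJECT_KEY": "42", "OBJECT_VALUE": {"TASK": "task-42"}}

    table_id = msg_payload["TABLE_ID"]
    object_key = msg_payload["OBJECT_KEY"]
    object_value = msg_payload["OBJECT_VALUE"]

    swarm_memory = {}  # flat stand-in for the bot's swarm memory
    swarm_memory[table_id + "/" + object_key] = object_value  # assumed path join

    if table_id == "TASK_QUEUE":
        # Corresponds to the task_queue_has_values.set() call in the handler.
        print("task available:", swarm_memory)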
131 changes: 92 additions & 39 deletions swarm/swarm_memory/local_swarm_memory.py
@@ -5,42 +5,95 @@ def __init__(self, owner_bot_id):
         self.contents = {}
         self.data_to_holder_id_map = {}

-    def write(self, new_key_to_write, new_value_to_write, data_type):
-        self.contents[new_key_to_write] = new_value_to_write
-        self.update_data_holder(new_key_to_write, self.owner_bot_id, data_type)
-
-    def read(self, key_to_read):
-        if key_to_read in self.contents:
-            return self.contents[key_to_read]
-        return None
-
-    def update(self, key_to_update, new_value):
-        if self.has_data_key(key_to_update):
-            self.contents[key_to_update] = new_value
-
-    def delete(self, key_to_delete):
-        if key_to_delete in self.contents:
-            self.contents.pop(key_to_delete)
-        if key_to_delete in self.data_to_holder_id_map:
-            self.data_to_holder_id_map.pop(key_to_delete)
-
-    def has_data_key(self, data_key):
-        return data_key in self.contents
-
-    def update_data_holder(self, data_key, data_holder_id, data_type):
-        self.data_to_holder_id_map[data_key] = {
-            "DATA_TYPE": data_type,
-            "HOLDER_ID": data_holder_id
-        }
-
-    def get_data_holder_id(self, data_key):
-        if data_key in self.data_to_holder_id_map:
-            return self.data_to_holder_id_map[data_key]["HOLDER_ID"]
-        return None
-
-    def get_ids_of_contents_of_type(self, type_to_get):
-        ids = []
-        for data_key, data_info in self.data_to_holder_id_map.items():
-            if data_info["DATA_TYPE"] == type_to_get:
-                ids.append(data_key)
-        return ids
+    def write(self, path_to_write, value_to_write):
+        path_components = path_to_write.split("/")
+        curr_dict = self.contents
+        for i in range(len(path_components)):
+            key = path_components[i]
+            if i == len(path_components) - 1:
+                curr_dict[key] = value_to_write
+            else:
+                if key not in curr_dict:
+                    curr_dict[key] = {}
+                if (not isinstance(curr_dict[key], dict)):
+                    curr_dict[key] = {}
+                curr_dict = curr_dict[key]
+        self.update_data_holder(path_to_write, self.owner_bot_id)
+
+    def read(self, path_to_read):
+        path_components = path_to_read.split("/")
+        curr_dict = self.contents
+        for i in range(len(path_components)):
+            key = path_components[i]
+            if i == len(path_components) - 1:
+                if key in curr_dict:
+                    return curr_dict[key]
+                return None
+            else:
+                curr_dict = curr_dict[key]
+
+    def delete(self, path_to_delete):
+        path_components = path_to_delete.split("/")
+        curr_dict = self.contents
+        for i in range(len(path_components)):
+            key = path_components[i]
+            if i == len(path_components) - 1:
+                if key in curr_dict:
+                    curr_dict.pop(key)
+            else:
+                if key not in curr_dict:
+                    curr_dict[key] = {}
+                if (not isinstance(curr_dict[key], dict)):
+                    curr_dict[key] = {}
+                curr_dict = curr_dict[key]
+
+        curr_dict = self.data_to_holder_id_map
+        for i in range(len(path_components)):
+            key = path_components[i]
+            if i == len(path_components) - 1:
+                if key in curr_dict:
+                    curr_dict.pop(key)
+            else:
+                curr_dict = curr_dict[key]
+
+    def has_path(self, path):
+        path_components = path.split("/")
+        curr_dict = self.contents
+        for i in range(len(path_components)):
+            key = path_components[i]
+            if key not in curr_dict:
+                return False
+            curr_dict = curr_dict[key]
+        return True
+
+    def update_data_holder(self, path, data_holder_id):
+        if (data_holder_id != self.owner_bot_id) and (self.has_path(path)):
+            self.delete(path)
+
+        path_components = path.split("/")
+        curr_dict = self.data_to_holder_id_map
+        for i in range(len(path_components)):
+            key = path_components[i]
+            if i == len(path_components) - 1:
+                curr_dict[key] = data_holder_id
+            else:
+                if key not in curr_dict:
+                    curr_dict[key] = {}
+                if (not isinstance(curr_dict[key], dict)):
+                    curr_dict[key] = {}
+                curr_dict = curr_dict[key]
+
+    def get_key_holder_ids(self, path):
+        path_components = path.split("/")
+        curr_dict = self.data_to_holder_id_map
+        for i in range(len(path_components) - 1):
+            key = path_components[i]
+            curr_dict = curr_dict[key]
+        return list(set(self.get_nested_dict_values(curr_dict)))
+
+    def get_nested_dict_values(self, dict_to_parse):
+        for value in dict_to_parse.values():
+            if isinstance(value, dict):
+                yield from self.get_nested_dict_values(value)
+            else:
+                yield value
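
A usage sketch of the new path-based local memory, assuming the class in swarm/swarm_memory/local_swarm_memory.py is exposed as LocalSwarmMemory and that bot ids are plain strings (both assumptions for illustration): writes create nested dicts along the "/"-separated path, and the holder map mirrors the same structure.

    from swarm.swarm_memory.local_swarm_memory import LocalSwarmMemory

    memory = LocalSwarmMemory("bot-1")

    # Writes create nested dicts along the "/"-separated path.
    memory.write("TASK_QUEUE/42", {"TASK": "task-42", "INDEX_IN_BUNDLE": 0})
    memory.write("TASK_QUEUE/43", {"TASK": "task-43", "INDEX_IN_BUNDLE": 1})

    print(memory.read("TASK_QUEUE"))         # whole table: {"42": {...}, "43": {...}}
    print(memory.read("TASK_QUEUE/42"))      # single entry
    print(memory.has_path("TASK_QUEUE/99"))  # False

    # The holder map mirrors the same paths, so holder ids can be collected per table.
    print(memory.get_key_holder_ids("TASK_QUEUE/42"))  # ["bot-1"]

    memory.delete("TASK_QUEUE/42")
    print(memory.has_path("TASK_QUEUE/42"))  # False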