diff --git a/src/plasma/CMakeLists.txt b/src/plasma/CMakeLists.txt index e6a2eb53341a5..22c0deacdc608 100644 --- a/src/plasma/CMakeLists.txt +++ b/src/plasma/CMakeLists.txt @@ -5,8 +5,6 @@ project(plasma) # Recursively include common include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") - if(APPLE) SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so") endif(APPLE) @@ -14,6 +12,7 @@ endif(APPLE) include_directories("${PYTHON_INCLUDE_DIRS}" thirdparty) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L") # Compile flatbuffers diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index 7582fc0acc787..7bb20efd3d4d8 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -25,6 +25,9 @@ #include #include +#include +#include + #include "common.h" #include "format/common_generated.h" #include "event_loop.h" @@ -44,8 +47,24 @@ void *dlmemalign(size_t alignment, size_t bytes); void dlfree(void *); } +namespace std { +template <> +struct hash { + size_t operator()(const UniqueID &unique_id) const { + return *reinterpret_cast(unique_id.id + UNIQUE_ID_SIZE - + sizeof(size_t)); + } +}; +} // namespace std + +bool operator==(UniqueID a, UniqueID b) { + return UNIQUE_ID_EQ(a, b); +} + /** Contains all information that is associated with a Plasma store client. */ struct Client { + Client(int sock, PlasmaStoreState *plasma_state); + /** The socket used to communicate with the client. */ int sock; /** A pointer to the global plasma state. */ @@ -70,46 +89,35 @@ typedef struct { UT_hash_handle hh; } NotificationQueue; -typedef struct { +struct GetRequest { + GetRequest(Client *client, int num_object_ids, ObjectID object_ids[]); + /** The client connection that called get. */ Client *client; /** The ID of the timer that will time out and cause this wait to return to * the client if it hasn't already returned. */ int64_t timer; - /** The number of objects in this get request. */ - int64_t num_object_ids; /** The object IDs involved in this request. This is used in the reply. */ - ObjectID *object_ids; + std::vector object_ids; /** The object information for the objects in this request. This is used in * the reply. */ - PlasmaObject *objects; + std::vector objects; /** The minimum number of objects to wait for in this request. */ int64_t num_objects_to_wait_for; /** The number of object requests in this wait request that are already * satisfied. */ int64_t num_satisfied; -} GetRequest; - -typedef struct { - /** The ID of the object. This is used as a key in a hash table. */ - ObjectID object_id; - /** An array of the get requests involving this object ID. */ - UT_array *get_requests; - /** Handle for the uthash table in the store state that keeps track of the get - * requests involving this object ID. */ - UT_hash_handle hh; -} ObjectGetRequests; - -/** This is used to define the utarray of get requests in the - * ObjectGetRequests struct. */ -UT_icd get_request_icd = {sizeof(GetRequest *), NULL, NULL, NULL}; +}; struct PlasmaStoreState { + PlasmaStoreState(event_loop *loop, int64_t system_memory); + /* Event loop of the plasma store. */ event_loop *loop; /** A hash table mapping object IDs to a vector of the get requests that are * waiting for the object to arrive. */ - ObjectGetRequests *object_get_requests; + std::unordered_map> object_get_requests; + /** The pending notifications that have not been sent to subscribers because * the socket send buffers were full. This is a hash table from client file * descriptor to an array of object_ids to send to that client. */ @@ -130,23 +138,29 @@ PlasmaStoreState *g_state; UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; -PlasmaStoreState *PlasmaStoreState_init(event_loop *loop, - int64_t system_memory) { - PlasmaStoreState *state = - (PlasmaStoreState *) malloc(sizeof(PlasmaStoreState)); - state->loop = loop; - state->object_get_requests = NULL; - state->pending_notifications = NULL; - /* Initialize the plasma store info. */ - state->plasma_store_info = - (PlasmaStoreInfo *) malloc(sizeof(PlasmaStoreInfo)); - state->plasma_store_info->objects = NULL; - state->plasma_store_info->memory_capacity = system_memory; - /* Initialize the eviction state. */ - state->eviction_state = EvictionState_init(); - utarray_new(state->input_buffer, &byte_icd); - state->builder = make_protocol_builder(); - return state; +Client::Client(int sock, PlasmaStoreState *plasma_state) + : sock(sock), plasma_state(plasma_state) {} + +GetRequest::GetRequest(Client *client, + int num_object_ids, + ObjectID object_ids[]) + : client(client), + timer(-1), + object_ids(object_ids, object_ids + num_object_ids), + objects(num_object_ids), + num_objects_to_wait_for(num_object_ids), + num_satisfied(0) {} + +PlasmaStoreState::PlasmaStoreState(event_loop *loop, int64_t system_memory) + : loop(loop), + pending_notifications(NULL), + plasma_store_info((PlasmaStoreInfo *) malloc(sizeof(PlasmaStoreInfo))), + eviction_state(EvictionState_init()), + builder(make_protocol_builder()) { + this->plasma_store_info->objects = NULL; + this->plasma_store_info->memory_capacity = system_memory; + + utarray_new(this->input_buffer, &byte_icd); } void PlasmaStoreState_free(PlasmaStoreState *state) { @@ -279,45 +293,19 @@ int create_object(Client *client_context, void add_get_request_for_object(PlasmaStoreState *store_state, ObjectID object_id, GetRequest *get_req) { - ObjectGetRequests *object_get_reqs; - HASH_FIND(hh, store_state->object_get_requests, &object_id, sizeof(object_id), - object_get_reqs); - /* If there are currently no get requests involving this object ID, create a - * new ObjectGetRequests struct for this object ID and add it to the hash - * table. */ - if (object_get_reqs == NULL) { - object_get_reqs = (ObjectGetRequests *) malloc(sizeof(ObjectGetRequests)); - object_get_reqs->object_id = object_id; - utarray_new(object_get_reqs->get_requests, &get_request_icd); - HASH_ADD(hh, store_state->object_get_requests, object_id, - sizeof(object_get_reqs->object_id), object_get_reqs); - } - /* Add this get request to the vector of get requests involving this object - * ID. */ - utarray_push_back(object_get_reqs->get_requests, &get_req); + store_state->object_get_requests[object_id].push_back(get_req); } void remove_get_request_for_object(PlasmaStoreState *store_state, ObjectID object_id, GetRequest *get_req) { - ObjectGetRequests *object_get_reqs; - HASH_FIND(hh, store_state->object_get_requests, &object_id, sizeof(object_id), - object_get_reqs); - /* If there is a vector of get requests for this object ID, and if this vector - * contains the get request, then remove the get request from the vector. */ - if (object_get_reqs != NULL) { - for (int i = 0; i < utarray_len(object_get_reqs->get_requests); ++i) { - GetRequest **get_req_ptr = - (GetRequest **) utarray_eltptr(object_get_reqs->get_requests, i); - if (*get_req_ptr == get_req) { - /* Remove the get request from the array. */ - utarray_erase(object_get_reqs->get_requests, i, 1); - break; - } + std::vector &get_requests = + store_state->object_get_requests[object_id]; + for (auto it = get_requests.begin(); it != get_requests.end(); ++it) { + if (*it == get_req) { + get_requests.erase(it); + break; } - /* In principle, if there are no more get requests involving this object ID, - * then we could remove the object_get_reqs struct. However, the - * object_get_reqs struct gets removed in update_object_get_requests. */ } } @@ -325,9 +313,7 @@ void remove_get_request(PlasmaStoreState *store_state, GetRequest *get_req) { if (get_req->timer != -1) { CHECK(event_loop_remove_timer(store_state->loop, get_req->timer) == AE_OK); } - free(get_req->object_ids); - free(get_req->objects); - free(get_req); + delete get_req; } void PlasmaObject_init(PlasmaObject *object, object_table_entry *entry) { @@ -344,20 +330,19 @@ void PlasmaObject_init(PlasmaObject *object, object_table_entry *entry) { void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) { /* Send the get reply to the client. */ - int status = plasma_send_GetReply(get_req->client->sock, store_state->builder, - get_req->object_ids, get_req->objects, - get_req->num_object_ids); + int status = plasma_send_GetReply( + get_req->client->sock, store_state->builder, &get_req->object_ids[0], + &get_req->objects[0], get_req->object_ids.size()); warn_if_sigpipe(status, get_req->client->sock); /* If we successfully sent the get reply message to the client, then also send * the file descriptors. */ if (status >= 0) { /* Send all of the file descriptors for the present objects. */ - for (int i = 0; i < get_req->num_object_ids; ++i) { + for (PlasmaObject &object : get_req->objects) { /* We use the data size to indicate whether the object is present or not. */ - if (get_req->objects[i].data_size != -1) { - int error_code = - send_fd(get_req->client->sock, get_req->objects[i].handle.store_fd); + if (object.data_size != -1) { + int error_code = send_fd(get_req->client->sock, object.handle.store_fd); /* If we failed to send the file descriptor, loop until we have sent it * successfully. TODO(rkn): This is problematic for two reasons. First * of all, sending the file descriptor should just succeed without any @@ -367,8 +352,7 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) { while (error_code < 0) { if (errno == EMSGSIZE) { LOG_WARN("Failed to send file descriptor, retrying."); - error_code = send_fd(get_req->client->sock, - get_req->objects[i].handle.store_fd); + error_code = send_fd(get_req->client->sock, object.handle.store_fd); continue; } warn_if_sigpipe(error_code, get_req->client->sock); @@ -381,8 +365,8 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) { /* Remove the get request from each of the relevant object_get_requests hash * tables if it is present there. It should only be present there if the get * request timed out. */ - for (int i = 0; i < get_req->num_object_ids; ++i) { - remove_get_request_for_object(store_state, get_req->object_ids[i], get_req); + for (ObjectID &object_id : get_req->object_ids) { + remove_get_request_for_object(store_state, object_id, get_req); } /* Remove the get request. */ remove_get_request(store_state, get_req); @@ -390,62 +374,48 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) { void update_object_get_requests(PlasmaStoreState *store_state, ObjectID obj_id) { - /* Update the in-progress get requests. */ - ObjectGetRequests *object_get_reqs; - HASH_FIND(hh, store_state->object_get_requests, &obj_id, sizeof(obj_id), - object_get_reqs); - if (object_get_reqs != NULL) { - /* We compute the number of requests first because the length of the utarray - * will change as we iterate over it (because each call to return_from_get - * will remove one element). */ - int num_requests = utarray_len(object_get_reqs->get_requests); - /* The argument index is the index of the current element of the utarray - * that we are processing. It may differ from the counter i when elements - * are removed from the array. */ - int index = 0; - for (int i = 0; i < num_requests; ++i) { - GetRequest **get_req_ptr = - (GetRequest **) utarray_eltptr(object_get_reqs->get_requests, index); - GetRequest *get_req = *get_req_ptr; - - int num_updated = 0; - for (int j = 0; j < get_req->num_objects_to_wait_for; ++j) { - object_table_entry *entry; - HASH_FIND(handle, store_state->plasma_store_info->objects, &obj_id, - sizeof(obj_id), entry); - CHECK(entry != NULL); - - if (ObjectID_equal(get_req->object_ids[j], obj_id)) { - PlasmaObject_init(&get_req->objects[j], entry); - num_updated += 1; - get_req->num_satisfied += 1; - /* Record the fact that this client will be using this object and will - * be responsible for releasing this object. */ - add_client_to_object_clients(entry, get_req->client); - } - } - /* Check a few things just to be sure there aren't bugs. */ - DCHECK(num_updated > 0); - if (num_updated > 1) { - LOG_WARN("A get request contained a duplicated object ID."); + std::vector &get_requests = + store_state->object_get_requests[obj_id]; + int index = 0; + int num_requests = get_requests.size(); + for (int i = 0; i < num_requests; ++i) { + GetRequest *get_req = get_requests[index]; + int num_updated = 0; + for (int j = 0; j < get_req->num_objects_to_wait_for; ++j) { + object_table_entry *entry; + HASH_FIND(handle, store_state->plasma_store_info->objects, &obj_id, + sizeof(obj_id), entry); + CHECK(entry != NULL); + + if (ObjectID_equal(get_req->object_ids[j], obj_id)) { + PlasmaObject_init(&get_req->objects[j], entry); + num_updated += 1; + get_req->num_satisfied += 1; + /* Record the fact that this client will be using this object and will + * be responsible for releasing this object. */ + add_client_to_object_clients(entry, get_req->client); } + } + /* Check a few things just to be sure there aren't bugs. */ + DCHECK(num_updated > 0); + if (num_updated > 1) { + LOG_WARN("A get request contained a duplicated object ID."); + } - /* If this get request is done, reply to the client. */ - if (get_req->num_satisfied == get_req->num_objects_to_wait_for) { - return_from_get(store_state, get_req); - } else { - /* The call to return_from_get will remove the current element in the - * array, so we only increment the counter in the else branch. */ - index += 1; - } + /* If this get request is done, reply to the client. */ + if (get_req->num_satisfied == get_req->num_objects_to_wait_for) { + return_from_get(store_state, get_req); + } else { + /* The call to return_from_get will remove the current element in the + * array, so we only increment the counter in the else branch. */ + index += 1; } - DCHECK(index == utarray_len(object_get_reqs->get_requests)); - /* Remove the array of get requests for this object, since no one should be - * waiting for this object anymore. */ - HASH_DELETE(hh, store_state->object_get_requests, object_get_reqs); - utarray_free(object_get_reqs->get_requests); - free(object_get_reqs); } + + DCHECK(index == get_requests.size()); + /* Remove the array of get requests for this object, since no one should be + * waiting for this object anymore. */ + store_state->object_get_requests.erase(obj_id); } int get_timeout_handler(event_loop *loop, timer_id id, void *context) { @@ -461,19 +431,8 @@ void process_get_request(Client *client_context, PlasmaStoreState *plasma_state = client_context->plasma_state; /* Create a get request for this object. */ - GetRequest *get_req = (GetRequest *) malloc(sizeof(GetRequest)); - memset(get_req, 0, sizeof(GetRequest)); - get_req->client = client_context; - get_req->timer = -1; - get_req->num_object_ids = num_object_ids; - get_req->object_ids = (ObjectID *) malloc(num_object_ids * sizeof(ObjectID)); - get_req->objects = - (PlasmaObject *) malloc(num_object_ids * sizeof(PlasmaObject)); - for (int i = 0; i < num_object_ids; ++i) { - get_req->object_ids[i] = object_ids[i]; - } - get_req->num_objects_to_wait_for = num_object_ids; - get_req->num_satisfied = 0; + GetRequest *get_req = + new GetRequest(client_context, num_object_ids, object_ids); for (int i = 0; i < num_object_ids; ++i) { ObjectID obj_id = object_ids[i]; @@ -858,9 +817,7 @@ void new_client_connection(event_loop *loop, int new_socket = accept_client(listener_sock); /* Create a new client object. This will also be used as the context to use * for events on this client's socket. TODO(rkn): free this somewhere. */ - Client *client_context = (Client *) malloc(sizeof(Client)); - client_context->sock = new_socket; - client_context->plasma_state = plasma_state; + Client *client_context = new Client(new_socket, plasma_state); /* Add a callback to handle events on this socket. */ event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, client_context); @@ -881,7 +838,7 @@ void start_server(char *socket_name, int64_t system_memory) { signal(SIGPIPE, SIG_IGN); /* Create the event loop. */ event_loop *loop = event_loop_create(); - PlasmaStoreState *state = PlasmaStoreState_init(loop, system_memory); + PlasmaStoreState *state = new PlasmaStoreState(loop, system_memory); int socket = bind_ipc_sock(socket_name, true); CHECK(socket >= 0); event_loop_add_file(loop, socket, EVENT_LOOP_READ, new_client_connection,