Global scheduler skeleton #45

Merged (14 commits), Nov 19, 2016
Conversation

@robertnishihara (Collaborator) commented Nov 17, 2016

Things done in this PR:

  • Enable subscribing to the "local scheduler" table.
  • A bare-bones global scheduler that subscribes to the local scheduler table and the task table, so it receives updates about all tasks that are scheduled and about all new local schedulers that connect to Redis. When it receives a task, it immediately assigns it to a local scheduler (if no local schedulers have connected yet, we currently just fail).
  • Allow the local scheduler to start without connecting to Redis, in which case everything is done locally. If a Redis address is provided, the local scheduler subscribes to task table updates for tasks that are assigned to it (see the sketch after this list).
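A rough sketch of that Redis-optional startup path follows. The names local_scheduler_state, db_connect, get_db_client_id, handle_task_scheduled, and photon_retry are assumptions modeled on the snippets quoted later in this thread, not the PR's exact API:

/* Sketch only: names and signatures are assumptions, not the PR's exact code. */
void local_scheduler_connect(local_scheduler_state *state,
                             const char *redis_addr,
                             int redis_port) {
  if (redis_addr == NULL) {
    /* No Redis address: run standalone and keep all scheduling local. */
    state->db = NULL;
    return;
  }
  /* A Redis address was provided: connect, then subscribe to task table
   * updates for tasks that the global scheduler assigns to this node. */
  state->db = db_connect(redis_addr, redis_port, "photon", "", 0);
  task_table_subscribe(state->db, get_db_client_id(state->db),
                       TASK_STATUS_SCHEDULED, handle_task_scheduled,
                       (void *) state, (retry_info *) &photon_retry,
                       NULL, NULL);
}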

Things that can be done in a subsequent PR:

  • For calls to PUBLISH, check that the number of clients that received the message is what we expect.
  • Replace the concept of node_id with client_id or perhaps db_client_id.
  • Subscribe to updates from the object table.
  • Create some state managed by the global scheduling algorithm.
  • Allow subscriptions to the various tables to also return everything that has already been placed in those tables.

Notes:

  • This PR slows down the microbenchmark of submitting as many tasks as possible as quickly as possible (from 20us to 30us on my laptop, using 1 worker). This is probably because the load on the local scheduler increases: most tasks are submitted to the local scheduler once by the driver and then again by the global scheduler. More clever local/global scheduling algorithms could potentially address this.
    - We should have an option to always forward tasks to the global scheduler for benchmarking purposes.

task *original_task,
node_id node_id) {
task *updated_task =
alloc_task(task_task_spec(original_task), TASK_STATUS_SCHEDULED, node_id);
Contributor:

Do we need to allocate a new task here? I think we agreed earlier that the scheduling_state and node_id fields of a task instance should be mutable.

Collaborator Author:

Good point.
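If the fields become mutable, the update could look something like the following. task_set_state and task_set_node are hypothetical setters, not existing API:

/* Hypothetical setters; the real code may expose the fields differently. */
task_set_state(original_task, TASK_STATUS_SCHEDULED);
task_set_node(original_task, node_id);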

db_client_table_subscribe(g_state->db, process_new_db_client,
(void *) g_state, &retry, NULL, NULL);
/* Subscribe to notifications about waiting tasks. */
task_table_subscribe(g_state->db, NIL_ID, TASK_STATUS_WAITING,
Contributor:

Should we also do the initial read of all the tasks that are in state TASK_STATUS_WAITING? Or at least, a TODO to come back to this.

Collaborator Author:

I'll add a TODO.

*
*/

void handle_task_waiting(global_scheduler_state *state, task *original_task);
Contributor:

Documentation for these methods.
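For example, a header comment along these lines (the parameter descriptions are my reading of the PR description, not the final text) would cover it:

/**
 * Handle a task that is waiting to be scheduled.
 *
 * This chooses a local scheduler for the task and updates the task table so
 * that the chosen local scheduler gets notified.
 *
 * @param state The global scheduler state.
 * @param original_task The waiting task received from the task table
 *        subscription.
 * @return Void.
 */
void handle_task_waiting(global_scheduler_state *state, task *original_task);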


#include "global_scheduler_algorithm.h"

void handle_task_waiting(global_scheduler_state *state, task *original_task) {
Contributor:

I'm assuming that this code is throwaway, but can we document what the current algorithm is anyway?
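Per the PR description, the placeholder policy is simply to hand every waiting task to a known local scheduler and fail if none has connected. A sketch of what a documenting comment plus that policy could look like; local_schedulers, ls->id, and assign_task_to_local_scheduler are assumed names, not necessarily the PR's:

void handle_task_waiting(global_scheduler_state *state, task *original_task) {
  /* Placeholder policy: assign every waiting task to the first local
   * scheduler that has registered in the db client table. If none has
   * connected yet, we currently just fail (see the PR description). */
  CHECK(utarray_len(state->local_schedulers) > 0);
  local_scheduler *ls =
      (local_scheduler *) utarray_front(state->local_schedulers);
  assign_task_to_local_scheduler(state, original_task, ls->id);
}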

task_contents = self.redis_client.hgetall(task_entries[0])
task_status = int(task_contents["state"])
self.assertTrue(task_status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED])
if task_status == TASK_STATUS_SCHEDULED:
Contributor:

Add a check to make sure that the task is scheduled on the right node?

task_contents = [self.redis_client.hgetall(task_entries[i]) for i in range(len(task_entries))]
task_statuses = [int(contents["state"]) for contents in task_contents]
self.assertTrue(all([status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED] for status in task_statuses]))
if all([status == TASK_STATUS_SCHEDULED for status in task_statuses]):
Contributor:

Same check here.

@mehrdadn (Contributor) left a comment:

Dammit, just learned I need to click "Submit Review" for anyone to see these now :( I hate GitHub's new interface...

Anyway, most of the comments were superficial and not really important... but a few of them really should be addressed I think (especially the signal handling one) so please do take a look at all of them...

event_type, message, utstring_body(timestamp));
/* Fill in the client ID and send the message to Redis. */
redisAsyncCommand(db->context, NULL, NULL, utstring_body(formatted_message),
(char *) db->client.id, sizeof(db_client_id));
Contributor:

Do these calls not have return values? If they do can you check them?

@robertnishihara (Collaborator Author), Nov 19, 2016:

Good point, I'll fix that.
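For reference, redisAsyncCommand does return a status in hiredis, so the check could be a sketch like this, reusing the CHECK macro that appears elsewhere in this diff:

/* redisAsyncCommand returns REDIS_OK or REDIS_ERR in hiredis; REDIS_ERR
 * means the command could not even be queued. */
int status = redisAsyncCommand(db->context, NULL, NULL,
                               utstring_body(formatted_message),
                               (char *) db->client.id, sizeof(db_client_id));
CHECK(status != REDIS_ERR);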

CHECK(reply->element[j]->type == REDIS_REPLY_STRING);
managers[j] = atoi(reply->element[j]->str);
redis_get_cached_service(db, managers[j], manager_vector + j);
memcpy(managers[j].id, reply->element[j]->str, sizeof(db_client_id));
Contributor:

  1. You're only copying 1 field of the struct, but you're taking the size of the whole struct. There is no guarantee the two will match in size, at least if you happen to add a field later (though I'm not even sure if you can assume a lack of padding as you are here).
  2. Even if the above wasn't the case, you should always use sizeof(field) instead of sizeof(type) when you can. That way when you change the field type later, the code won't break.

So this should be changed to memcpy(managers[j].id, reply->element[j]->str, sizeof(managers[j].id));
Perhaps worthy of a macro at some point, though not necessarily now.

Contributor:

Actually, scratch that. Don't use memcpy if you can avoid it. Just do something like
managers[j] = *(db_client_id const *)reply->element[j]->str
and let the compiler take care of the assignment.

Collaborator Author:

I think I've tried stuff like that and remember getting errors like this.

state/redis.c:372:22: error: array type 'unsigned char [20]' is not assignable
      managers[j].id = *(db_client_id const *) reply->element[j]->str;
      ~~~~~~~~~~~~~~ ^

Contributor:

Oh, that's because you were assigning to id instead of the whole struct.
Just assign to the whole struct.
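A self-contained illustration of the two options; db_client_id is approximated here as a struct wrapping a 20-byte id, matching the compiler error quoted above, and the real definition may differ:

#include <string.h>

typedef struct {
  unsigned char id[20];
} db_client_id;

void copy_id(db_client_id *managers, int j, const char *src) {
  /* Option 1: memcpy into the field, sized by the field rather than the type. */
  memcpy(managers[j].id, src, sizeof(managers[j].id));

  /* Option 2: assign the whole struct and let the compiler do the copy.
   * The target is managers[j], not managers[j].id, because array fields
   * are not assignable (that was the error quoted above). */
  managers[j] = *(const db_client_id *) src;
}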

void redis_db_client_table_subscribe_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r)
Contributor:

Put semicolons after macros that are statements (and ideally, write the macros such that these are required).
It makes it clear that these macros are statements and not declarations of some sort.
(Indeed some declarations also require semicolons, but not all of them do. However, all statements do.)
Bonus points: Some editors/IDEs choke on their indentations when you don't do this.
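The usual way to force the trailing semicolon is the do { ... } while (0) idiom. A generic, self-contained example (LOG_AND_RETURN is made up for illustration, not part of the codebase):

#include <stdio.h>

/* Wrapping a multi-statement macro in do { ... } while (0) makes it expand
 * to a single statement, so the caller must supply the semicolon and the
 * macro stays correct inside an unbraced if/else. */
#define LOG_AND_RETURN(msg)             \
  do {                                  \
    fprintf(stderr, "%s\n", (msg));     \
    return;                             \
  } while (0)

void example(int ok) {
  if (!ok)
    LOG_AND_RETURN("something failed"); /* the semicolon here is required */
}

One caveat: if REDIS_CALLBACK_HEADER declares variables that the rest of the function uses (as it appears to), do { ... } while (0) would hide them in its own scope; in that case the macro just needs to be written so it does not already end in a semicolon.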

Collaborator Author:

good point

/* Otherwise, parse the payload and call the callback. */
db_client_table_subscribe_data *data = callback_data->data;
db_client_id client;
memcpy(client.id, payload->str, sizeof(db_client_id));
Contributor:

Again, same thing here as before. Fix this everywhere.

* Should only be used very rarely, it is not asynchronous. */
/** Cache for the IP addresses of db clients. This is a hash table mapping
* client IDs to addresses. */
db_client_cache_entry *db_client_cache;
Contributor:

I'm confused... if this is a hashtable then why is it a pointer to an entry? Am I misreading it?

If this is just how you're representing it, use a typedef to abstract it away properly. This should point to a db_client_cache or something. It doesn't make sense as it is written now, and it makes the code more brittle.

Collaborator Author:

That's the way the uthash library is used. https://troydhanson.github.io/uthash/userguide.html

Contributor:

Oh I see :\ okay if it's idiomatic use of the library then never mind.
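For context, this is indeed the standard uthash pattern: the "table" is just a pointer to the entry type, NULL when empty, and the HASH_* macros manage it. A self-contained sketch (db_client_cache_entry here is a simplified stand-in for the real struct):

#include <stdlib.h>
#include <string.h>
#include "uthash.h"

typedef struct {
  int db_client_id;   /* key (simplified to an int for this sketch) */
  char addr[32];      /* cached address */
  UT_hash_handle hh;  /* makes this struct usable with the HASH_* macros */
} db_client_cache_entry;

void cache_example(void) {
  /* The "hash table" is just this head pointer; NULL means empty. */
  db_client_cache_entry *db_client_cache = NULL;

  /* Insert an entry. */
  db_client_cache_entry *entry = malloc(sizeof(db_client_cache_entry));
  entry->db_client_id = 42;
  strcpy(entry->addr, "127.0.0.1");
  HASH_ADD_INT(db_client_cache, db_client_id, entry);

  /* Look an entry up by key. */
  int key = 42;
  db_client_cache_entry *found;
  HASH_FIND_INT(db_client_cache, &key, found);
}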

/* Copy the spec and add it to the task queue. The allocated spec will be
* freed when it is assigned to a worker. */
task_queue_entry *elt = malloc(sizeof(task_queue_entry));
elt->spec = malloc(task_spec_size(spec));
Contributor:

Why not cast the return value of malloc but cast the return value of others like utarray_back?
Be consistent. I'd say cast all of them so people can compile the code with a C++ compiler too.

@mehrdadn (Contributor), Nov 19, 2016:

(not terribly important given this is already merged, but worth keeping in mind for new code)
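For reference, the cast-everything style that keeps the code compilable as C++ would look like this for the allocation above (assuming elt->spec has type task_spec *):

/* Casting malloc's void * result is unnecessary in C but required in C++;
 * doing it everywhere keeps the file compilable with either compiler. */
task_queue_entry *elt = (task_queue_entry *) malloc(sizeof(task_queue_entry));
elt->spec = (task_spec *) malloc(task_spec_size(spec));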

DCHECK(!from_global_scheduler);
task *task = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID);
DCHECK(info->db != NULL);
task_table_add_task(info->db, task, (retry_info *) &photon_retry, NULL, NULL);
Contributor:

Is this mutating photon_retry? Or is casting away const only to make the pointer types match...?

@robertnishihara (Collaborator Author), Nov 19, 2016:

It shouldn't mutate anything; the cast is probably just there to get rid of a compiler warning.

/* If this task's dependencies are available locally, and if there is an
* available worker, then assign this task to an available worker. If we
* cannot assign the task to a worker immediately, queue the task locally. */
if ((utarray_len(s->available_workers) > 0) && can_run(s, spec)) {
Contributor:

Remove extra parentheses; they make code harder to read.

Collaborator Author:

I often include them to save myself time when debugging. E.g., if there is a bug, I'll start to wonder if I got the order of operations wrong, and so I'll go and add in the parentheses just to be sure.

Contributor:

Haha, okay fair enough.

/* Update the global task table. */
if (info->db != NULL) {
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
Contributor:

Again, avoid designated initializers. (I'll stop commenting but there are more.)
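For completeness, the non-designated forms would look like one of these, assuming the struct declares the fields in the order num_retries, timeout, fail_callback:

/* Positional aggregate initialization (fields in declaration order): */
retry_info retry = {0, 100, NULL};

/* Or plain assignments, which also avoid designated initializers: */
retry_info retry2;
retry2.num_retries = 0;
retry2.timeout = 100;
retry2.fail_callback = NULL;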

char redis_addr[16] = {0};
char redis_port[6] = {0};
if (sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) !=
2) {
Contributor:

This was painful to read. Maybe store the output of sscanf in a variable nassigned and then test nassigned != 2 on a single line.
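I.e., something like the following, keeping the existing error handling in the if body:

char redis_addr[16] = {0};
char redis_port[6] = {0};
/* sscanf returns the number of conversions it completed; both the address
 * and the port must have matched. */
int nassigned =
    sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port);
if (nassigned != 2) {
  /* ... handle the malformed address as before ... */
}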
