Skip to content

Commit

Permalink
Delete ready jobs.
Browse files Browse the repository at this point in the history
  • Loading branch information
kr committed Nov 22, 2008
1 parent 89cf1d7 commit 6334c6b
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 1 deletion.
1 change: 1 addition & 0 deletions job.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ allocate_job(int body_size)
j->ht_next = NULL;
j->tube = NULL;
j->binlog = NULL;
j->heap_index = 0;

return j;
}
Expand Down
1 change: 1 addition & 0 deletions job.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct job {
void *binlog;
char state;
job ht_next; /* Next job in a hash table list */
size_t heap_index; /* where is this job in a heap */
char body[];
};

Expand Down
32 changes: 32 additions & 0 deletions pq.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ swap(pq q, unsigned int a, unsigned int b)
j = q->heap[a];
q->heap[a] = q->heap[b];
q->heap[b] = j;

q->heap[a]->heap_index = a;
q->heap[b]->heap_index = b;
}

#define PARENT(i) (((i-1))>>1)
Expand Down Expand Up @@ -112,6 +115,7 @@ static void
delete_min(pq q)
{
q->heap[0] = q->heap[--q->used];
q->heap[0]->heap_index = 0;
if (q->used) bubble_down(q, 0);
}

Expand All @@ -125,6 +129,7 @@ pq_give(pq q, job j)

k = q->used++;
q->heap[k] = j;
j->heap_index = k;
bubble_up(q, k);

return 1;
Expand All @@ -149,6 +154,33 @@ pq_peek(pq q)
return q->heap[0];
}

job
pq_remove(pq q, job j)
{
unsigned long long int id;
unsigned int pri;

if (q->used == 0) return NULL;
if (q->heap[j->heap_index] != j) return NULL;

id = j->id;
j->id = 0;
pri = j->pri;
j->pri = 0;

bubble_up(q, j->heap_index);

j->id = id;
j->pri = pri;

/* can't happen */
if (q->heap[0] != j) return NULL;

delete_min(q);

return j;
}

job
pq_find(pq q, unsigned long long int id)
{
Expand Down
3 changes: 3 additions & 0 deletions pq.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ job pq_take(pq q);
/* return a job if the queue contains jobs, else NULL */
job pq_peek(pq q);

/* remove and return j if the queue contains j, else return NULL */
job pq_remove(pq q, job j);

/* return a job that matches the given id, else NULL */
/* This is O(n), so don't do it much. */
job pq_find(pq q, unsigned long long int id);
Expand Down
19 changes: 18 additions & 1 deletion prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,21 @@ remove_buried_job(job j)
return j;
}

static job
remove_ready_job(job j)
{
if (!j || j->state != JOB_STATE_READY) return NULL;
j = pq_remove(&j->tube->ready, j);
if (j) {
ready_ct--;
if (j->pri < URGENT_THRESHOLD) {
global_stat.urgent_ct--;
j->tube->stat.urgent_ct--;
}
}
return j;
}

static void
enqueue_waiting_conn(conn c)
{
Expand Down Expand Up @@ -1145,7 +1160,9 @@ dispatch_cmd(conn c)
op_ct[type]++;

j = job_find(id);
j = remove_reserved_job(c, j) ? : remove_buried_job(j);
j = remove_reserved_job(c, j) ? :
remove_ready_job(j) ? :
remove_buried_job(j);

if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);

Expand Down
3 changes: 3 additions & 0 deletions shell_tests/delete_ready.commands
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
put 0 0 0 0

delete 1
2 changes: 2 additions & 0 deletions shell_tests/delete_ready.expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
INSERTED 1
DELETED

0 comments on commit 6334c6b

Please sign in to comment.