Skip to content

Commit

Permalink
lab7 replicated state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
remysys committed Aug 7, 2015
1 parent 6375d10 commit cd8d266
Show file tree
Hide file tree
Showing 14 changed files with 923 additions and 73 deletions.
Binary file added .lock_server_cache_rsm.cc.swp
Binary file not shown.
Binary file added .lock_server_cache_rsm.h.swp
Binary file not shown.
Binary file added .rsm.cc.swp
Binary file not shown.
92 changes: 46 additions & 46 deletions config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,52 +232,52 @@ config::heartbeater()
std::vector<std::string> cmems;
ScopedLock ml(&cfg_mutex);

while (1) {

gettimeofday(&now, NULL);
next_timeout.tv_sec = now.tv_sec + 3;
next_timeout.tv_nsec = 0;
tprintf("heartbeater: go to sleep\n");
pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout);

stable = true;
vid = myvid;
cmems = get_view_wo(vid);
tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str());

if (!isamember(me, cmems)) {
tprintf("heartbeater: not member yet; skip hearbeat\n");
continue;
}

//find the node with the smallest id
m = me;
for (unsigned i = 0; i < cmems.size(); i++) {
if (m > cmems[i])
m = cmems[i];
}

if (m == me) {
//if i am the one with smallest id, ping the rest of the nodes
for (unsigned i = 0; i < cmems.size(); i++) {
if (cmems[i] != me) {
if ((h = doheartbeat(cmems[i])) != OK) {
stable = false;
m = cmems[i];
break;
}
}
}
} else {
//the rest of the nodes ping the one with smallest id
if ((h = doheartbeat(m)) != OK)
stable = false;
}

if (!stable && vid == myvid) {
remove_wo(m);
}
}
while (1) {

gettimeofday(&now, NULL);
next_timeout.tv_sec = now.tv_sec + 3;
next_timeout.tv_nsec = 0;
tprintf("heartbeater: go to sleep\n");
pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout);

stable = true;
vid = myvid;
cmems = get_view_wo(vid);
tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str());

if (!isamember(me, cmems)) {
tprintf("heartbeater: not member yet; skip hearbeat\n");
continue;
}

//find the node with the smallest id
m = me;
for (unsigned i = 0; i < cmems.size(); i++) {
if (m > cmems[i])
m = cmems[i];
}

if (m == me) {
//if i am the one with smallest id, ping the rest of the nodes
for (unsigned i = 0; i < cmems.size(); i++) {
if (cmems[i] != me) {
if ((h = doheartbeat(cmems[i])) != OK) {
stable = false;
m = cmems[i];
break;
}
}
}
} else {
//the rest of the nodes ping the one with smallest id
if ((h = doheartbeat(m)) != OK)
stable = false;
}

if (!stable && vid == myvid) {
remove_wo(m);
}
}
}

paxos_protocol::status
Expand Down
156 changes: 146 additions & 10 deletions lock_client_cache_rsm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ lock_client_cache_rsm::lock_client_cache_rsm(std::string xdst,
// You fill this in Step Two, Lab 7
// - Create rsmc, and use the object to do RPC
// calls instead of the rpcc object of lock_client
rsmc = new rsm_client(xdst);

pthread_t th;
int r = pthread_create(&th, NULL, &releasethread, (void *) this);
VERIFY (r == 0);
Expand All @@ -53,40 +55,174 @@ lock_client_cache_rsm::releaser()
// This method should be a continuous loop, waiting to be notified of
// freed locks that have been revoked by the server, so that it can
// send a release RPC.

while(true) {
release_entry e;
release_queue.deq(&e);

if (lu) {
lu->dorelease(e.lid);
}
int r;
rsmc->call(lock_protocol::release, e.lid, id, e.xid, r);
pthread_mutex_lock(&client_mutex);
std::map<lock_protocol::lockid_t, lock_entry>::iterator iter;
iter = lockmap.find(e.lid);
VERIFY(iter != lockmap.end());
iter->second.state = NONE;
pthread_cond_broadcast(&iter->second.releasequeue);
pthread_mutex_unlock(&client_mutex);
}
}


lock_protocol::status
lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid)
{
int ret = lock_protocol::OK;

return ret;
int ret = lock_protocol::OK;
int r;
std::map<lock_protocol::lockid_t, lock_entry>::iterator iter;
pthread_mutex_lock(&client_mutex);
iter = lockmap.find(lid);
if(iter == lockmap.end()) {
iter = lockmap.insert(std::make_pair(lid, lock_entry())).first;
}

while (true) {
switch(iter->second.state) {
case NONE:
iter->second.xid = xid;
xid++;
iter->second.state = ACQUIRING;
iter->second.retry = false;
pthread_mutex_unlock(&client_mutex);
ret = rsmc->call(lock_protocol::acquire, lid, id, iter->second.xid, r);
pthread_mutex_lock(&client_mutex);
if (ret == lock_protocol::OK) {
iter->second.state = LOCKED;
pthread_mutex_unlock(&client_mutex);
return ret;
} else if (ret == lock_protocol::RETRY) {
if(!iter->second.retry) {
pthread_cond_wait(&iter->second.retryqueue, &client_mutex);
}
}
break;
case FREE:
iter->second.state = LOCKED;
pthread_mutex_unlock(&client_mutex);
return lock_protocol::OK;
break;
case LOCKED:
pthread_cond_wait(&iter->second.waitqueue, &client_mutex);
break;
case ACQUIRING:
if(!iter->second.retry) {
pthread_cond_wait(&iter->second.waitqueue, &client_mutex);
} else {
iter->second.retry = false;
iter->second.xid = xid;
xid++;
pthread_mutex_unlock(&client_mutex);
ret = rsmc->call(lock_protocol::acquire, lid, id, iter->second.xid, r);
pthread_mutex_lock(&client_mutex);
if (ret == lock_protocol::OK) {
iter->second.state = LOCKED;
pthread_mutex_unlock(&client_mutex);
return ret;
} else if (ret == lock_protocol::RETRY) {
if(!iter->second.retry)
pthread_cond_wait(&iter->second.retryqueue, &client_mutex);
}
}
break;
case RELEASING:
pthread_cond_wait(&iter->second.releasequeue, &client_mutex);
break;
}
}
return lock_protocol::OK;
}

lock_protocol::status
lock_client_cache_rsm::release(lock_protocol::lockid_t lid)
{
return lock_protocol::OK;

int r;
lock_protocol::status ret = lock_protocol::OK;
std::map<lock_protocol::lockid_t, lock_entry>::iterator iter;
pthread_mutex_lock(&client_mutex);
iter = lockmap.find(lid);
if (iter == lockmap.end()) {
printf("ERROR: can't find lock with lockid = %d\n", lid);
return lock_protocol::NOENT;
}
if (iter->second.revoked) {
iter->second.state = RELEASING;
iter->second.revoked = false;
pthread_mutex_unlock(&client_mutex);
//for lab5, flush file extent from extent_client to extent_server
if (lu)
lu->dorelease(lid);

ret = rsmc->call(lock_protocol::release, lid, id, iter->second.xid, r);
pthread_mutex_lock(&client_mutex);
iter->second.state = NONE;
pthread_cond_broadcast(&iter->second.releasequeue);
pthread_mutex_unlock(&client_mutex);
return ret;
} else {
iter->second.state = FREE;
pthread_cond_signal(&iter->second.waitqueue);
pthread_mutex_unlock(&client_mutex);
return lock_protocol::OK;
}
}


rlock_protocol::status
lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid,
lock_protocol::xid_t xid, int &)
{
int ret = rlock_protocol::OK;
return ret;
int r;
int ret = rlock_protocol::OK;
std::map<lock_protocol::lockid_t, lock_entry>::iterator iter;
pthread_mutex_lock(&client_mutex);
iter = lockmap.find(lid);
if (iter == lockmap.end()) {
printf("ERROR: can't find lock with lockid = %d\n", lid);
return lock_protocol::NOENT;
}
VERIFY(iter->second.xid == xid);
if (iter->second.state == FREE) {
iter->second.state = RELEASING;
release_queue.enq(release_entry(lid, xid));
pthread_mutex_unlock(&client_mutex);

} else {
iter->second.revoked = true;
pthread_mutex_unlock(&client_mutex);
}

return ret;
}

rlock_protocol::status
lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid,
lock_protocol::xid_t xid, int &)
{
int ret = rlock_protocol::OK;
return ret;
int ret = rlock_protocol::OK;
std::map<lock_protocol::lockid_t, lock_entry>::iterator iter;
pthread_mutex_lock(&client_mutex);
iter = lockmap.find(lid);
if (iter == lockmap.end()) {
printf("ERROR: can't find lock with lockid = %d\n", lid);
return lock_protocol::NOENT;
}
VERIFY(iter->second.xid == xid);
iter->second.retry = true;
pthread_cond_signal(&iter->second.retryqueue);
// pthread_cond_signal(&iter->second.waitqueue);
pthread_mutex_unlock(&client_mutex);
return ret;
}


28 changes: 28 additions & 0 deletions lock_client_cache_rsm.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,41 @@ class lock_client_cache_rsm;
// Clients that caches locks. The server can revoke locks using
// lock_revoke_server.
class lock_client_cache_rsm : public lock_client {
enum lock_state {NONE, FREE, LOCKED, ACQUIRING, RELEASING};
struct lock_entry {
bool revoked;
bool retry;
lock_state state;
lock_protocol::xid_t xid;
pthread_cond_t waitqueue;
pthread_cond_t releasequeue;
pthread_cond_t retryqueue;
lock_entry():revoked(false), retry(false), state(NONE), xid(0) {
VERIFY(pthread_cond_init(&waitqueue, NULL) == 0);
VERIFY(pthread_cond_init(&releasequeue, NULL) == 0);
VERIFY(pthread_cond_init(&retryqueue, NULL) == 0);
}
};

struct release_entry {
lock_protocol::lockid_t lid;
lock_protocol::xid_t xid;
release_entry(lock_protocol::lockid_t lid_ = 0,lock_protocol::xid_t xid_ = 0)
: lid(lid_), xid(xid_) {}
};

private:
rsm_client *rsmc;
class lock_release_user *lu;
int rlock_port;
std::string hostname;
std::string id;
lock_protocol::xid_t xid;

pthread_mutex_t client_mutex;
std::map<lock_protocol::lockid_t, lock_entry> lockmap;
fifo<release_entry> release_queue;

public:
static int last_port;
lock_client_cache_rsm(std::string xdst, class lock_release_user *l = 0);
Expand Down
Loading

0 comments on commit cd8d266

Please sign in to comment.