Skip to content

Commit

Permalink
add UNTESTED code to manage a thread-safe producer consumer buffer an…
Browse files Browse the repository at this point in the history
…d memory management with reference counting
  • Loading branch information
mikebrady committed Mar 4, 2015
1 parent 391b48f commit 5ce1337
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 13 deletions.
4 changes: 2 additions & 2 deletions metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,14 @@ void metadata_process(uint32_t type,uint32_t code,char *data,uint32_t length) {
char thestring[1024];
snprintf(thestring,1024,"<type>%x</type><code>%x</code><length>%u</length>\n",type,code,length);
ret = write(fd, thestring, strlen(thestring));
if (ret < 1) // possibly the pipe is running out of memory becasue the reader is too slow
if (ret < 1) // possibly the pipe is running out of memory because the reader is too slow
debug(1,"Error writing to pipe");
if (length>0) {
snprintf(thestring,1024,"<data encoding=\"base64\">\n");
ret = write(fd, thestring, strlen(thestring));
if (ret < 1) // no reader
debug(1,"Error writing to pipe");
// here, we write the data in base64 form using the crappy base64 encoder provided
// here, we write the data in base64 form using our nice base64 encoder
// but, we break it into lines of 76 output characters, except for the last one.
// thus, we send groups of (76/4)*3 = 57 bytes to the encoder at a time
size_t remaining_count = length;
Expand Down
174 changes: 163 additions & 11 deletions rtsp.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ enum rtsp_read_request_response {
// Mike Brady's part...
static pthread_mutex_t play_lock = PTHREAD_MUTEX_INITIALIZER;

// every time we want to retain or release a reference count, lock it with this
// if a reference count is read as zero, it means the it's being deallocated.
static pthread_mutex_t reference_counter_lock = PTHREAD_MUTEX_INITIALIZER;


// only one thread is allowed to use the player at once.
// it monitors the request variable (at least when interrupted)
Expand All @@ -87,6 +91,127 @@ typedef struct {
pthread_t thread;
} rtsp_conn_info;


typedef struct {
pthread_mutex_t pc_queue_lock;
pthread_cond_t pc_queue_item_added_signal;
pthread_cond_t pc_queue_item_removed_signal;
size_t item_size; // number of bytes in each item
uint32_t count; // number of items in the queue
uint32_t capacity; // maximum number of items
uint32_t toq; // first item to take
uint32_t eoq; // free space at end of queue
void * qbase; // base address of actual queue
} pc_queue; // producer-consumer queue

pc_queue* pc_queue_create(size_t new_item_size, uint32_t number_of_items) {
pc_queue* the_queue = malloc(sizeof(pc_queue)+number_of_items*new_item_size-sizeof(void*));
if (the_queue) {
int rc = pthread_mutex_init(&the_queue->pc_queue_lock,NULL);
if (rc)
debug(1,"Error %d creating pc_queue lock",rc);
rc = pthread_cond_init(&the_queue->pc_queue_item_added_signal,NULL);
if (rc)
debug(1,"Error %d creating pc_queue add cond",rc);
rc = pthread_cond_init(&the_queue->pc_queue_item_removed_signal,NULL);
if (rc)
debug(1,"Error %d creating pc_queue remove cond",rc);
the_queue->item_size = new_item_size;
the_queue->count = 0;
the_queue->capacity = number_of_items;
the_queue->toq = 0;
the_queue->eoq = 0;
}
return the_queue;
}

int pc_queue_delete(pc_queue* the_queue) {
if (the_queue) {
int rc = pthread_mutex_destroy(&the_queue->pc_queue_lock);
if (rc)
debug(1,"Error %d deleting pc_queue lock",rc);
rc = pthread_cond_destroy(&the_queue->pc_queue_item_added_signal);
if (rc)
debug(1,"Error %d deleting pc_queue add cond",rc);
rc = pthread_cond_destroy(&the_queue->pc_queue_item_removed_signal);
if (rc)
debug(1,"Error %d deleting pc_queue remove cond",rc);
free(the_queue);
} else {
debug(1,"Attempting to delete a NULL pc_queue!");
}
return 0;
}

int pc_queue_add_item(pc_queue* the_queue,const void* the_stuff) {
int rc;
if (the_queue) {
rc = pthread_mutex_lock(&the_queue->pc_queue_lock);
if (rc)
debug(1,"Error locking for pc_queue_add_item");
while(the_queue->count==the_queue->capacity) {
rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal,&the_queue->pc_queue_lock);
if (rc)
debug(1,"Error waiting for item to be removed");
}
uint32_t i = the_queue->eoq;
void * p = &the_queue->qbase + the_queue->item_size*the_queue->eoq;
memcpy(p,the_stuff,the_queue->item_size);

// update the pointer
i++;
if (i==the_queue->capacity)
// fold pointer if necessary
i=0;
the_queue->eoq = i;
the_queue->count++;
rc = pthread_mutex_unlock(&the_queue->pc_queue_lock);
if (rc)
debug(1,"Error unlocking for pc_queue_add_item");
rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal);
if (rc)
debug(1,"Error signalling after pc_queue_add_item");
} else {
debug(1,"Adding an item to a NULL queue");
}
return 0;
}

int pc_queue_get_item(pc_queue* the_queue,void* the_stuff) {
int rc;
if (the_queue) {
rc = pthread_mutex_lock(&the_queue->pc_queue_lock);
if (rc)
debug(1,"Error locking for pc_queue_add_item");
while(the_queue->count==0) {
rc = pthread_cond_wait(&the_queue->pc_queue_item_added_signal,&the_queue->pc_queue_lock);
if (rc)
debug(1,"Error waiting for item to be added");
}
uint32_t i = the_queue->toq;
void * p = &the_queue->qbase + the_queue->item_size*the_queue->toq;
memcpy(the_stuff,p,the_queue->item_size);

// update the pointer
i++;
if (i==the_queue->capacity)
// fold pointer if necessary
i=0;
the_queue->toq = i;
the_queue->count--;
rc = pthread_mutex_unlock(&the_queue->pc_queue_lock);
if (rc)
debug(1,"Error unlocking for pc_queue_get_item");
rc = pthread_cond_signal(&the_queue->pc_queue_item_removed_signal);
if (rc)
debug(1,"Error signalling after pc_queue_removed_item");
} else {
debug(1,"Removing an item from a NULL queue");
}
return 0;
}


// determine if we are the currently playing thread
static inline int rtsp_playing(void) {
if (pthread_mutex_trylock(&playing_mutex)) {
Expand Down Expand Up @@ -188,10 +313,24 @@ typedef struct {
int respcode;
} rtsp_message;

static void msg_retain(rtsp_message * msg) {
if (msg) {
int rc = pthread_mutex_lock(&reference_counter_lock);
if (rc)
debug(1,"Error %d locking reference counter lock");
msg->referenceCount++;
rc = pthread_mutex_unlock(&reference_counter_lock);
if (rc)
debug(1,"Error %d unlocking reference counter lock");
} else {
debug(1,"null rtsp_message pointer passed to retain");
}
}

static rtsp_message * msg_init(void) {
rtsp_message *msg = malloc(sizeof(rtsp_message));
memset(msg, 0, sizeof(rtsp_message));
msg->referenceCount = 1;
msg->referenceCount = 1; // from now on, any access to this must be protected with the lock
return msg;
}

Expand Down Expand Up @@ -224,18 +363,31 @@ static void msg_print_debug_headers(rtsp_message *msg) {
}

static void msg_free(rtsp_message *msg) {
if (--(msg->referenceCount)<=0) {
int i;
for (i=0; i<msg->nheaders; i++) {
free(msg->name[i]);
free(msg->value[i]);
}
if (msg->content)

if (msg) {
int rc = pthread_mutex_lock(&reference_counter_lock);
if (rc)
debug(1,"Error %d locking reference counter lock during msg_free()",rc);
msg->referenceCount--;
rc = pthread_mutex_unlock(&reference_counter_lock);
if (rc)
debug(1,"Error %d unlocking reference counter lock during msg_free()",rc);
if (msg->referenceCount==0) {
debug(1,"Freeing...");
int i;
for (i=0; i<msg->nheaders; i++) {
free(msg->name[i]);
free(msg->value[i]);
}
if (msg->content)
free(msg->content);
free(msg);
free(msg);
} else {
debug(1,"rtsp_message reference count non-zero: %d!",msg->referenceCount);
}
} else {
debug(1,"rtsp_message reference count non-zero!");
}
debug(1,"null rtsp_message pointer passed to msg_free()");
}
}


Expand Down

0 comments on commit 5ce1337

Please sign in to comment.