Skip to content

Commit

Permalink
Begin to clean up the 'stop' signalling among the threads -- try to r…
Browse files Browse the repository at this point in the history
…emove as many globals as possible
  • Loading branch information
mikebrady committed Jan 22, 2016
1 parent 628a5a2 commit 21fb39d
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 65 deletions.
60 changes: 43 additions & 17 deletions player.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static int sampling_rate, frame_size;
static aes_context dctx;
#endif

static pthread_t player_thread;
//static pthread_t player_thread = NULL;
static int please_stop;
static int encrypted; // Normally the audio is encrypted, but it may not be

Expand Down Expand Up @@ -893,6 +893,13 @@ typedef struct stats { // statistics for running averages
} stats_t;

static void *player_thread_func(void *arg) {
int threads_stop = 0;
// create and start the timing, control and audio receiver threads
pthread_t rtp_audio_thread, rtp_control_thread, rtp_timing_thread;
pthread_create(&rtp_audio_thread, NULL, &rtp_audio_receiver, (void *)&threads_stop);
pthread_create(&rtp_control_thread, NULL, &rtp_control_receiver, (void *)&threads_stop);
pthread_create(&rtp_timing_thread, NULL, &rtp_timing_receiver, (void *)&threads_stop);

session_corrections = 0;
play_segment_reference_frame = 0; // zero signals that we are not in a play segment
// check that there are enough buffers to accommodate the desired latency and the latency offset
Expand Down Expand Up @@ -1219,8 +1226,22 @@ static void *player_thread_func(void *arg) {
}
}
}
if (config.output->stop)
config.output->stop();
free(outbuf);
free(silence);
debug(1,"Shut down audio, control and timing threads");

pthread_kill(rtp_audio_thread, SIGUSR1);
pthread_kill(rtp_control_thread, SIGUSR1);
pthread_kill(rtp_timing_thread, SIGUSR1);
pthread_join(rtp_timing_thread, NULL);
debug(1,"timing thread joined");
pthread_join(rtp_audio_thread, NULL);
debug(1,"audio thread joined");
pthread_join(rtp_control_thread, NULL);
debug(1,"control thread joined");
debug(1,"Player thread exit");
return 0;
}

Expand Down Expand Up @@ -1389,7 +1410,9 @@ void player_flush(uint32_t timestamp) {
#endif
}

int player_play(stream_cfg *stream) {
int player_play(stream_cfg *stream, pthread_t *player_thread) {
//if (*player_thread!=NULL)
// die("Trying to create a second player thread for this RTSP session");
packet_count = 0;
encrypted = stream->encrypted;
if (config.buffer_start_fill > BUFFER_FRAMES)
Expand Down Expand Up @@ -1434,23 +1457,26 @@ int player_play(stream_cfg *stream) {
rc = pthread_attr_setstacksize(&tattr, size);
if (rc)
debug(1, "Error setting stack size for player_thread: %s", strerror(errno));
pthread_create(&player_thread, &tattr, player_thread_func, NULL);
pthread_create(player_thread, &tattr, player_thread_func, NULL);
pthread_attr_destroy(&tattr);
return 0;
}

void player_stop(void) {
please_stop = 1;
pthread_cond_signal(&flowcontrol); // tell it to give up
pthread_join(player_thread, NULL);
#ifdef CONFIG_METADATA
send_ssnc_metadata('pend', NULL, 0, 1);
#endif
config.output->stop();
command_stop();
free_buffer();
free_decoder();
int rc = pthread_cond_destroy(&flowcontrol);
if (rc)
debug(1, "Error destroying condition variable.");
void player_stop(pthread_t *player_thread) {
//if (*thread==NULL)
// debug(1,"Trying to stop a non-existent player thread");
// else {
please_stop = 1;
pthread_cond_signal(&flowcontrol); // tell it to give up
pthread_join(*player_thread, NULL);
#ifdef CONFIG_METADATA
send_ssnc_metadata('pend', NULL, 0, 1);
#endif
command_stop();
free_buffer();
free_decoder();
int rc = pthread_cond_destroy(&flowcontrol);
if (rc)
debug(1, "Error destroying condition variable.");
// }
}
4 changes: 2 additions & 2 deletions player.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ typedef uint16_t seq_t;
// wrapped number between two seq_t.
int32_t seq_diff(seq_t a, seq_t b);

int player_play(stream_cfg *cfg);
void player_stop(void);
int player_play(stream_cfg *cfg, pthread_t *thread);
void player_stop(pthread_t *thread);

void player_volume(double f);
void player_flush(uint32_t timestamp);
Expand Down
70 changes: 36 additions & 34 deletions rtp.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ typedef struct time_ping_record {

// only one RTP session can be active at a time.
static int running = 0;
static int please_shutdown;

static char client_ip_string[INET6_ADDRSTRLEN]; // the ip string pointing to the client
static short client_ip_family; // AF_INET / AF_INET6
Expand All @@ -72,7 +71,7 @@ static SOCKADDR rtp_client_timing_socket; // a socket pointing to the timing po
static int audio_socket; // our local [server] audio socket
static int control_socket; // our local [server] control socket
static int timing_socket; // local timing socket
static pthread_t rtp_audio_thread, rtp_control_thread, rtp_timing_thread;
//static pthread_t rtp_audio_thread, rtp_control_thread, rtp_timing_thread;

static uint32_t reference_timestamp;
static uint64_t reference_timestamp_time;
Expand All @@ -96,16 +95,16 @@ static pthread_mutex_t reference_time_mutex = PTHREAD_MUTEX_INITIALIZER;

uint64_t static local_to_remote_time_difference; // used to switch between local and remote clocks

static void *rtp_audio_receiver(void *arg) {
void *rtp_audio_receiver(void *arg) {
// we inherit the signal mask (SIGUSR1)

int *stop = arg; // when set to 1, we should stop

int32_t last_seqno = -1;
uint8_t packet[2048], *pktp;

ssize_t nread;
while (1) {
if (please_shutdown)
break;
while (*stop==0) {
nread = recv(audio_socket, packet, sizeof(packet), 0);
if (nread < 0)
break;
Expand Down Expand Up @@ -158,17 +157,18 @@ static void *rtp_audio_receiver(void *arg) {
return NULL;
}

static void *rtp_control_receiver(void *arg) {
void *rtp_control_receiver(void *arg) {
// we inherit the signal mask (SIGUSR1)

int *stop = arg; // when set to 1, we should stop

reference_timestamp = 0; // nothing valid received yet
uint8_t packet[2048], *pktp;
struct timespec tn;
uint64_t remote_time_of_sync, local_time_now, remote_time_now;
uint32_t sync_rtp_timestamp, rtp_timestamp_less_latency;
ssize_t nread;
while (1) {
if (please_shutdown)
break;
while (*stop==0) {
nread = recv(control_socket, packet, sizeof(packet), 0);
local_time_now = get_absolute_time_in_fp();
// clock_gettime(CLOCK_MONOTONIC,&tn);
Expand Down Expand Up @@ -259,7 +259,8 @@ static void *rtp_control_receiver(void *arg) {
return NULL;
}

static void *rtp_timing_sender(void *arg) {
void *rtp_timing_sender(void *arg) {
int *stop = arg; // the parameter points to this request to stop thing
struct timing_request {
char leader;
char type;
Expand All @@ -280,9 +281,8 @@ static void *rtp_timing_sender(void *arg) {
time_ping_count = 0;

// we inherit the signal mask (SIGUSR1)
while (1) {
if (please_shutdown)
break;
while (*stop==0) {
// debug(1,"Send a timing request");

if (!running)
die("rtp_timing_sender called without active stream!");
Expand Down Expand Up @@ -314,12 +314,16 @@ static void *rtp_timing_sender(void *arg) {
return NULL;
}

static void *rtp_timing_receiver(void *arg) {
void *rtp_timing_receiver(void *arg) {
// we inherit the signal mask (SIGUSR1)

int *stop = arg; // when set to 1, we should stop

uint8_t packet[2048], *pktp;
ssize_t nread;
int request_stop = 0;
pthread_t timer_requester;
pthread_create(&timer_requester, NULL, &rtp_timing_sender, NULL);
pthread_create(&timer_requester, NULL, &rtp_timing_sender, (void *)&request_stop);
// struct timespec att;
uint64_t distant_receive_time, distant_transmit_time, arrival_time, return_time, transit_time,
processing_time;
Expand All @@ -331,9 +335,7 @@ static void *rtp_timing_receiver(void *arg) {
uint64_t first_local_to_remote_time_difference = 0;
uint64_t first_local_to_remote_time_difference_time;
uint64_t l2rtd = 0;
while (1) {
if (please_shutdown)
break;
while (*stop==0) {
nread = recv(timing_socket, packet, sizeof(packet), 0);
arrival_time = get_absolute_time_in_fp();
// clock_gettime(CLOCK_MONOTONIC,&att);
Expand Down Expand Up @@ -497,7 +499,8 @@ static void *rtp_timing_receiver(void *arg) {
}
}

debug(1, "Timing RTP thread interrupted. terminating.");
debug(1, "Timing thread interrupted. terminating.");
request_stop = 1;
void *retval;
pthread_kill(timer_requester, SIGUSR1);
pthread_join(timer_requester, &retval);
Expand Down Expand Up @@ -655,11 +658,10 @@ void rtp_setup(SOCKADDR *remote, int cport, int tport, uint32_t active_remote, i
debug(2, "listening for audio, control and timing on ports %d, %d, %d.", *lsport, *lcport,
*ltport);

please_shutdown = 0;
reference_timestamp = 0;
pthread_create(&rtp_audio_thread, NULL, &rtp_audio_receiver, NULL);
pthread_create(&rtp_control_thread, NULL, &rtp_control_receiver, NULL);
pthread_create(&rtp_timing_thread, NULL, &rtp_timing_receiver, NULL);
//pthread_create(&rtp_audio_thread, NULL, &rtp_audio_receiver, NULL);
//pthread_create(&rtp_control_thread, NULL, &rtp_control_receiver, NULL);
//pthread_create(&rtp_timing_thread, NULL, &rtp_timing_receiver, NULL);

running = 1;
request_sent = 0;
Expand All @@ -682,18 +684,18 @@ void clear_reference_timestamp(void) {

void rtp_shutdown(void) {
if (!running)
die("rtp_shutdown called without active stream!");
debug(1,"rtp_shutdown called without active stream!");

debug(2, "shutting down RTP thread");
please_shutdown = 1;
void *retval;
reference_timestamp = 0;
pthread_kill(rtp_audio_thread, SIGUSR1);
pthread_join(rtp_audio_thread, &retval);
pthread_kill(rtp_control_thread, SIGUSR1);
pthread_join(rtp_control_thread, &retval);
pthread_kill(rtp_timing_thread, SIGUSR1);
pthread_join(rtp_timing_thread, &retval);
clear_reference_timestamp();
// debug(1,"Shut down audio, control and timing threads");
// usleep(3000000); // hack
// pthread_kill(rtp_audio_thread, SIGUSR1);
// pthread_kill(rtp_control_thread, SIGUSR1);
// pthread_kill(rtp_timing_thread, SIGUSR1);
// pthread_join(rtp_audio_thread, &retval);
// pthread_join(rtp_control_thread, &retval);
// pthread_join(rtp_timing_thread, &retval);
running = 0;
}

Expand Down
4 changes: 4 additions & 0 deletions rtp.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

#include "player.h"

void *rtp_audio_receiver(void *arg);
void *rtp_control_receiver(void *arg);
void *rtp_timing_receiver(void *arg);

void rtp_setup(SOCKADDR *remote, int controlport, int timingport, uint32_t active_remote,
int *local_server_port, int *local_control_port, int *local_timing_port);
void rtp_shutdown(void);
Expand Down
25 changes: 13 additions & 12 deletions rtsp.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef struct {
SOCKADDR remote;
int running;
pthread_t thread;
pthread_t player_thread;
} rtsp_conn_info;

#ifdef CONFIG_METADATA
Expand Down Expand Up @@ -426,7 +427,7 @@ static int msg_handle_line(rtsp_message **pmsg, char *line) {
*p = 0;
p += 2;
msg_add_header(msg, line, p);
debug(2, " %s: %s.", line, p);
// debug(2, " %s: %s.", line, p);
return -1;
} else {
char *cl = msg_get_header(msg, "Content-Length");
Expand Down Expand Up @@ -568,7 +569,7 @@ static void msg_write_response(int fd, rtsp_message *resp) {
p += n;

for (i = 0; i < resp->nheaders; i++) {
debug(2, " %s: %s.", resp->name[i], resp->value[i]);
// debug(3, " %s: %s.", resp->name[i], resp->value[i]);
n = snprintf(p, pktfree, "%s: %s\r\n", resp->name[i], resp->value[i]);
pktfree -= n;
p += n;
Expand Down Expand Up @@ -764,7 +765,7 @@ static void handle_setup(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *
strcat(hdr, q); // should unsplice the timing port entry
}

player_play(&conn->stream);
player_play(&conn->stream,&conn->player_thread); // the tread better be null

char *resphdr = alloca(200);
*resphdr = 0;
Expand Down Expand Up @@ -1535,12 +1536,12 @@ static void *rtsp_conversation_thread_func(void *pconn) {
int method_selected = 0;
for (mh = method_handlers; mh->method; mh++) {
if (!strcmp(mh->method, req->method)) {
//debug(1,"RTSP Packet received of type \"%s\":",mh->method),
//msg_print_debug_headers(req);
// debug(1,"RTSP Packet received of type \"%s\":",mh->method),
// msg_print_debug_headers(req);
method_selected = 1;
mh->handler(conn, req, resp);
//debug(1,"RTSP Response:");
//msg_print_debug_headers(resp);
// debug(1,"RTSP Response:");
// msg_print_debug_headers(resp);
break;
}
}
Expand All @@ -1557,20 +1558,20 @@ static void *rtsp_conversation_thread_func(void *pconn) {
}
} while (reply != rtsp_read_request_response_shutdown_requested);

debug(1, "closing RTSP connection.");
debug(1, "Now closing RTSP connection.");
if (conn->fd > 0)
close(conn->fd);
if (rtsp_playing()) {
player_stop(&conn->player_thread); // might be less noisy doing this first
rtp_shutdown();
player_stop();
pthread_mutex_unlock(&play_lock);
please_shutdown = 0;
pthread_mutex_unlock(&playing_mutex);
}
if (auth_nonce)
free(auth_nonce);
conn->running = 0;
debug(2, "terminating RTSP thread.");
debug(2, "Now terminating RTSP conversation thread.");
please_shutdown = 0;
return NULL;
}

Expand Down Expand Up @@ -1699,7 +1700,7 @@ void rtsp_listen_loop(void) {
memset(conn, 0, sizeof(rtsp_conn_info));
socklen_t slen = sizeof(conn->remote);

debug(1, "new RTSP connection.");
debug(1, "New RTSP connection on port %d",config.port);
conn->fd = accept(acceptfd, (struct sockaddr *)&conn->remote, &slen);
if (conn->fd < 0) {
perror("failed to accept connection");
Expand Down

0 comments on commit 21fb39d

Please sign in to comment.