Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of TURN TCP transport between client and server #111

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

xdyyll
Copy link
Contributor

@xdyyll xdyyll commented Jul 15, 2021

test code:

#include "juice/juice.h"

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#ifdef _WIN32
#include <windows.h>
static void sleep(unsigned int secs) { Sleep(secs * 1000); }
#else
#include <unistd.h> // for sleep
#endif

#define BUFFER_SIZE 4096

static juice_agent_t *agent1;
static juice_agent_t *agent2;

static void on_state_changed1(juice_agent_t *agent, juice_state_t state, void *user_ptr);
static void on_state_changed2(juice_agent_t *agent, juice_state_t state, void *user_ptr);

static void on_candidate1(juice_agent_t *agent, const char *sdp, void *user_ptr);
static void on_candidate2(juice_agent_t *agent, const char *sdp, void *user_ptr);

static void on_gathering_done1(juice_agent_t *agent, void *user_ptr);
static void on_gathering_done2(juice_agent_t *agent, void *user_ptr);

static void on_recv1(juice_agent_t *agent, const char *data, size_t size, void *user_ptr);
static void on_recv2(juice_agent_t *agent, const char *data, size_t size, void *user_ptr);

int main(int argc, char *argv[]) {
	//juice_set_log_level(JUICE_LOG_LEVEL_DEBUG);

	// Agent 1: Create agent
	juice_config_t config1;
	memset(&config1, 0, sizeof(config1));

	// TURN server
	// Please do not use outside of libjuice tests
	juice_turn_server_t turn_server;
	memset(&turn_server, 0, sizeof(turn_server));
	turn_server.host = "stun.ageneau.net";
	turn_server.port = 3478;
	turn_server.username = "juice_test";
	turn_server.password = "28245150316902";
	config1.turn_servers = &turn_server;
	config1.turn_servers_count = 1;

	config1.cb_state_changed = on_state_changed1;
	config1.cb_candidate = on_candidate1;
	config1.cb_gathering_done = on_gathering_done1;
	config1.cb_recv = on_recv1;
	config1.user_ptr = NULL;

	// Agent 2: Create agent
	juice_config_t config2;
	memset(&config2, 0, sizeof(config2));
	juice_turn_server_t turn_server2;
	memcpy(&turn_server2, &turn_server, sizeof(juice_turn_server_t));

	// Use the same TURN server
	config2.turn_servers = &turn_server2;
	config2.turn_servers_count = 1;

	config2.cb_state_changed = on_state_changed2;
	config2.cb_candidate = on_candidate2;
	config2.cb_gathering_done = on_gathering_done2;
	config2.cb_recv = on_recv2;
	config2.user_ptr = NULL;

	config1.turn_servers[0].transport = JUICE_TRANSPORT_TCP;
	//config1.turn_servers[0].allocate_transport = JUICE_TRANSPORT_TCP;
	config2.turn_servers[0].transport = JUICE_TRANSPORT_TCP;
	//config2.turn_servers[0].allocate_transport = JUICE_TRANSPORT_TCP;
	agent1 = juice_create(&config1);
	agent2 = juice_create(&config2);

	// Agent 1: Generate local description
	char sdp1[JUICE_MAX_SDP_STRING_LEN];
	juice_get_local_description(agent1, sdp1, JUICE_MAX_SDP_STRING_LEN);
	printf("Local description 1:\n%s\n", sdp1);

	// Agent 2: Receive description from agent 1
	juice_set_remote_description(agent2, sdp1);

	// Agent 2: Generate local description
	char sdp2[JUICE_MAX_SDP_STRING_LEN];
	juice_get_local_description(agent2, sdp2, JUICE_MAX_SDP_STRING_LEN);
	printf("Local description 2:\n%s\n", sdp2);

	// Agent 1: Receive description from agent 2
	juice_set_remote_description(agent1, sdp2);

	// Agent 1: Gather candidates (and send them to agent 2)
	juice_gather_candidates(agent1);

	// Agent 2: Gather candidates (and send them to agent 1)
	juice_gather_candidates(agent2);

	sleep(10);

	// -- Connection should be finished --
	bool success = true;

	/*
	    // Check states
	    juice_state_t state1 = juice_get_state(agent1);
	    juice_state_t state2 = juice_get_state(agent2);
	    bool success = (state1 == JUICE_STATE_COMPLETED && state2 == JUICE_STATE_COMPLETED);
	*/

	// Retrieve candidates
	char local[JUICE_MAX_CANDIDATE_SDP_STRING_LEN];
	char remote[JUICE_MAX_CANDIDATE_SDP_STRING_LEN];
	if (success &=
	    (juice_get_selected_candidates(agent1, local, JUICE_MAX_CANDIDATE_SDP_STRING_LEN, remote,
	                                   JUICE_MAX_CANDIDATE_SDP_STRING_LEN) == 0)) {
		printf("Local candidate  1: %s\n", local);
		printf("Remote candidate 1: %s\n", remote);
	}
	if (success &=
	    (juice_get_selected_candidates(agent2, local, JUICE_MAX_CANDIDATE_SDP_STRING_LEN, remote,
	                                   JUICE_MAX_CANDIDATE_SDP_STRING_LEN) == 0)) {
		printf("Local candidate  2: %s\n", local);
		printf("Remote candidate 2: %s\n", remote);
	}

	// Retrieve addresses
	char localAddr[JUICE_MAX_ADDRESS_STRING_LEN];
	char remoteAddr[JUICE_MAX_ADDRESS_STRING_LEN];
	if (success &= (juice_get_selected_addresses(agent1, localAddr, JUICE_MAX_ADDRESS_STRING_LEN,
	                                             remoteAddr, JUICE_MAX_ADDRESS_STRING_LEN) == 0)) {
		printf("Local address  1: %s\n", localAddr);
		printf("Remote address 1: %s\n", remoteAddr);
	}
	if (success &= (juice_get_selected_addresses(agent2, localAddr, JUICE_MAX_ADDRESS_STRING_LEN,
	                                             remoteAddr, JUICE_MAX_ADDRESS_STRING_LEN) == 0)) {
		printf("Local address  2: %s\n", localAddr);
		printf("Remote address 2: %s\n", remoteAddr);
	}

	sleep(300);


	// Agent 1: destroy
	juice_destroy(agent1);

	// Agent 2: destroy
	juice_destroy(agent2);

	// Sleep so we can check destruction went well
	sleep(2);

	if (success) {
		printf("Success\n");
		return 0;
	} else {
		printf("Failure\n");
		return -1;
	}
}

// Agent 1: on state changed
static void on_state_changed1(juice_agent_t *agent, juice_state_t state, void *user_ptr) {
	printf("State 1: %s\n", juice_state_to_string(state));

	if (state == JUICE_STATE_CONNECTED) {
		// Agent 1: on connected, send a message
		const char *message = "Hello from 1";
		juice_send(agent, message, strlen(message));
	}
}

// Agent 2: on state changed
static void on_state_changed2(juice_agent_t *agent, juice_state_t state, void *user_ptr) {
	printf("State 2: %s\n", juice_state_to_string(state));
	if (state == JUICE_STATE_CONNECTED) {
		// Agent 2: on connected, send a message
		const char *message = "Hello from 2";
		juice_send(agent, message, strlen(message));
	}
}

// Agent 1: on local candidate gathered
static void on_candidate1(juice_agent_t *agent, const char *sdp, void *user_ptr) {
	printf("Candidate 1: %s\n", sdp);

	// Filter relayed candidates
	if (!strstr(sdp, "relay"))
		return;

	// Agent 2: Receive it from agent 1
	juice_add_remote_candidate(agent2, sdp);
}

// Agent 2: on local candidate gathered
static void on_candidate2(juice_agent_t *agent, const char *sdp, void *user_ptr) {
	printf("Candidate 2: %s\n", sdp);

	// Filter relayed candidates
	if (!strstr(sdp, "relay"))
		return;

	// Agent 1: Receive it from agent 2
	juice_add_remote_candidate(agent1, sdp);
}

// Agent 1: on local candidates gathering done
static void on_gathering_done1(juice_agent_t *agent, void *user_ptr) {
	printf("Gathering done 1\n");
	juice_set_remote_gathering_done(agent2); // optional
}

// Agent 2: on local candidates gathering done
static void on_gathering_done2(juice_agent_t *agent, void *user_ptr) {
	printf("Gathering done 2\n");
	juice_set_remote_gathering_done(agent1); // optional
}

// Agent 1: on message received
static void on_recv1(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) {
	char buffer[BUFFER_SIZE];
	if (size > BUFFER_SIZE - 1)
		size = BUFFER_SIZE - 1;
	memcpy(buffer, data, size);
	buffer[size] = '\0';
	printf("Received 1: %s\n", buffer);
}

// Agent 2: on message received
static void on_recv2(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) {
	char buffer[BUFFER_SIZE];
	if (size > BUFFER_SIZE - 1)
		size = BUFFER_SIZE - 1;
	memcpy(buffer, data, size);
	buffer[size] = '\0';
	printf("Received 2: %s\n", buffer);
}

Copy link
Owner

@paullouisageneau paullouisageneau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a coupe minor problems, but overall it looks good, thanks!

I noticed you closed and reopened the PR, it should not be needed, just push your changes on your branch and the tests will be run again.


return agent_direct_send(agent, &entry->record, buffer, size, ds);
if (agent_direct_send2(agent, entry, buffer, size, ds) < 0) {
JLOG_WARN("STUN message send failed, errno=%d", sockerrno);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cleaned up the sockerrno after agent_direct_send() as it's a bit clumsy (the function could internally make a last system call modifying errno), so you should do the same here.


return agent_direct_send(agent, &entry->record, buffer, len, ds);
if (agent_direct_send2(agent, entry, buffer, len, ds) < 0) {
JLOG_WARN("ChannelData message send failed, errno=%d", sockerrno);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about sockerrno.

socket_config.port_begin = agent->config.local_port_range_begin;
socket_config.port_end = agent->config.local_port_range_end;
switch (turn_server->transport) {
case JUICE_TRANSPORT_UDP:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does UDP transport mean for the user? If I get it correctly, it opens a dedicated UDP socket to reach the server instead of using the one of the agent. What is the point of this mechanism?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my mind, the ideal is to replace the udp .c completely with transport .c, UDP, TCP and TLS are supported.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. While this is smart for factoring the code, it will cause issues, for instance reflexive candidates from TURN UDP servers will be incorrect, as they'll reflect the external address for the transport UDP socket instead of the agent one. I think the only way to keep them working is limiting the transport to TCP and use the agent socket for UDP, sadly.

}
JLOG_DEBUG("Leaving agent thread");
agent_change_state(agent, JUICE_STATE_DISCONNECTED);
mutex_unlock(&agent->mutex);
}

int agent_recv(juice_agent_t *agent) {
int agent_recv(juice_agent_t *agent, agent_stun_entry_t *entry) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be a dedicated function to receive from entry transport as the logic looks quite decorelated.

if (agent_add_local_reflexive_candidate(agent, ICE_CANDIDATE_TYPE_SERVER_REFLEXIVE,
&msg->mapped)) {
JLOG_WARN("Failed to add local peer reflexive candidate from TURN mapped address");
if (entry->transport != JUICE_TRANSPORT_TCP) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if the entry uses a secondary UDP socket for some reason, the reflexive candidate is useless anyway.

@@ -508,6 +508,39 @@ bool is_stun_datagram(const void *data, size_t size) {
return true;
}

bool is_stun_datagram2(const void *data, size_t size) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference with is_stun_datagram?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference with is_stun_datagram?

TCP a stream-oriented transmission protocol, more than one packet may be received in one recv() call.

STUN Packet + STUN Packet + ...
STUN Packet + Non STUN Packet + ...
...

@@ -815,6 +928,9 @@ int agent_bookkeeping(juice_agent_t *agent, timestamp_t *next_timestamp) {
continue;

if (entry->retransmissions >= 0) {
if (entry->sock != INVALID_SOCKET && entry->transport == JUICE_TRANSPORT_TCP)
transport_wait_for_connected(entry->sock, NULL);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think blocking the whole thread until the transport connects is not acceptable, it would be better to asynchronously wait for the connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I'll think about ways to do it asynchronously.

return false;
}

int transport_get_addrs(socket_t sock, addr_record_t *records, size_t count) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of functions copied from udp.c that you could clean up. This one should be, for sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Until the new transport.c is complete,do not break other code.

@@ -436,6 +437,35 @@ int agent_direct_send(juice_agent_t *agent, const addr_record_t *dst, const char
return ret;
}

int agent_direct_send2(juice_agent_t *agent, const agent_stun_entry_t *entry, const char *data,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name agent_direct_send2 is not very explicit, is there a reason for not modifying agent_direct_send() since they have the same signature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modify the parameters of the function, but I don't want to break other code that relies on the original function

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you changed dst to entry, my bad. You could call it something more explicit then, like agent_transport_send.

@xdyyll
Copy link
Contributor Author

xdyyll commented Jul 16, 2021

A lot work to do, for TCP TURN, Close this PR first.

@xdyyll xdyyll closed this Jul 16, 2021
@paullouisageneau
Copy link
Owner

Please continue working on the same PR as otherwise the messages history is kind of lost. It's totally fine to keep the PR open as you work on it and push changes.

@xdyyll
Copy link
Contributor Author

xdyyll commented Jul 16, 2021

Please continue working on the same PR as otherwise the messages history is kind of lost. It's totally fine to keep the PR open as you work on it and push changes.

Okay, I reopen this PR now, and I will continue working on it. thx.

@Sean-Der
Copy link
Contributor

Hi @xdyyll can you give me access to your fork and I can finish this PR? Or I can start a new PR

@@ -72,6 +79,7 @@ typedef struct juice_turn_server {
const char *username;
const char *password;
uint16_t port;
juice_transport_t transport;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an issue now as it breaks the library interface and would require releasing a new major version.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants