ChatBus : build your first multi-user chat room app with Erlang/OTP
In this tutorial I will build yet another messaging system using Erlang/OTP and ErlBus (we will talk about it later) and during this process explore the power of Erlang!
NOTE: The following assumes assumes the basic understanding of Erlang/OTP and web development.
The aim of this article is to build a multi-user chat room application but this idea sounds too boring ! So let's add a little creativity and call it ChatBus. Now you might wonder how did this name help. Hold on ! let me explain what ChatBus is about. We abstract the idea of a chat room to a bus and all the users in a chat room would be called hitchhikers since they don't have to pay any fare for riding this bus !
Before we dive in and starting coding like a mad dog ! we should decide what features we want ChatBus to have. This is normally known as requirement specifications in software development. Always make it rule of thumb to know exactly what to implement before you implement or soon the project will start to fall apart !
For our multi-user chat system we will have the following features:
- Allow users to create a new Bus (aka chat rooms) !
- Allow users to switch from one Bus to another ! (some Mission Impossible stuff right here)
- Dynamically update the list of available Buses and Hitchhikers riding the selected bus.
This completes the basic specifications. Now we move on to decide the tools and libraries we would need to implement this project.
Tools and Libraries
Polymer : For this app I used Polymer to develop the client side but you can choose to implement it in Bootstrap or whatever you like because discussing client side is out of scope for this discussion.
ErlBus : Message passing in Erlang is really easy but this project takes a step further and makes it even more easier. It allows user to create channels and subscribe listeners processes to them. This is all we need to implement ChatBus ! Following is quick example to show it’s power:
% Create anonymous function to act which which listen to a channel
% Notice that the function takes two arguments, first one is self
% first one is self explanatory and second one is some context
% (ctx) which you can set while spawning this function.F = fun({Channel, Msg}, Ctx) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n",
[self(), Channel, Msg, Ctx])
end.
% Spawn this function as a process.
MH2 = ebus_handler:new(F, {my_ctx, <<"MH2">>}).
% Subscribe the spawned process to a channel (ch1)
% NOTE: the channel is created if it doesn't exist.
ebus:sub(ch1, [MH2]).
% Let's publish a message to 'ch1'
ebus:pub(ch1, "Hello!").
[Pid: <0.52.0>][Channel: ch1][Msg: "Hello!"][Ctx: {my_ctx,<<"MH2">>}]
I hope this makes it clear enough how to use ChatBus but if doesn't you can have a look some example on their GitHub project page.
Cowboy: We will use cowboy to implement our web server.
Rebar3 : We will use rebar3 as our build and release tool for this project.
Implementation
The project source of ChatBus is located here.
Firstly we will generate a template project using rebar3:
$ rebar3 new release chatbus
this will generate a template project. Execute to make sure that the project works fine:
$ rebar3 compile
$ rebar3 release
$ ./_build/default/rel/chatbus/bin/chatbus console
The last command should result in an erlang shell. The project should something as follows
├── apps
│ └── chatbus
│ └── src
│ ├── chatbus_app.erl
│ ├── chatbus.app.src
│ └── chatbus_sup.erl
├── _build
│ └── default
│ ├── lib
│ └── rel
├── config
│ ├── sys.config
│ └── vm.args
├── LICENSE
├── Makefile
├── README.md
├── rebar.config
└── rebar.loc
Firstly we add the dependencies needed for the project, change your rebar.config to look like this. We have the following deps there,
- cowboy: for creating a webserver that will allow clients to connect.
- lager : logging errors and stuff.
- mochiweb: to parsing json.
- erlbus: for creating channels and passing messages.
- sync: for development purposes. You can choose to exclude it but you will have to change the chatbus.app.src accordingly.
Now we create our web server using the cowboy. In your chatbus_app.erl file make the start function look at follows :
start(_StartType, _StartArgs) ->
ok = application:ensure_started(ebus),
Dispatch = cowboy_router:compile(
[{'_', [
{"/", cowboy_static, {priv_file, chatbus, "index.html"}},
{"/ws", ws_handler, []},
{"/[...]", cowboy_static, {priv_dir, chatbus, "./"}}]}]), {ok, _} = cowboy:start_http(http, 100, [{port, 9090}], [{env, [{dispatch, Dispatch}]}]),
'chatbus_sup':start_link().
Here we added a few routes. Our app will be a single page web app so we define where the static file “index.html” will be located and anyone hitting “/” will be served this “index.html” and other static files. Next we specify a route “/ws” which we will use to make a websocket connection, any request to this endpoint will handled by ws_handler.erl module. This websocket endpoint will invoked using javascript once the static files are loaded. Lastly we define the location for all the static files using “[…]”.
Now we will create a bus_listener.erl module which will subscribe to a channel, listen and broadcast messages,
-module(bus_listener).
-behaviour(ebus_handler). %% API
-export([handle_msg/2]). handle_msg({_Channel, {Sender, Type, Msg}}, User) ->
if
Sender =:= User -> ok;
true -> User ! {Type, Msg}
end.
If you go through the docs here the above module should be easy to understand but nevertheless I will explain this code but a little later.
Now we will create a websocket handler which will create websocket connection when the endpoint “/ws” is invoked. Recall that we specified the handler name as “ws_handler” in the routes so we create a module named “ws_handler.erl” with the following contents,
-module(ws_handler).-export([init/2]).
-export([websocket_handle/3]).
-export([websocket_info/3]).
-export([websocket_terminate/3]).
-export([send_active_channels/1]).init(Req, _Opts) ->
io:format(“connected !~n”),
%% subscribe to default bus
BusFd = ebus_handler:new(bus_listener, self()),
ok = ebus:sub(default, [BusFd]), %% send subscribes bus name
auto_send(<<”bus_subscribed”>>, default),
{cowboy_websocket, Req, #{bus => default
,bus_fd => BusFd
,hitchhicker => false}}.websocket_handle({text, Msg}, Req, #{bus := BusName
,bus_fd := BusFd
,hitchhicker := Hitchhicker} =
State) ->
{ok, {Type, Msg1}} = parse_message(Msg),
case Type of
<<”chat”>> ->
ok = ebus:pub(BusName, {self(), Type, Msg1}),
{ok, Req, State};
<<”bus_list”>> ->
{ok, List} = bus_manager:bus_list(),
{ok, Reply} = encode_message(<<”bus_list”>>, List),
{reply, {text, Reply}, Req, State};
<<”hitchhicker_list”>> ->
{ok, List} = bus_manager:get_hitchhickers(BusName),
{ok, Reply} = encode_message(<<”hitchhicker_list”>>, List),
{reply, {text, Reply}, Req, State};
<<”bus_subscribed”>> ->
BusName2 = erlang:binary_to_atom(Msg1, utf8),
ok = ebus:unsub(BusName, BusFd),
ok = ebus:sub(BusName2, [BusFd]),
{ok, Reply} = encode_message(<<”bus_subscribed”>>, BusName2),
{reply, {text, Reply}, Req, State#{bus => BusName2}};
<<”add_bus”>> ->
BusNewName = erlang:binary_to_atom(Msg1, utf8),
ok = ebus:unsub(BusName, BusFd),
ok = ebus:sub(BusNewName, [BusFd]), %% signal bus_manager to send all client list of
%% active buses
bus_manager:check_bus(BusName),
%% send message to client updating his bus
{ok, Reply} = encode_message(<<”bus_subscribed”>>,
BusNewName),
{reply, {text, Reply}, Req, State#{bus => BusNewName}};
<<”username”>> ->
%% check if username is assignable
case bus_manager:store_username(BusName, Msg1) of
{ok, error} ->
{ok, Reply} = encode_message( <<”username_error”>>, error),
{reply, {text, Reply}, Req, State};
_ ->
{ok, List} = bus_manager:get_hitchhickers(BusName),
ok = ebus:pub(BusName, {none,
<<”hitchhicker_list”>>, List}), {ok, Reply} = encode_message(<<”username”>>,
Msg1),
{reply, {text, Reply}, Req,
State#{hitchhicker => Msg1}}
end;
<<”terminate”>> ->
bus_manager:remove_hitchhicker(Hitchhicker),
ebus:unsub(BusName, BusFd),
ebus_handler:delete(BusFd),
{ok, List} = bus_manager:get_hitchhickers(BusName),
ok = ebus:pub(BusName, {none, <<”hitchhicker_list”>>,
List}),
{shutdown, Req, State};
_ ->
io:format(“unknown message type ~p~n”, [Type]),
{ok, Req, State}
end;websocket_handle(Data, Req, State) ->
io:format(“received ~p~n”, [Data]),
{ok, Req, State}.%% handle erlang messages
websocket_info({Type, Msg}, Req, State) ->
{ok, Reply} = encode_message(Type, Msg),
{reply, {text, Reply}, Req, State};websocket_info(Info, Req, State) ->
io:format(“[ws_info]: unknown message ~p~n”, [Info]),
{ok, Req, State}.websocket_terminate(_Reason, _Req, _State) ->
io:format(“[ws_info]: terminating websocket ~n”),
ok.%% ===============================================================
%% other exports
%% ===============================================================
send_active_channels(Channels) ->
lists:map(fun(Bus) ->
ok = ebus:pub(Bus, {none, <<”bus_list”>>, Channels})
end, Channels).%% ===============================================================
%% internal functions
%% ===============================================================
auto_send(Mtype, Msg) ->
%% send subscribes bus name
timer:send_after(10, self(), {Mtype, Msg}).parse_message(Msg) ->
{struct, Msg1} = mochijson2:decode(Msg),
{<<”type”>>, Type} = lists:keyfind(<<”type”>>, 1, Msg1),
{<<”msg”>>, Content} = lists:keyfind(<<”msg”>>, 1, Msg1),
{ok, {Type, Content}}.encode_message(Type, Msg) ->
Reply = {[{type, Type}, {msg, Msg}]},
{ok, iolist_to_binary(mochijson2:encode(Reply))}.
When the client invokes “/ws” endpoint the websocket will be created by invoking init/2 function. In this function two important things happen,
BusFd = ebus_handler:new(bus_listener, self()),
ok = ebus:sub(default, [BusFd]),
In the first line we spawn a new ebus handler process using bus_listener module. This handler process then subscribes to a channel called “default”. Now, whenever a message is sent on this channel handle_msg/2 from the bus_listener module is invoked for all the processes that have subscribed to this channel,
handle_msg({_Channel, {Sender, Type, Msg}}, User) ->
if
Sender =:= User -> ok;
true -> User ! {Type, Msg}
end.
The first argument to handle_msg/2 is a tuple containing channel name and message, the second argument is the one we passed to ebus_handler:new/2 while creating this process i.e. self().
Next, websocket_handle/3 function handles data from client. One can easily notice that the client send json objects which contain the message type and the message. Based on the message type we perform different actions for eg. message type “chat” is used to send a message on the channel which is accomplished using ebus:pub/2 , there are other message types which perform different functions like changing user’s name, adding new chat room, sending list of connected users etc.
With the above discussion I have tried to cover the basic code base of ChatBus that handles the messaging part. I encourage the readers to explore the code base and try to play around with it.
In conclusion, building a chat system with Erlang is damn easy :)