Skip to content

Commit

Permalink
worker
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidAlphaFox committed May 8, 2020
1 parent 533ff8e commit 36855d0
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 13 deletions.
22 changes: 19 additions & 3 deletions src/aiutp_dispatch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
%%% Created : 6 May 2020 by David Gao <[email protected]>
%%%-------------------------------------------------------------------
-module(aiutp_dispatch).

-behaviour(gen_server).
-include("aiutp.hrl").

%% API
-export([start_link/1]).
Expand Down Expand Up @@ -90,9 +90,17 @@ handle_call(_Request, _From, State) ->
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), NewState :: term()}.
handle_cast({dispatch,Remote,_Payload},State)->
io:format("dispatch from remote: ~p~n",[Remote]),
handle_cast({dispatch,Remote,Payload},
#state{socket = Socket} = State)->
case aiutp_protocol:decode(Payload) of
{ok,Packet,TS,TSDiff,RecvTime} ->
handle_packet(Packet,TS,TSDiff,RecvTime,Remote,Socket);
{error,Reason}->
error_logger:info_report([decode_error,Reason]),
ok
end,
{noreply,State};

handle_cast(_Request, State) ->
{noreply, State}.

Expand Down Expand Up @@ -153,3 +161,11 @@ format_status(_Opt, Status) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
handle_packet(#packet{conn_id = ConnID} = Packet,
TS,TSDiff,RecvTime,Remote,Socket)->
case aiutp_conn_manager:lookup(Remote, ConnID) of
{error,not_exist} ->
aiutp_socket:incoming(Socket,Packet,{TS,TSDiff,RecvTime},Remote);
{ok,Worker} ->
aiutp_worker:incoming(Worker,Packet,{TS,TSDiff,RecvTime},Remote)
end.
12 changes: 10 additions & 2 deletions src/aiutp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
-module(aiutp_socket).

-behaviour(gen_server).

-include("aiutp.hrl").
%% API
-export([start_link/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, format_status/2]).
-export([incoming/4]).

-define(SERVER, ?MODULE).

Expand All @@ -27,6 +28,14 @@
%%%===================================================================
%%% API
%%%===================================================================
incoming(Socket,#packet{type = Type} = Packet,Timing,Remote)->
case Type of
st_reset -> ok;
st_syn ->
gen_server:cast(Socket,{syn,Packet,Timing,Remote});
_ ->
gen_server:cast(Socket,{reset,Packet,Timing,Remote})
end.

%%--------------------------------------------------------------------
%% @doc
Expand Down Expand Up @@ -111,7 +120,6 @@ handle_info({'EXIT',Dispatch,Reason},#state{dispatch = Dispatch} = State)->
{stop,Reason,State};
handle_info({udp, Socket, IP, InPortNo, Packet},
#state{socket = Socket,dispatch = Dispatch} = State)->
ok = inet:setopts(Socket, [{active,false}]),
aiutp_dispatch:dispatch(Dispatch,{IP,InPortNo}, Packet),
ok = inet:setopts(Socket, [{active,once}]),
{noreply,State};
Expand Down
2 changes: 1 addition & 1 deletion src/aiutp_socket_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ init([]) ->
start => {aiutp_socket,start_link,[]},
restart => temporary,
shutdown => 5000,
type => supervisor,
type => worker,
modules => [aiutp_socket]
},
{ok, {SupFlags, [Socket]}}.
Expand Down
21 changes: 14 additions & 7 deletions src/aiutp_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ init([]) ->
modules => [aiutp_conn_manager]
},
SocketSup = #{id => aiutp_socket_sup,
start => {aiutp_socket_sup,start_link,[]},
restart => transient,
shutdown => 5000,
type => supervisor,
modules => [aiutp_socket_sup]
},
{ok, {SupFlags, [ConnManager,SocketSup]}}.
start => {aiutp_socket_sup,start_link,[]},
restart => transient,
shutdown => 5000,
type => supervisor,
modules => [aiutp_socket_sup]
},
WorkerSup = #{id => aiutp_woker_sup,
start => {aiutp_worker_sup,start_link,[]},
restart => transient,
shutdown => 5000,
type => supervisor,
modules => [aiutp_worker_sup]
},
{ok, {SupFlags, [ConnManager,SocketSup,WorkerSup]}}.
126 changes: 126 additions & 0 deletions src/aiutp_worker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
%%%-------------------------------------------------------------------
%%% @author David Gao <[email protected]>
%%% @copyright (C) 2020, David Gao
%%% @doc
%%%
%%% @end
%%% Created : 8 May 2020 by David Gao <[email protected]>
%%%-------------------------------------------------------------------
-module(aiutp_worker).

-behaviour(gen_statem).

%% API
-export([start_link/1]).

%% gen_statem callbacks
-export([callback_mode/0, init/1, terminate/3, code_change/4]).
-export([idle/3]).

-define(SERVER, ?MODULE).

-record(data, {
socket :: pid(),
monitor :: reference()
}).

%%%===================================================================
%%% API
%%%===================================================================

%%--------------------------------------------------------------------
%% @doc
%% Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
%%
%% @end
%%--------------------------------------------------------------------
-spec start_link(pid()) ->
{ok, Pid :: pid()} |
ignore |
{error, Error :: term()}.
start_link(Socket) ->
gen_statem:start_link(?MODULE, [Socket], []).

%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Define the callback_mode() for this callback module.
%% @end
%%--------------------------------------------------------------------
-spec callback_mode() -> gen_statem:callback_mode_result().
callback_mode() -> state_functions.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
gen_statem:init_result(atom()).
init([Socket]) ->
Monitor = erlang:monitor(process, Socket),
{ok, idle, #data{
socket = Socket,
monitor = Monitor
}}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% There should be one function like this for each state name.
%% Whenever a gen_statem receives an event, the function
%% with the name of the current state (StateName)
%% is called to handle the event.
%% @end
%%--------------------------------------------------------------------
-spec idle('enter',
OldState :: atom(),
Data :: term()) ->
gen_statem:state_enter_result('state_name');
(gen_statem:event_type(),
Msg :: term(),
Data :: term()) ->
gen_statem:event_handler_result(atom()).
idle({call,Caller}, _Msg, Data) ->
{next_state, idle, Data, [{reply,Caller,ok}]}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
%% @end
%%--------------------------------------------------------------------
-spec terminate(Reason :: term(), State :: term(), Data :: term()) ->
any().
terminate(_Reason, _State, _Data) ->
void.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%% @end
%%--------------------------------------------------------------------
-spec code_change(
OldVsn :: term() | {down,term()},
State :: term(), Data :: term(), Extra :: term()) ->
{ok, NewState :: term(), NewData :: term()} |
(Reason :: term()).
code_change(_OldVsn, State, Data, _Extra) ->
{ok, State, Data}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
72 changes: 72 additions & 0 deletions src/aiutp_worker_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
%%%-------------------------------------------------------------------
%%% @author David Gao <[email protected]>
%%% @copyright (C) 2020, David Gao
%%% @doc
%%%
%%% @end
%%% Created : 6 May 2020 by David Gao <[email protected]>
%%%-------------------------------------------------------------------
-module(aiutp_worker_sup).

-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1]).
-export([new/1]).

-define(SERVER, ?MODULE).

%%%===================================================================
%%% API functions
%%%===================================================================
new(Socket)-> supervisor:start_child(?SERVER, [Socket]).

%%--------------------------------------------------------------------
%% @doc
%% Starts the supervisor
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, Pid :: pid()} |
{error, {already_started, Pid :: pid()}} |
{error, {shutdown, term()}} |
{error, term()} |
ignore.
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart intensity, and child
%% specifications.
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
{ok, {SupFlags :: supervisor:sup_flags(),
[ChildSpec :: supervisor:child_spec()]}} |
ignore.
init([]) ->
SupFlags = #{strategy => simple_one_for_one,
intensity => 1,
period => 5},
Worker = #{id => aiutp_worker,
start => {aiutp_worker,start_link,[]},
restart => temporary,
shutdown => 5000,
type => worker,
modules => [aiutp_worker]
},
{ok, {SupFlags, [Worker]}}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

0 comments on commit 36855d0

Please sign in to comment.