Skip to content

Commit

Permalink
Merge pull request #73 from gausby/next-release
Browse files Browse the repository at this point in the history
v0.8.1
  • Loading branch information
gausby committed Aug 25, 2018
2 parents 91bc41f + a302fc4 commit 527fca2
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 3 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## [Unreleased]

## 0.8.1 - 2018-08-25

### Added

- `disconnect/1` has been added to the `Tortoise.Connection`
module. Given a `client_id` it will close down the connection
cleanly; in flight messages will get canceled and a disconnect
package will be send to the broker.

## 0.8.0 - 2018-08-19

### Changed
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ dependencies in `mix.exs`:
```elixir
def deps do
[
{:tortoise, "~> 0.8.0"}
{:tortoise, "~> 0.8.1"}
]
end
```
Expand Down
22 changes: 22 additions & 0 deletions lib/tortoise/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,23 @@ defmodule Tortoise.Connection do
%{
id: Keyword.get(opts, :name, __MODULE__),
start: {__MODULE__, :start_link, [opts]},
restart: Keyword.get(opts, :restart, :transient),
type: :worker
}
end

@doc """
Close the connection to the broker.
Given the `client_id` of a running connection it will cancel the
inflight messages and send the proper disconnect message to the
broker. The session will get terminated on the server.
"""
@spec disconnect(client_id()) :: :ok
def disconnect(client_id) do
GenServer.call(via_name(client_id), :disconnect)
end

@doc """
Return the list of subscribed topics.
Expand Down Expand Up @@ -237,6 +250,7 @@ defmodule Tortoise.Connection do
@impl true
def terminate(_reason, state) do
:ok = Tortoise.Registry.delete_meta(via_name(state.connect.client_id))
:ok = Events.dispatch(state.client_id, :status, :terminated)
:ok
end

Expand Down Expand Up @@ -351,6 +365,14 @@ defmodule Tortoise.Connection do
{:reply, state.subscriptions, state}
end

def handle_call(:disconnect, from, state) do
:ok = Events.dispatch(state.client_id, :status, :terminating)
:ok = Inflight.drain(state.client_id)
:ok = Controller.stop(state.client_id)
:ok = GenServer.reply(from, :ok)
{:stop, :shutdown, state}
end

@impl true
def handle_cast({:subscribe, {caller_pid, ref}, subscribe, opts}, state) do
client_id = state.connect.client_id
Expand Down
38 changes: 38 additions & 0 deletions lib/tortoise/connection/inflight.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ defmodule Tortoise.Connection.Inflight do
GenStateMachine.stop(via_name(client_id))
end

@doc false
def drain(client_id) do
GenStateMachine.call(via_name(client_id), :drain)
end

@doc false
def track(client_id, {:incoming, %Package.Publish{qos: qos} = publish})
when qos in 1..2 do
Expand Down Expand Up @@ -165,6 +170,10 @@ defmodule Tortoise.Connection.Inflight do
end

# possible duplicate
def handle_event(:cast, {:incoming, _}, :draining, %State{}) do
:keep_state_and_data
end

def handle_event(
:cast,
{:incoming, %Package.Publish{identifier: identifier, dup: true} = publish},
Expand All @@ -187,6 +196,11 @@ defmodule Tortoise.Connection.Inflight do
end
end

def handle_event(:cast, {:outgoing, {pid, ref}, _}, :draining, data) do
send(pid, {{Tortoise, data.client_id}, ref, {:error, :terminating}})
:keep_state_and_data
end

def handle_event(:cast, {:outgoing, caller, package}, _state, data) do
{:ok, package} = assign_identifier(package, data.pending)
track = Track.create({:negative, caller}, package)
Expand All @@ -205,6 +219,10 @@ defmodule Tortoise.Connection.Inflight do
end

# update
def handle_event(:cast, {:update, _}, :draining, _data) do
:keep_state_and_data
end

def handle_event(
:cast,
{:update, {_, %{identifier: identifier}} = update},
Expand Down Expand Up @@ -261,6 +279,10 @@ defmodule Tortoise.Connection.Inflight do
:keep_state_and_data
end

def handle_event(:internal, {:execute, _}, :draining, _) do
:keep_state_and_data
end

def handle_event(
:internal,
{:execute, %Track{pending: [[{:dispatch, package}, _] | _]} = track},
Expand Down Expand Up @@ -296,6 +318,22 @@ defmodule Tortoise.Connection.Inflight do
end
end

def handle_event({:call, from}, :drain, {:connected, {transport, socket}}, %State{} = data) do
for {_, %Track{polarity: :negative, caller: {pid, ref}}} <- data.pending do
send(pid, {{Tortoise, data.client_id}, ref, {:error, :canceled}})
end

data = %State{data | pending: %{}, order: []}
disconnect = %Package.Disconnect{}

case apply(transport, :send, [socket, Package.encode(disconnect)]) do
:ok ->
:ok = transport.close(socket)
reply = {:reply, from, :ok}
{:next_state, :draining, data, reply}
end
end

# helpers ------------------------------------------------------------
defp handle_next(
%Track{pending: [[_, :cleanup]], identifier: identifier},
Expand Down
5 changes: 5 additions & 0 deletions lib/tortoise/handler/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ defmodule Tortoise.Handler.Logger do
{:ok, state}
end

def connection(:terminating, state) do
Logger.warn("Connection is terminating")
{:ok, state}
end

@impl true
def subscription(:up, topic, state) do
Logger.info("Subscribed to #{topic}")
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Tortoise.MixProject do
use Mix.Project

@version "0.8.0"
@version "0.8.1"

def project do
[
Expand Down
25 changes: 25 additions & 0 deletions test/tortoise/connection/inflight_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,29 @@ defmodule Tortoise.Connection.InflightTest do
assert_receive {{Tortoise, ^client_id}, ^ref, {:error, :canceled}}
end
end

describe "draining" do
setup [:setup_connection, :setup_inflight]

test "cancel outgoing inflight packages", %{client_id: client_id} = context do
publish = %Package.Publish{identifier: 1, topic: "foo", qos: 1}
{:ok, ref} = Inflight.track(client_id, {:outgoing, publish})
# the publish should get dispatched
expected = publish |> Package.encode() |> IO.iodata_to_binary()
assert {:ok, ^expected} = :gen_tcp.recv(context.server, byte_size(expected), 500)
# start draining
:ok = Inflight.drain(client_id)
# updates should have no effect at this point
:ok = Inflight.update(client_id, {:received, %Package.Puback{identifier: 1}})
# the calling process should get a result response
assert_receive {{Tortoise, ^client_id}, ^ref, {:error, :canceled}}
# Now the inflight manager should be in the draining state, new
# outbound messages should not get accepted
{:ok, ref} = Inflight.track(client_id, {:outgoing, publish})
assert_receive {{Tortoise, ^client_id}, ^ref, {:error, :terminating}}
# the remote should receive a disconnect package
expected = %Package.Disconnect{} |> Package.encode() |> IO.iodata_to_binary()
assert {:ok, ^expected} = :gen_tcp.recv(context.server, byte_size(expected), 500)
end
end
end
34 changes: 33 additions & 1 deletion test/tortoise/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ defmodule Tortoise.ConnectionTest do
]

assert {:ok, _pid} = Connection.start_link(opts)
assert_receive {ScriptedMqttServer, {:received, ^connect}}
assert_receive {ScriptedMqttServer, {:received, ^connect}}, 5000
assert_receive {ScriptedMqttServer, :completed}
end

Expand Down Expand Up @@ -586,4 +586,36 @@ defmodule Tortoise.ConnectionTest do
assert_receive {ScriptedMqttServer, :completed}
end
end

describe "life-cycle" do
setup [:setup_scripted_mqtt_server]

test "connect and cleanly disconnect", context do
Process.flag(:trap_exit, true)
client_id = context.client_id

connect = %Package.Connect{client_id: client_id}
expected_connack = %Package.Connack{status: :accepted, session_present: false}
disconnect = %Package.Disconnect{}

script = [{:receive, connect}, {:send, expected_connack}, {:receive, disconnect}]

{:ok, {ip, port}} = ScriptedMqttServer.enact(context.scripted_mqtt_server, script)

opts = [
client_id: client_id,
server: {Tortoise.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise.Handler.Default, []}
]

assert {:ok, pid} = Connection.start_link(opts)
assert_receive {ScriptedMqttServer, {:received, ^connect}}

assert :ok = Tortoise.Connection.disconnect(client_id)
assert_receive {ScriptedMqttServer, {:received, ^disconnect}}
assert_receive {:EXIT, ^pid, :shutdown}

assert_receive {ScriptedMqttServer, :completed}
end
end
end

0 comments on commit 527fca2

Please sign in to comment.