Skip to content

Commit

Permalink
Merge pull request #54 from gausby/store-connection-in-ets
Browse files Browse the repository at this point in the history
Store connection socket in ETS
  • Loading branch information
gausby committed Jul 29, 2018
2 parents 8cd7c3f + eb989bb commit 255d1e0
Show file tree
Hide file tree
Showing 24 changed files with 660 additions and 530 deletions.
52 changes: 52 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,57 @@
# Changelog

## 0.6.0 - 2018-07-29

### Changed

- Keep the active connection in a ETS based registry, allowing
processes that publish messages to the wire, using either
`Tortoise.publish/4` or `Tortoise.Pipe.publish/4` to obtain the
network connection and shoot the message directly on the network
socket. With this change we can also error out if a process tries to
send on a non-existing connection.

- The implementation for `Tortoise.publish/4` and
`Tortoise.publish_sync/4` has been moved to the `Tortoise` module
itself and is therefore no longer delegated to the
`Tortoise.Connection` module. This changes the interface a bit but
makes for a cleaner interface.

### Added

- This release introduces a `Tortoise.Events` module that implements a
`Registry` based PubSub. Tortoise will use this to dispatch system
events to listeners. For now it is used to dispatch new network
connections, which is currently used by `Tortoise.publish`, the
`Tortoise.Connection.Inflight` process, and the `Tortoise.Pipe`s. In
the future we might add more message types to `Tortoise.Events`.

- Tests for `Tortoise.publish/4` and `Tortoise.publish_sync/4`

### Removed

- The `Tortoise.Connection.Transmitter` process is no longer needed,
so it has been removed.

- Some dead code in the `Tortoise.Connection.Inflight` module has been
removed. This should not change anything user facing.

### Fixed

- A server sending a ping request to a client is now considered a
protocol violation, as specified by both the MQTT 3.1.1 and MQTT 5
specifications.

- The connection process will now cancel the keep alive timer if it
goes offline. Previously it would terminate itself because it would
not get the ping response from the server.

- Regression: The receiver will no longer crash on an assertion when
it request a reconnect from the `Tortoise.Connection` process.

- The specified Elixir version in the mix.exs file should now allow
more versions of Elixir without warnings.

## 0.5.1 - 2018-07-23

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ dependencies in `mix.exs`:
```elixir
def deps do
[
{:tortoise, "~> 0.5.1"}
{:tortoise, "~> 0.6.0"}
]
end
```
Expand Down
58 changes: 53 additions & 5 deletions lib/tortoise.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ defmodule Tortoise do
approach should be fast and efficient.
"""

alias Tortoise.Package
alias Tortoise.Connection
alias Tortoise.Connection.Inflight

@doc """
Publish a message to the MQTT broker.
Expand Down Expand Up @@ -128,9 +132,30 @@ defmodule Tortoise do
with `Tortoise` so it is easy to see where the message originated
from.
"""

defdelegate publish(client_id, topic, payload \\ nil, opts \\ []),
to: Tortoise.Connection
def publish(client_id, topic, payload \\ nil, opts \\ []) do
qos = Keyword.get(opts, :qos, 0)

publish = %Package.Publish{
topic: topic,
qos: qos,
payload: payload,
retain: Keyword.get(opts, :retain, false)
}

with {:ok, {transport, socket}} <- Connection.connection(client_id) do
case publish do
%Package.Publish{qos: 0} ->
encoded_publish = Package.encode(publish)
apply(transport, :send, [socket, encoded_publish])

%Package.Publish{qos: qos} when qos in [1, 2] ->
Inflight.track(client_id, {:outgoing, publish})
end
else
{:error, :unknown_connection} ->
{:error, :unknown_connection}
end
end

@doc """
Synchronously send a message to the MQTT broker.
Expand Down Expand Up @@ -158,8 +183,31 @@ defmodule Tortoise do
See the documentation for `Tortoise.publish/4` for configuration.
"""
defdelegate publish_sync(client_id, topic, payload \\ nil, opts \\ []),
to: Tortoise.Connection
def publish_sync(client_id, topic, payload \\ nil, opts \\ []) do
timeout = Keyword.get(opts, :timeout, :infinity)
qos = Keyword.get(opts, :qos, 0)

publish = %Package.Publish{
topic: topic,
qos: qos,
payload: payload,
retain: Keyword.get(opts, :retain, false)
}

with {:ok, {transport, socket}} <- Connection.connection(client_id) do
case publish do
%Package.Publish{qos: 0} ->
encoded_publish = Package.encode(publish)
apply(transport, :send, [socket, encoded_publish])

%Package.Publish{qos: qos} when qos in [1, 2] ->
Inflight.track_sync(client_id, {:outgoing, publish}, timeout)
end
else
{:error, :unknown_connection} ->
{:error, :unknown_connection}
end
end

@doc """
Subscribe to one or more topics using topic filters on `client_id`
Expand Down
5 changes: 2 additions & 3 deletions lib/tortoise/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ defmodule Tortoise.App do

use Application

@doc """
Start the application supervisor
"""
@impl true
def start(_type, _args) do
# read configuration and start connections
# start with client_id, and handler from config

children = [
{Registry, [keys: :unique, name: Tortoise.Registry]},
{Registry, [keys: :duplicate, name: Tortoise.Events]},
{Tortoise.Supervisor, [strategy: :one_for_one]}
]

Expand Down
102 changes: 52 additions & 50 deletions lib/tortoise/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Tortoise.Connection do
@type client_id() :: binary() | atom()

alias Tortoise.{Transport, Connection, Package}
alias Tortoise.Connection.{Inflight, Controller, Receiver, Transmitter, Backoff}
alias Tortoise.Connection.{Inflight, Controller, Receiver, Backoff}
alias Tortoise.Package.{Connect, Connack}

@doc """
Expand Down Expand Up @@ -69,7 +69,8 @@ defmodule Tortoise.Connection do
GenServer.start_link(__MODULE__, initial, opts)
end

defp via_name(client_id) do
@doc false
def via_name(client_id) do
Tortoise.Registry.via_name(__MODULE__, client_id)
end

Expand All @@ -89,6 +90,7 @@ defmodule Tortoise.Connection do
case GenServer.whereis(via_name(client_id)) do
pid when is_pid(pid) ->
send(pid, :connect)
:ok

nil ->
{:error, :unknown_connection}
Expand All @@ -106,47 +108,6 @@ defmodule Tortoise.Connection do
GenServer.call(via_name(client_id), :subscriptions)
end

@doc false
def publish(client_id, topic, payload \\ nil, opts \\ []) do
qos = Keyword.get(opts, :qos, 0)

publish = %Package.Publish{
topic: topic,
qos: qos,
payload: payload,
retain: Keyword.get(opts, :retain, false)
}

case publish do
%Package.Publish{qos: 0} ->
Transmitter.cast(client_id, publish)

%Package.Publish{qos: qos} when qos in [1, 2] ->
Inflight.track(client_id, {:outgoing, publish})
end
end

@doc false
def publish_sync(client_id, topic, payload \\ nil, opts \\ []) do
timeout = Keyword.get(opts, :timeout, :infinity)
qos = Keyword.get(opts, :qos, 0)

publish = %Package.Publish{
topic: topic,
qos: qos,
payload: payload,
retain: Keyword.get(opts, :retain, false)
}

case publish do
%Package.Publish{qos: 0} ->
Transmitter.cast(client_id, publish)

%Package.Publish{qos: qos} when qos in [1, 2] ->
Inflight.track_sync(client_id, {:outgoing, publish}, timeout)
end
end

@doc false
def subscribe(client_id, topics, opts \\ [])

Expand Down Expand Up @@ -236,14 +197,31 @@ defmodule Tortoise.Connection do
end

@doc false
@spec renew(client_id()) :: :ok
def renew(client_id) do
case GenServer.whereis(via_name(client_id)) do
pid when is_pid(pid) ->
send(pid, :connect)
:ok
@spec connection(client_id(), [opts]) ::
{:ok, {module(), term()}} | {:error, :unknown_connection} | {:error, :timeout}
when opts: {:timeout, non_neg_integer()} | {:active, boolean()}
def connection(client_id, opts \\ [active: false]) do
active = Keyword.get(opts, :active, false)

case Tortoise.Registry.meta(via_name(client_id)) do
{:ok, {_transport, _socket} = connection} ->
if active, do: Tortoise.Events.register(client_id, :connection)
{:ok, connection}

{:ok, :connecting} ->
timeout = Keyword.get(opts, :timeout, :infinity)
{:ok, _} = Tortoise.Events.register(client_id, :connection)

receive do
{{Tortoise, ^client_id}, :connection, {transport, socket}} ->
unless active, do: Tortoise.Events.unregister(client_id, :connection)
{:ok, {transport, socket}}
after
timeout ->
{:error, :timeout}
end

nil ->
:error ->
{:error, :unknown_connection}
end
end
Expand All @@ -259,21 +237,36 @@ defmodule Tortoise.Connection do
opts: opts
}

Tortoise.Registry.put_meta(via_name(connect.client_id), :connecting)

# eventually, switch to handle_continue
send(self(), :connect)
{:ok, state}
end

@impl true
def terminate(_reason, state) do
:ok = Tortoise.Registry.delete_meta(via_name(state.connect.client_id))
:ok
end

@impl true
def handle_info(:connect, state) do
:ok = Controller.update_connection_status(state.connect.client_id, :down)
# make sure we will not fall for a keep alive timeout while we are
# trying to reconnect
state = cancel_keep_alive(state)

with {%Connack{status: :accepted} = connack, socket} <-
do_connect(state.server, state.connect),
{:ok, state} = init_connection(socket, state) do
# we are connected; reset backoff state
state = %State{state | backoff: Backoff.reset(state.backoff)}

connection = {state.server.type, socket}
:ok = Tortoise.Registry.put_meta(via_name(state.connect.client_id), connection)
:ok = Tortoise.Events.dispatch(state.connect.client_id, :connection, connection)

case connack do
%Connack{session_present: true} ->
{:noreply, reset_keep_alive(state)}
Expand Down Expand Up @@ -415,6 +408,15 @@ defmodule Tortoise.Connection do
%State{state | keep_alive: ref}
end

defp cancel_keep_alive(%State{keep_alive: nil} = state) do
state
end

defp cancel_keep_alive(%State{keep_alive: keep_alive_ref} = state) do
_ = Process.cancel_timer(keep_alive_ref)
%State{state | keep_alive: nil}
end

defp do_connect(server, %Connect{} = connect) do
%Transport{type: transport, host: host, port: port, opts: opts} = server

Expand Down
Loading

0 comments on commit 255d1e0

Please sign in to comment.