version | title |
---|---|
1.0.0 |
Параллелизм |
Одна из сильных сторон Elixir — поддержка параллелизма. Благодаря Erlang VM (BEAM) параллелизм в Elixir легче, чем вы думали. В основе модели параллелизма лежат акторы — процессы, взаимодействующие с другими процессами путём передачи сообщений.
В этом уроке мы познакомимся с модулями параллелизма, поставляемыми вместе с Elixir. В следующей части мы также узнаем, каким способом они реализованы в OTP.
{% include toc.html %}
Процессы в Erlang VM легковесны и выполняются на всех процессорах. Они могут показаться похожими на нативные потоки, но они проще, и вполне нормально иметь тысячи параллельных процессов в одном приложении Elixir.
Простейший способ создать новый процесс это функция spawn
, принимающая анонимную или именованную функцию. Когда мы создаём новый процесс, он возвращает Идентификатор процесса, или PID, для однозначного определения внутри нашего приложения.
Для начала создадим модуль и опишем функцию, которую мы хотели бы запустить:
defmodule Example do
def add(a, b) do
IO.puts(a + b)
end
end
iex> Example.add(2, 3)
5
:ok
Чтобы выполнить функцию асинхронно, воспользуемся spawn/3
:
iex> spawn(Example, :add, [2, 3])
5
#PID<0.80.0>
Для взаимодействия между собой процессы используют сообщения. Для этого существует две части: send/2
и receive
. Функция send/2
позволяет отправлять сообщения PID'y. Для получения и проверки сообщений используется receive
. Если при проверке совпадение не будет найдено, выполнение продолжится.
defmodule Example do
def listen do
receive do
{:ok, "hello"} -> IO.puts "World"
end
listen
end
end
iex> pid = spawn(Example, :listen, [])
#PID<0.108.0>
iex> send pid, {:ok, "hello"}
World
{:ok, "hello"}
iex> send pid, :ok
:ok
Стоит заметить, что функция listen/0
рекурсивна (вызывает саму себя), что позволяет этому процессу обработать несколько сообщений. Без этого вызова процесс завершит свою работу после обработки первого сообщения.
Одна из проблем при использовании spawn
— узнать о выходе процесса из строя. Для этого мы свяжем наши процессы с помощью spawn_link
. Два связанных процесса будут получать друг от друга уведомления о завершении:
defmodule Example do
def explode, do: exit(:kaboom)
end
iex> spawn(Example, :explode, [])
#PID<0.66.0>
iex> spawn_link(Example, :explode, [])
** (EXIT from #PID<0.57.0>) :kaboom
Иногда мы не хотим, чтобы связанный процесс завершал текущий. Для этого нужно перехватывать попытки завершения. Перехваченные попытки будут получены в виде сообщения-кортежа: {:EXIT, from_pid, reason}
.
defmodule Example do
def explode, do: exit(:kaboom)
def run do
Process.flag(:trap_exit, true)
spawn_link(Example, :explode, [])
receive do
{:EXIT, from_pid, reason} -> IO.puts "Exit reason: #{reason}"
end
end
end
iex> Example.run
Exit reason: kaboom
:ok
Но что делать, если мы не хотим связывать два процесса, но при этом хотим получать информацию? Можно воспользоваться spawn_monitor
для мониторинга процесса. При наблюдении за процессом мы получаем сообщения, если процесс выйдет из строя, без завершения текущего процесса и необходимости явно перехватывать попытки завершения.
defmodule Example do
def explode, do: exit(:kaboom)
def run do
{pid, ref} = spawn_monitor(Example, :explode, [])
receive do
{:DOWN, ref, :process, from_pid, reason} -> IO.puts "Exit reason: #{reason}"
end
end
end
iex> Example.run
Exit reason: kaboom
:ok
Агенты — абстракция над фоновыми процессами, сохраняющими состояние. Мы можем получить доступ к ним из другого процесса нашего приложения. Состояние агента устанавливается равным возвращаемому значению нашей функции:
iex> {:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end)
{:ok, #PID<0.65.0>}
iex> Agent.update(agent, fn (state) -> state ++ [4, 5] end)
:ok
iex> Agent.get(agent, &(&1))
[1, 2, 3, 4, 5]
Если мы зададим имя агенту, то сможем обращаться к нему, используя имя, а не PID:
iex> Agent.start_link(fn -> [1, 2, 3] end, name: Numbers)
{:ok, #PID<0.74.0>}
iex> Agent.get(Numbers, &(&1))
[1, 2, 3]
Задачи предоставляют возможность выполнять функцию в фоновом режиме и получать её значение потом. Они могут быть особенно полезны при обработке дорогостоящих операций без блокировки выполнения приложения.
defmodule Example do
def double(x) do
:timer.sleep(2000)
x * 2
end
end
iex> task = Task.async(Example, :double, [2000])
%Task{pid: #PID<0.111.0>, ref: #Reference<0.0.8.200>}
# Делаем что-нибудь
iex> Task.await(task)
4000