Skip to content

Commit

Permalink
Ensure fetch fallback is called once per key per ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
camilleryr committed Jun 4, 2024
1 parent 31c3fd0 commit aa574a3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 43 deletions.
85 changes: 42 additions & 43 deletions lib/cachex/services/courier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Cachex.Services.Courier do

# add some aliases
alias Cachex.Actions
alias Cachex.Actions.Get
alias Cachex.Actions.Put
alias Cachex.ExecutionError

Expand Down Expand Up @@ -56,54 +57,52 @@ defmodule Cachex.Services.Courier do
do: Process.flag(:trap_exit, true) || {:ok, {cache, %{}}}

@doc false
# Dispatches a tasks to be carried out by the Courier.
# Dispatches a worker to execute the fallback_fun by the Courier.
#
# Tasks will only be executed if they're not already in progress. This
# is only tracked on a key level, so it's not possible to track different
# tasks for a given key.
# Workers will only be executed if they're not already in progress and the
# cache does not have a value for the given key. This is only tracked on a
# key level, so it's not possible to track different workders for a given key.
#
# Due to the nature of the async behaviour, this call will return before
# the task has been completed, and the :notify callback will receive the
# results from the task after completion (regardless of outcome).
def handle_call({:dispatch, key, task, stack}, caller, {cache, tasks}) do
references =
case Map.get(tasks, key) do
{pid, listeners} ->
{pid, [caller | listeners]}

nil ->
parent = self()

worker =
spawn_link(fn ->
result =
try do
task.()
rescue
e ->
{
:error,
%ExecutionError{
message: Exception.message(e),
stack: stack_compat() ++ stack
}
}
end

formatted = Actions.format_fetch_value(result)
normalized = Actions.normalize_commit(formatted)

with {:commit, val, options} <- normalized do
Put.execute(cache, key, val, [const(:notify_false) | options])
end

send(parent, {:notify, key, formatted})
end)

{worker, [caller]}
# the worker has completed, and the :notify callback will receive the results
# from the worker after completion (regardless of outcome).
def handle_call({:dispatch, key, fallback, stack}, caller, {cache, workers}) do
with nil <- Map.get(workers, key),
{:ok, nil} <- Get.execute(cache, key, []) do
worker_pid = start_worker(cache, key, fallback, stack)
{:noreply, {cache, Map.put(workers, key, {worker_pid, [caller]})}}
else
_worker_for_key = {worker_pid, [_ | _] = listeners} ->
{:noreply,
{cache, Map.put(workers, key, {worker_pid, [caller | listeners]})}}

cache_get_result ->
{:reply, cache_get_result, {cache, workers}}
end
end

defp start_worker(cache, key, fallback, stack) do
parent = self()

spawn_link(fn ->
formatted_result =
try do
Actions.format_fetch_value(fallback.())
rescue
e ->
{:error,
%ExecutionError{
message: Exception.message(e),
stack: stack_compat() ++ stack
}}
end

with {:commit, val, options} <- Actions.normalize_commit(formatted_result) do
Put.execute(cache, key, val, [const(:notify_false) | options])
end

{:noreply, {cache, Map.put(tasks, key, references)}}
send(parent, {:notify, key, formatted_result})
end)
end

@doc false
Expand Down
22 changes: 22 additions & 0 deletions test/cachex/actions/fetch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,26 @@ defmodule Cachex.Actions.FetchTest do
assert(get1 == {:ok, "1"})
assert(get2 == {:ok, "2"})
end

# This test ensures that the fallback is executed just once per key, per ttl
test "fetching will only call fallback once per key" do
cache = Helper.create_cache()
agent = start_supervised!({Agent, fn -> %{} end})

for test_index <- 1..100 do
test_key = "test_key_#{test_index}"

1..(System.schedulers_online() * 2)
|> Task.async_stream(fn _ ->
Cachex.fetch(cache, test_key, fn ->
Agent.update(agent, fn state ->
Map.update(state, test_key, 1, &(&1 + 1))
end)
end)
end)
|> Stream.run()

assert 1 == Agent.get(agent, &Map.get(&1, test_key))
end
end
end

0 comments on commit aa574a3

Please sign in to comment.