Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure fetch fallback is called once per key per ttl #348

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 39 additions & 33 deletions lib/cachex/services/courier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Cachex.Services.Courier do

# add some aliases
alias Cachex.Actions
alias Cachex.Actions.Get
alias Cachex.Actions.Put
alias Cachex.ExecutionError

Expand Down Expand Up @@ -65,45 +66,50 @@ defmodule Cachex.Services.Courier do
# Due to the nature of the async behaviour, this call will return before
# the task has been completed, and the :notify callback will receive the
# results from the task after completion (regardless of outcome).
def handle_call({:dispatch, key, task, stack}, caller, {cache, tasks}) do
references =
case Map.get(tasks, key) do
{pid, listeners} ->
{pid, [caller | listeners]}

nil ->
parent = self()

worker =
spawn_link(fn ->
result =
try do
task.()
rescue
e ->
{
:error,
%ExecutionError{
message: Exception.message(e),
stack: stack_compat() ++ stack
def handle_call({:dispatch, key, task, stack}, caller, {cache, tasks} = state) do
case Map.get(tasks, key) do
{pid, listeners} ->
{:noreply, {cache, Map.put(tasks, key, {pid, [caller | listeners]})}}

nil ->
case Get.execute(cache, key, []) do
{:ok, nil} ->
parent = self()

worker =
spawn_link(fn ->
result =
try do
task.()
rescue
e ->
{
:error,
%ExecutionError{
message: Exception.message(e),
stack: stack_compat() ++ stack
}
}
}
end
end

formatted = Actions.format_fetch_value(result)
normalized = Actions.normalize_commit(formatted)
formatted = Actions.format_fetch_value(result)
normalized = Actions.normalize_commit(formatted)

with {:commit, val, options} <- normalized do
Put.execute(cache, key, val, [const(:notify_false) | options])
end
with {:commit, val, options} <- normalized do
Put.execute(cache, key, val, [
const(:notify_false) | options
])
end

send(parent, {:notify, key, formatted})
end)
send(parent, {:notify, key, formatted})
end)

{worker, [caller]}
end
{:noreply, {cache, Map.put(tasks, key, {worker, [caller]})}}

{:noreply, {cache, Map.put(tasks, key, references)}}
{:ok, _value} = res ->
{:reply, res, state}
end
end
end

@doc false
Expand Down
22 changes: 22 additions & 0 deletions test/cachex/actions/fetch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,26 @@ defmodule Cachex.Actions.FetchTest do
assert(get1 == {:ok, "1"})
assert(get2 == {:ok, "2"})
end

# This test ensures that the fallback is executed just once per key, per ttl
test "fetching will only call fallback once per key" do
cache = Helper.create_cache()
agent = start_supervised!({Agent, fn -> %{} end})

for test_index <- 1..100 do
test_key = "test_key_#{test_index}"

1..(System.schedulers_online() * 2)
|> Task.async_stream(fn _ ->
Cachex.fetch(cache, test_key, fn ->
Agent.update(agent, fn state ->
Map.update(state, test_key, 1, &(&1 + 1))
end)
end)
end)
|> Stream.run()

assert 1 == Agent.get(agent, &Map.get(&1, test_key))
end
end
end
Loading