From d68d4e2c596906a4dd7ca85c4364f3a595dcaadc Mon Sep 17 00:00:00 2001 From: Christopher Miller <32465602+camilleryr@users.noreply.github.com> Date: Tue, 4 Jun 2024 06:36:55 -0500 Subject: [PATCH] Ensure fetch fallback is called once per key per ttl --- lib/cachex/services/courier.ex | 72 ++++++++++++++++-------------- test/cachex/actions/fetch_test.exs | 22 +++++++++ 2 files changed, 61 insertions(+), 33 deletions(-) diff --git a/lib/cachex/services/courier.ex b/lib/cachex/services/courier.ex index bcad3fa..c4e9af4 100644 --- a/lib/cachex/services/courier.ex +++ b/lib/cachex/services/courier.ex @@ -18,6 +18,7 @@ defmodule Cachex.Services.Courier do # add some aliases alias Cachex.Actions + alias Cachex.Actions.Get alias Cachex.Actions.Put alias Cachex.ExecutionError @@ -65,45 +66,50 @@ defmodule Cachex.Services.Courier do # Due to the nature of the async behaviour, this call will return before # the task has been completed, and the :notify callback will receive the # results from the task after completion (regardless of outcome). - def handle_call({:dispatch, key, task, stack}, caller, {cache, tasks}) do - references = - case Map.get(tasks, key) do - {pid, listeners} -> - {pid, [caller | listeners]} - - nil -> - parent = self() - - worker = - spawn_link(fn -> - result = - try do - task.() - rescue - e -> - { - :error, - %ExecutionError{ - message: Exception.message(e), - stack: stack_compat() ++ stack + def handle_call({:dispatch, key, task, stack}, caller, {cache, tasks} = state) do + case Map.get(tasks, key) do + {pid, listeners} -> + {:noreply, {cache, Map.put(tasks, key, {pid, [caller | listeners]})}} + + nil -> + case Get.execute(cache, key, []) do + {:ok, nil} -> + parent = self() + + worker = + spawn_link(fn -> + result = + try do + task.() + rescue + e -> + { + :error, + %ExecutionError{ + message: Exception.message(e), + stack: stack_compat() ++ stack + } } - } - end + end - formatted = Actions.format_fetch_value(result) - normalized = Actions.normalize_commit(formatted) + formatted = Actions.format_fetch_value(result) + normalized = Actions.normalize_commit(formatted) - with {:commit, val, options} <- normalized do - Put.execute(cache, key, val, [const(:notify_false) | options]) - end + with {:commit, val, options} <- normalized do + Put.execute(cache, key, val, [ + const(:notify_false) | options + ]) + end - send(parent, {:notify, key, formatted}) - end) + send(parent, {:notify, key, formatted}) + end) - {worker, [caller]} - end + {:noreply, {cache, Map.put(tasks, key, {worker, [caller]})}} - {:noreply, {cache, Map.put(tasks, key, references)}} + {:ok, _value} = res -> + {:reply, res, state} + end + end end @doc false diff --git a/test/cachex/actions/fetch_test.exs b/test/cachex/actions/fetch_test.exs index c655ef9..067fba8 100644 --- a/test/cachex/actions/fetch_test.exs +++ b/test/cachex/actions/fetch_test.exs @@ -180,4 +180,26 @@ defmodule Cachex.Actions.FetchTest do assert(get1 == {:ok, "1"}) assert(get2 == {:ok, "2"}) end + + # This test ensures that the fallback is executed just once per key, per ttl + test "fetching will only call fallback once per key" do + cache = Helper.create_cache() + agent = start_supervised!({Agent, fn -> %{} end}) + + for test_index <- 1..100 do + test_key = "test_key_#{test_index}" + + 1..(System.schedulers_online() * 2) + |> Task.async_stream(fn _ -> + Cachex.fetch(cache, test_key, fn -> + Agent.update(agent, fn state -> + Map.update(state, test_key, 1, &(&1 + 1)) + end) + end) + end) + |> Stream.run() + + assert 1 == Agent.get(agent, &Map.get(&1, test_key)) + end + end end