diff --git a/docs/features/streaming-caches.md b/docs/features/streaming-caches.md
index 0372691..2e021b1 100644
--- a/docs/features/streaming-caches.md
+++ b/docs/features/streaming-caches.md
@@ -36,7 +36,7 @@ Cachex.put(:my_cache, "three", 3)
 filter = { :==, { :rem, :value, 2 }, 1 }
 
 # generate the query using the filter
-query = Cachex.Query.create(filter, :value)
+query = Cachex.Query.where(filter, :value)
 
 # 4
 :my_cache
@@ -44,4 +44,4 @@ query = Cachex.Query.create(filter, :value)
 |> Enum.sum
 ```
 
-Couple of things to mention here; first of all, you can use any of the `entry()` field names in your matches, and they'll be substituted out automatically. In this case we use `:value` in our filter, which would compile down to `:"$4"` instead. You might also have noticed that we can jump directly to `Enum.sum/1` here. The second (optional) argument to `create/2` controls the format of the stream elements, in this case just streaming the `:value` field of the entry. If the second argument is not provided, it'll stream entry records (just like the first example). It should be noted that `Cachex.Query.create/2` will automatically bind a filter clause to filter out expired documents. If you wish to run a query on the entire dataset, you can use `Cachex.Query.raw/2` instead.
+A couple of things to mention here: first of all, you can use any of the `entry()` field names in your matches, and they'll be substituted out automatically. In this case we use `:value` in our filter, which would compile down to `:"$4"` instead. You might also have noticed that we can jump directly to `Enum.sum/1` here. The second (optional) argument to `where/2` controls the format of the stream elements, in this case just streaming the `:value` field of the entry. If the second argument is not provided, it'll stream entry records (just like the first example). Note that `Cachex.Query.where/2` automatically attaches a clause to filter out expired records. If you wish to run a query on the entire dataset, you can use `Cachex.Query.raw/2` instead.
diff --git a/lib/cachex.ex b/lib/cachex.ex
index c7951b5..a8e3dbc 100644
--- a/lib/cachex.ex
+++ b/lib/cachex.ex
@@ -1236,8 +1236,8 @@ defmodule Cachex do
 
     * `:batch_size`
 
-      Allows customization of the internal batching when paginating the QLC
-      cursor coming back from ETS. It's unlikely this will ever need changing.
+      Allows customization of the internal batching when paginating the cursor
+      coming back from ETS. It's unlikely this will ever need changing.
 
   ## Examples
 
@@ -1251,21 +1251,21 @@ defmodule Cachex do
        {:entry, "c", 1519015805679, nil, 3},
        {:entry, "a", 1519015794445, nil, 1}]
 
-      iex> query = Cachex.Query.create(true, :key)
+      iex> query = Cachex.Query.where(true, :key)
       iex> :my_cache |> Cachex.stream!(query) |> Enum.to_list
       ["b", "c", "a"]
 
-      iex> query = Cachex.Query.create(true, :value)
+      iex> query = Cachex.Query.where(true, :value)
       iex> :my_cache |> Cachex.stream!(query) |> Enum.to_list
       [2, 3, 1]
 
-      iex> query = Cachex.Query.create(true, { :key, :value })
+      iex> query = Cachex.Query.where(true, { :key, :value })
      iex> :my_cache |> Cachex.stream!(query) |> Enum.to_list
       [{"b", 2}, {"c", 3}, {"a", 1}]
 
   """
   @spec stream(cache, any, Keyword.t()) :: {status, Enumerable.t()}
-  def stream(cache, query \\ Query.create(true), options \\ [])
+  def stream(cache, query \\ Query.where(true), options \\ [])
       when is_list(options),
       do: Conductor.route(cache, {:stream, [query, options]})
 
diff --git a/lib/cachex/actions/keys.ex b/lib/cachex/actions/keys.ex
index f211e5d..031e6e1 100644
--- a/lib/cachex/actions/keys.ex
+++ b/lib/cachex/actions/keys.ex
@@ -24,5 +24,5 @@ defmodule Cachex.Actions.Keys do
   will not be included.
   """
   def execute(cache(name: name), _options),
-    do: {:ok, :ets.select(name, Query.create(true, :key))}
+    do: {:ok, :ets.select(name, Query.where(true, :key))}
 end
diff --git a/lib/cachex/actions/stream.ex b/lib/cachex/actions/stream.ex
index e7c2d57..ce31faa 100644
--- a/lib/cachex/actions/stream.ex
+++ b/lib/cachex/actions/stream.ex
@@ -53,13 +53,21 @@ defmodule Cachex.Actions.Stream do
   # the cursor is spent, in which case we halt the stream and kill the cursor.
   defp init_stream(batch, name, spec) do
     Stream.resource(
-      fn -> [] end,
-      fn
-        :"$end_of_table" -> {:halt, []}
-        [] -> :ets.select(name, spec, batch)
-        cursor -> :ets.select(cursor)
+      fn -> :"$start_of_table" end,
+      fn
+        # we're finished!
+        :"$end_of_table" ->
+          {:halt, nil}
+
+        # we're starting!
+        :"$start_of_table" ->
+          :ets.select(name, spec, batch)
+
+        # we're continuing!
+        continuation ->
+          :ets.select(continuation)
       end,
-      fn _ -> [] end
+      & &1
     )
   end
 end
diff --git a/lib/cachex/policy/lrw.ex b/lib/cachex/policy/lrw.ex
index 95c2d29..f48ca51 100644
--- a/lib/cachex/policy/lrw.ex
+++ b/lib/cachex/policy/lrw.ex
@@ -46,8 +46,8 @@ defmodule Cachex.Policy.LRW do
   alias Cachex.Query
   alias Cachex.Services.Informant
 
-  # compile our QLC match at runtime to avoid recalculating
-  @qlc_match Query.raw(true, {:key, :touched})
+  # compile our match to avoid recalculating
+  @ets_match Query.raw(true, {:key, :touched})
 
   ####################
   # Policy Behaviour #
   ####################
@@ -134,55 +134,21 @@ defmodule Cachex.Policy.LRW do
   # to remove from the cache table. We do this by traversing the underlying ETS table,
   # which only selects the key and touch time as a minor optimization. The key is
   # naturally required when it comes to removing the document, and the touch time is
-  # used to determine the sort order required for LRW. We transform this sort using
-  # a QLC cursor and pass it through to `erase_cursor/3` to delete.
-  defp erase_lower_bound(offset, cache(name: name), batch_size)
+  # used to determine the sort order required for LRW.
+  defp erase_lower_bound(offset, cache(name: name) = cache, batch)
        when offset > 0 do
-    name
-    |> :ets.table(traverse: {:select, @qlc_match})
-    |> :qlc.sort(order: fn {_k1, t1}, {_k2, t2} -> t1 < t2 end)
-    |> :qlc.cursor()
-    |> erase_cursor(name, offset, batch_size)
+    cache
+    |> Cachex.stream!(@ets_match, const(:notify_false) ++ [batch_size: batch])
+    |> Enum.sort(fn {_k1, t1}, {_k2, t2} -> t1 < t2 end)
+    |> Enum.take(offset)
+    |> Enum.each(fn {k, _t} -> :ets.delete(name, k) end)
 
     offset
   end
 
-  defp erase_lower_bound(offset, _state, _batch_size),
+  defp erase_lower_bound(offset, _state, _batch),
     do: offset
 
-  # Erases entries in an LRW ETS cursor.
-  #
-  # This will exhaust a QLC cursor by taking in a provided cursor and removing the first
-  # N elements (where N is the number of entries we need to remove). Removals are done
-  # in configurable batches according to the `:batch_size` option.
-  #
-  # This is a recursive function as we have to keep track of the number to remove,
-  # as the removal is done by calling `erase_batch/3`. At the end of the recursion,
-  # we make sure to delete the trailing QLC cursor to avoid it lying around still.
-  defp erase_cursor(cursor, table, remainder, batch_size)
-       when remainder > batch_size do
-    erase_batch(cursor, table, batch_size)
-    erase_cursor(cursor, table, remainder - batch_size, batch_size)
-  end
-
-  defp erase_cursor(cursor, table, remainder, _batch_size) do
-    erase_batch(cursor, table, remainder)
-    :qlc.delete_cursor(cursor)
-  end
-
-  # Erases a batch of entries from a QLC cursor.
-  #
-  # This is not the most performant way to do this (as far as I know), as
-  # we need to pull a batch of entries from the table and then just pass
-  # them back through to ETS to erase them by key. This is nowhere near as
-  # performant as `:ets.select_delete/2` but appears to be required because
-  # of the need to sort the QLC cursor by the touch time.
-  defp erase_batch(cursor, table, batch_size) do
-    for {key, _touched} <- :qlc.next_answers(cursor, batch_size) do
-      :ets.delete(table, key)
-    end
-  end
-
   # Broadcasts the number of removed entries to the cache hooks.
   #
   # If the offset is not positive we didn't have to remove anything and so we
diff --git a/lib/cachex/query.ex b/lib/cachex/query.ex
index cba5798..20383d2 100644
--- a/lib/cachex/query.ex
+++ b/lib/cachex/query.ex
@@ -17,13 +17,6 @@ defmodule Cachex.Query do
   # Public API #
   ##############
 
-  @doc """
-  Creates an expiration-aware query.
-  """
-  @spec create(any, any) :: [{tuple, [tuple], [any]}]
-  def create(condition, output \\ :"$_"),
-    do: raw({:andalso, unexpired_clause(), condition}, output)
-
   @doc """
   Creates a query to retrieve all expired records.
   """
@@ -65,6 +58,13 @@ defmodule Cachex.Query do
   def unexpired_clause,
     do: {:orelse, {:==, :"$3", nil}, {:>, {:+, :"$2", :"$3"}, now()}}
 
+  @doc """
+  Creates an expiration-aware query.
+ """ + @spec where(any, any) :: [{tuple, [tuple], [any]}] + def where(condition, output \\ :"$_"), + do: raw({:andalso, unexpired_clause(), condition}, output) + ############### # Private API # ############### diff --git a/test/cachex/actions/stream_test.exs b/test/cachex/actions/stream_test.exs index 8da28f8..f225918 100644 --- a/test/cachex/actions/stream_test.exs +++ b/test/cachex/actions/stream_test.exs @@ -41,8 +41,8 @@ defmodule Cachex.Actions.StreamTest do {:ok, true} = Cachex.put(cache, "key3", "value3") # create two test queries - query1 = Cachex.Query.create(true, {:key, :value}) - query2 = Cachex.Query.create(true, :key) + query1 = Cachex.Query.where(true, {:key, :value}) + query2 = Cachex.Query.where(true, :key) # create cache streams {:ok, stream1} = Cachex.stream(cache, query1) diff --git a/test/cachex/query_test.exs b/test/cachex/query_test.exs index 5ae2fde..96192c2 100644 --- a/test/cachex/query_test.exs +++ b/test/cachex/query_test.exs @@ -6,8 +6,8 @@ defmodule Cachex.QueryTest do # creation default, which will attach the checks for expirations. test "creating basic queries" do # create a query with a true filter - query1 = Cachex.Query.create(true) - query2 = Cachex.Query.create(true, :key) + query1 = Cachex.Query.where(true) + query2 = Cachex.Query.where(true, :key) # verify the mapping of both queries assert [{{:_, :"$1", :"$2", :"$3", :"$4"}, _, _}] = query1 @@ -26,7 +26,7 @@ defmodule Cachex.QueryTest do assert [{_, _, [:"$1"]}] = query2 end - # The `expired()` function is just a wrapper to `create` whilst inverting + # The `expired()` function is just a wrapper to `where` whilst inverting # the expiration checks. This test just covers this behaviour. test "creating expired queries" do # create a couple of expired queries