Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Strip out :qlc from LRW implementation #354

Merged
merged 1 commit into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Strip out :qlc from LRW implementation
  • Loading branch information
whitfin committed Aug 25, 2024
commit d19509195b3187dff69a62fcc212258377b6b543
4 changes: 2 additions & 2 deletions docs/features/streaming-caches.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ Cachex.put(:my_cache, "three", 3)
filter = { :==, { :rem, :value, 2 }, 1 }

# generate the query using the filter
query = Cachex.Query.create(filter, :value)
query = Cachex.Query.where(filter, :value)

# 4
:my_cache
|> Cachex.stream!(query)
|> Enum.sum
```

Couple of things to mention here; first of all, you can use any of the `entry()` field names in your matches, and they'll be substituted out automatically. In this case we use `:value` in our filter, which would compile down to `:"$4"` instead. You might also have noticed that we can jump directly to `Enum.sum/1` here. The second (optional) argument to `create/2` controls the format of the stream elements, in this case just streaming the `:value` field of the entry. If the second argument is not provided, it'll stream entry records (just like the first example). It should be noted that `Cachex.Query.create/2` will automatically bind a filter clause to filter out expired documents. If you wish to run a query on the entire dataset, you can use `Cachex.Query.raw/2` instead.
A couple of things to mention here. First of all, you can use any of the `entry()` field names in your matches, and they'll be substituted out automatically. In this case we use `:value` in our filter, which compiles down to `:"$4"`. You might also have noticed that we can jump directly to `Enum.sum/1` here; this is because the second (optional) argument to `where/2` controls the format of the stream elements, in this case just streaming the `:value` field of the entry. If the second argument is not provided, it'll stream entry records (just like the first example). It should be noted that `Cachex.Query.where/2` will automatically bind a filter clause to filter out expired documents. If you wish to run a query on the entire dataset, you can use `Cachex.Query.raw/2` instead.
12 changes: 6 additions & 6 deletions lib/cachex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1236,8 +1236,8 @@ defmodule Cachex do

* `:batch_size`

Allows customization of the internal batching when paginating the QLC
cursor coming back from ETS. It's unlikely this will ever need changing.
Allows customization of the internal batching when paginating the cursor
coming back from ETS. It's unlikely this will ever need changing.

## Examples

Expand All @@ -1251,21 +1251,21 @@ defmodule Cachex do
{:entry, "c", 1519015805679, nil, 3},
{:entry, "a", 1519015794445, nil, 1}]

iex> query = Cachex.Query.create(true, :key)
iex> query = Cachex.Query.where(true, :key)
iex> :my_cache |> Cachex.stream!(query) |> Enum.to_list
["b", "c", "a"]

iex> query = Cachex.Query.create(true, :value)
iex> query = Cachex.Query.where(true, :value)
iex> :my_cache |> Cachex.stream!(query) |> Enum.to_list
[2, 3, 1]

iex> query = Cachex.Query.create(true, { :key, :value })
iex> query = Cachex.Query.where(true, { :key, :value })
iex> :my_cache |> Cachex.stream!(query) |> Enum.to_list
[{"b", 2}, {"c", 3}, {"a", 1}]

"""
@spec stream(cache, any, Keyword.t()) :: {status, Enumerable.t()}
def stream(cache, query \\ Query.create(true), options \\ [])
def stream(cache, query \\ Query.where(true), options \\ [])
when is_list(options),
do: Conductor.route(cache, {:stream, [query, options]})

Expand Down
2 changes: 1 addition & 1 deletion lib/cachex/actions/keys.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ defmodule Cachex.Actions.Keys do
will not be included.
"""
def execute(cache(name: name), _options),
do: {:ok, :ets.select(name, Query.create(true, :key))}
do: {:ok, :ets.select(name, Query.where(true, :key))}
end
18 changes: 13 additions & 5 deletions lib/cachex/actions/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,21 @@ defmodule Cachex.Actions.Stream do
# the cursor is spent, in which case we halt the stream and kill the cursor.
#
# Note that ETS can also signal exhaustion by returning a bare `:"$end_of_table"`
# (rather than a `{matches, continuation}` pair), e.g. when the table is empty.
# Such results are converted into a halt via `handle_continue/1`, as a bare atom
# is not a valid step for `Stream.resource/3`.
defp init_stream(batch, name, spec) do
  Stream.resource(
    # a sentinel accumulator marking that no select has been issued yet
    fn -> :"$start_of_table" end,
    fn
      # we're finished!
      :"$end_of_table" ->
        {:halt, nil}

      # we're starting!
      :"$start_of_table" ->
        name
        |> :ets.select(spec, batch)
        |> handle_continue()

      # we're continuing!
      continuation ->
        continuation
        |> :ets.select()
        |> handle_continue()
    end,
    # nothing to clean up; ETS continuations need no explicit release
    & &1
  )
end

# Normalizes the result of an ETS select into a `Stream.resource/3` step.
#
# A bare `:"$end_of_table"` halts the stream, whereas a `{matches, continuation}`
# pair is passed through untouched to emit the matches and carry the continuation.
defp handle_continue(:"$end_of_table"),
  do: {:halt, nil}

defp handle_continue({_matches, _continuation} = step),
  do: step
end
54 changes: 10 additions & 44 deletions lib/cachex/policy/lrw.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ defmodule Cachex.Policy.LRW do
alias Cachex.Query
alias Cachex.Services.Informant

# compile our QLC match at runtime to avoid recalculating
@qlc_match Query.raw(true, {:key, :touched})
# build our match spec once at compile time to avoid recalculating it
@ets_match Query.raw(true, {:key, :touched})

####################
# Policy Behaviour #
Expand Down Expand Up @@ -134,55 +134,21 @@ defmodule Cachex.Policy.LRW do
# to remove from the cache table. We do this by traversing the underlying ETS table,
# which only selects the key and touch time as a minor optimization. The key is
# naturally required when it comes to removing the document, and the touch time is
# used to determine the sort order required for LRW. The selected pairs are streamed
# out of the cache in batches, sorted by touch time (lowest first), and the first
# `offset` keys are then deleted directly from the ETS table. The offset is returned
# untouched so that the caller can tell how many removals were requested.
defp erase_lower_bound(offset, cache(name: name) = cache, batch)
     when offset > 0 do
  cache
  # NOTE(review): const(:notify_false) presumably disables hook notifications
  # for this internal stream call; confirm against the constants definition.
  |> Cachex.stream!(@ets_match, const(:notify_false) ++ [batch_size: batch])
  # sort_by/2 keeps the sort stable for entries sharing the same touch time
  |> Enum.sort_by(fn {_key, touched} -> touched end)
  |> Enum.take(offset)
  |> Enum.each(fn {key, _touched} -> :ets.delete(name, key) end)

  offset
end

# Nothing needs to be removed when the offset is not positive.
defp erase_lower_bound(offset, _state, _batch),
  do: offset

# Erases entries in an LRW ETS cursor.
#
# This will exhaust a QLC cursor by taking in a provided cursor and removing the first
# N elements (where N is the number of entries we need to remove). Removals are done
# in configurable batches according to the `:batch_size` option.
#
# This is a recursive function as we have to keep track of the number to remove,
# as the removal is done by calling `erase_batch/3`. At the end of the recursion,
# we make sure to delete the trailing QLC cursor to avoid it lying around still.
defp erase_cursor(cursor, table, remainder, batch_size)
when remainder > batch_size do
erase_batch(cursor, table, batch_size)
erase_cursor(cursor, table, remainder - batch_size, batch_size)
end

defp erase_cursor(cursor, table, remainder, _batch_size) do
erase_batch(cursor, table, remainder)
:qlc.delete_cursor(cursor)
end

# Erases a batch of entries from a QLC cursor.
#
# This is not the most performant way to do this (as far as I know), as
# we need to pull a batch of entries from the table and then just pass
# them back through to ETS to erase them by key. This is nowhere near as
# performant as `:ets.select_delete/2` but appears to be required because
# of the need to sort the QLC cursor by the touch time.
defp erase_batch(cursor, table, batch_size) do
for {key, _touched} <- :qlc.next_answers(cursor, batch_size) do
:ets.delete(table, key)
end
end

# Broadcasts the number of removed entries to the cache hooks.
#
# If the offset is not positive we didn't have to remove anything and so we
Expand Down
14 changes: 7 additions & 7 deletions lib/cachex/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ defmodule Cachex.Query do
# Public API #
##############

@doc """
Creates an expiration-aware query.
"""
@spec create(any, any) :: [{tuple, [tuple], [any]}]
def create(condition, output \\ :"$_"),
do: raw({:andalso, unexpired_clause(), condition}, output)

@doc """
Creates a query to retrieve all expired records.
"""
Expand Down Expand Up @@ -65,6 +58,13 @@ defmodule Cachex.Query do
def unexpired_clause,
do: {:orelse, {:==, :"$3", nil}, {:>, {:+, :"$2", :"$3"}, now()}}

@doc """
Creates an expiration-aware query.

The provided `condition` is combined (via `:andalso`) with the clause returned
by `unexpired_clause/0`, and the result is passed to `raw/2` together with the
requested `output` format (which defaults to `:"$_"`).
"""
@spec where(any, any) :: [{tuple, [tuple], [any]}]
def where(condition, output \\ :"$_") do
  filter = {:andalso, unexpired_clause(), condition}
  raw(filter, output)
end

###############
# Private API #
###############
Expand Down
4 changes: 2 additions & 2 deletions test/cachex/actions/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ defmodule Cachex.Actions.StreamTest do
{:ok, true} = Cachex.put(cache, "key3", "value3")

# create two test queries
query1 = Cachex.Query.create(true, {:key, :value})
query2 = Cachex.Query.create(true, :key)
query1 = Cachex.Query.where(true, {:key, :value})
query2 = Cachex.Query.where(true, :key)

# create cache streams
{:ok, stream1} = Cachex.stream(cache, query1)
Expand Down
6 changes: 3 additions & 3 deletions test/cachex/query_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ defmodule Cachex.QueryTest do
# creation default, which will attach the checks for expirations.
test "creating basic queries" do
# create a query with a true filter
query1 = Cachex.Query.create(true)
query2 = Cachex.Query.create(true, :key)
query1 = Cachex.Query.where(true)
query2 = Cachex.Query.where(true, :key)

# verify the mapping of both queries
assert [{{:_, :"$1", :"$2", :"$3", :"$4"}, _, _}] = query1
Expand All @@ -26,7 +26,7 @@ defmodule Cachex.QueryTest do
assert [{_, _, [:"$1"]}] = query2
end

# The `expired()` function is just a wrapper to `create` whilst inverting
# The `expired()` function is just a wrapper to `where` whilst inverting
# the expiration checks. This test just covers this behaviour.
test "creating expired queries" do
# create a couple of expired queries
Expand Down
Loading