
Subquery cache & friends #21888

Open · sopel39 wants to merge 13 commits into master from ks/subquery_cache

Conversation

sopel39 (Member) commented May 9, 2024

Implement subquery cache for Hive/Iceberg/Delta

Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cache
results are never stale, since data is cached per split. Dedicated
"cache split ids" include the creation time and change set
(in the case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then a common subplan is extracted.
2. The query plan is rewritten using caching plan alternatives
   (fall back to the original subquery, cache data, or load from cache).
3. SPI PlanSignatures are computed for each cached subquery.
4. Splits are scheduled deterministically on nodes based on the
   (PlanSignature, SplitId) pair (see the sketch after this list).
5. On the worker, a cache plugin (currently only memory-based) determines
   whether cached data is available for a given split.
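
For illustration, step 4 could be realized with a stable hash over the
(PlanSignature, SplitId) pair. The sketch below is minimal and assumed, not
the PR's actual scheduler code: real assignment also has to cope with nodes
joining and leaving, and every name here is illustrative.

import com.google.common.hash.Hashing;

import java.util.List;

import static java.nio.charset.StandardCharsets.UTF_8;

// Hypothetical sketch: map each (planSignature, splitId) pair to a stable
// node, so a split's cached data is looked up on the node that produced it.
final class DeterministicSplitPlacement
{
    private DeterministicSplitPlacement() {}

    static String chooseNode(List<String> sortedNodeIds, String planSignature, String splitId)
    {
        long hash = Hashing.murmur3_128().newHasher()
                .putString(planSignature, UTF_8)
                .putString(splitId, UTF_8)
                .hash()
                .asLong();
        // floorMod keeps the index non-negative for negative hash values
        return sortedNodeIds.get((int) Math.floorMod(hash, (long) sortedNodeIds.size()));
    }
}

As long as the set of live nodes is stable, repeated executions of a similar
subquery route each split to the same worker, which is what makes the
per-split cache lookup in step 5 effective.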

@cla-bot cla-bot bot added the cla-signed label May 9, 2024
@github-actions github-actions bot added the iceberg (Iceberg connector), delta-lake (Delta Lake connector), and hive (Hive connector) labels May 9, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 2 times, most recently from 9f4aa11 to ad12339 on May 9, 2024 16:10

List<InternalNode> filteredNodes = filterNodes(nodeMap, includeCoordinator, ImmutableSet.of());
ResettableRandomizedIterator<InternalNode> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
Set<InternalNode> schedulableNodes = new HashSet<>(filteredNodes);

// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling) {

Can we remove optimizedLocalScheduling from NodeSchedulerConfig?


Can you make a separate PR out of the last commit? It's unrelated to everything else and can be landed quickly.


I think we also need to change the implementation of hive.force-local-scheduling. With the current changes we're effectively forcing local scheduling by default in hive (unless a node goes down). That's probably not the right default, as it could create hot spots in the cluster. Instead of flipping isRemotelyAccessible, that flag should now control whether host addresses are returned by the hive connector, and by default we shouldn't provide addresses from HDFS.

@sopel39 sopel39 force-pushed the ks/subquery_cache branch 5 times, most recently from c270355 to a72b5df on May 15, 2024 12:16
@github-actions github-actions bot added the ui (Web UI) label May 21, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 4 times, most recently from f771ae0 to a0ab7fc on May 22, 2024 08:25
@sopel39 sopel39 marked this pull request as ready for review May 22, 2024 08:25
@sopel39 sopel39 changed the title WIP: Subquery cache & friends Subquery cache & friends May 22, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 5 times, most recently from 865c615 to 74302ec on May 23, 2024 12:38
deigote (Member) commented May 27, 2024

Hi 👋🏽 Maybe a dumb question, but from "subquery cache for Hive/Iceberg/Delta" I'm not clear whether this is about a subquery cache for the Hive/Iceberg/Delta connectors, or rather a cache for any connector that uses Hive/Iceberg/Delta under the hood. I'm hoping it's the latter, but it'd be great if you could clarify 🙏🏽!

sopel39 (Member, Author) commented May 27, 2024

or rather a cache for any connector that uses Hive/Iceberg/Delta under the hood.

@deigote I'm not sure what you mean by "any connector that uses Hive/Iceberg/Delta under the hood". However, this PR makes the subquery cache a first-class citizen: the source of data can be any connector, as long as the connector implements getCacheTableId, getCacheColumnId, and getCacheSplitId.
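
To make the contract concrete, the connector-side hooks might look roughly
like the sketch below. Only the three method names come from the comment
above; the parameter and return types are assumptions for illustration.

import java.util.Optional;

// Illustrative-only shape of the connector cache hooks; real signatures differ
interface CacheIdProvider
{
    // Stable identifier for a table's data; empty opts the table out of caching
    Optional<String> getCacheTableId(Object tableHandle);

    // Stable identifier for a column, used to canonicalize plans across queries
    Optional<String> getCacheColumnId(Object tableHandle, Object columnHandle);

    // Identifier capturing everything that determines a split's rows, e.g.
    // file path and offsets, creation time, and (for Delta/Iceberg) the change set
    Optional<String> getCacheSplitId(Object split);
}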

deigote (Member) commented May 27, 2024

Thanks! That's what I was hoping for. I got confused by the PR's description saying "Implement subquery cache for Hive/Iceberg/Delta", which seemed to imply the PR was only about the Hive/Iceberg/Delta connectors. What I get is that with this PR:

  • Trino offers any connector the ability to leverage subquery caching
  • The Hive/Iceberg/Delta connectors already use said ability to implement subquery caching

sopel39 (Member, Author) commented May 29, 2024

Removed dynamic row filtering from this PR, as it will be handled separately (#22175 (comment))

kekwan (Contributor) commented Jun 6, 2024

Looking forward to this. Would this work also solve CTE #10?

deigote (Member) commented Jun 6, 2024

@kekwan the way I understood it, it wouldn't "solve" it but it'd contribute to making it a much less severe issue. The CTEs would still execute twice, but their results would be cached on quite a low level. Hopefully the cache hit ratio would be very high but I'm guessing it'd depend on how busy the workers are (I'm assuming the busier they are the more cache evictions).

lukasz-stec and others added 13 commits June 11, 2024 15:03
ChooseAlternativeNode defines alternative sub-plans that can be used
to execute a given part of the query.
The actual sub-plan is then chosen per split during task execution.
Alternative sub-plans cannot span multiple stages and are only supported
for source stages.

Co-authored-by: Assaf Bern <[email protected]>
These methods are required by the subquery cache to describe
split data for cache key purposes.

ConnectorPageSourceProvider#getUnenforcedPredicate
is used to describe what unenforced predicate will be
applied to split data.

ConnectorPageSourceProvider#prunePredicate is used
to simplify filter predicates on a per-split basis
(e.g. removing partitioning predicates that fully
contain split data).

Co-authored-by: Kamil Endruszkiewicz <[email protected]>
Co-authored-by: radek <[email protected]>
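
As a rough illustration of the prunePredicate contract (a sketch only: plain
maps and strings stand in for TupleDomain<ColumnHandle>, and the logic is
assumed rather than taken from the PR):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class PrunePredicateSketch
{
    private PrunePredicateSketch() {}

    // domains: column -> allowed values; splitValues: columns with a known
    // constant value for this split (e.g. partition columns). An empty
    // result stands in for TupleDomain.none() (split fully filtered out).
    static Optional<Map<String, Set<String>>> prunePredicate(
            Map<String, Set<String>> domains,
            Map<String, String> splitValues)
    {
        Map<String, Set<String>> pruned = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : domains.entrySet()) {
            String column = entry.getKey();
            Set<String> allowed = entry.getValue();
            String splitValue = splitValues.get(column);
            if (splitValue == null) {
                // Predicate is still effective on this column; keep it
                pruned.put(column, allowed);
            }
            else if (!allowed.contains(splitValue)) {
                // The split is entirely filtered out by the predicate
                return Optional.empty();
            }
            // else: the predicate fully contains this split's data for the
            // column (e.g. a partitioning predicate), so drop it
        }
        return Optional.of(pruned);
    }
}
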
CacheManager is a set of SPI classes for implementing
split-level cache storage.

MemoryCacheManager is a high-performance implementation of
CacheManager that keeps cached data in revocable memory.
The cache table id, together with the split id and column id, represents
the rows produced by ConnectorPageSource for a given split.

Cache ids can also be used to canonicalise query plans
for the purpose of comparison or cache key generation.

This commit implements cache ids for Hive, Iceberg, Delta and TPCH
connectors.

Co-authored-by: Kamil Endruszkiewicz <[email protected]>
Co-authored-by: radek <[email protected]>
Co-authored-by: lukasz-stec <[email protected]>
Cache hit rate depends on deterministic split generation.
The Hive connector has a concept of "initial splits", which
are smaller, and there is a limited number of them.
Therefore, if deterministic splits are required, initial
splits must be disabled, because Hive split generation
doesn't have guaranteed ordering.
A dynamic filter id might be registered both by a local join
and as coming from the coordinator.
CanonicalSubplanExtractor creates a canonical
representation of a subplan using cache ids
provided by the connector. Canonical subplans
are used to compare plans against each other
and enable extraction of common subplans (e.g. two
similar subqueries over the same table within one
query can be recognized as one cacheable subplan).

Co-authored-by: Kamil Endruszkiewicz <[email protected]>
Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cache
results are never stale, since data is cached per split. Dedicated
"cache split ids" include the creation time and change set
(in the case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then a common subplan is extracted.
2. The query plan is rewritten using caching plan alternatives
   (fall back to the original subquery, cache data, or load from cache).
3. SPI PlanSignatures are computed for each cached subquery.
4. Splits are scheduled deterministically on nodes based on the
   (PlanSignature, SplitId) pair.
5. On the worker, a cache plugin (currently only memory-based) determines
   whether cached data is available for a given split.

Co-authored-by: Kamil Endruszkiewicz <[email protected]>
Co-authored-by: radek <[email protected]>
Co-authored-by: lukasz-stec <[email protected]>
Co-authored-by: Raunaq Morarka <[email protected]>
sopel39 (Member, Author) commented Jun 11, 2024

rebased after #22190

martint (Member) left a comment:
Some initial comments. Still reviewing.

@@ -28,7 +29,7 @@ public interface DriverFactory

OptionalInt getDriverInstances();

Driver createDriver(DriverContext driverContext);
Driver createDriver(DriverContext driverContext, Optional<ScheduledSplit> split);

What's the motivation and purpose for this new argument?


This was added to support alternative plans for the source stage. The alternative in this context is a concrete list of operators (a Driver instance) chosen based on the split.
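
A generic sketch of this "choose per split" idea follows; none of the names
below come from the PR, which works with PlanNodes and operator pipelines
rather than these illustrative types.

import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Each alternative pairs a guard with a factory for its pipeline; the last
// alternative is expected to accept any split, acting as the fallback.
record Alternative<S, D>(Predicate<S> applies, Function<S, D> createDriver) {}

final class AlternativeChooser
{
    private AlternativeChooser() {}

    static <S, D> D createDriver(List<Alternative<S, D>> alternatives, S split)
    {
        // The first alternative whose guard matches the split wins
        return alternatives.stream()
                .filter(alternative -> alternative.applies().test(split))
                .findFirst()
                .orElseThrow()
                .createDriver()
                .apply(split);
    }
}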

{
private final List<PlanNode> alternatives;

private final FilteredTableScan originalTableScan;

What's this for? What does "original" mean in this context (e.g. what if the plan is formulated with a set of alternatives from the get go?)

Also, this seems overly specific. What if the original plan had operations other than a table scan and filter?


This was named originalTableScan because ChooseAlternativeNode is ATM created for some existing original sub-plan. The reason we have it, though, is different: alternatives work only at the source stage level and are chosen based on a split. This means we need a single source of splits for the ChooseAlternativeNode, and originalTableScan is exactly that. The filter part is needed to support dynamic filters at the split source level.

@@ -794,7 +812,7 @@ public PartitioningHandleReassigner(PartitioningHandle fragmentPartitioningHandl
}

@Override
public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
public TableScanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)

Unrelated change?

List<PlanNode> newAlternatives = node.getSources().stream()
.map(alternative -> context.defaultRewrite(alternative, context.get()))
.toList();
TableScanNode newTableScan = visitTableScan(node.getOriginalTableScan().tableScanNode(), context);

Visitor methods are not supposed to be called directly. Use rewriteWith(...).

filter);
return decomposedPredicate.getTupleDomain().transformKeys(scan.getAssignments()::get);
}).orElse(TupleDomain.all());
addInputTableConstraints(filterDomain, scan, context);

This isn't necessarily correct. It may apply to some alternatives, but not all. It should either pick the most conservative choice (that which applies to all the alternatives), or list all the possible I/O plans.

Comment on lines +96 to +97
predicate
)).isEqualTo(TupleDomain.all());

Formatting:

assertThat(pageSourceProvider.prunePredicate(
        TEST_SESSION.toConnectorSession(),
        prepareSplit(
                TupleDomain.withColumnDomains(ImmutableMap.of(regularColumnHandle, Domain.multipleValues(BIGINT, ImmutableList.of(0L)))),
                ImmutableMap.of("partitionedColumn", Optional.of("0"))),
        tableHandle,
        predicate))
        .isEqualTo(TupleDomain.all());

}

@Test
public void testPrunePredicateWhenSplitIsFilteredOut()

Check formatting in the methods below, too.

ConnectorSession session,
ConnectorSplit split,
ConnectorTableHandle table,
TupleDomain<ColumnHandle> dynamicFilter)

Why is the argument named "dynamicFilter"? Rename it to "constraint".

@@ -13,6 +13,8 @@
*/
package io.trino.spi.connector;

import io.trino.spi.predicate.TupleDomain;

import java.util.List;

public interface ConnectorPageSourceProvider

The new methods seem misplaced in this class. Why are they associated with the PageSourceProvider (and not RecordSetProvider)? They should really live outside of either of those, as they have nothing to do with the data stream itself. They are more related to split management.

Comment on lines +48 to +50
* Prunes columns from predicate that are not effective in filtering split data.
* If split is completely filtered out by given predicate, then this
* method must return {@link TupleDomain#none}.

This is a weird mix of column pruning and constraint pruning (i.e., a column-wise intersection and an all-or-none intersection of the constraint). It would be more general and easier to reason about if it were a pure intersection of the given TupleDomain with the constraint guaranteed by the split.

I can't tell yet what's the right abstraction as there are no uses of it up to this commit -- I will revisit once I review the rest of the PR.

martint (Member) left a comment:
Some more comments and questions as I continue perusing the code.

ConnectorSession session,
ConnectorTableHandle tableHandle,
DynamicFilter dynamicFilter,
boolean preferDeterministicSplits,

We talked about this offline a few weeks ago. Instead of adding this, we should revisit whether the adaptive split logic is still useful and remove it if not. It's almost certainly not useful for data formats such as ORC and Parquet that cannot be split across row group boundaries.

Comment on lines +45 to +56
* @param predicate Predicate that should be enforced on cached rows.
* Output of `cachedSplitA` can be used to derive output of matching `cachedSplitB`
* (with corresponding {@link PlanSignature}) as long as `cachedSplitB.predicate` is a strict
* subset of `cachedSplitA.predicate`. To do so, `cachedSplitB.predicate` must be
* applied on output of `cachedSplitA`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.
* @param unenforcedPredicate Unenforced (best-effort) predicate that should be applied on cached rows.
* Output of `cachedSplitA` can be used to derive output of matching `cachedSplitB`
* (with corresponding {@link PlanSignature}) as long as `cachedSplitB.unenforcedPredicate`
* is a subset of `cachedSplitA.unenforcedPredicate`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.
* @return cached pages for a given split.

Replace markdown code quotes with proper javadoc code tags.

Comment on lines +49 to +50
* applied on output of `cachedSplitA`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.

How is this enforced? Who is responsible for the normalization? What happens if it's not normalized?

* subset of `cachedSplitA.predicate`. To do so, `cachedSplitB.predicate` must be
* applied on output of `cachedSplitA`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.
* @param unenforcedPredicate Unenforced (best-effort) predicate that should be applied on cached rows.

It's not clear what this predicate is for.

import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

public class CacheSplitId

This should be a record
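
For reference, a minimal record version might look like the sketch below,
assuming CacheSplitId wraps a single String id (the PR's actual fields and
retained-size accounting via instanceSize are omitted here):

import static java.util.Objects.requireNonNull;

// Hypothetical record form of CacheSplitId
public record CacheSplitId(String id)
{
    public CacheSplitId
    {
        requireNonNull(id, "id is null");
    }
}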

import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

public class SignatureKey

This should be a record

* {@link ColumnHandle}). {@link CacheColumnId} for complex projections will use canonicalized and formatted
* version of projection expression.
*/
public class CanonicalSubplan

What's the relationship between CanonicalSubplan and PlanSignature?

* {@link ColumnHandle}). {@link CacheColumnId} for complex projections will use canonicalized and formatted
* version of projection expression.
*/
public class CanonicalSubplan

Why do we need a dedicated class for this? Why isn't a regular PlanNode tree sufficient to represent a canonical structure (after canonicalization, of course)?

Comment on lines +24 to +32
/**
* Returns a table identifier for the purpose of caching with {@link CacheManager}.
* {@link CacheTableId} together with {@link CacheSplitId} and {@link CacheColumnId}s represents
* rows produced by {@link ConnectorPageSource} for a given split. Local table properties
* (e.g. rows order) must be part of {@link CacheTableId} if they are present. List of selected
* columns should not be part of {@link CacheTableId}. {@link CacheTableId} should not contain
* elements that can be derived from {@link CacheSplitId} such as predicate on partition column
* which can filter splits entirely.
*/

This is too complicated. Why are all those conditions required?

What does it mean for "List of selected columns should not be part of ...", especially in the case of a ConnectorTableHandle representing a pushed-down subplan?

* are eligible for caching with {@link CacheManager}. Connector should convert provided
* {@link ConnectorTableHandle} into canonical one by pruning of every non-canonical field.
*/
ConnectorTableHandle getCanonicalTableHandle(ConnectorTableHandle handle);

Not sure I understand what this is. ConnectorTableHandle represents an opaque reference to a table that can be carried in query plans, transmitted to workers, etc. What is a "property of a ConnectorTableHandle"? What does it mean for it to "affect final query results when underlying table is queried"?

Also, what's the purpose of this, given that there's getCacheTableId above?

Labels: cla-signed, delta-lake (Delta Lake connector), hive (Hive connector), iceberg (Iceberg connector), performance, ui (Web UI)

7 participants