Subquery cache & friends #21888
base: master
Conversation
sopel39 commented May 9, 2024 (edited)
Force-pushed 9f4aa11 to ad12339
    List<InternalNode> filteredNodes = filterNodes(nodeMap, includeCoordinator, ImmutableSet.of());
    ResettableRandomizedIterator<InternalNode> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
    Set<InternalNode> schedulableNodes = new HashSet<>(filteredNodes);

    // optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
    if (optimizedLocalScheduling) {
Can we remove optimizedLocalScheduling from NodeSchedulerConfig?
Can you make a separate PR out of the last commit? It's unrelated to everything else and can be landed quickly.
I think we also need to change the implementation of hive.force-local-scheduling. With the current changes we're effectively forcing local scheduling by default in Hive (unless a node went down). That's probably not the right default as it could create hot spots in the cluster. Instead of flipping isRemotelyAccessible, that flag should now control whether host addresses are returned by the Hive connector, and by default we shouldn't provide addresses from HDFS.
Force-pushed c270355 to a72b5df
Force-pushed a72b5df to 4f474e2
Force-pushed f771ae0 to a0ab7fc
Force-pushed 865c615 to 74302ec
Hi 👋🏽 maybe a dumb question, but from
@deigote I'm not sure what you mean by any connector that uses Hive/Iceberg/Delta under the hood. However, this PR makes subquery cache a 1st class citizen, where the source of data can be from any connector as long as the connector implements
Thanks! That's what I was hoping for. I got confused by the PR's description saying Implement subquery cache for Hive/Iceberg/Delta, which seemed to imply the PR was only about the Hive / Iceberg / Delta connectors. But what I get is that with this PR:
Force-pushed 74302ec to d80cb1d
Force-pushed d80cb1d to d7af76b
Force-pushed d7af76b to bc4d54b
Removed dynamic row filtering from PR as it will be handled separately (#22175 (comment))
Looking forward to this. Would this work also solve CTE #10?
@kekwan the way I understood it, it wouldn't "solve" it but it'd contribute to making it a much less severe issue. The CTEs would still execute twice, but their results would be cached at quite a low level. Hopefully the cache hit ratio would be very high, but I'm guessing it'd depend on how busy the workers are (I'm assuming the busier they are, the more cache evictions).
Force-pushed bc4d54b to 9e3e422
ChooseAlternativeNode defines alternative sub-plans that can be used to execute a given part of the query. The actual sub-plan is then chosen per split during task execution. Alternative sub-plans cannot span multiple stages and are only supported for source stages. Co-authored-by: Assaf Bern <[email protected]>
These methods are required by subquery cache to describe split data for cache key purposes. ConnectorPageSourceProvider#getUnenforcedPredicate is used to describe what unenforced predicate will be applied on split data. ConnectorPageSourceProvider#prunePredicate is used to simplify filter predicates on a per-split basis (e.g. removing partitioning predicates that fully contain split data). Co-authored-by: Kamil Endruszkiewicz <[email protected]> Co-authored-by: radek <[email protected]>
CacheManager is a set of SPI classes for implementing split level cache storage. MemoryCacheManager is a high-performance implementation of CacheManager that keeps cached data in revocable memory.
Cache table id together with split id and column id represents rows produced by ConnectorPageSource for a given split. Cache ids can also be used to canonicalise query plans for the purpose of comparison or cache key generation. This commit implements cache ids for Hive, Iceberg, Delta and TPCH connectors. Co-authored-by: Kamil Endruszkiewicz <[email protected]> Co-authored-by: radek <[email protected]> Co-authored-by: lukasz-stec <[email protected]>
Cache hit rate depends on deterministic split generation. The Hive connector has a concept of "initial splits", which are smaller and limited in number. Therefore, if deterministic splits are required, initial splits must be disabled, because Hive split generation doesn't have guaranteed ordering.
A dynamic filter id might be registered both by a local join and as coming from the coordinator.
CanonicalSubplanExtractor creates a canonical representation of a subplan using cache ids provided by the connector. Canonical subplans are used to compare plans against each other and enable extraction of common subplans. Co-authored-by: Kamil Endruszkiewicz <[email protected]>
Subquery cache is a lightweight mechanism for caching source stage computations. It works across queries, but also within a query if similar subqueries are identified. Subquery cache works with both streaming and FTE mode. Cache results are never stale since data is cached per split. Dedicated "cache split ids" include create time and change set (in case of Delta/Iceberg). Subquery cache works as follows:
1. During planning, subqueries eligible for caching are identified. If there are similar subqueries within a query, then a common subplan is extracted.
2. The query plan is rewritten using caching plan alternatives (fallback to original subquery, cache data, load from cache).
3. SPI PlanSignatures are computed for each cached subquery.
4. Splits are scheduled deterministically on nodes based on the (PlanSignature, SplitId) pair.
5. On the worker, the cache plugin (currently only memory based) determines whether cached data is available for a given split.
Co-authored-by: Kamil Endruszkiewicz <[email protected]> Co-authored-by: radek <[email protected]> Co-authored-by: lukasz-stec <[email protected]> Co-authored-by: Raunaq Morarka <[email protected]>
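The deterministic scheduling in step 4 can be sketched as a toy in plain Java. This is illustrative only, not Trino's implementation: the class and method names are made up, the signature and split id are modeled as strings, and a production version would use a stable hash (e.g. murmur3) plus consistent hashing to tolerate worker membership changes.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: map a (planSignature, splitId) pair deterministically
// to a worker, so the same split of the same canonical subplan always lands
// where its cached pages live.
public class DeterministicSplitAssignment
{
    public static String assignNode(String planSignature, String splitId, List<String> workers)
    {
        // Objects.hash is stable within a JVM run; a real implementation
        // would use a hash that is stable across processes and versions.
        int hash = Objects.hash(planSignature, splitId);
        int index = Math.floorMod(hash, workers.size());
        return workers.get(index);
    }

    public static void main(String[] args)
    {
        List<String> workers = List.of("worker-1", "worker-2", "worker-3");
        // Repeated assignment of the same pair picks the same worker
        String first = assignNode("sig-A", "split-7", workers);
        String second = assignNode("sig-A", "split-7", workers);
        System.out.println(first.equals(second)); // prints true
    }
}
```

The key property is only that the mapping is a pure function of the pair, so the scheduler needs no shared state to agree on placement.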
rebased after #22190
Force-pushed 9e3e422 to a2aa506
Some initial comments. Still reviewing.
@@ -28,7 +29,7 @@ public interface DriverFactory

    OptionalInt getDriverInstances();

-   Driver createDriver(DriverContext driverContext);
+   Driver createDriver(DriverContext driverContext, Optional<ScheduledSplit> split);
What's the motivation and purpose for this new argument?
This was added to support alternative plans for the source stage. The alternative in this context is a concrete list of operators (a Driver instance) chosen based on the split.
{
    private final List<PlanNode> alternatives;

    private final FilteredTableScan originalTableScan;
What's this for? What does "original" mean in this context (e.g. what if the plan is formulated with a set of alternatives from the get go?)
Also, this seems overly specific. What if the original plan had operations other than a table scan and filter?
This was named originalTableScan because ChooseAlternativeNode is ATM created for some existing original sub-plan. The reason we have it, though, is different. Alternatives work only at the source stage level and are chosen based on a split. This means we need a single source of splits for the ChooseAlternativeNode, and originalTableScan is exactly that. The filter part is needed to support dynamic filters at the split source level.
@@ -794,7 +812,7 @@ public PartitioningHandleReassigner(PartitioningHandle fragmentPartitioningHandl
    }

    @Override
-   public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
+   public TableScanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
Unrelated change?
List<PlanNode> newAlternatives = node.getSources().stream()
        .map(alternative -> context.defaultRewrite(alternative, context.get()))
        .toList();
TableScanNode newTableScan = visitTableScan(node.getOriginalTableScan().tableScanNode(), context);
Visitor methods are not supposed to be called directly. Use rewriteWith(...).
        filter);
return decomposedPredicate.getTupleDomain().transformKeys(scan.getAssignments()::get);
}).orElse(TupleDomain.all());
addInputTableConstraints(filterDomain, scan, context);
This isn't necessarily correct. It may apply to some alternatives, but not all. It should either pick the most conservative choice (that which applies to all the alternatives), or list all the possible I/O plans.
        predicate
)).isEqualTo(TupleDomain.all());
Formatting:
assertThat(pageSourceProvider.prunePredicate(
TEST_SESSION.toConnectorSession(),
prepareSplit(
TupleDomain.withColumnDomains(ImmutableMap.of(regularColumnHandle, Domain.multipleValues(BIGINT, ImmutableList.of(0L)))),
ImmutableMap.of("partitionedColumn", Optional.of("0"))),
tableHandle,
predicate))
.isEqualTo(TupleDomain.all());
}

@Test
public void testPrunePredicateWhenSplitIsFilteredOut()
Check formatting in the methods below, too.
ConnectorSession session,
ConnectorSplit split,
ConnectorTableHandle table,
TupleDomain<ColumnHandle> dynamicFilter)
Why is the argument named "dynamicFilter"? Rename it to "constraint".
@@ -13,6 +13,8 @@
 */
package io.trino.spi.connector;

+import io.trino.spi.predicate.TupleDomain;
+
import java.util.List;

public interface ConnectorPageSourceProvider
The new methods seem misplaced in this class. Why are they associated with the PageSourceProvider (and not RecordSetProvider)? They should really live outside of either of those, as they have nothing to do with a data stream itself. They are more related to split management.
* Prunes columns from predicate that are not effective in filtering split data.
* If split is completely filtered out by given predicate, then this
* method must return {@link TupleDomain#none}.
This is a weird mix of column pruning and constraint pruning (i.e., a column-wise intersection and an all-or-none intersection of the constraint). It would be more general and easier to reason about if it were a pure intersection of the given TupleDomain with the constraint guaranteed by the split.
I can't tell yet what's the right abstraction as there are no uses of it up to this commit -- I will revisit once I review the rest of the PR.
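The "pure intersection" semantics suggested here can be illustrated with a toy model. This is not Trino's TupleDomain: predicates are modeled as maps from column name to a set of allowed integer values, and an empty per-column intersection plays the role of TupleDomain.none() (the split is fully filtered out). Names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Toy sketch: intersect a query predicate with the constraint a split
// already guarantees, column by column. A column absent from one side is
// treated as unconstrained ("all values allowed") on that side.
public class PredicateIntersection
{
    // Returns the intersected predicate, or Optional.empty() if any column's
    // intersection is empty (split fully filtered out).
    public static Optional<Map<String, Set<Integer>>> intersect(
            Map<String, Set<Integer>> predicate,
            Map<String, Set<Integer>> splitConstraint)
    {
        Map<String, Set<Integer>> result = new HashMap<>(predicate);
        for (Map.Entry<String, Set<Integer>> entry : splitConstraint.entrySet()) {
            result.merge(entry.getKey(), entry.getValue(), (a, b) -> {
                Set<Integer> copy = new java.util.HashSet<>(a);
                copy.retainAll(b); // column-wise intersection
                return copy;
            });
            if (result.get(entry.getKey()).isEmpty()) {
                return Optional.empty(); // no rows can match this split
            }
        }
        return Optional.of(result);
    }
}
```

For example, intersecting a predicate `part IN (0, 1)` with a split constraint `part IN (1, 2)` yields `part IN (1)`, while a constraint `part IN (5)` filters the split out entirely.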
Some more comments and questions as I continue perusing the code.
ConnectorSession session,
ConnectorTableHandle tableHandle,
DynamicFilter dynamicFilter,
boolean preferDeterministicSplits,
We talked about this offline a few weeks ago. Instead of adding this, we should revisit whether the adaptive split logic is still useful and remove it if not. It's almost certainly not useful for data formats such as ORC and Parquet that cannot be split across row group boundaries.
* @param predicate Predicate that should be enforced on cached rows.
* Output of `cachedSplitA` can be used to derive output of matching `cachedSplitB`
* (with corresponding {@link PlanSignature}) as long as `cachedSplitB.predicate` is a strict
* subset of `cachedSplitA.predicate`. To do so, `cachedSplitB.predicate` must be
* applied on output of `cachedSplitA`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.
* @param unenforcedPredicate Unenforced (best-effort) predicate that should be applied on cached rows.
* Output of `cachedSplitA` can be used to derive output of matching `cachedSplitB`
* (with corresponding {@link PlanSignature}) as long as `cachedSplitB.unenforcedPredicate`
* is a subset of `cachedSplitA.unenforcedPredicate`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.
* @return cached pages for a given split.
Replace markdown code quotes with proper javadoc code tags.
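The subset-derivation rule this javadoc describes can be illustrated with a toy example (not Trino code; rows are modeled as plain integers and a predicate as a set of allowed values): if cachedSplitB's predicate is a subset of cachedSplitA's, B's output is obtained by re-applying B's narrower predicate to A's cached rows.

```java
import java.util.List;
import java.util.Set;

// Toy sketch of deriving cachedSplitB's output from cachedSplitA's cache
// entry when B's predicate is a (strict) subset of A's.
public class CachedPredicateDerivation
{
    public static List<Integer> deriveFromCache(List<Integer> cachedRowsOfA, Set<Integer> predicateB)
    {
        // Re-apply B's predicate on A's cached output
        return cachedRowsOfA.stream().filter(predicateB::contains).toList();
    }

    public static void main(String[] args)
    {
        // cachedSplitA was cached under predicate x IN (1, 2, 3)
        List<Integer> cachedRowsOfA = List.of(1, 2, 3);
        // cachedSplitB's predicate x IN (1, 2) is a strict subset of A's
        System.out.println(deriveFromCache(cachedRowsOfA, Set.of(1, 2))); // prints [1, 2]
    }
}
```

This is why normalization of the predicate matters for the cache key: two textually different but equivalent predicates must serialize identically, or the subset check never gets a chance to fire.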
* applied on output of `cachedSplitA`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.
How is this enforced? Who is responsible for the normalization? What happens if it's not normalized?
* subset of `cachedSplitA.predicate`. To do so, `cachedSplitB.predicate` must be
* applied on output of `cachedSplitA`. Before serialization as a cache key, predicate
* needs to be normalized using {@code io.trino.plugin.base.cache.CacheUtils#normalizeTupleDomain(TupleDomain)}.
* @param unenforcedPredicate Unenforced (best-effort) predicate that should be applied on cached rows.
It's not clear what this predicate is for.
import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

public class CacheSplitId
This should be a record
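The reviewer's suggestion might look roughly like the sketch below. The single "id" field is an assumption about the real class's shape, not Trino's actual code; the point is that a record supplies equals, hashCode, and toString automatically, which is what identifier value classes need.

```java
import java.util.Objects;

// Hypothetical record-based version of a simple identifier class.
public record CacheSplitId(String id)
{
    public CacheSplitId
    {
        // Compact constructor validates the component before assignment
        Objects.requireNonNull(id, "id is null");
    }
}
```

Two instances with the same id then compare equal without any hand-written equals/hashCode boilerplate.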
import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

public class SignatureKey
This should be a record
* {@link ColumnHandle}). {@link CacheColumnId} for complex projections will use canonicalized and formatted
* version of projection expression.
*/
public class CanonicalSubplan
What's the relationship between CanonicalSubplan and PlanSignature?
* {@link ColumnHandle}). {@link CacheColumnId} for complex projections will use canonicalized and formatted
* version of projection expression.
*/
public class CanonicalSubplan
Why do we need a dedicated class for this? Why isn't a regular PlanNode tree sufficient to represent a canonical structure (after canonicalization, of course)?
/**
 * Returns a table identifier for the purpose of caching with {@link CacheManager}.
 * {@link CacheTableId} together with {@link CacheSplitId} and {@link CacheColumnId}s represents
 * rows produced by {@link ConnectorPageSource} for a given split. Local table properties
 * (e.g. rows order) must be part of {@link CacheTableId} if they are present. List of selected
 * columns should not be part of {@link CacheTableId}. {@link CacheTableId} should not contain
 * elements that can be derived from {@link CacheSplitId} such as predicate on partition column
 * which can filter splits entirely.
 */
This is too complicated. Why are all those conditions required?
What does "List of selected columns should not be part of ..." mean, especially in the case of a ConnectorTableHandle representing a pushed-down subplan?
* are eligible for caching with {@link CacheManager}. Connector should convert provided
* {@link ConnectorTableHandle} into canonical one by pruning of every non-canonical field.
*/
ConnectorTableHandle getCanonicalTableHandle(ConnectorTableHandle handle);
Not sure I understand what this is. ConnectorTableHandle represents an opaque reference to a table that can be carried in query plans, transmitted to workers, etc. What is a "property of a ConnectorTableHandle"? What does it mean for it to "affect final query results when underlying table is queried"?
Also, what's the purpose of this, given that there's getCacheTableId above?