Skip to content

Commit

Permalink
Simplify join code in local planner
Browse files Browse the repository at this point in the history
Extract code to create lookup source OperatorFactor to a new method.
Push some parameter calculations inside of lookup join creation.
  • Loading branch information
dain committed Dec 1, 2015
1 parent c9fe576 commit 17cc14e
Showing 1 changed file with 43 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,9 @@ private PhysicalOperation createLookupJoin(JoinNode node,
Optional<Symbol> buildHashSymbol,
LocalExecutionPlanContext context)
{
// Plan build
LookupSourceSupplier lookupSourceSupplier = createLookupJoinSource(node, buildNode, buildSymbols, buildHashSymbol, context);

// Plan probe and introduce a projection to put all fields from the probe side into a single channel if necessary
PhysicalOperation probeSource;
LocalExecutionPlanContext parallelParentContext = null;
Expand All @@ -1351,10 +1354,36 @@ private PhysicalOperation createLookupJoin(JoinNode node,
else {
probeSource = probeNode.accept(this, context);
}
List<Integer> probeChannels = ImmutableList.copyOf(getChannelsForSymbols(probeSymbols, probeSource.getLayout()));
Optional<Integer> probeHashChannel = probeHashSymbol.map(channelGetter(probeSource));
OperatorFactory operator = createLookupJoin(node, probeSource, probeSymbols, probeHashSymbol, lookupSourceSupplier, context);

// do the same on the build side
ImmutableMap.Builder<Symbol, Integer> outputMappings = ImmutableMap.builder();
List<Symbol> outputSymbols = node.getOutputSymbols();
for (int i = 0; i < outputSymbols.size(); i++) {
Symbol symbol = outputSymbols.get(i);
outputMappings.put(symbol, i);
}
PhysicalOperation operation = new PhysicalOperation(operator, outputMappings.build(), probeSource);

// merge parallel joiners back into a single stream
if (parallelParentContext != null) {
operation = addInMemoryExchange(parallelParentContext, operation, context);
}

return operation;
}

private boolean isBuildOuter(JoinNode node)
{
return node.getType() == RIGHT || node.getType() == FULL;
}

private LookupSourceSupplier createLookupJoinSource(
JoinNode node,
PlanNode buildNode,
List<Symbol> buildSymbols,
Optional<Symbol> buildHashSymbol,
LocalExecutionPlanContext context)
{
LocalExecutionPlanContext buildContext = context.createSubContext();
PhysicalOperation buildSource = buildNode.accept(this, buildContext);
List<Integer> buildChannels = ImmutableList.copyOf(getChannelsForSymbols(buildSymbols, buildSource.getLayout()));
Expand Down Expand Up @@ -1407,43 +1436,22 @@ private PhysicalOperation createLookupJoin(JoinNode node,

lookupSourceSupplier = parallelHashBuilder.getLookupSourceSupplier();
}

ImmutableMap.Builder<Symbol, Integer> outputMappings = ImmutableMap.builder();
outputMappings.putAll(probeSource.getLayout());

// inputs from build side of the join are laid out following the input from the probe side,
// so adjust the channel ids but keep the field layouts intact
int offset = probeSource.getTypes().size();
for (Map.Entry<Symbol, Integer> entry : buildSource.getLayout().entrySet()) {
Integer input = entry.getValue();
outputMappings.put(entry.getKey(), offset + input);
}

OperatorFactory operator = createLookupJoin(node.getType(), lookupSourceSupplier, probeSource.getTypes(), probeChannels, probeHashChannel, context);
PhysicalOperation operation = new PhysicalOperation(operator, outputMappings.build(), probeSource);

// merge parallel joiners back into a single stream
if (parallelParentContext != null) {
operation = addInMemoryExchange(parallelParentContext, operation, context);
}

return operation;
}

private boolean isBuildOuter(JoinNode node)
{
return node.getType() == RIGHT || node.getType() == FULL;
return lookupSourceSupplier;
}

private OperatorFactory createLookupJoin(
JoinNode.Type type,
JoinNode node,
PhysicalOperation probeSource,
List<Symbol> probeSymbols,
Optional<Symbol> probeHashSymbol,
LookupSourceSupplier lookupSourceSupplier,
List<Type> probeTypes,
List<Integer> probeJoinChannels,
Optional<Integer> probeHashChannel,
LocalExecutionPlanContext context)
{
switch (type) {
List<Type> probeTypes = probeSource.getTypes();
List<Integer> probeJoinChannels = ImmutableList.copyOf(getChannelsForSymbols(probeSymbols, probeSource.getLayout()));
Optional<Integer> probeHashChannel = probeHashSymbol.map(channelGetter(probeSource));

switch (node.getType()) {
case INNER:
return LookupJoinOperators.innerJoin(context.getNextOperatorId(), lookupSourceSupplier, probeTypes, probeJoinChannels, probeHashChannel);
case LEFT:
Expand All @@ -1453,7 +1461,7 @@ private OperatorFactory createLookupJoin(
case FULL:
return LookupJoinOperators.fullOuterJoin(context.getNextOperatorId(), lookupSourceSupplier, probeTypes, probeJoinChannels, probeHashChannel);
default:
throw new UnsupportedOperationException("Unsupported join type: " + type);
throw new UnsupportedOperationException("Unsupported join type: " + node.getType());
}
}

Expand Down

0 comments on commit 17cc14e

Please sign in to comment.