Skip to content

Commit

Permalink
Add missing calls to declarePosition
Browse files Browse the repository at this point in the history
Also, validate that blocks have the same number of positions as the
declared positions in the page to catch these issues in the future
  • Loading branch information
martint committed Dec 19, 2014
1 parent aa80790 commit edf3eac
Show file tree
Hide file tree
Showing 25 changed files with 85 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ private static void filterAndProjectRowOriented(PageBuilder pageBuilder,
// extendedprice * (1 - discount) * (1 + tax)
// discount

pageBuilder.declarePosition();
if (returnFlagBlock.isNull(position)) {
pageBuilder.getBlockBuilder(0).appendNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public List<Page> partition(List<Page> pages)
continue;
}

pageBuilder.declarePosition();
for (int channel = 0; channel < types.size(); channel++) {
Type type = types.get(channel);
type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public Builder(List<String> columnNames, List<Type> types)

public Builder add(Object... values)
{
pageBuilder.declarePosition();
for (int i = 0; i < types.size(); i++) {
BlockUtils.appendObject(types.get(i), pageBuilder.getBlockBuilder(i), values[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ public Page getOutput()
List<Type> types = aggregates.stream().map(Aggregator::getType).collect(toImmutableList());

PageBuilder pageBuilder = new PageBuilder(types);

pageBuilder.declarePosition();
for (int i = 0; i < aggregates.size(); i++) {
Aggregator aggregator = aggregates.get(i);
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public void addInput(Page page)
GroupByIdBlock ids = groupByHash.getGroupIds(page);
for (int position = 0; position < ids.getPositionCount(); position++) {
if (ids.getGroupId(position) == nextDistinctId) {
pageBuilder.declarePosition();
for (int channel = 0; channel < types.size(); channel++) {
Type type = types.get(channel);
type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ protected Page computeNext()
while (!pageBuilder.isFull() && groupId < groupCount) {
groupByHash.appendValuesTo(groupId, pageBuilder, 0);

pageBuilder.declarePosition();
for (int i = 0; i < aggregators.size(); i++) {
Aggregator aggregator = aggregators.get(i);
BlockBuilder output = pageBuilder.getBlockBuilder(types.size() + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ private boolean joinCurrentPosition()
{
// while we have a position to join against...
while (joinPosition >= 0) {
pageBuilder.declarePosition();

// write probe columns
probe.appendTo(pageBuilder);

Expand Down Expand Up @@ -205,6 +207,7 @@ private boolean outerJoinCurrentPosition()
{
if (enableOuterJoin && joinPosition < 0) {
// write probe columns
pageBuilder.declarePosition();
probe.appendTo(pageBuilder);

// write nulls into build columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public int buildPage(int position, int[] outputChannels, PageBuilder pageBuilder
int blockPosition = decodePosition(pageAddress);

// append the row
pageBuilder.declarePosition();
for (int i = 0; i < outputChannels.length; i++) {
int outputChannel = outputChannels[i];
Type type = types.get(outputChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ private Page getSelectedRows()
if (rowCount == maxRowsPerPartition.get()) {
continue;
}
pageBuilder.declarePosition();
for (int i = 0; i < outputChannels.length; i++) {
int channel = outputChannels[i];
Type type = types.get(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public Page getOutput()
if (repeats > 0) {
// copy input values to output page
// NOTE: last output type is sample weight so we skip it
pageBuilder.declarePosition();
for (int channel = 0; channel < types.size() - 1; channel++) {
Type type = types.get(channel);
type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public Page getOutput()
tableCommitter.commitTable(fragmentBuilder.build());

PageBuilder page = new PageBuilder(getTypes());
page.declarePosition();
BIGINT.writeLong(page.getBlockBuilder(0), rowCount);
return page.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public Page getOutput()
String fragment = recordSink.commit();

PageBuilder page = new PageBuilder(TYPES);
page.declarePosition();
BIGINT.writeLong(page.getBlockBuilder(0), rowCount);
VARCHAR.writeSlice(page.getBlockBuilder(1), Slices.utf8Slice(fragment));
return page.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public Page getOutput()
pageBuilder.reset();
while (!pageBuilder.isFull() && outputIterator.hasNext()) {
Block[] next = outputIterator.next();
pageBuilder.declarePosition();
for (int i = 0; i < next.length; i++) {
Type type = types.get(i);
type.appendTo(next[i], 0, pageBuilder.getBlockBuilder(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ private Page getPage()
while (!pageBuilder.isFull() && currentFlushingPartition.hasNext()) {
Block[] next = currentFlushingPartition.next();
sizeDelta += sizeOfRow(next);

pageBuilder.declarePosition();
for (int i = 0; i < outputChannels.length; i++) {
int channel = outputChannels[i];
Type type = types.get(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ public Page getOutput()
type.appendTo(currentPage.getBlock(channel), currentPosition, pageBuilder.getBlockBuilder(replicateChannel));
}
int offset = replicateTypes.size();

pageBuilder.declarePosition();
for (Unnester unnester : unnesters) {
if (unnester.hasNext()) {
unnester.appendNext(pageBuilder, offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ public Page getOutput()
}

// copy output channels
pageBuilder.declarePosition();
int channel = 0;
while (channel < outputChannels.length) {
pagesIndex.appendTo(outputChannels[channel], currentPosition, pageBuilder.getBlockBuilder(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public IndexSnapshot createIndexSnapshot(UnloadedIndexKeyRecordSet indexKeysReco
Page page = indexKeysRecordCursor.getPage();
int position = indexKeysRecordCursor.getPosition();
if (lookupSource.getJoinPosition(position, page) < 0) {
missingKeysPageBuilder.declarePosition();
for (int i = 0; i < blocks.length; i++) {
Block block = blocks[i];
Type type = indexKeysRecordCursor.getType(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,31 +134,29 @@ private void generateProcessMethod(ClassDefinition classDefinition, int projecti

Block trueBlock = new Block(context);
ifStatement.ifTrue(trueBlock);
if (projections == 0) {
// pageBuilder.declarePosition();

// pageBuilder.declarePosition();
trueBlock.getVariable(pageBuilderVariable)
.invokeVirtual(PageBuilder.class, "declarePosition", void.class);

// this.project_43(session, cursor, pageBuilder.getBlockBuilder(42)));
for (int projectionIndex = 0; projectionIndex < projections; projectionIndex++) {
trueBlock.pushThis()
.getVariable(sessionVariable)
.getVariable(cursorVariable);

// pageBuilder.getBlockBuilder(0)
trueBlock.getVariable(pageBuilderVariable)
.invokeVirtual(PageBuilder.class, "declarePosition", void.class);
}
else {
// this.project_43(session, cursor, pageBuilder.getBlockBuilder(42)));
for (int projectionIndex = 0; projectionIndex < projections; projectionIndex++) {
trueBlock.pushThis()
.getVariable(sessionVariable)
.getVariable(cursorVariable);

// pageBuilder.getBlockBuilder(0)
trueBlock.getVariable(pageBuilderVariable)
.push(projectionIndex)
.invokeVirtual(PageBuilder.class, "getBlockBuilder", BlockBuilder.class, int.class);

// project(block..., blockBuilder)
trueBlock.invokeVirtual(classDefinition.getType(),
"project_" + projectionIndex,
type(void.class),
type(ConnectorSession.class),
type(RecordCursor.class),
type(BlockBuilder.class));
}
.push(projectionIndex)
.invokeVirtual(PageBuilder.class, "getBlockBuilder", BlockBuilder.class, int.class);

// project(block..., blockBuilder)
trueBlock.invokeVirtual(classDefinition.getType(),
"project_" + projectionIndex,
type(void.class),
type(ConnectorSession.class),
type(RecordCursor.class),
type(BlockBuilder.class));
}
forLoopBody.append(ifStatement.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,36 +159,34 @@ private void generateProcessMethod(ClassDefinition classDefinition, RowExpressio
.build()))
.ifTrue(trueBlock);

if (projections.size() == 0) {
trueBlock.getVariable(pageBuilderVariable)
.invokeVirtual(PageBuilder.class, "declarePosition", void.class);
}
else {
for (int projectionIndex = 0; projectionIndex < projections.size(); projectionIndex++) {
List<Integer> inputChannels = getInputChannels(projections.get(projectionIndex));

trueBlock.pushThis()
.getVariable(sessionVariable)
.append(pushBlockVariables(context, inputChannels))
.getVariable(positionVariable);

trueBlock.comment("pageBuilder.getBlockBuilder(" + projectionIndex + ")")
.getVariable(pageBuilderVariable)
.push(projectionIndex)
.invokeVirtual(PageBuilder.class, "getBlockBuilder", BlockBuilder.class, int.class);

trueBlock.comment("project_" + projectionIndex + "(session, block_" + inputChannels + ", position, blockBuilder)")
.invokeVirtual(classDefinition.getType(),
"project_" + projectionIndex,
type(void.class),
ImmutableList.<ParameterizedType>builder()
.add(type(ConnectorSession.class))
.addAll(nCopies(inputChannels.size(), type(com.facebook.presto.spi.block.Block.class)))
.add(type(int.class))
.add(type(BlockBuilder.class))
.build());
}
trueBlock.getVariable(pageBuilderVariable)
.invokeVirtual(PageBuilder.class, "declarePosition", void.class);

for (int projectionIndex = 0; projectionIndex < projections.size(); projectionIndex++) {
List<Integer> inputChannels = getInputChannels(projections.get(projectionIndex));

trueBlock.pushThis()
.getVariable(sessionVariable)
.append(pushBlockVariables(context, inputChannels))
.getVariable(positionVariable);

trueBlock.comment("pageBuilder.getBlockBuilder(" + projectionIndex + ")")
.getVariable(pageBuilderVariable)
.push(projectionIndex)
.invokeVirtual(PageBuilder.class, "getBlockBuilder", BlockBuilder.class, int.class);

trueBlock.comment("project_" + projectionIndex + "(session, block_" + inputChannels + ", position, blockBuilder)")
.invokeVirtual(classDefinition.getType(),
"project_" + projectionIndex,
type(void.class),
ImmutableList.<ParameterizedType>builder()
.add(type(ConnectorSession.class))
.addAll(nCopies(inputChannels.size(), type(com.facebook.presto.spi.block.Block.class)))
.add(type(int.class))
.add(type(BlockBuilder.class))
.build());
}

loopBody.append(filterBlock.build());

method.getBody()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public void testAppendTo()

PageBuilder pageBuilder = new PageBuilder(groupByHash.getTypes());
for (int i = 0; i < groupByHash.getGroupCount(); i++) {
pageBuilder.declarePosition();
groupByHash.appendValuesTo(i, pageBuilder, 0);
}
Page page = pageBuilder.build();
Expand Down Expand Up @@ -111,6 +112,7 @@ public void testAppendToMultipleTuplesPerGroup()

PageBuilder pageBuilder = new PageBuilder(groupByHash.getTypes());
for (int i = 0; i < groupByHash.getGroupCount(); i++) {
pageBuilder.declarePosition();
groupByHash.appendValuesTo(i, pageBuilder, 0);
}
Page outputPage = pageBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public TestNumericHistogramAggregation()
PageBuilder builder = new PageBuilder(ImmutableList.of(BIGINT, DOUBLE, DOUBLE));

for (int i = 0; i < 100; i++) {
builder.declarePosition();

BIGINT.writeLong(builder.getBlockBuilder(0), numberOfBuckets);
DOUBLE.writeDouble(builder.getBlockBuilder(1), i); // value
DOUBLE.writeDouble(builder.getBlockBuilder(2), 1); // weight
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public static Page createInputPage()
LineItemGenerator lineItemGenerator = new LineItemGenerator(1, 1, 1);
Iterator<LineItem> iterator = lineItemGenerator.iterator();
for (int i = 0; i < 10_000; i++) {
pageBuilder.declarePosition();

LineItem lineItem = iterator.next();
DOUBLE.writeDouble(pageBuilder.getBlockBuilder(EXTENDED_PRICE), lineItem.getExtendedPrice());
DOUBLE.writeDouble(pageBuilder.getBlockBuilder(DISCOUNT), lineItem.getDiscount());
Expand Down Expand Up @@ -155,6 +157,7 @@ public int process(ConnectorSession session, Page page, int start, int end, Page

private static void project(int position, PageBuilder pageBuilder, Block extendedPriceBlock, Block discountBlock)
{
pageBuilder.declarePosition();
if (discountBlock.isNull(position) || extendedPriceBlock.isNull(position)) {
pageBuilder.getBlockBuilder(0).appendNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void testSingleChannel(boolean hashEnabled)
}

// write position to output block
pageBuilder.declarePosition();
hashStrategy.appendTo(leftBlockIndex, leftBlockPosition, pageBuilder, 0);
}

Expand Down Expand Up @@ -217,6 +218,7 @@ public void testMultiChannel(boolean hashEnabled)
}

// write position to output block
pageBuilder.declarePosition();
hashStrategy.appendTo(leftBlockIndex, leftBlockPosition, pageBuilder, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void testSingleChannel(boolean hashEnabled)
for (int position = 0; position < page.getPositionCount(); position++) {
assertTrue(joinProbe.advanceNextPosition());

pageBuilder.declarePosition();
joinProbe.appendTo(pageBuilder);

assertEquals(joinProbe.getCurrentJoinPosition(), lookupSource.getJoinPosition(position, page));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ public Page build()
Block[] blocks = new Block[blockBuilders.length];
for (int i = 0; i < blocks.length; i++) {
blocks[i] = blockBuilders[i].build();
if (blocks[i].getPositionCount() != declaredPositions) {
throw new IllegalStateException(String.format("Declared positions (%s) does not match block %s's number of entries (%s)", declaredPositions, i, blocks[i].getPositionCount()));
}
}

return new Page(blocks);
}
}

0 comments on commit edf3eac

Please sign in to comment.