diff --git a/presto-main/src/main/java/com/facebook/presto/PagesIndexPageSorter.java b/presto-main/src/main/java/com/facebook/presto/PagesIndexPageSorter.java new file mode 100644 index 000000000000..387ffff44d75 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/PagesIndexPageSorter.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto; + +import com.facebook.presto.operator.PagesIndex; +import com.facebook.presto.spi.Page; +import com.facebook.presto.spi.PageSorter; +import com.facebook.presto.spi.block.SortOrder; +import com.facebook.presto.spi.type.Type; + +import java.util.List; + +import static com.facebook.presto.operator.SyntheticAddress.decodePosition; +import static com.facebook.presto.operator.SyntheticAddress.decodeSliceIndex; + +public class PagesIndexPageSorter + implements PageSorter +{ + @Override + public long[] sort(List types, List pages, List sortTypes, List sortChannels, List sortOrders, int expectedPositions) + { + PagesIndex pagesIndex = new PagesIndex(types, expectedPositions); + pages.forEach(pagesIndex::addPage); + pagesIndex.sort(sortTypes, sortChannels, sortOrders); + + return pagesIndex.getValueAddresses().toLongArray(null); + } + + @Override + public int decodePageIndex(long address) + { + return decodeSliceIndex(address); + } + + @Override + public int decodePositionIndex(long address) + { + return decodePosition(address); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index f9ce56eaca82..43793ea110eb 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.server; +import com.facebook.presto.PagesIndexPageSorter; import com.facebook.presto.block.BlockEncodingManager; import com.facebook.presto.client.QueryResults; import com.facebook.presto.connector.ConnectorManager; @@ -50,6 +51,7 @@ import com.facebook.presto.spi.ConnectorPageSinkProvider; import com.facebook.presto.spi.ConnectorPageSourceProvider; import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.PageSorter; import com.facebook.presto.spi.block.BlockEncodingFactory; import com.facebook.presto.spi.block.BlockEncodingSerde; import com.facebook.presto.spi.block.FixedWidthBlockEncoding; @@ -269,6 +271,9 @@ protected void setup(Binder binder) // thread execution visualizer jaxrsBinder(binder).bind(QueryExecutionResource.class); + + // PageSorter + binder.bind(PageSorter.class).to(PagesIndexPageSorter.class).in(Scopes.SINGLETON); } @Provides diff --git a/presto-main/src/test/java/com/facebook/presto/BenchmarkPagesIndexPageSorter.java b/presto-main/src/test/java/com/facebook/presto/BenchmarkPagesIndexPageSorter.java new file mode 100644 index 000000000000..91fe037e9ce7 --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/BenchmarkPagesIndexPageSorter.java @@ -0,0 +1,150 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto; + +import com.facebook.presto.block.BlockAssertions; +import com.facebook.presto.spi.Page; +import com.facebook.presto.spi.PageSorter; +import com.facebook.presto.spi.block.Block; +import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.Type; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static com.facebook.presto.spi.block.SortOrder.ASC_NULLS_FIRST; +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.BooleanType.BOOLEAN; +import static com.facebook.presto.spi.type.DoubleType.DOUBLE; +import static com.facebook.presto.spi.type.VarcharType.VARCHAR; +import static java.util.Collections.nCopies; + +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(3) +@Warmup(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) +public class BenchmarkPagesIndexPageSorter +{ + @Benchmark + public int runBenchmark(BenchmarkData data) + { + PageSorter pageSorter = new PagesIndexPageSorter(); + long[] addresses = pageSorter.sort(data.types, data.pages, data.sortTypes, data.sortChannels, nCopies(data.sortChannels.size(), ASC_NULLS_FIRST), 10_000); + return addresses.length; + } + + private static List createPages(int pageCount, int channelCount, Type type) + { + int positionCount = BlockBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES / (channelCount * 8); + + List pages = new ArrayList<>(pageCount); + for (int numPage = 0; numPage < pageCount; numPage++) { + Block[] blocks = new Block[channelCount]; + for (int numChannel = 0; numChannel < channelCount; numChannel++) { + if (type.equals(BIGINT)) { + blocks[numChannel] = BlockAssertions.createLongSequenceBlock(0, positionCount); + } + else if (type.equals(VARCHAR)) { + blocks[numChannel] = BlockAssertions.createStringSequenceBlock(0, positionCount); + } + else if (type.equals(DOUBLE)) { + blocks[numChannel] = BlockAssertions.createDoubleSequenceBlock(0, positionCount); + } + else if (type.equals(BOOLEAN)) { + blocks[numChannel] = BlockAssertions.createBooleanSequenceBlock(0, positionCount); + } + else { + throw new IllegalArgumentException("Unsupported type: " + type); + } + } + pages.add(new Page(blocks)); + } + return pages; + } + + @State(Scope.Thread) + public static class BenchmarkData + { + @Param({ "2", "3", "4", "5" }) + private int numSortChannels; + + @Param({ "BIGINT", "VARCHAR", "DOUBLE", "BOOLEAN" }) + private String sortChannelType; + + private List pages; + private final int maxPages = 500; + + public List types; + public List sortTypes; + public List sortChannels; + + @Setup + public void setup() + { + int totalChannels = 20; + Type type = getType(); + + pages = createPages(maxPages, totalChannels, type); + types = nCopies(totalChannels, type); + + sortChannels = new ArrayList<>(); + for (int i = 0; i < numSortChannels; i++) { + sortChannels.add(i); + } + sortTypes = nCopies(numSortChannels, type); + } + + private Type getType() + { + switch (sortChannelType) { + case "BIGINT": + return BIGINT; + case "VARCHAR": + return VARCHAR; + case "DOUBLE": + return DOUBLE; + case "BOOLEAN": + return BOOLEAN; + } + throw new IllegalArgumentException("Unsupported type: " + sortChannelType); + } + } + + public static void main(String[] args) + throws RunnerException + { + Options options = new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + BenchmarkPagesIndexPageSorter.class.getSimpleName() + ".*") + .build(); + + new Runner(options).run(); + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/TestPagesIndexPageSorter.java b/presto-main/src/test/java/com/facebook/presto/TestPagesIndexPageSorter.java new file mode 100644 index 000000000000..286ee58dd8bd --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/TestPagesIndexPageSorter.java @@ -0,0 +1,183 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto; + +import com.facebook.presto.spi.Page; +import com.facebook.presto.spi.PageBuilder; +import com.facebook.presto.spi.block.SortOrder; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.testing.MaterializedResult; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.List; + +import static com.facebook.presto.SessionTestUtils.TEST_SESSION; +import static com.facebook.presto.operator.OperatorAssertion.toMaterializedResult; +import static com.facebook.presto.spi.block.SortOrder.ASC_NULLS_FIRST; +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.DoubleType.DOUBLE; +import static com.facebook.presto.spi.type.VarcharType.VARCHAR; +import static org.testng.Assert.assertEquals; + +public class TestPagesIndexPageSorter +{ + private static final PagesIndexPageSorter sorter = new PagesIndexPageSorter(); + + @Test + public void testPageSorter() + throws Exception + { + List types = ImmutableList.of(BIGINT, DOUBLE, VARCHAR); + List sortTypes = ImmutableList.of(BIGINT); + List sortChannels = Ints.asList(0); + List sortOrders = ImmutableList.of(ASC_NULLS_FIRST); + + List inputPages = RowPagesBuilder.rowPagesBuilder(types) + .row(2, 1.1, "d") + .row(1, 2.2, "c") + .pageBreak() + .row(-2, 2.2, "b") + .row(-12, 2.2, "a") + .build(); + + List expectedPages = RowPagesBuilder.rowPagesBuilder(types) + .row(-12, 2.2, "a") + .row(-2, 2.2, "b") + .pageBreak() + .row(1, 2.2, "c") + .row(2, 1.1, "d") + .build(); + + assertSorted(inputPages, expectedPages, types, sortTypes, sortChannels, sortOrders, 100); + } + + @Test + public void testPageSorterMultipleChannels() + throws Exception + { + List types = ImmutableList.of(BIGINT, DOUBLE, VARCHAR); + List sortTypes = ImmutableList.copyOf(types); + List sortChannels = Ints.asList(0, 1, 2); + List sortOrders = Collections.nCopies(sortChannels.size(), ASC_NULLS_FIRST); + + List inputPages = RowPagesBuilder.rowPagesBuilder(types) + .row(2, 1.1, "d") + .row(1, 2.2, "c") + .pageBreak() + .row(1, 2.2, "b") + .row(1, 2.2, "a") + .pageBreak() + .row(1, 2.2, null) + .row(1, null, "z") + .row(1, null, null) + .build(); + + List expectedPages = RowPagesBuilder.rowPagesBuilder(types) + .row(1, null, null) + .row(1, null, "z") + .row(1, 2.2, null) + .row(1, 2.2, "a") + .row(1, 2.2, "b") + .row(1, 2.2, "c") + .row(2, 1.1, "d") + .build(); + assertSorted(inputPages, expectedPages, types, sortTypes, sortChannels, sortOrders, 100); + } + + @Test + public void testPageSorterSorted() + throws Exception + { + List types = ImmutableList.of(BIGINT, DOUBLE, VARCHAR); + List sortTypes = ImmutableList.of(BIGINT); + List sortChannels = Ints.asList(0); + List sortOrders = ImmutableList.of(ASC_NULLS_FIRST); + + List inputPages = RowPagesBuilder.rowPagesBuilder(types) + .row(-12, 2.2, "a") + .row(-2, 2.2, "b") + .pageBreak() + .row(1, 2.2, "d") + .row(2, 1.1, "c") + .build(); + + List expectedPages = RowPagesBuilder.rowPagesBuilder(types) + .row(-12, 2.2, "a") + .row(-2, 2.2, "b") + .row(1, 2.2, "d") + .row(2, 1.1, "c") + .build(); + + assertSorted(inputPages, expectedPages, types, sortTypes, sortChannels, sortOrders, 100); + } + + @Test + public void testPageSorterForceExpansion() + throws Exception + { + List types = ImmutableList.of(BIGINT, DOUBLE, VARCHAR); + List sortTypes = ImmutableList.of(BIGINT); + List sortChannels = Ints.asList(0); + List sortOrders = ImmutableList.of(ASC_NULLS_FIRST); + + List inputPages = RowPagesBuilder.rowPagesBuilder(types) + .row(2, 1.1, "c") + .row(1, 2.2, "d") + .pageBreak() + .row(-2, 2.2, "b") + .row(-12, 2.2, "a") + .build(); + + List expectedPages = RowPagesBuilder.rowPagesBuilder(types) + .row(-12, 2.2, "a") + .row(-2, 2.2, "b") + .pageBreak() + .row(1, 2.2, "d") + .row(2, 1.1, "c") + .build(); + + assertSorted(inputPages, expectedPages, types, sortTypes, sortChannels, sortOrders, 2); + } + + private static void assertSorted(List inputPages, List expectedPages, List types, List sortTypes, List sortChannels, List sortOrders, int expectedPositions) + { + long[] sortedAddresses = sorter.sort(types, inputPages, sortTypes, sortChannels, sortOrders, expectedPositions); + List outputPages = createOutputPages(types, inputPages, sortedAddresses); + + MaterializedResult expected = toMaterializedResult(TEST_SESSION, types, expectedPages); + MaterializedResult actual = toMaterializedResult(TEST_SESSION, types, outputPages); + assertEquals(actual.getMaterializedRows(), expected.getMaterializedRows()); + } + + private static List createOutputPages(List types, List inputPages, long[] sortedAddresses) + { + PageBuilder pageBuilder = new PageBuilder(types); + pageBuilder.reset(); + for (long address : sortedAddresses) { + int index = sorter.decodePageIndex(address); + int position = sorter.decodePositionIndex(address); + + Page page = inputPages.get(index); + for (int i = 0; i < types.size(); i++) { + Type type = types.get(i); + type.appendTo(page.getBlock(i), position, pageBuilder.getBlockBuilder(i)); + } + pageBuilder.declarePosition(); + } + return ImmutableList.of(pageBuilder.build()); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/PageSorter.java b/presto-spi/src/main/java/com/facebook/presto/spi/PageSorter.java new file mode 100644 index 000000000000..9e2f0f739131 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/PageSorter.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi; + +import com.facebook.presto.spi.block.SortOrder; +import com.facebook.presto.spi.type.Type; + +import java.util.List; + +/** + * This interface is not stable and will not be supported in future releases. + */ + +@Deprecated +public interface PageSorter +{ + /** + * @return Sorted synthetic addresses for pages. A synthetic address is encoded as a long with + * the high 32 bits containing the page index and the low 32 bits containing position index + */ + @Deprecated + long[] sort(List types, List pages, List sortTypes, List sortChannels, List sortOrders, int expectedPositions); + + int decodePageIndex(long address); + + int decodePositionIndex(long address); +}