Skip to content

Commit

Permalink
Add configuration options for OrcReader
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Apr 30, 2015
1 parent f72104a commit 19a7799
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.orc;

import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;

import java.io.File;
import java.io.FileNotFoundException;
Expand All @@ -26,12 +25,6 @@ public class FileOrcDataSource
{
private final RandomAccessFile input;

/**
 * Convenience constructor taking only the file and a merge distance.
 * Delegates to the four-argument constructor, supplying {@code Integer.MAX_VALUE} bytes
 * as the max read size and 0 bytes as the stream buffer size.
 *
 * @param path file handed to the four-argument constructor
 * @param maxMergeDistance forwarded unchanged to the four-argument constructor
 * @throws IOException declared by this overload (the delegated constructor declares
 *         {@code FileNotFoundException})
 */
public FileOrcDataSource(File path, DataSize maxMergeDistance)
throws IOException
{
this(path, maxMergeDistance, new DataSize(Integer.MAX_VALUE, Unit.BYTE), new DataSize(0, Unit.BYTE));
}

public FileOrcDataSource(File path, DataSize maxMergeDistance, DataSize maxReadSize, DataSize streamBufferSize)
throws FileNotFoundException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ private static Vector createResultsVector(ObjectInspector objectInspector)
private static OrcRecordReader createCustomOrcRecordReader(TempFile tempFile, MetadataReader metadataReader, OrcPredicate predicate)
throws IOException
{
OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), new DataSize(1, Unit.MEGABYTE));
OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), new DataSize(1, Unit.MEGABYTE), new DataSize(1, Unit.MEGABYTE), new DataSize(1, Unit.MEGABYTE));
OrcReader orcReader = new OrcReader(orcDataSource, metadataReader);

assertEquals(orcReader.getColumnNames(), ImmutableList.of("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class OrcStorageManager
private final String nodeId;
private final StorageService storageService;
private final DataSize orcMaxMergeDistance;
private final DataSize orcMaxReadSize;
private final DataSize orcStreamBufferSize;
private final ShardRecoveryManager recoveryManager;
private final Duration recoveryTimeout;
private final long maxShardRows;
Expand All @@ -91,6 +93,8 @@ public OrcStorageManager(
this(currentNodeId.toString(),
storageService,
config.getOrcMaxMergeDistance(),
config.getOrcMaxReadSize(),
config.getOrcStreamBufferSize(),
recoveryManager,
config.getShardRecoveryTimeout(),
config.getMaxShardRows(),
Expand All @@ -102,6 +106,8 @@ public OrcStorageManager(
String nodeId,
StorageService storageService,
DataSize orcMaxMergeDistance,
DataSize orcMaxReadSize,
DataSize orcStreamBufferSize,
ShardRecoveryManager recoveryManager,
Duration shardRecoveryTimeout,
long maxShardRows,
Expand All @@ -111,6 +117,9 @@ public OrcStorageManager(
this.nodeId = checkNotNull(nodeId, "nodeId is null");
this.storageService = checkNotNull(storageService, "storageService is null");
this.orcMaxMergeDistance = checkNotNull(orcMaxMergeDistance, "orcMaxMergeDistance is null");
this.orcMaxReadSize = checkNotNull(orcMaxReadSize, "orcMaxReadSize is null");
this.orcStreamBufferSize = checkNotNull(orcStreamBufferSize, "orcStreamBufferSize is null");

this.recoveryManager = checkNotNull(recoveryManager, "recoveryManager is null");
this.recoveryTimeout = checkNotNull(shardRecoveryTimeout, "shardRecoveryTimeout is null");

Expand Down Expand Up @@ -232,7 +241,7 @@ OrcDataSource openShard(UUID shardUuid)
}

try {
return new FileOrcDataSource(file, orcMaxMergeDistance);
// Each ORC option goes to its own constructor parameter (maxMergeDistance, maxReadSize, streamBufferSize);
// passing orcMaxMergeDistance three times would ignore the configured read size and stream buffer size.
return new FileOrcDataSource(file, orcMaxMergeDistance, orcMaxReadSize, orcStreamBufferSize);
}
catch (IOException e) {
throw new PrestoException(RAPTOR_ERROR, "Failed to open shard file: " + file, e);
Expand All @@ -241,7 +250,7 @@ OrcDataSource openShard(UUID shardUuid)

private List<ColumnStats> computeShardStats(File file, List<Long> columnIds, List<Type> types)
{
try (OrcDataSource dataSource = new FileOrcDataSource(file, orcMaxMergeDistance)) {
try (OrcDataSource dataSource = new FileOrcDataSource(file, orcMaxMergeDistance, orcMaxReadSize, orcStreamBufferSize)) {
OrcReader reader = new OrcReader(dataSource, new OrcMetadataReader());

ImmutableList.Builder<ColumnStats> list = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class StorageManagerConfig
private Duration shardRecoveryTimeout = new Duration(30, TimeUnit.SECONDS);
private Duration missingShardDiscoveryInterval = new Duration(5, TimeUnit.MINUTES);
private DataSize orcMaxMergeDistance = new DataSize(1, MEGABYTE);
private DataSize orcMaxReadSize = new DataSize(8, MEGABYTE);
private DataSize orcStreamBufferSize = new DataSize(8, MEGABYTE);
private int recoveryThreads = 10;

private long maxShardRows = 1_000_000;
Expand Down Expand Up @@ -84,6 +86,32 @@ public StorageManagerConfig setOrcMaxMergeDistance(DataSize orcMaxMergeDistance)
return this;
}

/**
 * Returns the ORC max read size held by this config.
 * The field is initialized to 8MB and can be replaced via {@code setOrcMaxReadSize}.
 * Annotated {@code @NotNull}: the value is expected to be non-null.
 *
 * @return the current ORC max read size
 */
@NotNull
public DataSize getOrcMaxReadSize()
{
return orcMaxReadSize;
}

/**
 * Sets the ORC max read size; annotated with the configuration key
 * {@code storage.orc.max-read-size}.
 *
 * @param orcMaxReadSize new value, stored as-is (no validation is performed here)
 * @return this config instance, so setter calls can be chained
 */
@Config("storage.orc.max-read-size")
public StorageManagerConfig setOrcMaxReadSize(DataSize orcMaxReadSize)
{
this.orcMaxReadSize = orcMaxReadSize;
return this;
}

/**
 * Returns the ORC stream buffer size held by this config.
 * The field is initialized to 8MB and can be replaced via {@code setOrcStreamBufferSize}.
 * Annotated {@code @NotNull}: the value is expected to be non-null.
 *
 * @return the current ORC stream buffer size
 */
@NotNull
public DataSize getOrcStreamBufferSize()
{
return orcStreamBufferSize;
}

/**
 * Sets the ORC stream buffer size; annotated with the configuration key
 * {@code storage.orc.stream-buffer-size}.
 *
 * @param orcStreamBufferSize new value, stored as-is (no validation is performed here)
 * @return this config instance, so setter calls can be chained
 */
@Config("storage.orc.stream-buffer-size")
public StorageManagerConfig setOrcStreamBufferSize(DataSize orcStreamBufferSize)
{
this.orcStreamBufferSize = orcStreamBufferSize;
return this;
}

public Duration getShardRecoveryTimeout()
{
return shardRecoveryTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class TestOrcStorageManager
private static final ConnectorSession SESSION = new ConnectorSession("user", UTC_KEY, ENGLISH, System.currentTimeMillis(), null);
private static final String CURRENT_NODE = "node";
private static final DataSize ORC_MAX_MERGE_DISTANCE = new DataSize(1, MEGABYTE);
private static final DataSize ORC_MAX_READ_SIZE = new DataSize(1, MEGABYTE);
private static final DataSize ORC_STREAM_BUFFER_SIZE = new DataSize(1, MEGABYTE);
private static final Duration SHARD_RECOVERY_TIMEOUT = new Duration(30, TimeUnit.SECONDS);
private static final DataSize MAX_BUFFER_SIZE = new DataSize(256, MEGABYTE);
private static final int MAX_SHARD_ROWS = 100;
Expand Down Expand Up @@ -431,7 +433,7 @@ public static OrcStorageManager createOrcStorageManager(StorageService storageSe

public static OrcStorageManager createOrcStorageManager(StorageService storageService, ShardRecoveryManager recoveryManager, int maxShardRows, DataSize maxFileSize)
{
return new OrcStorageManager(CURRENT_NODE, storageService, ORC_MAX_MERGE_DISTANCE, recoveryManager, SHARD_RECOVERY_TIMEOUT, maxShardRows, maxFileSize, MAX_BUFFER_SIZE);
return new OrcStorageManager(CURRENT_NODE, storageService, ORC_MAX_MERGE_DISTANCE, ORC_MAX_READ_SIZE, ORC_STREAM_BUFFER_SIZE, recoveryManager, SHARD_RECOVERY_TIMEOUT, maxShardRows, maxFileSize, MAX_BUFFER_SIZE);
}

private static void assertColumnStats(List<ColumnStats> list, long columnId, Object min, Object max)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testWriter()
writer.appendPages(rowPagesBuilder.build());
}

try (FileOrcDataSource dataSource = new FileOrcDataSource(file, new DataSize(1, Unit.MEGABYTE))) {
try (FileOrcDataSource dataSource = new FileOrcDataSource(file, new DataSize(1, Unit.MEGABYTE), new DataSize(1, Unit.MEGABYTE), new DataSize(1, Unit.MEGABYTE))) {
OrcRecordReader reader = createReader(dataSource, columnIds);
assertEquals(reader.getTotalRowCount(), 3);
assertEquals(reader.getPosition(), 0);
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testWriterZeroRows()
// no rows
}

try (FileOrcDataSource dataSource = new FileOrcDataSource(file, new DataSize(1, Unit.MEGABYTE))) {
try (FileOrcDataSource dataSource = new FileOrcDataSource(file, new DataSize(1, Unit.MEGABYTE), new DataSize(1, Unit.MEGABYTE), new DataSize(1, Unit.MEGABYTE))) {
OrcRecordReader reader = createReaderNoRows(dataSource);
assertEquals(reader.getTotalRowCount(), 0);
assertEquals(reader.getPosition(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public void testDefaults()
.setDataDirectory(null)
.setBackupDirectory(null)
.setOrcMaxMergeDistance(new DataSize(1, MEGABYTE))
.setOrcMaxReadSize(new DataSize(8, MEGABYTE))
.setOrcStreamBufferSize(new DataSize(8, MEGABYTE))
.setShardRecoveryTimeout(new Duration(30, SECONDS))
.setMissingShardDiscoveryInterval(new Duration(5, MINUTES))
.setRecoveryThreads(10)
Expand All @@ -57,6 +59,8 @@ public void testExplicitPropertyMappings()
.put("storage.data-directory", "/data")
.put("storage.backup-directory", "/backup")
.put("storage.orc.max-merge-distance", "16kB")
.put("storage.orc.max-read-size", "16kB")
.put("storage.orc.stream-buffer-size", "16kB")
.put("storage.shard-recovery-timeout", "1m")
.put("storage.missing-shard-discovery-interval", "4m")
.put("storage.max-recovery-threads", "12")
Expand All @@ -69,6 +73,8 @@ public void testExplicitPropertyMappings()
.setDataDirectory(new File("/data"))
.setBackupDirectory(new File("/backup"))
.setOrcMaxMergeDistance(new DataSize(16, KILOBYTE))
.setOrcMaxReadSize(new DataSize(16, KILOBYTE))
.setOrcStreamBufferSize(new DataSize(16, KILOBYTE))
.setShardRecoveryTimeout(new Duration(1, MINUTES))
.setMissingShardDiscoveryInterval(new Duration(4, MINUTES))
.setRecoveryThreads(12)
Expand Down

0 comments on commit 19a7799

Please sign in to comment.