[FLINK-16179][hive] Use configuration from TableFactory in hive connector (apache#11201)
JingsongLi authored Feb 25, 2020
1 parent 3fdd856 commit fa3f564
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 91 deletions.
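In short: HiveTableSource previously re-loaded flink-conf.yaml through GlobalConfiguration inside the source itself, so options set on the TableEnvironment session were ignored. With this commit the factory forwards the session configuration (a ReadableConfig) from TableSourceFactory.Context into the source. A minimal before/after sketch of that lookup, reusing the option referenced in the diff (the surrounding class and method names are illustrative only, not part of the commit):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.factories.TableSourceFactory;

public class ConfigLookupSketch {

    // Before: load flink-conf.yaml from disk; per-session settings never reach the source.
    static boolean useMapRedReaderOld() {
        Configuration conf = GlobalConfiguration.loadConfiguration();
        return conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
    }

    // After: read the option from the configuration handed over by the factory context.
    static boolean useMapRedReaderNew(TableSourceFactory.Context context) {
        ReadableConfig flinkConf = context.getConfiguration();
        return flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
    }
}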
@@ -28,7 +28,6 @@
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -71,19 +70,16 @@ public TableSource<BaseRow> createTableSource(TableSourceFactory.Context context
boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

if (!isGeneric) {
return createHiveTableSource(context.getObjectIdentifier().toObjectPath(), table);
return new HiveTableSource(
new JobConf(hiveConf),
context.getConfiguration(),
context.getObjectIdentifier().toObjectPath(),
table);
} else {
return TableFactoryUtil.findAndCreateTableSource(context);
}
}

/**
* Creates and configures a {@link StreamTableSource} using the given {@link CatalogTable}.
*/
private StreamTableSource<BaseRow> createHiveTableSource(ObjectPath tablePath, CatalogTable table) {
return new HiveTableSource(new JobConf(hiveConf), tablePath, table);
}

@Override
public TableSink<Row> createTableSink(TableSinkFactory.Context context) {
CatalogTable table = checkNotNull(context.getTable());
@@ -20,9 +20,8 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -80,6 +79,7 @@ public class HiveTableSource implements
private static final Logger LOG = LoggerFactory.getLogger(HiveTableSource.class);

private final JobConf jobConf;
private final ReadableConfig flinkConf;
private final ObjectPath tablePath;
private final CatalogTable catalogTable;
// Remaining partition specs after partition pruning is performed. Null if pruning is not pushed down.
@@ -92,8 +92,10 @@ public class HiveTableSource implements
private boolean isLimitPushDown = false;
private long limit = -1L;

public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
public HiveTableSource(
JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath, CatalogTable catalogTable) {
this.jobConf = Preconditions.checkNotNull(jobConf);
this.flinkConf = Preconditions.checkNotNull(flinkConf);
this.tablePath = Preconditions.checkNotNull(tablePath);
this.catalogTable = Preconditions.checkNotNull(catalogTable);
this.hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
@@ -103,14 +105,19 @@ public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catal
}

// A constructor mainly used to create copies during optimizations like partition pruning and projection push down.
private HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable,
List<Map<String, String>> remainingPartitions,
String hiveVersion,
boolean partitionPruned,
int[] projectedFields,
boolean isLimitPushDown,
long limit) {
private HiveTableSource(
JobConf jobConf,
ReadableConfig flinkConf,
ObjectPath tablePath,
CatalogTable catalogTable,
List<Map<String, String>> remainingPartitions,
String hiveVersion,
boolean partitionPruned,
int[] projectedFields,
boolean isLimitPushDown,
long limit) {
this.jobConf = Preconditions.checkNotNull(jobConf);
this.flinkConf = Preconditions.checkNotNull(flinkConf);
this.tablePath = Preconditions.checkNotNull(tablePath);
this.catalogTable = Preconditions.checkNotNull(catalogTable);
this.remainingPartitions = remainingPartitions;
@@ -134,12 +141,15 @@ public DataStream<BaseRow> getDataStream(StreamExecutionEnvironment execEnv) {
@SuppressWarnings("unchecked")
TypeInformation<BaseRow> typeInfo =
(TypeInformation<BaseRow>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
Configuration conf = GlobalConfiguration.loadConfiguration();
HiveTableInputFormat inputFormat = getInputFormat(allHivePartitions, conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));

HiveTableInputFormat inputFormat = getInputFormat(
allHivePartitions,
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));

DataStreamSource<BaseRow> source = execEnv.createInput(inputFormat, typeInfo);

if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) {
int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) {
int max = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
if (max < 1) {
throw new IllegalConfigurationException(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() +
@@ -166,7 +176,13 @@ public DataStream<BaseRow> getDataStream(StreamExecutionEnvironment execEnv) {
@VisibleForTesting
HiveTableInputFormat getInputFormat(List<HiveTablePartition> allHivePartitions, boolean useMapRedReader) {
return new HiveTableInputFormat(
jobConf, catalogTable, allHivePartitions, projectedFields, limit, hiveVersion, useMapRedReader);
jobConf,
catalogTable,
allHivePartitions,
projectedFields,
limit,
hiveVersion,
useMapRedReader);
}

@Override
@@ -198,8 +214,17 @@ public boolean isLimitPushedDown() {

@Override
public TableSource<BaseRow> applyLimit(long limit) {
return new HiveTableSource(jobConf, tablePath, catalogTable, remainingPartitions, hiveVersion,
partitionPruned, projectedFields, true, limit);
return new HiveTableSource(
jobConf,
flinkConf,
tablePath,
catalogTable,
remainingPartitions,
hiveVersion,
partitionPruned,
projectedFields,
true,
limit);
}

@Override
@@ -213,11 +238,35 @@ public TableSource<BaseRow> applyPartitionPruning(List<Map<String, String>> rema
if (catalogTable.getPartitionKeys() == null || catalogTable.getPartitionKeys().size() == 0) {
return this;
} else {
return new HiveTableSource(jobConf, tablePath, catalogTable, remainingPartitions, hiveVersion,
true, projectedFields, isLimitPushDown, limit);
return new HiveTableSource(
jobConf,
flinkConf,
tablePath,
catalogTable,
remainingPartitions,
hiveVersion,
true,
projectedFields,
isLimitPushDown,
limit);
}
}

@Override
public TableSource<BaseRow> projectFields(int[] fields) {
return new HiveTableSource(
jobConf,
flinkConf,
tablePath,
catalogTable,
remainingPartitions,
hiveVersion,
partitionPruned,
fields,
isLimitPushDown,
limit);
}

private List<HiveTablePartition> initAllPartitions() {
List<HiveTablePartition> allHivePartitions = new ArrayList<>();
// Please note that the following directly accesses Hive metastore, which is only a temporary workaround.
@@ -323,10 +372,4 @@ public String explainSource() {
}
return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
}

@Override
public TableSource<BaseRow> projectFields(int[] fields) {
return new HiveTableSource(jobConf, tablePath, catalogTable, remainingPartitions, hiveVersion,
partitionPruned, fields, isLimitPushDown, limit);
}
}
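The pushdown methods (applyLimit, applyPartitionPruning, projectFields) each rebuild the source through the private copy constructor, which is why flinkConf has to be threaded through that constructor as well: otherwise a pushdown copy would silently drop the session configuration. A stripped-down sketch of that copy-on-pushdown pattern (field list reduced for brevity, not the actual class):

import org.apache.flink.configuration.ReadableConfig;

final class CopyOnPushdownSource {
    private final ReadableConfig flinkConf;
    private final long limit;

    CopyOnPushdownSource(ReadableConfig flinkConf) {
        this(flinkConf, -1L);
    }

    // Copy constructor: every pushdown copy keeps the same session configuration.
    private CopyOnPushdownSource(ReadableConfig flinkConf, long limit) {
        this.flinkConf = flinkConf;
        this.limit = limit;
    }

    CopyOnPushdownSource applyLimit(long newLimit) {
        return new CopyOnPushdownSource(flinkConf, newLimit);
    }
}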
@@ -19,18 +19,16 @@
package org.apache.flink.connectors.hive;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.connectors.hive.read.HiveMapredSplitReader;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
import org.apache.flink.connectors.hive.read.HiveTableInputSplit;
import org.apache.flink.connectors.hive.read.HiveVectorizedOrcSplitReader;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableUtils;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
@@ -40,13 +38,13 @@
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
@@ -63,11 +61,8 @@

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -79,6 +74,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

@@ -415,84 +411,86 @@ public void testParallelismSetting() {
}

@Test
public void testVectorReaderSwitch() throws Exception {
public void testSourceConfig() throws Exception {
// vector reader not available for 1.x and we're not testing orc for 2.0.x
Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);
Map<String, String> env = System.getenv();
hiveShell.execute("create database db1");
try {
hiveShell.execute("create table db1.src (x int,y string) stored as orc");
hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
testVectorReader(true);
testVectorReader(false);
testSourceConfig(true, true);
testSourceConfig(false, false);
} finally {
TestBaseUtils.setEnv(env);
hiveShell.execute("drop database db1 cascade");
}
}

private void testVectorReader(boolean fallback) throws Exception {
File tmpDir = Files.createTempDirectory(null).toFile();
Runtime.getRuntime().addShutdownHook(new Thread(() -> FileUtils.deleteDirectoryQuietly(tmpDir)));

File flinkConf = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME);
Configuration conf = new Configuration();
conf.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, fallback);
BootstrapTools.writeConfiguration(conf, flinkConf);
Map<String, String> map = new HashMap<>(System.getenv());
map.put(ConfigConstants.ENV_FLINK_CONF_DIR, tmpDir.getAbsolutePath());
TestBaseUtils.setEnv(map);
private void testSourceConfig(boolean fallbackMR, boolean inferParallelism) throws Exception {
HiveTableFactory tableFactorySpy = spy((HiveTableFactory) hiveCatalog.getTableFactory().get());

ObjectPath tablePath = new ObjectPath("db1", "src");
CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
doAnswer(invocation -> {
TableSourceFactory.Context context = invocation.getArgument(0);
return new TestConfigSource(
new JobConf(hiveCatalog.getHiveConf()),
context.getConfiguration(),
context.getObjectIdentifier().toObjectPath(),
context.getTable(),
fallbackMR,
inferParallelism);
}).when(tableFactorySpy).createTableSource(any(TableSourceFactory.Context.class));

HiveTableFactory tableFactorySpy = spy((HiveTableFactory) hiveCatalog.getTableFactory().get());
doReturn(new TestVectorReaderSource(new JobConf(hiveCatalog.getHiveConf()), tablePath, catalogTable))
.when(tableFactorySpy).createTableSource(any(TableSourceFactory.Context.class));
HiveCatalog catalogSpy = spy(hiveCatalog);
doReturn(Optional.of(tableFactorySpy)).when(catalogSpy).getTableFactory();

TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
tableEnv.getConfig().getConfiguration().setBoolean(
HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, fallbackMR);
tableEnv.getConfig().getConfiguration().setBoolean(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, inferParallelism);
tableEnv.getConfig().getConfiguration().setInteger(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
tableEnv.registerCatalog(catalogSpy.getName(), catalogSpy);
tableEnv.useCatalog(catalogSpy.getName());

List<Row> results = TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src order by x"));
assertEquals("[1,a, 2,b]", results.toString());
}

// A sub-class of HiveTableSource to test vector reader switch.
private static class TestVectorReaderSource extends HiveTableSource {
private final JobConf jobConf;
private final CatalogTable catalogTable;

TestVectorReaderSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
super(jobConf, tablePath, catalogTable);
this.jobConf = jobConf;
this.catalogTable = catalogTable;
/**
* A sub-class of HiveTableSource to test that source settings (fallback reader, parallelism inference) are taken from the configuration passed in by the factory.
*/
private static class TestConfigSource extends HiveTableSource {
private final boolean fallbackMR;
private final boolean inferParallelism;

TestConfigSource(
JobConf jobConf,
ReadableConfig flinkConf,
ObjectPath tablePath,
CatalogTable catalogTable,
boolean fallbackMR,
boolean inferParallelism) {
super(jobConf, flinkConf, tablePath, catalogTable);
this.fallbackMR = fallbackMR;
this.inferParallelism = inferParallelism;
}

@Override
HiveTableInputFormat getInputFormat(List<HiveTablePartition> allHivePartitions, boolean useMapRedReader) {
return new TestVectorReaderInputFormat(
jobConf, catalogTable, allHivePartitions, null, -1, hiveCatalog.getHiveVersion(), useMapRedReader);
}
}

// A sub-class of HiveTableInputFormat to test vector reader switch.
private static class TestVectorReaderInputFormat extends HiveTableInputFormat {

private static final long serialVersionUID = 1L;

TestVectorReaderInputFormat(JobConf jobConf, CatalogTable catalogTable, List<HiveTablePartition> partitions,
int[] projectedFields, long limit, String hiveVersion, boolean useMapRedReader) {
super(jobConf, catalogTable, partitions, projectedFields, limit, hiveVersion, useMapRedReader);
public DataStream<BaseRow> getDataStream(StreamExecutionEnvironment execEnv) {
DataStreamSource<BaseRow> dataStream = (DataStreamSource<BaseRow>) super.getDataStream(execEnv);
int parallelism = dataStream.getTransformation().getParallelism();
assertEquals(inferParallelism ? 1 : 2, parallelism);
return dataStream;
}

@Override
public void open(HiveTableInputSplit split) throws IOException {
super.open(split);
boolean fallback = GlobalConfiguration.loadConfiguration().getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
assertTrue((fallback && reader instanceof HiveMapredSplitReader) || (!fallback && reader instanceof HiveVectorizedOrcSplitReader));
HiveTableInputFormat getInputFormat(
List<HiveTablePartition> allHivePartitions,
boolean useMapRedReader) {
assertEquals(useMapRedReader, fallbackMR);
return super.getInputFormat(allHivePartitions, useMapRedReader);
}
}
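For reference, a minimal sketch of how these options are toggled per session after this change, mirroring what testSourceConfig does above (the environment setup below is illustrative and not part of the commit):

import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class SessionConfigExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // With this commit, these per-session settings reach HiveTableSource through
        // TableSourceFactory.Context instead of a fresh GlobalConfiguration load.
        tableEnv.getConfig().getConfiguration().setBoolean(
                HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
        tableEnv.getConfig().getConfiguration().setBoolean(
                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
        tableEnv.getConfig().getConfiguration().setInteger(
                ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

        // Catalog registration and the actual Hive query are omitted here.
    }
}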

Expand Down
