[FLINK-15912][table] Support create table source/sink by context in sql-cli
JingsongLi authored and wuchong committed Feb 22, 2020
1 parent f6895da commit 306a89a
Showing 6 changed files with 71 additions and 36 deletions.
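In short: before this commit the SQL CLI handed each table factory only the flattened property map; afterwards it builds a factory context that also carries the table's ObjectIdentifier and the session configuration (see the first file below). A minimal sketch of the new call shape, assuming the Context-based factory API of this Flink version; the catalog, database, and table names are illustrative:

import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
import org.apache.flink.table.sources.TableSource;

public class ContextCallShape {

    // Hypothetical helper mirroring the new ExecutionContext logic below.
    // sourceProperties must contain the descriptor-style schema keys so that
    // CatalogTableImpl.fromProperties can derive the table schema.
    public static TableSource<?> createSource(Map<String, String> sourceProperties) {
        final TableSourceFactory<?> factory = (TableSourceFactory<?>)
            TableFactoryService.find(
                TableSourceFactory.class,
                sourceProperties,
                ContextCallShape.class.getClassLoader());

        // Old shape: factory.createTableSource(sourceProperties);
        // New shape: the factory also learns which table it is building and the config.
        return factory.createTableSource(new TableSourceFactoryContextImpl(
            ObjectIdentifier.of("default_catalog", "default_database", "MyTable"),
            CatalogTableImpl.fromProperties(sourceProperties),
            new Configuration()));
    }
}

Passing a context instead of a bare map means a factory can tell which table it is creating and read its schema without re-deriving it from the raw properties.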
File 1 of 6
@@ -48,11 +48,12 @@
 import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
-import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.SinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceTableEntry;
@@ -72,7 +73,9 @@
 import org.apache.flink.table.factories.ModuleFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionService;
@@ -374,25 +377,37 @@ private Catalog createCatalog(String name, Map<String, String> catalogProperties
         return factory.createCatalog(name, catalogProperties);
     }
 
-    private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) {
-        if (execution.isStreamingPlanner()) {
+    private TableSource<?> createTableSource(String name, Map<String, String> sourceProperties) {
+        if (environment.getExecution().isStreamingPlanner()) {
             final TableSourceFactory<?> factory = (TableSourceFactory<?>)
                 TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
-            return factory.createTableSource(sourceProperties);
-        } else if (execution.isBatchPlanner()) {
+            return factory.createTableSource(new TableSourceFactoryContextImpl(
+                ObjectIdentifier.of(
+                    tableEnv.getCurrentCatalog(),
+                    tableEnv.getCurrentDatabase(),
+                    name),
+                CatalogTableImpl.fromProperties(sourceProperties),
+                tableEnv.getConfig().getConfiguration()));
+        } else if (environment.getExecution().isBatchPlanner()) {
             final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
                 TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
             return factory.createBatchTableSource(sourceProperties);
         }
         throw new SqlExecutionException("Unsupported execution type for sources.");
     }
 
-    private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) {
-        if (execution.isStreamingPlanner()) {
+    private TableSink<?> createTableSink(String name, Map<String, String> sinkProperties) {
+        if (environment.getExecution().isStreamingPlanner()) {
             final TableSinkFactory<?> factory = (TableSinkFactory<?>)
                 TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
-            return factory.createTableSink(sinkProperties);
-        } else if (execution.isBatchPlanner()) {
+            return factory.createTableSink(new TableSinkFactoryContextImpl(
+                ObjectIdentifier.of(
+                    tableEnv.getCurrentCatalog(),
+                    tableEnv.getCurrentDatabase(),
+                    name),
+                CatalogTableImpl.fromProperties(sinkProperties),
+                tableEnv.getConfig().getConfiguration()));
+        } else if (environment.getExecution().isBatchPlanner()) {
             final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
                 TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
             return factory.createBatchTableSink(sinkProperties);
@@ -567,10 +582,10 @@ private void initializeCatalogs() {
         Map<String, TableSink<?>> tableSinks = new HashMap<>();
         environment.getTables().forEach((name, entry) -> {
             if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
-                tableSources.put(name, createTableSource(environment.getExecution(), entry.asMap(), classLoader));
+                tableSources.put(name, createTableSource(name, entry.asMap()));
             }
             if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) {
-                tableSinks.put(name, createTableSink(environment.getExecution(), entry.asMap(), classLoader));
+                tableSinks.put(name, createTableSink(name, entry.asMap()));
             }
         });
         // register table sources
File 2 of 6
@@ -22,9 +22,8 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
@@ -93,12 +92,10 @@ public List<String> supportedProperties() {
     }
 
     @Override
-    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
-        final DescriptorProperties params = new DescriptorProperties(true);
-        params.putProperties(properties);
+    public StreamTableSink<Row> createTableSink(TableSinkFactory.Context context) {
         return new TestTableSink(
-            SchemaValidator.deriveTableSinkSchema(params),
-            properties.get(testProperty));
+            context.getTable().getSchema(),
+            context.getTable().getProperties().get(testProperty));
     }
 
     // --------------------------------------------------------------------------------------------
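The test factories in this commit illustrate the migration every custom factory goes through: drop the Map-based method and override the Context-based one. A sketch of that pattern under the same assumptions; MySinkFactory, createMySink, and "my.property" are hypothetical names:

import java.util.Map;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

// Hypothetical factory showing the migration pattern; requiredContext()
// and supportedProperties() are left to subclasses as in any TableFactory.
public abstract class MySinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public StreamTableSink<Row> createTableSink(TableSinkFactory.Context context) {
        // Schema and properties arrive pre-parsed on the CatalogTable, so no
        // DescriptorProperties/SchemaValidator round trip is needed anymore.
        TableSchema schema = context.getTable().getSchema();
        Map<String, String> properties = context.getTable().getProperties();
        return createMySink(schema, properties.get("my.property"));
    }

    // Hypothetical helper that builds the concrete sink from the parsed inputs.
    protected abstract StreamTableSink<Row> createMySink(TableSchema schema, String property);
}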
File 3 of 6
@@ -26,6 +26,7 @@
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -96,14 +97,15 @@ public List<String> supportedProperties() {
     }
 
     @Override
-    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+    public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
+        TableSchema schema = context.getTable().getSchema();
         final DescriptorProperties params = new DescriptorProperties(true);
-        params.putProperties(properties);
+        params.putProperties(context.getTable().getProperties());
         final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
         final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
         return new TestTableSource(
-            TableSchemaUtils.getPhysicalSchema(params.getTableSchema(SCHEMA)),
-            properties.get(testProperty),
+            schema,
+            context.getTable().getProperties().get(testProperty),
             proctime.orElse(null),
             rowtime);
     }
File 4 of 6
@@ -81,4 +81,19 @@ public Map<String, String> toProperties() {
 
         return descriptor.asMap();
     }
+
+    /**
+     * Construct a {@link CatalogTableImpl} from complete properties that contains table schema.
+     */
+    public static CatalogTableImpl fromProperties(Map<String, String> properties) {
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        TableSchema tableSchema = descriptorProperties.getTableSchema(Schema.SCHEMA);
+        descriptorProperties.removeKeyPrefix(Schema.SCHEMA);
+        return new CatalogTableImpl(
+            tableSchema,
+            descriptorProperties.asMap(),
+            ""
+        );
+    }
 }
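For reference, a small sketch of what the new CatalogTableImpl.fromProperties helper does with a flat property map: it parses the schema.* keys into a TableSchema and strips them from the remaining table properties, effectively reversing toProperties(). The exact key format is an assumption worth checking against DescriptorProperties:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.catalog.CatalogTableImpl;

public class FromPropertiesExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Descriptor-style schema keys; "schema.0.name"/"schema.0.data-type"
        // follow the DescriptorProperties format of this Flink version.
        props.put("schema.0.name", "user_id");
        props.put("schema.0.data-type", "BIGINT");
        props.put("connector.type", "filesystem");

        CatalogTableImpl table = CatalogTableImpl.fromProperties(props);
        // The schema keys are parsed into the TableSchema...
        System.out.println(table.getSchema());
        // ...and removed from the table properties, leaving {connector.type=filesystem}.
        System.out.println(table.getProperties());
    }
}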
File 5 of 6
@@ -29,7 +29,6 @@
 import javax.annotation.Nullable;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -80,19 +79,7 @@ public void createTemporaryTable(String path) {
                 " use registerTableSource/registerTableSink/registerTableSourceAndSink.");
         }
 
-        Map<String, String> schemaProperties = schemaDescriptor.toProperties();
-        TableSchema tableSchema = getTableSchema(schemaProperties);
-
-        Map<String, String> properties = new HashMap<>(toProperties());
-        schemaProperties.keySet().forEach(properties::remove);
-
-        CatalogTableImpl catalogTable = new CatalogTableImpl(
-            tableSchema,
-            properties,
-            ""
-        );
-
-        registration.createTemporaryTable(path, catalogTable);
+        registration.createTemporaryTable(path, CatalogTableImpl.fromProperties(toProperties()));
     }
 
     private TableSchema getTableSchema(Map<String, String> schemaProperties) {
File 6 of 6
@@ -44,6 +44,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -143,6 +144,24 @@ public void putPropertiesWithPrefix(String prefix, Map<String, String> prop) {
         }
     }
 
+    /**
+     * Removes the mapping for a key prefix from this properties if it is present.
+     *
+     * <p>For example: for prefix "flink", the kvs in properties like key "flink.k" and
+     * value "v" will be removed.
+     */
+    public void removeKeyPrefix(String prefix) {
+        checkNotNull(prefix);
+
+        Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator();
+        while (iterator.hasNext()) {
+            String key = iterator.next().getKey();
+            if (key.startsWith(prefix)) {
+                iterator.remove();
+            }
+        }
+    }
+
     /**
      * Adds a class under the given key.
      */
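And a behavior sketch for the new DescriptorProperties.removeKeyPrefix, matching the example in its Javadoc; every key that starts with the given prefix is dropped:

import org.apache.flink.table.descriptors.DescriptorProperties;

public class RemoveKeyPrefixExample {
    public static void main(String[] args) {
        DescriptorProperties props = new DescriptorProperties();
        props.putString("flink.k", "v");
        props.putString("other.k", "v2");

        props.removeKeyPrefix("flink");

        // Only {other.k=v2} remains.
        System.out.println(props.asMap());
    }
}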
