[FLINK-25178][table] Throw exception when managed table sink needs checkpointing

This closes apache#18219
SteNicholas committed Jan 10, 2022
1 parent f27c221 commit 5b16a8f
Showing 9 changed files with 174 additions and 1 deletion.
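For context, a minimal sketch of the user-facing behavior this commit introduces. The table and source names below are illustrative placeholders, not part of the commit; the environment and table APIs are standard Flink.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ManagedTableCheckpointingExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // No 'connector' option in WITH, so this resolves to a managed table.
        tableEnv.executeSql("CREATE TABLE managed_sink (a INT, b BIGINT, c STRING)");

        // Without this call, planning the INSERT below now fails with a TableException,
        // because a managed table commits data only on checkpoints.
        env.enableCheckpointing(10_000L); // checkpoint every 10 seconds

        // 'my_source' stands for any existing streaming table.
        tableEnv.executeSql("INSERT INTO managed_sink SELECT a, b, c FROM my_source");
    }
}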
@@ -72,4 +72,9 @@ public JobExecutionResult execute(Pipeline pipeline) throws Exception {
public JobClient executeAsync(Pipeline pipeline) throws Exception {
return executor.executeAsync(pipeline);
}

@Override
public boolean isCheckpointingEnabled() {
return executor.isCheckpointingEnabled();
}
}
@@ -79,4 +79,11 @@ Pipeline createPipeline(
* @throws Exception which occurs during job execution.
*/
JobClient executeAsync(Pipeline pipeline) throws Exception;

/**
* Checks whether checkpointing is enabled.
*
* @return True if checkpointing is enabled, false otherwise.
*/
boolean isCheckpointingEnabled();
}
@@ -54,4 +54,9 @@ public JobExecutionResult execute(Pipeline pipeline) throws Exception {
public JobClient executeAsync(Pipeline pipeline) {
return null;
}

@Override
public boolean isCheckpointingEnabled() {
return false;
}
}
@@ -23,6 +23,7 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.types.RowKind;
@@ -35,7 +36,8 @@
import java.util.concurrent.atomic.AtomicReference;

/** A test {@link ManagedTableFactory}. */
public class TestManagedTableFactory implements DynamicTableSourceFactory, ManagedTableFactory {
public class TestManagedTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory, ManagedTableFactory {

public static final String ENRICHED_KEY = "ENRICHED_KEY";

@@ -105,6 +107,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
return new TestManagedTableSource(changelogMode);
}

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
return new TestManagedTableSink();
}

/** Managed {@link DynamicTableSource} for testing. */
public static class TestManagedTableSource implements ScanTableSource {

@@ -167,4 +174,38 @@ private ChangelogMode parseChangelogMode(String string) {
}
return builder.build();
}

/** Managed {@link DynamicTableSink} for testing. */
public static class TestManagedTableSink implements DynamicTableSink {

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return requestedMode;
}

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
throw new UnsupportedOperationException();
}

@Override
public DynamicTableSink copy() {
throw new UnsupportedOperationException();
}

@Override
public String asSummaryString() {
throw new UnsupportedOperationException();
}

@Override
public boolean equals(Object o) {
throw new UnsupportedOperationException();
}

@Override
public int hashCode() {
throw new UnsupportedOperationException();
}
}
}
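One detail the hunk does not show: like all DynamicTableSinkFactory implementations, this factory is only found through Java SPI discovery. By Flink convention that means the factory's fully qualified class name is listed in a META-INF/services/org.apache.flink.table.factories.Factory file under the test resources; the exact package below is an assumption based on the class shown in this diff, not visible in the hunk.

# META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.table.factories.TestManagedTableFactory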
@@ -95,6 +95,11 @@ public JobClient executeAsync(Pipeline pipeline) throws Exception {
return executionEnvironment.executeAsync((StreamGraph) pipeline);
}

@Override
public boolean isCheckpointingEnabled() {
return executionEnvironment.getCheckpointConfig().isCheckpointingEnabled();
}

private void configureBatchSpecificProperties() {
executionEnvironment.getConfig().enableObjectReuse();
}
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions}
import org.apache.flink.table.api._
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.delegation.{Executor, Parser, Planner}
import org.apache.flink.table.descriptors.{ConnectorDescriptorValidator, DescriptorProperties}
@@ -384,6 +385,15 @@ abstract class PlannerBase(
}
val catalog = toScala(catalogManager.getCatalog(objectIdentifier.getCatalogName))
val isTemporary = lookupResult.isTemporary

if (isStreamingMode && isManagedTable(catalog.get, resolvedTable) &&
!executor.isCheckpointingEnabled) {
throw new TableException(
s"You should enable the checkpointing for sinking to managed table " +
s"${objectIdentifier}, managed table relies on checkpoint to commit and " +
s"the data is visible only after commit.")
}

if (isLegacyConnectorOptions(objectIdentifier, resolvedTable.getOrigin, isTemporary)) {
val tableSink = TableFactoryUtil.findAndCreateTableSink(
catalog.orNull,
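The check queries the Executor, which reflects the underlying StreamExecutionEnvironment, so any mechanism that enables checkpointing there satisfies it. The tests below use enableCheckpointing directly; as an alternative, here is a sketch of the configuration route using Flink's documented execution.checkpointing.interval option (the class and method names are standard Flink, but verify that your setup applies the configuration before the plan is translated):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingViaConfig {
    public static StreamExecutionEnvironment create() {
        Configuration conf = new Configuration();
        // Enables periodic checkpoints; equivalent to env.enableCheckpointing(10_000L).
        conf.setString("execution.checkpointing.interval", "10 s");
        // The resulting environment reports getCheckpointConfig().isCheckpointingEnabled() == true,
        // which is what the planner's new check observes via Executor#isCheckpointingEnabled.
        return StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}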
@@ -135,4 +135,39 @@ class TableSinkTest extends TableTestBase {

util.verifyExecPlan(stmtSet)
}

@Test
def testManagedTableSinkWithDisableCheckpointing(): Unit = {
util.addTable(
s"""
|CREATE TABLE sink (
| `a` INT,
| `b` BIGINT,
| `c` STRING
|) WITH(
|)
|""".stripMargin)
val stmtSet = util.tableEnv.createStatementSet()
stmtSet.addInsertSql("INSERT INTO sink SELECT * FROM MyTable")

util.verifyAstPlan(stmtSet)
}

@Test
def testManagedTableSinkWithEnableCheckpointing(): Unit = {
util.getStreamEnv.enableCheckpointing(10)
util.addTable(
s"""
|CREATE TABLE sink (
| `a` INT,
| `b` BIGINT,
| `c` STRING
|) WITH(
|)
|""".stripMargin)
val stmtSet = util.tableEnv.createStatementSet()
stmtSet.addInsertSql("INSERT INTO sink SELECT * FROM MyTable")

util.verifyAstPlan(stmtSet)
}
}
@@ -623,6 +623,47 @@ class TableSinkTest extends TableTestBase {
// source and sink have the same parallelism, but sink shuffle by pk is enforced
util.verifyExplain(stmtSet, ExplainDetail.JSON_EXECUTION_PLAN)
}

@Test
def testManagedTableSinkWithDisableCheckpointing(): Unit = {
util.addTable(
s"""
|CREATE TABLE sink (
| `a` INT,
| `b` BIGINT,
| `c` STRING
|) WITH(
|)
|""".stripMargin)
val stmtSet = util.tableEnv.createStatementSet()
stmtSet.addInsertSql("INSERT INTO sink SELECT * FROM MyTable")

expectedException.expect(classOf[TableException])
expectedException.expectMessage(
s"You should enable the checkpointing for sinking to managed table " +
s"`default_catalog`.`default_database`.`sink`, " +
s"managed table relies on checkpoint to commit and " +
s"the data is visible only after commit.")
util.verifyAstPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
}

@Test
def testManagedTableSinkWithEnableCheckpointing(): Unit = {
util.getStreamEnv.enableCheckpointing(10)
util.addTable(
s"""
|CREATE TABLE sink (
| `a` INT,
| `b` BIGINT,
| `c` STRING
|) WITH(
|)
|""".stripMargin)
val stmtSet = util.tableEnv.createStatementSet()
stmtSet.addInsertSql("INSERT INTO sink SELECT * FROM MyTable")

util.verifyAstPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
}
}

/** Tests a table factory that uses ParallelSourceFunction, which supports parallelism set by the env. */
@@ -395,6 +395,30 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
() => Unit)
}

/**
* Verify the AST (abstract syntax tree).
*/
def verifyAstPlan(stmtSet: StatementSet): Unit = {
doVerifyPlan(
stmtSet,
Array.empty[ExplainDetail],
withRowType = false,
Array(PlanKind.AST),
() => Unit)
}

/**
* Verify the AST (abstract syntax tree). The plans will contain the extra [[ExplainDetail]]s.
*/
def verifyAstPlan(stmtSet: StatementSet, extraDetails: ExplainDetail*): Unit = {
doVerifyPlan(
stmtSet,
extraDetails.toArray,
withRowType = false,
Array(PlanKind.AST),
() => Unit)
}

/**
* Verify the AST (abstract syntax tree) and the optimized rel plan for the given SELECT query.
*/
