Skip to content

Commit

Permalink
[FLINK-18369] Fix instable TableEnvironmentITCase#testStatementSetWit…
Browse files Browse the repository at this point in the history
…hSameSinkTableNames

Replaced TestingOverwritableTableSink with UnsafeMemoryAppendTableSink
as the first uses DataSet#writeAsText. This sink cannot be used twice
with the same path in a single JobGraph.
  • Loading branch information
dawidwys committed Jul 10, 2020
1 parent fce502c commit e5bf5dc
Showing 1 changed file with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,6 @@ class TableEnvironmentITCase(
}

@Test
@Ignore
def testStatementSetWithSameSinkTableNames(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
Expand All @@ -543,15 +542,15 @@ class TableEnvironmentITCase(
val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)

val sinkPath = _tempFolder.newFile().getAbsolutePath
val configuredSink = new TestingOverwritableTableSink(sinkPath)
MemoryTableSourceSinkUtil.clear()
val configuredSink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink()
.configure(Array("d", "e", "f"), Array(INT, LONG, STRING))
tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink)
assertTrue(FileUtils.readFileUtf8(new File(sinkPath)).isEmpty)
tEnv.asInstanceOf[TableEnvironmentInternal]
.registerTableSinkInternal("MySink", configuredSink)

val stmtSet = tEnv.createStatementSet()
stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a > 2"), true)
.addInsertSql("INSERT OVERWRITE MySink SELECT a, b, c FROM MyTable where a <= 2")
stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a > 2"))
.addInsertSql("INSERT INTO MySink SELECT a, b, c FROM MyTable where a <= 2")

val tableResult = stmtSet.execute()
// wait job finished
Expand Down

0 comments on commit e5bf5dc

Please sign in to comment.