Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18726][table-planner-blink] Support INSERT INTO specific colum… #14977

Merged
merged 2 commits into from
Mar 1, 2021

Conversation

docete
Copy link
Contributor

@docete docete commented Feb 22, 2021

…ns in blink planner

What is the purpose of the change

This PR supports INSERT INTO specific columns in blink planner.

Brief change log

  • aa8dede: Support INSERT INTO specific columns in blink planner

Verifying this change

This change added tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit aa8dede (Mon Feb 22 03:24:06 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Feb 22, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@wuchong
Copy link
Member

wuchong commented Feb 22, 2021

cc @leonardBang

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @docete for the contribution, the PR generally LGTM, I only left minor comment about exception message, could you help update it?

tEnv.createTemporaryView("MyTable", t)

expectedEx.expect(classOf[ValidationException])
expectedEx.expectMessage("Column 'a' has no default value and does not allow NULLs")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flink does not support default value yet, we can improve exception message here, how about improve to

Suggested change
expectedEx.expectMessage("Column 'a' has no default value and does not allow NULLs")
expectedEx.expectMessage("Column 'a' does not allow NULL value, thus it can not be used in partial insert")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, will update soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the error msg again and found the original one is more accuracy. So i will revert the change.

@sllence
Copy link

sllence commented Feb 24, 2021

https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java

else if (validated instanceof RichSqlInsert) {
            SqlNodeList targetColumnList = ((RichSqlInsert) validated).getTargetColumnList();
            if (targetColumnList != null && targetColumnList.size() != 0) {
                throw new ValidationException("Partial inserts are not supported");
            }
            return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
        }

Does it need to be changed here?

Copy link
Contributor Author

@docete docete left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert the change and rebase master

tEnv.createTemporaryView("MyTable", t)

expectedEx.expect(classOf[ValidationException])
expectedEx.expectMessage("Column 'a' has no default value and does not allow NULLs")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the error msg again and found the original one is more accuracy. So i will revert the change.

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @docete for the update, LGTM

@leonardBang
Copy link
Contributor

leonardBang commented Feb 24, 2021

https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java

else if (validated instanceof RichSqlInsert) {
            SqlNodeList targetColumnList = ((RichSqlInsert) validated).getTargetColumnList();
            if (targetColumnList != null && targetColumnList.size() != 0) {
                throw new ValidationException("Partial inserts are not supported");
            }
            return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
        }

Does it need to be changed here?

Nice catch, I think we can remove this check safely . @docete

@sllence
Copy link

sllence commented Feb 24, 2021

https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java

else if (validated instanceof RichSqlInsert) {
            SqlNodeList targetColumnList = ((RichSqlInsert) validated).getTargetColumnList();
            if (targetColumnList != null && targetColumnList.size() != 0) {
                throw new ValidationException("Partial inserts are not supported");
            }
            return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
        }

Does it need to be changed here?

Nice catch, I think we can remove this check safely . @docete

Sorry,I've looked at it again. This verification is in the old planner. It should not be modified

@docete
Copy link
Contributor Author

docete commented Feb 25, 2021

@flinkbot run azure


tEnv.executeSql(
s"""
|INSERT INTO testSink (c, d, e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test which reorder the specified columns, e.g. INSERT INTO .. (e, c, d). I tried in my local machine and it fails.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@wuchong
Copy link
Member

wuchong commented Mar 1, 2021

The failed e2e is because of time out.
Will merge this PR.

@wuchong wuchong merged commit cb4bc98 into apache:master Mar 1, 2021
@docete docete deleted the FLINK-18726 branch March 2, 2021 06:59
targetPosition: util.List[Int]): util.ArrayList[SqlNode] = {
val targetList = new Array[SqlNode](sourceList.size())
0 until sourceList.size() foreach {
idx => targetList(targetPosition.get(idx)) = sourceList.get(idx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here should be idx => targetList(idx) = sourceList.get(targetPosition.get(idx)) What do you think? @leonardBang @wuchong

Copy link
Member

@wuchong wuchong Mar 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any tests to prove this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Test
  def testPartialInsertWithComplexReorder(): Unit = {
    tEnv.executeSql(
      s"""
         |CREATE TABLE testSink (
         |  `a` INT,
         |  `c` STRING,
         |  `c1` STRING,
         |  `c2` STRING,
         |  `c3` BIGINT,
         |  `d` INT,
         |  `e` DOUBLE
         |)
         |WITH (
         |  'connector' = 'values',
         |  'sink-insert-only' = 'false'
         |)
         |""".stripMargin)

    val t = env.fromCollection(tupleData2).toTable(tEnv, 'x, 'y)
    tEnv.createTemporaryView("MyTable", t)

    tEnv.executeSql(
      s"""
         |INSERT INTO testSink (a,c2,e,c,c1,c3,d)
         |SELECT 1,'c2',sum(y),'c','c1',33333,12 FROM MyTable GROUP BY x
         |""".stripMargin).await()
    val expected = List(
      "1,c,c1,c2,33333,12,0.1",
      "1,c,c1,c2,33333,12,0.4",
      "1,c,c1,c2,33333,12,1.0",
      "1,c,c1,c2,33333,12,2.2",
      "1,c,c1,c2,33333,12,3.9")
    val result = TestValuesTableFactory.getResults("testSink")
    assertEquals(expected.sorted, result.sorted)
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I think we should use plan testing instead of itcase.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
7 participants