-
Notifications
You must be signed in to change notification settings - Fork 13.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-18726][table-planner-blink] Support INSERT INTO specific colum… #14977
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit aa8dede (Mon Feb 22 03:24:06 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
cc @leonardBang |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @docete for the contribution, the PR generally LGTM, I only left minor comment about exception message, could you help update it?
tEnv.createTemporaryView("MyTable", t) | ||
|
||
expectedEx.expect(classOf[ValidationException]) | ||
expectedEx.expectMessage("Column 'a' has no default value and does not allow NULLs") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flink does not support default value yet, we can improve exception message here, how about improve to
expectedEx.expectMessage("Column 'a' has no default value and does not allow NULLs") | |
expectedEx.expectMessage("Column 'a' does not allow NULL value, thus it can not be used in partial insert") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good, will update soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check the error msg again and found the original one is more accuracy. So i will revert the change.
Does it need to be changed here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert the change and rebase master
tEnv.createTemporaryView("MyTable", t) | ||
|
||
expectedEx.expect(classOf[ValidationException]) | ||
expectedEx.expectMessage("Column 'a' has no default value and does not allow NULLs") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check the error msg again and found the original one is more accuracy. So i will revert the change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @docete for the update, LGTM
Nice catch, I think we can remove this check safely . @docete |
Sorry,I've looked at it again. This verification is in the old planner. It should not be modified |
@flinkbot run azure |
|
||
tEnv.executeSql( | ||
s""" | ||
|INSERT INTO testSink (c, d, e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a test which reorder the specified columns, e.g. INSERT INTO .. (e, c, d)
. I tried in my local machine and it fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
The failed e2e is because of time out. |
targetPosition: util.List[Int]): util.ArrayList[SqlNode] = { | ||
val targetList = new Array[SqlNode](sourceList.size()) | ||
0 until sourceList.size() foreach { | ||
idx => targetList(targetPosition.get(idx)) = sourceList.get(idx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here should be idx => targetList(idx) = sourceList.get(targetPosition.get(idx))
What do you think? @leonardBang @wuchong
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any tests to prove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Test
def testPartialInsertWithComplexReorder(): Unit = {
tEnv.executeSql(
s"""
|CREATE TABLE testSink (
| `a` INT,
| `c` STRING,
| `c1` STRING,
| `c2` STRING,
| `c3` BIGINT,
| `d` INT,
| `e` DOUBLE
|)
|WITH (
| 'connector' = 'values',
| 'sink-insert-only' = 'false'
|)
|""".stripMargin)
val t = env.fromCollection(tupleData2).toTable(tEnv, 'x, 'y)
tEnv.createTemporaryView("MyTable", t)
tEnv.executeSql(
s"""
|INSERT INTO testSink (a,c2,e,c,c1,c3,d)
|SELECT 1,'c2',sum(y),'c','c1',33333,12 FROM MyTable GROUP BY x
|""".stripMargin).await()
val expected = List(
"1,c,c1,c2,33333,12,0.1",
"1,c,c1,c2,33333,12,0.4",
"1,c,c1,c2,33333,12,1.0",
"1,c,c1,c2,33333,12,2.2",
"1,c,c1,c2,33333,12,3.9")
val result = TestValuesTableFactory.getResults("testSink")
assertEquals(expected.sorted, result.sorted)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I think we should use plan testing instead of itcase.
…ns in blink planner
What is the purpose of the change
This PR supports INSERT INTO specific columns in blink planner.
Brief change log
Verifying this change
This change added tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation