[FLINK-17148][python] Support converting pandas DataFrame to Flink Table #11832
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. Automated checks: last check on commit b463662 (Mon Apr 20 15:04:35 UTC 2020). Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. The @flinkbot bot supports the following commands:
@dianfu Thanks a lot for the PR. The code looks very good. Left some suggestions for improvement below.
class PandasConversionTestBase(object):

    @classmethod
    def setUpClass(cls):
I found we should use lowercase for these test methods. However, it is not related to this PR. Maybe we can create another jira to address the problem.
The name setUpClass is from unittest.TestCase and I guess we cannot change it.
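For context, a minimal sketch of how unittest relies on the fixed camelCase setUpClass hook name (the class body and sample data here are illustrative, not the PR's actual test data):

```python
import unittest

class PandasConversionTestBase(unittest.TestCase):
    # setUpClass is a fixed hook name inherited from unittest.TestCase;
    # renaming it (e.g. to set_up_class) would stop unittest from calling it.
    @classmethod
    def setUpClass(cls):
        cls.data = [(1, "a"), (2, "b")]

    def test_data_is_prepared(self):
        self.assertEqual(len(self.data), 2)
```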
data_dict = {}
for j, name in enumerate(cls.data_type.names):
    data_dict[name] = [cls.data[i][j] for i in range(len(cls.data))]
# need to convert to numpy types
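The loop above pivots row-oriented test data into a column-oriented dict suitable for building a pandas DataFrame; a standalone sketch of the same transformation, with made-up sample data:

```python
# Pivot row-oriented data into a column-oriented dict, as the
# test setup above does before constructing a pandas DataFrame.
data = [(1, "hello"), (2, "world")]
names = ["id", "text"]

data_dict = {name: [row[j] for row in data] for j, name in enumerate(names)}
print(data_dict)  # {'id': [1, 2], 'text': ['hello', 'world']}
```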
Why do we need to convert to NumPy types?
The integer types will be parsed as int64 by default, so we need to specify the types explicitly.
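A quick illustration of that point: pandas infers 64-bit integers by default, so narrower types have to be requested explicitly (this is a generic pandas sketch, not the PR's test code):

```python
import numpy as np
import pandas as pd

# Without an explicit dtype, integer data defaults to int64, which
# would not match a narrower Flink type such as TINYINT.
s_default = pd.Series([1, 2, 3])

# Requesting the narrower NumPy type explicitly
s_tinyint = pd.Series([1, 2, 3], dtype=np.int8)

print(s_default.dtype, s_tinyint.dtype)
```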
"1970-01-01 00:00:00.123,[1, 2]"])


class StreamPandasConversionTests(PandasConversionITTests,
Can we also cover the batch mode for the old planner?
Most of the code could not be reused for the batch mode of the old planner. So I'd like to handle it in a separate JIRA if needed.
@@ -1107,6 +1107,63 @@ def _from_elements(self, elements, schema):
    finally:
        os.unlink(temp_file.name)

def from_pandas(self, pdf,
Add detailed Python docs for the API.
BTW, do we plan to add Flink documentation for this API in another PR? If so, we can first create a JIRA to address it under FLINK-17146.
running = false;
}

public abstract ArrowReader<OUT> createArrowReader(VectorSchemaRoot root);
protected
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Maybe add some logging in this method? For example, LOG.info the restored information.
runner2.join();

Assert.assertNull(error[0]);
Assert.assertEquals(testData.f0.size(), numOfEmittedElements.get());
Also verify the content of the data?
Assert.assertNull(error[0]);
Assert.assertNull(error[1]);
Assert.assertEquals(testData.f0.size(), numOfEmittedElements.get());
Also verify the content of the data?
arrowSourceFunction.run(sourceContext);
} catch (Throwable t) {
    if (!t.getMessage().equals("Fail the arrow source")) {
        error[0] = t;
Add the corresponding assert to verify that error[0] is not null?
error[0] should always be null and it has been asserted at the end of the test. I'm not sure what you mean by verifying that error[0] is not null?
Ignore my comment here. You are right.
BTW, it would be great if you could rebase onto master. The interface in BaseRow has been changed.
@hequn8128 Thanks a lot for your valuable feedback. I have updated the PR accordingly (also rebased the PR).
Looks good to me, with some minor comments.
Example:
::

    # use the second parameter to specify custom field names
Move this comment to after the creation of the DataFrame.
:param pdf: The pandas DataFrame.
:param schema: The schema of the converted table.
:type schema: RowType or list[str] or list[DataType]
duplicate type hint.
                   If not specified, the default parallelism will be used.
:type splits_num: int
:return: The result table.
:rtype: Table
duplicate type hint.
:param splits_num: The number of splits the given Pandas DataFrame will be split into. It
                   determines the number of parallel source tasks.
                   If not specified, the default parallelism will be used.
:type splits_num: int
duplicate type hint.
docs/dev/table/python/index.md
Outdated
@@ -32,5 +32,6 @@ Apache Flink has provided Python Table API support since 1.9.0.
- [Installation]({{ site.baseurl }}/dev/table/python/installation.html): Introduction of how to set up the Python Table API execution environment.
- [User-defined Functions]({{ site.baseurl }}/dev/table/python/python_udfs.html): Explanation of how to define Python user-defined functions.
- [Vectorized User-defined Functions]({{ site.baseurl }}/dev/table/python/vectorized_python_udfs.html): Explanation of how to define vectorized Python user-defined functions.
- [Conversion between PyFlink Table and Pandas DataFrame]({{ site.baseurl }}/dev/table/python/conversion_of_pandas.html): Explanation of how to convert between PyFlink Table and Pandas DataFrame.
Conversions?
docs/dev/table/python/index.zh.md
Outdated
@@ -32,5 +32,6 @@ Apache Flink has provided Python Table API support since 1.9.0.
- [环境安装]({{ site.baseurl }}/zh/dev/table/python/installation.html): Introduction of how to set up the Python Table API execution environment.
- [自定义函数]({{ site.baseurl }}/zh/dev/table/python/python_udfs.html): Explanation of how to define Python user-defined functions.
- [自定义向量化函数]({{ site.baseurl }}/zh/dev/table/python/vectorized_python_udfs.html): Explanation of how to define vectorized Python user-defined functions.
- [PyFlink Table和Pandas DataFrame互转]({{ site.baseurl }}/zh/dev/table/python/conversion_of_pandas.html): Explanation of how to convert between PyFlink Table and Pandas DataFrame.
According to most copywriting guidelines, it's better to leave a space between an English word and a Chinese word.
pdf = pd.DataFrame(np.random.rand(1000, 2))

# Create a PyFlink Table from a Pandas DataFrame
table = t_env.from_pandas(pdf)
Maybe add more examples here. For example, how to specify table names, which is commonly required.
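A sketch of what such an extra example might look like, building on the doc snippet above. The from_pandas call is shown as a comment because it assumes a configured TableEnvironment t_env, and the field names used are illustrative:

```python
import numpy as np
import pandas as pd

# Name the columns up front so they carry over as field names
pdf = pd.DataFrame(np.random.rand(5, 2), columns=["a", "b"])

# With a TableEnvironment t_env set up elsewhere, the second
# parameter can override the field names of the resulting table:
#   table = t_env.from_pandas(pdf, ["col1", "col2"])
print(list(pdf.columns))
```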
@hequn8128 Thanks for the review. Updated.
Not sure why the CI of Azure wasn't triggered. It has succeeded in my private Azure pipeline: https://dev.azure.com/dianfu/Flink/_build/results?buildId=50&view=results
@dianfu Thanks. Merging...
…7255 * 'master' of https://github.com/apache/flink:
- [FLINK-15591][table] Support create/drop temporary table in both planners
- [FLINK-15591][sql-parser] Support parsing TEMPORARY in table definition
- [FLINK-17148][python] Support converting pandas DataFrame to Flink Table (apache#11832)
- [FLINK-17254][python][docs] Improve the PyFlink documentation and examples to use SQL DDL for source/sink definition.
- [FLINK-17374][travis] Further removals of travis-mentions
- [FLINK-17374][travis] Remove tools/travis directory
- [FLINK-17374][travis] Remove travis-related files
- [FLINK-16423][e2e] Introduce timeouts for HA tests
- [FLINK-17440][network] Resolve potential buffer leak in output unspilling for unaligned checkpoints
- [hotfix][checkpointing] Use practical ChannelStateReader instead of NO_OP
- [hotfix][network] Rename ResultPartitionWriter#initializeState to #readRecoveredState
- [hotfix][tests] Deduplicate code in SlotManagerImplTest
- [FLINK-16605][runtime] Make the SlotManager respect the max limitation for slots
- [FLINK-16605][runtime] Pass the slotmanager.max-number-of-slots to the SlotManagerImpl
- [FLINK-16605][core][config] Add slotmanager.max-number-of-slots config option
- [hotfix][runtime] Add sanity check to SlotManagerConfiguration
- [FLINK-17455][table][filesystem] Move FileSystemFormatFactory to table common
- [FLINK-17391][filesystem] sink.rolling-policy.time.interval default value should be bigger
- [FLINK-17414][python][docs] Update the documentation about PyFlink build about Cython support
What is the purpose of the change
This pull request adds support for converting a pandas DataFrame to a Flink Table.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation