Skip to content

Commit

Permalink
[FLINK-26506][python] Support StreamExecutionEnvironment.registerCach…
Browse files Browse the repository at this point in the history
…edFile in Python DataStream API

This closes apache#19011.
  • Loading branch information
SteNicholas authored and dianfu committed Mar 15, 2022
1 parent c3244c3 commit eac5b97
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
16 changes: 16 additions & 0 deletions flink-python/pyflink/datastream/stream_execution_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,22 @@ def get_execution_plan(self) -> str:
j_stream_graph = self._generate_stream_graph(False)
return j_stream_graph.getStreamingPlanAsJSON()

def register_cached_file(self, file_path: str, name: str, executable: bool = False):
"""
Registers a file at the distributed cache under the given name. The file will be accessible
from any user-defined function in the (distributed) runtime under a local path. Files may be
local files (which will be distributed via BlobServer), or files in a distributed file
system. The runtime will copy the files temporarily to a local cache, if needed.
:param file_path: The path of the file, as a URI (e.g. "file:https:///some/path" or
hdfs:https://host:port/and/path").
:param name: The name under which the file is registered.
:param executable: Flag indicating whether the file should be executable.
.. versionadded:: 1.16.0
"""
self._j_stream_execution_environment.registerCachedFile(file_path, name, executable)

@staticmethod
def get_execution_environment() -> 'StreamExecutionEnvironment':
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,5 +717,17 @@ def test_register_slot_sharing_group(self):
MemorySize.of_mebi_bytes(200))
self.assertFalse(j_resource_profile_3.isPresent())

def test_register_cached_file(self):
texts = ['machen', 'zeit', 'heerscharen', 'keiner', 'meine']
text_path = self.tempdir + '/text_file'
with open(text_path, 'a') as f:
for text in texts:
f.write(text)
f.write('\n')
self.env.register_cached_file(text_path, 'cache_test')
cached_files = self.env._j_stream_execution_environment.getCachedFiles()
self.assertEqual(cached_files.size(), 1)
self.assertEqual(cached_files[0].getField(0), 'cache_test')

def tearDown(self) -> None:
self.test_sink.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def excluded_methods(cls):
# 'isForceCheckpointing', 'getNumberOfExecutionRetries', 'setNumberOfExecutionRetries'
# is deprecated, exclude them.
return {'getLastJobExecutionResult', 'getId', 'getIdString',
'registerCachedFile', 'createCollectionsEnvironment', 'createLocalEnvironment',
'createCollectionsEnvironment', 'createLocalEnvironment',
'createRemoteEnvironment', 'addOperator', 'fromElements',
'resetContextEnvironment', 'getCachedFiles', 'generateSequence',
'getNumberOfExecutionRetries', 'getStreamGraph', 'fromParallelCollection',
Expand Down

0 comments on commit eac5b97

Please sign in to comment.