From 375aa70513454b90909d07bc13e62ed0de925f9d Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 9 Aug 2023 22:37:20 -0700 Subject: [PATCH 1/4] Initial commit Signed-off-by: Balaji Veeramani --- python/ray/data/datasource/file_based_datasource.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index 48f1730c43245..11b62971ed341 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -314,9 +314,6 @@ def write( # if an S3 URI is provided. tmp = _add_creatable_buckets_param_if_s3_uri(path) filesystem.create_dir(tmp, recursive=True) - filesystem = _wrap_s3_serialization_workaround(filesystem) - - fs = _unwrap_s3_serialization_workaround(filesystem) if self._WRITE_FILE_PER_ROW: for row_index, row in enumerate( From 9d23fef9ce0388846098a0884ce4c47425820208 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 9 Aug 2023 22:38:18 -0700 Subject: [PATCH 2/4] Appease lint Signed-off-by: Balaji Veeramani --- python/ray/data/datasource/file_based_datasource.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index 11b62971ed341..2938a9deac844 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -326,7 +326,7 @@ def write( ) write_path = os.path.join(path, filename) logger.get_logger().debug(f"Writing {write_path} file.") - with fs.open_output_stream(write_path, **open_stream_args) as f: + with filesystem.open_output_stream(write_path, **open_stream_args) as f: _write_row_to_file( f, row, @@ -344,7 +344,7 @@ def write( file_format=file_format, ) logger.get_logger().debug(f"Writing {write_path} file.") - with fs.open_output_stream(write_path, **open_stream_args) as f: + with filesystem.open_output_stream(write_path, **open_stream_args) as f: _write_block_to_file( f, block, From dfdfe0554325519f6e14debc6cb7f1d24b32c2cb Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 9 Aug 2023 22:38:41 -0700 Subject: [PATCH 3/4] Format files Signed-off-by: Balaji Veeramani --- python/ray/data/datasource/file_based_datasource.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index 2938a9deac844..9d32f7b564c96 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -326,7 +326,9 @@ def write( ) write_path = os.path.join(path, filename) logger.get_logger().debug(f"Writing {write_path} file.") - with filesystem.open_output_stream(write_path, **open_stream_args) as f: + with filesystem.open_output_stream( + write_path, **open_stream_args + ) as f: _write_row_to_file( f, row, From f6c3d7193bd2b215422fa92d350e23af27c35f48 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 9 Aug 2023 22:41:13 -0700 Subject: [PATCH 4/4] Update files Signed-off-by: Balaji Veeramani --- python/ray/data/datasource/file_based_datasource.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index 9d32f7b564c96..5004d2f840165 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -315,6 +315,8 @@ def write( tmp = _add_creatable_buckets_param_if_s3_uri(path) filesystem.create_dir(tmp, recursive=True) + fs = _unwrap_s3_serialization_workaround(filesystem) + if self._WRITE_FILE_PER_ROW: for row_index, row in enumerate( block.iter_rows(public_row_format=False) @@ -326,9 +328,7 @@ def write( ) write_path = os.path.join(path, filename) logger.get_logger().debug(f"Writing {write_path} file.") - with filesystem.open_output_stream( - write_path, **open_stream_args - ) as f: + with fs.open_output_stream(write_path, **open_stream_args) as f: _write_row_to_file( f, row, @@ -346,7 +346,7 @@ def write( file_format=file_format, ) logger.get_logger().debug(f"Writing {write_path} file.") - with filesystem.open_output_stream(write_path, **open_stream_args) as f: + with fs.open_output_stream(write_path, **open_stream_args) as f: _write_block_to_file( f, block,