Make write an operator as part of the execution plan #32015

Merged on Feb 8, 2023 (51 commits)
edc51bd
Fix read_tfrecords_benchmark nightly test
jianoaix Dec 8, 2022
61f4d6d
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Dec 14, 2022
a33a943
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Dec 16, 2022
36ebe52
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Dec 16, 2022
ce6763e
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Dec 19, 2022
0e2c29e
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Dec 21, 2022
f2b6ed0
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Dec 22, 2022
bb6c5c4
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 4, 2023
540fe79
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 10, 2023
edad7d0
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 10, 2023
60cc079
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 11, 2023
a3d3980
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 12, 2023
001579c
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 17, 2023
8aeed6c
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 18, 2023
7a9a49b
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 19, 2023
ef97167
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 20, 2023
6f0563c
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 21, 2023
bcec4d6
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 24, 2023
ddef4e5
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 25, 2023
fc9a175
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 25, 2023
f0e90b7
Merge branch 'master' of https://github.com/ray-project/ray
jianoaix Jan 26, 2023
0c820ea
Merge branch 'master' of https://github.com/ray-project/ray into writ…
jianoaix Jan 26, 2023
253da6a
Make write an operator as part of the execution plan
jianoaix Jan 27, 2023
514ec14
fix / fix do_write
jianoaix Jan 28, 2023
204ea5f
Merge branch 'master' of https://github.com/ray-project/ray into writ…
jianoaix Jan 28, 2023
b75bf49
Fix the merge
jianoaix Jan 28, 2023
6a28257
fix arg passing
jianoaix Jan 28, 2023
3082e52
lint
jianoaix Jan 28, 2023
5ddfdc0
Reconcile taskcontext
jianoaix Jan 31, 2023
3843ff4
Reconcile taskcontext continued
jianoaix Jan 31, 2023
012a413
Merge branch 'master' of https://github.com/ray-project/ray into writ…
jianoaix Jan 31, 2023
84a74f0
Use task context in write op
jianoaix Jan 31, 2023
bb2a474
fix test
jianoaix Jan 31, 2023
ad5f7c7
feedback: backward compatibility
jianoaix Jan 31, 2023
a77053a
fix
jianoaix Jan 31, 2023
554171a
test write fusion
jianoaix Jan 31, 2023
1ba1b9f
Result of write operator; datasource callbacks
jianoaix Feb 1, 2023
5ced246
Handle an empty list on failure
jianoaix Feb 3, 2023
43eca29
execute the plan in-place in write_datasource
jianoaix Feb 3, 2023
f25d54b
Keep write_datasource semantics diff-neutral regarding the plan
jianoaix Feb 3, 2023
c5ddf07
Merge branch 'master' of https://github.com/ray-project/ray into writ…
jianoaix Feb 3, 2023
1d58e13
disable the write_XX in new optimizer: it's not supported yet
jianoaix Feb 3, 2023
d309dbd
fix comment
jianoaix Feb 3, 2023
21a50db
refactor: do_write() calls direct_write() to reduce code duplication
jianoaix Feb 4, 2023
a84e27b
refactor: for mongo datasource do_write
jianoaix Feb 4, 2023
8879df0
backward compatible
jianoaix Feb 7, 2023
d6873e1
rename: direct_write -> write
jianoaix Feb 7, 2023
10ef980
unnecessary test removed
jianoaix Feb 7, 2023
48e9415
fix
jianoaix Feb 7, 2023
87dc925
deprecation message/logging
jianoaix Feb 7, 2023
b77ca8d
deprecation logging
jianoaix Feb 7, 2023
fix
jianoaix committed Feb 7, 2023
commit 48e94158d1e652fc1e9cd552321c81a3dff46bbd
6 changes: 2 additions & 4 deletions python/ray/data/dataset.py
@@ -2658,9 +2658,7 @@ def write_datasource(
 # WriteResult (one element per write task). Otherwise, an error will
 # be raised. The Datasource can handle execution outcomes with the
 # on_write_complete() and on_write_failed().
-def transform(
-    blocks: Iterable[Block], ctx, fn
-) -> Iterable[Block]:
+def transform(blocks: Iterable[Block], ctx, fn) -> Iterable[Block]:
     return [[datasource.write(blocks, ctx, **write_args)]]
 
 plan = self._plan.with_stage(
@@ -2675,7 +2673,7 @@ def transform(
 try:
     self._write_ds = Dataset(plan, self._epoch, self._lazy).fully_executed()
     datasource.on_write_complete(
-        self._write_ds._plan.execute().get_blocks()
+        ray.get(self._write_ds._plan.execute().get_blocks())
     )
 except Exception as e:
     datasource.on_write_failed([], e)
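The control flow in the hunks above can be sketched without Ray. This is a minimal illustration, not the code from the PR: `ToyDatasource` and `run_write` are hypothetical names, blocks are plain Python lists, and the re-raise in the `except` branch is an assumption, since the hunk does not show what follows `on_write_failed`. It keeps the same shape as the diff: each write task yields one block holding one write result, and the callbacks receive either the full list of result blocks or an empty list plus the error.

```python
from typing import Any, Iterable, List


class ToyDatasource:
    """Hypothetical stand-in for a datasource; not part of Ray."""

    def __init__(self) -> None:
        self.completed: List[Any] = []
        self.failures: List[Exception] = []

    def write(self, blocks: Iterable[List[int]]) -> str:
        # One call per write task; the return value plays the role of a WriteResult.
        for block in blocks:
            if any(x < 0 for x in block):
                raise ValueError("negative value in block")
        return "ok"

    def on_write_complete(self, write_results: List[Any]) -> None:
        self.completed.extend(write_results)

    def on_write_failed(self, write_results: List[Any], error: Exception) -> None:
        self.failures.append(error)


def transform(datasource: ToyDatasource, blocks: Iterable[List[int]]) -> List[List[str]]:
    # Same shape as transform() in the diff: one output block with one write result.
    return [[datasource.write(blocks)]]


def run_write(datasource: ToyDatasource, tasks: List[List[List[int]]]) -> None:
    """Run every write task, then report the outcome through the callbacks."""
    try:
        # Flatten the per-task outputs into a single list of result blocks.
        result_blocks = [b for task in tasks for b in transform(datasource, task)]
        datasource.on_write_complete(result_blocks)
    except Exception as e:
        # As in the diff, no partial results are passed on failure.
        datasource.on_write_failed([], e)
        raise  # Assumption of this sketch; the hunk does not show this line.
```

Because every task contributes a one-element block, a callback written against this sketch compares each entry with `["ok"]` rather than `"ok"`, which matches the assertion changed later in this commit.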
4 changes: 2 additions & 2 deletions python/ray/data/datasource/datasource.py
@@ -362,8 +362,8 @@ def write(
     ray.get(tasks)
     return "ok"
 
-def on_write_complete(self, write_results: List[ObjectRef[WriteResult]]) -> None:
-    assert all(ray.get(w) == ["ok"] for w in write_results), write_results
+def on_write_complete(self, write_results: List[WriteResult]) -> None:
+    assert all(w == ["ok"] for w in write_results), write_results
     self.num_ok += 1
 
 def on_write_failed(
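The signature change above moves the resolve step out of the callback: the caller fetches the values, and `on_write_complete` receives plain results instead of references. A rough analogy in standard-library Python, assuming `concurrent.futures.Future` as a stand-in for an object reference and `Future.result()` as a stand-in for `ray.get()`; `write_task` and `resolve_all` are hypothetical helpers, not Ray APIs.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def write_task(block: List[int]) -> List[str]:
    # Hypothetical write task; returns a one-element block, matching ["ok"] in the diff.
    return ["ok"]


def on_write_complete(write_results: List[List[str]]) -> None:
    # The callback sees plain values and performs no resolve step of its own.
    assert all(w == ["ok"] for w in write_results), write_results


def resolve_all(futures: List[Future]) -> List[List[str]]:
    # Stand-in for the caller-side ray.get(): block until each value is ready.
    return [f.result() for f in futures]


with ThreadPoolExecutor(max_workers=2) as pool:
    pending = [pool.submit(write_task, block) for block in [[1, 2], [3, 4]]]
    resolved = resolve_all(pending)

# Only resolved values cross the callback boundary.
on_write_complete(resolved)
```

Resolving on the caller side keeps the callback free of any dependency on the execution backend, which is one plausible reading of why the type hint changed from `List[ObjectRef[WriteResult]]` to `List[WriteResult]`.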
4 changes: 1 addition & 3 deletions python/ray/data/datasource/file_based_datasource.py
@@ -17,10 +17,8 @@
 
 from ray.data._internal.arrow_block import ArrowRow
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
-from ray.data._internal.block_list import BlockMetadata
 from ray.data._internal.execution.interfaces import TaskContext
 from ray.data._internal.output_buffer import BlockOutputBuffer
-from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data._internal.util import _check_pyarrow_version, _resolve_custom_scheme
 from ray.data.block import Block, BlockAccessor
 from ray.data.context import DatasetContext
@@ -36,7 +34,7 @@
 )
 
 from ray.types import ObjectRef
-from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
+from ray.util.annotations import DeveloperAPI, PublicAPI
 from ray._private.utils import _add_creatable_buckets_param_if_s3_uri
 
 if TYPE_CHECKING:
6 changes: 2 additions & 4 deletions python/ray/data/datasource/mongo_datasource.py
@@ -1,5 +1,5 @@
 import logging
-from typing import Any, Dict, List, Optional, TYPE_CHECKING
+from typing import Dict, List, Optional, TYPE_CHECKING
 
 from ray.data.datasource.datasource import Datasource, Reader, ReadTask, WriteResult
 from ray.data.block import (
@@ -8,10 +8,8 @@
     BlockMetadata,
 )
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
-from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data._internal.execution.interfaces import TaskContext
-from ray.types import ObjectRef
-from ray.util.annotations import Deprecated, PublicAPI
+from ray.util.annotations import PublicAPI
 from typing import Iterable
 
 if TYPE_CHECKING:
1 change: 1 addition & 0 deletions python/ray/data/tests/test_dataset.py
@@ -5516,6 +5516,7 @@ def test_ragged_tensors(ray_start_regular_shared):
         ArrowVariableShapedTensorType(dtype=new_type, ndim=3),
     ]
 
+
 class LoggerWarningCalled(Exception):
     """Custom exception used in test_warning_execute_with_no_cpu() and
     test_nowarning_execute_with_cpu(). Raised when the `logger.warning` method
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_dataset_formats.py
@@ -267,7 +267,7 @@ def write(b):
     return "ok"
 
 def on_write_complete(self, write_results: List[WriteResult]) -> None:
-    assert all(ray.get(w) == ["ok"] for w in write_results), write_results
+    assert all(w == ["ok"] for w in write_results), write_results
     self.num_ok += 1
 
 def on_write_failed(