[Data] Restructure stdout logging #43360

Merged 3 commits on Feb 26, 2024
Changes from 1 commit
Clean up stdout logging
Signed-off-by: Cheng Su <[email protected]>
c21 committed Feb 22, 2024
commit 3ecf1ce230d30ad28e8b99cd7782c85b9ebff712
10 changes: 10 additions & 0 deletions python/ray/data/_internal/dataset_logger.py
@@ -1,5 +1,6 @@
import logging
import os
from typing import Optional

import ray
from ray._private.ray_constants import LOGGER_FORMAT, LOGGER_LEVEL
@@ -37,6 +38,8 @@ def __init__(self, log_name: str):
self.log_name = log_name
# Lazily initialized in self._initialize_logger()
self._logger = None
# Lazily initialized in self._initialize_logger()
self._datasets_log_path = None

def _initialize_logger(self) -> logging.Logger:
"""Internal method to initialize the logger and the extra file handler
@@ -79,6 +82,7 @@ def _initialize_logger(self) -> logging.Logger:
file_log_handler.setLevel(LOGGER_LEVEL.upper())
file_log_handler.setFormatter(file_log_formatter)
logger.addHandler(file_log_handler)
self._datasets_log_path = datasets_log_path
return logger

def get_logger(self, log_to_stdout: bool = True) -> logging.Logger:
@@ -99,3 +103,9 @@ def get_logger(self, log_to_stdout: bool = True) -> logging.Logger:
self._logger = self._initialize_logger()
self._logger.propagate = log_to_stdout
return self._logger

def get_datasets_log_path(self) -> Optional[str]:
"""
Returns the Datasets log file path if it exists.
"""
return self._datasets_log_path
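
The stdout routing above hinges on the standard logging propagate flag: get_logger() sets self._logger.propagate = log_to_stdout, while the file handler added in _initialize_logger() always receives the record. Below is a minimal standalone sketch of that mechanism using only Python's logging module; the logger name, handlers, and file path are illustrative assumptions, not Ray's actual setup.

import logging
import sys

# Stand-in for the console handler that the root logger would own.
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))

file_logger = logging.getLogger("ray.data.sketch")  # hypothetical logger name
file_logger.setLevel(logging.INFO)
file_logger.addHandler(logging.FileHandler("/tmp/ray-data-sketch.log"))  # hypothetical path

# propagate=False: the record hits the logger's own file handler but never
# bubbles up to the root logger's console handler, so stdout stays quiet.
file_logger.propagate = False
file_logger.info("written to the file only")

# propagate=True: the record reaches both the file and the console.
file_logger.propagate = True
file_logger.info("written to the file and to stdout")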
@@ -157,7 +157,7 @@ class ExecutionOptions:
streaming_split operations.
verbose_progress: Whether to report progress individually per operator. By
default, only AllToAll operators and global progress is reported. This
option is useful for performance debugging. Off by default.
option is useful for performance debugging. On by default.
"""

resource_limits: ExecutionResources = field(default_factory=ExecutionResources)
@@ -172,7 +172,7 @@ class ExecutionOptions:

actor_locality_enabled: bool = True

verbose_progress: bool = bool(int(os.environ.get("RAY_DATA_VERBOSE_PROGRESS", "0")))
verbose_progress: bool = bool(int(os.environ.get("RAY_DATA_VERBOSE_PROGRESS", "1")))

def validate(self) -> None:
"""Validate the options."""
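Whatever the default, the option can still be set explicitly. A hedged sketch of the two knobs involved; the environment variable must be set before ExecutionOptions is constructed, and the DataContext line mirrors the tip logged by the streaming executor below.

import os

# Read once when ExecutionOptions is constructed, so set it before Ray Data
# builds its options ("0" disables, "1" enables per-operator progress).
os.environ["RAY_DATA_VERBOSE_PROGRESS"] = "0"

# Or flip the flag on the current DataContext at runtime.
import ray.data

ray.data.DataContext.get_current().execution_options.verbose_progress = True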
@@ -123,7 +123,7 @@ def start(self, options: ExecutionOptions):
# situations where the scheduler is unable to schedule downstream operators
# due to lack of available actors, causing an initial "pileup" of objects on
# upstream operators, leading to a spike in memory usage prior to steady state.
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"{self._name}: Waiting for {len(refs)} pool actors to start..."
)
try:
20 changes: 15 additions & 5 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -107,10 +107,20 @@ def execute(
self._start_time = time.perf_counter()

if not isinstance(dag, InputDataBuffer):
logger.get_logger().info("Executing DAG %s", dag)
logger.get_logger().info("Execution config: %s", self._options)
stdout_logger = logger.get_logger()
log_path = logger.get_datasets_log_path()
message = (
"Starting execution of Dataset. Monitor progress on Ray "
"Dashboard."
Member: I feel like the "Monitor progress on Ray Dashboard" part isn't helpful, since you already receive a message when Ray initializes instructing you to view the dashboard.

Contributor Author: I wanted to emphasize that people should use the dashboard for monitoring. I can also remove this, WDYT? @raulchen.

Contributor Author: Removed for now.
)
if log_path is not None:
message += f" Full log is in {log_path}"
stdout_logger.info(message)
stdout_logger.info("Execution plan of Dataset: %s\n", dag)
logger.get_logger(log_to_stdout=False).info(
"Execution config: %s", self._options)
if not self._options.verbose_progress:
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
"Tip: For detailed progress reporting, run "
"`ray.data.DataContext.get_current()."
"execution_options.verbose_progress = True`"
@@ -168,7 +178,7 @@ def shutdown(self, execution_completed: bool = True):
with self._shutdown_lock:
if not self._execution_started or self._shutdown:
return
logger.get_logger().debug(f"Shutting down {self}.")
logger.get_logger(log_to_stdout=False).debug(f"Shutting down {self}.")
_num_shutdown += 1
self._shutdown = True
# Give the scheduling loop some time to finish processing.
@@ -251,7 +261,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
"""

if DEBUG_TRACE_SCHEDULING:
logger.get_logger().info("Scheduling loop step...")
logger.get_logger(log_to_stdout=False).info("Scheduling loop step...")
Contributor: This one is already behind a debug flag; can keep log_to_stdout=True for easier debugging.

Contributor Author: Makes sense, changed.

# Note: calling process_completed_tasks() is expensive since it incurs
# ray.wait() overhead, so make sure to allow multiple dispatch per call for
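Taken together, the executor changes above follow one pattern: a short user-facing message goes to stdout and points at the full log file, while detailed execution information is written only to the Ray Data log. The sketch below restates that pattern outside the diff; here logger stands for the module's DatasetLogger instance and options for the ExecutionOptions in scope, both assumed for illustration only.

# Short, user-facing summary on stdout, pointing at the full log.
stdout_logger = logger.get_logger()
message = "Starting execution of Dataset."
log_path = logger.get_datasets_log_path()
if log_path is not None:
    message += f" Full log is in {log_path}"
stdout_logger.info(message)

# Detailed configuration is kept out of stdout and written to the log file only.
logger.get_logger(log_to_stdout=False).info("Execution config: %s", options)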
@@ -29,17 +29,19 @@ def compute_additional_split_factor(
expected_block_size = None
if mem_size:
expected_block_size = mem_size / num_read_tasks
logger.get_logger().debug(
logger.get_logger(log_to_stdout=False).debug(
f"Expected in-memory size {mem_size}," f" block size {expected_block_size}"
)
size_based_splits = round(max(1, expected_block_size / target_max_block_size))
else:
size_based_splits = 1
if cur_additional_split_factor:
size_based_splits *= cur_additional_split_factor
logger.get_logger().debug(f"Size based split factor {size_based_splits}")
logger.get_logger(log_to_stdout=False).debug(
f"Size based split factor {size_based_splits}")
estimated_num_blocks = num_read_tasks * size_based_splits
logger.get_logger().debug(f"Blocks after size splits {estimated_num_blocks}")
logger.get_logger(log_to_stdout=False).debug(
f"Blocks after size splits {estimated_num_blocks}")

available_cpu_slots = ray_available_resources().get("CPU", 1)
if (
@@ -112,19 +114,20 @@ def _apply(self, op: PhysicalOperator, logical_op: Read):

if logical_op._parallelism == -1:
assert reason != ""
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Using autodetected parallelism={detected_parallelism} "
f"for operator {logical_op.name} to satisfy {reason}."
)
logical_op.set_detected_parallelism(detected_parallelism)

if k is not None:
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"To satisfy the requested parallelism of {detected_parallelism}, "
f"each read task output is split into {k} smaller blocks."
)

if k is not None:
op.set_additional_split_factor(k)

logger.get_logger().debug(f"Estimated num output blocks {estimated_num_blocks}")
logger.get_logger(log_to_stdout=False).debug(
f"Estimated num output blocks {estimated_num_blocks}")
@@ -80,7 +80,7 @@ def execute(

if _debug_limit_execution_to_num_blocks is not None:
input_blocks_list = input_blocks_list[:_debug_limit_execution_to_num_blocks]
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Limiting execution to {len(input_blocks_list)} map tasks"
)
shuffle_map_out = [
@@ -112,7 +112,7 @@ def execute(

if _debug_limit_execution_to_num_blocks is not None:
output_num_blocks = _debug_limit_execution_to_num_blocks
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Limiting execution to {output_num_blocks} reduce tasks"
)
shuffle_reduce_out = [
@@ -358,7 +358,7 @@ def __init__(
self._reduce_arg_blocks = self._reduce_arg_blocks[
:_debug_limit_execution_to_num_blocks
]
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Limiting execution to {len(self._reduce_arg_blocks)} reduce tasks"
)

@@ -442,7 +442,8 @@ def execute(
merge_factor: float = 2,
_debug_limit_execution_to_num_blocks: int = None,
) -> Tuple[List[RefBundle], StatsDict]:
logger.get_logger().info("Using experimental push-based shuffle.")
logger.get_logger(log_to_stdout=False).info(
"Using experimental push-based shuffle.")
# TODO: Preemptively clear the blocks list since we will incrementally delete
# the last remaining references as we submit the dependent map tasks during the
# map-merge stage.
@@ -495,7 +496,8 @@ def execute(
# the logging level to DEBUG from a driver script, so just print
# verbosely for now.
# See https://github.com/ray-project/ray/issues/42002.
logger.get_logger().info(f"Push-based shuffle schedule:\n{stage}")
logger.get_logger(log_to_stdout=False).info(
f"Push-based shuffle schedule:\n{stage}")

map_fn = self._map_partition
merge_fn = self._merge
@@ -514,7 +516,7 @@ def merge(*args, **kwargs):

if _debug_limit_execution_to_num_blocks is not None:
input_blocks_list = input_blocks_list[:_debug_limit_execution_to_num_blocks]
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Limiting execution to {len(input_blocks_list)} map tasks"
)
map_stage_iter = _MapStageIterator(
@@ -739,7 +741,7 @@ def _compute_shuffle_schedule(
num_output_blocks: int,
) -> _PushBasedShuffleStage:
num_cpus_total = sum(v for v in num_cpus_per_node_map.values())
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Found {num_cpus_total} CPUs available CPUs for push-based shuffle."
)
num_tasks_per_map_merge_group = merge_factor + 1
5 changes: 3 additions & 2 deletions python/ray/data/datasource/file_based_datasource.py
@@ -251,7 +251,7 @@ def read_task_fn():
if len(read_paths) < num_threads:
num_threads = len(read_paths)

logger.get_logger().debug(
logger.get_logger(log_to_stdout=False).debug(
f"Reading {len(read_paths)} files with {num_threads} threads."
)

@@ -261,7 +261,8 @@ def read_task_fn():
num_workers=num_threads,
)
else:
logger.get_logger().debug(f"Reading {len(read_paths)} files.")
logger.get_logger(log_to_stdout=False).debug(
f"Reading {len(read_paths)} files.")
yield from read_files(read_paths)

return read_task_fn
4 changes: 2 additions & 2 deletions python/ray/data/datasource/file_datasink.py
@@ -196,7 +196,7 @@ def write_row_to_path():
with self.open_output_stream(write_path) as file:
self.write_row_to_file(row, file)

logger.get_logger().debug(f"Writing {write_path} file.")
logger.get_logger(log_to_stdout=False).debug(f"Writing {write_path} file.")
call_with_retry(
write_row_to_path,
description=f"write '{write_path}'",
@@ -262,7 +262,7 @@ def write_block_to_path():
with self.open_output_stream(write_path) as file:
self.write_block_to_file(block, file)

logger.get_logger().debug(f"Writing {write_path} file.")
logger.get_logger(log_to_stdout=False).debug(f"Writing {write_path} file.")
call_with_retry(
write_block_to_path,
description=f"write '{write_path}'",