[Data] Restructure stdout logging (#43360)

This PR restructures Ray Data's standard-output logging so that it surfaces information that is actually useful to users. We have heard from multiple users that the current stdout logging is spammy and not very helpful.

Signed-off-by: Cheng Su <[email protected]>
c21 committed Feb 26, 2024
1 parent 63d29b7 commit 2bc3bd4
Showing 9 changed files with 54 additions and 24 deletions.
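Most of the changes below swap logger.get_logger() for logger.get_logger(log_to_stdout=False), so verbose messages land only in the Ray Data log file while stdout stays concise. A minimal sketch of that pattern, assuming a module-level DatasetLogger as the Ray Data internals construct it (the message strings are illustrative, not from the diff):

```python
from ray.data._internal.dataset_logger import DatasetLogger

# Module-level logger, the way the Ray Data internals construct it.
logger = DatasetLogger(__name__)

# Emitted to the Ray Data log file and propagated to stdout (the default).
logger.get_logger().info("Starting execution of Dataset.")

# Emitted to the Ray Data log file only; propagation to stdout is disabled
# for this call, so the driver console stays quiet.
logger.get_logger(log_to_stdout=False).info("Execution config: ...")
```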
10 changes: 10 additions & 0 deletions python/ray/data/_internal/dataset_logger.py
@@ -1,5 +1,6 @@
import logging
import os
from typing import Optional

import ray
from ray._private.ray_constants import LOGGER_FORMAT, LOGGER_LEVEL
@@ -37,6 +38,8 @@ def __init__(self, log_name: str):
self.log_name = log_name
# Lazily initialized in self._initialize_logger()
self._logger = None
# Lazily initialized in self._initialize_logger()
self._datasets_log_path = None

def _initialize_logger(self) -> logging.Logger:
"""Internal method to initialize the logger and the extra file handler
@@ -79,6 +82,7 @@ def _initialize_logger(self) -> logging.Logger:
file_log_handler.setLevel(LOGGER_LEVEL.upper())
file_log_handler.setFormatter(file_log_formatter)
logger.addHandler(file_log_handler)
self._datasets_log_path = datasets_log_path
return logger

def get_logger(self, log_to_stdout: bool = True) -> logging.Logger:
@@ -99,3 +103,9 @@ def get_logger(self, log_to_stdout: bool = True) -> logging.Logger:
self._logger = self._initialize_logger()
self._logger.propagate = log_to_stdout
return self._logger

def get_datasets_log_path(self) -> Optional[str]:
"""
Returns the Datasets log file path if it exists.
"""
return self._datasets_log_path
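The new get_datasets_log_path() accessor is filled in lazily by _initialize_logger(), so a caller should fetch the logger before asking for the path. A short usage sketch, mirroring how the streaming executor below uses it:

```python
from ray.data._internal.dataset_logger import DatasetLogger

logger = DatasetLogger(__name__)

# _datasets_log_path is only set once the logger has been initialized,
# so get the logger first; before that, get_datasets_log_path() returns None.
stdout_logger = logger.get_logger()
log_path = logger.get_datasets_log_path()

message = "Starting execution of Dataset."
if log_path is not None:
    message += f" Full log is in {log_path}"
stdout_logger.info(message)
```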
@@ -157,7 +157,7 @@ class ExecutionOptions:
streaming_split operations.
verbose_progress: Whether to report progress individually per operator. By
default, only AllToAll operators and global progress is reported. This
option is useful for performance debugging. Off by default.
option is useful for performance debugging. On by default.
"""

resource_limits: ExecutionResources = field(default_factory=ExecutionResources)
@@ -172,7 +172,7 @@ class ExecutionOptions:

actor_locality_enabled: bool = True

verbose_progress: bool = bool(int(os.environ.get("RAY_DATA_VERBOSE_PROGRESS", "0")))
verbose_progress: bool = bool(int(os.environ.get("RAY_DATA_VERBOSE_PROGRESS", "1")))

def validate(self) -> None:
"""Validate the options."""
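With verbose_progress now defaulting to on, it can still be turned off (or back on) either through the RAY_DATA_VERBOSE_PROGRESS environment variable or on the current DataContext, as the tip message later in this diff suggests. A hedged sketch of both options (the driver script itself is illustrative):

```python
import os

# Option 1: environment variable. The default above is read when the
# ExecutionOptions class is defined, so set this before importing ray.data.
os.environ["RAY_DATA_VERBOSE_PROGRESS"] = "0"

import ray.data

# Option 2: flip the flag on the current DataContext at runtime.
ctx = ray.data.DataContext.get_current()
ctx.execution_options.verbose_progress = False  # or True for per-operator progress
```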
@@ -123,7 +123,7 @@ def start(self, options: ExecutionOptions):
# situations where the scheduler is unable to schedule downstream operators
# due to lack of available actors, causing an initial "pileup" of objects on
# upstream operators, leading to a spike in memory usage prior to steady state.
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"{self._name}: Waiting for {len(refs)} pool actors to start..."
)
try:
16 changes: 12 additions & 4 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -107,10 +107,18 @@ def execute(
self._start_time = time.perf_counter()

if not isinstance(dag, InputDataBuffer):
logger.get_logger().info("Executing DAG %s", dag)
logger.get_logger().info("Execution config: %s", self._options)
stdout_logger = logger.get_logger()
log_path = logger.get_datasets_log_path()
message = "Starting execution of Dataset."
if log_path is not None:
message += f" Full log is in {log_path}"
stdout_logger.info(message)
stdout_logger.info("Execution plan of Dataset: %s\n", dag)
logger.get_logger(log_to_stdout=False).info(
"Execution config: %s", self._options
)
if not self._options.verbose_progress:
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
"Tip: For detailed progress reporting, run "
"`ray.data.DataContext.get_current()."
"execution_options.verbose_progress = True`"
@@ -168,7 +176,7 @@ def shutdown(self, execution_completed: bool = True):
with self._shutdown_lock:
if not self._execution_started or self._shutdown:
return
logger.get_logger().debug(f"Shutting down {self}.")
logger.get_logger(log_to_stdout=False).debug(f"Shutting down {self}.")
_num_shutdown += 1
self._shutdown = True
# Give the scheduling loop some time to finish processing.
18 changes: 12 additions & 6 deletions python/ray/data/_internal/logical/rules/set_read_parallelism.py
@@ -29,17 +29,21 @@ def compute_additional_split_factor(
expected_block_size = None
if mem_size:
expected_block_size = mem_size / num_read_tasks
logger.get_logger().debug(
logger.get_logger(log_to_stdout=False).debug(
f"Expected in-memory size {mem_size}," f" block size {expected_block_size}"
)
size_based_splits = round(max(1, expected_block_size / target_max_block_size))
else:
size_based_splits = 1
if cur_additional_split_factor:
size_based_splits *= cur_additional_split_factor
logger.get_logger().debug(f"Size based split factor {size_based_splits}")
logger.get_logger(log_to_stdout=False).debug(
f"Size based split factor {size_based_splits}"
)
estimated_num_blocks = num_read_tasks * size_based_splits
logger.get_logger().debug(f"Blocks after size splits {estimated_num_blocks}")
logger.get_logger(log_to_stdout=False).debug(
f"Blocks after size splits {estimated_num_blocks}"
)

available_cpu_slots = ray_available_resources().get("CPU", 1)
if (
@@ -112,19 +116,21 @@ def _apply(self, op: PhysicalOperator, logical_op: Read):

if logical_op._parallelism == -1:
assert reason != ""
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Using autodetected parallelism={detected_parallelism} "
f"for operator {logical_op.name} to satisfy {reason}."
)
logical_op.set_detected_parallelism(detected_parallelism)

if k is not None:
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"To satisfy the requested parallelism of {detected_parallelism}, "
f"each read task output is split into {k} smaller blocks."
)

if k is not None:
op.set_additional_split_factor(k)

logger.get_logger().debug(f"Estimated num output blocks {estimated_num_blocks}")
logger.get_logger(log_to_stdout=False).debug(
f"Estimated num output blocks {estimated_num_blocks}"
)
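The size-based split logic in compute_additional_split_factor above is straightforward arithmetic; a small worked example with assumed numbers (not taken from the diff):

```python
# Assumed inputs, for illustration only.
mem_size = 8 * 1024**3                  # 8 GiB estimated in-memory dataset size
num_read_tasks = 4                      # read tasks produced by the datasource
target_max_block_size = 128 * 1024**2   # 128 MiB target block size

expected_block_size = mem_size / num_read_tasks             # 2 GiB per read task
size_based_splits = round(max(1, expected_block_size / target_max_block_size))  # 16
estimated_num_blocks = num_read_tasks * size_based_splits   # 64 blocks after splitting
```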
@@ -80,7 +80,7 @@ def execute(

if _debug_limit_execution_to_num_blocks is not None:
input_blocks_list = input_blocks_list[:_debug_limit_execution_to_num_blocks]
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Limiting execution to {len(input_blocks_list)} map tasks"
)
shuffle_map_out = [
@@ -112,7 +112,7 @@

if _debug_limit_execution_to_num_blocks is not None:
output_num_blocks = _debug_limit_execution_to_num_blocks
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Limiting execution to {output_num_blocks} reduce tasks"
)
shuffle_reduce_out = [
@@ -358,7 +358,7 @@ def __init__(
self._reduce_arg_blocks = self._reduce_arg_blocks[
:_debug_limit_execution_to_num_blocks
]
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Limiting execution to {len(self._reduce_arg_blocks)} reduce tasks"
)

@@ -442,7 +442,9 @@ def execute(
merge_factor: float = 2,
_debug_limit_execution_to_num_blocks: int = None,
) -> Tuple[List[RefBundle], StatsDict]:
logger.get_logger().info("Using experimental push-based shuffle.")
logger.get_logger(log_to_stdout=False).info(
"Using experimental push-based shuffle."
)
# TODO: Preemptively clear the blocks list since we will incrementally delete
# the last remaining references as we submit the dependent map tasks during the
# map-merge stage.
@@ -495,7 +497,9 @@ def execute(
# the logging level to DEBUG from a driver script, so just print
# verbosely for now.
# See https://github.com/ray-project/ray/issues/42002.
logger.get_logger().info(f"Push-based shuffle schedule:\n{stage}")
logger.get_logger(log_to_stdout=False).info(
f"Push-based shuffle schedule:\n{stage}"
)

map_fn = self._map_partition
merge_fn = self._merge
@@ -514,7 +518,7 @@ def merge(*args, **kwargs):

if _debug_limit_execution_to_num_blocks is not None:
input_blocks_list = input_blocks_list[:_debug_limit_execution_to_num_blocks]
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Limiting execution to {len(input_blocks_list)} map tasks"
)
map_stage_iter = _MapStageIterator(
@@ -739,7 +743,7 @@ def _compute_shuffle_schedule(
num_output_blocks: int,
) -> _PushBasedShuffleStage:
num_cpus_total = sum(v for v in num_cpus_per_node_map.values())
logger.get_logger().info(
logger.get_logger(log_to_stdout=False).info(
f"Found {num_cpus_total} CPUs available CPUs for push-based shuffle."
)
num_tasks_per_map_merge_group = merge_factor + 1
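In _compute_shuffle_schedule above, each map/merge group appears to occupy merge_factor map-task slots plus one merge-task slot; a rough illustration with assumed numbers (the grouping of CPU slots here is a sketch, not the exact scheduling logic):

```python
# Assumed numbers, for illustration only.
merge_factor = 2
num_cpus_per_node_map = {"node_1": 16, "node_2": 16}

num_cpus_total = sum(num_cpus_per_node_map.values())                 # 32 CPU slots
num_tasks_per_map_merge_group = merge_factor + 1                      # 2 map slots + 1 merge slot
concurrent_groups = num_cpus_total // num_tasks_per_map_merge_group   # roughly 10 groups at a time
```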
6 changes: 4 additions & 2 deletions python/ray/data/datasource/file_based_datasource.py
@@ -251,7 +251,7 @@ def read_task_fn():
if len(read_paths) < num_threads:
num_threads = len(read_paths)

logger.get_logger().debug(
logger.get_logger(log_to_stdout=False).debug(
f"Reading {len(read_paths)} files with {num_threads} threads."
)

@@ -261,7 +261,9 @@ def read_task_fn():
num_workers=num_threads,
)
else:
logger.get_logger().debug(f"Reading {len(read_paths)} files.")
logger.get_logger(log_to_stdout=False).debug(
f"Reading {len(read_paths)} files."
)
yield from read_files(read_paths)

return read_task_fn
4 changes: 2 additions & 2 deletions python/ray/data/datasource/file_datasink.py
@@ -187,7 +187,7 @@ def write_row_to_path():
with self.open_output_stream(write_path) as file:
self.write_row_to_file(row, file)

logger.get_logger().debug(f"Writing {write_path} file.")
logger.get_logger(log_to_stdout=False).debug(f"Writing {write_path} file.")
call_with_retry(
write_row_to_path,
description=f"write '{write_path}'",
@@ -242,7 +242,7 @@ def write_block_to_path():
with self.open_output_stream(write_path) as file:
self.write_block_to_file(block, file)

logger.get_logger().debug(f"Writing {write_path} file.")
logger.get_logger(log_to_stdout=False).debug(f"Writing {write_path} file.")
call_with_retry(
write_block_to_path,
description=f"write '{write_path}'",