[State Observability] Add warnings for data truncation + order columns as it is defined in StateSchema #27018

Merged
3 changes: 0 additions & 3 deletions dashboard/modules/state/state_head.py
@@ -321,7 +321,6 @@ async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
node_ip = req.query.get("node_ip", None)
timeout = int(req.query.get("timeout", DEFAULT_RPC_TIMEOUT))

# TODO(sang): Do input validation from the middleware instead.
if not node_id and not node_ip:
return self._reply(
success=False,
@@ -354,8 +353,6 @@ async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
@routes.get("/api/v0/logs/{media_type}")
@RateLimitedModule.enforce_max_concurrent_calls
async def get_logs(self, req: aiohttp.web.Request):
# TODO(sang): We need a better error handling for streaming
# when we refactor the server framework.
options = GetLogOptions(
timeout=int(req.query.get("timeout", DEFAULT_RPC_TIMEOUT)),
node_id=req.query.get("node_id", None),
9 changes: 9 additions & 0 deletions dashboard/state_aggregator.py
@@ -613,6 +613,9 @@ async def summarize_tasks(self, option: SummaryApiOptions) -> SummaryApiResponse
partial_failure_warning=result.partial_failure_warning,
warnings=result.warnings,
num_after_truncation=result.num_after_truncation,
# Currently, there's no filtering support for summary,
# so we don't calculate this separately.
num_filtered=len(result.result),
)

async def summarize_actors(self, option: SummaryApiOptions) -> SummaryApiResponse:
@@ -633,6 +636,9 @@ async def summarize_actors(self, option: SummaryApiOptions) -> SummaryApiResponse
partial_failure_warning=result.partial_failure_warning,
warnings=result.warnings,
num_after_truncation=result.num_after_truncation,
# Currently, there's no filtering support for summary,
# so we don't calculate this separately.
num_filtered=len(result.result),
)

async def summarize_objects(self, option: SummaryApiOptions) -> SummaryApiResponse:
@@ -653,6 +659,9 @@ async def summarize_objects(self, option: SummaryApiOptions) -> SummaryApiResponse
partial_failure_warning=result.partial_failure_warning,
warnings=result.warnings,
num_after_truncation=result.num_after_truncation,
# Currently, there's no filtering support for summary,
# so we don't calculate this separately.
num_filtered=len(result.result),
)

def _message_to_dict(
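Since the summary endpoints above apply no filters, the new num_filtered field simply mirrors the number of summarized entries. A minimal sketch of that relationship, using hypothetical names (compute_num_filtered, entries, filters) rather than the PR's actual types:

def compute_num_filtered(entries, filters=None):
    # With no filters (the summary case), every entry counts, so the value
    # equals len(entries). List endpoints would count the surviving entries.
    if not filters:
        return len(entries)
    return sum(1 for entry in entries if all(pred(entry) for pred in filters))

# Summary-style call: no filters, so the count equals the result length.
assert compute_num_filtered(["task-1", "task-2", "task-3"]) == 3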
2 changes: 0 additions & 2 deletions python/ray/_private/ray_logging.py
@@ -287,8 +287,6 @@ def setup_and_get_worker_interceptor_logger(
is_for_stdout=is_for_stdout,
)
logger.addHandler(handler)
# TODO(sang): Add 0 or 1 to decide whether
# or not logs are streamed to drivers.
handler.setFormatter(logging.Formatter("%(message)s"))
# Avoid propagating messages to parent loggers.
logger.propagate = False
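The lines above configure a per-worker interceptor logger that emits raw messages and keeps them out of parent loggers. A self-contained sketch of that pattern using only the standard library (the logger name and function are illustrative, not Ray's):

import logging
import sys

def setup_interceptor_logger_sketch(name="worker.interceptor.stdout"):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    # Only the message body is written: no level, timestamp, or logger name.
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)
    # Avoid propagating messages to parent loggers (e.g., the root logger).
    logger.propagate = False
    return logger

setup_interceptor_logger_sketch().info("hello from the interceptor sketch")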
6 changes: 4 additions & 2 deletions python/ray/_private/test_utils.py
@@ -36,6 +36,8 @@
from ray.scripts.scripts import main as ray_main
from ray.util.queue import Empty, Queue, _QueueActor

logger = logging.getLogger(__name__)

try:
from prometheus_client.parser import text_string_to_metric_families
except (ImportError, ModuleNotFoundError):
@@ -355,8 +357,8 @@ def wait_for_condition(
try:
if condition_predictor(**kwargs):
return
except Exception as ex:
last_ex = ex
except Exception:
last_ex = ray._private.utils.format_error_message(traceback.format_exc())
time.sleep(retry_interval_ms / 1000.0)
message = "The condition wasn't met before the timeout expired."
if last_ex is not None:
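The wait_for_condition change above stores the formatted traceback of the last failing attempt instead of the exception object, so the timeout message carries the full stack trace. A minimal sketch of that retry pattern with the standard library only (the function name and defaults are illustrative, not Ray's exact implementation):

import time
import traceback

def wait_for_condition_sketch(predicate, timeout=10, retry_interval_ms=100, **kwargs):
    start = time.time()
    last_ex = None
    while time.time() - start <= timeout:
        try:
            if predicate(**kwargs):
                return
        except Exception:
            # Keep the formatted traceback (a string), not the exception object.
            last_ex = traceback.format_exc()
        time.sleep(retry_interval_ms / 1000.0)
    message = "The condition wasn't met before the timeout expired."
    if last_ex is not None:
        message += f" Last exception: {last_ex}"
    raise RuntimeError(message)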
138 changes: 85 additions & 53 deletions python/ray/experimental/state/api.py
@@ -311,61 +311,77 @@ def get(
assert len(result) == 1
return result[0]

def _print_api_warning(self, resource: StateResource, api_response: dict):
def _print_api_warning(
self,
resource: StateResource,
api_response: dict,
warn_data_source_not_available: bool = True,
warn_data_truncation: bool = True,
warn_limit: bool = True,
warn_server_side_warnings: bool = True,
):
"""Print the API warnings.

We print warnings for users:
1. when some data sources are not available
2. when results were truncated at the data source
3. when results were limited
4. when callsites are not enabled for listing objects

Args:
resource: Resource names, i.e. 'jobs', 'actors', 'nodes',
see `StateResource` for details.
api_response: The dictionarified `ListApiResponse` or `SummaryApiResponse`.
warn_data_source_not_available: Warn when some data sources
are not available.
warn_data_truncation: Warn when results were truncated at
the data source.
warn_limit: Warn when results were limited.
warn_server_side_warnings: Warn when the server side generates warnings
(e.g., when callsites are not enabled for listing objects)
"""
# Print warnings if anything was given.
warning_msgs = api_response.get("partial_failure_warning", None)
if warning_msgs:
warnings.warn(warning_msgs)

# Print warnings if data is truncated at the data source.
num_after_truncation = api_response["num_after_truncation"]
total = api_response["total"]
if total > num_after_truncation:
# NOTE(rickyyx): For now, there's not much users could do (neither can we),
# with hard truncation. Unless we allow users to set a higher
# `RAY_MAX_LIMIT_FROM_DATA_SOURCE`, the data will always be truncated at the
# data source.
warnings.warn(
(
f"{num_after_truncation} ({total} total) {resource.value} "
"are retrieved from the data source. "
f"{total - num_after_truncation} entries have been truncated. "
f"Max of {num_after_truncation} entries are retrieved from data "
"source to prevent over-sized payloads."
),
)
if warn_data_source_not_available:
warning_msgs = api_response.get("partial_failure_warning", None)
if warning_msgs:
warnings.warn(warning_msgs)

if warn_data_truncation:
# Print warnings if data is truncated at the data source.
num_after_truncation = api_response["num_after_truncation"]
total = api_response["total"]
if total > num_after_truncation:
# NOTE(rickyyx): For now, there's not much users
# can do (nor can we) about hard truncation.
# Unless we allow users to set a higher
# `RAY_MAX_LIMIT_FROM_DATA_SOURCE`, the data will
# always be truncated at the data source.
warnings.warn(
(
"The returned data may contain incomplete result. "
f"{num_after_truncation} ({total} total from the cluster) "
f"{resource.value} are retrieved from the data source. "
f"{total - num_after_truncation} entries have been truncated. "
f"Max of {num_after_truncation} entries are retrieved "
"from data source to prevent over-sized payloads."
),
)

# Print warnings if return data is limited at the API server due to
# limit enforced at the server side
num_filtered = api_response["num_filtered"]
data = api_response["result"]
if num_filtered > len(data):
warnings.warn(
(
f"{len(data)}/{num_filtered} {resource.value} returned. "
"Use `--filter` to reduce the amount of data to return or "
"setting a higher limit with `--limit` to see all data. "
),
)
if warn_limit:
# Print warnings if the returned data is limited at the API server
# due to the limit enforced on the server side.
num_filtered = api_response["num_filtered"]
data = api_response["result"]
if num_filtered > len(data):
warnings.warn(
(
f"Limit last {len(data)} entries "
f"(Total {num_filtered}). Use `--filter` to reduce "
"the amount of data to return or "
"setting a higher limit with `--limit` to see all data. "
),
)

# Print the additional warnings.
warnings_to_print = api_response.get("warnings", [])
if warnings_to_print:
for warning_to_print in warnings_to_print:
warnings.warn(warning_to_print)
if warn_server_side_warnings:
# Print the additional warnings.
warnings_to_print = api_response.get("warnings", [])
if warnings_to_print:
for warning_to_print in warnings_to_print:
warnings.warn(warning_to_print)

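The refactor above gates each warning category behind its own keyword flag so callers such as the summary path can suppress the ones that do not apply. A self-contained sketch of that gating pattern, built only on the "total", "num_after_truncation", "num_filtered", and "result" fields shown in the diff; the function name and messages are illustrative, not Ray's API:

import warnings

def print_state_warnings_sketch(
    api_response,
    resource_name,
    warn_data_truncation=True,
    warn_limit=True,
):
    # Truncation at the data source: the cluster had more entries than returned.
    if warn_data_truncation:
        total = api_response["total"]
        num_after_truncation = api_response["num_after_truncation"]
        if total > num_after_truncation:
            warnings.warn(
                f"The returned data may be incomplete: {num_after_truncation} of "
                f"{total} {resource_name} were retrieved; "
                f"{total - num_after_truncation} entries were truncated at the data source."
            )
    # Limit at the API server: fewer entries returned than matched the filters.
    if warn_limit:
        num_filtered = api_response["num_filtered"]
        returned = len(api_response["result"])
        if num_filtered > returned:
            warnings.warn(
                f"Showing the last {returned} of {num_filtered} matching {resource_name}; "
                "raise the limit to see all data."
            )

# A truncated and limited response triggers both warnings.
print_state_warnings_sketch(
    {"total": 12000, "num_after_truncation": 10000,
     "num_filtered": 10000, "result": list(range(5000))},
    "tasks",
)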
def _raise_on_missing_output(self, resource: StateResource, api_response: dict):
"""Raise an exception when the API resopnse contains a missing output.
Expand All @@ -380,15 +396,30 @@ def _raise_on_missing_output(self, resource: StateResource, api_response: dict):
see `StateResource` for details.
api_response: The dictionarified `ListApiResponse` or `SummaryApiResponse`.
"""
# Raise an exception if there are partial failures that cause missing output.
warning_msgs = api_response.get("partial_failure_warning", None)
# TODO(sang) raise an exception on truncation after
# https://github.com/ray-project/ray/pull/26801.
if warning_msgs:
raise RayStateApiException(
f"Failed to retrieve all {resource.value} from the cluster. "
f"It can happen when some of {resource.value} information is not "
"reachable or the returned data is truncated because it is too large. "
"To allow having missing output, set `raise_on_missing_output=False`. "
f"Failed to retrieve all {resource.value} from the cluster because"
"they are not reachable due to query failures to the data sources. "
"To avoid raising an exception and allow having missing output, "
"set `raise_on_missing_output=False`. "
)
# Raise an exception if there is data truncation that causes missing output.
total = api_response["total"]
num_after_truncation = api_response["num_after_truncation"]

if total != num_after_truncation:
raise RayStateApiException(
f"Failed to retrieve all {resource.value} from the cluster because "
"they are not reachable due to data truncation. It happens "
"when the returned data is too large "
# When the data is truncated, the truncation
# threshold == num_after_truncation. We cannot set this to env
# var because the CLI side might not have the correct env var.
f"(> {num_after_truncation}) "
"To avoid raising an exception and allow having missing output, "
"set `raise_on_missing_output=False`. "
)

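For callers, the practical effect of _raise_on_missing_output is that both partial data-source failures and truncation now surface as exceptions unless explicitly opted out. An illustrative usage sketch; list_tasks and the raise_on_missing_output keyword follow the module-level helpers in ray.experimental.state.api at the time of this PR and are an assumption for other versions:

from ray.experimental.state.api import list_tasks

try:
    # Default behavior after this PR: raise when data sources failed or results
    # were truncated, so callers never silently work with partial data.
    tasks = list_tasks(raise_on_missing_output=True)
except Exception as e:  # raised as RayStateApiException by the client
    print(f"State API output incomplete: {e}")
    # Opt out explicitly to accept whatever (possibly truncated) data is available.
    tasks = list_tasks(raise_on_missing_output=False)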
def list(
Expand Down Expand Up @@ -474,8 +505,9 @@ def summary(
)
if raise_on_missing_output:
self._raise_on_missing_output(resource, summary_api_response)
# TODO(sang): Add warning after
# # https://github.com/ray-project/ray/pull/26801 is merged.
if _explain:
# There's no limit applied to summary, so we shouldn't warn.
self._print_api_warning(resource, summary_api_response, warn_limit=False)
return summary_api_response["result"]["node_id_to_summary"]


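With the change above, passing the internal _explain flag to a summary call prints the data-source and truncation warnings while skipping the limit warning, since no limit is applied to summaries. An illustrative call, assuming summarize_tasks and its keyword arguments as they existed around this PR:

from ray.experimental.state.api import summarize_tasks

# Warnings, if any, are emitted via the warnings module; the limit warning is
# suppressed because the summary path passes warn_limit=False.
task_summary = summarize_tasks(raise_on_missing_output=False, _explain=True)
print(task_summary)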