Merge branch 'master' into fix_deployment_state
sihanwang41 committed Sep 29, 2023
2 parents db1492d + d13c530 commit 0bc0fc2
Showing 76 changed files with 1,041 additions and 1,015 deletions.
1 change: 1 addition & 0 deletions ci/ray_ci/data.tests.yml
@@ -2,3 +2,4 @@ flaky_tests:
   - //python/ray/data:test_streaming_executor
   - //python/ray/data:test_split
   - //python/ray/data:test_object_gc
+  - //python/ray/data:test_stats
12 changes: 10 additions & 2 deletions ci/ray_ci/test_tester.py
@@ -59,12 +59,20 @@ def test_get_test_targets() -> None:
 
 
 def test_get_all_test_query() -> None:
-    assert _get_all_test_query(["a", "b"], "core", "") == (
+    assert _get_all_test_query(["a", "b"], "core") == (
         "attr(tags, 'team:core\\\\b', tests(a) union tests(b))"
     )
-    assert _get_all_test_query(["a"], "core", "tag") == (
+    assert _get_all_test_query(["a"], "core", except_tags="tag") == (
         "attr(tags, 'team:core\\\\b', tests(a)) except (attr(tags, tag, tests(a)))"
     )
+    assert _get_all_test_query(["a"], "core", only_tags="tag") == (
+        "attr(tags, 'team:core\\\\b', tests(a)) intersect (attr(tags, tag, tests(a)))"
+    )
+    assert _get_all_test_query(["a"], "core", except_tags="tag1", only_tags="tag2") == (
+        "attr(tags, 'team:core\\\\b', tests(a)) "
+        "except (attr(tags, tag1, tests(a))) "
+        "intersect (attr(tags, tag2, tests(a)))"
+    )
 
 
 def test_get_flaky_test_targets() -> None:
44 changes: 32 additions & 12 deletions ci/ray_ci/tester.py
@@ -40,6 +40,12 @@
     type=str,
     help=("Except tests with the given tags."),
 )
+@click.option(
+    "--only-tags",
+    default="",
+    type=str,
+    help=("Only include tests with the given tags."),
+)
 @click.option(
     "--run-flaky-tests",
     is_flag=True,
@@ -65,6 +71,7 @@ def main(
     worker_id: int,
     parallelism_per_worker: int,
     except_tags: str,
+    only_tags: str,
     run_flaky_tests: bool,
     test_env: List[str],
     build_name: Optional[str],
@@ -85,6 +92,7 @@ def main(
         targets,
         team,
         except_tags,
+        only_tags,
     )
     success = container.run_tests(test_targets, test_env)
     sys.exit(0 if success else 1)
@@ -108,39 +116,51 @@ def _get_container(
     )
 
 
-def _get_all_test_query(targets: List[str], team: str, except_tags: str) -> str:
+def _get_all_test_query(
+    targets: List[str],
+    team: str,
+    except_tags: Optional[str] = None,
+    only_tags: Optional[str] = None,
+) -> str:
     """
     Get all test targets that are owned by a particular team, except those that
     have the given tags
     """
     test_query = " union ".join([f"tests({target})" for target in targets])
-    team_query = f"attr(tags, 'team:{team}\\\\b', {test_query})"
-    if not except_tags:
-        # return all tests owned by the team if no except_tags are given
-        return team_query
-
-    # otherwise exclude tests with the given tags
-    except_query = " union ".join(
-        [f"attr(tags, {t}, {test_query})" for t in except_tags.split(",")]
-    )
-    return f"{team_query} except ({except_query})"
+    query = f"attr(tags, 'team:{team}\\\\b', {test_query})"
+
+    if except_tags:
+        except_query = " union ".join(
+            [f"attr(tags, {t}, {test_query})" for t in except_tags.split(",")]
+        )
+        query = f"{query} except ({except_query})"
+
+    if only_tags:
+        only_query = " union ".join(
+            [f"attr(tags, {t}, {test_query})" for t in only_tags.split(",")]
+        )
+        query = f"{query} intersect ({only_query})"
+
+    return query
 
 
 def _get_test_targets(
     container: TesterContainer,
     targets: str,
     team: str,
     except_tags: Optional[str] = "",
+    only_tags: Optional[str] = "",
     yaml_dir: Optional[str] = None,
 ) -> List[str]:
     """
     Get all test targets that are not flaky
     """
 
+    query = _get_all_test_query(targets, team, except_tags, only_tags)
     test_targets = (
         container.run_script_with_output(
             [
-                f'bazel query "{_get_all_test_query(targets, team, except_tags)}"',
+                f'bazel query "{query}"',
             ]
         )
         .decode("utf-8")
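
Note: a minimal usage sketch of the new query composition; the import path matches this file's location, but the target and tag values below are illustrative, not taken from this commit.

    from ci.ray_ci.tester import _get_all_test_query

    # Team filter only: all tests owned by "data".
    print(_get_all_test_query(["//python/ray/data/..."], "data"))
    # attr(tags, 'team:data\\b', tests(//python/ray/data/...))

    # Exclude one tag, then restrict to another; "except" is applied before
    # "intersect", matching the combined test case added above.
    print(
        _get_all_test_query(
            ["//python/ray/data/..."],
            "data",
            except_tags="manual",
            only_tags="gpu",
        )
    )
    # attr(tags, 'team:data\\b', tests(//python/ray/data/...))
    #   except (attr(tags, manual, tests(//python/ray/data/...)))
    #   intersect (attr(tags, gpu, tests(//python/ray/data/...)))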
13 changes: 12 additions & 1 deletion dashboard/modules/job/tests/test_http_job_server.py
@@ -334,7 +334,18 @@ def test_submit_job(job_sdk_client, runtime_env_option, monkeypatch):
         runtime_env=runtime_env_option["runtime_env"],
     )
 
-    wait_for_condition(_check_job_succeeded, client=client, job_id=job_id, timeout=60)
+    try:
+        wait_for_condition(
+            _check_job_succeeded, client=client, job_id=job_id, timeout=60
+        )
+    except RuntimeError as e:
+        # If the job is still pending, include job logs and info in error.
+        if client.get_job_status(job_id) == JobStatus.PENDING:
+            logs = client.get_job_logs(job_id)
+            info = client.get_job_info(job_id)
+            raise RuntimeError(
+                f"Job was stuck in PENDING.\nLogs: {logs}\nInfo: {info}"
+            ) from e
 
     logs = client.get_job_logs(job_id)
     assert runtime_env_option["expected_logs"] in logs
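
Note: test_job_agent.py below applies the same diagnose-on-timeout pattern; a standalone sketch of the idea, where `wait_for_condition` (which raises RuntimeError on timeout) and the client calls mirror the test code, while `wait_or_diagnose` is a hypothetical helper, not part of this commit.

    from ray.job_submission import JobStatus

    def wait_or_diagnose(wait, client, job_id):
        # Run the polling wait; on timeout, attach job logs/info to the error
        # if the job never left PENDING. This sketch re-raises otherwise.
        try:
            wait()
        except RuntimeError as e:
            if client.get_job_status(job_id) == JobStatus.PENDING:
                logs = client.get_job_logs(job_id)
                info = client.get_job_info(job_id)
                raise RuntimeError(
                    f"Job was stuck in PENDING.\nLogs: {logs}\nInfo: {info}"
                ) from e
            raise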
24 changes: 18 additions & 6 deletions dashboard/modules/job/tests/test_job_agent.py
@@ -253,12 +253,24 @@ async def test_submit_job(job_sdk_client, runtime_env_option, monkeypatch):
     submit_result = await agent_client.submit_job_internal(request)
     job_id = submit_result.submission_id
 
-    wait_for_condition(
-        partial(
-            _check_job, client=head_client, job_id=job_id, status=JobStatus.SUCCEEDED
-        ),
-        timeout=60,
-    )
+    try:
+        wait_for_condition(
+            partial(
+                _check_job,
+                client=head_client,
+                job_id=job_id,
+                status=JobStatus.SUCCEEDED,
+            ),
+            timeout=60,
+        )
+    except RuntimeError as e:
+        # If the job is still pending, include job logs and info in error.
+        if head_client.get_job_status(job_id) == JobStatus.PENDING:
+            logs = head_client.get_job_logs(job_id)
+            info = head_client.get_job_info(job_id)
+            raise RuntimeError(
+                f"Job was stuck in PENDING.\nLogs: {logs}\nInfo: {info}"
+            ) from e
 
     # There is only one node, so there is no need to replace the client of the JobAgent
     resp = await agent_client.get_job_logs_internal(job_id)
25 changes: 25 additions & 0 deletions dashboard/modules/reporter/reporter_agent.py
@@ -242,6 +242,12 @@ def jsonify_asdict(o) -> str:
         "MB",
         COMPONENT_METRICS_TAG_KEYS,
     ),
+    "component_num_fds": Gauge(
+        "component_num_fds",
+        "Number of open fds of all components on the node.",
+        "count",
+        COMPONENT_METRICS_TAG_KEYS,
+    ),
     "cluster_active_nodes": Gauge(
         "cluster_active_nodes",
         "Active nodes on the cluster",
@@ -529,6 +535,7 @@ def _get_workers(self):
                     "cmdline",
                     "memory_info",
                     "memory_full_info",
+                    "num_fds",
                 ]
             )
         )
@@ -566,6 +573,7 @@ def _get_raylet(self):
                 "cmdline",
                 "memory_info",
                 "memory_full_info",
+                "num_fds",
             ]
         )
 
@@ -582,6 +590,7 @@ def _get_agent(self):
                 "cmdline",
                 "memory_info",
                 "memory_full_info",
+                "num_fds",
             ]
         )
 
@@ -688,6 +697,13 @@ def _generate_reseted_stats_record(self, component_name: str) -> List[Record]:
                 tags=tags,
             )
         )
+        records.append(
+            Record(
+                gauge=METRICS_GAUGES["component_num_fds"],
+                value=0,
+                tags=tags,
+            )
+        )
         return records
 
     def _generate_system_stats_record(
@@ -709,6 +725,7 @@ def _generate_system_stats_record(
         total_rss = 0.0
         total_uss = 0.0
         total_shm = 0.0
+        total_num_fds = 0
 
         for stat in stats:
             total_cpu_percentage += float(stat.get("cpu_percent", 0.0))  # noqa
@@ -721,6 +738,7 @@
             mem_full_info = stat.get("memory_full_info")
             if mem_full_info is not None:
                 total_uss += float(mem_full_info.uss) / 1.0e6
+            total_num_fds += int(stat.get("num_fds", 0))
 
         tags = {"ip": self._ip, "Component": component_name}
         if pid:
@@ -756,6 +774,13 @@ def _generate_system_stats_record(
                 tags=tags,
            )
         )
+        records.append(
+            Record(
+                gauge=METRICS_GAUGES["component_num_fds"],
+                value=total_num_fds,
+                tags=tags,
+            )
+        )
 
         return records
 
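
Note: a minimal standalone sketch of the new fd accounting, assuming psutil on a POSIX platform (psutil's `num_fds` is unavailable on Windows, which exposes `num_handles` instead); `total_open_fds` is a hypothetical helper for illustration, not part of this commit.

    import psutil

    def total_open_fds(pids):
        # Sum open file descriptors across the given processes, mirroring how
        # the agent sums per-component stats into component_num_fds.
        total = 0
        for pid in pids:
            try:
                # as_dict() returns None for attributes it cannot read.
                stat = psutil.Process(pid).as_dict(attrs=["num_fds"])
                total += int(stat.get("num_fds") or 0)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Processes can exit between enumeration and inspection.
                continue
        return total

    print(total_open_fds([p.pid for p in psutil.process_iter()]))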