Make wait_for_condition raise exception when timing out. (#9710)
robertnishihara committed Jul 27, 2020
1 parent 4d08ddb commit db0d6e8
Showing 10 changed files with 40 additions and 42 deletions.
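Taken together, the change converts wait_for_condition from a boolean-returning helper that every call site had to wrap in assert into one that raises on timeout, so callers simply invoke it. A minimal sketch of the new calling convention (the predicate and list below are illustrative, not from the diff):

```python
from ray.test_utils import wait_for_condition

results = []

def condition_met():
    # Illustrative predicate: true once something has been appended.
    return len(results) > 0

results.append("done")

# Before this commit callers wrote: assert wait_for_condition(condition_met)
# Now a bare call suffices; if the predicate stays false for the whole
# timeout, wait_for_condition raises RuntimeError instead of returning False.
wait_for_condition(condition_met, timeout=30, retry_interval_ms=100)
```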
4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_api.py
@@ -579,7 +579,7 @@ def check_dead():
pass
return True

- assert wait_for_condition(check_dead)
+ wait_for_condition(check_dead)


def test_shadow_traffic(serve_instance):
@@ -622,7 +622,7 @@ def check_requests():
requests_to_backend("backend4") > 0,
])

- assert wait_for_condition(check_requests)
+ wait_for_condition(check_requests)


if __name__ == "__main__":
10 changes: 5 additions & 5 deletions python/ray/test_utils.py
@@ -239,22 +239,22 @@ def wait_for_errors(error_type, num_errors, timeout=20):


def wait_for_condition(condition_predictor, timeout=30, retry_interval_ms=100):
"""A helper function that waits until a condition is met.
"""Wait until a condition is met or time out with an exception.
Args:
condition_predictor: A function that predicts the condition.
timeout: Maximum timeout in seconds.
retry_interval_ms: Retry interval in milliseconds.
- Return:
- Whether the condition is met within the timeout.
+ Raises:
+ RuntimeError: If the condition is not met before the timeout expires.
"""
start = time.time()
while time.time() - start <= timeout:
if condition_predictor():
- return True
+ return
time.sleep(retry_interval_ms / 1000.0)
- return False
+ raise RuntimeError("The condition wasn't met before the timeout expired.")


def wait_until_succeeded_without_exception(func,
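Under the new contract shown above, success is a plain return and a timeout surfaces as RuntimeError, which pytest reports as a test failure without any assert. A hypothetical sketch of both outcomes (the test names and counter are invented for illustration):

```python
import pytest
from ray.test_utils import wait_for_condition


def test_condition_eventually_true():
    counter = {"n": 0}

    def bump_and_check():
        counter["n"] += 1
        return counter["n"] >= 3

    # Returns once the predicate is satisfied; no assert needed.
    wait_for_condition(bump_and_check, timeout=5, retry_interval_ms=10)


def test_condition_never_true():
    # A test that expects the timeout can treat the exception as the result.
    with pytest.raises(RuntimeError):
        wait_for_condition(lambda: False, timeout=1, retry_interval_ms=100)
```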
8 changes: 4 additions & 4 deletions python/ray/tests/test_actor_failures.py
@@ -362,14 +362,14 @@ def actor_resource_available():

ray.experimental.set_resource("actor", 1)
actor = RestartableActor.remote()
- assert wait_for_condition(lambda: not actor_resource_available())
+ wait_for_condition(lambda: not actor_resource_available())
# Kill the actor.
pid = ray.get(actor.get_pid.remote())

p = probe.remote()
os.kill(pid, SIGKILL)
ray.get(p)
- assert wait_for_condition(lambda: not actor_resource_available())
+ wait_for_condition(lambda: not actor_resource_available())


def test_caller_actor_restart(ray_start_regular):
@@ -869,7 +869,7 @@ def actor_dead():
cluster.remove_node(get_other_nodes(cluster, exclude_head=True)[-1])
# Repeatedly submit tasks and call ray.wait until the exception for the
# dead actor is received.
- assert wait_for_condition(actor_dead)
+ wait_for_condition(actor_dead)

# Create an actor on the local node that will call ray.wait in a loop.
head_node_resource = "HEAD_NODE"
@@ -889,7 +889,7 @@ def ping(self):
# Repeatedly call ray.wait through the local actor until the exception for
# the dead actor is received.
parent_actor = ParentActor.remote()
- assert wait_for_condition(lambda: ray.get(parent_actor.wait.remote()))
+ wait_for_condition(lambda: ray.get(parent_actor.wait.remote()))


@pytest.mark.parametrize(
8 changes: 4 additions & 4 deletions python/ray/tests/test_failure.py
@@ -1098,21 +1098,21 @@ def test_process_failure(use_actors):
pid = ray.get(a.get_pid.remote())
a.start_child.remote(use_actors=use_actors)
# Wait for the child to be scheduled.
- assert wait_for_condition(lambda: not child_resource_available())
+ wait_for_condition(lambda: not child_resource_available())
# Kill the parent process.
os.kill(pid, 9)
- assert wait_for_condition(child_resource_available)
+ wait_for_condition(child_resource_available)

# Test fate sharing if the parent node dies.
def test_node_failure(node_to_kill, use_actors):
a = Actor.options(resources={"parent": 1}).remote()
a.start_child.remote(use_actors=use_actors)
# Wait for the child to be scheduled.
- assert wait_for_condition(lambda: not child_resource_available())
+ wait_for_condition(lambda: not child_resource_available())
# Kill the parent process.
cluster.remove_node(node_to_kill, allow_graceful=False)
node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1})
- assert wait_for_condition(child_resource_available)
+ wait_for_condition(child_resource_available)
return node_to_kill

if node_failure:
2 changes: 1 addition & 1 deletion python/ray/tests/test_gcs_fault_tolerance.py
@@ -119,7 +119,7 @@ def condition():
return False

# Wait for the removed node dead.
- assert wait_for_condition(condition, timeout=10)
+ wait_for_condition(condition, timeout=10)


if __name__ == "__main__":
6 changes: 3 additions & 3 deletions python/ray/tests/test_global_gc.py
@@ -53,7 +53,7 @@ def check_refs_gced():
return (local_ref() is None and
not any(ray.get([a.has_garbage.remote() for a in actors])))

- assert wait_for_condition(check_refs_gced)
+ wait_for_condition(check_refs_gced)
finally:
gc.enable()

@@ -105,7 +105,7 @@ def check_refs_gced():
return (local_ref() is None and
not any(ray.get([a.has_garbage.remote() for a in actors])))

- assert wait_for_condition(check_refs_gced)
+ wait_for_condition(check_refs_gced)

# Local driver.
local_ref = weakref.ref(LargeObjectWithCyclicRef())
@@ -124,7 +124,7 @@ def check_refs_gced():
return (local_ref() is None and
not any(ray.get([a.has_garbage.remote() for a in actors])))

- assert wait_for_condition(check_refs_gced)
+ wait_for_condition(check_refs_gced)
finally:
gc.enable()

2 changes: 1 addition & 1 deletion python/ray/tests/test_job.py
@@ -46,7 +46,7 @@ def actor_finish():
else:
return False

- assert wait_for_condition(actor_finish)
+ wait_for_condition(actor_finish)


def test_job_gc_with_detached_actor(call_ray_start):
35 changes: 16 additions & 19 deletions python/ray/tests/test_metrics.py
@@ -444,7 +444,7 @@ def f(arg):
stop_memory_table()
return True

- def test_object_pineed_in_memory():
+ def test_object_pinned_in_memory():

a = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
b = ray.get(a) # Noqa F841
@@ -470,8 +470,8 @@ def test_pending_task_references():
def f(arg):
time.sleep(1)

- a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) # Noqa F841
- b = f.remote(a) # Noqa F841
+ a = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
+ b = f.remote(a)

wait_for_condition(memory_table_ready)
memory_table = get_memory_table()
@@ -492,7 +492,7 @@ def test_serialized_object_ref_reference():
def f(arg):
time.sleep(1)

- a = ray.put(None) # Noqa F841
+ a = ray.put(None)
b = f.remote([a]) # Noqa F841

wait_for_condition(memory_table_ready)
@@ -551,30 +551,27 @@ class Actor:
# These tests should be retried because it takes at least one second
# to get the fresh new memory table. It is because memory table is updated
# Whenever raylet and node info is renewed which takes 1 second.
- assert (wait_for_condition(
- test_local_reference, timeout=30000, retry_interval_ms=1000) is True)
+ wait_for_condition(
+ test_local_reference, timeout=30000, retry_interval_ms=1000)

- assert (wait_for_condition(
- test_object_pineed_in_memory, timeout=30000, retry_interval_ms=1000) is
- True)
+ wait_for_condition(
+ test_object_pinned_in_memory, timeout=30000, retry_interval_ms=1000)

- assert (wait_for_condition(
- test_pending_task_references, timeout=30000, retry_interval_ms=1000) is
- True)
+ wait_for_condition(
+ test_pending_task_references, timeout=30000, retry_interval_ms=1000)

- assert (wait_for_condition(
+ wait_for_condition(
test_serialized_object_ref_reference,
timeout=30000,
- retry_interval_ms=1000) is True)
+ retry_interval_ms=1000)

- assert (wait_for_condition(
+ wait_for_condition(
test_captured_object_ref_reference,
timeout=30000,
- retry_interval_ms=1000) is True)
+ retry_interval_ms=1000)

- assert (wait_for_condition(
- test_actor_handle_reference, timeout=30000, retry_interval_ms=1000) is
- True)
+ wait_for_condition(
+ test_actor_handle_reference, timeout=30000, retry_interval_ms=1000)


"""Memory Table Unit Test"""
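The comment in the hunk above explains why these call sites pass retry_interval_ms=1000: the memory table only refreshes about once per second, so polling faster than that wastes checks. A minimal sketch of matching the polling interval to the refresh cadence (the predicate below is a stand-in, not the real helper from the test):

```python
import time

from ray.test_utils import wait_for_condition

_start = time.time()

def memory_table_ready():
    # Stand-in predicate: pretend the table becomes fresh after about 1 s.
    return time.time() - _start >= 1.0

# Poll a once-per-second data source once per second; the 100 ms default
# would issue roughly nine wasted checks per refresh cycle.
wait_for_condition(memory_table_ready, timeout=30, retry_interval_ms=1000)
```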
5 changes: 3 additions & 2 deletions python/ray/tests/test_reconstruction.py
@@ -46,7 +46,7 @@ def dependent_task(x):
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
- assert wait_for_condition(
+ wait_for_condition(
lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10)

for _ in range(20):
@@ -101,7 +101,7 @@ def dependent_task(x):
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
- assert wait_for_condition(
+ wait_for_condition(
lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10)

for _ in range(20):
@@ -296,6 +296,7 @@ def dependent_task(x):
pid = ray.get(a.pid.remote())


+ @pytest.mark.skipif(sys.platform == "win32", reason="Test failing on Windows.")
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction_actor_constructor(ray_start_cluster,
reconstruction_enabled):
2 changes: 1 addition & 1 deletion python/ray/tests/test_reference_counting_2.py
@@ -182,7 +182,7 @@ def ref_not_exists():
inner_oid = ray.ObjectRef(inner_oid_binary)
return not worker.core_worker.object_exists(inner_oid)

- assert wait_for_condition(ref_not_exists)
+ wait_for_condition(ref_not_exists)


# Call a recursive chain of tasks that pass a serialized reference that was
