Skip to content

Commit

Permalink
Fix tracing bug when actors are defined before connecting to cluster (r…
Browse files Browse the repository at this point in the history
  • Loading branch information
kathryn-zhou committed May 27, 2021
1 parent 65eab8f commit 6c1ea66
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
9 changes: 5 additions & 4 deletions doc/source/ray-tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ First, install OpenTelemetry.

.. code-block:: shell
pip install opentelemetry-api==1.0.0rc1
pip install opentelemetry-sdk==1.0.0rc1
pip install opentelemetry-api==1.1.0
pip install opentelemetry-sdk==1.1.0
pip install opentelemetry-exporter-otlp==1.1.0
Tracing Startup Hook
--------------------
Expand All @@ -30,7 +31,7 @@ Below is an example tracing startup hook that sets up the default tracing provid
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleExportSpanProcessor,
SimpleSpanProcessor,
)
Expand All @@ -41,7 +42,7 @@ Below is an example tracing startup hook that sets up the default tracing provid
# context and will log a warning if attempted multiple times.
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleExportSpanProcessor(
SimpleSpanProcessor(
ConsoleSpanExporter(
out=open(f"/tmp/spans/{os.getpid()}.json", "a")
)
Expand Down
21 changes: 20 additions & 1 deletion python/ray/tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ def ray_start_cli_tracing(scope="function"):
check_call_ray(["stop", "--force"])


@pytest.fixture()
def ray_start_cli_predefined_actor_tracing(scope="function"):
"""Start ray with tracing-startup-hook, and clean up at end of test."""
check_call_ray(["stop", "--force"], )
check_call_ray(
["start", "--head", "--tracing-startup-hook", setup_tracing_path], )
yield
ray.shutdown()
check_call_ray(["stop", "--force"])


@pytest.fixture()
def ray_start_init_tracing(scope="function"):
"""Call ray.init with tracing-startup-hook, and clean up at end of test."""
Expand Down Expand Up @@ -91,7 +102,7 @@ def f(value):
}


def sync_actor_helper():
def sync_actor_helper(connect_to_cluster: bool = False):
"""Run a Ray sync actor and check the spans produced."""

@ray.remote
Expand All @@ -103,6 +114,9 @@ def increment(self):
self.value += 1
return self.value

if connect_to_cluster:
ray.init(address="auto")

# Create an actor from this class.
counter = Counter.remote()
obj_ref = counter.increment.remote()
Expand Down Expand Up @@ -177,6 +191,11 @@ def test_tracing_async_actor_start_workflow(cleanup_dirs,
assert async_actor_helper()


def test_tracing_predefined_actor(cleanup_dirs,
ray_start_cli_predefined_actor_tracing):
assert sync_actor_helper(connect_to_cluster=True)


def test_wrapping(ray_start_init_tracing):
@ray.remote
def f(**_kwargs):
Expand Down
8 changes: 4 additions & 4 deletions python/ray/util/tracing/tracing_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def _function_hydrate_span_args(func: Callable[..., Any]):
# We only get task ID for workers
if ray.worker.global_worker.mode == ray.worker.WORKER_MODE:
task_id = (runtime_context["task_id"].hex()
if runtime_context["task_id"] else None)
if runtime_context.get("task_id") else None)
if task_id:
span_args["ray.task_id"] = task_id

Expand Down Expand Up @@ -195,7 +195,7 @@ def _actor_hydrate_span_args(class_: _nameable, method: _nameable):
# We only get actor ID for workers
if ray.worker.global_worker.mode == ray.worker.WORKER_MODE:
actor_id = (runtime_context["actor_id"].hex()
if runtime_context["actor_id"] else None)
if runtime_context.get("actor_id") else None)

if actor_id:
span_args["ray.actor_id"] = actor_id
Expand Down Expand Up @@ -313,6 +313,7 @@ def _invocation_actor_class_remote_span(
kwargs = {}
# If tracing feature flag is not on, perform a no-op
if not is_tracing_enabled():
kwargs["_ray_trace_ctx"] = None
return method(self, args, kwargs, *_args, **_kwargs)

class_name = self.__ray_metadata__.class_name
Expand Down Expand Up @@ -464,8 +465,7 @@ async def _resume_span(
# Skip tracing for staticmethod or classmethod, because these method
# might not be called directly by remote calls. Additionally, they are
# tricky to get wrapped and unwrapped.
if (is_static_method(_cls, name) or is_class_method(method)
or not is_tracing_enabled()):
if (is_static_method(_cls, name) or is_class_method(method)):
continue

# Add _ray_trace_ctx to method signature
Expand Down

0 comments on commit 6c1ea66

Please sign in to comment.