Skip to content

Commit

Permalink
[data] New executor backend [2/n]--- Add allocation tracing util (ray…
Browse files Browse the repository at this point in the history
…-project#31283)

Add a utility class for tracing object allocation / freeing. This makes it a lot easier to debug memory allocation / freeing issues.

This is split out from ray-project#30903

Signed-off-by: tmynn <[email protected]>
  • Loading branch information
ericl authored and tamohannes committed Jan 25, 2023
1 parent f8ed09e commit a00640e
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 0 deletions.
140 changes: 140 additions & 0 deletions python/ray/data/_internal/memory_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Utility for debugging object store memory eager deletion in Datasets.
Enable with RAY_DATASET_TRACE_ALLOCATIONS=1.
Basic usage is to call `trace_allocation` each time a new object is created, and call
`trace_deallocation` when an object should be disposed of. When the workload is
complete, call `leak_report` to view possibly leaked objects.
Note that so called "leaked" objects will be reclaimed eventually by reference counting
in Ray. This is just to debug the eager deletion protocol which is more efficient.
"""

from io import StringIO
from typing import Dict, List

import ray
from ray.data.context import DatasetContext


def trace_allocation(ref: ray.ObjectRef, loc: str) -> None:
"""Record that an object has been created.
Args:
ref: The object created.
loc: A human-readable string identifying the call site.
"""
ctx = DatasetContext.get_current()
if ctx.trace_allocations:
tracer = _get_mem_actor()
# TODO: it would be nice to determine loc automatically based on the stack.
ray.get(tracer.trace_alloc.remote([ref], loc))


def trace_deallocation(ref: ray.ObjectRef, loc: str, free: bool = True) -> None:
"""Record that an object has been deleted (and delete if free=True).
Args:
ref: The object we no longer need.
loc: A human-readable string identifying the call site.
free: Whether to eagerly destroy the object instead of waiting for Ray
reference counting to kick in.
"""
if free:
ray._private.internal_api.free(ref, local_only=False)
ctx = DatasetContext.get_current()
if ctx.trace_allocations:
tracer = _get_mem_actor()
ray.get(tracer.trace_dealloc.remote([ref], loc, free))


def leak_report() -> str:
tracer = _get_mem_actor()
return ray.get(tracer.leak_report.remote())


@ray.remote(num_cpus=0)
class _MemActor:
def __init__(self):
self.allocated: Dict[ray.ObjectRef, dict] = {}
self.deallocated: Dict[ray.ObjectRef, dict] = {}
self.skip_dealloc: Dict[ray.ObjectRef, str] = {}
self.peak_mem = 0
self.cur_mem = 0

def trace_alloc(self, ref: List[ray.ObjectRef], loc: str):
ref = ref[0] # Avoid Ray materializing the ref.
if ref not in self.allocated:
meta = ray.experimental.get_object_locations([ref])
size_bytes = meta.get("object_size", 0)
if not size_bytes:
size_bytes = -1
from ray import cloudpickle as pickle

try:
obj = ray.get(ref, timeout=5.0)
size_bytes = len(pickle.dumps(obj))
except Exception:
print("[mem_tracing] ERROR getting size")
size_bytes = -1
print(f"[mem_tracing] Allocated {size_bytes} bytes at {loc}: {ref}")
entry = {
"size_bytes": size_bytes,
"loc": loc,
}
self.allocated[ref] = entry
self.cur_mem += size_bytes
self.peak_mem = max(self.cur_mem, self.peak_mem)

def trace_dealloc(self, ref: List[ray.ObjectRef], loc: str, freed: bool):
ref = ref[0] # Avoid Ray materializing the ref.
size_bytes = self.allocated.get(ref, {}).get("size_bytes", 0)
if freed:
print(f"[mem_tracing] Freed {size_bytes} bytes at {loc}: {ref}")
if ref in self.allocated:
self.cur_mem -= size_bytes
self.deallocated[ref] = self.allocated.pop(ref)
self.deallocated[ref]["dealloc_loc"] = loc
else:
print(f"[mem_tracing] WARNING: allocation of {ref} was not traced!")
else:
print(f"[mem_tracing] Skipped freeing {size_bytes} bytes at {loc}: {ref}")
self.skip_dealloc[ref] = loc

def leak_report(self) -> str:
output = StringIO()
output.write("[mem_tracing] ===== Leaked objects =====\n")
for ref in self.allocated:
size_bytes = self.allocated[ref].get("size_bytes")
loc = self.allocated[ref].get("loc")
if ref in self.skip_dealloc:
dealloc_loc = self.skip_dealloc[ref]
output.write(
f"[mem_tracing] Leaked object, created at {loc}, size "
f"{size_bytes}, skipped dealloc at {dealloc_loc}: {ref}\n"
)
else:
output.write(
f"[mem_tracing] Leaked object, created at {loc}, "
f"size {size_bytes}: {ref}\n"
)
output.write("[mem_tracing] ===== End leaked objects =====\n")
output.write("[mem_tracing] ===== Freed objects =====\n")
for ref in self.deallocated:
size_bytes = self.deallocated[ref].get("size_bytes")
loc = self.deallocated[ref].get("loc")
dealloc_loc = self.deallocated[ref].get("dealloc_loc")
output.write(
f"[mem_tracing] Freed object from {loc} at {dealloc_loc}, "
f"size {size_bytes}: {ref}\n"
)
output.write("[mem_tracing] ===== End freed objects =====\n")
output.write(f"[mem_tracing] Peak size bytes {self.peak_mem}\n")
output.write(f"[mem_tracing] Current size bytes {self.cur_mem}\n")
return output.getvalue()


def _get_mem_actor():
return _MemActor.options(
name="mem_tracing_actor", get_if_exists=True, lifetime="detached"
).remote()
38 changes: 38 additions & 0 deletions python/ray/data/tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import pytest
import ray
import numpy as np

from ray.data._internal.util import _check_pyarrow_version
from ray.data._internal.memory_tracing import (
trace_allocation,
trace_deallocation,
leak_report,
)
from ray.data.tests.conftest import * # noqa: F401, F403


Expand Down Expand Up @@ -34,6 +41,37 @@ def test_check_pyarrow_version_supported():
pytest.fail(f"_check_pyarrow_version failed unexpectedly: {e}")


@pytest.mark.parametrize("enabled", [False, True])
def test_memory_tracing(enabled):
ctx = ray.data.context.DatasetContext.get_current()
ctx.trace_allocations = enabled
ref1 = ray.put(np.zeros(1024 * 1024))
ref2 = ray.put(np.zeros(1024 * 1024))
ref3 = ray.put(np.zeros(1024 * 1024))
trace_allocation(ref1, "test1")
trace_allocation(ref2, "test2")
trace_allocation(ref3, "test5")
trace_deallocation(ref1, "test3", free=False)
trace_deallocation(ref2, "test4", free=True)
ray.get(ref1)
with pytest.raises(ray.exceptions.ObjectFreedError):
ray.get(ref2)
report = leak_report()
print(report)

if enabled:
assert "Leaked object, created at test1" in report, report
assert "Leaked object, created at test5" in report, report
assert "Freed object from test2 at test4" in report, report
assert "skipped dealloc at test3" in report, report
else:
assert "test1" not in report, report
assert "test2" not in report, report
assert "test3" not in report, report
assert "test4" not in report, report
assert "test5" not in report, report


if __name__ == "__main__":
import sys

Expand Down

0 comments on commit a00640e

Please sign in to comment.