[SPARK-47069][PYTHON][CONNECT] Introduce spark.profile.show/dump for SparkSession-based profiling

### What changes were proposed in this pull request?
Introduce `spark.profile.show/dump` for SparkSession-based profiling (both Spark Connect and non-Spark-Connect sessions).

### Why are the changes needed?
SparkContext-based profiling offers `sc.dump_profiles/show_profiles`, which cover both perf and memory profiling.
Currently, SparkSession-based profiling has `spark.dumpPerfProfiles/showPerfProfiles` and `spark.dumpMemoryProfiles/showMemoryProfiles` as separate APIs for perf and memory profiling.
It is more consistent and user-friendly to consolidate them into a uniform interface, `spark.profile.dump/show`, as sketched below.
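For illustration, a rough mapping from the old call shapes to the new unified interface (a sketch only; the removed SparkSession methods are listed in the next section):

```py
# SparkContext-based profiling (unchanged by this PR):
sc.show_profiles()
sc.dump_profiles(path)

# Previous SparkSession-based APIs (one pair per profiler, removed below):
spark.showPerfProfiles(id)
spark.showMemoryProfiles(id)

# Unified interface introduced here:
spark.profile.show(id, type="perf")
spark.profile.show(id, type="memory")
```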

### Does this PR introduce _any_ user-facing change?
Yes. `spark.profile.show/dump` is supported, and the following (not yet released) APIs are removed:
- `spark.dumpPerfProfiles`
- `spark.dumpMemoryProfiles`
- `spark.showPerfProfiles`
- `spark.showMemoryProfiles`

```py
>>> spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")  # enable cProfiler
>>>
>>> udf("string")
... def f(x):
...       return str(x)
...
>>> df = spark.range(10).select(f(col("id")))
>>> df.collect()
[Row(f(id)='0'), ...]
>>> spark.profile.show()
============================================================
Profile of UDF<id=2>
============================================================
...

>>> spark.profile.show(type="memory")
>>> spark.profile.show(type="perf")
============================================================
Profile of UDF<id=2>
============================================================
...

>>> spark.profile.show(2, type="perf")
============================================================
Profile of UDF<id=2>
============================================================
...

>>> spark.profile.show(2, type="memory")

```
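For completeness, a minimal sketch of the corresponding `dump` usage (the temporary directory below is just an illustration):

```py
>>> import tempfile
>>> d = tempfile.mkdtemp()
>>> spark.profile.dump(d)                  # type omitted: dumps both perf and memory results
>>> spark.profile.dump(d, 2, type="perf")  # dump only the perf results of UDF<id=2>
```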

### How was this patch tested?
Unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#45129 from xinrong-meng/spark.profile.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
xinrong-meng authored and ueshin committed Feb 22, 2024
1 parent 6185e5c commit 9debaea
Showing 8 changed files with 171 additions and 85 deletions.
30 changes: 4 additions & 26 deletions python/pyspark/sql/connect/session.py
@@ -68,6 +68,7 @@
from pyspark.sql.connect.streaming.query import StreamingQueryManager
from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
from pyspark.sql.pandas.types import to_arrow_schema, to_arrow_type, _deduplicate_field_names
from pyspark.sql.profiler import Profile
from pyspark.sql.session import classproperty, SparkSession as PySparkSession
from pyspark.sql.types import (
_infer_schema,
@@ -941,32 +942,9 @@ def session_id(self) -> str:
def _profiler_collector(self) -> ProfilerCollector:
return self._client._profiler_collector

def showPerfProfiles(self, id: Optional[int] = None) -> None:
self._profiler_collector.show_perf_profiles(id)

showPerfProfiles.__doc__ = PySparkSession.showPerfProfiles.__doc__

def showMemoryProfiles(self, id: Optional[int] = None) -> None:
if has_memory_profiler:
self._profiler_collector.show_memory_profiles(id)
else:
warnings.warn(
"Memory profiling is disabled. To enable it, install 'memory-profiler',"
" e.g., from PyPI (https://pypi.org/project/memory-profiler/).",
UserWarning,
)

showMemoryProfiles.__doc__ = PySparkSession.showMemoryProfiles.__doc__

def dumpPerfProfiles(self, path: str, id: Optional[int] = None) -> None:
self._profiler_collector.dump_perf_profiles(path, id)

dumpPerfProfiles.__doc__ = PySparkSession.dumpPerfProfiles.__doc__

def dumpMemoryProfiles(self, path: str, id: Optional[int] = None) -> None:
self._profiler_collector.dump_memory_profiles(path, id)

dumpMemoryProfiles.__doc__ = PySparkSession.dumpMemoryProfiles.__doc__
@property
def profile(self) -> Profile:
return Profile(self._client._profiler_collector)


SparkSession.__doc__ = PySparkSession.__doc__
70 changes: 70 additions & 0 deletions python/pyspark/sql/profiler.py
@@ -26,6 +26,7 @@
SpecialAccumulatorIds,
_accumulatorRegistry,
)
from pyspark.errors import PySparkValueError
from pyspark.profiler import CodeMapDict, MemoryProfiler, MemUsageParam, PStatsParam

if TYPE_CHECKING:
@@ -239,3 +240,72 @@ def _profile_results(self) -> "ProfileResults":
with self._lock:
value = self._accumulator.value
return value if value is not None else {}


class Profile:
"""User-facing profile API. This instance can be accessed by
:attr:`spark.profile`.
.. versionadded:: 4.0.0
"""

def __init__(self, profiler_collector: ProfilerCollector):
self.profiler_collector = profiler_collector

def show(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
"""
Show the profile results.
.. versionadded:: 4.0.0
Parameters
----------
id : int, optional
A UDF ID to be shown. If not specified, all the results will be shown.
type : str, optional
The profiler type, which can be either "perf" or "memory".
"""
if type == "memory":
self.profiler_collector.show_memory_profiles(id)
elif type == "perf" or type is None:
self.profiler_collector.show_perf_profiles(id)
if type is None: # Show both perf and memory profiles
self.profiler_collector.show_memory_profiles(id)
else:
raise PySparkValueError(
error_class="VALUE_NOT_ALLOWED",
message_parameters={
"arg_name": "type",
"allowed_values": str(["perf", "memory"]),
},
)

def dump(self, path: str, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
"""
Dump the profile results into directory `path`.
.. versionadded:: 4.0.0
Parameters
----------
path : str
A directory in which to dump the profile.
id : int, optional
A UDF ID to be dumped. If not specified, all the results will be dumped.
type : str, optional
The profiler type, which can be either "perf" or "memory".
"""
if type == "memory":
self.profiler_collector.dump_memory_profiles(path, id)
elif type == "perf" or type is None:
self.profiler_collector.dump_perf_profiles(path, id)
if type is None: # Dump both perf and memory profiles
self.profiler_collector.dump_memory_profiles(path, id)
else:
raise PySparkValueError(
error_class="VALUE_NOT_ALLOWED",
message_parameters={
"arg_name": "type",
"allowed_values": str(["perf", "memory"]),
},
)
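As a quick illustration of the error path above, passing any other `type` raises `PySparkValueError` with error class `VALUE_NOT_ALLOWED` (a sketch; the exact message text may differ):

```py
from pyspark.errors import PySparkValueError

try:
    spark.profile.show(type="flamegraph")  # anything other than "perf" or "memory"
except PySparkValueError as e:
    print(e)  # error class VALUE_NOT_ALLOWED for arg_name "type"
```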
33 changes: 5 additions & 28 deletions python/pyspark/sql/session.py
@@ -47,7 +47,7 @@
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import lit
from pyspark.sql.pandas.conversion import SparkConversionMixin
from pyspark.sql.profiler import AccumulatorProfilerCollector, ProfilerCollector
from pyspark.sql.profiler import AccumulatorProfilerCollector, Profile
from pyspark.sql.readwriter import DataFrameReader
from pyspark.sql.sql_formatter import SQLStringFormatter
from pyspark.sql.streaming import DataStreamReader
@@ -906,6 +906,10 @@ def dataSource(self) -> "DataSourceRegistration":

return DataSourceRegistration(self)

@property
def profile(self) -> Profile:
return Profile(self._profiler_collector)

def range(
self,
start: int,
@@ -2128,33 +2132,6 @@ def clearTags(self) -> None:
message_parameters={"feature": "SparkSession.clearTags"},
)

def showPerfProfiles(self, id: Optional[int] = None) -> None:
self._profiler_collector.show_perf_profiles(id)

showPerfProfiles.__doc__ = ProfilerCollector.show_perf_profiles.__doc__

def showMemoryProfiles(self, id: Optional[int] = None) -> None:
if has_memory_profiler:
self._profiler_collector.show_memory_profiles(id)
else:
warnings.warn(
"Memory profiling is disabled. To enable it, install 'memory-profiler',"
" e.g., from PyPI (https://pypi.org/project/memory-profiler/).",
UserWarning,
)

showMemoryProfiles.__doc__ = ProfilerCollector.show_memory_profiles.__doc__

def dumpPerfProfiles(self, path: str, id: Optional[int] = None) -> None:
self._profiler_collector.dump_perf_profiles(path, id)

dumpPerfProfiles.__doc__ = ProfilerCollector.dump_perf_profiles.__doc__

def dumpMemoryProfiles(self, path: str, id: Optional[int] = None) -> None:
self._profiler_collector.dump_memory_profiles(path, id)

dumpMemoryProfiles.__doc__ = ProfilerCollector.dump_memory_profiles.__doc__


def _test() -> None:
import os
@@ -39,7 +39,7 @@ def action(df):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showMemoryProfiles(id)
self.spark.profile.show(id, type="memory")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -39,7 +39,7 @@ def action(df):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
63 changes: 62 additions & 1 deletion python/pyspark/sql/tests/test_session.py
@@ -21,13 +21,14 @@
from io import StringIO

from pyspark import SparkConf, SparkContext
from pyspark.errors import PySparkRuntimeError
from pyspark.errors import PySparkRuntimeError, PySparkValueError
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import col
from pyspark.testing.connectutils import (
should_test_connect,
connect_requirement_message,
)
from pyspark.sql.profiler import Profile
from pyspark.testing.sqlutils import ReusedSQLTestCase
from pyspark.testing.utils import PySparkTestCase, PySparkErrorTestUtils

@@ -471,6 +472,66 @@ def test_invalid_create(self):
)


class SparkSessionProfileTests(unittest.TestCase, PySparkErrorTestUtils):
def setUp(self):
self.profiler_collector_mock = unittest.mock.Mock()
self.profile = Profile(self.profiler_collector_mock)

def test_show_memory_type(self):
self.profile.show(type="memory")
self.profiler_collector_mock.show_memory_profiles.assert_called_with(None)
self.profiler_collector_mock.show_perf_profiles.assert_not_called()

def test_show_perf_type(self):
self.profile.show(type="perf")
self.profiler_collector_mock.show_perf_profiles.assert_called_with(None)
self.profiler_collector_mock.show_memory_profiles.assert_not_called()

def test_show_no_type(self):
self.profile.show()
self.profiler_collector_mock.show_perf_profiles.assert_called_with(None)
self.profiler_collector_mock.show_memory_profiles.assert_called_with(None)

def test_show_invalid_type(self):
with self.assertRaises(PySparkValueError) as e:
self.profile.show(type="invalid")
self.check_error(
exception=e.exception,
error_class="VALUE_NOT_ALLOWED",
message_parameters={
"arg_name": "type",
"allowed_values": str(["perf", "memory"]),
},
)

def test_dump_memory_type(self):
self.profile.dump("path/to/dump", type="memory")
self.profiler_collector_mock.dump_memory_profiles.assert_called_with("path/to/dump", None)
self.profiler_collector_mock.dump_perf_profiles.assert_not_called()

def test_dump_perf_type(self):
self.profile.dump("path/to/dump", type="perf")
self.profiler_collector_mock.dump_perf_profiles.assert_called_with("path/to/dump", None)
self.profiler_collector_mock.dump_memory_profiles.assert_not_called()

def test_dump_no_type(self):
self.profile.dump("path/to/dump")
self.profiler_collector_mock.dump_perf_profiles.assert_called_with("path/to/dump", None)
self.profiler_collector_mock.dump_memory_profiles.assert_called_with("path/to/dump", None)

def test_dump_invalid_type(self):
with self.assertRaises(PySparkValueError) as e:
self.profile.dump("path/to/dump", type="invalid")
self.check_error(
exception=e.exception,
error_class="VALUE_NOT_ALLOWED",
message_parameters={
"arg_name": "type",
"allowed_values": str(["perf", "memory"]),
},
)


class SparkExtensionsTest(unittest.TestCase):
# These tests are separate because it uses 'spark.sql.extensions' which is
# static and immutable. This can't be set or unset, for example, via `spark.conf`.
28 changes: 14 additions & 14 deletions python/pyspark/sql/tests/test_udf_profiler.py
@@ -183,16 +183,16 @@ def test_perf_profiler_udf(self):
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))

with self.trap_stdout() as io_all:
self.spark.showPerfProfiles()
self.spark.profile.show(type="perf")

with tempfile.TemporaryDirectory() as d:
self.spark.dumpPerfProfiles(d)
self.spark.profile.dump(d, type="perf")

for id in self.profile_results:
self.assertIn(f"Profile of UDF<id={id}>", io_all.getvalue())

with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -212,7 +212,7 @@ def test_perf_profiler_udf_with_arrow(self):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -231,7 +231,7 @@ def action(df):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -252,7 +252,7 @@ def add1(x):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -282,7 +282,7 @@ def add2(x):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -315,7 +315,7 @@ def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]:

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -362,7 +362,7 @@ def mean_udf(v: pd.Series) -> float:

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -391,7 +391,7 @@ def min_udf(v: pd.Series) -> float:

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -419,7 +419,7 @@ def normalize(pdf):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -454,7 +454,7 @@ def asof_join(left, right):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -485,7 +485,7 @@ def normalize(table):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
@@ -514,7 +514,7 @@ def summarize(left, right):

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showPerfProfiles(id)
self.spark.profile.show(id, type="perf")

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(