[SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs

### What changes were proposed in this pull request?
Support v2 (perf, memory) profiling in Aggregate (Series to Scalar) Pandas UDFs, which are executed through the physical plan nodes AggregateInPandasExec and WindowInPandasExec.
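
For context, a Series-to-Scalar Pandas UDF reaches these two plan nodes in two different ways. The sketch below is illustrative only (not part of the diff); it assumes an active `spark` session and uses sample data modeled on the tests added in this commit:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.window import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:  # Series to Scalar: one value per group/frame
    return v.mean()

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))

# groupBy().agg() with a Pandas UDF is planned as AggregateInPandasExec.
df.groupBy("id").agg(mean_udf(df.v)).show()

# The same UDF applied over a bounded window is planned as WindowInPandasExec.
w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)
df.withColumn("mean_v", mean_udf("v").over(w)).show()
```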

### Why are the changes needed?
To complete v2 profiling support across Pandas UDF types.

### Does this PR introduce _any_ user-facing change?
Yes. V2 profiling is now supported in Aggregate Pandas UDFs; a usage sketch follows.
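
A minimal sketch of the user-facing flow, using the conf key and `show*Profiles` helpers exercised by the tests below; `spark`, `df`, and `mean_udf` are assumed to be defined as in the previous sketch:

```python
# Enable the v2 profiler for subsequent Python UDF executions ("perf" or "memory").
spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")

# Run an aggregate Pandas UDF; with this PR the configured profiler is now
# threaded through AggregateInPandasExec / WindowInPandasExec as well.
df.groupBy("id").agg(mean_udf(df.v)).show()

# Render the collected per-UDF results (the tests pass an explicit UDF id).
spark.showPerfProfiles()
# spark.showMemoryProfiles()  # when the conf above is set to "memory"
```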

### How was this patch tested?
Unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#45035 from xinrong-meng/other_p.

Lead-authored-by: Xinrong Meng <[email protected]>
Co-authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
xinrong-meng and ueshin committed Feb 7, 2024
1 parent 31f85e5 commit 9bd0d7c
Showing 4 changed files with 124 additions and 3 deletions.
61 changes: 61 additions & 0 deletions python/pyspark/sql/tests/test_udf_profiler.py
@@ -28,6 +28,7 @@
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, udf
from pyspark.sql.window import Window
from pyspark.profiler import UDFBasicProfiler
from pyspark.testing.sqlutils import (
    ReusedSQLTestCase,
@@ -333,6 +334,66 @@ def filter_func(iterator):

        self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))

    @unittest.skipIf(
        not have_pandas or not have_pyarrow,
        cast(str, pandas_requirement_message or pyarrow_requirement_message),
    )
    def test_perf_profiler_pandas_udf_window(self):
        # WindowInPandasExec
        import pandas as pd

        @pandas_udf("double")
        def mean_udf(v: pd.Series) -> float:
            return v.mean()

        df = self.spark.createDataFrame(
            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
        )
        w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)

        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
            df.withColumn("mean_v", mean_udf("v").over(w)).show()

        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

        for id in self.profile_results:
            with self.trap_stdout() as io:
                self.spark.showPerfProfiles(id)

            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
            self.assertRegex(
                io.getvalue(), f"5.*{os.path.basename(inspect.getfile(_do_computation))}"
            )

    @unittest.skipIf(
        not have_pandas or not have_pyarrow,
        cast(str, pandas_requirement_message or pyarrow_requirement_message),
    )
    def test_perf_profiler_aggregate_in_pandas(self):
        # AggregateInPandasExec
        import pandas as pd

        @pandas_udf("double")
        def min_udf(v: pd.Series) -> float:
            return v.min()

        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
            df = self.spark.createDataFrame(
                [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"]
            )
            df.groupBy(df.name).agg(min_udf(df.age)).show()

        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

        for id in self.profile_results:
            with self.trap_stdout() as io:
                self.spark.showPerfProfiles(id)

            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
            self.assertRegex(
                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
            )


class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
    def setUp(self) -> None:
61 changes: 61 additions & 0 deletions python/pyspark/tests/test_memory_profiler.py
@@ -30,6 +30,7 @@
from pyspark.profiler import has_memory_profiler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, udf
from pyspark.sql.window import Window
from pyspark.testing.sqlutils import (
    have_pandas,
    have_pyarrow,
@@ -380,6 +381,66 @@ def filter_func(iterator):

        self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))

    @unittest.skipIf(
        not have_pandas or not have_pyarrow,
        cast(str, pandas_requirement_message or pyarrow_requirement_message),
    )
    def test_memory_profiler_pandas_udf_window(self):
        # WindowInPandasExec
        import pandas as pd

        @pandas_udf("double")
        def mean_udf(v: pd.Series) -> float:
            return v.mean()

        df = self.spark.createDataFrame(
            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
        )
        w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)

        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
            df.withColumn("mean_v", mean_udf("v").over(w)).show()

        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

        for id in self.profile_results:
            with self.trap_stdout() as io:
                self.spark.showMemoryProfiles(id)

            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
            self.assertRegex(
                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
            )

    @unittest.skipIf(
        not have_pandas or not have_pyarrow,
        cast(str, pandas_requirement_message or pyarrow_requirement_message),
    )
    def test_memory_profiler_aggregate_in_pandas(self):
        # AggregateInPandasExec
        import pandas as pd

        @pandas_udf("double")
        def min_udf(v: pd.Series) -> float:
            return v.min()

        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
            df = self.spark.createDataFrame(
                [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"]
            )
            df.groupBy(df.name).agg(min_udf(df.age)).show()

        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

        for id in self.profile_results:
            with self.trap_stdout() as io:
                self.spark.showMemoryProfiles(id)

            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
            self.assertRegex(
                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
            )


class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
    def setUp(self) -> None:
3 changes: 1 addition & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -181,8 +181,7 @@ case class AggregateInPandasExec(
pythonRunnerConf,
pythonMetrics,
jobArtifactUUID,
- None) // TODO(SPARK-46688): Support profiling on AggregateInPandasExec
- .compute(projectedRowIter, context.partitionId(), context)
+ conf.pythonUDFProfiler).compute(projectedRowIter, context.partitionId(), context)

val joinedAttributes =
groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute)
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -88,7 +88,7 @@ case class WindowInPandasExec(
child.output,
longMetric("spillSize"),
pythonMetrics,
- None) // TODO(SPARK-46691): Support profiling on WindowInPandasExec
+ conf.pythonUDFProfiler)

// Start processing.
if (conf.usePartitionEvaluator) {
