Remove independent cluster testing (#1123)
* Remove make_pickable_without_dask_sql

* Remove independent cluster testing

* Remove testing logic around independent cluster

* Add test job running on consistent distributed cluster

* Add distributed label to test jobs

* Remove some more mentions of independent cluster

* Remove conditional cluster skips

* Make sure to close client on app fixtures

* Try using session-wide client to speed up test run

* Remove line around complex cluster setups

* Add 3.10 cluster testing build

---------

Co-authored-by: Ayush Dattagupta <[email protected]>
charlesbluca and ayushdg authored May 2, 2023
1 parent 6d0872a commit e68817f
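
The bullets above describe replacing the standalone cluster jobs with a `distributed` matrix dimension and a session-wide Dask client. Below is a minimal sketch of how such a session-scoped pytest fixture could be gated on the `DASK_SQL_DISTRIBUTED_TESTS` variable introduced in the workflows that follow (the fixture and helper names are hypothetical, not taken from the actual dask-sql `conftest.py`):

```python
# conftest.py (hypothetical sketch, not the actual dask-sql fixture)
import os

import pytest
from distributed import Client, LocalCluster


def _distributed_tests_enabled() -> bool:
    # Mirrors the DASK_SQL_DISTRIBUTED_TESTS env var set in the CI matrix.
    return os.environ.get("DASK_SQL_DISTRIBUTED_TESTS", "false").lower() == "true"


@pytest.fixture(scope="session")
def client():
    """Session-wide distributed client so the cluster is started only once."""
    if not _distributed_tests_enabled():
        yield None
        return
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)
    yield client
    # Close the client (and cluster) explicitly, in the spirit of the
    # "close client on app fixtures" bullet above.
    client.close()
    cluster.close()
```

Because the client is session-scoped, every test shares one in-process cluster instead of connecting to (or spinning up) its own, which is the speed-up the commit message aims for.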
Showing 20 changed files with 51 additions and 317 deletions.
76 changes: 12 additions & 64 deletions .github/workflows/test-upstream.yml
@@ -13,6 +13,7 @@ on:
options:
- Dask
- DataFusion

# Required shell entrypoint to have properly activated conda environments
defaults:
run:
@@ -32,18 +33,25 @@ env:
jobs:
test-dev:
name: "Test upstream dev (${{ matrix.os }}, python: ${{ matrix.python }})"
name: "Test upstream dev (${{ matrix.os }}, python: ${{ matrix.python }}, distributed: ${{ matrix.distributed }})"
runs-on: ${{ matrix.os }}
env:
CONDA_FILE: continuous_integration/environment-${{ matrix.python }}-dev.yaml
defaults:
run:
shell: bash -l {0}
DASK_SQL_DISTRIBUTED_TESTS: ${{ matrix.distributed }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python: ["3.8", "3.9", "3.10"]
distributed: [false]
include:
# run tests on a distributed client
- os: "ubuntu-latest"
python: "3.8"
distributed: true
- os: "ubuntu-latest"
python: "3.10"
distributed: true
steps:
- uses: actions/checkout@v3
with:
@@ -92,66 +100,6 @@ jobs:
name: test-${{ matrix.os }}-py${{ matrix.python }}-results
path: test-${{ matrix.os }}-py${{ matrix.python }}-results.jsonl

cluster-dev:
name: "Test upstream dev in a dask cluster"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: conda-incubator/[email protected]
with:
miniforge-variant: Mambaforge
use-mamba: true
python-version: "3.9"
channel-priority: strict
activate-environment: dask-sql
environment-file: continuous_integration/environment-3.9-dev.yaml
- name: Optionally update upstream cargo dependencies
if: env.which_upstream == 'DataFusion'
run: |
cd dask_planner
bash update-dependencies.sh
- name: Build the Rust DataFusion bindings
run: |
python setup.py build install
- name: Install cluster dependencies
run: |
# TODO: add pytest-reportlog to testing environments if we move over to JSONL output
mamba install pytest-reportlog python-blosc lz4 -c conda-forge
which python
pip list
mamba list
- name: Install upstream dev Dask
if: env.which_upstream == 'Dask'
run: |
mamba install --no-channel-priority dask/label/dev::dask
- name: run a dask cluster
run: |
if [[ $which_upstream == "Dask" ]]; then
docker-compose -f continuous_integration/cluster/upstream.yml up -d
else
docker-compose -f continuous_integration/cluster/stable.yml up -d
fi
# periodically ping logs until a connection has been established; assume failure after 2 minutes
timeout 2m bash -c 'until docker logs dask-worker 2>&1 | grep -q "Starting established connection"; do sleep 1; done'
docker logs dask-scheduler
docker logs dask-worker
- name: Test with pytest while running an independent dask cluster
id: run_tests
run: |
DASK_SQL_TEST_SCHEDULER="tcp://127.0.0.1:8786" pytest --report-log test-cluster-results.jsonl --cov-report=xml -n auto tests --dist loadfile
- name: Upload pytest results for failure
if: |
always()
&& steps.run_tests.outcome != 'skipped'
uses: actions/upload-artifact@v3
with:
name: test-cluster-results
path: test-cluster-results.jsonl

import-dev:
name: "Test importing with bare requirements and upstream dev"
runs-on: ubuntu-latest
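
For contrast, the deleted cluster-dev job above ran the suite against an independent, Dockerized scheduler: pytest attached to an externally managed address taken from `DASK_SQL_TEST_SCHEDULER` instead of creating a cluster itself. Roughly, the two modes differ as sketched below (addresses and worker counts are illustrative only):

```python
import os

from distributed import Client, LocalCluster

scheduler_address = os.environ.get("DASK_SQL_TEST_SCHEDULER")

if scheduler_address:
    # Old "independent cluster" mode: attach to a scheduler started elsewhere
    # (the docker-compose services under continuous_integration/cluster/).
    client = Client(scheduler_address)  # e.g. "tcp://127.0.0.1:8786"
else:
    # New mode: an in-process cluster that always runs the same software
    # environment as the test suite itself.
    client = Client(LocalCluster(n_workers=2, threads_per_worker=1))

print(list(client.scheduler_info()["workers"]))
client.close()
```

Dropping the first branch removes the docker-compose services and the log-polling readiness check, and avoids any environment mismatch between the test process and the containerized workers.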
65 changes: 11 additions & 54 deletions .github/workflows/test.yml
@@ -33,16 +33,26 @@ jobs:
keyword: "[test-upstream]"

test:
name: "Build & Test (${{ matrix.os }}, python: ${{ matrix.python }})"
name: "Build & Test (${{ matrix.os }}, python: ${{ matrix.python }}, distributed: ${{ matrix.distributed }})"
needs: [detect-ci-trigger]
runs-on: ${{ matrix.os }}
env:
CONDA_FILE: continuous_integration/environment-${{ matrix.python }}-dev.yaml
DASK_SQL_DISTRIBUTED_TESTS: ${{ matrix.distributed }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python: ["3.8", "3.9", "3.10"]
distributed: [false]
include:
# run tests on a distributed client
- os: "ubuntu-latest"
python: "3.8"
distributed: true
- os: "ubuntu-latest"
python: "3.10"
distributed: true
steps:
- uses: actions/checkout@v3
- name: Set up Python
@@ -86,59 +96,6 @@ jobs:
if: github.repository == 'dask-contrib/dask-sql'
uses: codecov/codecov-action@v3

cluster:
name: "Test in a dask cluster"
needs: [detect-ci-trigger]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: conda-incubator/[email protected]
with:
miniforge-variant: Mambaforge
use-mamba: true
python-version: "3.9"
channel-priority: strict
activate-environment: dask-sql
environment-file: continuous_integration/environment-3.9-dev.yaml
- name: Cache Rust
uses: Swatinem/rust-cache@v2
with:
workspaces: dask_planner
shared-key: test
- name: Build the Rust DataFusion bindings
run: |
python setup.py build install
- name: Install dependencies
run: |
mamba install python-blosc lz4 -c conda-forge
which python
pip list
mamba list
- name: Optionally install upstream dev Dask
if: needs.detect-ci-trigger.outputs.triggered == 'true'
run: |
mamba install --no-channel-priority dask/label/dev::dask
- name: run a dask cluster
env:
UPSTREAM: ${{ needs.detect-ci-trigger.outputs.triggered }}
run: |
if [[ $UPSTREAM == "true" ]]; then
docker-compose -f continuous_integration/cluster/upstream.yml up -d
else
docker-compose -f continuous_integration/cluster/stable.yml up -d
fi
# periodically ping logs until a connection has been established; assume failure after 2 minutes
timeout 2m bash -c 'until docker logs dask-worker 2>&1 | grep -q "Starting established connection"; do sleep 1; done'
docker logs dask-scheduler
docker logs dask-worker
- name: Test with pytest while running an independent dask cluster
run: |
DASK_SQL_TEST_SCHEDULER="tcp://127.0.0.1:8786" pytest tests
import:
name: "Test importing with bare requirements"
needs: [detect-ci-trigger]
2 changes: 1 addition & 1 deletion README.md
@@ -20,7 +20,7 @@ if you need it.
process your data in exactly the way that is easiest for you.
* **Infinite Scaling**: using the power of the great `Dask` ecosystem, your computations can scale as you need it - from your laptop to your super cluster - without changing any line of SQL code. From k8s to cloud deployments, from batch systems to YARN - if `Dask` [supports it](https://docs.dask.org/en/latest/setup.html), so will `dask-sql`.
* **Your data - your queries**: Use Python user-defined functions (UDFs) in SQL without any performance drawback and extend your SQL queries with the large number of Python libraries, e.g. machine learning, different complicated input formats, complex statistics.
* **Easy to install and maintain**: `dask-sql` is just a pip/conda install away (or a docker run if you prefer). No need for complicated cluster setups - `dask-sql` will run out of the box on your machine and can be easily connected to your computing cluster.
* **Easy to install and maintain**: `dask-sql` is just a pip/conda install away (or a docker run if you prefer).
* **Use SQL from wherever you like**: `dask-sql` integrates with your jupyter notebook, your normal Python module or can be used as a standalone SQL server from any BI tool. It even integrates natively with [Apache Hue](https://gethue.com/).
* **GPU Support**: `dask-sql` supports running SQL queries on CUDA-enabled GPUs by utilizing [RAPIDS](https://rapids.ai) libraries like [`cuDF`](https://github.com/rapidsai/cudf), enabling accelerated compute for SQL.

7 changes: 0 additions & 7 deletions continuous_integration/cluster/environment.yml

This file was deleted.

22 changes: 0 additions & 22 deletions continuous_integration/cluster/stable.yml

This file was deleted.

24 changes: 0 additions & 24 deletions continuous_integration/cluster/upstream.yml

This file was deleted.

5 changes: 0 additions & 5 deletions dask_sql/physical/rel/custom/wrappers.py
@@ -17,8 +17,6 @@
from sklearn.metrics import make_scorer
from sklearn.utils.validation import check_is_fitted

from dask_sql.utils import make_pickable_without_dask_sql

try:
import sklearn.base
import sklearn.metrics
@@ -597,7 +595,6 @@ def handle_empty_partitions(output_meta):
return output_meta.iloc[:0, :]


@make_pickable_without_dask_sql
def _predict(part, estimator, output_meta=None):
if part.shape[0] == 0 and output_meta is not None:
empty_output = handle_empty_partitions(output_meta)
@@ -606,7 +603,6 @@ def _predict(part, estimator, output_meta=None):
return estimator.predict(part)


@make_pickable_without_dask_sql
def _predict_proba(part, estimator, output_meta=None):
if part.shape[0] == 0 and output_meta is not None:
empty_output = handle_empty_partitions(output_meta)
@@ -615,7 +611,6 @@ def _predict_proba(part, estimator, output_meta=None):
return estimator.predict_proba(part)


@make_pickable_without_dask_sql
def _transform(part, estimator, output_meta=None):
if part.shape[0] == 0 and output_meta is not None:
empty_output = handle_empty_partitions(output_meta)
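
The dropped `make_pickable_without_dask_sql` decorator appears to have existed so these functions could be deserialized on workers without dask-sql installed (the independent cluster case): a standard pickle of a module-level function only records a reference (module.name), which cannot be unpickled where the defining package is missing. The snippet below is not the original helper's implementation, just a generic illustration of that reference-vs-value distinction using plain `pickle` and `cloudpickle`:

```python
import pickle

import cloudpickle


def double(x):
    return 2 * x


# Standard pickle stores only a reference ("__main__.double" when run as a
# script); the unpickling side must be able to import the defining module.
by_reference = pickle.dumps(double)

# cloudpickle can serialize functions defined in __main__ (and closures,
# lambdas, ...) by value, shipping the code itself instead of a reference.
by_value = cloudpickle.dumps(double)

restored = cloudpickle.loads(by_value)
assert restored(21) == 42
```

With the new setup the workers always run the same environment as the client, so ordinary by-reference pickling of dask-sql functions works and the wrapper can go.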
10 changes: 2 additions & 8 deletions dask_sql/physical/rel/logical/window.py
@@ -13,11 +13,7 @@
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.physical.rex.convert import RexConverter
from dask_sql.physical.utils.sort import sort_partition_func
from dask_sql.utils import (
LoggableDataFrame,
make_pickable_without_dask_sql,
new_temporary_column,
)
from dask_sql.utils import LoggableDataFrame, new_temporary_column

if TYPE_CHECKING:
import dask_sql
@@ -345,9 +341,7 @@ def _apply_window(
# TODO: That is a bit of a hack. We should really use the real column dtype
meta = df._meta.assign(**{col: 0.0 for col in newly_created_columns})

df = df.groupby(group_columns, dropna=False).apply(
make_pickable_without_dask_sql(filled_map), meta=meta
)
df = df.groupby(group_columns, dropna=False).apply(filled_map, meta=meta)
logger.debug(
f"Having created a dataframe {LoggableDataFrame(df)} after windowing. Will now drop {temporary_columns}."
)
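
The window change keeps the same pattern minus the wrapper: a per-group function applied through `groupby(...).apply(..., meta=...)`, where `meta` pre-declares the output schema (hence the dtype TODO above). A self-contained toy version of that pattern, with made-up column names:

```python
import pandas as pd
import dask.dataframe as dd


def add_row_number(part):
    # Runs once per group, like the per-group window function ("filled_map").
    part = part.sort_values("ts")
    part["row_number"] = range(1, len(part) + 1)
    return part


pdf = pd.DataFrame({"key": [1, 1, 2, 2], "ts": [2, 1, 4, 3]})
ddf = dd.from_pandas(pdf, npartitions=2)

# meta declares the output schema up front; the new column is typed by
# assigning a dummy value, mirroring the "assign 0.0 per new column" hack.
meta = pdf.head(0).assign(row_number=0)
result = ddf.groupby("key", dropna=False).apply(add_row_number, meta=meta)
print(result.compute())
```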
3 changes: 1 addition & 2 deletions dask_sql/physical/rex/core/call.py
@@ -30,7 +30,6 @@
is_cudf_type,
is_datetime,
is_frame,
make_pickable_without_dask_sql,
)

if TYPE_CHECKING:
@@ -809,7 +808,7 @@ def random_frame(self, seed: int, dc: DataContainer, **kwargs) -> dd.Series:
state_data = random_state_data(df.npartitions, random_state)
dsk = {
(name, i): (
make_pickable_without_dask_sql(self.random_function),
self.random_function,
(df._name, i),
np.random.RandomState(state),
kwargs,
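
The `random_frame` hunk builds a raw Dask task graph by hand: each `(name, i)` key maps to a `(callable, *args)` tuple that produces one partition. Here is a tiny standalone example of that graph format, evaluated with the synchronous scheduler (the key names and sizes are arbitrary):

```python
import numpy as np
import dask

# One task per "partition": the tuple is (callable, *arguments), so task i
# calls RandomState(i).uniform(0, 1, 4) when the graph is executed.
dsk = {
    ("rand", i): (np.random.RandomState(i).uniform, 0, 1, 4)
    for i in range(3)
}

# dask.get is the simple synchronous scheduler; it resolves the requested
# keys by calling each task tuple in dependency order.
parts = dask.get(dsk, [("rand", i) for i in range(3)])
for key, part in zip(dsk, parts):
    print(key, part)
```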
4 changes: 2 additions & 2 deletions dask_sql/physical/utils/sort.py
@@ -5,7 +5,7 @@
from dask import config as dask_config
from dask.utils import M

from dask_sql.utils import is_cudf_type, make_pickable_without_dask_sql
from dask_sql.utils import is_cudf_type


def apply_sort(
@@ -68,7 +68,7 @@ def apply_sort(
by=sort_columns[0],
ascending=sort_ascending[0],
na_position="first" if sort_null_first[0] else "last",
sort_function=make_pickable_without_dask_sql(sort_partition_func),
sort_function=(sort_partition_func),
sort_function_kwargs={
"sort_columns": sort_columns,
"sort_ascending": sort_ascending,
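
In `apply_sort`, the per-partition sort function is now handed to dask's `sort_values` directly. A small sketch of that hook, assuming (as the surrounding call suggests) that `sort_function_kwargs` is forwarded to the per-partition sort together with the usual by/ascending/na_position arguments:

```python
import pandas as pd
import dask.dataframe as dd


def sort_partition(df, sort_columns, sort_ascending, **kwargs):
    # kwargs absorbs the by/ascending/na_position arguments dask also passes.
    return df.sort_values(sort_columns, ascending=sort_ascending)


pdf = pd.DataFrame({"a": [3, 1, 2, 5, 4], "b": list("vwxyz")})
ddf = dd.from_pandas(pdf, npartitions=2)

result = ddf.sort_values(
    by="a",
    sort_function=sort_partition,
    sort_function_kwargs={"sort_columns": ["a"], "sort_ascending": [True]},
)
print(result.compute())
```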
3 changes: 0 additions & 3 deletions dask_sql/physical/utils/statistics.py
@@ -16,8 +16,6 @@
from dask.layers import DataFrameIOLayer
from dask.utils_test import hlg_layer

from dask_sql.utils import make_pickable_without_dask_sql

logger = logging.getLogger(__name__)


@@ -145,7 +143,6 @@ def parquet_statistics(
return _read_partition_stats_group(parts, fs, engine, columns=columns)


@make_pickable_without_dask_sql
def _read_partition_stats_group(parts, fs, engine, columns=None):
def _read_partition_stats(part, fs, columns=None):
# Helper function to read Parquet-metadata