August 14, 2023 · 3 minute read

Parallel Computing on Dagster with Dask

Orchestrate your Dask computations and make your pipelines faster for larger data engineering and machine learning tasks.
Odette Harary (@odette)

Dask

Dask (https://www.dask.org/) is a flexible library for parallel computing in Python which can be a helpful tool when performing larger data engineering and machine learning tasks. Let's review how to use Dagster’s resources to automate your Dask computations and make your pipelines faster.

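In this guide, you’ll learn how to set up a Dask cluster using Dagster’s resources, use the Dask resource in an asset, speed up processes using Dask, and build a machine learning pipeline using Dask.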

Using Dagster’s resources to set up a Dask Cluster

When using Dask with Dagster, you can define a dask_resource that sets up a Dask client, so that Dask configuration is shared across multiple assets and the pipeline stays simple. In this example, a local dask.distributed client is set up as a ConfigurableResource. Resources allow you to centralize your configuration in a single location, reducing redundant code in your assets.

from dask.distributed import Client
from dagster import ConfigurableResource

class dask_resource(ConfigurableResource):
    def make_dask_cluster(self) -> Client:
        client = Client()
        return client

The dask.distributed.LocalCluster API allows customization of the cluster, which can be leveraged through Dagster’s resource. In the following example, configuration is set up at the resource level, passed in through Definitions, and applied to all uses of the resource.

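from dask.distributed import Client, LocalCluster
from dagster import ConfigurableResource, Definitions

class dask_resource(ConfigurableResource):
    n_workers: int

    def make_dask_cluster(self) -> Client:
        # Size the local cluster from the resource-level configuration
        client = Client(LocalCluster(n_workers=self.n_workers))
        return client

defs = Definitions(
    assets=[asset_1, asset_2],
    resources={"my_dask_resource": dask_resource(n_workers=2)},
)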

In the following example, the LocalCluster configuration is passed to make_dask_cluster, so it can be set for each @asset.

class dask_resource(ConfigurableResource):

    def make_dask_cluster(self, n_workers) -> Client:
        # Pass n_workers by keyword so it is not bound to a different positional parameter
        client = Client(LocalCluster(n_workers=n_workers))
        return client

@asset
def resource_asset(my_dask_resource: dask_resource):
    return my_dask_resource.make_dask_cluster(n_workers=5)

Using the Dask Resource

Now that the Dask resource is set up, an asset can use it to parallelize computations. This asset demonstrates using the client within Dagster, based on the example from the Dask distributed quickstart.

from dagster import asset, MetadataValue
@asset
def simple_dask_asset(my_dask_resource: dask_resource):
    client = my_dask_resource.make_dask_cluster()
    def square(x):
        return x ** 2
    def neg(x):
        return -x
    A = client.map(square, range(10000))
    B = client.map(neg, A)
    total = client.submit(sum, B)
    return total.result()

defs = Definitions(
    assets=[simple_dask_asset],
    resources={"my_dask_resource": dask_resource()},
)
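
A quick way to sanity-check simple_dask_asset is to reproduce the same computation in plain Python. The sketch below is not part of the original pipeline. It only mirrors the square, neg, and sum steps without Dask, so the value returned by total.result() can be compared against it. The names local_total and closed_form are illustrative.

```python
def square(x):
    return x ** 2

def neg(x):
    return -x

# Same three steps as the asset, run serially with the built-in map
local_total = sum(map(neg, map(square, range(10000))))

# Closed form for the sum of squares 0..n is n(n + 1)(2n + 1) / 6; negate it
n = 9999
closed_form = -(n * (n + 1) * (2 * n + 1) // 6)

print(local_total)  # -333283335000
```

If the asset returns a different number, the difference is most likely in how the functions were mapped rather than in Dask itself.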

Note: The Dask UI can be used to monitor executions launched from Dagster. By default it is served at http://localhost:8787/status, and the address is also available as client.dashboard_link. It can also be reached through the link generated in the Dagster metadata. The Dask UI is only available while executions are in progress, since Dagster only spins up the resource when the asset needs it.

The computation can be viewed executing in the Dask UI:

Speeding up processes using Dask

One of the benefits of using Dask with Dagster is speed. In the next example, we modified the hackernews_stories asset from Building machine learning pipelines with Dagster to distribute the API call for Hacker News.

from dagster import asset, MetadataValue
import requests
import dask
import pandas as pd

@asset
def hackernews_stories_dask(context, my_dask_resource: dask_resource):
    client = my_dask_resource.make_dask_cluster()
    latest_item = requests.get(
        "https://hacker-news.firebaseio.com/v0/maxitem.json"
    ).json()
    results = []
    scope = range(latest_item - 1000, latest_item)

    @dask.delayed
    def get_single_line(item_id):
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        return item

    for item_id in scope:
        # get_single_line is already wrapped by @dask.delayed, so calling it builds a lazy task
        y = get_single_line(item_id)
        results.append(y)

    results = dask.compute(*results)
    df = pd.DataFrame(results)
    context.add_output_metadata({'volume': len(df)})
    client.close()
    return df

The Dask UI shows each instance of get_single_line being scheduled by Dask:

The Dagster UI can be used to review the execution timing. In this case, the original hackernews_stories takes almost two minutes to execute for 1,000 records whereas hackernews_stories_dask only takes about 18 seconds.
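
To see where the speedup comes from without calling the Hacker News API, here is a minimal sketch that swaps the request for a short sleep. The fetch_item function and its 50 ms latency are hypothetical stand-ins. The sketch also uses Dask's local threaded scheduler rather than a distributed client, so the timings will not match the numbers above.

```python
import time

import dask

def fetch_item(item_id):
    # Hypothetical stand-in for one API request: sleep to mimic network latency
    time.sleep(0.05)
    return {"id": item_id}

item_ids = range(20)

# Sequential baseline: each call waits for the previous one to finish
start = time.perf_counter()
sequential = [fetch_item(item_id) for item_id in item_ids]
sequential_seconds = time.perf_counter() - start

# Build lazy tasks, then let Dask run them concurrently on local threads
start = time.perf_counter()
tasks = [dask.delayed(fetch_item)(item_id) for item_id in item_ids]
parallel = dask.compute(*tasks, scheduler="threads")
parallel_seconds = time.perf_counter() - start

print(f"sequential: {sequential_seconds:.2f}s, dask: {parallel_seconds:.2f}s")
```

Because the work is mostly waiting on I/O, running the calls concurrently usually shortens the wall-clock time. The exact gain depends on the number of workers and on how the remote API responds.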

Building a machine learning pipeline using Dask

Now that you have walked through using the dask_resource for specific assets, next you’ll build a machine learning pipeline using Dask.

First, you can use dask_ml.datasets to generate some synthetic data using the make_classification API. Choosing an appropriate chunk size can help optimize the processing, and this post gives more insight into choosing chunk sizes. A chunk size that is too small creates more tasks for Dask to schedule, while a chunk size that is too large can limit Dask’s ability to distribute the work.

from dask_ml.datasets import make_classification
from dagster import asset, MetadataValue

@asset
def my_classification_data(my_dask_resource: dask_resource):
    client = my_dask_resource.make_dask_cluster()
    X, y = make_classification(chunks=200, n_samples=10000)
    return X, y
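
The trade-off is easier to see with the numbers from the asset above. This sketch assumes the chunks value applies along the sample axis, so the number of row blocks is the sample count divided by the chunk size, rounded up. The count_chunks helper is hypothetical and not part of Dask.

```python
import math

def count_chunks(n_samples: int, chunk_size: int) -> int:
    # Number of row blocks needed to hold n_samples rows
    return math.ceil(n_samples / chunk_size)

n_samples = 10_000
for chunk_size in (10, 200, 5_000):
    print(chunk_size, count_chunks(n_samples, chunk_size))
# 10 -> 1000 blocks, 200 -> 50 blocks, 5000 -> 2 blocks
```

With chunks=200 there are 50 blocks to schedule. A chunk size of 10 would create 1,000 small blocks, and a chunk size of 5,000 would leave only two blocks to spread across workers.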

Next, use Dask-ML’s model_selection.train_test_split inside a Dagster multi_asset to generate a training set and a test set. Dask will produce the training and test sets as four Dask arrays.

from dagster import multi_asset, AssetOut
from dask_ml import model_selection

@multi_asset(outs={"training_data": AssetOut(), "test_data": AssetOut()})
def train_test_data(my_dask_resource: dask_resource, my_classification_data):
    client = my_dask_resource.make_dask_cluster()
    X, y = my_classification_data
    X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y)
    # Return one (features, labels) pair per output, in the order declared in outs
    return (X_train, y_train), (X_test, y_test)

Now that you have X_train, X_test, y_train, and y_test, you can search for the best classification model using sklearn’s SGDClassifier and Dask-ML’s HyperbandSearchCV.

SGDClassifier uses an iterative training algorithm that runs multiple iterations to decrease the loss, a measure of how poorly the model predicts a value. SGDClassifier accepts many parameters, so rather than checking each combination manually to see which gives the best results, you can use hyperparameter tuning.
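
As a rough illustration of what "iterating to decrease the loss" means, the toy sketch below runs stochastic gradient descent on a one-parameter regression problem with a squared loss. It is not SGDClassifier, which is a classifier and uses losses such as hinge. The data, learning rate, and epoch count are made up for the example.

```python
# Toy data for y = 3x, a hypothetical one-parameter problem
data = [(x, 3.0 * x) for x in (0.5, 1.0, 1.5, 2.0)]

def mean_squared_loss(weight):
    # Average squared error of the prediction weight * x
    return sum((weight * x - y) ** 2 for x, y in data) / len(data)

weight = 0.0
learning_rate = 0.05
losses = [mean_squared_loss(weight)]

for epoch in range(20):
    for x, y in data:
        # Gradient of (weight * x - y) ** 2 with respect to weight
        gradient = 2 * (weight * x - y) * x
        weight -= learning_rate * gradient
    losses.append(mean_squared_loss(weight))

print(f"first loss: {losses[0]:.4f}, last loss: {losses[-1]:.2e}")
```

Each pass over the data nudges the weight toward 3 and the recorded loss shrinks. Parameters such as the learning rate or regularization strength change how fast this happens, which is why they are worth tuning.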

Hyperparameter tuning searches through a range of different models and parameter values to find the best one for your data. In this example, Dask-ML’s HyperbandSearchCV samples parameter combinations from param_dist. Unlike a purely random search, HyperbandSearchCV finds the best parameters by spending more training time on high-performing estimators and stopping poor ones early.
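
The idea of focusing on high-performing estimators can be sketched with successive halving, the building block that Hyperband repeats with different budgets. The code below is a simplified, pure-Python illustration and not how HyperbandSearchCV is implemented. The toy_score function is a hypothetical stand-in for a validation score.

```python
import math
import random

def toy_score(alpha, budget):
    # Hypothetical validation score: best near alpha = 1e-2, improves with budget
    quality = 1.0 / (1.0 + abs(math.log10(alpha) + 2.0))
    return quality * (1.0 - 1.0 / (budget + 1))

def successive_halving(candidates, initial_budget=1):
    budget = initial_budget
    survivors = list(candidates)
    while len(survivors) > 1:
        # Score every survivor at the current budget and keep the better half
        ranked = sorted(survivors, key=lambda a: toy_score(a, budget), reverse=True)
        survivors = ranked[: max(1, len(ranked) // 2)]
        # The remaining candidates earn a larger training budget
        budget *= 2
    return survivors[0]

rng = random.Random(0)
# 16 alphas sampled log-uniformly between 1e-4 and 1e0, the range used in param_dist
candidates = [10 ** rng.uniform(-4, 0) for _ in range(16)]
best_alpha = successive_halving(candidates)

# Grid size of param_dist in the asset: 1000 alphas x 4 losses x 2 averaging options
search_space_size = 1000 * 4 * 2
print(best_alpha, search_space_size)
```

With 8,000 possible combinations in param_dist, training every candidate to completion would be expensive. Dropping weak candidates early is what keeps the search affordable.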

The parameters of the best model can be stored as Dagster metadata to view in the Dagster UI. The model is returned, which can be used for inference or predictions.

from sklearn.linear_model import SGDClassifier
from dask_ml.model_selection import HyperbandSearchCV
import numpy as np

@asset
def my_model(context, my_dask_resource: dask_resource, training_data):
    client = my_dask_resource.make_dask_cluster()
    X_train, y_train = training_data
    est = SGDClassifier(tol=1e-3)
    param_dist = {'alpha': np.logspace(-4, 0, num=1000),
                # 'log' was renamed to 'log_loss' in scikit-learn 1.1; use 'log' on older versions
                'loss': ['hinge', 'log_loss', 'modified_huber', 'squared_hinge'],
                'average': [True, False]}
    search = HyperbandSearchCV(est, param_dist)
    search.fit(X_train, y_train, classes=np.unique(y_train))
    metadata = search.best_params_
    context.add_output_metadata(metadata)
    model = search.best_estimator_
    return model

The loss, average, and alpha parameters from search.best_params_ are available in the Dagster UI.

The Dask dashboard is available while the execution is in progress and shows the tasks submitted to the Dask resource.

The last asset in the pipeline will use test_data to score the model. This can also be done in the model asset or separately, depending on your specific machine learning use case.

@asset
def my_score(context, my_dask_resource: dask_resource, test_data, my_model):
    client = my_dask_resource.make_dask_cluster()
    X_test, y_test = test_data
    my_score = my_model.score(X_test, y_test)
    context.add_output_metadata({'score': my_score})
    return my_score

The final step is loading the assets into a code location or Definitions. The dask_resource is included with the assets.

from dagster import Definitions

defs = Definitions(
    assets=[my_classification_data, train_test_data, my_model, my_score],
    resources={"my_dask_resource": dask_resource()},
)

Great job setting up your Dask resource and building a machine learning pipeline using Dask in Dagster!



We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a GitHub discussion. If you run into any bugs, let us know with a GitHub issue. And if you're interested in working with us, check out our open roles!
