With Dagster 1.3 we added a new set of APIs to our resources and config systems based on Pydantic, rather than our own bespoke configuration schema API. This added more type safety and made the Python APIs more idiomatic. This guide provides information on how to migrate from our legacy resource and config system to this new system, even in large existing Dagster codebases.
A critical part of any migration process is the ability to break the migration into a series of small, low-risk changes. This reduces coordination costs and increases the parallelization of work. Without an incremental process, engineers are stuck in a world where there is a one-time, high-risk migration that requires synchronized coordination with all the other stakeholders in the code base.
We designed this new API with this in mind, allowing for old code and new code to co-exist within a single Dagster code location. When needed for backwards compatibility, config schemas defined in Pydantic can easily be converted to the old configuration schema format. And your assets and ops can access Pythonic config and resources through the old APIs attached to the context, allowing you to update assets and ops separately
To port this code to use the new config schema system, let's start by writing a new class that inherits from Config. Instead of passing a dictionary of field names and types to config_schema, we can use Python typing to declare the equivalent schema. Now we can program against that object directly rather than through a dictionary attached to the context.
from dagster import Config, Definitions, asset
classAnAssetConfig(Config):
conn_string:str
port:int@assetdefan_asset(upstream_asset, config: AnAssetConfig):assert config.conn_string
assert config.port
defs = Definitions(assets=[an_asset, upstream_asset])
job_def = defs.get_implicit_global_asset_job_def()# code to launch/execute jobs is unchanged
result = job_def.execute_in_process(
run_config={"ops":{"an_asset":{"config":{"conn_string":"foo","port":1}}}})
Notice that we did not have to change the run config dictionary passed to execute_in_process. The same thing goes for RunRequest instances in sensors and schedules: you do not have to update them in lockstep order to migrate to the Pythonic config API.
However, we do enable the use of strong typing to launch runs which you can opt into if you so choose. To do this, you can use the RunConfig object.
result = job_def.execute_in_process(
run_config=RunConfig(ops={"an_asset": AnAssetConfig(conn_string="foo", port=1)}))
The first step is to convert to a Pythonic resource. We will convert FancyDbResource to a Pythonic resource by making it inherit from ConfigurableResource . Instead of config_schema={"conn_string": str}, we can declare attributes on a class using vanilla Python typing.
from dagster import ConfigurableResource
classFancyDbResource(ConfigurableResource):
conn_string:strdefexecute(self, query:str)->None:...
The attributes declared on a class inheriting from ConfigurableResource serve as the new way to declare a configuration schema. Now, however, there's a problem: You're migrating an existing codebase that contains numerous callsites to the old fancy_db_resource function annotated with @resource. You have declared the config schema twice, once on @resource and once on the class. This is fine for now as the config schema is simple, but for more complicated schemas this can be a problem.
To assist with this, you can use the old resource API and our recommended pattern for constructing the old-style resource from the new one:
from dagster import InitResourceContext, resource
@resource(config_schema=FancyDbResource.to_config_schema())deffancy_db_resource(context: InitResourceContext)-> FancyDbResource:return FancyDbResource.from_resource_context(context)# old-style resource API still works, but the Pythonic resource is the source of truth# for schema information and implementation
defs = Definitions(# ...
resources={"fancy_db": fancy_db_resource.configured({"conn_string":"some_value"})},)
In this example, we've written a Pythonic resource while passing the old-style resource into our Definitions object. This allows us to have a single source of truth for the config schema.
Step 2: Providing the Pythonic resource to Definitions#
Next, we'll change Definitions to include the Pythonic resource. Note that we don't need to migrate our asset code at the same time, as Pythonic resources are available via the asset's context:
from dagster import AssetExecutionContext, ConfigurableResource, Definitions, asset
classFancyDbResource(ConfigurableResource):
conn_string:strdefexecute(self, query:str)->None:...@asset(required_resource_keys={"fancy_db"})defasset_one(context: AssetExecutionContext)->None:# this still works because the resource is still available on the contextassert context.resources.fancy_db
defs = Definitions(
assets=[asset_one],
resources={"fancy_db": FancyDbResource(conn_string="some_value")},)
Lastly, we'll update the asset to take the resource as a parameter:
from dagster import AssetExecutionContext, asset
@assetdefasset_one(context: AssetExecutionContext, fancy_db: FancyDbResource)->None:assert fancy_db
Migrating resources that use separate objects for business logic#
A common pattern is to separate the interaction with the configuration system from the object that contains the actual business logic. For example:
Pre-existing or third-party code that doesn't know about the configuration system - and shouldn't
A complex client that requires mutable state and bookkeeping
In these cases, using the old resource API, you would have written a @resource decorator that returns that object directly.
# Pre-existing code that you don't want to alterclassFancyDbClient:def__init__(self, conn_string:str)->None:
self.conn_string = conn_string
defexecute_query(self, query:str)->None:...# Alternatively could have been imported from third-party library# from fancy_db import FancyDbClientfrom dagster import AssetExecutionContext, InitResourceContext, asset, resource
@resource(config_schema={"conn_string":str})deffancy_db_resource(context: InitResourceContext)-> FancyDbClient:return FancyDbClient(context.resource_config["conn_string"])@asset(required_resource_keys={"fancy_db"})defexisting_asset(context: AssetExecutionContext)->None:
context.resources.fancy_db.execute_query("SELECT * FROM foo")
With Pythonic-style resources you would write a class that can return that client from a method. In code that consumes that resource you would call that method to access the underlying client.
from dagster import AssetExecutionContext, ConfigurableResource, asset
classFancyDbResource(ConfigurableResource):
conn_string:strdefget_client(self)-> FancyDbClient:return FancyDbClient(self.conn_string)@assetdefnew_asset(fancy_db: FancyDbResource)->None:
client = fancy_db.get_client()
client.execute_query("SELECT * FROM foo")
In the old API, @resource functions could also be context managers to handle initialization and cleanup tasks. This context manager was called by framework code rather than user code:
from dagster import AssetExecutionContext, InitResourceContext, asset, resource
@resource(config_schema={"conn_string":str})deffancy_db_resource(context: InitResourceContext)-> Iterator[FancyDbClient]:
some_expensive_setup()try:# the client is yielded to the assets that require ityield FancyDbClient(context.resource_config["conn_string"])finally:# this is only called once the asset has finished executing
some_expensive_teardown()@asset(required_resource_keys={"fancy_db"})defasset_one(context: AssetExecutionContext)->None:# some_expensive_setup() has been called, but some_expensive_teardown() has not
context.resources.fancy_db.execute_query("SELECT * FROM foo")
This could cause confusion and difficult-to-understand stack traces. With Pythonic resources, you can manage this directly in the body of the asset or op:
from contextlib import contextmanager
from dagster import ConfigurableResource, asset
classFancyDbResource(ConfigurableResource):
conn_string:str@contextmanagerdefget_client(self)-> Iterator[FancyDbClient]:try:
some_expensive_setup()yield FancyDbClient(self.conn_string)finally:
some_expensive_teardown()@assetdefasset_one(fancy_db: FancyDbResource)->None:with fancy_db.get_client()as client:
client.execute_query("SELECT * FROM foo")
Migrating code with resources with separate business objects#
While there are benefits to managing object access in a resource rather than having the @resource factory function return the object, this does present a problem with performing an at-scale migration. The old code will expect the business object on the context object while the new code will expect the enclosing resource object when it's accessed as a parameter.
@asset(required_resource_keys={"fancy_db"})defexisting_asset(context: AssetExecutionContext)->None:# This code is now broken because the resource is no longer a FancyDbClient# but it is a FancyDbResource.
context.resources.fancy_db.execute_query("SELECT * FROM foo")
Ultimately, we want the underlying client to reside in the context of the old code, but in the new code, have the new resource passed to the asset.
You can accomplish this by using Dagster's framework support, IAttachDifferentObjectToOpContext. Implementing this interface allows you to instruct the framework to place a different object on the context object.
This framework can be implemented while you migrate your code, so that both new and old code can co-exist:
from dagster import(
AssetExecutionContext,
ConfigurableResource,
IAttachDifferentObjectToOpContext,
asset,)classFancyDbResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
conn_string:strdefget_object_to_set_on_execution_context(self)-> FancyDbClient:return self.get_client()defget_client(self)-> FancyDbClient:return FancyDbClient(self.conn_string)@assetdefnew_asset(fancy_db: FancyDbResource)->None:
client = fancy_db.get_client()
client.execute_query("SELECT * FROM foo")@asset(required_resource_keys={"fancy_db"})defexisting_asset(context: AssetExecutionContext)->None:# This code now works because context.resources.fancy_db is now a FancyDbClient
context.resources.fancy_db.execute_query("SELECT * FROM foo")