Skip to content

Commit

Permalink
Make all ExtClients plain-ole-python objects (dagster-io#16352)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Making this class hierarchy not inherit from `ConfigurableResource`. To avoid forcing users from using `ResourceParam`, with every use, I am proposing a pattern where we export the annotated type. We could support this more directly and ergonomically in the framework, but I think this the user-facing behavior we want for ext users.

## How I Tested These Changes

BK
  • Loading branch information
schrockn authored Sep 7, 2023
1 parent e62c14f commit 6158437
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 30 deletions.
3 changes: 1 addition & 2 deletions python_modules/dagster/dagster/_core/ext/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
encode_env_var,
)

from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.execution.context.compute import OpExecutionContext

if TYPE_CHECKING:
from dagster._core.ext.context import ExtOrchestrationContext


class ExtClient(ConfigurableResource, ABC):
class ExtClient(ABC):
def get_base_env(self) -> Mapping[str, str]:
return {DAGSTER_EXT_ENV_KEYS["is_orchestration_active"]: encode_env_var(True)}

Expand Down
28 changes: 19 additions & 9 deletions python_modules/dagster/dagster/_core/ext/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from typing import Iterator, Mapping, Optional, Sequence, Union

from dagster_ext import ExtExtras
from pydantic import Field

from dagster import _check as check
from dagster._core.definitions.resource_annotation import ResourceParam
from dagster._core.errors import DagsterExternalExecutionError
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.ext.client import (
Expand All @@ -27,14 +28,20 @@
_MESSAGE_READER_FILENAME = "messages"


class ExtSubprocess(ExtClient):
cwd: Optional[str] = Field(
default=None, description="Working directory in which to launch the subprocess command."
)
env: Optional[Mapping[str, str]] = Field(
default=None,
description="An optional dict of environment variables to pass to the subprocess.",
)
class _ExtSubprocess(ExtClient):
"""An ext client that runs a subprocess with the given command and environment.
By default parameters are injected via environment variables. And then context is passed via
a temp file, and structured messages are read from from a temp file.
Args:
env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the subprocess.
cwd (Optional[str]): Working directory in which to launch the subprocess command.
"""

def __init__(self, env: Optional[Mapping[str, str]] = None, cwd: Optional[str] = None):
self.env = check.opt_mapping_param(env, "env", key_type=str, value_type=str)
self.cwd = check.opt_str_param(cwd, "cwd")

def run(
self,
Expand Down Expand Up @@ -88,3 +95,6 @@ def _setup_io(
message_reader.read_messages(external_context)
)
yield io_params_as_env_vars(context_injector_params, message_reader_params)


ExtSubprocess = ResourceParam[_ExtSubprocess]
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
from typing import Any, Iterator, Mapping, Optional, Sequence, Tuple, Union

import docker
from dagster import OpExecutionContext
from dagster import (
OpExecutionContext,
ResourceParam,
_check as check,
)
from dagster._core.ext.client import (
ExtClient,
ExtContextInjector,
Expand All @@ -22,7 +26,6 @@
ExtExtras,
ExtParams,
)
from pydantic import Field


class DockerLogsMessageReader(ExtMessageReader):
Expand All @@ -38,22 +41,22 @@ def consume_docker_logs(self, container, ext_context):
extract_message_or_forward_to_stdout(ext_context, log_line)


class ExtDocker(ExtClient):
class _ExtDocker(ExtClient):
"""An ext protocol compliant resource for launching docker containers.
By default context is injected via environment variables and messages are parsed out of the
log stream and other logs are forwarded to stdout of the orchestration process.
"""
env: Optional[Mapping[str, str]] = Field(
default=None,
description="An optional dict of environment variables to set on the container.",
)
Args:
env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the subprocess.
register (Optional[Mapping[str, str]]): An optional dict of registry credentials to login the docker client.
"""

registry: Optional[Mapping[str, str]] = Field(
default=None,
description="An optional dict of registry credentials to use to login the docker client.",
)
def __init__(
self, env: Optional[Mapping[str, str]] = None, registry: Optional[Mapping[str, str]] = None
):
self.env = check.opt_mapping_param(env, "env", key_type=str, value_type=str)
self.registry = check.opt_mapping_param(registry, "registry", key_type=str, value_type=str)

def run(
self,
Expand Down Expand Up @@ -177,3 +180,6 @@ def _setup_ext_protocol(
) as mr_params:
protocol_envs = io_params_as_env_vars(ci_params, mr_params)
yield protocol_envs, message_reader


ExtDocker = ResourceParam[_ExtDocker]
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from typing import Any, Iterator, Mapping, Optional, Sequence, Union

import kubernetes
from dagster import OpExecutionContext
from dagster import (
OpExecutionContext,
_check as check,
)
from dagster._core.definitions.resource_annotation import ResourceParam
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.ext.client import (
ExtClient,
Expand All @@ -24,7 +28,6 @@
ExtDefaultMessageWriter,
ExtExtras,
)
from pydantic import Field

from dagster_k8s.utils import get_common_labels

Expand Down Expand Up @@ -67,20 +70,21 @@ def consume_pod_logs(
extract_message_or_forward_to_stdout(ext_context, log_line)


class ExtK8sPod(ExtClient):
class _ExtK8sPod(ExtClient):
"""An ext protocol compliant resource for launching kubernetes pods.
By default context is injected via environment variables and messages are parsed out of
the pod logs, with other logs forwarded to stdout of the orchestration process.
The first container within the containers list of the pod spec is expected (or set) to be
the container prepared for ext protocol communication.
Args:
env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the subprocess.
"""

env: Optional[Mapping[str, str]] = Field(
default=None,
description="An optional dict of environment variables to set on the container.",
)
def __init__(self, env: Optional[Mapping[str, str]] = None):
self.env = check.opt_mapping_param(env, "env", key_type=str, value_type=str)

def run(
self,
Expand Down Expand Up @@ -236,3 +240,6 @@ def _setup_ext_protocol(
) as mr_params:
protocol_envs = io_params_as_env_vars(ci_params, mr_params)
yield protocol_envs, message_reader


ExtK8sPod = ResourceParam[_ExtK8sPod]

0 comments on commit 6158437

Please sign in to comment.