Skip to content

Commit

Permalink
[2/2][Serve] Support dynamic log (ray-project#40735)
Browse files Browse the repository at this point in the history
Follow-up to the previous PR; this change implements dynamic logging.
---------

Signed-off-by: Sihan Wang <[email protected]>
  • Loading branch information
sihanwang41 committed Nov 9, 2023
1 parent 99b1a2c commit 00a1dcb
Show file tree
Hide file tree
Showing 24 changed files with 880 additions and 51 deletions.
3 changes: 3 additions & 0 deletions dashboard/modules/serve/serve_rest_api_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ async def put_all_applications(self, req: Request) -> Response:
client = await serve_start_async(
http_options=full_http_options,
grpc_options=grpc_options,
logging_config=config.logging_config,
)

# Serve ignores HTTP options if it was already running when
Expand All @@ -185,6 +186,8 @@ async def put_all_applications(self, req: Request) -> Response:
self.validate_http_options(client, full_http_options)

try:
if config.logging_config:
client.update_system_logging_config(config.logging_config)
client.deploy_apps(config)
record_extra_usage_tag(TagKey.SERVE_REST_API_VERSION, "v2")
except RayTaskError as e:
Expand Down
14 changes: 12 additions & 2 deletions python/ray/serve/_private/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ray.serve.context import _get_global_client, _set_global_client
from ray.serve.deployment import Application, Deployment
from ray.serve.exceptions import RayServeException
from ray.serve.schema import LoggingConfig

logger = logging.getLogger(__file__)

Expand Down Expand Up @@ -110,6 +111,7 @@ def _check_http_options(
def _start_controller(
http_options: Union[None, dict, HTTPOptions] = None,
grpc_options: Union[None, dict, gRPCOptions] = None,
system_logging_config: Union[None, dict, LoggingConfig] = None,
**kwargs,
) -> Tuple[ActorHandle, str]:
"""Start Ray Serve controller.
Expand Down Expand Up @@ -155,10 +157,16 @@ def _start_controller(
if isinstance(grpc_options, dict):
grpc_options = gRPCOptions(**grpc_options)

if system_logging_config is None:
system_logging_config = LoggingConfig()
elif isinstance(system_logging_config, dict):
system_logging_config = LoggingConfig(**system_logging_config)

controller = ServeController.options(**controller_actor_options).remote(
SERVE_CONTROLLER_NAME,
http_config=http_options,
grpc_options=grpc_options,
system_logging_config=system_logging_config,
)

proxy_handles = ray.get(controller.get_proxies.remote())
Expand All @@ -178,6 +186,7 @@ def _start_controller(
async def serve_start_async(
http_options: Union[None, dict, HTTPOptions] = None,
grpc_options: Union[None, dict, gRPCOptions] = None,
system_logging_config: Union[None, dict, LoggingConfig] = None,
**kwargs,
) -> ServeControllerClient:
"""Initialize a serve instance asynchronously.
Expand Down Expand Up @@ -207,7 +216,7 @@ async def serve_start_async(
controller, controller_name = (
await ray.remote(_start_controller)
.options(num_cpus=0)
.remote(http_options, grpc_options, **kwargs)
.remote(http_options, grpc_options, system_logging_config, **kwargs)
)

client = ServeControllerClient(
Expand All @@ -222,6 +231,7 @@ async def serve_start_async(
def serve_start(
http_options: Union[None, dict, HTTPOptions] = None,
grpc_options: Union[None, dict, gRPCOptions] = None,
system_logging_config: Union[None, dict, LoggingConfig] = None,
**kwargs,
) -> ServeControllerClient:
"""Initialize a serve instance.
Expand Down Expand Up @@ -279,7 +289,7 @@ def serve_start(
pass

controller, controller_name = _start_controller(
http_options, grpc_options, **kwargs
http_options, grpc_options, system_logging_config, **kwargs
)

client = ServeControllerClient(
Expand Down
18 changes: 16 additions & 2 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def apply_deployment_info(
)

deployment_id = DeploymentID(deployment_name, self._name)

self._deployment_state_manager.deploy(deployment_id, deployment_info)

if deployment_info.route_prefix is not None:
Expand Down Expand Up @@ -572,7 +573,18 @@ def _reconcile_target_deployments(self) -> None:

# Set target state for each deployment
for deployment_name, info in self._target_state.deployment_infos.items():
self.apply_deployment_info(deployment_name, info)
deploy_info = deepcopy(info)
# Apply the application logging config to the deployment logging config
# if it is not set.
if (
self._target_state.config
and self._target_state.config.logging_config
and deploy_info.deployment_config.logging_config is None
):
deploy_info.deployment_config.logging_config = (
self._target_state.config.logging_config
)
self.apply_deployment_info(deployment_name, deploy_info)

# Delete outdated deployments
for deployment_name in self._get_live_deployments():
Expand Down Expand Up @@ -888,6 +900,9 @@ def build_serve_application(
name: application name. If specified, application will be deployed
without removing existing applications.
args: Arguments to be passed to the application builder.
logging_config: The application logging config. If a deployment's logging
config is not set, the application logging config is applied to that
deployment.
Returns:
Deploy arguments: a list of deployment arguments if application
was built successfully, otherwise None.
Expand Down Expand Up @@ -1018,7 +1033,6 @@ def override_deployment_info(
options.pop("name", None)
original_options.update(options)
override_options["deployment_config"] = DeploymentConfig(**original_options)

deployment_infos[deployment_name] = info.update(**override_options)

# Overwrite ingress route prefix
Expand Down
7 changes: 6 additions & 1 deletion python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)
from ray.serve.generated.serve_pb2 import StatusOverview as StatusOverviewProto
from ray.serve.handle import DeploymentHandle, RayServeHandle, RayServeSyncHandle
from ray.serve.schema import ServeApplicationSchema, ServeDeploySchema
from ray.serve.schema import LoggingConfig, ServeApplicationSchema, ServeDeploySchema

logger = logging.getLogger(__file__)

Expand Down Expand Up @@ -588,3 +588,8 @@ def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo):
model ids.
"""
self._controller.record_multiplexed_replica_info.remote(info)

@_ensure_connected
def update_system_logging_config(self, logging_config: LoggingConfig):
    """Send a new logging config for the controller & proxies to the controller.

    The request is dispatched with ``.remote()``; its result is not awaited
    or returned here.

    Args:
        logging_config: The logging config forwarded to the controller's
            ``reconfigure_system_logging_config`` method.
    """
    reconfigure = self._controller.reconfigure_system_logging_config
    reconfigure.remote(logging_config)
36 changes: 36 additions & 0 deletions python/ray/serve/_private/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from ray.serve.generated.serve_pb2 import AutoscalingConfig as AutoscalingConfigProto
from ray.serve.generated.serve_pb2 import DeploymentConfig as DeploymentConfigProto
from ray.serve.generated.serve_pb2 import DeploymentLanguage
from ray.serve.generated.serve_pb2 import EncodingType as EncodingTypeProto
from ray.serve.generated.serve_pb2 import LoggingConfig as LoggingConfigProto
from ray.serve.generated.serve_pb2 import ReplicaConfig as ReplicaConfigProto
from ray.util.placement_group import VALID_PLACEMENT_GROUP_STRATEGIES

Expand Down Expand Up @@ -118,6 +120,11 @@ class DeploymentConfig(BaseModel):
update_type=DeploymentOptionUpdateType.HeavyWeight,
)

logging_config: Optional[dict] = Field(
default=None,
update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
)

# Contains the names of deployment options manually set by the user
user_configured_option_names: Set[str] = set()

Expand Down Expand Up @@ -147,6 +154,22 @@ def user_config_json_serializable(cls, v):

return v

@validator("logging_config", always=True)
def logging_config_valid(cls, v):
    """Validate the ``logging_config`` field and normalize it to a dict.

    ``None`` is passed through unchanged. A dictionary is used to build a
    ``LoggingConfig`` and the result of its ``.dict()`` is returned, which
    is how default values get handled.

    Raises:
        TypeError: If ``v`` is neither ``None`` nor a dictionary.
    """
    if v is not None:
        if not isinstance(v, dict):
            raise TypeError(
                f"Got invalid type '{type(v)}' for logging_config. "
                "Expected a dictionary."
            )

        # Imported at call time rather than at module top.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from ray.serve.schema import LoggingConfig

        v = LoggingConfig(**v).dict()

    return v

def needs_pickle(self):
    """Return what ``_needs_pickle`` reports for this config.

    The decision is delegated entirely to ``_needs_pickle``, called with
    this config's ``deployment_language`` and ``is_cross_language``.
    """
    language = self.deployment_language
    cross_language = self.is_cross_language
    return _needs_pickle(language, cross_language)

Expand All @@ -159,6 +182,13 @@ def to_proto(self):
data["autoscaling_config"] = AutoscalingConfigProto(
**data["autoscaling_config"]
)
if data.get("logging_config"):
if "encoding" in data["logging_config"]:
data["logging_config"]["encoding"] = EncodingTypeProto.Value(
data["logging_config"]["encoding"]
)

data["logging_config"] = LoggingConfigProto(**data["logging_config"])
data["user_configured_option_names"] = list(
data["user_configured_option_names"]
)
Expand Down Expand Up @@ -206,6 +236,12 @@ def from_proto(cls, proto: DeploymentConfigProto):
data["user_configured_option_names"] = set(
data["user_configured_option_names"]
)
if "logging_config" in data:
if "encoding" in data["logging_config"]:
data["logging_config"]["encoding"] = EncodingTypeProto.Name(
data["logging_config"]["encoding"]
)

return cls(**data)

@classmethod
Expand Down
76 changes: 64 additions & 12 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
configure_component_memory_profiler,
get_component_logger_file_path,
)
from ray.serve._private.long_poll import LongPollHost
from ray.serve._private.long_poll import LongPollHost, LongPollNamespace
from ray.serve._private.proxy_state import ProxyStateManager
from ray.serve._private.storage.kv_store import RayInternalKVStore
from ray.serve._private.usage import ServeUsageTag
Expand All @@ -64,6 +64,7 @@
from ray.serve.schema import (
ApplicationDetails,
HTTPOptionsSchema,
LoggingConfig,
ServeActorDetails,
ServeApplicationSchema,
ServeDeploySchema,
Expand All @@ -79,6 +80,7 @@
_CRASH_AFTER_CHECKPOINT_PROBABILITY = 0

CONFIG_CHECKPOINT_KEY = "serve-app-config-checkpoint"
LOGGING_CONFIG_CHECKPOINT_KEY = "serve-logging-config-checkpoint"


@ray.remote(num_cpus=0)
Expand Down Expand Up @@ -112,16 +114,32 @@ async def __init__(
controller_name: str,
*,
http_config: HTTPOptions,
system_logging_config: LoggingConfig,
grpc_options: Optional[gRPCOptions] = None,
):
self._controller_node_id = ray.get_runtime_context().get_node_id()
assert (
self._controller_node_id == get_head_node_id()
), "Controller must be on the head node."

configure_component_logger(
component_name="controller", component_id=str(os.getpid())
)
self.ray_worker_namespace = ray.get_runtime_context().namespace
self.controller_name = controller_name
self.gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)
kv_store_namespace = f"{self.controller_name}-{self.ray_worker_namespace}"
self.kv_store = RayInternalKVStore(kv_store_namespace, self.gcs_client)

self.long_poll_host = LongPollHost()
self.done_recovering_event = asyncio.Event()

# Try to read the logging config from the checkpoint.
# A checkpointed logging config takes precedence over the one passed to
# the constructor.
self.system_logging_config = None
log_config_checkpoint = self.kv_store.get(LOGGING_CONFIG_CHECKPOINT_KEY)
if log_config_checkpoint is not None:
system_logging_config = pickle.loads(log_config_checkpoint)
self.reconfigure_system_logging_config(system_logging_config)

configure_component_memory_profiler(
component_name="controller", component_id=str(os.getpid())
)
Expand All @@ -136,22 +154,15 @@ async def __init__(
call_function_from_import_path(RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATH)

# Used to read/write checkpoints.
self.ray_worker_namespace = ray.get_runtime_context().namespace
self.controller_name = controller_name
self.gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)
kv_store_namespace = f"{self.controller_name}-{self.ray_worker_namespace}"
self.kv_store = RayInternalKVStore(kv_store_namespace, self.gcs_client)
self.cluster_node_info_cache = create_cluster_node_info_cache(self.gcs_client)
self.cluster_node_info_cache.update()

self.long_poll_host = LongPollHost()
self.done_recovering_event = asyncio.Event()

self.proxy_state_manager = ProxyStateManager(
controller_name,
http_config,
self._controller_node_id,
self.cluster_node_info_cache,
self.system_logging_config,
grpc_options,
)

Expand Down Expand Up @@ -210,6 +221,31 @@ async def __init__(
description="The number of times that controller has started.",
).inc()

def reconfigure_system_logging_config(self, system_logging_config: LoggingConfig):
    """Checkpoint, publish, and apply a new system logging config.

    Returns early without doing anything when a config is already stored on
    the controller and it compares equal to the new one. Otherwise the
    steps run in this order:

    1. Pickle the config into the KV store under
       ``LOGGING_CONFIG_CHECKPOINT_KEY``.
    2. Store it on ``self.system_logging_config``.
    3. Notify long-poll listeners on
       ``LongPollNamespace.SYSTEM_LOGGING_CONFIG``.
    4. Reconfigure the controller's own component logger.

    Args:
        system_logging_config: The logging config to apply.
    """
    current = self.system_logging_config
    if current and current == system_logging_config:
        # Nothing changed; skip the checkpoint write and the notification.
        return

    self.kv_store.put(
        LOGGING_CONFIG_CHECKPOINT_KEY,
        pickle.dumps(system_logging_config),
    )
    self.system_logging_config = system_logging_config

    self.long_poll_host.notify_changed(
        LongPollNamespace.SYSTEM_LOGGING_CONFIG, system_logging_config
    )

    controller_component_id = str(os.getpid())
    configure_component_logger(
        component_name="controller",
        component_id=controller_component_id,
        logging_config=system_logging_config,
    )
    logger.debug(
        "Configure the serve controller logger "
        f"with logging config: {self.system_logging_config}"
    )

def check_alive(self) -> None:
    """Liveness probe: does nothing and returns ``None``.

    A successful call is itself the signal that the controller is alive.
    """
    return None
Expand Down Expand Up @@ -516,6 +552,7 @@ def shutdown(self):

logger.info("Controller shutdown started!", extra={"log_to_stderr": False})
self.kv_store.delete(CONFIG_CHECKPOINT_KEY)
self.kv_store.delete(LOGGING_CONFIG_CHECKPOINT_KEY)
self.application_state_manager.shutdown()
self.deployment_state_manager.shutdown()
self.endpoint_state.shutdown()
Expand Down Expand Up @@ -677,6 +714,11 @@ def deploy_config(
"Serve config instead."
)

# If the application logging config is not set, use the global logging
# config.
if app_config.logging_config is None and config.logging_config:
app_config.logging_config = config.logging_config

app_config_dict = app_config.dict(exclude_unset=True)
new_config_checkpoint[app_config.name] = app_config_dict

Expand Down Expand Up @@ -981,6 +1023,14 @@ def _save_cpu_profile_data(self) -> str:
"the RAY_SERVE_ENABLE_CPU_PROFILING env var."
)

def _get_logging_config(self) -> Tuple:
    """Get the logging configuration (for testing purposes).

    Returns:
        A ``(system_logging_config, log_file_path)`` tuple. The path is the
        ``baseFilename`` of the last ``RotatingFileHandler`` attached to the
        module-level ``logger``, or ``None`` if no such handler is attached.
    """
    rotating_file_paths = [
        handler.baseFilename
        for handler in logger.handlers
        if isinstance(handler, logging.handlers.RotatingFileHandler)
    ]
    # When several rotating handlers are attached, the last one wins.
    log_file_path = rotating_file_paths[-1] if rotating_file_paths else None
    return self.system_logging_config, log_file_path


@ray.remote(num_cpus=0)
class ServeControllerAvatar:
Expand All @@ -1006,6 +1056,7 @@ def __init__(
self._controller = None
if self._controller is None:
http_config = HTTPOptions()
logging_config = LoggingConfig()
http_config.port = http_proxy_port
self._controller = ServeController.options(
num_cpus=0,
Expand All @@ -1019,6 +1070,7 @@ def __init__(
).remote(
controller_name,
http_config=http_config,
system_logging_config=logging_config,
)

def check_alive(self) -> None:
Expand Down
Loading

0 comments on commit 00a1dcb

Please sign in to comment.