Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Retire openapi3, use openapi-service-client instead #7514

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Retire openapi3, use openapi-service-client instead
  • Loading branch information
vblagoje committed May 23, 2024
commit 0d9c324a2d67fec02d7596b3463bd8659facd3b6
147 changes: 13 additions & 134 deletions haystack/components/connectors/openapi_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
# SPDX-License-Identifier: Apache-2.0

import json
from collections import defaultdict
from copy import copy
from typing import Any, Dict, List, Optional, Union

from haystack import component, logging
Expand All @@ -13,15 +11,13 @@

logger = logging.getLogger(__name__)

with LazyImport("Run 'pip install openapi3'") as openapi_imports:
from openapi3 import OpenAPI
with LazyImport("Run 'pip install openapi-service-client'") as openapi_imports:
from openapi_service_client import OpenAPIServiceClient, OpenAPIServiceClientConfigurationBuilder


@component
class OpenAPIServiceConnector:
"""
A component which connects the Haystack framework to OpenAPI services.

The `OpenAPIServiceConnector` component connects the Haystack framework to OpenAPI services, enabling it to call
operations as defined in the OpenAPI specification of the service.

Expand Down Expand Up @@ -83,10 +79,8 @@ def run(
service_credentials: Optional[Union[dict, str]] = None,
) -> Dict[str, List[ChatMessage]]:
"""
Processes a list of chat messages to invoke a method on an OpenAPI service.

It parses the last message in the list, expecting it to contain an OpenAI function calling descriptor
(name & parameters) in JSON format.
Processes a list of chat messages to invoke a method on an OpenAPI service. It parses the last message in the
list, expecting it to contain an OpenAI function calling descriptor (name & parameters) in JSON format.

:param messages: A list of `ChatMessage` objects containing the messages to be processed. The last message
should contain the function invocation payload in OpenAI function calling format. See the example in the class
Expand All @@ -112,18 +106,18 @@ def run(
function_invocation_payloads = self._parse_message(last_message)

# instantiate the OpenAPI service for the given specification
openapi_service = OpenAPI(service_openapi_spec)
self._authenticate_service(openapi_service, service_credentials)
builder = OpenAPIServiceClientConfigurationBuilder()
config_openapi = builder.with_openapi_spec(service_openapi_spec).with_credentials(service_credentials).build()
openapi_service = OpenAPIServiceClient(config_openapi)

response_messages = []
for method_invocation_descriptor in function_invocation_payloads:
service_response = self._invoke_method(openapi_service, method_invocation_descriptor)
# openapi3 parses the JSON service response into a model object, which is not our focus at the moment.
# Instead, we require direct access to the raw JSON data of the response, rather than the model objects
# provided by the openapi3 library. This approach helps us avoid issues related to (de)serialization.
# By accessing the raw JSON response through `service_response._raw_data`, we can serialize this data
# into a string. Finally, we use this string to create a ChatMessage object.
response_messages.append(ChatMessage.from_user(json.dumps(service_response._raw_data)))
try:
service_response = openapi_service.invoke(method_invocation_descriptor)
except Exception as e:
logger.error(f"Error invoking function: {method_invocation_descriptor['name']}. Error: {e}")
service_response = {"error": str(e)}
response_messages.append(ChatMessage.from_user(json.dumps(service_response)))

return {"service_response": response_messages}

Expand Down Expand Up @@ -153,118 +147,3 @@ def _parse_message(self, message: ChatMessage) -> List[Dict[str, Any]]:
{"arguments": json.loads(function_call["arguments"]), "name": function_call["name"]}
)
return function_payloads

def _authenticate_service(self, openapi_service: OpenAPI, credentials: Optional[Union[dict, str]] = None):
"""
Authentication with an OpenAPI service.

Authenticates with the OpenAPI service if required, supporting both single (str) and multiple
authentication methods (dict).

OpenAPI spec v3 supports the following security schemes:
http – for Basic, Bearer and other HTTP authentications schemes
apiKey – for API keys and cookie authentication
oauth2 – for OAuth 2
openIdConnect – for OpenID Connect Discovery

Currently, only the http and apiKey schemes are supported. Multiple security schemes can be defined in the
OpenAPI spec, and the credentials should be provided as a dictionary with keys matching the security scheme
names. If only one security scheme is defined, the credentials can be provided as a simple string.

:param openapi_service: The OpenAPI service instance.
:param credentials: Credentials for authentication, which can be either a string (e.g. token) or a dictionary
with keys matching the authentication method names.
:raises ValueError: If authentication fails, is not found, or if appropriate credentials are missing.
"""
if openapi_service.raw_element.get("components", {}).get("securitySchemes"):
service_name = openapi_service.info.title
if not credentials:
raise ValueError(f"Service {service_name} requires authentication but no credentials were provided.")

# a dictionary of security schemes defined in the OpenAPI spec
# each key is the name of the security scheme, and the value is the scheme definition
security_schemes = openapi_service.components.securitySchemes.raw_element
supported_schemes = ["http", "apiKey"] # todo: add support for oauth2 and openIdConnect

authenticated = False
for scheme_name, scheme in security_schemes.items():
if scheme["type"] in supported_schemes:
auth_credentials = None
if isinstance(credentials, str):
auth_credentials = credentials
elif isinstance(credentials, dict) and scheme_name in credentials:
auth_credentials = credentials[scheme_name]
if auth_credentials:
openapi_service.authenticate(scheme_name, auth_credentials)
authenticated = True
break

raise ValueError(
f"Service {service_name} requires {scheme_name} security scheme but no "
f"credentials were provided for it. Check the service configuration and credentials."
)
if not authenticated:
raise ValueError(
f"Service {service_name} requires authentication but no credentials were provided "
f"for it. Check the service configuration and credentials."
)

def _invoke_method(self, openapi_service: OpenAPI, method_invocation_descriptor: Dict[str, Any]) -> Any:
"""
Invokes the specified method on the OpenAPI service.

The method name and arguments are passed in the method_invocation_descriptor.

:param openapi_service: The OpenAPI service instance.
:param method_invocation_descriptor: The method name and arguments to be passed to the method. The payload
should contain the method name (key: "name") and the arguments (key: "arguments"). The name is a string, and
the arguments are a dictionary of key-value pairs.
:return: A service JSON response.
:raises RuntimeError: If the method is not found or invocation fails.
"""
name = method_invocation_descriptor.get("name")
invocation_arguments = copy(method_invocation_descriptor.get("arguments", {}))
if not name or not invocation_arguments:
raise ValueError(
f"Invalid function calling descriptor: {method_invocation_descriptor} . It should contain "
f"a method name and arguments."
)

# openapi3 specific method to call the operation, do we have it?
method_to_call = getattr(openapi_service, f"call_{name}", None)
if not callable(method_to_call):
raise RuntimeError(f"Operation {name} not found in OpenAPI specification {openapi_service.info.title}")

# get the operation reference from the method_to_call
operation = method_to_call.operation.__self__
operation_dict = operation.raw_element

# Pack URL/query parameters under "parameters" key
method_call_params: Dict[str, Dict[str, Any]] = defaultdict(dict)
parameters = operation_dict.get("parameters", [])
request_body = operation_dict.get("requestBody", {})

for param in parameters:
param_name = param["name"]
param_value = invocation_arguments.get(param_name)
if param_value:
method_call_params["parameters"][param_name] = param_value
else:
if param.get("required", False):
raise ValueError(f"Missing parameter: '{param_name}' required for the '{name}' operation.")

# Pack request body parameters under "data" key
if request_body:
schema = request_body.get("content", {}).get("application/json", {}).get("schema", {})
required_params = schema.get("required", [])
for param_name in schema.get("properties", {}):
param_value = invocation_arguments.get(param_name)
if param_value:
method_call_params["data"][param_name] = param_value
else:
if param_name in required_params:
raise ValueError(
f"Missing requestBody parameter: '{param_name}' required for the '{name}' operation."
)
# call the underlying service REST API with the parameters
return method_to_call(**method_call_params)
Loading