Skip to content

Commit

Permalink
Patch/bisheng langchain aio (dataelement#126)
Browse files Browse the repository at this point in the history
解决ProxyLLM 无法通过request_timout设置超时时间的问题
  • Loading branch information
yaojin3616 committed Nov 2, 2023
2 parents 1eaecec + dd031d9 commit 884f996
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 15 deletions.
40 changes: 28 additions & 12 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,13 @@ concurrency:
cancel-in-progress: true

jobs:
build:
build_bisheng_api:
runs-on: ubuntu-latest
#if: startsWith(github.event.ref, 'refs/tags')
steps:
- name: checkout
uses: actions/checkout@v2

# 登录 docker hub
- name: Login to DockerHub
uses: docker/login-action@v1
with:
# GitHub Repo => Settings => Secrets 增加 docker hub 登录密钥信息
# DOCKERHUB_USERNAME 是 docker hub 账号名.
# DOCKERHUB_TOKEN: docker hub => Account Setting => Security 创建.
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}


- name: Get version
id: get_version
run: |
Expand All @@ -54,6 +44,32 @@ jobs:
cd ./src/bisheng-langchain
python setup.py bdist_wheel
twine upload dist/* -u ${{ secrets.PYPI_USER }} -p ${{ secrets.PYPI_PASSWORD }} --repository pypi
build_bisheng:
needs: build_bisheng_api
runs-on: ubuntu-latest
# if: startsWith(github.event.ref, 'refs/tags')
steps:
- name: checkout
uses: actions/checkout@v2

- name: Get version
id: get_version
run: |
echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//}
- name: Set Environment Variable
run: echo "RELEASE_VERSION=${{ steps.get_version.outputs.VERSION }}" >> $GITHUB_ENV

# 登录 docker hub
- name: Login to DockerHub
uses: docker/login-action@v1
with:
# GitHub Repo => Settings => Secrets 增加 docker hub 登录密钥信息
# DOCKERHUB_USERNAME 是 docker hub 账号名.
# DOCKERHUB_TOKEN: docker hub => Account Setting => Security 创建.
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

# 构建 backend 并推送到 Docker hub
- name: Set up QEMU
Expand Down
5 changes: 4 additions & 1 deletion src/backend/bisheng/interface/initialize/loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ def instantiate_llm(node_type, class_object, params: Dict):

# 支持request_timeout & max_retries
if hasattr(llm, 'request_timeout') and 'request_timeout' in llm_config:
llm.request_timeout = llm_config.get('request_timeout')
if isinstance(llm.request_timeout, str):
llm.request_timeout = eval(llm_config.get('request_timeout'))
else:
llm.request_timeout = llm_config.get('request_timeout')
if hasattr(llm, 'max_retries') and 'max_retries' in llm_config:
llm.max_retries = llm_config.get('max_retries')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sys
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, Tuple, Union

from langchain import requests
from bisheng_langchain.utils import requests
from langchain.callbacks.manager import AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun
from langchain.chat_models.base import BaseChatModel
from langchain.schema import ChatGeneration, ChatResult
Expand Down Expand Up @@ -166,8 +166,8 @@ def validate_environment(cls, values: Dict) -> Dict:
@property
def _default_params(self) -> Dict[str, Any]:
"""Get the default parameters for calling ProxyChatLLM API."""
self.client.request_timeout = self.request_timeout
return {
'request_timeout': self.request_timeout,
'model': self.model_name,
'temperature': self.temperature,
'top_p': self.top_p,
Expand Down
Empty file.
196 changes: 196 additions & 0 deletions src/bisheng-langchain/bisheng_langchain/utils/requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""Lightweight wrapper around requests library, with async support."""
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Optional

import aiohttp
import requests
from pydantic import BaseModel, Extra


class Requests(BaseModel):
"""Wrapper around requests to handle auth and async.
The main purpose of this wrapper is to handle authentication (by saving
headers) and enable easy async methods on the same base object.
"""

headers: Optional[Dict[str, str]] = None
aiosession: Optional[aiohttp.ClientSession] = None
auth: Optional[Any] = None
request_timeout: int = 120

class Config:
"""Configuration for this pydantic object."""

extra = Extra.forbid
arbitrary_types_allowed = True

def get(self, url: str, **kwargs: Any) -> requests.Response:
"""GET the URL and return the text."""
return requests.get(url,
headers=self.headers,
auth=self.auth,
timeout=self.request_timeout,
**kwargs)

def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
"""POST to the URL and return the text."""
return requests.post(url,
json=data,
headers=self.headers,
auth=self.auth,
timeout=self.request_timeout,
**kwargs)

def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
"""PATCH the URL and return the text."""
return requests.patch(url,
json=data,
headers=self.headers,
auth=self.auth,
timeout=self.request_timeout,
**kwargs)

def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
"""PUT the URL and return the text."""
return requests.put(url,
json=data,
headers=self.headers,
auth=self.auth,
timeout=self.request_timeout,
**kwargs)

def delete(self, url: str, **kwargs: Any) -> requests.Response:
"""DELETE the URL and return the text."""
return requests.delete(url,
headers=self.headers,
auth=self.auth,
timeout=self.request_timeout,
**kwargs)

@asynccontextmanager
async def _arequest(self, method: str, url: str,
**kwargs: Any) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""Make an async request."""
if not self.aiosession:
if not self.request_timeout:
self.request_timeout = 120
if isinstance(self.request_timeout, tuple):
timeout = aiohttp.ClientTimeout(connect=self.request_timeout[0],
total=self.request_timeout[1])
else:
timeout = aiohttp.ClientTimeout(total=self.request_timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.request(method, url, headers=self.headers, **kwargs) as response:
yield response
else:
async with self.aiosession.request(method,
url,
headers=self.headers,
auth=self.auth,
**kwargs) as response:
yield response

@asynccontextmanager
async def aget(self, url: str, **kwargs: Any) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""GET the URL and return the text asynchronously."""
async with self._arequest('GET', url, auth=self.auth, **kwargs) as response:
yield response

@asynccontextmanager
async def apost(self, url: str, data: Dict[str, Any],
**kwargs: Any) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""POST to the URL and return the text asynchronously."""
async with self._arequest('POST', url, json=data, auth=self.auth, **kwargs) as response:
yield response

@asynccontextmanager
async def apatch(self, url: str, data: Dict[str, Any],
**kwargs: Any) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""PATCH the URL and return the text asynchronously."""
async with self._arequest('PATCH', url, json=data, auth=self.auth, **kwargs) as response:
yield response

@asynccontextmanager
async def aput(self, url: str, data: Dict[str, Any],
**kwargs: Any) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""PUT the URL and return the text asynchronously."""
async with self._arequest('PUT', url, json=data, auth=self.auth, **kwargs) as response:
yield response

@asynccontextmanager
async def adelete(self, url: str,
**kwargs: Any) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""DELETE the URL and return the text asynchronously."""
async with self._arequest('DELETE', url, auth=self.auth, **kwargs) as response:
yield response


class TextRequestsWrapper(BaseModel):
"""Lightweight wrapper around requests library.
The main purpose of this wrapper is to always return a text output.
"""

headers: Optional[Dict[str, str]] = None
aiosession: Optional[aiohttp.ClientSession] = None
auth: Optional[Any] = None

class Config:
"""Configuration for this pydantic object."""

extra = Extra.forbid
arbitrary_types_allowed = True

@property
def requests(self) -> Requests:
return Requests(headers=self.headers, aiosession=self.aiosession, auth=self.auth)

def get(self, url: str, **kwargs: Any) -> str:
"""GET the URL and return the text."""
return self.requests.get(url, **kwargs).text

def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
"""POST to the URL and return the text."""
return self.requests.post(url, data, **kwargs).text

def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
"""PATCH the URL and return the text."""
return self.requests.patch(url, data, **kwargs).text

def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
"""PUT the URL and return the text."""
return self.requests.put(url, data, **kwargs).text

def delete(self, url: str, **kwargs: Any) -> str:
"""DELETE the URL and return the text."""
return self.requests.delete(url, **kwargs).text

async def aget(self, url: str, **kwargs: Any) -> str:
"""GET the URL and return the text asynchronously."""
async with self.requests.aget(url, **kwargs) as response:
return await response.text()

async def apost(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
"""POST to the URL and return the text asynchronously."""
async with self.requests.apost(url, data, **kwargs) as response:
return await response.text()

async def apatch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
"""PATCH the URL and return the text asynchronously."""
async with self.requests.apatch(url, data, **kwargs) as response:
return await response.text()

async def aput(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
"""PUT the URL and return the text asynchronously."""
async with self.requests.aput(url, data, **kwargs) as response:
return await response.text()

async def adelete(self, url: str, **kwargs: Any) -> str:
"""DELETE the URL and return the text asynchronously."""
async with self.requests.adelete(url, **kwargs) as response:
return await response.text()


# For backwards compatibility
RequestsWrapper = TextRequestsWrapper

0 comments on commit 884f996

Please sign in to comment.