Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add agentUniverse-data flow. #95

Merged
merged 27 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
50659d8
feat: add data agent in the agentUniverse.
EdwinInAu Jun 14, 2024
b7dbe10
feat: add data agent in the agentUniverse.
EdwinInAu Jun 17, 2024
9012d18
feat: add data agent in the agentUniverse.
EdwinInAu Jun 18, 2024
36ba3e8
feat: add data agent in the agentUniverse.
EdwinInAu Jun 18, 2024
1b19975
feat: add data agent in the agentUniverse.
EdwinInAu Jun 18, 2024
73f6685
feat: add data agent in the agentUniverse.
EdwinInAu Jun 19, 2024
3d37dfd
feat: add data agent in the agentUniverse.
EdwinInAu Jun 19, 2024
ec55462
feat: add data agent in the agentUniverse.
EdwinInAu Jun 20, 2024
7ae0bd8
docs: update readme & guidebook
LandJerry Jun 14, 2024
da761da
feat: Release version 0.0.9
LandJerry Jun 14, 2024
53fee05
feat: Release version 0.0.9
LandJerry Jun 14, 2024
188695a
Add: peer case docs
AniviaTn Jun 18, 2024
bab76de
Add: peer case docs
AniviaTn Jun 18, 2024
b604777
docs: update README
LandJerry Jun 18, 2024
751c85e
docs: update readme
LandJerry Jun 14, 2024
c1e0ff7
docs: update README for new version
LandJerry Jun 17, 2024
57dd5f0
docs: update readme for new version.
LandJerry Jun 17, 2024
ede51e2
docs: update Discord code
LandJerry Jun 17, 2024
8c4fd61
add tool doc
weizjajj Jun 17, 2024
cebfbf0
add en doc
weizjajj Jun 17, 2024
ba905ac
暂存
weizjajj Jun 17, 2024
0fecf96
add knowlege
weizjajj Jun 17, 2024
6707da4
fix output
weizjajj Jun 17, 2024
3105870
remove unused import
weizjajj Jun 17, 2024
83ba37d
add deepseek support
weizjajj Jun 19, 2024
377f19c
Fix: Pydantic warning in chroma store
AniviaTn Jun 20, 2024
0af9a6f
Merge remote-tracking branch 'origin/dev' into dev_chongshi_data_agent
LandJerry Jun 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions agentuniverse_dataflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/11 20:46
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: __init__.py
7 changes: 7 additions & 0 deletions agentuniverse_dataflow/flow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/14 16:58
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: __init__.py
90 changes: 90 additions & 0 deletions agentuniverse_dataflow/flow/dataflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/14 17:05
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: dataflow.py
import importlib
import sys
import traceback
from typing import List, Optional

from pydantic import BaseModel

from agentuniverse.base.util.logging.logging_util import LOGGER
from agentuniverse.base.config.configer import Configer
from agentuniverse_dataflow.node.base.node_base import NodeBase


class Dataflow(BaseModel):
    """A data flow that builds a sequence of nodes from a config and runs them in order.

    The configuration is loaded through ``Configer``. It supplies the flow
    ``name`` and ``description`` plus a ``nodes`` list; each node entry names
    the ``module`` and ``class`` to instantiate together with the jsonl paths,
    node parameters, llm and prompt version that are forwarded to the node.
    """

    _flow_name: str = None
    _flow_desc: str = None
    _node_sequence_list: List[NodeBase] = None
    _llm_name: str = None
    _configer: Optional[Configer] = None

    def __init__(self, conf_path: str = None):
        """Create the flow.

        Args:
            conf_path: Path of the flow configuration file. When it is empty
                no ``Configer`` is created.
        """
        super().__init__()
        if conf_path:
            self._configer = Configer(conf_path)

    def _flow_preprocess(self) -> None:
        """Load the configuration and instantiate every configured node.

        If the configuration has no ``nodes`` entry an error is logged and the
        node sequence is left untouched.
        """
        self._configer.load()
        self._flow_name = self._configer.get('name')
        self._flow_desc = self._configer.get('description')

        nodes = self._configer.get('nodes')
        if not nodes:
            LOGGER.error('no nodes in yaml conf')
            return

        self._node_sequence_list = []
        for node_obj in nodes:
            module_str = node_obj.get('module')
            class_str = node_obj.get('class')
            param_in_jsonl_str = node_obj.get('param_in_jsonl')
            param_out_json_str = node_obj.get('param_out_jsonl')
            datasets_in_jsonl_list = node_obj.get('datasets_in_jsonl')
            dataset_out_jsonl_str = node_obj.get('dataset_out_jsonl')
            node_param_json = node_obj.get('node_param')

            node_llm = node_obj.get('llm')
            node_prompt_version = node_obj.get('prompt_version')

            # Resolve the node class dynamically from its module path and class name.
            module = importlib.import_module(module_str)
            clz = getattr(module, class_str)
            node: NodeBase = clz()
            node.set_param_in_jsonl(param_in_jsonl_str)
            node.set_param_out_jsonl(param_out_json_str)
            node.set_datasets_in_jsonl(datasets_in_jsonl_list)
            node.set_dataset_out_jsonl(dataset_out_jsonl_str)
            node.set_node_param_json(node_param_json)
            node.set_llm(node_llm)
            node.set_prompt_version(node_prompt_version)

            self._node_sequence_list.append(node)

    def _flow_process(self) -> None:
        """Execute the nodes one after another, in configuration order."""
        if self._node_sequence_list:
            for node in self._node_sequence_list:
                node.execute()

    def _flow_postprocess(self) -> None:
        """Hook that runs after all nodes have executed; a no-op here."""
        pass

    def execute(self) -> None:
        """Run preprocess, process and postprocess.

        Any exception raised by the three stages is logged as a warning and
        not re-raised.
        """
        try:
            self._flow_preprocess()
            self._flow_process()
            self._flow_postprocess()
        except Exception as e:
            # logging later
            LOGGER.warn(e)
            LOGGER.warn(traceback.format_exc())
            # extract_tb needs the traceback object, i.e. the third item of
            # sys.exc_info(); passing the whole tuple raises inside the handler.
            LOGGER.warn(traceback.extract_tb(sys.exc_info()[2]))
7 changes: 7 additions & 0 deletions agentuniverse_dataflow/node/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/14 17:06
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: __init__.py
7 changes: 7 additions & 0 deletions agentuniverse_dataflow/node/base/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/14 17:06
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: __init__.py
17 changes: 17 additions & 0 deletions agentuniverse_dataflow/node/base/data_node_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/14 17:06
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: data_node_base.py
from agentuniverse_dataflow.node.base.node_base import NodeBase


class DataNodeBase(NodeBase):
    """Base class shared by data nodes.

    It only adds two batch-size defaults on top of ``NodeBase``; the actual
    processing is left to subclasses.
    """

    # default batch sizes (both 10); how they are consumed is up to subclasses
    _batch_line_size: int = 10
    _batch_prompt_size: int = 10

    def _node_process(self) -> None:
        """Do nothing; concrete data nodes override this step."""
19 changes: 19 additions & 0 deletions agentuniverse_dataflow/node/base/eval_node_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/14 17:31
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: eval_node_base.py
from agentuniverse_dataflow.node.enum.enum import NodeEnum
from agentuniverse_dataflow.node.base.node_base import NodeBase


class EvalNodeBase(NodeBase):
    """Base class shared by eval nodes; instances are tagged ``NodeEnum.EVAL``."""

    def __init__(self):
        """Initialise the node with its type fixed to ``NodeEnum.EVAL``."""
        super().__init__(node_type=NodeEnum.EVAL)

    def _node_process(self) -> None:
        """Do nothing; concrete eval nodes override this step."""
35 changes: 35 additions & 0 deletions agentuniverse_dataflow/node/base/model_node_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/14 17:32
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: model_node_base.py
import json

from agentuniverse_dataflow.node.enum.enum import NodeEnum
from agentuniverse_dataflow.node.base.node_base import NodeBase


class ModelNodeBase(NodeBase):
    """Base class shared by model nodes; instances are tagged ``NodeEnum.MODEL``.

    On top of ``NodeBase`` it reads one json object from the input param
    handler before processing and writes that object to the output param
    handler afterwards.
    """

    # one json line of the jsonl message exchanged between nodes
    _param_in_json_obj: json = None
    _platform_mode_is_pro: bool = False

    def __init__(self):
        """Initialise the node with its type fixed to ``NodeEnum.MODEL``."""
        super().__init__(node_type=NodeEnum.MODEL)

    def _node_preprocess(self) -> None:
        """Run the base preprocessing, then load the incoming json object if a reader exists."""
        super()._node_preprocess()
        reader = self._param_in_handler
        if not reader:
            return
        self._param_in_json_obj = reader.read_json_obj()

    def _node_process(self) -> None:
        """Do nothing; concrete model nodes override this step."""

    def _node_postprocess(self) -> None:
        """Run the base postprocessing, then write the json object to the output handler.

        Nothing is written when the json object is empty/missing or when no
        output handler was created.
        """
        super()._node_postprocess()
        if not self._param_in_json_obj or not self._param_out_handler:
            return
        self._param_out_handler.write_json_obj(self._param_in_json_obj)
96 changes: 96 additions & 0 deletions agentuniverse_dataflow/node/base/node_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/14 17:06
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: node_base.py
from abc import abstractmethod
import json
from typing import Any, List, Dict, Optional

from pydantic import BaseModel

from agentuniverse_dataflow.node.enum.enum import NodeEnum
from agentuniverse_dataflow.util.fileio.node_msg_jsonl import JsonFileWriter, JsonFileReader


class NodeBase(BaseModel):
    """Base class of every dataflow node.

    A node is configured through the setters below and then run with
    ``execute``, which calls ``_node_preprocess``, ``_node_process`` and
    ``_node_postprocess`` in that order. Subclasses implement
    ``_node_process``.
    """

    node_type: Optional[NodeEnum] = None
    _is_flow_start_node: bool = False

    # jsonl message between nodes
    param_in_jsonl: Optional[str] = None
    datasets_in_jsonl: Optional[List[str]] = None
    param_out_jsonl: Optional[str] = None
    dataset_out_jsonl: Optional[str] = None

    llm: Optional[str] = None
    prompt_version: Optional[str] = None

    node_param_json: Optional[Dict[str, Any]] = None

    # reader/writer objects created in _node_preprocess from the jsonl paths above
    _param_in_handler: Any = None
    _param_out_handler: Any = None
    _dataset_in_handler: Any = None
    _dataset_out_handler: Any = None

    def set_flow_start_node(self) -> None:
        """Mark this node as the start node of the flow."""
        self._is_flow_start_node = True

    def set_param_in_jsonl(self, param_in_jsonl: str) -> None:
        """Set the path of the input param jsonl."""
        self.param_in_jsonl = param_in_jsonl

    def set_param_out_jsonl(self, param_out_jsonl: str) -> None:
        """Set the path of the output param jsonl."""
        self.param_out_jsonl = param_out_jsonl

    def set_datasets_in_jsonl(self, datasets_in_jsonl: List[str]) -> None:
        """Set the paths of the input dataset jsonl files."""
        self.datasets_in_jsonl = datasets_in_jsonl

    def set_dataset_out_jsonl(self, dataset_out_jsonl: str) -> None:
        """Set the path of the output dataset jsonl."""
        self.dataset_out_jsonl = dataset_out_jsonl

    def set_node_param_json(self, node_param_json: Dict[str, Any]) -> None:
        """Set the node parameters; an empty or missing value keeps the current one."""
        if node_param_json:
            self.node_param_json = node_param_json

    def set_llm(self, llm: str) -> None:
        """Set the llm name; an empty or missing value keeps the current one."""
        if llm:
            self.llm = llm

    def set_prompt_version(self, prompt_version: str) -> None:
        """Set the prompt version; an empty or missing value keeps the current one."""
        if prompt_version:
            self.prompt_version = prompt_version

    def _node_preprocess(self) -> None:
        """Create the jsonl readers and writers for every configured path."""
        if self.param_in_jsonl:
            self._param_in_handler = JsonFileReader(self.param_in_jsonl)
        # init the first dataset as default
        if self.datasets_in_jsonl:
            self._dataset_in_handler = JsonFileReader(self.datasets_in_jsonl[0])
        if self.param_out_jsonl:
            self._param_out_handler = JsonFileWriter(self.param_out_jsonl)
        if self.dataset_out_jsonl:
            self._dataset_out_handler = JsonFileWriter(self.dataset_out_jsonl)

    @abstractmethod
    def _node_process(self) -> None:
        """Do the node's actual work; implemented by subclasses."""
        pass

    def _node_postprocess(self) -> None:
        """Hook that runs after ``_node_process``; a no-op here."""
        pass

    def execute(self) -> None:
        """Run preprocess, process and postprocess in that order."""
        self._node_preprocess()
        self._node_process()
        self._node_postprocess()

    def _get_node_param(self, key: str, default=None) -> Optional[Any]:
        """Return the node parameter stored under ``key``.

        Args:
            key: Name of the parameter in ``node_param_json``.
            default: Value returned when no parameters are set or ``key`` is
                absent.

        Returns:
            The stored value, or ``default`` when it is not available.
        """
        if self.node_param_json:
            return self.node_param_json.get(key, default)
        return default
7 changes: 7 additions & 0 deletions agentuniverse_dataflow/node/data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/16 19:18
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: __init__.py
7 changes: 7 additions & 0 deletions agentuniverse_dataflow/node/data/answer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/16 19:18
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: __init__.py
39 changes: 39 additions & 0 deletions agentuniverse_dataflow/node/data/answer/answer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-
# @Time : 2024/6/16 19:19
# @Author : wangchongshi
# @Email : [email protected]
# @FileName: answer.py
from agentuniverse_dataflow.node.data.base.prompt_answer_base import PromptAnswerBase
from agentuniverse_dataflow.util.llm.llm_call import batch_call


class AnswerNode(PromptAnswerBase):
    """The AnswerNode class, which sends the collected prompts to the llm in batches
    and stores the resulting (prompt, answer) pairs."""

    def _node_process(self) -> None:
        """Replace ``self._prompt_answer_list`` with freshly answered pairs.

        The first element of every existing entry is taken as the prompt. The
        prompts are sent to ``batch_call`` in slices of
        ``self._batch_prompt_size`` together with ``self.llm``, and each
        returned answer is paired with its prompt.

        A batch whose result is ``None`` is skipped, and so is any individual
        answer that is ``None``.
        """
        if not self._prompt_answer_list:
            return

        prompts = [entry[0] for entry in self._prompt_answer_list]
        # Reset first so results are appended batch by batch, as before.
        self._prompt_answer_list = []

        # NOTE(review): _batch_prompt_size is presumably a positive int inherited
        # from the base classes; clamp to 1 so a zero value cannot stall batching.
        batch_size = max(1, self._batch_prompt_size)
        for start in range(0, len(prompts), batch_size):
            batch = prompts[start:start + batch_size]
            res = batch_call(batch, self.llm)
            if res is None:
                # Nothing came back for this batch; zip() over None would raise.
                continue
            for prompt, answer in zip(batch, res):
                if answer is not None:
                    self._prompt_answer_list.append((prompt, answer))
Loading