diff --git a/agentuniverse_dataflow/__init__.py b/agentuniverse_dataflow/__init__.py new file mode 100644 index 0000000..ebd9432 --- /dev/null +++ b/agentuniverse_dataflow/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/11 20:46 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/flow/__init__.py b/agentuniverse_dataflow/flow/__init__.py new file mode 100644 index 0000000..582b80f --- /dev/null +++ b/agentuniverse_dataflow/flow/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 16:58 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/flow/dataflow.py b/agentuniverse_dataflow/flow/dataflow.py new file mode 100644 index 0000000..2784d82 --- /dev/null +++ b/agentuniverse_dataflow/flow/dataflow.py @@ -0,0 +1,90 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:05 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: dataflow.py +import importlib +import sys +import traceback +from typing import List, Optional + +from pydantic import BaseModel + +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse.base.config.configer import Configer +from agentuniverse_dataflow.node.base.node_base import NodeBase + + +class Dataflow(BaseModel): + """The Dataflow class, which is used to define the class of data flow.""" + + _flow_name: str = None + _flow_desc: str = None + _node_sequence_list: List[NodeBase] = None + _llm_name: str = None + _configer: Optional[Configer] = None + + def __init__(self, conf_path: str = None): + super().__init__() + if conf_path: + self._configer = Configer(conf_path) + + def _flow_preprocess(self) -> None: + self._configer.load() + self._flow_name = self._configer.get('name') + self._flow_desc = self._configer.get('description') + + nodes = self._configer.get('nodes') + if not nodes: + LOGGER.error('no nodes in yaml conf') + return + + self._node_sequence_list = [] + for i in range(0, len(nodes)): + node_obj = nodes[i] + module_str = node_obj.get('module') + class_str = node_obj.get('class') + param_in_jsonl_str = node_obj.get('param_in_jsonl') + param_out_json_str = node_obj.get('param_out_jsonl') + datasets_in_jsonl_list = node_obj.get('datasets_in_jsonl') + dataset_out_jsonl_str = node_obj.get('dataset_out_jsonl') + node_param_json = node_obj.get('node_param') + + node_llm = node_obj.get('llm') + node_prompt_version = node_obj.get('prompt_version') + + module = importlib.import_module(module_str) + clz = getattr(module, class_str) + node: NodeBase = clz() + node.set_param_in_jsonl(param_in_jsonl_str) + node.set_param_out_jsonl(param_out_json_str) + node.set_datasets_in_jsonl(datasets_in_jsonl_list) + node.set_dataset_out_jsonl(dataset_out_jsonl_str) + node.set_node_param_json(node_param_json) + node.set_llm(node_llm) + node.set_prompt_version(node_prompt_version) + + self._node_sequence_list.append(node) + + return + + def _flow_process(self) -> None: + if self._node_sequence_list: + for i in range(0, len(self._node_sequence_list)): + self._node_sequence_list[i].execute() + + def _flow_postprocess(self) -> None: + pass + + def execute(self) -> None: + try: + self._flow_preprocess() + self._flow_process() + self._flow_postprocess() + except Exception as e: + # logging later + LOGGER.warn(e) + LOGGER.warn(traceback.format_exc()) + 
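For reference, the YAML shape that _flow_preprocess expects can be reconstructed from the keys it reads: top-level name, description and nodes, plus per-node module, class, param_in_jsonl, param_out_jsonl, datasets_in_jsonl, dataset_out_jsonl, node_param, llm and prompt_version. A minimal sketch follows; the file paths and LLM name are hypothetical, while the module/class pairs and the prompt version are taken from nodes defined later in this diff:

# Hypothetical flow config; only the keys are taken from _flow_preprocess above.
FLOW_YAML = """
name: demo_dataset_flow
description: seed prompts, then batch-generate answers
nodes:
  - module: agentuniverse_dataflow.node.data.prompt.input.seed
    class: SeedNode
    dataset_out_jsonl: ./data/seed.jsonl
    llm: demo_llm                                # assumed LLM instance name
    prompt_version: dataset_input_seed_node.cn   # prompt YAML defined in this diff
    node_param:
      seeds_num: 20
  - module: agentuniverse_dataflow.node.data.answer.answer
    class: AnswerNode
    datasets_in_jsonl:
      - ./data/seed.jsonl
    dataset_out_jsonl: ./data/prompt_answer.jsonl
    llm: demo_llm
"""

# Executing such a flow then reduces to:
#   Dataflow(conf_path='flows/demo_dataset_flow.yaml').execute()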
LOGGER.warn(traceback.extract_tb(sys.exc_info())) diff --git a/agentuniverse_dataflow/node/__init__.py b/agentuniverse_dataflow/node/__init__.py new file mode 100644 index 0000000..6376102 --- /dev/null +++ b/agentuniverse_dataflow/node/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:06 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/base/__init__.py b/agentuniverse_dataflow/node/base/__init__.py new file mode 100644 index 0000000..6376102 --- /dev/null +++ b/agentuniverse_dataflow/node/base/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:06 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/base/data_node_base.py b/agentuniverse_dataflow/node/base/data_node_base.py new file mode 100644 index 0000000..43a5cb4 --- /dev/null +++ b/agentuniverse_dataflow/node/base/data_node_base.py @@ -0,0 +1,17 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:06 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: data_node.py +from agentuniverse_dataflow.node.base.node_base import NodeBase + + +class DataNodeBase(NodeBase): + """The DataBase class, which is used to define the base class of data node.""" + _batch_line_size: int = 10 + _batch_prompt_size: int = 10 + + def _node_process(self) -> None: + pass diff --git a/agentuniverse_dataflow/node/base/eval_node_base.py b/agentuniverse_dataflow/node/base/eval_node_base.py new file mode 100644 index 0000000..59e7223 --- /dev/null +++ b/agentuniverse_dataflow/node/base/eval_node_base.py @@ -0,0 +1,19 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:31 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: eval_node_base.py +from agentuniverse_dataflow.node.enum.enum import NodeEnum +from agentuniverse_dataflow.node.base.node_base import NodeBase + + +class EvalNodeBase(NodeBase): + """The EvalNodeBase class, which is used to define the base class of eval node.""" + + def __init__(self): + super().__init__(node_type=NodeEnum.EVAL) + + def _node_process(self) -> None: + pass diff --git a/agentuniverse_dataflow/node/base/model_node_base.py b/agentuniverse_dataflow/node/base/model_node_base.py new file mode 100644 index 0000000..f07e59c --- /dev/null +++ b/agentuniverse_dataflow/node/base/model_node_base.py @@ -0,0 +1,35 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:32 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: model_node_base.py +import json + +from agentuniverse_dataflow.node.enum.enum import NodeEnum +from agentuniverse_dataflow.node.base.node_base import NodeBase + + +class ModelNodeBase(NodeBase): + """The ModelNodeBase class, which is used to define the base class of model node.""" + + # a json line in jsonl for msg between nodes + _param_in_json_obj: json = None + _platform_mode_is_pro: bool = False + + def __init__(self): + super().__init__(node_type=NodeEnum.MODEL) + + def _node_preprocess(self) -> None: + super()._node_preprocess() + if self._param_in_handler: + self._param_in_json_obj = self._param_in_handler.read_json_obj() + + def _node_process(self) -> None: + pass + + def _node_postprocess(self) -> None: + super()._node_postprocess() + if self._param_in_json_obj and 
self._param_out_handler: + self._param_out_handler.write_json_obj(self._param_in_json_obj) diff --git a/agentuniverse_dataflow/node/base/node_base.py b/agentuniverse_dataflow/node/base/node_base.py new file mode 100644 index 0000000..0c5da8c --- /dev/null +++ b/agentuniverse_dataflow/node/base/node_base.py @@ -0,0 +1,96 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:06 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: node_base.py +from abc import abstractmethod +import json +from typing import Any, List, Dict, Optional + +from pydantic import BaseModel + +from agentuniverse_dataflow.node.enum.enum import NodeEnum +from agentuniverse_dataflow.util.fileio.node_msg_jsonl import JsonFileWriter, JsonFileReader + + +class NodeBase(BaseModel): + """The NodeBase class, which is used to define the base class of node.""" + + node_type: NodeEnum = None + _is_flow_start_node: bool = False + + # jsonl message between nodes + param_in_jsonl: str = None + datasets_in_jsonl: List[str] = None + param_out_jsonl: str = None + dataset_out_jsonl: str = None + + llm: str = None + prompt_version: str = None + + node_param_json: Dict[str, Any] = None + + _param_in_handler: Any = None + _param_out_handler: Any = None + _dataset_in_handler: Any = None + _dataset_out_handler: Any = None + + def set_flow_start_node(self) -> None: + self._is_flow_start_node = True + + def set_param_in_jsonl(self, param_in_jsonl: str) -> None: + self.param_in_jsonl = param_in_jsonl + + def set_param_out_jsonl(self, param_out_jsonl: str) -> None: + self.param_out_jsonl = param_out_jsonl + + def set_datasets_in_jsonl(self, datasets_in_jsonl: list[str]) -> None: + self.datasets_in_jsonl = datasets_in_jsonl + + def set_dataset_out_jsonl(self, dataset_out_jsonl: str) -> None: + self.dataset_out_jsonl = dataset_out_jsonl + + def set_node_param_json(self, node_param_json: json) -> None: + if node_param_json: + self.node_param_json = node_param_json + + def set_llm(self, llm: str) -> None: + if llm: + self.llm = llm + + def set_prompt_version(self, prompt_version: str) -> None: + if prompt_version: + self.prompt_version = prompt_version + + def _node_preprocess(self) -> None: + if self.param_in_jsonl: + self._param_in_handler = JsonFileReader(self.param_in_jsonl) + # init the first dataset as default + if self.datasets_in_jsonl and len(self.datasets_in_jsonl) > 0: + self._dataset_in_handler = JsonFileReader(self.datasets_in_jsonl[0]) + if self.param_out_jsonl: + self._param_out_handler = JsonFileWriter(self.param_out_jsonl) + if self.dataset_out_jsonl: + self._dataset_out_handler = JsonFileWriter(self.dataset_out_jsonl) + + @abstractmethod + def _node_process(self) -> None: + pass + + def _node_postprocess(self) -> None: + pass + + def execute(self) -> None: + + self._node_preprocess() + self._node_process() + self._node_postprocess() + + def _get_node_param(self, key: str, default=None) -> Optional[any]: + if self.node_param_json: + if key in self.node_param_json: + return self.node_param_json.get(key) + + return default diff --git a/agentuniverse_dataflow/node/data/__init__.py b/agentuniverse_dataflow/node/data/__init__.py new file mode 100644 index 0000000..fdd5faa --- /dev/null +++ b/agentuniverse_dataflow/node/data/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/16 19:18 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git 
a/agentuniverse_dataflow/node/data/answer/__init__.py b/agentuniverse_dataflow/node/data/answer/__init__.py new file mode 100644 index 0000000..fdd5faa --- /dev/null +++ b/agentuniverse_dataflow/node/data/answer/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/16 19:18 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/data/answer/answer.py b/agentuniverse_dataflow/node/data/answer/answer.py new file mode 100644 index 0000000..bda8515 --- /dev/null +++ b/agentuniverse_dataflow/node/data/answer/answer.py @@ -0,0 +1,39 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- +# @Time : 2024/6/16 19:19 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: answer.py +from agentuniverse_dataflow.node.data.base.prompt_answer_base import PromptAnswerBase +from agentuniverse_dataflow.util.llm.llm_call import batch_call + + +class AnswerNode(PromptAnswerBase): + """The RewriteNode class, which is used to define the class of rewrite node.""" + + def _node_process(self) -> None: + if not self._prompt_answer_list or len(self._prompt_answer_list) == 0: + return + + total_prompt_num = 0 + prompts = [] + prompt_list_num = len(self._prompt_answer_list) + prompt_answer_list = self._prompt_answer_list + self._prompt_answer_list = [] + for i in range(0, prompt_list_num): + prompt = prompt_answer_list[i][0] + prompts.append(prompt) + total_prompt_num = total_prompt_num + 1 + if total_prompt_num % self._batch_prompt_size == 0: + res = batch_call(prompts, self.llm) + for prompt, answer in zip(prompts, res): + if res is not None: + self._prompt_answer_list.append((prompt, answer)) + prompts = [] + else: + if total_prompt_num == prompt_list_num and len(prompts) > 0: + res = batch_call(prompts, self.llm) + for prompt, answer in zip(prompts, res): + if res is not None: + self._prompt_answer_list.append((prompt, answer)) + return diff --git a/agentuniverse_dataflow/node/data/answer/filter.py b/agentuniverse_dataflow/node/data/answer/filter.py new file mode 100644 index 0000000..41212ab --- /dev/null +++ b/agentuniverse_dataflow/node/data/answer/filter.py @@ -0,0 +1,122 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 14:46 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: filter.py +import json + +from langchain.output_parsers.json import parse_json_markdown + +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse.prompt.prompt import Prompt +from agentuniverse.prompt.prompt_manager import PromptManager +from agentuniverse_dataflow.node.data.base.prompt_answer_base import PromptAnswerBase +from agentuniverse_dataflow.util.fileio.node_msg_jsonl import JsonFileReader, JsonFileWriter +from agentuniverse_dataflow.util.llm.llm_call import batch_call + + +class FilterNode(PromptAnswerBase): + """The FilterNode class, which is used to define the class of filter node.""" + + dimscore_threshold: int = 70 + avgscore_threshold: int = 70 + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + self.dimscore_threshold = self._get_node_param('dimscore_threshold') + self.avgscore_threshold = self._get_node_param('avgscore_threshold') + + def _node_process(self) -> None: + if not self._prompt_answer_list or len(self._prompt_answer_list) == 0: + return + + self.__quality_eval() + self.__filter_items() + + def __quality_eval(self) -> None: + prompts = [] + total_lines = 0 + 
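Both AnswerNode above and the __quality_eval routine below rely on the same micro-batching idiom: accumulate prompts, flush a full batch every _batch_prompt_size items, and flush the final partial batch at the end of the list. The idiom is easier to read in isolation; a minimal sketch with a stubbed batch_call (the real one lives in agentuniverse_dataflow.util.llm.llm_call):

from typing import List, Tuple

BATCH_PROMPT_SIZE = 10  # mirrors DataNodeBase._batch_prompt_size

def stub_batch_call(prompts: List[str], llm: str) -> List[str]:
    # stand-in for util.llm.llm_call.batch_call, which fans prompts out to an LLM
    return ['answer to: ' + p for p in prompts]

def answer_all(prompt_list: List[str], llm: str) -> List[Tuple[str, str]]:
    pairs: List[Tuple[str, str]] = []
    pending: List[str] = []
    for i, prompt in enumerate(prompt_list):
        pending.append(prompt)
        # flush on a full batch, or on the final (possibly partial) batch
        if len(pending) == BATCH_PROMPT_SIZE or i == len(prompt_list) - 1:
            answers = stub_batch_call(pending, llm)
            pairs.extend((p, a) for p, a in zip(pending, answers) if a is not None)
            pending = []
    return pairs

print(len(answer_all(['q%d' % n for n in range(23)], 'demo_llm')))  # 23 pairs, in 3 calls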
jfw_quality = JsonFileWriter(self.dataset_out_jsonl + '.eval') + + list_len = len(self._prompt_answer_list) + for i in range(0, list_len): + do_req = False + prompt = self._prompt_answer_list[i][0] + answer = self._prompt_answer_list[i][1] + total_lines += 1 + + if not prompt or not answer: + continue + + prompts.append(self.generate_prompt(prompt, answer)) + + if total_lines % self._batch_prompt_size == 0: + do_req = True + elif i + 1 == list_len and len(prompts) > 0: + do_req = True + + if do_req: + res = batch_call(prompts, self.llm) + + start_line_num = total_lines - len(prompts) + for res_idx in range(0, len(res)): + try: + if res[res_idx] and res[res_idx] != '': + json_obj = parse_json_markdown(str(res[res_idx])) + json_obj['line'] = start_line_num + res_idx + 1 + jfw_quality.write_json_obj(json_obj) + except json.JSONDecodeError as e: + LOGGER.warn(f'except[__quality_eval]>>>{e}:{res[res_idx]}') + prompts = [] + + def __filter_items(self) -> None: + jfr_quality = JsonFileReader(self.dataset_out_jsonl + '.eval') + jfw_pos = JsonFileWriter(self.dataset_out_jsonl + '.pos') + jfw_neg = JsonFileWriter(self.dataset_out_jsonl + '.neg') + + prompt_answer_list = self._prompt_answer_list + self._prompt_answer_list = [] + + while True: + json_obj = jfr_quality.read_json_obj() + if json_obj: + dimensions = json_obj.get('dimensions') + + score_sum = 0.0 + is_pos = True + for i in range(0, len(dimensions)): + score = float(dimensions[i].get('score')) + score_sum = score_sum + score + if score < self.dimscore_threshold: + is_pos = False + + avg_score = score_sum / len(dimensions) + if avg_score < self.avgscore_threshold: + is_pos = False + + if is_pos: + jfw_pos.write_json_obj(json_obj) + if 'line' in json_obj: + line = json_obj['line'] + if line > 0: + self._prompt_answer_list.append(prompt_answer_list[line - 1]) + else: + jfw_neg.write_json_obj(json_obj) + + else: + break + + def generate_prompt(self, prompt_str: str, answer_str: str) -> str: + if len(prompt_str) > 2000: + prompt_str = prompt_str[0:2000] + if len(answer_str) > 5000: + answer = answer_str[0:5000] + version_prompt: Prompt = PromptManager().get_instance_obj(self.prompt_version) + + prompt = version_prompt.prompt_template.replace("", prompt_str) + prompt = prompt.replace("", answer_str) + + return prompt diff --git a/agentuniverse_dataflow/node/data/base/__init__.py b/agentuniverse_dataflow/node/data/base/__init__.py new file mode 100644 index 0000000..01448f2 --- /dev/null +++ b/agentuniverse_dataflow/node/data/base/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/16 19:19 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/data/base/prompt_answer_base.py b/agentuniverse_dataflow/node/data/base/prompt_answer_base.py new file mode 100644 index 0000000..7afc79a --- /dev/null +++ b/agentuniverse_dataflow/node/data/base/prompt_answer_base.py @@ -0,0 +1,32 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/16 19:20 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: prompt_answer_base.py +from typing import List, Tuple + +from agentuniverse_dataflow.node.enum.enum import NodeEnum +from agentuniverse_dataflow.node.base.data_node_base import DataNodeBase + + +class PromptAnswerBase(DataNodeBase): + """The PromptAnswerNodeBase class, which is used to define the base class of prompt answer node.""" + + _prompt_answer_list: List[Tuple[str, str]] = None + + def 
__init__(self, *args, **kwargs): + super().__init__(node_type=NodeEnum.PROMPT_ANSWER) + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + if self._dataset_in_handler: + self._prompt_answer_list = self._dataset_in_handler.read_json_prompt_answer_list() + + def _node_postprocess(self) -> None: + super()._node_postprocess() + + if self._prompt_answer_list: + self._dataset_out_handler.write_json_prompt_answer_list(self._prompt_answer_list) diff --git a/agentuniverse_dataflow/node/data/base/prompt_base.py b/agentuniverse_dataflow/node/data/base/prompt_base.py new file mode 100644 index 0000000..1819a4b --- /dev/null +++ b/agentuniverse_dataflow/node/data/base/prompt_base.py @@ -0,0 +1,32 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/16 19:19 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: prompt_base.py +from typing import List + +from agentuniverse_dataflow.node.enum.enum import NodeEnum +from agentuniverse_dataflow.node.base.data_node_base import DataNodeBase + + +class PromptBase(DataNodeBase): + """The PromptNodeBase class, which is used to define the base class of prompt node.""" + + _prompt_list: List[str] = None + + def __init__(self, *args, **kwargs): + super().__init__(node_type=NodeEnum.PROMPT) + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + if self._dataset_in_handler: + self._prompt_list = self._dataset_in_handler.read_json_prompt_list() + + def _node_postprocess(self) -> None: + super()._node_postprocess() + + if self._prompt_list and self._dataset_out_handler: + self._dataset_out_handler.write_json_prompt_list(self._prompt_list) diff --git a/agentuniverse_dataflow/node/data/event/__init__.py b/agentuniverse_dataflow/node/data/event/__init__.py new file mode 100644 index 0000000..e2b69ee --- /dev/null +++ b/agentuniverse_dataflow/node/data/event/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/16 19:21 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/data/event/executor.py b/agentuniverse_dataflow/node/data/event/executor.py new file mode 100644 index 0000000..78d996d --- /dev/null +++ b/agentuniverse_dataflow/node/data/event/executor.py @@ -0,0 +1,90 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/16 19:22 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: executor.py +from typing import List, Tuple, Dict + +import pandas as pd + +from agentuniverse_dataflow.node.enum.enum import NodeEnum +from agentuniverse_dataflow.node.base.data_node_base import DataNodeBase +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse_dataflow.util.fileio.node_msg_jsonl import JsonFileReader + + +class ExecutorNode(DataNodeBase): + """The ExecutorNode class, which is used to define the class of executor node.""" + + event_db: str = None + event_uri: str = None + event_sql: str = None + prompt_col: str = None + answer_col: str = None + + _data_frame: pd.DataFrame = None + _plan_dict: Dict = None + _input_data_jsonlist: List[Dict] = None + _dump_prompt_answer_list: List[Tuple[str, str]] = [] + + def __init__(self, *args, **kwargs): + super().__init__(node_type=NodeEnum.PROMPT_ANSWER) + + def _node_preprocess(self) -> None: + super()._node_preprocess() + if self._param_in_handler: + self._plan_dict = self._param_in_handler.read_json_obj() + + self.event_db = 
self._plan_dict.get('event_db') + self.event_uri = self._plan_dict.get('event_uri') + self.event_sql = self._plan_dict.get('event_sql') + self.prompt_col = self._plan_dict.get('prompt_col') + self.answer_col = self._plan_dict.get('answer_col') + + def _node_process(self) -> None: + if self.event_db: + if self.event_db == 'jsonl': + self._input_data_jsonlist = JsonFileReader(self.event_uri).read_json_obj_list() + self.__dump_from_plan() + + def __dump_from_plan(self): + prompt_plan_code = self._plan_dict.get('prompt_plan').get('plan_code') + answer_plan_code = self._plan_dict.get('answer_plan').get('plan_code') + prompt_input_var = self._plan_dict.get('prompt_plan').get('input_var') + answer_input_var = self._plan_dict.get('answer_plan').get('input_var') + prompt_output_var = self._plan_dict.get('prompt_plan').get('output_var') + answer_output_var = self._plan_dict.get('answer_plan').get('output_var') + + for i in range(0, len(self._input_data_jsonlist)): + prompt_related_str = self._input_data_jsonlist[i].get(self.prompt_col) + answer_related_str = self._input_data_jsonlist[i].get(self.answer_col) + + prompt_namespace = { + prompt_input_var: prompt_related_str, + prompt_output_var: '' + } + answer_namespace = { + answer_input_var: answer_related_str, + answer_output_var: '' + } + + try: + exec(prompt_plan_code, globals(), prompt_namespace) + exec(answer_plan_code, globals(), answer_namespace) + except Exception as e: + LOGGER.warn( + f'exception from exec>>>{e}:\nprompt_plan_code:{prompt_plan_code}\nanswer_plan_code:{answer_plan_code}') + break + + prompt = prompt_namespace.get(prompt_output_var) + answer = answer_namespace.get(answer_output_var) + self._dump_prompt_answer_list.append((prompt, answer)) + + if self._dataset_out_handler: + self._dataset_out_handler.write_json_prompt_answer_list(self._dump_prompt_answer_list) + self._dump_prompt_answer_list.clear() + + def _node_postprocess(self) -> None: + super()._node_postprocess() diff --git a/agentuniverse_dataflow/node/data/event/perceiver.py b/agentuniverse_dataflow/node/data/event/perceiver.py new file mode 100644 index 0000000..c9384c7 --- /dev/null +++ b/agentuniverse_dataflow/node/data/event/perceiver.py @@ -0,0 +1,60 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/16 19:25 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: perceiver.py +from typing import List, Dict + +from agentuniverse_dataflow.node.enum.enum import NodeEnum +from agentuniverse_dataflow.node.base.data_node_base import DataNodeBase +from agentuniverse_dataflow.util.fileio.node_msg_jsonl import JsonFileReader + + +class PerceiverNode(DataNodeBase): + """The PerceiverNode class, which is used to define the class of perceiver node.""" + + event_db: str = None + event_uri: str = None + event_sql: str = None + prompt_col: str = None + answer_col: str = None + + _input_data_jsonlist: List[Dict] = None + + def __init__(self, *args, **kwargs): + super().__init__(node_type=NodeEnum.PROMPT_ANSWER) + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + self.event_db = self._get_node_param('event_db') + self.event_uri = self._get_node_param('event_uri') + self.event_sql = self._get_node_param('event_sql') + self.prompt_col = self._get_node_param('prompt_col') + self.answer_col = self._get_node_param('answer_col') + + def _node_process(self) -> None: + if self.event_db: + if self.event_db == 'jsonl': + self._input_data_jsonlist = JsonFileReader(self.event_uri).read_json_obj_list() + + def 
_node_postprocess(self) -> None: + super()._node_postprocess() + if self._input_data_jsonlist is not None and self._dataset_out_handler: + for i in range(0, len(self._input_data_jsonlist)): + prompt_related_str = self._input_data_jsonlist[i].get(self.prompt_col) + answer_related_str = self._input_data_jsonlist[i].get(self.answer_col) + + self._dataset_out_handler.write_json_prompt_answer(prompt_related_str, answer_related_str) + + if self._param_out_handler: + param_obj = { + 'event_db': self.event_db, + 'event_uri': self.event_uri, + 'event_sql': self.event_sql, + 'prompt_col': self.prompt_col, + 'answer_col': self.answer_col + } + self._param_out_handler.write_json_obj(param_obj) diff --git a/agentuniverse_dataflow/node/data/event/planner.py b/agentuniverse_dataflow/node/data/event/planner.py new file mode 100644 index 0000000..a699623 --- /dev/null +++ b/agentuniverse_dataflow/node/data/event/planner.py @@ -0,0 +1,214 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/16 19:26 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: planner.py +import json +from typing import List, Dict + +from langchain.output_parsers.json import parse_json_markdown + +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse.prompt.prompt import Prompt +from agentuniverse.prompt.prompt_manager import PromptManager +from agentuniverse_dataflow.util.fileio.node_msg_jsonl import JsonFileReader +from agentuniverse_dataflow.node.enum.enum import NodeEnum +from agentuniverse_dataflow.node.base.data_node_base import DataNodeBase +from agentuniverse_dataflow.util.llm.llm_call import batch_call + + +class PlannerNode(DataNodeBase): + """The PlannerNode class, which is used to define the class of planner node.""" + + _perceived_prompt_list: List[str] = [] + _perceived_answer_list: List[str] = [] + _perceived_list_size: int = 0 + + _new_plan_dict: Dict = {} + _last_plan_dict: Dict = None + _verify_lines: int = None + + def __init__(self, *args, **kwargs): + super().__init__(node_type=NodeEnum.PROMPT_ANSWER) + + def _node_preprocess(self) -> None: + super()._node_preprocess() + self._verify_lines = int(self._get_node_param('verify_lines')) + if self._dataset_in_handler: + perceived_list = self._dataset_in_handler.read_json_obj_list() + # one for plan , at least one for verification + if not perceived_list or len(perceived_list) <= 1: + raise Exception('perceived json list does not provide at least 2 samples!') + + for i in range(0, len(perceived_list)): + json_obj = perceived_list[i] + self._perceived_prompt_list.append(json_obj['prompt']) + self._perceived_answer_list.append(json_obj['answer']) + self._perceived_list_size += 1 + + if self.param_out_jsonl: + jr = JsonFileReader(self.param_out_jsonl) + self._last_plan_dict = jr.read_json_obj() + if self._param_in_handler: + self._new_plan_dict.update(self._param_in_handler.read_json_obj()) + + self._new_plan_dict['prompt_plan'] = { + 'input_var': 'input_str', + 'output_var': 'prompt', + 'plan_code': None + } + self._new_plan_dict['answer_plan'] = { + 'input_var': 'input_str', + 'output_var': 'answer', + 'plan_code': None + } + + def _node_process(self) -> None: + # check whether last plan works or not + if self.__check_last_plan(): + self._new_plan_dict = self._last_plan_dict + return + + prompt_plan_code = self.__generate_plan_and_verify(self._new_plan_dict.get('prompt_plan'), + self._perceived_prompt_list) + answer_plan_code = 
self.__generate_plan_and_verify(self._new_plan_dict.get('answer_plan'), + self._perceived_answer_list) + if prompt_plan_code and answer_plan_code: + self._new_plan_dict['prompt_plan']['plan_code'] = prompt_plan_code + self._new_plan_dict['answer_plan']['plan_code'] = answer_plan_code + + def __check_last_plan(self) -> bool: + if not self._last_plan_dict: + return False + + last_plan = self._last_plan_dict.get('prompt_plan') + last_plan_code = last_plan.get('plan_code') + last_output_var = last_plan.get('output_var') + dict_obj = self.execute_from_plan(self._perceived_prompt_list, 0, 1, last_plan_code, last_output_var) + if dict_obj.get('reflection'): + return False + + last_plan = self._last_plan_dict.get('answer_plan') + last_plan_code = last_plan.get('plan_code') + last_output_var = last_plan.get('output_var') + dict_obj = self.execute_from_plan(self._perceived_answer_list, 0, 1, last_plan_code, last_output_var) + if dict_obj.get('reflection'): + return False + + return True + + def __generate_plan_and_verify(self, plan_list: str, data_list: list) -> str: + + output_var = plan_list.get('output_var') + version_prompt: Prompt = PromptManager().get_instance_obj(self.prompt_version) + + prompt_template = getattr(version_prompt, 'data_event_plan', '').replace('', output_var) + prompt_reflection_template = prompt_template.replace('', data_list[0]) + prompt = prompt_reflection_template.replace('', 'None') + + llm_retry = 10 + reflection_retry = 10 + last_reflection = None + while True: + if llm_retry <= 0 or reflection_retry <= 0: + return None + + LOGGER.debug(prompt) + res = batch_call([prompt], self.llm) + LOGGER.debug(res) + + try: + res_obj = parse_json_markdown(res[0].strip()) + LOGGER.debug(res_obj) + except json.decoder.JSONDecodeError as e: + reflection = 'exception: ' + str(e) + '\ncode: json.loads' + if reflection == last_reflection: + reflection = 'None' + prompt = prompt_reflection_template.replace('', reflection) + last_reflection = reflection + LOGGER.warn(e) + continue + + plan_code = res_obj["plan_code"].strip() + LOGGER.debug(f'plan_code>>>{plan_code}') + + reflection_retry -= 1 + start_idx = 1 + verify_lines = 10 if not self._verify_lines else self._verify_lines + if verify_lines > self._perceived_list_size - start_idx: + verify_lines = self._perceived_list_size - start_idx + + dict_obj = self.execute_from_plan(data_list, start_idx, verify_lines, plan_code, output_var) + reflection = dict_obj.get('reflection') + LOGGER.debug(f'reflection:{reflection}') + + if reflection: + if reflection == last_reflection: + reflection = 'None' + prompt = prompt_reflection_template.replace('', reflection) + last_reflection = reflection + else: + return plan_code + + return None + + def _node_postprocess(self) -> None: + super()._node_postprocess() + + if self._param_out_handler: + self._param_out_handler.write_json_obj(self._new_plan_dict) + + def check_data_reflection(self, data_str: str) -> str: + if not data_str or data_str == '': + return 'Output cannot be empty' + + version_prompt: Prompt = PromptManager().get_instance_obj(self.prompt_version) + + prompt = getattr(version_prompt, 'exec_result_verification', '').replace('', data_str) + LOGGER.debug(prompt) + res = batch_call([prompt], self.llm) + LOGGER.debug(res[0]) + json_obj = parse_json_markdown(res[0]) + if json_obj.get('success'): + return None + else: + return json_obj.get('reflection') + + def execute_from_plan(self, data_list: list, start_idx: int, execute_lines: int, plan_code: str, + output_var: str) -> Dict: + reflection = 
None + output_list = [] + for i in range(start_idx, start_idx + execute_lines): + namespace = { + 'input_str': data_list[i], + output_var: '', + } + + try: + exec(plan_code, globals(), namespace) + except Exception as e: + LOGGER.warn(f'except from exec>>>{e}') + reflection = 'exec Exception:' + str(e) + '\ncode:' + plan_code + break + + output = namespace.get(output_var) + LOGGER.debug(f'after exec >>>{output}') + + if output == '': + reflection = "The output is empty" + break + if not isinstance(output, str): + reflection = 'The type of output must be a string,the current output type is: ' + str( + type(output)) + ', the original code is as follows: ' + plan_code + break + + result_str = output_var + ':' + output + reflection = self.check_data_reflection(result_str) + if reflection: + break + else: + output_list.append(output) + + return {'reflection': reflection, 'output_list': output_list} diff --git a/agentuniverse_dataflow/node/data/prompt/__init__.py b/agentuniverse_dataflow/node/data/prompt/__init__.py new file mode 100644 index 0000000..3cddf9e --- /dev/null +++ b/agentuniverse_dataflow/node/data/prompt/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 14:54 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/data/prompt/input/__init__.py b/agentuniverse_dataflow/node/data/prompt/input/__init__.py new file mode 100644 index 0000000..3cddf9e --- /dev/null +++ b/agentuniverse_dataflow/node/data/prompt/input/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 14:54 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/data/prompt/input/dedupe.py b/agentuniverse_dataflow/node/data/prompt/input/dedupe.py new file mode 100644 index 0000000..bd6f4a8 --- /dev/null +++ b/agentuniverse_dataflow/node/data/prompt/input/dedupe.py @@ -0,0 +1,72 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 17:17 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: dedupe.py +from collections import Counter + +from agentuniverse_dataflow.node.data.base.prompt_base import PromptBase + + +class DedupeNode(PromptBase): + """The DedupeNode class, which is used to define the class of dedupe node.""" + + diversify_hamming_threshold: int = 18 + freq_top_percent: float = 1.0 + freq_least_count: int = 100 + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + self.diversify_hamming_threshold = self._get_node_param('diversify_hamming_threshold') + self.freq_top_percent = self._get_node_param('freq_top_percent') + self.freq_least_count = self._get_node_param('freq_least_count') + + def _node_process(self) -> None: + if not self._prompt_list or len(self._prompt_list) == 0: + return + + try: + from simhash import Simhash + except ImportError: + raise ImportError( + "simhash is required at the DedupeNode. Please: `pip install simhash`" + ) + + # calculate the simhash value for each document. 
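The Hamming-distance test in DedupeNode below keeps a document only if its Simhash is farther than diversify_hamming_threshold from every document kept so far. A standalone illustration (requires pip install simhash, as the ImportError message above suggests; the threshold mirrors the default of 18, and the sample sentences are made up):

from simhash import Simhash

threshold = 18  # mirrors DedupeNode.diversify_hamming_threshold
docs = [
    'How do I open a brokerage account?',
    'How can I open a brokerage account?',   # near-duplicate of the first
    'What drives the current CPI trend?',
]

unique = []
for doc in docs:
    sh = Simhash(doc)
    # keep only documents sufficiently far from everything kept so far
    if all(sh.distance(kept) > threshold for _, kept in unique):
        unique.append((doc, sh))

print([doc for doc, _ in unique])  # the two near-duplicates typically collapse into one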
+ simhashes = [(doc, Simhash(doc)) for doc in self._prompt_list] + + unique_documents = [] + replaced_documents = [] + for doc1, sh1 in simhashes: + if all(sh1.distance(sh2) > self.diversify_hamming_threshold for _, sh2 in unique_documents): + unique_documents.append((doc1, sh1)) + dis = sh1.distance(unique_documents[0][1]) + least_hamming = self.diversify_hamming_threshold + least_hamming_doc = None + for i in range(0, len(unique_documents)): + sh2 = unique_documents[i][1] + hamming_dist = sh1.distance(sh2) + if hamming_dist < least_hamming: + least_hamming = hamming_dist + least_hamming_doc = unique_documents[i][0] + replaced_documents.append(least_hamming_doc) + + freq_counter = Counter(replaced_documents) + sorted_docs = sorted(freq_counter.items(), key=lambda x: x[1], reverse=True) + + orig_len = len(sorted_docs) + freq_top_num = int(orig_len * self.freq_top_percent) + if freq_top_num < self.freq_least_count: + freq_top_num = self.freq_least_count + if self.freq_least_count > orig_len: + freq_top_num = orig_len + + freq_top_docs = sorted_docs[:freq_top_num] + + self._prompt_list = [] + for doc, freq in freq_top_docs: + if doc is not None: + self._prompt_list.append(doc) diff --git a/agentuniverse_dataflow/node/data/prompt/input/extend.py b/agentuniverse_dataflow/node/data/prompt/input/extend.py new file mode 100644 index 0000000..2af1732 --- /dev/null +++ b/agentuniverse_dataflow/node/data/prompt/input/extend.py @@ -0,0 +1,58 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 17:30 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: extend.py +import json +import math + +from langchain.output_parsers.json import parse_json_markdown + +from agentuniverse.prompt.prompt import Prompt +from agentuniverse.prompt.prompt_manager import PromptManager +from agentuniverse_dataflow.node.data.base.prompt_base import PromptBase +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse_dataflow.util.llm.llm_call import batch_call + + +class ExtendNode(PromptBase): + """The ExtendNode class, which is used to define the class of extend node.""" + + extend_times: int = 4 + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + self.extend_times = self._get_node_param('extend_times') + + def _node_process(self) -> None: + if not self._prompt_list or len(self._prompt_list) == 0: + return + version_prompt: Prompt = PromptManager().get_instance_obj(self.prompt_version) + prompt_with_extend_times = version_prompt.prompt_template.replace('', str(self.extend_times)) + + input_list = '' + input_len = len(self._prompt_list) + inputs = self._prompt_list + prompts = [] + batch_size = math.ceil(20 / self.extend_times) + for i in range(0, input_len): + input_list = input_list + inputs[i] + '\n' + if (i + 1) % batch_size == 0 or i == input_len - 1: + prompts.append(prompt_with_extend_times.replace('', input_list)) + input_list = '' + + responses = batch_call(prompts, self.llm) + + for i in range(0, len(responses)): + try: + if responses[i] != '' and responses[i] is not None: + data = parse_json_markdown(responses[i]) + if 'extend_inputs' in data: + extend_inputs = data['extend_inputs'] + self._prompt_list.extend(extend_inputs) + except Exception as e: + LOGGER.warn(f'except[]>>>{e}:{responses[i]}') + continue diff --git a/agentuniverse_dataflow/node/data/prompt/input/rewrite.py b/agentuniverse_dataflow/node/data/prompt/input/rewrite.py new file mode 100644 index 0000000..c087f94 --- /dev/null +++ 
b/agentuniverse_dataflow/node/data/prompt/input/rewrite.py @@ -0,0 +1,44 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 17:34 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: rewrite.py +import json + +from langchain.output_parsers.json import parse_json_markdown + +from agentuniverse.prompt.prompt import Prompt +from agentuniverse.prompt.prompt_manager import PromptManager +from agentuniverse_dataflow.node.data.base.prompt_base import PromptBase +from agentuniverse_dataflow.util.llm.llm_call import batch_call + + +class RewriteNode(PromptBase): + """The RewriteNode class, which is used to define the class of rewrite node.""" + + def _node_process(self) -> None: + if not self._prompt_list or len(self._prompt_list) == 0: + return + + prompts = [] + inputs = self._prompt_list + inputs_all = '' + version_prompt: Prompt = PromptManager().get_instance_obj(self.prompt_version) + for i in range(0, len(inputs)): + inputs_all = inputs_all + inputs[i] + '\n' + if (i + 1) % self._batch_prompt_size == 0 or i == len(inputs) - 1: + prompts.append(version_prompt.prompt_template.replace('', inputs_all)) + inputs_all = '' + + responses = batch_call(prompts, self.llm) + + self._prompt_list = [] + for i in range(0, len(responses)): + try: + rewrite_input = parse_json_markdown(responses[i])['rewrite_inputs'] + self._prompt_list.extend(rewrite_input) + except json.JSONDecodeError as e: + # rewrite_inputs_all.extend(inputs) + continue diff --git a/agentuniverse_dataflow/node/data/prompt/input/seed.py b/agentuniverse_dataflow/node/data/prompt/input/seed.py new file mode 100644 index 0000000..7f02dbb --- /dev/null +++ b/agentuniverse_dataflow/node/data/prompt/input/seed.py @@ -0,0 +1,66 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 17:45 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: seed.py +import json + +from langchain.output_parsers.json import parse_json_markdown + +from agentuniverse.prompt.prompt import Prompt +from agentuniverse.prompt.prompt_manager import PromptManager +from agentuniverse_dataflow.node.data.base.prompt_base import PromptBase +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse_dataflow.util.llm.llm_call import batch_call + + +class SeedNode(PromptBase): + """The SeedNode class, which is used to define the class of seed node.""" + + seeds_num: int = 100 + seed_gen_requirement: str = "金融领域" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + if 'seeds_num' in kwargs: + self.seeds_num = kwargs['seeds_num'] + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + seeds_num = self._get_node_param('seeds_num') + if seeds_num: + self.seeds_num = seeds_num + seed_gen_requirement = self._get_node_param('seed_gen_requirement') + if seed_gen_requirement: + self.seed_gen_requirement = seed_gen_requirement + + def _node_process(self) -> None: + version_prompt: Prompt = PromptManager().get_instance_obj(self.prompt_version) + prompt_with_seed_requirement = version_prompt.prompt_template.replace('', + self.seed_gen_requirement) + prompts = [] + + for i in range(0, self.seeds_num): + if (i + 1) % self._batch_line_size == 0: + prompt = prompt_with_seed_requirement.replace('', str(self._batch_line_size)) + prompts.append(prompt) + elif i == self.seeds_num - 1: + left_num = (i + 1) % self._batch_line_size + prompt = prompt_with_seed_requirement.replace('', str(left_num)) + prompts.append(prompt) + + responses = 
batch_call(prompts, self.llm) + + self._prompt_list = [] + for i in range(0, len(responses)): + try: + seeds = parse_json_markdown(responses[i])['seeds'] + self._prompt_list.extend(seeds) + except TypeError as e: + continue + except json.decoder.JSONDecodeError as e: + LOGGER.warn(f'except>>>{e}:{responses}') + continue diff --git a/agentuniverse_dataflow/node/data/prompt/instruct/__init__.py b/agentuniverse_dataflow/node/data/prompt/instruct/__init__.py new file mode 100644 index 0000000..3cddf9e --- /dev/null +++ b/agentuniverse_dataflow/node/data/prompt/instruct/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 14:54 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/data/prompt/instruct/prompt_gen.py b/agentuniverse_dataflow/node/data/prompt/instruct/prompt_gen.py new file mode 100644 index 0000000..ef503f7 --- /dev/null +++ b/agentuniverse_dataflow/node/data/prompt/instruct/prompt_gen.py @@ -0,0 +1,52 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 14:57 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: prompt_gen.py +from typing import List + +from agentuniverse.prompt.prompt import Prompt +from agentuniverse.prompt.prompt_manager import PromptManager +from agentuniverse_dataflow.node.data.base.prompt_base import PromptBase +from agentuniverse_dataflow.util.fileio.node_msg_jsonl import JsonFileReader + + +class PromptGenNode(PromptBase): + """The PromptGenNode class, which is used to define the class of prompt generate node.""" + + _instruct_list: List[str] = None + _input_sample: str = None + + _instruct_num: int = None + + def _node_preprocess(self) -> None: + + super()._node_preprocess() + + if not self.datasets_in_jsonl or len(self.datasets_in_jsonl) != 2: + raise Exception(f"Node param {self.datasets_in_jsonl} should contain 2 elements:1.instruct 2.input") + + instruct_in_handler = JsonFileReader(self.datasets_in_jsonl[0]) + input_in_handler = JsonFileReader(self.datasets_in_jsonl[1]) + + if instruct_in_handler: + self._instruct_list = instruct_in_handler.read_json_prompt_list() + if input_in_handler: + self._input_sample = input_in_handler.read_json_prompt() + + self._instruct_num = self._get_node_param('instruct_num') + + def _node_process(self) -> None: + + self._prompt_list = [] + for instruct_idx in range(0, min(self._instruct_num, len(self._instruct_list))): + prompt = self.generate_prompt_from_instruct_and_input(self._instruct_list[instruct_idx], self._input_sample) + self._prompt_list.append(prompt) + + def generate_prompt_from_instruct_and_input(self, instruct='', input=''): + version_prompt: Prompt = PromptManager().get_instance_obj(self.prompt_version) + prompt = version_prompt.prompt_template.replace("", instruct) + prompt = prompt.replace("", input) + return prompt.strip() diff --git a/agentuniverse_dataflow/node/data/prompt/instruct/seed.py b/agentuniverse_dataflow/node/data/prompt/instruct/seed.py new file mode 100644 index 0000000..bb48270 --- /dev/null +++ b/agentuniverse_dataflow/node/data/prompt/instruct/seed.py @@ -0,0 +1,43 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 15:02 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: seed.py +from langchain.output_parsers.json import parse_json_markdown + +from agentuniverse.prompt.prompt import Prompt +from agentuniverse.prompt.prompt_manager import PromptManager 
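SeedNode above turns seeds_num into a series of LLM requests: one prompt per full _batch_line_size batch, plus one request for the remainder. The request sizes it produces can be checked with a small helper (illustrative only; it mirrors the modulo logic in SeedNode._node_process):

def seed_request_sizes(seeds_num: int, batch_line_size: int = 10) -> list:
    # one request per full batch of seeds, plus one for any remainder
    sizes = [batch_line_size] * (seeds_num // batch_line_size)
    if seeds_num % batch_line_size:
        sizes.append(seeds_num % batch_line_size)
    return sizes

print(seed_request_sizes(100))  # ten requests of 10 seeds each
print(seed_request_sizes(23))   # [10, 10, 3]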
+from agentuniverse_dataflow.node.data.base.prompt_base import PromptBase +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse_dataflow.util.llm.llm_call import batch_call + + +class SeedNode(PromptBase): + """The SeedNode class, which is used to define the class of extend node.""" + + extend_times: int = 20 + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + self.extend_times = self._get_node_param('extend_times') + + def _node_process(self) -> None: + version_prompt: Prompt = PromptManager().get_instance_obj(self.prompt_version) + prompt_with_extend_times = version_prompt.prompt_template.replace('', + str(self.extend_times)) + + prompts = [prompt_with_extend_times] + responses = batch_call(prompts, self.llm) + self._prompt_list = [] + if len(responses) == 1: + try: + if responses[0] != '' and responses[0] is not None: + data = parse_json_markdown(responses[0]) + if 'instructs' in data: + extend_instructs = data['instructs'] + self._prompt_list.extend(extend_instructs) + except Exception as e: + LOGGER.warn(f'except[]>>>{e}:{responses[0]}') diff --git a/agentuniverse_dataflow/node/enum/__init__.py b/agentuniverse_dataflow/node/enum/__init__.py new file mode 100644 index 0000000..8138e72 --- /dev/null +++ b/agentuniverse_dataflow/node/enum/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:07 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/enum/enum.py b/agentuniverse_dataflow/node/enum/enum.py new file mode 100644 index 0000000..556a1e2 --- /dev/null +++ b/agentuniverse_dataflow/node/enum/enum.py @@ -0,0 +1,39 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:07 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: enum.py +from enum import Enum + + +class NodeEnum(Enum): + """The enumeration of the supported nodes.""" + + # DATA + PROMPT = "PROMPT" + PROMPT_ANSWER = "PROMPT_ANSWER" + + # MODEL + MODEL = "MODEL" + + # CRITIC + EVAL = "EVAL" + QUALITY = "QUALITY" + + @staticmethod + def to_value_list(): + """Return the value list of the enumeration.""" + return [item.value for item in NodeEnum] + + +class InOutEnum(Enum): + """The enumeration of the supported inputs and outputs.""" + + JSONL = "JSONL" + + @staticmethod + def to_value_list(): + """Return the value list of the enumeration.""" + return [item.value for item in InOutEnum] diff --git a/agentuniverse_dataflow/node/eval/__init__.py b/agentuniverse_dataflow/node/eval/__init__.py new file mode 100644 index 0000000..f52c075 --- /dev/null +++ b/agentuniverse_dataflow/node/eval/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 18:00 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/eval/eval.py b/agentuniverse_dataflow/node/eval/eval.py new file mode 100644 index 0000000..b87cae3 --- /dev/null +++ b/agentuniverse_dataflow/node/eval/eval.py @@ -0,0 +1,100 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 18:07 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: eval.py +from typing import List, Tuple + +from langchain.output_parsers.json import parse_json_markdown + +from agentuniverse.prompt.prompt import Prompt +from agentuniverse.prompt.prompt_manager import PromptManager +from 
agentuniverse_dataflow.node.base.eval_node_base import EvalNodeBase +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse_dataflow.util.constant.eval_node_dimensions import get_eval_dims +from agentuniverse_dataflow.util.llm.llm_call import batch_call + + +class EvalNode(EvalNodeBase): + """The EvalNode class, which is used to define the class of eval node.""" + + _prompt_answer_list: List[Tuple[str, str]] = None + _eval_dims_json_list: List[str] = None + _eval_lines: int = 100 + + def set_eval_lines(self, eval_lines: int) -> None: + if eval_lines > 0: + self._eval_lines = eval_lines + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + self._eval_lines = self._get_node_param('eval_lines') + if self._dataset_in_handler: + self._prompt_answer_list = self._dataset_in_handler.read_json_prompt_answer_list() + + def _node_postprocess(self) -> None: + super()._node_postprocess() + + if self._dataset_out_handler and self._eval_dims_json_list: + self._dataset_out_handler.write_json_obj_list(self._eval_dims_json_list) + + def _node_process(self) -> None: + if not self._prompt_answer_list or len(self._prompt_answer_list) == 0: + return + + eval_dims = get_eval_dims() + + line_num = 0 + self._eval_dims_json_list = [] + for i in range(0, len(self._prompt_answer_list)): + prompt = self._prompt_answer_list[i][0] + answer = self._prompt_answer_list[i][1] + + if prompt is None: + break + + line_num += 1 + if line_num > self._eval_lines: + break + + if len(prompt) > 2000: + prompt = prompt[0:2000] + if len(answer) > 5000: + answer = answer[0:5000] + + version_prompt: Prompt = PromptManager().get_instance_obj(self.prompt_version) + + eval_prompt_temp = version_prompt.prompt_template.replace('', prompt) + eval_prompt_temp = eval_prompt_temp.replace('', answer) + dim_prompts = [] + + for i in range(0, len(eval_dims)): + eval_dim_name = eval_dims[i][0] + eval_dim_requirement = eval_dims[i][1] + dim_prompt = f'dimension name: {eval_dim_name} \n dimension requirement: {eval_dim_requirement}' + dim_prompts.append(dim_prompt) + + eval_prompt_temp = eval_prompt_temp.replace('', '\n'.join(dim_prompts)) + res = batch_call([eval_prompt_temp], self.llm) + + dim_score_json = {'line': line_num} + dimensions = [] + avg_score = 0.0 + + try: + if res[0] != '' and res[0] is not None: + data = parse_json_markdown(res[0]) + dimensions = data['dimensions'] + avg_score = sum(data['score'] for data in dimensions) + except Exception as e: + LOGGER.warn(f'except[eval_prompt_answer_from_jsonl]>>>{e}:{res[0]}') + continue + if len(dimensions) > 0: + avg_score = avg_score / len(dimensions) + dim_score_json['avg_score'] = avg_score + dim_score_json['dimensions'] = dimensions + + self._eval_dims_json_list.append(dim_score_json) diff --git a/agentuniverse_dataflow/node/eval/report.py b/agentuniverse_dataflow/node/eval/report.py new file mode 100644 index 0000000..e616c8d --- /dev/null +++ b/agentuniverse_dataflow/node/eval/report.py @@ -0,0 +1,67 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 18:02 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: report.py +from typing import List + +from agentuniverse_dataflow.node.base.eval_node_base import EvalNodeBase +from agentuniverse_dataflow.util.fileio.node_msg_jsonl import JsonFileReader + + +class ReportNode(EvalNodeBase): + """The ReportNode class, which is used to define the class of report node.""" + + _eval_report_json_list: List[str] = None + + def _node_preprocess(self) -> 
None: + super()._node_preprocess() + if not self.datasets_in_jsonl or len(self.datasets_in_jsonl) == 0: + raise Exception(f"No input datasets: {self.datasets_in_jsonl}") + + def _node_postprocess(self) -> None: + super()._node_postprocess() + + if self._dataset_out_handler and self._eval_report_json_list: + self._dataset_out_handler.write_json_obj_list(self._eval_report_json_list) + + def _node_process(self) -> None: + self._eval_report_json_list = [] + for i in range(0, len(self.datasets_in_jsonl)): + jfr = JsonFileReader(self.datasets_in_jsonl[i]) + line_objs = jfr.read_json_obj_list() + if line_objs is None: + break + + line_num = 0 + total_avg_score = 0.0 + dim_avg_score = {} + for j in range(0, len(line_objs)): + json_obj = line_objs[j] + line_num += 1 + total_avg_score += json_obj['avg_score'] + dimensions = json_obj['dimensions'] + + dim_num = len(dimensions) + for k in range(0, dim_num): + name = dimensions[k]['name'] + score = dimensions[k]['score'] + if name in dim_avg_score: + dim_avg_score[name] += float(score) + else: + dim_avg_score[name] = float(score) + + for key in dim_avg_score: + dim_total_score = dim_avg_score[key] + dim_avg_score[key] = dim_total_score / line_num + + total_avg_score = total_avg_score / line_num + + report_line_obj = { + 'input_file': self.datasets_in_jsonl[i], + 'total_avg_score': total_avg_score, + 'dim_avg_score': dim_avg_score + } + self._eval_report_json_list.append(report_line_obj) diff --git a/agentuniverse_dataflow/node/model/__init__.py b/agentuniverse_dataflow/node/model/__init__.py new file mode 100644 index 0000000..cdbd34b --- /dev/null +++ b/agentuniverse_dataflow/node/model/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:53 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/model/base/__init__.py b/agentuniverse_dataflow/node/model/base/__init__.py new file mode 100644 index 0000000..cdbd34b --- /dev/null +++ b/agentuniverse_dataflow/node/model/base/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:53 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/node/model/base/deploy_base.py b/agentuniverse_dataflow/node/model/base/deploy_base.py new file mode 100644 index 0000000..bc1afdf --- /dev/null +++ b/agentuniverse_dataflow/node/model/base/deploy_base.py @@ -0,0 +1,21 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:53 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: deploy_base.py +from agentuniverse_dataflow.node.base.model_node_base import ModelNodeBase + + +class DeployBase(ModelNodeBase): + """The DeployBase class, which is used to define the base class of deploy node.""" + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + def _node_process(self) -> None: + pass + + def _node_postprocess(self) -> None: + super()._node_postprocess() diff --git a/agentuniverse_dataflow/node/model/base/dump_base.py b/agentuniverse_dataflow/node/model/base/dump_base.py new file mode 100644 index 0000000..34d1f00 --- /dev/null +++ b/agentuniverse_dataflow/node/model/base/dump_base.py @@ -0,0 +1,29 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:54 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: dump_base.py +from typing import List, Tuple + 
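ReportNode above reduces an eval JSONL to one report line per input file: the mean of avg_score across lines, plus a per-dimension mean. A worked example of the same aggregation on two hypothetical eval lines (the key names mirror what EvalNode writes; the scores are made up):

eval_lines = [
    {'line': 1, 'avg_score': 3.5,
     'dimensions': [{'name': 'compliance', 'score': 4.0},
                    {'name': 'relevance', 'score': 3.0}]},
    {'line': 2, 'avg_score': 2.5,
     'dimensions': [{'name': 'compliance', 'score': 2.0},
                    {'name': 'relevance', 'score': 3.0}]},
]

dim_avg_score = {}
for obj in eval_lines:
    for dim in obj['dimensions']:
        dim_avg_score[dim['name']] = dim_avg_score.get(dim['name'], 0.0) + float(dim['score'])
dim_avg_score = {name: total / len(eval_lines) for name, total in dim_avg_score.items()}
total_avg_score = sum(obj['avg_score'] for obj in eval_lines) / len(eval_lines)

print(total_avg_score)  # 3.0
print(dim_avg_score)    # {'compliance': 3.0, 'relevance': 3.0}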
+from agentuniverse_dataflow.node.base.model_node_base import ModelNodeBase + + +class DumpBase(ModelNodeBase): + """The DumpBase class, which is used to define the base class of dump node.""" + + _prompt_answer_out_list: List[Tuple[str, str]] = None + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + def _node_process(self) -> None: + pass + + def _node_postprocess(self) -> None: + self._param_in_json_obj = None + super()._node_postprocess() + + if self._prompt_answer_out_list and self._dataset_out_handler: + self._dataset_out_handler.write_json_prompt_answer_list(self._prompt_answer_out_list) diff --git a/agentuniverse_dataflow/node/model/base/pre_deploy_base.py b/agentuniverse_dataflow/node/model/base/pre_deploy_base.py new file mode 100644 index 0000000..b46bcaf --- /dev/null +++ b/agentuniverse_dataflow/node/model/base/pre_deploy_base.py @@ -0,0 +1,21 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:54 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: pre_deploy_base.py +from agentuniverse_dataflow.node.base.model_node_base import ModelNodeBase + + +class PreDeployBase(ModelNodeBase): + """The PreDeployBase class, which is used to define the base class of pre-deploy node.""" + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + def _node_process(self) -> None: + pass + + def _node_postprocess(self) -> None: + super()._node_postprocess() diff --git a/agentuniverse_dataflow/node/model/base/train_base.py b/agentuniverse_dataflow/node/model/base/train_base.py new file mode 100644 index 0000000..5d4f09c --- /dev/null +++ b/agentuniverse_dataflow/node/model/base/train_base.py @@ -0,0 +1,23 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:54 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: train_base.py +from agentuniverse_dataflow.node.base.model_node_base import ModelNodeBase + + +class TrainBase(ModelNodeBase): + """The TrainBase class, which is used to define the base class of train node.""" + + train_out_artifact: str = None + + def _node_preprocess(self) -> None: + super()._node_preprocess() + + def _node_process(self) -> None: + pass + + def _node_postprocess(self) -> None: + super()._node_postprocess() diff --git a/agentuniverse_dataflow/node/model/base/upload_base.py b/agentuniverse_dataflow/node/model/base/upload_base.py new file mode 100644 index 0000000..2b00d9d --- /dev/null +++ b/agentuniverse_dataflow/node/model/base/upload_base.py @@ -0,0 +1,29 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:59 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: upload_base.py +from agentuniverse_dataflow.node.base.model_node_base import ModelNodeBase + + +class UploadBase(ModelNodeBase): + """The UploadBase class, which is used to define the base class of upload node.""" + + dataset_jsonl_filename: str = None + + def __init__(self, **kwargs): + super().__init__() + self.set_flow_start_node() + + def _node_preprocess(self) -> None: + super()._node_preprocess() + if self.datasets_in_jsonl and len(self.datasets_in_jsonl) > 0: + self.dataset_jsonl_filename = self.datasets_in_jsonl[0] + '.jsonl' + + def _node_process(self) -> None: + pass + + def _node_postprocess(self) -> None: + super()._node_postprocess() diff --git a/agentuniverse_dataflow/prompt/__init__.py b/agentuniverse_dataflow/prompt/__init__.py new file mode 100644 index 0000000..68b2996 --- /dev/null +++ 
b/agentuniverse_dataflow/prompt/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 10:45 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/prompt/dataset_extend_node_cn.yaml b/agentuniverse_dataflow/prompt/dataset_extend_node_cn.yaml new file mode 100644 index 0000000..845ffd6 --- /dev/null +++ b/agentuniverse_dataflow/prompt/dataset_extend_node_cn.yaml @@ -0,0 +1,23 @@ +introduction: 你是一个极致严谨的金融专家,对于输入的字符串列表,请基于以下要求扩展,用来提交给大模型从而能获取高质量结果,请确保输出字符串只包含扩展后的内容且每一行是一条,如不能有“扩展后的问题:”之类的表述。 +instruction: | + 扩展要求: + 1.请确保扩展出来的输出字符串和输入字符串语义上完全不一样 + 2.请确保扩展出来的输出字符串和输入字符串大部分字符不相同 + 3.请确保扩展出来的输出字符串是对原输入有多样性大幅提升 + 4.请确保扩展出来的输出字符串和原输入对比是完全不同的金融话题 + 5.扩展倍数:以输入作为参考,输出尽量扩展条数为输入的倍 + + + 输入列表: + + + 请确保输出包含且只包含如下的json格式: + { + "extend_inputs": + [ + "xxx" + ] + } +metadata: + type: 'PROMPT' + version: 'dataset_extend_node.cn' diff --git a/agentuniverse_dataflow/prompt/dataset_filter_node_cn.yaml b/agentuniverse_dataflow/prompt/dataset_filter_node_cn.yaml new file mode 100644 index 0000000..bd63c65 --- /dev/null +++ b/agentuniverse_dataflow/prompt/dataset_filter_node_cn.yaml @@ -0,0 +1,82 @@ +introduction: 你是一个极致严谨的金融专家,需要帮助按照如下要求评估answer在各维度的质量打分。 +instruction: | + 各维度质量打分请分别以0~100分来评估: + 1.不符合要求的打0~20分 + 2.基本符合要求的打20~40分 + 3.比较优秀的打40~60分 + 4.特别突出的打60~80分 + 5.极致卓越的打80~100分 + + prompt: + + + answer: + + + 质量打分维度描述: + + 1.合规性: + 文本内容应遵守所有相关的法律法规、行业准则和公司政策。 + 必须遵循金融监管机构的要求,包括但不限于披露、报告和交易规则。 + 应保证文本中的任何建议或信息不违反利益冲突和道德行为准则。 + + 2.安全性: + 保护文本中包含的敏感信息,包括个人信息和财务数据,确保其不会被非授权人员访问。 + 应实施数据加密、访问控制和其他安全措施以防止数据泄露或未授权使用。 + + 3.事实性: + 文本中如果有声明、数据和引用都应是可查证的、基于可靠数据源和事实的。 + 避免这些信息是推测性的语言,确保所有信息都有确凿的证据支持。 + + 4.结构性: + 文本应具有清晰、逻辑性强的结构,包括合适的标题、子标题和段落。 + 信息的呈现应有助于理解,遵循从总体到细节的组织方式。 + + 5.合理性: + 呈现的分析或论点应基于合理的假设和逻辑推理。 + 结论应当合理,不偏离数据和证据所能支持的范围。 + + 6.严谨性: + 文本应遵循高标准的专业准则,精确使用术语,严谨处理数据。 + 所提供的分析和结论应深入、准确,无逻辑漏洞。 + + 7.重复性: + 文本内容在反复使用或引用时应保持一致,能够被同样的方式理解和解释。 + 术语定义和使用标准应一致,确保在不同场合和时间的解读不发生变化。 + + 8.时效性: + 文本内容应反映最新的信息和数据,及时更新以匹配当前的金融市场和环境。 + 如有数据和信息必须标明对应的时点,确保读者明白其相关性和有效性。 + + 9.全面性: + 文本应全面覆盖主题,考虑到所有相关方面和潜在影响。 + 应包含足够的背景信息、上下文和可能的影响分析。 + + 10.相关性: + 评估answer相对于prompt的相关性 + a.不符合准入条件:答非所问,不相关 + b.门槛值:能够知道到用户在问什么,并且所答即所问,包括: + 【有明确意图时】识别出用户的问题并对问题有回复,且回复的内容有相关性;有主意图的时候,回复的内容要与主意图有相关 + 【无明确意图时】遇到模糊意图时进行恰当的意图反问确认,并给予准确回应 + c.加分项 + 能够get到用户在问什么后:对用户的所有问题(包含情绪,只有一个问题,那就是回复了就算)都给出回应;回应的内容整体相关性都很高;回复的问题全面且易懂 + + + 请确保严格要求,不轻易给出高分,请确保输出包含且只包含如下字段且每个字段不能为空长度也不超过200的json格式: + { + "dimensions": [ + { + "name":质量打分维度, + "score":分数, + "negative substr":扣分最多的子串 + }, + { + "name":质量打分维度, + "score":分数, + "negative substr":扣分最多的子串 + } + ] + } +metadata: + type: 'PROMPT' + version: 'dataset_filter_node.cn' diff --git a/agentuniverse_dataflow/prompt/dataset_input_seed_node_cn.yaml b/agentuniverse_dataflow/prompt/dataset_input_seed_node_cn.yaml new file mode 100644 index 0000000..5ba9d53 --- /dev/null +++ b/agentuniverse_dataflow/prompt/dataset_input_seed_node_cn.yaml @@ -0,0 +1,26 @@ +introduction: 你是一个极致严谨的金融专家,请尽量确保生成条金融问题用来提交给大模型从而能获取更多高质量的问题,请确保输出字符串是中文的且只包含生成后的问题,如不能有“生成的问题:”之类的表述。 +instruction: | + 输出表达要求: + 1.同义词替换:用同义词替换原始查询中的某些词汇,以扩大搜索结果的覆盖范围。 + 2.标准化处理:将查询中的词汇标准化,比如将复数形式转换为单数形式,或者使用基本形式的词替换其它形式的词。 + 3.去噪声词:从查询中去除不重要的词汇,如介词和连接词,以突出关键信息。 + 4.查询扩展:在原始查询基础上添加额外的词汇或短语,以提高查询的详细度或覆盖面。 + 5.重排和重构:根据特定的上下文或目标,对查询中的词语进行重新排序或重构,以改善其表达的清晰度或逻辑性。 + 6.意图识别:识别用户查询的潜在意图,然后根据这一意图进行相应的改写。 + 7.表达形式:尽可能的丰富指令的表达方式和长度,既有非常简洁的表达也有略长的表达 + 8.格式处理:规范化格式,提出不必要的标点、空格、回车、乱码等 + 9.结构形式:请保持相似的角色和指令结构或者框架,使的大模型能遵从相关回答结构 + + 一句话描述生成的种子: + + + 
请确保输出包含且只包含如下的json格式: + { + "seeds": + [ + "xxx" + ] + } +metadata: + type: 'PROMPT' + version: 'dataset_input_seed_node.cn' diff --git a/agentuniverse_dataflow/prompt/dataset_rewrite_node_cn.yaml b/agentuniverse_dataflow/prompt/dataset_rewrite_node_cn.yaml new file mode 100644 index 0000000..713ee07 --- /dev/null +++ b/agentuniverse_dataflow/prompt/dataset_rewrite_node_cn.yaml @@ -0,0 +1,27 @@ +introduction: 你是一个极致严谨的金融专家,对于输入的一列字符串,请基于以下要求改写,用来提交给大模型从而能获取高质量结果,请确保输出每一行字符串与输入每一行字符串一一对应且只包含改写后的内容,如不能有“改写后的问题:”之类的表述。 +instruction: | + + 改写要求: + 1.同义词替换:用同义词替换原始查询中的某些词汇,以扩大搜索结果的覆盖范围。 + 2.标准化处理:将查询中的词汇标准化,比如将复数形式转换为单数形式,或者使用基本形式的词替换其它形式的词。 + 3.去噪声词:从查询中去除不重要的词汇,如介词和连接词,以突出关键信息。 + 4.查询扩展:在原始查询基础上添加额外的词汇或短语,以提高查询的详细度或覆盖面。 + 5.重排和重构:根据特定的上下文或目标,对查询中的词语进行重新排序或重构,以改善其表达的清晰度或逻辑性。 + 6.意图识别:识别用户查询的潜在意图,然后根据这一意图进行相应的改写。 + 7.表达形式:尽可能的丰富指令的表达方式和长度,既有非常简洁的表达也有略长的表达 + 8.格式处理:规范化格式,提出不必要的标点、空格、回车、乱码等 + 9.结构形式:请保持相似的角色和指令结构或者框架,使的大模型能遵从相关回答结构 + + 输入字符串包括: + + + 请确保输出包含且只包含如下的json格式: + { + "rewrite_inputs": + [ + "xxx" + ] + } +metadata: + type: 'PROMPT' + version: 'dataset_rewrite_node.cn' diff --git a/agentuniverse_dataflow/prompt/eval_node_cn.yaml b/agentuniverse_dataflow/prompt/eval_node_cn.yaml new file mode 100644 index 0000000..7db7aeb --- /dev/null +++ b/agentuniverse_dataflow/prompt/eval_node_cn.yaml @@ -0,0 +1,38 @@ +introduction: 你是一个极致严谨的金融专家,需要帮助按照如下要求评估answer在各维度的质量打分。 +instruction: | + 打分要求,请精确到小数点后1位: + 1.不符合准入条件(0.0分) + 2.门槛值(0.0~3.0分) + 3.加分项(0.0~2.0分) + + prompt: + + + answer: + + + 质量打分维度描述: + + + + + 输出格式要求: + + 请确保严格要求,不轻易给出高分,请确保输出只包含如下的json格式字段,且每个字段不能为空长度也不超过200,不包含其他内容: + { + "dimensions": [ + { + "name":质量打分维度名称, + "score":分数, + "negative substr":扣分最多的子串 + }, + { + "name":质量打分维度, + "score":分数, + "negative substr":扣分最多的子串 + } + ] + } +metadata: + type: 'PROMPT' + version: 'eval_node.cn' diff --git a/agentuniverse_dataflow/prompt/event_planner_node_cn.yaml b/agentuniverse_dataflow/prompt/event_planner_node_cn.yaml new file mode 100644 index 0000000..0e30f99 --- /dev/null +++ b/agentuniverse_dataflow/prompt/event_planner_node_cn.yaml @@ -0,0 +1,37 @@ +data_event_plan: | + 你是一个极致严谨的金融指令工程专家,我们认为大模型输入input为prompt输出output为answer, prompt包括指令、上下文和用户输入, answer是针对prompt输入后LLM的回答 + 你需要针对输入str,规划对应的plan从而基于plan的代码输出满足要求的str + + 如下是输入: + input_str: + + + ### + 请严格遵循如下输出要求: + 1.请特别注意输入字符串是否是json编码的字符串并做相应的处理, 如果不是请不要解码避免引起不必要的麻烦 + 2.请确保代码输出的所有变量都是人可阅读的str类型, 如果不是请解析并提取到输出,同时尽量不要遗漏输入中的可阅读内容 + 3.请确保提供最简洁安全的代码片段, 请不要打印到屏幕 + 4.请参考之前的反馈: + + ### + 请确保你的回答包含且仅包含如下json格式的内容并严格遵循json规范: + { + "plan_code": 输入变量为input_str,输出变量为的python exec可执行的代码片段 + } +exec_result_verification: | + 你是一个极致严谨的数据质量专家,如下输入字符串每一行都是一条数据,你的工作是一行一行来确认是不是符合如下要求: + 1.人可以读取理解的文本 + 2.输入字段是完全解码的 + + 输入: + + + ### + 请确保你的回答包含且仅包含如下json格式的内容: + { + "success": 如果输入每行符合上面要求则为true否则为false, + "reflection": 不符合上面要求的原因和改进建议 + } +metadata: + type: 'PROMPT' + version: 'event_planner_node.cn' diff --git a/agentuniverse_dataflow/prompt/instruct_prompt_gen_cn.yaml b/agentuniverse_dataflow/prompt/instruct_prompt_gen_cn.yaml new file mode 100644 index 0000000..c2f7235 --- /dev/null +++ b/agentuniverse_dataflow/prompt/instruct_prompt_gen_cn.yaml @@ -0,0 +1,7 @@ +instruction: | + + + +metadata: + type: 'PROMPT' + version: 'instruct_prompt_gen_node.cn' diff --git a/agentuniverse_dataflow/prompt/instruct_seed_node_cn.yaml b/agentuniverse_dataflow/prompt/instruct_seed_node_cn.yaml new file mode 100644 index 0000000..b75b604 --- /dev/null +++ b/agentuniverse_dataflow/prompt/instruct_seed_node_cn.yaml @@ -0,0 +1,32 @@ +introduction: 你是一个极致严谨的金融专家。 
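The prompt templates above (extend, filter, seed, rewrite, eval) all instruct the model to reply with nothing but a JSON object, e.g. {"seeds": [...]} or {"rewrite_inputs": [...]}. A consuming node could recover that object even when the model wraps it in stray text; parse_llm_json below is a hypothetical helper sketched for illustration, not part of this patch:

import json


def parse_llm_json(reply: str) -> dict:
    """Extract the outermost JSON object from an LLM reply, tolerating stray text."""
    start, end = reply.find('{'), reply.rfind('}')
    if start == -1 or end <= start:
        return {}
    try:
        return json.loads(reply[start:end + 1])
    except json.JSONDecodeError:
        return {}


print(parse_llm_json('{"seeds": ["什么是可转债?"]}'))  # {'seeds': ['什么是可转债?']}
print(parse_llm_json('生成的问题:不可解析'))  # {}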
+instruction: | + 请参考输入尽量按照扩展要求确保生成金融指令用来提交给大模型从而能获取更多高质量的问题,请确保输出字符串是中文的且只包含生成后的指令,如不能有“生成的指令:”“输出:”之类的表述。 + 同时请确保指令中没有具体的金融领域对象,只包含对输出的要求不包含输入的金融问题本身,角色的描述也可以扩展同类表述。 + + 指令输出要求: + 1.清晰表述:指令中使用清晰、明确的描述,避免模糊的词语,例如可以加上数字,具体的风格 + 2.指令位置:把指令放在prompt开头,并用三个引号或者三个井号等特殊的内容将待处理的内容和指令分开 + 3.输出格式:为了得到稳定的输出,指令中请尽量指定好输出格式 + 4.角色扮演:指令中请包含角色扮演技巧,以及针对角色扮演的专业要求 + 5.用户角色:指令中告诉模型用户是什么角色,从而得到的答案会令你感到惊艳 + 6.语义差异:请确保扩展出来的输出字符串和输入字符串语义上完全不一样 + 7.多样性:请确保扩展出来的输出字符串是对原输入有多样性大幅提升 + 8.字符串:请确保扩展出来的输出字符串和原输入对比是完全不同的金融话题 + 9.扩展倍数:以输入作为参考,输出尽量扩展条数为输入的倍 + + + 指令输入参考: + 你是一个极致严谨的金融专家,请尽量找出问题对应的金融解读框架,并基于解读框架一步一步结构化详细回答如下问题: + + 指令输出格式要求: + + 请确保输出只包含如下的json格式,不包含其他内容: + { + "instructs": + [ + "xxx" + ] + } +metadata: + type: 'PROMPT' + version: 'instruct_seed_node.cn' diff --git a/agentuniverse_dataflow/util/__init__.py b/agentuniverse_dataflow/util/__init__.py new file mode 100644 index 0000000..cc148e6 --- /dev/null +++ b/agentuniverse_dataflow/util/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:12 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/util/constant/__init__.py b/agentuniverse_dataflow/util/constant/__init__.py new file mode 100644 index 0000000..34f082d --- /dev/null +++ b/agentuniverse_dataflow/util/constant/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/18 14:41 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/util/constant/eval_node_dimensions.py b/agentuniverse_dataflow/util/constant/eval_node_dimensions.py new file mode 100644 index 0000000..a3b2312 --- /dev/null +++ b/agentuniverse_dataflow/util/constant/eval_node_dimensions.py @@ -0,0 +1,150 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/18 14:41 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: eval_node_dimensions.py + +_EVAL_DIM_RELATIVE_NAME = "相关性" +_EVAL_DIM_RELATIVE_REQUIREMENT = """ +1.不符合准入条件 +答非所问,不相关 + +2.门槛值 +能够知道到用户在问什么,并且所答即所问,包括: +【有明确意图时】 +识别出用户的问题并对问题有回复,且回复的内容有相关性 +有主意图的时候,回复的内容要与主意图有相关 +【无明确意图时】 +遇到模糊意图时进行恰当的意图反问确认,并给予准确回应 + +3.加分项 +能够get到用户在问什么后: +a、对用户的所有问题(包含情绪,只有一个问题,那就是回复了就算)都给出回应 +b、回应的内容整体相关性都很高 +c、回复的问题全面且易懂 +""" + +_EVAL_DIM_FACTUALITY_NAME = "事实性" +_EVAL_DIM_FACTUALITY_REQUIREMENT = """ +1.不符合准入条件 +数据错误,不符合事实 + +2.门槛值 +个别数据不准确,但不影响整体事实性 + +3.加分项 +解读内容中所有引用数据真实可信,数据来源官方可信 +""" + +_EVAL_DIM_CONSTRUCTIVE_NAME = "结构性" +_EVAL_DIM_CONSTITUTIVE_REQUIREMENT = """ +1.不符合准入条件 +总分结构: +a、总分论点只是抽取了原文中的部分内容,并没有真正的总结归纳 +b、生成模型是总分结构,但是总论点对分论点的总结性小于50% +c、生成模型是总分结构,但是分论点是为了回答query而硬答,与query实际并没有关系。 +d、总结内容论点逻辑冲突; +总结摘要结构: +a、摘要内容是抽取了部分段落,并没有真正的归纳总结 +b、摘要内容,仅仅总结成一段文本,无摘要详述 + +2.门槛值 +总分结构: +a、总分结构,有总结和归纳,分论点有层次或者结构 +b、总分结构,总论点对分论点的总结行大于50% +c、总分结构,分论点之间有逻辑关系(互补、递进),分论点与query之间有相关性 +d、总结内容论点间无冲突; +总结摘要结构: +a、摘要内容有对原文进行归纳总结,不是抽取的原文段落 +b、摘要内容论点有展开叙述 + +3.加分项 +总分结构: +a、总分结构,分论点有归类抽象有条理有逻辑 +b、总分结构,分论点为1234分层结构,有条理 + +摘要结构: +a、摘要内容论点叙述详细充分,有逻辑有条理 +b、总结表达:总结内容话术表达逻辑清晰,表达精炼,字数<= XX字 +""" + +_EVAL_DIM_RATIONALITY_NAME = "合理性" +_EVAL_DIM_RATIONALITY_REQUIREMENT = """ +1.不符合准入条件 +出现专业错误 +(如经济复苏利好股市利空债市、供不应求带来价格上涨、经济下行压力加剧通常会倒逼宏观政策加码等) + +2.门槛值 +符合业内公认的金融逻辑,前后结论不要自相矛盾,不出现明显的专业错误 + +3.加分项 +通篇解读要一以贯之、逻辑自洽,且逻辑推演是有逐层递进的(后文分析以前文结论为依据) +""" + +_EVAL_DIM_TIMELINESS_NAME = "时效性" +_EVAL_DIM_TIMELINESS_REQUIREMENT = """ +1.不符合准入条件 +数据陈旧过时,不符合现状 + +2.门槛值 +次新,符合当下现状 + +3.加分项 +引用官方发布的最新数据,包含清晰的发布日期 +""" + +_EVAL_DIM_COMPREHENSIVE_NAME = 
"全面性" +_EVAL_DIM_COMPREHENSIVE_REQUIREMENT = """ +3分门槛:从2~3个维度分析,分析类问题的论据较为饱满 +3分~5分看主观感受 +分子维度占分母维度的百分比进行打分: +●信息查询类:暂不涉及 +●标的分析类:暂不涉及 +●宏观分析类: +包含指标与市场预期相比(必填项) 、指标整体对经济\政策的指示意义(必填项)、经济指标同比\环比的趋势分析、指标子项细拆分析(如PMI各个分项)表征含义、对股债资产的影响。尽可能包含更多维度的分析。必填项缺失得零分 +●市场分析类: +尽可能包含以下2-3个维度以上的分析:尽可能从以下多个驱动力维度展开分析,基本面、政策面、情绪面(突发事件)、估值面、资金面,同时驱动力维度的判别要准确 +●政策解读类: +尽可能包含以下2-3个维度以上的分析:宏观政策类:包含政策出台背景(原因)、政策对基本面\流动性\情绪面(风险偏好)的影响、政策对股债资产、相关行业的影响。宏观会议类:包含与历届同类会议的措辞对比、对经济的定调、对货币、财政、房地产、金融、资本市场、防风险等重要政策的定调、整体对股债市场、相关行业的影响 +●大事解读类: +尽可能包含以下2-3个维度以上的分析:行业政策类:包含政策出台背景(原因)、政策对行业基本面\流动性\情绪面(风险偏好)的影响、政策对行业的短期\中长期多空影响。其它非经济\政策类大事对股市的影响:事件背景\原因、事件对A股基本面、政策面、情绪面、估值面、资金面的影响(不需要面面俱到,只需要对驱动力维度的归属准确) +●报告解读类: +尽可能包含以下2-3个维度以上的分析: +财报解读类:分析角度主要包括企业盈利能力、企业成长能力、企业营运水平和企业财务报表质量。企业盈利能力指标包括毛利率、净利率和ROE等。企业成长能力指标包括营业收入增速和净利润增速。企业营运能力分析包括存货周转天数和应收账款周转天数。企业财务报表质量指标包括资产负债率和利息覆盖率。以上指标为常用分析指标,主要供参考,分析解读围绕财务报表即可; +研报解读类:分析师分析一般会从业绩分析、行业分析、市场前景、技术发展、政策、行业事件、估值分析、策略建议等视角,这些视角都可以拆细,通常选取其中主要矛盾和市场核心关注点展开分析; +政府报告解读类:一般包括央行货币执行报告、财政预算报告等,这块解读没有固定框架,主流分析方法主要对政府报告内容引用、概括和提炼精神,能提到报告主要内容多个点即可。 +●策略建议类:暂不涉及 +●泛金融类:暂不涉及 +""" + +_EVAL_DIM_OVERALL_NAME = "整体性" +_EVAL_DIM_OVERALL_REQUIREMENT = """ +1.不符合准入条件 +满足以下任意一点即为0分: +a、逻辑不通顺,或存在语病,或表达冗余内容超长(整体文本长度>XX字) +b、论点&总结重复/论点&论点重复:任意两条论点存在80%以上的重复度 +c、倒装、错别字、语句不通顺、逻辑问题等 +d、存在特殊符号 + +2.门槛值 +a、语句流畅、表达可阅读性强 +b、表达直白(不反复说车轱辘话) +c、论点论据表达无重复 + +3.加分项 +a、答案对用户query有明确回应表达 +b、语言简洁精炼,逻辑清晰 +""" + + +def get_eval_dims(): + eval_dims = [(_EVAL_DIM_RELATIVE_NAME, _EVAL_DIM_RELATIVE_REQUIREMENT), + (_EVAL_DIM_FACTUALITY_NAME, _EVAL_DIM_FACTUALITY_REQUIREMENT), + (_EVAL_DIM_CONSTRUCTIVE_NAME, _EVAL_DIM_CONSTITUTIVE_REQUIREMENT), + (_EVAL_DIM_RATIONALITY_NAME, _EVAL_DIM_RATIONALITY_REQUIREMENT), + (_EVAL_DIM_TIMELINESS_NAME, _EVAL_DIM_TIMELINESS_REQUIREMENT), + (_EVAL_DIM_COMPREHENSIVE_NAME, _EVAL_DIM_COMPREHENSIVE_REQUIREMENT), + (_EVAL_DIM_OVERALL_NAME, _EVAL_DIM_OVERALL_REQUIREMENT)] + return eval_dims diff --git a/agentuniverse_dataflow/util/fileio/__init__.py b/agentuniverse_dataflow/util/fileio/__init__.py new file mode 100644 index 0000000..cc148e6 --- /dev/null +++ b/agentuniverse_dataflow/util/fileio/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:12 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/util/fileio/node_msg_jsonl.py b/agentuniverse_dataflow/util/fileio/node_msg_jsonl.py new file mode 100644 index 0000000..94cbb43 --- /dev/null +++ b/agentuniverse_dataflow/util/fileio/node_msg_jsonl.py @@ -0,0 +1,150 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/14 17:14 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: node_msg_jsonl.py +import json +import os +import sys + +from agentuniverse.base.util.logging.logging_util import LOGGER + +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + +DATA_DIR = 'data/' +os.makedirs(DATA_DIR, exist_ok=True) + + +class JsonFileOps(object): + def __init__(self): + return + + @classmethod + def is_file_exist(cls, input_file_name, extension='jsonl'): + infile = DATA_DIR + input_file_name + '.' + extension + return os.path.exists(infile) + + @classmethod + def rm_file_if_exist(cls, input_file_name, extension='jsonl'): + infile = DATA_DIR + input_file_name + '.' 
+ extension + if os.path.exists(infile): + os.remove(infile) + return + + +class JsonFileWriter(object): + def __init__(self, output_file_name: str, extension='jsonl'): + self.outfile_path = DATA_DIR + output_file_name + '.' + extension + # create directory if not exist + directory = os.path.dirname(self.outfile_path) + if not os.path.exists(directory): + os.makedirs(directory) + + self.outfile_handler = open(self.outfile_path, 'w', encoding='utf-8') + + def write_json_obj(self, json_obj: dict): + try: + # serialize the object as a single jsonl line and flush immediately + json_line = json.dumps(json_obj, ensure_ascii=False) + self.outfile_handler.write(json_line.strip() + '\n') + self.outfile_handler.flush() + except Exception as e: + LOGGER.warn(f"except[write_json_obj]>>>{e}:{json_obj}") + + return + + def write_json_obj_list(self, json_obj_list: list): + for json_obj in json_obj_list: + self.write_json_obj(json_obj) + return + + def write_json_prompt(self, prompt: str): + json_obj = {"prompt": prompt} + self.write_json_obj(json_obj) + + def write_json_prompt_list(self, prompt_list: list): + for prompt in prompt_list: + self.write_json_prompt(prompt) + + def write_json_prompt_answer(self, prompt: str, answer: str): + json_obj = {"prompt": prompt, "answer": answer} + self.write_json_obj(json_obj) + + def write_json_prompt_answer_list(self, prompt_answer_list: list): + for prompt, answer in prompt_answer_list: + self.write_json_prompt_answer(prompt, answer) + + +class JsonFileReader(object): + def __init__(self, input_file_name: str, extension='jsonl'): + self.infile_handler = None + self.filename = None + if JsonFileOps.is_file_exist(input_file_name, extension): + self.filename = input_file_name + '.'
+ extension + self.infile_handler = open(DATA_DIR + self.filename, 'r', encoding='utf-8') + + def read_json_obj(self): + if not self.infile_handler: + raise Exception(f"no json file to read: {self.filename}") + + json_line = self.infile_handler.readline() + if json_line: + try: + json_obj = json.loads(json_line.strip()) + return json_obj + except Exception as e: + LOGGER.warn(f"except[read_json_line]>>>{e}:{json_line}") + return {} + else: + return None + + def read_json_obj_list(self): + obj_list = [] + + while True: + obj = self.read_json_obj() + if obj is None: + break + obj_list.append(obj) + + return obj_list + + def read_json_prompt(self): + json_obj = self.read_json_obj() + if json_obj is not None: + return json_obj.get('prompt') + else: + return None + + def read_json_prompt_list(self): + prompts = [] + while True: + prompt = self.read_json_prompt() + if prompt is None: + break + prompts.append(prompt) + + return prompts + + def read_json_prompt_answer(self): + json_obj = self.read_json_obj() + if json_obj is not None: + prompt = json_obj.get('prompt') + answer = json_obj.get('answer') + return prompt, answer + else: + return None, None + + def read_json_prompt_answer_list(self): + prompt_answer_list = [] + while True: + prompt, answer = self.read_json_prompt_answer() + if prompt is None: + break + prompt_answer_list.append((prompt, answer)) + + return prompt_answer_list diff --git a/agentuniverse_dataflow/util/llm/__init__.py b/agentuniverse_dataflow/util/llm/__init__.py new file mode 100644 index 0000000..ae852a5 --- /dev/null +++ b/agentuniverse_dataflow/util/llm/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 20:47 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_dataflow/util/llm/llm_call.py b/agentuniverse_dataflow/util/llm/llm_call.py new file mode 100644 index 0000000..7cea21f --- /dev/null +++ b/agentuniverse_dataflow/util/llm/llm_call.py @@ -0,0 +1,58 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/17 20:47 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: llm_call.py +import asyncio + +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse.llm.llm import LLM +from agentuniverse.llm.llm_manager import LLMManager + + +def batch_call(prompts: list[str], llm_name: str): + return asyncio.run(async_batch_call(prompts, llm_name)) + + +async def async_batch_call(prompts: list[str], llm_name: str): + tasks = [] + + prompt_len = len(prompts) + for i in range(0, prompt_len): + llm: LLM = LLMManager().get_instance_obj(component_instance_name=llm_name, new_instance=True) + if llm is None: + raise Exception('LLM not found for agentuniverse data.') + messages = [{"role": "user", "content": prompts[i]}] + tasks.append(llm.acall(messages=messages, timeout=200)) + + task = asyncio.create_task(show_progress(len(prompts), asyncio.get_running_loop())) + outputs = await asyncio.gather(*tasks, return_exceptions=True) + task.cancel() + + results = [] + for i, output in enumerate(outputs): + if isinstance(output, Exception): + LOGGER.warn(f'>>>except[async_llm_call]:{output}') + results.append(None) + else: + results.append(output.text) + + print_progress(len(results), prompt_len) + + return results + + +def print_progress(completed: int, task_count: int): + progress = (completed /
task_count) * 100 + LOGGER.info(f"\r>>>llm progress: {completed}/{task_count} = {progress:.2f}%") + + +async def show_progress(task_count, loop): + completed = 0 + while completed < task_count: + completed_tasks = [t for t in asyncio.all_tasks(loop=loop) if t.done()] + completed = len(completed_tasks) + print_progress(completed, task_count) + await asyncio.sleep(10) diff --git a/pyproject.toml b/pyproject.toml index 089d1fc..302c6b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,8 @@ anthropic = "^0.26.0" ollama = '^0.2.1' langchain-anthropic = '^0.1.13' numpy = '^1.26.0' +pandas = "^2.2.2" +pyarrow = "^16.1.0" [tool.poetry.extras] log_ext = ["aliyun-log-python-sdk"] diff --git a/sample_standard_app/dataflow/__init__.py b/sample_standard_app/dataflow/__init__.py new file mode 100644 index 0000000..8f9304d --- /dev/null +++ b/sample_standard_app/dataflow/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/18 16:32 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/sample_standard_app/dataflow/dispatch/__init__.py b/sample_standard_app/dataflow/dispatch/__init__.py new file mode 100644 index 0000000..6d7ca69 --- /dev/null +++ b/sample_standard_app/dataflow/dispatch/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/18 16:35 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/sample_standard_app/dataflow/dispatch/data/demo_jsonl.jsonl b/sample_standard_app/dataflow/dispatch/data/demo_jsonl.jsonl new file mode 100644 index 0000000..59b1da4 --- /dev/null +++ b/sample_standard_app/dataflow/dispatch/data/demo_jsonl.jsonl @@ -0,0 +1,2 @@ +{"prompt": "你是一位精通信息分析的ai助手。你的目标是判断问题对应的答案是否提供了有价值的信息,并对问题的答案做出建议和评价。需要回答的问题是: 分析下巴菲特减持比亚迪的原因", "answer": "巴菲特减持比亚迪的原因可以从以下几个方面进行分析:\\\\n\\\\n1. **估值偏高**:\\\\n - 巴菲特在公开采访中提到:“以目前的股价计算,比亚迪市值比梅赛德斯-奔驰还高,这不是一只便宜的股票。”这表明他认为比亚迪当前的估值过高,不再具备吸引力。\\\\n\\\\n2. **获利了结**:\\\\n - 巴菲特在过去7个月内多次减持比亚迪股份,可能是因为他已经从中获得了巨大的浮盈,选择在高位获利了结。这种操作符合巴菲特一贯的投资逻辑,即在高估时卖出。\\\\n\\\\n3. **规避风险**:\\\\n - 美国汇盛金融管理公司首席经济学家陈凯丰指出,巴菲特的减持行为也有规避风险的考量。随着新能源汽车市场竞争加剧,未来的不确定性增加,减持可以降低投资组合的风险。\\\\n\\\\n4. **市场竞争加剧**:\\\\n - 尽管比亚迪近年来借新能源汽车的东风高速发展,但市场竞争趋于白热化。这种竞争环境可能增加公司的经营风险,从而影响投资回报。\\\\n\\\\n5. **行业渗透率增速放缓**:\\\\n - 业内人士认为,9月电动车行业整体渗透率提升的增速已经放缓。尽管比亚迪三季报业绩优异,但未来业绩增长潜力可能受到限制,这也是巴菲特减持的一个原因。\\\\n\\\\n6. **上游原材料价格波动**:\\\\n - 比亚迪的业绩增长部分得益于上游碳酸锂价格的下滑,但这种波动性也可能带来不确定性。巴菲特可能考虑到这种不确定性,选择减持。综上所述,巴菲特减持比亚迪的原因主要包括估值偏高、获利了结、规避风险、市场竞争加剧、行业渗透率增速放缓以及上游原材料价格波动等因素。"} +{"prompt": "你是一位精通信息分析的ai助手。你的目标是判断问题对应的答案是否提供了有价值的信息,并对问题的答案做出建议和评价。需要回答的问题是: 分析下英伟达股票大涨的原因", "answer": "英伟达股票大涨的原因可以归纳为以下几个方面:\\n\\n1. **股票拆分**:英伟达宣布的股票拆分计划增加了股票的流动性和吸引力,使得更多投资者能够参与投资,这一举措直接导致股价在宣布后上涨了27%。\\n\\n2. **市场分析乐观**:投资机构KeyBanc Capital Markets将英伟达的目标股价上调至1200美元,显示出市场对英伟达未来业绩的强烈信心,这对股价构成显著正面影响。\\n\\n3. **季度业绩预期**:市场对英伟达即将公布的季度业绩持积极态度,分析师认为业绩表现将超出预期,这种正面预期促使投资者提前买入,推高股价。\\n\\n4. **行业领导地位**:英伟达作为人工智能和高性能计算领域的领导者,其技术优势和市场地位不断巩固,吸引了大量基于长期增长潜力的投资。\\n\\n5. **市场情绪**:英伟达股价创新高不仅提升了自身市值,还激发了市场对于人工智能板块乃至整个股市的热情,带动了全球股市上扬。\\n\\n6. 
**宏观经济因素**:虽然背景信息未详细提及,但通常股市整体表现、经济环境、政策支持等因素也会影响个股表现,英伟达股价上涨可能也受益于较为有利的宏观经济背景。\\n\\n综上所述,英伟达股价的大涨是多因素共同作用的结果,包括公司自身的战略决策、市场对其业绩的乐观预期、行业地位的强化以及良好的市场情绪等。"} \ No newline at end of file diff --git a/sample_standard_app/dataflow/dispatch/dispatch.py b/sample_standard_app/dataflow/dispatch/dispatch.py new file mode 100644 index 0000000..91443e4 --- /dev/null +++ b/sample_standard_app/dataflow/dispatch/dispatch.py @@ -0,0 +1,38 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/11 20:52 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: dispatch.py +import yaml + +from agentuniverse.base.util.logging.logging_util import LOGGER +from agentuniverse.base.agentuniverse import AgentUniverse + +from agentuniverse_dataflow.flow.dataflow import Dataflow + +AgentUniverse().start(config_path='../../config/config.toml') + + +def load_dataflows_from_yaml(conf_file): + with open(conf_file, 'r') as file: + config = yaml.safe_load(file) + dataflows = [] + yaml_list = config.get('dataflows') or [] + for item in yaml_list: + dataflows.append(Dataflow(item)) + return dataflows + + +def dispatch(conf_file='dispatch.yaml'): + dataflows = load_dataflows_from_yaml(conf_file) + try: + for dataflow in dataflows: + dataflow.execute() + except Exception as e: + LOGGER.error(f"An error occurred: {e}") + + +if __name__ == '__main__': + dispatch('dispatch.yaml') diff --git a/sample_standard_app/dataflow/dispatch/dispatch.yaml b/sample_standard_app/dataflow/dispatch/dispatch.yaml new file mode 100644 index 0000000..49d6550 --- /dev/null +++ b/sample_standard_app/dataflow/dispatch/dispatch.yaml @@ -0,0 +1,4 @@ +name: 'main_dispatch' +description: 'dispatch multiple dataflows which execute one after another' +dataflows: + - ../flow/auto_event.yaml \ No newline at end of file diff --git a/sample_standard_app/dataflow/flow/__init__.py b/sample_standard_app/dataflow/flow/__init__.py new file mode 100644 index 0000000..8f9304d --- /dev/null +++ b/sample_standard_app/dataflow/flow/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/6/18 16:32 +# @Author : wangchongshi +# @Email : wangchongshi.wcs@antgroup.com +# @FileName: __init__.py.py diff --git a/sample_standard_app/dataflow/flow/auto_event.yaml b/sample_standard_app/dataflow/flow/auto_event.yaml new file mode 100644 index 0000000..490f92f --- /dev/null +++ b/sample_standard_app/dataflow/flow/auto_event.yaml @@ -0,0 +1,42 @@ +name: 'auto_event_flow' +description: "event collect agent via perceiver planner executor" +nodes: + - module: 'agentuniverse_dataflow.node.data.event.perceiver' + # data perceiver in the event agent which can perceive tables in jsonl, etc. + class: 'PerceiverNode' + param_in_jsonl: + param_out_jsonl: 'auto_event_flow.perceiver.param.out' + datasets_in_jsonl: + dataset_out_jsonl: 'auto_event_flow.perceiver.dataset.out' + node_param: + # supported event_db: jsonl + event_db: jsonl + # for jsonl, set event_uri to the jsonl file prefix and make sure the file is under data/; event_sql is then unused + event_uri: demo_jsonl + # perceive limited lines + event_sql: + prompt_col: prompt + answer_col: answer + + - module: 'agentuniverse_dataflow.node.data.event.planner' + # data planner in the event agent which plans python code to extract data, with reflection + class: 'PlannerNode' + llm: 'qwen_llm' + prompt_version: 'event_planner_node.cn' + param_in_jsonl: 'auto_event_flow.perceiver.param.out' + param_out_jsonl: 'auto_event_flow.planner.plan.out' + datasets_in_jsonl: 
+ - 'auto_event_flow.perceiver.dataset.out' + dataset_out_jsonl: + node_param: + verify_lines: 1 + + - module: 'agentuniverse_dataflow.node.data.event.executor' + # data executor in event agent which execute python code from planner + class: 'ExecutorNode' + param_in_jsonl: 'auto_event_flow.planner.plan.out' + param_out_jsonl: + datasets_in_jsonl: + dataset_out_jsonl: 'auto_event_flow.executor.out' + + diff --git a/sample_standard_app/dataflow/flow/dataset_build.yaml b/sample_standard_app/dataflow/flow/dataset_build.yaml new file mode 100644 index 0000000..ae1c7f3 --- /dev/null +++ b/sample_standard_app/dataflow/flow/dataset_build.yaml @@ -0,0 +1,77 @@ +name: 'dataset_build_flow' +description: "train dataset build agent flow" +nodes: + - module: 'agentuniverse_dataflow.node.data.prompt.input.seed' + # query seed from LLM + class: 'SeedNode' + llm: 'qwen_llm' + prompt_version: 'dataset_input_seed_node.cn' + param_in_jsonl: + param_out_jsonl: + datasets_in_jsonl: + dataset_out_jsonl: 'dataset_build_flow.seed.out' + node_param: + seeds_num: 5 + seed_gen_requirement: '金融领域' + + - module: 'agentuniverse_dataflow.node.data.prompt.input.rewrite' + # rewrite query from LLM + class: 'RewriteNode' + llm: 'qwen_llm' + prompt_version: 'dataset_rewrite_node.cn' + param_in_jsonl: + param_out_jsonl: + datasets_in_jsonl: + - 'dataset_build_flow.seed.out' + dataset_out_jsonl: 'dataset_build_flow.rewrite.out' + + - module: 'agentuniverse_dataflow.node.data.prompt.input.extend' + # extend query from LLM + class: 'ExtendNode' + llm: 'qwen_llm' + prompt_version: 'dataset_extend_node.cn' + param_in_jsonl: + param_out_jsonl: + datasets_in_jsonl: + - 'dataset_build_flow.rewrite.out' + dataset_out_jsonl: 'dataset_build_flow.extend.out' + node_param: + extend_times: 2 + + - module: 'agentuniverse_dataflow.node.data.prompt.input.dedupe' + # dedupe queries which improve sft result + class: 'DedupeNode' + param_in_jsonl: + param_out_jsonl: + datasets_in_jsonl: + - 'dataset_build_flow.extend.out' + dataset_out_jsonl: 'dataset_build_flow.dedupe.out' + node_param: + diversify_hamming_threshold: 18 + freq_top_percent: 1.0 + freq_least_count: 100 + + - module: 'agentuniverse_dataflow.node.data.answer.answer' + # build answers from the prompt via LLM as training data + class: 'AnswerNode' + llm: 'qwen_llm' + param_in_jsonl: + param_out_jsonl: + datasets_in_jsonl: + - 'dataset_build_flow.dedupe.out' + dataset_out_jsonl: 'dataset_build_flow.answer.out' + + - module: 'agentuniverse_dataflow.node.data.answer.filter' + # filter training data to improve data quality + class: 'FilterNode' + llm: 'qwen_llm' + prompt_version: 'dataset_filter_node.cn' + param_in_jsonl: + param_out_jsonl: + datasets_in_jsonl: + - 'dataset_build_flow.answer.out' + dataset_out_jsonl: 'dataset_build_flow.filter.out' + node_param: + dimscore_threshold: 60 + avgscore_threshold: 75 + diff --git a/sample_standard_app/dataflow/flow/eval_report.yaml b/sample_standard_app/dataflow/flow/eval_report.yaml new file mode 100644 index 0000000..8c89643 --- /dev/null +++ b/sample_standard_app/dataflow/flow/eval_report.yaml @@ -0,0 +1,27 @@ +name: 'eval_report_flow' +description: "eval for nodes and summarize report" +nodes: + - module: 'agentuniverse_dataflow.node.eval.eval' + # eval dump result from LLM infer + class: 'EvalNode' + llm: 'qwen_llm' + prompt_version: 'eval_node.cn' + param_in_jsonl: + param_out_jsonl: + datasets_in_jsonl: +# - 'train_dump_flow.aci_dump.out' + - 'dataset_build_flow.filter.out' + dataset_out_jsonl: 'eval_report_flow.eval.out' + node_param: 
+ eval_lines: 100 + + - module: 'agentuniverse_dataflow.node.eval.report' + # compare multi-eval result as a report + class: 'ReportNode' + param_in_jsonl: + param_out_jsonl: + # multiple datasets + datasets_in_jsonl: + - 'eval_report_flow.eval.out' + dataset_out_jsonl: 'eval_report_flow.report.out' + diff --git a/sample_standard_app/dataflow/flow/instruct_select.yaml b/sample_standard_app/dataflow/flow/instruct_select.yaml new file mode 100644 index 0000000..9e30116 --- /dev/null +++ b/sample_standard_app/dataflow/flow/instruct_select.yaml @@ -0,0 +1,53 @@ +name: 'instruct_select_flow' +description: "extend instruct from sample and choose the best via pre-deploy model" +nodes: + - module: 'agentuniverse_dataflow.node.data.prompt.instruct.seed' + # produce multiple instruct seed candidates + class: 'SeedNode' + llm: 'qwen_llm' + prompt_version: 'instruct_seed_node.cn' + param_in_jsonl: + param_out_jsonl: + datasets_in_jsonl: + dataset_out_jsonl: 'instruct_select_flow.seed.out' + node_param: + extend_times: 10 + + - module: 'agentuniverse_dataflow.node.data.prompt.instruct.prompt_gen' + # produce prompt from instruct and query + class: 'PromptGenNode' + llm: 'qwen_llm' + prompt_version: 'instruct_prompt_gen_node.cn' + param_in_jsonl: + param_out_jsonl: + datasets_in_jsonl: + # instruct jsonl + - 'instruct_select_flow.seed.out' + # input jsonl + - 'dataset_build_flow.dedupe.out' + dataset_out_jsonl: 'instruct_select_flow.prompt_gen.out' + node_param: + instruct_num: 10 + +# - module: '' +# # dump answer from pre deployed LLM infer via prompt as input +# class: 'DumpNode' +# param_in_jsonl: 'train_dump_flow.aci_pre_deploy.out' +# param_out_jsonl: +# datasets_in_jsonl: +# - 'instruct_select_flow.prompt_gen.out' +# dataset_out_jsonl: 'instruct_select_flow.dump.out' + +# - module: 'agentuniverse_dataflow.node.eval.eval' +# # eval dump result through 7 dimensions +# class: 'EvalNode' +# llm: 'qwen_llm' +# prompt_version: 'eval_node.cn' +# param_in_jsonl: +# param_out_jsonl: +# datasets_in_jsonl: +# - 'instruct_select_flow.dump.out' +# dataset_out_jsonl: 'instruct_select_flow.eval.out' +# node_param: +# eval_lines: 100 + diff --git a/sample_standard_app/dataflow/flow/model_deploy.yaml b/sample_standard_app/dataflow/flow/model_deploy.yaml new file mode 100644 index 0000000..9cc3564 --- /dev/null +++ b/sample_standard_app/dataflow/flow/model_deploy.yaml @@ -0,0 +1,21 @@ +name: 'model_deploy_flow' +description: "deploy with artifact" +nodes: + - module: '' + # deploy LLM from artifact + class: 'DeployNode' + param_in_jsonl: 'train_dump_flow.train.out' + param_out_jsonl: 'train_dump_flow.deploy.out' + datasets_in_jsonl: + dataset_out_jsonl: + + - module: '' + # dump answer from deploy via prompt + class: 'DumpNode' + param_in_jsonl: 'train_dump_flow.deploy.out' + param_out_jsonl: + datasets_in_jsonl: + - 'dataset_build_flow.filter.out' + dataset_out_jsonl: 'train_dump_flow.dump.deploy.out' +train_dump: + yaml: \ No newline at end of file diff --git a/sample_standard_app/dataflow/flow/train_dump.yaml b/sample_standard_app/dataflow/flow/train_dump.yaml new file mode 100644 index 0000000..21d847f --- /dev/null +++ b/sample_standard_app/dataflow/flow/train_dump.yaml @@ -0,0 +1,42 @@ +name: 'train_dump_flow' +description: "train then dump instructions from dataset input" +nodes: + - module: '' + # upload sft dataset to training platform + class: 'UploadNode' + param_in_jsonl: + param_out_jsonl: 'train_dump_flow.upload.out' + datasets_in_jsonl: + - 'dataset_build_flow.filter.out' + dataset_out_jsonl: + 
node_param: + datagraph_token_name: 'dataflow token' + dataset_name: 'dataagent_dataset' + dataset_desc: 'dataagent dataset common' + + - module: '' + # train the sft model on the uploaded sft dataset + class: 'TrainNode' + param_in_jsonl: 'train_dump_flow.upload.out' + param_out_jsonl: 'train_dump_flow.aci_train.out' + datasets_in_jsonl: + dataset_out_jsonl: + + - module: '' + # pre-deploy an LLM inference service from the train-stage artifact via the platform's deploy func + class: 'PreDeployNode' + param_in_jsonl: 'train_dump_flow.aci_train.out' + param_out_jsonl: 'train_dump_flow.aci_pre_deploy.out' + datasets_in_jsonl: + dataset_out_jsonl: + + - module: '' + # dump answers from the pre-deployed LLM inference service for the given prompts + class: 'DumpNode' + param_in_jsonl: 'train_dump_flow.aci_pre_deploy.out' + param_out_jsonl: + datasets_in_jsonl: + - 'dataset_build_flow.filter.out' + dataset_out_jsonl: 'train_dump_flow.aci_dump.out' + node_param: + dump_lines: 100 \ No newline at end of file diff --git a/sample_standard_app/pyproject.toml b/sample_standard_app/pyproject.toml index 7b763d8..eab8e02 100644 --- a/sample_standard_app/pyproject.toml +++ b/sample_standard_app/pyproject.toml @@ -11,7 +11,7 @@ packages = [ [tool.poetry.dependencies] python = "^3.10" -agentUniverse = "^0.0.5" +agentUniverse = "^0.0.9" [tool.poetry.group.dev.dependencies] pytest = "^7.2.0"
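The jsonl helpers in node_msg_jsonl.py above always resolve file names against data/ in the current working directory. A minimal round-trip sketch, assuming agentUniverse and this package are installed and data/ is writable:

from agentuniverse_dataflow.util.fileio.node_msg_jsonl import JsonFileReader, JsonFileWriter

# writes data/demo_roundtrip.jsonl, one {"prompt": ..., "answer": ...} object per line
writer = JsonFileWriter('demo_roundtrip')
writer.write_json_prompt_answer_list([('什么是ETF?', 'ETF是一种交易型开放式指数基金。')])

reader = JsonFileReader('demo_roundtrip')
print(reader.read_json_prompt_answer_list())  # [('什么是ETF?', 'ETF是一种交易型开放式指数基金。')]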
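llm_call.batch_call above fans out one coroutine per prompt and gathers the answers in order, with None standing in for failed calls. A usage sketch, assuming AgentUniverse has been started and an LLM instance named 'qwen_llm' is registered, as in the sample flow YAMLs:

from agentuniverse_dataflow.util.llm.llm_call import batch_call

answers = batch_call(['分析下当前宏观经济形势', '什么是可转债?'], 'qwen_llm')
print(answers)  # one answer per prompt; None for any prompt whose call raised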
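The flow YAMLs wire nodes together purely by jsonl name: one node's dataset_out_jsonl becomes a later node's datasets_in_jsonl, and some inputs (e.g. dataset_build_flow.filter.out in train_dump.yaml) come from a flow the dispatcher executed earlier. A hypothetical sanity check for that wiring, not part of this patch, assuming PyYAML:

import yaml


def check_flow_wiring(conf_path: str) -> list:
    """List datasets_in_jsonl entries not produced earlier in the same flow."""
    with open(conf_path, 'r', encoding='utf-8') as f:
        conf = yaml.safe_load(f)
    produced, problems = set(), []
    for node in conf.get('nodes') or []:
        for needed in node.get('datasets_in_jsonl') or []:
            if needed not in produced:
                # may still be fine if an earlier flow already wrote data/<needed>.jsonl
                problems.append(f"{node.get('class')}: input '{needed}' not produced in this flow")
        if node.get('dataset_out_jsonl'):
            produced.add(node['dataset_out_jsonl'])
    return problems


for problem in check_flow_wiring('train_dump.yaml'):
    print(problem)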