forked from assafelovic/gpt-researcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
research_agent.py
180 lines (145 loc) · 6.79 KB
/
research_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# Description: Research assistant class that handles the research process for a given question.
# libraries
import asyncio
import json
from actions.web_search import web_search
from actions.web_scrape import async_browse
from processing.text import \
write_to_file, \
create_message, \
create_chat_completion, \
read_txt_files, \
write_md_to_pdf
from config import Config
from agent import prompts
import os
# Module-level configuration instance built from config.Config; the agent
# below reads its LLM model names (fast_llm_model / smart_llm_model) from it.
CFG = Config()
class ResearchAgent:
    """Research assistant that handles the research process for one question.

    The agent asks the LLM for search queries, browses the pages returned by
    the web search, stores the collected text under
    ``./outputs/<directory_name>/`` and asks the LLM to write a report or
    lessons from it.  Progress is reported by sending
    ``{"type": "logs", "output": ...}`` payloads to a websocket.
    """

    def __init__(self, question, agent, websocket):
        """Initialize the research assistant with the given question.

        Args:
            question (str): The question to research.
            agent: Value passed to ``prompts.generate_agent_role_prompt`` to
                build the system prompt.
            websocket: Object exposing an awaitable ``send_json(payload)``,
                used for progress logs.
        """
        self.question = question
        self.agent = agent
        self.visited_urls = set()
        self.research_summary = ""
        # Slicing never fails on short strings, so no length check is needed.
        self.directory_name = question[:100]
        # dirname() of a path ending in "/" just drops the trailing slash.
        self.dir_path = os.path.dirname(f"./outputs/{self.directory_name}/")
        self.websocket = websocket

    async def _log(self, message):
        """Send ``message`` to the agent's websocket as a "logs" payload."""
        await self.websocket.send_json({"type": "logs", "output": message})

    @staticmethod
    async def _resolve_answer(answer):
        """Return the final value of ``answer``, awaiting it first if needed.

        ``call_agent`` hands back whatever ``create_chat_completion`` returned,
        which the streamed call sites treat as an awaitable and the
        non-streamed ones as a plain value; this helper accepts both.
        """
        import inspect  # local import: only this helper needs it

        if inspect.isawaitable(answer):
            return await answer
        return answer

    async def summarize(self, text, topic):
        """Summarize the given text for the given topic.

        Args:
            text (str): The text to summarize.
            topic (str): The topic to summarize the text for.

        Returns:
            The result of ``create_chat_completion`` using the fast LLM model.
        """
        messages = [create_message(text, topic)]
        # NOTE(review): the log interpolates the full text rather than the
        # topic/query - confirm this is intended.
        await self._log(f"📝 Summarizing text for query: {text}")
        return create_chat_completion(
            model=CFG.fast_llm_model,
            messages=messages,
        )

    async def get_new_urls(self, url_set_input):
        """Return the urls that have not been visited yet and mark them visited.

        Args:
            url_set_input (Iterable[str]): Candidate urls.

        Returns:
            list[str]: The urls from the input that were not in
            ``self.visited_urls``, in input order and without duplicates.
        """
        new_urls = []
        for url in url_set_input:
            if url in self.visited_urls:
                continue
            await self._log(f"✅ Adding source url to research: {url}\n")
            self.visited_urls.add(url)
            new_urls.append(url)
        return new_urls

    async def call_agent(self, action, stream=False, websocket=None):
        """Send ``action`` to the smart LLM model using the agent's role prompt.

        Args:
            action (str): The user message for the model.
            stream (bool): Forwarded to ``create_chat_completion``.
            websocket: Forwarded to ``create_chat_completion``.

        Returns:
            Whatever ``create_chat_completion`` returns.  NOTE(review): the
            non-streamed call sites parse it as a JSON string while the
            streamed ones await it, so the type presumably depends on
            ``stream`` - confirm against ``processing.text``.
        """
        messages = [{
            "role": "system",
            "content": prompts.generate_agent_role_prompt(self.agent),
        }, {
            "role": "user",
            "content": action,
        }]
        answer = create_chat_completion(
            model=CFG.smart_llm_model,
            messages=messages,
            stream=stream,
            websocket=websocket,
        )
        return answer

    async def create_search_queries(self):
        """Create the search queries for the question.

        Returns:
            The JSON-decoded model answer (expected to be a list of query
            strings).

        Raises:
            json.JSONDecodeError: If the model answer is not valid JSON.
        """
        result = await self.call_agent(prompts.generate_search_queries_prompt(self.question))
        print(result)
        await self._log(f"🧠 I will conduct my research based on the following queries: {result}...")
        return json.loads(result)

    async def async_search(self, query):
        """Search the web for ``query`` and browse every not-yet-visited result.

        Args:
            query (str): The query to search for.

        Returns:
            list: One entry per browsed url, in url order.  Because the pages
            are gathered with ``return_exceptions=True``, an entry is the
            exception instance when browsing that url failed.
        """
        search_results = json.loads(web_search(query))
        # Results without an "href" would otherwise be browsed as None.
        hrefs = [item.get("href") for item in search_results if item.get("href")]
        # Await here so the log below shows the urls, not a coroutine object.
        new_search_urls = await self.get_new_urls(hrefs)
        await self._log(f"🌐 Browsing the following sites for relevant information: {new_search_urls}...")

        # Create the coroutine objects and run them concurrently.
        tasks = [async_browse(url, query, self.websocket) for url in new_search_urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def run_search_summary(self, query):
        """Run the search for ``query`` and store the collected text on disk.

        Args:
            query (str): The query to run the search summary for.

        Returns:
            str: The newline-joined text of all successfully browsed pages.
            The same text is written to
            ``./outputs/<directory_name>/research-<query>.txt``.
        """
        await self._log(f"🔎 Running research for '{query}'...")

        responses = await self.async_search(query)
        # async_search may return exception objects for failed pages; joining
        # those would raise TypeError and lose the whole query, so skip them.
        result = "\n".join(page for page in responses if isinstance(page, str))

        output_path = f"./outputs/{self.directory_name}/research-{query}.txt"
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        write_to_file(output_path, result)
        return result

    async def conduct_research(self):
        """Conduct the research for the question.

        Text already stored in ``self.dir_path`` is reused; only when none is
        found are new search queries generated and run.

        Returns:
            str: The accumulated research summary.
        """
        self.research_summary = read_txt_files(self.dir_path) if os.path.isdir(self.dir_path) else ""

        if not self.research_summary:
            search_queries = await self.create_search_queries()
            for query in search_queries:
                research_result = await self.run_search_summary(query)
                self.research_summary += f"{research_result}\n\n"

        await self._log(f"Total research words: {len(self.research_summary.split(' '))}")
        return self.research_summary

    async def create_concepts(self):
        """Create the essential concepts for the question from the research.

        Returns:
            The JSON-decoded model answer (expected to be a list of concept
            strings).

        Raises:
            json.JSONDecodeError: If the model answer is not valid JSON.
        """
        # call_agent is a coroutine function: without the await, json.loads
        # would receive a coroutine object instead of the answer.
        result = await self.call_agent(
            prompts.generate_concepts_prompt(self.question, self.research_summary))

        await self._log(f"I will research based on the following concepts: {result}\n")
        return json.loads(result)

    async def write_report(self, report_type, websocket):
        """Write the report for the question and export it via write_md_to_pdf.

        Args:
            report_type (str): Report kind, resolved to a prompt builder with
                ``prompts.get_report_by_type``.
            websocket: Websocket that receives the log message and is passed
                on to the streamed completion.

        Returns:
            tuple: ``(answer, path)`` - the report answer and the value
            returned by ``write_md_to_pdf``.
        """
        report_type_func = prompts.get_report_by_type(report_type)
        await websocket.send_json(
            {"type": "logs", "output": f"✍️ Writing {report_type} for research task: {self.question}..."})

        pending = await self.call_agent(
            report_type_func(self.question, self.research_summary),
            stream=True,
            websocket=websocket,
        )
        # Resolve once so both the PDF writer and the caller get the answer
        # itself rather than an already-awaited coroutine.
        answer = await self._resolve_answer(pending)

        path = await write_md_to_pdf(report_type, self.directory_name, answer)
        return answer, path

    async def write_lessons(self):
        """Write lessons on the essential concepts of the research.

        One lesson is generated per concept and handed to ``write_md_to_pdf``.

        Returns:
            None
        """
        concepts = await self.create_concepts()
        for concept in concepts:
            pending = await self.call_agent(prompts.generate_lesson_prompt(concept), stream=True)
            answer = await self._resolve_answer(pending)
            # write_md_to_pdf is awaited in write_report, so it must be
            # awaited here too or the lesson is never written.
            await write_md_to_pdf("Lesson", self.directory_name, answer)