# answers_functionality_example.py (forked from openai/openai-cookbook)
from transformers import GPT2TokenizerFast
import openai
tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
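# Assumed combined budget for prompt + completion tokens; 2048 matches the
# context window of the original GPT-3 base engines used below.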
MAX_TOKENS_LIMIT = 2048
ANSWERS_INSTRUCTION = "Please answer the question according to the above context.\n"
CONTEXT_TEMPLATE = "===\nContext: {context}\n===\n"
def extract_instruction(instruction):
"""
    Format the `instruction` parameter so it can be prepended to the prompt.
    If it is None, return an empty string.
"""
if instruction is None:
return ""
return f"{instruction.strip()}\n\n"
def semantic_search(
search_model, query_for_search, file_id=None, max_documents=None, examples=None
):
"""
:param examples: A list of {"text":...} or {"text": ..., "label": ...}.
:return:
        A list of semantic-search result dicts for the documents, sorted by "score" in descending order:
[
{
"document": ...,
"object": "search_result",
"score": ...,
"text": ...,
},
...
]
"""
    assert (examples is None) ^ (file_id is None)  # xor: exactly one of `examples` or `file_id`
if file_id is not None:
        # This is where you'd do an Elasticsearch-style lookup against an uploaded file.
        # Since there isn't an example file we can query here, we raise an error instead.
        # The return value from this would be a list of examples.
raise NotImplementedError()
# This isn't quite accurate since Search is also being deprecated. See our search guide for more
# information.
search_result = openai.Search.create(
model=search_model,
documents=[x["text"] for x in examples],
query=query_for_search,
)
info_dict = {d["document"]: d for d in search_result["data"]}
sorted_doc_ids = sorted(
info_dict.keys(), key=lambda x: info_dict[x]["score"], reverse=True
)
if max_documents:
sorted_doc_ids = sorted_doc_ids[:max_documents]
return [info_dict[i] for i in sorted_doc_ids]
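# A minimal usage sketch for semantic_search (the engine name and example texts
# below are illustrative only, and the Search endpoint this wraps has since been
# deprecated, so the call may not succeed on current API versions):
#
#     results = semantic_search(
#         "ada",
#         "What is the capital of China?",
#         examples=[
#             {"text": "Beijing is the capital of China."},
#             {"text": "Olympia is the capital of Washington."},
#         ],
#         max_documents=10,
#     )
#     # `results` is a list of dicts with "document", "object", "score", and
#     # "text" keys, sorted by "score" in descending order.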
def select_by_length(
sorted_doc_infos,
max_token_len,
lambda_fn=None,
):
"""
    Given a list of sorted search results, select as many documents as possible while
    keeping the total token length under `max_token_len`.
:param sorted_doc_infos: A list of semantic search result dict of documents sorted by "score".
:param max_token_len: The maximum token length for selected documents.
    :param lambda_fn: A function that takes a search result dict and outputs a formatted
        document string for context stuffing.
:return: A tuple of (
A concatenation of selected documents used as context,
A list of selected document IDs
)
"""
if not sorted_doc_infos:
return "", []
selected_indices = []
total_doc_tokens = 0
doc_dict = {}
for i, doc_info in enumerate(sorted_doc_infos):
doc = lambda_fn(doc_info) if lambda_fn else doc_info["text"]
n_doc_tokens = len(tokenizer.encode(doc))
if total_doc_tokens + n_doc_tokens < max_token_len:
total_doc_tokens += n_doc_tokens
selected_indices.append(i)
doc_dict[i] = doc
    # Reverse so that the top-ranked documents go at the end, closest to the question.
selected_indices = selected_indices[::-1]
context = "".join([doc_dict[i] for i in selected_indices])
selected_doc_infos = [sorted_doc_infos[i] for i in selected_indices]
return context, selected_doc_infos
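# A minimal usage sketch for select_by_length (assumes `sorted_doc_infos` came
# from semantic_search above; the 500-token budget is an arbitrary example):
#
#     context, selected = select_by_length(
#         sorted_doc_infos,
#         max_token_len=500,
#         lambda_fn=lambda x: x["text"].strip() + " ",
#     )
#     # `context` concatenates the selected documents with the top-ranked one
#     # last (closest to the question); `selected` keeps their result dicts.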
def answers(
examples,
question,
model,
examples_context,
file_id=None,
documents=None,
logit_bias=None,
max_rerank=200,
max_tokens=16,
alternative_question=None,
search_model="ada",
temperature=0.0,
logprobs=0,
stop=None,
n=1,
):
"""
    Given a question, a list of (question, answer) pairs as examples, and a list of
    documents for context, construct a prompt that includes all of the QA examples and
    the most relevant context documents, then answer the question with a completion call.
The constructed prompt for the final completion call:
```
Please answer the question according to the above context.
===
Context: {{ the context for example QA pairs. }}
===
Q: example 1 question
A: example 1 answer
---
Q: example 2 question
A: example 2 answer
===
Context: {{ a list of relevant documents sorted via search(question, documents) }}
===
Q: question
A:
```
The returned object has a structure like:
{
"answers": [
"Beijing",
"Beijing, China"
],
"completion_id": "xxx-xxx",
"object": "answer",
"selected_documents": [
{
"document": ..., # document index, same as in search/ results.
"object": "search_result",
"text": ...,
},
...
],
}
"""
examples = examples if examples else []
example_prompts = [f"Q: {x}\nA: {y}" for x, y in examples]
prompt = f"Q: {question}\nA:"
# Append all the QA examples into the prompt.
if examples_context:
examples_context = CONTEXT_TEMPLATE.format(context=examples_context)
instruction = (
ANSWERS_INSTRUCTION + examples_context + "\n---\n".join(example_prompts) + "\n"
)
logit_bias = logit_bias if logit_bias is not None else {}
    if file_id is None and documents is None:
        raise Exception("Please submit at least one of `documents` or `file_id`.")
    if file_id is not None and documents is not None:
        raise Exception("Please submit only one of `documents` or `file_id`.")
instruction = extract_instruction(instruction)
n_instruction_tokens = len(tokenizer.encode(instruction))
n_prompt_tokens = len(tokenizer.encode(prompt))
n_query_tokens = len(tokenizer.encode(question))
n_context_tokens = len(tokenizer.encode(CONTEXT_TEMPLATE.format(context="")))
if documents is not None:
documents = [doc.strip() + " " for doc in documents]
n_docs_tokens = [len(tokenizer.encode(doc)) for doc in documents]
    # Excluding all the required content, this is how many tokens are left for context stuffing.
leftover_token_len = MAX_TOKENS_LIMIT - (
n_instruction_tokens + n_context_tokens + n_prompt_tokens + max_tokens
)
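    # Note that `max_tokens` is reserved for the completion itself, so the context
    # documents have to fit into whatever remains of the model's window.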
sorted_doc_infos = []
question_for_search = (
alternative_question if alternative_question is not None else question
)
if file_id is not None:
        sorted_doc_infos = semantic_search(
search_model,
question_for_search,
file_id=file_id,
max_documents=max_rerank,
)
elif len(documents) == 0:
# If no context document is provided, do nothing.
pass
elif min(n_docs_tokens) >= leftover_token_len:
# If there is no room for adding any context doc.
pass
elif (max_rerank is None or max_rerank >= len(documents)) and sum(
n_docs_tokens
) < leftover_token_len:
        # If the total length of the docs is short enough, add all of them.
selected_indices = list(range(len(documents)))
sorted_doc_infos = [
{"document": i, "text": documents[i]} for i in selected_indices
]
elif n_query_tokens + max(n_docs_tokens) >= MAX_TOKENS_LIMIT:
        # If the search query and the longest document together exceed the limit.
total_tokens = n_query_tokens + max(n_docs_tokens)
raise Exception(
f"The longest document and prompt pair together contains {total_tokens} "
f"tokens, above the limit {MAX_TOKENS_LIMIT} for semantic search. Please consider "
f"shortening the prompt or the longest document."
)
else:
# If we can add some context documents but not all of them, we should
# query search endpoint to rank docs by score.
sorted_doc_infos = semantic_search(
search_model,
question_for_search,
examples=[{"text": doc} for doc in documents],
max_documents=max_rerank,
)
# Select documents w.r.t. the context length limitation.
context, sorted_doc_infos = select_by_length(
sorted_doc_infos,
leftover_token_len,
lambda_fn=lambda x: x["text"].strip() + " ",
)
# Add instruction before the context and the prompt after the context.
if context:
context = CONTEXT_TEMPLATE.format(context=context.strip())
full_prompt = instruction + context + prompt
completion_result = openai.Completion.create(
engine=model,
prompt=full_prompt,
logit_bias=logit_bias,
temperature=temperature,
n=n,
max_tokens=max_tokens,
stop=stop,
logprobs=logprobs,
)
completion_result["selected_documents"] = sorted_doc_infos
result = dict(
object="answer",
selected_documents=completion_result.pop("selected_documents"),
completion=completion_result["id"],
)
result["answers"] = [
item["text"].replace("A:", "").split("Q:")[0].strip()
for item in completion_result["choices"]
]
return result
print(
answers(
examples=[
["What is the capital of Washington", "Olympia"],
["What is the capital of Oregon", "Salem"],
],
question="What is the capital of China?",
examples_context="I am a bot that names country capitals",
documents=["I am a bot that names country capitals"],
model="davinci",
search_model="ada",
alternative_question="different test",
max_tokens=16,
stop=["\n\n"],
)
)
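# For the example call above, the `full_prompt` sent to the completion endpoint
# is built from the templates in this file and looks like:
#
#     Please answer the question according to the above context.
#     ===
#     Context: I am a bot that names country capitals
#     ===
#     Q: What is the capital of Washington
#     A: Olympia
#     ---
#     Q: What is the capital of Oregon
#     A: Salem
#
#     ===
#     Context: I am a bot that names country capitals
#     ===
#     Q: What is the capital of China?
#     A: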