-
Notifications
You must be signed in to change notification settings - Fork 458
/
openai_service.py
97 lines (76 loc) · 3.22 KB
/
openai_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from openai import OpenAI
import shelve
from dotenv import load_dotenv
import os
import time
import logging
# Load settings from a .env file into the environment before they are read below.
load_dotenv()
# Configuration read from environment variables; os.getenv returns None when a
# variable is not set.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_ASSISTANT_ID = os.getenv("OPENAI_ASSISTANT_ID")
# Module-level OpenAI client shared by the API-calling functions below.
client = OpenAI(api_key=OPENAI_API_KEY)
def upload_file(path):
    """Upload a local file to OpenAI with the "assistants" purpose.

    Args:
        path: Filesystem path of the file to upload.

    Returns:
        The file object returned by ``client.files.create``. Its ``id`` is what
        ``create_assistant`` attaches to the assistant.
    """
    # Open through a context manager so the handle is closed as soon as the
    # upload call returns, instead of being leaked.
    with open(path, "rb") as file_handle:
        return client.files.create(file=file_handle, purpose="assistants")
def create_assistant(file):
    """
    Create the "WhatsApp AirBnb Assistant" with the retrieval tool enabled and
    the given uploaded file attached (only ``file.id`` is used).

    Note: You currently cannot set the temperature for Assistant via the API.

    Returns the assistant object produced by ``client.beta.assistants.create``.
    """
    # System prompt that defines the assistant's persona and fallback behavior.
    assistant_instructions = "You're a helpful WhatsApp assistant that can assist guests that are staying in our Paris AirBnb. Use your knowledge base to best respond to customer queries. If you don't know the answer, say simply that you cannot help with question and advice to contact the host directly. Be friendly and funny."
    return client.beta.assistants.create(
        name="WhatsApp AirBnb Assistant",
        instructions=assistant_instructions,
        tools=[{"type": "retrieval"}],
        model="gpt-4-1106-preview",
        file_ids=[file.id],
    )
# Use context manager to ensure the shelf file is closed properly
def check_if_thread_exists(wa_id):
    """Return the value stored for ``wa_id`` in the ``threads_db`` shelf.

    Returns ``None`` when the shelf has no entry for ``wa_id``.
    """
    with shelve.open("threads_db") as db:
        stored_thread_id = db.get(wa_id)
    return stored_thread_id
def store_thread(wa_id, thread_id):
    """Persist the ``wa_id`` -> ``thread_id`` mapping in the ``threads_db`` shelf.

    Args:
        wa_id: Key under which the thread id is stored.
        thread_id: Value to store; overwrites any existing entry for ``wa_id``.
    """
    # A plain key assignment is written to the shelf immediately, so the
    # writeback cache (which re-writes every cached entry on close) is not needed.
    with shelve.open("threads_db") as threads_shelf:
        threads_shelf[wa_id] = thread_id
def run_assistant(thread, name):
    """Run the configured assistant on ``thread`` and return the reply text.

    Args:
        thread: Thread object; only ``thread.id`` is used.
        name: Name of the user in the conversation. Currently unused; it is
            only referenced by the disabled per-run ``instructions`` below.

    Returns:
        The text value of the first content part of the first message in the
        thread's message listing (assumed to be the newest message under the
        API's default ordering -- TODO confirm).

    Raises:
        RuntimeError: If the run stops in a status from which it cannot reach
            "completed" (previously this caused an endless polling loop).
    """
    # Statuses that will never turn into "completed" from within this function.
    # "requires_action" is included because no tool outputs are submitted here.
    dead_end_statuses = {
        "requires_action",
        "cancelled",
        "failed",
        "expired",
        "incomplete",
    }

    # Retrieve the Assistant
    assistant = client.beta.assistants.retrieve(OPENAI_ASSISTANT_ID)

    # Run the assistant
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant.id,
        # instructions=f"You are having a conversation with {name}",
    )

    # Wait for completion
    # https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps#:~:text=under%20failed_at.-,Polling%20for%20updates,-In%20order%20to
    while run.status != "completed":
        if run.status in dead_end_statuses:
            raise RuntimeError(
                f"Assistant run {run.id} ended with status '{run.status}'"
            )
        # Be nice to the API
        time.sleep(0.5)
        run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)

    # Retrieve the Messages
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    new_message = messages.data[0].content[0].text.value
    logging.info(f"Generated message: {new_message}")
    return new_message
def generate_response(message_body, wa_id, name):
    """Send ``message_body`` to the assistant thread for ``wa_id`` and return the reply.

    A thread id is looked up for ``wa_id``; when none is stored, a new thread is
    created and its id is saved. The message is appended to the thread as a
    "user" message and the assistant is then run on that thread.

    Args:
        message_body: Text of the incoming user message.
        wa_id: Key identifying the sender in the thread store.
        name: Sender's name; used in log messages and passed to ``run_assistant``.

    Returns:
        Whatever ``run_assistant`` returns for the thread.
    """
    thread_id = check_if_thread_exists(wa_id)

    if thread_id is not None:
        # A thread id is already stored for this sender: fetch that thread.
        logging.info(f"Retrieving existing thread for {name} with wa_id {wa_id}")
        thread = client.beta.threads.retrieve(thread_id)
    else:
        # Nothing stored yet: create a thread and remember its id.
        logging.info(f"Creating new thread for {name} with wa_id {wa_id}")
        thread = client.beta.threads.create()
        store_thread(wa_id, thread.id)
        thread_id = thread.id

    # Append the incoming message to the thread as the user's turn.
    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=message_body,
    )

    # Run the assistant and hand back its reply.
    return run_assistant(thread, name)