-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb.py
More file actions
215 lines (171 loc) · 6.9 KB
/
db.py
File metadata and controls
215 lines (171 loc) · 6.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import os
from typing import Optional, List, Dict, Any
from neo4j import GraphDatabase, Driver
from neo4j.exceptions import Neo4jError
_driver: Optional[Driver] = None
def get_driver() -> Driver:
global _driver
if _driver is None:
uri = os.getenv("NEO4J_CONNECTION_URI")
if not uri:
raise RuntimeError("NEO4J_CONNECTION_URI not set")
user = os.getenv("NEO4J_USERNAME")
password = os.getenv("NEO4J_PASSWORD")
if user and password:
_driver = GraphDatabase.driver(uri, auth=(user, password))
else:
raise RuntimeError("NEO4J_USERNAME and NEO4J_PASSWORD not set")
return _driver
class QuerySyntaxError(Exception):
"""Raised when a Cypher syntax error occurs """
pass
def _run_query(cypher: str, **params):
"""
Execute a Cypher query and convert Neo4j syntax errors to QuerySyntaxError.
Returns a list of neo4j.Record objects. Records are materialized while the
session is open to avoid accessing results after the session closes.
"""
driver = get_driver()
try:
with driver.session() as session:
result = session.run(cypher, **params)
return list(result)
except Neo4jError as e:
_error_handling(cypher, e)
def _run_query_single(cypher: str, **params):
"""Like _run_query but returns a single record or None."""
driver = get_driver()
try:
with driver.session() as session:
result = session.run(cypher, **params)
return result.single()
except Neo4jError as e:
_error_handling(cypher, e)
def _error_handling(cypher: str, e: Neo4jError):
code = getattr(e, "code", "") or ""
msg = str(e)
if "SyntaxError" in code or "syntax" in msg.lower():
raise QuerySyntaxError(f"Cypher syntax error: {msg}\nQuery:\n{cypher}") from e
raise
# ==== Workshop TODOs: Fill in the Cypher queries below ====
def create_user(username: str, password_hash: str) -> Dict[str, Any]:
"""
TODO: Create a new user node if it doesn't exist. The app expects a user object called user.
Suggested shape:
- Label: User
- Properties:
- username: $username
- passwordHash: $password_hash
- createdAt: datetime()
"""
cypher = """
// TODO: Cypher query to create a User node
"""
record = _run_query_single(cypher, username=username, password_hash=password_hash)
return record["user"] if record else {}
def fetch_user_by_username(username: str) -> Optional[Dict[str, Any]]:
"""
TODO: Fetch a user node by username; the app expects a user object called user. The app expects a user object called user.
"""
cypher = """
// TODO: Cypher query to fetch a User node by username
"""
record = _run_query_single(cypher, username=username)
return record["user"] if record else None
def create_chat_and_first_message(username: str, message: str) -> Dict[str, Any]:
"""
TODO: Create a new Chat node linked to the User and store the first user message. The app expects a chat object called chat.
Suggested shape:
- Label: Chat
- Properties:
- id: randomUUID()
- createdAt: datetime()
- updatedAt: datetime()
- Relationship: (user)-[:STARTED]->(chat)
"""
cypher = """
// TODO: Cypher query to create chat
"""
chat_record = _run_query_single(cypher, username=username, message=message)
create_message(chat_record["chat"]["id"], role="user", content=message, previous_ai_chat_id=None)
return chat_record["chat"] if chat_record else {}
def create_message(chat_id: str, role: str, content: str, previous_ai_chat_id: Optional[str]) -> Dict[str, Any]:
"""
TODO: Create a Message node and link it to an existing Chat. The app expects a message object called message.
Suggested shape:
- Label: Message
- Properties:
- id: randomUUID()
- role: $role
- content: $message
- createdAt: datetime()
- previousAIChatId: $previous_ai_chat_id // Note: this comes from the AI and is used for OpenAI to keep track of the conversation
- Relationship: (chat)-[:HAS_MESSAGE]->(message)
"""
cypher = """
// TODO: Cypher query to create a message for a chat
"""
record = _run_query_single(cypher, chat_id=chat_id, role=role, content=content,
previous_ai_chat_id=previous_ai_chat_id)
return record["message"] if record else {}
def list_user_chats(username: str) -> List[Dict[str, Any]]:
"""
TODO: Return all chats for the logged-in user. The app expects a stream of chat objects called chat.
Suggested shape:
- Label: Chat
- Properties:
- id
- createdAt
- updatedAt
- Relationship: (user)-[:STARTED]->(chat)
"""
cypher = """
// TODO: cypher query to list user chats
"""
records = _run_query(cypher, username=username)
return [r["chat"] for r in records]
def fetch_chat(chat_id: str) -> Optional[Dict[str, Any]]:
"""
TODO: Fetch a full chat with all messages in a list. The app expects a chat object called chat and a list of messages called messages.
Hint: Use collect()
suggested shape:
- Label: Chat
- Properties:
- id
- createdAt
- updatedAt
- Relationship: (chat)-[:HAS_MESSAGE]->(message)
"""
cypher = """
// TODO: Cypher query to fetch a chat by id and its messages
"""
record = _run_query_single(cypher, chat_id=chat_id)
return record if record else None
def get_ai_response(chat_id: str, previous_chat_id: Optional[str], message: str) -> Optional[Dict[str, Any]]:
"""
TODO: Send the message to our GenAI plugin; using the ai.text.chat function. The app expects a Cypher Map called aiResponse.
See the docs for more: https://neo4j.com/docs/genai/plugin/current/generate-text/#chat-new
- Using the ai.text.chat function, send the message and the config to OpenAI
Recommended AI config:
{
token: $openaiToken,
model: 'gpt-5-nano'
}
"""
cypher = """
// TODO: Cypher query to send message to AI plugin
"""
record = _run_query_single(cypher, chat_id=chat_id, previous_chat_id=previous_chat_id, message=message,
openaiToken=os.getenv("OPENAI_API_KEY"))
return record["aiResponse"] if record else None
def get_previous_ai_chat_id(chat_id: str) -> Optional[str]:
"""
TODO: Using the chat id, find the most recent AI ChatID (returned by openAI), in order to continue the conversation
if there is none, then the AI chat will be the first chat in this conversation. The app expects a string called chatId.
Hint: Use an ORDER BY and LIMIT and only search for AI messages
"""
cypher = """
// TODO: Cypher query to find the most recent AI ChatID
"""
record = _run_query_single(cypher, chat_id=chat_id)
return record["chatId"] if record else None