-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdatabase.py
More file actions
284 lines (241 loc) · 9.31 KB
/
database.py
File metadata and controls
284 lines (241 loc) · 9.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
from typing import Any, Dict, List, Optional, Union
from ._service import SupabaseService
from ._service import SupabaseService
class SupabaseDatabaseService(SupabaseService):
"""
Service for interacting with Supabase Database (PostgreSQL) API.
This class provides methods for database operations using Supabase's
RESTful API for PostgreSQL.
"""
def fetch_data(self,
table: str,
auth_token: Optional[str] = None,
select: str = "*",
filters: Optional[Dict[str, Any]] = None,
order: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
is_admin: bool = False) -> List[Dict[str, Any]]:
"""
Fetch data from a table with optional filtering, ordering, and pagination.
Args:
table: Table name
auth_token: Optional JWT token for authenticated requests
select: Columns to select (default: "*")
filters: Optional filters as dictionary
order: Optional order by clause
limit: Optional limit of rows to return
offset: Optional offset for pagination
Returns:
List of rows as dictionaries
"""
endpoint = f"/rest/v1/{table}"
params = {"select": select}
# Add filters if provided
if filters:
for key, value in filters.items():
# Format filter with eq operator for Supabase REST API
params[f"{key}"] = f"eq.{value}"
# Add ordering if provided
if order:
params["order"] = order
# Add pagination if provided
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
return self._make_request(
method="GET",
endpoint=endpoint,
auth_token=auth_token,
is_admin=is_admin,
params=params,
headers={"Prefer": "return=representation"}
)
def insert_data(self,
table: str,
data: Union[Dict[str, Any], List[Dict[str, Any]]],
auth_token: Optional[str] = None,
upsert: bool = False,
is_admin: bool = False) -> List[Dict[str, Any]]:
"""
Insert data into a table.
Args:
table: Table name
data: Data to insert (single record or list of records)
auth_token: Optional JWT token for authenticated requests
upsert: Whether to upsert (update on conflict)
is_admin: Whether to use service role key (bypasses RLS)
Returns:
Inserted data
"""
endpoint = f"/rest/v1/{table}"
headers = {"Prefer": "return=representation"}
if upsert:
headers["Prefer"] = "resolution=merge-duplicates,return=representation"
return self._make_request(
method="POST",
endpoint=endpoint,
auth_token=auth_token,
is_admin=is_admin,
data=data,
headers=headers
)
def update_data(self,
table: str,
data: Dict[str, Any],
filters: Dict[str, Any],
auth_token: Optional[str] = None,
is_admin: bool = False) -> List[Dict[str, Any]]:
"""
Update data in a table.
Args:
table: Table name
data: Data to update
filters: Filters to identify rows to update
auth_token: Optional JWT token for authenticated requests
Returns:
Updated data
"""
endpoint = f"/rest/v1/{table}"
params = {}
# Format filters with eq operator for Supabase REST API
if filters:
for key, value in filters.items():
params[f"{key}"] = f"eq.{value}"
return self._make_request(
method="PATCH",
endpoint=endpoint,
auth_token=auth_token,
is_admin=is_admin, # Pass the is_admin parameter to use service role key
data=data,
params=params,
headers={"Prefer": "return=representation"}
)
def upsert_data(self,
table: str,
data: Union[Dict[str, Any], List[Dict[str, Any]]],
auth_token: Optional[str] = None,
is_admin: bool = False) -> List[Dict[str, Any]]:
"""
Upsert data in a table (insert or update).
Args:
table: Table name
data: Data to upsert
auth_token: Optional JWT token for authenticated requests
Returns:
Upserted data
"""
return self.insert_data(table, data, auth_token, upsert=True, is_admin=is_admin)
def delete_data(self,
table: str,
filters: Dict[str, Any],
auth_token: Optional[str] = None,
is_admin: bool = False) -> List[Dict[str, Any]]:
"""
Delete data from a table.
Args:
table: Table name
filters: Filters to identify rows to delete
auth_token: Optional JWT token for authenticated requests
Returns:
Deleted data
"""
endpoint = f"/rest/v1/{table}"
params = {}
# Format filters with eq operator for Supabase REST API
if filters:
for key, value in filters.items():
params[f"{key}"] = f"eq.{value}"
return self._make_request(
method="DELETE",
endpoint=endpoint,
auth_token=auth_token,
is_admin=is_admin, # Pass the is_admin parameter to use service role key
params=params,
headers={"Prefer": "return=representation"}
)
def call_function(self,
function_name: str,
params: Optional[Dict[str, Any]] = None,
auth_token: Optional[str] = None) -> Any:
"""
Call a PostgreSQL function.
Args:
function_name: Function name
params: Function parameters
auth_token: Optional JWT token for authenticated requests
Returns:
Function result
"""
endpoint = f"/rest/v1/rpc/{function_name}"
return self._make_request(
method="POST",
endpoint=endpoint,
auth_token=auth_token,
data=params or {}
)
def create_test_table(self,
table: str,
auth_token: Optional[str] = None,
is_admin: bool = True) -> Dict[str, Any]:
"""
Create a simple test table for integration tests.
Args:
table: Table name to create
auth_token: Optional JWT token for authenticated requests
is_admin: Whether to use service role key (admin access)
Returns:
Response from the API
"""
# SQL to create a simple test table
sql = f"""
CREATE TABLE IF NOT EXISTS {table} (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
user_id TEXT
);
-- Set up RLS policies
ALTER TABLE {table} ENABLE ROW LEVEL SECURITY;
-- Create policy to allow all operations for authenticated users
DROP POLICY IF EXISTS "Allow all operations for authenticated users" ON {table};
CREATE POLICY "Allow all operations for authenticated users"
ON {table}
FOR ALL
TO authenticated
USING (true)
WITH CHECK (true);
"""
# Execute the SQL using the rpc endpoint
return self._make_request(
method="POST",
endpoint="/rest/v1/rpc/exec_sql",
auth_token=auth_token,
is_admin=is_admin, # Must use admin privileges to create tables
data={"query": sql}
)
def delete_table(self,
table: str,
auth_token: Optional[str] = None,
is_admin: bool = True) -> Dict[str, Any]:
"""
Delete a table from the database.
Args:
table: Table name to delete
auth_token: Optional JWT token for authenticated requests
is_admin: Whether to use service role key (admin access)
Returns:
Response from the API
"""
# SQL to drop the table
sql = f"DROP TABLE IF EXISTS {table};"
# Execute the SQL using the rpc endpoint
return self._make_request(
method="POST",
endpoint="/rest/v1/rpc/exec_sql",
auth_token=auth_token,
is_admin=is_admin, # Must use admin privileges to delete tables
data={"query": sql}
)