-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathrequests.go
More file actions
307 lines (252 loc) · 11.3 KB
/
requests.go
File metadata and controls
307 lines (252 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
/*
Copyright 2025 The Antfly Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"bytes"
"fmt"
"github.com/antflydb/antfly/pkg/client/oapi"
"github.com/antflydb/antfly/pkg/client/query"
"github.com/antflydb/antfly/pkg/libaf/json"
)
// BatchRequest represents a batch operation request with flexible insert types.
// Unlike the oapi.BatchRequest, this version allows Inserts to accept any type
// (including structs) which will be automatically marshaled.
type BatchRequest struct {
// Deletes List of keys to delete.
Deletes []string `json:"deletes,omitempty"`
// Inserts Map of key to document. Documents can be any type (map, struct, etc.)
// and will be automatically marshaled to JSON.
Inserts map[string]any `json:"inserts,omitempty"`
// Transforms Array of transform operations for in-place document updates using MongoDB-style operators.
// Transform operations allow you to modify documents without read-modify-write races:
// - Operations are applied atomically on the server
// - Multiple operations per document are applied in sequence
// - Supports numeric operations ($inc, $mul, $min, $max), array operations ($push, $pull), and more
Transforms []Transform `json:"transforms,omitempty"`
// SyncLevel Synchronization level for the batch operation:
// - "propose": Wait for Raft proposal acceptance (fastest, default)
// - "write": Wait for Pebble KV write
// - "full_text": Wait for full-text index WAL write (slowest, most durable)
// - "aknn": Wait for vector index write with best-effort synchronous embedding (falls back to async on timeout)
SyncLevel SyncLevel `json:"sync_level,omitempty"`
}
// BatchResult represents the result of a batch operation with detailed failure information
type BatchResult struct {
// Deleted Number of documents successfully deleted
Deleted int `json:"deleted,omitempty"`
// Inserted Number of documents successfully inserted
Inserted int `json:"inserted,omitempty"`
// Transformed Number of documents successfully transformed
Transformed int `json:"transformed,omitempty"`
// Failed List of failed operations with error details
Failed []struct {
// Error message for this failure
Error string `json:"error,omitempty"`
// Id The document ID that failed
Id string `json:"id,omitempty"`
} `json:"failed,omitempty"`
}
// QueryRequest represents a query request with strongly-typed query fields.
// This is the SDK-friendly version of oapi.QueryRequest with Query types instead of json.RawMessage.
type QueryRequest struct {
// Table name to query
Table string `json:"table,omitempty"`
// Analyses specifies analysis operations to perform
Analyses *oapi.Analyses `json:"analyses,omitempty"`
// Count whether to return only the count of matching documents
Count bool `json:"count,omitempty"`
// DistanceOver minimum distance for semantic similarity search
DistanceOver *float32 `json:"distance_over,omitempty"`
// DistanceUnder maximum distance for semantic similarity search
DistanceUnder *float32 `json:"distance_under,omitempty"`
// Embeddings raw embeddings to use for semantic searches (the keys are the indexes to use for the queries).
// Supports both dense ([]float32 via Embedding0) and sparse ({Indices, Values} via Embedding1) embeddings.
Embeddings map[string]Embedding `json:"embeddings,omitempty"`
// ExclusionQuery strongly-typed Bleve search query for exclusions
ExclusionQuery *query.Query `json:"-"`
// Aggregations to compute
Aggregations map[string]AggregationRequest `json:"aggregations,omitempty"`
// Fields list of fields to include in the results
Fields []string `json:"fields,omitempty"`
// FilterPrefix for filtering by key prefix
FilterPrefix []byte `json:"filter_prefix,omitempty"`
// FilterQuery strongly-typed Bleve search query for filtering
FilterQuery *query.Query `json:"-"`
// FullTextSearch strongly-typed Bleve search query for full-text search
FullTextSearch *query.Query `json:"-"`
// Indexes to search (required for semantic search)
Indexes []string `json:"indexes,omitempty"`
// Limit maximum number of results to return or topk for semantic_search
Limit int `json:"limit,omitempty"`
// MergeConfig for combining results from semantic_search and full_text_search
MergeConfig MergeConfig `json:"merge_config"`
// Offset number of results to skip for pagination (only available for full_text_search queries)
Offset int `json:"offset,omitempty"`
// OrderBy specifies fields to order by with direction
OrderBy []oapi.SortField `json:"order_by,omitempty"`
// SearchAfter cursor for forward pagination (pass _sort values from last hit)
SearchAfter []string `json:"search_after,omitempty"`
// SearchBefore cursor for backward pagination (pass _sort values from first hit)
SearchBefore []string `json:"search_before,omitempty"`
// Reranker configuration for reranking results
Reranker *RerankerConfig `json:"reranker,omitempty"`
// Pruner configuration for pruning search results based on score quality
Pruner Pruner `json:"pruner,omitzero"`
// SemanticSearch text to use for semantic similarity search
SemanticSearch string `json:"semantic_search,omitempty"`
// DocumentRenderer optional Go template string for rendering document content to the prompt
DocumentRenderer string `json:"document_renderer,omitempty"`
// GraphSearches declarative graph queries to execute after full-text/vector searches.
// Results can reference search results using node selectors like $full_text_results.
GraphSearches map[string]GraphQuery `json:"graph_searches,omitempty"`
// Join configuration for joining data from another table.
// Supports inner, left, and right joins with automatic strategy selection.
Join JoinClause `json:"join"`
// ForeignSources maps table names to foreign data source configurations for
// query-time federated access. When a table name referenced in a query or join
// appears in this map, the query is routed to the external data source instead
// of Antfly storage.
ForeignSources map[string]ForeignSource `json:"foreign_sources,omitempty"`
}
// MarshalJSON implements custom JSON marshalling for QueryRequest.
// It converts the strongly-typed *query.Query fields to json.RawMessage
// for compatibility with the OAPI layer.
func (q QueryRequest) MarshalJSON() ([]byte, error) {
// Convert SDK QueryRequest to oapi.QueryRequest
oapiReq := oapi.QueryRequest{
Table: q.Table,
Analyses: q.Analyses,
Count: q.Count,
DistanceOver: q.DistanceOver,
DistanceUnder: q.DistanceUnder,
Embeddings: q.Embeddings,
Aggregations: q.Aggregations,
Fields: q.Fields,
FilterPrefix: q.FilterPrefix,
Indexes: q.Indexes,
Limit: q.Limit,
MergeConfig: q.MergeConfig,
Offset: q.Offset,
OrderBy: q.OrderBy,
SearchAfter: q.SearchAfter,
SearchBefore: q.SearchBefore,
Reranker: q.Reranker,
Pruner: q.Pruner,
SemanticSearch: q.SemanticSearch,
DocumentRenderer: q.DocumentRenderer,
GraphSearches: q.GraphSearches,
Join: q.Join,
ForeignSources: q.ForeignSources,
}
// Marshal query fields to json.RawMessage
var err error
if q.FilterQuery != nil {
oapiReq.FilterQuery, err = json.Marshal(q.FilterQuery)
if err != nil {
return nil, fmt.Errorf("marshalling filter_query: %w", err)
}
}
if q.FullTextSearch != nil {
oapiReq.FullTextSearch, err = json.Marshal(q.FullTextSearch)
if err != nil {
return nil, fmt.Errorf("marshalling full_text_search: %w", err)
}
}
if q.ExclusionQuery != nil {
oapiReq.ExclusionQuery, err = json.Marshal(q.ExclusionQuery)
if err != nil {
return nil, fmt.Errorf("marshalling exclusion_query: %w", err)
}
}
return json.Marshal(oapiReq)
}
// UnmarshalJSON implements custom JSON unmarshalling for QueryRequest.
// It converts json.RawMessage fields back to strongly-typed *query.Query.
func (q *QueryRequest) UnmarshalJSON(data []byte) error {
// Unmarshal into oapi.QueryRequest
var oapiReq oapi.QueryRequest
if err := json.Unmarshal(data, &oapiReq); err != nil {
return err
}
// Copy simple fields
q.Table = oapiReq.Table
q.Analyses = oapiReq.Analyses
q.Count = oapiReq.Count
q.DistanceOver = oapiReq.DistanceOver
q.DistanceUnder = oapiReq.DistanceUnder
q.Embeddings = oapiReq.Embeddings
q.Aggregations = oapiReq.Aggregations
q.Fields = oapiReq.Fields
q.FilterPrefix = oapiReq.FilterPrefix
q.Indexes = oapiReq.Indexes
q.Limit = oapiReq.Limit
q.MergeConfig = oapiReq.MergeConfig
q.Offset = oapiReq.Offset
q.OrderBy = oapiReq.OrderBy
q.SearchAfter = oapiReq.SearchAfter
q.SearchBefore = oapiReq.SearchBefore
q.Reranker = oapiReq.Reranker
q.Pruner = oapiReq.Pruner
q.SemanticSearch = oapiReq.SemanticSearch
q.DocumentRenderer = oapiReq.DocumentRenderer
q.GraphSearches = oapiReq.GraphSearches
q.Join = oapiReq.Join
q.ForeignSources = oapiReq.ForeignSources
// Unmarshal query fields (only if not null and not empty)
if len(oapiReq.FilterQuery) > 0 && !bytes.Equal(oapiReq.FilterQuery, []byte("null")) {
q.FilterQuery = new(query.Query)
if err := json.Unmarshal(oapiReq.FilterQuery, q.FilterQuery); err != nil {
return fmt.Errorf("unmarshalling filter_query: %w", err)
}
}
if len(oapiReq.FullTextSearch) > 0 && !bytes.Equal(oapiReq.FullTextSearch, []byte("null")) {
q.FullTextSearch = new(query.Query)
if err := json.Unmarshal(oapiReq.FullTextSearch, q.FullTextSearch); err != nil {
return fmt.Errorf("unmarshalling full_text_search: %w", err)
}
}
if len(oapiReq.ExclusionQuery) > 0 && !bytes.Equal(oapiReq.ExclusionQuery, []byte("null")) {
q.ExclusionQuery = new(query.Query)
if err := json.Unmarshal(oapiReq.ExclusionQuery, q.ExclusionQuery); err != nil {
return fmt.Errorf("unmarshalling exclusion_query: %w", err)
}
}
return nil
}
// MultiBatchRequest represents a cross-table batch operation.
type MultiBatchRequest struct {
// Tables maps table names to their batch operations.
Tables map[string]BatchRequest `json:"tables"`
// SyncLevel Synchronization level for the batch operation.
SyncLevel SyncLevel `json:"sync_level,omitempty"`
}
// MultiBatchResult represents the result of a cross-table batch operation.
type MultiBatchResult struct {
// Tables maps table names to their batch results.
Tables map[string]BatchResult `json:"tables,omitempty"`
}
// TransactionCommitResult represents the result of an OCC transaction commit.
type TransactionCommitResult struct {
// Status is "committed" or "aborted".
Status string `json:"status"`
// Conflict details (only present when status is "aborted").
Conflict *TransactionConflict `json:"conflict,omitempty"`
// Tables maps table names to their batch results (only present when committed).
Tables map[string]BatchResult `json:"tables,omitempty"`
}
// TransactionConflict describes a version conflict that caused a transaction abort.
type TransactionConflict struct {
Table string `json:"table,omitempty"`
Key string `json:"key,omitempty"`
Message string `json:"message,omitempty"`
}