Commit e92b27a

[core] Introduce vector-store for data-evolution table (#7240)
1 parent 79f70ea commit e92b27a

File tree: 34 files changed, +2096 −145 lines
Lines changed: 167 additions & 0 deletions
---
title: "Vector Storage"
weight: 7
type: docs
aliases:
- /append-table/vector-storage.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

# Vector Storage

## Overview

With the explosive growth of AI scenarios, vector storage has become increasingly important.

Paimon provides storage optimizations designed specifically for vector data to meet the needs of these scenarios.

## Vector Data Type

Vector data comes in many forms, among which dense vectors are the most commonly used. They are typically represented as fixed-length, densely packed arrays, generally without `null` elements.

Paimon supports defining columns of type `VECTOR<t, n>`, which represents a fixed-length dense vector column, where:
- **`t`**: the element type of the vector. Seven primitive types are currently supported: `BOOLEAN`, `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`;
- **`n`**: the vector dimension, a positive integer not exceeding `2,147,483,647`;
- **Null constraint**: a `VECTOR` column may be declared `NOT NULL` or left nullable (the default). However, if a `VECTOR` value itself is not `null`, its elements must not be `null`.

Compared to variable-length arrays, these properties make dense vectors more compact in storage and in memory, with benefits including:
- More natural semantic constraints, rejecting mismatched lengths, `null` elements, and similar anomalies at the storage layer;
- Better point-lookup performance, since no offset array needs to be stored or accessed;
- Closer alignment with the type representations of specialized vector engines, often avoiding memory copies and type conversions during queries.

Example: define a table with a `VECTOR` column using the Java API and write one row of data.
```java
public class CreateTableWithVector {

    public static void main(String[] args) throws Exception {
        // Schema
        Schema.Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder.column("id", DataTypes.BIGINT());
        schemaBuilder.column("embed", DataTypes.VECTOR(3, DataTypes.FLOAT()));
        schemaBuilder.option(CoreOptions.FILE_FORMAT.key(), "lance");
        schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none");
        Schema schema = schemaBuilder.build();

        // Create catalog
        String database = "default";
        String tempPath = System.getProperty("java.io.tmpdir") + UUID.randomUUID();
        Path warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempPath);
        Identifier identifier = Identifier.create("default", "my_table");
        try (Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(warehouse))) {

            // Create table
            catalog.createDatabase(database, true);
            catalog.createTable(identifier, schema, true);
            FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);

            // Write data
            BatchWriteBuilder builder = table.newBatchWriteBuilder();
            InternalVector vector = BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f});
            try (BatchTableWrite batchTableWrite = builder.newWrite()) {
                try (BatchTableCommit commit = builder.newCommit()) {
                    batchTableWrite.write(GenericRow.of(1L, vector));
                    commit.commit(batchTableWrite.prepareCommit());
                }
            }

            // Read data
            ReadBuilder readBuilder = table.newReadBuilder();
            TableScan.Plan plan = readBuilder.newScan().plan();
            try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
                reader.forEachRemaining(row -> {
                    float[] readVector = row.getVector(1).toFloatArray();
                    System.out.println(Arrays.toString(readVector));
                });
            }
        }
    }
}
```

**Notes**:
- Columns of `VECTOR` type cannot be used as primary key columns, partition columns, or for sorting.

## Engine-Level Representation

Since engine layers typically do not have a dedicated vector type, Paimon provides a separate configuration that converts the engine's `ARRAY` type to Paimon's `VECTOR` type, so that `VECTOR` columns can be declared from engine SQL.

Usage:
- **`'vector-field'`**: declares columns as `VECTOR` type; separate multiple columns with commas (`,`);
- **`'field.{field-name}.vector-dim'`**: declares the dimension of a vector column.

Example: define a table with `VECTOR` columns using Flink SQL.

```sql
CREATE TABLE IF NOT EXISTS ts_table (
    id BIGINT,
    embed1 ARRAY<FLOAT>,
    embed2 ARRAY<FLOAT>
) WITH (
    'file.format' = 'lance',
    'vector-field' = 'embed1,embed2',
    'field.embed1.vector-dim' = '128',
    'field.embed2.vector-dim' = '768'
);
```

**Notes**:
- When declaring `vector-field` columns, you must also provide the vector dimension; otherwise the CREATE TABLE statement will fail;
- Currently only Flink SQL supports this configuration; other engines have not implemented it yet.

## Specify File Format for Vector

When mapping the `VECTOR` type to the file format layer, the ideal storage layout is `FixedSizeList`. Currently this is supported only for certain file formats (such as `lance`) through the `paimon-arrow` integration. This means that to use the `VECTOR` type you would have to choose such a format via `file.format`, which applies globally and can be unfavorable for scalar and multimodal (Blob) columns in the same table.

Therefore, Paimon provides a way to store vector columns separately within Data Evolution tables.

Layout:

```
table/
├── bucket-0/
│   ├── data-uuid-0.parquet       # Contains id, name columns
│   ├── data-uuid-1.blob          # Contains blob data
│   ├── data-uuid-2.vector.lance  # Contains vector data in lance format
│   └── ...
├── manifest/
├── schema/
└── snapshot/
```

Usage:
- **`vector.file.format`**: stores `VECTOR` columns separately in the specified file format;
- **`vector.target-file-size`**: when vectors are stored separately, the target size of vector data files; defaults to `10 * 'target-file-size'`.

Example: store `VECTOR` columns separately using Flink SQL.

```sql
CREATE TABLE IF NOT EXISTS ts_table (
    id BIGINT,
    embed ARRAY<FLOAT>
) WITH (
    'file.format' = 'parquet',
    'vector.file.format' = 'lance',
    'vector-field' = 'embed',
    'field.embed.vector-dim' = '128',
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true'
);
```

**Notes**:
- If `vector.file.format` is the same as `file.format`, the data is stored together rather than separately;
- This is supported only for Append tables (not primary key tables), and requires both `row-tracking.enabled` and `data-evolution.enabled` to be set to `true`.
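
The first note above can be captured as a small predicate. This is a hedged illustration, not Paimon API; the class and method names are hypothetical:

```java
// Hypothetical sketch, not Paimon API: vector columns get their own files only
// when a distinct 'vector.file.format' is configured.
public class VectorStoreDecision {

    static boolean storedSeparately(String fileFormat, String vectorFileFormat) {
        // No 'vector.file.format' configured: vectors stay in the regular data files.
        if (vectorFileFormat == null) {
            return false;
        }
        // Same format as 'file.format': stored together, not separately.
        return !vectorFileFormat.equals(fileFormat);
    }

    public static void main(String[] args) {
        System.out.println(storedSeparately("parquet", "lance")); // true
        System.out.println(storedSeparately("lance", "lance"));   // false
    }
}
```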

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 18 additions & 0 deletions
```diff
@@ -1506,6 +1506,24 @@
         <td>String</td>
         <td>The Variant shredding schema for writing.</td>
     </tr>
+    <tr>
+        <td><h5>vector-field</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>Specifies column names that should be stored as vector type. This is used when you want to treat a ARRAY column as a VECTOR.</td>
+    </tr>
+    <tr>
+        <td><h5>vector.file.format</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>Specify the vector store file format.</td>
+    </tr>
+    <tr>
+        <td><h5>vector.target-file-size</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>MemorySize</td>
+        <td>Target size of a vector-store file. Default is 10 * TARGET_FILE_SIZE.</td>
+    </tr>
     <tr>
         <td><h5>visibility-callback.check-interval</h5></td>
         <td style="word-wrap: break-word;">10 s</td>
```

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 52 additions & 0 deletions
```diff
@@ -2324,6 +2324,31 @@ public InlineElement getDescription() {
                     .withDescription(
                             "The interval for checking visibility when visibility-callback enabled.");
 
+    public static final ConfigOption<String> VECTOR_FILE_FORMAT =
+            key("vector.file.format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Specify the vector store file format.");
+
+    public static final ConfigOption<String> VECTOR_FIELD =
+            key("vector-field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specifies column names that should be stored as vector type. "
+                                    + "This is used when you want to treat a ARRAY column as a VECTOR.");
+
+    public static final ConfigOption<MemorySize> VECTOR_TARGET_FILE_SIZE =
+            key("vector.target-file-size")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Target size of a vector-store file."
+                                                    + " Default is 10 * TARGET_FILE_SIZE.")
+                                    .build());
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
```
```diff
@@ -3647,6 +3672,33 @@ public Duration visibilityCallbackCheckInterval() {
         return options.get(VISIBILITY_CALLBACK_CHECK_INTERVAL);
     }
 
+    public String vectorFileFormatString() {
+        return normalizeFileFormat(options.get(VECTOR_FILE_FORMAT));
+    }
+
+    public Set<String> vectorField() {
+        String vectorFields = options.get(CoreOptions.VECTOR_FIELD);
+        if (vectorFields == null || vectorFields.trim().isEmpty()) {
+            return Collections.emptySet();
+        }
+        return Arrays.stream(vectorFields.trim().split(",")).collect(Collectors.toSet());
+    }
+
+    public static Set<String> vectorField(Map<String, String> options) {
+        String vectorFields = options.getOrDefault(CoreOptions.VECTOR_FIELD.key(), null);
+        if (vectorFields == null || vectorFields.trim().isEmpty()) {
+            return Collections.emptySet();
+        }
+        return Arrays.stream(vectorFields.trim().split(",")).collect(Collectors.toSet());
+    }
+
+    public long vectorTargetFileSize() {
+        // Since vectors are large, it would be better to set a larger target size for vectors.
+        return options.getOptional(VECTOR_TARGET_FILE_SIZE)
+                .map(MemorySize::getBytes)
+                .orElse(10 * targetFileSize(false));
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
```
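
To make the behavior of the two new accessors concrete, here is a self-contained sketch of the same logic outside the Paimon class (the class and method signatures here are mine, not Paimon's; note that, matching the diff, only the whole option string is trimmed, not individual comma-separated tokens):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

public class VectorOptionsSketch {

    // Mirrors vectorField(): comma-separated 'vector-field' value -> set of column names.
    // Like the original, only the whole string is trimmed, not individual tokens.
    static Set<String> vectorField(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Collections.emptySet();
        }
        return Arrays.stream(value.trim().split(",")).collect(Collectors.toSet());
    }

    // Mirrors vectorTargetFileSize(): explicit 'vector.target-file-size' if set,
    // otherwise 10 * target-file-size, since vector files benefit from being larger.
    static long vectorTargetFileSize(Long configuredBytes, long targetFileSizeBytes) {
        return configuredBytes != null ? configuredBytes : 10 * targetFileSizeBytes;
    }

    public static void main(String[] args) {
        System.out.println(vectorField("embed1,embed2").size()); // 2
        System.out.println(vectorTargetFileSize(null, 134217728L)); // 1342177280
    }
}
```

With a default 128 MiB `target-file-size`, vector files therefore roll at roughly 1.25 GiB unless overridden.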

paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java

Lines changed: 8 additions & 0 deletions
```diff
@@ -107,6 +107,14 @@ public static FileFormat fileFormat(CoreOptions options) {
         return FileFormat.fromIdentifier(options.fileFormatString(), options.toConfiguration());
     }
 
+    public static FileFormat vectorFileFormat(CoreOptions options) {
+        String vectorFileFormat = options.vectorFileFormatString();
+        if (vectorFileFormat == null) {
+            return fileFormat(options);
+        }
+        return FileFormat.fromIdentifier(vectorFileFormat, options.toConfiguration());
+    }
+
     public static FileFormat manifestFormat(CoreOptions options) {
         return FileFormat.fromIdentifier(options.manifestFormatString(), options.toConfiguration());
     }
```

paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java

Lines changed: 43 additions & 4 deletions
```diff
@@ -41,6 +41,7 @@
 import org.apache.paimon.operation.BlobFileContext;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BatchRecordWriter;
 import org.apache.paimon.utils.CommitIncrement;
@@ -52,15 +53,20 @@
 import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
 import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
 import org.apache.paimon.utils.StatsCollectorFactories;
+import org.apache.paimon.utils.VectorStoreUtils;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.types.BlobType.fieldsInBlobFile;
 
 /**
  * A {@link RecordWriter} implementation that only accepts records which are always insert
@@ -71,8 +77,10 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     private final FileIO fileIO;
     private final long schemaId;
     private final FileFormat fileFormat;
+    private final FileFormat vectorFileFormat;
     private final long targetFileSize;
     private final long blobTargetFileSize;
+    private final long vectorTargetFileSize;
     private final RowType writeSchema;
     @Nullable private final List<String> writeCols;
     private final DataFilePathFactory pathFactory;
@@ -103,8 +111,10 @@ public AppendOnlyWriter(
             @Nullable IOManager ioManager,
             long schemaId,
             FileFormat fileFormat,
+            FileFormat vectorFileFormat,
             long targetFileSize,
             long blobTargetFileSize,
+            long vectorTargetFileSize,
             RowType writeSchema,
             @Nullable List<String> writeCols,
             long maxSequenceNumber,
@@ -127,8 +137,10 @@ public AppendOnlyWriter(
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
+        this.vectorFileFormat = vectorFileFormat;
         this.targetFileSize = targetFileSize;
         this.blobTargetFileSize = blobTargetFileSize;
+        this.vectorTargetFileSize = vectorTargetFileSize;
         this.writeSchema = writeSchema;
         this.writeCols = writeCols;
         this.pathFactory = pathFactory;
@@ -302,13 +314,38 @@ public void toBufferedWriter() throws Exception {
     }
 
     private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
-        if (blobContext != null) {
-            return new RollingBlobFileWriter(
+        boolean hasNormal, hasBlob, hasVectorStore;
+        {
+            hasBlob = (blobContext != null);
+
+            List<DataField> fieldsInVectorFile =
+                    VectorStoreUtils.fieldsInVectorFile(writeSchema, fileFormat, vectorFileFormat);
+            Set<String> vectorFieldNames =
+                    fieldsInVectorFile.stream().map(DataField::name).collect(Collectors.toSet());
+            hasVectorStore = !fieldsInVectorFile.isEmpty();
+
+            List<DataField> fieldsInBlobFile =
+                    hasBlob
+                            ? fieldsInBlobFile(writeSchema, blobContext.blobDescriptorFields())
+                            : Collections.emptyList();
+            Set<String> blobFieldNames =
+                    fieldsInBlobFile.stream().map(DataField::name).collect(Collectors.toSet());
+            hasNormal =
+                    writeSchema.getFields().stream()
+                            .anyMatch(
+                                    f ->
+                                            !blobFieldNames.contains(f.name())
+                                                    && !vectorFieldNames.contains(f.name()));
+        }
+        if (hasBlob || (hasNormal && hasVectorStore)) {
+            return new DataEvolutionRollingFileWriter(
                     fileIO,
                     schemaId,
                     fileFormat,
+                    vectorFileFormat,
                     targetFileSize,
                     blobTargetFileSize,
+                    vectorTargetFileSize,
                     writeSchema,
                     pathFactory,
                     seqNumCounterProvider,
@@ -319,11 +356,13 @@ private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
                     statsDenseStore,
                     blobContext);
         }
+        FileFormat realFileFormat = hasNormal ? fileFormat : vectorFileFormat;
+        long realTargetFileSize = hasNormal ? targetFileSize : vectorTargetFileSize;
         return new RowDataRollingFileWriter(
                 fileIO,
                 schemaId,
-                fileFormat,
-                targetFileSize,
+                realFileFormat,
+                realTargetFileSize,
                 writeSchema,
                 pathFactory,
                 seqNumCounterProvider,
```