Skip to content

Commit b5adc12

Browse files
committed
Release v1.6.0
1 parent 8b959bc commit b5adc12

23 files changed

Lines changed: 784 additions & 326 deletions

scripts/catalog.sql

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,12 @@
11
CREATE TABLE IF NOT EXISTS iceberg_tables (
2-
catalog_name VARCHAR(255) NOT NULL,
32
table_namespace VARCHAR(255) NOT NULL,
43
table_name VARCHAR(255) NOT NULL,
54
metadata_location VARCHAR(1000),
6-
previous_metadata_location VARCHAR(1000),
7-
PRIMARY KEY (catalog_name, table_namespace, table_name)
5+
columns JSONB
86
);
97

8+
CREATE UNIQUE INDEX IF NOT EXISTS idx_tables ON iceberg_tables (table_namespace, table_name);
9+
1010
CREATE TABLE IF NOT EXISTS iceberg_materialized_views (
1111
schema_name VARCHAR(255) NOT NULL,
1212
table_name VARCHAR(255) NOT NULL,

src/common/common_config.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
package common
22

33
const (
4-
VERSION = "1.5.0"
4+
VERSION = "1.6.0"
55

66
ENV_LOG_LEVEL = "BEMIDB_LOG_LEVEL"
77
ENV_DISABLE_ANONYMOUS_ANALYTICS = "BEMIDB_DISABLE_ANONYMOUS_ANALYTICS"

src/common/iceberg_catalog.go

Lines changed: 41 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -2,17 +2,18 @@ package common
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"strings"
78
)
89

910
const (
1011
TEMP_TABLE_SUFFIX_SYNCING = "-bemidb-syncing"
1112
TEMP_TABLE_SUFFIX_DELETING = "-bemidb-deleting"
12-
13-
CATALOG_NAME_PLACEHOLDER = "postgres" // Trino legacy, to delete
1413
)
1514

15+
// ---------------------------------------------------------------------------------------------------------------------
16+
1617
type IcebergSchemaTable struct {
1718
Schema string
1819
Table string
@@ -26,6 +27,8 @@ func (schemaTable IcebergSchemaTable) String() string {
2627
return fmt.Sprintf(`"%s"."%s"`, schemaTable.Schema, schemaTable.Table)
2728
}
2829

30+
// ---------------------------------------------------------------------------------------------------------------------
31+
2932
type IcebergMaterializedView struct {
3033
Schema string
3134
Table string
@@ -39,6 +42,8 @@ func (view IcebergMaterializedView) ToIcebergSchemaTable() IcebergSchemaTable {
3942
}
4043
}
4144

45+
// ---------------------------------------------------------------------------------------------------------------------
46+
4247
type IcebergCatalog struct {
4348
Config *CommonConfig
4449
}
@@ -170,6 +175,29 @@ func (catalog *IcebergCatalog) SchemaTableNames(schemaName string) Set[string] {
170175
return tableNames
171176
}
172177

178+
func (catalog *IcebergCatalog) TableColumns(icebergSchemaTable IcebergSchemaTable) ([]CatalogTableColumn, error) {
179+
pgClient := catalog.newPostgresClient()
180+
defer pgClient.Close()
181+
182+
var columnsJson []byte
183+
err := pgClient.QueryRow(
184+
context.Background(),
185+
"SELECT columns FROM iceberg_tables WHERE table_namespace=$1 AND table_name=$2",
186+
icebergSchemaTable.Schema, icebergSchemaTable.Table,
187+
).Scan(&columnsJson)
188+
if err != nil {
189+
return nil, err
190+
}
191+
192+
var catalogTableColumns []CatalogTableColumn
193+
err = json.Unmarshal(columnsJson, &catalogTableColumns)
194+
if err != nil {
195+
return nil, err
196+
}
197+
198+
return catalogTableColumns, nil
199+
}
200+
173201
func (catalog *IcebergCatalog) TableS3Path(icebergTableName IcebergSchemaTable) string {
174202
metadataFileS3Path := catalog.MetadataFileS3Path(icebergTableName)
175203
if metadataFileS3Path == "" {
@@ -181,17 +209,24 @@ func (catalog *IcebergCatalog) TableS3Path(icebergTableName IcebergSchemaTable)
181209

182210
// Write ---------------------------------------------------------------------------------------------------------------
183211

184-
func (catalog *IcebergCatalog) CreateTable(icebergSchemaTable IcebergSchemaTable, metadataLocation string) {
212+
func (catalog *IcebergCatalog) CreateTable(icebergSchemaTable IcebergSchemaTable, metadataLocation string, icebergSchemaColumns []*IcebergSchemaColumn) {
185213
pgClient := catalog.newPostgresClient()
186214
defer pgClient.Close()
187215

188-
_, err := pgClient.Exec(
216+
catalogTableColumns := make([]CatalogTableColumn, len(icebergSchemaColumns))
217+
for i, icebergSchemaColumn := range icebergSchemaColumns {
218+
catalogTableColumns[i] = icebergSchemaColumn.CatalogTableColumn()
219+
}
220+
columnsJson, err := json.Marshal(catalogTableColumns)
221+
PanicIfError(catalog.Config, err)
222+
223+
_, err = pgClient.Exec(
189224
context.Background(),
190-
"INSERT INTO iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) VALUES ($1, $2, $3, $4)",
191-
CATALOG_NAME_PLACEHOLDER,
225+
"INSERT INTO iceberg_tables (table_namespace, table_name, metadata_location, columns) VALUES ($1, $2, $3, $4)",
192226
icebergSchemaTable.Schema,
193227
icebergSchemaTable.Table,
194228
metadataLocation,
229+
columnsJson,
195230
)
196231
PanicIfError(catalog.Config, err)
197232
}

src/common/iceberg_schema_column.go

Lines changed: 97 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ import (
1414
)
1515

1616
type IcebergColumnType string
17+
type IcebergLogicalColumnType string
1718

1819
const (
1920
IcebergColumnTypeBoolean IcebergColumnType = "boolean"
@@ -29,50 +30,81 @@ const (
2930
IcebergColumnTypeTimestamp IcebergColumnType = "timestamp"
3031
IcebergColumnTypeBinary IcebergColumnType = "binary"
3132

33+
IcebergLogicalColumnTypeInterval IcebergLogicalColumnType = "interval"
34+
IcebergLogicalColumnTypeBpchar IcebergLogicalColumnType = "bpchar"
35+
IcebergLogicalColumnTypePoint IcebergLogicalColumnType = "point"
36+
IcebergLogicalColumnTypeJson IcebergLogicalColumnType = "json"
37+
IcebergLogicalColumnTypeUserDefined IcebergLogicalColumnType = "user_defined"
38+
3239
BEMIDB_NULL_STRING = "BEMIDB_NULL"
3340

3441
PARQUET_NAN = 0 // DuckDB crashes on NaN, libc++abi: terminating due to uncaught exception of type duckdb::InvalidConfigurationException: {"exception_type":"Invalid Configuration","exception_message":"Column float4_column lower bound deserialization failed: Failed to deserialize blob '' of size 0, attempting to produce value of type 'FLOAT'"}
3542
PARQUET_MAX_DECIMAL_PRECISION = 38
3643
PARQUET_FALLBACK_DECIMAL_SCALE = 6
3744
PARQUET_NESTED_FIELD_ID_PREFIX = 1000
38-
39-
PG_USER_DEFINED_PRIMITIVE_TYPE = "USER_DEFINED"
4045
)
4146

42-
type IcebergSchemaColumn struct {
43-
Config *CommonConfig
44-
ColumnName string
45-
ColumnType IcebergColumnType
46-
Position int
47-
NumericPrecision int
48-
NumericScale int
49-
DatetimePrecision int
50-
IsList bool
51-
IsRequired bool
52-
IsPartOfUniqueIndex bool
53-
PgPrimitiveColumnType string
47+
// CatalogTableColumn is the JSON-serializable description of one table column.
// The field tags define the keys used when a value is marshalled to or
// unmarshalled from JSON.
type CatalogTableColumn struct {
	Name     string `json:"name"`     // column name
	Type     string `json:"type"`     // element type name, e.g. "int" or "decimal(38, 6)"
	Position int    `json:"position"` // column position
	List     bool   `json:"list"`     // true when the column holds a list of Type
	Required bool   `json:"required"` // true when the column is rendered as NOT NULL
}

// ToSql renders the column as a SQL column definition of the form
//
//	"name" type[[]][ NOT NULL]
//
// The name is emitted as a double-quoted identifier; any double quote inside
// the name is doubled so that the identifier stays well-formed. "[]" is
// appended for list columns and " NOT NULL" for required columns.
func (tableColumn CatalogTableColumn) ToSql() string {
	quotedName := `"` + strings.ReplaceAll(tableColumn.Name, `"`, `""`) + `"`
	sql := quotedName + " " + tableColumn.Type

	if tableColumn.List {
		sql += "[]"
	}

	if tableColumn.Required {
		sql += " NOT NULL"
	}

	return sql
}
5568

56-
type ParquetSchemaField struct {
57-
FieldId string
58-
Name string
59-
Type string
60-
ConvertedType string
61-
RepetitionType string
62-
Scale string
63-
Precision string
64-
Length string
65-
NestedType string
66-
NestedConvertedType string
67-
NestedRepetitionType string
68-
NestedFieldId string
69+
// ToMetadataFieldMap converts the column into a generic map with the keys
// "id", "name", "type" and "required".
//
// The "json" type name is reported as "string". For list columns the "type"
// entry is replaced by a nested map describing a list of that primitive type,
// whose "element-id" is PARQUET_NESTED_FIELD_ID_PREFIX plus the column position
// and whose elements are never required.
func (tableColumn CatalogTableColumn) ToMetadataFieldMap() map[string]interface{} {
70+
primitiveType := tableColumn.Type
71+
// "json" is only a catalog-level type name; the field map exposes it as "string".
if primitiveType == "json" {
72+
primitiveType = "string"
73+
}
74+
75+
result := map[string]interface{}{
76+
"id": tableColumn.Position,
77+
"name": tableColumn.Name,
78+
"type": primitiveType,
79+
"required": tableColumn.Required,
80+
}
81+
82+
// List columns wrap the primitive type in a nested "list" description.
if tableColumn.List {
83+
result["type"] = map[string]interface{}{
84+
"type": "list",
85+
"element": primitiveType,
86+
"element-id": PARQUET_NESTED_FIELD_ID_PREFIX + tableColumn.Position,
87+
"element-required": false,
88+
}
89+
}
90+
91+
return result
6992
}
7093

71-
type IcebergSchemaField struct {
72-
Id int `json:"id"`
73-
Name string `json:"name"`
74-
Type interface{} `json:"type"`
75-
Required bool `json:"required"`
94+
// ---------------------------------------------------------------------------------------------------------------------
95+
96+
// IcebergSchemaColumn describes one column of an Iceberg table schema and
// carries the configuration needed by its conversion methods.
type IcebergSchemaColumn struct {
97+
// Config is passed to Panic when a value cannot be converted.
Config *CommonConfig
98+
ColumnName string
99+
// ColumnType selects the catalog type name produced by CatalogTableColumn().
ColumnType IcebergColumnType
100+
// LogicalColumnType refines ColumnType: it distinguishes "json" from plain
// strings in CatalogTableColumn() and selects the interval, bpchar, point
// and user-defined branches during value conversion.
LogicalColumnType IcebergLogicalColumnType
101+
// Position is copied into CatalogTableColumn.Position.
Position int
102+
NumericPrecision int
103+
NumericScale int
104+
DatetimePrecision int
105+
// IsList is copied into CatalogTableColumn.List.
IsList bool
106+
// IsRequired is copied into CatalogTableColumn.Required.
IsRequired bool
107+
IsPartOfUniqueIndex bool
76108
}
77109

78110
func (col *IcebergSchemaColumn) NormalizedColumnName() string {
@@ -97,51 +129,47 @@ func (col *IcebergSchemaColumn) NormalizedScale() int {
97129
return col.NumericScale
98130
}
99131

100-
func (col *IcebergSchemaColumn) IcebergSchemaFieldMap() IcebergSchemaField {
101-
icebergSchemaField := IcebergSchemaField{
102-
Id: col.Position,
103-
Name: col.NormalizedColumnName(),
132+
func (col *IcebergSchemaColumn) CatalogTableColumn() CatalogTableColumn {
133+
catalogTableColumn := CatalogTableColumn{
134+
Name: col.NormalizedColumnName(),
135+
Position: col.Position,
136+
Required: col.IsRequired,
137+
List: col.IsList,
104138
}
105139

106140
switch col.ColumnType {
107141
case IcebergColumnTypeBoolean:
108-
icebergSchemaField.Type = "boolean"
142+
catalogTableColumn.Type = "boolean"
109143
case IcebergColumnTypeString:
110-
icebergSchemaField.Type = "string"
144+
switch col.LogicalColumnType {
145+
case IcebergLogicalColumnTypeJson:
146+
catalogTableColumn.Type = "json"
147+
default:
148+
catalogTableColumn.Type = "string"
149+
}
111150
case IcebergColumnTypeInteger:
112-
icebergSchemaField.Type = "int"
151+
catalogTableColumn.Type = "int"
113152
case IcebergColumnTypeDecimal:
114-
icebergSchemaField.Type = "decimal(" + IntToString(col.NormalizedPrecision()) + ", " + IntToString(col.NormalizedScale()) + ")"
153+
catalogTableColumn.Type = "decimal(" + IntToString(col.NormalizedPrecision()) + ", " + IntToString(col.NormalizedScale()) + ")"
115154
case IcebergColumnTypeLong:
116-
icebergSchemaField.Type = "long"
155+
catalogTableColumn.Type = "long"
117156
case IcebergColumnTypeFloat:
118-
icebergSchemaField.Type = "float"
157+
catalogTableColumn.Type = "float"
119158
case IcebergColumnTypeDouble:
120-
icebergSchemaField.Type = "double"
159+
catalogTableColumn.Type = "double"
121160
case IcebergColumnTypeDate:
122-
icebergSchemaField.Type = "date"
161+
catalogTableColumn.Type = "date"
123162
case IcebergColumnTypeTime, IcebergColumnTypeTimeTz:
124-
icebergSchemaField.Type = "time"
163+
catalogTableColumn.Type = "time"
125164
case IcebergColumnTypeTimestamp:
126-
icebergSchemaField.Type = "timestamp"
165+
catalogTableColumn.Type = "timestamp"
127166
case IcebergColumnTypeBinary:
128-
icebergSchemaField.Type = "binary"
167+
catalogTableColumn.Type = "binary"
129168
default:
130169
panic("Unsupported column type: " + string(col.ColumnType))
131170
}
132171

133-
icebergSchemaField.Required = col.IsRequired
134-
135-
if col.IsList {
136-
icebergSchemaField.Type = map[string]interface{}{
137-
"type": "list",
138-
"element": icebergSchemaField.Type,
139-
"element-id": PARQUET_NESTED_FIELD_ID_PREFIX + col.Position,
140-
"element-required": false,
141-
}
142-
}
143-
144-
return icebergSchemaField
172+
return catalogTableColumn
145173
}
146174

147175
func (col *IcebergSchemaColumn) DuckdbType() string {
@@ -250,8 +278,8 @@ func (col *IcebergSchemaColumn) duckdbPrimitiveValueFromCsv(value string) interf
250278
}
251279
return valueFloat
252280
case IcebergColumnTypeDecimal:
253-
switch col.PgPrimitiveColumnType {
254-
case "interval":
281+
switch col.LogicalColumnType {
282+
case IcebergLogicalColumnTypeInterval:
255283
microseconds := 0
256284

257285
parts := strings.Split(value, " ")
@@ -341,13 +369,13 @@ func (col *IcebergSchemaColumn) duckdbPrimitiveValueFromJson(value any) interfac
341369
return value
342370
}
343371
case IcebergColumnTypeString:
344-
switch col.PgPrimitiveColumnType {
345-
case "bpchar":
372+
switch col.LogicalColumnType {
373+
case IcebergLogicalColumnTypeBpchar:
346374
return strings.TrimRight(value.(string), " ")
347-
case "point":
375+
case IcebergLogicalColumnTypePoint:
348376
valueMap := value.(map[string]interface{})
349377
return "(" + Float64ToString(valueMap["x"].(float64)) + "," + Float64ToString(valueMap["y"].(float64)) + ")"
350-
case PG_USER_DEFINED_PRIMITIVE_TYPE:
378+
case IcebergLogicalColumnTypeUserDefined:
351379
valueString := value.(string)
352380
valueDecodedHex, err := HexToString(valueString)
353381
if err == nil {
@@ -378,6 +406,8 @@ func (col *IcebergSchemaColumn) duckdbPrimitiveValueFromJson(value any) interfac
378406
case reflect.Float64:
379407
days := value.(float64)
380408
return time.Unix(0, 0).UTC().AddDate(0, 0, int(days))
409+
default:
410+
Panic(col.Config, fmt.Sprintf("Unsupported value: %v for column type: %s", value, col.ColumnType))
381411
}
382412
case IcebergColumnTypeTime:
383413
var nanoseconds int64
@@ -426,6 +456,8 @@ func (col *IcebergSchemaColumn) duckdbPrimitiveValueFromJson(value any) interfac
426456
}
427457
milliseconds := int64(valueFloat)
428458
return epoch.Add(time.Duration(milliseconds) * time.Millisecond)
459+
default:
460+
Panic(col.Config, fmt.Sprintf("Unsupported value: %v for column type: %s", value, col.ColumnType))
429461
}
430462
}
431463

src/common/iceberg_table.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -38,9 +38,9 @@ func (table *IcebergTable) MetadataFileS3Path() string {
3838
return table.IcebergCatalog.MetadataFileS3Path(table.IcebergSchemaTable)
3939
}
4040

41-
func (table *IcebergTable) Create(tableS3Path string) {
41+
func (table *IcebergTable) Create(tableS3Path string, icebergSchemaColumns []*IcebergSchemaColumn) {
4242
LogInfo(table.Config, "Creating Iceberg table:", table.IcebergSchemaTable.Table)
43-
table.IcebergCatalog.CreateTable(table.IcebergSchemaTable, tableS3Path+"/metadata/"+ICEBERG_METADATA_INITIAL_FILE_NAME)
43+
table.IcebergCatalog.CreateTable(table.IcebergSchemaTable, tableS3Path+"/metadata/"+ICEBERG_METADATA_INITIAL_FILE_NAME, icebergSchemaColumns)
4444
}
4545

4646
func (table *IcebergTable) ReplaceWith(callbackFunc func(syncingIcebergTable *IcebergTable)) {

src/common/iceberg_table_writer.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -162,7 +162,7 @@ func (writer *IcebergTableWriter) InsertFromQuery(query string) error {
162162
LogInfo(writer.Config, "Written", parquetFile.RecordCount, "records in Parquet file #"+IntToString(len(parquetFilesSortedAsc)), "("+writer.formattedParquetFileSize(parquetFile.Size)+")")
163163

164164
// Create as table
165-
writer.IcebergTable.Create(tableS3Path)
165+
writer.IcebergTable.Create(tableS3Path, icebergSchemaColumns)
166166

167167
return nil
168168
}
@@ -213,7 +213,7 @@ func (writer *IcebergTableWriter) insertRows(loadRowsToDuckdbTableFunc func(duck
213213

214214
// Create table
215215
if len(parquetFilesSortedAsc) == 1 {
216-
writer.IcebergTable.Create(tableS3Path)
216+
writer.IcebergTable.Create(tableS3Path, writer.IcebergSchemaColumns)
217217
}
218218

219219
// Delete old files

0 commit comments

Comments
 (0)