Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions docs/content/append-table/blob.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields,
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB.</td>
<td>Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. Fields listed in <code>blob-descriptor-field</code> or <code>blob-view-field</code> are also treated as BLOB fields.</td>
</tr>
<tr>
<td><h5>blob-as-descriptor</h5></td>
Expand All @@ -120,7 +120,7 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields,
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
Comma-separated BLOB field names stored as serialized <code>BlobDescriptor</code> bytes inline in normal data files.
Comma-separated field names treated as BLOB fields and stored as serialized <code>BlobDescriptor</code> bytes inline in normal data files.
By default, all blob fields store blob bytes in separate <code>.blob</code> files.
If configured, a single table can mix some BLOB fields stored in <code>.blob</code> files with other BLOB fields stored as descriptor references.
Expand All @@ -132,9 +132,9 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields,
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
Comma-separated BLOB field names stored as serialized <code>BlobViewStruct</code> bytes inline in normal data files.
Comma-separated field names treated as BLOB fields and stored as serialized <code>BlobViewStruct</code> bytes inline in normal data files.
The field values reference BLOB values in upstream tables and are resolved at read time.
This option must be a subset of <code>blob-field</code> and must not overlap with <code>blob-descriptor-field</code>.
This option must not overlap with <code>blob-descriptor-field</code>.
</td>
</tr>
<tr>
Expand Down Expand Up @@ -300,7 +300,7 @@ Blob view is useful when a downstream table should reference BLOB values already
Blob view requires:

- the upstream table to have row tracking enabled, so each row has a stable `_ROW_ID`
- the downstream field to be listed in both `blob-field` and `blob-view-field`
- the downstream field to be listed in `blob-view-field`
- writes to provide a serialized `BlobViewStruct`; in Flink SQL, use the built-in `sys.blob_view` function

The Flink SQL function signature is:
Expand Down Expand Up @@ -335,7 +335,6 @@ CREATE TABLE image_view_table (
) WITH (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image_ref',
'blob-view-field' = 'image_ref'
);

Expand Down
6 changes: 3 additions & 3 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<td><h5>blob-descriptor-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Comma-separated BLOB field names, selected from blob-field, to store as serialized BlobDescriptor bytes inline in data files.</td>
<td>Comma-separated field names to treat as BLOB fields and store as serialized BlobDescriptor bytes inline in data files.</td>
</tr>
<tr>
<td><h5>blob-external-storage-field</h5></td>
Expand All @@ -84,13 +84,13 @@
<td><h5>blob-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB.</td>
<td>Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field or blob-view-field are also treated as BLOB fields.</td>
</tr>
<tr>
<td><h5>blob-view-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Comma-separated BLOB field names, selected from blob-field, to store as serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at read time.</td>
<td>Comma-separated field names to treat as BLOB fields and store as serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at read time.</td>
</tr>
<tr>
<td><h5>blob.split-by-file-size</h5></td>
Expand Down
8 changes: 5 additions & 3 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2277,7 +2277,9 @@ public InlineElement getDescription() {
.noDefaultValue()
.withDescription(
"Specifies column names that should be stored as blob type. "
+ "This is used when you want to treat a BYTES column as a BLOB.");
+ "This is used when you want to treat a BYTES column as a BLOB. "
+ "Fields listed in blob-descriptor-field or blob-view-field "
+ "are also treated as BLOB fields.");

@Immutable
public static final ConfigOption<String> BLOB_DESCRIPTOR_FIELD =
Expand All @@ -2286,7 +2288,7 @@ public InlineElement getDescription() {
.noDefaultValue()
.withFallbackKeys("blob.stored-descriptor-fields")
.withDescription(
"Comma-separated BLOB field names, selected from blob-field, to store "
"Comma-separated field names to treat as BLOB fields and store "
+ "as serialized BlobDescriptor bytes inline in data files.");

@Immutable
Expand All @@ -2295,7 +2297,7 @@ public InlineElement getDescription() {
.stringType()
.noDefaultValue()
.withDescription(
"Comma-separated BLOB field names, selected from blob-field, to store "
"Comma-separated field names to treat as BLOB fields and store "
+ "as serialized BlobViewStruct bytes inline in data files and "
+ "resolve from upstream tables at read time.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,10 @@ public static void validateTableSchema(TableSchema schema) {
FileFormat fileFormat =
FileFormat.fromIdentifier(options.formatType(), new Options(schema.options()));
RowType tableRowType = new RowType(schema.fields());
Set<String> blobFields = validateBlobFields(tableRowType, options);
Set<String> blobDescriptorFields =
validateBlobDescriptorFields(tableRowType, options, blobFields);
validateBlobFields(tableRowType, options);
Set<String> blobDescriptorFields = validateBlobDescriptorFields(tableRowType, options);
Set<String> blobViewFields =
validateBlobViewFields(tableRowType, options, blobFields, blobDescriptorFields);
validateBlobViewFields(tableRowType, options, blobDescriptorFields);
Set<String> blobInlineFields = new HashSet<>(blobDescriptorFields);
blobInlineFields.addAll(blobViewFields);
validateBlobExternalStorageFields(tableRowType, options, blobDescriptorFields);
Expand Down Expand Up @@ -719,7 +718,7 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
}
}

private static Set<String> validateBlobFields(RowType rowType, CoreOptions options) {
private static void validateBlobFields(RowType rowType, CoreOptions options) {
Set<String> blobFieldNames =
rowType.getFields().stream()
.filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
Expand All @@ -735,11 +734,9 @@ private static Set<String> validateBlobFields(RowType rowType, CoreOptions optio
field,
CoreOptions.BLOB_FIELD.key());
}
return configured;
}

private static Set<String> validateBlobDescriptorFields(
RowType rowType, CoreOptions options, Set<String> blobFields) {
private static Set<String> validateBlobDescriptorFields(RowType rowType, CoreOptions options) {
Set<String> blobFieldNames =
rowType.getFields().stream()
.filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
Expand All @@ -752,21 +749,12 @@ private static Set<String> validateBlobDescriptorFields(
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
checkArgument(
blobFields.contains(field),
"Field '%s' in '%s' must also be in '%s'.",
field,
CoreOptions.BLOB_DESCRIPTOR_FIELD.key(),
CoreOptions.BLOB_FIELD.key());
}
return configured;
}

private static Set<String> validateBlobViewFields(
RowType rowType,
CoreOptions options,
Set<String> blobFields,
Set<String> blobDescriptorFields) {
RowType rowType, CoreOptions options, Set<String> blobDescriptorFields) {
Set<String> blobFieldNames =
rowType.getFields().stream()
.filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
Expand All @@ -779,12 +767,6 @@ private static Set<String> validateBlobViewFields(
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_VIEW_FIELD.key());
checkArgument(
blobFields.contains(field),
"Field '%s' in '%s' must also be in '%s'.",
field,
CoreOptions.BLOB_VIEW_FIELD.key(),
CoreOptions.BLOB_FIELD.key());
checkArgument(
!blobDescriptorFields.contains(field),
"Field '%s' in '%s' can not also be in '%s'.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.RowTrackingTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Range;
Expand Down Expand Up @@ -497,49 +498,32 @@ public void testExternalStorageFieldMustBeSubsetOfDescriptorField() {
}

@Test
public void testBlobViewFieldMustBeSubsetOfBlobField() {
assertThatThrownBy(
() -> {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
schemaBuilder.column("f1", DataTypes.STRING());
schemaBuilder.column("f2", DataTypes.BLOB());
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.BLOB_VIEW_FIELD.key(), "f2");
catalog.createTable(identifier(), schemaBuilder.build(), true);
})
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage(
"Field 'f2' in '"
+ CoreOptions.BLOB_VIEW_FIELD.key()
+ "' must also be in '"
+ CoreOptions.BLOB_FIELD.key()
+ "'.");
public void testBlobInlineFieldCanDeclareBlobWithoutBlobField() throws Exception {
assertCreateBlobInlineFieldWithoutBlobField(
"blob_descriptor_without_blob_field", CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
assertCreateBlobInlineFieldWithoutBlobField(
"blob_view_without_blob_field", CoreOptions.BLOB_VIEW_FIELD.key());
}

@Test
public void testBlobDescriptorFieldMustBeSubsetOfBlobField() {
assertThatThrownBy(
() -> {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
schemaBuilder.column("f1", DataTypes.STRING());
schemaBuilder.column("f2", DataTypes.BLOB());
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
catalog.createTable(identifier(), schemaBuilder.build(), true);
})
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage(
"Field 'f2' in '"
+ CoreOptions.BLOB_DESCRIPTOR_FIELD.key()
+ "' must also be in '"
+ CoreOptions.BLOB_FIELD.key()
+ "'.");
private void assertCreateBlobInlineFieldWithoutBlobField(String tableName, String optionKey)
throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
schemaBuilder.column("f1", DataTypes.STRING());
schemaBuilder.column("f2", DataTypes.BLOB());
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
schemaBuilder.option(optionKey, "f2");

catalog.createTable(identifier(tableName), schemaBuilder.build(), true);

assertThat(
catalog.getTable(identifier(tableName))
.rowType()
.getTypeAt(2)
.is(DataTypeRoot.BLOB))
.isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1091,11 +1092,7 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {

Map<String, String> options = new HashMap<>(catalogTable.getOptions());
List<String> blobFields = CoreOptions.blobField(options);
Set<String> blobDescriptorFields = new CoreOptions(options).blobDescriptorField();
List<String> blobViewFields = CoreOptions.blobViewField(options);
validateSecondaryBlobFields(
blobFields, blobDescriptorFields, CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
validateSecondaryBlobFields(blobFields, blobViewFields, CoreOptions.BLOB_VIEW_FIELD.key());
Set<String> blobTypeFields = blobTypeFields(options);
if (!blobFields.isEmpty()) {
checkArgument(
options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
Expand Down Expand Up @@ -1124,30 +1121,29 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
field ->
schemaBuilder.column(
field.getName(),
resolveDataType(field.getName(), field.getType(), options),
resolveDataType(
field.getName(),
field.getType(),
options,
blobTypeFields),
columnComments.get(field.getName())));

return schemaBuilder.build();
}

private static void validateSecondaryBlobFields(
List<String> blobFields, Iterable<String> secondaryBlobFields, String optionKey) {
for (String secondaryBlobField : secondaryBlobFields) {
checkArgument(
blobFields.contains(secondaryBlobField),
"Field '%s' in '%s' must also be in '%s'.",
secondaryBlobField,
optionKey,
CoreOptions.BLOB_FIELD.key());
}
private static Set<String> blobTypeFields(Map<String, String> options) {
Set<String> blobTypeFields = new HashSet<>(CoreOptions.blobField(options));
blobTypeFields.addAll(new CoreOptions(options).blobDescriptorField());
blobTypeFields.addAll(CoreOptions.blobViewField(options));
return blobTypeFields;
}

private static org.apache.paimon.types.DataType resolveDataType(
String fieldName,
org.apache.flink.table.types.logical.LogicalType logicalType,
Map<String, String> options) {
List<String> blobFields = CoreOptions.blobField(options);
if (blobFields.contains(fieldName)) {
Map<String, String> options,
Set<String> blobTypeFields) {
if (blobTypeFields.contains(fieldName)) {
return toBlobType(logicalType);
}
Set<String> vectorFields = CoreOptions.vectorField(options);
Expand Down
Loading