diff --git a/docs/content/append-table/blob.md b/docs/content/append-table/blob.md index 73124964f139..d5cfc324f19b 100644 --- a/docs/content/append-table/blob.md +++ b/docs/content/append-table/blob.md @@ -105,7 +105,7 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, No (none) String - Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. + Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field or blob-view-field are also treated as BLOB fields.
blob-as-descriptor
@@ -120,7 +120,7 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, (none) String - Comma-separated BLOB field names stored as serialized BlobDescriptor bytes inline in normal data files. + Comma-separated field names treated as BLOB fields and stored as serialized BlobDescriptor bytes inline in normal data files. By default, all blob fields store blob bytes in separate .blob files. If configured, one table can mix: some BLOB fields in .blob files and some as descriptor references. @@ -132,9 +132,9 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, (none) String - Comma-separated BLOB field names stored as serialized BlobViewStruct bytes inline in normal data files. + Comma-separated field names treated as BLOB fields and stored as serialized BlobViewStruct bytes inline in normal data files. The field values reference BLOB values in upstream tables and are resolved at read time. - This option must be a subset of blob-field and must not overlap with blob-descriptor-field. + This option must not overlap with blob-descriptor-field. @@ -300,7 +300,7 @@ Blob view is useful when a downstream table should reference BLOB values already Blob view requires: - the upstream table to have row tracking enabled, so each row has a stable `_ROW_ID` -- the downstream field to be listed in both `blob-field` and `blob-view-field` +- the downstream field to be listed in `blob-view-field` - writes to provide a serialized `BlobViewStruct`; in Flink SQL, use the built-in `sys.blob_view` function The Flink SQL function signature is: @@ -335,7 +335,6 @@ CREATE TABLE image_view_table ( ) WITH ( 'row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true', - 'blob-field' = 'image_ref', 'blob-view-field' = 'image_ref' ); diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 5faf8276dae9..1928fde9ddbf 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -66,7 +66,7 @@
blob-descriptor-field
(none) String - Comma-separated BLOB field names, selected from blob-field, to store as serialized BlobDescriptor bytes inline in data files. + Comma-separated field names to treat as BLOB fields and store as serialized BlobDescriptor bytes inline in data files.
blob-external-storage-field
@@ -84,13 +84,13 @@
blob-field
(none) String - Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. + Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field or blob-view-field are also treated as BLOB fields.
blob-view-field
(none) String - Comma-separated BLOB field names, selected from blob-field, to store as serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at read time. + Comma-separated field names to treat as BLOB fields and store as serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at read time.
blob.split-by-file-size
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index a42011082843..8ab03c868c4b 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2277,7 +2277,9 @@ public InlineElement getDescription() { .noDefaultValue() .withDescription( "Specifies column names that should be stored as blob type. " - + "This is used when you want to treat a BYTES column as a BLOB."); + + "This is used when you want to treat a BYTES column as a BLOB. " + + "Fields listed in blob-descriptor-field or blob-view-field " + + "are also treated as BLOB fields."); @Immutable public static final ConfigOption BLOB_DESCRIPTOR_FIELD = @@ -2286,7 +2288,7 @@ public InlineElement getDescription() { .noDefaultValue() .withFallbackKeys("blob.stored-descriptor-fields") .withDescription( - "Comma-separated BLOB field names, selected from blob-field, to store " + "Comma-separated field names to treat as BLOB fields and store " + "as serialized BlobDescriptor bytes inline in data files."); @Immutable @@ -2295,7 +2297,7 @@ public InlineElement getDescription() { .stringType() .noDefaultValue() .withDescription( - "Comma-separated BLOB field names, selected from blob-field, to store " + "Comma-separated field names to treat as BLOB fields and store " + "as serialized BlobViewStruct bytes inline in data files and " + "resolve from upstream tables at read time."); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index d2b929731710..b5285bb60679 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -165,11 +165,10 @@ public static void validateTableSchema(TableSchema schema) { FileFormat fileFormat = FileFormat.fromIdentifier(options.formatType(), new Options(schema.options())); RowType tableRowType = new RowType(schema.fields()); - Set blobFields = validateBlobFields(tableRowType, options); - Set blobDescriptorFields = - validateBlobDescriptorFields(tableRowType, options, blobFields); + validateBlobFields(tableRowType, options); + Set blobDescriptorFields = validateBlobDescriptorFields(tableRowType, options); Set blobViewFields = - validateBlobViewFields(tableRowType, options, blobFields, blobDescriptorFields); + validateBlobViewFields(tableRowType, options, blobDescriptorFields); Set blobInlineFields = new HashSet<>(blobDescriptorFields); blobInlineFields.addAll(blobViewFields); validateBlobExternalStorageFields(tableRowType, options, blobDescriptorFields); @@ -719,7 +718,7 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) } } - private static Set validateBlobFields(RowType rowType, CoreOptions options) { + private static void validateBlobFields(RowType rowType, CoreOptions options) { Set blobFieldNames = rowType.getFields().stream() .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) @@ -735,11 +734,9 @@ private static Set validateBlobFields(RowType rowType, CoreOptions optio field, CoreOptions.BLOB_FIELD.key()); } - return configured; } - private static Set validateBlobDescriptorFields( - RowType rowType, CoreOptions options, Set blobFields) { + private static Set validateBlobDescriptorFields(RowType rowType, CoreOptions options) { Set blobFieldNames = rowType.getFields().stream() .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) @@ -752,21 +749,12 @@ private static Set validateBlobDescriptorFields( "Field '%s' in '%s' must be a BLOB field in table schema.", field, CoreOptions.BLOB_DESCRIPTOR_FIELD.key()); - checkArgument( - blobFields.contains(field), - "Field '%s' in '%s' must also be in '%s'.", - field, - CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), - CoreOptions.BLOB_FIELD.key()); } return configured; } private static Set validateBlobViewFields( - RowType rowType, - CoreOptions options, - Set blobFields, - Set blobDescriptorFields) { + RowType rowType, CoreOptions options, Set blobDescriptorFields) { Set blobFieldNames = rowType.getFields().stream() .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) @@ -779,12 +767,6 @@ private static Set validateBlobViewFields( "Field '%s' in '%s' must be a BLOB field in table schema.", field, CoreOptions.BLOB_VIEW_FIELD.key()); - checkArgument( - blobFields.contains(field), - "Field '%s' in '%s' must also be in '%s'.", - field, - CoreOptions.BLOB_VIEW_FIELD.key(), - CoreOptions.BLOB_FIELD.key()); checkArgument( !blobDescriptorFields.contains(field), "Field '%s' in '%s' can not also be in '%s'.", diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index b86e43be4825..38aabeda6a7b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -51,6 +51,7 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.system.RowTrackingTable; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Range; @@ -497,49 +498,32 @@ public void testExternalStorageFieldMustBeSubsetOfDescriptorField() { } @Test - public void testBlobViewFieldMustBeSubsetOfBlobField() { - assertThatThrownBy( - () -> { - Schema.Builder schemaBuilder = Schema.newBuilder(); - schemaBuilder.column("f0", DataTypes.INT()); - schemaBuilder.column("f1", DataTypes.STRING()); - schemaBuilder.column("f2", DataTypes.BLOB()); - schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); - schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); - schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); - schemaBuilder.option(CoreOptions.BLOB_VIEW_FIELD.key(), "f2"); - catalog.createTable(identifier(), schemaBuilder.build(), true); - }) - .hasRootCauseInstanceOf(IllegalArgumentException.class) - .hasRootCauseMessage( - "Field 'f2' in '" - + CoreOptions.BLOB_VIEW_FIELD.key() - + "' must also be in '" - + CoreOptions.BLOB_FIELD.key() - + "'."); + public void testBlobInlineFieldCanDeclareBlobWithoutBlobField() throws Exception { + assertCreateBlobInlineFieldWithoutBlobField( + "blob_descriptor_without_blob_field", CoreOptions.BLOB_DESCRIPTOR_FIELD.key()); + assertCreateBlobInlineFieldWithoutBlobField( + "blob_view_without_blob_field", CoreOptions.BLOB_VIEW_FIELD.key()); } - @Test - public void testBlobDescriptorFieldMustBeSubsetOfBlobField() { - assertThatThrownBy( - () -> { - Schema.Builder schemaBuilder = Schema.newBuilder(); - schemaBuilder.column("f0", DataTypes.INT()); - schemaBuilder.column("f1", DataTypes.STRING()); - schemaBuilder.column("f2", DataTypes.BLOB()); - schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); - schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); - schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); - schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2"); - catalog.createTable(identifier(), schemaBuilder.build(), true); - }) - .hasRootCauseInstanceOf(IllegalArgumentException.class) - .hasRootCauseMessage( - "Field 'f2' in '" - + CoreOptions.BLOB_DESCRIPTOR_FIELD.key() - + "' must also be in '" - + CoreOptions.BLOB_FIELD.key() - + "'."); + private void assertCreateBlobInlineFieldWithoutBlobField(String tableName, String optionKey) + throws Exception { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.BLOB()); + schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(optionKey, "f2"); + + catalog.createTable(identifier(tableName), schemaBuilder.build(), true); + + assertThat( + catalog.getTable(identifier(tableName)) + .rowType() + .getTypeAt(2) + .is(DataTypeRoot.BLOB)) + .isTrue(); } @Test diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index df225feba6e9..cae85c238621 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -129,6 +129,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -1091,11 +1092,7 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) { Map options = new HashMap<>(catalogTable.getOptions()); List blobFields = CoreOptions.blobField(options); - Set blobDescriptorFields = new CoreOptions(options).blobDescriptorField(); - List blobViewFields = CoreOptions.blobViewField(options); - validateSecondaryBlobFields( - blobFields, blobDescriptorFields, CoreOptions.BLOB_DESCRIPTOR_FIELD.key()); - validateSecondaryBlobFields(blobFields, blobViewFields, CoreOptions.BLOB_VIEW_FIELD.key()); + Set blobTypeFields = blobTypeFields(options); if (!blobFields.isEmpty()) { checkArgument( options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()), @@ -1124,30 +1121,29 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) { field -> schemaBuilder.column( field.getName(), - resolveDataType(field.getName(), field.getType(), options), + resolveDataType( + field.getName(), + field.getType(), + options, + blobTypeFields), columnComments.get(field.getName()))); return schemaBuilder.build(); } - private static void validateSecondaryBlobFields( - List blobFields, Iterable secondaryBlobFields, String optionKey) { - for (String secondaryBlobField : secondaryBlobFields) { - checkArgument( - blobFields.contains(secondaryBlobField), - "Field '%s' in '%s' must also be in '%s'.", - secondaryBlobField, - optionKey, - CoreOptions.BLOB_FIELD.key()); - } + private static Set blobTypeFields(Map options) { + Set blobTypeFields = new HashSet<>(CoreOptions.blobField(options)); + blobTypeFields.addAll(new CoreOptions(options).blobDescriptorField()); + blobTypeFields.addAll(CoreOptions.blobViewField(options)); + return blobTypeFields; } private static org.apache.paimon.types.DataType resolveDataType( String fieldName, org.apache.flink.table.types.logical.LogicalType logicalType, - Map options) { - List blobFields = CoreOptions.blobField(options); - if (blobFields.contains(fieldName)) { + Map options, + Set blobTypeFields) { + if (blobTypeFields.contains(fieldName)) { return toBlobType(logicalType); } Set vectorFields = CoreOptions.vectorField(options); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java index 01e91542c61e..1b56bdd74d79 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java @@ -27,6 +27,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.rest.TestHttpWebServer; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.UriReader; import org.apache.paimon.utils.UriReaderFactory; @@ -191,7 +192,6 @@ public void testWriteBlobViewWithBuiltInFunction() throws Exception { "CREATE TABLE downstream_blob_view (id INT, label STRING, image_ref BYTES)" + " WITH ('row-tracking.enabled'='true'," + " 'data-evolution.enabled'='true'," - + " 'blob-field'='image_ref'," + " 'blob-view-field'='image_ref')"); batchSql( @@ -251,26 +251,24 @@ public void testBlobViewRejectsNonBlobField() { } @Test - public void testBlobInlineFieldRequiresBlobField() { - assertSecondaryBlobFieldRequiresBlobField( + public void testBlobInlineFieldCanDeclareBlobWithoutBlobField() throws Exception { + assertSecondaryBlobFieldCanDeclareBlobWithoutBlobField( "blob_descriptor_without_blob_field", "blob-descriptor-field"); - assertSecondaryBlobFieldRequiresBlobField( + assertSecondaryBlobFieldCanDeclareBlobWithoutBlobField( "blob_view_without_blob_field", "blob-view-field"); } - private void assertSecondaryBlobFieldRequiresBlobField(String tableName, String optionKey) { - assertThatThrownBy( - () -> - tEnv.executeSql( - String.format( - "CREATE TABLE %s (id INT, picture BYTES)" - + " WITH ('row-tracking.enabled'='true'," - + " 'data-evolution.enabled'='true'," - + " '%s'='picture')", - tableName, optionKey))) - .hasRootCauseInstanceOf(IllegalArgumentException.class) - .hasRootCauseMessage( - "Field 'picture' in '" + optionKey + "' must also be in 'blob-field'."); + private void assertSecondaryBlobFieldCanDeclareBlobWithoutBlobField( + String tableName, String optionKey) throws Exception { + tEnv.executeSql( + String.format( + "CREATE TABLE %s (id INT, picture BYTES)" + + " WITH ('row-tracking.enabled'='true'," + + " 'data-evolution.enabled'='true'," + + " '%s'='picture')", + tableName, optionKey)); + + assertThat(paimonTable(tableName).rowType().getTypeAt(1).is(DataTypeRoot.BLOB)).isTrue(); } @Test diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 3ab8406931e1..c68e7768abe6 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -83,6 +83,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.FILE_FORMAT; @@ -457,6 +458,7 @@ private Schema toInitialSchema( StructType schema, Transform[] partitions, Map properties) { Map normalizedProperties = new HashMap<>(properties); List blobFields = CoreOptions.blobField(properties); + Set blobDescriptorFields = new CoreOptions(properties).blobDescriptorField(); List blobViewFields = CoreOptions.blobViewField(properties); String provider = properties.get(TableCatalog.PROP_PROVIDER); if (!usePaimon(provider)) { @@ -491,7 +493,9 @@ private Schema toInitialSchema( for (StructField field : schema.fields()) { String name = field.name(); DataType type; - if (blobFields.contains(name) || blobViewFields.contains(name)) { + if (blobFields.contains(name) + || blobDescriptorFields.contains(name) + || blobViewFields.contains(name)) { checkArgument( field.dataType() instanceof org.apache.spark.sql.types.BinaryType, "The type of blob field must be binary"); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala index 2ff6eb308d77..38d9793d2d0b 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala @@ -399,7 +399,7 @@ class BlobTestBase extends PaimonSparkTestBase { sql( "CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " + "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " + - "'blob-field'='picture', 'blob-descriptor-field'='picture')") + "'blob-descriptor-field'='picture')") // Insert with a descriptor pointing to a real file val blobData = new Array[Byte](256)