When writing a sink-connector with confluentinc-kafka-connect-s3-10.5.17 I get the following stack trace when I call toConnectData() with an avro schema that contains a logical decimal field corresponding to a java BigDecimal field
[Worker-055a71def9708e242] [2024-11-29 16:01:15,823] ERROR [events-archive-s3-sink-connector-1-0-0|task-1] WorkerSinkTask{id=events-archive-s3-sink-connector-1-0-0-1} Error converting message value in topic 'staging.pixel.event.online' partition 6 at offset 158975 and timestamp 1732896075632: Invalid class for bytes type, expecting byte[] or ByteBuffer but found class java.math.BigDecimal (org.apache.kafka.connect.runtime.WorkerSinkTask:548)
2024-11-29T16:01:15.000Z
[Worker-055a71def9708e242] org.apache.kafka.connect.errors.DataException: Invalid class for bytes type, expecting byte[] or ByteBuffer but found class java.math.BigDecimal
2024-11-29T16:01:15.000Z
[Worker-055a71def9708e242] at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectData(AvroData.java:1394)
2024-11-29T16:01:15.000Z
I added a test for BigDecimal that reproduces the problem and made a fix which you can review. Let me know if I should submit a PR.
code that fixes the problem (AvroData.java line 1385), I added the "} else if (value instanceof BigDecimal) {"
case BYTES:
if (value instanceof byte[]) {
converted = ByteBuffer.wrap((byte[]) value);
} else if (value instanceof ByteBuffer) {
converted = value;
} else if (value instanceof GenericFixed) {
converted = ByteBuffer.wrap(((GenericFixed) value).bytes());
} else if (value instanceof BigDecimal) {
// Avro's ByteBuffer encoding of decimal is a byte array of the unscaled value
// of the BigDecimal, two's-complement big-endian, in an Avro fixed of the
// appropriate size.
BigDecimal decimal = (BigDecimal) value;
int scale = schema.parameters() != null
? Integer.parseInt(schema.parameters().get(Decimal.SCALE_FIELD))
: 0;
int precision = schema.parameters() != null
? Integer.parseInt(schema.parameters().get(CONNECT_AVRO_DECIMAL_PRECISION_PROP))
: CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT;
byte[] unscaledValue = decimal.unscaledValue().toByteArray();
byte[] fixedValue = new byte[precision];
int offset = fixedValue.length - unscaledValue.length;
if (decimal.signum() < 0) {
// If negative, fill the unscaled value with 0xFF
Arrays.fill(fixedValue, 0, offset, (byte) 0xFF);
}
System.arraycopy(unscaledValue, 0, fixedValue, offset, unscaledValue.length);
converted = ByteBuffer.wrap(fixedValue);
} else {
throw new DataException("Invalid class for bytes type, expecting byte[] or ByteBuffer "
+ "but found " + value.getClass());
}
break;
A Test which fails with the master version of AvroData.java, passes with the above code added
@Test
public void testToConnectBigDecimalAvro() {
org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.builder().bytesType();
avroSchema.addProp(AvroData.AVRO_LOGICAL_TYPE_PROP, AvroData.AVRO_LOGICAL_DECIMAL);
avroSchema.addProp("precision", 50);
avroSchema.addProp("scale", 2);
final SchemaAndValue expected = new SchemaAndValue(
Decimal.builder(2).parameter(AvroData.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "50").build(),
TEST_DECIMAL
);
final SchemaAndValue actual = avroData.toConnectData(avroSchema, TEST_DECIMAL);
assertThat("schema.parameters() does not match.",
actual.schema().parameters(),
IsEqual.equalTo(expected.schema().parameters())
);
assertEquals("schema does not match.", expected.schema(), actual.schema());
assertEquals("value does not match.", expected.value(), actual.value());
}
I am using
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-kafkaconnect-converter</artifactId>
<version>1.1.22</version>
</dependency>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-parent</artifactId>
<version>1.1.22</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId>
<version>1.1.22</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.12.0</version>
</dependency>
Thank you,
Ruairi
When writing a sink-connector with confluentinc-kafka-connect-s3-10.5.17 I get the following stack trace when I call toConnectData() with an avro schema that contains a logical decimal field corresponding to a java BigDecimal field
I added a test for BigDecimal that reproduces the problem and made a fix which you can review. Let me know if I should submit a PR.
code that fixes the problem (AvroData.java line 1385), I added the "} else if (value instanceof BigDecimal) {"
A Test which fails with the master version of AvroData.java, passes with the above code added
I am using
Thank you,
Ruairi