Suppose we have the following Avro schema:
{
  "name": "data",
  "type": {
    "type": "record",
    "name": "Product",
    "fields": [
      {
        "name": "name",
        "type": {
          "type": "record",
          "name": "LanguageString",
          "fields": [
            {
              "name": "de",
              "type": {
                "type": "string",
                "avro.java.string": "String"
              }
            },
            {
              "name": "en",
              "type": [
                "null",
                {
                  "type": "string",
                  "avro.java.string": "String"
                }
              ]
            }
          ]
        },
        "default": {
          "de": "",
          "en": null
        }
      }
    ]
  }
}
Note that the default value for the record field is specified as an object, with the nullable en field defaulting to null.
Consuming records with this schema using AWSKafkaAvroConverter inside a Kafka Connect sink connector fails:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid class for string type, expecting String or CharSequence but found class org.apache.avro.JsonProperties$Null
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectData(AvroData.java:1381)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectData(AvroData.java:1222)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectData(AvroData.java:1470)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.defaultValueFromAvroWithoutLogical(AvroData.java:1898)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.defaultValueFromAvro(AvroData.java:1881)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectSchema(AvroData.java:1814)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectSchema(AvroData.java:1558)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectSchema(AvroData.java:1683)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectSchema(AvroData.java:1558)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectSchema(AvroData.java:1683)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectSchema(AvroData.java:1534)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectData(AvroData.java:1217)
at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectData(AvroData.java:1198)
at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:124)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 13 more
After debugging, it seems AvroData is missing a check for JsonProperties.NULL_VALUE when converting Avro default values. The equivalent check has since been added to the Confluent Schema Registry repo (relevant PR: confluentinc/schema-registry#1928). Adding the same check here fixes the error. I can provide a PR.
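To illustrate the root cause: Avro cannot represent a JSON null default with a Java null (that would be indistinguishable from "no default"), so it uses the JsonProperties.NULL_VALUE singleton internally, and that sentinel must be translated back to null before type-specific conversion. The following is a minimal, dependency-free sketch of that pattern; the NULL_VALUE stand-in and method names are illustrative, not the actual AvroData or Avro API:

```java
// Self-contained sketch of the missing null-sentinel check. NULL_VALUE here
// stands in for org.apache.avro.JsonProperties.NULL_VALUE, the singleton Avro
// uses internally to represent a JSON null default value.
public class NullSentinelSketch {
    static final Object NULL_VALUE = new Object();

    // Without the check: the non-String sentinel reaches the string branch of
    // the conversion and triggers the DataException seen in the stack trace.
    static String toConnectStringUnpatched(Object value) {
        if (value instanceof String) {
            return (String) value;
        }
        throw new IllegalArgumentException(
            "Invalid class for string type, expecting String or CharSequence but found "
                + value.getClass());
    }

    // With the check (mirroring the idea of confluentinc/schema-registry#1928):
    // map the sentinel to a real null before any type-specific handling.
    static String toConnectStringPatched(Object value) {
        if (value == NULL_VALUE) {
            return null;
        }
        return toConnectStringUnpatched(value);
    }
}
```

With the check in place, a default of null for the en field converts cleanly instead of raising "Invalid class for string type".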