When using Kafka Connect with the AWS Glue Schema Registry (GSR), "AWSKafkaAvroConverter" throws the exception below if the config values "key.converter.avroRecordType" and "value.converter.avroRecordType" are not set. The full stack trace is shown first, followed by a sample connector configuration that reproduces the error.
[2022-01-07 20:59:09,204] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:485)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:119)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:485)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while de-serializing Avro message
at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize(AvroDeserializer.java:101)
at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.deserialize(GlueSchemaRegistryDeserializationFacade.java:162)
at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:149)
at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:117)
... 17 more
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize(AvroDeserializer.java:91)
... 21 more
Caused by: java.lang.NullPointerException
at com.amazonaws.services.schemaregistry.deserializers.avro.DatumReaderInstance.from(DatumReaderInstance.java:37)
at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer$DatumReaderCache.load(AvroDeserializer.java:112)
at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer$DatumReaderCache.load(AvroDeserializer.java:109)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)

Sample connector configuration:
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=db1.sampleavro.movies
s3.region=us-east-1
s3.bucket.name=<BUCKET NAME>
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
flush.size=10
store.kafka.keys=false
store.kafka.headers=false
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.compressionType=NONE
value.converter.compressionType=NONE
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter.endpoint=https://glue.us-east-1.amazonaws.com
key.converter.region=us-east-1
value.converter.region=us-east-1
key.converter.timeToLiveMillis=3600000
value.converter.timeToLiveMillis=3600000
key.converter.cacheSize=100
value.converter.cacheSize=100
key.converter.registry.name=msk-cdc-avro-keys
value.converter.registry.name=msk-cdc-avro-values
key.converter.compatibility=NONE
value.converter.compatibility=NONE
key.converter.description=none
value.converter.description=none
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
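
A possible workaround, sketched here rather than verified against this setup, is to set both properties explicitly so the deserializer does not receive a null record type. The value GENERIC_RECORD is an assumption: it is one of the two record types the Glue Schema Registry library documents (the other being SPECIFIC_RECORD), and it is chosen here on the assumption that the sink does not depend on generated Avro classes.

```properties
# Assumed fix: declare the Avro record type for both converters.
# GENERIC_RECORD is an assumption; use SPECIFIC_RECORD if generated Avro classes are on the classpath.
key.converter.avroRecordType=GENERIC_RECORD
value.converter.avroRecordType=GENERIC_RECORD
```

These two lines would be appended to the connector configuration above; the other settings stay unchanged.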