Skip to content

Commit b280404

Browse files
authored
Removes Lz4 Shim and Updates Dependency Version (#499)
* Removes Lz4 Shim and Updates Dependency Version Using a shim does work locally, but does not allow us to publish. Instead, we are manually excluding the problematic dependency and taking an explicit dependency on the fixed version. Also adds an explicit test for testing LZ4 compression.
1 parent 8f5e669 commit b280404

8 files changed

Lines changed: 94 additions & 68 deletions

File tree

avro-flink-serde/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@
153153
<groupId>org.apache.commons</groupId>
154154
<artifactId>commons-compress</artifactId>
155155
</exclusion>
156+
<exclusion>
157+
<groupId>org.lz4</groupId>
158+
<artifactId>lz4-java</artifactId>
159+
</exclusion>
156160
</exclusions>
157161
</dependency>
158162

@@ -166,6 +170,10 @@
166170
<groupId>org.apache.commons</groupId>
167171
<artifactId>commons-compress</artifactId>
168172
</exclusion>
173+
<exclusion>
174+
<groupId>org.lz4</groupId>
175+
<artifactId>lz4-java</artifactId>
176+
</exclusion>
169177
</exclusions>
170178
</dependency>
171179

integration-tests/run-local-tests.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,11 @@ cleanUpConnectFiles() {
115115

116116
cleanUpDockerResources || true
117117
# Start Kafka using docker command asynchronously
118-
docker compose up --no-attach localstack &
118+
docker-compose up --no-attach localstack &
119119
sleep 10
120120
## Run mvn tests for Kafka and Kinesis Platforms
121+
## This includes testProduceConsumeWithKafkaLZ4Compression which verifies
122+
## that at.yawk.lz4:lz4-java works correctly with Kafka's native LZ4 compression
121123
cd .. && mvn --file integration-tests/pom.xml verify -Psurefire -X && cd integration-tests
122124
cleanUpDockerResources
123125

@@ -131,7 +133,7 @@ downloadMongoDBConnector
131133
copyGSRConverters
132134

133135
runConnectTests() {
134-
docker compose up --no-attach localstack &
136+
docker-compose up --no-attach localstack &
135137
setUpMongoDBLocal
136138
startKafkaConnectTasks ${1}
137139
echo "Waiting for Sink task to pick up data.."

integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kafka/GlueSchemaRegistryKafkaIntegrationTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,4 +506,53 @@ private String getRecordType(DataFormat dataFormat, AvroRecordType avroRecordTyp
506506

507507
return avroRecordType.getName();
508508
}
509+
510+
/**
511+
* Test to verify that Kafka's native LZ4 compression works correctly with at.yawk.lz4:lz4-java dependency.
512+
* This test ensures that the replacement of org.lz4:lz4-java with at.yawk.lz4:lz4-java is functioning properly.
513+
*/
514+
@Test
515+
public void testProduceConsumeWithKafkaLZ4Compression() throws Exception {
516+
log.info("Starting test for Kafka native LZ4 compression...");
517+
518+
DataFormat dataFormat = DataFormat.AVRO;
519+
AWSSchemaRegistryConstants.COMPRESSION compression = AWSSchemaRegistryConstants.COMPRESSION.NONE;
520+
AvroRecordType recordType = AvroRecordType.GENERIC_RECORD;
521+
Compatibility compatibility = Compatibility.NONE;
522+
523+
final Pair<String, KafkaHelper> kafkaHelperPair = createAndGetKafkaHelper(TOPIC_NAME_PREFIX);
524+
final String topic = kafkaHelperPair.getLeft();
525+
final KafkaHelper kafkaHelper = kafkaHelperPair.getRight();
526+
527+
final TestDataGenerator testDataGenerator = testDataGeneratorFactory.getInstance(
528+
TestDataGeneratorType.valueOf(dataFormat, recordType, compatibility));
529+
final List<?> records = testDataGenerator.createRecords();
530+
531+
String schemaName = String.format("%s-%s-%s-LZ4", topic, dataFormat.name(), compatibility);
532+
schemasToCleanUp.add(schemaName);
533+
534+
ProducerProperties producerProperties = ProducerProperties.builder()
535+
.topicName(topic)
536+
.schemaName(schemaName)
537+
.dataFormat(dataFormat.name())
538+
.compatibilityType(compatibility.name())
539+
.compressionType(compression.name())
540+
.autoRegistrationEnabled("true")
541+
// Enable Kafka's native LZ4 compression
542+
.kafkaCompressionType("lz4")
543+
.build();
544+
545+
List<ProducerRecord<String, Object>> producerRecords =
546+
kafkaHelper.doProduceRecords(producerProperties, records);
547+
548+
ConsumerProperties.ConsumerPropertiesBuilder consumerPropertiesBuilder =
549+
ConsumerProperties.builder().topicName(topic);
550+
consumerPropertiesBuilder.avroRecordType(recordType.getName());
551+
552+
List<ConsumerRecord<String, Object>> consumerRecords =
553+
kafkaHelper.doConsumeRecords(consumerPropertiesBuilder.build());
554+
555+
assertRecordsEquality(producerRecords, consumerRecords);
556+
log.info("Successfully completed test for Kafka native LZ4 compression with {} records", consumerRecords.size());
557+
}
509558
}

integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kafka/KafkaHelper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,14 @@ public <T> List<ProducerRecord<String, T>> doProduceRecordsMultithreaded(final P
319319

320320
private Properties getProducerProperties(final ProducerProperties producerProperties) {
321321
Properties properties = getKafkaProducerProperties();
322+
323+
// Add Kafka's native compression if specified
324+
if (producerProperties.getKafkaCompressionType() != null &&
325+
!producerProperties.getKafkaCompressionType().isEmpty()) {
326+
properties.put("compression.type", producerProperties.getKafkaCompressionType());
327+
log.info("Setting Kafka compression.type to: {}", producerProperties.getKafkaCompressionType());
328+
}
329+
322330
setSchemaRegistrySerializerProperties(properties, producerProperties);
323331
return properties;
324332
}

integration-tests/src/test/java/com/amazonaws/services/schemaregistry/integrationtests/kafka/ProducerProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,6 @@ public class ProducerProperties implements GlueSchemaRegistryConnectionPropertie
3131
private String inputTopic;
3232
private String outputTopic;
3333
private String recordType; // required only for AVRO or Protobuf case
34+
private String kafkaCompressionType; // Kafka's native compression type (e.g., "lz4", "snappy", "gzip", "zstd")
3435
}
3536

lz4-shim/pom.xml

Lines changed: 0 additions & 61 deletions
This file was deleted.

pom.xml

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
</distributionManagement>
6464

6565
<modules>
66-
<module>lz4-shim</module>
6766
<module>build-tools</module>
6867
<module>common</module>
6968
<module>serializer-deserializer</module>
@@ -147,16 +146,34 @@
147146
<groupId>org.apache.kafka</groupId>
148147
<artifactId>kafka_${kafka.scala.version}</artifactId>
149148
<version>${kafka.version}</version>
149+
<exclusions>
150+
<exclusion>
151+
<groupId>org.lz4</groupId>
152+
<artifactId>lz4-java</artifactId>
153+
</exclusion>
154+
</exclusions>
150155
</dependency>
151156
<dependency>
152157
<groupId>org.apache.kafka</groupId>
153158
<artifactId>kafka-clients</artifactId>
154159
<version>${kafka.version}</version>
160+
<exclusions>
161+
<exclusion>
162+
<groupId>org.lz4</groupId>
163+
<artifactId>lz4-java</artifactId>
164+
</exclusion>
165+
</exclusions>
155166
</dependency>
156167
<dependency>
157168
<groupId>org.apache.kafka</groupId>
158169
<artifactId>kafka-streams</artifactId>
159170
<version>${kafka.version}</version>
171+
<exclusions>
172+
<exclusion>
173+
<groupId>org.lz4</groupId>
174+
<artifactId>lz4-java</artifactId>
175+
</exclusion>
176+
</exclusions>
160177
</dependency>
161178
<dependency>
162179
<groupId>org.apache.kafka</groupId>
@@ -373,9 +390,9 @@
373390
<artifactId>json</artifactId>
374391
<version>20231013</version>
375392
</dependency>
376-
<!-- Override vulnerable lz4-java with secure shim -->
393+
<!-- Use secure at.yawk.lz4:lz4-java instead of vulnerable org.lz4:lz4-java -->
377394
<dependency>
378-
<groupId>org.lz4</groupId>
395+
<groupId>at.yawk.lz4</groupId>
379396
<artifactId>lz4-java</artifactId>
380397
<version>${lz4.java.version}</version>
381398
</dependency>
@@ -513,8 +530,6 @@
513530
<exclude>org.apache.avro</exclude>
514531
<exclude>metadata</exclude>
515532
<exclude>additionalTypes</exclude>
516-
<!-- Exclude lz4-shim module from coverage -->
517-
<exclude>org.lz4</exclude>
518533
<exclude>default</exclude>
519534
</excludes>
520535
</rule>

serializer-deserializer/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@
8383
<groupId>org.apache.kafka</groupId>
8484
<artifactId>kafka-clients</artifactId>
8585
</dependency>
86+
<dependency>
87+
<groupId>at.yawk.lz4</groupId>
88+
<artifactId>lz4-java</artifactId>
89+
</dependency>
8690
<dependency>
8791
<groupId>org.projectlombok</groupId>
8892
<artifactId>lombok</artifactId>

0 commit comments

Comments
 (0)