Skip to content

Commit 4b26253

Browse files
committed
Migrate examples module from AWS SDK v1 Kinesis to v2
Replace com.amazonaws:aws-java-sdk-kinesis with software.amazon.awssdk:kinesis and migrate PutRecordGetRecordExample.java from AmazonKinesis (v1) to KinesisClient (v2) using the builder pattern.
1 parent b280404 commit 4b26253

2 files changed

Lines changed: 51 additions & 45 deletions

File tree

examples/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@
6161

6262
<dependencies>
6363
<dependency>
64-
<groupId>com.amazonaws</groupId>
65-
<artifactId>aws-java-sdk-kinesis</artifactId>
64+
<groupId>software.amazon.awssdk</groupId>
65+
<artifactId>kinesis</artifactId>
6666
</dependency>
6767
<dependency>
6868
<groupId>com.fasterxml.jackson.dataformat</groupId>

examples/src/main/java/com/amazonaws/services/schemaregistry/examples/kds/PutRecordGetRecordExample.java

Lines changed: 49 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,6 @@
1414
*/
1515
package com.amazonaws.services.schemaregistry.examples.kds;
1616

17-
import com.amazonaws.services.kinesis.AmazonKinesis;
18-
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
19-
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
20-
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
21-
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
22-
import com.amazonaws.services.kinesis.model.GetRecordsResult;
23-
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
24-
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
25-
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
26-
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
27-
import com.amazonaws.services.kinesis.model.PutRecordsResult;
28-
import com.amazonaws.services.kinesis.model.Record;
29-
import com.amazonaws.services.kinesis.model.Shard;
3017
import com.amazonaws.services.schemaregistry.common.Schema;
3118
import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
3219
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
@@ -49,7 +36,22 @@
4936
import org.joda.time.DateTime;
5037
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
5138
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
39+
import software.amazon.awssdk.core.SdkBytes;
40+
import software.amazon.awssdk.regions.Region;
5241
import software.amazon.awssdk.services.glue.model.DataFormat;
42+
import software.amazon.awssdk.services.kinesis.KinesisClient;
43+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
44+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
45+
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
46+
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
47+
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
48+
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
49+
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
50+
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
51+
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
52+
import software.amazon.awssdk.services.kinesis.model.Record;
53+
import software.amazon.awssdk.services.kinesis.model.Shard;
54+
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
5355

5456
import java.io.ByteArrayOutputStream;
5557
import java.io.File;
@@ -70,7 +72,7 @@
7072
*/
7173
public class PutRecordGetRecordExample {
7274
private static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
73-
private static AmazonKinesis kinesisClient;
75+
private static KinesisClient kinesisClient;
7476
private static final Logger LOGGER = Logger.getLogger(PutRecordGetRecordExample.class.getSimpleName());
7577
private static AwsCredentialsProvider awsCredentialsProvider =
7678
DefaultCredentialsProvider
@@ -97,7 +99,7 @@ public static void main(final String[] args) throws Exception {
9799
int numOfRecords = Integer.parseInt(cmd.getOptionValue("numRecords", "10"));
98100

99101
//Kinesis data streams client initialization.
100-
kinesisClient = AmazonKinesisClientBuilder.standard().withRegion(regionName).build();
102+
kinesisClient = KinesisClient.builder().region(Region.of(regionName)).build();
101103

102104
//Glue Schema Registry serializer initialization for the producer.
103105
glueSchemaRegistrySerializer =
@@ -129,65 +131,69 @@ public static void main(final String[] args) throws Exception {
129131
private static void getRecordsWithSchema(String streamName, Date timestamp) throws IOException {
130132
//Standard Kinesis code to getRecords from a Kinesis Data Stream.
131133
String shardIterator;
132-
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
133-
describeStreamRequest.setStreamName(streamName);
134+
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
135+
.streamName(streamName)
136+
.build();
134137
List<Shard> shards = new ArrayList<>();
135138

136-
DescribeStreamResult streamRes;
139+
DescribeStreamResponse streamRes;
137140
do {
138141
streamRes = kinesisClient.describeStream(describeStreamRequest);
139-
shards.addAll(streamRes.getStreamDescription().getShards());
142+
shards.addAll(streamRes.streamDescription().shards());
140143

141144
if (shards.size() > 0) {
142-
shards.get(shards.size() - 1).getShardId();
145+
shards.get(shards.size() - 1).shardId();
143146
}
144-
} while (streamRes.getStreamDescription().getHasMoreShards());
147+
} while (streamRes.streamDescription().hasMoreShards());
145148

146-
GetShardIteratorRequest itReq = new GetShardIteratorRequest();
147-
itReq.setStreamName(streamName);
148-
itReq.setShardId(shards.get(0).getShardId());
149-
itReq.setTimestamp(timestamp);
150-
itReq.setShardIteratorType("AT_TIMESTAMP");
149+
GetShardIteratorRequest itReq = GetShardIteratorRequest.builder()
150+
.streamName(streamName)
151+
.shardId(shards.get(0).shardId())
152+
.timestamp(timestamp.toInstant())
153+
.shardIteratorType(ShardIteratorType.AT_TIMESTAMP)
154+
.build();
151155

152-
GetShardIteratorResult shardIteratorResult = kinesisClient.getShardIterator(itReq);
153-
shardIterator = shardIteratorResult.getShardIterator();
156+
GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq);
157+
shardIterator = shardIteratorResult.shardIterator();
154158

155159
// Create new GetRecordsRequest with existing shardIterator.
156-
GetRecordsRequest recordsRequest = new GetRecordsRequest();
157-
recordsRequest.setShardIterator(shardIterator);
158-
recordsRequest.setLimit(1000);
160+
GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
161+
.shardIterator(shardIterator)
162+
.limit(1000)
163+
.build();
159164

160-
GetRecordsResult result = kinesisClient.getRecords(recordsRequest);
165+
GetRecordsResponse result = kinesisClient.getRecords(recordsRequest);
161166

162-
for (Record record : result.getRecords()) {
163-
ByteBuffer recordAsByteBuffer = record.getData();
167+
for (Record record : result.records()) {
168+
ByteBuffer recordAsByteBuffer = record.data().asByteBuffer();
164169
GenericRecord decodedRecord = decodeRecord(recordAsByteBuffer);
165170
LOGGER.info("Decoded Record: " + decodedRecord);
166171
}
167172
}
168173

169174
private static void putRecordsWithSchema(String streamName, int numOfRecords, Schema gsrSchema, Date timestamp) {
170175
//Standard Kinesis code to putRecords into a Kinesis Data Stream.
171-
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
172-
putRecordsRequest.setStreamName(streamName);
173-
174176
List<PutRecordsRequestEntry> recordsRequestEntries = new ArrayList<>();
175177

176178
LOGGER.info("Putting " + numOfRecords + " into " + streamName + " with schema" + gsrSchema);
177179
for (int i = 0; i < numOfRecords; i++) {
178180
GenericRecord record = (GenericRecord) getTestRecord(i);
179181
byte[] recordWithSchema = encodeRecord(record, streamName, gsrSchema);
180-
PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
181-
entry.setData(ByteBuffer.wrap(recordWithSchema));
182-
entry.setPartitionKey(String.valueOf(timestamp.toInstant()
183-
.toEpochMilli()));
182+
PutRecordsRequestEntry entry = PutRecordsRequestEntry.builder()
183+
.data(SdkBytes.fromByteArray(recordWithSchema))
184+
.partitionKey(String.valueOf(timestamp.toInstant()
185+
.toEpochMilli()))
186+
.build();
184187

185188
recordsRequestEntries.add(entry);
186189
}
187190

188-
putRecordsRequest.setRecords(recordsRequestEntries);
191+
PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder()
192+
.streamName(streamName)
193+
.records(recordsRequestEntries)
194+
.build();
189195

190-
PutRecordsResult putRecordResult = kinesisClient.putRecords(putRecordsRequest);
196+
PutRecordsResponse putRecordResult = kinesisClient.putRecords(putRecordsRequest);
191197

192198
LOGGER.info("Successfully put records: " + putRecordResult);
193199
}

0 commit comments

Comments
 (0)