Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@
*/
package com.amazonaws.services.schemaregistry.examples.kds;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.schemaregistry.common.Schema;
import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
Expand All @@ -49,7 +36,22 @@
import org.joda.time.DateTime;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.glue.model.DataFormat;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

import java.io.ByteArrayOutputStream;
import java.io.File;
Expand All @@ -70,7 +72,7 @@
*/
public class PutRecordGetRecordExample {
private static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
private static AmazonKinesis kinesisClient;
private static KinesisClient kinesisClient;
private static final Logger LOGGER = Logger.getLogger(PutRecordGetRecordExample.class.getSimpleName());
private static AwsCredentialsProvider awsCredentialsProvider =
DefaultCredentialsProvider
Expand All @@ -97,97 +99,106 @@ public static void main(final String[] args) throws Exception {
int numOfRecords = Integer.parseInt(cmd.getOptionValue("numRecords", "10"));

//Kinesis data streams client initialization.
kinesisClient = AmazonKinesisClientBuilder.standard().withRegion(regionName).build();
try (KinesisClient client = KinesisClient.builder().region(Region.of(regionName)).build()) {
kinesisClient = client;

Comment thread
jvdadda marked this conversation as resolved.
//Glue Schema Registry serializer initialization for the producer.
glueSchemaRegistrySerializer =
new GlueSchemaRegistrySerializerImpl(
awsCredentialsProvider,
getSchemaRegistryConfiguration(regionName)
);
//Glue Schema Registry serializer initialization for the producer.
glueSchemaRegistrySerializer =
new GlueSchemaRegistrySerializerImpl(
awsCredentialsProvider,
getSchemaRegistryConfiguration(regionName)
);

//Glue Schema Registry de-serializer initialization for the consumer.
glueSchemaRegistryDeserializer =
new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, getSchemaRegistryConfiguration(regionName));
//Glue Schema Registry de-serializer initialization for the consumer.
glueSchemaRegistryDeserializer =
new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, getSchemaRegistryConfiguration(regionName));

//Define the Glue Schema Registry schema object that will be used to encode data.
Schema gsrSchema =
new com.amazonaws.services.schemaregistry.common.Schema(getAvroSchema().toString(),
DataFormat.AVRO.name(), schemaName);
//Define the Glue Schema Registry schema object that will be used to encode data.
Schema gsrSchema =
new com.amazonaws.services.schemaregistry.common.Schema(getAvroSchema().toString(),
DataFormat.AVRO.name(), schemaName);

LOGGER.info("Client initialization complete.");
LOGGER.info("Client initialization complete.");

Date timestamp = DateTime.now().toDate();
Date timestamp = DateTime.now().toDate();

//Put records into Kinesis stream.
putRecordsWithSchema(streamName, numOfRecords, gsrSchema, timestamp);
//Put records into Kinesis stream.
putRecordsWithSchema(streamName, numOfRecords, gsrSchema, timestamp);

//Start receiving records from the stream.
getRecordsWithSchema(streamName, timestamp);
//Start receiving records from the stream.
getRecordsWithSchema(streamName, timestamp);
}
}

private static void getRecordsWithSchema(String streamName, Date timestamp) throws IOException {
//Standard Kinesis code to getRecords from a Kinesis Data Stream.
String shardIterator;
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(streamName);
List<Shard> shards = new ArrayList<>();
String exclusiveStartShardId = null;

DescribeStreamResult streamRes;
DescribeStreamResponse streamRes;
do {
streamRes = kinesisClient.describeStream(describeStreamRequest);
shards.addAll(streamRes.getStreamDescription().getShards());
DescribeStreamRequest.Builder requestBuilder = DescribeStreamRequest.builder()
.streamName(streamName);
if (exclusiveStartShardId != null) {
requestBuilder.exclusiveStartShardId(exclusiveStartShardId);
}
streamRes = kinesisClient.describeStream(requestBuilder.build());
shards.addAll(streamRes.streamDescription().shards());

if (shards.size() > 0) {
Comment thread
jvdadda marked this conversation as resolved.
shards.get(shards.size() - 1).getShardId();
exclusiveStartShardId = shards.get(shards.size() - 1).shardId();
}
} while (streamRes.getStreamDescription().getHasMoreShards());
} while (streamRes.streamDescription().hasMoreShards());

GetShardIteratorRequest itReq = new GetShardIteratorRequest();
itReq.setStreamName(streamName);
itReq.setShardId(shards.get(0).getShardId());
itReq.setTimestamp(timestamp);
itReq.setShardIteratorType("AT_TIMESTAMP");
GetShardIteratorRequest itReq = GetShardIteratorRequest.builder()
.streamName(streamName)
.shardId(shards.get(0).shardId())
.timestamp(timestamp.toInstant())
.shardIteratorType(ShardIteratorType.AT_TIMESTAMP)
.build();

GetShardIteratorResult shardIteratorResult = kinesisClient.getShardIterator(itReq);
shardIterator = shardIteratorResult.getShardIterator();
GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq);
shardIterator = shardIteratorResult.shardIterator();

// Create new GetRecordsRequest with existing shardIterator.
GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator);
recordsRequest.setLimit(1000);
GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(1000)
.build();

GetRecordsResult result = kinesisClient.getRecords(recordsRequest);
GetRecordsResponse result = kinesisClient.getRecords(recordsRequest);

for (Record record : result.getRecords()) {
ByteBuffer recordAsByteBuffer = record.getData();
for (Record record : result.records()) {
ByteBuffer recordAsByteBuffer = record.data().asByteBuffer();
GenericRecord decodedRecord = decodeRecord(recordAsByteBuffer);
LOGGER.info("Decoded Record: " + decodedRecord);
}
}

private static void putRecordsWithSchema(String streamName, int numOfRecords, Schema gsrSchema, Date timestamp) {
//Standard Kinesis code to putRecords into a Kinesis Data Stream.
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(streamName);

List<PutRecordsRequestEntry> recordsRequestEntries = new ArrayList<>();

LOGGER.info("Putting " + numOfRecords + " into " + streamName + " with schema" + gsrSchema);
for (int i = 0; i < numOfRecords; i++) {
GenericRecord record = (GenericRecord) getTestRecord(i);
byte[] recordWithSchema = encodeRecord(record, streamName, gsrSchema);
PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
entry.setData(ByteBuffer.wrap(recordWithSchema));
entry.setPartitionKey(String.valueOf(timestamp.toInstant()
.toEpochMilli()));
PutRecordsRequestEntry entry = PutRecordsRequestEntry.builder()
.data(SdkBytes.fromByteArray(recordWithSchema))
.partitionKey(String.valueOf(timestamp.toInstant()
.toEpochMilli()))
.build();

recordsRequestEntries.add(entry);
}

putRecordsRequest.setRecords(recordsRequestEntries);
PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder()
.streamName(streamName)
.records(recordsRequestEntries)
.build();

PutRecordsResult putRecordResult = kinesisClient.putRecords(putRecordsRequest);
PutRecordsResponse putRecordResult = kinesisClient.putRecords(putRecordsRequest);

LOGGER.info("Successfully put records: " + putRecordResult);
}
Expand Down