Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions packages/kafka-client/src/events/provider/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,16 @@ import { decomposeError } from '../../../utils.js';
* A Kafka topic.
*/
export class KafkaTopic implements Topic {
name: string;
emitter: EventEmitter;
provider: Kafka;
subscribed: string[];
readonly emitter: EventEmitter;
readonly subscribed: string[];
waitQueue: any[];
currentOffset: bigint;
consumer: Consumer;
config: any;
// message sync throttling attributes
asyncQueue: async.QueueObject<any>;
drainEvent: (context: Message, done: (err: any) => void) => void;
// default process one message at a time
asyncLimit = 1;
manualOffsetCommit: boolean;
private subscribedToTopic: boolean;

/**
Expand All @@ -50,22 +46,25 @@ export class KafkaTopic implements Topic {
* @param provider
* @param config
*/
constructor(name: string, provider: Kafka, config: any, manualOffsetCommit = false) {
this.name = name;
constructor(
readonly name: string,
readonly provider: Kafka,
readonly config: KafkaProviderConfig,
readonly manualOffsetCommit = false,
) {
this.emitter = new EventEmitter();
this.provider = provider;
this.subscribed = [];
this.waitQueue = [];
this.currentOffset = 0n;
this.config = config;
this.manualOffsetCommit = manualOffsetCommit;
}

async createIfNotExists(): Promise<void> {
const topics = await this.provider.admin.listTopics();
if (!topics.includes(this.name)) {
return await new Promise((resolve, reject) => {
const operation = retry.operation();
const operation = retry.operation({
retries: Number(this.config?.kafka?.retries ?? 10)
});
operation.attempt(async (attemptNo) => {
try {
await this.provider.admin.createTopics({
Expand Down Expand Up @@ -513,13 +512,13 @@ export interface KafkaProviderConfig {
& AdminOptions;
timeout: number;
groupId: string;
[key: string]: any;
}

/**
* Events provider.
*/
export class Kafka implements EventProvider {
readonly config: KafkaProviderConfig;
readonly topics: Record<string, KafkaTopic> = {};
private producer: Producer;

Expand All @@ -537,7 +536,7 @@ export class Kafka implements EventProvider {
* @param {object} logger
*/
constructor(
config: any,
readonly config: KafkaProviderConfig,
readonly logger: Logger
) {
this.config = clone(config);
Expand All @@ -551,12 +550,12 @@ export class Kafka implements EventProvider {
const operation = retry.operation({
forever: true,
maxTimeout: this.config?.timeout ?? 60000,
retries: Number(this.config?.kafka?.retries ?? 10),
});
return new Promise<void>((resolveRetry) => {
operation.attempt(async () => {
operation.attempt(async (attemptNo) => {
try {
this.commonOptions = {
...this.config.kafka,
serializers: {
key: noopSerializer,
value: noopSerializer,
Expand All @@ -571,15 +570,16 @@ export class Kafka implements EventProvider {
},
retries: 100,
retryDelay: 1000,
autocommit: false
autocommit: false,
...this.config.kafka,
};

// These are kept from migration of KafkaJS to Platformic lib
if ('brokers' in this.commonOptions) {
this.commonOptions['bootstrapBrokers'] = this.commonOptions['brokers'] as string[];
}

this.logger?.info(`[kafka-client] Connecting - attempt No: ${operation.attempts()}`);
this.logger?.info(`[kafka-client] Connecting - attempt No: ${attemptNo}`);

this.producer = new Producer(this.commonOptions);
this.admin = new Admin(this.commonOptions);
Expand Down Expand Up @@ -636,7 +636,6 @@ export class Kafka implements EventProvider {
}
catch (err: any) {
operation.retry(err);
const attemptNo = operation.attempts();
this.producer?.close();
this.logger?.info(`Retry initialize the Producer, attempt No: ${attemptNo}`);
}
Expand All @@ -661,7 +660,7 @@ export class Kafka implements EventProvider {
* @param eventName
* @param msg
*/
decodeObject(config: any, eventName: string, msg: any): any {
decodeObject(config: KafkaProviderConfig, eventName: string, msg: any): any {
try {
return decodeMessage(msg, config[eventName].messageObject);
} catch (error: any) {
Expand All @@ -683,7 +682,7 @@ export class Kafka implements EventProvider {
async $send(topicName: string, eventName: string, message: any): Promise<ProduceResult> {
try {
const messages = Array.isArray(message) ? message : [message];
const config: any = this.config;
const config = this.config;
const messageObject = config[eventName]?.messageObject;
if (!messageObject) {
throw new Error(`messageObject for event ${eventName} not configured!`);
Expand Down Expand Up @@ -752,7 +751,7 @@ export class Kafka implements EventProvider {
* @param config
* @return {Topic} Kafka topic
*/
async topic(topicName: string, config: any): Promise<Topic> {
async topic(topicName: string, config: KafkaProviderConfig): Promise<Topic> {
this.topics[topicName] ??= new KafkaTopic(topicName, this, config);
await this.topics[topicName].createIfNotExists();
return this.topics[topicName];
Expand Down
3 changes: 3 additions & 0 deletions packages/protos/io/restorecommerce/user.proto
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ message SendActivationEmailRequest {
*/
message Deleted {
optional string id = 1;
optional string default_scope = 2; // default hierarchical scope
}

/**
Expand Down Expand Up @@ -410,6 +411,7 @@ message User {
repeated string totp_session_tokens = 29; /// TOTP Login session tokens
repeated string password_hash_history = 30; // List of historical password hashes
repeated string totp_recovery_codes = 31; // List of TOTP recovery codes
optional google.protobuf.Timestamp invited_at = 32;
}

/**
Expand Down Expand Up @@ -459,4 +461,5 @@ message UserRole {
repeated io.restorecommerce.attribute.Attribute properties = 25; // additional properties
optional google.protobuf.Any data = 26; // additional data
repeated io.restorecommerce.role.Role roles = 27;
optional google.protobuf.Timestamp invited_at = 28;
}
Loading