diff --git a/packages/kafka-client/src/events/provider/kafka/index.ts b/packages/kafka-client/src/events/provider/kafka/index.ts index d25e11e5..009365bf 100644 --- a/packages/kafka-client/src/events/provider/kafka/index.ts +++ b/packages/kafka-client/src/events/provider/kafka/index.ts @@ -23,20 +23,16 @@ import { decomposeError } from '../../../utils.js'; * A Kafka topic. */ export class KafkaTopic implements Topic { - name: string; - emitter: EventEmitter; - provider: Kafka; - subscribed: string[]; + readonly emitter: EventEmitter; + readonly subscribed: string[]; waitQueue: any[]; currentOffset: bigint; consumer: Consumer; - config: any; // message sync throttling attributes asyncQueue: async.QueueObject; drainEvent: (context: Message, done: (err: any) => void) => void; // default process one message at a time asyncLimit = 1; - manualOffsetCommit: boolean; private subscribedToTopic: boolean; /** @@ -50,22 +46,25 @@ export class KafkaTopic implements Topic { * @param provider * @param config */ - constructor(name: string, provider: Kafka, config: any, manualOffsetCommit = false) { - this.name = name; + constructor( + readonly name: string, + readonly provider: Kafka, + readonly config: KafkaProviderConfig, + readonly manualOffsetCommit = false, + ) { this.emitter = new EventEmitter(); - this.provider = provider; this.subscribed = []; this.waitQueue = []; this.currentOffset = 0n; - this.config = config; - this.manualOffsetCommit = manualOffsetCommit; } async createIfNotExists(): Promise { const topics = await this.provider.admin.listTopics(); if (!topics.includes(this.name)) { return await new Promise((resolve, reject) => { - const operation = retry.operation(); + const operation = retry.operation({ + retries: Number(this.config?.kafka?.retries ?? 10) + }); operation.attempt(async (attemptNo) => { try { await this.provider.admin.createTopics({ @@ -513,13 +512,13 @@ export interface KafkaProviderConfig { & AdminOptions; timeout: number; groupId: string; + [key: string]: any; } /** * Events provider. */ export class Kafka implements EventProvider { - readonly config: KafkaProviderConfig; readonly topics: Record = {}; private producer: Producer; @@ -537,7 +536,7 @@ export class Kafka implements EventProvider { * @param {object} logger */ constructor( - config: any, + readonly config: KafkaProviderConfig, readonly logger: Logger ) { this.config = clone(config); @@ -551,12 +550,12 @@ export class Kafka implements EventProvider { const operation = retry.operation({ forever: true, maxTimeout: this.config?.timeout ?? 60000, + retries: Number(this.config?.kafka?.retries ?? 10), }); return new Promise((resolveRetry) => { - operation.attempt(async () => { + operation.attempt(async (attemptNo) => { try { this.commonOptions = { - ...this.config.kafka, serializers: { key: noopSerializer, value: noopSerializer, @@ -571,7 +570,8 @@ export class Kafka implements EventProvider { }, retries: 100, retryDelay: 1000, - autocommit: false + autocommit: false, + ...this.config.kafka, }; // These are kept from migration of KafkaJS to Platformic lib @@ -579,7 +579,7 @@ export class Kafka implements EventProvider { this.commonOptions['bootstrapBrokers'] = this.commonOptions['brokers'] as string[]; } - this.logger?.info(`[kafka-client] Connecting - attempt No: ${operation.attempts()}`); + this.logger?.info(`[kafka-client] Connecting - attempt No: ${attemptNo}`); this.producer = new Producer(this.commonOptions); this.admin = new Admin(this.commonOptions); @@ -636,7 +636,6 @@ export class Kafka implements EventProvider { } catch (err: any) { operation.retry(err); - const attemptNo = operation.attempts(); this.producer?.close(); this.logger?.info(`Retry initialize the Producer, attempt No: ${attemptNo}`); } @@ -661,7 +660,7 @@ export class Kafka implements EventProvider { * @param eventName * @param msg */ - decodeObject(config: any, eventName: string, msg: any): any { + decodeObject(config: KafkaProviderConfig, eventName: string, msg: any): any { try { return decodeMessage(msg, config[eventName].messageObject); } catch (error: any) { @@ -683,7 +682,7 @@ export class Kafka implements EventProvider { async $send(topicName: string, eventName: string, message: any): Promise { try { const messages = Array.isArray(message) ? message : [message]; - const config: any = this.config; + const config = this.config; const messageObject = config[eventName]?.messageObject; if (!messageObject) { throw new Error(`messageObject for event ${eventName} not configured!`); @@ -752,7 +751,7 @@ export class Kafka implements EventProvider { * @param config * @return {Topic} Kafka topic */ - async topic(topicName: string, config: any): Promise { + async topic(topicName: string, config: KafkaProviderConfig): Promise { this.topics[topicName] ??= new KafkaTopic(topicName, this, config); await this.topics[topicName].createIfNotExists(); return this.topics[topicName]; diff --git a/packages/protos/io/restorecommerce/user.proto b/packages/protos/io/restorecommerce/user.proto index fc3306d3..6f120f31 100644 --- a/packages/protos/io/restorecommerce/user.proto +++ b/packages/protos/io/restorecommerce/user.proto @@ -276,6 +276,7 @@ message SendActivationEmailRequest { */ message Deleted { optional string id = 1; + optional string default_scope = 2; // default hierarchical scope } /** @@ -410,6 +411,7 @@ message User { repeated string totp_session_tokens = 29; /// TOTP Login session tokens repeated string password_hash_history = 30; // List of historical password hashes repeated string totp_recovery_codes = 31; // List of TOTP recovery codes + optional google.protobuf.Timestamp invited_at = 32; } /** @@ -459,4 +461,5 @@ message UserRole { repeated io.restorecommerce.attribute.Attribute properties = 25; // additional properties optional google.protobuf.Any data = 26; // additional data repeated io.restorecommerce.role.Role roles = 27; + optional google.protobuf.Timestamp invited_at = 28; }