diff --git a/lib/agent.js b/lib/agent.js index afef4a2..f57ceca 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -13,6 +13,7 @@ class Agent { this.topics = {} this.error = null this.stopped = true + this.lastUpload = Date.now() } async connect () { @@ -145,10 +146,21 @@ class Agent { } catch (err) { logger.log(`Error parsing payload on topic '${topic}' ${err}`) } - // logger.log(topic, type) - agent.topics[topic] = { - timestamp: Date.now(), - type + const existing = agent.topics[topic] + if (existing) { + if (JSON.stringify(existing.type) !== JSON.stringify(type)) { + // changed update + agent.topics[topic] = { + timestamp: Date.now(), + type + } + } + } else { + // logger.log(topic, type) + agent.topics[topic] = { + timestamp: Date.now(), + type + } } }) } catch (err) { @@ -168,23 +180,46 @@ class Agent { // logger.log(JSON.stringify(agent.topics, null, 2)) const upload = [] Object.keys(this.topics).forEach((key) => { - upload.push({ - topic: key, - type: this.topics[key].type, - timestamp: this.topics[key].timestamp - }) + if (this.topics[key].timestamp > this.lastUpload) { + upload.push({ + topic: key, + type: this.topics[key].type, + timestamp: this.topics[key].timestamp + }) + } }) + if (upload.length > 0) { + try { + logger.log('uploading') + await got.post(`${this.options.forgeURL}/api/v1/teams/${this.options.team}/brokers/${this.options.broker}/topics`, { + headers: { + Authorization: `Bearer ${this.options.token}` + }, + json: upload + }) + this.lastUpload = Date.now() + logger.log(`uploaded ${upload.length}`) + // clear list so only uploading new topics each time + // this.topics = {} + } catch (err) { + logger.log(err) + } + } + } + + async primeCache () { try { - logger.log('uploading') - await got.post(`${this.options.forgeURL}/api/v1/teams/${this.options.team}/brokers/${this.options.broker}/topics`, { + const topics = await got.get(`${this.options.forgeURL}/api/v1/teams/${this.options.team}/brokers/${this.options.broker}/topics`, { headers: { Authorization: `Bearer ${this.options.token}` - }, - json: upload + } + }).json() + topics.topics.forEach(t => { + this.topics[t.topic] = { + type: t.inferredSchema, + timestamp: t.timestamp ?? Date.now() + } }) - logger.log('uploaded') - // clear list so only uploading new topics each time - this.topics = {} } catch (err) { logger.log(err) } @@ -194,6 +229,9 @@ class Agent { if (this.client) { await this.stop() } + + await this.primeCache() + await this.connect() }