diff --git a/forge/ee/lib/expert/emxq-bridge/setup.js b/forge/ee/lib/expert/emxq-bridge/setup.js new file mode 100644 index 0000000000..afecb12239 --- /dev/null +++ b/forge/ee/lib/expert/emxq-bridge/setup.js @@ -0,0 +1,336 @@ +// Provisioning and tear down of the bridge to the FF Expert broker on the FF App. +// Exposed functions: syncBridge (idempotent orchestrator — call at startup and on a +// periodic housekeeper), addBridge / removeBridge (low-level provision/teardown, +// throw on error), validateBridge (read-only health check, never throws). + +const http = require('http') +const https = require('https') + +const axios = require('axios') + +// EMQX admin API drops idle keep-alive sockets, so axios's default agent pool (keepAlive: true) +// reuses a half-open socket and causes ECONNRESET errors on subsequent requests. +// Disable keep-alive to avoid this. +const httpAgent = new http.Agent({ keepAlive: false }) +const httpsAgent = new https.Agent({ keepAlive: false }) + +const { connector, actionOut, sourceChat, sourceInflight, ruleIn, ruleOut } = require('./templates.js') + +// EMQX v5 IDs for connector/action/source resources are `:`. +// Rule IDs are the rule's own `id` field. +const connectorId = `mqtt:${connector.name}` + +function getConfig (app) { + const teamBrokerEnabled = app.config.broker?.teamBroker?.enabled === true + const expertEnabled = app.config.expert?.enabled === true + const hasBridgeServer = !!app.config.expert?.centralBroker?.server + const bridgeEnabled = teamBrokerEnabled && expertEnabled && hasBridgeServer + + return { + bridgeEnabled, + teamBrokerEnabled, + licenseActive: app.license.active(), + licenseExpired: app.license.status().expired, + licenceId: app.license.get('id'), // used as the central-broker connector creds and topic prefix + licenceJwt: app.license.raw(), + expertBrokerServerAddress: app.config.expert?.centralBroker?.server, + ssl: app.config.expert?.centralBroker?.ssl ?? true, + appBrokerServiceUrl: app.config.broker?.teamBroker?.api?.url, + appBrokerApiKey: app.config.broker?.teamBroker?.api?.key, + appBrokerSecretKey: app.config.broker?.teamBroker?.api?.secret + } +} + +/** + * Create an axios client for the FF App Instance Broker API, authenticated with the team broker API key/secret from config. + * @param {ReturnType} cfg - config object for the bridge, containing API credentials and URLs + * @returns {import('axios').AxiosInstance} + */ +function makeClient (cfg) { + const basicAuth = `Basic ${Buffer.from(cfg.appBrokerApiKey + ':' + cfg.appBrokerSecretKey).toString('base64')}` + // validateStatus: () => true so we can inspect status manually — DELETE tolerates 404, + // and we want to surface the response body in error messages instead of axios's generic throw. + return axios.create({ + baseURL: cfg.appBrokerServiceUrl, + headers: { + 'User-Agent': 'FlowFuse', + Accept: 'application/json', + Authorization: basicAuth + }, + httpAgent, + httpsAgent, + validateStatus: () => true + }) +} + +/** + * Helper to send a DELETE request and throw if the response isn't 204 (deleted) or 404 (already absent). + * @param {import('axios').AxiosInstance} client - an axios client instance authenticated to the FF App Instance Broker API + * @param {string} path - API path to send the DELETE request to (e.g. `/connectors/${connectorId}`) + * @returns {Promise} + */ +async function del (client, path) { + const resp = await client.delete(path) + // 204/200 = deleted; 404 = already absent + if (resp.status === 204 || resp.status === 200 || resp.status === 404) return + throw new Error(`failed to delete ${path} (${resp.status}): ${JSON.stringify(resp.data)}`) +} + +/** + * Helper to send a POST request and throw if the response isn't 201 (created) or 200 (already exists). + * @param {import('axios').AxiosInstance} client - an axios client instance authenticated to the FF App Instance Broker API + * @param {string} path - API path to send the POST request to (e.g. `/connectors`) + * @param {object} body - request body to send as JSON + * @returns {Promise} + */ +async function post (client, path, body) { + const resp = await client.post(path, body) + if (resp.status === 201 || resp.status === 200) return + throw new Error(`failed to create ${path} (${resp.status}): ${JSON.stringify(resp.data)}`) +} + +async function getList (client, path) { + const resp = await client.get(path) + if (resp.status !== 200) { + throw new Error(`failed to GET ${path} (${resp.status}): ${JSON.stringify(resp.data)}`) + } + // EMQX list endpoints aren't uniform: /sources and /actions return arrays directly, + // /rules returns `{ data, meta }` (paginated). Normalize to an array either way. + const data = resp.data + if (Array.isArray(data)) return data + if (data && Array.isArray(data.data)) return data.data + throw new Error(`unexpected list shape from ${path}: ${JSON.stringify(data).slice(0, 200)}`) +} + +// Discover existing resources attached to our connector so we can clean up +// anything left over from previous runs — including orphans from past renames +// that the current template no longer references. +async function discoverResources (client) { + const [allSources, allActions, allRules] = await Promise.all([ + getList(client, '/sources'), + getList(client, '/actions'), + getList(client, '/rules?limit=100') + ]) + const ourSources = allSources.filter(s => s.connector === connector.name) + const ourActions = allActions.filter(a => a.connector === connector.name) + const ourActionIds = new Set(ourActions.map(a => `${a.type}:${a.name}`)) + const ourSourceIds = new Set(ourSources.map(s => `${s.type}:${s.name}`)) + + // Rules don't link to a connector directly, but they reference our actions + // (via `rule.actions`) and our sources (via `$bridges/:` in SQL). + const ourRules = allRules.filter(rule => { + const refsAction = (rule.actions || []).some(act => + typeof act === 'string' && ourActionIds.has(act) + ) + const bridgeRefs = (rule.sql || '').match(/\$bridges\/([\w:-]+)/g) || [] + const refsSource = bridgeRefs.some(ref => ourSourceIds.has(ref.slice('$bridges/'.length))) + return refsAction || refsSource + }) + + return { ourSources, ourActions, ourRules } +} + +/** + * Read-only health check. Returns false if config disabled, resources missing, + * licence creds drifted, central broker address drifted, or any error (errors + * are logged, not thrown). + * + * NOTE: this is an existence-and-identity check, not a config-correctness check. + * It verifies that the connector, action, both sources, and both rules are + * present (by name/id) and that the connector's `username` and `server` fields + * still match config — but it does NOT verify rule SQL, action/source + * `parameters`, `enable` flags, or that an action/source is still bound to our + * connector. A hand-edited resource with a matching name will pass. For a + * guaranteed-clean state, call `syncBridge(app, { force: true })`. + * + * @param {object} app + * @param {Object} [opts] + * @param {ReturnType} [opts.cfg] - pre-built config (skips `getConfig`) + * @param {import('axios').AxiosInstance} [opts.client] - Axios dependency (injection allows for unit testing) + * @returns {Promise} + */ +async function validateBridge (app, { cfg, client } = {}) { + cfg = cfg || getConfig(app) + if (!cfg.bridgeEnabled) { + return false + } + client = client || makeClient(cfg) + try { + const connectors = await getList(client, '/connectors') + const hasConnector = connectors.some(c => c.name === connector.name && c.type === 'mqtt') + if (!hasConnector) { + app.log.info('Expert bridge connector not found') + return false + } + // check licenceId + const connectorResp = await client.get(`/connectors/${connectorId}`) + if (connectorResp.status !== 200) { + app.log.info('Failed to get EMQX connector details') + return false + } + if (connectorResp.data.username !== cfg.licenceId) { + app.log.info('EMQX connector credentials do not match FF licence') + return false + } + if (connectorResp.data.server !== cfg.expertBrokerServerAddress) { + app.log.info('EMQX connector server does not match the central broker address') + return false + } + const [actions, sources, rules] = await Promise.all([ + getList(client, '/actions'), + getList(client, '/sources'), + getList(client, '/rules?limit=100') + ]) + const hasAction = actions.some(a => a.name === actionOut.name && a.type === 'mqtt') + if (!hasAction) { + app.log.info('Expert bridge action not found') + return false + } + const hasSourceChat = sources.some(s => s.name === sourceChat.name && s.type === 'mqtt') + const hasSourceInflight = sources.some(s => s.name === sourceInflight.name && s.type === 'mqtt') + if (!hasSourceChat || !hasSourceInflight) { + app.log.info('Expert bridge sources not found') + return false + } + const hasRuleOut = rules.some(r => r.id === ruleOut.id) + const hasRuleIn = rules.some(r => r.id === ruleIn.id) + if (!hasRuleOut || !hasRuleIn) { + app.log.info('Expert bridge rules not found') + return false + } + return true + } catch (err) { + app.log.error(`Error checking EMQX bridge: ${err.message}`) + return false + } +} + +/** + * Idempotent orchestrator. Brings the bridge into the desired state based on + * config + licence. Logs errors and returns a boolean — never throws. + * @param {object} app + * @param {Object} [opts] + * @param {boolean} [opts.force=false] - when true, tear down and recreate. Typically safe at start up due to being no active sessions. + * @param {import('axios').AxiosInstance} [opts.client] - Axios dependency (injection allows for unit testing) + * @returns {Promise} true if sync completed without error. + */ +async function syncBridge (app, { force = false, client } = {}) { + const cfg = getConfig(app) + const wantBridge = cfg.bridgeEnabled && cfg.licenseActive && !cfg.licenseExpired + + if (!wantBridge) { + if (!cfg.teamBrokerEnabled) { + // No team broker — nothing to talk to, nothing to clean up. + return true + } + app.log.info('Expert bridge not enabled or licence inactive/expired, removing bridge...') + try { + await removeBridge(app, { cfg, client: client || makeClient(cfg) }) + return true + } catch (err) { + app.log.error(`Error removing EMQX bridge: ${err.message}`) + return false + } + } + + client = client || makeClient(cfg) + try { + if (force) { + // Full sync — safe at startup since no active sessions. + await removeBridge(app, { cfg, client }) + await addBridge(app, { cfg, client }) + } else { + // Runtime: only act if something's wrong, to avoid disrupting active sessions. + const valid = await validateBridge(app, { cfg, client }) + if (!valid) { + app.log.info(`EMQX bridge '${connectorId}' not found or misconfigured, updating...`) + // Remove first to clear any partial / stale resources before recreating. + await removeBridge(app, { cfg, client }) + await addBridge(app, { cfg, client }) + } + } + return true + } catch (err) { + app.log.error(`Error syncing EMQX bridge: ${err.message}`) + return false + } +} + +/** + * Tear down all bridge resources. Throws on API error (404s tolerated). + * Prefer `syncBridge` for orchestration; call this directly only when the + * caller wants the throw-on-error contract. + * @param {object} app + * @param {Object} [opts] + * @param {ReturnType} [opts.cfg] - pre-built config (skips `getConfig`) + * @param {import('axios').AxiosInstance} [opts.client] - Axios dependency (injection allows for unit testing) + * @returns {Promise} + */ +async function removeBridge (app, { cfg, client } = {}) { + cfg = cfg || getConfig(app) + client = client || makeClient(cfg) + const { ourSources, ourActions, ourRules } = await discoverResources(client) + + // Tear down in reverse dependency order. 404s are tolerated by `del`. + for (const rule of ourRules) { + app.log.info(`deleting EMQX rule ${rule.id}`) + await del(client, `/rules/${rule.id}`) + } + for (const source of ourSources) { + app.log.info(`deleting EMQX source ${source.type}:${source.name}`) + await del(client, `/sources/${source.type}:${source.name}`) + } + for (const action of ourActions) { + app.log.info(`deleting EMQX action ${action.type}:${action.name}`) + await del(client, `/actions/${action.type}:${action.name}`) + } + app.log.info(`deleting EMQX connector ${connectorId}`) + await del(client, `/connectors/${connectorId}`) +} + +/** + * Create all bridge resources. Throws if a resource already exists — assumes a + * clean slate. Prefer `syncBridge` for orchestration. + * @param {object} app + * @param {Object} [opts] + * @param {ReturnType} [opts.cfg] - pre-built config (skips `getConfig`) + * @param {import('axios').AxiosInstance} [opts.client] - Axios dependency (injection allows for unit testing) + * @returns {Promise} + */ +async function addBridge (app, { cfg, client } = {}) { + cfg = cfg || getConfig(app) + client = client || makeClient(cfg) + + // Clone the connector template so per-call config doesn't mutate module-level + // state — matters for test isolation and any future concurrent callers. + const connectorPayload = { + ...connector, + server: cfg.expertBrokerServerAddress, + username: cfg.licenceId, + password: cfg.licenceJwt + } + + if (cfg.ssl) { + connectorPayload.ssl = { + enable: true + } + } + + // Provision in dependency order: connector → actions/sources → rules. + app.log.info(`creating EMQX connector ${connector.name}`) + await post(client, '/connectors', connectorPayload) + app.log.info(`creating EMQX action ${actionOut.name}`) + await post(client, '/actions', actionOut) + app.log.info(`creating EMQX source ${sourceChat.name}`) + await post(client, '/sources', sourceChat) + app.log.info(`creating EMQX source ${sourceInflight.name}`) + await post(client, '/sources', sourceInflight) + app.log.info(`creating EMQX rule ${ruleOut.id}`) + await post(client, '/rules', ruleOut) + app.log.info(`creating EMQX rule ${ruleIn.id}`) + await post(client, '/rules', ruleIn) +} + +module.exports = { syncBridge } +// Exposed for unit tests +module.exports._internal = { addBridge, removeBridge, validateBridge, discoverResources, getConfig, getList, makeClient, del, post } diff --git a/forge/ee/lib/expert/emxq-bridge/templates.js b/forge/ee/lib/expert/emxq-bridge/templates.js new file mode 100644 index 0000000000..093c401b45 --- /dev/null +++ b/forge/ee/lib/expert/emxq-bridge/templates.js @@ -0,0 +1,124 @@ +// EMQX resource templates for the FF App Instance Broker → Expert Broker bridge. +// The Expert Broker uses mountpoint-based tenant namespacing (`${client_attrs.team}`), +// so these templates operate in raw `ff/v1/...` topic space — no manual prefixing. + +const connector = { + name: 'ff-expert-broker', + type: 'mqtt', + bridge_mode: true, + clean_start: true, + description: 'FlowFuse Expert bridge', + enable: true, + pool_size: 1, + proto_ver: 'v5' // must be v5 to support correlation data and user properties (used in request/response patterns) +} + +// Used by ruleOut to publish outbound bridge messages to the Expert Broker. +const actionOut = { + name: 'ff-app-to-expert-action', + type: 'mqtt', + connector: 'ff-expert-broker', + description: 'Publish outbound bridge messages to the Expert Broker', + enable: true, + parameters: { + payload: '${payload}', // eslint-disable-line no-template-curly-in-string + qos: 2, + retain: false, + topic: '${topic}' // eslint-disable-line no-template-curly-in-string + }, + resource_opts: { + health_check_interval: '15s', + inflight_window: 100, + max_buffer_bytes: '256MB', + query_mode: 'async', + request_ttl: '45s', + worker_pool_size: 16 + } +} + +// Expert Broker → FF App Instance Broker. The Expert Broker strips the licence-id +// mountpoint before delivery, so this subscription is in raw topic space. +const sourceChat = { + name: 'ff-expert-to-app-chat-source', + type: 'mqtt', + connector: 'ff-expert-broker', + description: 'Subscribe to chat responses on the Expert Broker', + enable: true, + parameters: { + qos: 1, + topic: 'ff/v1/expert/+/+/+/+/support/chat/response' + }, + resource_opts: { + health_check_interval: '15s' + } +} + +// Expert Broker → FF App Instance Broker. Inflight requests initiated by the AI agent. +const sourceInflight = { + name: 'ff-expert-to-app-inflight-source', + type: 'mqtt', + connector: 'ff-expert-broker', + description: 'Subscribe to inflight requests on the Expert Broker', + enable: true, + parameters: { + qos: 1, + topic: 'ff/v1/expert/+/+/+/+/support/inflight/+/request' + }, + resource_opts: { + health_check_interval: '15s' + } +} + +// Republish inbound bridge messages onto the FF App Instance Broker. +// `topic` is already mountpoint-stripped by the Expert Broker, so no rewrite needed. +// +// v5 property forwarding via the inline republish action is fiddly on EMQX: +// - `user_properties` is a template scalar, so `${pub_props.'User-Property'}` works. +// - Values inside `mqtt_properties` are NOT evaluated as `${pub_props.'X'}` templates +// (they come through as literal strings). The workaround is to pre-extract each +// hyphenated v5 prop in the SELECT under a clean alias, then reference the alias +// with a plain `${alias}` template in `mqtt_properties`. +const ruleIn = { + id: 'ff-expert-to-app-rule', + name: 'ff-expert-to-app-rule', + description: 'Republish chat responses and inflight requests on the FF App Instance Broker', + enable: true, + sql: 'SELECT\n *,\n pub_props.\'Correlation-Data\' as correlation_data,\n pub_props.\'Response-Topic\' as response_topic,\n pub_props.\'Content-Type\' as content_type,\n pub_props.\'Payload-Format-Indicator\' as payload_format_indicator,\n pub_props.\'Message-Expiry-Interval\' as message_expiry_interval\nFROM\n "$bridges/mqtt:ff-expert-to-app-chat-source",\n "$bridges/mqtt:ff-expert-to-app-inflight-source"', + actions: [ + { + args: { + retain: false, + payload: '${payload}', // eslint-disable-line no-template-curly-in-string + topic: '${topic}', // eslint-disable-line no-template-curly-in-string + qos: 1, + direct_dispatch: false, + mqtt_properties: { + 'Correlation-Data': '${correlation_data}', // eslint-disable-line no-template-curly-in-string + 'Response-Topic': '${response_topic}', // eslint-disable-line no-template-curly-in-string + 'Content-Type': '${content_type}', // eslint-disable-line no-template-curly-in-string + 'Payload-Format-Indicator': '${payload_format_indicator}', // eslint-disable-line no-template-curly-in-string + 'Message-Expiry-Interval': '${message_expiry_interval}' // eslint-disable-line no-template-curly-in-string + }, + user_properties: "${pub_props.'User-Property'}" // eslint-disable-line no-template-curly-in-string + }, + function: 'republish' + } + ] +} + +// FF App Instance Broker → Expert Broker. Forwards two patterns: +// - ../support/chat/request +// - ../support/inflight/+/response +// The Expert Broker's mountpoint applies the `/` namespace prefix on receipt. +const ruleOut = { + id: 'ff-app-to-expert-rule', + name: 'ff-app-to-expert-rule', + description: 'Forward chat requests and inflight responses to the Expert Broker', + enable: true, + sql: 'SELECT\n *\nFROM\n "ff/v1/expert/+/+/+/+/support/chat/request",\n "ff/v1/expert/+/+/+/+/support/inflight/+/response"', + actions: [ + 'mqtt:ff-app-to-expert-action' + ] +} + +module.exports = { connector, actionOut, sourceChat, sourceInflight, ruleIn, ruleOut } diff --git a/forge/ee/lib/expert/tasks/startup.js b/forge/ee/lib/expert/tasks/startup.js new file mode 100644 index 0000000000..ba9b62c6bd --- /dev/null +++ b/forge/ee/lib/expert/tasks/startup.js @@ -0,0 +1,12 @@ +const { syncBridge } = require('../emxq-bridge/setup.js') +module.exports = { + name: 'EmqxExpertBridgeSetup', + startup: 90 * 1000, // Run 1.5min after startup + schedule: '', // no schedule + run: async function (app) { + // On startup, the web app is not running anyway so a full resync + // of the bridge is not disruptive and ensures that any changes to the + // bridge templates & license details are synchronised. + await syncBridge(app, { force: true }) + } +} diff --git a/forge/ee/lib/expert/tasks/weekly.js b/forge/ee/lib/expert/tasks/weekly.js new file mode 100644 index 0000000000..2f9fd746ab --- /dev/null +++ b/forge/ee/lib/expert/tasks/weekly.js @@ -0,0 +1,15 @@ +const { syncBridge } = require('../emxq-bridge/setup.js') +module.exports = { + name: 'EmqxExpertBridgeWeeklySync', + startup: false, + schedule: '@weekly', + run: async function (app) { + // On a weekly basis, check that the bridge connectors/rules/actions are present and has latest license details. + // force:false means: + // - only do a full re-sync if any of the rules, actions, sources or connectors are missing. + // - only do a full re-sync if the license details have changed. + // Local hand-crafted/modified rules, actions, sources or connectors will not be handled/detected by this task + // but they will be re-created if they are missing. + await syncBridge(app, { force: false }) + } +} diff --git a/forge/licensing/index.js b/forge/licensing/index.js index f9342e3abc..cfbc01f6e0 100644 --- a/forge/licensing/index.js +++ b/forge/licensing/index.js @@ -77,6 +77,9 @@ module.exports = fp(async function (app, opts) { } return defaultLimits[key] }, + raw: () => { + return userLicense + }, status: () => { return status() }, diff --git a/test/unit/forge/ee/lib/expert/tasks/emqx-bridge/setup_spec.js b/test/unit/forge/ee/lib/expert/tasks/emqx-bridge/setup_spec.js new file mode 100644 index 0000000000..36692cc4ce --- /dev/null +++ b/test/unit/forge/ee/lib/expert/tasks/emqx-bridge/setup_spec.js @@ -0,0 +1,617 @@ +const should = require('should') +const sinon = require('sinon') + +const bridge = require('../../../../../../../../forge/ee/lib/expert/emxq-bridge/setup.js') +const templates = require('../../../../../../../../forge/ee/lib/expert/emxq-bridge/templates.js') + +const { syncBridge } = bridge +const { + addBridge, removeBridge, validateBridge, discoverResources, + getConfig, getList, makeClient, del, post +} = bridge._internal + +const { connector, actionOut, sourceChat, sourceInflight, ruleIn, ruleOut } = templates + +// #region helpers + +// A stub Fastify `app` with overrides for config and license +function makeApp (overrides = {}) { + const cfg = overrides.config || {} + return { + config: { + broker: cfg.broker !== undefined + ? cfg.broker + : { teamBroker: { enabled: true, api: { url: 'http://broker.test/api/v5', key: 'k', secret: 's' } } }, + expert: cfg.expert !== undefined + ? cfg.expert + : { enabled: true, centralBroker: { server: 'expert.test:1884' } } + }, + license: { + active: sinon.stub().returns(overrides.licenseActive !== false), + status: sinon.stub().returns({ expired: overrides.licenseExpired === true }), + get: sinon.stub().returns(overrides.licenceId || 'test-licence-id'), + raw: sinon.stub().returns(overrides.licenceJwt || 'test.jwt.value') + }, + log: { + info: sinon.stub(), + warn: sinon.stub(), + error: sinon.stub() + } + } +} + +// Stub axios-shaped client. Tests configure return values via .resolves on individual stubs. +function makeFakeClient () { + return { + get: sinon.stub(), + post: sinon.stub().resolves({ status: 201, data: {} }), + delete: sinon.stub().resolves({ status: 204, data: {} }) + } +} + +// Default discovery response shape — empty broker (no resources to clean up). +function emptyDiscovery (client) { + client.get.withArgs('/sources').resolves({ status: 200, data: [] }) + client.get.withArgs('/actions').resolves({ status: 200, data: [] }) + client.get.withArgs('/rules?limit=100').resolves({ status: 200, data: { data: [], meta: {} } }) +} + +// Discovery response with one of each provisioned (matching template names). +function fullDiscovery (client) { + client.get.withArgs('/sources').resolves({ + status: 200, + data: [ + { name: sourceChat.name, type: 'mqtt', connector: connector.name }, + { name: sourceInflight.name, type: 'mqtt', connector: connector.name } + ] + }) + client.get.withArgs('/actions').resolves({ + status: 200, + data: [{ name: actionOut.name, type: 'mqtt', connector: connector.name }] + }) + client.get.withArgs('/rules?limit=100').resolves({ + status: 200, + data: { + data: [ + { id: ruleOut.id, sql: '', actions: [`mqtt:${actionOut.name}`] }, + { id: ruleIn.id, sql: `... "$bridges/mqtt:${sourceChat.name}" ...`, actions: [] } + ], + meta: {} + } + }) +} + +// Set up all mocks needed for validateBridge to return true. Individual tests then +// override specific endpoints to exercise each failure branch. +function passingValidate (client, { licenceId = 'test-licence-id', server = 'expert.test:1884' } = {}) { + client.get.withArgs('/connectors').resolves({ + status: 200, data: [{ name: connector.name, type: 'mqtt' }] + }) + client.get.withArgs(`/connectors/mqtt:${connector.name}`).resolves({ + status: 200, data: { username: licenceId, server } + }) + client.get.withArgs('/actions').resolves({ + status: 200, data: [{ name: actionOut.name, type: 'mqtt' }] + }) + client.get.withArgs('/sources').resolves({ + status: 200, + data: [ + { name: sourceChat.name, type: 'mqtt' }, + { name: sourceInflight.name, type: 'mqtt' } + ] + }) + client.get.withArgs('/rules?limit=100').resolves({ + status: 200, data: { data: [{ id: ruleOut.id }, { id: ruleIn.id }] } + }) +} + +// #endregion helpers + +describe('EMQX bridge-setup', function () { + afterEach(function () { + sinon.restore() + }) + + describe('getConfig', function () { + it('returns bridgeEnabled=true when team broker, expert, and central broker server are all set', function () { + const cfg = getConfig(makeApp()) + cfg.bridgeEnabled.should.be.true() + cfg.teamBrokerEnabled.should.be.true() + }) + + it('returns bridgeEnabled=false when team broker is disabled', function () { + const cfg = getConfig(makeApp({ + config: { broker: { teamBroker: { enabled: false } } } + })) + cfg.bridgeEnabled.should.be.false() + cfg.teamBrokerEnabled.should.be.false() + }) + + it('returns bridgeEnabled=false when expert is disabled', function () { + const cfg = getConfig(makeApp({ + config: { expert: { enabled: false, centralBroker: { server: 'x:1884' } } } + })) + cfg.bridgeEnabled.should.be.false() + }) + + it('returns bridgeEnabled=false when central broker server is missing', function () { + const cfg = getConfig(makeApp({ + config: { expert: { enabled: true, centralBroker: {} } } + })) + cfg.bridgeEnabled.should.be.false() + }) + + it('includes license active/expired/id/jwt', function () { + const cfg = getConfig(makeApp({ licenceId: 'lic-42', licenceJwt: 'jwt-token' })) + cfg.licenseActive.should.be.true() + cfg.licenseExpired.should.be.false() + cfg.licenceId.should.equal('lic-42') + cfg.licenceJwt.should.equal('jwt-token') + }) + + it('does not throw when config sections are entirely missing', function () { + const app = { config: {}, license: { active: () => false, status: () => ({ expired: false }), get: () => null, raw: () => null } } + should(() => getConfig(app)).not.throw() + }) + }) + + describe('makeClient', function () { + it('builds an axios instance with baseURL, basic auth, and disabled keep-alive agents', function () { + const cfg = { + appBrokerServiceUrl: 'http://broker.test/api/v5', + appBrokerApiKey: 'apikey', + appBrokerSecretKey: 'secretkey' + } + const client = makeClient(cfg) + client.defaults.baseURL.should.equal('http://broker.test/api/v5') + client.defaults.headers.Authorization.should.equal('Basic ' + Buffer.from('apikey:secretkey').toString('base64')) + client.defaults.httpAgent.keepAlive.should.be.false() + client.defaults.httpsAgent.keepAlive.should.be.false() + // validateStatus accepts every status so callers can branch on resp.status + client.defaults.validateStatus(500).should.be.true() + }) + }) + + describe('del', function () { + it('resolves on 200/204/404 without throwing', async function () { + for (const status of [200, 204, 404]) { + const client = { delete: sinon.stub().resolves({ status, data: {} }) } + await del(client, '/anything').should.be.fulfilled() + } + }) + + it('throws on other status codes with status and body in message', async function () { + const client = { delete: sinon.stub().resolves({ status: 500, data: { reason: 'boom' } }) } + await del(client, '/x').should.be.rejectedWith(/failed to delete \/x \(500\).*boom/) + }) + }) + + describe('post', function () { + it('resolves on 200 and 201', async function () { + for (const status of [200, 201]) { + const client = { post: sinon.stub().resolves({ status, data: {} }) } + await post(client, '/x', {}).should.be.fulfilled() + } + }) + + it('throws on other status codes', async function () { + const client = { post: sinon.stub().resolves({ status: 400, data: { code: 'BAD' } }) } + await post(client, '/x', {}).should.be.rejectedWith(/failed to create \/x \(400\).*BAD/) + }) + }) + + describe('getList', function () { + it('returns the array directly when the response body is an array', async function () { + const client = { get: sinon.stub().resolves({ status: 200, data: [{ a: 1 }, { a: 2 }] }) } + const list = await getList(client, '/x') + list.should.have.length(2) + }) + + it('unwraps the data field for paginated responses', async function () { + const client = { get: sinon.stub().resolves({ status: 200, data: { data: [{ a: 1 }], meta: {} } }) } + const list = await getList(client, '/x') + list.should.have.length(1) + }) + + it('throws on non-200 status', async function () { + const client = { get: sinon.stub().resolves({ status: 401, data: { error: 'nope' } }) } + await getList(client, '/x').should.be.rejectedWith(/failed to GET \/x \(401\)/) + }) + + it('throws on unexpected response shape', async function () { + const client = { get: sinon.stub().resolves({ status: 200, data: { unexpected: true } }) } + await getList(client, '/x').should.be.rejectedWith(/unexpected list shape/) + }) + }) + + describe('discoverResources', function () { + it('filters sources and actions by connector name', async function () { + const client = makeFakeClient() + client.get.withArgs('/sources').resolves({ + status: 200, + data: [ + { name: 's-mine', type: 'mqtt', connector: connector.name }, + { name: 's-other', type: 'mqtt', connector: 'someone-else' } + ] + }) + client.get.withArgs('/actions').resolves({ + status: 200, + data: [ + { name: 'a-mine', type: 'mqtt', connector: connector.name }, + { name: 'a-other', type: 'mqtt', connector: 'someone-else' } + ] + }) + client.get.withArgs('/rules?limit=100').resolves({ status: 200, data: { data: [], meta: {} } }) + + const { ourSources, ourActions, ourRules } = await discoverResources(client) + ourSources.should.have.length(1) + ourSources[0].name.should.equal('s-mine') + ourActions.should.have.length(1) + ourActions[0].name.should.equal('a-mine') + ourRules.should.have.length(0) + }) + + it('matches rules that reference our actions', async function () { + const client = makeFakeClient() + client.get.withArgs('/sources').resolves({ status: 200, data: [] }) + client.get.withArgs('/actions').resolves({ + status: 200, + data: [{ name: 'a-mine', type: 'mqtt', connector: connector.name }] + }) + client.get.withArgs('/rules?limit=100').resolves({ + status: 200, + data: { + data: [ + { id: 'our-rule', sql: 'SELECT * FROM "t/x"', actions: ['mqtt:a-mine'] }, + { id: 'not-our-rule', sql: 'SELECT * FROM "t/y"', actions: ['mqtt:not-ours'] } + ] + } + }) + + const { ourRules } = await discoverResources(client) + ourRules.should.have.length(1) + ourRules[0].id.should.equal('our-rule') + }) + + it('matches rules that reference our sources via $bridges/:', async function () { + const client = makeFakeClient() + client.get.withArgs('/sources').resolves({ + status: 200, + data: [{ name: 's-mine', type: 'mqtt', connector: connector.name }] + }) + client.get.withArgs('/actions').resolves({ status: 200, data: [] }) + client.get.withArgs('/rules?limit=100').resolves({ + status: 200, + data: { + data: [ + { id: 'our-rule', sql: 'SELECT * FROM "$bridges/mqtt:s-mine"', actions: [] }, + { id: 'not-our-rule', sql: 'SELECT * FROM "$bridges/mqtt:s-other"', actions: [] } + ] + } + }) + + const { ourRules } = await discoverResources(client) + ourRules.should.have.length(1) + ourRules[0].id.should.equal('our-rule') + }) + }) + + describe('validateBridge', function () { + it('returns false when bridge is disabled in config', async function () { + const app = makeApp({ config: { expert: { enabled: false, centralBroker: { server: 'x:1' } } } }) + const result = await validateBridge(app) + result.should.be.false() + }) + + it('returns false when our connector is not in the list', async function () { + const app = makeApp() + const client = makeFakeClient() + client.get.withArgs('/connectors').resolves({ status: 200, data: [] }) + + const result = await validateBridge(app, { client }) + result.should.be.false() + app.log.info.calledWithMatch(/connector not found/).should.be.true() + }) + + it('returns false when the connector username does not match the current licence id', async function () { + const app = makeApp({ licenceId: 'new-lic' }) + const client = makeFakeClient() + passingValidate(client, { licenceId: 'old-lic' }) // connector body has old-lic + + const result = await validateBridge(app, { client }) + result.should.be.false() + app.log.info.calledWithMatch(/credentials do not match/).should.be.true() + }) + + it('returns false when the connector server does not match config', async function () { + const app = makeApp() // cfg.expertBrokerServerAddress = 'expert.test:1884' + const client = makeFakeClient() + passingValidate(client, { server: 'old-broker.example:1884' }) + + const result = await validateBridge(app, { client }) + result.should.be.false() + app.log.info.calledWithMatch(/server does not match/).should.be.true() + }) + + it('returns false when our action is missing', async function () { + const app = makeApp() + const client = makeFakeClient() + passingValidate(client) + // override actions to return empty + client.get.withArgs('/actions').resolves({ status: 200, data: [] }) + + const result = await validateBridge(app, { client }) + result.should.be.false() + app.log.info.calledWithMatch(/action not found/).should.be.true() + }) + + it('returns false when one of our sources is missing', async function () { + const app = makeApp() + const client = makeFakeClient() + passingValidate(client) + // only chat source, missing the inflight source + client.get.withArgs('/sources').resolves({ + status: 200, data: [{ name: sourceChat.name, type: 'mqtt' }] + }) + + const result = await validateBridge(app, { client }) + result.should.be.false() + app.log.info.calledWithMatch(/sources not found/).should.be.true() + }) + + it('returns false when one of our rules is missing', async function () { + const app = makeApp() + const client = makeFakeClient() + passingValidate(client) + // only ruleOut, missing ruleIn + client.get.withArgs('/rules?limit=100').resolves({ + status: 200, data: { data: [{ id: ruleOut.id }] } + }) + + const result = await validateBridge(app, { client }) + result.should.be.false() + app.log.info.calledWithMatch(/rules not found/).should.be.true() + }) + + it('returns true when connector, creds, server, action, sources, and rules all check out', async function () { + const app = makeApp() + const client = makeFakeClient() + passingValidate(client) + + const result = await validateBridge(app, { client }) + result.should.be.true() + }) + + it('returns false (logging error) when a request throws', async function () { + const app = makeApp() + const client = makeFakeClient() + client.get.withArgs('/connectors').rejects(new Error('network down')) + + const result = await validateBridge(app, { client }) + result.should.be.false() + app.log.error.calledWithMatch(/network down/).should.be.true() + }) + }) + + describe('addBridge', function () { + it('POSTs connector → action → sources → rules in order, with cloned connector payload', async function () { + const app = makeApp({ licenceId: 'lic-1', licenceJwt: 'jwt-1' }) + const cfg = getConfig(app) + const client = makeFakeClient() + + await addBridge(app, { cfg, client }) + + client.post.callCount.should.equal(6) // 1 connector + 1 action + 2 sources + 2 rules + client.post.getCall(0).args[0].should.equal('/connectors') + client.post.getCall(1).args[0].should.equal('/actions') + client.post.getCall(2).args[0].should.equal('/sources') + client.post.getCall(3).args[0].should.equal('/sources') + client.post.getCall(4).args[0].should.equal('/rules') + client.post.getCall(5).args[0].should.equal('/rules') + + const connectorPayload = client.post.getCall(0).args[1] + connectorPayload.server.should.equal('expert.test:1884') + connectorPayload.username.should.equal('lic-1') + connectorPayload.password.should.equal('jwt-1') + connectorPayload.name.should.equal(connector.name) + }) + + it('does not mutate the imported connector template', async function () { + const app = makeApp() + const cfg = getConfig(app) + const client = makeFakeClient() + const before = JSON.parse(JSON.stringify(connector)) + + await addBridge(app, { cfg, client }) + + connector.should.deepEqual(before) + }) + + it('throws when a POST returns a non-201 status', async function () { + const app = makeApp() + const cfg = getConfig(app) + const client = makeFakeClient() + client.post.withArgs('/connectors').resolves({ status: 409, data: { code: 'EXISTS' } }) + + await addBridge(app, { cfg, client }).should.be.rejectedWith(/failed to create \/connectors/) + }) + }) + + describe('removeBridge', function () { + it('does only the connector DELETE when no resources exist', async function () { + const app = makeApp() + const cfg = getConfig(app) + const client = makeFakeClient() + emptyDiscovery(client) + + await removeBridge(app, { cfg, client }) + + client.delete.callCount.should.equal(1) + client.delete.getCall(0).args[0].should.equal(`/connectors/mqtt:${connector.name}`) + }) + + it('deletes rules → sources → actions → connector in that order', async function () { + const app = makeApp() + const cfg = getConfig(app) + const client = makeFakeClient() + fullDiscovery(client) + + await removeBridge(app, { cfg, client }) + + // 2 rules + 2 sources + 1 action + 1 connector = 6 deletes + client.delete.callCount.should.equal(6) + const paths = client.delete.getCalls().map(c => c.args[0]) + paths[0].should.match(/^\/rules\//) + paths[1].should.match(/^\/rules\//) + paths[2].should.match(/^\/sources\//) + paths[3].should.match(/^\/sources\//) + paths[4].should.match(/^\/actions\//) + paths[5].should.equal(`/connectors/mqtt:${connector.name}`) + }) + + it('tolerates 404s on individual deletes', async function () { + const app = makeApp() + const cfg = getConfig(app) + const client = makeFakeClient() + emptyDiscovery(client) + client.delete.resolves({ status: 404, data: {} }) + + await removeBridge(app, { cfg, client }).should.be.fulfilled() + }) + + it('throws if a delete returns a non-success status', async function () { + const app = makeApp() + const cfg = getConfig(app) + const client = makeFakeClient() + emptyDiscovery(client) + client.delete.resolves({ status: 500, data: { reason: 'boom' } }) + + await removeBridge(app, { cfg, client }).should.be.rejectedWith(/failed to delete/) + }) + }) + + describe('syncBridge', function () { + it('returns true and does nothing when team broker is disabled', async function () { + const app = makeApp({ config: { broker: { teamBroker: { enabled: false } } } }) + const client = makeFakeClient() + + const result = await syncBridge(app, { client }) + result.should.be.true() + client.get.called.should.be.false() + client.post.called.should.be.false() + client.delete.called.should.be.false() + }) + + it('removes bridge when expert is disabled (team broker still enabled)', async function () { + const app = makeApp({ config: { expert: { enabled: false, centralBroker: { server: 'x:1' } } } }) + const client = makeFakeClient() + emptyDiscovery(client) + + const result = await syncBridge(app, { client }) + result.should.be.true() + client.delete.called.should.be.true() + client.post.called.should.be.false() + }) + + it('removes bridge when licence is inactive', async function () { + const app = makeApp({ licenseActive: false }) + const client = makeFakeClient() + emptyDiscovery(client) + + const result = await syncBridge(app, { client }) + result.should.be.true() + client.delete.called.should.be.true() + client.post.called.should.be.false() + }) + + it('removes bridge when licence is expired', async function () { + const app = makeApp({ licenseExpired: true }) + const client = makeFakeClient() + emptyDiscovery(client) + + const result = await syncBridge(app, { client }) + result.should.be.true() + client.delete.called.should.be.true() + client.post.called.should.be.false() + }) + + it('returns false when delete fails (logs error)', async function () { + const app = makeApp({ licenseExpired: true }) + const client = makeFakeClient() + emptyDiscovery(client) + client.delete.resolves({ status: 500, data: {} }) + + const result = await syncBridge(app, { client }) + result.should.be.false() + app.log.error.called.should.be.true() + }) + + it('with force=true: removes then adds even if the bridge looks healthy', async function () { + const app = makeApp() + const client = makeFakeClient() + emptyDiscovery(client) + + const result = await syncBridge(app, { force: true, client }) + result.should.be.true() + // 1 connector delete + 6 creates (1 connector + 1 action + 2 sources + 2 rules) + client.delete.callCount.should.equal(1) + client.post.callCount.should.equal(6) + // didn't bother validating + client.get.calledWith('/connectors').should.be.false() + }) + + it('without force: skips changes when validateBridge passes', async function () { + const app = makeApp() + const client = makeFakeClient() + // make validateBridge pass + passingValidate(client) + + const result = await syncBridge(app, { client }) + result.should.be.true() + client.delete.called.should.be.false() + client.post.called.should.be.false() + }) + + it('without force: removes + adds when validateBridge fails', async function () { + const app = makeApp() + const client = makeFakeClient() + // validateBridge sees no connector → false + client.get.withArgs('/connectors').resolves({ status: 200, data: [] }) + // discoverResources sees nothing to clean up + client.get.withArgs('/sources').resolves({ status: 200, data: [] }) + client.get.withArgs('/actions').resolves({ status: 200, data: [] }) + client.get.withArgs('/rules?limit=100').resolves({ status: 200, data: { data: [] } }) + + const result = await syncBridge(app, { client }) + result.should.be.true() + client.delete.called.should.be.true() + client.post.callCount.should.equal(6) // 1 connector + 1 action + 2 sources + 2 rules + }) + + it('returns false when an underlying call throws. (force=true) (logs error)', async function () { + const app = makeApp() + const client = makeFakeClient() + emptyDiscovery(client) // removeBridge succeeds (nothing to clean up) + client.post.rejects(new Error('boom')) // addBridge then trips on the connector POST + + const result = await syncBridge(app, { force: true, client }) + result.should.be.false() + app.log.error.calledWithMatch(/boom/).should.be.true() + }) + + it('reads app.config and app.license once per sync run (cfg threading)', async function () { + const app = makeApp() + const client = makeFakeClient() + // validate passes → no further work + passingValidate(client) // validate passes → no further work + + await syncBridge(app, { client }) + + // Each license accessor should fire exactly once during the single getConfig call. + app.license.active.callCount.should.equal(1) + app.license.status.callCount.should.equal(1) + app.license.get.callCount.should.equal(1) + app.license.raw.callCount.should.equal(1) + }) + }) +}) diff --git a/test/unit/forge/licensing/index_spec.js b/test/unit/forge/licensing/index_spec.js index 3901e51336..59aeb0b6c0 100644 --- a/test/unit/forge/licensing/index_spec.js +++ b/test/unit/forge/licensing/index_spec.js @@ -146,12 +146,14 @@ describe('License API', async function () { after(async function () { await app.close() }) + it('Reports as not active', async function () { + app.license.active().should.equal(false) + }) it('Uses default limits when no license applied', async function () { app.license.get('users').should.equal(app.license.defaults.users) app.license.get('teams').should.equal(app.license.defaults.teams) app.license.get('instances').should.equal(app.license.defaults.instances) }) - it('Gets all usage count and limits (unlicensed)', async function () { const usage = await app.license.usage() // check usage contains the correct keys @@ -223,6 +225,14 @@ describe('License API', async function () { after(async function () { await app.close() }) + it('Reports as active', async function () { + app.license.active().should.equal(true) + }) + it('Returns raw license', async function () { + const raw = app.license.raw() + should(raw).be.a.String() + raw.should.equal(TEST_LICENSE_4u_5t_6p_7d) + }) it('Gets all usage count and limits (licensed)', async function () { const usage = await app.license.usage() // check usage contains the correct keys