diff --git a/meteor/server/api/peripheralDevice.ts b/meteor/server/api/peripheralDevice.ts index dd55251de23..a49d7774107 100644 --- a/meteor/server/api/peripheralDevice.ts +++ b/meteor/server/api/peripheralDevice.ts @@ -37,7 +37,7 @@ import { checkAccessAndGetPeripheralDevice } from '../security/check' import { UserActionsLogItem } from '@sofie-automation/meteor-lib/dist/collections/UserActionsLog' import { PackageManagerIntegration } from './integration/expectedPackages' import { profiler } from './profiler' -import { QueueStudioJob } from '../worker/worker' +import { QueueStudioJob, QueueOrUpdateStudioJob } from '../worker/worker' import { StudioJobs } from '@sofie-automation/corelib/dist/worker/studio' import { PlayoutChangedResults, @@ -46,6 +46,7 @@ import { TimelineTriggerTimeResult, DeviceStatusDetail, } from '@sofie-automation/shared-lib/dist/peripheralDevice/peripheralDeviceAPI' +import type { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents' import { checkStudioExists } from '../optimizations' import { ExpectedPackageId, @@ -465,6 +466,32 @@ export namespace ServerPeripheralDeviceAPI { transaction?.end() } + export async function reportExternalEvents( + context: MethodContext, + deviceId: PeripheralDeviceId, + token: string, + events: PeripheralDeviceExternalEvent[] + ): Promise { + check(events, Array) + + const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, context) + + if (!peripheralDevice.studioAndConfigId) + throw new Error(`PeripheralDevice "${peripheralDevice._id}" sent reportExternalEvents, but has no studioId`) + + if (!events.length) return + + const studioId = peripheralDevice.studioAndConfigId.studioId + // An arbitrary cap, to avoid unbound memory growth + const MAX_PENDING_EXTERNAL_EVENTS = 1000 + + // Merge events into the last pending OnExternalEvents job in the queue, or enqueue a new one. + // This prevents queue flooding when many events arrive in a burst, or when multiple gateways + // report events for the same studio simultaneously. + QueueOrUpdateStudioJob(StudioJobs.OnExternalEvents, studioId, (existing) => ({ + events: [...(existing?.events ?? []), ...events].slice(-MAX_PENDING_EXTERNAL_EVENTS), + })) + } export async function pingWithCommand( context: MethodContext, deviceId: PeripheralDeviceId, @@ -1036,6 +1063,13 @@ class ServerPeripheralDeviceAPIClass extends MethodContextAPI implements NewPeri ) { return ServerPeripheralDeviceAPI.playoutPlaybackChanged(this, deviceId, deviceToken, changedResults) } + async reportExternalEvents( + deviceId: PeripheralDeviceId, + deviceToken: string, + events: PeripheralDeviceExternalEvent[] + ) { + return ServerPeripheralDeviceAPI.reportExternalEvents(this, deviceId, deviceToken, events) + } async reportResolveDone( deviceId: PeripheralDeviceId, deviceToken: string, diff --git a/meteor/server/publications/_publications.ts b/meteor/server/publications/_publications.ts index 64a027a2791..501d8fa4d15 100644 --- a/meteor/server/publications/_publications.ts +++ b/meteor/server/publications/_publications.ts @@ -4,6 +4,7 @@ import './lib/lib' import './buckets' import './blueprintUpgradeStatus/publication' import './ingestStatus/publication' +import './externalEventSubscriptions' import './packageManager/expectedPackages/publication' import './packageManager/packageContainers' import './packageManager/playoutContext' diff --git a/meteor/server/publications/externalEventSubscriptions.ts b/meteor/server/publications/externalEventSubscriptions.ts new file mode 100644 index 00000000000..acf8b14d559 --- /dev/null +++ b/meteor/server/publications/externalEventSubscriptions.ts @@ -0,0 +1,179 @@ +import { PeripheralDeviceId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { assertNever, getHash, literal } from '@sofie-automation/corelib/dist/lib' +import { protectString } from '@sofie-automation/corelib/dist/protectedString' +import { MongoFieldSpecifierOnesStrict } from '@sofie-automation/corelib/dist/mongo' +import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' +import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist/RundownPlaylist' +import { ReadonlyDeep } from 'type-fest' +import { + CustomPublish, + CustomPublishCollection, + meteorCustomPublish, + setUpCollectionOptimizedObserver, + SetupObserversResult, + TriggerUpdate, +} from '../lib/customPublication' +import { logger } from '../logging' +import { RundownPlaylists, Rundowns } from '../collections' +import { + PeripheralDevicePubSub, + PeripheralDevicePubSubCollectionsNames, + ExternalEventSubscriptionDocument, + ExternalEventSubscriptionId, +} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice' +import type { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents' +import { checkAccessAndGetPeripheralDevice } from '../security/check' +import { check } from '../lib/check' + +type RundownPlaylistFields = '_id' | 'activationId' +const rundownPlaylistFieldSpecifier = literal< + MongoFieldSpecifierOnesStrict> +>({ + _id: 1, + activationId: 1, +}) + +type RundownFields = '_id' | 'playlistId' | 'externalEventSubscriptions' +const rundownFieldSpecifier = literal>>({ + _id: 1, + playlistId: 1, + externalEventSubscriptions: 1, +}) + +interface ExternalEventSubscriptionsArgs { + readonly studioId: StudioId + readonly type: PeripheralDeviceExternalEvent['type'] +} + +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +interface ExternalEventSubscriptionsState {} + +interface ExternalEventSubscriptionsUpdateProps { + invalidateAll: true +} + +async function setupExternalEventSubscriptionsObservers( + args: ReadonlyDeep, + triggerUpdate: TriggerUpdate +): Promise { + const trigger = () => triggerUpdate({ invalidateAll: true }) + + return [ + // Observe active playlists in the studio — activation/deactivation changes which rundowns are in scope + RundownPlaylists.observeChanges( + { studioId: args.studioId }, + { added: trigger, changed: trigger, removed: trigger }, + { projection: rundownPlaylistFieldSpecifier } + ), + // Observe rundowns in the studio — react only when externalEventSubscriptions or playlistId changes + Rundowns.observeChanges( + { studioId: args.studioId }, + { added: trigger, changed: trigger, removed: trigger }, + { projection: rundownFieldSpecifier } + ), + ] +} + +async function manipulateExternalEventSubscriptionsData( + args: ReadonlyDeep, + _state: Partial, + collection: CustomPublishCollection, + _updateProps: Partial> | undefined +): Promise { + // Find all active playlists in the studio + const activePlaylists = (await RundownPlaylists.findFetchAsync( + { studioId: args.studioId, activationId: { $exists: true } }, + { projection: rundownPlaylistFieldSpecifier } + )) as Pick[] + const activePlaylistIds = activePlaylists.map((p) => p._id) + + // Find rundowns belonging to active playlists + const activeRundowns = (await Rundowns.findFetchAsync( + { studioId: args.studioId, playlistId: { $in: activePlaylistIds } }, + { projection: rundownFieldSpecifier } + )) as Pick[] + + // Build the set of valid IDs and the docs to upsert (filtered by type) + const validIds = new Set() + const subsToUpsert: ExternalEventSubscriptionDocument[] = [] + + for (const rundown of activeRundowns) { + for (const sub of rundown.externalEventSubscriptions ?? []) { + if (sub.type !== args.type) continue + + switch (sub.type) { + case 'tsr': { + const id = protectString( + getHash(`tsr_${sub.deviceId}_${sub.deviceType}_${String(sub.event)}`) + ) + validIds.add(id) + subsToUpsert.push({ + _id: id, + type: 'tsr', + deviceId: sub.deviceId, + deviceType: sub.deviceType, + event: sub.event as string, + }) + break + } + default: + assertNever(sub.type) + break + } + } + } + + // Remove docs for subscriptions that are no longer active + collection.remove((doc) => !validIds.has(doc._id)) + + // Upsert each individual subscription doc + for (const sub of subsToUpsert) { + collection.replace(sub) + } +} + +async function startOrJoinExternalEventSubscriptionsPublication( + pub: CustomPublish, + studioId: StudioId, + type: PeripheralDeviceExternalEvent['type'] +) { + await setUpCollectionOptimizedObserver< + ExternalEventSubscriptionDocument, + ExternalEventSubscriptionsArgs, + ExternalEventSubscriptionsState, + ExternalEventSubscriptionsUpdateProps + >( + `pub_${PeripheralDevicePubSub.externalEventSubscriptionsForDevice}_${studioId}_${type}`, + { studioId, type }, + setupExternalEventSubscriptionsObservers, + manipulateExternalEventSubscriptionsData, + pub, + 100 + ) +} + +meteorCustomPublish( + PeripheralDevicePubSub.externalEventSubscriptionsForDevice, + PeripheralDevicePubSubCollectionsNames.externalEventSubscriptions, + async function ( + pub: CustomPublish, + type: PeripheralDeviceExternalEvent['type'], + deviceId: PeripheralDeviceId, + token: string | undefined + ) { + check(deviceId, String) + check(type, String) + + const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, this) + + const studioId = peripheralDevice.studioAndConfigId?.studioId + if (!studioId) { + logger.warn( + `Publication ${PeripheralDevicePubSub.externalEventSubscriptionsForDevice}: device ${deviceId} has no studio` + ) + return + } + + await startOrJoinExternalEventSubscriptionsPublication(pub, studioId, type) + } +) diff --git a/meteor/server/publications/rundown.ts b/meteor/server/publications/rundown.ts index 6e4e5788858..b83748913c9 100644 --- a/meteor/server/publications/rundown.ts +++ b/meteor/server/publications/rundown.ts @@ -67,6 +67,7 @@ meteorPublish( { projection: { privateData: 0, + externalEventSubscriptions: 0, }, } ) @@ -90,6 +91,7 @@ meteorPublish( const modifier: FindOptions = { projection: { privateData: 0, + externalEventSubscriptions: 0, }, } @@ -112,6 +114,7 @@ meteorPublish( const modifier: FindOptions = { projection: { privateData: 0, + externalEventSubscriptions: 0, }, } diff --git a/meteor/server/worker/__tests__/jobQueue.test.ts b/meteor/server/worker/__tests__/jobQueue.test.ts index 93db5024fda..2977379faa3 100644 --- a/meteor/server/worker/__tests__/jobQueue.test.ts +++ b/meteor/server/worker/__tests__/jobQueue.test.ts @@ -651,6 +651,175 @@ describe('WorkerJobQueueManager', () => { }) }) + describe('mergeOrQueueJob', () => { + it('should enqueue a new job when the queue is empty', async () => { + const queueName = 'testQueue' + const jobName = 'mergeJob' + const generateData = jest.fn((_existing: unknown | null) => ({ value: 1 })) + + manager.mergeOrQueueJob(queueName, jobName, generateData) + + expect(generateData).toHaveBeenCalledTimes(1) + expect(generateData).toHaveBeenCalledWith(null) + + const job = await manager.getNextJob(queueName) + expect(job).not.toBeNull() + expect(job?.name).toBe(jobName) + expect(job?.data).toEqual({ value: 1 }) + }) + + it('should enqueue a new job when the tail has a different job name', async () => { + const queueName = 'testQueue' + const generateData = jest.fn((_existing: unknown | null) => ({ value: 42 })) + + await manager.queueJobWithoutResult(queueName, 'otherJob', { existing: true }, undefined) + + manager.mergeOrQueueJob(queueName, 'mergeJob', generateData) + + expect(generateData).toHaveBeenCalledWith(null) + + // Two jobs should be in the queue + const firstJob = await manager.getNextJob(queueName) + expect(firstJob?.name).toBe('otherJob') + + const secondJob = await manager.getNextJob(queueName) + expect(secondJob?.name).toBe('mergeJob') + expect(secondJob?.data).toEqual({ value: 42 }) + }) + + it('should merge data into the tail job when it has the same name', async () => { + const queueName = 'testQueue' + const jobName = 'mergeJob' + const initialData = { count: 1 } + const generateData = jest.fn((existing: unknown | null) => ({ + ...(existing as object), + count: ((existing as { count: number } | null)?.count ?? 0) + 1, + })) + + await manager.queueJobWithoutResult(queueName, jobName, initialData, undefined) + + manager.mergeOrQueueJob(queueName, jobName, generateData) + + expect(generateData).toHaveBeenCalledWith(initialData) + + // Should still be only one job in the queue + const firstJob = await manager.getNextJob(queueName) + expect(firstJob?.name).toBe(jobName) + expect(firstJob?.data).toEqual({ count: 2 }) + + const noMoreJobs = await manager.getNextJob(queueName) + expect(noMoreJobs).toBeNull() + }) + + it('should only merge with the tail job, not jobs earlier in the queue', async () => { + const queueName = 'testQueue' + const jobName = 'mergeJob' + const generateData = jest.fn((_existing: unknown | null) => ({ merged: true })) + + // Queue: [mergeJob (index 0), otherJob (index 1/tail)] + await manager.queueJobWithoutResult(queueName, jobName, { original: true }, undefined) + await manager.queueJobWithoutResult(queueName, 'otherJob', {}, undefined) + + // Tail is 'otherJob', not 'mergeJob' — should enqueue a new job + manager.mergeOrQueueJob(queueName, jobName, generateData) + + expect(generateData).toHaveBeenCalledWith(null) + + // Three jobs should be in the queue + const firstJob = await manager.getNextJob(queueName) + expect(firstJob?.name).toBe(jobName) + expect(firstJob?.data).toEqual({ original: true }) + + const secondJob = await manager.getNextJob(queueName) + expect(secondJob?.name).toBe('otherJob') + + const thirdJob = await manager.getNextJob(queueName) + expect(thirdJob?.name).toBe(jobName) + expect(thirdJob?.data).toEqual({ merged: true }) + }) + + it('should notify a waiting worker when a new job is enqueued', async () => { + const queueName = 'testQueue' + + // Worker starts waiting (queue is empty) + const waitPromise = manager.waitForNextJob(queueName) + + manager.mergeOrQueueJob(queueName, 'mergeJob', () => ({ value: 1 })) + + // Wait for deferred notification + await waitTime(10) + + await expect(waitPromise).resolves.toBeUndefined() + }) + + it('should not add a new job when merging into the tail', async () => { + const queueName = 'testQueue' + const jobName = 'mergeJob' + + await manager.queueJobWithoutResult(queueName, jobName, { value: 0 }, undefined) + + // Merge — should not add a second job + manager.mergeOrQueueJob(queueName, jobName, (existing) => ({ + ...(existing as object), + value: ((existing as { value: number } | null)?.value ?? 0) + 1, + })) + + const firstJob = await manager.getNextJob(queueName) + expect(firstJob?.name).toBe(jobName) + expect(firstJob?.data).toEqual({ value: 1 }) + + // Queue should now be empty + const noMoreJobs = await manager.getNextJob(queueName) + expect(noMoreJobs).toBeNull() + }) + + it('should enqueue a new job after the previous tail job has been consumed', async () => { + const queueName = 'testQueue' + const jobName = 'mergeJob' + + // First call: new job + manager.mergeOrQueueJob(queueName, jobName, () => ({ round: 1 })) + + // Consume the job + const firstJob = await manager.getNextJob(queueName) + expect(firstJob?.name).toBe(jobName) + expect(firstJob?.data).toEqual({ round: 1 }) + + // Second call: queue is empty again — should create a fresh job with null as existing + const generateData = jest.fn(() => ({ round: 2 })) + manager.mergeOrQueueJob(queueName, jobName, generateData) + + expect(generateData).toHaveBeenCalledWith(null) + + const secondJob = await manager.getNextJob(queueName) + expect(secondJob?.name).toBe(jobName) + expect(secondJob?.data).toEqual({ round: 2 }) + }) + + it('should accumulate multiple merges into the same tail job', async () => { + const queueName = 'testQueue' + const jobName = 'mergeJob' + + // First call creates a new job + manager.mergeOrQueueJob(queueName, jobName, () => ({ total: 1 })) + + // Subsequent calls should keep merging into the tail + manager.mergeOrQueueJob(queueName, jobName, (existing) => ({ + total: ((existing as { total: number } | null)?.total ?? 0) + 1, + })) + manager.mergeOrQueueJob(queueName, jobName, (existing) => ({ + total: ((existing as { total: number } | null)?.total ?? 0) + 1, + })) + + // Should be exactly one job with accumulated data + const job = await manager.getNextJob(queueName) + expect(job?.name).toBe(jobName) + expect(job?.data).toEqual({ total: 3 }) + + expect(await manager.getNextJob(queueName)).toBeNull() + }) + }) + describe('multiple queues', () => { it('should maintain separate queues for different queue names', async () => { const queue1 = 'queue1' diff --git a/meteor/server/worker/jobQueue.ts b/meteor/server/worker/jobQueue.ts index 459116cf103..24e024c9fb8 100644 --- a/meteor/server/worker/jobQueue.ts +++ b/meteor/server/worker/jobQueue.ts @@ -360,6 +360,46 @@ export class WorkerJobQueueManager { } this.#runningJobs.clear() } + + /** + * Merge new data into the last pending job in the queue if it has the given name, + * otherwise enqueue a new fire-and-forget job. + * + * This prevents queue flooding when high-frequency events arrive: as long as the tail job + * has not yet been picked up by the worker, incoming data is merged into it in-place. + * Once the worker starts executing it or another jobName gets queued, the next call will enqueue a fresh job. + * + * @param queueName - The target queue + * @param jobName - The job type name to match against + * @param generateData - Called to produce job data. Receives the existing job's data when + * merging into a tail job, or `null` when creating a new job. + */ + mergeOrQueueJob(queueName: string, jobName: string, generateData: (existing: unknown | null) => unknown): void { + const queue = this.#getOrCreateQueue(queueName) + + // Only inspect the very last entry. Scanning further back would risk updating a job in the + // middle of the queue, breaking the ordering guarantee relative to other job types. + const tail = queue.jobsHighPriority[queue.jobsHighPriority.length - 1] + if (tail && tail.spec.name === jobName) { + // Merge into the existing tail job in-place + tail.spec.data = generateData(tail.spec.data) + logger.debug(`Merged events into existing "${jobName}" job in queue "${queueName}"`) + } else { + // Tail is absent, already picked up, or a different job type — push a new job + const entry: JobEntry = { + spec: { + id: getRandomString(), + name: jobName, + data: generateData(null), + }, + completionHandler: null, + } + queue.jobsHighPriority.push(entry) + queue.metricsTotal.inc() + logger.debug(`Enqueued new "${jobName}" job in queue "${queueName}"`) + this.#notifyWorker(queue) + } + } } function generateCompletionHandler( diff --git a/meteor/server/worker/worker.ts b/meteor/server/worker/worker.ts index 6a813e0a3aa..d5b496ca45e 100644 --- a/meteor/server/worker/worker.ts +++ b/meteor/server/worker/worker.ts @@ -239,6 +239,33 @@ export async function QueueStudioJob( return queueManager.queueJobAndWrapResult(getStudioQueueName(studioId), jobName, jobParameters, now, options) } +/** + * Merge new job data into the last pending job in the studio's queue if it has the same name, + * otherwise enqueue a new fire-and-forget job. + * + * Use this instead of {@link QueueStudioJob} when events can be safely batched together and you + * do not need to await the result (e.g. device feedback events from gateways). + * + * @param jobName - The job type + * @param studioId - Id of the studio + * @param generateData - Called to produce the job data. Receives the existing pending job's data + * when merging, or `null` when creating a new job. Both cases are handled by the same function. + */ +export function QueueOrUpdateStudioJob( + jobName: T, + studioId: StudioId, + generateData: (existing: Parameters[0] | null) => Parameters[0] +): void { + if (isInTestWrite()) throw new Meteor.Error(404, 'Should not be reachable during startup tests') + if (!studioId) throw new Meteor.Error(500, 'Missing studioId') + + queueManager.mergeOrQueueJob( + getStudioQueueName(studioId), + jobName, + generateData as (existing: unknown | null) => unknown + ) +} + /** * Queue a job for ingest * @param jobName Job name diff --git a/meteor/yarn.lock b/meteor/yarn.lock index 607237f560d..17569f1a747 100644 --- a/meteor/yarn.lock +++ b/meteor/yarn.lock @@ -2421,7 +2421,7 @@ __metadata: dependencies: "@mos-connection/model": "npm:^5.0.0-alpha.0" kairos-lib: "npm:^1.0.0" - timeline-state-resolver-types: "npm:10.0.0-nightly-main-20260514-152958-5f6033589.0" + timeline-state-resolver-types: "npm:10.0.0-nightly-main-20260601-094843-59ce43391.0" tslib: "npm:^2.8.1" type-fest: "npm:^4.41.0" languageName: node @@ -12415,13 +12415,13 @@ __metadata: languageName: node linkType: hard -"timeline-state-resolver-types@npm:10.0.0-nightly-main-20260514-152958-5f6033589.0": - version: 10.0.0-nightly-main-20260514-152958-5f6033589.0 - resolution: "timeline-state-resolver-types@npm:10.0.0-nightly-main-20260514-152958-5f6033589.0" +"timeline-state-resolver-types@npm:10.0.0-nightly-main-20260601-094843-59ce43391.0": + version: 10.0.0-nightly-main-20260601-094843-59ce43391.0 + resolution: "timeline-state-resolver-types@npm:10.0.0-nightly-main-20260601-094843-59ce43391.0" dependencies: kairos-lib: "npm:1.0.0" tslib: "npm:^2.8.1" - checksum: 10/debb3faa9eda1f4ab2a85165c052cfc5b8698367d5a84be8b4095dbdaf6a40175e4c5ec80f09199e7650b53280e8c730f85bbd30685e08d3ea382ca08ee84e8b + checksum: 10/87a33b766a94e809cdcd3517f695c4e171bbb3a451ad390adbfc2ca31be8ccff0a3531e81f0a548b683d9fea964252043837b89c6cd12c7d27d0f7e9205b65f2 languageName: node linkType: hard diff --git a/packages/blueprints-integration/src/api/showStyle.ts b/packages/blueprints-integration/src/api/showStyle.ts index 0a9cab59014..5935853dd35 100644 --- a/packages/blueprints-integration/src/api/showStyle.ts +++ b/packages/blueprints-integration/src/api/showStyle.ts @@ -18,6 +18,7 @@ import type { IFixUpConfigContext, IOnTakeContext, IOnSetAsNextContext, + IExternalEventContext, IPlaylistSnapshotCreatedContext, IBlueprintPlaylistSnapshotInfo, } from '../context/index.js' @@ -54,6 +55,7 @@ import type { SofieIngestSegment } from '../ingest-types.js' import { PackageStatusMessage } from '@sofie-automation/shared-lib/dist/packageStatusMessages' import { BlueprintPlayoutPersistentStore } from '../context/playoutStore.js' import type { ITranslatableMessage } from '../translations.js' +import type { BlueprintExternalEvent, BlueprintExternalEventSubscription } from '../externalEvent.js' export { PackageStatusMessage } @@ -135,6 +137,17 @@ export interface ShowStyleBlueprintManifest< triggerMode?: string ) => Promise + /** + * Handle a batch of external events (e.g. TSR device state changes). + * Called when one or more events matching the rundown's {@link BlueprintResultRundown.externalEventSubscriptions} are received. + * Events are batched to avoid queuing a handler per event during bursts. + */ + onExternalEvent?: ( + context: IExternalEventContext, + playoutPersistentState: BlueprintPlayoutPersistentStore, + events: BlueprintExternalEvent[] + ) => Promise + /** Execute an action defined by an IBlueprintActionManifest */ executeAction?: ( context: IActionExecutionContext, @@ -307,6 +320,8 @@ export interface BlueprintResultRundown { globalActions: IBlueprintActionManifest[] globalPieces?: IBlueprintRundownPiece[] baseline: BlueprintResultBaseline + /** Subscriptions to external events (e.g. TSR device state changes) for this rundown */ + externalEventSubscriptions?: BlueprintExternalEventSubscription[] } export interface BlueprintResultSegment { segment: IBlueprintSegment diff --git a/packages/blueprints-integration/src/context/adlibActionContext.ts b/packages/blueprints-integration/src/context/adlibActionContext.ts index 00da91558df..836d3daae2e 100644 --- a/packages/blueprints-integration/src/context/adlibActionContext.ts +++ b/packages/blueprints-integration/src/context/adlibActionContext.ts @@ -1,11 +1,11 @@ -import type { DatastorePersistenceMode, Time } from '../common.js' +import type { DatastorePersistenceMode } from '../common.js' import type { IEventContext } from './index.js' import type { IShowStyleUserContext } from './showStyleContext.js' import { IPartAndPieceActionContext } from './partsAndPieceActionContext.js' import { IExecuteTSRActionsContext, ITriggerIngestChangeContext } from './executeTsrActionContext.js' -import { IBlueprintPart, IBlueprintPartInstance, IBlueprintPiece } from '../index.js' import { IRouteSetMethods } from './routeSetContext.js' import { ITTimersContext } from './tTimersContext.js' +import type { IPlayoutActionContext } from './playoutActionContext.js' /** Actions */ export interface IDataStoreMethods { @@ -30,23 +30,10 @@ export interface IActionExecutionContext IExecuteTSRActionsContext, ITriggerIngestChangeContext, IRouteSetMethods, - ITTimersContext { + ITTimersContext, + IPlayoutActionContext { /** Fetch the showstyle config for the specified part */ // getNextShowStyleConfig(): Readonly<{ [key: string]: ConfigItemValue }> - - /** Move the next part through the rundown. Can move by either a number of parts, or segments in either direction. */ - moveNextPart(partDelta: number, segmentDelta: number, ignoreQuickloop?: boolean): Promise - /** Set flag to perform take after executing the current action. Returns state of the flag after each call. */ - takeAfterExecuteAction(take: boolean): Promise - /** Inform core that a take out of the current partinstance should be blocked until the specified time */ - blockTakeUntil(time: Time | null): Promise - - /** Insert a queued part to follow the current part */ - queuePart(part: IBlueprintPart, pieces: IBlueprintPiece[]): Promise - - /** Insert a queued part to follow the taken part */ - queuePartAfterTake(part: IBlueprintPart, pieces: IBlueprintPiece[]): void - /** Misc actions */ // updateAction(newManifest: Pick): void // only updates itself. to allow for the next one to do something different // executePeripheralDeviceAction(deviceId: string, functionName: string, args: any[]): Promise diff --git a/packages/blueprints-integration/src/context/externalEventContext.ts b/packages/blueprints-integration/src/context/externalEventContext.ts new file mode 100644 index 00000000000..527f3cb24f5 --- /dev/null +++ b/packages/blueprints-integration/src/context/externalEventContext.ts @@ -0,0 +1,19 @@ +import type { IEventContext } from './index.js' +import type { IShowStyleUserContext } from './showStyleContext.js' +import type { IPartAndPieceActionContext } from './partsAndPieceActionContext.js' +import type { IExecuteTSRActionsContext, ITriggerIngestChangeContext } from './executeTsrActionContext.js' +import type { IRouteSetMethods } from './routeSetContext.js' +import type { IDataStoreMethods } from './adlibActionContext.js' +import type { IPlayoutActionContext } from './playoutActionContext.js' + +/** Context provided to the blueprint's `onExternalEvent` handler. */ +export interface IExternalEventContext + extends + IShowStyleUserContext, + IEventContext, + IDataStoreMethods, + IPartAndPieceActionContext, + IExecuteTSRActionsContext, + ITriggerIngestChangeContext, + IRouteSetMethods, + IPlayoutActionContext {} diff --git a/packages/blueprints-integration/src/context/index.ts b/packages/blueprints-integration/src/context/index.ts index a87d44f1cd3..78fe281ccdd 100644 --- a/packages/blueprints-integration/src/context/index.ts +++ b/packages/blueprints-integration/src/context/index.ts @@ -1,7 +1,9 @@ export * from './adlibActionContext.js' export * from './baseContext.js' export * from './eventContext.js' +export * from './externalEventContext.js' export * from './fixUpConfigContext.js' +export * from './playoutActionContext.js' export * from './onSetAsNextContext.js' export * from './onTakeContext.js' export * from './packageInfoContext.js' diff --git a/packages/blueprints-integration/src/context/playoutActionContext.ts b/packages/blueprints-integration/src/context/playoutActionContext.ts new file mode 100644 index 00000000000..6f27bde200f --- /dev/null +++ b/packages/blueprints-integration/src/context/playoutActionContext.ts @@ -0,0 +1,15 @@ +import type { IBlueprintPart, IBlueprintPartInstance, IBlueprintPiece } from '../index.js' + +/** + * The playout-action methods shared between {@link IActionExecutionContext} and {@link IExternalEventContext}. + */ +export interface IPlayoutActionContext { + /** Move the next part through the rundown. Can move by either a number of parts, or segments in either direction. */ + moveNextPart(partDelta: number, segmentDelta: number, ignoreQuickloop?: boolean): Promise + /** Set flag to perform a take after the current handler completes. Returns state of the flag after each call. */ + takeAfterExecuteAction(take: boolean): Promise + /** Insert a queued part to follow the current part */ + queuePart(part: IBlueprintPart, pieces: IBlueprintPiece[]): Promise + /** Insert a queued part to follow the taken part */ + queuePartAfterTake(part: IBlueprintPart, pieces: IBlueprintPiece[]): void +} diff --git a/packages/blueprints-integration/src/externalEvent.ts b/packages/blueprints-integration/src/externalEvent.ts new file mode 100644 index 00000000000..96e5c5794ae --- /dev/null +++ b/packages/blueprints-integration/src/externalEvent.ts @@ -0,0 +1,40 @@ +import type { TSR } from '@sofie-automation/shared-lib/dist/tsr' + +/** + * A TSR state event, wrapped with a `type` discriminant so it can be distinguished + * from other future event sources in the {@link BlueprintExternalEvent} union. + */ +export type BlueprintExternalTSREvent = TSR.SomeTSRStateEvent & { type: 'tsr' } + +/** + * An event from an external source, received by the blueprint's {@link onExternalEvent} handler. + * + * Currently only TSR state events are supported. This union will be extended in future to cover + * other event sources + */ +export type BlueprintExternalEvent = BlueprintExternalTSREvent + +export type TSREventDeviceType = { + [K in keyof TSR.TSREventTypesMap]: TSR.TSREventTypesMap[K] extends never ? never : K +}[keyof TSR.TSREventTypesMap] + +/** A subscription to a single named event on a TSR playout device */ +export type TSRExternalEventSubscription = { + type: 'tsr' + /** The id of the playout device, e.g. `'atem0'` */ + deviceId: string + /** The type of the playout device */ + deviceType: TDevice + /** The event key to subscribe to, e.g. `'me.0.programInput'` */ + event: keyof TSR.TSREventTypesMap[TDevice] +} + +/** + * A subscription to a named event on any TSR playout device. + * + * This is a discriminated union over all known device types, so `deviceType` and `event` + * are always correlated — the compiler will reject an ATEM event key on an Abstract device, etc. + */ +export type BlueprintExternalEventSubscription = { + [TDevice in TSREventDeviceType]: TSRExternalEventSubscription +}[TSREventDeviceType] diff --git a/packages/blueprints-integration/src/index.ts b/packages/blueprints-integration/src/index.ts index ac46527404c..c73d6280872 100644 --- a/packages/blueprints-integration/src/index.ts +++ b/packages/blueprints-integration/src/index.ts @@ -5,6 +5,7 @@ export * from './common.js' export * from './content.js' export * from './context/index.js' export * from './documents/index.js' +export * from './externalEvent.js' export * from './ingest.js' export * from './ingest-types.js' export * from './lib.js' diff --git a/packages/corelib/src/dataModel/Rundown.ts b/packages/corelib/src/dataModel/Rundown.ts index b288fe10dd0..5299e8c5792 100644 --- a/packages/corelib/src/dataModel/Rundown.ts +++ b/packages/corelib/src/dataModel/Rundown.ts @@ -1,4 +1,8 @@ -import { RundownPlaylistTiming, Time } from '@sofie-automation/blueprints-integration' +import { + BlueprintExternalEventSubscription, + RundownPlaylistTiming, + Time, +} from '@sofie-automation/blueprints-integration' import { RundownId, StudioId, @@ -84,6 +88,9 @@ export interface Rundown { * User editing definitions for this rundown */ userEditOperations?: CoreUserEditingDefinition[] + + /** Subscriptions to external device events, as declared by the blueprint */ + externalEventSubscriptions?: BlueprintExternalEventSubscription[] } /** A description of where a Rundown originated from */ diff --git a/packages/corelib/src/worker/studio.ts b/packages/corelib/src/worker/studio.ts index 715fbbd3b4f..c6af340fed0 100644 --- a/packages/corelib/src/worker/studio.ts +++ b/packages/corelib/src/worker/studio.ts @@ -1,4 +1,5 @@ import { PlayoutChangedResults } from '@sofie-automation/shared-lib/dist/peripheralDevice/peripheralDeviceAPI' +import type { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents' import { AdLibActionId, BucketAdLibActionId, @@ -127,6 +128,11 @@ export enum StudioJobs { * ( typically when using the "now"-feature ) */ OnTimelineTriggerTime = 'onTimelineTriggerTime', + /** + * Called by a gateway when a playout device emits an external event. + * Events from multiple gateways or rapid bursts are merged into a single job. + */ + OnExternalEvents = 'onExternalEvents', /** * Recalculate T-Timer projections based on current playlist state @@ -348,6 +354,9 @@ export interface OnPlayoutPlaybackChangedProps extends RundownPlayoutPropsBase { export interface OnTimelineTriggerTimeProps { results: Array<{ id: string; time: number }> } +export interface OnExternalEventsProps { + events: PeripheralDeviceExternalEvent[] +} export type OrderRestoreToDefaultProps = RundownPlayoutPropsBase export interface OrderMoveRundownToPlaylistProps { @@ -511,6 +520,7 @@ export type StudioJobFunc = { [StudioJobs.OnPlayoutPlaybackChanged]: (data: OnPlayoutPlaybackChangedProps) => void [StudioJobs.OnTimelineTriggerTime]: (data: OnTimelineTriggerTimeProps) => void + [StudioJobs.OnExternalEvents]: (data: OnExternalEventsProps) => void [StudioJobs.RecalculateTTimerProjections]: () => void diff --git a/packages/job-worker/src/ingest/generationRundown.ts b/packages/job-worker/src/ingest/generationRundown.ts index 113f6dfb424..be540a1656f 100644 --- a/packages/job-worker/src/ingest/generationRundown.ts +++ b/packages/job-worker/src/ingest/generationRundown.ts @@ -299,7 +299,8 @@ export async function regenerateRundownAndBaselineFromIngestData( showStyleBlueprint, rundownSource, rundownNotes, - translateUserEditsFromBlueprint(rundownRes.rundown.userEditOperations, translationNamespaces) + translateUserEditsFromBlueprint(rundownRes.rundown.userEditOperations, translationNamespaces), + rundownRes.externalEventSubscriptions ) // get the rundown separetely to ensure it exists now diff --git a/packages/job-worker/src/ingest/model/IngestModel.ts b/packages/job-worker/src/ingest/model/IngestModel.ts index 946237b8571..95745f4187c 100644 --- a/packages/job-worker/src/ingest/model/IngestModel.ts +++ b/packages/job-worker/src/ingest/model/IngestModel.ts @@ -24,7 +24,7 @@ import { RundownNote } from '@sofie-automation/corelib/dist/dataModel/Notes' import { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment' import { ProcessedShowStyleBase, ProcessedShowStyleVariant } from '../../jobs/showStyle.js' import { WrappedShowStyleBlueprint } from '../../blueprints/cache.js' -import { IBlueprintRundown } from '@sofie-automation/blueprints-integration' +import type { BlueprintExternalEventSubscription, IBlueprintRundown } from '@sofie-automation/blueprints-integration' import type { INotificationsModel } from '../../notifications/NotificationsModel.js' import type { IngestExpectedPackage } from './IngestExpectedPackage.js' @@ -214,7 +214,8 @@ export interface IngestModel extends IngestModelReadonly, BaseModel, INotificati showStyleBlueprint: ReadonlyDeep, source: RundownSource, rundownNotes: RundownNote[], - userEdits: CoreUserEditingDefinition[] | undefined + userEdits: CoreUserEditingDefinition[] | undefined, + externalEventSubscriptions: BlueprintExternalEventSubscription[] | undefined ): ReadonlyDeep /** diff --git a/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts b/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts index 47b916a9322..0f319d8d248 100644 --- a/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts +++ b/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts @@ -50,7 +50,7 @@ import { RundownNote } from '@sofie-automation/corelib/dist/dataModel/Notes' import { diffAndReturnLatestObjects } from './utils.js' import _ from 'underscore' import { protectString } from '@sofie-automation/corelib/dist/protectedString' -import { IBlueprintRundown } from '@sofie-automation/blueprints-integration' +import type { BlueprintExternalEventSubscription, IBlueprintRundown } from '@sofie-automation/blueprints-integration' import { getCurrentTime, getSystemVersion } from '../../../lib/index.js' import { WrappedShowStyleBlueprint } from '../../../blueprints/cache.js' import { SaveIngestModelHelper } from './SaveIngestModel.js' @@ -409,7 +409,8 @@ export class IngestModelImpl implements IngestModel, IngestDatabasePersistedMode showStyleBlueprint: ReadonlyDeep, source: RundownSource, rundownNotes: RundownNote[], - userEditOperations: CoreUserEditingDefinition[] | undefined + userEditOperations: CoreUserEditingDefinition[] | undefined, + externalEventSubscriptions: BlueprintExternalEventSubscription[] | undefined ): ReadonlyDeep { const newRundown = literal>({ ...clone(rundownData as Complete), @@ -420,6 +421,7 @@ export class IngestModelImpl implements IngestModel, IngestDatabasePersistedMode showStyleVariantId: showStyleVariant._id, showStyleBaseId: showStyleBase._id, userEditOperations: clone(userEditOperations), + externalEventSubscriptions: clone(externalEventSubscriptions), orphaned: undefined, importVersions: { diff --git a/packages/job-worker/src/playout/adlibAction.ts b/packages/job-worker/src/playout/adlibAction.ts index 8d9f01661c5..f1c2aed0ddd 100644 --- a/packages/job-worker/src/playout/adlibAction.ts +++ b/packages/job-worker/src/playout/adlibAction.ts @@ -309,12 +309,12 @@ export async function executeActionInner( } } -async function applyAnyExecutionSideEffects( +export async function applyAnyExecutionSideEffects( context: JobContext, playoutModel: PlayoutModel, actionContext: ActionExecutionContext, now: number -) { +): Promise { await applyActionSideEffects(context, playoutModel, actionContext) if (actionContext.takeAfterExecute) { @@ -372,13 +372,13 @@ async function executeDataStoreAction( } } -function storeNotificationsForCategory( +export function storeNotificationsForCategory( notificationHelper: INotificationsModel, notificationCategory: string, blueprintId: BlueprintId, notes: INoteBase[], partInstanceInfo: SelectedPartInstance | null -) { +): void { for (const note of notes) { notificationHelper.setNotification(notificationCategory, { ...convertNoteToNotification(note, [blueprintId]), diff --git a/packages/job-worker/src/playout/externalEvents.ts b/packages/job-worker/src/playout/externalEvents.ts new file mode 100644 index 00000000000..86fd0389e79 --- /dev/null +++ b/packages/job-worker/src/playout/externalEvents.ts @@ -0,0 +1,127 @@ +import { OnExternalEventsProps } from '@sofie-automation/corelib/dist/worker/studio' +import { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents' +import { logger } from '../logging.js' +import { JobContext } from '../jobs/index.js' +import { PlayoutModel } from './model/PlayoutModel.js' +import { runJobWithPlayoutModel } from './lock.js' +import { runJobWithStudioPlayoutModel } from '../studio/lock.js' +import { ActionExecutionContext } from '../blueprints/context/adlibActions.js' +import { WatchedPackagesHelper } from '../blueprints/context/watchedPackages.js' +import { PartAndPieceInstanceActionService } from '../blueprints/context/services/PartAndPieceInstanceActionService.js' +import { PersistentPlayoutStateStore } from '../blueprints/context/services/PersistantStateStore.js' +import { applyAnyExecutionSideEffects, storeNotificationsForCategory } from './adlibAction.js' +import { getCurrentTime } from '../lib/index.js' +import { getRandomId } from '@sofie-automation/corelib/dist/lib' +import { BlueprintExternalEvent } from '@sofie-automation/blueprints-integration' + +/** + * Called by sofie-core when one or more external events have been received from a gateway. + * + * Events from multiple gateways, or from rapid bursts on a single gateway, are merged into a + * single job invocation by the queue manager to prevent flooding. + * + * For each active playlist in the studio, the show-style blueprint's `onExternalEvent` handler + * is invoked if it is defined. + */ +export async function handleOnExternalEvents(context: JobContext, data: OnExternalEventsProps): Promise { + if (!data.events.length) return + + logger.debug(`handleOnExternalEvents: received ${data.events.length} event(s)`) + + await runJobWithStudioPlayoutModel(context, async (studioPlayoutModel) => { + const activePlaylists = studioPlayoutModel.getActiveRundownPlaylists() + if (activePlaylists.length === 0) { + logger.debug('handleOnExternalEvents: no active playlists — events discarded') + return + } + + for (const playlist of activePlaylists) { + await runJobWithPlayoutModel(context, { playlistId: playlist._id }, null, async (playoutModel) => { + await executeOnExternalEventsForPlaylist(context, playoutModel, data.events) + }) + } + }) +} + +async function executeOnExternalEventsForPlaylist( + context: JobContext, + playoutModel: PlayoutModel, + wireEvents: PeripheralDeviceExternalEvent[] +): Promise { + const playlist = playoutModel.playlist + + const activePartInfo = playlist.currentPartInfo ?? playlist.nextPartInfo + if (!activePartInfo) { + logger.error( + `handleOnExternalEvents: playlist "${playlist._id}" has neither currentPartInfo nor nextPartInfo — events will be lost` + ) + return + } + + const currentRundown = playoutModel.getRundown(activePartInfo.rundownId) + if (!currentRundown) { + logger.error( + `executeOnExternalEventsForPlaylist: rundown "${activePartInfo.rundownId}" not found in playlist "${playlist._id}" — events will be lost` + ) + return + } + + const showStyle = await context.getShowStyleCompound( + currentRundown.rundown.showStyleVariantId, + currentRundown.rundown.showStyleBaseId + ) + const blueprint = await context.getShowStyleBlueprint(showStyle._id) + + if (!blueprint.blueprint.onExternalEvent) { + logger.debug( + `executeOnExternalEventsForPlaylist: blueprint for show style "${showStyle._id}" has no onExternalEvent handler — events discarded` + ) + return + } + + logger.debug( + `executeOnExternalEventsForPlaylist: invoking onExternalEvent for playlist "${playlist._id}" with ${wireEvents.length} event(s): ${wireEvents.map((e) => `${e.type}/${(e as { event?: string }).event ?? '?'}`).join(', ')}` + ) + + const now = getCurrentTime() + + // Future: This may want to become a different context, but for now the types align cleanly + const actionContext = new ActionExecutionContext( + { + name: `${currentRundown.rundown.name}(${playlist.name})`, + identifier: `playlist=${playlist._id},rundown=${currentRundown.rundown._id},activePartInstance=${ + activePartInfo.partInstanceId + },execution=${getRandomId()}`, + }, + context, + playoutModel, + showStyle, + context.getShowStyleBlueprintConfig(showStyle), + WatchedPackagesHelper.empty(context), + new PartAndPieceInstanceActionService(context, playoutModel, showStyle, currentRundown) + ) + + const persistentState = new PersistentPlayoutStateStore( + playlist.privatePlayoutPersistentState, + playlist.publicPlayoutPersistentState + ) + + // Cast the wire events to blueprint events. + // PeripheralDeviceExternalTSREvent has the same shape as BlueprintExternalTSREvent, but uses + // `deviceType: string` (loose) rather than the strongly-typed TSR enum, to accommodate custom + // TSR plugin device types that do not appear in the closed enum. + const blueprintEvents = wireEvents as unknown as BlueprintExternalEvent[] + + await blueprint.blueprint.onExternalEvent(actionContext, persistentState, blueprintEvents) + persistentState.saveToModel(playoutModel) + + storeNotificationsForCategory( + playoutModel, + `externalEvent:${getRandomId()}`, + blueprint.blueprintId, + actionContext.notes, + playlist.currentPartInfo ?? playlist.nextPartInfo + ) + + await applyAnyExecutionSideEffects(context, playoutModel, actionContext, now) +} diff --git a/packages/job-worker/src/workers/studio/jobs.ts b/packages/job-worker/src/workers/studio/jobs.ts index f8d13060295..98c8c95d8e3 100644 --- a/packages/job-worker/src/workers/studio/jobs.ts +++ b/packages/job-worker/src/workers/studio/jobs.ts @@ -43,6 +43,7 @@ import { handleBlueprintValidateConfigForStudio, } from '../../playout/upgrade.js' import { handleTimelineTriggerTime, handleOnPlayoutPlaybackChanged } from '../../playout/timings/index.js' +import { handleOnExternalEvents } from '../../playout/externalEvents.js' import { handleExecuteAdlibAction } from '../../playout/adlibAction.js' import { handleTakeNextPart } from '../../playout/take.js' import { handleClearQuickLoopMarkers, handleSetQuickLoopMarker } from '../../playout/quickLoopMarkers.js' @@ -99,6 +100,7 @@ export const studioJobHandlers: StudioJobHandlers = { [StudioJobs.OnPlayoutPlaybackChanged]: handleOnPlayoutPlaybackChanged, [StudioJobs.OnTimelineTriggerTime]: handleTimelineTriggerTime, + [StudioJobs.OnExternalEvents]: handleOnExternalEvents, [StudioJobs.RecalculateTTimerProjections]: handleRecalculateTTimerProjections, diff --git a/packages/playout-gateway/package.json b/packages/playout-gateway/package.json index 3fa88f4bf64..b98191a1606 100644 --- a/packages/playout-gateway/package.json +++ b/packages/playout-gateway/package.json @@ -55,7 +55,7 @@ "@sofie-automation/shared-lib": "26.3.0-2", "debug": "^4.4.3", "influx": "^5.12.0", - "timeline-state-resolver": "10.0.0-nightly-main-20260514-152958-5f6033589.0", + "timeline-state-resolver": "10.0.0-nightly-main-20260601-094843-59ce43391.0", "tslib": "^2.8.1", "underscore": "^1.13.8", "winston": "^3.19.0" diff --git a/packages/playout-gateway/src/coreHandler.ts b/packages/playout-gateway/src/coreHandler.ts index c310eb1046a..541d1fa136f 100644 --- a/packages/playout-gateway/src/coreHandler.ts +++ b/packages/playout-gateway/src/coreHandler.ts @@ -116,6 +116,11 @@ export class CoreHandler implements ICoreHandler { this.core.autoSubscribe(PeripheralDevicePubSub.peripheralDeviceCommands, this.core.deviceId), this.core.autoSubscribe(PeripheralDevicePubSub.rundownsForDevice, this.core.deviceId), this.core.autoSubscribe(PeripheralDevicePubSub.expectedPlayoutItemsForDevice, this.core.deviceId), + this.core.autoSubscribe( + PeripheralDevicePubSub.externalEventSubscriptionsForDevice, + 'tsr', + this.core.deviceId + ), ]) this.logger.info('Core: Subscriptions are set up!') diff --git a/packages/playout-gateway/src/tsrHandler.ts b/packages/playout-gateway/src/tsrHandler.ts index a592d9ba2d7..e2210e22221 100644 --- a/packages/playout-gateway/src/tsrHandler.ts +++ b/packages/playout-gateway/src/tsrHandler.ts @@ -82,6 +82,7 @@ export class TSRHandler { // private _config: TSRConfig private _coreHandler!: CoreHandler private _triggerupdateExpectedPlayoutItemsTimeout: NodeJS.Timeout | null = null + private _triggerUpdateEventSubscriptionsTimeout: NodeJS.Timeout | null = null private _coreTsrHandlers: { [deviceId: string]: CoreTSRDeviceHandler } = {} private _observers: Array> = [] private _cachedStudioId: StudioId | null = null @@ -249,6 +250,7 @@ export class TSRHandler { ) this._triggerupdateExpectedPlayoutItems() // So that any recently created devices will get all the ExpectedPlayoutItems + this._triggerUpdateEventSubscriptions() // Ensure new device gets current external event subscriptions }) this.tsr.connectionManager.on('connectionInitialised', (id) => { @@ -456,6 +458,14 @@ export class TSRHandler { this.tsr.connectionManager.on('connectionEvent:timeTrace', (_id, trace) => { sendTrace(trace) }) + this.tsr.connectionManager.on('connectionEvent:stateEvent', (_id, events) => { + this.logger.debug( + `connectionEvent:stateEvent: received ${events.length} event(s) from device "${_id}": ${events.map((e) => e.event).join(', ')}` + ) + this._coreHandler.core.coreMethods + .reportExternalEvents(events.map((e) => ({ ...e, type: 'tsr' as const }))) + .catch((e: unknown) => this.logger.error('Error when reporting external events to core', e)) + }) } private setupObservers(): void { @@ -539,6 +549,20 @@ export class TSRHandler { this._triggerUpdateDatastore() } this._observers.push(timelineDatastoreObserver) + + const externalEventSubscriptionsObserver = this._coreHandler.core.observe( + PeripheralDevicePubSubCollectionsNames.externalEventSubscriptions + ) + externalEventSubscriptionsObserver.added = () => { + this._triggerUpdateEventSubscriptions() + } + externalEventSubscriptionsObserver.changed = () => { + this._triggerUpdateEventSubscriptions() + } + externalEventSubscriptionsObserver.removed = () => { + this._triggerUpdateEventSubscriptions() + } + this._observers.push(externalEventSubscriptionsObserver) } private resendStatuses(): void { _.each(this._coreTsrHandlers, (tsrHandler) => { @@ -737,6 +761,7 @@ export class TSRHandler { } this.tsr.connectionManager.setConnections(connections) + this._triggerUpdateEventSubscriptions() // Re-apply subscriptions after connection set changes } } @@ -838,6 +863,55 @@ export class TSRHandler { if (!this._initialized) return this._updateDatastore().catch((e) => this.logger.error('Error in _updateDatastore', e)) } + private _triggerUpdateEventSubscriptions() { + if (!this._initialized) return + if (this._triggerUpdateEventSubscriptionsTimeout) { + clearTimeout(this._triggerUpdateEventSubscriptionsTimeout) + } + this._triggerUpdateEventSubscriptionsTimeout = setTimeout(() => { + this._updateEventSubscriptions().catch((e) => { + this.logger.error('Error in _updateEventSubscriptions', e) + }) + }, 200) + } + private async _updateEventSubscriptions() { + const subscriptionDocs = this._coreHandler.core + .getCollection(PeripheralDevicePubSubCollectionsNames.externalEventSubscriptions) + .find({}) + + this.logger.debug(`_updateEventSubscriptions: ${subscriptionDocs.length} subscription doc(s) in collection`) + + // Aggregate subscriptions, group by deviceId + const subscriptionsByDeviceId = new Map>() + for (const sub of subscriptionDocs) { + if (sub.type !== 'tsr') continue + + let events = subscriptionsByDeviceId.get(sub.deviceId) + if (!events) { + events = new Set() + subscriptionsByDeviceId.set(sub.deviceId, events) + } + events.add(sub.event) + } + + if (subscriptionsByDeviceId.size === 0) { + this.logger.debug('_updateEventSubscriptions: no subscriptions — clearing all devices') + } + + await Promise.allSettled( + _.map(this.tsr.connectionManager.getConnections(), async (container) => { + const events = subscriptionsByDeviceId.get(container.deviceId) ?? new Set() + this.logger.debug( + `_updateEventSubscriptions: setting ${events.size} event subscription(s) on device "${container.deviceId}": [${Array.from(events).join(', ')}]` + ) + await container.device.setEventSubscriptions(Array.from(events)).catch((e) => { + this.logger.error( + `Error setting event subscriptions for device "${container.deviceId}": ${stringifyError(e)}` + ) + }) + }) + ) + } private async _updateDatastore() { const datastoreCollection = this._coreHandler.core.getCollection( PeripheralDevicePubSubCollectionsNames.timelineDatastore diff --git a/packages/shared-lib/package.json b/packages/shared-lib/package.json index 3f403ade106..8a9468376c1 100644 --- a/packages/shared-lib/package.json +++ b/packages/shared-lib/package.json @@ -51,7 +51,7 @@ "dependencies": { "@mos-connection/model": "^5.0.0-alpha.0", "kairos-lib": "^1.0.0", - "timeline-state-resolver-types": "10.0.0-nightly-main-20260514-152958-5f6033589.0", + "timeline-state-resolver-types": "10.0.0-nightly-main-20260601-094843-59ce43391.0", "tslib": "^2.8.1", "type-fest": "^4.41.0" }, diff --git a/packages/shared-lib/src/peripheralDevice/externalEvents.ts b/packages/shared-lib/src/peripheralDevice/externalEvents.ts new file mode 100644 index 00000000000..331bbe4b571 --- /dev/null +++ b/packages/shared-lib/src/peripheralDevice/externalEvents.ts @@ -0,0 +1,44 @@ +/** + * A subscription to a single named event on a TSR playout device. + * + * Typed loosely (plain `string` fields) so that shared-lib does not depend on blueprints-integration or TSR. + */ +export interface PeripheralDeviceExternalTSREventSubscription { + type: 'tsr' + /** The id of the playout device, e.g. `'atem0'` */ + deviceId: string + /** + * The type of the playout device, e.g. `'ATEM'`. + * Typed as `string` rather than `TSR.DeviceType` to accommodate custom plugin device types. + */ + deviceType: string + /** The event key, e.g. `'me.0.programInput'` */ + event: string +} + +/** + * A subscription to an external device event, as declared by a blueprint. + * + * This is the shared-lib mirror of `BlueprintExternalEventSubscription`. + */ +export type PeripheralDeviceExternalEventSubscription = PeripheralDeviceExternalTSREventSubscription + +/** + * A TSR device state event as reported over the wire from a gateway. + * + * Extends the subscription type by adding the event payload. + * `deviceType` is a plain `string` rather than `TSR.DeviceType` because TSR plugins can define + * custom device types, and shared-lib deliberately avoids a hard dependency on TSR types. + */ +export interface PeripheralDeviceExternalTSREvent extends PeripheralDeviceExternalTSREventSubscription { + /** The event payload. Opaque on the wire; cast to the appropriate type in the job-worker. */ + payload: unknown +} + +/** + * An external event received from a gateway over the DDP wire. + * + * This is a discriminated union so that additional event sources can be added in future + * without breaking existing consumers. + */ +export type PeripheralDeviceExternalEvent = PeripheralDeviceExternalTSREvent diff --git a/packages/shared-lib/src/peripheralDevice/methodsAPI.ts b/packages/shared-lib/src/peripheralDevice/methodsAPI.ts index bf7875a35cd..5b9b09aae3d 100644 --- a/packages/shared-lib/src/peripheralDevice/methodsAPI.ts +++ b/packages/shared-lib/src/peripheralDevice/methodsAPI.ts @@ -32,6 +32,7 @@ import type { TimeDiff, PlayoutChangedResults, } from './peripheralDeviceAPI.js' +import type { PeripheralDeviceExternalEvent } from './externalEvents.js' import type { MediaObject } from '../core/model/MediaObjects.js' export type UpdateExpectedPackageWorkStatusesChanges = @@ -96,6 +97,11 @@ export interface NewPeripheralDeviceAPI { ping(deviceId: PeripheralDeviceId, deviceToken: string): Promise getPeripheralDevice(deviceId: PeripheralDeviceId, deviceToken: string): Promise playoutPlaybackChanged(deviceId: PeripheralDeviceId, deviceToken: string, r: PlayoutChangedResults): Promise + reportExternalEvents( + deviceId: PeripheralDeviceId, + deviceToken: string, + events: PeripheralDeviceExternalEvent[] + ): Promise pingWithCommand( deviceId: PeripheralDeviceId, deviceToken: string, @@ -367,6 +373,8 @@ export enum PeripheralDeviceAPIMethods { 'playoutPlaybackChanged' = 'peripheralDevice.playout.playbackChanged', + 'reportExternalEvents' = 'peripheralDevice.playout.reportExternalEvents', + 'getDebugStates' = 'peripheralDevice.playout.getDebugStates', // 'reportCommandError' = 'peripheralDevice.playout.reportCommandError', diff --git a/packages/shared-lib/src/pubsub/peripheralDevice.ts b/packages/shared-lib/src/pubsub/peripheralDevice.ts index 5aa1af459bd..0af14f24721 100644 --- a/packages/shared-lib/src/pubsub/peripheralDevice.ts +++ b/packages/shared-lib/src/pubsub/peripheralDevice.ts @@ -11,6 +11,11 @@ import type { PeripheralDeviceCommand } from '../core/model/PeripheralDeviceComm import type { ExpectedPlayoutItemPeripheralDevice } from '../expectedPlayoutItem.js' import type { DeviceTriggerMountedAction, PreviewWrappedAdLib } from '../input-gateway/deviceTriggerPreviews.js' import type { IngestRundownStatus } from '../ingest/rundownStatus.js' +import type { + PeripheralDeviceExternalEvent, + PeripheralDeviceExternalEventSubscription, +} from '../peripheralDevice/externalEvents.js' +import type { ProtectedString } from '../lib/protectedString.js' /** * Ids of possible DDP subscriptions for any PeripheralDevice. @@ -59,6 +64,11 @@ export enum PeripheralDevicePubSub { * Ingest status of rundowns for a PeripheralDevice */ ingestDeviceRundownStatus = 'ingestDeviceRundownStatus', + + // Playout gateway (external event subscriptions): + + /** External event subscriptions from blueprints for the Studio */ + externalEventSubscriptionsForDevice = 'externalEventSubscriptionsForDevice', } /** @@ -127,6 +137,17 @@ export interface PeripheralDevicePubSubTypes { deviceId: PeripheralDeviceId, token?: string ) => PeripheralDevicePubSubCollectionsNames.ingestRundownStatus + [PeripheralDevicePubSub.externalEventSubscriptionsForDevice]: ( + type: PeripheralDeviceExternalEvent['type'], + deviceId: PeripheralDeviceId, + token?: string + ) => PeripheralDevicePubSubCollectionsNames.externalEventSubscriptions +} + +/** An individual external device event subscription, as published by the server for the playout gateway */ +export type ExternalEventSubscriptionId = ProtectedString<'ExternalEventSubscriptionId'> +export type ExternalEventSubscriptionDocument = PeripheralDeviceExternalEventSubscription & { + _id: ExternalEventSubscriptionId } export enum PeripheralDevicePubSubCollectionsNames { @@ -149,6 +170,7 @@ export enum PeripheralDevicePubSubCollectionsNames { packageManagerExpectedPackages = 'packageManagerExpectedPackages', ingestRundownStatus = 'ingestRundownStatus', + externalEventSubscriptions = 'externalEventSubscriptions', } export type PeripheralDevicePubSubCollections = { @@ -171,4 +193,5 @@ export type PeripheralDevicePubSubCollections = { [PeripheralDevicePubSubCollectionsNames.packageManagerExpectedPackages]: PackageManagerExpectedPackage [PeripheralDevicePubSubCollectionsNames.ingestRundownStatus]: IngestRundownStatus + [PeripheralDevicePubSubCollectionsNames.externalEventSubscriptions]: ExternalEventSubscriptionDocument } diff --git a/packages/yarn.lock b/packages/yarn.lock index cc938b85e5c..2f4364a1964 100644 --- a/packages/yarn.lock +++ b/packages/yarn.lock @@ -7142,7 +7142,7 @@ __metadata: dependencies: "@mos-connection/model": "npm:^5.0.0-alpha.0" kairos-lib: "npm:^1.0.0" - timeline-state-resolver-types: "npm:10.0.0-nightly-main-20260514-152958-5f6033589.0" + timeline-state-resolver-types: "npm:10.0.0-nightly-main-20260601-094843-59ce43391.0" tslib: "npm:^2.8.1" type-fest: "npm:^4.41.0" languageName: unknown @@ -23576,7 +23576,7 @@ asn1@evs-broadcast/node-asn1: "@sofie-automation/shared-lib": "npm:26.3.0-2" debug: "npm:^4.4.3" influx: "npm:^5.12.0" - timeline-state-resolver: "npm:10.0.0-nightly-main-20260514-152958-5f6033589.0" + timeline-state-resolver: "npm:10.0.0-nightly-main-20260601-094843-59ce43391.0" tslib: "npm:^2.8.1" underscore: "npm:^1.13.8" winston: "npm:^3.19.0" @@ -28320,30 +28320,30 @@ asn1@evs-broadcast/node-asn1: languageName: node linkType: hard -"timeline-state-resolver-api@npm:10.0.0-nightly-main-20260514-152958-5f6033589.0": - version: 10.0.0-nightly-main-20260514-152958-5f6033589.0 - resolution: "timeline-state-resolver-api@npm:10.0.0-nightly-main-20260514-152958-5f6033589.0" +"timeline-state-resolver-api@npm:10.0.0-nightly-main-20260601-094843-59ce43391.0": + version: 10.0.0-nightly-main-20260601-094843-59ce43391.0 + resolution: "timeline-state-resolver-api@npm:10.0.0-nightly-main-20260601-094843-59ce43391.0" dependencies: tslib: "npm:^2.8.1" peerDependencies: - timeline-state-resolver-types: 10.0.0-nightly-main-20260514-152958-5f6033589.0 - checksum: 10/0dbb7d7f151fc0614d7b1303b1ad876540afb6cdcea193e339c61e247527ea8e2738230ff6a7260cdc368f61c238074bfb868535cdd776f2c040b6a4c5bdb07a + timeline-state-resolver-types: 10.0.0-nightly-main-20260601-094843-59ce43391.0 + checksum: 10/f439634295b2a6162a1dfd4201728bd8d8afa5c1c1eca11581e5569bd41e01028c702279de536960c66146e8323aab79913b5d556855575ccf4a53c993934a85 languageName: node linkType: hard -"timeline-state-resolver-types@npm:10.0.0-nightly-main-20260514-152958-5f6033589.0": - version: 10.0.0-nightly-main-20260514-152958-5f6033589.0 - resolution: "timeline-state-resolver-types@npm:10.0.0-nightly-main-20260514-152958-5f6033589.0" +"timeline-state-resolver-types@npm:10.0.0-nightly-main-20260601-094843-59ce43391.0": + version: 10.0.0-nightly-main-20260601-094843-59ce43391.0 + resolution: "timeline-state-resolver-types@npm:10.0.0-nightly-main-20260601-094843-59ce43391.0" dependencies: kairos-lib: "npm:1.0.0" tslib: "npm:^2.8.1" - checksum: 10/debb3faa9eda1f4ab2a85165c052cfc5b8698367d5a84be8b4095dbdaf6a40175e4c5ec80f09199e7650b53280e8c730f85bbd30685e08d3ea382ca08ee84e8b + checksum: 10/87a33b766a94e809cdcd3517f695c4e171bbb3a451ad390adbfc2ca31be8ccff0a3531e81f0a548b683d9fea964252043837b89c6cd12c7d27d0f7e9205b65f2 languageName: node linkType: hard -"timeline-state-resolver@npm:10.0.0-nightly-main-20260514-152958-5f6033589.0": - version: 10.0.0-nightly-main-20260514-152958-5f6033589.0 - resolution: "timeline-state-resolver@npm:10.0.0-nightly-main-20260514-152958-5f6033589.0" +"timeline-state-resolver@npm:10.0.0-nightly-main-20260601-094843-59ce43391.0": + version: 10.0.0-nightly-main-20260601-094843-59ce43391.0 + resolution: "timeline-state-resolver@npm:10.0.0-nightly-main-20260601-094843-59ce43391.0" dependencies: "@tv2media/v-connection": "npm:^7.3.4" atem-connection: "npm:3.7.0" @@ -28368,16 +28368,16 @@ asn1@evs-broadcast/node-asn1: sprintf-js: "npm:^1.1.3" superfly-timeline: "npm:^9.2.0" threadedclass: "npm:^1.4.0" - timeline-state-resolver-api: "npm:10.0.0-nightly-main-20260514-152958-5f6033589.0" - timeline-state-resolver-types: "npm:10.0.0-nightly-main-20260514-152958-5f6033589.0" + timeline-state-resolver-api: "npm:10.0.0-nightly-main-20260601-094843-59ce43391.0" + timeline-state-resolver-types: "npm:10.0.0-nightly-main-20260601-094843-59ce43391.0" tslib: "npm:^2.8.1" tv-automation-quantel-gateway-client: "npm:^3.1.7" type-fest: "npm:^3.13.1" underscore: "npm:^1.13.8" utf-8-validate: "npm:^6.0.6" - ws: "npm:^8.19.0" + ws: "npm:^8.21.0" xml-js: "npm:^1.6.11" - checksum: 10/1587b2d9e3c33b2f2754e0ae6caa0cb76e0cf1fd9c57cfeb240cbf2039e050477ba38bba03ea2717137b6943977bf82dc5f5c5b7b3ca4e17efa6aa7f90a15209 + checksum: 10/10dd02f3405673796de06f9c0c624777cbc37ace663f82b065ad4becb043d2d5825fca11a3f3221e8d20ad5e58ce2e9567b0668682a9ba0bda26ea0e97932479 languageName: node linkType: hard @@ -30663,9 +30663,9 @@ asn1@evs-broadcast/node-asn1: languageName: node linkType: hard -"ws@npm:^8.13.0, ws@npm:^8.18.0, ws@npm:^8.19.0, ws@npm:^8.20.0": - version: 8.20.0 - resolution: "ws@npm:8.20.0" +"ws@npm:^8.13.0, ws@npm:^8.18.0, ws@npm:^8.19.0, ws@npm:^8.20.0, ws@npm:^8.21.0": + version: 8.21.0 + resolution: "ws@npm:8.21.0" peerDependencies: bufferutil: ^4.0.1 utf-8-validate: ">=5.0.2" @@ -30674,7 +30674,7 @@ asn1@evs-broadcast/node-asn1: optional: true utf-8-validate: optional: true - checksum: 10/b7ab934b21ffdea9f25a5af5097e8c1ec7625db553bca026c5a23e35b7c236f3fb89782f2b57fab9da553864512f9aa7d245827ef998d26ffa1b2187a19a6d10 + checksum: 10/088411956432c8f876158409d5a285cb9ad1382f593391f51d3a599bd0a5b277f876609ebd00fc3596321c4a4c9064d6fffe1ebad960e8ea7fd9ae25324f35c2 languageName: node linkType: hard