Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 118 additions & 3 deletions packages/api/src/controllers/project.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Router } from "express";
import { Request, Router } from "express";
import { authorizer } from "../middleware";
import { db } from "../store";
import { db, jobsDb } from "../store";
import { v4 as uuid } from "uuid";
import {
makeNextHREF,
Expand All @@ -11,7 +11,11 @@ import {
import { NotFoundError, ForbiddenError } from "../store/errors";
import sql from "sql-template-strings";
import { WithID } from "../store/types";
import { Project } from "../schema/types";
import { Asset, Project } from "../schema/types";
import { CliArgs } from "../parse-cli";
import Queue from "../store/queue";
import logger from "../logger";
import { DB } from "../store/db";

// Express router that the project endpoints in this file are registered on.
const app = Router();

Expand Down Expand Up @@ -177,4 +181,115 @@ app.delete("/:id", authorizer({}), async (req, res) => {
res.end();
});

/**
 * POST /job/projects-cleanup
 *
 * Runs the projects clean-up job on demand. The route is guarded by
 * `authorizer({ anyAdmin: true })` and responds with `200` and a JSON body of
 * `{ cleanedUp }`, as returned by the job.
 *
 * Query parameters:
 *  - `limit`: maximum number of projects to process. Must be a positive
 *    integer; any other value falls back to 1000.
 *
 * NOTE(review): CodeQL flags this handler ("Missing rate limiting"): it
 * performs authorization but is not rate-limited.
 *
 * NOTE(review): a reviewer points out that this API is not required just
 * because other jobs have one (that is tech debt from APIs that were called
 * from GitHub actions). The preferred shape for a new job is to implement
 * everything in the job and, if a manual trigger is wanted later, have the
 * API call the job function, not the other way around.
 */
app.post(
  "/job/projects-cleanup",
  authorizer({ anyAdmin: true }),
  async (req, res) => {
    // import the job dynamically to avoid circular dependencies
    const { default: projectsCleanup } = await import(
      "../jobs/projects-cleanup"
    );

    // Parse with an explicit radix and accept positive values only, so that a
    // missing, non-numeric, zero or negative `limit` cannot reach the query.
    const requestedLimit = parseInt(req.query.limit?.toString() ?? "", 10);
    const limit = requestedLimit > 0 ? requestedLimit : 1000;

    const { cleanedUp } = await projectsCleanup(
      {
        ...req.config,
        projectsCleanupLimit: limit,
      },
      req,
      { jobsDb },
    );

    res.status(200);
    res.json({ cleanedUp });
  },
);

/**
 * Starts the clean-up of every given project without blocking the caller.
 *
 * @param projects projects whose resources should be cleaned up
 * @param req request handed through to `cleanUpProject`
 * @returns a tuple of the same `projects` array and a promise that settles
 *   once every clean-up has finished. Failures are logged per project rather
 *   than propagated.
 */
export function triggerCleanUpProjectsJob(
  projects: Project[],
  req: Request,
): [Project[], Promise<void>] {
  if (!projects.length) {
    return [projects, Promise.resolve()];
  }

  // Deferred through a resolved promise so no clean-up work starts
  // synchronously inside this call.
  const jobPromise = Promise.resolve().then(async () => {
    // allSettled (instead of all) waits for every clean-up, so the returned
    // promise does not settle while other projects are still being processed.
    const outcomes = await Promise.allSettled(
      projects.map((project) => cleanUpProject(jobsDb, project, req)),
    );

    outcomes.forEach((outcome, idx) => {
      if (outcome.status === "rejected") {
        // Log only the project that actually failed.
        logger.error(
          `Error cleaning up projectId=${projects[idx].id} err=`,
          outcome.reason,
        );
      }
    });
  });

  return [projects, jobPromise];
}

/**
 * Cleans up the resources that belong to a project.
 *
 * Looks up the project's non-deleted assets, streams, signing keys, webhooks
 * and sessions, then:
 *  - asks `req.taskScheduler` to delete each asset,
 *  - flags each stream, webhook and session as `deleted`,
 *  - flags each signing key as both `deleted` and `disabled`.
 *
 * @param db store used for the look-ups and the updates
 * @param project project whose resources are cleaned up
 * @param req request providing the `taskScheduler`
 */
async function cleanUpProject(db: DB, project: Project, req: Request) {
  // A fresh query object per call, so no table receives a shared reference.
  const ownedAndNotDeleted = () => ({
    filters: {
      projectId: project.id,
      deleted: false,
    },
  });

  // The five look-ups do not depend on each other, so run them concurrently.
  // NOTE(review): only the first element of each find() result is used. If
  // find() pages its results, rows past the first page are not cleaned up.
  // TODO confirm.
  const [[assets], [streams], [signingKeys], [webhooks], [sessions]] =
    await Promise.all([
      db.asset.find(ownedAndNotDeleted()),
      db.stream.find(ownedAndNotDeleted()),
      db.signingKey.find(ownedAndNotDeleted()),
      db.webhook.find(ownedAndNotDeleted()),
      db.session.find(ownedAndNotDeleted()),
    ]);

  // The writes below stay sequential, as in the original implementation.

  for (const asset of assets) {
    // NOTE(review): make sure this does not send webhooks, or decide
    // whether webhooks are actually wanted here.
    await req.taskScheduler.deleteAsset(asset.id);
  }

  for (const stream of streams) {
    // NOTE(review): a reviewer notes the tables have a markDeleted function
    // that is generally used for soft deletion; consider using it for these
    // updates.
    await db.stream.update(stream.id, {
      deleted: true,
    });
  }

  for (const signingKey of signingKeys) {
    // NOTE(review): a reviewer questions also setting `disabled`. If the code
    // that checks `disabled` does not check `deleted`, that code should be
    // fixed instead. Otherwise an "undelete" of the project could not tell
    // keys that were already disabled from keys disabled here.
    await db.signingKey.update(signingKey.id, {
      deleted: true,
      disabled: true,
    });
  }

  for (const webhook of webhooks) {
    await db.webhook.update(webhook.id, {
      deleted: true,
    });
  }

  for (const session of sessions) {
    await db.session.update(session.id, {
      deleted: true,
    });
  }
}

export default app;
129 changes: 129 additions & 0 deletions packages/api/src/jobs/projects-cleanup.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import { v4 as uuid } from "uuid";

import * as appRouter from "../app-router";
import * as projectsController from "../controllers/project";
import { cache } from "../store/cache";
import { DB } from "../store/db";
import Queue, { RabbitQueue } from "../store/queue";
import { rabbitMgmt } from "../test-helpers";
import params, { testId } from "../test-params";
import projectsCleanup from "./projects-cleanup";
import { Request } from "express";

describe("projects-cleanup", () => {
  // There are further functional tests under controllers/stream.test.ts "active clean-up"

  let db: DB;
  let initClientsSpy: jest.SpyInstance;

  beforeAll(async () => {
    db = new DB();
    await db.start({ postgresUrl: params.postgresUrl });
    await rabbitMgmt.createVhost(testId);
  });

  afterAll(async () => {
    await rabbitMgmt.deleteVhost(testId);
  });

  let queue: Queue;

  beforeEach(() => {
    // Wrap initClients so the queue created by the job can be captured,
    // spied on and closed after each test.
    const originalInitClient = appRouter.initClients;
    initClientsSpy = jest
      .spyOn(appRouter, "initClients")
      .mockImplementation(async (params, name) => {
        const result = await originalInitClient(params, name);
        queue = result.queue;
        jest.spyOn(queue, "consume");
        jest.spyOn(queue, "delayedPublishWebhook");
        return result;
      });
  });

  afterEach(() => {
    jest.restoreAllMocks();
    cache.storage = null;
    queue?.close();
    queue = null;
  });

  const mockProject = (deleted: boolean, deletedAt: number) => {
    return {
      id: uuid(),
      name: "project1",
      deleted,
      deletedAt,
    };
  };

  const mockStream = (projectId: string) => {
    return {
      id: uuid(),
      name: "stream1",
      playbackId: uuid(),
      streamKey: uuid(),
      projectId,
    };
  };

  const mockAsset = (projectId: string) => {
    return {
      id: uuid(),
      name: "asset1",
      projectId,
      source: {
        type: "url" as const,
        url: "someSource",
      },
    };
  };

  it("should delete related assets and streams", async () => {
    // Spy WITHOUT replacing the implementation. The previous version mocked
    // the trigger with a no-op and then asserted the effects of the clean-up,
    // which can never hold.
    // NOTE(review): a reviewer asks that the "real" tests, which check the
    // effects of the deletion job, live here. The API should be the
    // "side-kick" that calls the job, and the core logic should be the job.
    const triggerSpy = jest.spyOn(
      projectsController,
      "triggerCleanUpProjectsJob",
    );

    // The job targets deleted projects, so the fixture has to be deleted.
    const now = Date.now();
    const project = await db.project.create(mockProject(true, now - 10000));

    const streamsToDelete: string[] = [];
    const assetsToDelete: string[] = [];
    // 3 streams
    for (let i = 0; i < 3; i++) {
      const stream = await db.stream.create(mockStream(project.id));
      streamsToDelete.push(stream.id);
    }
    // 3 assets
    for (let i = 0; i < 3; i++) {
      const asset = await db.asset.create(mockAsset(project.id));
      assetsToDelete.push(asset.id);
    }

    // The clean-up deletes assets through req.taskScheduler, so provide a
    // stub for it and assert on the calls it receives.
    const deleteAsset = jest.fn().mockResolvedValue(undefined);
    // Deliberately partial stub: only the fields the job reads are provided.
    const mockReq = {
      user: {
        id: "test",
        admin: false,
        defaultProjectId: uuid(),
      },
      project: {
        id: "test",
      },
      taskScheduler: { deleteAsset },
    } as unknown as Request;

    await projectsCleanup(params, mockReq);

    expect(triggerSpy).toHaveBeenCalledTimes(1);

    for (const streamId of streamsToDelete) {
      const stream = await db.stream.get(streamId);
      expect(stream.id).toBe(streamId);
      expect(stream.deleted).toBe(true);
    }
    for (const assetId of assetsToDelete) {
      expect(deleteAsset).toHaveBeenCalledWith(assetId);
    }
  });
});
35 changes: 35 additions & 0 deletions packages/api/src/jobs/projects-cleanup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import sql from "sql-template-strings";
import { initClients } from "../app-router";
import { CliArgs } from "../parse-cli";
import { DB } from "../store/db";
import Queue from "../store/queue";
import { triggerCleanUpProjectsJob } from "../controllers/project";
import { Request } from "express";

/**
 * Queries for the deleted projects and runs the clean-up logic for all of
 * their related resources.
 *
 * @param config CLI configuration; `projectsCleanupLimit` caps how many
 *   projects are processed in one run
 * @param req request handed through to the clean-up
 * @param clients optional pre-initialised clients; when omitted they are
 *   created with `initClients`
 * @returns the projects handed to the clean-up and a `logContext` summary
 * @throws Error when `config.ingest` is empty or missing
 */
export default async function projectsCleanup(
  config: CliArgs,
  req: Request,
  clients?: { jobsDb: DB },
) {
  if (!config.ingest?.length) {
    throw new Error("ingest not configured");
  }
  const { jobsDb } =
    clients ?? (await initClients(config, "projects-cleanup-job"));
  const { projectsCleanupLimit: limit } = config;

  // Query the project table: the rows are passed on as Project[] below, and
  // the previous look-up against the stream table returned deleted streams.
  // NOTE(review): a reviewer doubts that projects carry `lastSeen` (it is
  // thought to exist on users, API keys and streams only), so the ordering
  // now uses `deletedAt`. Confirm the field is set on deleted projects, and
  // note that `->>` compares it as text.
  // TODO(review): without a "deleting phase" this keeps listing deleted
  // projects that were already cleaned up, ad infinitum. A reviewer suggests
  // something like a `cleanedUp` boolean on the project, set by this job.
  const [projects] = await jobsDb.project.find(
    [sql`data->>'deleted' = 'true'`],
    {
      limit,
      order: "data->>'deletedAt' DESC",
    },
  );

  const [cleanedUp, jobPromise] = triggerCleanUpProjectsJob(projects, req);
  await jobPromise;

  return {
    cleanedUp,
    logContext: `limit=${limit} numCleanedUp=${cleanedUp.length}`,
  };
}
5 changes: 5 additions & 0 deletions packages/api/src/parse-cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,11 @@ export default function parseCli(argv?: string | readonly string[]) {
type: "number",
default: 1000,
},
"projects-cleanup-limit": {
describe: "job/projects-cleanup: max number of projects to clean up",
type: "number",
default: 100,
},
"update-usage-from": {
describe:
"job/update-usage: unix millis timestamp for start time of update usage job",
Expand Down