diff --git a/.gitignore b/.gitignore index 333f171..1bf70c4 100644 --- a/.gitignore +++ b/.gitignore @@ -41,5 +41,8 @@ opensrc/ # Presets .discord-search-presets.json +# Local config +*.local.json + # Traycer .traycer/ diff --git a/src/discord/client.ts b/src/discord/client.ts new file mode 100644 index 0000000..69978cd --- /dev/null +++ b/src/discord/client.ts @@ -0,0 +1,381 @@ +import { Result } from "better-result"; +import type { z } from "zod"; +import { + IndexNotReadyResponseSchema, + RateLimitBodySchema, +} from "@/discord/schemas.ts"; +import { + DiscordApiError, + IndexNotReadyError, + RateLimitExhaustedError, + ValidationError, +} from "@/errors.ts"; + +const DISCORD_API_BASE = "https://discord.com/api/v10"; +const DEFAULT_MAX_RETRIES_429 = 3; +const DEFAULT_MAX_RETRIES_202 = 5; +const DEFAULT_RETRY_DELAY_SECONDS = 2; +const DEFAULT_REQUEST_TIMEOUT_MS = 30_000; + +type RateLimitState = { + lastRequestTime: number; + remaining: number; + resetAfterMs: number; +}; + +type BucketKey = string; + +const rateLimitBuckets = new Map(); +const bucketKeyMap = new Map(); + +const computeBucketKey = (path: string, token: string): BucketKey => { + const routePath = path.split("?")[0]; + const hashInput = `${routePath}:${token}`; + let hash = 0; + for (let i = 0; i < hashInput.length; i++) { + const char = hashInput.charCodeAt(i); + hash = ((hash << 5) - hash + char) >>> 0; + } + return `bucket:${hash}`; +}; + +const getBucketState = (bucketKey: BucketKey): RateLimitState => { + let state = rateLimitBuckets.get(bucketKey); + if (!state) { + state = { + lastRequestTime: 0, + remaining: 1, + resetAfterMs: 0, + }; + rateLimitBuckets.set(bucketKey, state); + } + return state; +}; + +const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + +const updateRateLimitState = ( + headers: Headers, + bucketKey: BucketKey, + computedKey: BucketKey +): void => { + const state = getBucketState(bucketKey); + const remaining = headers.get("X-RateLimit-Remaining"); + const resetAfter = headers.get("X-RateLimit-Reset-After"); + const bucket = headers.get("X-RateLimit-Bucket"); + + if (bucket !== null && `discord:${bucket}` !== bucketKey) { + const discordBucketKey = `discord:${bucket}`; + rateLimitBuckets.delete(bucketKey); + rateLimitBuckets.set(discordBucketKey, state); + bucketKeyMap.set(computedKey, discordBucketKey); + } + + if (remaining !== null) { + state.remaining = Number.parseInt(remaining, 10); + } + if (resetAfter !== null) { + state.resetAfterMs = Number.parseFloat(resetAfter) * 1000; + } + state.lastRequestTime = Date.now(); +}; + +const waitForRateLimit = async (bucketKey: BucketKey): Promise => { + const state = getBucketState(bucketKey); + if (state.remaining > 0) { + state.remaining--; + return; + } + + const elapsed = Date.now() - state.lastRequestTime; + const waitTime = state.resetAfterMs - elapsed; + + if (waitTime > 0) { + await sleep(waitTime); + } +}; + +const makeHeaders = (token: string) => ({ + Authorization: `Bot ${token}`, + "Content-Type": "application/json", +}); + +const fetchWithAuth = async ( + url: string, + token: string, + timeoutMs = DEFAULT_REQUEST_TIMEOUT_MS, + method = "GET" +): Promise> => { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + + const result = await Result.tryPromise({ + try: () => + fetch(url, { + method, + headers: makeHeaders(token), + signal: controller.signal, + }), + catch: (cause) => { + if (cause instanceof DOMException && cause.name === "AbortError") { + return new DiscordApiError({ + message: `Request timed out after ${timeoutMs}ms`, + status: 0, + body: null, + }); + } + return new DiscordApiError({ + message: `Network error: ${cause instanceof Error ? cause.message : String(cause)}`, + status: 0, + body: null, + }); + }, + }); + + clearTimeout(timer); + return result; +}; + +type DiscordFetchError = + | DiscordApiError + | RateLimitExhaustedError + | IndexNotReadyError + | ValidationError; + +const handle429 = async ( + response: Response, + rateLimitAttempt: number, + max429Retries: number +): Promise> => { + const bodyResult = await Result.tryPromise({ + try: () => response.json() as Promise, + catch: () => + new DiscordApiError({ + message: "Failed to parse 429 response body", + status: 429, + body: null, + }), + }); + + if (bodyResult.isErr()) { + return bodyResult; + } + + const parsed = RateLimitBodySchema.safeParse(bodyResult.value); + if (!parsed.success) { + return Result.err( + new DiscordApiError({ + message: "Invalid 429 response body", + status: 429, + body: bodyResult.value, + }) + ); + } + + const retryAfter = parsed.data.retry_after; + + if (rateLimitAttempt >= max429Retries) { + return Result.err( + new RateLimitExhaustedError({ + message: `Rate limited after ${max429Retries} retries`, + retryAfter, + }) + ); + } + + const backoffMs = + retryAfter * 1000 * 2 ** rateLimitAttempt + Math.random() * 1000; + await sleep(backoffMs); + return Result.ok("retry" as const); +}; + +const parseRetryAfterFrom202 = async (response: Response): Promise => { + const bodyResult = await Result.tryPromise({ + try: () => response.json() as Promise, + catch: () => null, + }); + + if (bodyResult.isErr()) { + return DEFAULT_RETRY_DELAY_SECONDS; + } + + const parsed = IndexNotReadyResponseSchema.safeParse(bodyResult.value); + return parsed.success + ? (parsed.data.retry_after ?? DEFAULT_RETRY_DELAY_SECONDS) + : DEFAULT_RETRY_DELAY_SECONDS; +}; + +type RetryCounter = { value: number }; + +const handle202 = async ( + response: Response, + url: string, + token: string, + schema: z.ZodType, + maxRetries: number, + maxRetries429: number, + bucketKey: BucketKey, + computedKey: BucketKey, + rateLimitCounter: RetryCounter +): Promise> => { + let retryAfter = await parseRetryAfterFrom202(response); + + for (let attempt = 0; attempt < maxRetries; attempt++) { + await sleep(retryAfter * 1000); + await waitForRateLimit(bucketKey); + + const retryResult = await fetchWithAuth(url, token); + if (retryResult.isErr()) { + return retryResult; + } + + const retryResponse = retryResult.value; + updateRateLimitState(retryResponse.headers, bucketKey, computedKey); + + if (retryResponse.status === 429) { + const rateLimitResult = await handle429( + retryResponse, + rateLimitCounter.value, + maxRetries429 + ); + if (rateLimitResult.isErr()) { + return rateLimitResult; + } + rateLimitCounter.value++; + attempt--; + continue; + } + + if (retryResponse.status === 202) { + retryAfter = await parseRetryAfterFrom202(retryResponse); + continue; + } + + if (!retryResponse.ok) { + return await handleErrorResponse(retryResponse); + } + + return await parseResponse(retryResponse, schema); + } + + return Result.err( + new IndexNotReadyError({ + message: `Index still not available after ${maxRetries} retries`, + retryAfter, + }) + ); +}; + +const handleErrorResponse = async ( + response: Response +): Promise> => { + const bodyResult = await Result.tryPromise({ + try: () => response.json() as Promise, + catch: () => null as unknown, + }); + + return Result.err( + new DiscordApiError({ + message: `Discord API error: ${response.status} ${response.statusText}`, + status: response.status, + body: bodyResult.isOk() ? bodyResult.value : null, + }) + ); +}; + +const parseResponse = async ( + response: Response, + schema: z.ZodType +): Promise> => { + const jsonResult = await Result.tryPromise({ + try: () => response.json() as Promise, + catch: (cause) => + new DiscordApiError({ + message: `Failed to parse response JSON: ${cause instanceof Error ? cause.message : String(cause)}`, + status: response.status, + body: null, + }), + }); + + if (jsonResult.isErr()) { + return jsonResult; + } + + const parsed = schema.safeParse(jsonResult.value); + + if (!parsed.success) { + return Result.err( + new ValidationError({ + message: "Discord API response validation failed", + issues: parsed.error.issues, + }) + ); + } + + return Result.ok(parsed.data); +}; + +export const discordFetch = async ( + path: string, + schema: z.ZodType, + token: string, + maxRetries429 = DEFAULT_MAX_RETRIES_429, + maxRetries202 = DEFAULT_MAX_RETRIES_202 +): Promise> => { + const url = `${DISCORD_API_BASE}${path}`; + const computedKey = computeBucketKey(path, token); + const bucketKey = bucketKeyMap.get(computedKey) ?? computedKey; + const rateLimitCounter: RetryCounter = { value: 0 }; + + for (; rateLimitCounter.value <= maxRetries429; rateLimitCounter.value++) { + await waitForRateLimit(bucketKey); + + const fetchResult = await fetchWithAuth(url, token); + if (fetchResult.isErr()) { + return fetchResult; + } + + const response = fetchResult.value; + updateRateLimitState(response.headers, bucketKey, computedKey); + + if (response.status === 429) { + const result = await handle429( + response, + rateLimitCounter.value, + maxRetries429 + ); + if (result.isErr()) { + return result; + } + continue; + } + + if (response.status === 202) { + return await handle202( + response, + url, + token, + schema, + maxRetries202, + maxRetries429, + bucketKey, + computedKey, + rateLimitCounter + ); + } + + if (!response.ok) { + return await handleErrorResponse(response); + } + + return await parseResponse(response, schema); + } + + return Result.err( + new RateLimitExhaustedError({ + message: "Rate limit retries exhausted", + retryAfter: 0, + }) + ); +}; diff --git a/src/discord/schemas.ts b/src/discord/schemas.ts new file mode 100644 index 0000000..97e0cb4 --- /dev/null +++ b/src/discord/schemas.ts @@ -0,0 +1,220 @@ +import { z } from "zod"; + +export const MAX_OFFSET = 9975; +export const MAX_PAGE_SIZE = 25; + +const SNOWFLAKE_REGEX = /^\d{17,20}$/; + +const snowflakeSchema = z + .string() + .regex(SNOWFLAKE_REGEX, { + message: "Invalid Discord ID: must be a 17-20 digit numeric snowflake", + }) + .max(20, { message: "Discord ID exceeds maximum length of 20 characters" }); + +const snowflakeArraySchema = z.array(snowflakeSchema); + +// Embed sub-objects + +export const EmbedFooterSchema = z.object({ + text: z.string(), + icon_url: z.string().optional(), + proxy_icon_url: z.string().optional(), +}); + +const EmbedMediaSchema = z.object({ + url: z.string(), + proxy_url: z.string().optional(), + height: z.number().optional(), + width: z.number().optional(), +}); + +export const EmbedImageSchema = EmbedMediaSchema; +export const EmbedThumbnailSchema = EmbedMediaSchema; + +export const EmbedVideoSchema = z.object({ + url: z.string().optional(), + proxy_url: z.string().optional(), + height: z.number().optional(), + width: z.number().optional(), +}); + +export const EmbedProviderSchema = z.object({ + name: z.string().optional(), + url: z.string().optional(), +}); + +export const EmbedAuthorSchema = z.object({ + name: z.string(), + url: z.string().optional(), + icon_url: z.string().optional(), + proxy_icon_url: z.string().optional(), +}); + +export const EmbedFieldSchema = z.object({ + name: z.string(), + value: z.string(), + inline: z.boolean().optional(), +}); + +export const EmbedSchema = z.object({ + title: z.string().optional(), + type: z.string().optional(), + description: z.string().optional(), + url: z.string().optional(), + timestamp: z.string().optional(), + color: z.number().optional(), + footer: EmbedFooterSchema.optional(), + image: EmbedImageSchema.optional(), + thumbnail: EmbedThumbnailSchema.optional(), + video: EmbedVideoSchema.optional(), + provider: EmbedProviderSchema.optional(), + author: EmbedAuthorSchema.optional(), + fields: z.array(EmbedFieldSchema).optional(), +}); + +export const UserSchema = z.object({ + id: snowflakeSchema, + username: z.string(), + discriminator: z.string().optional(), + avatar: z.string().nullable().optional(), + bot: z.boolean().optional(), +}); + +export const AttachmentSchema = z.object({ + id: snowflakeSchema, + filename: z.string(), + size: z.number(), + url: z.string(), + proxy_url: z.string().optional(), + content_type: z.string().optional(), +}); + +export const MessageSchema = z.object({ + id: snowflakeSchema, + channel_id: snowflakeSchema, + guild_id: snowflakeSchema.optional(), + author: UserSchema, + content: z.string(), + timestamp: z.string(), + edited_timestamp: z.string().nullable().optional(), + tts: z.boolean().optional(), + mention_everyone: z.boolean().optional(), + mentions: z.array(UserSchema).optional(), + pinned: z.boolean().optional(), + type: z.number().optional(), + embeds: z.array(EmbedSchema).optional(), + attachments: z.array(AttachmentSchema).optional(), + referenced_message: z + .object({ + id: snowflakeSchema, + channel_id: snowflakeSchema, + author: UserSchema.optional(), + content: z.string().optional(), + }) + .nullable() + .optional(), +}); + +export const ThreadSchema = z.object({ + id: snowflakeSchema, + type: z.number(), + name: z.string().optional(), + guild_id: snowflakeSchema.optional(), + parent_id: snowflakeSchema.nullable().optional(), + message_count: z.number().optional(), + member_count: z.number().optional(), + thread_metadata: z + .object({ + archived: z.boolean(), + auto_archive_duration: z.number().optional(), + archive_timestamp: z.string().optional(), + locked: z.boolean().optional(), + }) + .optional(), +}); + +export const MemberSchema = z.object({ + user_id: snowflakeSchema.optional(), + nick: z.string().nullable().optional(), + avatar: z.string().nullable().optional(), + roles: z.array(snowflakeSchema).optional(), + joined_at: z.string().optional(), + deaf: z.boolean().optional(), + mute: z.boolean().optional(), +}); + +export const SearchResponseSchema = z.object({ + total_results: z.number(), + messages: z.array(z.array(MessageSchema)), + doing_deep_historical_index: z.boolean().optional(), + documents_indexed: z.number().optional(), + threads: z.array(ThreadSchema).optional(), + members: z.array(MemberSchema).optional(), +}); + +export const RateLimitBodySchema = z.object({ + retry_after: z.number(), +}); + +export const IndexNotReadyResponseSchema = z.object({ + message: z.string(), + code: z.number(), + documents_indexed: z.number().optional(), + retry_after: z.number(), +}); + +export const SearchParamsSchema = z.object({ + attachmentExtension: z.array(z.string()).optional(), + attachmentFilename: z.array(z.string()).optional(), + authorId: snowflakeArraySchema.optional(), + authorType: z.array(z.enum(["user", "bot", "webhook"])).optional(), + channelId: snowflakeArraySchema.optional(), + content: z.string().optional(), + embedProvider: z.array(z.string()).optional(), + embedType: z + .array(z.enum(["image", "video", "gif", "sound", "article"])) + .optional(), + guildId: snowflakeSchema, + has: z + .array( + z.enum([ + "image", + "sound", + "video", + "file", + "sticker", + "embed", + "link", + "poll", + "snapshot", + ]) + ) + .optional(), + includeNsfw: z.boolean().optional(), + linkHostname: z.array(z.string()).optional(), + maxId: snowflakeSchema.optional(), + mentionEveryone: z.boolean().optional(), + mentions: snowflakeArraySchema.optional(), + mentionsRoleId: snowflakeArraySchema.optional(), + minId: snowflakeSchema.optional(), + pinned: z.boolean().optional(), + repliedToMessageId: snowflakeArraySchema.optional(), + repliedToUserId: snowflakeArraySchema.optional(), + slop: z.number().int().min(0).optional(), + sortBy: z.enum(["timestamp", "relevance"]).optional(), + sortOrder: z.enum(["asc", "desc"]).optional(), + offset: z.number().int().min(0).max(MAX_OFFSET).optional(), + limit: z.number().int().min(1).max(MAX_PAGE_SIZE).optional(), +}); + +// Inferred types +export type Embed = z.infer; +export type EmbedField = z.infer; +export type User = z.infer; +export type Message = z.infer; +export type Thread = z.infer; +export type Member = z.infer; +export type SearchResponse = z.infer; +export type IndexNotReadyResponse = z.infer; +export type SearchParams = z.infer; diff --git a/src/discord/search.ts b/src/discord/search.ts new file mode 100644 index 0000000..7cb27f0 --- /dev/null +++ b/src/discord/search.ts @@ -0,0 +1,332 @@ +import { Ok, Result } from "better-result"; +import { discordFetch } from "@/discord/client.ts"; +import { + MAX_OFFSET, + MAX_PAGE_SIZE, + type Message, + type SearchParams, + SearchParamsSchema, + type SearchResponse, + SearchResponseSchema, +} from "@/discord/schemas.ts"; +import type { + DiscordApiError, + IndexNotReadyError, + RateLimitExhaustedError, +} from "@/errors.ts"; +import { ValidationError } from "@/errors.ts"; + +const validateSearchParams = ( + params: unknown +): Result => { + const parsed = SearchParamsSchema.safeParse(params); + if (!parsed.success) { + return Result.err( + new ValidationError({ + message: "Invalid search parameters", + issues: parsed.error.issues, + }) + ); + } + return Result.ok(parsed.data); +}; + +type SearchError = + | DiscordApiError + | RateLimitExhaustedError + | IndexNotReadyError + | ValidationError; + +const buildQueryString = ( + params: SearchParams, + offset: number, + maxId?: string +): string => { + const qs = new URLSearchParams(); + const pageSize = params.limit + ? Math.min(params.limit, MAX_PAGE_SIZE) + : MAX_PAGE_SIZE; + + qs.set("limit", String(pageSize)); + qs.set("offset", String(offset)); + + if (params.content) { + qs.set("content", params.content); + } + if (params.pinned !== undefined) { + qs.set("pinned", String(params.pinned)); + } + if (params.mentionEveryone !== undefined) { + qs.set("mention_everyone", String(params.mentionEveryone)); + } + if (params.sortBy) { + qs.set("sort_by", params.sortBy); + } + if (params.sortOrder) { + qs.set("sort_order", params.sortOrder); + } + if (params.includeNsfw !== undefined) { + qs.set("include_nsfw", String(params.includeNsfw)); + } + if (params.slop !== undefined) { + qs.set("slop", String(params.slop)); + } + + // Use override maxId for snowflake partitioning, otherwise use params + const effectiveMaxId = maxId ?? params.maxId; + if (effectiveMaxId) { + qs.set("max_id", effectiveMaxId); + } + if (params.minId) { + qs.set("min_id", params.minId); + } + + // Array params + const arrayParams: [string, string[] | undefined][] = [ + ["channel_id", params.channelId], + ["author_id", params.authorId], + ["author_type", params.authorType], + ["mentions", params.mentions], + ["mentions_role_id", params.mentionsRoleId], + ["replied_to_user_id", params.repliedToUserId], + ["replied_to_message_id", params.repliedToMessageId], + ["has", params.has], + ["embed_type", params.embedType], + ["embed_provider", params.embedProvider], + ["link_hostname", params.linkHostname], + ["attachment_filename", params.attachmentFilename], + ["attachment_extension", params.attachmentExtension], + ]; + + for (const [key, values] of arrayParams) { + if (values) { + for (const value of values) { + qs.append(key, value); + } + } + } + + return qs.toString(); +}; + +const fetchSearch = async ( + params: SearchParams, + token: string, + offset: number, + maxId?: string +): Promise> => { + const queryString = buildQueryString(params, offset, maxId); + const encodedGuildId = encodeURIComponent(params.guildId); + const path = `/guilds/${encodedGuildId}/messages/search?${queryString}`; + + return await discordFetch(path, SearchResponseSchema, token); +}; + +export const searchMessages = async ( + params: unknown, + token: string, + offset = 0, + maxId?: string +): Promise> => { + const validatedParams = validateSearchParams(params); + if (validatedParams.isErr()) { + return validatedParams; + } + + return await fetchSearch(validatedParams.value, token, offset, maxId); +}; + +export type SearchProgress = { fetched: number; total: number }; +export type ProgressCallback = (progress: SearchProgress) => void; +export type PageCallback = (messages: Message[]) => void; + +type PaginationState = { + allMessages: Message[]; + totalResults: number; +}; + +const fetchPage = async ( + params: SearchParams, + token: string, + offset: number, + maxId: string | undefined, + state: PaginationState, + maxMessages: number | undefined, + onProgress?: ProgressCallback, + onPage?: PageCallback +): Promise> => { + const result = await fetchSearch(params, token, offset, maxId); + if (result.isErr()) { + return result; + } + + const data = result.value; + const pageSize = params.limit + ? Math.min(params.limit, MAX_PAGE_SIZE) + : MAX_PAGE_SIZE; + + if (state.totalResults === 0) { + state.totalResults = maxMessages + ? Math.min(data.total_results, maxMessages) + : data.total_results; + } + + let pageMessages = data.messages + .map((group) => group[0]) + .filter((msg): msg is Message => msg !== undefined); + + if (pageMessages.length === 0) { + onProgress?.({ + fetched: state.allMessages.length, + total: state.totalResults, + }); + return new Ok("done" as const); + } + + // Truncate if adding this page would exceed the limit + if ( + maxMessages && + state.allMessages.length + pageMessages.length > maxMessages + ) { + pageMessages = pageMessages.slice( + 0, + maxMessages - state.allMessages.length + ); + } + + Array.prototype.push.apply(state.allMessages, pageMessages); + onPage?.(pageMessages); + onProgress?.({ + fetched: state.allMessages.length, + total: state.totalResults, + }); + + if (state.allMessages.length >= state.totalResults) { + return new Ok("done" as const); + } + + if (pageMessages.length < pageSize) { + return new Ok("break" as const); + } + + return new Ok("continue" as const); +}; + +type PaginationConfig = { + searchParams: SearchParams; + token: string; + startOffset: number; + maxMessages: number | undefined; + pageSize: number; + onProgress?: ProgressCallback; + onPage?: PageCallback; +}; + +const fetchPartition = async ( + config: PaginationConfig, + state: PaginationState, + maxId: string | undefined +): Promise> => { + const initialOffset = maxId ? 0 : config.startOffset; + + for ( + let offset = initialOffset; + offset <= MAX_OFFSET; + offset += config.pageSize + ) { + const pageResult = await fetchPage( + config.searchParams, + config.token, + offset, + maxId, + state, + config.maxMessages, + config.onProgress, + config.onPage + ); + if (pageResult.isErr()) { + return pageResult; + } + if (pageResult.value === "done") { + return new Ok("done" as const); + } + if (pageResult.value === "break") { + break; + } + } + + return new Ok("continue" as const); +}; + +export const searchAllMessages = async ( + params: unknown, + token: string, + onProgress?: ProgressCallback, + onPage?: PageCallback, + maxMessages?: number +): Promise> => { + const validatedParams = validateSearchParams(params); + if (validatedParams.isErr()) { + return validatedParams; + } + + const queryParams = validatedParams.value; + + if (queryParams.sortBy && queryParams.sortBy !== "timestamp") { + return Result.err( + new ValidationError({ + message: + "searchAllMessages requires sortBy: 'timestamp' for snowflake-based pagination", + issues: [], + }) + ); + } + + if (queryParams.sortOrder && queryParams.sortOrder !== "desc") { + return Result.err( + new ValidationError({ + message: + "searchAllMessages requires sortOrder: 'desc' for snowflake-based pagination", + issues: [], + }) + ); + } + + const config: PaginationConfig = { + searchParams: { ...queryParams, sortBy: "timestamp", sortOrder: "desc" }, + token, + startOffset: queryParams.offset ?? 0, + maxMessages, + pageSize: queryParams.limit + ? Math.min(queryParams.limit, MAX_PAGE_SIZE) + : MAX_PAGE_SIZE, + onProgress, + onPage, + }; + + const state: PaginationState = { allMessages: [], totalResults: 0 }; + let currentMaxId: string | undefined; + + while (true) { + const result = await fetchPartition(config, state, currentMaxId); + if (result.isErr()) { + return result; + } + if (result.value === "done") { + break; + } + + if (state.allMessages.length >= state.totalResults) { + break; + } + + const lastMessage = state.allMessages.at(-1); + if (!lastMessage || currentMaxId === lastMessage.id) { + break; + } + + currentMaxId = lastMessage.id; + } + + return new Ok(state.allMessages); +};