diff --git a/services/telegram-bot/pod-telegram-bot/src/account.ts b/services/telegram-bot/pod-telegram-bot/src/account.ts index 7705220181f..219bf66bd4d 100644 --- a/services/telegram-bot/pod-telegram-bot/src/account.ts +++ b/services/telegram-bot/pod-telegram-bot/src/account.ts @@ -200,3 +200,17 @@ export async function enableIntegration (integration: IntegrationInfo): Promise< } }) } + +export async function updateIntegrationData ( + integration: IntegrationInfo, + dataPatch: Record +): Promise { + const client = getAccountClient(serviceToken()) + await client.updateIntegration({ + ...integration, + data: { + ...(integration.data ?? {}), + ...dataPatch + } + }) +} diff --git a/services/telegram-bot/pod-telegram-bot/src/db.ts b/services/telegram-bot/pod-telegram-bot/src/db.ts index 3c45a28f118..bc0b60a5193 100644 --- a/services/telegram-bot/pod-telegram-bot/src/db.ts +++ b/services/telegram-bot/pod-telegram-bot/src/db.ts @@ -15,10 +15,11 @@ import postgres from 'postgres' import { AccountUuid, Ref, WorkspaceUuid } from '@hcengineering/core' +import { ChunterSpace } from '@hcengineering/chunter' import { ActivityMessage } from '@hcengineering/activity' import config from './config' -import { ChannelId, ChannelRecord, MessageRecord, OtpRecord, ReplyRecord } from './types' +import { ChannelId, ChannelRecord, ForumTopicRecord, MessageRecord, OtpRecord, ReplyRecord } from './types' export async function getDb (): Promise { const sql = postgres(config.DbUrl, { @@ -36,6 +37,7 @@ const otpTable = 'telegram_bot.otp' const messagesTable = 'telegram_bot.messages' const channelsTable = 'telegram_bot.channels' const repliesTable = 'telegram_bot.replies' +const forumTopicsTable = 'telegram_bot.forum_topics' type DBFlavor = 'cockroach' | 'postgres' | 'unknown' @@ -73,7 +75,7 @@ export class PostgresDB { const sql = ` CREATE SCHEMA IF NOT EXISTS telegram_bot; - + CREATE TABLE IF NOT EXISTS ${otpTable} ( telegram_id INT8 NOT NULL, telegram_username TEXT NOT NULL, @@ -82,7 +84,7 @@ export class PostgresDB { created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (code) ); - + CREATE TABLE IF NOT EXISTS ${messagesTable} ( message_id VARCHAR(255) NOT NULL, workspace UUID NOT NULL, @@ -108,6 +110,17 @@ export class PostgresDB { reply_id INT8 NOT NULL, PRIMARY KEY (message_id, telegram_user_id, reply_id) ); + + CREATE TABLE IF NOT EXISTS ${forumTopicsTable} ( + workspace UUID NOT NULL, + account UUID NOT NULL, + channel_id VARCHAR(255) NOT NULL, + forum_chat_id INT8 NOT NULL, + topic_id INT8 NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (workspace, account, channel_id), + UNIQUE (forum_chat_id, topic_id) + ); ` await client.unsafe(sql) @@ -217,6 +230,44 @@ export class PostgresDB { return res.map(toReplyRecord)[0] } + async getForumTopic ( + workspace: WorkspaceUuid, + account: AccountUuid, + channelId: Ref + ): Promise { + const sql = ` + SELECT * FROM ${forumTopicsTable} + WHERE workspace = $1::uuid AND account = $2::uuid AND channel_id = $3::varchar + LIMIT 1` + const res = await this.client.unsafe(sql, [workspace, account, channelId]) + return res.map(toForumTopicRecord)[0] + } + + async insertForumTopic (record: Omit): Promise { + const sql = ` + INSERT INTO ${forumTopicsTable} ( + workspace, account, channel_id, forum_chat_id, topic_id + ) + VALUES ($1::uuid, $2::uuid, $3::varchar, $4::int8, $5::int8) + ON CONFLICT (workspace, account, channel_id) DO NOTHING` + await this.client.unsafe(sql, [ + record.workspace, + record.account, + record.channelId, + record.forumChatId, + record.topicId + ]) + } + + async getForumTopicByThread (forumChatId: number, topicId: number): Promise { + const sql = ` + SELECT * FROM ${forumTopicsTable} + WHERE forum_chat_id = $1::int8 AND topic_id = $2::int8 + LIMIT 1` + const res = await this.client.unsafe(sql, [forumChatId, topicId]) + return res.map(toForumTopicRecord)[0] + } + async close (): Promise { await this.client.end({ timeout: 0 }) } @@ -259,3 +310,14 @@ function toMessageRecord (raw: any): MessageRecord { telegramMessageId: Number(raw.telegram_message_id) } } + +function toForumTopicRecord (raw: any): ForumTopicRecord { + return { + workspace: raw.workspace, + account: raw.account, + channelId: raw.channel_id, + forumChatId: Number(raw.forum_chat_id), + topicId: Number(raw.topic_id), + createdAt: new Date(raw.created_at) + } +} diff --git a/services/telegram-bot/pod-telegram-bot/src/telegraf/bot.ts b/services/telegram-bot/pod-telegram-bot/src/telegraf/bot.ts index 8534679e9d3..bc8737ac52e 100644 --- a/services/telegram-bot/pod-telegram-bot/src/telegraf/bot.ts +++ b/services/telegram-bot/pod-telegram-bot/src/telegraf/bot.ts @@ -100,6 +100,52 @@ async function onReply ( return await worker.reply(integration, messageRecord, htmlToMarkup(toHTML(message)), files) } +/** + * Routes a non-reply message that arrived inside a Telegram forum topic to the matching + * Huly channel (via the forum_topics table). Returns true if the message was successfully + * routed; false if the topic is not registered (caller should fall back to legacy flow). + */ +async function handleForumTopicMessage ( + ctx: Context, + worker: PlatformWorker, + chatId: number, + threadId: number, + fromId: number +): Promise { + const topic = await worker.getForumTopicByThread(chatId, threadId) + if (topic === undefined) return false + + const integration = await getAnyIntegrationByTelegramId(fromId, topic.workspace) + if (integration === undefined) return false + + const channel = await worker.resolveChannelByRef(topic.workspace, topic.account, topic.channelId) + if (channel === undefined) return false + + const ctxMessage = ctx.message as Message | undefined + if (ctxMessage === undefined) return false + + const file = await toTelegramFileInfo(ctx as TgContext, ctxMessage) + let text = htmlToMarkup(toHTML(ctxMessage as Message.TextMessage)) + + if (isEmptyMarkup(text) && 'caption' in ctxMessage && ctxMessage.caption !== undefined) { + text = jsonToMarkup({ + type: MarkupNodeType.text, + text: ctxMessage.caption + }) + } + + if (isEmptyMarkup(text) && file === undefined) return false + + return await worker.sendMessage( + channel, + integration.account, + integration.socialId, + ctxMessage.message_id, + text, + file + ) +} + async function handleSelectChannel ( ctx: Context>, worker: PlatformWorker, @@ -217,17 +263,32 @@ export async function setUpBot (worker: PlatformWorker): Promise { - const id = ctx.chat?.id + const chatId = ctx.chat?.id const message = ctx.message - if (id === undefined || message.reply_to_message === undefined) { + if (chatId === undefined || message.reply_to_message === undefined) { return } + const fromId = ctx.from?.id + const threadId = (message as Message & { message_thread_id?: number }).message_thread_id + + // Inside a forum-enabled DM, "replying" to the topic's own first system message is + // really the user opening the topic to type into it. Treat it as a fresh forum-routed + // message instead of trying to thread-link it to the synthetic topic head. + if (fromId !== undefined && threadId !== undefined && ctx.chat?.type === 'private') { + const replyToId = message.reply_to_message.message_id + const isTopicHead = replyToId === threadId + if (isTopicHead) { + const routed = await handleForumTopicMessage(ctx, worker, chatId, threadId, fromId) + if (routed) return + } + } + const replyTo = message.reply_to_message const isReplied = await onReply( ctx, - id, + chatId, message as ReplyMessage, message.message_id, replyTo.message_id, @@ -241,11 +302,38 @@ export async function setUpBot (worker: PlatformWorker): Promise { - const id = ctx.chat?.id - if (id === undefined) return + const chatId = ctx.chat?.id + if (chatId === undefined) return if ('reply_to_message' in ctx.message) return - const integrations = await listIntegrationsByTelegramId(id) + // Skip Telegram forum service messages (topic created/edited/closed/reopened, etc.). + // They carry message_thread_id but represent system events, not user input — answering + // them with reply_parameters tied to a transient probe topic causes 400 "message thread not found". + const m = ctx.message as Record + if ( + m.forum_topic_created !== undefined || + m.forum_topic_edited !== undefined || + m.forum_topic_closed !== undefined || + m.forum_topic_reopened !== undefined || + m.general_forum_topic_hidden !== undefined || + m.general_forum_topic_unhidden !== undefined + ) { + return + } + + const fromId = ctx.from?.id + const threadId = (ctx.message as Message.TextMessage & { message_thread_id?: number }).message_thread_id + + if (fromId !== undefined && threadId !== undefined && ctx.chat?.type === 'private') { + const routed = await handleForumTopicMessage(ctx, worker, chatId, threadId, fromId) + if (routed) return + // Inside a topic but the topic is not in our table (stale topic, manual user + // creation, etc). Skip the workspace/channel keyboard fallback so Telegraf does + // not echo back into a thread id that may not exist anymore. + return + } + + const integrations = await listIntegrationsByTelegramId(chatId) if (integrations === undefined) return const workspaces: WorkspaceUuid[] = integrations diff --git a/services/telegram-bot/pod-telegram-bot/src/telegraf/commands.ts b/services/telegram-bot/pod-telegram-bot/src/telegraf/commands.ts index 3ab2707ee65..46cc5b0ea71 100644 --- a/services/telegram-bot/pod-telegram-bot/src/telegraf/commands.ts +++ b/services/telegram-bot/pod-telegram-bot/src/telegraf/commands.ts @@ -25,7 +25,8 @@ import { listIntegrationsByTelegramId, getAccountPerson, removeIntegrationsByTg, - getAnyIntegrationByTelegramId + getAnyIntegrationByTelegramId, + updateIntegrationData } from '../account' import { WorkspaceUuid } from '@hcengineering/core' @@ -34,6 +35,8 @@ export enum Command { Connect = 'connect', SyncAllChannels = 'sync_all_channels', SyncStarredChannels = 'sync_starred_channels', + SetForum = 'setforum', + UnsetForum = 'unsetforum', Help = 'help', Stop = 'stop' } @@ -56,6 +59,14 @@ export async function getBotCommands (lang: string = 'en'): Promise { + const id = ctx.from?.id + if (id === undefined) return + + // Bot API 9.4 surfaces the Threaded Mode bit via getMe().has_topics_enabled. + // Without it, createForumTopic will fail with 400, so refuse early with a + // helpful pointer to @BotFather. Using getMe instead of a probe createForumTopic + // avoids polluting the DM with a "topic created" service message. + const me = (await ctx.telegram.getMe()) as { username?: string, has_topics_enabled?: boolean } + if (me.has_topics_enabled !== true) { + await ctx.reply( + 'Topic creation is not allowed in this DM. The bot administrator must enable Threaded Mode via @BotFather: ' + + `/mybots -> @${me.username ?? 'bot'} -> press Open (Mini App) -> Threads -> toggle ON, then retry /setforum.` + ) + return + } + + const integrations = await listIntegrationsByTelegramId(id) + if (integrations.length === 0) { + await ctx.reply('No Huly integration found. Connect a workspace first via /connect.') + return + } + + for (const integration of integrations) { + await updateIntegrationData(integration, { forumChatId: id }) + } + + await ctx.reply( + 'Forum routing enabled. Every Huly channel will become its own topic in this DM. ' + + 'Use /unsetforum to turn it off.' + ) +} + +async function onUnsetForum (ctx: Context, worker: PlatformWorker): Promise { + const id = ctx.from?.id + if (id === undefined) return + + const integrations = await listIntegrationsByTelegramId(id) + if (integrations.length === 0) { + await ctx.reply('No Huly integration found.') + return + } + + for (const integration of integrations) { + await updateIntegrationData(integration, { forumChatId: null }) + } + + await ctx.reply('Forum routing disabled. Notifications will return to this DM.') +} + async function onConnect (ctx: Context, worker: PlatformWorker): Promise { const id = ctx.from?.id const lang = ctx.from?.language_code ?? 'en' @@ -178,4 +239,6 @@ export async function defineCommands (bot: Telegraf, worker: Platform bot.command(Command.Connect, (ctx) => onConnect(ctx, worker)) bot.command(Command.SyncAllChannels, (ctx) => onSyncChannels(ctx, worker, false)) bot.command(Command.SyncStarredChannels, (ctx) => onSyncChannels(ctx, worker, true)) + bot.command(Command.SetForum, (ctx) => onSetForum(ctx, worker)) + bot.command(Command.UnsetForum, (ctx) => onUnsetForum(ctx, worker)) } diff --git a/services/telegram-bot/pod-telegram-bot/src/types.ts b/services/telegram-bot/pod-telegram-bot/src/types.ts index 978321a21fe..6197581d2e4 100644 --- a/services/telegram-bot/pod-telegram-bot/src/types.ts +++ b/services/telegram-bot/pod-telegram-bot/src/types.ts @@ -42,6 +42,15 @@ export interface ReplyRecord { replyId: number } +export interface ForumTopicRecord { + workspace: WorkspaceUuid + account: AccountUuid + channelId: Ref + forumChatId: number + topicId: number + createdAt: Date +} + export interface OtpRecord { telegramId: number telegramUsername: string diff --git a/services/telegram-bot/pod-telegram-bot/src/worker.ts b/services/telegram-bot/pod-telegram-bot/src/worker.ts index 3a21d3fb4da..d0ade0bf08e 100644 --- a/services/telegram-bot/pod-telegram-bot/src/worker.ts +++ b/services/telegram-bot/pod-telegram-bot/src/worker.ts @@ -23,6 +23,7 @@ import { ActivityMessage } from '@hcengineering/activity' import { ChannelId, ChannelRecord, + ForumTopicRecord, IntegrationInfo, MessageRecord, PlatformFileInfo, @@ -143,6 +144,34 @@ export class PlatformWorker { return await wsClient.getFiles(message) } + async getChannelInfoForMessage ( + workspace: WorkspaceUuid, + account: AccountUuid, + messageId: Ref + ): Promise<{ channelId: Ref, channelName: string } | undefined> { + const wsClient = await WorkspaceClient.create(workspace, account, this.ctx, this.storage) + const channel = await wsClient.getChannelForActivityMessage(messageId) + if (channel === undefined) return undefined + const channelName = await this.getChannelName(wsClient, channel, account) + return { channelId: channel._id, channelName } + } + + async getForumTopic ( + workspace: WorkspaceUuid, + account: AccountUuid, + channelId: Ref + ): Promise { + return await this.db.getForumTopic(workspace, account, channelId) + } + + async saveForumTopic (record: Omit): Promise { + await this.db.insertForumTopic(record) + } + + async getForumTopicByThread (forumChatId: number, topicId: number): Promise { + return await this.db.getForumTopicByThread(forumChatId, topicId) + } + async updateTelegramUsername (personId: PersonId, telegramUsername: string): Promise { await getAccountClient(serviceToken()).updateSocialId(personId, telegramUsername) } @@ -245,6 +274,37 @@ export class PlatformWorker { return true } + /** + * Looks up a channel by its Huly Ref (not the internal rowid), used by the forum-topic + * outbound path where we know the channel ref from the forum_topics table but don't + * have the corresponding channelsTable row cached. + */ + async resolveChannelByRef ( + workspace: WorkspaceUuid, + account: AccountUuid, + channelRef: Ref + ): Promise { + for (const cached of this.channelByRowId.values()) { + if (cached.workspace === workspace && cached.account === account && cached._id === channelRef) { + return cached + } + } + + const client = await WorkspaceClient.create(workspace, account, this.ctx, this.storage) + const space = await client.findChunterSpace(channelRef) + if (space === undefined) return undefined + + const name = await this.getChannelName(client, space, account) + return { + rowId: `forum:${channelRef}` as ChannelId, + workspace, + _id: space._id, + _class: space._class, + name, + account + } + } + async syncChannels (account: AccountUuid, workspace: WorkspaceUuid, onlyStarred: boolean): Promise { const client = await WorkspaceClient.create(workspace, account, this.ctx, this.storage) const channels = await client.getChannels(account, onlyStarred) @@ -371,6 +431,7 @@ export class PlatformWorker { } const integration = integrations[0] + const forumChatId = this.getForumChatId(integrations) void this.limiter.add(integration.telegramId, async () => { const { full: fullMessage, short: shortMessage } = toTelegramHtml(record) @@ -380,16 +441,36 @@ export class PlatformWorker { : [] const tgMessageIds: number[] = [] - if (files.length === 0) { - const message = await bot.telegram.sendMessage(integration.telegramId, fullMessage, { - parse_mode: 'HTML' - }) + let targetChatId: number = integration.telegramId + let threadId: number | undefined + if (forumChatId !== undefined && record.messageId != null) { + const routed = await this.resolveForumTopic( + bot, + forumChatId, + workspace, + record.account, + record.messageId + ) + if (routed !== undefined) { + targetChatId = forumChatId + threadId = routed + } + } + const baseExtra: Record = { parse_mode: 'HTML' } + if (threadId !== undefined) baseExtra.message_thread_id = threadId + + if (files.length === 0) { + const message = await bot.telegram.sendMessage(targetChatId, fullMessage, baseExtra as any) tgMessageIds.push(message.message_id) } else { const groups = toMediaGroups(files, fullMessage, shortMessage) for (const group of groups) { - const mediaGroup = await bot.telegram.sendMediaGroup(integration.telegramId, group) + const mediaGroup = await bot.telegram.sendMediaGroup( + targetChatId, + group, + threadId !== undefined ? ({ message_thread_id: threadId } as any) : undefined + ) tgMessageIds.push(...mediaGroup.map((it) => it.message_id)) } } @@ -406,6 +487,63 @@ export class PlatformWorker { }) } + /** + * Returns the forum chat id stored on any of the user's integrations, or undefined + * if forum routing has not been configured via /setforum. Cleared values (null) + * are treated the same as unset. + */ + getForumChatId (integrations: IntegrationInfo[]): number | undefined { + for (const integration of integrations) { + const raw = integration.data?.forumChatId + if (typeof raw === 'number' && Number.isFinite(raw)) return raw + } + return undefined + } + + /** + * Looks up (or lazily creates) the Telegram forum topic that mirrors the Huly channel + * that owns this activity message. Returns the topic message_thread_id, or undefined + * if the channel cannot be resolved or topic creation fails. + */ + async resolveForumTopic ( + bot: Telegraf, + forumChatId: number, + workspace: WorkspaceUuid, + account: AccountUuid, + messageId: Ref + ): Promise { + let channelInfo: { channelId: Ref, channelName: string } | undefined + try { + channelInfo = await this.getChannelInfoForMessage(workspace, account, messageId) + } catch (e) { + this.ctx.warn('Failed to resolve channel for forum topic', { error: e, messageId }) + return undefined + } + if (channelInfo === undefined) return undefined + + const existing = await this.db.getForumTopic(workspace, account, channelInfo.channelId) + if (existing !== undefined) return existing.topicId + + try { + const created = await bot.telegram.createForumTopic(forumChatId, channelInfo.channelName) + await this.db.insertForumTopic({ + workspace, + account, + channelId: channelInfo.channelId, + forumChatId, + topicId: created.message_thread_id + }) + return created.message_thread_id + } catch (e) { + this.ctx.warn('Failed to create forum topic, falling back to DM', { + error: e, + forumChatId, + channelName: channelInfo.channelName + }) + return undefined + } + } + async processWorkspaceSubscription ( workspace: WorkspaceUuid, record: TelegramWorkspaceSubscriptionQueueMessage diff --git a/services/telegram-bot/pod-telegram-bot/src/workspace.ts b/services/telegram-bot/pod-telegram-bot/src/workspace.ts index d7893f74543..b33037e508d 100644 --- a/services/telegram-bot/pod-telegram-bot/src/workspace.ts +++ b/services/telegram-bot/pod-telegram-bot/src/workspace.ts @@ -258,6 +258,33 @@ export class WorkspaceClient { return res } + /** + * Resolves the chunter space (Channel or DirectMessage) that owns the activity + * message identified by `messageId`. For thread messages we walk to the parent + * object rather than the comment itself. Returns undefined when the message is + * not attached to a chunter space, so the caller can fall back to a plain DM. + */ + async getChannelForActivityMessage ( + messageId: Ref + ): Promise { + const message = await this.client.findOne(activity.class.ActivityMessage, { _id: messageId }) + if (message === undefined) return undefined + + let channelId: Ref + if (this.hierarchy.isDerived(message._class, chunter.class.ThreadMessage)) { + const thread = message as ThreadMessage + channelId = thread.objectId as Ref + } else { + channelId = message.attachedTo as Ref + } + + return await this.client.findOne(chunter.class.ChunterSpace, { _id: channelId }) + } + + async findChunterSpace (channelRef: Ref): Promise { + return await this.client.findOne(chunter.class.ChunterSpace, { _id: channelRef }) + } + async getChannels (account: AccountUuid, onlyStarred: boolean): Promise { if (!onlyStarred) { return await this.client.findAll(chunter.class.ChunterSpace, {