null

Создаём умного Telegram-бота на Kotlin: асинхронная архитектура с корутинами и обработкой медиа-групп

Введение

Мир Telegram-ботов гораздо глубже, чем кажется на первый взгляд. Если вы создаёте что-то большее, чем «бот, отвечающий /start», рано или поздно столкнётесь с:

  • параллельной обработкой входящих сообщений;
  • групповыми медиа-сообщениями;
  • синхронизацией доступа к базе;
  • необходимостью ограничения времени выполнения команд.

В этой статье мы пройдём путь от базовой архитектуры до реализации производительного, отказоустойчивого Telegram-бота на Kotlin, использующего корутины, асинхронную агрегацию медиа-групп и безопасные транзакции в базе данных.

Архитектура глазами разработчика

Начнём с верхнего уровня. Для создания Telegram-бота используется библиотека TelegramBots. Бот получает обновления (Update) через Telegram Bots API. При поступлении любого нового Update метод consume вызывается автоматически. При этом, обработка сообщений должна быть:

  • быстрой,
  • гибкой,
  • надёжной.

В нашем примере бот работает через реализацию интерфейса LongPollingSingleThreadUpdateConsumer — это гарантирует, что входящие сообщения обрабатываются последовательно, в одном потоке.

class BasicConsumer : LongPollingSingleThreadUpdateConsumer {
    private val dispatcher = CommandDispatcher()

    // Коллектор сообщений из медиа-групп
    private val mediaCollector = MediaGroupCollector(
        scope = CoroutineScope(Dispatchers.Default),
        onUpdatesReady = { updates -> dispatchImmediately(updates) }
    )

    override fun consume(update: Update) {
        mediaCollector.processUpdate(update)
    }

    private fun dispatchImmediately(updates: List<Update>) {
        runBlocking {
            newSuspendedTransaction {
                withTimeout(30_000L) {
                    dispatcher.dispatchCommands(updates)
                }
            }
        }
    }
}

 

Разберём ключевые блоки runBlocking + newSuspendedTransaction + withTimeout - эта комбинация даёт мощную гарантию:

  • runBlocking — “мост” из обычного Java мира в мир suspend-функций; 

Он нужен, чтобы из синхронного метода consume войти в suspend-экосистему корутин. Таким образом, мы можем вызвать suspend-функции (newSuspendedTransaction, withTimeout, dispatcher.dispatchCommands) внутри обычного Java-метода, не возвращаясь из consume раньше времени.

  • newSuspendedTransaction — транзакция, безопасная для корутин; 

Она позволяет безопасно выполнять операции с БД внутри корутинного контекста. Фактически это то же самое, что традиционный transaction { ... }, но адаптированный для suspend-функций. Из документации: «Bridge функции newSuspendedTransaction() и suspendedTransactionAsync() дают безопасный способ взаимодействовать с БД внутри suspend-блоков. При этом можно указать нужный диспетчер; если не указать – выполнится в текущем контексте. В нашем случае транзакция будет выполнена в контексте корутины с Dispatchers.Default (унаследованном от runBlocking). Это обеспечивает корректное выполнение запросов без проблем многопоточности JDBC.

  • withTimeout — не даст команде зависнуть надолго.

Внутри транзакции используется withTimeout(CommandExecutionTimeoutMs). Это suspend-функция из стандартной библиотеки корутин: она запускает блок кода с ограничением по времени и выбрасывает TimeoutCancellationException, если время выполнения превысило лимит. Здесь установлено CommandExecutionTimeoutMs = 30000 (30 секунд). Это означает, что выполнение всех команд должно завершиться за 30 секунд, иначе будет отменено. Такой таймаут защищает бота от зависаний: если какая-то команда затормозила (например, долгий сетевой запрос или рекурсия), она не будет висеть бесконечно.

Проблема с медиа-группами

Одна из особенностей Telegram – это группы медиа-обновлений (альбомы). Когда пользователь отправляет несколько фото/видео одним альбомом, бот получает несколько отдельных Message с одним и тем же полем media_group_id. Как указано в документации Telegram Bot API, поле media_group_id содержит «уникальный идентификатор группы медиа-сообщений, к которой принадлежит данное сообщение. Это позволяет объединять связанные обновления в группу.

Пример:

[
  { "message_id": 101, "media_group_id": "abc123", "photo": ... },
  { "message_id": 102, "media_group_id": "abc123", "photo": ... },
  { "message_id": 103, "media_group_id": "abc123", "photo": ... }
]

 

Это значит, что бот может получить 3 обновления с задержкой между ними в десятки миллисекунд. Чтобы обработать их корректно, мы должны:

  1. Собрать все части;
  2. Дождаться, пока придут остальные;
  3. Упорядочить их;
  4. И только потом передать в команду.

MediaGroupCollector

Вот наш герой:

class MediaGroupCollector(
    private val scope: CoroutineScope,
    private val onUpdatesReady: (List<Update>) -> Unit
) {
    private val mediaGroups = ConcurrentHashMap<String, MutableList<Update>>()
    private val jobs = ConcurrentHashMap<String, Job>()

    fun processUpdate(update: Update) {
        val message = update.message ?: return dispatchImmediately(update)
        val mediaGroupId = message.mediaGroupId
        if (mediaGroupId == null) return dispatchImmediately(update)

        val chatId = message.chatId.toString()
        val key = "$chatId:$mediaGroupId"

        mediaGroups.compute(key) { _, list ->
            val newList = list ?: mutableListOf()
            newList.add(update)
            if (newList.size == 1) {
                scheduleGroupProcessing(key)
            }
            newList
        }
    }

    private fun scheduleGroupProcessing(key: String) {
        jobs[key] = scope.launch {
            delay(1000) // Ждём остальные части группы

            val updates = mediaGroups.remove(key) ?: return@launch
            jobs.remove(key)
            val sorted = updates.sortedBy { it.message?.messageId }
            onUpdatesReady(sorted)
        }
    }

    private fun dispatchImmediately(update: Update) {
        onUpdatesReady(listOf(update))
    }

    fun cancelAll() {
        jobs.values.forEach { it.cancel() }
        mediaGroups.clear()
        jobs.clear()
    }
}

 

Этот код работает неблокирующе, и может обрабатывать десятки групп параллельно, не загружая основной поток. 

Класс MediaGroupCollector реализует сбор таких групп. Алгоритм работы упрощённо такой:

  1. При получении update проверяем, есть ли в нём message и у него mediaGroupId.
  2. Если message отсутствует или поле mediaGroupId равно null, обновление сразу отправляется на обработку (вызывается dispatchImmediately(update)).
  3. Если mediaGroupId присутствует, формируется ключ groupKey = chatId:mediaGroupId и в потокобезопасную карту mediaGroups добавляется текущее update. Если это первое сообщение с таким ключом, запускается планировщик через scheduleGroupProcessing(groupKey).
  4. Метод scheduleGroupProcessing запускает новую корутину (в переданном CoroutineScope, здесь – на Dispatchers.Default). В корутине выполняется delay(1000) – пауза 1 секунда, чтобы собрать остальные сообщения группы. После задержки все накопленные обновления из mediaGroups[groupKey] извлекаются, сортируются по messageId (чтобы сохранить исходный порядок) и передаются дальше как единое целое: onUpdatesReady(sortedUpdates).
  5. Если нужно обработать одиночное обновление (без группы), вызывается dispatchImmediately(update), который просто передаёт список из одного элемента updates = listOf(update) в onUpdatesReady.

Таким образом, MediaGroupCollector асинхронно и неблокирующе агрегирует связанные сообщения-альбомы. Благодаря использованию корутины с Dispatchers.Default сбор и ожидание не блокирует основной поток получения обновлений – корутина просто приостанавливается на delay(1000), а в это время можно обрабатывать другие обновления или запустить сбор других групп. Важно, что пауза всего 1 секунда – обычно этого достаточно, чтобы Telegram прислал все сообщения альбома, но при этом не слишком долгий таймаут для бота.

Архитектура команд

Что происходит, когда все обновления собраны и передаются в dispatchCommands?

Входит диспетчер:

interface Command {
    suspend fun execute(update: Update) = Unit
    suspend fun execute(updates: List<Update>) = Unit

    suspend fun sizeOverrideExecute(updates: List<Update>) =
        if (updates.size == 1) execute(updates.first()) else execute(updates)

    fun customTrigger(update: Update): Boolean = false
    fun customTrigger(updates: List<Update>): Boolean = false

    fun filterChain(update: Update): Boolean = true
}

 

Каждая команда:

  • Проверяет, стоит ли ей срабатывать (customTrigger);
  • Выполняет своё дело (execute);
  • Может фильтровать доступ по правам (filterChain).

Пример простой команды:

class Test(
    override val triggerName: String
) : AdminCommand() {
    override suspend fun execute(update: Update) {
        telegramClient.execute(
            SendMessage(
                update.message.chatId.toString(),
                update.extractText()
            )
        )
    }
}

 

Расширяемость

Добавить новую команду — просто:

val dispatcher = CommandDispatcher(
    listOf(
        EchoCommand(),
        PhotoAlbumCommand(),
        // Сюда можно добавлять свои
    )
)

 

Корутинные плюсы

Использование Kotlin-корутин даёт боту важные преимущества. Корутины позволяют писать «синхронно выглядящий» код, который на самом деле выполняется асинхронно, не блокируя поток. Это значит, что долгие операции (доступ к БД, сетевые запросы, ожидание нескольких сообщений) не «замораживают» основное приложение. Корутины очень лёгкие: их можно запускать тысячами на одном потоке, поскольку они используют suspension (приостановку) вместо тяжёлой блокировки потоков. В нашем коде, например, MediaGroupCollector создаёт корутину для каждой группы медиа, а в этот момент основной поток приёма обновлений продолжает работать.

Корутины также встроенно поддерживают отмену. Если корутина ждёт в delay или выполняет какую-то suspend-операцию, её можно отменить, и она завершится с исключением CancellationException. Мы используем это через withTimeout: при превышении лимита время выполнения «всплывает» как исключение и текущая корутина прерывается. При масштабировании бота можно было бы отменять корутины (например, все накопительные задачи) при завершении работы приложения вызовом cancelAll().

Кратко, почему всё это работает как часы?

  • delay(1000) не блокирует поток;
  • Можно запускать сотни параллельных Job на обработку;
  • withTimeout автоматически отменяет медленные задачи;
  • newSuspendedTransaction не ломает данные, используя suspend.

Работа со scope

В коде передается CoroutineScope(Dispatchers.Default). Это означает, что все корутины запускаются в одном общем контексте с фоновыми потоками. По умолчанию, если не указывать свой Job, используются воркеры диспетчера, распределяющие корутины по потокам. Такой подход упрощает дизайн, но в более сложных приложениях можно было бы использовать структурированную конкуренцию – создавать один «корневой» scope на уровне приложения и порождать дочерние scope для задач. Это позволило бы централизованно отменять все корутины при выходе. Тем не менее, даже без этого текущая архитектура эффективна для однопоточного приёма и многопоточной обработки.

Заключение

Такой бот — это не просто эхо-ответчик, а основа для сложного, масштабируемого ассистента. Его можно дополнить:

  • Кэшами (например, в memory или Redis);
  • Админ-командами;
  • Логами;
  • Отправкой сообщений в очередь для асинхронной обработки.

Сама архитектура позволяет легко это сделать. Главное — правильное использование корутин, асинхронных коллекторов, timeout-защиты и структурной маршрутизации команд.

Назад Вперед