Введение
Мир Telegram-ботов гораздо глубже, чем кажется на первый взгляд. Если вы создаёте что-то большее, чем «бот, отвечающий /start», рано или поздно столкнётесь с:
- параллельной обработкой входящих сообщений;
- групповыми медиа-сообщениями;
- синхронизацией доступа к базе;
- необходимостью ограничения времени выполнения команд.
В этой статье мы пройдём путь от базовой архитектуры до реализации производительного, отказоустойчивого Telegram-бота на Kotlin, использующего корутины, асинхронную агрегацию медиа-групп и безопасные транзакции в базе данных.
Архитектура глазами разработчика
Начнём с верхнего уровня. Для создания Telegram-бота используется библиотека TelegramBots. Бот получает обновления (Update) через Telegram Bots API. При поступлении любого нового Update метод consume вызывается автоматически. При этом, обработка сообщений должна быть:
- быстрой,
- гибкой,
- надёжной.
В нашем примере бот работает через реализацию интерфейса LongPollingSingleThreadUpdateConsumer — это гарантирует, что входящие сообщения обрабатываются последовательно, в одном потоке.
class BasicConsumer : LongPollingSingleThreadUpdateConsumer {
private val dispatcher = CommandDispatcher()
// Коллектор сообщений из медиа-групп
private val mediaCollector = MediaGroupCollector(
scope = CoroutineScope(Dispatchers.Default),
onUpdatesReady = { updates -> dispatchImmediately(updates) }
)
override fun consume(update: Update) {
mediaCollector.processUpdate(update)
}
private fun dispatchImmediately(updates: List<Update>) {
runBlocking {
newSuspendedTransaction {
withTimeout(30_000L) {
dispatcher.dispatchCommands(updates)
}
}
}
}
}
Разберём ключевые блоки runBlocking + newSuspendedTransaction + withTimeout - эта комбинация даёт мощную гарантию:
- runBlocking — “мост” из обычного Java мира в мир suspend-функций;
Он нужен, чтобы из синхронного метода consume войти в suspend-экосистему корутин. Таким образом, мы можем вызвать suspend-функции (newSuspendedTransaction, withTimeout, dispatcher.dispatchCommands) внутри обычного Java-метода, не возвращаясь из consume раньше времени.
- newSuspendedTransaction — транзакция, безопасная для корутин;
Она позволяет безопасно выполнять операции с БД внутри корутинного контекста. Фактически это то же самое, что традиционный transaction { ... }, но адаптированный для suspend-функций. Из документации: «Bridge функции newSuspendedTransaction() и suspendedTransactionAsync() дают безопасный способ взаимодействовать с БД внутри suspend-блоков. При этом можно указать нужный диспетчер; если не указать – выполнится в текущем контексте. В нашем случае транзакция будет выполнена в контексте корутины с Dispatchers.Default (унаследованном от runBlocking). Это обеспечивает корректное выполнение запросов без проблем многопоточности JDBC.
- withTimeout — не даст команде зависнуть надолго.
Внутри транзакции используется withTimeout(CommandExecutionTimeoutMs). Это suspend-функция из стандартной библиотеки корутин: она запускает блок кода с ограничением по времени и выбрасывает TimeoutCancellationException, если время выполнения превысило лимит. Здесь установлено CommandExecutionTimeoutMs = 30000 (30 секунд). Это означает, что выполнение всех команд должно завершиться за 30 секунд, иначе будет отменено. Такой таймаут защищает бота от зависаний: если какая-то команда затормозила (например, долгий сетевой запрос или рекурсия), она не будет висеть бесконечно.
Проблема с медиа-группами
Одна из особенностей Telegram – это группы медиа-обновлений (альбомы). Когда пользователь отправляет несколько фото/видео одним альбомом, бот получает несколько отдельных Message с одним и тем же полем media_group_id. Как указано в документации Telegram Bot API, поле media_group_id содержит «уникальный идентификатор группы медиа-сообщений, к которой принадлежит данное сообщение. Это позволяет объединять связанные обновления в группу.
Пример:
[
{ "message_id": 101, "media_group_id": "abc123", "photo": ... },
{ "message_id": 102, "media_group_id": "abc123", "photo": ... },
{ "message_id": 103, "media_group_id": "abc123", "photo": ... }
]
Это значит, что бот может получить 3 обновления с задержкой между ними в десятки миллисекунд. Чтобы обработать их корректно, мы должны:
- Собрать все части;
- Дождаться, пока придут остальные;
- Упорядочить их;
- И только потом передать в команду.
MediaGroupCollector
Вот наш герой:
class MediaGroupCollector(
private val scope: CoroutineScope,
private val onUpdatesReady: (List<Update>) -> Unit
) {
private val mediaGroups = ConcurrentHashMap<String, MutableList<Update>>()
private val jobs = ConcurrentHashMap<String, Job>()
fun processUpdate(update: Update) {
val message = update.message ?: return dispatchImmediately(update)
val mediaGroupId = message.mediaGroupId
if (mediaGroupId == null) return dispatchImmediately(update)
val chatId = message.chatId.toString()
val key = "$chatId:$mediaGroupId"
mediaGroups.compute(key) { _, list ->
val newList = list ?: mutableListOf()
newList.add(update)
if (newList.size == 1) {
scheduleGroupProcessing(key)
}
newList
}
}
private fun scheduleGroupProcessing(key: String) {
jobs[key] = scope.launch {
delay(1000) // Ждём остальные части группы
val updates = mediaGroups.remove(key) ?: return@launch
jobs.remove(key)
val sorted = updates.sortedBy { it.message?.messageId }
onUpdatesReady(sorted)
}
}
private fun dispatchImmediately(update: Update) {
onUpdatesReady(listOf(update))
}
fun cancelAll() {
jobs.values.forEach { it.cancel() }
mediaGroups.clear()
jobs.clear()
}
}
Этот код работает неблокирующе, и может обрабатывать десятки групп параллельно, не загружая основной поток.
Класс MediaGroupCollector реализует сбор таких групп. Алгоритм работы упрощённо такой:
- При получении
update проверяем, есть ли в нём message и у него mediaGroupId.
- Если
message отсутствует или поле mediaGroupId равно null, обновление сразу отправляется на обработку (вызывается dispatchImmediately(update)).
- Если
mediaGroupId присутствует, формируется ключ groupKey = chatId:mediaGroupId и в потокобезопасную карту mediaGroups добавляется текущее update. Если это первое сообщение с таким ключом, запускается планировщик через scheduleGroupProcessing(groupKey).
- Метод
scheduleGroupProcessing запускает новую корутину (в переданном CoroutineScope, здесь – на Dispatchers.Default). В корутине выполняется delay(1000) – пауза 1 секунда, чтобы собрать остальные сообщения группы. После задержки все накопленные обновления из mediaGroups[groupKey] извлекаются, сортируются по messageId (чтобы сохранить исходный порядок) и передаются дальше как единое целое: onUpdatesReady(sortedUpdates).
- Если нужно обработать одиночное обновление (без группы), вызывается
dispatchImmediately(update), который просто передаёт список из одного элемента updates = listOf(update) в onUpdatesReady.
Таким образом, MediaGroupCollector асинхронно и неблокирующе агрегирует связанные сообщения-альбомы. Благодаря использованию корутины с Dispatchers.Default сбор и ожидание не блокирует основной поток получения обновлений – корутина просто приостанавливается на delay(1000), а в это время можно обрабатывать другие обновления или запустить сбор других групп. Важно, что пауза всего 1 секунда – обычно этого достаточно, чтобы Telegram прислал все сообщения альбома, но при этом не слишком долгий таймаут для бота.
Архитектура команд
Что происходит, когда все обновления собраны и передаются в dispatchCommands?
Входит диспетчер:
interface Command {
suspend fun execute(update: Update) = Unit
suspend fun execute(updates: List<Update>) = Unit
suspend fun sizeOverrideExecute(updates: List<Update>) =
if (updates.size == 1) execute(updates.first()) else execute(updates)
fun customTrigger(update: Update): Boolean = false
fun customTrigger(updates: List<Update>): Boolean = false
fun filterChain(update: Update): Boolean = true
}
Каждая команда:
- Проверяет, стоит ли ей срабатывать (
customTrigger);
- Выполняет своё дело (
execute);
- Может фильтровать доступ по правам (
filterChain).
Пример простой команды:
class Test(
override val triggerName: String
) : AdminCommand() {
override suspend fun execute(update: Update) {
telegramClient.execute(
SendMessage(
update.message.chatId.toString(),
update.extractText()
)
)
}
}
Расширяемость
Добавить новую команду — просто:
val dispatcher = CommandDispatcher(
listOf(
EchoCommand(),
PhotoAlbumCommand(),
// Сюда можно добавлять свои
)
)
Корутинные плюсы
Использование Kotlin-корутин даёт боту важные преимущества. Корутины позволяют писать «синхронно выглядящий» код, который на самом деле выполняется асинхронно, не блокируя поток. Это значит, что долгие операции (доступ к БД, сетевые запросы, ожидание нескольких сообщений) не «замораживают» основное приложение. Корутины очень лёгкие: их можно запускать тысячами на одном потоке, поскольку они используют suspension (приостановку) вместо тяжёлой блокировки потоков. В нашем коде, например, MediaGroupCollector создаёт корутину для каждой группы медиа, а в этот момент основной поток приёма обновлений продолжает работать.
Корутины также встроенно поддерживают отмену. Если корутина ждёт в delay или выполняет какую-то suspend-операцию, её можно отменить, и она завершится с исключением CancellationException. Мы используем это через withTimeout: при превышении лимита время выполнения «всплывает» как исключение и текущая корутина прерывается. При масштабировании бота можно было бы отменять корутины (например, все накопительные задачи) при завершении работы приложения вызовом cancelAll().
Кратко, почему всё это работает как часы?
delay(1000) не блокирует поток;
- Можно запускать сотни параллельных
Job на обработку;
withTimeout автоматически отменяет медленные задачи;
newSuspendedTransaction не ломает данные, используя suspend.
Работа со scope
В коде передается CoroutineScope(Dispatchers.Default). Это означает, что все корутины запускаются в одном общем контексте с фоновыми потоками. По умолчанию, если не указывать свой Job, используются воркеры диспетчера, распределяющие корутины по потокам. Такой подход упрощает дизайн, но в более сложных приложениях можно было бы использовать структурированную конкуренцию – создавать один «корневой» scope на уровне приложения и порождать дочерние scope для задач. Это позволило бы централизованно отменять все корутины при выходе. Тем не менее, даже без этого текущая архитектура эффективна для однопоточного приёма и многопоточной обработки.
Заключение
Такой бот — это не просто эхо-ответчик, а основа для сложного, масштабируемого ассистента. Его можно дополнить:
- Кэшами (например, в memory или Redis);
- Админ-командами;
- Логами;
- Отправкой сообщений в очередь для асинхронной обработки.
Сама архитектура позволяет легко это сделать. Главное — правильное использование корутин, асинхронных коллекторов, timeout-защиты и структурной маршрутизации команд.