Введение
Мир Telegram-ботов гораздо глубже, чем кажется на первый взгляд. Если вы создаёте что-то большее, чем «бот, отвечающий /start», рано или поздно столкнётесь с:
- параллельной обработкой входящих сообщений;
- групповыми медиа-сообщениями;
- синхронизацией доступа к базе;
- необходимостью ограничения времени выполнения команд.
В этой статье мы пройдём путь от базовой архитектуры до реализации производительного, отказоустойчивого Telegram-бота на Kotlin, использующего корутины, асинхронную агрегацию медиа-групп и безопасные транзакции в базе данных.
Архитектура глазами разработчика
Начнём с верхнего уровня. Для создания Telegram-бота используется библиотека TelegramBots. Бот получает обновления (Update
) через Telegram Bots API. При поступлении любого нового Update
метод consume
вызывается автоматически. При этом, обработка сообщений должна быть:
- быстрой,
- гибкой,
- надёжной.
В нашем примере бот работает через реализацию интерфейса LongPollingSingleThreadUpdateConsumer
— это гарантирует, что входящие сообщения обрабатываются последовательно, в одном потоке.
class BasicConsumer : LongPollingSingleThreadUpdateConsumer {
private val dispatcher = CommandDispatcher()
// Коллектор сообщений из медиа-групп
private val mediaCollector = MediaGroupCollector(
scope = CoroutineScope(Dispatchers.Default),
onUpdatesReady = { updates -> dispatchImmediately(updates) }
)
override fun consume(update: Update) {
mediaCollector.processUpdate(update)
}
private fun dispatchImmediately(updates: List<Update>) {
runBlocking {
newSuspendedTransaction {
withTimeout(30_000L) {
dispatcher.dispatchCommands(updates)
}
}
}
}
}
Разберём ключевые блоки runBlocking + newSuspendedTransaction + withTimeout
- эта комбинация даёт мощную гарантию:
- runBlocking — “мост” из обычного Java мира в мир suspend-функций;
Он нужен, чтобы из синхронного метода consume
войти в suspend-экосистему корутин. Таким образом, мы можем вызвать suspend-функции (newSuspendedTransaction
, withTimeout
, dispatcher.dispatchCommands
) внутри обычного Java-метода, не возвращаясь из consume
раньше времени.
- newSuspendedTransaction — транзакция, безопасная для корутин;
Она позволяет безопасно выполнять операции с БД внутри корутинного контекста. Фактически это то же самое, что традиционный transaction { ... }
, но адаптированный для suspend-функций. Из документации: «Bridge функции newSuspendedTransaction()
и suspendedTransactionAsync()
дают безопасный способ взаимодействовать с БД внутри suspend
-блоков. При этом можно указать нужный диспетчер; если не указать – выполнится в текущем контексте. В нашем случае транзакция будет выполнена в контексте корутины с Dispatchers.Default
(унаследованном от runBlocking
). Это обеспечивает корректное выполнение запросов без проблем многопоточности JDBC.
- withTimeout — не даст команде зависнуть надолго.
Внутри транзакции используется withTimeout(CommandExecutionTimeoutMs)
. Это suspend-функция из стандартной библиотеки корутин: она запускает блок кода с ограничением по времени и выбрасывает TimeoutCancellationException
, если время выполнения превысило лимит. Здесь установлено CommandExecutionTimeoutMs = 30000
(30 секунд). Это означает, что выполнение всех команд должно завершиться за 30 секунд, иначе будет отменено. Такой таймаут защищает бота от зависаний: если какая-то команда затормозила (например, долгий сетевой запрос или рекурсия), она не будет висеть бесконечно.
Проблема с медиа-группами
Одна из особенностей Telegram – это группы медиа-обновлений (альбомы). Когда пользователь отправляет несколько фото/видео одним альбомом, бот получает несколько отдельных Message
с одним и тем же полем media_group_id
. Как указано в документации Telegram Bot API, поле media_group_id
содержит «уникальный идентификатор группы медиа-сообщений, к которой принадлежит данное сообщение. Это позволяет объединять связанные обновления в группу.
Пример:
[
{ "message_id": 101, "media_group_id": "abc123", "photo": ... },
{ "message_id": 102, "media_group_id": "abc123", "photo": ... },
{ "message_id": 103, "media_group_id": "abc123", "photo": ... }
]
Это значит, что бот может получить 3 обновления с задержкой между ними в десятки миллисекунд. Чтобы обработать их корректно, мы должны:
- Собрать все части;
- Дождаться, пока придут остальные;
- Упорядочить их;
- И только потом передать в команду.
MediaGroupCollector
Вот наш герой:
class MediaGroupCollector(
private val scope: CoroutineScope,
private val onUpdatesReady: (List<Update>) -> Unit
) {
private val mediaGroups = ConcurrentHashMap<String, MutableList<Update>>()
private val jobs = ConcurrentHashMap<String, Job>()
fun processUpdate(update: Update) {
val message = update.message ?: return dispatchImmediately(update)
val mediaGroupId = message.mediaGroupId
if (mediaGroupId == null) return dispatchImmediately(update)
val chatId = message.chatId.toString()
val key = "$chatId:$mediaGroupId"
mediaGroups.compute(key) { _, list ->
val newList = list ?: mutableListOf()
newList.add(update)
if (newList.size == 1) {
scheduleGroupProcessing(key)
}
newList
}
}
private fun scheduleGroupProcessing(key: String) {
jobs[key] = scope.launch {
delay(1000) // Ждём остальные части группы
val updates = mediaGroups.remove(key) ?: return@launch
jobs.remove(key)
val sorted = updates.sortedBy { it.message?.messageId }
onUpdatesReady(sorted)
}
}
private fun dispatchImmediately(update: Update) {
onUpdatesReady(listOf(update))
}
fun cancelAll() {
jobs.values.forEach { it.cancel() }
mediaGroups.clear()
jobs.clear()
}
}
Этот код работает неблокирующе, и может обрабатывать десятки групп параллельно, не загружая основной поток.
Класс MediaGroupCollector
реализует сбор таких групп. Алгоритм работы упрощённо такой:
- При получении
update
проверяем, есть ли в нём message
и у него mediaGroupId
.
- Если
message
отсутствует или поле mediaGroupId
равно null
, обновление сразу отправляется на обработку (вызывается dispatchImmediately(update)
).
- Если
mediaGroupId
присутствует, формируется ключ groupKey = chatId:mediaGroupId
и в потокобезопасную карту mediaGroups
добавляется текущее update
. Если это первое сообщение с таким ключом, запускается планировщик через scheduleGroupProcessing(groupKey)
.
- Метод
scheduleGroupProcessing
запускает новую корутину (в переданном CoroutineScope
, здесь – на Dispatchers.Default
). В корутине выполняется delay(1000)
– пауза 1 секунда, чтобы собрать остальные сообщения группы. После задержки все накопленные обновления из mediaGroups[groupKey]
извлекаются, сортируются по messageId
(чтобы сохранить исходный порядок) и передаются дальше как единое целое: onUpdatesReady(sortedUpdates)
.
- Если нужно обработать одиночное обновление (без группы), вызывается
dispatchImmediately(update)
, который просто передаёт список из одного элемента updates = listOf(update)
в onUpdatesReady
.
Таким образом, MediaGroupCollector
асинхронно и неблокирующе агрегирует связанные сообщения-альбомы. Благодаря использованию корутины с Dispatchers.Default
сбор и ожидание не блокирует основной поток получения обновлений – корутина просто приостанавливается на delay(1000)
, а в это время можно обрабатывать другие обновления или запустить сбор других групп. Важно, что пауза всего 1 секунда – обычно этого достаточно, чтобы Telegram прислал все сообщения альбома, но при этом не слишком долгий таймаут для бота.
Архитектура команд
Что происходит, когда все обновления собраны и передаются в dispatchCommands
?
Входит диспетчер:
interface Command {
suspend fun execute(update: Update) = Unit
suspend fun execute(updates: List<Update>) = Unit
suspend fun sizeOverrideExecute(updates: List<Update>) =
if (updates.size == 1) execute(updates.first()) else execute(updates)
fun customTrigger(update: Update): Boolean = false
fun customTrigger(updates: List<Update>): Boolean = false
fun filterChain(update: Update): Boolean = true
}
Каждая команда:
- Проверяет, стоит ли ей срабатывать (
customTrigger
);
- Выполняет своё дело (
execute
);
- Может фильтровать доступ по правам (
filterChain
).
Пример простой команды:
class Test(
override val triggerName: String
) : AdminCommand() {
override suspend fun execute(update: Update) {
telegramClient.execute(
SendMessage(
update.message.chatId.toString(),
update.extractText()
)
)
}
}
Расширяемость
Добавить новую команду — просто:
val dispatcher = CommandDispatcher(
listOf(
EchoCommand(),
PhotoAlbumCommand(),
// Сюда можно добавлять свои
)
)
Корутинные плюсы
Использование Kotlin-корутин даёт боту важные преимущества. Корутины позволяют писать «синхронно выглядящий» код, который на самом деле выполняется асинхронно, не блокируя поток. Это значит, что долгие операции (доступ к БД, сетевые запросы, ожидание нескольких сообщений) не «замораживают» основное приложение. Корутины очень лёгкие: их можно запускать тысячами на одном потоке, поскольку они используют suspension (приостановку) вместо тяжёлой блокировки потоков. В нашем коде, например, MediaGroupCollector
создаёт корутину для каждой группы медиа, а в этот момент основной поток приёма обновлений продолжает работать.
Корутины также встроенно поддерживают отмену. Если корутина ждёт в delay
или выполняет какую-то suspend-операцию, её можно отменить, и она завершится с исключением CancellationException
. Мы используем это через withTimeout
: при превышении лимита время выполнения «всплывает» как исключение и текущая корутина прерывается. При масштабировании бота можно было бы отменять корутины (например, все накопительные задачи) при завершении работы приложения вызовом cancelAll()
.
Кратко, почему всё это работает как часы?
delay(1000)
не блокирует поток;
- Можно запускать сотни параллельных
Job
на обработку;
withTimeout
автоматически отменяет медленные задачи;
newSuspendedTransaction
не ломает данные, используя suspend
.
Работа со scope
В коде передается CoroutineScope(Dispatchers.Default)
. Это означает, что все корутины запускаются в одном общем контексте с фоновыми потоками. По умолчанию, если не указывать свой Job
, используются воркеры диспетчера, распределяющие корутины по потокам. Такой подход упрощает дизайн, но в более сложных приложениях можно было бы использовать структурированную конкуренцию – создавать один «корневой» scope на уровне приложения и порождать дочерние scope для задач. Это позволило бы централизованно отменять все корутины при выходе. Тем не менее, даже без этого текущая архитектура эффективна для однопоточного приёма и многопоточной обработки.
Заключение
Такой бот — это не просто эхо-ответчик, а основа для сложного, масштабируемого ассистента. Его можно дополнить:
- Кэшами (например, в memory или Redis);
- Админ-командами;
- Логами;
- Отправкой сообщений в очередь для асинхронной обработки.
Сама архитектура позволяет легко это сделать. Главное — правильное использование корутин, асинхронных коллекторов, timeout-защиты и структурной маршрутизации команд.