Введение
Reactive programming (реактивное программирование) — это декларативная парадигма программирования, основанная на идее асинхронной обработки событий и потоков данных.
Apache Kafka — это распределённая система обмена сообщениями, которая в связке с Spring Boot и WebFlux позволяет реализовать реактивный, неблокирующий обмен данными между компонентами информационной системы.
В этой статье мы разберем как реализовать реактивные Kafka Producer/Consumer с использованием Spring Boot.
Необходимые зависимости
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
implementation 'org.springframework.boot:spring-boot-starter-webflux:3.1.2'
reactor-kafka: позволяет использовать Project Reactor для реактивного взаимодействия с Apache Kafka.
spring-kafka: стандартная библиотека от Spring для Kafka.
spring-boot-starter-webflux: включает поддержку реактивного программирования в Spring Boot.
P.S. Версии зависимостей приведены для примера, в вашем случае они могут отличаться.
Конфигурация Отправителя сообщений Kafka Producer
@Configuration
public class ReactiveKafkaProducerConfig {
@Bean
public ReactiveKafkaProducerTemplate<String, MockClass> reactiveKafkaProducerTemplate(
KafkaProperties properties) {
Map<String, Object> props = properties.buildProducerProperties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
}
}
ReactiveKafkaProducerTemplate — обёртка для асинхронной отправки сообщений в Kafka.
Используются сериализаторы StringSerializer и JsonSerializer для сериализации ключа и значения соответственно.
Producer подключается к Kafka-брокеру по адресу localhost:9092.
Конфигурация Получателя сообщений Kafka Consumer
@Bean
public ReceiverOptions<String, MockClass> kafkaReceiverOptions(
//Имя Kafka-топика берётся из переменной окружения/конфигурации.
//Kafka топик (topic) — это логическое имя для потока сообщений, которые Kafka хранит и раздаёт.
//Можно представить топик как канал, в который одни приложения пишут сообщения, а другие — читают.
@Value(value = "${CONSUMER_DTO_TOPIC}") String topic,
KafkaProperties kafkaProperties) {
Map<String, Object> config = new HashMap<>(); //Мап для настроек, которые мы передадим Kafka Consumer'у для инициализации.
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //Указываем адрес Kafka-брокеров, к которым клиент будет подключаться.
//Определяет группу потребителей (Consumer Group), к которой принадлежит данный Consumer.
//Kafka гарантирует, что каждое сообщение будет обработано только одним Consumer'ом внутри группы.
//Все участники одной consumer group делят между собой части (партиции) топика. Топик может быть разбит на части (partitions), что позволяет обрабатывать сообщения параллельно.
//Kafka автоматически распределяет нагрузку между consumer. Если один consumer упал — Kafka перераспределит партиции другим в группе.
config.put(ConsumerConfig.GROUP_ID_CONFIG, "tune-it");
//Определяет, должен ли JsonDeserializer использовать метаданные (type info) из заголовков Kafka-сообщения для определения типа Java-объекта.
//в данном случае параметр = false и это значит, что мы не используем заголовки, а указываем тип явно
config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
config.put(JsonDeserializer.TRUSTED_PACKAGES,"*"); //Указывает, какие Java-пакеты разрешены для десериализации из JSON (по сути настройка безопасности).
//Указывает класс, в который нужно десериализовать JSON из Kafka-сообщения, если не используется тип из заголовка
//в данном случае Kafka будет преобразовывать JSON в экземпляры класса ru.tuneit.model.MockClass.
//реализация данного класса в статье опущена, так это всего лишь пример.
//Важно отметить, что класс, в который вы будете десериализовать содержимое сообщений, должен быть сериализуемым
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,"ru.tuneit.model.MockClass");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //Указывает, как десериализовать ключи сообщений из Kafka. В данном случае, ключи будут строками.
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); //Указывает, как десериализовать значения сообщений из Kafka. В данном случае JSON будет превращаться в Java-объекты.
// Определяет поведение, если у Consumer'а ещё нет сохранённого offset'а.
//offset - это уникальный номер отдельно взятого сообщения в партиции топика.
//В данном случае - читать с самого начала (все доступные сообщения). Полезно для тестов, чтобы получать все сообщения с начала.
// есть еще: "latest" — читать только новые сообщения, появившиеся после запуска.
// и "none" — выбросить ошибку, если offset не найден.
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<String, MockClass> basicReceiverOptions = ReceiverOptions.create(config);
return basicReceiverOptions.subscription(Collections.singletonList(topic));
}
@Bean
public ReactiveKafkaConsumerTemplate<String, MockClass> reactiveKafkaConsumerTemplate(
ReceiverOptions<String, MockClass> kafkaReceiverOptions) {
return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
}
Отправка сообщений Kafka Producer'ом
@Service
@RequiredArgsConstructor
public class MockProducerService {
private final ReactiveKafkaProducerTemplate<String, MockClass> reactiveKafkaProducerTemplate;
private static final Logger log = LoggerFactory.getLogger(MockProducerService.class);
public Mono<String> sendMock() {
MockClass mock = new MockClass("1", "some-data");
return reactiveKafkaProducerTemplate.send("mock-topic", mock)
.doOnSuccess(result -> log.info(
"Сообщение отправлено в Kafka: {} | offset: {}",
mock, result.recordMetadata().offset()
))
.doOnError(error -> log.error("Ошибка при отправке в Kafka: {}", error.getMessage()))
.thenReturn("Сообщение отправлено");
}
}
Метод sendMock() асинхронно публикует объект mock в топик mock-topic.
doOnSuccess() логирует смещение (offset) после успешной отправки.
doOnError() логирует ошибку, если что-то пошло не так
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/mock-endpoint")
public class MockController {
private final MockProducerService producerService;
@PostMapping
public Mono<String> sendMock() {
return producerService.sendMock();
}
}
Реактивный Kafka Listener (обработка входящих сообщений)
@RequiredArgsConstructor
public class KafkaListenersExample {
private final ReactiveKafkaConsumerTemplate<String, MockClass> reactiveKafkaConsumerTemplate;
@EventListener(ApplicationStartedEvent.class)
public Flux<MockClass> startKafkaConsumer() {
return reactiveKafkaConsumerTemplate
.receiveAutoAck() // авто-подтверждение
//.delayElements(Duration.ofSeconds(2L)) // можно раскомментировать для backpressure (для управления скоростью обработки при высоком потоке данных)
.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset()))
.map(ConsumerRecord::value)
.doOnNext(mocker -> log.info("successfully consumed {}={}", MockClass.class.getSimpleName(), mocker))
.doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
}
}
Не будет лишним пояснить некоторые моменты.
Метод receiveAutoAck() возвращает Flux<ConsumerRecord<K, V>>, который автоматически подтверждает оффсеты.
Что такое offset (оффсет) в Kafka?
Kafka сохраняет все сообщения в топиках как упорядоченный список, и каждому сообщению присваивается offset — уникальный номер (позиция в очереди).
Пример:
Топик: "mock-topic"
Сообщения:
Offset 0: {"id":1, "data":"some-data"}
Offset 1: {"id":2, "data":"you-are-fantastic"}
Offset 2: {"id":3, "data":"in-tune-it-we-trust"}
...
Когда консьюмер (Consumer) читает сообщение, он может:
подтвердить Kafka, что сообщение прочитано — это и есть подтверждение оффсета (acknowledgement).
или НЕ подтвердить — тогда Kafka может отправить это сообщение повторно (например, если произошло падение приложения).
Что делает .receiveAutoAck()?
Kafka Consumer сам подтверждает оффсет каждого сообщения после его получения.
Это значит, что:
Как только сообщение получено — Kafka считает его "прочитанным".
Это автоматический режим.
Альтернатива — receive() без AutoAck, в таком случае вы должны вручную управлять, когда подтверждать, а когда нет (например, если вы хотите подтверждать только после успешной обработки).
Преимущества receiveAutoAck():
- Просто.
- Подходит для большинства случаев, когда обработка сообщений "лёгкая" и не требует сложной логики повторной доставки.
Недостаток:
- Если приложение "упадёт" до обработки, Kafka всё равно считает сообщение прочитанным — вы потеряете это сообщение.
Ручное управление подтверждением оффсетов (manual ack)
Нужно, если вы хотите гарантировать, что Kafka считает сообщение прочитанным только после успешной обработки, а не сразу при получении.
Заменим receiveAutoAck() на receive():
public Flux<MockClass> startKafkaConsumer() {
return reactiveKafkaConsumerTemplate
.receive() // ручное подтверждение
.flatMap(consumerRecord -> {
MockClass mocker = consumerRecord.value();
return processMockObject(mocker) // ваша кастомная логика обработки
.doOnSuccess(result -> {
log.info("Processed object: {}", mocker);
consumerRecord.receiverOffset().acknowledge(); // ручное подтверждение оффсета
});
})
.doOnError(e -> log.error("Error while consuming message: {}", e.getMessage()));
}
receive() — теперь Kafka не подтверждает получение сообщений автоматически.
consumerRecord.receiverOffset().acknowledge() — вы сами говорите: "сообщение успешно обработано, можно подтвердить".
Если возникнет ошибка в processMockObject(), Kafka повторно доставит это сообщение (согласно настройки auto.offset.reset).
Бонус: Обработка с повторной попыткой (retry)
Можно добавить повторную попытку при ошибке:
.flatMap(consumerRecord -> {
return processMockObject(consumerRecord.value())
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) // 3 попытки с задержкой
.doOnSuccess(result -> consumerRecord.receiverOffset().acknowledge());
})
Что такое delayElements и зачем он нужен?
Backpressure (обратное давление) — это механизм, позволяющий управлять скоростью обработки сообщений, чтобы ваша система не "захлебнулась" при большом потоке данных.
Представьте, что Kafka присылает 1000 сообщений в секунду, а ваше приложение может обрабатывать только 100.
Без контроля — система может выйти из строя, переполнится память, упадёт приложение.
Что делает delayElements(Duration.ofSeconds(2L))?
Эта строка говорит: "Обрабатывать по одному сообщению из потока каждые 2 секунды".
Это искусственная задержка между сообщениями — своего рода "тормоз".
Преимущества:
- Можно замедлить потребление, если, например, каждое сообщение вызывает сложную бизнес-логику, сторонние сервисы и т.д.
- Удобно в целях отладки/тестирования или при ручной реализации "плавного потребления".
Недостатки:
- Увеличивается общая задержка обработки.
- Неэффективно при большом объёме данных (можно потерять сообщения, если брокер настроен на удаление старых сообщений).
Вместо заключения
Спасибо за чтение этой статьи!
В ней мы рассмотрели как реализовать и использовать реактивные Kafka Producer и Consumer в Spring Boot и WebFlux.
Пользуясь возможностью, хочу сказать, что по мимо этой статьи, в нашем блоге есть еще очень много других не менее интересных статьей на разные темы из мира информационных технологий.
Приятного вам чтения и до новых встреч :)