null

Реактивный обмен сообщениями через Kafka в Spring Boot

Введение

Reactive programming (реактивное программирование) — это декларативная парадигма программирования, основанная на идее асинхронной обработки событий и потоков данных.

Apache Kafka — это распределённая система обмена сообщениями, которая в связке с Spring Boot и WebFlux позволяет реализовать реактивный, неблокирующий обмен данными между компонентами информационной системы.

В этой статье мы разберем как реализовать реактивные Kafka Producer/Consumer с использованием Spring Boot.

Необходимые зависимости

implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
implementation 'org.springframework.boot:spring-boot-starter-webflux:3.1.2'

reactor-kafka: позволяет использовать Project Reactor для реактивного взаимодействия с Apache Kafka.

spring-kafka: стандартная библиотека от Spring для Kafka.

spring-boot-starter-webflux: включает поддержку реактивного программирования в Spring Boot.

P.S. Версии зависимостей приведены для примера, в вашем случае они могут отличаться.

Конфигурация Отправителя сообщений Kafka Producer

@Configuration
public class ReactiveKafkaProducerConfig {

    @Bean
    public ReactiveKafkaProducerTemplate<String, MockClass> reactiveKafkaProducerTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildProducerProperties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
    }
}

ReactiveKafkaProducerTemplate — обёртка для асинхронной отправки сообщений в Kafka.

Используются сериализаторы StringSerializer и JsonSerializer для сериализации ключа и значения соответственно.

Producer подключается к Kafka-брокеру по адресу localhost:9092.

Конфигурация Получателя сообщений Kafka Consumer

@Bean
public ReceiverOptions<String, MockClass> kafkaReceiverOptions(
    //Имя Kafka-топика берётся из переменной окружения/конфигурации.
    //Kafka топик (topic) — это логическое имя для потока сообщений, которые Kafka хранит и раздаёт. 
    //Можно представить топик как канал, в который одни приложения пишут сообщения, а другие — читают.
        @Value(value = "${CONSUMER_DTO_TOPIC}") String topic,
        KafkaProperties kafkaProperties) {
    
    Map<String, Object> config = new HashMap<>();  //Мап для настроек, которые мы передадим Kafka Consumer'у для инициализации.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //Указываем адрес Kafka-брокеров, к которым клиент будет подключаться.

    //Определяет группу потребителей (Consumer Group), к которой принадлежит данный Consumer.
    //Kafka гарантирует, что каждое сообщение будет обработано только одним Consumer'ом внутри группы.
    //Все участники одной consumer group делят между собой части (партиции) топика. Топик может быть разбит на части (partitions), что позволяет обрабатывать сообщения параллельно.
    //Kafka автоматически распределяет нагрузку между consumer. Если один consumer упал — Kafka перераспределит партиции другим в группе.
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "tune-it");
    //Определяет, должен ли JsonDeserializer использовать метаданные (type info) из заголовков Kafka-сообщения для определения типа Java-объекта.
    //в данном случае параметр = false и это значит, что мы не используем заголовки, а указываем тип явно
    config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
    config.put(JsonDeserializer.TRUSTED_PACKAGES,"*"); //Указывает, какие Java-пакеты разрешены для десериализации из JSON (по сути настройка безопасности).
   
    //Указывает класс, в который нужно десериализовать JSON из Kafka-сообщения, если не используется тип из заголовка 
    //в данном случае Kafka будет преобразовывать JSON в экземпляры класса ru.tuneit.model.MockClass.
    //реализация данного класса в статье опущена, так это всего лишь пример. 
    //Важно отметить, что класс, в который вы будете десериализовать содержимое сообщений, должен быть сериализуемым 
    config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,"ru.tuneit.model.MockClass");

    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //Указывает, как десериализовать ключи сообщений из Kafka. В данном случае, ключи будут строками. 
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); //Указывает, как десериализовать значения сообщений из Kafka. В данном случае JSON будет превращаться в Java-объекты.

    // Определяет поведение, если у Consumer'а ещё нет сохранённого offset'а. 
    //offset - это уникальный номер отдельно взятого сообщения в партиции топика.
    //В данном случае - читать с самого начала (все доступные сообщения). Полезно для тестов, чтобы получать все сообщения с начала.
    // есть еще: "latest" — читать только новые сообщения, появившиеся после запуска.
    // и "none" — выбросить ошибку, если offset не найден.
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    ReceiverOptions<String, MockClass> basicReceiverOptions = ReceiverOptions.create(config);
    return basicReceiverOptions.subscription(Collections.singletonList(topic));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, MockClass> reactiveKafkaConsumerTemplate(
        ReceiverOptions<String, MockClass> kafkaReceiverOptions) {
    
    return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
}

Отправка сообщений Kafka Producer'ом

@Service
@RequiredArgsConstructor
public class MockProducerService {

    private final ReactiveKafkaProducerTemplate<String, MockClass> reactiveKafkaProducerTemplate;
    private static final Logger log = LoggerFactory.getLogger(MockProducerService.class);

 public Mono<String> sendMock() {
        MockClass mock = new MockClass("1", "some-data");

        return reactiveKafkaProducerTemplate.send("mock-topic", mock)
            .doOnSuccess(result -> log.info(
                "Сообщение отправлено в Kafka: {} | offset: {}",
                mock, result.recordMetadata().offset()
            ))
            .doOnError(error -> log.error("Ошибка при отправке в Kafka: {}", error.getMessage()))
            .thenReturn("Сообщение отправлено"); 
    }
}

Метод sendMock() асинхронно публикует объект mock в топик mock-topic.

doOnSuccess() логирует смещение (offset) после успешной отправки.

doOnError() логирует ошибку, если что-то пошло не так

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/mock-endpoint")
public class MockController {

    private final MockProducerService producerService;

    @PostMapping
    public Mono<String> sendMock() {
        return producerService.sendMock(); 
    }
}

Реактивный Kafka Listener (обработка входящих сообщений)

@RequiredArgsConstructor
public class KafkaListenersExample {

    private final ReactiveKafkaConsumerTemplate<String, MockClass> reactiveKafkaConsumerTemplate;

 
    @EventListener(ApplicationStartedEvent.class)
    public Flux<MockClass> startKafkaConsumer() {
        return reactiveKafkaConsumerTemplate
                .receiveAutoAck() // авто-подтверждение
                //.delayElements(Duration.ofSeconds(2L)) // можно раскомментировать для backpressure (для управления скоростью обработки при высоком потоке данных)
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset()))
                .map(ConsumerRecord::value)
                .doOnNext(mocker -> log.info("successfully consumed {}={}", MockClass.class.getSimpleName(), mocker))
                .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
    }
}

Не будет лишним пояснить некоторые моменты.

Метод receiveAutoAck() возвращает Flux<ConsumerRecord<K, V>>, который автоматически подтверждает оффсеты.

Что такое offset (оффсет) в Kafka?

Kafka сохраняет все сообщения в топиках как упорядоченный список, и каждому сообщению присваивается offset — уникальный номер (позиция в очереди).

Пример:

Топик: "mock-topic"
Сообщения:
Offset 0: {"id":1, "data":"some-data"}
Offset 1: {"id":2, "data":"you-are-fantastic"}
Offset 2: {"id":3, "data":"in-tune-it-we-trust"}
...

Когда консьюмер (Consumer) читает сообщение, он может:

подтвердить Kafka, что сообщение прочитано — это и есть подтверждение оффсета (acknowledgement).

или НЕ подтвердить — тогда Kafka может отправить это сообщение повторно (например, если произошло падение приложения).

Что делает .receiveAutoAck()?

Kafka Consumer сам подтверждает оффсет каждого сообщения после его получения.

Это значит, что:

Как только сообщение получено — Kafka считает его "прочитанным".

Это автоматический режим.

Альтернатива — receive() без AutoAck, в таком случае вы должны вручную управлять, когда подтверждать, а когда нет (например, если вы хотите подтверждать только после успешной обработки).

Преимущества receiveAutoAck():

  • Просто.
  • Подходит для большинства случаев, когда обработка сообщений "лёгкая" и не требует сложной логики повторной доставки.

Недостаток:

  • Если приложение "упадёт" до обработки, Kafka всё равно считает сообщение прочитанным — вы потеряете это сообщение.

Ручное управление подтверждением оффсетов (manual ack)

Нужно, если вы хотите гарантировать, что Kafka считает сообщение прочитанным только после успешной обработки, а не сразу при получении.

Заменим receiveAutoAck() на receive():

public Flux<MockClass> startKafkaConsumer() {
    return reactiveKafkaConsumerTemplate
            .receive() // ручное подтверждение
            .flatMap(consumerRecord -> {
                 MockClass mocker = consumerRecord.value();

                return processMockObject(mocker) // ваша кастомная логика обработки
                        .doOnSuccess(result -> {
                            log.info("Processed object: {}", mocker);
                            consumerRecord.receiverOffset().acknowledge(); // ручное подтверждение оффсета
                        });
            })
            .doOnError(e -> log.error("Error while consuming message: {}", e.getMessage()));
}

receive() — теперь Kafka не подтверждает получение сообщений автоматически.

consumerRecord.receiverOffset().acknowledge() — вы сами говорите: "сообщение успешно обработано, можно подтвердить".

Если возникнет ошибка в processMockObject(), Kafka повторно доставит это сообщение (согласно настройки auto.offset.reset).

Бонус: Обработка с повторной попыткой (retry)

Можно добавить повторную попытку при ошибке:

.flatMap(consumerRecord -> {
    return processMockObject(consumerRecord.value())
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) // 3 попытки с задержкой
        .doOnSuccess(result -> consumerRecord.receiverOffset().acknowledge());
})

Что такое delayElements и зачем он нужен?

Backpressure (обратное давление) — это механизм, позволяющий управлять скоростью обработки сообщений, чтобы ваша система не "захлебнулась" при большом потоке данных.

Представьте, что Kafka присылает 1000 сообщений в секунду, а ваше приложение может обрабатывать только 100.

Без контроля — система может выйти из строя, переполнится память, упадёт приложение.

Что делает delayElements(Duration.ofSeconds(2L))?

Эта строка говорит: "Обрабатывать по одному сообщению из потока каждые 2 секунды".

Это искусственная задержка между сообщениями — своего рода "тормоз".

Преимущества:

  • Можно замедлить потребление, если, например, каждое сообщение вызывает сложную бизнес-логику, сторонние сервисы и т.д.
  • Удобно в целях отладки/тестирования или при ручной реализации "плавного потребления".

Недостатки:

  • Увеличивается общая задержка обработки.
  • Неэффективно при большом объёме данных (можно потерять сообщения, если брокер настроен на удаление старых сообщений).

Вместо заключения

Спасибо за чтение этой статьи!

В ней мы рассмотрели как реализовать и использовать реактивные Kafka Producer и Consumer в Spring Boot и WebFlux.

Пользуясь возможностью, хочу сказать, что по мимо этой статьи, в нашем блоге есть еще очень много других не менее интересных статьей на разные темы из мира информационных технологий.

Приятного вам чтения и до новых встреч :)

Next