null

Использование type mapping при передаче Kafka сообщений между сервисами

Современные подходы к разработке предполагают создание и поддержку сложных систем, состоящих их множества разных сервисов, каждый из которых отвечает за свою часть необходимой функциональсти. Зачастую в типичном бизнес-процессе зайдействованы сразу несколько сервисов, поэтому необходимости обмена информацией между ними не избежать. Для организации асинхронного обмена сообщениями между сервисами обычно используется брокер сообщений Apache Kafka, ставший уже, можно сказать, стандартом для решения подобных задач. Но сейчас не о нем как таковом, а о способах десериализации объектов, переданных из другого сервиса. Как consumer понимает, объект какого типа ему прислал producer в своем сообщении?

Конечно можно сериализовать объект в JSON-строку перед передачей в другой микросервис, а по получении десериализовать ее в коде самостоятельно. Можно также создать отдельную библиотеку, в которой описать единый формат DTO, и использовать ее во всех сервисах при работе с сообщениями. Однако в этой небольшой заметке хотел показать еще один менее известный способ, который по личному опыту знаком не всем и часто остается в тени.

Допустим, есть два сервиса. Оба написаны на Kotlin с использованием фреймворка Spring. Первый отвечает за какую-либо логику (producer), второй - обычный сервис для отправки уведомлений, который принимает команды на отправку писем (consumer). 

На стороне первого сервиса есть класс с информацией по письму, которое необходимо отправить. Пусть для демонстрации он будет иметь следующий вид:

data class SendEmailCommand(
    val subject: String,
    val body: String,
    val recipients: List<String>
)

И есть простейший сервис, который указанную выше "команду" отправляет в кафку (topic в контексте статьи нам не важен)

@Service
class EmailSenderService {
    @Autowired
    private lateinit var kafkaMessageProperties: KafkaMessageProperties
    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, Any>

    fun sendEmail(sendEmailCommand: SendEmailCommand) {
        kafkaTemplate.send(
            kafkaMessageProperties.sendEmailCommand.topic,
            sendEmailCommand
        )
    }
}

Однако если мы попробуем обработать это событие в consumer, то скорее всего получим ошибку, связанную с тем, что объект класса  com.tuneit.model.kafka.message.SendEmailCommand не распознан, что неудивительно, поскольку такой класс находится в другом приложении. Чтобы этого избежать, через свойства Spring-приложения мы можем задать type mapping - маппинги для автоматического отображения объектов того или иного класса. Вот фрагмент из application.yml:

spring:
  kafka:
    producer:
      acks: all
      retries: 3
      properties:
        spring:
          json:
            type:
              mapping: emailMessage:com.tuneit.model.kafka.message.SendEmailCommand

Внутри mapping мы видим строку

 emailMessage:com.tuneit.model.kafka.message.SendEmailCommand

Ее суть в том, что мы задаем метку emailMessage классу com.tuneit.model.kafka.message.SendEmailCommand. То есть теперь мы можем идентифицировать нужный нам класс не через иерархию пакетов, которая отличается в разных сервисах, а через указанную метку. В consumer сервисе мы указываем класс, который соответсвет метке emailMessage уже в рамках сервиса уведомлений. Делается это через свойство kafka consumer в процессе конфигурации Kafka (строка 14):

@Configuration
class KafkaConfiguration {

    @Bean(KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
    fun listenerContainerFactory(
        kafkaProperties: KafkaProperties,
        objectMapper: ObjectMapper,
    ): ConcurrentKafkaListenerContainerFactory<String, Any> {

        val jsonDeserializer = JsonDeserializer<Any>(objectMapper)
        val properties = kafkaProperties.buildConsumerProperties()
        properties[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        properties[JsonDeserializer.TYPE_MAPPINGS] =
            "emailMessage:com.tuneit.notification.model.kafka.message.SendEmailCommand"

        return ConcurrentKafkaListenerContainerFactory<String, Any>().apply {
            consumerFactory = DefaultKafkaConsumerFactory(
                properties,
​​​​​​​                StringDeserializer(),
                jsonDeserializer
            )
        }
    }
}

Теперь наш сервис уведомлений понимает, что под меткой emailMessage с его стороны имеется в виду класс com.tuneit.notification.model.kafka.message.SendEmailCommand и он будет способен десериализовать объект этого класса. В итоге, когда мы задали отображение класса producer'а на класс consumer'а через "emailMessage", сервис уведомлений будет готов слушать соответствующие сообщения из брокера:

@Service
class SendEmailCommandListener(
    val properties: KafkaMessageProperties,
    private val handler: SendEmailCommandHandler,
) {
    @KafkaListener(topics = ["#{__listener.properties.sendEmailCommand.topic}"])
    fun receive(command: SendEmailCommand) {
        handler.handle(command)
    }
}

Безусловно детали реализации взаимодействия между сервисами зависят от архитектуры системы, специфических требований и ограничений, а также договоренностей и привычек внутри команды. Однако описанный способ может быть удобен и порой позволяет избежать лишнего кода по десериализации объектов.

Next