Тестируем Kafka в интеграционных тестах Spring Boot

В последнее время Kafka стала настоящим трендом в сфере решений распределенных логов. Люди используют Кафку по разному. Кто-то, как простой log. Кто-то реализует поверх Кафки очередь, или транспорт для event-based системы. А особенно серьезные люди используют Кафку для хранения данных. Не важно зачем Kafka нужна вам. Важно, что вам в любом случае придётся писать для этого интеграционные тесты. Так давайте же как раз об этом и поговорим.

Сразу скажу про стек, о котором пойдёт речь. Это самый новый Spring boot версии 2.1 (важно, что в случае 2.0 что-то уже работать не будет; в последней версии был задепрекейчен класс KafkaEmbedded — вместо него теперь нужно использовать EmbeddedKafkaRule).

Что вообще нужно сделать, для того чтобы писать компонентные (интеграционные) тесты вашего Spring Boot приложения, которое работает с Кафкой? На самом деле, крайне мало. Спринг замечательно интегрирован с Кафкой, как в разрезе боевого кода, так и в разрезе тестов. Парни из Pivotal сделали Embedded Kafka, которую мы и заиспользуем.

Итак, делаем следующее:

  1. Подымаем контекст приложения. Тут есть несколько вариантов — будет ли поднят веб-сервер, или нет, какие контексты вы хотите подымать, и так далее. Всё это конфигурируется, и нормально описано в документации — https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-testing.html. Главная аннотация в этом деле — @SpringBootTest. Она, собственно, и позволяет сказать, какие конфигурации нужно подымать, и будет ли поднять веб-сервер.
  2. Запустить при старте тестов встроенный Kafka Broker — именно тут нам и нужна Embedded Kafka. Всё, что нужно сделать для этого — создать Bean класса EmbeddedKafkaRule в тестовой конфигурации. Тут вы зададите адрес, где будет слушать Брокер, а также топики, которые нужно создать по умолчанию.
  3. Создать своих Консьюмеров (Лисенеров) и Продьюсеров. Тут всё стандартно, в EmbeddedKafkaRule вам нужен метод getEmbeddedKafka().

Почти полный пример можно найти тут — https://stackoverflow.com/a/48756251/1756750, или тут — https://github.com/ayortanli/kafka-with-springboot.

Какие есть проблемы у Встроенной Кафки при тестировании? Я столкнулся с двумя проблемами. Во-первых, это @DirtiesContext. Хотя в документации говорится, что EmbeddedKafka будет запущена один раз на все тесты, у меня в каждом новом тестовом классе Брокер подымается заново. Есть даже обсуждение на гитхабе этой проблемы — https://github.com/spring-projects/spring-kafka/issues/666.

Вторая по очереди, но первая по гадкости трабла — это некоректная работа Consumers, если они подписываются на топик с помощью Подписки:

consumer.subscribe("test_topic");

В этом случае, если один Consumer пытается подписаться на топик, на который он уже подписан, то всё зависает на секунд 40, на моменте Partitions Rebalance. Спасает тут использование KafkaConsumer.assign. Так как, скорей всего, у нас в тестовой конфигурации всегда 1 партиция, то мы можем жоско прибить потребителя данных к данному топику к нулевой партиции.

Кстати, про этот баг тоже был разговор на гитхабе — https://github.com/spring-projects/spring-kafka/pull/538/files. Там написано, что баг якобы починили, но у меня это поведение воспроизводится уже с новым патчем.

В заключении, давайте обсудим, можно ли не использовать EmbeddedKafka, но при этом мочь писать нормальные компонентные тесты? Ответ — конечно же, да. У нас есть тест-контейнеры, про которые нам каждый день рассказывает Сергей Егоровhttps://github.com/testcontainers/testcontainers-java/blob/master/docs/usage/kafka_containers.md.

Что стоит выбрать вам? Сильно зависит от вашей инфраструктуры: далеко не везде получится на CI запустить контейнеры, и там уже выбор будет очевиден. Но если вы уже завязаны на Testcontainers, то можно смело использовать их и для Кафки.

Категории: Программирование