Spring Kafka Unit Test with Embedded Kafka

In our previous post, we built a telemetry receiver using Spring Boot and published its messages to Kafka using Spring Kafka.

That post covered most of the development. In this post, we concentrate on writing tests for the receiver, using an embedded Kafka broker that runs inside the test itself.

We will use JUnit 5 (Jupiter) for the tests, along with Mockito and the spring-kafka-test dependency.

Test Properties

To begin with, we need to give Spring Boot a properties file for the test run. In it we set the Spring, Kafka and logging properties.

server.port=9080
spring.main.banner-mode=off
debug=false

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.producer.topic=telemetryTest
spring.kafka.properties.retries=1
spring.kafka.properties.max.block.ms=500

logging.level.org.springframework=WARN
logging.level.org.apache.catalina=WARN
logging.level.org.apache.kafka=INFO
logging.level.com.barrelsofdata=DEBUG

Observe that the broker URL is not hard-coded: it is read from spring.embedded.kafka.brokers, a property that the embedded broker sets once it has started.
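To make the indirection concrete, here is a minimal sketch of how a ${...} placeholder ends up as a real broker address. It is a simplified illustration using only the JDK, not Spring's actual resolver; the class name PlaceholderSketch and the address 127.0.0.1:54321 are made up for the example, since the embedded broker normally picks a free port on its own.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified illustration of ${...} substitution; Spring's real resolver does more. */
final class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)}");

    /** Replaces every ${key} in value with the matching entry from properties. */
    static String resolve(String value, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String key = matcher.group(1);
            String replacement = properties.get(key);
            if (replacement == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + key);
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // Hypothetical address; the embedded broker publishes its own at startup.
        Map<String, String> properties = Map.of("spring.embedded.kafka.brokers", "127.0.0.1:54321");
        System.out.println(resolve("${spring.embedded.kafka.brokers}", properties));
    }
}
```

Because the value is only looked up when the test context starts, the same properties file keeps working no matter which port the embedded broker ends up on.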

The Code

We will first look at the entire code of one integration test; covering every unit test is beyond the scope of this post. You can go over all the other tests in the repository.

@SpringBootTest
@ExtendWith(SpringExtension.class)
// The broker count, controlled shutdown, partitions per topic and topic are set on the annotation.
@EmbeddedKafka(count = 2, controlledShutdown = true, partitions = 2, topics = "${spring.kafka.producer.topic:telemetryTest}")
@AutoConfigureMockMvc
public class ApplicationIntegrationTest {
    @Value("${spring.kafka.producer.topic}")
    private String TOPIC;

    @Autowired private TelemetryService telemetryService;

    @Autowired private MockMvc mockMvc;

    // Injected by Spring: the broker that @EmbeddedKafka started for this test context.
    @Autowired private EmbeddedKafkaBroker embeddedKafkaBroker;

    private BlockingQueue<ConsumerRecord<String, String>> records;
    private KafkaMessageListenerContainer<String, String> container;

    @BeforeEach
    void setUp() {
        Map<String, Object> consumerConfigs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        // Every record the container receives is handed to the queue for the test to inspect.
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterEach
    void tearDown() {
        container.stop();
    }

    @ParameterizedTest(name = "Integration: API request success")
    @CsvSource(value = {
            "{\"ts\":\"1606297994000\",\"id\":\"123\",\"ty\":\"LEFT_MOUSE_BUTTON_CLICK\",\"pl\":{\"x\":1000,\"y\":5000,\"w\":213,\"h\":124}};{\"timestamp\":\"2020-11-25T09:53:14.000+00:00\",\"id\":123,\"type\":\"LEFT_MOUSE_BUTTON_CLICK\",\"payload\":{\"width\":213,\"height\":124,\"x\":1000.0,\"y\":5000.0}}",
            "{\"ts\":\"1606297994000\",\"id\":\"123\",\"ty\":\"RIGHT_MOUSE_BUTTON_CLICK\"};{\"timestamp\":\"2020-11-25T09:53:14.000+00:00\",\"id\":123,\"type\":\"RIGHT_MOUSE_BUTTON_CLICK\",\"payload\":null}"}
            , delimiter = ';')
    public void success(String inputJson, String kafkaJson) throws Exception {
        HttpHeaders headers = new HttpHeaders();

        mockMvc.perform(
                put("/telemetry")
                        .contentType(MediaType.APPLICATION_JSON)
                        .content(inputJson)
                        .headers(headers))
                .andExpect(
                        status().isCreated()
                )
                .andExpect(
                        content().contentType(MediaType.APPLICATION_JSON)
                )
                .andExpect(
                        content().string(HttpStatus.CREATED.getReasonPhrase())
                );

        Thread.sleep(1000);
        ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
        Assertions.assertThat(singleRecord).isNotNull();
        Assertions.assertThat(singleRecord.key()).isNull();
        Assertions.assertThat(singleRecord.value()).isEqualTo(kafkaJson);
    }

    @ParameterizedTest(name = "Integration: API request unsupported media type")
    @CsvSource(value = {
            "{\"ts\":\"1606297994000\",\"id\":\"123\",\"ty\":\"LEFT_MOUSE_BUTTON_CLICK\",\"pl\":{\"x\":1000,\"y\":5000,\"w\":213,\"h\":124}};{\"timestamp\":\"2020-11-25T09:53:14.000+00:00\",\"id\":123,\"type\":\"LEFT_MOUSE_BUTTON_CLICK\",\"payload\":{\"width\":213,\"height\":124,\"x\":1000.0,\"y\":5000.0}}",
            "{\"ts\":\"1606297994000\",\"id\":\"123\",\"ty\":\"RIGHT_MOUSE_BUTTON_CLICK\"};{\"timestamp\":\"2020-11-25T09:53:14.000+00:00\",\"id\":123,\"type\":\"RIGHT_MOUSE_BUTTON_CLICK\",\"payload\":null}"}
            , delimiter = ';')
    public void unsupportedMedia(String inputJson, String kafkaJson) throws Exception {
        HttpHeaders headers = new HttpHeaders();

        // Assumption: the endpoint only consumes JSON, so a plain-text body is rejected with 415.
        mockMvc.perform(
                put("/telemetry")
                        .contentType(MediaType.TEXT_PLAIN)
                        .content(inputJson)
                        .headers(headers))
                .andExpect(
                        status().isUnsupportedMediaType()
                );

        // A rejected request must not publish anything, so the queue stays empty.
        ConsumerRecord<String, String> singleRecord = records.poll(1000, TimeUnit.MILLISECONDS);
        Assertions.assertThat(singleRecord).isNull();
    }
}

Let's quickly get through the annotations first and then speak about everything else.

  • @SpringBootTest - Boots the full Spring application context for the test, so the real beans are wired together.
  • @ExtendWith(SpringExtension.class) - Integrates the Spring test context with JUnit 5 Jupiter. @SpringBootTest already includes this extension, so the annotation is optional here. With JUnit 4 you would use @RunWith(SpringRunner.class) instead.
  • @EmbeddedKafka - Lets us use an embedded Kafka instance while running the tests.
  • @AutoConfigureMockMvc - Configures MockMvc so that we can hit our API endpoints and check the responses without starting a real server. It is not related to Kafka in any way, but our integration test needs it.

Embedded Kafka Broker

The embedded Kafka broker is created by the @EmbeddedKafka annotation, which is also where we say how many brokers to simulate, how many partitions each topic gets and which topics to create. Spring then injects the running broker into the test.

    @EmbeddedKafka(count = 2, controlledShutdown = true, partitions = 2, topics = "${spring.kafka.producer.topic:telemetryTest}")

    @Autowired private EmbeddedKafkaBroker embeddedKafkaBroker;

Set up and tear down Consumer / Producer

Now that we have a broker, we can set up a consumer to listen to the data published by the producer.

    @BeforeEach
    void setUp() {
        Map<String, Object> consumerConfigs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        // Every record the container receives is handed to the queue for the test to inspect.
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterEach
    void tearDown() {
        container.stop();
    }

We can do the same for the producer, but since our producer class uses a KafkaTemplate, the producer is injected automatically.

If you need to set up the producer yourself, here is the reference code, although we will not use it in our use case.

Map<String, Object> producerConfigs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(producerConfigs, new StringSerializer(), new StringSerializer()).createProducer();

Testing

Now that we have created the embedded brokers, set up a consumer and have the producer autowired in our producer class, let's write the test itself.

    @ParameterizedTest(name = "Integration: API request success")
    @CsvSource(value = {
            "{\"ts\":\"1606297994000\",\"id\":\"123\",\"ty\":\"LEFT_MOUSE_BUTTON_CLICK\",\"pl\":{\"x\":1000,\"y\":5000,\"w\":213,\"h\":124}};{\"timestamp\":\"2020-11-25T09:53:14.000+00:00\",\"id\":123,\"type\":\"LEFT_MOUSE_BUTTON_CLICK\",\"payload\":{\"width\":213,\"height\":124,\"x\":1000.0,\"y\":5000.0}}",
            "{\"ts\":\"1606297994000\",\"id\":\"123\",\"ty\":\"RIGHT_MOUSE_BUTTON_CLICK\"};{\"timestamp\":\"2020-11-25T09:53:14.000+00:00\",\"id\":123,\"type\":\"RIGHT_MOUSE_BUTTON_CLICK\",\"payload\":null}"}
            , delimiter = ';')
    public void success(String inputJson, String kafkaJson) throws Exception {
        HttpHeaders headers = new HttpHeaders();

        mockMvc.perform(
                put("/telemetry")
                        .contentType(MediaType.APPLICATION_JSON)
                        .content(inputJson)
                        .headers(headers))
                .andExpect(
                        status().isCreated()
                )
                .andExpect(
                        content().contentType(MediaType.APPLICATION_JSON)
                )
                .andExpect(
                        content().string(HttpStatus.CREATED.getReasonPhrase())
                );

        Thread.sleep(1000);
        ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
        Assertions.assertThat(singleRecord).isNotNull();
        Assertions.assertThat(singleRecord.key()).isNull();
        Assertions.assertThat(singleRecord.value()).isEqualTo(kafkaJson);
    }

Observe that we provide the input to the test method as CSV rows. There are two rows, and each row has two fields delimited by a ";".
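The reason for the ";" delimiter is that the JSON itself is full of commas. The sketch below shows the difference using plain JDK string splitting. It only illustrates the idea: JUnit's own CSV parser also handles quoting, and the class name CsvRowSketch and the shortened row are made up for the example.

```java
import java.util.regex.Pattern;

/** Illustrates why the rows use ';' instead of the default ',' as delimiter. */
final class CsvRowSketch {
    /** Splits a row on every occurrence of the given delimiter. */
    static String[] splitRow(String row, char delimiter) {
        return row.split(Pattern.quote(String.valueOf(delimiter)), -1);
    }

    public static void main(String[] args) {
        // Shortened, hypothetical row in the same shape as the ones in the test.
        String row = "{\"ty\":\"RIGHT_MOUSE_BUTTON_CLICK\",\"id\":\"123\"};"
                + "{\"type\":\"RIGHT_MOUSE_BUTTON_CLICK\",\"id\":123}";

        // Splitting on ';' gives exactly the two fields the test method expects.
        System.out.println(splitRow(row, ';').length);
        // Splitting on ',' cuts through the JSON and gives more than two pieces.
        System.out.println(splitRow(row, ',').length);
    }
}
```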

The first field is the mouse click event, in abbreviated form, that we send to our REST API endpoint. The second field is the output, in expanded form, that we expect to be published to Kafka.
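As one example of that expansion, the abbreviated "ts" field carries epoch milliseconds, while the expanded "timestamp" field is an ISO-8601 string in UTC. The sketch below reproduces that single conversion with java.time. It is not the application's own mapping code, and the class name TimestampSketch is made up for the example.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Converts the abbreviated "ts" value into the expanded "timestamp" format. */
final class TimestampSketch {
    // Lowercase 'xxx' prints a zero offset as +00:00 rather than Z.
    private static final DateTimeFormatter ISO_WITH_OFFSET =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxxx");

    static String expand(String epochMillis) {
        return Instant.ofEpochMilli(Long.parseLong(epochMillis))
                .atOffset(ZoneOffset.UTC)
                .format(ISO_WITH_OFFSET);
    }

    public static void main(String[] args) {
        // prints 2020-11-25T09:53:14.000+00:00
        System.out.println(expand("1606297994000"));
    }
}
```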

We hit our endpoint with the input and listen for the published message using the Kafka consumer.

We also pause for a second to give the message enough time to get published (this normally happens within a few milliseconds) and then poll the queue to get the published message.
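The fixed pause is not strictly required, because polling a BlockingQueue with a timeout already waits until a record arrives or the timeout expires. The JDK-only sketch below shows this behaviour with a background thread standing in for the Kafka listener; the class name PollSketch and the "message" payload are made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Shows that poll(timeout) returns as soon as a record arrives, or null on timeout. */
final class PollSketch {
    static String publishAndAwait(long publishDelayMillis, long timeoutMillis) {
        BlockingQueue<String> records = new LinkedBlockingQueue<>();

        // Stand-in for the listener container: adds one record after a delay.
        Thread publisher = new Thread(() -> {
            try {
                Thread.sleep(publishDelayMillis);
                records.add("message");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        publisher.setDaemon(true);
        publisher.start();

        try {
            // Blocks until a record is available or the timeout has passed.
            return records.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(publishAndAwait(50, 2000));  // record arrives before the timeout
        System.out.println(publishAndAwait(1000, 50));  // timeout expires first, so null
    }
}
```

In the test this would mean a single records.poll call with a timeout of a second or so, which returns early whenever the message arrives early.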

The MockMvc part is all about simulating the request as if it came from the actual web. The mockMvc.perform(...) call sends the HTTP PUT request, and the andExpect(...) calls that follow it check the API response.
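For comparison, this is roughly what the same PUT request looks like when built with the JDK's own HTTP client against a running instance. It is only a sketch: the class name PutRequestSketch is made up, and the base URL http://localhost:9080 is an assumption based on the server.port value in the test properties. MockMvc itself never opens a socket, so the test does not need any of this.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds (but does not send) the PUT request that MockMvc simulates in the test. */
final class PutRequestSketch {
    static HttpRequest telemetryPut(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/telemetry"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        // Assumed base URL, taken from server.port=9080 in the test properties.
        HttpRequest request = telemetryPut("http://localhost:9080",
                "{\"ts\":\"1606297994000\",\"id\":\"123\",\"ty\":\"RIGHT_MOUSE_BUTTON_CLICK\"}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```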

End notes

The source code of this application can be found at https://git.barrelsofdata.com/barrelsofdata/spring-telemetry-receiver.

You do not need to have Kafka installed on your machine to run these tests. Just clone the repository and run the command below from the project root.

./gradlew test

The generated test report file will be available at build/reports/tests/test/index.html.

Spring kafka unit test result