Spring Boot based Telemetry Data Receiver API with Kafka Producer

In the big data world, a lot of data is generated by IoT sensors and by telemetry emitted from various apps, websites, and devices. In this post, we will develop a Spring Kafka based REST API that acts as a cloud receiving endpoint for telemetry data.

We will consume data in JSON format with abbreviated field names, convert it to a data transfer object (DTO) while expanding the abbreviated fields, and push the data to a Kafka topic.

The abbreviation helps reduce the amount of data sent from edge systems. Although we could push the data to Kafka in its abbreviated form, downstream consumers might have a hard time making sense of it later. Hence we expand the fields, which will save the other business teams a lot of time when they start consuming our data.

For this example, we assume we are ingesting website telemetry data, particularly focusing on mouse clicks.

Project Structure

We will have several sub-packages within the main package. Here is a brief description of each (a sketch of the resulting layout follows the list).

  • constants - Will hold the constants we use in the project and in the DTOs
  • controller - Will hold the API endpoint defining classes
  • dto - Will hold the DTOs, also called POJOs (plain old Java objects)
  • exception - Will hold the custom exceptions we define
  • producer - Will hold the producer classes; Kafka in this example
  • service - Will hold the business logic of the application
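
Putting that together, the source tree looks roughly like this (a sketch using the class names that appear later in this post):

com.barrelsofdata.springexamples
├── Application.java
├── constants
│   └── EventType.java
├── controller
│   └── TelemetryController.java
├── dto
│   ├── EventDetailsDto.java
│   └── EventRequestDto.java
├── exception
│   └── JsonConversionException.java
├── producer
│   ├── Kafka.java
│   └── KafkaImpl.java
└── service
    ├── TelemetryService.java
    └── TelemetryServiceImpl.java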

The Application Class

The application class serves as the entry point for any Spring Boot application.

package com.barrelsofdata.springexamples;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = "com.barrelsofdata.springexamples")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Kafka Producer

We are using the spring-kafka library to create the Kafka producer. The autowired KafkaTemplate takes care of all the initialization and other maintenance. We set all the Kafka properties in the application properties file, shown later in this post.

package com.barrelsofdata.springexamples.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class KafkaImpl implements Kafka {
    private static final Logger logger = LoggerFactory.getLogger(Kafka.class);

    @Autowired private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.producer.topic}")
    private String topic;

    public void publish(String payload) throws KafkaException {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, payload); // Blocks call if kafka broker isn't available/responding
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("Message published to Kafka partition {} with offset {}", result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
            }
            @Override
            public void onFailure(Throwable ex) {
                logger.error("Unable to publish message {}", payload, ex);
            }
        });
    }
}
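
The KafkaImpl class above implements a Kafka interface from the producer package. The interface itself is not listed in this post; a minimal sketch, assuming it only declares the publish method used by the service layer, could look like this:

package com.barrelsofdata.springexamples.producer;

// Sketch of the producer interface implemented by KafkaImpl above.
// The exact contents are assumed; it only needs to expose the publish method.
public interface Kafka {
    void publish(String payload);
}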

Note two very important things about the above implementation.

  • Although the send() method returns a future, it blocks the call while fetching metadata from the broker. If your broker has an issue, this call is blocked for 1 minute by default. You can make this method asynchronous yourself by adding the @Async annotation to the method, along with the @EnableAsync annotation on the application class.
  • You can instead choose to make the entire operation blocking, so that you can return a success/fail status to the calling application. To do so, use send().get(), which returns a SendResult object instead of a future; a sketch of this variant follows the list.
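
Here is a minimal sketch of the blocking variant, assuming it lives in the same KafkaImpl class and reuses its kafkaTemplate, topic, and logger fields (publishSync is a hypothetical method name, and java.util.concurrent.ExecutionException needs to be imported):

    // Hypothetical blocking variant: send().get() waits for the broker acknowledgement
    // and hands back the SendResult directly instead of a future.
    public SendResult<String, String> publishSync(String payload) {
        try {
            SendResult<String, String> result = kafkaTemplate.send(topic, payload).get();
            logger.info("Message published to Kafka partition {} with offset {}",
                    result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag before propagating
            throw new KafkaException("Interrupted while publishing message", e);
        } catch (ExecutionException e) {
            throw new KafkaException("Unable to publish message", e);
        }
    }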

Define the DTO

We can now define the DTOs that we will push to Kafka. We will define two, nesting one within the other.

package com.barrelsofdata.springexamples.dto;

import com.fasterxml.jackson.annotation.JsonAlias;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@ToString
@Setter
@Getter
public class EventDetailsDto {
    @JsonAlias("w")
    private Integer width;
    @JsonAlias("h")
    private Integer height;
    @JsonAlias("x")
    private Float x;
    @JsonAlias("y")
    private Float y;
}

Nesting the above, we have the request DTO.

package com.barrelsofdata.springexamples.dto;

import com.barrelsofdata.springexamples.constants.EventType;
import com.fasterxml.jackson.annotation.JsonAlias;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import javax.validation.constraints.NotNull;
import java.sql.Timestamp;

@ToString
@Setter
@Getter
public class EventRequestDto {
    @NotNull
    @JsonAlias("ts")
    private Timestamp timestamp;
    @NotNull
    @JsonAlias("id")
    private Integer id;
    @NotNull
    @JsonAlias("ty")
    private EventType type;
    @JsonAlias("pl")
    private EventDetailsDto payload;
}

Do you see those extremely useful annotations like @ToString, @Getter, and @Setter? They come from a library called Lombok, which automatically generates the corresponding methods at compile time. This keeps your code elegant and removes a lot of boilerplate.

Check out the Lombok documentation to learn about all the available annotations.
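
The type field in EventRequestDto refers to an EventType enum in the constants package, which is not listed in this post. Based on the sample request used later (a ty of LEFT_MOUSE_BUTTON_CLICK), a minimal sketch could be:

package com.barrelsofdata.springexamples.constants;

// Sketch of the event type enum referenced by EventRequestDto. Only LEFT_MOUSE_BUTTON_CLICK
// appears in the sample request later; any other values here are illustrative assumptions.
public enum EventType {
    LEFT_MOUSE_BUTTON_CLICK,
    RIGHT_MOUSE_BUTTON_CLICK
}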

Service Layer

Now that we have our DTOs and producer, let's wire them up with our business logic in the service class.

Our business logic is pretty simple: we convert the request DTO to JSON and use the Kafka producer's publish method to ship our data.
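
The implementation below is written against a TelemetryService interface that is not listed in the post; a minimal sketch, assuming it only declares the method the controller calls, would be:

package com.barrelsofdata.springexamples.service;

import com.barrelsofdata.springexamples.dto.EventRequestDto;

// Sketch of the service interface implemented below (contents assumed, not shown in the post).
public interface TelemetryService {
    void receiveTelemetry(EventRequestDto eventRequest);
}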

package com.barrelsofdata.springexamples.service;

import com.barrelsofdata.springexamples.dto.EventRequestDto;
import com.barrelsofdata.springexamples.exception.JsonConversionException;
import com.barrelsofdata.springexamples.producer.Kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.KafkaException;
import org.springframework.stereotype.Service;

@Service
public class TelemetryServiceImpl implements TelemetryService {
    private static final Logger logger = LoggerFactory.getLogger(TelemetryService.class);

    @Autowired private Kafka producer;

    @Autowired private ObjectMapper jsonMapper;

    @Override
    public void receiveTelemetry(EventRequestDto eventRequest) {
        try {
            String payload = jsonMapper.writeValueAsString(eventRequest);
            producer.publish(payload);
        } catch (JsonProcessingException e) {
            logger.error("Unable to convert message to json {}", eventRequest);
            throw new JsonConversionException("Failed json conversion");
        } catch (KafkaException e) {
            logger.error("Kafka exception for request {}", eventRequest);
            // TODO: Handle what you want to do with the data here
        }
    }
}
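
The JsonConversionException thrown above lives in the exception package mentioned earlier. It is not listed in the post either; a minimal sketch, assuming it is a simple unchecked exception (receiveTelemetry does not declare it):

package com.barrelsofdata.springexamples.exception;

// Sketch of the custom exception used by the service layer; assumed to extend RuntimeException
// since it is thrown from receiveTelemetry without being declared.
public class JsonConversionException extends RuntimeException {
    public JsonConversionException(String message) {
        super(message);
    }
}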

API Endpoint Controller

We will now create a "/telemetry" endpoint in the application to receive data from the website.

As good practice, we clearly specify that we accept only JSON, produce JSON in the response, and accept only the PUT method.

package com.barrelsofdata.springexamples.controller;

import com.barrelsofdata.springexamples.dto.EventRequestDto;
import com.barrelsofdata.springexamples.service.TelemetryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.validation.Valid;

@RestController
public class TelemetryController {

    @Autowired private TelemetryService telemetryService;

    @PutMapping(
            value = "/telemetry",
            consumes = MediaType.APPLICATION_JSON_VALUE,
            produces = MediaType.APPLICATION_JSON_VALUE
    )
    public ResponseEntity<String> receiveTelemetry(@RequestBody @Valid EventRequestDto eventRequest) {
        telemetryService.receiveTelemetry(eventRequest);
        return new ResponseEntity<>(HttpStatus.CREATED.getReasonPhrase(), HttpStatus.CREATED);
    }
}

Note that our JSON request is mapped to our EventRequestDto object implicitly through the Jackson library. The @Valid annotation checks whether the DTO conforms to the declared constraints, such as mandatory fields (through the @NotNull annotation), minimum value, maximum value, and so on. Check out the Bean Validation documentation to know more.

Since we use @JsonAlias to tell the mapper which abbreviated field maps to which variable, and the variable names themselves are in the expanded form, the JSON payload produced during conversion in our service layer will carry the expanded field names.
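
For example, an abbreviated request like the one we send later would be expanded roughly as follows before being published to Kafka (the exact timestamp serialization depends on your Jackson configuration):

Incoming abbreviated JSON:
{"ts":"1606297994000","id":"123","ty":"LEFT_MOUSE_BUTTON_CLICK","pl":{"x":1000,"y":5000,"w":213,"h":124}}

Expanded payload published to the topic (approximately):
{"timestamp":1606297994000,"id":123,"type":"LEFT_MOUSE_BUTTON_CLICK","payload":{"width":213,"height":124,"x":1000.0,"y":5000.0}}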

The Properties file

The Spring application.properties file can be placed in the src/main/resources directory to be picked up automatically, but that is not a good ops practice. So, I recommend using an external properties file to run the application.

We place different properties files in the <project_root>/config directory and serve one of them to the application at runtime.

server.port=9080
spring.main.banner-mode=off
debug=false

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.topic=telemetry
spring.kafka.properties.retries=1
spring.kafka.properties.max.block.ms=500

logging.level.org.springframework=WARN
logging.level.org.apache.catalina=WARN
logging.level.org.apache.kafka=WARN

The full source code of the application along with readme can be found on git at https://git.barrelsofdata.com/barrelsofdata/spring-telemetry-receiver.

Running the application

You can run the application natively or using containers. I will detail both below.

Assuming you have Kafka installed, let's start the ZooKeeper server and the Kafka brokers.

zkServer.sh start
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties --override broker.id=10 --override log.dirs=/tmp/kafka-logs/broker-10 --override port=9092
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties --override broker.id=20 --override log.dirs=/tmp/kafka-logs/broker-20 --override port=9093

We will now create the Kafka topic.

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 2 --topic telemetry

Run with Native Java

You can now build and run the application with the below commands.

./gradlew clean test bootJar
java -jar build/libs/spring-telemetry-receiver-1.0.jar --spring.config.location=config/dev.properties
# OR
./gradlew clean test bootRun -PjvmArgs="-D--spring.config.location=config/dev.properties"

Run as a Docker Container

First build the container image using bootBuildImage.

./gradlew clean test bootBuildImage

This will build the required image to run the application as a container.

Let's run the docker container while connecting it to the host network, since we are running the kafka brokers on the host.

docker run -itd --rm --network host --mount type=bind,source=$(pwd)/config/dev.properties,target=/application.properties,readonly -e JAVA_OPTS=-D--spring.config.location=/application.properties --name spring-telemetry-server spring-telemetry-receiver:1.0

Take note of how I am serving the properties file through the bind mount.

Generating and viewing telemetry

Now that we have everything set up and running, let's go ahead and publish some telemetry.

First, let's bring up a Kafka console consumer to view the telemetry data.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic telemetry

Now let's hit our API endpoint with some data.

curl -i -X PUT \
  http://localhost:9080/telemetry/ \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{"ts":"1606297994000","id":"123","ty":"LEFT_MOUSE_BUTTON_CLICK","pl":{"x":1000,"y":5000,"w":213,"h":124}}'

The result.

[Image: Running the telemetry data receiver as a Docker container]
[Image: Telemetry data received in the Kafka console consumer]

You have now successfully built the telemetry receiver. Go ahead and make requests with various scenarios to see how the endpoint reacts.

No development is complete without unit tests; the next post covers unit testing this application using embedded Kafka.

This project has a number of issues in terms of code coverage and best practices. Check out the SonarQube analysis of the same to learn how to find and resolve them.