Kafka

Kafka Integration with Spring Boot

Kafka is a distributed event store and stream-processing platform designed for low-latency handling of real-time data feeds. Clients publish messages to a Kafka broker and subscribe to messages from it.

GitLab Repository Link

To integrate Kafka with Spring Boot, follow the steps below.

Dependencies

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

Add the spring-kafka dependency above to pom.xml. The <version/> tag is not required because the version is inherited from Spring Boot’s parent pom.xml. Update the project after adding the dependency.

Overview

To demonstrate the integration of Spring Boot with Apache Kafka, we use a microservices architecture with separate services for user, account, and SMS, which communicate with each other through Kafka.

- User Service: registers a user profile in a bank.

- SMS Service: sends an SMS on successful user registration (implemented with Twilio).

- Account Service: creates an account for the registered user.

Producer/Publisher: User Service

Consumer/Subscriber: SMS Service, Account Service

Upon successful creation of a user profile, a message event is sent to the Kafka server. This event is consumed by both consumers/subscribers, the Account and SMS services, and each service uses it for its own processing.

  • User Microservice

    1. Upon hitting the POST REST API endpoint (using Swagger/Postman), the user profile data is persisted on the SQL server.

    2. Message event: the persisted user data is sent to the Kafka server. Here the user microservice is the publisher/producer of the project.

  • Account Microservice

    1. The message event published to the Kafka broker by the user microservice is consumed by the account microservice using the @KafkaListener annotation. The payload is read from the ConsumerRecord and parsed with Jackson's ObjectMapper into a JsonNode, so that a single attribute can be retrieved from the list of attributes. After the email attribute is retrieved from the user message event, an account is created and persisted to the database.

  • SMS Microservice

    1. The message event published to the Kafka broker by the user microservice is also consumed by the SMS microservice, which sends an SMS using the Twilio SDK.

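This implementation demonstrates one producer with multiple consumers. Because the two listeners use different consumer group IDs (accountgroupid and sms in the listener code), each service receives every message published to usertopic.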
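The one-producer, multiple-consumer behaviour can be sketched without a running broker. The class below is a hypothetical in-memory stand-in, not the Kafka client API: `TopicSimulation`, `publish`, and `poll` are names invented for illustration, and it assumes a single partition where every group starts reading at offset 0.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a single-partition topic (not the Kafka API).
class TopicSimulation {

    // Append-only log of message values.
    private final List<String> log = new ArrayList<>();
    // Next offset to read, tracked separately for each consumer group.
    private final Map<String, Integer> nextOffsetByGroup = new LinkedHashMap<>();

    void publish(String value) {
        log.add(value);
    }

    // Returns the messages this group has not read yet and advances its offset.
    List<String> poll(String groupId) {
        int from = nextOffsetByGroup.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        nextOffsetByGroup.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        TopicSimulation userTopic = new TopicSimulation();
        userTopic.publish("{\"email\":\"a@example.com\"}");

        // Different groups: each one receives the message.
        System.out.println(userTopic.poll("accountgroupid"));
        System.out.println(userTopic.poll("sms"));
        // Same group polled again: nothing new is delivered.
        System.out.println(userTopic.poll("sms"));
    }
}
```

If both services polled with the same group ID, the second poll would come back empty, which is why the two listeners need distinct groups to each see every user event.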

Producer/Publisher

Here the producer/publisher is the user microservice.

user service application.properties
spring.kafka.producer.bootstrap-servers = localhost:9092
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer

Add the properties above to the user service's application.properties. They define the serializers for the key and value of the Kafka message event, here both StringSerializer. A message must be sent to the Kafka server in binary format, so the user object has to be serialized into bytes before it is transmitted to the Kafka broker. Deserializers are configured only on the consumer side.

localhost:9092 is the address of the Kafka server, which runs on port 9092.
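To make "serialized into binary format" concrete, here is a minimal sketch of what a string serializer and deserializer do. It uses only the JDK; `StringBytesSketch` is a hypothetical class for illustration, not Kafka's `StringSerializer`, although that class also encodes as UTF-8 by default.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in showing the String <-> byte[] round trip.
class StringBytesSketch {

    // Encode text as UTF-8 bytes, the form in which a value travels to the broker.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode UTF-8 bytes back to text on the consumer side.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("{\"email\":\"a@example.com\"}");
        System.out.println(wire.length + " bytes: " + Arrays.toString(wire));
        System.out.println(deserialize(wire));
    }
}
```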

JsonSerializer.java
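import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {

	// ObjectMapper is thread-safe once configured, so one instance is reused.
	private final ObjectMapper mapper = new ObjectMapper();

	@Override
	public byte[] serialize(String topic, T data) {
		if (data == null) {
			return null;
		}
		try {
			// Convert the object to its JSON representation as bytes.
			return mapper.writeValueAsBytes(data);
		} catch (JsonProcessingException e) {
			// Fail the send instead of silently publishing a null value.
			throw new SerializationException("Failed to serialize value for topic " + topic, e);
		}
	}

}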

Here we implement a custom JSON serializer to serialize the object that is sent to the Kafka cluster.

UserService.java (Kafka Producer)
@Service
@RequiredArgsConstructor
public class UserService {

	private final UsersRepository userRepository;

	public ResponseDto newUser(UserDto dto) {

		// Minimal client configuration: the address of the Kafka broker.
		Properties properties = new Properties();
		properties.put("bootstrap.servers", "localhost:9092");

		Users users = new Users();
		BeanUtils.copyProperties(dto, users);
		// Persist the user first so that the event carries data that was actually saved.
		userRepository.save(users);

		// try-with-resources closes the producer, which also flushes the pending record.
		try (@SuppressWarnings({ "unchecked", "rawtypes" })
		Producer<String, Users> producer = new KafkaProducer<>(properties, new StringSerializer(),
				new JsonSerializer())) {
			// No key is given, so the record carries only the topic name and the value.
			ProducerRecord<String, Users> userEvent = new ProducerRecord<>("usertopic", users);
			producer.send(userEvent);
		}

		return ResponseDto.builder().message("user registration successful").httpStatusCode(200).build();
	}
}
  • Properties: a class from the java.util package that extends Hashtable and maintains a list of key-value pairs in which both the key and the value are strings. Here it is used to configure the producer with the address of the Kafka server, localhost:9092.

  • KafkaProducer: a Kafka client that publishes records to the Kafka cluster. Here a new Producer object of type KafkaProducer is created with the properties, the key serializer, and the value serializer.

Producer<String, Users> producer = new KafkaProducer<>(properties, new StringSerializer(),
				new JsonSerializer());

Here the key is a String and the value is a Users object, so the key serializer is StringSerializer and the value serializer is JsonSerializer (the custom JSON serializer implemented above).

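- `ProducerRecord`: the record to publish. It holds the topic name, an optional key, and the value; here only the topic name (usertopic) and the value are set.

- Controller code snippet to test the `POST` endpoint. The controller class is annotated with `@RequestMapping("/api/v1")`.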

```java
@PostMapping("/users")
public ResponseEntity<ResponseDto> newUser(@Valid @RequestBody UserDto userDto) {
	return new ResponseEntity<>(userService.newUser(userDto),HttpStatus.ACCEPTED);
}
```

Consumers/Subscribers

Account Microservice

Add the same spring-kafka dependency to the other microservices as well.

account microservice application.properties
spring.kafka.consumer.bootstrap-servers = localhost:9092
spring.kafka.consumer.group-id = userId
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer

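Above are a few properties defined for the consumer side. Note the difference between the spring.kafka.producer prefix on the producer side and the spring.kafka.consumer prefix on the consumer side. An offset is the position within a partition of the next message to be delivered to a consumer. With auto-offset-reset = earliest, a consumer group that has no committed offset starts reading from the beginning of the partition.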
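The effect of auto-offset-reset can be illustrated with a small model. This is a sketch under stated assumptions, not broker code: `OffsetResetSketch` and its methods are hypothetical names, only one partition is modelled, and only the `earliest` and `latest` policies are handled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how a consumer group picks its starting offset.
class OffsetResetSketch {

    // Committed offsets per consumer group for one partition.
    private final Map<String, Integer> committed = new HashMap<>();

    // Record the next offset the group should read.
    void commit(String groupId, int nextOffset) {
        committed.put(groupId, nextOffset);
    }

    // Decide where a group starts reading a partition that holds logSize records.
    int startOffset(String groupId, int logSize, String autoOffsetReset) {
        Integer saved = committed.get(groupId);
        if (saved != null) {
            return saved; // a committed offset takes precedence over the reset policy
        }
        switch (autoOffsetReset) {
            case "earliest":
                return 0;       // read everything already in the partition
            case "latest":
                return logSize; // read only records that arrive from now on
            default:
                throw new IllegalArgumentException("Unsupported policy: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        OffsetResetSketch partition = new OffsetResetSketch();
        System.out.println(partition.startOffset("accountgroupid", 5, "earliest"));
        System.out.println(partition.startOffset("accountgroupid", 5, "latest"));
        partition.commit("accountgroupid", 3);
        System.out.println(partition.startOffset("accountgroupid", 5, "earliest"));
    }
}
```

In this model the policy matters only the first time a group reads the partition; once an offset is committed, the group resumes from it.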

AccountService.java
@Service
@RequiredArgsConstructor
@Slf4j
public class AccountService {

	private final AccountsRepository accountsRepository;

	// Both helpers are thread-safe, so they are created once and reused.
	private final ObjectMapper mapper = new ObjectMapper();
	private final SecureRandom secureRandom = new SecureRandom();

	@KafkaListener(topics = "usertopic", groupId = "accountgroupid")
	public void consume(ConsumerRecord<String, Object> records) throws IOException {

		// Parse the JSON payload and read only the email attribute.
		JsonNode node = mapper.readTree(records.value().toString());
		String email = node.get("email").asText();

		Accounts accounts = new Accounts();
		accounts.setAccountBalance(1000.00);
		accounts.setAccountName("Savings");
		// Clear the sign bit; Math.abs(Long.MIN_VALUE) would still be negative.
		accounts.setAccountNumber(secureRandom.nextLong() & Long.MAX_VALUE);
		accounts.setEmail(email);

		accountsRepository.save(accounts);
		log.info("User event received in account service => {}", records);
	}
}
  1. @KafkaListener: marks a method as a listener for one or more Kafka topics. Spring runs the method inside a KafkaMessageListenerContainer, which is how Spring connects to Kafka and polls records under the hood. The groupId attribute overrides the spring.kafka.consumer.group-id property for this listener.

  2. ObjectMapper & JsonNode: Jackson classes for converting between JSON and Java objects. Here ObjectMapper parses the message payload into a JsonNode tree.

  3. The ConsumerRecord is fetched from the Kafka cluster, and ObjectMapper and JsonNode are used to retrieve only the email attribute from the payload before the account is created.

JsonDeserializer.java

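import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {

	private final ObjectMapper mapper = new ObjectMapper();
	private final Class<T> type;

	// The target type is required, so pass an instance of this class to the
	// consumer instead of configuring it by class name.
	public JsonDeserializer(Class<T> type) {
		this.type = type;
	}

	@Override
	public T deserialize(String topic, byte[] data) {
		if (data == null) {
			return null;
		}
		try {
			// Map the JSON bytes onto an instance of the target type.
			return mapper.readValue(data, type);
		} catch (IOException e) {
			throw new SerializationException("Failed to deserialize value from topic " + topic, e);
		}
	}

}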

A custom JSON deserializer implementation to deserialize objects read from the Kafka cluster.

SMS Service

Add the below properties to application.properties of the SMS microservice.

server.port=9008
spring.kafka.consumer.bootstrap-servers = localhost:9092
spring.kafka.consumer.group-id = userId
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer

Add the Twilio SDK dependency below to the SMS microservice’s pom.xml to send SMS through the Twilio SMS provider.

Twilio SDK dependency
<dependency>
	<groupId>com.twilio.sdk</groupId>
	<artifactId>twilio</artifactId>
	<version>8.8.0</version>
</dependency>

SmsService.java
@Slf4j
@Service
public class SmsService {

	// ObjectMapper is thread-safe once configured, so one instance is reused.
	private final ObjectMapper mapper = new ObjectMapper();

	@KafkaListener(groupId = "sms", topics = "usertopic")
	public void consumeUserAndSendSms(ConsumerRecord<String, Object> records) throws JsonProcessingException {

		// Read the two attributes needed for the SMS from the JSON payload.
		JsonNode node = mapper.readTree(records.value().toString());
		String email = node.get("email").asText();
		String phoneNumber = node.get("phoneNumber").asText();

		log.info(email);

		// The Twilio credentials and sender number come from environment variables.
		String fromNumber = System.getenv("TWILIO_NUMBER");
		Twilio.init(System.getenv("TWILIO_ACCOUNT_SID"), System.getenv("TWILIO_AUTH_TOKEN"));
		// The "+91" prefix is prepended to the stored phone number as the country code.
		Message.creator(new PhoneNumber("+91" + phoneNumber), new PhoneNumber(fromNumber),
				"Your profile has been created successfully at ABC Bank with email " + email).create();
	}

}

Set up the following system environment variables:

- TWILIO_NUMBER: the phone number provided by Twilio

- TWILIO_ACCOUNT_SID: the Twilio account SID

- TWILIO_AUTH_TOKEN: the Twilio authentication token
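Since a missing variable only surfaces when the first SMS is sent, it can help to check the environment at startup. The following is a minimal sketch using only the JDK; `TwilioEnvCheck` and `missing` are hypothetical names, and the project itself may handle this differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical startup check for the Twilio environment variables.
class TwilioEnvCheck {

    static final List<String> REQUIRED =
            List.of("TWILIO_NUMBER", "TWILIO_ACCOUNT_SID", "TWILIO_AUTH_TOKEN");

    // Returns the names of required variables that are absent or blank in env.
    static List<String> missing(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> absent = missing(System.getenv());
        if (absent.isEmpty()) {
            System.out.println("All Twilio variables are set");
        } else {
            System.out.println("Missing environment variables: " + absent);
        }
    }
}
```

Taking the environment as a `Map` parameter keeps the check easy to exercise with test data instead of the real process environment.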

Run Up

  1. Start the ZooKeeper and Kafka servers.

    Extract the Kafka binary distribution, start the ZooKeeper server first with its properties file, and then start the Kafka server with the server.properties file.

  2. Set up the environment variables:

MYSQL_USERNAME: user name for the MySQL server

MYSQL_PASSWORD: password for the MySQL user

MYSQL_URL: URL and port number of the MySQL server connection

TWILIO_NUMBER: mobile number provided by Twilio

TWILIO_ACCOUNT_SID: Twilio account SID

TWILIO_AUTH_TOKEN: authentication token provided by Twilio

  3. Call the POST API endpoint of the User Service to persist a new user to the database and produce the message event to Kafka.

  4. Once the user registration is complete, the message event is produced to the Kafka server and a response body with the message "user registration successful" is returned.

{
  "message": "user registration successful",
  "httpStatusCode": 200
}
  5. The message event produced by the User service is sent to the Kafka server and consumed by two services, the Account service and the SMS service. The Account service creates an account that references the user by email. The SMS service sends a message to the registered mobile number confirming that the profile was created.