Kafka
Kafka Integration with Spring Boot
Kafka is a distributed event store and stream-processing platform built for high-throughput, low-latency, real-time data feeds. Clients publish messages to a Kafka broker and subscribe to messages from it.
To integrate Kafka with Spring Boot, follow the steps below.
Dependencies
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Add the above Spring Kafka dependency to `pom.xml`. The `<version/>` tag is not required because the version is inherited from Spring Boot's parent `pom.xml`. Update the project after adding the dependency.
Overview
To demonstrate the integration of Spring Boot with Apache Kafka, we use a microservices architecture with separate services for users, accounts and SMS, which communicate with each other through Kafka message events.
- User Service: registers a user profile in a bank.
- SMS Service: sends an SMS on successful user registration (implemented with Twilio).
- Account Service: creates an account for the registered user.
Producer/Publisher: User Service. Consumers/Subscribers: SMS Service, Account Service.
Upon successful creation of a user profile, the User Service sends a message event to the Kafka server. Both subscribers, the Account Service and the SMS Service, consume this event and act on it independently.
- User Microservice
  - Hitting the `POST` REST API endpoint (using Swagger or Postman) persists the user profile data on the SQL server.
  - Message event: the persisted user data is sent to the Kafka server. The user microservice is the publisher/producer of the project.
- Account Microservice
  - The message event published to the Kafka broker by the user microservice is consumed by the account microservice using the `@KafkaListener` annotation. The payload is read from the `ConsumerRecord` and parsed with `ObjectMapper` into a `JsonNode`, so that a single attribute can be picked out of the full set of attributes. After the `email` attribute has been retrieved from the user message event, an account is created and persisted in the database.
- SMS Microservice
  - The message event published to the Kafka broker by the user microservice is also consumed by the SMS microservice, which sends an SMS using the `Twilio SDK`.
This implementation demonstrates one producer with multiple consumers.
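The fan-out described above works because each consumer group keeps its own read position in the topic. The following is a minimal in-memory sketch of that idea, not the Kafka client API: `TopicSketch` and `FanOutDemo` are hypothetical names, and the topic is assumed to have a single partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for a single-partition topic (illustration only). */
class TopicSketch {
    private final List<String> log = new ArrayList<>();                 // append-only record log
    private final Map<String, Integer> groupOffsets = new HashMap<>();  // next offset per group

    /** Appends a record and returns the offset it was stored at. */
    int publish(String value) {
        log.add(value);
        return log.size() - 1;
    }

    /** Returns the records this group has not read yet and advances the group's offset. */
    List<String> poll(String groupId) {
        int from = groupOffsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        groupOffsets.put(groupId, log.size());
        return batch;
    }
}

class FanOutDemo {
    public static void main(String[] args) {
        TopicSketch userTopic = new TopicSketch();
        userTopic.publish("{\"email\":\"a@example.com\"}");

        // Different group IDs: both services receive the same event.
        System.out.println("account: " + userTopic.poll("accountgroupid"));
        System.out.println("sms:     " + userTopic.poll("sms"));
        // Polling again with the same group ID returns nothing new.
        System.out.println("account: " + userTopic.poll("accountgroupid"));
    }
}
```

If both services polled with the same group ID, only one of them would see each event in this sketch, which is why the two listeners later in this article use different group IDs.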
Producer/Publisher
Here the producer/publisher is the user microservice.
spring.kafka.producer.bootstrap-servers = localhost:9092
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
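Add the above properties to the Spring Boot user service application. They define the types of the key and the value of the Kafka message event, here both `String`. A message must be sent to the Kafka server in binary form, so the user object has to be serialized into bytes before it is transmitted to the Kafka broker.
On the producer side both the key and the value are configured with `StringSerializer`; the matching `StringDeserializer` is configured on the consumer side. Note that these `spring.kafka.producer.*` settings apply to producers that Spring Boot auto-configures (such as `KafkaTemplate`); the `UserService` shown further down builds its own `KafkaProducer` with a custom JSON value serializer instead.
`localhost:9092` is the address of the Kafka server, running on port `9092`.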
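To make "binary format" concrete, here is a small sketch of what string serialization amounts to: encoding text as bytes and decoding it again. It uses only the JDK and assumes UTF-8, which is the default encoding of Kafka's `StringSerializer`; the class name `StringBytesSketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class StringBytesSketch {
    /** Encodes text as UTF-8 bytes; a null value stays null. */
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes UTF-8 bytes back into text; a null payload stays null. */
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("usertopic");
        System.out.println(Arrays.toString(wire)); // the bytes that would travel to the broker
        System.out.println(deserialize(wire));     // the text recovered on the consumer side
    }
}
```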
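import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    // ObjectMapper is thread-safe once configured, so one shared instance is enough
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null; // a null value is passed through unchanged
        }
        try {
            return mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // fail loudly instead of silently sending a null payload
            throw new SerializationException("Could not serialize value for topic " + topic, e);
        }
    }
}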
Here we implement a custom JSON serializer that converts the object to bytes before it is sent to the Kafka cluster.
@Service
@RequiredArgsConstructor
public class UserService {
    private final UsersRepository userRepository;

    public ResponseDto newUser(UserDto dto) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");

        Users users = new Users();
        BeanUtils.copyProperties(dto, users);
        // persist first, so that only saved users are announced on the topic
        userRepository.save(users);

        // try-with-resources closes the producer, which also flushes the pending record;
        // a producer per request keeps the example simple, a shared instance is cheaper
        try (Producer<String, Users> producer =
                new KafkaProducer<>(properties, new StringSerializer(), new JsonSerializer<>())) {
            producer.send(new ProducerRecord<>("usertopic", users));
        }
        return ResponseDto.builder().message("user registration successful").httpStatusCode(200).build();
    }
}
- `Properties`: a class from the `java.util` package and a subclass of `Hashtable`. It maintains a list of values in which both the key and the value are strings. Here it is used to configure the address of the Kafka server, `localhost:9092`.
- `KafkaProducer`: a Kafka client that publishes records to the Kafka cluster. Here a new `Producer` object of type `KafkaProducer` is created from the `properties`, together with the key serializer and the value serializer.
Producer<String, Users> producer = new KafkaProducer<>(properties, new StringSerializer(),
new JsonSerializer());
Here the key is a `String` and the value is a `Users` object, hence the serializer is `StringSerializer` for the key and `JsonSerializer` for the value (the custom JSON serializer implemented above).
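- `ProducerRecord`: the record that is sent to Kafka. It carries the topic name, an optional key and the value; here only the topic `usertopic` and the value are set.
- Controller code snippet to test the `POST` endpoint. The controller class is annotated with `@RequestMapping("/api/v1")`.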
```java
@PostMapping("/users")
public ResponseEntity<ResponseDto> newUser(@Valid @RequestBody UserDto userDto) {
    return new ResponseEntity<>(userService.newUser(userDto), HttpStatus.ACCEPTED);
}
```
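Returning to the producer setup: instead of passing serializer instances to the `KafkaProducer` constructor, the same choices can be expressed through Kafka's standard configuration keys `bootstrap.servers`, `key.serializer` and `value.serializer`. The sketch below only builds the `Properties` object with the JDK; `ProducerConfigSketch` and the package `com.example.user` are hypothetical names chosen for illustration.

```java
import java.util.Properties;

class ProducerConfigSketch {
    /** Builds producer settings; serializers are named by fully qualified class name. */
    static Properties producerProperties(String bootstrapServers, String valueSerializerClass) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", valueSerializerClass);
        return props;
    }

    public static void main(String[] args) {
        // "com.example.user.JsonSerializer" is a placeholder for wherever the custom serializer lives.
        Properties props = producerProperties("localhost:9092", "com.example.user.JsonSerializer");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A serializer that is named in configuration this way is instantiated by Kafka itself, so it needs a public no-argument constructor.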
Consumers/Subscriber
Account Microservice
Add the same `spring-kafka` dependency to the other microservices as well.
spring.kafka.consumer.bootstrap-servers = localhost:9092
spring.kafka.consumer.group-id = userId
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
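Above are a few properties defined on the consumer side. Note the difference between the `spring.kafka.producer` prefix on the producer side and the `spring.kafka.consumer` prefix on the consumer side. The `group-id` set here is only a default: the `groupId` attribute of `@KafkaListener` overrides it, which is why the account and SMS services end up in different consumer groups and each receives every event. An offset is the position of a record within a partition; a consumer's offset marks the next record it will read. With `auto-offset-reset = earliest`, a consumer group that has no committed offset starts from the oldest available record.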
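The effect of `auto-offset-reset` can be sketched as a small decision function. This is a simplified model, not Kafka's implementation: it assumes the partition's oldest retained record sits at offset 0, and `OffsetResetSketch` is a hypothetical name.

```java
import java.util.OptionalLong;

class OffsetResetSketch {
    /**
     * Picks the offset a consumer group starts reading from.
     *
     * @param committed       the group's last committed position, if it has one
     * @param autoOffsetReset "earliest" or "latest"
     * @param logEndOffset    the offset the next produced record will receive
     */
    static long startingOffset(OptionalLong committed, String autoOffsetReset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // resume where the group left off; the policy is ignored
        }
        switch (autoOffsetReset) {
            case "earliest":
                return 0L;                // replay from the oldest record (offset 0 in this sketch)
            case "latest":
                return logEndOffset;      // read only records produced from now on
            default:
                throw new IllegalArgumentException("unsupported policy: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // A brand-new group on a partition that already holds 5 records.
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 5));
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 5));
        // A group that already committed offset 3 resumes there under either policy.
        System.out.println(startingOffset(OptionalLong.of(3), "earliest", 5));
    }
}
```

With `earliest`, a service that is started after users have already registered still processes those earlier events, which suits the account and SMS services here.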
@Service
@RequiredArgsConstructor
@Slf4j
public class AccountService {
    private final AccountsRepository accountsRepository;

    @KafkaListener(topics = "usertopic", groupId = "accountgroupid")
    public void consume(ConsumerRecord<String, Object> records) throws IOException {
        // the value arrives as a JSON string; only the email attribute is needed
        ObjectMapper mapper = new ObjectMapper();
        JsonNode node = mapper.readTree(records.value().toString());
        String email = node.get("email").asText();

        SecureRandom secureRandom = new SecureRandom();
        Accounts accounts = new Accounts();
        accounts.setAccountBalance(1000.00);
        accounts.setAccountName("Savings");
        // clearing the sign bit keeps the number non-negative; Math.abs(Long.MIN_VALUE) would stay negative
        accounts.setAccountNumber(secureRandom.nextLong() & Long.MAX_VALUE);
        accounts.setEmail(email);
        accountsRepository.save(accounts);
        log.info("User event received in account service => {}", records);
    }
}
- `@KafkaListener`: marks a method as a listener for one or more Kafka topics. The method runs inside a `KafkaMessageListenerContainer`, which is how Spring Boot connects to Kafka and polls records under the hood.
- `ObjectMapper` and `JsonNode`: Jackson classes used to convert between Java objects and a `JsonNode` tree.
- The `ConsumerRecord` is fetched from the Kafka cluster; `ObjectMapper` and `JsonNode` are then used to retrieve only the `email` attribute from the object and perform the necessary operation.
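import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> type;

    // the target type is required; without it Jackson cannot know which class to build
    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // a null payload is passed through unchanged
        }
        try {
            return mapper.readValue(data, type);
        } catch (IOException e) {
            // fail loudly instead of handing a null object to the listener
            throw new SerializationException("Could not deserialize value from topic " + topic, e);
        }
    }
}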
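A custom JSON deserializer implementation to deserialize objects read from the Kafka cluster. The listeners in this article do not use it: they receive the value as a string through `StringDeserializer` and parse it with `ObjectMapper`.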
SMS Service
Add the properties below to the `application.properties` file of the SMS microservice.
server.port=9008
spring.kafka.consumer.bootstrap-servers = localhost:9092
spring.kafka.consumer.group-id = userId
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
Add the `Twilio SDK` dependency below to the SMS microservice's `pom.xml` to send SMS messages through the Twilio SMS provider.
<dependency>
<groupId>com.twilio.sdk</groupId>
<artifactId>twilio</artifactId>
<version>8.8.0</version>
</dependency>
@Slf4j
@Service
public class SmsService {
    @KafkaListener(groupId = "sms", topics = "usertopic")
    public void consumeUserAndSendSms(ConsumerRecord<String, Object> records) throws JsonProcessingException {
        // the value arrives as a JSON string; read only the fields needed for the SMS
        ObjectMapper mapper = new ObjectMapper();
        JsonNode node = mapper.readTree(records.value().toString());
        String email = node.get("email").asText();
        String phoneNumber = node.get("phoneNumber").asText();
        log.info(email);
        // Twilio credentials and the sender number come from environment variables
        String fromNumber = System.getenv("TWILIO_NUMBER");
        Twilio.init(System.getenv("TWILIO_ACCOUNT_SID"), System.getenv("TWILIO_AUTH_TOKEN"));
        // the stored number is prefixed with the +91 country code before sending
        Message.creator(new PhoneNumber("+91" + phoneNumber), new PhoneNumber(fromNumber),
                "Your profile has been created successfully at ABC Bank with email " + email).create();
    }
}
Set up a few system environment variables:
- `TWILIO_NUMBER`: the phone number provided by Twilio
- `TWILIO_ACCOUNT_SID`: the Twilio account SID
- `TWILIO_AUTH_TOKEN`: the Twilio authentication token
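Because `System.getenv` returns `null` for an unset variable, a missing value would only surface when the first SMS is sent. One option, sketched below under the assumption that failing early is preferable, is to check the three variables at startup. `EnvCheckSketch` is a hypothetical helper and not part of the Twilio SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class EnvCheckSketch {
    static final List<String> REQUIRED =
            List.of("TWILIO_NUMBER", "TWILIO_ACCOUNT_SID", "TWILIO_AUTH_TOKEN");

    /** Returns the names of required variables that are absent or blank, in declaration order. */
    static List<String> missing(Map<String, String> env) {
        List<String> absent = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                absent.add(name);
            }
        }
        return absent;
    }

    public static void main(String[] args) {
        List<String> absent = missing(System.getenv());
        if (absent.isEmpty()) {
            System.out.println("All Twilio environment variables are set.");
        } else {
            System.err.println("Missing environment variables: " + absent);
        }
    }
}
```

Taking the environment as a `Map` parameter keeps the check easy to test without touching the real process environment.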
Run Up
- Start the ZooKeeper and Kafka servers. Extract the Kafka binary distribution, start the ZooKeeper server first with its properties file, and then start the Kafka server with the `server.properties` file.
- Set up the environment variables:
  - `MYSQL_USERNAME`: user name for the MySQL server
  - `MYSQL_PASSWORD`: password for the MySQL user
  - `MYSQL_URL`: URL and port number of the MySQL server connection
  - `TWILIO_NUMBER`: mobile number given by Twilio
  - `TWILIO_ACCOUNT_SID`: Twilio account SID
  - `TWILIO_AUTH_TOKEN`: authentication token provided by Twilio
- Call the API endpoint to produce the message event to Kafka and persist the data to the database: hit the `POST` API of the User Service to persist a new user.
- Once the user registration is complete, the message event is produced to the Kafka server and the following response body is returned:
{
"message": "user registration successful",
"httpStatusCode": 200
}
- The message event produced by the User Service is sent to the Kafka server and consumed by two services, the Account Service and the SMS Service. The Account Service creates an account that references the user by email. The SMS Service sends a message to the registered mobile number confirming that the profile was created.