Error Handling for a Custom Deserializer in a Spring Kafka Project

How do you stop the infinite deserialization exception loop for a custom deserializer in a Spring Boot Kafka project?

Md Ariful Islam Rana
2 min read · Jul 6, 2022

Our product uses Kafka for communication among its microservices. All of them are Spring projects and use Kafka via the ‘spring-kafka’ library (version 2.6.5). To transfer objects from one microservice to another, I wrote a custom serializer and deserializer. In this post, I’ll discuss an error that occurred while deserializing an object, so only the deserialization-related code is shown here.

Overview

Let’s look at the sample custom deserializer and the related consumer factory configuration. Note that MessageDto is just a simple POJO class.

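import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class KafkaMessageDeserializer implements Deserializer<MessageDto> {

    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public MessageDto deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.readValue(
                    new String(bytes, StandardCharsets.UTF_8), MessageDto.class);
        } catch (Exception e) {
            // Keep the original exception as the cause so the log shows why parsing failed.
            throw new SerializationException(
                    "Error when deserializing byte[] to messageDto", e);
        }
    }

    @Override
    public void close() {}
}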

The consumer factory looks like this:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value(value = "${hcs.kafka.bootstrapAddress}")
    public String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, MessageDto> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, Topics.HCS_KAFKA_GROUP);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                KafkaMessageDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Problem

After a message was sent through Kafka, a deserialization exception occurred. Deserialization can fail for many reasons. In my case, it failed because of a bug in the `deserialize` method of `KafkaMessageDeserializer`. The same error was logged over and over, and it looked like this:

2022-07-05 14:08:31 PM ERROR [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition hcs_main_topic_send-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error when deserializing byte[] to messageDto
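Why does the same error repeat forever? The log hints at it: the consumer does not move past a record it cannot deserialize, so every poll runs into the same record again. The sketch below is a toy model of that behavior using only the Java standard library. It is not the Kafka client API, and the names `StuckOffsetDemo`, `strictDeserialize`, `tolerant` and `consume` are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model (NOT the Kafka client API): a consumer whose position only
// advances when the record at that position deserializes successfully.
class StuckOffsetDemo {

    // Hypothetical stand-in for a value deserializer that rejects bad payloads.
    static String strictDeserialize(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        if (!text.startsWith("{")) {
            throw new IllegalArgumentException("not JSON: " + text);
        }
        return text;
    }

    // Wraps a deserializer so that a failure becomes a null value instead of
    // an exception. This is the general idea behind an error-handling wrapper.
    static Function<byte[], String> tolerant(Function<byte[], String> delegate) {
        return bytes -> {
            try {
                return delegate.apply(bytes);
            } catch (RuntimeException e) {
                return null; // swallowed here; a real wrapper would record the failure
            }
        };
    }

    // Polls at most maxPolls times and returns the final position in the log.
    static int consume(List<byte[]> log, Function<byte[], String> deserializer, int maxPolls) {
        int position = 0;
        for (int poll = 0; poll < maxPolls && position < log.size(); poll++) {
            try {
                deserializer.apply(log.get(position));
                position++; // advance only after a successful read
            } catch (RuntimeException e) {
                // position is unchanged, so the next poll hits the same record
            }
        }
        return position;
    }

    public static void main(String[] args) {
        List<byte[]> log = new ArrayList<>();
        log.add("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        log.add("garbage".getBytes(StandardCharsets.UTF_8));
        log.add("{\"id\":2}".getBytes(StandardCharsets.UTF_8));

        // Strict deserializer: stuck at position 1 no matter how often we poll.
        System.out.println(consume(log, StuckOffsetDemo::strictDeserialize, 10));
        // Tolerant wrapper: the bad record is skipped and all 3 records are consumed.
        System.out.println(consume(log, tolerant(StuckOffsetDemo::strictDeserialize), 10));
    }
}
```

In this model the strict run ends at position 1 and the tolerant run ends at position 3. The bad record is still bad in both cases; the wrapper only keeps it from blocking the records behind it.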

Solution

The log contains a clue: please consider configuring an ‘ErrorHandlingDeserializer’ in the value and/or key deserializer.

The fix is simple. We use the org.springframework.kafka.support.serializer.ErrorHandlingDeserializer class to handle exceptions thrown by our custom value deserializer.

public class KafkaMessageDeserializer 
extends ErrorHandlingDeserializer<MessageDto>
implements Deserializer<MessageDto> {}

Just extending ErrorHandlingDeserializer stopped the infinite deserialization exception loop. Note that this does not fix the underlying deserialization failure; it only stops the endless error logs in the application console or log file. In my case, fixing the actual failure meant changing the code of the `deserialize` method in `KafkaMessageDeserializer`.
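For comparison, the spring-kafka reference documentation describes a configuration-based setup: register ErrorHandlingDeserializer itself as the consumer's value deserializer and name the custom deserializer as its delegate. The sketch below builds such a config map with plain string keys so that it runs without any Kafka dependency. The keys `value.deserializer` and `spring.deserializer.value.delegate.class` are the values behind `ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG` and `ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS`. The class name `ErrorHandlingConfigSketch`, the package `com.example`, the broker address and the group id are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: builds the consumer properties as plain strings. In a real
// project these entries would go into the map passed to
// DefaultKafkaConsumerFactory, usually via the library constants.
class ErrorHandlingConfigSketch {

    static Map<String, Object> consumerConfigs(String bootstrapAddress,
                                               String groupId,
                                               String delegateClassName) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapAddress);
        configs.put("group.id", groupId);
        configs.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The wrapper becomes the deserializer that the Kafka consumer calls ...
        configs.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // ... and it forwards each record to the custom deserializer named here.
        configs.put("spring.deserializer.value.delegate.class", delegateClassName);
        return configs;
    }

    public static void main(String[] args) {
        // Hypothetical values; the real package of KafkaMessageDeserializer is not shown in this post.
        Map<String, Object> configs = consumerConfigs(
                "localhost:9092", "demo-group", "com.example.KafkaMessageDeserializer");
        configs.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With this layout the custom deserializer keeps implementing `Deserializer<MessageDto>` unchanged, and the wrapper catches whatever it throws.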

I hope someone will find this post helpful! Cheers!
