We Are Going To Discuss About Better way of error handling in Kafka Consumer. So lets Start this Java Article.
Better way of error handling in Kafka Consumer
- Better way of error handling in Kafka Consumer
With the first approach, it is not necessary to use a
DeadLetterPublishingRecoverer
, you can use anyConsumerRecordRecoverer
that you want; in fact the default recoverer simply logs the failed message. - Better way of error handling in Kafka Consumer
With the first approach, it is not necessary to use a
DeadLetterPublishingRecoverer
, you can use anyConsumerRecordRecoverer
that you want; in fact the default recoverer simply logs the failed message.
Solution 1
With the first approach, it is not necessary to use a DeadLetterPublishingRecoverer
, you can use any ConsumerRecordRecoverer
that you want; in fact the default recoverer simply logs the failed message.
/**
* Construct an instance with the default recoverer which simply logs the record after
* the backOff returns STOP for a topic/partition/offset.
* @param backOff the {@link BackOff}.
* @since 2.3
*/
public SeekToCurrentErrorHandler(BackOff backOff) {
this(null, backOff);
}
And, in the FailedRecordTracker
…
if (recoverer == null) {
this.recoverer = (rec, thr) -> {
...
logger.error(thr, "Backoff "
+ (failedRecord == null
? "none"
: failedRecord.getBackOffExecution())
+ " exhausted for " + ListenerUtils.recordToString(rec));
};
}
Backoff (and a limit to retries) was added to the error handler after adding retry in the listener adapter, so it’s “newer” (and preferred).
Also, using in-memory retry can cause issues with rebalancing if long BackOff
s are employed.
Finally, only the SeekToCurrentErrorHandler
can deal with deserialization problems (via the ErrorHandlingDeserializer
).
EDIT
Use the ErrorHandlingDeserializer
together with a SeekToCurrentErrorHandler
. Deserialization exceptions are considered fatal and the recoverer is called immediately.
See the documentation.
Here is a simple Spring Boot application that demonstrates it:
public class So63236346Application {
private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);
public static void main(String[] args) {
SpringApplication.run(So63236346Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
}
@Bean
ErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
+ ex.getMessage()));
}
@KafkaListener(id = "so63236346", topics = "so63236346")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so63236346", "{\"field\":\"value1\"}");
template.send("so63236346", "junk");
template.send("so63236346", "{\"field\":\"value2\"}");
};
}
}
package com.example.demo;
public class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getField() {
return this.field;
}
public void setField(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing
Result
Thing [field=value1]
2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application : [email protected]
Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
2020-08-10 14:30:14.782 INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
Thing [field=value2]
Original Author Of This Content
Solution 2
The expectation was to log any exception that we might get at the container level as well as the listener level.
Without retrying, following is the way I have done error handling:-
If we encounter any exception at the container level, we should be able to log the message payload with the error description and seek that offset and skip it and go ahead receiving the next offset. Though it is done only for DeserializationException, the rest of the exceptions also needs to be seek and offsets needs to be skipped for them.
@Component
public class KafkaContainerErrorHandler implements ErrorHandler {
private static final Logger logger = LoggerFactory.getLogger(KafkaContainerErrorHandler.class);
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
// modify below logic according to your topic nomenclature
String topics = s.substring(0, s.lastIndexOf('-'));
int offset = Integer.parseInt(s.split("offset ")[1]);
int partition = Integer.parseInt(s.substring(s.lastIndexOf('-') + 1).split(" at")[0]);
logger.error("...")
TopicPartition topicPartition = new TopicPartition(topics, partition);
logger.info("Skipping {} - {} offset {}", topics, partition, offset);
consumer.seek(topicPartition, offset + 1);
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
}
}
factory.setErrorHandler(kafkaContainerErrorHandler);
If we get any exception at the @KafkaListener level, then I am configuring my listener with my custom error handler and logging the exception with the message as can be seen below:-
@Bean("customErrorHandler")
public KafkaListenerErrorHandler listenerErrorHandler() {
return (m, e) -> {
logger.error(...);
return m;
};
}
Original Author edited Aug 10, 2020 at 17:37 Of This Content
Conclusion
So This is all About This Tutorial. Hope This Tutorial Helped You. Thank You.