Better way of error handling in Kafka Consumer

In this Java article, we discuss a better way of handling errors in a Kafka consumer. Let's get started.

Solution 1

With the first approach, it is not necessary to use a DeadLetterPublishingRecoverer; you can use any ConsumerRecordRecoverer that you want. In fact, the default recoverer simply logs the failed message.

/**
 * Construct an instance with the default recoverer which simply logs the record after
 * the backOff returns STOP for a topic/partition/offset.
 * @param backOff the {@link BackOff}.
 * @since 2.3
 */
public SeekToCurrentErrorHandler(BackOff backOff) {
    this(null, backOff);
}

And, in the FailedRecordTracker:

if (recoverer == null) {
    this.recoverer = (rec, thr) -> {
        
        ...

        logger.error(thr, "Backoff "
            + (failedRecord == null
                ? "none"
                : failedRecord.getBackOffExecution())
            + " exhausted for " + ListenerUtils.recordToString(rec));
    };
}

Backoff (and a limit to retries) was added to the error handler after adding retry in the listener adapter, so it’s “newer” (and preferred).

Also, using in-memory retry can cause issues with rebalancing if long BackOffs are employed.

Finally, only the SeekToCurrentErrorHandler can deal with deserialization problems (via the ErrorHandlingDeserializer).
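
For illustration, here is a minimal sketch (not from the original answer; the configuration class, bean name, and log message are assumed) of supplying a custom ConsumerRecordRecoverer together with a bounded FixedBackOff to the SeekToCurrentErrorHandler:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Sketch only: any ConsumerRecordRecoverer can be supplied instead of a
// DeadLetterPublishingRecoverer; this one simply logs the failed record.
@Configuration
public class ErrorHandlingConfig {

    private static final Logger logger = LoggerFactory.getLogger(ErrorHandlingConfig.class);

    @Bean
    public SeekToCurrentErrorHandler loggingErrorHandler() {
        return new SeekToCurrentErrorHandler(
                (record, exception) -> logger.error("Giving up on {}", record, exception),
                new FixedBackOff(1000L, 2L)); // at most two retries, one second apart, then recover
    }
}

With Spring Boot, declaring the handler as a bean like this is enough for it to be wired into the auto-configured listener container factory, as the demo application below also does.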

EDIT

Use the ErrorHandlingDeserializer together with a SeekToCurrentErrorHandler. Deserialization exceptions are considered fatal and the recoverer is called immediately.

See the documentation.

Here is a simple Spring Boot application that demonstrates it:

package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@SpringBootApplication
public class So63236346Application {

    private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63236346Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
    }

    @Bean
    ErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
                + ex.getMessage()));
    }

    @KafkaListener(id = "so63236346", topics = "so63236346")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so63236346", "{\"field\":\"value1\"}");
            template.send("so63236346", "junk");
            template.send("so63236346", "{\"field\":\"value2\"}");
        };
    }

}

The Thing class, which is the target type configured for JSON deserialization:

package com.example.demo;

public class Thing {

    private String field;

    public Thing() {
    }

    public Thing(String field) {
        this.field = field;
    }

    public String getField() {
        return this.field;
    }

    public void setField(String field) {
        this.field = field;
    }

    @Override
    public String toString() {
        return "Thing [field=" + this.field + "]";
    }

}

And the application.properties:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing
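
The properties above are what the demo relies on. Equivalently (a sketch only, not part of the original demo; assumes Spring Boot 2.x auto-configuration), the same wrapping of the JsonDeserializer in an ErrorHandlingDeserializer could be expressed in code:

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Sketch only: a consumer factory whose value deserializer is a JsonDeserializer
// wrapped in an ErrorHandlingDeserializer, mirroring the properties above.
@Bean
public ConsumerFactory<String, Thing> consumerFactory(KafkaProperties properties) {
    return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
            new StringDeserializer(),
            new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Thing.class)));
}

Defining such a bean would replace Boot's auto-configured consumer factory; the demo itself sticks to the properties file.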

Result

Thing [field=value1]
2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application   : so63236346-0@7
Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
2020-08-10 14:30:14.782  INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
Thing [field=value2]


Solution 2

The expectation was to log any exception that we might get at the container level as well as the listener level.

Without retrying, the following is the way I have done error handling:

If we encounter any exception at the container level, we should be able to log the message payload with the error description, seek past that offset, and move on to the next offset. Although this is done here only for DeserializationException, the remaining exception types also need to be seeked past and their offsets skipped.

import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class KafkaContainerErrorHandler implements ErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(KafkaContainerErrorHandler.class);

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        // extract the "topic-partition at offset n" details from the deserialization error message
        String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];

        // modify below logic according to your topic nomenclature
        String topics = s.substring(0, s.lastIndexOf('-'));
        int offset = Integer.parseInt(s.split("offset ")[1]);
        int partition = Integer.parseInt(s.substring(s.lastIndexOf('-') + 1).split(" at")[0]);

        logger.error("...");
        TopicPartition topicPartition = new TopicPartition(topics, partition);
        logger.info("Skipping {} - {} offset {}", topics, partition, offset);
        consumer.seek(topicPartition, offset + 1);
    }

    @Override
    public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {

    }
}


Then register this handler on the listener container factory:

factory.setErrorHandler(kafkaContainerErrorHandler);

If we get any exception at the @KafkaListener level, then I configure my listener with my custom error handler and log the exception along with the message, as can be seen below:

@Bean("customErrorHandler")
    public KafkaListenerErrorHandler listenerErrorHandler() {
        return (m, e) -> {
            logger.error(...);
            return m;
        };
    }
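
For completeness, here is a sketch (the listener id, topic, and listener body are assumed, not from the original answer) of attaching that named bean to a listener via the errorHandler attribute of @KafkaListener:

// Sketch only: the id and topic are placeholders
@KafkaListener(id = "myListener", topics = "myTopic", errorHandler = "customErrorHandler")
public void listen(String in) {
    // any exception thrown here is passed to the "customErrorHandler" bean
    System.out.println(in);
}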

Conclusion

That is all for this tutorial. I hope it helped you. Thank you.
