@Loic, yes, what you describe is exactly what I had in mind.



@Sophie, can you elaborate a little bit?

First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else.

What makes a serialization exception special compare to other errors that it's valuable to treat it differently? Why can we put "everything else" into a single bucket? By your train of though, should we not split out the "everything else" bucket into a different callback method for every different error? If no, why not, but only for serialization errors?

From what I believe to remember, historically, we added the ProductionExceptionHandler, and kinda just missed the serialization error case. And later, when we extended the handler we just could not re-use the existing callback as it was typed with `<byte[], byte[]>` and it would have been an incompatible change; so it was rather a workaround to add the second method to then handler, but not really intended design?


It's of course only my personal opinion that I believe a single callback method is simpler/cleaner compared to sticking with two, and adding the new exception type to make it backward compatible seems worth it. It also kinda introduces the same patter we use elsewhere (cf KIP-1036) what I actually think is an argument for introducing `RercordSerializationExcetpion`, to unify user experience across the board.

Would be great to hear from others about this point. It's not that I strongly object to having two methods, and I would not block this KIP on this question.



-Matthias


On 5/7/24 3:40 PM, Sophie Blee-Goldman wrote:
First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else. I also think we can take advantage of this fact
to simplify things a bit and cut down on the amount of new stuff added to
the API by just adding a parameter to the #handleSerializationException
callback and use that to pass in the SerializationExceptionOrigin enum to
distinguish between key vs value. This way we wouldn't need to introduce
yet another exception type (the RecordSerializationException) just to pass
in this information.

Thoughts?

On Tue, May 7, 2024 at 8:33 AM Loic Greffier <loic.greff...@michelin.com>
wrote:

Hi Matthias,

To sum up with the ProductionExceptionHandler callback methods (106)
proposed changes.

A new method ProductionExceptionHandler#handle is added with the following
signature:

ProductionExceptionHandlerResponse handle(final ErrorHandlerContext
context, final ProducerRecord<?,?> record, final Exception exception);

The ProducerRecord<?,?> parameter has changed to accept a serialized or
non-serialized record.
Thus, the new ProductionExceptionHandler#handle method can handle both
production exception and serialization exception.

Both old ProductionExceptionHandler#handle and
ProductionExceptionHandler#handleSerializationException methods are now
deprecated.
The old ProductionExceptionHandler#handle method gets a default
implementation, so users do not have to implement a deprecated method.

To handle backward compatibility, the new
ProductionExceptionHandler#handle method gets a default implementation.

default ProductionExceptionHandlerResponse handle(final
ErrorHandlerContext context, final ProducerRecord<?, ?> record, final
Exception exception) {
   if (exception instanceof RecordSerializationException) {
       this.handleSerializationException(record, exception.getCause());
   }

   return handle((ProducerRecord<byte[], byte[]>) record, exception);
}

The default implementation either invokes #handleSerializationException or
#handle depending on the type of the exception, thus users still relying on
deprecated ProductionExceptionHandler#handle
or ProductionExceptionHandler#handleSerializationException custom
implementations won't break.

The new ProductionExceptionHandler#handle method is now invoked in case of
serialization exception:

public <K, V> void send(final String topic, final K key, final V value,
...) {
     try {
         keyBytes = keySerializer.serialize(topic, headers, key);
         ...
     } catch (final ClassCastException exception) {
       ...
     } catch (final Exception exception) {

         try {
             response = productionExceptionHandler.handle(context,
record, new RecordSerializationException(SerializationExceptionOrigin.KEY,
exception));
         } catch (final Exception e) {
         ...
         }
     }
}

To wrap the origin serialization exception and determine whether it comes
from the key or the value, a new exception class is created:

public class RecordSerializationException extends SerializationException
{
     public enum SerializationExceptionOrigin {
         KEY,
         VALUE
     }

     public RecordSerializationException(SerializationExceptionOrigin
origin, final Throwable cause);
}

Hope it all makes sense,
Loïc


Reply via email to