cadonna commented on code in PR #18739: URL: https://github.com/apache/kafka/pull/18739#discussion_r2111048449
########## streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java: ########## @@ -147,4 +147,38 @@ public interface ErrorHandlerContext { * @return The timestamp. */ long timestamp(); + + /** + * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. Review Comment: nit: ```suggestion * callback, it will return {@code null}. ``` ########## streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java: ########## @@ -147,4 +147,38 @@ public interface ErrorHandlerContext { * @return The timestamp. */ long timestamp(); + + /** + * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. + * + * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent + * to the repartition topic. + * + * <p> Always returns null if this method is invoked within a + * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception) + * + * @return the raw byte of the key of the source message + */ + byte[] sourceRawKey(); + + /** + * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. Review Comment: nit: ```suggestion * callback, it will return {@code null}. ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java: ########## @@ -110,4 +110,37 @@ public interface RecordContext { */ Headers headers(); + /** + * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. + * + * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent + * to the repartition topic. + * + * <p> Always returns null if this method is invoked within a + * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception) Review Comment: Is this method ever called in `ProductionExceptionHandler.handle()`? Is this a copy&paste mistake from `ErrorHandlerContext`? ########## streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java: ########## @@ -110,4 +110,37 @@ public interface RecordContext { */ Headers headers(); + /** + * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. + * + * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent + * to the repartition topic. + * + * <p> Always returns null if this method is invoked within a + * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception) + * + * @return the raw byte of the key of the source message + */ + byte[] sourceRawKey(); + + /** + * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. + * + * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent + * to the repartition topic. + * + * <p> Always returns null if this method is invoked within a + * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception) Review Comment: Is this method ever called in `ProductionExceptionHandler.handle()`? Is this a copy&paste mistake from `ErrorHandlerContext`? ########## streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java: ########## @@ -110,4 +110,37 @@ public interface RecordContext { */ Headers headers(); + /** + * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. Review Comment: nit: ```suggestion * callback, it will return {@code null}. ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ########## @@ -1890,6 +1892,69 @@ public void shouldNotSendIfSendOfOtherTaskFailedInCallback() { )); } + @Test + public void shouldFreeRawRecordsInContextBeforeSending() { + final KafkaException exception = new KafkaException("KABOOM!"); + final byte[][] sourceRawData = new byte[][]{new byte[]{}, new byte[]{}}; + + final RecordCollector collector = new RecordCollectorImpl( + logContext, + taskId, + getExceptionalStreamsProducerOnSend(exception), + new ProductionExceptionHandler() { + @Override + public void configure(final Map<String, ?> configs) { + + } + + @Override + public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final Exception exception) { + sourceRawData[0] = context.sourceRawKey(); + sourceRawData[1] = context.sourceRawValue(); + return ProductionExceptionHandlerResponse.CONTINUE; + } + }, + streamsMetrics, + topology + ); + + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); + collector.flush(); Review Comment: Why do you flush before the assertion? Could you assert before flush? ########## streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java: ########## @@ -110,4 +110,37 @@ public interface RecordContext { */ Headers headers(); + /** + * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. + * + * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent + * to the repartition topic. + * + * <p> Always returns null if this method is invoked within a + * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception) + * + * @return the raw byte of the key of the source message + */ + byte[] sourceRawKey(); + + /** + * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. Review Comment: nit: ```suggestion * callback, it will return {@code null}. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org