iit2009060 commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2894125144
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition>
topicPartitions,
record.headers());
globalProcessorContext.setRecordContext(recordContext);
- try {
- if (record.key() != null) {
- source.process(new Record(
+ if (record.key() != null) {
+ // Deserialization phase
+ final Record<?, ?> deserializedRecord;
+ try {
+ deserializedRecord = new Record<>(
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
reprocessFactory.valueDeserializer().deserialize(record.topic(),
record.value()),
record.timestamp(),
- record.headers()));
+ record.headers());
+ } catch (final Exception deserializationException) {
+ // while Java distinguishes checked vs unchecked
exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need
to catch `Exception`
+ // (instead of `RuntimeException`) to work well
with those languages
+ handleDeserializationFailure(
+ deserializationExceptionHandler,
+ globalProcessorContext,
+ deserializationException,
+ record,
+ log,
+ droppedRecordsSensor(
+ Thread.currentThread().getName(),
+ globalProcessorContext.taskId().toString(),
+ globalProcessorContext.metrics()
+ ),
+ null
+ );
+ continue; // Skip this record
+ }
+ final ProcessingExceptionHandler.Response response;
+ // Processing phase
+ try {
+ ((Processor) source).process(deserializedRecord);
restoreCount++;
batchRestoreCount++;
+ } catch (final Exception processingException) {
+ // while Java distinguishes checked vs unchecked
exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need
to catch `Exception`
+ // (instead of `RuntimeException`) to work well
with those languages
+ if (processingExceptionHandler != null) {
+ final ErrorHandlerContext errorHandlerContext
= new DefaultErrorHandlerContext(
+ globalProcessorContext,
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.headers(),
+ null,
+ globalProcessorContext.taskId(),
+ record.timestamp(),
+ record.key(),
+ record.value()
+ );
+ try {
+ response =
+
Objects.requireNonNull(processingExceptionHandler.handleError(
+ errorHandlerContext,
+ deserializedRecord,
+ processingException
+ ), "Invalid ProcessingExceptionHandler
response");
+ if
(!response.deadLetterQueueRecords().isEmpty()) {
+ log.warn("Dead letter queue records
cannot be sent for GlobalKTable processors " +
+ "(no producer available). DLQ
support for GlobalKTable will be addressed in a future KIP. " + "Record
context: {}",
+ errorHandlerContext);
+ }
+ } catch (final Exception fatalUserException) {
+ log.error(
+ "Processing error callback failed
after processing error for record: {}",
+ errorHandlerContext,
+ processingException
+ );
+ throw new FailedProcessingException(
+ "Fatal user code error in
processing error callback",
+ null,
+ fatalUserException
+ );
+ }
+
+ if (response.result() ==
ProcessingExceptionHandler.Result.FAIL) {
+ log.error("Processing exception handler is
set to fail upon" +
+ " a processing error. If you would
rather have the streaming pipeline" +
+ " continue after a processing
error, please set the " +
+
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
+ throw new FailedProcessingException(null,
processingException);
+ }
+ // RESUME - log and continue
+ log.warn("Processing exception handler chose
to resume for record at offset {}", record.offset(), processingException);
+ droppedRecordsSensor(
Review Comment:
I updated it to use a member variable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]