[ 
https://issues.apache.org/jira/browse/KAFKA-4486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15723370#comment-15723370
 ] 

Guozhang Wang commented on KAFKA-4486:
--------------------------------------

The distinction is not in Streams but in the embedded Producer / Consumer 
clients. Anyway, I think the issue you raised is indeed a bug that we should 
fix: if shutdown is triggered by a raised exception, the commit-offsets step 
should not run, though the other steps, such as flushing the state stores and 
releasing the state store directory lock, should still complete.

We will propose a fix on this JIRA ASAP.
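
To make the intended ordering concrete, here is a rough sketch of how an 
exception-driven shutdown could skip the commit while still completing the 
cleanup steps. The class and helper names (flushAllStateStores, closeAllTasks, 
releaseStateDirLocks) are illustrative placeholders, not the actual 
StreamThread internals:

    // Sketch only; names are hypothetical and do not mirror StreamThread.
    public class ShutdownSketch {

        /** Shut down tasks and state; commit offsets only after a clean run. */
        void shutdownTasksAndState(final boolean cleanRun) {
            if (cleanRun) {
                // Commit consumed offsets only when no processing exception
                // occurred, so failed records are re-consumed after a restart.
                commitOffsets();
            }
            // These cleanup steps run regardless of how shutdown was triggered.
            flushAllStateStores();
            closeAllTasks();
            releaseStateDirLocks();
        }

        void commitOffsets()        { /* commit consumer offsets for all tasks */ }
        void flushAllStateStores()  { /* flush and close local state stores */ }
        void closeAllTasks()        { /* close tasks and their processors */ }
        void releaseStateDirLocks() { /* release the state directory locks */ }
    }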

> Kafka Streams - exception in process still commits offsets
> ----------------------------------------------------------
>
>                 Key: KAFKA-4486
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4486
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>         Environment: Java 8
>            Reporter: Joel Lundell
>
> I'm building a streams application and would like to be able to control the 
> commits manually using ProcessorContext#commit() from an instance of 
> org.apache.kafka.streams.processor.Processor.
> My use case is that I want to read messages from a topic and push them to AWS 
> SQS, and I need to guarantee that every message reaches the queue at least 
> once. I also want to use the SQS batching support, so my current approach is 
> to collect X records in a data structure inside Processor#process; once I 
> have a full batch I send it off, and if that succeeds I commit. If for any 
> reason I can't deliver the records, I don't want the offsets to be committed, 
> so that when processing works again I can resume from the last successfully 
> processed record.
> While testing the error handling I noticed that if I create a Processor whose 
> process method always throws an exception, that exception triggers 
> StreamThread#shutdownTaskAndState, which calls StreamThread#commitOffsets, 
> and the next time I run the application it starts as if the previous "record" 
> had been processed successfully.
> Is there a way to achieve what I'm looking for?
> I found a similar discussion in 
> https://issues.apache.org/jira/browse/KAFKA-3491 but that issue is still open.
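
For illustration, a minimal sketch of the batching Processor approach 
described in the issue could look like the following. The class name, the 
fixed batch size, and the AWS SDK v1 wiring (AmazonSQS, SendMessageBatchRequest) 
are assumptions for the sketch, not code from the issue; the point is that 
context.commit() is only reached after a successful delivery, and a failed 
delivery throws, which is exactly the path where offsets should not be 
committed:

    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
    import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
    import com.amazonaws.services.sqs.model.SendMessageBatchResult;

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: batches records and commits offsets manually after each
    // successful SQS delivery.
    public class SqsBatchingProcessor implements Processor<String, String> {

        private static final int BATCH_SIZE = 10; // SQS allows at most 10 entries per batch

        private final AmazonSQS sqs;
        private final String queueUrl;
        private final List<String> buffer = new ArrayList<>();

        private ProcessorContext context;

        public SqsBatchingProcessor(final AmazonSQS sqs, final String queueUrl) {
            this.sqs = sqs;
            this.queueUrl = queueUrl;
        }

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(final String key, final String value) {
            buffer.add(value);
            if (buffer.size() >= BATCH_SIZE) {
                sendBatch();      // throws if delivery fails
                context.commit(); // only reached when the whole batch was accepted
                buffer.clear();
            }
        }

        private void sendBatch() {
            final List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
            for (int i = 0; i < buffer.size(); i++) {
                entries.add(new SendMessageBatchRequestEntry(Integer.toString(i), buffer.get(i)));
            }
            final SendMessageBatchResult result =
                    sqs.sendMessageBatch(new SendMessageBatchRequest(queueUrl, entries));
            if (!result.getFailed().isEmpty()) {
                // Throwing here is meant to prevent the offsets from being
                // committed, which is the behaviour this ticket reports as broken.
                throw new RuntimeException(result.getFailed().size() + " messages failed to send");
            }
        }

        @Override
        public void punctuate(final long timestamp) {
            // no scheduled work in this sketch
        }

        @Override
        public void close() {
            // a real implementation would decide what to do with a partial buffer here
        }
    }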



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
