Mikhail Dubrovin created KAFKA-13681:
----------------------------------------

             Summary: Event duplicates for partition-stuck kafka-stream application
                 Key: KAFKA-13681
                 URL: https://issues.apache.org/jira/browse/KAFKA-13681
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.8.1
            Reporter: Mikhail Dubrovin
         Attachments: fail_topology.txt

Hello,

We found the following unpredictable behavior of Kafka Streams:
{code:java}
public void buildStreams(final BuilderHelper builder) {
    KTable<TableId, TableValue> table = builder.table();
    KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable = workflowTable(builder);

    table
            .mapValues(value -> mappers.mainDTO(value))
            .leftJoin(workflowTable, mappers::joinWorkflows)
            .toStream()
            .map((key, value) -> KeyValue.pair(
                    AggregateId.newBuilder().setId(value.getId()).build(),
                    mappers.aggregateDTO(value)))
            .peek((k, v) -> logSinkRecord(v))
            .filter((id, dto) -> !isReprocessing)
            .to(...);
}

private static KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable(BuilderHelper builderHelper) {
    return builderHelper.workflowTable()
            .groupBy((id, workflow) -> KeyValue.pair(
                            TableId.newBuilder().setId(workflow.getTableId()).build(),
                            mappers.mapWorkflow(workflow)),
                    Grouped.with(...))
            .aggregate(ArrayList::new, (key, value, agg) -> {
                // adder: append the re-keyed workflow to the aggregate list
                agg.add(value);
                return agg;
            }, (key, value, agg) -> {
                // subtractor: remove the previous workflow value from the list
                agg.remove(value);
                return agg;
            }, Materialized.with(...));
}
{code}
It is only a small part of our topology, but it shows the failing flow.

*Data structure:*

We have two multi-partition topics: entity and workflow. Each topic is represented as a KTable.

*Data error that causes application shutdown:*

Our final event (the join of the entity and workflow KTables) expects a non-null field in the entity, but for some reason it arrives as null for one event. The whole aggregator then fails in _mappers.aggregateDTO(value)_ of the _buildStreams_ method.
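
For illustration only (not our actual code): one way to keep a single malformed event from killing the stream thread would be to validate and drop it before the final mapping; _isValidEntity_ and _log_ below are hypothetical helpers.
{code:java}
table
        .mapValues(value -> mappers.mainDTO(value))
        .leftJoin(workflowTable, mappers::joinWorkflows)
        .toStream()
        // Drop events that would make mappers.aggregateDTO(value) throw,
        // instead of letting the exception stop the stream thread.
        .filter((key, value) -> {
            final boolean valid = isValidEntity(value); // hypothetical validation helper
            if (!valid) {
                log.warn("Skipping invalid entity for key {}", key); // hypothetical logger
            }
            return valid;
        })
        .map((key, value) -> KeyValue.pair(
                AggregateId.newBuilder().setId(value.getId()).build(),
                mappers.aggregateDTO(value)))
        .to(...); {code}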

We have a health check that restarts the aggregator when it fails.
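
(Side note, sketch only: since 2.8 a restart policy can also be set inside the application via KafkaStreams#setUncaughtExceptionHandler; _streams_ below is an assumed KafkaStreams instance, and this only changes the restart mechanics, not the duplication itself.)
{code:java}
// Sketch: replace only the failed stream thread instead of restarting the
// whole application from an external health check (API added in Kafka 2.8.0).
streams.setUncaughtExceptionHandler(exception ->
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD); {code}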

When incorrect data arrives on one partition, processing of that partition gets stuck while the other partitions continue to be processed.

As a result, at every restart the _workflowTable_ topology repeats the .aggregate() add/remove flow and puts a new List into the repartition topic, but offsets are not advanced for the already-processed partitions because of the aggregator's shutdown.

_This behavior generates/sinks a lot of duplicated final entities at every restart, because the flow succeeds for data from the non-corrupted partitions but their offsets are never committed._
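
(We run with the default at_least_once guarantee. For completeness, a sketch of the exactly-once alternative, which should hide the replayed sink writes from read_committed consumers at the cost of transactional overhead; the application id and broker address below are placeholders:)
{code:java}
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "entity-aggregator");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");      // placeholder
// exactly_once makes the offset commit and the sink writes atomic, so
// reprocessing after a crash does not produce visible duplicates downstream.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); {code}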

It also causes trouble if @EqualsAndHashCode is defined to compare all fields. At every restart, the subtractor tries to remove the old value (which no longer exists in the list after the first run) while the adder appends the new value to the end of the list, so the list grows after each restart (it contains several copies of the same "new" value).

I have also attached the topology description; it can be visualized with 
[https://zz85.github.io/kafka-streams-viz/]

*Current workaround:*

Redefine @EqualsAndHashCode to compare entities' ids only.
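
(Roughly, with Lombok; the field names below are illustrative, not our real DTO:)
{code:java}
import lombok.EqualsAndHashCode;

// Compare workflow DTOs by id only, so the subtractor's agg.remove(value)
// still finds the previously added element even if other fields have changed.
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class InternalWorkflowDTO {

    @EqualsAndHashCode.Include
    private String id;        // illustrative: the entity id used for equality

    private String tableId;   // ignored by equals/hashCode
    private String payload;   // ignored by equals/hashCode
} {code}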

*Unresolved issue:*

Sink event duplication at every restart.

Thank you in advance!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
