Mikhail Dubrovin created KAFKA-13681:
----------------------------------------

             Summary: Event duplicates for partition-stuck kafka-stream application
                 Key: KAFKA-13681
                 URL: https://issues.apache.org/jira/browse/KAFKA-13681
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.8.1
            Reporter: Mikhail Dubrovin
         Attachments: fail_topology.txt

Hello,

We found the following unpredictable behavior in Kafka Streams:
{code:java}
public void buildStreams(final BuilderHelper builder) {
    KTable<TableId, TableValue> table = builder.table();
    KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable = workflowTable(builder);

    table
        .mapValues(value -> mappers.mainDTO(value))
        .leftJoin(workflowTable, mappers::joinWorkflows)
        .toStream()
        .map((key, value) -> KeyValue.pair(
            AggregateId.newBuilder().setId(value.getId()).build(),
            mappers.aggregateDTO(value))) // fails here when the entity field is null
        .peek((k, v) -> logSinkRecord(v))
        .filter((id, dto) -> !isReprocessing)
        .to(...);
}

private static KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable(BuilderHelper builderHelper) {
    return builderHelper.workflowTable()
        .groupBy((id, workflow) -> KeyValue.pair(
                TableId.newBuilder().setId(workflow.getTableId()).build(),
                mappers.mapWorkflow(workflow)),
            Grouped.with(...))
        .aggregate(ArrayList::new,
            (key, value, agg) -> { agg.add(value); return agg; },    // adder
            (key, value, agg) -> { agg.remove(value); return agg; }, // subtractor
            Materialized.with(...));
}
{code}
This is a small part of our topology, but it shows the error flow.

*Data structure:*
We have two multi-partition topics: entity and workflow. Each topic is represented as a KTable.

*Data error that causes application shutdown:*
Our final event (the join of the entity and workflow KTables) expects a non-null field in the entity, but for some reason the field is null for one event. The whole aggregator fails in _mappers.aggregateDTO(value)_ in the _buildStreams_ method.

We have a health check that restarts the aggregator if it fails.

When incorrect data arrives on one partition, processing of that partition gets stuck, while the other partitions continue to be processed.
As a result, at every restart the _workflowTable_ topology repeats the .aggregate() add/remove flows and puts a new List into the repartition topic. But the offsets for the processed partitions are not advanced because of the aggregator's shutdown.

_This behavior sinks many duplicate final entity events at every restart, because the flow succeeds for data from the non-corrupted partitions but their offsets are not advanced._

It also causes trouble if @EqualsAndHashCode is defined to compare all fields. At every restart, the topology tries to remove the old value (which no longer exists after the first run) and adds the new value at the end of the list. The list grows after each restart (it contains repeated copies of the same new value).

I also attached the topology description. To visualize it: [https://zz85.github.io/kafka-streams-viz/]

*Current workaround:*
Redefine @EqualsAndHashCode to use only the entities' ids.

*Unresolved issue:*
Sink event duplication at every restart.

Thank you in advance!

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
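
To make the list growth described above easier to reproduce, here is a minimal, self-contained sketch of the add/remove replay. It is a simplified model of the reported behavior and does not use Kafka Streams itself. The Workflow class, its fields, and the ReplayDemo helper are hypothetical stand-ins for InternalWorkflowDTO and the aggregator; the idOnlyEquality flag switches between all-field comparison (as with @EqualsAndHashCode on every field) and the id-only workaround.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for InternalWorkflowDTO; field names are assumptions.
class Workflow {
    final long id;
    final String status;
    final boolean idOnlyEquality;

    Workflow(long id, String status, boolean idOnlyEquality) {
        this.id = id;
        this.status = status;
        this.idOnlyEquality = idOnlyEquality;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Workflow)) {
            return false;
        }
        Workflow other = (Workflow) o;
        // All-field mode mimics @EqualsAndHashCode over every field;
        // id-only mode mimics the workaround.
        return id == other.id && (idOnlyEquality || status.equals(other.status));
    }

    @Override
    public int hashCode() {
        return Long.hashCode(id); // consistent with both equality modes
    }
}

final class ReplayDemo {
    // Same shape as the adder passed to .aggregate() in workflowTable.
    static List<Workflow> add(Workflow value, List<Workflow> agg) {
        agg.add(value);
        return agg;
    }

    // Same shape as the subtractor passed to .aggregate() in workflowTable.
    static List<Workflow> remove(Workflow value, List<Workflow> agg) {
        agg.remove(value); // relies on equals(); a no-op when nothing matches
        return agg;
    }

    // Applies the same old -> new update once, then once more per restart,
    // and returns the final list size.
    static int sizeAfterReplays(boolean idOnlyEquality, int restarts) {
        Workflow oldValue = new Workflow(1L, "OLD", idOnlyEquality);
        Workflow newValue = new Workflow(1L, "NEW", idOnlyEquality);
        List<Workflow> agg = new ArrayList<>();
        add(oldValue, agg); // state before the update arrives
        for (int run = 0; run <= restarts; run++) {
            remove(oldValue, agg); // subtractor runs first
            add(newValue, agg);    // adder always appends
        }
        return agg.size();
    }

    public static void main(String[] args) {
        System.out.println("all fields: " + sizeAfterReplays(false, 3));
        System.out.println("id only:    " + sizeAfterReplays(true, 3));
    }
}
```

With all-field equality the subtractor only finds the old value on the first run, so each replay appends one more copy of the new value. With id-only equality the subtractor removes the entry that shares the id, so the list stays at one element. This only models the list growth; it does not address the sink duplicates.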