[ 
https://issues.apache.org/jira/browse/KAFKA-8228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boquan Tang resolved KAFKA-8228.
--------------------------------
    Resolution: Duplicate

This might be a duplicate of KAFKA-7866. Closing this for now and will watch that ticket.

> Exactly once semantics break during server restart for kafka-streams 
> application
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8228
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8228
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.2.0
>            Reporter: Boquan Tang
>            Priority: Major
>
> We are using 2.2.0 for kafka-streams client and 2.0.1 for server.
> We have a simple kafka-streams application that has the following topology:
> {code:java}
> Source: KSTREAM-SOURCE-0000000004 (topics: [deduped-adclick]) 
> --> KSTREAM-TRANSFORM-0000000005 
> Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store]) 
> --> KSTREAM-TRANSFORM-0000000006 
> <-- KSTREAM-SOURCE-0000000004 
> Source: KSTREAM-SOURCE-0000000000 (topics: [advertiser-budget]) 
> --> KTABLE-SOURCE-0000000001 
> Source: KSTREAM-SOURCE-0000000002 (topics: [advertisement-budget]) 
> --> KTABLE-SOURCE-0000000003 
> Processor: KSTREAM-TRANSFORM-0000000006 (stores: [advertiser-budget-store, 
> advertisement-budget-store]) 
> --> KSTREAM-SINK-0000000007 
> <-- KSTREAM-TRANSFORM-0000000005 
> Sink: KSTREAM-SINK-0000000007 (topic: budget-adclick) 
> <-- KSTREAM-TRANSFORM-0000000006 
> Processor: KTABLE-SOURCE-0000000001 (stores: [advertiser-budget-store]) 
> --> none 
> <-- KSTREAM-SOURCE-0000000000 
> Processor: KTABLE-SOURCE-0000000003 (stores: [advertisement-budget-store]) 
> --> none 
> <-- KSTREAM-SOURCE-0000000002{code}
> The *Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store])* was 
> added specifically to investigate this EOS breakage. Its init() and transform() 
> look like this (the specific K and V class names are removed):
> {code:java}
> public void init(final ProcessorContext context) {
>     uidStore = (WindowStore<String, Long>) context.getStateStore(uidStoreName);
>     this.context = context;
> }
>
> public KeyValue<K, V> transform(final K key, final V value) {
>     final long offset = context.offset();
>     final String uid = value.getUid();
>     // clickTimestamp (epoch millis) is not defined in this excerpt.
>     final long beginningOfHour = Instant.ofEpochMilli(clickTimestamp)
>         .atZone(ZoneId.systemDefault())
>         .withMinute(0)
>         .withSecond(0)
>         .toEpochSecond() * 1000;
>     // A stored offset equal to the current one means this record was already processed.
>     final Long maybeLastOffset = uidStore.fetch(uid, beginningOfHour);
>     final boolean dupe = null != maybeLastOffset && offset == maybeLastOffset;
>     uidStore.put(uid, offset, beginningOfHour);
>     if (dupe) {
>         LOGGER.warn("Find duplication in partition {}, uid is {}, current offset is {}, last offset is {}",
>             context.partition(),
>             uid,
>             offset,
>             maybeLastOffset);
>         statsEmitter.count("duplication");
>     }
>     return dupe ? null : new KeyValue<>(key, value);
> }
> {code}
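
The offset check in transform() above can be exercised without a broker. The following is a minimal sketch, not the reporter's code: a plain HashMap stands in for uid-offset-store, and the names OffsetDedupeSketch and isReprocessed are hypothetical. It illustrates that the check fires only when the same source offset is seen again for the same uid and hour bucket, which is what one would expect if a record were replayed after a restart while the store still held the earlier write.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetDedupeSketch {
    // Stand-in for the windowed store: "uid@hourBucket" -> last seen source offset.
    private final Map<String, Long> lastOffsetByKey = new HashMap<>();

    // Records the offset and reports whether the same offset was already stored
    // for this uid and hour bucket (mirrors the fetch/compare/put in transform()).
    boolean isReprocessed(String uid, long hourBucket, long offset) {
        String key = uid + "@" + hourBucket;
        Long last = lastOffsetByKey.put(key, offset); // put returns the previous value
        return last != null && last == offset;
    }

    public static void main(String[] args) {
        OffsetDedupeSketch sketch = new OffsetDedupeSketch();
        long bucket = 1555052400000L; // 2019-04-12T07:00:00Z in epoch millis

        // First delivery of the record: nothing stored yet.
        System.out.println(sketch.isReprocessed("1d8868ce40umu002", bucket, 212770034L)); // false
        // Same offset delivered again: flagged as reprocessed.
        System.out.println(sketch.isReprocessed("1d8868ce40umu002", bucket, 212770034L)); // true
        // A later offset for the same uid is a new record, not a replay.
        System.out.println(sketch.isReprocessed("1d8868ce40umu002", bucket, 212770036L)); // false
    }
}
```

Under exactly-once processing the second case should not occur, because the store write and the offset commit are expected to succeed or fail together.
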
> Although it is not 100% reproducible, we found that after restarting one or more 
> brokers on the cluster side, duplicates are detected:
> {code:java}
> 2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer, transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer, transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:14:02Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer, transactionalId=adclick-budget-decorator-streams-0_12] Connection to node 2 (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:27:39Z WARN [org.apache.kafka.streams.processor.internals.StreamThread] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] stream-thread [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Detected task 0_9 that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Will try to rejoin the consumer group. Below is the detailed description of the task: >TaskId: 0_9 >> ProcessorTopology: > KSTREAM-SOURCE-0000000000: > topics: [advertiser-budget] > children: [KTABLE-SOURCE-0000000001] > KTABLE-SOURCE-0000000001: > states: [advertiser-budget-store] > KSTREAM-SOURCE-0000000004: > topics: [deduped-adclick] > children: [KSTREAM-TRANSFORM-0000000005] > KSTREAM-TRANSFORM-0000000005: > states: [uid-offset-store] > children: [KSTREAM-TRANSFORM-0000000006] > KSTREAM-TRANSFORM-0000000006: > states: [advertiser-budget-store, advertisement-budget-store] > children: [KSTREAM-SINK-0000000007] > KSTREAM-SINK-0000000007: > topic: StaticTopicNameExtractor(budget-adclick) > KSTREAM-SOURCE-0000000002: > topics: [advertisement-budget] > children: [KTABLE-SOURCE-0000000003] > KTABLE-SOURCE-0000000003: > states: [advertisement-budget-store] >Partitions [advertiser-budget-9, deduped-adclick-9, advertisement-budget-9]
> 2019-04-12T07:27:40Z WARN [org.apache.kafka.common.utils.AppInfoParser] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException
> javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_18-producer
>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:424)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
>     at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createProducer(StreamThread.java:457)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.lambda$createTask$0(StreamThread.java:447)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:192)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:172)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:448)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:399)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:384)
>     at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:148)
>     at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:107)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:281)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:292)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:342)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> 2019-04-12T07:30:28Z WARN [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Find duplication in partition 18, uid is 1d8868ce40umu002, current offset is 212770034, last offset is 212770034
> 2019-04-12T07:30:28Z WARN [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Find duplication in partition 18, uid is 1d8868du40u1u001, current offset is 212770036, last offset is 212770036
> {code}
> Our kafka-streams application is configured simply, like this:
> {code:java}
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaStreamsApplicationId);
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
> streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 1000);
> {code}
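
For comparing the configuration above against a properties file or a broker-side setting, the same settings can be written with literal keys. This is a sketch under assumptions: the key strings are what the StreamsConfig and ProducerConfig constants are understood to resolve to in the 2.x clients, the class name EosConfigSketch is hypothetical, the application id is inferred from the transactionalId in the logs, and the bootstrap address is a placeholder.

```java
import java.util.Properties;

class EosConfigSketch {
    // Builds the reported configuration using literal keys instead of constants.
    static Properties build(String applicationId, String bootstrapServers) {
        String stringSerde = "org.apache.kafka.common.serialization.Serdes$StringSerde";
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("default.key.serde", stringSerde);
        props.setProperty("default.value.serde", stringSerde);
        // Enables transactional producers and transactional offset commits.
        props.setProperty("processing.guarantee", "exactly_once");
        props.setProperty("replication.factor", "3");
        // The "producer." prefix scopes the setting to the embedded producers.
        props.setProperty("producer.batch.size", "1000");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the report.
        Properties props = build("adclick-budget-decorator-streams", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee")); // exactly_once
    }
}
```
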
> As I understand it, updates to the uid-offset-store in the topology should be 
> committed in the same transaction as the consumer offsets of the 
> deduped-adclick topic, so in theory the duplication check should never hit. 
> Is my understanding correct?
> One unusual thing I noticed on the server side is that the group coordinator 
> takes a long time to initialize when the server restarts, which caused the 
> long halt between node loss and the eventual task migration.
> Please let me know if I should look into particular server-side logs. I can 
> share any findings with you, or even run more destructive tests to 
> reproduce the issue so we can investigate.
> Update: we ran some more tests and observed that the rollback / EOS break 
> issue only happens when the leader node (node 2 in this case) of the 
> __consumer_offsets partition corresponding to this particular kafka-streams 
> application is restarted.
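
To target that broker in a reproduction, it helps to know which __consumer_offsets partition the application's group maps to. The sketch below rests on assumptions: that the broker picks the partition as the non-negative hash of the group id modulo offsets.topic.num.partitions (default 50), and that a Streams application's group id equals its application.id. The class and method names are hypothetical, and the application id is inferred from the transactionalId in the logs.

```java
class OffsetsPartitionSketch {
    // Assumed broker-side mapping: abs(groupId.hashCode()) % partition count,
    // with Integer.MIN_VALUE mapped to 0 so the result is never negative.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // 50 is the default for offsets.topic.num.partitions; adjust if overridden.
        int partition = partitionFor("adclick-budget-decorator-streams", 50);
        System.out.println("__consumer_offsets partition: " + partition);
    }
}
```

The leader of the resulting partition is the group coordinator for the application, so restarting that broker should be the case described in the update.
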



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
