[ https://issues.apache.org/jira/browse/KAFKA-8228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boquan Tang resolved KAFKA-8228.
--------------------------------
    Resolution: Duplicate

This might be a duplicate of KAFKA-7866; closing for now and watching that ticket.

> Exactly once semantics break during server restart for kafka-streams application
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8228
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8228
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.2.0
>            Reporter: Boquan Tang
>            Priority: Major
>
> We are using 2.2.0 for the kafka-streams client and 2.0.1 for the server.
> We have a simple kafka-streams application with the following topology:
> {code:java}
> Source: KSTREAM-SOURCE-0000000004 (topics: [deduped-adclick])
>   --> KSTREAM-TRANSFORM-0000000005
> Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store])
>   --> KSTREAM-TRANSFORM-0000000006
>   <-- KSTREAM-SOURCE-0000000004
> Source: KSTREAM-SOURCE-0000000000 (topics: [advertiser-budget])
>   --> KTABLE-SOURCE-0000000001
> Source: KSTREAM-SOURCE-0000000002 (topics: [advertisement-budget])
>   --> KTABLE-SOURCE-0000000003
> Processor: KSTREAM-TRANSFORM-0000000006 (stores: [advertiser-budget-store, advertisement-budget-store])
>   --> KSTREAM-SINK-0000000007
>   <-- KSTREAM-TRANSFORM-0000000005
> Sink: KSTREAM-SINK-0000000007 (topic: budget-adclick)
>   <-- KSTREAM-TRANSFORM-0000000006
> Processor: KTABLE-SOURCE-0000000001 (stores: [advertiser-budget-store])
>   --> none
>   <-- KSTREAM-SOURCE-0000000000
> Processor: KTABLE-SOURCE-0000000003 (stores: [advertisement-budget-store])
>   --> none
>   <-- KSTREAM-SOURCE-0000000002
> {code}
> The *Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store])* was
> added specifically to investigate this EOS breakage. Its init() and transform()
> look like this (the specific K and V class names are removed):
> {code:java}
> public void init(final ProcessorContext context) {
>     uidStore = (WindowStore<String, Long>) context.getStateStore(uidStoreName);
>     this.context = context;
> }
>
> public KeyValue<K, V> transform(final K key, final V value) {
>     final long offset = context.offset();
>     final String uid = value.getUid();
>     final long beginningOfHour =
>         Instant.ofEpochMilli(clickTimestamp).atZone(ZoneId.systemDefault()).withMinute(0).withSecond(0).toEpochSecond()
>             * 1000;
>     final Long maybeLastOffset = uidStore.fetch(uid, beginningOfHour);
>     final boolean dupe = null != maybeLastOffset && offset == maybeLastOffset;
>     uidStore.put(uid, offset, beginningOfHour);
>     if (dupe) {
>         LOGGER.warn("Find duplication in partition {}, uid is {}, current offset is {}, last offset is {}",
>             context.partition(),
>             uid,
>             offset,
>             maybeLastOffset);
>         statsEmitter.count("duplication");
>     }
>     return dupe ? null : new KeyValue<>(key, value);
> }
> {code}
> Although the issue is not 100% reproducible, we found that after we restart one or
> more servers on the cluster side, the duplicate check fires:
> {code:java}
> 2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer, transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer, transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:14:02Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer, transactionalId=adclick-budget-decorator-streams-0_12] Connection to node 2 (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:27:39Z WARN [org.apache.kafka.streams.processor.internals.StreamThread] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] stream-thread [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Detected task 0_9 that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Will try to rejoin the consumer group. Below is the detailed description of the task:
> TaskId: 0_9
>   ProcessorTopology:
>     KSTREAM-SOURCE-0000000000:
>       topics:   [advertiser-budget]
>       children: [KTABLE-SOURCE-0000000001]
>     KTABLE-SOURCE-0000000001:
>       states:   [advertiser-budget-store]
>     KSTREAM-SOURCE-0000000004:
>       topics:   [deduped-adclick]
>       children: [KSTREAM-TRANSFORM-0000000005]
>     KSTREAM-TRANSFORM-0000000005:
>       states:   [uid-offset-store]
>       children: [KSTREAM-TRANSFORM-0000000006]
>     KSTREAM-TRANSFORM-0000000006:
>       states:   [advertiser-budget-store, advertisement-budget-store]
>       children: [KSTREAM-SINK-0000000007]
>     KSTREAM-SINK-0000000007:
>       topic:    StaticTopicNameExtractor(budget-adclick)
>     KSTREAM-SOURCE-0000000002:
>       topics:   [advertisement-budget]
>       children: [KTABLE-SOURCE-0000000003]
>     KTABLE-SOURCE-0000000003:
>       states:   [advertisement-budget-store]
>   Partitions [advertiser-budget-9, deduped-adclick-9, advertisement-budget-9]
> 2019-04-12T07:27:40Z WARN [org.apache.kafka.common.utils.AppInfoParser] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException
> javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_18-producer
>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:424)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
>     at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createProducer(StreamThread.java:457)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.lambda$createTask$0(StreamThread.java:447)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:192)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:172)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:448)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:399)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:384)
>     at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:148)
>     at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:107)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:281)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:292)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:342)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> 2019-04-12T07:30:28Z WARN [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Find duplication in partition 18, uid is 1d8868ce40umu002, current offset is 212770034, last offset is 212770034
> 2019-04-12T07:30:28Z WARN [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Find duplication in partition 18, uid is 1d8868du40u1u001, current offset is 212770036, last offset is 212770036
> {code}
> Our kafka-streams application is configured simply, like this:
> {code:java}
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaStreamsApplicationId);
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
> streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 1000);
> {code}
> As I understand it, the updates to uid-offset-store (written to its changelog topic)
> should be committed in the same transaction as the consumer offsets of the
> deduped-adclick topic, so in theory the duplicate check should never fire. Am I
> understanding this correctly?
> One unusual thing I noticed on the server side is that the group coordinator takes a
> long time to initialize when a server restarts, which caused a long pause between the
> node loss and the eventual task migration.
> Please let me know if I should look into particular server-side logs. I can share any
> findings with you, or even run more destructive tests to reproduce the issue so we can
> investigate.
> Update: we did some more testing and observed that the rollback / EOS break only
> happens when the leader node (node 2 in this case) of the __consumer_offsets partition
> corresponding to this particular kafka-streams application is restarted.
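The following is a minimal toy model of the reasoning in the question above. It is plain Java with no Kafka dependency. It is a sketch under stated assumptions and does not reflect how Kafka Streams is implemented:

* A `HashMap` stands in for the restored `uid-offset-store`.
* A single `committedOffset` field stands in for the committed consumer offset of `deduped-adclick`.
* The class name `EosToyModel` is hypothetical.

With an atomic commit, a crash rolls back the store write together with the offset, so the reprocessed record is never flagged. If the store write survives while the offset commit is lost, reprocessing the same record finds its own offset in the store. That produces the `current offset == last offset` pattern seen in the logs.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model (not Kafka Streams code) of the uid/offset duplicate check,
 * comparing an atomic "state + offset" commit with a non-atomic one.
 */
final class EosToyModel {
    // Durable state: stands in for uid-offset-store as restored after a restart.
    private final Map<String, Long> store = new HashMap<>();
    // Durable position: stands in for the committed consumer offset (next offset to read).
    private long committedOffset = 0L;
    private final boolean atomicCommit;
    private int duplicates = 0;

    EosToyModel(boolean atomicCommit) {
        this.atomicCommit = atomicCommit;
    }

    /** Processes records from the committed offset; "crashes" while handling crashAt (-1 = never). */
    void run(String[] uids, long crashAt) {
        for (long offset = committedOffset; offset < uids.length; offset++) {
            String uid = uids[(int) offset];
            Long last = store.get(uid);
            // Same check as the transformer: the stored offset equals the current one.
            if (last != null && last == offset) {
                duplicates++;
            }
            boolean crash = offset == crashAt;
            if (atomicCommit) {
                // State write and offset commit become durable together, or not at all.
                if (crash) {
                    return;
                }
                store.put(uid, offset);
                committedOffset = offset + 1;
            } else {
                // State write is durable on its own; the offset commit can be lost.
                store.put(uid, offset);
                if (crash) {
                    return; // offset not committed, so this record will be re-read
                }
                committedOffset = offset + 1;
            }
        }
    }

    int duplicates() {
        return duplicates;
    }

    public static void main(String[] args) {
        String[] uids = {"u0", "u1", "u2"};
        for (boolean atomic : new boolean[] {true, false}) {
            EosToyModel model = new EosToyModel(atomic);
            model.run(uids, 1);  // first attempt crashes while handling offset 1
            model.run(uids, -1); // restart resumes from the last committed offset
            System.out.println("atomic=" + atomic + " duplicates=" + model.duplicates());
        }
    }
}
```

Running `main` prints `atomic=true duplicates=0` followed by `atomic=false duplicates=1`. The model deliberately ignores the hourly window from the transformer, because the window only narrows which uids are compared.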
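The update above links the rollback to restarting the leader of one particular `__consumer_offsets` partition. The leader of that partition acts as the group coordinator for the application's consumer group, and Kafka Streams uses the `application.id` as the group id.

Below is a small stdlib-only sketch of how a group id maps to a partition. To our understanding it mirrors the broker's `GroupMetadataManager.partitionFor`: the absolute value of `String.hashCode()`, with `Integer.MIN_VALUE` mapped to 0, modulo the partition count. As far as we know, the transaction coordinator uses the same formula to map a `transactionalId` to a `__transaction_state` partition.

A few assumptions to check before relying on the result:

* The partition count of 50 is the default for `offsets.topic.num.partitions` and `transaction.state.log.num.partitions`. Check the cluster's actual settings.
* The class name `CoordinatorPartition` is hypothetical.

```java
/**
 * Hypothetical sketch (assumed to mirror the broker's partitionFor logic) of which
 * internal-topic partition, and therefore which coordinator broker, serves an id.
 */
final class CoordinatorPartition {
    // Assumed default for offsets.topic.num.partitions / transaction.state.log.num.partitions.
    static final int DEFAULT_PARTITIONS = 50;

    /** Absolute value that maps Integer.MIN_VALUE to 0 instead of leaving it negative. */
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /** Partition of __consumer_offsets (for a group id) or __transaction_state (for a transactional id). */
    static int partitionFor(String id, int partitionCount) {
        return abs(id.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        // Kafka Streams uses application.id as the consumer group id.
        String groupId = "adclick-budget-decorator-streams";
        System.out.println("__consumer_offsets partition: "
            + partitionFor(groupId, DEFAULT_PARTITIONS));
        // Transactional id of task 0_9, as it appears in the producer log above.
        String txnId = "adclick-budget-decorator-streams-0_9";
        System.out.println("__transaction_state partition: "
            + partitionFor(txnId, DEFAULT_PARTITIONS));
    }
}
```

Next, look up the leader of the printed partition in the `Leader` column of `kafka-topics.sh --describe` output for `__consumer_offsets`. That leader is the group coordinator, and it can be compared with the node that was restarted.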