>From StreamThread100.log : DEBUG | 13:35:32 | clients.NetworkClient (NetworkClient.java:760) - Initiating connection to node 1 at emspproximitykafka5102.mydomain.com:9092.
Can you take a look at the log on emspproximitykafka5102 to see if there was some clue around the time of disconnection ? On Sat, Sep 16, 2017 at 10:34 PM, dev loper <[email protected]> wrote: > Hi Ted/Bill, > > I deleted the data directory of Kafka Brokers and restarted brokers > yesterday. It has been better since , But the problem still exits, Below > are my observations. > > *----- *The application pauses in between and take a long time to resume > in certain instances for no reason I could identify . All this time > application is not processing messages and the application processing slows > down on other instance, I understand that some kind of rebalance is going > on, But I am not sure what triggered this. From application logs I could > see the message processing logic what I have is not taking time . Its near > zero and I am not doing anything more than few mathematical computations > and forwarding the messages to sink. Only think I could correlate is , It > is going for some metadata version upgrade is going on and it is also one > other contributing factor for long pauses. > > Below are the sequence of the events when this happens: > > a) DEBUG | 23:19:45 | internals.ProcessorStateManager > (ProcessorStateManager.java:256) - task [0_1] Flushing all stores > registered in the state manager > DEBUG | 23:19:45 | internals.RecordCollectorImpl > (RecordCollectorImpl.java:142) - task [0_1] Flushing producer > DEBUG | 23:19:45 | internals.StreamTask (StreamTask.java:288) - task [0_1] > Committing offsets > > > > * It happens from task [0_1] to [0_35] since I have 36 Partitions .* > *b) *Here I could see Commit interval took time 62013 ms, but I am not > sure why it took > 62013 ms , earlier I found out in punctuate while I am putting back to > state store , state store was taking time over the period of time, I have > commented the portion of the code where I am putting back to store during > this test to rule out that issue. On a separate thread I have raised my > queries regarding state store performance improvement. > > DEBUG | 23:19:51 | internals.ConsumerCoordinator > (ConsumerCoordinator.java:758) - Group > ProximityKafka-proxkafkalivereplicaengine14 > committed offset 2500775 for partition MYTOPIC05SEPT-17 > DEBUG | 23:19:57 | internals.ConsumerCoordinator > (ConsumerCoordinator.java:758) - Group > ProximityKafka-proxkafkalivereplicaengine14 > committed offset 2518966 for partition MYTOPIC05SEPT-35 > DEBUG | 23:19:57 | internals.StreamThread (StreamThread.java:775) - > stream-thread [ProximityKafka-proxkafkalivereplicaengine14- > dd702abb-e1ad-4640-aad3-cc5aa922e373-StreamThread-1] Committing all > active tasks [0_0, 0_1, 0_2, 0_3, 0_4, 0_5, 0_6, 0_7, 0_8, 0_9, 0_10, 0_11, > 0_12, 0_13, 0_14, 0_15, 0_16, 0_17, 0_18, 0_19, 0_20, 0_21, 0_22, 0_23, > 0_24, 0_25, 0_26, 0_27, 0_28, 0_29, 0_30, 0_31, 0_32, 0_33, 0_34, 0_35] and > standby tasks [] because the commit interval 30000ms has elapsed by 62013ms > DEBUG | 23:19:57 | internals.ProcessorStateManager > (ProcessorStateManager.java:256) - task [0_0] Flushing all stores > registered in the state manager > > *c) *Some times I have observed below error , But I don't have any clue > why its says Node 1 disconnected. I am able to do a telnet to both Kafka > broker at the same time and How I can avoid below error, since it is taking > considerable toll on application performance. > > DEBUG | 00:16:41 | clients.NetworkClient (NetworkClient.java:702) - Node 1 > disconnected. > DEBUG | 00:16:41 | clients.NetworkClient (NetworkClient.java:889) - > Sending metadata request (type=MetadataRequest, > topics=ProximityVisitEventsProtoBuf) > to node 0 > DEBUG | 00:16:41 | clients.Metadata (Metadata.java:251) - Updated cluster > metadata version 14 to Cluster(id = NR-WaCkTRwK9Dl2sNKozPQ, nodes = [ > mykafka5102.mydomain.com:9092 (id: 1 rack: null), > mykafka5101.mydomain.com:9092 (id: 0 rack: null)], partitions = > [Partition(topic = ProximityVisitEventsProtoBuf, partition = 0, leader = 0, > replicas = [0], isr = [0])]) > > > *c) *After this new consumer is created and it starts processing > messages. Again it goes back to Step a and the application paused for > long time . > > *-----* Kafka Re balance is taking lot of time when I start the instance > one after the another and if the gap between application start time > between one instance and other instance is more than a minute. > > Below I have mentioned the hardware configuration and number of instances > we are using for this solution. Please let me know if hardware is the > limiting factor here. We didn't go for higher configuration since the load > average on these instances were quite low and I could hardly see any CPU > spikes . > > Kafka Machine Machine Details: - 2 Broker Instances with below > Configuration , (Current CPU Usage 2%- 8%) > > Instance Type : AWS T2 Large > Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS > > Kafka Streams Instance : 3 Kafka Streams Application Instances (Current > CPU Usage 8%- 24%) > > Instance Type : AWS M4 Large > Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated > EBS bandwidth 450 mbps) > Thanks > > Dev > > > > On Sun, Sep 17, 2017 at 10:01 AM, Ted Yu <[email protected]> wrote: > >> I downloaded and expanded StreamThread100.log >> >> I found the following line: >> >> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException: >> stream-thread failed to suspend stream tasks >> >> Previously I was using this line from your previous email: >> >> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException: >> stream-thread failed to suspend stream tasks >> >> The error seems to correspond to the following in >> ConsumerCoordinator.sendOffsetCommitRequest(): >> >> if (generation == null) >> >> return RequestFuture.failure(new CommitFailedException()); >> Let me study the code more. >> >> On Sat, Sep 16, 2017 at 9:09 PM, dev loper <[email protected]> wrote: >> >>> Hi Ted , >>> >>> You might be looking at he logs which I shared earlier. Later in the >>> mail chain I have provided another set of StreamThread logs . I have >>> re-attahced the logs with this mail for your reference .Also I have copied >>> the mail content below. >>> >>> Hi Damian, >>> >>> I have repeated my tests with slight configuration change. The current >>> logs captured for "StreamThread" keyword has more relevant logs when >>> compared to logs which i shared previously. I started the application on >>> instances 100,101 and 102 simultaneously with below configuration >>> >>> 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000) >>> 2) Reduced MAX_POLL_RECORDS_CONFIG =60000 (Ipreviously nteger.MAXVALUE) >>> >>> When the application started all three instances started processing for >>> first few minutes everything went well. After that I could see that >>> "StreamThread100" error consumer was going for a toss and it started >>> closing and creating the consumers for a while exactly with the pattern of >>> logs I mentioned in my previous email and after some time I could see that >>> " StreamThread100" stopped processing messages with below exception and the >>> other two continued processing messages without any issues. >>> >>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot >>> be completed since the group has already rebalanced and assigned the >>> partitions to another member. This means that the time between subsequent >>> calls to poll() was longer than the configured max.poll.interval.ms, >>> which typically implies that the poll loop is spending too much time >>> message processing. You can address this either by increasing the session >>> timeout or by reducing the maximum size of batches returned in poll() with >>> max.poll.records. >>> >>> I think since the consumers were starting and stopping there was no poll >>> made form the system . Since I reduced Reduced MAX_POLL_RECORDS_CONFIG >>> =60000 and the processors were getting closed and started which might have >>> resulted in the "CommitFailedException due to non avialability of >>> processing processors. >>> >>> After some time the issue got propagated to other servers, I have >>> attached the relevant logs with this mail Kindly go through this and let me >>> know how I can solve this issue ? >>> >>> >>> On Sun, Sep 17, 2017 at 2:23 AM, Ted Yu <[email protected]> wrote: >>> >>>> I searched for 'stream-thread failed to' in StreamThread100.log but >>>> didn't find any occurrence: >>>> >>>> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException: >>>> stream-thread failed to suspend stream tasks >>>> >>>> Can you double check ? >>>> >>>> On Fri, Sep 15, 2017 at 8:09 PM, dev loper <[email protected]> wrote: >>>> >>>>> Hi All , >>>>> >>>>> @Bill, >>>>> >>>>> I will reduce the MAX_POLL_RECORDS to 500/1000 and I will share the >>>>> results shortly. >>>>> >>>>> @Ted, >>>>> >>>>> Yes I reduced MAX_POLL_RECORDS_CONFIG from 50000 to 5000 . It was not >>>>> a typo . Do you think 50000 is way too high for an kafkaStreams >>>>> Application >>>>> ? My Spark application which I was trying to replace with kafka streams >>>>> was processing 250000 messages per 5 second batch, That was the reason I >>>>> set 50000 records for MAX_POLL_RECORDS_CONFIG . >>>>> >>>>> I don't think I have hit KAFKA-5397since I couldn't find any >>>>> instance "org.apache.kafka.streams.errors.LockException" in my logs. >>>>> I could see below exception , but these exceptions are triggered long >>>>> after the application stopped consuming any messages . >>>>> >>>>> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException: >>>>> stream-thread failed to suspend stream tasks >>>>> User provided listener org.apache.kafka.streams.proce >>>>> ssor.internals.StreamThread$RebalanceListener for group >>>>> myKafka-kafkareplica101Sept08 failed on partition revocation >>>>> >>>>> @Damian , I figured out the pattern below, I don't know whether it >>>>> helps. The streamthread logs which I shared, Did it help?. I >>>>> >>>>> >>>>> I couldn't figure of the reason why the consumers are getting closed >>>>> while its getting allocated and suddenly stopped without any reason. If >>>>> you >>>>> look at the pattern >>>>> >>>>> 1) Consumer is getting Created with Config Values Supplied by the >>>>> Application. >>>>> 2) Adding Sensors >>>>> 3) Fetching API Version and Initiating Connections to Kafka Brokers >>>>> (NetworkClient.java) >>>>> 4) Sending metadata request and Recorded API Version From Broker >>>>> (NetworkClient.java) >>>>> 5) Updated cluster metadata version (Some Incremental verison ) to >>>>> Cluster >>>>> 6) Discovered coordinator (One of the kafka Brokers) >>>>> 7) Initiating connection to coordinator (One of the kafka Brokers) >>>>> 8) fetching committed offsets for partitions ( >>>>> ConsumerCoordinator.java:826) >>>>> 9) Recorded API versions for node >>>>> 10) Resetting offset for partition( for all the partitions) >>>>> 11) Handling ListOffsetResponse ( for all the partitions) >>>>> 12) Removing Sensors ( I couldn't see any exceptions though) >>>>> 13) consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka >>>>> consumer has closed. ( >>>>> I couldn't see any exceptions though, I couldn't figure out the reason >>>>> why KafkaConsumer Closed , no reasons provided in the logs) >>>>> 14) The kafka streams application goes back o step 1 and creates the >>>>> consumer again.( the whole process is repeated but the application 90% of >>>>> the time never recovers) >>>>> 15) Since I have introduced the limit for MAX_POLL_RECORDS_CONFIG >>>>> =60000 (previouslyI nteger.MAXVALUE), I could See below exception after 60 >>>>> seconds of consumer retry. >>>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit >>>>> cannot be completed since the group has already rebalancedAfter >>>>> >>>>> I am not sure what could be wrong on my side and how I can resolve >>>>> this issue so that my application start processing messages consistently. >>>>> >>>>> Thanks >>>>> Dev >>>>> >>>>> On Sat, Sep 16, 2017 at 2:02 AM, Bill Bejeck <[email protected]> >>>>> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> Could you set MAX_POLL_RECORDS to something lower like 500 or 1000 >>>>>> and try >>>>>> again? >>>>>> >>>>>> Thanks, >>>>>> Bill >>>>>> >>>>>> On Fri, Sep 15, 2017 at 3:40 PM, dev loper <[email protected]> >>>>>> wrote: >>>>>> >>>>>> > Hi Damian, >>>>>> > >>>>>> > I have repeated my tests with slight configuration change. The >>>>>> current >>>>>> > logs captured for "StreamThread" keyword has more relevant logs >>>>>> when >>>>>> > compared to logs which i shared previously. I started the >>>>>> application on >>>>>> > instances 100,101 and 102 simultaneously with below configuration >>>>>> > >>>>>> > 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000) >>>>>> > 2) Reduced MAX_POLL_RECORDS_CONFIG =60000 (Ipreviously >>>>>> nteger.MAXVALUE) >>>>>> > >>>>>> > When the application started all three instances started processing >>>>>> for >>>>>> > first few minutes everything went well. After that I could see that >>>>>> > "StreamThread100" error consumer was going for a toss and it started >>>>>> > closing and creating the consumers for a while exactly with the >>>>>> pattern of >>>>>> > logs I mentioned in my previous email and after some time I could >>>>>> see that >>>>>> > " StreamThread100" stopped processing messages with below exception >>>>>> and the >>>>>> > other two continued processing messages without any issues. >>>>>> > >>>>>> > org.apache.kafka.clients.consumer.CommitFailedException: Commit >>>>>> cannot be >>>>>> > completed since the group has already rebalanced and assigned the >>>>>> > partitions to another member. This means that the time between >>>>>> subsequent >>>>>> > calls to poll() was longer than the configured max.poll.interval.ms >>>>>> , >>>>>> > which typically implies that the poll loop is spending too much time >>>>>> > message processing. You can address this either by increasing the >>>>>> session >>>>>> > timeout or by reducing the maximum size of batches returned in >>>>>> poll() with >>>>>> > max.poll.records. >>>>>> > >>>>>> > I think since the consumers were starting and stopping there was no >>>>>> poll >>>>>> > made form the system . Since I reduced Reduced >>>>>> MAX_POLL_RECORDS_CONFIG >>>>>> > =60000 and the processors were getting closed and started which >>>>>> might have >>>>>> > resulted in the "CommitFailedException due to non avialability of >>>>>> > processing processors. >>>>>> > >>>>>> > After some time the issue got propagated to other servers, I have >>>>>> attached >>>>>> > the relevant logs with this mail Kindly go through this and let me >>>>>> know how >>>>>> > I can solve this issue ? >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > <https://mail.google.com/mail/?ui=2&ik=6aa5d30a60&view=att&t >>>>>> h=15e8701650e040b7&attid=0.3&disp=safe&realattid=f_j7m9sld82&zw> >>>>>> > >>>>>> > On Fri, Sep 15, 2017 at 10:33 PM, dev loper <[email protected]> >>>>>> wrote: >>>>>> > >>>>>> >> Hi Ted, >>>>>> >> >>>>>> >> What should I be looking in broker logs ? I haven't looked at the >>>>>> broker >>>>>> >> side since my spark application processing from the same topic >>>>>> with a >>>>>> >> different group id is able to process well. >>>>>> >> >>>>>> >> On Fri, Sep 15, 2017 at 3:30 PM, Ted Yu <[email protected]> >>>>>> wrote: >>>>>> >> >>>>>> >>> Is there some clue in broker logs ? >>>>>> >>> >>>>>> >>> Thanks >>>>>> >>> >>>>>> >>> On Thu, Sep 14, 2017 at 11:19 PM, dev loper <[email protected]> >>>>>> wrote: >>>>>> >>> >>>>>> >>>> Dear Kafka Users, >>>>>> >>>> >>>>>> >>>> I am fairly new to Kafka Streams . I have deployed two instances >>>>>> of >>>>>> >>>> Kafka 0.11 brokers on AWS M3.Xlarge insatnces. I have created a >>>>>> topic with >>>>>> >>>> 36 partitions .and speperate application writes to this topic >>>>>> and it >>>>>> >>>> produces records at the rate of 10000 messages per second. I >>>>>> have threes >>>>>> >>>> instances of AWS M4.xlarge instance where my Kafka streams >>>>>> application is >>>>>> >>>> running which consumes these messages produced by the other >>>>>> application. >>>>>> >>>> The application starts up fine working fine and its processing >>>>>> messages on >>>>>> >>>> the first instance, but when I start the same application on >>>>>> other >>>>>> >>>> instances it is not starting even though the process is alive it >>>>>> is not >>>>>> >>>> processing messages.Also I could see the other instances takes a >>>>>> long time >>>>>> >>>> to start . >>>>>> >>>> >>>>>> >>>> Apart from first instance, other instances I could see the >>>>>> consumer >>>>>> >>>> getting added and removed repeatedly and I couldn't see any >>>>>> message >>>>>> >>>> processing at all . I have attached the detailed logs where this >>>>>> behavior >>>>>> >>>> is observed. >>>>>> >>>> >>>>>> >>>> Consumer is getting started with below log in these instances and >>>>>> >>>> getting stopped with below log (* detailed logs attached *) >>>>>> >>>> >>>>>> >>>> INFO | 21:59:30 | consumer.ConsumerConfig >>>>>> (AbstractConfig.java:223) - >>>>>> >>>> ConsumerConfig values: >>>>>> >>>> auto.commit.interval.ms = 5000 >>>>>> >>>> auto.offset.reset = latest >>>>>> >>>> bootstrap.servers = [l-mykafkainstancekafka5101:9092, >>>>>> >>>> l-mykafkainstancekafka5102:9092] >>>>>> >>>> check.crcs = true >>>>>> >>>> client.id = >>>>>> >>>> connections.max.idle.ms = 540000 >>>>>> >>>> enable.auto.commit = false >>>>>> >>>> exclude.internal.topics = true >>>>>> >>>> fetch.max.bytes = 52428800 >>>>>> >>>> fetch.max.wait.ms = 500 >>>>>> >>>> fetch.min.bytes = 1 >>>>>> >>>> group.id = myKafka-kafkareplica101Sept08 >>>>>> >>>> heartbeat.interval.ms = 3000 >>>>>> >>>> interceptor.classes = null >>>>>> >>>> internal.leave.group.on.close = true >>>>>> >>>> isolation.level = read_uncommitted >>>>>> >>>> key.deserializer = class mx.july.jmx.proximity.kafka.Ka >>>>>> fkaKryoCodec >>>>>> >>>> max.partition.fetch.bytes = 1048576 >>>>>> >>>> max.poll.interval.ms = 300000 >>>>>> >>>> max.poll.records = 500 >>>>>> >>>> metadata.max.age.ms = 300000 >>>>>> >>>> metric.reporters = [] >>>>>> >>>> metrics.num.samples = 2 >>>>>> >>>> metrics.recording.level = INFO >>>>>> >>>> metrics.sample.window.ms = 30000 >>>>>> >>>> partition.assignment.strategy = [class >>>>>> >>>> org.apache.kafka.clients.consumer.RangeAssignor] >>>>>> >>>> receive.buffer.bytes = 65536 >>>>>> >>>> reconnect.backoff.max.ms = 1000 >>>>>> >>>> reconnect.backoff.ms = 50 >>>>>> >>>> request.timeout.ms = 305000 >>>>>> >>>> retry.backoff.ms = 100 >>>>>> >>>> sasl.jaas.config = null >>>>>> >>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit >>>>>> >>>> sasl.kerberos.min.time.before.relogin = 60000 >>>>>> >>>> sasl.kerberos.service.name = null >>>>>> >>>> sasl.kerberos.ticket.renew.jitter = 0.05 >>>>>> >>>> sasl.kerberos.ticket.renew.window.factor = 0.8 >>>>>> >>>> sasl.mechanism = GSSAPI >>>>>> >>>> security.protocol = PLAINTEXT >>>>>> >>>> send.buffer.bytes = 131072 >>>>>> >>>> session.timeout.ms = 10000 >>>>>> >>>> ssl.cipher.suites = null >>>>>> >>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >>>>>> >>>> ssl.endpoint.identification.algorithm = null >>>>>> >>>> ssl.key.password = null >>>>>> >>>> ssl.keymanager.algorithm = SunX509 >>>>>> >>>> ssl.keystore.location = null >>>>>> >>>> ssl.keystore.password = null >>>>>> >>>> ssl.keystore.type = JKS >>>>>> >>>> ssl.protocol = TLS >>>>>> >>>> ssl.provider = null >>>>>> >>>> ssl.secure.random.implementation = null >>>>>> >>>> ssl.trustmanager.algorithm = PKIX >>>>>> >>>> ssl.truststore.location = null >>>>>> >>>> ssl.truststore.password = null >>>>>> >>>> ssl.truststore.type = JKS >>>>>> >>>> value.deserializer = class my.dev.MessageUpdateCodec >>>>>> >>>> >>>>>> >>>> >>>>>> >>>> DEBUG | 21:59:30 | consumer.KafkaConsumer >>>>>> (KafkaConsumer.java:1617) - >>>>>> >>>> The Kafka consumer has closed. and the whole process repeats. >>>>>> >>>> >>>>>> >>>> >>>>>> >>>> >>>>>> >>>> Below you can find my startup code for kafkastreams and the >>>>>> parameters >>>>>> >>>> which I have configured for starting the kafkastreams >>>>>> application . >>>>>> >>>> >>>>>> >>>> private static Properties settings = new Properties(); >>>>>> >>>> settings.put(StreamsConfig.APPLICATION_ID_CONFIG, >>>>>> >>>> "mykafkastreamsapplication"); >>>>>> >>>> settings.put(ConsumerConfig.A >>>>>> UTO_OFFSET_RESET_CONFIG,"latest"); >>>>>> >>>> settings.put(ConsumerConfig.H >>>>>> EARTBEAT_INTERVAL_MS_CONFIG,"10 >>>>>> >>>> 000"); >>>>>> >>>> settings.put(ConsumerConfig.S >>>>>> ESSION_TIMEOUT_MS_CONFIG,"30000"); >>>>>> >>>> settings.put(ConsumerConfig.M >>>>>> AX_POLL_INTERVAL_MS_CONFIG,Inte >>>>>> >>>> ger.MAX_VALUE); >>>>>> >>>> settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, >>>>>> "10000"); >>>>>> >>>> settings.put(ConsumerConfig.C >>>>>> ONNECTIONS_MAX_IDLE_MS_CONFIG," >>>>>> >>>> 60000"); >>>>>> >>>> >>>>>> >>>> KStreamBuilder builder = new KStreamBuilder(); >>>>>> >>>> KafkaStreams streams = new KafkaStreams(builder, >>>>>> settings); >>>>>> >>>> builder.addSource(..... >>>>>> >>>> .addProcessor ............. >>>>>> >>>> .addProcessor ........ >>>>>> >>>> >>>>>> >>>> .addStateStore(............... >>>>>> ....).persistent().build(),"my >>>>>> >>>> processor") >>>>>> >>>> .addSink .............. >>>>>> >>>> . addSink .............. >>>>>> >>>> streams.start(); >>>>>> >>>> >>>>>> >>>> and I am using a Simple processor to process my logic .. >>>>>> >>>> >>>>>> >>>> public class InfoProcessor extends AbstractProcessor<Key, >>>>>> Update> { >>>>>> >>>> private static Logger logger = Logger.getLogger(InfoProcessor >>>>>> .class); >>>>>> >>>> private ProcessorContext context; >>>>>> >>>> private KeyValueStore<Key, Info> infoStore; >>>>>> >>>> >>>>>> >>>> @Override >>>>>> >>>> @SuppressWarnings("unchecked") >>>>>> >>>> public void init(ProcessorContext context) { >>>>>> >>>> this.context = context; >>>>>> >>>> this.context.schedule(Constants.BATCH_DURATION_SECONDS * >>>>>> 1000); >>>>>> >>>> infoStore = (KeyValueStore<Key, Info>) >>>>>> >>>> context.getStateStore("InfoStore"); >>>>>> >>>> } >>>>>> >>>> >>>>>> >>>> @Override >>>>>> >>>> public void process(Key key, Update update) { >>>>>> >>>> try { >>>>>> >>>> if (key != null && update != null) { >>>>>> >>>> Info info = infoStore.get(key); >>>>>> >>>> // merge logic >>>>>> >>>> infoStore.put(key, info); >>>>>> >>>> } >>>>>> >>>> >>>>>> >>>> } catch (Exception e) { >>>>>> >>>> logger.error(e.getMessage(), e); >>>>>> >>>> } finally { >>>>>> >>>> } >>>>>> >>>> context.commit(); >>>>>> >>>> } >>>>>> >>>> >>>>>> >>>> @Override >>>>>> >>>> public void punctuate(long timestamp) { >>>>>> >>>> try { >>>>>> >>>> KeyValueIterator<Key, Info> iter = this.infoStore.all(); >>>>>> >>>> while (iter.hasNext()) { >>>>>> >>>> // processing logic >>>>>> >>>> >>>>>> >>>> } >>>>>> >>>> iter.close(); >>>>>> >>>> context.commit(); >>>>>> >>>> } catch (Exception e) { >>>>>> >>>> logger.error(e.getMessage(), e); >>>>>> >>>> } >>>>>> >>>> } >>>>>> >>>> >>>>>> >>>> >>>>>> >>>> >>>>>> >>>> >>>>>> >>>> >>>>>> >>> >>>>>> >> >>>>>> > >>>>>> >>>>> >>>>> >>>> >>> >> >
