Hello again, just a quick update regarding this message loss issue. Our further investigation showed that the Samza job was unable to read 10 out of 80 partitions of the input topic. The tasks consuming those partitions were stuck, and all of them were running in the same container. When trying to fetch messages with the offset obtained from the checkpoint topic, the consumer would try to reset offsets and fail with an unknown exception (BrokerProxy [WARN] Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: Error([topic_name,47],-1,kafka.common.UnknownException)), at which point it would enter an infinite loop. The solution was to temporarily disable checkpointing for that topic; after that, those tasks were able to consume their partitions again. In the meantime, console consumers/producers and standalone apps were able to read from those partitions without problems. We are currently trying to reproduce this, so far without luck. Do you perhaps have a scenario in mind that could cause such behavior?
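For reference, the standalone check we used to confirm that the partitions themselves were readable was essentially equivalent to the sketch below (broker list, topic, partition and offset are placeholders, and it uses the 0.10.x Java consumer rather than the 0.8.2 client embedded in the job):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PartitionReadCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder broker list
            props.put("group.id", "partition-read-check");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            // Placeholder topic/partition/offset -- in our case one of the stuck
            // partitions and the offset read from the checkpoint topic.
            TopicPartition tp = new TopicPartition("topic_name", 47);
            long offsetFromCheckpoint = 123456789L;

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Manual assignment and seek, so no group coordination or checkpointing is involved.
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, offsetFromCheckpoint);
                ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
                System.out.println("Fetched " + records.count() + " records from " + tp);
            }
        }
    }

Using manual assignment and seek keeps group coordination and checkpointing out of the picture, so this only tells us whether the partition is readable from a given offset at all.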
Thank you,
Aleksandar Bircakovic

-----Original Message-----
From: Aleksandar Bircakovic [mailto:a.bircako...@levi9.com]
Sent: Wednesday, April 5, 2017 5:26 PM
To: dev@samza.apache.org
Cc: m.mis...@improvedigital.com
Subject: RE: Messages lost after broker failure

Thank you Jagadish. Regarding leader selection, we exposed some additional metrics that tell us which broker is the leader for each partition, and we confirmed your theory: the Samza consumer is fully aware of who the current leader is.

The entire logs are pretty big, so here are some parts instead. The file container_logs.txt contains logs from the containers; here we can see kafka.common.UnknownException. kafka_logs.txt is a log from one of the brokers, where we see 'Error processing fetch operation on partition' throughout the whole day. It seems that issue was fixed in the scope of this Kafka ticket (https://issues.apache.org/jira/browse/KAFKA-4576). For the offsets that appear in the Kafka log we have Samza system logs with INFO severity saying that Samza is constantly validating those offsets. The Samza system logs are in the file samza_system_logs.txt. If you need anything else, please say so.

Container log: https://drive.google.com/open?id=0B9_OEEXuRc2aQVRFTFpGb3BKcXM
Kafka log: https://drive.google.com/open?id=0B9_OEEXuRc2aWXB4NTRkUXBxcGs
Samza log as JSON: https://drive.google.com/open?id=0B9_OEEXuRc2acEZ4VlVWQkQ1aTA

While analyzing issues reported on the Kafka board we found out that older versions of Kafka had some edge cases where broker failure recovery didn't work as expected. Since our cluster is on Kafka 0.10.1 but our Samza jobs are using the Kafka 0.8.2 consumer, should we switch to the new Kafka consumer? Also, since some messages are appearing even after a few days, is it possible that the producer gets stuck and sends some messages with a really big latency after many unsuccessful retries?

Many thanks,
Aleksandar Bircakovic

-----Original Message-----
From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
Sent: Tuesday, April 4, 2017 6:31 PM
To: dev@samza.apache.org
Cc: m.mis...@improvedigital.com
Subject: Re: Messages lost after broker failure

>> All this is leading us to conclusion that Samza's consumers are
>> somehow not aware of all of the partitions

We have had a number of broker failures at LinkedIn, and have not run into data loss issues due to consumers being unaware of partitions. You can use the metrics emitted at a per-partition level (like messages read, offset lags etc.) to validate this theory.

>> BrokerProxy [WARN] It appears that we received an invalid or empty offset
>> Some(366399914) for [topic_name,60]. Attempting to use Kafka's
>> auto.offset.reset setting.

Usually, attempting to fetch from an invalid offset will reset the consumer to the upcoming offset. This will cause data loss since you will only process new messages. It will be interesting to find out what caused the consumer to receive an invalid offset / why the received offset was invalid. Also, the entire log will be helpful (assuming there's no sensitive information that must be redacted).

On Tue, Apr 4, 2017 at 1:12 AM, Aleksandar Bircakovic <a.bircako...@levi9.com> wrote:
> Hi everyone,
> my team is building a real-time system using Samza (version 0.11.0) and
> we are facing some issues with data loss, so we would like to hear your
> thoughts.
>
> Due to using some additional tools for monitoring and alerting we
> exceeded the number of allowed open files, so a TooManyOpenFiles exception
> caused our brokers to fail.
> After fixing this issue, the failed brokers and all Samza jobs were restarted.
> The issue was gone, but it seems we are constantly losing almost half
> of the messages from some of our topics after this incident.
> To keep things as simple as possible I will focus on just a small part
> of the pipeline. In the picture below we can see two topics, both
> with 80 partitions, that are the input and output for one of our Samza
> jobs. The number of messages in those topics should be the same, but we
> see that the output topic has almost half as many messages as the input
> one. There is no bottleneck of any kind, so messages are not kept in
> Kafka for too long and they are not deleted by log retention before
> processing.
>
> http://ibb.co/iJrDbF
>
> Another strange thing is that some old messages are appearing after a
> day or two. All this is leading us to the conclusion that Samza's
> consumers are somehow not aware of all of the partitions. Is it
> possible that the consumers are not aware of the new partition leaders,
> since new leader selection occurred after the broker failures, and they
> are somehow trying to get data from the old ones that are not the leaders
> anymore and have lower offsets, meaning that new messages are
> skipped? Is there some kind of topic metadata caching that could lead
> us to this situation? While debugging we discovered a
> KafkaSystemConsumer exception saying there is no leader for a partition,
> but looking at Kafka Manager all partitions have their leaders.
>
> Here are some additional details that might be useful.
>
> Our Samza jobs are built on top of Samza v0.11.0.
> Kafka 0.8.2.1 consumers/producers are used in the jobs.
>
> Kafka cluster:
> - 8 brokers
> - Kafka version 0.10.1
> - unclean.leader.election.enable false
> - replica.lag.time.max.ms 10000
> - log.flush.interval.ms 7200000
>
> Input topic:
> - segment.bytes 2147483647
> - retention.ms 172800000
>
> Some warnings from the logs:
> Error processing fetch operation on partition [topic_name,35], offset
> 232841013 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Failed to read complete buffer for
> targetOffset 241769924 startPosition 2147479938 in
>
> BrokerProxy [WARN] Got non-recoverable error codes during multifetch.
> Throwing an exception to trigger reconnect. Errors:
> Error([topic_name,47],-1,kafka.common.UnknownException)
>
> BrokerProxy [WARN] It appears that we received an invalid or empty
> offset Some(366399914) for [topic_name,60]. Attempting to use Kafka's
> auto.offset.reset setting. This can result in data loss if processing
> continues.
>
> Any help and suggestions will be appreciated.
>
> Thanks,
> Aleksandar Bircakovic

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University