This may only be tangentially related: we identified and fixed a deadlock bug in the Kafka producer in Samza 0.12 - https://issues.apache.org/jira/browse/SAMZA-1069.
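If you want to rule that deadlock out, a thread dump of a stuck container should make it visible. A minimal sketch, assuming the container runs as a plain JVM/YARN child process (the PID lookup and the grep patterns below are only illustrative):

    # find the Samza container JVM (the main class name may differ per deployment)
    jps -l | grep -i samza

    # capture a thread dump and look for blocked producer / task threads
    jstack -l <container-pid> > container-threads.txt
    grep -A 20 -i "kafka-producer-network-thread" container-threads.txt
    grep -B 2 -A 20 "BLOCKED" container-threads.txt

If the main task thread is parked inside the producer's send() while the producer's network thread is blocked, that would point at the same class of problem.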
On Tue, Apr 11, 2017 at 8:55 AM, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:

> - Are all tasks in the container not receiving messages?
> - The errors that you pointed out are on the BrokerProxy threads.
>   Usually, they are re-created upon failures.
> - I'm wondering what the main thread is doing?
> - Is the call from the main thread to send hanging? If so, do you have a
>   log? (The previous log files you shared had some Exceptions. It'd be
>   helpful to have a timeline.)
> - Do you catch exceptions and move on?
>
> Thanks,
> Jagadish
>
> On Tue, Apr 11, 2017 at 7:14 AM, Aleksandar Bircakovic <a.bircako...@levi9.com> wrote:
>
>> Hello again,
>> just a quick update regarding this message loss issue.
>> Our further investigation showed that the Samza job was unable to read
>> 10 out of the 80 partitions of the input topic. The tasks consuming
>> those partitions were stuck, and all of them were running in the same
>> container. When trying to fetch messages at the offset obtained from the
>> checkpoint topic, the consumer would try to reset offsets, fail with an
>> unknown exception (BrokerProxy [WARN] Got non-recoverable error codes
>> during multifetch. Throwing an exception to trigger reconnect. Errors:
>> Error([topic_name,47],-1,kafka.common.UnknownException)), and then enter
>> an infinite loop.
>> The workaround was to temporarily disable checkpointing for that topic;
>> after that, those tasks were able to consume their partitions again. In
>> the meantime, console consumers/producers and standalone apps were able
>> to read from those partitions.
>> We are currently trying to reproduce this, so far without luck. Do you
>> perhaps have a scenario in mind that could cause such behavior?
>>
>> Thank you,
>> Aleksandar Bircakovic
>>
>> -----Original Message-----
>> From: Aleksandar Bircakovic [mailto:a.bircako...@levi9.com]
>> Sent: Wednesday, April 5, 2017 5:26 PM
>> To: dev@samza.apache.org
>> Cc: m.mis...@improvedigital.com
>> Subject: RE: Messages lost after broker failure
>>
>> Thank you Jagadish.
>>
>> Regarding leader selection, we exposed some additional metrics that are
>> supposed to tell us which broker is the leader for each partition, and
>> we confirmed your theory: the Samza consumer is fully aware of who the
>> current leader is.
>>
>> The full logs are pretty big, so here are some excerpts instead. The
>> file container_logs.txt contains logs from the containers; here we can
>> see kafka.common.UnknownException. Kafka_logs.txt is the log from one of
>> the brokers, where we see 'Error processing fetch operation on
>> partition' throughout the whole day. It seems that issue was fixed under
>> this Kafka ticket (https://issues.apache.org/jira/browse/KAFKA-4576).
>> For the offsets that appear in the Kafka log, we have Samza system logs
>> at INFO severity saying that Samza is constantly validating those
>> offsets. The Samza system logs are in samza_system_logs.txt. If you need
>> anything else, please say so.
>>
>> Container log: https://drive.google.com/open?id=0B9_OEEXuRc2aQVRFTFpGb3BKcXM
>> Kafka log: https://drive.google.com/open?id=0B9_OEEXuRc2aWXB4NTRkUXBxcGs
>> Samza log as JSON: https://drive.google.com/open?id=0B9_OEEXuRc2acEZ4VlVWQkQ1aTA
>>
>> While analyzing issues reported on the Kafka board, we found out that
>> older versions of Kafka had some edge cases where broker failure
>> recovery didn't work as expected. Should we switch to the new Kafka
>> consumer, given that our brokers run Kafka 0.10.1 but our Samza jobs use
>> the 0.8.2 consumer?
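For reference, the checkpoint workaround described above roughly maps to this kind of job config (a sketch only; "kafka" and "topic_name" are placeholder system/stream names, and the exact keys should be double-checked against the Samza 0.11 configuration docs):

    # ignore the checkpointed offset for this input stream on container start
    systems.kafka.streams.topic_name.samza.reset.offset=true
    # where to start instead ("oldest" risks reprocessing, "upcoming" risks skipping)
    systems.kafka.streams.topic_name.samza.offset.default=oldest
    # what the underlying 0.8.2 consumer does when the requested offset is out of range
    systems.kafka.consumer.auto.offset.reset=smallest

Keep in mind that samza.reset.offset applies on every container start while it is set, so it should be removed once the job has moved past the bad checkpoint.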
>>
>> Also, since some messages are appearing even after a few days, is it
>> possible that the producer gets stuck and sends some messages with
>> really high latency after many unsuccessful retries?
>>
>> Many thanks,
>> Aleksandar Bircakovic
>>
>> -----Original Message-----
>> From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
>> Sent: Tuesday, April 4, 2017 6:31 PM
>> To: dev@samza.apache.org
>> Cc: m.mis...@improvedigital.com
>> Subject: Re: Messages lost after broker failure
>>
>> >> All this is leading us to the conclusion that Samza's consumers are
>> >> somehow not aware of all of the partitions
>>
>> We have had a number of broker failures at LinkedIn, and have not run
>> into data loss issues due to consumers being unaware of partitions. You
>> can use the metrics emitted at a per-partition level (like messages
>> read, offset lags etc.) to validate this theory.
>>
>> >> BrokerProxy [WARN] It appears that we received an invalid or empty
>> >> offset Some(366399914) for [topic_name,60]. Attempting to use Kafka's
>> >> auto.offset.reset setting.
>>
>> Usually, attempting to fetch from an invalid offset will reset the
>> consumer to the upcoming offset. This will cause data loss, since you
>> will only process new messages. It will be interesting to find out what
>> caused the consumer to receive an invalid offset / why the received
>> offset was invalid. Also, the entire log will be helpful (assuming
>> there's no sensitive information that must be redacted).
>>
>> On Tue, Apr 4, 2017 at 1:12 AM, Aleksandar Bircakovic <a.bircako...@levi9.com> wrote:
>>
>> > Hi everyone,
>> > my team is building a real-time system using Samza (version 0.11.0)
>> > and we are facing some issues with data loss, so we would like to hear
>> > your thoughts.
>> >
>> > Due to using some additional tools for monitoring and alerting, we
>> > exceeded the number of allowed open files, and the resulting
>> > TooManyOpenFiles exception caused our brokers to fail.
>> > After fixing this issue, the failed brokers and all Samza jobs were
>> > restarted. The issue was gone, but it seems we have been constantly
>> > losing almost half of the messages from some of our topics since this
>> > incident.
>> > To keep things as simple as possible, I will focus on just a small
>> > part of the pipeline. In the picture below we can see two topics, both
>> > with 80 partitions, that are the input and output of one of our Samza
>> > jobs. The number of messages in those topics should be the same, but
>> > the output topic has almost half as many messages as the input one.
>> > There is no bottleneck, so messages are not kept in Kafka for too long
>> > and they are not deleted by log retention before processing.
>> >
>> > http://ibb.co/iJrDbF
>> >
>> > Another strange thing is that some old messages are appearing after a
>> > day or two. All this is leading us to the conclusion that Samza's
>> > consumers are somehow not aware of all of the partitions. Is it
>> > possible that the consumers are not aware of the new partition leaders
>> > (since new leader election occurred after the broker failures) and are
>> > somehow trying to get data from the old replicas that are no longer
>> > the leaders and have lower offsets, meaning that new messages are
>> > skipped? Is there some kind of topic metadata caching that could lead
>> > us to this situation? While debugging, we discovered a
>> > KafkaSystemConsumer exception that says there is no leader for a
>> > partition. Looking at Kafka Manager, all partitions have their
>> > leaders.
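As a side note on verifying the leader/offset situation from the command line, partition leadership and per-partition log-end offsets can also be checked directly against the cluster with the stock Kafka tools (a rough sketch; the ZooKeeper/broker addresses and topic name are placeholders):

    # who is the current leader (and ISR) for each partition of the topic
    bin/kafka-topics.sh --describe --zookeeper zk-host:2181 --topic topic_name

    # latest offset per partition (--time -1 = log end, -2 = earliest)
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
        --broker-list broker-host:9092 --topic topic_name --time -1

Comparing the per-partition log-end offsets of the input and output topics against the per-partition messages-read metrics mentioned above should show whether specific partitions are lagging or being skipped.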
>> >
>> > Here are some additional details that might be useful.
>> >
>> > Our Samza jobs are built on top of Samza v 0.11.0.
>> > Kafka 0.8.2.1 consumers/producers are used in the jobs.
>> >
>> > Kafka cluster:
>> > - 8 brokers
>> > - Kafka version 0.10.1
>> > - unclean.leader.election.enable false
>> > - replica.lag.time.max.ms 10000
>> > - log.flush.interval.ms 7200000
>> >
>> > Input topic:
>> > - segment.bytes 2147483647
>> > - retention.ms 172800000
>> >
>> > Some warnings from logs:
>> >
>> > Error processing fetch operation on partition [topic_name,35], offset
>> > 232841013 (kafka.server.ReplicaManager)
>> > java.lang.IllegalStateException: Failed to read complete buffer for
>> > targetOffset 241769924 startPosition 2147479938 in
>> >
>> > BrokerProxy [WARN] Got non-recoverable error codes during multifetch.
>> > Throwing an exception to trigger reconnect. Errors:
>> > Error([topic_name,47],-1,kafka.common.UnknownException
>> >
>> > BrokerProxy [WARN] It appears that we received an invalid or empty
>> > offset Some(366399914) for [topic_name,60]. Attempting to use Kafka's
>> > auto.offset.reset setting. This can result in data loss if processing
>> > continues.
>> >
>> > Any help and suggestions will be appreciated.
>> >
>> > Thanks,
>> > Aleksandar Bircakovic
>> >
>>
>> --
>> Jagadish V,
>> Graduate Student,
>> Department of Computer Science,
>> Stanford University
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
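One more observation on the broker-side warning quoted above: startPosition 2147479938 is only a few kilobytes short of the configured segment.bytes of 2147483647 (Int.MaxValue), which looks consistent with the fetch failures on near-2 GiB segments tracked in KAFKA-4576 mentioned earlier in the thread. If that is indeed the cause, shrinking the segment size for the affected topic is one possible mitigation until the brokers carry that fix (a sketch only; the 1 GiB value and ZooKeeper address are placeholders, and the change only affects newly rolled segments):

    # keep new segments well below the 2 GiB boundary for the input topic
    bin/kafka-configs.sh --zookeeper zk-host:2181 --alter \
        --entity-type topics --entity-name topic_name \
        --add-config segment.bytes=1073741824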