Thank you Jagadish.

Regarding leader selection, we exposed some additional metrics that report 
which broker is the leader for each partition, and they confirmed your theory: 
the Samza consumer is fully aware of who the current leader is.

The full logs are quite large, so here are some excerpts instead. The file 
container_logs.txt contains logs from the containers; there we can see 
kafka.common.UnknownException. Kafka_logs.txt is the log from one of the 
brokers, where 'Error processing fetch operation on partition' appears 
throughout the whole day. It seems that this issue was fixed as part of this 
Kafka ticket (https://issues.apache.org/jira/browse/KAFKA-4576). For the 
offsets that appear in the Kafka log, we have Samza system logs at INFO 
severity saying that Samza is constantly validating those offsets. The Samza 
system logs are in the file samza_system_logs.txt. If you need anything else, 
please let us know.
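
As a rough sanity check on the broker error (a sketch only, using just the two 
numbers from the original message quoted below): the input topic has 
segment.bytes 2147483647 and the IllegalStateException reports startPosition 
2147479938. The snippet compares the two. The function name is made up for 
illustration, and it does not model what the broker does internally.

```python
# Values copied from the thread: the input topic's segment.bytes setting and
# the startPosition reported in the broker's IllegalStateException.
SEGMENT_BYTES = 2_147_483_647
START_POSITION = 2_147_479_938

# Largest value of a signed 32-bit integer, for comparison.
INT32_MAX = 2**31 - 1


def bytes_left_in_segment(segment_bytes: int, start_position: int) -> int:
    """Hypothetical helper: room left between the read position and the
    configured maximum segment size."""
    return segment_bytes - start_position


remaining = bytes_left_in_segment(SEGMENT_BYTES, START_POSITION)

print("segment.bytes equals the signed 32-bit maximum:", SEGMENT_BYTES == INT32_MAX)
print("bytes between startPosition and segment.bytes:", remaining)
```

So the failing read starts only a few kilobytes below the configured maximum 
segment size, which is itself the largest signed 32-bit value. That is why the 
ticket above looks relevant to us, but we have not verified it.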

Container log: https://drive.google.com/open?id=0B9_OEEXuRc2aQVRFTFpGb3BKcXM
Kafka log: https://drive.google.com/open?id=0B9_OEEXuRc2aWXB4NTRkUXBxcGs
Samza log as JSON: https://drive.google.com/open?id=0B9_OEEXuRc2acEZ4VlVWQkQ1aTA

While going through issues reported on the Kafka issue board, we found that 
older versions of Kafka had some edge cases where broker failure recovery 
didn't work as expected. Should we switch to the new Kafka consumer, since our 
brokers run Kafka 0.10.1 but our Samza jobs use the Kafka 0.8.2 consumer?

Also, since some messages appear only after a few days, is it possible that 
the producer gets stuck and delivers some messages with very high latency 
after many unsuccessful retries?

Many thanks,
Aleksandar Bircakovic

-----Original Message-----
From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com] 
Sent: Tuesday, April 4, 2017 6:31 PM
To: dev@samza.apache.org
Cc: m.mis...@improvedigital.com
Subject: Re: Messages lost after broker failure

>> All this is leading us to the conclusion that Samza's consumers are
>> somehow not aware of all of the partitions

We have had a number of broker failures at LinkedIn, and have not run into data 
loss issues due to consumers being unaware of partitions. You can use the 
metrics emitted at a per-partition level (like messages read, offset lags etc.) 
to validate this theory.


>> BrokerProxy [WARN] It appears that we received an invalid or empty
>> offset Some(366399914) for [topic_name,60]. Attempting to use Kafka's
>> auto.offset.reset setting.

Usually, attempting to fetch from an invalid offset will reset the consumer to 
the upcoming offset. This will cause data-loss since you will only process new 
messages. It will be interesting to find out what caused the consumer to 
receive an invalid offset / why the received offset was invalid. Also, the 
entire log will be helpful (assuming there's no sensitive information that must 
be redacted).
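
For reference, a minimal sketch of where this behaviour is configured in a 
Samza job, assuming the Kafka system is registered under the name "kafka" 
(that name is an assumption; substitute whatever your job config uses). The 
values shown correspond to the behaviour described above, not to a 
recommendation.

```properties
# Assumption: the Kafka system is named "kafka" in the job config.

# Where a consumer with no checkpoint starts reading:
# "upcoming" (only new messages) or "oldest".
systems.kafka.samza.offset.default=upcoming

# What the consumer does when the requested offset is outside the valid range:
# "largest" jumps to the newest offset, "smallest" to the earliest available one.
systems.kafka.consumer.auto.offset.reset=largest
```

Switching the reset value to "smallest" would trade skipped messages for 
reprocessing, so it is only worth considering if the job can tolerate 
duplicates.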




On Tue, Apr 4, 2017 at 1:12 AM, Aleksandar Bircakovic < a.bircako...@levi9.com> 
wrote:

> Hi everyone,
> my team is building a real-time system using Samza (version 0.11.0) and 
> we are facing some issues with data loss, so we would like to hear your 
> thoughts.
>
> Because we were using some additional tools for monitoring and alerting, 
> we exceeded the allowed number of open files, and the resulting 
> TooManyOpenFiles exception caused our brokers to fail.
> After fixing this issue, the failed brokers and all Samza jobs were restarted.
> The issue was gone, but it seems that we have been constantly losing almost 
> half of the messages from some of our topics since this incident.
> To keep things as simple as possible, I will focus on just a small part 
> of the pipeline. In the picture below we can see two topics, both 
> with 80 partitions, that are the input and output of one of our Samza 
> jobs. The number of messages in those topics should be the same, but we 
> see that the output topic has roughly half as many messages as the input 
> one. There is no bottleneck, so messages are not kept in 
> Kafka for too long and they are not deleted by log retention before 
> processing.
>
> http://ibb.co/iJrDbF
>
> Another strange thing is that some old messages appear after a 
> day or two. All this is leading us to the conclusion that Samza's 
> consumers are somehow not aware of all of the partitions. Is it 
> possible that the consumers are not aware of the new partition leaders, 
> since a new leader election occurred after the broker failures, and that 
> they are trying to get data from the old ones, which are no longer the 
> leaders and have lower offsets, meaning that new messages are 
> skipped? Is there some kind of topic metadata caching that could lead 
> us to this situation? While debugging we discovered a 
> KafkaSystemConsumer exception that says there is no leader for a partition. 
> Looking at Kafka Manager, all partitions have their leaders.
>
>
> Here are some additional details that might be useful.
>
> Our Samza jobs are built on top of Samza v 0.11.0.
> Kafka 0.8.2.1 consumers/producers are used in jobs.
>
> Kafka cluster:
> - 8 brokers
> - Kafka version 0.10.1
> - unclean.leader.election.enable false
> - replica.lag.time.max.ms 10000
> - log.flush.interval.ms 7200000
>
> Input topic:
> - segment.bytes 2147483647
> - retention.ms 172800000
>
> Some warnings from logs:
> Error processing fetch operation on partition [topic_name,35], offset
> 232841013 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Failed to read complete buffer for 
> targetOffset 241769924 startPosition 2147479938 in
>
> BrokerProxy [WARN] Got non-recoverable error codes during multifetch.
> Throwing an exception to trigger reconnect. Errors:
> Error([topic_name,47],-1,kafka.common.UnknownException
>
> BrokerProxy [WARN] It appears that we received an invalid or empty 
> offset Some(366399914) for [topic_name,60]. Attempting to use Kafka's 
> auto.offset.reset setting. This can result in data loss if processing 
> continues.
>
> Any help and suggestion will be appreciated.
>
> Thanks,
> Aleksandar Bircakovic
>



--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
