Hi Gordon and Fabian, I just re-ran my test case against Flink 1.3.2 and could not reproduce this error, so it does appear to be new in Flink 1.4.0, assuming my test is sound.
The difference between my local env and production is mostly scale: production has a multi-broker Kafka cluster with durable backups, etc., and Flink runs with active/standby/standby JobManagers, multiple TaskManagers, full checkpoints to HDFS, etc.; my local setup is a single Kafka instance and a single in-memory StreamExecutionEnvironment. But my application code is identical in both. I compared all of my Kafka settings (max message size, etc.) and they seem in line aside from being a single instance. I'm not trying to rule out my environment as a factor, but I have tried very hard to examine it; this issue has proved very frustrating to reproduce, otherwise I would have happily sent a test case or even made a pass at debugging it myself.

> what exactly your deserialization schema is doing?

It's Google FlatBuffers data: a byte array that gets read into a FlatBuffers schema, which reads at certain offsets to pull out values (e.g. ID, timestamp, Array<SensorData>). I think the schema itself is out of scope, since I can see that the byte count is already wrong in the problem cases, before the payload ever reaches the FlatBuffers deserializer.

At the same time, something does seem important about the payload. I'm loading ~2 million data points across ~30 datasets into Flink, and this is the only dataset that exhibits the problem. I spent over a day digging into what might be different about it and can't find anything. This makes me incredibly suspicious; I wouldn't have emailed you all had I not measured the bytes in Kafka vs. after the Flink read.

Honestly this has been a very frustrating issue to dig into. The fact that I seem to get all of my data is currently leading me to discard and ignore this error: it's rare, and Flink still seems to work. But something here is very hard to debug, and despite some confusing observations, most of my evidence suggests this originates in the Flink Kafka consumer.

If I can help more, please let me know. Thank you for your replies.
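For what it's worth, the length check I do before handing bytes to FlatBuffers amounts to the following. This is a minimal illustrative sketch, not my actual schema code; the class and method names are made up:

```java
// Minimal sketch of a pre-deserialization length check (illustrative names,
// not the actual production schema). Each record is expected to be exactly
// 520 bytes; the faulty reads arrive as 10104 bytes instead.
public class LengthGuard {
    static final int EXPECTED_LEN = 520;

    // Returns true only when the payload matches the fixed record size.
    static boolean hasExpectedLength(byte[] payload) {
        return payload != null && payload.length == EXPECTED_LEN;
    }

    public static void main(String[] args) {
        System.out.println(hasExpectedLength(new byte[520]));   // good record -> true
        System.out.println(hasExpectedLength(new byte[10104])); // faulty read -> false
    }
}
```

It's this check that fires before the FlatBuffers deserializer ever sees the bytes.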
-Phil

From: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Date: Tuesday, February 27, 2018 at 3:12 AM
To: Fabian Hueske <fhue...@gmail.com>, Philip Doctor <philip.doc...@physiq.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink Kafka reads too many bytes .... Very rarely

Hi Philip,

Yes, I also have the question that Fabian mentioned. Did you start observing this only after upgrading to 1.4.0?

Could you let me know what exactly your deserialization schema is doing? I don't have any clues at the moment, but maybe there are hints there.

Also, you mentioned that the issue could not be reproduced on a local setup, only in "near-production" environments. What main differences are there between the two?

Cheers,
Gordon

On 27 February 2018 at 5:05:33 PM, Fabian Hueske (fhue...@gmail.com) wrote:

Hi,

Thanks for reporting the error and sharing the results of your detailed analysis!

I don't have enough insight into the Kafka consumer, but I noticed that you said you've been running your application for over a year and only noticed the faulty behavior recently. Flink 1.4.0 was released in mid-December last year. Did you observe the bug before you migrated to 1.4.0?

I'll add Gordon to the thread, who is the expert on Flink's Kafka consumer.

Best, Fabian

2018-02-27 0:02 GMT+01:00 Philip Doctor <philip.doc...@physiq.com>:

Hello,

I'm using Flink 1.4.0 with FlinkKafkaConsumer010 and have been for almost a year. Recently, I started getting messages of the wrong length in Flink, causing my deserializer to fail. Let me share what I've learned:

1. All of my messages are exactly 520 bytes when my producer places them in Kafka.
2. About 1% of these messages have this deserialization issue in Flink.
3. When it happens, I read 10104 bytes in Flink.
4. When I write the bytes my producer creates to a file on disk (rather than Kafka), my code reads 520 bytes per message and consumes them without issue.
5. When I use Kafka Tool (http://www.kafkatool.com/index.html) to dump the contents of my topic to disk and read each message one at a time off of disk, my code reads 520 bytes per message and consumes them without issue.
6. When I write a simple Kafka consumer (not using Flink) and read one message at a time, each message is 520 bytes and my code runs without issue.

#5 and #6 are what lead me to believe that this issue is squarely a problem with Flink. However, it gets more complicated: I took the messages I wrote out with both my simple consumer and Kafka Tool, loaded them into a local Kafka server, attached a local Flink cluster, and I cannot reproduce the error; yet I can reproduce it 100% of the time in something closer to my production environment. I realize the latter sounds suspicious, but I have not found anything in the Kafka docs indicating that I might have a configuration issue here, and my simple local setup, which would allow me to iterate on this and debug, has failed me.

I'm really quite at a loss. I believe there's a Flink Kafka consumer bug. It happens exceedingly rarely, as I went a year without seeing it, and I can reproduce it in an expensive environment but not in a "cheap" one.

Thank you for your time. I can provide my sample data set in case that helps; I dumped it on my Google Drive: https://drive.google.com/file/d/1h8jpAFdkSolMrT8n47JJdS6x21nd_b7n/view?usp=sharing. That's the full data set, and about 1% of it ends up failing. It's really hard to figure out which messages, since I can't read any of the messages that I receive and I get data out of order.