/cc @Boyuan Zhang <boyu...@google.com>

On Wed, Mar 17, 2021 at 3:38 AM Pradyumna Achar <pradyumna.ac...@gmail.com> wrote:
> Hello,
>
> I am running into a strange issue with the KafkaIO streaming source.
>
> The source keeps reading records from the Kafka topics even before the
> downstream DoFns in the pipeline have had a chance to process them. (It
> reads new data continuously, which quickly causes a heap out-of-memory
> error, regardless of how large the heap is.)
>
> I tried to read through the code and understood that the KafkaIO
> unbounded reader reads from Kafka when its "advance()" method is called
> (with some optimization to pre-read a little in a separate thread). I
> couldn't figure out, though, who calls this "advance()" method, or why
> that caller keeps calling it even though the previously read data has not
> been processed yet.
>
> My pipeline has these steps:
> 1. A simple DoFn that calls outputWithTimestamp to assign timestamps to
>    records.
> 2. Windowing into FixedWindows.
> 3. A stateful DoFn that collects incoming records into a BagState. It
>    processes the records in the BagState and outputs them once a certain
>    condition is met.
> 4. Another windowing function applied to the output of the stateful DoFn.
> 5. FileIO to write to a remote destination.
>
> I suspect that somewhere in this, probably in the stateful DoFn, Beam
> might be advancing the watermark as soon as it has handed a record to the
> DoFn that puts it into the BagState. Or maybe it advances the watermark
> without waiting for FileIO to finish writing to the remote destination.
>
> How do I debug this? Or is there a way to tell Beam that I have not yet
> fully processed a particular record?
>
> Thank you.
>