I already poll a local queue in advance() and return false if the queue is empty. Is there an example of restricting UnboundedSource splits?
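Here is a minimal sketch of restricting splits. It assumes the source is configured with a list of queue names, and the class `QueueSplitter` and its `split` helper are hypothetical, not Beam API. The idea is to cap the number of splits at the number of queues, so no split is ever created without a queue, and then to check that invariant the way Jan suggests with `Preconditions.checkState()`. In a real source, this logic would sit inside `UnboundedSource.split(int desiredNumSplits, PipelineOptions options)`, with each group used to build one child source. Beam's Javadoc for that method says the returned list should be close to `desiredNumSplits` but does not have to match it exactly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: distributes queue names over at most desiredNumSplits
// groups so that every split owns at least one queue. In a real Beam source
// this logic would live inside UnboundedSource.split(desiredNumSplits, options).
final class QueueSplitter {

    private QueueSplitter() {}

    static List<List<String>> split(List<String> queues, int desiredNumSplits) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("source has no queues to split");
        }
        // Never create more splits than there are queues, and at least one.
        int numSplits = Math.max(1, Math.min(desiredNumSplits, queues.size()));

        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        // Round-robin assignment keeps the groups roughly balanced.
        for (int i = 0; i < queues.size(); i++) {
            splits.get(i % numSplits).add(queues.get(i));
        }
        // Equivalent of Preconditions.checkState(): no split without a queue.
        for (List<String> split : splits) {
            if (split.isEmpty()) {
                throw new IllegalStateException("produced a split with no queue");
            }
        }
        return splits;
    }
}
```

For example, with queues `q1`..`q5` and `desiredNumSplits = 2`, this yields `[q1, q3, q5]` and `[q2, q4]`. With three queues and eight requested splits, it yields only three splits instead of five empty extras.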
On 2019/09/25 08:13:43, Jan Lukavský <je...@seznam.cz> wrote:
> I have a feeling there has to be something wrong going on with your
> source. If I were you, I would probably do the following:
>
> - verify that I never produce an UnboundedSource split with no queue
> associated (Preconditions.checkState())
>
> - make sure that I *really never* block in a call to advance(), not
> even when the queue is actually empty. You can have a look at
> KafkaUnboundedReader for inspiration. Generally, you would probably want
> to spawn a new thread to feed a local BlockingQueue (or something
> similar) in case you cannot do purely async IO (i.e. just check if you
> have any data in the network buffer in the call to advance() and return
> false otherwise).
>
> Each blocking call in advance() is a potential source of distributed
> deadlocks, which might be what you observe.
>
> Jan
>
> On 9/24/19 4:07 PM, Ken Barr wrote:
> > I might have to resort to this. I am reading from the queue in a timed
> > manner and properly returning false when the read times out. I have
> > implemented a set of statistics to enumerate advance() behavior: the
> > number of times it successfully reads a message, fails to read a
> > message, etc.
> >
> > On 2019/09/19 19:45:49, Jan Lukavský <je...@seznam.cz> wrote:
> >> You can ssh to the Dataflow worker and investigate a jstack of the
> >> harness. The principal source of blocking would be if you wait for any
> >> data. The logic should be implemented so that if there is no data
> >> available at the moment, advance() just returns false and doesn't wait
> >> for anything. Another suggestion would be to focus on how your reader
> >> behaves when it receives no queue. I think the proper behavior would be
> >> to return false from each call to advance() and set the watermark to
> >> BoundedWindow.TIMESTAMP_MAX_VALUE to indicate that there will be no
> >> more data.
> >>
> >> Jan
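Jan's two reader suggestions (never block in advance(), and signal "no more data" when a split has no queue) can be sketched without Beam dependencies. In this sketch, `NonBlockingQueueReader`, its `blockingFetch` callable, and `END_OF_TIME` are all hypothetical. `END_OF_TIME` stands in for `BoundedWindow.TIMESTAMP_MAX_VALUE`, which in Beam is a Joda `Instant`. The processing-time watermark is also an assumption. A real reader would map these methods onto `UnboundedReader`'s `start()`, `advance()`, `getCurrent()`, and `getWatermark()`. The blocking fetch runs on a background feeder thread that fills a local `BlockingQueue`, so `advance()` only ever does a non-blocking `poll()`.

```java
import java.time.Instant;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, Beam-free stand-in for an UnboundedReader. The blocking fetch
// runs on a background feeder thread; advance() only does a non-blocking poll().
final class NonBlockingQueueReader implements AutoCloseable {

    // Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE (assumption).
    static final Instant END_OF_TIME = Instant.MAX;

    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(10_000);
    private final Callable<String> blockingFetch; // null: this split has no queue
    private volatile boolean running = true;
    private volatile Exception failure;
    private Thread feeder;
    private String current;
    private Instant watermark = Instant.EPOCH;

    NonBlockingQueueReader(Callable<String> blockingFetch) {
        this.blockingFetch = blockingFetch;
    }

    boolean start() {
        if (blockingFetch != null) {
            feeder = new Thread(this::feed, "queue-feeder");
            feeder.setDaemon(true);
            feeder.start();
        }
        return advance();
    }

    // Runs on the feeder thread, where blocking is harmless.
    private void feed() {
        while (running) {
            try {
                String msg = blockingFetch.call();
                if (msg != null) {
                    buffer.put(msg); // blocks only the feeder if the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                failure = e; // surfaced by advance() instead of being swallowed
                return;
            }
        }
    }

    boolean advance() {
        if (blockingFetch == null) {
            // No queue assigned: there will never be data, so say so.
            watermark = END_OF_TIME;
            return false;
        }
        String next = buffer.poll(); // returns null immediately when empty
        if (next == null) {
            if (failure != null) {
                throw new IllegalStateException("feeder thread failed", failure);
            }
            return false;
        }
        current = next;
        watermark = Instant.now(); // naive processing-time watermark (assumption)
        return true;
    }

    String getCurrent() {
        if (current == null) {
            throw new NoSuchElementException("advance() has not returned true yet");
        }
        return current;
    }

    Instant getWatermark() {
        return watermark;
    }

    @Override
    public void close() {
        running = false;
        if (feeder != null) {
            feeder.interrupt();
        }
    }
}
```

Note that reading the local queue with a timed `poll(timeout, unit)` still makes each `advance()` call wait up to that timeout whenever the queue is empty. The no-argument `BlockingQueue.poll()` used here returns `null` immediately, which is the "never block" behavior Jan describes.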