I have a feeling something must be going wrong in your source. If I were you, I would probably do the following:

 - verify that I never produce an UnboundedSource split with no queue associated (enforce this with Preconditions.checkState())

 - make sure that I *really never* block in a call to advance(), not even when the queue is actually empty. You can have a look at KafkaUnboundedReader for inspiration; generally, if you cannot do purely async IO, you would spawn a separate thread to feed a local BlockingQueue (or something similar), so that advance() only checks whether anything is already buffered and returns false otherwise (see the sketch below this list)
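
For the second point, here is a minimal sketch of the feeder-thread pattern in plain Java. The names NonBlockingReader, fetchFromBroker() and the Object queue handle are made up for illustration (they are not any real connector API); a real implementation would live inside your UnboundedSource.UnboundedReader subclass, and Preconditions is the Guava class mentioned in the first bullet. It only shows the shape of a never-blocking advance():

import com.google.common.base.Preconditions;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: NonBlockingReader, fetchFromBroker() and the Object queue
// handle are illustrative names, not part of any real connector API.
public class NonBlockingReader {

  private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(1_000);
  private volatile boolean running = true;
  private String current;

  public NonBlockingReader(Object queueHandle) {
    // First bullet: fail fast if this split was created without a queue.
    Preconditions.checkState(queueHandle != null, "Split has no queue assigned");
  }

  public void start() {
    // The feeder thread is the only place that is allowed to block on IO.
    Thread feeder = new Thread(() -> {
      while (running) {
        String msg = fetchFromBroker();   // may block waiting on the broker
        if (msg != null) {
          try {
            buffer.put(msg);              // blocks only if the local buffer is full
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    });
    feeder.setDaemon(true);
    feeder.start();
  }

  // advance() never blocks: poll() returns immediately, and "no data right
  // now" is reported by returning false so the runner can simply call again.
  public boolean advance() {
    String next = buffer.poll();
    if (next == null) {
      return false;
    }
    current = next;
    return true;
  }

  public String getCurrent() {
    return current;
  }

  public void close() {
    running = false;
  }

  // Placeholder for the real (possibly blocking) client call.
  private String fetchFromBroker() {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return null;
  }
}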

Each blocking call in advance() is a potential source of distributed deadlocks, which might be what you are observing.

Jan

On 9/24/19 4:07 PM, Ken Barr wrote:
I might have to resort to this. I am reading from the queue in a timed manner
and properly returning false when the read times out. I have implemented a set
of statistics to enumerate advance() behavior: the number of times it
successfully reads a message, fails to read a message, etc.

On 2019/09/19 19:45:49, Jan Lukavský <je...@seznam.cz> wrote:
You can ssh to the Dataflow worker and investigate a jstack of the
harness. The principal source of blocking would be waiting for data. The
logic should be implemented so that if there is no data available at the
moment, advance() just returns false and does not wait for anything.
Another suggestion would be to focus on how your reader behaves when it
receives no queue. I think the proper behavior would be to return false
from each call to advance() and set the watermark to
BoundedWindow.TIMESTAMP_MAX_VALUE to indicate that there will be no more
data.
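
As an illustration of that "no queue" behavior, a small sketch assuming a hasQueue flag and a readNextIfAvailable() helper (both hypothetical; only BoundedWindow.TIMESTAMP_MAX_VALUE and TIMESTAMP_MIN_VALUE are real Beam constants, typed as Joda Instant):

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Fragment of a hypothetical reader; hasQueue, readNextIfAvailable() and
// currentWatermark are assumed fields/helpers, not Beam API.
class EmptySplitBehavior {

  private final boolean hasQueue;
  private Instant currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

  EmptySplitBehavior(boolean hasQueue) {
    this.hasQueue = hasQueue;
  }

  public boolean advance() {
    if (!hasQueue) {
      return false;                 // no data now, and none ever coming
    }
    return readNextIfAvailable();   // normal non-blocking read path
  }

  public Instant getWatermark() {
    if (!hasQueue) {
      // An end-of-time watermark tells the runner this split is finished,
      // so downstream windows can close.
      return BoundedWindow.TIMESTAMP_MAX_VALUE;
    }
    return currentWatermark;
  }

  private boolean readNextIfAvailable() {
    return false;                   // placeholder for the real read
  }
}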

Jan
