Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-26 Thread Jan Lukavský
You are not obliged to return exactly numSplits splits from split(). If you have fewer queues (partitions), you can simply return fewer splits (and thereby make sure that every split has an associated queue). On the other hand, if your source returns a watermark of BoundedWindo
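A minimal sketch of capping the split count, written in plain Java so it compiles without the Beam SDK. `QueueSplitter` and `assignQueues` are hypothetical names. In a real source, each returned group would back one `UnboundedSource` returned from `split(desiredNumSplits, options)`. Because the split count is capped at the number of queues, no split is ever created without a queue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: groups queue names into at most desiredNumSplits groups.
// In a Beam UnboundedSource, each group would back one split returned from
// split(desiredNumSplits, options); only the grouping logic is modeled here.
final class QueueSplitter {
    private QueueSplitter() {}

    static List<List<String>> assignQueues(List<String> queues, int desiredNumSplits) {
        if (queues.isEmpty()) {
            throw new IllegalStateException("source has no queues to read from");
        }
        // desiredNumSplits is only a hint: never create more splits than queues.
        int numSplits = Math.max(1, Math.min(desiredNumSplits, queues.size()));
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            groups.add(new ArrayList<>());
        }
        // Round-robin assignment keeps the groups balanced.
        for (int i = 0; i < queues.size(); i++) {
            groups.get(i % numSplits).add(queues.get(i));
        }
        // Same intent as Preconditions.checkState(): every split must own a queue.
        for (List<String> group : groups) {
            if (group.isEmpty()) {
                throw new IllegalStateException("produced a split with no queue");
            }
        }
        return groups;
    }
}
```

With the queues `q0`, `q1`, `q2`, asking for 10 splits yields three single-queue splits. Asking for 2 yields `[q0, q2]` and `[q1]`.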

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-25 Thread Ken Barr
I already poll a local queue in advance() and return false if the queue is empty. Is there an example of restricting the number of UnboundedSource splits? On 2019/09/25 08:13:43, Jan Lukavský wrote: > I have a feeling there has to be something going on wrong with your > source. If I were you, I would probably do
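A minimal sketch of the non-blocking pattern described here. It assumes that some other thread (for example a messaging client's callback, not shown) fills a local in-memory buffer. `NonBlockingQueueReader` and `offer` are hypothetical. The class only mirrors the `start()`/`advance()`/`getCurrent()` shape of Beam's `UnboundedReader` and does not extend it. `ConcurrentLinkedQueue.poll()` returns `null` immediately when the buffer is empty, so `advance()` never waits.

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical reader that mirrors the start()/advance()/getCurrent() shape of
// Beam's UnboundedReader without depending on the Beam SDK.
final class NonBlockingQueueReader {
    // Assumed to be filled by another thread, e.g. a messaging client's callback.
    private final Queue<String> buffer = new ConcurrentLinkedQueue<>();
    private String current;

    void offer(String message) {
        buffer.offer(message);
    }

    boolean start() {
        // Connection setup would go here; then try to read the first element
        // on the same thread, without waiting.
        return advance();
    }

    boolean advance() {
        // poll() returns null immediately when the buffer is empty.
        String next = buffer.poll();
        current = next;
        // false means "no data right now"; the runner calls advance() again later.
        return next != null;
    }

    String getCurrent() {
        if (current == null) {
            throw new NoSuchElementException("last start()/advance() returned false");
        }
        return current;
    }
}
```

Calling `advance()` from `start()` is a plain method call on the same thread. This matches Jan's 2019-09-20 reply that doing so is fine because a reader is accessed from a single thread.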

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-25 Thread Jan Lukavský
I have a feeling there has to be something going wrong with your source. If I were you, I would probably do the following:
 - verify that I never produce an UnboundedSource split with no associated queue (Preconditions.checkState())
 - make sure that I *really never* block in the call to advan

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-24 Thread Ken Barr
I might have to resort to this. I am reading from the queue in a timed manner and properly returning false when the read times out. I have implemented a set of statistics to enumerate advance() behavior: the number of times it successfully reads a message, fails to read a message, etc. On 2019/09/19 19:45:49,
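A sketch of this kind of instrumentation. The names are hypothetical throughout. `advance()` waits at most `readTimeoutMillis` via `BlockingQueue.poll(timeout, unit)` and counts calls, successful reads and empty reads. Following Jan's advice, the timeout should be kept very small. An `advanceCalls` counter that stays at zero is itself useful evidence that the runner never invokes the reader.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical instrumented reader: a bounded, timed read plus counters
// that describe how often advance() is called and what it returns.
final class InstrumentedReader {
    final AtomicLong advanceCalls = new AtomicLong();
    final AtomicLong messagesRead = new AtomicLong();
    final AtomicLong emptyReads = new AtomicLong();

    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final long readTimeoutMillis;
    private String current;

    InstrumentedReader(long readTimeoutMillis) {
        this.readTimeoutMillis = readTimeoutMillis;
    }

    void offer(String message) {
        buffer.offer(message);
    }

    boolean advance() {
        advanceCalls.incrementAndGet();
        String next;
        try {
            // Waits at most readTimeoutMillis; keep this very small (or 0).
            next = buffer.poll(readTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            next = null;
        }
        current = next;
        if (next == null) {
            emptyReads.incrementAndGet();
            return false;
        }
        messagesRead.incrementAndGet();
        return true;
    }

    String getCurrent() {
        return current;
    }

    String stats() {
        return "advance=" + advanceCalls.get() + " read=" + messagesRead.get()
            + " empty=" + emptyReads.get();
    }
}
```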

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-24 Thread Ken Barr
I believe I have proven that the UnboundedReader.advance() method is never called in this case. Each time I enter advance(), I spawn a thread that loops for up to 60 seconds and throws a runtime exception if it has not been stopped before the time expires. I have proved this works
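The watchdog idea can be sketched with a `ScheduledExecutorService` instead of a hand-rolled looping thread. An alarm is scheduled when `advance()` is entered and cancelled in a `finally` block when it returns. `AdvanceWatchdog` and `watch` are hypothetical names. Unlike the original description, the alarm here increments a counter and logs rather than throwing, because an exception thrown on the watchdog thread does not propagate to the reader thread. `BooleanSupplier` is used for brevity; Beam's `advance()` declares `IOException`, which a real wrapper would have to handle.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical watchdog: schedules an alarm when advance() is entered and
// cancels it when advance() returns. The alarm only fires if a single call
// stays inside advance() for longer than limitMillis.
final class AdvanceWatchdog implements AutoCloseable {
    final AtomicInteger alarms = new AtomicInteger();
    private final long limitMillis;
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "advance-watchdog");
            t.setDaemon(true); // never keeps the JVM alive
            return t;
        });

    AdvanceWatchdog(long limitMillis) {
        this.limitMillis = limitMillis;
    }

    // Runs the body of advance() under the watchdog.
    boolean watch(BooleanSupplier advanceBody) {
        ScheduledFuture<?> alarm = timer.schedule(() -> {
            alarms.incrementAndGet();
            // In a real worker, log this (with the reader thread's stack) instead.
            System.err.println("advance() still running after " + limitMillis + " ms");
        }, limitMillis, TimeUnit.MILLISECONDS);
        try {
            return advanceBody.getAsBoolean();
        } finally {
            alarm.cancel(false); // no-op if the alarm already fired
        }
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```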

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-20 Thread Jan Lukavský
Yes, that should be fine. But what do you mean by re-entrant in this context? All accesses to the reader should be single-threaded. On 9/20/19 6:11 PM, Ken Barr wrote: Is the IO SDK re-entrant? Is it safe to call advance() from within start()? On 2019/09/19 14:57:09, Jan Lukavský wrote: Hi Ken,

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-20 Thread Ken Barr
Is the IO SDK re-entrant? Is it safe to call advance() from within start()? On 2019/09/19 14:57:09, Jan Lukavský wrote: > Hi Ken, > > I have seen some deadlock behavior with custom sources earlier > (different runner, but that might not be important). Some lessons learned: > >  a) please ma

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-19 Thread Jan Lukavský
You can SSH into the Dataflow worker and inspect a jstack dump of the harness. The main source of blocking would be waiting for any data. The logic should be implemented so that if there is no data available at the moment, you just return false and don't wait for anything. Another suggestio
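jstack on the worker is the direct way to do this. As an in-process complement, the sketch below uses `Thread.getAllStackTraces()` to list threads whose stack currently contains a given method, such as the reader's `advance()`. `StackProbe` is a hypothetical helper. In a real reader you would pass your reader's fully qualified class name instead of `null` and log the result periodically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic: an in-process approximation of searching a jstack
// dump. Returns "threadName (STATE)" for every thread that is currently
// executing the given method (className == null matches any class).
final class StackProbe {
    private StackProbe() {}

    static List<String> threadsInside(String className, String methodName) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : e.getValue()) {
                boolean classMatches = className == null || frame.getClassName().equals(className);
                if (classMatches && frame.getMethodName().equals(methodName)) {
                    hits.add(e.getKey().getName() + " (" + e.getKey().getState() + ")");
                    break; // one entry per thread is enough
                }
            }
        }
        return hits;
    }
}
```

A thread that shows up repeatedly in `WAITING` or `TIMED_WAITING` inside `advance()` points to blocking. A reader thread that never appears there is consistent with `advance()` not being called at all.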

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-19 Thread Ken Barr
start() seems to be working, so the focus is on advance(). Is there any way to prove whether I am blocked in advance() on the Dataflow runner? I have been through the code and cannot see anything, but I know that does not mean much. Ken On 2019/09/19 14:57:09, Jan Lukavský wrote: > Hi Ken, > > I have seen

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-19 Thread Jan Lukavský
Hi Ken, I have seen some deadlock behavior with custom sources before (on a different runner, but that might not be important). Some lessons learned:
 a) please make sure your advance() or start() methods do not block; blocking will cause issues and possibly the deadlocks you describe
 b) if you want t

# of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-19 Thread Ken Barr
I have a custom UnboundedSource IO that reads from a series of messaging queues. I have implemented it such that the IO takes a list of queues and expands an UnboundedSource/UnboundedReader for each queue. If I use autoscaling with maxNumWorkers <= the number of queues, everything works well.