I believe I have proven that the UnboundedReader.advance() method is never called in this case. Each time I enter the advance() method I spawn a thread that waits for up to 60 seconds and throws a runtime exception if it has not been stopped before the time expires.
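For reference, a minimal sketch of that watchdog as I understand it (field name and log message are illustrative, not the actual code):

    private volatile Thread stallWatchdog;   // illustrative field name

    @Override
    public boolean advance() throws IOException {
      // Stop the watchdog started by the previous advance() call, if any.
      if (stallWatchdog != null) {
        stallWatchdog.interrupt();
      }
      // Start a new watchdog: if it is not stopped within 60 seconds
      // (i.e. advance() is never entered again), it throws, and the
      // runtime exception is visible in the worker logs.
      stallWatchdog = new Thread(() -> {
        try {
          Thread.sleep(60_000L);
          throw new RuntimeException("advance() not called again within 60s");
        } catch (InterruptedException e) {
          // stopped in time - nothing to do
        }
      });
      stallWatchdog.setDaemon(true);
      stallWatchdog.start();

      // ... existing advance() logic unchanged ...
      return false; // placeholder for the real return value
    }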
I have proved this works and that I can see the runtime exception. When I run in the situation where the deadlock occurs I never see the runtime exception, i.e., the Reader that is holding exclusivity never has its advance() method called after scaleup.

On 2019/09/19 17:51:35, Ken Barr <ken.b...@solace.com> wrote:
> The start() seems to be working, so focus on advance(). Is there any way to
> prove whether I am blocked in advance() for the Dataflow runner? I have been
> through the code and cannot see anything, but I know that does not mean much.
>
> Ken
>
> On 2019/09/19 14:57:09, Jan Lukavský <je...@seznam.cz> wrote:
> > Hi Ken,
> >
> > I have seen some deadlock behavior with custom sources earlier
> > (different runner, but that might not be important). Some lessons learned:
> >
> > a) please make sure your advance() or start() methods do not block;
> > that will cause issues and possibly the deadlocks you describe
> >
> > b) if you want to limit parallelism, that should be possible in the
> > split() method - you can return a collection containing only (this) if
> > there are no more readers
> >
> > Hope this helps, please feel free to ask for more details if needed.
> >
> > Best,
> >
> > Jan
> >
> > On 9/19/19 4:47 PM, Ken Barr wrote:
> > > I have a custom UnboundedSource IO that reads from a series of messaging
> > > queues. I have implemented this such that the IO takes a list of queues
> > > and expands into an UnboundedSource/UnboundedReader for each queue.
> > >
> > > If I use autoscaling with maxNumWorkers <= the number of queues, everything
> > > works well. For example, if I have 4 queues and run in Dataflow, the
> > > Dataflow process starts with 1 worker with 4 Readers, each consuming from
> > > a queue. As CPU usage and backlog grow, Dataflow spawns more workers
> > > and moves the Readers to the new workers. As CPU usage and backlog
> > > shrink, the Readers are moved back to the original worker and unused
> > > workers are deleted. This is exactly what I was hoping for.
> > >
> > > Problems happen if I set maxNumWorkers greater than the number of queues.
> > > As scaleup goes past the number of queues, not only are Readers moved,
> > > but for some reason new Readers are created. This should not be too bad;
> > > the new Readers would just not receive messages, as the original Reader
> > > is holding exclusive access to ensure in-order delivery. The real problem
> > > is that the original Readers are holding the queue and their advance()
> > > method is not being called. The new Readers' advance() method is being
> > > called, but they are not active on the queue, hence the system is now
> > > deadlocked.
> > >
> > > Questions are:
> > > Why are new Readers being spawned if maxNumWorkers exceeds the original
> > > number of Readers? Is there a way of preventing this, as I would like to
> > > maintain in-order delivery?
> > >
> > > Why is the original Readers' advance() method no longer being called?
> > > This is causing a deadlock.
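For completeness, here is a rough sketch of what Jan's suggestion (b) could look like in the custom source. The class name MyQueueSource and the queueNames field are hypothetical stand-ins for the actual IO, and I cannot say whether capping split() alone stops Dataflow from creating extra Readers, but it does keep the number of sources bounded by the number of queues:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Inside the hypothetical MyQueueSource extends UnboundedSource<...>:
    @Override
    public List<MyQueueSource> split(int desiredNumSplits, PipelineOptions options) {
      // Ignore desiredNumSplits beyond the number of queues: create at most
      // one source (and therefore one reader) per queue.
      if (queueNames.size() <= 1) {
        // No further splitting possible: return only this source.
        return Collections.singletonList(this);
      }
      List<MyQueueSource> splits = new ArrayList<>();
      for (String queue : queueNames) {
        splits.add(new MyQueueSource(Collections.singletonList(queue)));
      }
      return splits;
    }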