I have a custom UnboundedSource IO that reads from a set of messaging queues. I have implemented this such that the IO takes a list of queues and expands into one UnboundedSource (and hence one UnboundedReader) per queue.
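For context, the split logic looks roughly like the sketch below. This is a simplified stand-in, not my exact code: `QueueSource`, `QueueCheckpointMark`, and the `QueueReader` it creates (sketched further down) are placeholder names, and messages are plain Strings here.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

public class QueueSource
    extends UnboundedSource<String, QueueSource.QueueCheckpointMark> {

  private final List<String> queueNames;

  public QueueSource(List<String> queueNames) {
    this.queueNames = queueNames;
  }

  // Expand into exactly one sub-source per queue, ignoring
  // desiredNumSplits so the Reader count matches the queue count.
  @Override
  public List<QueueSource> split(int desiredNumSplits, PipelineOptions options) {
    List<QueueSource> splits = new ArrayList<>();
    for (String queue : queueNames) {
      splits.add(new QueueSource(Collections.singletonList(queue)));
    }
    return splits;
  }

  @Override
  public UnboundedReader<String> createReader(
      PipelineOptions options, QueueCheckpointMark checkpoint) {
    // Each sub-source holds a single queue; its Reader consumes from it.
    return new QueueReader(this, queueNames.get(0));
  }

  @Override
  public Coder<QueueCheckpointMark> getCheckpointMarkCoder() {
    return SerializableCoder.of(QueueCheckpointMark.class);
  }

  @Override
  public Coder<String> getOutputCoder() {
    return StringUtf8Coder.of();
  }

  // Trivial checkpoint for the sketch; the real source records the
  // consumer's position on the queue.
  public static class QueueCheckpointMark
      implements UnboundedSource.CheckpointMark, Serializable {
    @Override
    public void finalizeCheckpoint() {}
  }
}
```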
If I use autoscaling with maxNumWorkers <= the number of queues, everything works well. For example, with 4 queues running on Dataflow, the job starts with 1 worker running 4 Readers, each consuming from one queue. As CPU usage and backlog grow, Dataflow spawns more workers and moves the Readers onto them. As CPU usage and backlog shrink, the Readers are moved back to the original worker and the unused workers are deleted. This is exactly what I was hoping for.

Problems start if I set maxNumWorkers greater than the number of queues. As scale-up goes past the number of queues, not only are Readers moved, but for some reason new Readers are created. On its own this should be harmless: the new Readers simply receive no messages, because the original Reader holds exclusive access to its queue to guarantee in-order delivery. The real problem is that the original Readers still hold their queues but their advance() method is no longer being called, while the new Readers' advance() method is being called even though they are not active on the queue. The pipeline is now deadlocked.

My questions are:

1. Why are new Readers being spawned once maxNumWorkers exceeds the original number of Readers?
2. Is there a way to prevent this? I would like to maintain in-order delivery.
3. Why is the original Readers' advance() method no longer being called? This is what causes the deadlock.
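For reference, here is a stripped-down sketch of the Reader side, showing the exclusive-access behavior described above. The broker calls (tryAcquireExclusive, pollQueue, releaseExclusive) are placeholders for the real client library; the point is that a duplicate Reader on an already-held queue never acquires the lock, so its advance() always returns false.

```java
import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class QueueReader extends UnboundedSource.UnboundedReader<String> {

  private final QueueSource source;
  private final String queueName;
  private String current;
  private Instant currentTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
  private boolean holdsQueue;

  QueueReader(QueueSource source, String queueName) {
    this.source = source;
    this.queueName = queueName;
  }

  @Override
  public boolean start() throws IOException {
    // Take exclusive ownership of the queue so messages are delivered
    // in order to exactly one consumer.
    holdsQueue = tryAcquireExclusive(queueName);
    return holdsQueue && advance();
  }

  @Override
  public boolean advance() throws IOException {
    if (!holdsQueue) {
      // A duplicate Reader on the same queue never holds the lock, so
      // it never yields an element; this is the idle Reader described above.
      return false;
    }
    String message = pollQueue(queueName); // non-blocking poll
    if (message == null) {
      return false; // nothing available right now
    }
    current = message;
    currentTimestamp = Instant.now();
    return true;
  }

  @Override
  public String getCurrent() throws NoSuchElementException {
    if (current == null) {
      throw new NoSuchElementException();
    }
    return current;
  }

  @Override
  public Instant getCurrentTimestamp() throws NoSuchElementException {
    if (current == null) {
      throw new NoSuchElementException();
    }
    return currentTimestamp;
  }

  @Override
  public Instant getWatermark() {
    return currentTimestamp;
  }

  @Override
  public UnboundedSource.CheckpointMark getCheckpointMark() {
    return new QueueSource.QueueCheckpointMark();
  }

  @Override
  public QueueSource getCurrentSource() {
    return source;
  }

  @Override
  public void close() throws IOException {
    // Release the queue so another Reader can take over.
    if (holdsQueue) {
      releaseExclusive(queueName);
    }
  }

  // Placeholders standing in for the real broker client.
  private boolean tryAcquireExclusive(String queue) { return true; }
  private String pollQueue(String queue) { return null; }
  private void releaseExclusive(String queue) {}
}
```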