You are not obliged to return exactly numSplits splits from
split(). If you have fewer queues (partitions), you can simply return
fewer splits (and thereby make sure that every split has an
associated queue). On the other hand, if your source returns
watermark of BoundedWindo
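
A minimal sketch of the "return fewer splits" idea, in plain Java without any Beam dependency. The class QueueSplitter and its assign() method are hypothetical names used only for illustration. In a real source, each returned group would back one UnboundedSource returned from split(), and the requested split count would come from that method's argument.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Beam: groups queue names into splits. */
final class QueueSplitter {
    private QueueSplitter() {}

    /**
     * Distributes queues round-robin over at most desiredNumSplits groups.
     * The number of groups is capped at the number of queues, so no group
     * is ever empty and every split has at least one queue to read from.
     */
    static List<List<String>> assign(List<String> queues, int desiredNumSplits) {
        if (queues.isEmpty()) {
            throw new IllegalStateException("no queues to read from");
        }
        if (desiredNumSplits < 1) {
            throw new IllegalArgumentException("desiredNumSplits must be >= 1");
        }
        int n = Math.min(desiredNumSplits, queues.size());
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            splits.add(new ArrayList<>());
        }
        for (int i = 0; i < queues.size(); i++) {
            splits.get(i % n).add(queues.get(i));
        }
        return splits;
    }
}
```

With two queues and five requested splits this yields two groups of one queue each, rather than five splits of which three would have nothing to read.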
I already poll a local queue in advance() and return false if the queue is empty.
Is there an example of restricting UnboundedSource splits?
On 2019/09/25 08:13:43, Jan Lukavský wrote:
I have a feeling that something must be going wrong in your
source. If I were you, I would probably do the following:
- verify that I never produce an UnboundedSource split with no queue
associated (Preconditions.checkState())
- make sure that I *really never* block in call to advan
I might have to resort to this. I am reading from the queue in a timed manner and
properly returning false when the read times out. I have implemented a set of
statistics to enumerate advance() behavior: the number of times it successfully
reads a message, fails to read a message, etc.
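
The pattern described above (timed read, return false on timeout, count the outcomes) could be sketched roughly as follows. This is plain Java with no Beam dependency. PollingReader, hits and misses are hypothetical names, and a real UnboundedReader has further obligations (watermarks, checkpoints) that are not shown here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical reader core: a bounded wait in advance(), with counters. */
final class PollingReader {
    private final BlockingQueue<String> localQueue;
    private final long timeoutMillis;
    final AtomicLong hits = new AtomicLong();   // advance() returned true
    final AtomicLong misses = new AtomicLong(); // advance() returned false
    private String current;

    PollingReader(BlockingQueue<String> localQueue, long timeoutMillis) {
        this.localQueue = localQueue;
        this.timeoutMillis = timeoutMillis;
    }

    /** Waits at most timeoutMillis for a message, then gives up. */
    boolean advance() {
        String next;
        try {
            next = localQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            next = null;
        }
        if (next == null) {
            misses.incrementAndGet();
            return false;
        }
        current = next;
        hits.incrementAndGet();
        return true;
    }

    String getCurrent() {
        return current;
    }
}
```

A timeout of 0 makes the poll fully non-blocking, which is closest to the advice elsewhere in this thread not to wait for data at all.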
On 2019/09/19 19:45:49,
I believe I have proven that the UnboundedReader.advance() method is never
being called in this case. Each time I enter the advance() method I spawn a
thread that loops for up to 60 seconds and throws a runtime exception if it has
not been stopped before time expires.
I have proved this works
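
A rough sketch of such a watchdog, under stated assumptions. It differs from the approach described above in one respect: instead of throwing a runtime exception from the helper thread (an exception thrown there would not reach the thread running advance()), it counts the overrun and prints the stack of the guarded thread. AdvanceWatchdog, enter(), exit() and overruns are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical watchdog: reports calls that run longer than a limit. */
final class AdvanceWatchdog implements AutoCloseable {
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "advance-watchdog");
            t.setDaemon(true); // never keep the JVM alive for the watchdog
            return t;
        });
    private final long limitMillis;
    final AtomicInteger overruns = new AtomicInteger();

    AdvanceWatchdog(long limitMillis) {
        this.limitMillis = limitMillis;
    }

    /** Call on entry to the guarded method; keep the returned handle. */
    ScheduledFuture<?> enter() {
        Thread caller = Thread.currentThread();
        return timer.schedule(() -> {
            overruns.incrementAndGet();
            System.err.println("call exceeded " + limitMillis + " ms on " + caller.getName());
            for (StackTraceElement e : caller.getStackTrace()) {
                System.err.println("  at " + e);
            }
        }, limitMillis, TimeUnit.MILLISECONDS);
    }

    /** Call on exit (ideally in a finally block) to disarm the timer. */
    void exit(ScheduledFuture<?> guard) {
        guard.cancel(false);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

Used as `ScheduledFuture<?> g = watchdog.enter(); try { ... } finally { watchdog.exit(g); }`, a non-zero overruns count would indicate a call that blocked, while a count that stays at zero together with no progress would be consistent with the method never being entered.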
Yes, that should be fine. But what do you mean by re-entrant in this
context? All accesses to reader should be single-threaded.
On 9/20/19 6:11 PM, Ken Barr wrote:
Is the IO SDK re-entrant? Is it safe to call advance() from within start()?
On 2019/09/19 14:57:09, Jan Lukavský wrote:
> Hi Ken,
>
> I have seen some deadlock behavior with custom sources earlier
> (different runner, but that might not be important). Some lessons learned:
>
> a) please ma
You can ssh to the Dataflow worker and inspect a jstack dump of the
harness. The principal source of blocking would be waiting for any
data. The logic should be implemented so that if there is no data
available at the moment, it just returns false and doesn't wait for
anything. Another suggestio
start() seems to be working, so I am focusing on advance(). Is there any way to
prove whether I am blocked in advance() on the Dataflow runner? I have been through
the code and cannot see anything, but I know that does not mean much.
Ken
On 2019/09/19 14:57:09, Jan Lukavský wrote:
Hi Ken,
I have seen some deadlock behavior with custom sources earlier
(different runner, but that might not be important). Some lessons learned:
a) please make sure your advance() or start() methods do not block;
that will cause issues and possibly the deadlocks you describe
b) if you want t
I have a custom UnboundedSource IO that reads from a series of messaging
queues. I have implemented this such that the IO takes a list of queues and
expands an UnboundedSource/UnboundedReader for each queue.
If I use autoscaling with maxNumWorkers <= number of queues, everything works
well.