Hey,

We've been implementing a simple Storm-like fault tolerance system: it persists the source records, keeps track of every record (whether it has been processed), and replays records that fail. The straightforward way to do this was to create a special AbstractJobVertex (and AbstractInvokable) that handles the replaying and keeps track of the records.
So there is a so-called FTLayerVertex that needs two-way communication with the tasks:

1. it needs to be notified about a processed record or a newly created record
2. it needs to resend failed records to the appropriate tasks

We've decided to use events for 1. and regular message sending (i.e. BufferReader/Writer) for 2. So we need to send events backward on a BufferReader.

The problem is that backward events are only sent and received once the channel is initialized (we are only looking at the single-channel case for now), that is, once there is a live connection and the channel is no longer UNKNOWN (please correct me if I'm wrong). But sometimes the connection is not yet initialized when we send these events from the tasks to the FTLayerVertex.

We've tried a workaround: sending an "initializer" forward event from the FTLayerVertex to all the tasks and waiting for this event at the beginning of every task's invoke, so that backward events are only sent once the channel is definitely up. That did not work out, because in order to receive an event we need to read that input, and we only want to read and process that input after we have made sure we can use the channel.

Another workaround was sending an "initializer" record instead of an event. That did not work out either. We wanted to read that input (from the FTLayerVertex) as a regular input, so the given BufferReader was also passed to a UnionBufferReader, and reading the given BufferReader separately (wrapped in a RecordReader) resulted in the UnionBufferReader getting stuck. (It seemed as if the UnionBufferReader had put the given BufferReader in its queue because of this "initializer" message, but the message had already been read from the BufferReader separately, so the UnionBufferReader got stuck waiting for a new record from the BufferReader.)

These workarounds don't seem natural either, because we're pulling this "synchronization" of connection initialization a layer up. It would be nice if we had some kind of events that are guaranteed to reach their destination.
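To make the last point more concrete, here is a minimal sketch in plain Java of the behaviour we have in mind: events sent before the channel is up are queued and then delivered in order once it becomes ready. Everything in it is an assumption for illustration. DeferredEventSender, markChannelReady and pendingCount are made-up names, not Flink API, and the Consumer only stands in for whatever call actually publishes an event on the channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Flink): holds events back until the
 * underlying channel is reported as initialized, then delivers them in order.
 */
final class DeferredEventSender<E> {
    // Stand-in for the real "publish event on the channel" call (assumption).
    private final Consumer<E> channel;
    // Events that were sent while the channel was not yet initialized.
    private final Queue<E> pending = new ArrayDeque<>();
    private boolean ready = false;

    DeferredEventSender(Consumer<E> channel) {
        this.channel = channel;
    }

    /** Delivers immediately if the channel is up, otherwise queues the event. */
    synchronized void send(E event) {
        if (ready) {
            channel.accept(event);
        } else {
            pending.add(event);
        }
    }

    /** To be called once the channel is initialized; flushes the backlog in FIFO order. */
    synchronized void markChannelReady() {
        ready = true;
        E event;
        while ((event = pending.poll()) != null) {
            channel.accept(event);
        }
    }

    /** Number of events still waiting for the channel. */
    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        DeferredEventSender<String> sender = new DeferredEventSender<>(delivered::add);

        sender.send("record-1 processed");   // channel not up yet: queued
        sender.send("record-2 created");     // queued as well
        sender.markChannelReady();           // backlog is flushed in order
        sender.send("record-3 processed");   // delivered directly

        System.out.println(delivered);
    }
}
```

In this sketch the task can call send at any time and nothing is lost before initialization. The open part is who would call markChannelReady: ideally the runtime itself, since it knows when the channel leaves the UNKNOWN state.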
Is that possible? Would that be useful for other purposes too? Does anyone know of a workaround that might work? Are we missing something? (Please correct me if I've got something wrong.) Should we use something else instead of events? (I think we cannot avoid this two-way communication between vertices.)

All suggestions are welcome!

Best regards,
Gabor