Hi

Good point about select/epoll, however I do not see how a connector blocking on
them could fit in if we would like a single task in Flink to be single-threaded
(and I believe we should pursue this goal). If your connector blocks on
`select`, then it can not process/handle control messages from Flink, like
checkpoints, releasing resources and potentially output flushes. Supporting
that would require tight integration between the connector and Flink’s main
event loop/selects/etc.
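
To illustrate the problem, here is a minimal sketch (hypothetical class and
method names, not an actual Flink or connector API) of a reader whose main loop
parks inside NIO `Selector.select()`. While the thread sits in `select()`,
control messages enqueued by the engine (e.g. a checkpoint trigger) are not
seen until the next record arrives, which can be arbitrarily late:

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class BlockingSelectReader {

    private final Selector selector;
    // Control messages (e.g. "trigger checkpoint") enqueued by the engine thread.
    private final Queue<Runnable> controlMessages = new ConcurrentLinkedQueue<>();

    BlockingSelectReader(Selector selector) {
        this.selector = selector;
    }

    void runLoop() throws IOException {
        while (true) {
            // Blocks until the source has data; during this time the
            // controlMessages queue is never drained, so checkpoints stall.
            selector.select();
            readAvailableRecords();

            // Control messages are only handled between records.
            Runnable msg;
            while ((msg = controlMessages.poll()) != null) {
                msg.run();
            }
        }
    }

    private void readAvailableRecords() {
        // Iterate selector.selectedKeys() and read from the ready channels.
    }
}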

Looking at it from another perspective: let’s assume that we have a connector
implemented on top of `select`/`epoll`. In order to integrate it with Flink’s
checkpointing/flushes/resource releasing, it will have to be executed in a
separate thread one way or another. If, on the other hand, our API enforces or
at least encourages non-blocking implementations with some kind of notification
(an `isBlocked()` future or a `notify()` callback), some connectors might be
able to skip that extra layer of wrapping threads (see the sketch below).
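
For completeness, a rough sketch (hypothetical names, not a proposed interface)
of such a wrapping thread: the blocking, select/epoll-based reader runs in its
own fetcher thread and hands records over through a queue, while the
Flink-facing side stays non-blocking and exposes availability as a future:

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

class ThreadWrappedReader<T> {

    interface BlockingReader<T> {
        T take() throws InterruptedException; // may block on select/epoll internally
    }

    private final BlockingQueue<T> handover = new ArrayBlockingQueue<>(1024);
    private final Queue<CompletableFuture<?>> waiters = new ConcurrentLinkedQueue<>();

    ThreadWrappedReader(BlockingReader<T> blockingReader) {
        Thread fetcher = new Thread(() -> {
            try {
                while (true) {
                    handover.put(blockingReader.take());
                    CompletableFuture<?> waiter;
                    while ((waiter = waiters.poll()) != null) {
                        waiter.complete(null); // wake up whoever waits on isBlocked()
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "source-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
    }

    /** Non-blocking; returns null if no record is currently available. */
    T poll() {
        return handover.poll();
    }

    CompletableFuture<?> isBlocked() {
        if (!handover.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<?> future = new CompletableFuture<>();
        waiters.add(future);
        if (!handover.isEmpty()) {
            future.complete(null); // close the race between the emptiness check and registration
        }
        return future;
    }
}

The extra fetcher thread and handover queue are exactly the overhead that a
connector with a natively non-blocking implementation could avoid.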

Btw, I am not sure my point came across clearly. Having only `poll()` and
`take()` is not enough for a single-threaded task. If our source interface
provides neither a `notify()` callback nor `CompletableFuture<?> isBlocked()`,
there is no way to implement a single-threaded task that both reads the data
from the source connector and can also react to system events. Ok, a
non-blocking `poll()` would allow that, but only with busy looping.
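
To make the argument concrete, here is a minimal sketch (hypothetical names and
a hand-rolled "mailbox", not the actual task code) of a single-threaded task
loop that uses `CompletableFuture<?> isBlocked()` to wait for either new data
or a system event (checkpoint trigger, flush request) without busy looping:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class SingleThreadedTaskLoop<T> {

    interface Reader<T> {
        CompletableFuture<?> isBlocked();
        T getNextElement(); // non-blocking; returns null if nothing is available yet
    }

    // System events (checkpoints, flushes, ...) enqueued from other threads.
    private final LinkedBlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

    void run(Reader<T> reader) throws InterruptedException {
        while (true) {
            // Drain pending system events first.
            Runnable event;
            while ((event = mailbox.poll()) != null) {
                event.run();
            }

            T record = reader.getNextElement();
            if (record != null) {
                process(record);
                continue;
            }

            // Nothing to read: park until either the source becomes available
            // again or a system event arrives, with no busy looping.
            reader.isBlocked().thenRun(() -> mailbox.add(() -> { /* wake-up marker */ }));
            mailbox.take().run();
        }
    }

    void enqueueSystemEvent(Runnable event) {
        mailbox.add(event);
    }

    private void process(T record) {
        // emit the record downstream
    }
}

With only a non-blocking `poll()` and no `isBlocked()`/`notify()`, the "park"
step above degenerates into a spin or a fixed back-off, which is exactly the
busy looping mentioned above.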

Piotrek

> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com> wrote:
> 
> Hi Piotrek,
> 
>> But I don’t see a reason why we should expose both blocking `take()` and
> non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or
> connector) would have to do the same busy
>> looping anyway and I think it would be better to have a simpler connector
> API (that would solve our problems) and force connectors to comply one way
> or another.
> 
> If we let the block happen inside the connector, the blocking does not have
> to be a busy loop. For example, to do the block waiting efficiently, the
> connector can use java NIO selector().select which relies on OS syscall
> like epoll[1] instead of busy looping. But if Flink engine blocks outside
> the connector, it pretty much has to do the busy loop. So if there is only
> one API to get the element, a blocking getNextElement() makes more sense.
> In any case, we should avoid ambiguity. It has to be crystal clear about
> whether a method is expected to be blocking or non-blocking. Otherwise it
> would be very difficult for Flink engine to do the right thing with the
> connectors. At the first glance at getCurrent(), the expected behavior is
> not quite clear.
> 
> That said, I do agree that functionality wise, poll() and take() kind of
> overlap. But they are actually not quite different from
> isBlocked()/getNextElement(). Compared with isBlocked(), the only
> difference is that poll() also returns the next record if it is available.
> But I agree that the isBlocked() + getNextElement() is more flexible as
> users can just check the record availability, but not fetch the next
> element.
> 
>> In case of thread-less readers with only non-blocking `queue.poll()` (is
> that really a thing? I can not think about a real implementation that
> enforces such constraints)
> Right, it is pretty much a syntax sugar to allow user combine the
> check-and-take into one method. It could be achieved with isBlocked() +
> getNextElement().
> 
> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
> 
>> Hi Becket,
>> 
>> With my proposal, both of your examples would have to be solved by the
>> connector and solution to both problems would be the same:
>> 
>> Pretend that connector is never blocked (`isBlocked() { return
>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion (or
>> semi blocking with return of control from time to time to allow for
>> checkpointing, network flushing and other resource management things to
>> happen in the same main thread). In other words, exactly how you would
>> implement `take()` method or how the same source connector would be
>> implemented NOW with current source interface. The difference with current
>> interface would be only that main loop would be outside of the connector,
>> and instead of periodically releasing checkpointing lock, periodically
>> `return null;` or `return Optional.empty();` from `getNextElement()`.
>> 
>> In case of thread-less readers with only non-blocking `queue.poll()` (is
>> that really a thing? I can not think about a real implementation that
>> enforces such constraints), we could provide a wrapper that hides the busy
>> looping. The same applies how to solve forever blocking readers - we could
>> provider another wrapper running the connector in separate thread.
>> 
>> But I don’t see a reason why we should expose both blocking `take()` and
>> non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or
>> connector) would have to do the same busy looping anyway and I think it
>> would be better to have a simpler connector API (that would solve our
>> problems) and force connectors to comply one way or another.
>> 
>> Piotrek
>> 
>>> On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com> wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> I might have misunderstood you proposal. But let me try to explain my
>>> concern. I am thinking about the following case:
>>> 1. a reader has the following two interfaces,
>>>   boolean isBlocked()
>>>   T getNextElement()
>>> 2. the implementation of getNextElement() is non-blocking.
>>> 3. The reader is thread-less, i.e. it does not have any internal thread.
>>> For example, it might just delegate the getNextElement() to a
>> queue.poll(),
>>> and isBlocked() is just queue.isEmpty().
>>> 
>>> How can Flink efficiently implement a blocking reading behavior with this
>>> reader? Either a tight loop or a backoff interval is needed. Neither of
>>> them is ideal.
>>> 
>>> Now let's say in the reader mentioned above implements a blocking
>>> getNextElement() method. Because there is no internal thread in the
>> reader,
>>> after isBlocked() returns false. Flink will still have to loop on
>>> isBlocked() to check whether the next record is available. If the next
>>> record reaches after 10 min, it is a tight loop for 10 min. You have
>>> probably noticed that in this case, even isBlocked() returns a future,
>> that
>>> future() will not be completed if Flink does not call some method from
>> the
>>> reader, because the reader has no internal thread to complete that future
>>> by itself.
>>> 
>>> Due to the above reasons, a blocking take() API would allow Flink to have
>>> an efficient way to read from a reader. There are many ways to wake up
>> the
>>> blocking thread when checkpointing is needed depending on the
>>> implementation. But I think the poll()/take() API would also work in that
>>> case.
>>> 
>>> Thanks,
>>> 
>>> Jiangjie (Becket) Qin
>>> 
>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> a)
>>>> 
>>>>> BTW, regarding the isBlock() method, I have a few more questions. 21,
>> Is
>>>> a method isReady() with boolean as a return value
>>>>> equivalent? Personally I found it is a little bit confusing in what is
>>>> supposed to be returned when the future is completed. 22. if
>>>>> the implementation of isBlocked() is optional, how do the callers know
>>>> whether the method is properly implemented or not?
>>>>> Does not implemented mean it always return a completed future?
>>>> 
>>>> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
>>>> `boolean hasNext()` which in case of “false” provides some kind of a
>>>> listener/callback that notifies about presence of next element. There
>> are
>>>> some minor details, like `CompletableFuture<?>` has a minimal two state
>>>> logic:
>>>> 
>>>> 1. Future is completed - we have more data
>>>> 2. Future not yet completed - we don’t have data now, but we might/we
>> will
>>>> have in the future
>>>> 
>>>> While `boolean hasNext()` and `notify()` callback are a bit more
>>>> complicated/dispersed and can lead/encourage `notify()` spam.
>>>> 
>>>> b)
>>>> 
>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
>> `getNext`
>>>> the `getNext` would need return a
>>>>> `ElementWithTimestamp` because some sources want to add timestamp to
>>>> every element. IMO, this is not so memory friendly
>>>>> so I prefer this design.
>>>> 
>>>> Guowei I don’t quite understand this. Could you elaborate why having a
>>>> separate `advance()` help?
>>>> 
>>>> c)
>>>> 
>>>> Regarding advance/poll/take. What’s the value of having two separate
>>>> methods: poll and take? Which one of them should be called and which
>>>> implemented? What’s the benefit of having those methods compared to
>> having
>>>> a one single method `getNextElement()` (or `pollElement() or whatever we
>>>> name it) with following contract:
>>>> 
>>>> CompletableFuture<?> isBlocked();
>>>> 
>>>> /**
>>>> Return next element - will be called only if `isBlocked()` is completed.
>>>> Try to implement it in non blocking fashion, but if that’s impossible or
>>>> you just don’t need the effort, you can block in this method.
>>>> */
>>>> T getNextElement();
>>>> 
>>>> I mean, if the connector is implemented non-blockingly, Flink should use
>>>> it that way. If it’s not, then `poll()` will `throw new
>>>> NotImplementedException()`. Implementing both of them and providing
>> both of
>>>> them to Flink wouldn’t make a sense, thus why not merge them into a
>> single
>>>> method call that should preferably (but not necessarily need to) be
>>>> non-blocking? It’s not like we are implementing general purpose `Queue`,
>>>> which users might want to call either of `poll` or `take`. We would
>> always
>>>> prefer to call `poll`, but if it’s blocking, then still we have no
>> choice,
>>>> but to call it and block on it.
>>>> 
>>>> d)
>>>> 
>>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>>> important. But in addition to `Future/poll`, there may be another way
>> to
>>>>> achieve this. I think it may be not very memory friendly if every
>> advance
>>>>> call return a Future.
>>>> 
>>>> I didn’t want to mention this, to not clog my initial proposal, but
>> there
>>>> is a simple solution for the problem:
>>>> 
>>>> public interface SplitReader {
>>>> 
>>>>   (…)
>>>> 
>>>>   CompletableFuture<?> NOT_BLOCKED =
>>>> CompletableFuture.completedFuture(null);
>>>> 
>>>>   /**
>>>>    * Returns a future that will be completed when the page source
>> becomes
>>>>    * unblocked.  If the page source is not blocked, this method should
>>>> return
>>>>    * {@code NOT_BLOCKED}.
>>>>    */
>>>>   default CompletableFuture<?> isBlocked()
>>>>   {
>>>>       return NOT_BLOCKED;
>>>>   }
>>>> 
>>>> If we are blocked and we are waiting for the IO, then creating a new
>>>> Future is non-issue. Under full throttle/throughput and not blocked
>> sources
>>>> returning a static `NOT_BLOCKED` constant  should also solve the
>> problem.
>>>> 
>>>> One more remark, non-blocking sources might be a necessity in a single
>>>> threaded model without a checkpointing lock. (Currently when sources are
>>>> blocked, they can release checkpointing lock and re-acquire it again
>>>> later). Non-blocking `poll`/`getNext()` would allow for checkpoints to
>>>> happen when source is idling. In that case either `notify()` or my
>> proposed
>>>> `isBlocked()` would allow to avoid busy-looping.
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:
>>>>> 
>>>>> Hi Thomas,
>>>>> 
>>>>> The iterator-like API was also the first thing that came to me. But it
>>>>> seems a little confusing that hasNext() does not mean "the stream has
>> not
>>>>> ended", but means "the next record is ready", which is repurposing the
>>>> well
>>>>> known meaning of hasNext(). If we follow the hasNext()/next() pattern,
>> an
>>>>> additional isNextReady() method to indicate whether the next record is
>>>>> ready seems more intuitive to me.
>>>>> 
>>>>> Similarly, in poll()/take() pattern, another method of isDone() is
>> needed
>>>>> to indicate whether the stream has ended or not.
>>>>> 
>>>>> Compared with hasNext()/next()/isNextReady() pattern,
>>>>> isDone()/poll()/take() seems more flexible for the reader
>> implementation.
>>>>> When I am implementing a reader, I could have a couple of choices:
>>>>> 
>>>>> - A thread-less reader that does not have any internal thread.
>>>>> - When poll() is called, the same calling thread will perform a bunch
>>>> of
>>>>>    IO asynchronously.
>>>>>    - When take() is called, the same calling thread will perform a
>>>> bunch
>>>>>    of IO and wait until the record is ready.
>>>>> - A reader with internal threads performing network IO and put records
>>>>> into a buffer.
>>>>>    - When poll() is called, the calling thread simply reads from the
>>>>>    buffer and return empty result immediately if there is no record.
>>>>>    - When take() is called, the calling thread reads from the buffer
>>>> and
>>>>>    block waiting if the buffer is empty.
>>>>> 
>>>>> On the other hand, with the hasNext()/next()/isNextReady() API, it is
>>>> less
>>>>> intuitive for the reader developers to write the thread-less pattern.
>>>>> Although technically speaking one can still do the asynchronous IO to
>>>>> prepare the record in isNextReady(). But it is inexplicit and seems
>>>>> somewhat hacky.
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jiangjie (Becket) Qin
>>>>> 
>>>>> 
>>>>> 
>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:
>>>>> 
>>>>>> Couple more points regarding discovery:
>>>>>> 
>>>>>> The proposal mentions that discovery could be outside the execution
>>>> graph.
>>>>>> Today, discovered partitions/shards are checkpointed. I believe that
>>>> will
>>>>>> also need to be the case in the future, even when discovery and
>> reading
>>>> are
>>>>>> split between different tasks.
>>>>>> 
>>>>>> For cases such as resharding of a Kinesis stream, the relationship
>>>> between
>>>>>> splits needs to be considered. Splits cannot be randomly distributed
>>>> over
>>>>>> readers in certain situations. An example was mentioned here:
>>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>>>> 
>>>>>> Thomas
>>>>>> 
>>>>>> 
>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
>>>>>> 
>>>>>>> Thanks for getting the ball rolling on this!
>>>>>>> 
>>>>>>> Can the number of splits decrease? Yes, splits can be closed and go
>>>> away.
>>>>>>> An example would be a shard merge in Kinesis (2 existing shards will
>> be
>>>>>>> closed and replaced with a new shard).
>>>>>>> 
>>>>>>> Regarding advance/poll/take: IMO the least restrictive approach would
>>>> be
>>>>>>> the thread-less IO model (pull based, non-blocking, caller retrieves
>>>> new
>>>>>>> records when available). The current Kinesis API requires the use of
>>>>>>> threads. But that can be internal to the split reader and does not
>> need
>>>>>> to
>>>>>>> be a source API concern. In fact, that's what we are working on right
>>>> now
>>>>>>> as improvement to the existing consumer: Each shard consumer thread
>>>> will
>>>>>>> push to a queue, the consumer main thread will poll the queue(s). It
>> is
>>>>>>> essentially a mapping from threaded IO to non-blocking.
>>>>>>> 
>>>>>>> The proposed SplitReader interface would fit the thread-less IO
>> model.
>>>>>>> Similar to an iterator, we find out if there is a new element
>> (hasNext)
>>>>>> and
>>>>>>> if so, move to it (next()). Separate calls deliver the meta
>> information
>>>>>>> (timestamp, watermark). Perhaps advance call could offer a timeout
>>>>>> option,
>>>>>>> so that the caller does not end up in a busy wait. On the other
>> hand, a
>>>>>>> caller processing multiple splits may want to cycle through fast, to
>>>>>>> process elements of other splits as soon as they become available.
>> The
>>>>>> nice
>>>>>>> thing is that this "split merge" logic can now live in Flink and be
>>>>>>> optimized and shared between different sources.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>> 
>>>>>>> 
>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com>
>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> Thanks Aljoscha for this FLIP.
>>>>>>>> 
>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
>> very
>>>>>>>> important. But in addition to `Future/poll`, there may be another
>> way
>>>> to
>>>>>>>> achieve this. I think it may be not very memory friendly if every
>>>>>> advance
>>>>>>>> call return a Future.
>>>>>>>> 
>>>>>>>> public interface Listener {
>>>>>>>>   public void notify();
>>>>>>>> }
>>>>>>>> 
>>>>>>>> public interface SplitReader() {
>>>>>>>>   /**
>>>>>>>>    * When there is no element temporarily, this will return false.
>>>>>>>>    * When elements is available again splitReader can call
>>>>>>>> listener.notify()
>>>>>>>>    * In addition the frame would check `advance` periodically .
>>>>>>>>    * Of course advance can always return true and ignore the
>>>> listener
>>>>>>>> argument for simplicity.
>>>>>>>>    */
>>>>>>>>   public boolean advance(Listener listener);
>>>>>>>> }
>>>>>>>> 
>>>>>>>> 2.  The FLIP tells us very clearly that how to create all Splits and
>>>> how
>>>>>>>> to create a SplitReader from a Split. But there is no strategy for
>> the
>>>>>> user
>>>>>>>> to choose how to assign the splits to the tasks. I think we could
>> add
>>>> a
>>>>>>>> Enum to let user to choose.
>>>>>>>> /**
>>>>>>>> public Enum SplitsAssignmentPolicy {
>>>>>>>>  Location,
>>>>>>>>  Workload,
>>>>>>>>  Random,
>>>>>>>>  Average
>>>>>>>> }
>>>>>>>> */
>>>>>>>> 
>>>>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
>>>> `getNext`
>>>>>>>> the `getNext` would need return a `ElementWithTimestamp` because
>> some
>>>>>>>> sources want to add timestamp to every element. IMO, this is not so
>>>>>> memory
>>>>>>>> friendly so I prefer this design.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> 
>>>>>>>> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
>> other
>>>>>>>>> possible improvements. I have one proposal. Instead of having a
>>>> method:
>>>>>>>>> 
>>>>>>>>> boolean advance() throws IOException;
>>>>>>>>> 
>>>>>>>>> I would replace it with
>>>>>>>>> 
>>>>>>>>> /*
>>>>>>>>> * Return a future, which when completed means that source has more
>>>>>> data
>>>>>>>>> and getNext() will not block.
>>>>>>>>> * If you wish to use benefits of non blocking connectors, please
>>>>>>>>> implement this method appropriately.
>>>>>>>>> */
>>>>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>>>>      return CompletableFuture.completedFuture(null);
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>>>> 
>>>>>>>>> Couple of arguments:
>>>>>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>>>>>> `getCurrent()`. What should be done in which, especially for
>>>> connectors
>>>>>>>>> that handle records in batches (like Kafka) and when should you
>> call
>>>>>>>>> `advance` and when `getCurrent()`.
>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
>>>> the
>>>>>>>>> future to have asynchronous/non blocking connectors and more
>>>>>> efficiently
>>>>>>>>> handle large number of blocked threads, without busy waiting. While
>>>> at
>>>>>> the
>>>>>>>>> same time it doesn’t add much complexity, since naive connector
>>>>>>>>> implementations can be always blocking.
>>>>>>>>> 3. This also would allow us to use a fixed size thread pool of task
>>>>>>>>> executors, instead of one thread per task.
>>>>>>>>> 
>>>>>>>>> Piotrek
>>>>>>>>> 
>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi All,
>>>>>>>>>> 
>>>>>>>>>> In order to finally get the ball rolling on the new source
>> interface
>>>>>>>>> that we have discussed for so long I finally created a FLIP:
>>>>>>>>> 
>>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>>>> 
>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
>>>> about
>>>>>>>>> adding per-partition watermark support to the Kinesis source and
>>>>>> because
>>>>>>>>> this would enable generic implementation of event-time alignment
>> for
>>>>>> all
>>>>>>>>> sources. Maybe we need another FLIP for the event-time alignment
>>>> part,
>>>>>>>>> especially the part about information sharing between operations
>> (I'm
>>>>>> not
>>>>>>>>> calling it state sharing because state has a special meaning in
>>>> Flink).
>>>>>>>>>> 
>>>>>>>>>> Please discuss away!
>>>>>>>>>> 
>>>>>>>>>> Aljoscha
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
