Hi,
Thanks Aljoscha for this FLIP.

1. I agree with Piotr and Becket that the non-blocking source is very
important. But in addition to `Future/poll`, there may be another way to
achieve this. I think it may be not very memory friendly if every advance
call return a Future.

public interface Listener {
     public void notify();
}

public interface SplitReader() {
     /**
      * When there is no element temporarily, this will return false.
      * When elements is available again splitReader can call
listener.notify()
      * In addition the frame would check `advance` periodically .
      * Of course advance can always return true and ignore the listener
argument for simplicity.
      */
     public boolean advance(Listener listener);
}

2.  The FLIP tells us very clearly that how to create all Splits and how to
create a SplitReader from a Split. But there is no strategy for the user to
choose how to assign the splits to the tasks. I think we could add a Enum
to let user to choose.
/**
  public Enum SplitsAssignmentPolicy {
    Location,
    Workload,
    Random,
    Average
  }
*/

3. If merge the `advance` and `getCurrent`  to one method like `getNext`
the `getNext` would need return a `ElementWithTimestamp` because some
sources want to add timestamp to every element. IMO, this is not so memory
friendly so I prefer this design.



Thanks

Piotr Nowojski <pi...@data-artisans.com> 于2018年11月1日周四 下午6:08写道:

> Hi,
>
> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> possible improvements. I have one proposal. Instead of having a method:
>
> boolean advance() throws IOException;
>
> I would replace it with
>
> /*
>  * Return a future, which when completed means that source has more data
> and getNext() will not block.
>  * If you wish to use benefits of non blocking connectors, please
> implement this method appropriately.
>  */
> default CompletableFuture<?> isBlocked() {
>         return CompletableFuture.completedFuture(null);
> }
>
> And rename `getCurrent()` to `getNext()`.
>
> Couple of arguments:
> 1. I don’t understand the division of work between `advance()` and
> `getCurrent()`. What should be done in which, especially for connectors
> that handle records in batches (like Kafka) and when should you call
> `advance` and when `getCurrent()`.
> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
> future to have asynchronous/non blocking connectors and more efficiently
> handle large number of blocked threads, without busy waiting. While at the
> same time it doesn’t add much complexity, since naive connector
> implementations can be always blocking.
> 3. This also would allow us to use a fixed size thread pool of task
> executors, instead of one thread per task.
>
> Piotrek
>
> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Hi All,
> >
> > In order to finally get the ball rolling on the new source interface
> that we have discussed for so long I finally created a FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
> adding per-partition watermark support to the Kinesis source and because
> this would enable generic implementation of event-time alignment for all
> sources. Maybe we need another FLIP for the event-time alignment part,
> especially the part about information sharing between operations (I'm not
> calling it state sharing because state has a special meaning in Flink).
> >
> > Please discuss away!
> >
> > Aljoscha
> >
> >
>
>

Reply via email to