Hi Becket,
It's great that we have reached a consensus on Source#getBoundedness().
Regarding option #3, my concern is that if we don't support streaming mode for bounded sources, how could we create a testing source for streaming mode? Currently, all the testing sources for streaming are bounded, so that the integration tests can eventually finish.
Regarding Source#getRecordOrder(), could we have an implicit contract that an unbounded source should already read in order (i.e. reading partitions in parallel), while for a bounded source the order is not mandatory? This is also the behavior of the current sources.
1) A source can't guarantee it reads in strict order, because the producer may produce data out of order.
2) *Bounded-StrictOrder* is not necessary, because batch can reorder data.
Best,
Jark
On Tue, 17 Dec 2019 at 22:03, Becket Qin <becket....@gmail.com> wrote:
Hi folks,
Thanks for the comments. I am convinced that the Source API should not take boundedness as a parameter after it is constructed. What Timo and Dawid suggested sounds like a reasonable solution to me. So the Source API would become:
Source {
Boundedness getBoundedness();
}
Assuming the above Source API, in addition to the two options mentioned
in
earlier emails, I am thinking of another option:
*Option 3:*
// MySource must be unbounded, otherwise throws exception.
DataStream<Type> dataStream = env.source(mySource);
// MySource must be bounded, otherwise throws exception.
BoundedDataStream<Type> boundedDataStream = env.boundedSource(mySource);
The pros of this API are:
   a) It fits the requirements from Table / SQL well.
   b) DataStream users still have type safety (option 2 only has partial type safety).
   c) Crystal-clear boundedness from the API, which makes DataStream join / connect easy to reason about.
The caveats I see:
   a) It is inconsistent with Table since Table has one unified interface.
   b) No streaming mode for bounded sources.
@Stephan Ewen <ewenstep...@gmail.com> @Aljoscha Krettek
<aljos...@ververica.com> what do you think of the approach?
Orthogonal to the above API, I am wondering whether boundedness is the only dimension needed to describe the characteristics of the Source behavior. We may also need another dimension of *record order*.
For example, when a file source is reading from a directory with bounded
records, it may have two ways to read.
1. Read files in parallel.
2. Read files in chronological order.
In both cases, the file source is a Bounded Source. However, the
processing
requirement for downstream may be different. In the first case, the
record processing and result emitting order does not matter, e.g. word
count. In the second case, the records may have to be processed in the
order they were read, e.g. change log processing.
If the Source only has a getBoundedness() method, the downstream
processors
would not know whether the records emitted from the Source should be
processed in order or not. So combining the boundedness and record order,
we will have four scenarios:
*Bounded-StrictOrder*: A segment of change log.
*Bounded-Random*: Batch Word Count.
*Unbounded-StrictOrder*: An infinite change log.
*Unbounded-Random*: Streaming Word Count.
Option 2 mentioned in the previous email was kind of trying to handle the
Bounded-StrictOrder case by creating a DataStream from a bounded source,
which actually does not work.
It looks like we do not have strict-order support in some operators at this point, e.g. join. But we may still want to add the semantic to the Source first so that later on we don't need to change all the source implementations, especially given that many of them will be implemented by 3rd parties.
Given that, we need another dimension of *Record Order* in the Source.
More
specifically, the API would become:
Source {
    Boundedness getBoundedness();
    RecordOrder getRecordOrder();
}

public enum RecordOrder {
    /** The records in the DataStream must be processed in their strict order for correctness. */
    STRICT,
    /** The records in the DataStream can be processed in arbitrary order. */
    RANDOM;
}
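For illustration only, a minimal sketch of how a downstream library could consult both dimensions before applying an order-sensitive operation; the guard method and the exception text are hypothetical, not part of the proposal:

// Hypothetical guard a library could run before applying an order-sensitive
// operation (e.g. change log processing). Only getBoundedness()/getRecordOrder()
// from the proposal above are assumed; everything else is made up for the example.
static void checkOrderSensitiveOperation(Source source) {
    if (source.getRecordOrder() != RecordOrder.STRICT) {
        throw new IllegalStateException(
            "This operation requires records in strict order, but the source reports "
                + source.getRecordOrder() + " (" + source.getBoundedness() + ")");
    }
}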
Any thoughts?
Thanks,
Jiangjie (Becket) Qin
On Tue, Dec 17, 2019 at 3:44 PM Timo Walther <twal...@apache.org> wrote:
Hi Becket,
I completely agree with Dawid's suggestion. The information about the boundedness should come out of the source, because most of the streaming sources can be made bounded based on some connector-specific criterion. In Kafka, it would be an end offset or end timestamp, but in any case having just an env.boundedSource() is not enough because the parameters for making the source bounded are missing.
I suggest to have a simple `isBounded(): Boolean` flag in every source that might be influenced by a connector builder, as Dawid mentioned.
For type safety during programming, we can still go with *Final state 1* by having env.source() vs env.boundedSource(). The latter would just enforce that the boolean flag is set to `true` and could make bounded operations available (if we actually need that).
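As a minimal sketch of what that enforcement could look like, assuming the `isBounded()` flag and a BoundedDataStream type from this discussion; the method body, the internal factory call and the error message are illustrative only:

// Sketch: env.boundedSource(...) only enforces that the connector was configured to be
// bounded (e.g. with an end offset / end timestamp). Generics and the actual
// translation into a transformation are omitted.
public <T> BoundedDataStream<T> boundedSource(Source<T> source) {
    if (!source.isBounded()) {
        throw new IllegalArgumentException(
            "boundedSource() requires a source with isBounded() == true; "
                + "configure the connector's bounded criterion first.");
    }
    return createBoundedStream(source);   // hypothetical internal factory method
}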
However, I don't think that we should start making a unified Table API ununified again. Boundedness is an optimization property. Every bounded operation can also be executed in an unbounded way using updates/retraction or watermarks.
Regards,
Timo
On 15.12.19 14:22, Becket Qin wrote:
Hi Dawid and Jark,
I think the discussion ultimately boils down to the question of which one of the following two final states we want. Once we make this decision, everything else can be naturally derived.
*Final state 1*: Separate API for bounded / unbounded DataStream & Table.
That means any code users write will be valid at the point when they write the code. This is similar to having a type safety check at programming time. For example,
BoundedDataStream extends DataStream {
    // Operations only available for bounded data.
    BoundedDataStream sort(...);
    // Interaction with another BoundedStream returns a bounded stream.
    BoundedJoinedDataStream join(BoundedDataStream other);
    // Interaction with another unbounded stream returns an unbounded stream.
    JoinedDataStream join(DataStream other);
}

BoundedTable extends Table {
    // Bounded-only operation.
    BoundedTable sort(...);
    // Interaction with another BoundedTable returns a BoundedTable.
    BoundedTable join(BoundedTable other);
    // Interaction with another unbounded table returns an unbounded table.
    Table join(Table other);
}
*Final state 2*: One unified API for bounded / unbounded DataStream / Table.
That unified API may throw an exception at DAG compilation time if an invalid operation is tried. This is what the Table API currently follows.
DataStream {
    // Throws exception if the DataStream is unbounded.
    DataStream sort();
    // Get boundedness.
    Boundedness getBoundedness();
}

Table {
    // Throws exception if the table has infinite rows.
    Table orderBy();
    // Get boundedness.
    Boundedness getBoundedness();
}
From what I understand, there is no consensus so far on this decision yet. Whichever final state we choose, we need to make it consistent across the entire project. We should avoid the case that Table follows one final state while DataStream follows another. Some arguments I am aware of from both sides so far are the following:
Arguments for final state 1:
1a) Clean API with method safety checks at programming time.
1b) (Counter to 2b) Although SQL does not have programming-time error checks, SQL is not really a "programming language" per se. So SQL can be different from Table and DataStream.
1c) Although final state 2 seems to make it easier for SQL to use, given it is more "config based" than "parameter based", final state 1 can probably also meet what SQL wants by wrapping the Source in the TableSource / TableSourceFactory API if needed.
Arguments for final state 2:
2a) The Source API itself seems to already follow the unified API pattern.
2b) There is no "programming time" method error check in the SQL case, so we cannot really achieve final state 1 across the board.
2c) It is an easier path given our current status, i.e. Table already follows final state 2.
2d) Users can always explicitly check the boundedness if they want to.
As I mentioned earlier, my initial thought was also to have a "configuration based" Source rather than a "parameter based" Source. So it is completely possible that I missed some important consideration or design principle that we want to enforce for the project. It would be good if @Stephan Ewen <step...@ververica.com> and @Aljoscha Krettek <aljos...@ververica.com> could also provide more thoughts on this.
Re: Jingsong
As you said, there are some batched system sources, like the parquet/orc source. Could we have a batch emit interface to improve performance? The per-record queue may cause performance degradation.
The current interface does not necessarily cause performance problems in a multi-threading case. In fact, the base implementation allows SplitReaders to add a batch <E> of records <T> to the records queue <E>, so each element in the records queue would be a batch <E>. In this case, when the main thread polls records, it will take a batch <E> of records <T> from the shared records queue and process the records <T> in a batch manner.
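A small sketch of the handover pattern described above, with whole batches rather than single records as queue elements; the class and method names are illustrative, not the actual base implementation:

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Fetcher threads enqueue whole batches; the main thread dequeues one batch at a time,
// so the per-element synchronization cost is paid once per batch, not once per record.
public class BatchHandover<T> {

    private final BlockingQueue<List<T>> queue = new LinkedBlockingQueue<>(16);

    /** Called by a SplitReader / fetcher thread. */
    public void addBatch(List<T> batch) throws InterruptedException {
        queue.put(batch);
    }

    /** Called by the main thread; processes a whole batch per poll. */
    public List<T> pollBatch() throws InterruptedException {
        return queue.take();
    }
}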
Thanks,
Jiangjie (Becket) Qin
On Thu, Dec 12, 2019 at 1:29 PM Jingsong Li <jingsongl...@gmail.com>
wrote:
Hi Becket,
I also have some performance concerns.
If I understand correctly, SourceOutput will emit data per record into the queue? I'm worried about the multithreading performance of this queue.
One example is some batched messaging systems which only have an
offset
for the entire batch instead of individual messages in the batch.
As you said, there are some batched system sources, like the parquet/orc source. Could we have a batch emit interface to improve performance? The per-record queue may cause performance degradation.
Best,
Jingsong Lee
On Thu, Dec 12, 2019 at 9:15 AM Jark Wu <imj...@gmail.com> wrote:
Hi Becket,
I think Dawid explained things clearly and it makes a lot of sense.
I'm also in favor of #2, because #1 doesn't work for our future unified environment.
You can see the vision in this documentation [1]. In the future, we would like to drop the global streaming/batch mode in SQL (i.e. EnvironmentSettings#inStreamingMode/inBatchMode). A source is bounded or unbounded once defined, so queries can be inferred from the source to run in streaming or batch or hybrid mode. However, in #1, we will lose this ability because the framework doesn't know whether the source is bounded or unbounded.
Best,
Jark
[1]:
https://docs.google.com/document/d/1yrKXEIRATfxHJJ0K3t6wUgXAtZq8D-XgvEnvl2uUcr0/edit#heading=h.v4ib17buma1p
On Wed, 11 Dec 2019 at 20:52, Piotr Nowojski <pi...@ververica.com>
wrote:
Hi,
Regarding the:
Collection<E> getNextRecords()
I’m pretty sure such a design would unfortunately impact the performance (accessing and potentially creating the collection on the hot path).
Also the
InputStatus emitNext(DataOutput<T> output) throws Exception;
or
Status pollNext(SourceOutput<T> sourceOutput) throws Exception;
gives us some opportunities in the future to allow the Source to hot-loop inside, until it receives some signal "please exit because of some reason" (the output collector could return such a hint upon collecting the result). But that's another topic outside of this FLIP's scope.
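A purely illustrative sketch of such a hot loop with an exit hint; the Hint type, the hint-returning collect() and both enums are hypothetical, not FLIP API:

// Illustrative only: keep emitting inside pollNext() until the output says "yield".
enum Hint { CONTINUE, PLEASE_YIELD }
enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE }

interface SourceOutput<T> {
    Hint collect(T record); // hypothetical: collecting returns a hint from the runtime
}

abstract class HotLoopingReaderSketch<T> {
    protected abstract T fetchNextOrNull() throws Exception; // connector-specific fetch

    InputStatus pollNext(SourceOutput<T> output) throws Exception {
        while (true) {
            T record = fetchNextOrNull();
            if (record == null) {
                return InputStatus.NOTHING_AVAILABLE;      // nothing to emit right now
            }
            if (output.collect(record) == Hint.PLEASE_YIELD) {
                return InputStatus.MORE_AVAILABLE;          // hand control back to the task
            }
        }
    }
}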
Piotrek
On 11 Dec 2019, at 10:41, Till Rohrmann <trohrm...@apache.org>
wrote:
Hi Becket,
quick clarification from my side because I think you misunderstood my question. I did not suggest to let the SourceReader return only a single record at a time when calling getNextRecords. As the return type indicates, the method can return an arbitrary number of records.
Cheers,
Till
On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
Hi Becket,
Issue #1 - Design of Source interface
I mentioned the lack of a method like Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context) because without it the current proposal is not complete / does not work.
If we say that boundedness is an intrinsic property of a source, imo we don't need the Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context) method.
Assuming a source from my previous example:
Source source = KafkaSource.builder()
...
.untilTimestamp(...)
.build()
Would the enumerator differ if created like source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs source.createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this is the part where my opinion differs the most from the current proposal. I really think it should always be the source that tells if it is bounded or not. In the current proposal, the methods continousSource/boundedSource somewhat reconfigure the source, which I think is misleading.
I think a call like:
Source source = KafkaSource.builder()
    ...
    .readContinously() / readUntilLatestOffset() / readUntilTimestamp / readUntilOffsets / ...
    .build()
is way cleaner (and more expressive) than
Source source = KafkaSource.builder()
    ...
    .build()
env.continousSource(source) // which underneath would actually call createEnumerator(CONTINUOUS, ctx), equivalent to source.readContinously().createEnumerator(ctx)
// or
env.boundedSource(source) // which underneath would actually call createEnumerator(BOUNDED, ctx), equivalent to source.readUntilLatestOffset().createEnumerator(ctx)
Sorry for the comparison, but to me it seems there is too much magic happening underneath those two calls.
I really believe the Source interface should have a getBoundedness method instead of (supportBoundedness) + createEnumerator(Boundedness, ...).
Issue #2 - Design of
ExecutionEnvironment#source()/continuousSource()/boundedSource()
As you might have guessed, I am slightly in favor of option #2 modified. Yes, I am aware every step of the DAG would have to be able to say if it is bounded or not. I have a feeling it would be easier to express cross bounded/unbounded operations, but I must admit I have not thought it through thoroughly. In the spirit of "batch is just a special case of streaming" I thought BoundedStream would extend from DataStream. Correct me if I am wrong. In such a setup the cross bounded/unbounded operation could be expressed quite easily I think:
DataStream {
    // We could not really tell if the result is bounded or not, but because a bounded
    // stream is a special case of an unbounded one, the API object is correct,
    // irrespective of whether the left or right side of the join is bounded.
    DataStream join(DataStream, ...);
}

BoundedStream extends DataStream {
    // Only if both sides are bounded can the result be bounded as well. However, we do
    // have access to DataStream#join here, so you can still join with a DataStream.
    BoundedStream join(BoundedStream, ...);
}
On the other hand I also see benefits of two completely disjoint APIs, as we could prohibit some streaming calls in the bounded API. I can't think of any unbounded operators that could not be implemented for a bounded stream.
Besides, I think we both agree we don't like the method:
DataStream boundedStream(Source)
suggested in the current state of the FLIP. Do we? :)
Best,
Dawid
On 10/12/2019 18:57, Becket Qin wrote:
Hi folks,
Thanks for the discussion, great feedback. Also thanks Dawid for the explanation, it is much clearer now.
One thing that is indeed missing from the FLIP is how the boundedness is passed to the Source implementation. So the API should be
Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context)
and we can probably remove the Source#supportBoundedness(Boundedness boundedness) method.
Assuming we have that, we are essentially choosing from one of the following two options:
Option 1:
// The source is a continuous source, and only unbounded operations can be performed.
DataStream<Type> datastream = env.continuousSource(someSource);
// The source is a bounded source, both bounded and unbounded operations can be performed.
BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);
- Pros:
     a) Explicit boundary between bounded / unbounded streams; it is quite simple and clear to the users.
- Cons:
     a) For applications that do not involve bounded operations, users still have to call different APIs to distinguish bounded / unbounded streams.
     b) No support for bounded streams to run in a streaming runtime setting, i.e. scheduling and operator behaviors.
Option 2:
// The source is either bounded or unbounded, but only unbounded operations could be performed on the returned DataStream.
DataStream<Type> dataStream = env.source(someSource);
// The source must be a bounded source, otherwise an exception is thrown.
BoundedDataStream<Type> boundedDataStream = env.boundedSource(boundedSource);
The pros and cons are exactly the opposite of option 1.
- Pros:
     a) For applications that do not involve bounded operations, users do not have to call different APIs to distinguish bounded / unbounded streams.
     b) Support for bounded streams to run in a streaming runtime setting, i.e. scheduling and operator behaviors.
- Cons:
     a) Bounded / unbounded streams are kind of mixed, i.e. given a DataStream, it is not clear whether it is bounded or not, unless you have access to its source.
If we only think from the Source API perspective, option 2 seems a better choice because functionality-wise it is a superset of option 1, at the cost of some seemingly acceptable ambiguity in the DataStream API.
But if we look at the DataStream API as a whole, option 1 seems a clearer choice. For example, sometimes a library may have to know whether a certain task will finish or not, and it would be difficult to tell if the input is a DataStream, unless additional information is provided all the way from the Source. One possible solution is to have a *modified option 2* which adds a method to the DataStream API to indicate boundedness, such as getBoundedness(). It would solve the problem, with a potential confusion about what the difference is between a DataStream with getBoundedness()=true and a BoundedDataStream. But that seems not super difficult to explain.
So from the API's perspective, I don't have a strong opinion between *option 1* and *modified option 2*. I like the cleanness of option 1, but modified option 2 would be more attractive if we have a concrete use case for the "bounded stream with unbounded streaming runtime settings".
Re: Till
Maybe this has already been asked before but I was wondering why the SourceReader interface has the method pollNext which hands the responsibility of outputting elements to the SourceReader implementation? Has this been done for backwards compatibility reasons with the old source interface? If not, then one could define a Collection<E> getNextRecords() method which returns the currently retrieved records and then the caller emits them outside of the SourceReader. That way the interface would not allow to implement an outputting loop where we never hand back control to the caller. At the moment, this contract can be easily broken and is only mentioned loosely in the JavaDocs.
The primary reason we hand over the SourceOutput to the SourceReader is that sometimes it is difficult for a SourceReader to emit one record at a time. One example is some batched messaging systems which only have an offset for the entire batch instead of individual messages in the batch. In that case, returning one record at a time would leave the SourceReader in an uncheckpointable state because it can only checkpoint at the batch boundaries.
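A sketch of that point: because the reader owns the output, it can emit the whole batch before returning, so its state only ever describes completed batches and a checkpoint naturally falls on a batch boundary. All type and method names below are illustrative:

// Illustrative reader for a system that only exposes a per-batch offset.
final class RecordBatch<T> {
    final long offset;                 // the only offset the external system exposes
    final java.util.List<T> records;
    RecordBatch(long offset, java.util.List<T> records) { this.offset = offset; this.records = records; }
}

abstract class BatchOffsetReaderSketch<T> {
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE }

    private long lastCompletedBatchOffset = -1L;        // what a checkpoint would store

    protected abstract RecordBatch<T> fetchNextBatchOrNull() throws Exception;
    protected abstract void emit(T record);             // stands in for SourceOutput#collect

    InputStatus pollNext() throws Exception {
        RecordBatch<T> batch = fetchNextBatchOrNull();
        if (batch == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        for (T record : batch.records) {
            emit(record);                                // emit the whole batch before returning
        }
        lastCompletedBatchOffset = batch.offset;         // state sits on a batch boundary again
        return InputStatus.MORE_AVAILABLE;
    }
}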
Thanks,
Jiangjie (Becket) Qin
On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org> wrote:
Hi everyone,
thanks for drafting this FLIP. It reads very well.
Concerning Dawid's proposal, I tend to agree. The boundedness could come from the source and tell the system how to treat the operator (scheduling wise). From a user's perspective it should be fine to get back a DataStream when calling env.source(boundedSource) if he does not need special operations defined on a BoundedDataStream. If he needs this, then one could use the method BoundedDataStream env.boundedSource(boundedSource).
If possible, we could enforce the proper usage of env.boundedSource() by introducing a BoundedSource type so that one cannot pass an unbounded source to it. That way users would not be able to shoot themselves in the foot.
Maybe this has already been asked before but I was wondering why the SourceReader interface has the method pollNext which hands the responsibility of outputting elements to the SourceReader implementation? Has this been done for backwards compatibility reasons with the old source interface? If not, then one could define a Collection<E> getNextRecords() method which returns the currently retrieved records and then the caller emits them outside of the SourceReader. That way the interface would not allow to implement an outputting loop where we never hand back control to the caller. At the moment, this contract can be easily broken and is only mentioned loosely in the JavaDocs.
Cheers,
Till
On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com> wrote:
Hi all,
I think the current design is good.
My understanding is:
For execution mode: bounded mode and continuous mode are totally different. I don't think we have the ability to integrate the two models at present. It's about scheduling, memory, algorithms, state, etc.; we shouldn't confuse them.
For source capabilities: only bounded, only continuous, both bounded and continuous.
I think Kafka is a source that can be run in both bounded and continuous execution mode.
And Kafka with an end offset should be run in both bounded and continuous execution mode. Using Apache Beam with the Flink runner, I used to run a "bounded" Kafka source in streaming mode. For our previous DataStream, it is not necessarily required that the source cannot be bounded.
So here are my thoughts on Dawid's question:
1. Pass a bounded source to continuousSource(): +1.
2. Pass a continuous source to boundedSource(): -1, should throw an exception.
In StreamExecutionEnvironment, continuousSource and boundedSource define the execution mode. They define a clear boundary of execution mode.
Best,
Jingsong Lee
On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:
I agree with Dawid's point that the boundedness information should come from the source itself (e.g. the end timestamp), not through env.boundedSource()/continuousSource().
I think if we want to support something like `env.source()` that derives the execution mode from the source, a `supportsBoundedness(Boundedness)` method is not enough, because we don't know whether it is bounded or not.
Best,
Jark
On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
One more thing. In the current proposal, with the supportsBoundedness(Boundedness) method and the boundedness coming from either continuousSource or boundedSource, I could not find how this information is fed back to the SplitEnumerator.
Best,
Dawid
On 09/12/2019 13:52, Becket Qin wrote:
Hi Dawid,
Thanks for the comments. This actually brings up another relevant question about what a "bounded source" implies. I actually had the same impression when I looked at the Source API. Here is what I understand after some discussion with Stephan. The bounded source has the following impacts.
1. API validity.
- A bounded source generates a bounded stream, so some operations that only work for bounded records can be performed, e.g. sort.
- To expose these bounded-stream-only APIs, there are two options:
     a. Add them to the DataStream API and throw an exception if a method is called on an unbounded stream.
     b. Create a BoundedDataStream class which is returned from env.boundedSource(), while DataStream is returned from env.continousSource().
Note that this cannot be done by having a single env.source(theSource) even if the Source has a getBoundedness() method.
2. Scheduling
- A bounded source could be computed stage by stage without bringing up all the tasks at the same time.
3. Operator behaviors
- A bounded source indicates the records are finite, so some operators can wait until they receive all the records before starting the processing.
Among the above impacts, only 1 is relevant to the API design, and the current proposal in FLIP-27 follows 1.b.
// boundedness depends on a source property, imo this should always be preferred
DataStream<MyType> stream = env.source(theSource);
In your proposal, does DataStream have bounded-stream-only methods? It looks like it should; otherwise passing a bounded Source to env.source() would be confusing. In that case, we will essentially do 1.a if an unbounded Source is created from env.source(unboundedSource).
If we have the methods only supported for bounded streams in DataStream, it seems a little weird to have a separate BoundedDataStream interface.
Am I understanding it correctly?
Thanks,
Jiangjie (Becket) Qin
On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
Hi all,
Really well written proposal and a very important one. I must admit I have not understood all the intricacies of it yet.
One question I have though is about where the information about boundedness comes from. I think in most cases it is a property of the source. As you described, it might be e.g. an end offset, a flag whether it should monitor new splits, etc. I think it would be a really nice use case to be able to say:
new KafkaSource().readUntil(long timestamp),
which could work as an "end offset". Moreover, I think all bounded sources support continuous mode, but no intrinsically continuous source supports the bounded mode. If I understood the proposal correctly, it suggests that the boundedness sort of "comes" from the outside of the source, from the invocation of either boundedStream or continousSource.
I am wondering if it would make sense to actually change the
method
boolean Source#supportsBoundedness(Boundedness)
to
Boundedness Source#getBoundedness().
As for the methods #boundedSource, #continousSource: assuming the boundedness is a property of the source, they do not affect how the enumerator works, but mostly how the DAG is scheduled, right? I am not against those methods, but I think it is a very specific use case to actually override the property of the source. In general I would expect users to only call env.source(theSource), where the source tells if it is bounded or not. I would suggest considering the following set of methods:
// boundedness depends on a source property, imo this should always be preferred
DataStream<MyType> stream = env.source(theSource);

// always continuous execution, whether the source is bounded or unbounded
DataStream<MyType> boundedStream = env.continousSource(theSource);

// imo this would make sense if the BoundedDataStream provides additional features unavailable in continuous mode
BoundedDataStream<MyType> batch = env.boundedSource(theSource);
Best,
Dawid
On 04/12/2019 11:25, Stephan Ewen wrote:
Thanks, Becket, for updating this.
I agree with moving the aspects you mentioned into separate FLIPs - this one was becoming unwieldy in size.
+1 to the FLIP in its current state. It's a very detailed write-up, nicely done!
On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:
Hi all,
Sorry for the long belated update. I have updated the FLIP-27 wiki page with the latest proposals. Some noticeable changes include:
1. A new generic communication mechanism between SplitEnumerator and SourceReader.
2. Some detailed API method signature changes.
We left a few things out of this FLIP and will address them in separate FLIPs, including:
1. Per-split event time.
2. Event time alignment.
3. Fine-grained failover for SplitEnumerator failures.
Please let us know if you have any questions.
Thanks,
Jiangjie (Becket) Qin
On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:
Hi Łukasz!
Becket and I are working hard on figuring out the last details and implementing the first PoC. We will hopefully update the FLIP next week.
There is a fair chance that a first version of this will be in 1.10, but I think it will take another release to battle-test it and migrate the connectors.
Best,
Stephan
On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:
Hi,
This proposal looks very promising for us. Do you have any plans for which Flink release it is going to be included in? We are thinking of using the DataSet API for our future use cases, but on the other hand the DataSet API is going to be deprecated, so using the proposed bounded data streams solution could be more viable in the long term.
Thanks,
Łukasz
On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:
Thanks for putting together this proposal!
I see that the "Per Split Event Time" and "Event Time Alignment" sections are still TBD.
It would probably be good to flesh those out a bit before proceeding too far, as the event time alignment will probably influence the interaction with the split reader, specifically ReaderStatus emitNext(SourceOutput<E> output).
We currently have only one implementation of event time alignment, in the Kinesis consumer. The synchronization in that case takes place as the last step before records are emitted downstream (RecordEmitter). With the currently proposed interfaces, the equivalent can be implemented in the reader loop, although note that in the Kinesis consumer the per-shard threads push records.
Synchronization has not been implemented for the Kafka consumer yet:
https://issues.apache.org/jira/browse/FLINK-12675
When I looked at it, I realized that the implementation will look quite different from Kinesis because it needs to take place in the pull part, where records are taken from the Kafka client. Due to the multiplexing it cannot be done by blocking the split thread like it currently works for Kinesis. Reading from individual Kafka partitions needs to be controlled via pause/resume on the Kafka client.
To take on that responsibility, the split thread would need to be aware of the watermarks, or at least whether it should or should not continue to consume a given split, and this may require a different SourceReader or SourceOutput interface.
Thanks,
Thomas
On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:
Hi Stephan,
Thank you for feedback!
Will take a look at your branch before discussing publicly.
On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:
Hi Biao!
Thanks for reviving this. I would like to join this discussion, but I am quite occupied with the 1.9 release, so can we maybe pause this discussion for a week or so?
In the meantime I can share some suggestions based on prior experiments:
How to do watermarks / timestamp extractors in a simpler and more flexible way. I think that part is quite promising and should be part of the new source interface.
https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
Some experiments on how to build the source reader and its library for common threading/split patterns:
https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
Best,
Stephan
On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:
Hi devs,
Since 1.9 is nearly released, I think we could get back to FLIP-27. I believe it should be included in 1.10.
There are so many things mentioned in the document of FLIP-27 [1] that I think we'd better discuss them separately. However, the wiki is not a good place to discuss, so I wrote a google doc about the SplitReader API which covers some details missing from the document. [2]
1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
CC Stephan, Aljoscha, Piotrek, Becket
On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:
Hi Steven,
Thank you for the feedback. Please take a look at the FLIP-27 document
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>
which was updated recently. A lot of details about the enumerator were added in this document. I think it would help.
Steven Wu <stevenz...@gmail.com> wrote on Thursday, March 28, 2019 at 12:52 PM:
This proposal mentioned that the SplitEnumerator might run on the JobManager or in a single task on a TaskManager.
If the enumerator is a single task on a TaskManager, then the job DAG can never be embarrassingly parallel anymore. That will nullify the leverage of fine-grained recovery for embarrassingly parallel jobs.
It's not clear to me what the implication of running the enumerator on the JobManager is, so I will leave that out for now.
On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
Hi Stephan & Piotrek,
Thank you for the feedback.
It seems that there are a lot of things to do in the community. I am just afraid that this discussion may be forgotten since there are so many proposals recently.
Anyway, I wish to see the split topics soon :)
Piotr Nowojski <pi...@da-platform.com> wrote on Thursday, January 24, 2019 at 8:21 PM:
Hi Biao!
This discussion was stalled because of preparations for the open-sourcing & merging of Blink. I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for those.
I think there is no chance for this to be completed in the couple of remaining weeks / 1 month before the 1.8 feature freeze; however, it would be good to aim with those changes for 1.9.
Piotrek
On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
Hi community,
The summary by Stephan makes a lot of sense to me. It is much clearer indeed after splitting the complex topic into small ones.
I was wondering, is there any detailed plan for the next step? If not, I would like to push this thing forward by creating some JIRA issues.
Another question is: should version 1.8 include these features?
Stephan Ewen <se...@apache.org> wrote on Saturday, December 1, 2018 at 4:20 AM:
Thanks everyone for the lively discussion. Let me try to summarize where I see convergence in the discussion and open issues. I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here.
For issues 1-3, if the below reflects the state of the discussion, I would try and update the FLIP in the next days. For the remaining ones we need more discussion.
I would suggest to fork each of these aspects into a separate mail thread, or we will lose sight of the individual aspects.
*(1) Separation of Split Enumerator and Split Reader*
- All seem to agree this is a good thing.
- The Split Enumerator could in the end live on the JobManager (and assign splits via RPC) or in a task (and assign splits via data streams).
- This discussion is orthogonal and should come later, when the interface is agreed upon.
*(2) Split Readers for one or more splits*
- The discussion seems to agree that we need to support one reader that possibly handles multiple splits concurrently.
- The requirement comes from sources where one poll()-style call fetches data from different splits / partitions.
  --> Example sources that require that would be Kafka, Pravega, Pulsar.
- Could have one split reader per source, or multiple split readers that share the "poll()" function.
- To not make it too complicated, we can start with thinking about one split reader for all splits initially and see if that covers all requirements.
*(3) Threading model of the Split Reader*
- Most active part of the discussion ;-)
- A non-blocking way for Flink's task code to interact with the source is needed in order to base the task runtime code on a single-threaded/actor-style task design.
  --> I personally am a big proponent of that, it will help with well-behaved checkpoints, efficiency, and simpler yet more robust runtime code.
- Users care about simple abstraction, so as a subclass of SplitReader (non-blocking / async) we need to have a BlockingSplitReader which will form the basis of most source implementations. BlockingSplitReader lets users do simple blocking poll() calls.
- The BlockingSplitReader would spawn a thread (or more) and the thread(s) can make blocking calls and hand over data buffers via a blocking queue (see the sketch after this list).
- This should allow us to cover both a fully async runtime and a simple blocking interface for users.
- This is actually very similar to how the Kafka connectors work. Kafka 9+ with one thread, Kafka 8 with multiple threads.
- On the base SplitReader (the async one), the non-blocking method that gets the next chunk of data would signal data availability via a CompletableFuture, because that gives the best flexibility (can await completion or register notification handlers).
- The source task would register a "thenHandle()" (or similar) on the future to put a "take next data" task into the actor-style mailbox.
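A rough sketch of the handover described in (3): a blocking reader runs its poll() loop on a dedicated thread and hands data buffers to the async side through a bounded blocking queue, with availability signalled via a CompletableFuture. All names are illustrative, not the actual FLIP-27 classes.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

public abstract class BlockingSplitReaderSketch<T> {

    // Bounded queue: back-pressures the fetcher thread when the task is slow.
    private final BlockingQueue<List<T>> handover = new ArrayBlockingQueue<>(4);
    private CompletableFuture<Void> availability;

    /** Connector-specific blocking poll, e.g. a kafkaConsumer.poll(...) wrapper. */
    protected abstract List<T> pollBlocking() throws Exception;

    /** Spawns the fetcher thread that does the blocking calls. */
    public void startFetcher() {
        Thread fetcher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    handover.put(pollBlocking());
                    notifyAvailable();
                }
            } catch (Exception e) {
                // A real implementation would propagate the failure; omitted in this sketch.
            }
        }, "split-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
    }

    /** Non-blocking call used by the single-threaded task loop; null means "nothing yet". */
    public List<T> pollNextBuffer() {
        return handover.poll();
    }

    /** The task can register e.g. thenRun(...) here to enqueue a "take next data" mailbox action. */
    public synchronized CompletableFuture<Void> isAvailable() {
        if (!handover.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        if (availability == null || availability.isDone()) {
            availability = new CompletableFuture<>();
        }
        return availability;
    }

    private synchronized void notifyAvailable() {
        if (availability != null) {
            availability.complete(null);
        }
    }
}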
*(4) Split Enumeration and Assignment*
- Splits may be generated lazily, both in cases where there is a limited number of splits (but very many), or splits are discovered over time.
- Assignment should also be lazy, to get better load balancing.
- Assignment needs to support locality preferences.
- Possible design based on discussion so far (see the sketch after this list):
  --> SplitReader has a method "addSplits(SplitT...)" to add one or more splits. Some split readers might assume they have only one split ever, concurrently, others assume multiple splits. (Note: the idea behind being able to add multiple splits at the same time is to ease startup where multiple splits may be assigned instantly.)
  --> SplitReader has a context object on which it can call to indicate when splits are completed. The enumerator gets that notification and can use it to decide when to assign new splits. This should help both in cases of sources that take splits lazily (file readers) and in case the source needs to preserve a partial order between splits (Kinesis, Pravega, Pulsar may need that).
  --> SplitEnumerator gets a notification when SplitReaders start and when they finish splits. They can decide at that moment to push more splits to that reader.
  --> The SplitEnumerator should probably be aware of the source parallelism, to build its initial distribution.
- Open question: Should the source expose something like "host preferences", so that yarn/mesos/k8s can take this into account when selecting a node to start a TM on?
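To make the possible design in (4) a bit more concrete, a tiny sketch of the reader/enumerator hooks described in the bullets above; all names are placeholders, not the FLIP's final API:

// Placeholder interfaces for the assignment hooks sketched above.
public interface SplitReaderSketch<SplitT> {
    // One or more splits can be handed to the reader (eases startup assignment).
    void addSplits(SplitT... splits);
}

public interface SplitReaderContextSketch<SplitT> {
    // Called by the reader when it is done with a split; the enumerator gets this
    // notification and can decide to assign new splits (lazy, load-balanced assignment).
    void splitFinished(SplitT split);
}

public interface SplitEnumeratorSketch<SplitT> {
    // Aware of the source parallelism to build the initial distribution.
    void start(int parallelism);
    // Notified when a reader starts and when it finishes splits; may push more splits then.
    void readerStarted(int subtaskId);
    void splitFinishedByReader(int subtaskId, SplitT split);
}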
*(5) Watermarks and event time alignment*
- Watermark generation, as well as idleness, needs to be per split (like currently in the Kafka Source, per partition).
- It is desirable to support optional event-time alignment, meaning that splits that are ahead are back-pressured or temporarily unsubscribed.
- I think it would be desirable to encapsulate watermark generation logic in watermark generators, for a separation of concerns. The watermark generators should run per split.
- Using watermark generators would also help with another problem of the suggested interface, namely supporting non-periodic watermarks efficiently.
- Need a way to "dispatch" the next record to different watermark generators.
- Need a way to tell the SplitReader to "suspend" a split until a certain watermark is reached (event time backpressure).
- This would in fact not be needed (and thus simpler) if we had a SplitReader per split, and may be a reason to re-open that discussion.
*(6) Watermarks across splits and in the Split Enumerator*
- The split enumerator may need some watermark awareness, which should be purely based on split metadata (like the create timestamp of file splits).
- If there are still more splits with an overlapping event time range for a split reader, then that split reader should not advance the watermark within the split beyond the overlap boundary. Otherwise future splits will produce late data.
- One way to approach this could be that the split enumerator may send watermarks to the readers, and the readers cannot emit watermarks beyond that received watermark.
- Many split enumerators would simply immediately send Long.MAX out and leave the progress purely to the split readers.
- For event-time alignment / split back pressure, this begs the question how we can avoid deadlocks that may arise when splits are suspended for event time back pressure.
*(7) Batch and streaming unification*
- Functionality-wise, the above design should support both.
- Batch often (mostly) does not care about reading "in order" and generating watermarks.
  --> Might use different enumerator logic that is more locality aware and ignores event time order.
  --> Does not generate watermarks.
- Would be great if bounded sources could be identified at compile time, so that "env.addBoundedSource(...)" is type safe and can return a "BoundedDataStream".
- Possible to defer this discussion until later.
*Miscellaneous Comments*
- Should the source have a TypeInformation for the produced type, instead of a serializer? We need a type information in the stream anyways, and can derive the serializer from that. Plus, creating the serializer should respect the ExecutionConfig.
- The TypeSerializer interface is very powerful but also not easy to implement. Its purpose is to handle data super efficiently, support flexible ways of evolution, etc.
For metadata I would suggest to look at the SimpleVersionedSerializer instead, which is used for example for checkpoint master hooks, or for the streaming file sink. I think that it is a good match for cases where we do not need more than ser/deser (no copy, etc.) and don't need to push versioning out of the serialization paths for best performance (as in the TypeSerializer).
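As a rough illustration of the SimpleVersionedSerializer shape referred to here, a sketch for a hypothetical file split; the split type and its field layout are made up for the example, only the serializer interface is the real one:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Hypothetical split metadata type, only for illustration.
class FileSplitExample {
    final String path;
    final long offset;
    FileSplitExample(String path, long offset) { this.path = path; this.offset = offset; }
}

// Plain ser/deser plus an explicit version; no copy/evolution machinery needed.
class FileSplitExampleSerializer implements SimpleVersionedSerializer<FileSplitExample> {

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(FileSplitExample split) throws IOException {
        byte[] pathBytes = split.path.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + pathBytes.length + 8);
        buf.putInt(pathBytes.length).put(pathBytes).putLong(split.offset);
        return buf.array();
    }

    @Override
    public FileSplitExample deserialize(int version, byte[] serialized) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(serialized);
        byte[] pathBytes = new byte[buf.getInt()];
        buf.get(pathBytes);
        return new FileSplitExample(new String(pathBytes, StandardCharsets.UTF_8), buf.getLong());
    }
}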
On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
k.klou...@data-artisans.com>
wrote:
Hi Biao,
Thanks for the answer!
So given the multi-threaded readers, now we have as open questions:
1) How do we let the checkpoints pass through our multi-threaded reader operator?
2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source operator has a parallelism of 1 and is responsible for split recovery only.
For the first one, given also the constraints (blocking, finite queues, etc), I do not have an answer yet.
For the 2nd, I think that we should go with separate operators for the source and the readers, for the following reasons:
1) This is more aligned with a potential future improvement where the split discovery becomes a responsibility of the JobManager and readers are pulling more work from the JM.
2) The source is going to be the "single point of truth". It will know what has been processed and what not. If the source and the readers are a single operator with parallelism > 1, or in general, if the split discovery is done by each task individually, then:
i) we have to have a deterministic scheme for each reader to assign splits to itself (e.g. mod subtaskId). This is not necessarily trivial for all sources.
ii) each reader would have to keep a copy of all its processed splits.
iii) the state has to be a union state with a non-trivial merging logic in order to support rescaling.
Two additional points that you raised above:
i) The point that you raised that we need to keep all splits (processed and not-processed) I think is a bit of a strong requirement. This would imply that for infinite sources the state will grow indefinitely. This problem is even more pronounced if we do not have a single source that assigns splits to readers, as each reader will have its own copy of the state.
ii) It is true that for finite sources we need to somehow not close the readers when the source/split discoverer finishes. The ContinuousFileReaderOperator has a work-around for that. It is not elegant, and checkpoints are not emitted after closing the source, but this, I believe, is a bigger problem which requires more changes than just refactoring the source interface.
Cheers,
Kostas
--
Best, Jingsong Lee