Ha! This is turning out to be quite the discussion. :-) Also, thanks,
Kenn, for chiming in with the Beam perspective!
I'll try to address some of the points raised.
It seems we have some consensus on using an N-ary operator to implement
side inputs. I see two ways forward there:
- Have a "pure" N-ary operator that has zero inputs by default and all
N inputs are equal: this exists side-by-side with the current one-input
operator and two-input operator.
- Extend the existing operators with more inputs: the main input(s)
would be considered different from the N other inputs, internally. With
this, we would not have to rewrite existing operators and could simply
have side inputs as an add-on.
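
A minimal sketch of the two options, in plain Java rather than actual Flink API. All interface, class, and method names here are hypothetical and only illustrate the shape of each approach; the adapter at the end shows one way the "add-on" variant could be layered on a pure N-ary operator, assuming input 0 is the main input.

```java
import java.util.List;

// Option 1: a "pure" n-ary operator. All inputs are equal and are
// addressed only by their index. (Hypothetical interface, not Flink API.)
interface NaryOperator<OUT> {
    void processElement(int inputIndex, Object element, List<OUT> out);
}

// Option 2: keep the existing one-input shape and attach side inputs as
// an add-on; the main input stays special. (Hypothetical as well.)
interface OneInputWithSideInputs<IN, OUT> {
    void processElement(IN element, List<OUT> out);

    void processSideInput(int sideInputIndex, Object element);
}

// Example of option 2: prefix every main element with the latest side value.
class PrefixingOperator implements OneInputWithSideInputs<String, String> {
    private String prefix = "";

    @Override
    public void processElement(String element, List<String> out) {
        out.add(prefix + element);
    }

    @Override
    public void processSideInput(int sideInputIndex, Object element) {
        prefix = String.valueOf(element);
    }
}

// Adapter: option 2 expressed on top of option 1. Input 0 is treated as
// the main input, every other input as a side input (an assumption).
class SideInputAdapter<IN, OUT> implements NaryOperator<OUT> {
    private final OneInputWithSideInputs<IN, OUT> delegate;

    SideInputAdapter(OneInputWithSideInputs<IN, OUT> delegate) {
        this.delegate = delegate;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void processElement(int inputIndex, Object element, List<OUT> out) {
        if (inputIndex == 0) {
            delegate.processElement((IN) element, out);
        } else {
            delegate.processSideInput(inputIndex - 1, element);
        }
    }
}
```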
There weren't any (many?) comments on using broadcast state for side
inputs. I think there is not much to agree on there because it seems
pretty straightforward to me that we need this.
About buffering: I think we need this as a Flink service because it is
right now not (easily) possible to buffer keyed input. For keyed input
we need to checkpoint the input buffers with the key-grouped state.
Otherwise the data would not be distributed to the correct operator when
restoring. This is explained in the FLIP in more detail.
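
To illustrate why the buffers have to travel with the key-grouped state, here is a small plain-Java model. It is not Flink API: `KeyedInputBuffer`, its methods, and the modulo-based key-group assignment are assumptions made for this sketch. Buffered elements are snapshotted per key group, so after a restore each operator instance picks up only the groups it owns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: main-input elements are buffered per key until the
// side input is ready, and snapshotted grouped by key group.
class KeyedInputBuffer<K, V> {
    private final int numKeyGroups;
    private final Map<K, List<V>> buffered = new HashMap<>();

    KeyedInputBuffer(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
    }

    // Simplified assignment for this sketch; Flink's real assignment differs.
    int keyGroupOf(K key) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    void buffer(K key, V element) {
        buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
    }

    // Called once the side input for this key is ready: returns the
    // buffered elements in arrival order and clears them.
    List<V> drain(K key) {
        List<V> elements = buffered.remove(key);
        return elements == null ? new ArrayList<>() : elements;
    }

    // Snapshot layout: key group -> key -> buffered elements.
    Map<Integer, Map<K, List<V>>> snapshot() {
        Map<Integer, Map<K, List<V>>> byGroup = new HashMap<>();
        for (Map.Entry<K, List<V>> e : buffered.entrySet()) {
            byGroup.computeIfAbsent(keyGroupOf(e.getKey()), g -> new HashMap<>())
                   .put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        return byGroup;
    }

    // Restores only the key groups in [fromGroup, toGroupExclusive).
    void restore(Map<Integer, Map<K, List<V>>> snapshot, int fromGroup, int toGroupExclusive) {
        for (int g = fromGroup; g < toGroupExclusive; g++) {
            Map<K, List<V>> group = snapshot.get(g);
            if (group == null) {
                continue;
            }
            for (Map.Entry<K, List<V>> e : group.entrySet()) {
                buffered.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
    }
}
```

If the buffer were instead stored as plain operator state, a restore with a different parallelism would have no way of routing a buffered element to the instance that now owns its key.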
If we have these three components (n-ary inputs, broadcast state and
input buffering) then side inputs are mostly API sugar on top. I even
believe that it might be enough to simply provide these and then users
have a very flexible system that allows them to implement different
side-input variants. I'm suggesting this because I see there are a lot
of different opinions and because the "field" of determining a side
input to be finished is still quite open.
Now, regarding Gabor's comments which, I think, pretty nicely summed up
the ongoing discussion and added some new stuff:
About the CoFlatMap for the simple case: I think this is almost
possible, except for the buffering in case of a keyed input stream.
Also, the side input is not easy to store because we need broadcast
state for that (depending, of course, on whether the input(s) are keyed
or not). I think with the above-mentioned additions this case would be
possible without explicit support for side inputs in the API.
Re 1)
I would prefer to use windowing/triggers for determining side-input
readiness. There are, right now, enough messages flying around the
system and introducing yet more doesn't seem too desirable to me right
now. We should, of course, revisit this once we have the basic
components in place.
Re 2)
See my comments about buffering in a keyed operator above. Regarding
blocking, this is currently not possible because all inputs are consumed
by one thread. This could, of course, change in the future but it is a
feature (limitation?) of the current implementation. In general, I think
blocking an input is only ever feasible while waiting for some bounded
inputs to be fully consumed. I.e. when you have some initial loading of
data from a static data set.
Re 3)
Agreed, I think that we should keep the side-input in the (yet to be
introduced) broadcast state. Again, once we have the basics in place we
can investigate further optimisations here such as not checkpointing
side-input data from a static data set because we know that we can
easily rebuild it.
What do you think?
On Fri, Mar 10, 2017, at 20:44, Kenneth Knowles wrote:
Hi all,
I thought I would briefly join this thread to mention some side input
lessons from Apache Beam. My knowledge of Flink is not deep enough,
technically or philosophically, to make any specific recommendations. And I
might just be repeating things that the docs and threads cover, but I hope
it might be helpful anyhow.
Side Input Visibility / matching: Beam started with a coupling between the
windowing on a stream and the way that windows are mapped between main
input and side input. This is actually not needed and we'll be making the
mapping explicit (with sensible defaults). In particular, the mapping
determines when you can garbage collect, when you know that no main input
element will ever map to a particular window again (so opaque mappings need
some metadata).
Side Input Readiness: There is an unpleasant asymmetry between waiting for
the first triggering of a side input but not waiting for any later
triggering. This manifests strongly when a user actually wants to know
something about the relationship between side input update latency and main
input processing. This echoes some of the concern here about user-defined
control over readiness. IMO this is a rather open area.
Default values for singleton side inputs: A special case of side input
readiness that is related also to windowing. By far the most useful
singleton side input is the result of a global reduction with an
associative and commutative operator. A lot of these operators also have an
identity element. It is nice for this identity element (known a priori) to
be "always available" on the side input, for every window, if it is
expected to be something that is continually updated. But if the
configuration is such that it is a one-time triggering of bounded data,
that behavior is not right. Related, after some amount of time we conclude
that no input will ever be received for a window, and the side input
becomes ready.
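
The identity-element behaviour can be sketched in a few lines of plain Java. This is neither Beam nor Flink API: `SingletonSideInput` and the `continuallyUpdated` flag are hypothetical names used only to show the distinction between a continually updated reduction and a one-time triggering.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical model of a singleton side input that holds the result of a
// global reduction per window.
class SingletonSideInput<W, T> {
    private final T identity;                  // known a priori, assumed non-null
    private final BinaryOperator<T> combine;   // associative and commutative
    private final boolean continuallyUpdated;
    private final Map<W, T> perWindow = new HashMap<>();

    SingletonSideInput(T identity, BinaryOperator<T> combine, boolean continuallyUpdated) {
        this.identity = identity;
        this.combine = combine;
        this.continuallyUpdated = continuallyUpdated;
    }

    // Folds a new value into the running result for the given window.
    void add(W window, T value) {
        perWindow.merge(window, value, combine);
    }

    // For a continually updated reduction the identity is always available;
    // for a one-time triggering an empty result means "not ready yet".
    Optional<T> get(W window) {
        T value = perWindow.get(window);
        if (value != null) {
            return Optional.of(value);
        }
        return continuallyUpdated ? Optional.of(identity) : Optional.empty();
    }
}
```

With a sum and identity 0, a continually updated side input reports 0 for a window before any data arrives, whereas the one-time variant reports nothing until the reduction has produced a value.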
Map Side Inputs with triggers: When new data arrives for a key in Beam,
there's no way to know which value should "win", so you basically just
can't use map side inputs with triggers.
These are just some quick thoughts at a very high level.
Kenn
On Thu, Mar 9, 2017 at 7:59 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:
Hi Jamie,
actually the approach where the .withSideInput() comes before the user
function is only required for implementation proposal #1, which I like
the least. For the other two it can be after the user function, which is
also what I prefer.
Regarding semantics: yes, we simply wait for anything to be available.
For GlobalWindows, i.e. side inputs on a normal function where we simply
don't have windows, this means that we wait for anything. For the
windowed case, which I'm proposing as a second step, we will wait for
side input in a window to be available that matches the main-input
window. For the keyed case we wait for something on the same key to be
available, for the broadcast case we wait for anything.
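
These readiness rules can all be read as "wait until the side input has produced something in the same scope as the main element", where the scope is a constant, a key, or a window. A plain-Java sketch under that reading follows; `SideInputReadiness` and its methods are hypothetical and not part of any Flink API.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical readiness tracker: a main element is ready to be processed
// once any side-input element with the same scope has been seen.
class SideInputReadiness<MAIN, SIDE, SCOPE> {
    private final Function<MAIN, SCOPE> mainScope;
    private final Function<SIDE, SCOPE> sideScope;
    private final Set<SCOPE> seen = new HashSet<>();

    SideInputReadiness(Function<MAIN, SCOPE> mainScope, Function<SIDE, SCOPE> sideScope) {
        this.mainScope = mainScope;
        this.sideScope = sideScope;
    }

    // GlobalWindows / broadcast case: one shared scope, so any side-input
    // element makes every main element ready.
    static <M, S> SideInputReadiness<M, S, Boolean> global() {
        return new SideInputReadiness<>(m -> Boolean.TRUE, s -> Boolean.TRUE);
    }

    void onSideInput(SIDE element) {
        seen.add(sideScope.apply(element));
    }

    boolean isReadyFor(MAIN element) {
        return seen.contains(mainScope.apply(element));
    }
}
```

For the keyed case the two functions would extract the key; for the windowed case they would map a timestamp to its window, for example `t -> t / 10` for tumbling windows of size 10.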
Best,
Aljoscha
On Thu, Mar 9, 2017, at 16:55, Jamie Grier wrote:
Hi, I think the proposal looks good. The only thing I wasn't clear on was
which API is actually being proposed. The one where .withSideInput() comes
before the user function or after. I would definitely prefer it come after
since that's the normal pattern in the Flink API. I understood that makes
the implementation different (maybe harder) but I think it helps keep the
API uniform which is really good.
Overall I think the API looks good and yes there are some tricky semantics
here but in general if, when processing keyed main streams, we always wait
until there is a side-input available for that key we're off to a great
start and I think that was what you're suggesting in the design doc.
-Jamie
On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:
Hi,
these are all valuable suggestions and I think that we should implement
them when the time is right. However, I would like to first get a
minimal viable version of this feature into Flink and then expand on it.
I think the last few tries of tackling this problem fizzled out because
we got too deep into discussing special semantics and features. I think
the most important thing to agree on right now is the basic API and the
implementation plan. What do you think about that?
Regarding your suggestions, I have in fact a branch [1] from May 2016
where I implemented a prototype. This has an n-ary operator and inputs
can be either bounded or unbounded and the implementation actually waits
for all bounded inputs to finish before starting to process the
unbounded inputs.
In general, I think blocking on an input is only possible while you're
waiting for a bounded input to finish. If all inputs are unbounded you
cannot block because you might run into deadlocks (in the processing
graph, due to back pressure) and also because blocking will also block
elements that might have a lower timestamp and might fall into a
different window which is already ready for processing.
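
The rule "only block while a bounded input is still pending" can be written down as a small input-selection policy. The sketch below is plain Java with hypothetical names (`InputSelector`, `readableInputs`); it is not taken from the prototype branch and only illustrates the policy described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical input-selection policy for an n-ary operator: unbounded
// inputs are held back only while some bounded input has not finished.
class InputSelector {
    private final boolean[] bounded;
    private final boolean[] finished;

    InputSelector(boolean... bounded) {
        this.bounded = bounded.clone();
        this.finished = new boolean[bounded.length];
    }

    // Signals that a bounded input has been fully consumed.
    void markFinished(int input) {
        if (!bounded[input]) {
            throw new IllegalArgumentException("input " + input + " is unbounded");
        }
        finished[input] = true;
    }

    // Indices of the inputs that may be read right now.
    List<Integer> readableInputs() {
        List<Integer> pendingBounded = new ArrayList<>();
        List<Integer> unbounded = new ArrayList<>();
        for (int i = 0; i < bounded.length; i++) {
            if (!bounded[i]) {
                unbounded.add(i);
            } else if (!finished[i]) {
                pendingBounded.add(i);
            }
        }
        // Block the unbounded inputs only while bounded data is outstanding.
        return pendingBounded.isEmpty() ? unbounded : pendingBounded;
    }
}
```

When every input is unbounded, `readableInputs()` always returns all of them, so nothing is ever blocked and the deadlock scenario does not arise.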
Best,
Aljoscha
[1]
https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper
On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
Hi Aljoscha, thank you for the proposal, it is great to hear about the
progress in side input.
Following is my point of view:
1. I think there may be an option to block the processing of the main input
instead of buffering the data in state because in production, the throughput
of the main input is usually much larger, and buffering the data before the
side input may slow down the preparing of side input since the I/O and
computing resources are always limited.
2. Another issue may need to be discussed: how can we do checkpointing
with side input, because static side input may finish soon once started
which will stop the checkpointing.
3. I agree with Gyula that users should be able to determine when a side
input is ready. Maybe we can take it one step further: could users also
determine which input an operator with multiple inputs processes each
time? It would be more flexible.
Best Regards!
Wenlong
On 7 March 2017 at 18:39, Ventura Del Monte <venturadelmo...@gmail.com>
wrote:
Hi Aljoscha,
Thank you for the proposal and for bringing this discussion up again.
Regarding the implementation aspect, I would say the first way could
be easier/faster to implement but it could add some overhead when
dealing with multiple side inputs through the current 2-streams union
transform. I tried the second option myself as it has less overhead
but then the outcome was something close to an N-ary operator consuming
first each side input while buffering the main one.
Therefore, I would choose the third option as it is more generic
and might help also in other scenarios, although its implementation
requires more effort.
I also agree with Gyula, I think the user should be allowed to define the
condition that determines when a side input is ready, e.g., load the side
input first, incrementally update the side input.
Best,
Ventura
On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra <gyula.f...@gmail.com>
wrote:
Hi Aljoscha,
Thank you for the nice proposal!
I think it would make sense to allow users to affect the readiness of the
side input. I think making it ready when the first element arrives is only
slightly better than making it always ready from a usability perspective.
For instance, if I am joining against a static data set I want to wait
for the whole set before making it ready. This could be exposed as a
user-defined condition that could also recognize bounded inputs maybe.
Maybe we could also add an aggregating (merging) side input type, that
could work as a broadcast state.
What do you think?
Gyula
On Mon, Mar 6, 2017, at 15:18, Aljoscha Krettek <aljos...@apache.org>
wrote:
Hi Folks,
I would like to finally agree on a plan for implementing side inputs in
Flink. There has already been an attempt to come to consensus [1], which
resulted in two design documents. I tried to consolidate those two and
also added a section about implementation plans. This is the resulting
FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
In terms of semantics I tried to go with the minimal viable solution.
The part that needs discussing is how we want to implement this. I
outlined three possible implementation plans in the FLIP but what it
boils down to is that we need to introduce some way of getting several
inputs into an operator/task.
Please have a look at the doc and let us know what you think.
Best,
Aljoscha
[1]
https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E
--
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com