Re: [DISCUSS] Drop Scala 2.11

2020-09-13 Thread Chesnay Schepler
Are we then also dropping the scala-shell, since it still doesn't work 
on 2.12?


On 9/11/2020 2:12 PM, Timo Walther wrote:

Big +1 to drop Scala 2.11

This would mean that we can finally use Java 8 language features that 
are integrated with Scala.


Regards,
Timo

On 11.09.20 13:15, Igal Shilman wrote:

@Galen FYI: the upcoming StateFun release will use Scala 2.12

On Thu, Sep 10, 2020 at 5:14 PM Seth Wiesman wrote:



@Galen

Yes, we would absolutely migrate StateFun. StateFun can be compiled with
Scala 2.12 today; I'm not sure why it's not cross-released.

@aljoscha :)

@Matthieu It's on the roadmap, but it's non-trivial and I'm not aware of
anyone actively working on it.

On Thu, Sep 10, 2020 at 10:09 AM Matthieu Bonneviot wrote:


That makes sense.
We are using 2.12 in production.
Also, Flink's Scala 2.12 support is in fact limited to Scala 2.12.7: it is
binary incompatible with later 2.12 versions
(https://issues.apache.org/jira/browse/FLINK-12461).
It would be great to at least move to a more recent 2.12 version, and
ideally to 2.13.

Is there a Scala support plan available?

Matthieu


On Thu, Sep 10, 2020 at 5:00 PM Aljoscha Krettek wrote:

Yes! I would be in favour of this since it's blocking us from upgrading
certain dependencies.

I would also be in favour of dropping Scala completely, but that's a
different story.

Aljoscha

On 10.09.20 16:51, Seth Wiesman wrote:

Hi Everyone,

Think of this as a pre-FLIP, but what does everyone think about dropping
Scala 2.11 support from Flink?

The last 2.11 patch release was in 2017, and in that time the Scala
community has released 2.13 and is working towards a 3.0 release. Apache
Kafka and Spark have both dropped 2.11 support in recent versions. In
fact, Flink's universal Kafka connector is stuck on Kafka 2.4 because that
is the last version with Scala 2.11 support.

What are people's thoughts on dropping Scala 2.11? How many are still
using it in production?

Seth






--
Matthieu Bonneviot
Senior R&D Engineer, DataDome
M +33 7 68 29 79 34
E matthieu.bonnev...@datadome.co
W www.datadome.co

Re: [DISCUSS] Drop Scala 2.11

2020-09-13 Thread Jeff Zhang
I would hope we can make the Scala shell work with Scala 2.12 before
dropping 2.11; many users are still using the Scala shell to try out new
features of Flink, especially new users.


On Sun, Sep 13, 2020 at 7:59 PM Chesnay Schepler wrote:

> Are we then also dropping the scala-shell, since it still doesn't work
> on 2.12?

-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-13 Thread Steven Wu
Aljoscha, thanks a lot for the detailed response. Now I have a better
understanding of the initial scope.

To me, there are possibly three different committer behaviors. For lack of
better names, let's call them
* No/NoopCommitter
* Committer / LocalCommitter (file sink?)
* GlobalCommitter (Iceberg)

## Writer interface

For the Writer interface, should we add "*prepareSnapshot*" before the
checkpoint barrier is emitted downstream? IcebergWriter would need it. Or
would the framework call "*flush*" before the barrier is emitted
downstream? That guarantee would achieve the same goal.
-
// before the barrier is emitted downstream
void prepareSnapshot(long checkpointId) throws Exception;

// or will flush be called automatically before the barrier is emitted
// downstream? if yes, we need the checkpointId arg for the reason in [1]
void flush(WriterOutput output) throws IOException;

In [1], we discussed the reason for the Writer to emit a (checkpointId,
CommT) tuple to the committer. The committer needs the checkpointId to
separate out data files for different checkpoints if concurrent checkpoints
are enabled. For that reason, writers need to know the checkpointId where
the restore happened. Can we add a RestoreContext interface to the
restoreWriter method?
---
Writer restoreWriter(InitContext context,
    RestoreContext restoreContext, List state, List share);

interface RestoreContext {
  long getCheckpointId();
}


## Committer interface

For the Committer interface, I am wondering if we should split the single
commit method into separate "*collect*" and "*commit*" methods. This way,
it can handle both single and multiple CommT objects.
--
void commit(CommT committable) throws Exception;
  ==>
void collect(CommT committable) throws Exception;
void commit() throws Exception;
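
For illustration, here is a minimal sketch of how an implementation might
use the split API. The Committer interface below mirrors the proposal;
FileCommitter and its behavior are purely hypothetical:
--
import java.util.ArrayList;
import java.util.List;

// Proposed split: collect(CommT) buffers committables, commit() commits
// everything buffered for the current checkpoint in one shot.
interface Committer<CommT> {
  void collect(CommT committable) throws Exception;
  void commit() throws Exception;
}

// Hypothetical implementation that handles one or many committables.
class FileCommitter implements Committer<String> {
  private final List<String> pending = new ArrayList<>();

  @Override
  public void collect(String committable) {
    pending.add(committable); // buffer until commit() is called
  }

  @Override
  public void commit() {
    // commit all buffered files in one atomic step (placeholder action)
    System.out.println("committing " + pending);
    pending.clear();
  }
}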

As discussed in [1] and mentioned above, the Iceberg committer needs to
know which checkpointId the commit is for. So can we add a checkpointId
arg to the commit API? However, I don't know how this would affect batch
execution, where checkpoints are disabled.
--
void commit(long checkpointId) throws Exception;

For Iceberg, writers don't need any state. But the GlobalCommitter needs to
checkpoint StateT. For the committer, CommT is "DataFile". Since a single
committer can collect thousands (or more) of data files in one checkpoint
cycle, as an optimization we checkpoint a single "ManifestFile" (for the
thousands of collected data files) as StateT. This allows us to absorb
extended commit outages without losing written/uploaded data files, as the
operator state size is as small as one manifest file per checkpoint cycle
[2].
--
StateT snapshotState(SnapshotContext context) throws Exception;

That means we also need the restoreCommitter API in the Sink interface:
---
Committer restoreCommitter(InitContext context, StateT state);
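
To make the manifest optimization concrete, here is a rough sketch;
DataFile, ManifestFile, and writeManifest() stand in for the Iceberg
concepts and are not the real Iceberg API:
--
import java.util.ArrayList;
import java.util.List;

// Sketch of a global committer whose checkpointed state is a list of
// manifest files rather than the individual data files.
class IcebergGlobalCommitterSketch {
  private final List<String> collectedDataFiles = new ArrayList<>(); // CommT
  private final List<String> pendingManifests = new ArrayList<>();   // StateT

  void collect(String dataFile) {
    collectedDataFiles.add(dataFile);
  }

  // On checkpoint: fold the (possibly thousands of) collected data files
  // into one small manifest, so state stays tiny even if commits keep
  // failing for an extended period.
  List<String> snapshotState(long checkpointId) {
    if (!collectedDataFiles.isEmpty()) {
      pendingManifests.add(writeManifest(collectedDataFiles, checkpointId));
      collectedDataFiles.clear();
    }
    return new ArrayList<>(pendingManifests);
  }

  void commit() {
    // commit all pending manifests to the table, then forget them
    System.out.println("committing " + pendingManifests);
    pendingManifests.clear();
  }

  private String writeManifest(List<String> dataFiles, long checkpointId) {
    // placeholder: persist the file list as one manifest, return its path
    return "manifest-" + checkpointId + "-" + dataFiles.size() + ".avro";
  }
}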


Thanks,
Steven

[1] https://github.com/apache/iceberg/pull/1185#discussion_r479589663
[2] https://github.com/apache/iceberg/pull/1185#discussion_r479457104



On Fri, Sep 11, 2020 at 3:27 AM Aljoscha Krettek wrote:

> Regarding the FLIP itself, I like the motivation section and the
> What/How/When/Where section a lot!
>
> I don't understand why we need the "Drain and Snapshot" section. It
> seems to be some details about stop-with-savepoint and drain, and the
> relation to BATCH execution but I don't know if it is needed to
> understand the rest of the document. I'm happy to be wrong here, though,
> if there's good reasons for the section.
>
> On the question of Alternative 1 and 2, I have a strong preference for
> Alternative 1 because we could avoid strong coupling to other modules.
> With Alternative 2 we would depend on `flink-streaming-java` and even
> `flink-runtime`. For the new source API (FLIP-27) we managed to keep the
> dependencies slim and the code is in flink-core. I'd be very happy if we
> can manage the same for the new sink API.
>
> Best,
> Aljoscha
>
> On 11.09.20 12:02, Aljoscha Krettek wrote:
> > Hi Everyone,
> >
> > thanks to Guowei for publishing the FLIP, and thanks Steven for the very
> > thoughtful email!
> >
> > We thought a lot internally about some of the questions you posted but
> > left a lot (almost all) of the implementation details out of the FLIP
> > for now because we wanted to focus on semantics and API. I'll try and
> > address the points below.
> >
> > ## Initial Scope of the new Sink API
> >
> > We need to accept some initial scope that we want to achieve for Flink
> > 1.12. I don't think we can try and find the solution that will work for
> > all current and future external systems. For me, the initial goal would
> > be to produce a Sink API and implementations for systems where you can
> > prepare "committables" in one process and commit those from another
> > process. Those are systems that support "real" transactions as you need
> > them in a two-phase commit protocol. This includes:
> >
> >   - File Sink

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-13 Thread Steven Wu
Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg
sink use case, because we can't retrieve the checkpointId from
the FunctionInitializationContext during restore. But we can move away
from it if the restore context provides the checkpointId.
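
A minimal sketch of that workaround, assuming Flink's CheckpointedFunction
API (the state name and class are illustrative):
--
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Each subtask snapshots the next checkpoint id into union state; after a
// restore every subtask sees the entries of all subtasks and can recover
// the id of the checkpoint it was restored from.
class CheckpointIdTrackingFunction implements CheckpointedFunction {

  private transient ListState<Long> nextCheckpointIdState;
  private long nextCheckpointId;

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    nextCheckpointIdState = context.getOperatorStateStore().getUnionListState(
        new ListStateDescriptor<>("next-checkpoint-id", Long.class));
    if (context.isRestored()) {
      // union redistribution hands every subtask the full list; all entries
      // are equal here, so taking the max recovers the value
      for (Long id : nextCheckpointIdState.get()) {
        nextCheckpointId = Math.max(nextCheckpointId, id);
      }
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    nextCheckpointId = context.getCheckpointId() + 1;
    nextCheckpointIdState.clear();
    nextCheckpointIdState.add(nextCheckpointId);
  }
}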

On Sat, Sep 12, 2020 at 8:20 AM Alexey Trenikhun  wrote:

> -1
>
> We use union state to generate sequences: each operator instance generates
> offset0 + (number-of-tasks - task-index) + task-specific-counter *
> number-of-tasks (e.g. with 2 instances of the operator, one instance
> produces even numbers and the other odd ones). The last generated sequence
> number is stored in union list state; on restart we need to know where to
> resume to avoid collisions with already generated numbers, so we calculate
> offset0 as the max over the union list state.
>
> Alexey
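
A minimal sketch of the scheme described above; the class and the max-based
restore of offset0 are illustrative assumptions, not Alexey's actual code:
--
import java.util.Collection;

// Task i of n generates offset0 + (n - i) + counter * n, so each subtask
// owns a distinct residue class modulo n and values never collide.
class SequenceGenerator {
  private final long offset0;   // restored as the max over the union state
  private final int taskIndex;  // 0-based subtask index
  private final int numTasks;   // operator parallelism
  private long counter;         // task-local counter

  SequenceGenerator(Collection<Long> restoredUnionState, int taskIndex, int numTasks) {
    // after restore, every subtask sees the last value of *all* subtasks;
    // starting above the global max avoids collisions with old values
    this.offset0 = restoredUnionState.stream()
        .mapToLong(Long::longValue).max().orElse(0L);
    this.taskIndex = taskIndex;
    this.numTasks = numTasks;
  }

  long next() {
    return offset0 + (numTasks - taskIndex) + (counter++) * numTasks;
  }
}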
>
> --
> *From:* Seth Wiesman 
> *Sent:* Wednesday, September 9, 2020 9:37:03 AM
> *To:* dev 
> *Cc:* Aljoscha Krettek ; user 
> *Subject:* Re: [DISCUSS] Deprecate and remove UnionList OperatorState
>
> Generally +1
>
> The one use case of union state I've seen in production (outside of
> sources and sinks) is as a "poor man's" broadcast state. This was
> obviously before that feature was added, which is now a few years ago, so
> I don't know if those pipelines still exist. FWIW, if they do, the State
> Processor API can provide a migration path as it supports rewriting union
> state as broadcast state.
>
> Seth
>
> On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise  wrote:
>
> +1 to getting rid of non-keyed state as is in general and of union state
> in particular. I had a hard time wrapping my head around the semantics of
> non-keyed state when designing the rescaling of unaligned checkpoints.
>
> The only plausible use cases are legacy sources and sinks. Both should
> also be reworked and deprecated.
>
> My main question is how to represent state in these two cases. For
> sources, state should probably be bound to splits. In that regard, the
> split (id) may act as a key. More generally, there should probably be a
> concept that supersedes keys and includes splits.
>
> For sinks, I can see two cases:
> - Either we are in a keyed context, then state should be bound to the key.
> - Or we are in a non-keyed context, then state might be bound to the split
> (?) in case of a source->sink chaining.
> - Maybe it should also be a new(?) concept like output partition.
>
> It's not clear to me if there are more cases and if we can always find a
> good way to bind state to some sort of key, especially for arbitrary
> communication patterns (which we may need to replace as well potentially).
>
> On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek wrote:
>
> > Hi Devs,
> >
> > @Users: I'm cc'ing the user ML to see if there are any users that are
> > relying on this feature. Please comment here if that is the case.
> >
> > I'd like to discuss the deprecation and eventual removal of UnionList
> > Operator State, aka Operator State with Union Redistribution. If you
> > don't know what I'm talking about you can take a look in the
> > documentation: [1]. It's not documented thoroughly because it started
> > out as mostly an internal feature.
> >
> > The immediate main reason for removing this is also mentioned in the
> > documentation: "Do not use this feature if your list may have high
> > cardinality. Checkpoint metadata will store an offset to each list
> > entry, which could lead to RPC framesize or out-of-memory errors." The
> > insidious part of this limitation is that you will only notice that
> > there is a problem when it is too late. Checkpointing will still work
> > and a program can continue when the state size is too big. The system
> > will only fail when trying to restore from a snapshot that has union
> > state that is too big. This could be fixed by working around that issue
> > but I think there are more long-term issues with this type of state.
> >
> > I think we need to deprecate and remove API for state that is not tied
> > to a key. Keyed state is easy to reason about: the system can
> > re-partition state and also re-partition records, and therefore scale the
> > system in and out. Operator state, on the other hand, is not tied to a
> > key but to an operator. This is a more "physical" concept, if you will,
> > that potentially ties business logic closer to the underlying runtime
> > execution model, which in turn means fewer degrees of freedom for the
> > framework, that is, Flink. This is future work, though, but we should
> > start with deprecating union list state because it is the potentially
> > most dangerous type of state.
> >
> > We currently use this state type internally in at least the
> > StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
> > we're in the process of hopefully getting rid of it there with our work
> > on sources and sinks. Before we fully remove it, we should of course
> > signal this to users by deprecating it.
> >
> > What do you think?
> >
> >