Indeed, if you use a Scala-free Flink then Scala types would currently go through Kryo, which is why we recommend using Java types /for the time being/. We are aware that this is an annoying limitation, and it is certainly not a state we want to be in long-term. There are some ideas floating around to have both Scala and Java types go through the same type extraction / serialization stack, but I don't think there is anything concrete to share yet.

As for supporting 2.13, and the corresponding migration from 2.12, I'm not aware of a concrete plan at this time. We do want to support 2.13/3.0 at some point, but the migration is a tricky thing, hence why we put off upgrading Scala beyond 2.12.7 for so long. At the moment we are primarily concerned with making such upgrades easier in the future by isolating individual Scala-reliant components from the Scala APIs.

If you have ideas in this direction that you'd like to share, then I'd suggest heading over to https://issues.apache.org/jira/browse/FLINK-13414 and presenting them there. At a glance your plan sounds pretty good, but I'm also not too deeply involved in the serializer stack ;)

On 07/12/2021 14:51, Roman Grebennikov wrote:
Hi,

I guess using Scala 2.13 with Scala-free Flink 1.15 means that it will always 
use generic/Kryo serialization, which has a large performance penalty (YMMV, 
but it happens to us all the time when we accidentally use the Flink Java APIs 
with Scala case classes).
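
To illustrate why a case class ends up on the generic/Kryo path when it goes through the Java APIs, here is a minimal sketch of the documented POJO rules (public class, public no-arg constructor, fields either public or reachable via getter and setter). It is a simplified approximation and not Flink's actual type extractor; PojoCheck, CaseClassLike, JavaPojo and looksLikePojo are hypothetical names made up for this example, and CaseClassLike only imitates the shape a Scala case class compiles to.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheck {

    /** Imitates a compiled Scala case class: final fields, no no-arg constructor, accessors id()/ts(). */
    public static final class CaseClassLike {
        private final String id;
        private final long ts;
        public CaseClassLike(String id, long ts) { this.id = id; this.ts = ts; }
        public String id() { return id; }
        public long ts() { return ts; }
    }

    /** A conventional Java POJO with public fields and a public no-arg constructor. */
    public static class JavaPojo {
        public String id;
        public long ts;
        public JavaPojo() {}
    }

    /** Simplified approximation of the POJO rules (assumption: not the real Flink logic). */
    public static boolean looksLikePojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) {
            return false;
        }
        try {
            c.getConstructor(); // a public no-arg constructor must exist
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : c.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m)) {
                continue; // not part of the serialized state
            }
            if (!Modifier.isPublic(m) && !hasGetterAndSetter(c, f)) {
                return false;
            }
        }
        return true;
    }

    /** Looks for public getX()/setX(T) methods matching the field name. */
    private static boolean hasGetterAndSetter(Class<?> c, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            c.getMethod("get" + suffix);
            c.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("JavaPojo:      " + looksLikePojo(JavaPojo.class));
        System.out.println("CaseClassLike: " + looksLikePojo(CaseClassLike.class));
    }
}
```

A type that fails a check of this kind is treated as a generic type by the Java type extraction, and generic types are the ones that get serialized with Kryo.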

As far as I know, Flink's set of Scala serializers for collections uses 
some 2.11/2.12-specific deprecated internals like CanBuildFrom, which are 
not available on 2.13. So implementing a state migration from 2.12 to 2.13 is 
not that easy, due to the way Flink's TraversableSerializer is implemented. And 
the createTypeInformation Scala macro that Flink uses to derive serializers for 
Scala case classes is not directly compatible with 3.0, as there is a 
completely new Scala macro API in 3.x.

Chesnay, I'm wondering what is the plan on 2.13/3.0 support in the future?

If I was the one writing a FLIP for this process, I can imagine it like this:
* as 2.11 is finally removed in 1.15, the createTypeInformation macro can be 
re-done on top of magnolia, which supports 2.12, 2.13 and 3.x with the same API.
* the current implementation of Flink's serializers for Scala collections (afaik in 
TraversableSerializer) serializes the whole CanBuildFrom code for a 
specific concrete collection type right into the snapshot. So it cannot be 
deserialized on 2.13, as there is no CanBuildFrom. But in my opinion, the 
case where someone has a custom CanBuildFrom for their own hand-made Scala 
collection implementation is extremely rare, so with a set of heuristics we can 
guess the concrete collection type right from the serialized CanBuildFrom Scala 
code, assuming that there is a finite number of collection types (around 10 or 
so).

With this approach we can: support 2.12/2.13/3.x with the same codebase, and 
allow state migrations between scala versions.
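
The heuristic from the second bullet could look roughly like the sketch below. Everything in it is an assumption for illustration: CbfHeuristic and guessCollection are hypothetical names, the format of the serialized CanBuildFrom code string is guessed rather than taken from Flink, and the table of known collections is neither complete nor verified against what real snapshots contain.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class CbfHeuristic {

    // Hypothetical, non-exhaustive set of collection types we are willing to recognize.
    private static final Set<String> KNOWN = Set.of(
            "scala.collection.immutable.List",
            "scala.collection.immutable.Vector",
            "scala.collection.immutable.Seq",
            "scala.collection.immutable.Set",
            "scala.collection.immutable.Map",
            "scala.collection.mutable.ArrayBuffer",
            "scala.collection.mutable.Set",
            "scala.collection.mutable.Map");

    // Unqualified names resolved to the defaults that are in scope in plain Scala code.
    private static final Map<String, String> ALIASES = Map.of(
            "List", "scala.collection.immutable.List",
            "Vector", "scala.collection.immutable.Vector",
            "Set", "scala.collection.immutable.Set",
            "Map", "scala.collection.immutable.Map");

    /**
     * Guesses the concrete collection type from a serialized CanBuildFrom code string
     * by reading the outer type of its first type argument.
     * Returns empty for unknown (for example custom) collections.
     */
    public static Optional<String> guessCollection(String cbfCode) {
        String marker = "CanBuildFrom[";
        int start = cbfCode.indexOf(marker);
        if (start < 0) {
            return Optional.empty();
        }
        start += marker.length();
        int end = start;
        // The outer type name ends at its own type parameters or at the next argument.
        while (end < cbfCode.length() && "[,]".indexOf(cbfCode.charAt(end)) < 0) {
            end++;
        }
        String name = cbfCode.substring(start, end).trim();
        String resolved = name.contains(".") ? name : ALIASES.get(name);
        return Optional.ofNullable(resolved).filter(KNOWN::contains);
    }

    public static void main(String[] args) {
        // Assumed shape of the code string stored in a snapshot.
        String code = "implicitly[scala.collection.generic.CanBuildFrom[List[Int], Int, List[Int]]]";
        System.out.println(guessCollection(code));
    }
}
```

A real migration path would still need a fallback for the empty case, since a custom collection cannot be recovered this way.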

I did some sort of prototype for step 1 (and partially step 2) 
in https://github.com/findify/flink-adt, although with a different goal of 
supporting Scala ADTs, so if anyone is interested, I can make a draft FLIP 
proposal based on this research to start the discussion.

with best regards,
Roman Grebennikov |g...@dfdx.me

On Tue, Dec 7, 2021, at 08:46, Chesnay Schepler wrote:
We haven't changed anything significant in 1.14.

Whether the 2.13 job can run on Scala 2.12 depends a bit on the job (and
of course, used libraries!); it depends on the backwards-compatibility
from Scala, which APIs are used and what kind of Scala magic is being
employed.
We haven't really tested that scenario in 1.14 or below.

On 07/12/2021 09:28, guenterh.lists wrote:
Hi Chesnay,

thanks for the info - this is really good news for us.

I set up a playground using the snapshot from yesterday [1] and a
really quick and short Job using Scala 2.13 [2]

The job starts and returns correct results. Even the use of a case
class against the Java API is possible.

Then I made a second try with the same job (compiled with Scala
2.13.6) running on a Flink 1.14 cluster which was again successful.

My question:
Is this compilation with Scala versions >=2.13 already part of 1.14, or
is my example so small and simple that binary incompatibilities
between the versions don't matter?

Günter


[1]
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
[2]
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8


On 06.12.21 13:59, Chesnay Schepler wrote:
With regards to the Java APIs, you will definitely be able to use the
Java DataSet/DataStream APIs from Scala without any restrictions
imposed by Flink. This is already working with the current SNAPSHOT
version.

As we speak we are also working to achieve the same for the Table
API; we expect to achieve that but with some caveats (i.e., if you
use the Python API or the Hive connector then you still need to use
the Scala version provided by Flink).

As for the Scala APIs, we haven't really decided yet how this will
work in the future. However, one of the big benefits of the
Scala-free runtime is that it should now be easier for us to release
the APIs for more Scala versions.

On 06/12/2021 11:47, guenterh.lists wrote:
Dear list,

there have been some discussions and activities in the last months
about a Scala-free runtime which should make it possible to use
newer Scala versions (>= 2.13 / 3.x) on the application side.

Stephan Ewen announced the implementation is on the way [1] and
Martijn Visser mentioned in the ask-me-anything session on version
1.14 that it is planned to make this possible in the upcoming 1.15
version (~ next February) [2]

This would be very nice for our recently started project, where we
are still discussing which tools and infrastructure to use. "Personally" I
would prefer that people with less experience on the JVM could make
their start and first experiences with a "pythonized" Scala using
the latest versions of the language (2.13.x or maybe 3.x).

My question: Do you think your plans to provide a Scala-free runtime
with the upcoming version are still realistic?

Out of curiosity: If you can make this possible and applications
with current Scala versions are going to use the Java APIs of Flink,
what's the future of the current Scala API of Flink, where you have
to decide to use either Scala 2.11 or <2.12.8?
Will this still be possible as an alternative?

Thanks for some hints for our planning and decisions

Günter




[1] https://twitter.com/data_fly/status/1415012793347149830
[2] https://www.youtube.com/watch?v=wODmlow0ip0
