Unsubscribe

2021-12-07 Thread rimin515
Unsubscribe




Re: use of Scala versions >= 2.13 in Flink 1.15

2021-12-07 Thread guenterh.lists

Hi Chesnay,

thanks for the info - this is really good news for us.

I set up a playground using the snapshot from yesterday [1] and a really 
quick and short Job using Scala 2.13 [2]


The job starts and returns correct results. Even the use of a case class 
against the Java API is possible.


Then I made a second try with the same job (compiled with Scala 2.13.6) 
running on a Flink 1.14 cluster which was again successful.


My question:
Is compiling against Scala versions >= 2.13 already supported in 1.14, or 
is my example so small and simple that binary incompatibilities between 
the versions don't matter?


Günter


[1] 
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
[2] 
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12

https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8


On 06.12.21 13:59, Chesnay Schepler wrote:
With regards to the Java APIs, you will definitely be able to use the 
Java DataSet/DataStream APIs from Scala without any restrictions 
imposed by Flink. This is already working with the current SNAPSHOT 
version.


As we speak we are also working to achieve the same for the Table API; 
we expect to achieve that but with some caveats (i.e., if you use the 
Python API or the Hive connector then you still need to use the Scala 
version provided by Flink).


As for the Scala APIs, we haven't really decided yet how this will 
work in the future. However, one of the big benefits of the Scala-free 
runtime is that it should now be easier for us to release the APIs for 
more Scala versions.


On 06/12/2021 11:47, guenterh.lists wrote:

Dear list,

there have been some discussions and activities in the last months 
about a Scala-free runtime, which should make it possible to use newer 
Scala versions (>= 2.13 / 3.x) on the application side.


Stephan Ewen announced that the implementation is on the way [1] and 
Martijn Visser mentioned in the "ask me anything" session on version 
1.14 that it is planned to make this possible in the upcoming 1.15 
version (around next February) [2].


This would be very nice for the project we have just started, where we 
are still discussing tools and infrastructure. Personally, I 
would prefer that people with less experience on the JVM could make 
their start and gain first experience with a "pythonized" Scala using the 
latest versions of the language (2.13.x or maybe 3.x).


My question: Do you think your plan to provide a 
Scala-free runtime with the upcoming version is still realistic?


Out of curiosity: If you can make this possible and applications with 
current Scala versions are going to use the Java APIs of Flink what's 
the future of the current Scala API of Flink where you have to decide 
to use either Scala 2.11 or <2.12.8?

Is this then still possible as an alternative?

Thanks for some hints for our planning and decisions

Günter




[1] https://twitter.com/data_fly/status/1415012793347149830
[2] https://www.youtube.com/watch?v=wODmlow0ip0




--
Günter Hipler
University library Leipzig



Re: Unsubscribe

2021-12-07 Thread Adriel Peng
Please send an empty email to:
user-unsubscr...@flink.apache.org
to unsubscribe yourself from the list.

On Tue, Dec 7, 2021 at 4:25 PM  wrote:

> Unsubscribe
>
>
>


Re: [DISCUSS] Drop Zookeeper 3.4

2021-12-07 Thread Chesnay Schepler
Since this is only relevant for 1.15, if you intend to migrate to 1.15 
close to the release, then somewhere around February.


The only resource I could find for migrating Zookeeper is this FAQ: 
https://cwiki.apache.org/confluence/display/ZOOKEEPER/Upgrade+FAQ


On 07/12/2021 04:02, Dongwon Kim wrote:

When should I prepare for upgrading ZK to 3.5 or newer?
We're operating a Hadoop cluster w/ ZK 3.4.6 for running only Flink jobs.
Just hope that the rolling update is not that painful - any advice on 
this?


Best,

Dongwon

On Tue, Dec 7, 2021 at 3:22 AM Chesnay Schepler  
wrote:


Current users of ZK 3.4 and below would need to upgrade their
Zookeeper installation that is used by Flink to 3.5+.

Whether K8s users are affected depends on whether they use ZK or
not. If they do, see above, otherwise they are not affected at all.

On 06/12/2021 18:49, Arvid Heise wrote:

Could someone please help me understand the implications of the
upgrade?

As far as I understood this upgrade would only affect users that
have a zookeeper shared across multiple services, some of which
require ZK 3.4-? A workaround for those users would be to run two
ZKs with different versions, eventually deprecating old ZK, correct?

If that is the only limitation, I'm +1 for the proposal since ZK
3.4 is already EOL.

How are K8s users affected?

Best,

Arvid

On Mon, Dec 6, 2021 at 2:00 PM Chesnay Schepler
 wrote:

ping @users; any input on how this would affect you is highly
appreciated.

On 25/11/2021 22:39, Chesnay Schepler wrote:
> I included the user ML in the thread.
>
> @users Are you still using Zookeeper 3.4? If so, were you planning to
> upgrade Zookeeper in the near future?
>
> I'm not sure about ZK compatibility, but we'd also upgrade Curator to
> 5.x, which doesn't support Zookeeper 3.4 anymore.
>
> On 25/11/2021 21:56, Till Rohrmann wrote:
>> Should we ask on the user mailing list whether anybody is still using
>> ZooKeeper 3.4 and thus needs support for this version or can a ZooKeeper
>> 3.5/3.6 client talk to a ZooKeeper 3.4 cluster? I would expect that not a
>> lot of users depend on it but just to make sure that we aren't annoying a
>> lot of our users with this change. Apart from that +1 for removing it if
>> not a lot of users depend on it.
>>
>> Cheers,
>> Till
>>
>> On Wed, Nov 24, 2021 at 11:03 AM Matthias Pohl wrote:
>>
>>> Thanks for starting this discussion, Chesnay. +1 from my side. It's time to
>>> move forward with the ZK support considering the EOL of 3.4 you already
>>> mentioned. The benefits we gain from upgrading Curator to 5.x as a
>>> consequence are another plus point. Just for reference on the inconsistent
>>> state issue you mentioned: FLINK-24543 [1].
>>>
>>> Matthias
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-24543
>>>
>>> On Wed, Nov 24, 2021 at 10:19 AM Chesnay Schepler wrote:
>>>
>>>> Hello,
>>>>
>>>> I'd like to drop support for Zookeeper 3.4 in 1.15, upgrading the
>>>> default to 3.5 with an opt-in for 3.6.
>>>>
>>>> Supporting Zookeeper 3.4 (which is already EOL) prevents us from
>>>> upgrading Curator to 5.x, which would allow us to properly fix an issue
>>>> with inconsistent state. It is also required to eventually support ZK
>>>> 3.6.





Re: use of Scala versions >= 2.13 in Flink 1.15

2021-12-07 Thread Chesnay Schepler

We haven't changed anything significant in 1.14.

Whether the 2.13 job can run on Scala 2.12 depends a bit on the job (and 
of course, used libraries!); it depends on the backwards-compatibility 
from Scala, which APIs are used and what kind of Scala magic is being 
employed.

We haven't really tested that scenario in 1.14 or below.

On 07/12/2021 09:28, guenterh.lists wrote:

Hi Chesnay,

thanks for the info - this is really good news for us.

I set up a playground using the snapshot from yesterday [1] and a 
really quick and short Job using Scala 2.13 [2]


The job starts and returns correct results. Even the use of a case 
class against the Java API is possible.


Then I made a second try with the same job (compiled with Scala 
2.13.6) running on a Flink 1.14 cluster which was again successful.


My question:
Is compiling against Scala versions >= 2.13 already supported in 1.14, or 
is my example so small and simple that binary incompatibilities 
between the versions don't matter?


Günter


[1] 
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
[2] 
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8 




On 06.12.21 13:59, Chesnay Schepler wrote:
With regards to the Java APIs, you will definitely be able to use the 
Java DataSet/DataStream APIs from Scala without any restrictions 
imposed by Flink. This is already working with the current SNAPSHOT 
version.


As we speak we are also working to achieve the same for the Table 
API; we expect to achieve that but with some caveats (i.e., if you 
use the Python API or the Hive connector then you still need to use 
the Scala version provided by Flink).


As for the Scala APIs, we haven't really decided yet how this will 
work in the future. However, one of the big benefits of the 
Scala-free runtime is that it should now be easier for us to release 
the APIs for more Scala versions.


On 06/12/2021 11:47, guenterh.lists wrote:

Dear list,

there have been some discussions and activities in the last months 
about a Scala free runtime which should make it possible to use 
newer Scala version (>= 2.13 / 3.x) on the application side.


Stephan Ewen announced the implementation is on the way [1] and 
Martijn Visser mentioned in the ask me anything session on version 
1.14 that it is planned to make this possible in the upcoming 1.15 
version (~ next February ) [2]


This would be very nice for our currently started project where we 
are discussing the used tools and infrastructure. "Personally" I 
would prefer that people with less experience on the JVM could make 
their start and first experiences with a "pythonized" Scala using 
the last versions of the language (2.13.x or maybe 3.x).


My question: Do you think your plans to provide the possibility of a 
Scala free runtime with the upcoming version is still realistic?


Out of curiosity: If you can make this possible and applications 
with current Scala versions are going to use the Java APIs of Flink 
what's the future of the current Scala API of Flink where you have 
to decide to use either Scala 2.11 or <2.12.8?

Is this then still possible as an alternative?

Thanks for some hints for our planning and decisions

Günter




[1] https://twitter.com/data_fly/status/1415012793347149830
[2] https://www.youtube.com/watch?v=wODmlow0ip0







Re: Re: Re: how to run streaming process after batch process is completed?

2021-12-07 Thread Joern Kottmann
Hello,

One of the applications Spire [1] is using Flink for is to process AIS [2]
data collected by our satellites and from other sources. AIS
transmits a ship's static and dynamic information, such as names,
callsigns or positions. One of the challenges in processing AIS data is that
there are no unique keys, since the MMSI or IMO number can be spoofed or is
sometimes shared between vessels.

To deal with multiple vessels per MMSI we use a Keyed Process Function that
keeps state per detected vessel. Data about the vessel is stored in the
state of the function and is hard to transfer out of the batch processing.
Batch processing really helps to collect data about a vessel and is
therefore necessary for us before we can switch to stream mode.
Since the state and the outputs are not the same, the state for stream mode
can't be reconstructed by feeding the outputs into the
pipeline via some source. Therefore we need code in our batch job just to
deal with extracting the state.

A vessel is usually emitted for each update that is received for it, but
emitting it together with its entire state is not desirable for
performance reasons in batch mode. Also, some vessels should never be
emitted but still need to be restored.

The pipeline has a couple of stateful functions and the more we add the
harder it gets to restore the state.

Best,
Jörn


Scala Case Class Serialization

2021-12-07 Thread Lars Skjærven
Hello,
We're running Flink 1.14 with scala, and we're suspecting that performance
is suffering due to serialization of some scala case classes. Specifically
we're seeing that our Case Class "cannot be used as a POJO type because not
all fields are valid POJO fields, and must be processed as GenericType",
and that the case class "does not contain a setter for field X". I'm
interpreting these log messages as performance warnings.

A simple case class example we're writing to state that triggers the
mentioned 'warnings':
case class Progress(position: Int, eventTime: Int, alive: Boolean)

My understanding of the docs is that case classes with primitive types should be
supported "out of the box".

Any tips on how to proceed?

Kind regards,
Lars


Re: Unable to create new native thread error

2021-12-07 Thread Ilan Huchansky
Hi David,

In that case, I will start working on using the CLI instead of the REST API 
right away.

Will update you when I finish.

Thanks for the help,
Ilan.


From: David Morávek 
Date: Monday, 6 December 2021 at 10:34
To: Ilan Huchansky 
Cc: user@flink.apache.org , Start.io SDP 
Subject: Re: Unable to create new native thread error
Hi Ilan,

I think so, using CLI instead of REST API should solve this, as the user code 
execution would be pulled out to a separate JVM. If you're going to try that, 
it would be great to hear back whether it has solved your issue.

As for 1.13.4, there is currently no on-going effort / concrete plan on the 
release.

Best,
D.

On Sun, Dec 5, 2021 at 4:06 PM Ilan Huchansky wrote:
Hi David,

Thanks for your fast response.

Do you think that changing the submission method could solve the problem? Using 
the CLI instead of the REST API.

Another question: I see that the most critical issue (FLINK-25022) is in 
progress and should be released with version 1.13.4. Do you know when this 
version is planned to be released?

Thanks again,
Ilan.

From: David Morávek
Date: Thursday, 2 December 2021 at 17:25
To: Ilan Huchansky
Cc: user@flink.apache.org, Start.io SDP
Subject: Re: Unable to create new native thread error
Hi Ilan,

we are aware of multiple issues where web submission can result in classloader / 
thread-local leaks, which could potentially result in the behavior you're 
describing. We're working on addressing them.

FLINK-25022 [1]: The most critical one, leaking thread locals.
FLINK-25027 [2]: Is only a memory improvement for a particular situation (a lot 
of small batch jobs) and could be worked around by accounting for it when setting 
the Metaspace size.
FLINK-25023 [3]: Can leak the classloader of the first job submitted via the REST 
API (constant overhead for Metaspace).

In general, web submission differs from a normal submission in that 
the "main method" of the uploaded jar is executed on the JobManager, and it's really 
hard to isolate its execution from possible side effects.

[1] https://issues.apache.org/jira/browse/FLINK-25022
[2] https://issues.apache.org/jira/browse/FLINK-25027
[3] https://issues.apache.org/jira/browse/FLINK-25023

Best,
D.

On Thu, Dec 2, 2021 at 3:51 PM Ilan Huchansky wrote:
Hi Flink mailing list,

I am Ilan from Start.io data platform team, need some guidance.

We have a flow with the following use case:


  *   We read files from AWS S3 buckets, process them on our cluster and sink 
the data into files using the Flink file sink.
  *   The jobs always use the same jar; we uploaded it to every job manager on 
the cluster.
  *   We are submitting jobs constantly through the REST API.
  *   Each job reads one or more files from S3.
  *   The jobs can run from 20 seconds up to 3.5 hours.
  *   The jobs run in batch mode.
  *   We are running Flink 1.13.1.
  *   We are running in cluster mode using Docker; the same machines are used 
for task and job managers.

We are struggling with the same error, over and over again. We encounter it in 
the job manager and in the task manager.

After the cluster has been running for a while, with jobs finishing correctly, the 
task and job managers fail to operate due to:
Caused by: java.lang.OutOfMemoryError: unable to create new native thread.


We also see sporadic java.lang.NoClassDefFoundError failures; we are not sure 
whether they are related.

Our setup and configuration are as follows:
* 5 nodes cluster running on docker
* Relevant memory config:
jobmanager.memory.heap.size: 1600m
taskmanager.memory.process.size: 231664m
taskmanager.memory.network.fraction: 0.3
taskmanager.memory.jvm-metaspace.size: 10g
jobmanager.memory.jvm-metaspace.size: 2g
taskmanager.memory.framework.off-heap.size: 1g

* Host details
max locked memory  (kbytes, -l) 65536
max memory size   (kbytes, -m) unlimited
open files (-n) 1024
max user processes(-u) 1547269
virtual memory   (kbytes, -v) unlimited
file locks   (-x) unlimited

cat /proc/sys/kernel/threads-max: 3094538
kernel.pid_max = 57344


We tried increasing the max user processes, and both increasing and decreasing the 
JVM metaspace size.

Should we keep increasing the max number of processes on the host? Is there a 
way to limit the number of threads from the Flink config?

What should we do? Any insights?
I can provide more information as needed.

Thanks in advance

 Ilan
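
Since the error in this thread is about thread creation, one way to check whether threads pile up across submissions is to sample the live threads of the affected JVM and group them by name. The following is only a minimal sketch using the standard JDK APIs; the class name ThreadCountSketch and the name-grouping rule are assumptions made for illustration and are not part of Flink. It reports on the JVM it runs in, so it would have to be run inside the JobManager or TaskManager process (or replaced by a thread dump of that process) to say anything about the cluster.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Flink class: summarises the live threads of the current JVM.
final class ThreadCountSketch {

    // Drops a trailing counter so that "pool-3-thread-17" and "pool-3-thread-2"
    // are counted under the same key.
    static String groupKey(String threadName) {
        return threadName.replaceAll("[-_#\\s]*\\d+$", "");
    }

    // Counts live threads per name group, sorted by group name.
    static Map<String, Integer> liveThreadsByGroup() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(groupKey(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        System.out.println("live=" + bean.getThreadCount()
                + " peak=" + bean.getPeakThreadCount()
                + " started=" + bean.getTotalStartedThreadCount());
        liveThreadsByGroup().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

If one group keeps growing between job submissions while the jobs themselves finish, that group is a candidate for the kind of leak described in the replies above.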



Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-07 Thread Ayush Chauhan
Hi,

Can you please let me know the alternatives to isEndOfStream()? According to
the docs, this method will no longer be used to determine the end
of the stream.

-- 
 Ayush Chauhan
 Data Platform



Running a flink job with Yarn per-job mode from application code.

2021-12-07 Thread Kamil ty
Hello all,

I'm looking for a way to submit a Yarn job from another Flink job's
application code. I can see that you can access a cluster and submit jobs
with a RestClusterClient, but it seems that Yarn per-job mode is not supported
with it.

Any suggestions would be appreciated.

Best Regards
Kamil


Re: Scala Case Class Serialization

2021-12-07 Thread Matthias Pohl
Hi Lars,
not sure about the out-of-the-box support for case classes with primitive
member types (could you refer to the section which made you conclude
this?). I haven't used Scala with Flink, yet. So maybe, others can give
more context.
But have you looked into using the TypeInfoFactory to define the schema [1]?

Best,
Matthias

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven  wrote:

> Hello,
> We're running Flink 1.14 with scala, and we're suspecting that performance
> is suffering due to serialization of some scala case classes. Specifically
> we're seeing that our Case Class "cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType",
> and that the case class "does not contain a setter for field X". I'm
> interpreting these log messages as performance warnings.
>
> A simple case class example we're writing to state that triggers the
> mentioned 'warnings':
> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>
> I'm understanding the docs that case classes with primitive types should
> be supported "out of the box".
>
> Any tips on how to proceed ?
>
> Kind regards,
> Lars
>


Re: Scala Case Class Serialization

2021-12-07 Thread Roman Grebennikov
Hi Lars,

can you please show a small reproducer of the way you construct the DataStream, 
and which imports you use?

We also often experience similar performance issues with scala, but usually 
they are related to accidental usage of Flink Java API. A couple of hints from 
my experience:
1. Make sure that you always use the scala DataStream, and not the java one.
2. All operations on scala datastream require an implicit TypeInformation[T] 
parameter, which is usually generated automatically for you if you do an 
"import org.apache.flink.api.scala._" by the createTypeInformation[T] macro. So 
make sure you have this import present.
3. You can do a "env.getConfig.disableGenericTypes" and Flink will throw an 
exception each time it has to fall back to generic Kryo serialization. 
The backtrace will point you to the exact place in your code where it has to do a 
Kryo fallback.

Also, Flink will always revert to Kryo if you use sum types (or ADTs, or 
"sealed traits"). Shameless plug: we made a library to support that: 
https://github.com/findify/flink-adt

Roman Grebennikov | g...@dfdx.me
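
To illustrate why a case class such as Progress produces the "does not contain a setter" messages when it is analysed through the Java type extraction path, here is a simplified sketch of bean-style POJO rules. It is not Flink's actual TypeExtractor logic: the class PojoRuleSketch, the nested example classes and the exact rule set are assumptions made for illustration (public class, public no-argument constructor, and for each non-public field a getter and a setter).

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified rule check; not Flink's TypeExtractor.
final class PojoRuleSketch {

    /** Roughly how a case class looks from Java: final fields, accessors, no setters. */
    public static final class CaseClassLike {
        private final int position;
        private final int eventTime;
        private final boolean alive;

        public CaseClassLike(int position, int eventTime, boolean alive) {
            this.position = position;
            this.eventTime = eventTime;
            this.alive = alive;
        }

        public int position() { return position; }
        public int eventTime() { return eventTime; }
        public boolean alive() { return alive; }
    }

    /** Bean-style counterpart that satisfies the simplified rules below. */
    public static final class BeanLike {
        private int position;
        private boolean alive;

        public BeanLike() {}

        public int getPosition() { return position; }
        public void setPosition(int position) { this.position = position; }
        public boolean isAlive() { return alive; }
        public void setAlive(boolean alive) { this.alive = alive; }
    }

    private static boolean hasPublicMethod(Class<?> type, String name, int paramCount) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    /** Returns the list of simplified POJO rules that the given class violates. */
    static List<String> violations(Class<?> type) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(type.getModifiers())) {
            problems.add("class is not public");
        }
        boolean hasNoArgConstructor = false;
        for (Constructor<?> c : type.getConstructors()) {
            if (c.getParameterCount() == 0) {
                hasNoArgConstructor = true;
            }
        }
        if (!hasNoArgConstructor) {
            problems.add("no public no-argument constructor");
        }
        for (Field f : type.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
                continue;
            }
            if (Modifier.isPublic(mod) && !Modifier.isFinal(mod)) {
                continue; // public mutable fields need no accessors
            }
            String name = f.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            // Accept getX(), isX() or a Scala-style accessor x() as a getter.
            boolean getter = hasPublicMethod(type, "get" + suffix, 0)
                    || hasPublicMethod(type, "is" + suffix, 0)
                    || hasPublicMethod(type, name, 0);
            boolean setter = hasPublicMethod(type, "set" + suffix, 1);
            if (!getter) {
                problems.add("no getter for field " + name);
            }
            if (!setter) {
                problems.add("no setter for field " + name);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println("CaseClassLike: " + violations(CaseClassLike.class));
        System.out.println("BeanLike: " + violations(BeanLike.class));
    }
}
```

Under these simplified rules the case-class-like type fails on the missing constructor and setters, which is consistent with the hints above: the Scala API derives type information for case classes differently, so the warnings suggest the Java path is being used somewhere.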


On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
> Hi Lars,
> not sure about the out-of-the-box support for case classes with primitive 
> member types (could you refer to the section which made you conclude this?). 
> I haven't used Scala with Flink, yet. So maybe, others can give more context.
> But have you looked into using the TypeInfoFactory to define the schema [1]?
> 
> Best,
> Matthias
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
> 
> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven  wrote:
>> Hello,
>> We're running Flink 1.14 with scala, and we're suspecting that performance 
>> is suffering due to serialization of some scala case classes. Specifically 
>> we're seeing that our Case Class "cannot be used as a POJO type because not 
>> all fields are valid POJO fields, and must be processed as GenericType", and 
>> that the case class "does not contain a setter for field X". I'm 
>> interpreting these log messages as performance warnings. 
>> 
>> A simple case class example we're writing to state that triggers the 
>> mentioned 'warnings': 
>> 
>> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>> 
>> I'm understanding the docs that case classes with primitive types should be 
>> supported "out of the box". 
>> 
>> Any tips on how to proceed ? 
>> 
>> Kind regards, 
>> Lars
> 


Re: use of Scala versions >= 2.13 in Flink 1.15

2021-12-07 Thread Roman Grebennikov
Hi,

I guess using scala 2.13 with scala-free Flink 1.15 assumes that it will always 
use generic/Kryo serialization, which has a large performance penalty (YMMV, 
but it happens all the time with us when we accidentally use Flink Java APIs 
with scala case classes).

As far as I know, Flink's set of scala serializers for collections is using 
some 2.11/2.12-specific deprecated internals like CanBuildFrom, which are 
not available on 2.13. So implementing a state migration from 2.12 to 2.13 is 
not that easy due to the way Flink's TraversableSerializer is implemented. And 
the createTypeInformation scala macro Flink is using for deriving serializers for 
scala case classes is not directly compatible with 3.0, as there is a 
completely new scala macro API on 3.x.

Chesnay, I'm wondering what is the plan on 2.13/3.0 support in the future?

If I were the one writing a FLIP for this process, I could imagine it like this:
* as 2.11 is finally removed in 1.15, the createTypeInformation macro can be 
re-done on top of magnolia, which supports 2.12, 2.13 and 3.x with the same API.
* the current implementation of Flink's serializers for scala collections (afaik in 
TraversableSerializer) is serializing the whole CanBuildFrom code for a 
specific concrete collection type right in the snapshot. So it cannot be 
deserialized on 2.13, as there is no CanBuildFrom. But my own opinion is that 
cases where someone has a custom CanBuildFrom for their own hand-made scala 
collection implementation are extremely rare, so with a set of heuristics we can 
guess the concrete collection type right from the serialized CanBuildFrom scala 
code, assuming that there is a finite number of collection types (around 10 or 
something).

With this approach we can: support 2.12/2.13/3.x with the same codebase, and 
allow state migrations between scala versions.

I did some sort of prototype for step 1 (and partially step 2) in 
https://github.com/findify/flink-adt , although with a different goal of 
supporting scala ADTs, so if anyone interested, I can make a draft FLIP 
proposal based on this research to start the discussion.

with best regards,
Roman Grebennikov | g...@dfdx.me

On Tue, Dec 7, 2021, at 08:46, Chesnay Schepler wrote:
> We haven't changed anything significant in 1.14.
>
> Whether the 2.13 job can run on Scala 2.12 depends a bit on the job (and 
> of course, used libraries!); it depends on the backwards-compatibility 
> from Scala, which APIs are used and what kind of Scala magic is being 
> employed.
> We haven't really tested that scenario in 1.14 or below.
>
> On 07/12/2021 09:28, guenterh.lists wrote:
>> Hi Chesnay,
>>
>> thanks for the info - this is really good news for us.
>>
>> I set up a playground using the snapshot from yesterday [1] and a 
>> really quick and short Job using Scala 2.13 [2]
>>
>> The job starts and returns correct results. Even the use of a case 
>> class against the Java API is possible.
>>
>> Then I made a second try with the same job (compiled with Scala 
>> 2.13.6) running on a Flink 1.14 cluster which was again successful.
>>
>> My question:
>> Is compiling against Scala versions >= 2.13 already supported in 1.14, or 
>> is my example so small and simple that binary incompatibilities 
>> between the versions don't matter?
>>
>> Günter
>>
>>
>> [1] 
>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
>> [2] 
>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12
>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8
>>  
>>
>>
>>
>> On 06.12.21 13:59, Chesnay Schepler wrote:
>>> With regards to the Java APIs, you will definitely be able to use the 
>>> Java DataSet/DataStream APIs from Scala without any restrictions 
>>> imposed by Flink. This is already working with the current SNAPSHOT 
>>> version.
>>>
>>> As we speak we are also working to achieve the same for the Table 
>>> API; we expect to achieve that but with some caveats (i.e., if you 
>>> use the Python API or the Hive connector then you still need to use 
>>> the Scala version provided by Flink).
>>>
>>> As for the Scala APIs, we haven't really decided yet how this will 
>>> work in the future. However, one of the big benefits of the 
>>> Scala-free runtime is that it should now be easier for us to release 
>>> the APIs for more Scala versions.
>>>
>>> On 06/12/2021 11:47, guenterh.lists wrote:
>>>> Dear list,
>>>>
>>>> there have been some discussions and activities in the last months 
>>>> about a Scala free runtime which should make it possible to use 
>>>> newer Scala version (>= 2.13 / 3.x) on the application side.
>>>>
>>>> Stephan Ewen announced the implementation is on the way [1] and 
>>>> Martijn Visser mentioned in the ask me anything session on version 
>>>> 1.14 that it is planned to make this possible in the upcoming 1.15 
>>>> version (~ next February ) [2]
>>>>
>>>> This would be very nic

Customize Kafka client (module.yaml)

2021-12-07 Thread Jérémy Albrecht
Hi All,

I encounter a blocking problem linked to exchanging messages between Stateful 
functions.
The context is: I am sending a very large payload from a Stateful Function to a 
Kafka topic. I am blocked by the Kafka client (I think) because here is the 
output of the statefun-manager container:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message 
is 6660172 bytes when serialized which is larger than the maximum request size 
you have configured with the max.request.size configuration.

Now if I take a look at the documentation 
(https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/io/apache-kafka/),
it refers to the Confluent docs for customizing the configuration of the Kafka 
client. It is unclear how to express this in the module.yaml file. I 
tried several ways:

ingresses:
  - ingress:
      meta:
        type: io.statefun.kafka/ingress
        id: project.A/input
      spec:
        max:
          request:
            size: 104857600
        max.request.size: 11000
        message:
          max:
            bytes: 104857600
        address: kafka:9092
        consumerGroupId: my-consumer-group
        startupPosition:
          type: earliest
        topics:
          - topic: entry # used for retro-compatibility, to be removed in next release
            valueType: project.A/Message
            targets:
              - project.redacted/Entry

None of the above attempts seems to work.
Can anyone clarify what I am doing incorrectly?

Thanks in advance,
Jérémy


Re: use of Scala versions >= 2.13 in Flink 1.15

2021-12-07 Thread Chesnay Schepler
Indeed, if you use a scala-free Flink then Scala types would currently 
go through Kryo, hence why we will recommend to use Java types /for the 
time being/.
We are aware that this is an annoying limitation, and it is certainly 
not a state we want to be in in the long term.
There are some ideas floating around to have both Scala/Java types go 
through the same type extraction / serialization stack, but I don't 
think there is anything concrete to share yet.


As for supporting 2.13, and the corresponding migration from 2.12, I'm 
not aware of a concrete plan at this time.
We do want to support 2.13/3.0 at some point, but the migration is a 
tricky thing, hence why we put off upgrading Scala beyond 2.12.7 for so 
long.
At the moment we are primarily concerned with making such upgrades 
easier in the future by isolating individual Scala-reliant components 
from the Scala APIs.


If you have ideas in this direction that you'd like to share, then I'd 
suggest to head on over to 
https://issues.apache.org/jira/browse/FLINK-13414 and present them there.
At a glance your plan sounds pretty good, but I'm also not too deeply 
involved in the serializer stack ;)


On 07/12/2021 14:51, Roman Grebennikov wrote:

Hi,

I guess using scala 2.13 with scala-free Flink 1.15 assumes that it will always 
use generic/Kryo serialization, which has a large performance penalty (YMMV, 
but it happens all the time with us when we accidentally use flink java apis 
with scala case classes).

As far as I know, Flink's set of scala serializers for collections is using 
some 2.11/2.12 specific deprecated internal things like CanBuildFrom, which are 
not available on 2.13. So implementing a state migration from 2.12 to 2.13 is 
not that easy due to a way flink TraversableSerializer is implemented. And 
createTypeInformation scala macro flink is using for deriving serializers for 
scala case classes is not directly compatible with 3.0, as there is a 
completely new scala macro API on 3.x.

Chesnay, I'm wondering what is the plan on 2.13/3.0 support in the future?

If I was the one writing a FLIP for this process, I can imagine it like this:
* as 2.11 is finally removed in 1.15, the createTypeInformation macro can be 
re-done on top of magnolia, which supports 2.12, 2.13 and 3.x with the same API.
* current implementation of flink's serializers for scala collections (afaik in 
TraversableSerializer) is serializing the whole CanBuildFrom code for a 
specific concrete collection type right in the snapshot. So it cannot be 
deserialized on 2.13, as there is no CanBuildFrom. But my own opinion is that 
the cases when someone has custom CanBuildFrom for their own hand-made scala 
collection implementation is extremely rare, so with a set of heuristics we can 
guess the concrete collection type right from the serialized CanBuildFrom scala 
code, assuming that there is finite number of collection types (around 10 or 
something).

With this approach we can: support 2.12/2.13/3.x with the same codebase, and 
allow state migrations between scala versions.

I did some sort of prototype for step 1 (and partially step 2) 
in https://github.com/findify/flink-adt , although with a different goal of 
supporting scala ADTs, so if anyone interested, I can make a draft FLIP 
proposal based on this research to start the discussion.

with best regards,
Roman Grebennikov |g...@dfdx.me

On Tue, Dec 7, 2021, at 08:46, Chesnay Schepler wrote:

We haven't changed anything significant in 1.14.

Whether the 2.13 job can run on Scala 2.12 depends a bit on the job (and
of course, used libraries!); it depends on the backwards-compatibility
from Scala, which APIs are used and what kind of Scala magic is being
employed.
We haven't really tested that scenario in 1.14 or below.

On 07/12/2021 09:28, guenterh.lists wrote:

Hi Chesnay,

thanks for the info - this is really good news for us.

I set up a playground using the snapshot from yesterday [1] and a
really quick and short Job using Scala 2.13 [2]

The job starts and returns correct results. Even the use of a case
class against the Java API is possible.

Then I made a second try with the same job (compiled with Scala
2.13.6) running on a Flink 1.14 cluster which was again successful.

My question:
Is this compilation with Scala versions >=2.13 already part of 1.14 or
is my example too small and simple that binary incompatibilities
between the versions doesn't matter?

Günter


[1]
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
[2]
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8  




On 06.12.21 13:59, Chesnay Schepler wrote:

With regards to the Java APIs, you will definitely be able to use the
Java DataSet/DataStream APIs from Scala without any restrictions
imposed by Flink. This is already working with the current SNAPSHOT

Re: Windows and data loss.

2021-12-07 Thread John Smith
Hi, same here there's no pressure...

So my overall architecture is as follows. I try to follow the 12 factor app
design principles.
1- My APIs all or 99% run inside DC/OS as docker containers.
2- The apps log to the console, which get written to disk by DC/OS log
driver.
3- The apps are balanced by the deployment scheduler over multiple hosts
and most apps have at least 2 to 3 instances running load balanced and HA.
4- Each host has Filebeat running and tailing the logs.
5- All filebeat agents are pushing into 1 big topic called app-logs that
has 18 partitions.
6- From that 1 topic
 a) There's a logstash agent pushing into Elastic for our centralised
logs solution. So we can go search and debug logs from Kibana.
 b) There are various flink jobs running as ETL (NO TIME REQUIREMENTS)
pushing some logs to SQL Database and caches.
7- So far all app logs in this one big topic are "logically" independent
from each other. I.e: There's no real requirement to join them or associate
them together in any way.

Ok, that said, on to the new thing I want to do. In the above platform/design
there is 1 app where I want to start analysing its HTTP logs and start
doing "click" counting. So for each HTTP GET we want to group by path and
keep the total count per path since the "start". Example:

/hello/ = 200,000 clicks
/foo/ = 50,000 clicks
/foo/bar/ = 150,000 clicks

So the first thing I do out of that 1 big topic is filter that specific
application logs as follows...
For now it's done in the same job or I can maybe write a job to filter out
into a separate kafka topic ( I guess it depends how much strain the one
big topic has) but for now it's ok.

DataStream clickStream = env.addSource(flinkKafkaConsumer)
    .uid(kafkaTopic).name(kafkaTopic)
    .setParallelism(kafkaParallelism)
    // Filter the logs based on the provided "tag"
    .filter(new FilterFilebeatLogs("click-stream-app"))
    .uid("filter-app-logs").name("filter-app-logs")
    // Extract the encoded JSON string
    .flatMap(new JsonEncodedStringToJsonObject("message"))
    .uid("map-json-logs").name("map-json-logs")
    // Specifically filter the click HTTP log streams.
    .filter(...)
    .uid("filter-clicks").name("filter-clicks")
    // Now that we have the logs we need, add key and windowing functions here...

So the easy way out of this is just to use wall time and just count buckets
every X minute keyed by PATH and time. This will give 100% accurate counts.
This is implemented and works perfectly. And can display running count on
my UI.

Now say I wanted 100% accurate count + accurate time buckets and near real
time display.

My concern for using event time is as follows...
1- Given that the application is load balanced on 3 separate hosts. Say one
log agent falls/crashes for 1 hour. But the application is still working.
2- It means that 2/3 of the logs are being pushed to the kafka topic,
regardless if it's the big one topic described above or even if we pushed
the logs into 1 specific topic.
3- So if we have a window of 5 minutes it means that 1/3 will miss 12
windows.

So are we saying that if we put a lateness and watermark strategy in place,
and after 1 hour that agent comes back and starts pushing the logs that were
generated 1 hour ago, Flink will be smart enough to go back and replay all logs
from all hosts from an hour ago? And of course at the sink level we have to have
an idempotent process to make sure we replace the proper key/values???
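
For what it's worth, here is a small plain-Java model of how I understand event-time windows with allowed lateness to behave (in the Flink API the corresponding calls would be allowedLateness and sideOutputLateData on the windowed stream). Nothing gets replayed from the other hosts: the window state is simply kept around for the lateness period, a late record updates the count and triggers another firing, and the sink overwrites the same key. The class LateClickModel, its method names and the "path@windowStart" sink key are hypothetical, and the model assumes non-negative timestamps in milliseconds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not Flink code: 5-minute tumbling event-time windows
// with 1 hour of allowed lateness and an idempotent (upsert) sink.
final class LateClickModel {
    static final long WINDOW_MS = 5 * 60_000L;
    static final long ALLOWED_LATENESS_MS = 60 * 60_000L;

    // Window state: windowStart -> (path -> count), kept until lateness expires.
    private final TreeMap<Long, Map<String, Long>> state = new TreeMap<>();
    // Idempotent sink: writing the same key again replaces the old value.
    final Map<String, Long> sink = new HashMap<>();
    private long watermark = Long.MIN_VALUE;

    /** Returns false if the record is too late and has to be dropped. */
    boolean onEvent(String path, long eventTimeMs) {
        long start = eventTimeMs - (eventTimeMs % WINDOW_MS);
        long maxTs = start + WINDOW_MS - 1;
        if (maxTs + ALLOWED_LATENESS_MS <= watermark) {
            return false; // window state was already cleaned up
        }
        long count = state.computeIfAbsent(start, k -> new HashMap<>())
                          .merge(path, 1L, Long::sum);
        if (maxTs <= watermark) {
            // Window already fired once: emit the corrected count right away.
            sink.put(path + "@" + start, count);
        }
        return true;
    }

    void onWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return; // watermarks never move backwards
        }
        watermark = newWatermark;
        // Emit every window the watermark has passed (re-emitting is harmless
        // because the sink is an upsert).
        for (Map.Entry<Long, Map<String, Long>> w : state.entrySet()) {
            if (w.getKey() + WINDOW_MS - 1 > watermark) {
                break;
            }
            for (Map.Entry<String, Long> e : w.getValue().entrySet()) {
                sink.put(e.getKey() + "@" + w.getKey(), e.getValue());
            }
        }
        // Drop state of windows whose allowed lateness has expired.
        state.headMap(watermark - ALLOWED_LATENESS_MS - WINDOW_MS + 1, true).clear();
    }
}
```

In this model a record from the crashed agent that arrives within the hour bumps the already emitted count for its window, while anything later than that is rejected, which is where a side output for late data would come in.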





On Wed, 1 Dec 2021 at 03:00, Schwalbe Matthias wrote:

> Hi John,
>
>
>
> Sorry for the delay … I’m a little tight on spare time for user@flink
> currently.
>
> If you are still interested we could pick up the discussion and continue.
>
> However, I don't exactly understand what you want to achieve:
>
>    1. Would processing time windows be enough for you (and misplacement
>    of events into the wrong window acceptable)?
>    2. Do you want to use event time windows, but cannot afford losing
>    late events? (we can work out a scheme, that this would work)
>    3. How do you currently organize your input events in kafka?
>       1. 1 event per log row?
>       2. Kafka-event timestamp extracted from/per the log row?
>       3. You mentioned shuffling (random assignment) to kafka partition,
>          i. Is this per log row, or is this per log file
>          ii. Do you kafka-key by log file, or even by log application
>    4. Do you select log files to be collected in file timestamp order
>    5. I assume your windows are keyed by application, or do you use
>    another keyBy()?
>    6. What watermarking strategy did you configure?
>       1. You mentioned that watermarks advance even if file-ingress is
>       blocked
>       2. Can you publish/share the 3 odd lines of code for your watermark
>       strategy setup?
>
>
>
> Just as said before, ignoring-late-events is a default strategy, that can
> be adjusted by means of a cust

Re: Scala Case Class Serialization

2021-12-07 Thread Lars Skjærven
Thanks for quick response. Please find attached a minimal example
illustrating the issue. I've added implicit TypeInformation, and checked
that I'm importing the scala variant only.

Matthias: Just my superficial impression from [1]. Will look into
TypeInfoFactory.

Thanks again!

package com.mystuff

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

case class TestCaseClass(id: String, pos: Int)

class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {
  implicit val ttestclass: TypeInformation[TestCaseClass] =
    createTypeInformation[TestCaseClass]

  lazy val myState: MapState[String, TestCaseClass] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor[String, TestCaseClass](
        "test-state", classOf[String], ttestclass.getTypeClass)
    )

  override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
    myState.put(value.id, value)
    myState.get(value.id)
    out.collect(value.id)
  }
}

object TestJob {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    env.getConfig.disableGenericTypes()

    val s = Seq[TestCaseClass](
      TestCaseClass(id = "1", pos = 1),
      TestCaseClass(id = "2", pos = 2),
      TestCaseClass(id = "3", pos = 3),
    )

    env
      .fromCollection[TestCaseClass](s)
      .keyBy(s => s.id)
      .flatMap(new MyRichFlatMap)
      .print()

    env.execute("Test Job")
  }
}

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov  wrote:

> Hi Lars,
>
> can you please show a small reproducer of the way you construct the
> DataStream, and which imports do you use?
>
> We also often experience similar performance issues with scala, but
> usually they are related to accidental usage of Flink Java API. A couple of
> hints from my experience:
> 1. Make sure that you always use the scala DataStream, and not the java
> one.
> 2. All operations on scala datastream require an implicit
> TypeInformation[T] parameter, which is usually generated automatically for
> you if you do an "import org.apache.flink.api.scala._" by the
> createTypeInformation[T] macro. So make sure you have this import present.
> 3. You can do a "env.getConfig.disableGenericTypes" and Flink will throw
> an exception each time it has to fall back to generic Kryo serialization.
> The backtrace will point you to the exact place in your code where it has to
> do a Kryo fallback.
>
> Also Flink will always revert to Kryo in case if you use sum types (or
> ADTs, or "sealed traits"). Shameless plug: we made a library to support
> that: https://github.com/findify/flink-adt
>
> Roman Grebennikov | g...@dfdx.me
>
>
> On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
>
> Hi Lars,
> not sure about the out-of-the-box support for case classes with primitive
> member types (could you refer to the section which made you conclude
> this?). I haven't used Scala with Flink, yet. So maybe, others can give
> more context.
> But have you looked into using the TypeInfoFactory to define the schema
> [1]?
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>
> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven  wrote:
>
> Hello,
> We're running Flink 1.14 with scala, and we're suspecting that performance
> is suffering due to serialization of some scala case classes. Specifically
> we're seeing that our Case Class "cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType",
> and that the case class "does not contain a setter for field X". I'm
> interpreting these log messages as performance warnings.
>
> A simple case class example we're writing to state that triggers the
> mentioned 'warnings':
>
> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>
> My understanding of the docs is that case classes with primitive types
> should be supported "out of the box".
>
> Any tips on how to proceed ?
>
> Kind regards,
> Lars
>
>
>
>


Creating custom connector lib - dependency scope

2021-12-07 Thread Krzysztof Chmielewski
Hi all,
I was wondering, when implementing a custom Flink Connector that will be
later used as a dependency for other projects, whether dependencies to
Flink like:
flink-core
flink-runtime
flink-table-common
flink-table-api-java-bridge
flink-java
etc...

Should be in scope provided in the connector's pom.xml or it does not
matter if they are in default scope?


Regards,
Krzysztof Chmielewski


How to write from Flink to a write throttled database?

2021-12-07 Thread Jing Lu
Hi, community

I have a Kafka stream and want to use Flink for 10 minutes aggregation.
However, the number of events is large, and the writes are throttled for
the output database for a few seconds during an hour. I was thinking to
write from Flink to another Kafka stream and using another Flink app to
write to a database. Will this smooth the writing? What should I do for the
second Flink app?


Thanks,
Jing


Re: Creating custom connector lib - dependency scope

2021-12-07 Thread Chesnay Schepler
They should be set to provided so that they are not bundled into the 
user-jar.


You can also take a look at the connectors in the Flink repo to see how 
they handle dependencies.


On 07/12/2021 22:31, Krzysztof Chmielewski wrote:

Hi all,
I was wondering, when implementing a custom Flink Connector that will 
be later used as a dependency for other projects, whether dependencies 
to Flink like:

flink-core
flink-runtime
flink-table-common
flink-table-api-java-bridge
flink-java
etc...

Should be in scope provided in the connector's pom.xml or it does not 
matter if they are in default scope?



Regards,
Krzysztof Chmielewski




Re: How to write from Flink to a write throttled database?

2021-12-07 Thread Caizhi Weng
Hi!

Which database are you referring to? If there is no officially supported
connector of this database you can create your own sink operator
with GenericWriteAheadSink.

Note that if you're using Flink < 1.14 and if your source is bounded (that
is to say, your source will eventually come to an end and finishes the job)
you might lose the last bit of result. See [1] for detail.

[1] https://lists.apache.org/thread/qffl2pvnng9kkd51z5xp65x7ssnnm526

Jing Lu wrote on Wed, Dec 8, 2021 at 05:51:

> Hi, community
>
> I have a Kafka stream and want to use Flink for 10 minutes aggregation.
> However, the number of events is large, and the writes are throttled for
> the output database for a few seconds during an hour. I was thinking to
> write from Flink to another Kafka stream and using another Flink app to
> write to a database. Will this smooth the writing? What should I do for the
> second Flink app?
>
>
> Thanks,
> Jing
>


Re: Running a flink job with Yarn per-job mode from application code.

2021-12-07 Thread Caizhi Weng
Hi!

So you would like to submit a yarn job with Java code, not using /bin/flink
run?

If that is the case, you'll need to set 'execution.target' config option to
'yarn-per-job'. Set this in the configuration of ExecutionEnvironment and
execute the job with Flink API as normal.

Kamil ty wrote on Tue, Dec 7, 2021 at 19:16:

> Hello all,
>
> I'm looking for a way to submit a Yarn job from another flink jobs
> application code. I can see that you can access a cluster and submit jobs
> with a RestClusterClient, but it seems a Yarn per-job mode is not supported
> with it.
>
> Any suggestions would be appreciated.
>
> Best Regards
> Kamil
>


Re: How to write from Flink to a write throttled database?

2021-12-07 Thread Jing Lu
Hi Caizhi,

Thanks for your reply! The database is DynamoDB. The connector I use is
https://github.com/klarna-incubator/flink-connector-dynamodb. My source is
a continuous event stream. My Flink version is 1.12.

Best,
Jing

On Tue, Dec 7, 2021 at 6:15 PM Caizhi Weng  wrote:

> Hi!
>
> Which database are you referring to? If there is no officially supported
> connector of this database you can create your own sink operator
> with GenericWriteAheadSink.
>
> Note that if you're using Flink < 1.14 and if your source is bounded (that
> is to say, your source will eventually come to an end and finishes the job)
> you might lose the last bit of result. See [1] for detail.
>
> [1] https://lists.apache.org/thread/qffl2pvnng9kkd51z5xp65x7ssnnm526
>
> Jing Lu wrote on Wed, Dec 8, 2021 at 05:51:
>
>> Hi, community
>>
>> I have a Kafka stream and want to use Flink for 10 minutes aggregation.
>> However, the number of events is large, and the writes are throttled for
>> the output database for a few seconds during an hour. I was thinking to
>> write from Flink to another Kafka stream and using another Flink app to
>> write to a database. Will this smooth the writing? What should I do for the
>> second Flink app?
>>
>>
>> Thanks,
>> Jing
>>
>


Re: Issue with incremental checkpointing size

2021-12-07 Thread Caizhi Weng
Hi!

Thanks for the clarification.

Could you expand checkpoint #195 and #196 for details? Slow end-to-end
checkpoint time may be caused by various reasons, for example if the data
processing is slow it will backpressure the checkpoint (if that is the
case, alignment duration should be high for some vertices), or it is
possible that JVM is under heavy GC at that time.

By "increase the parallelism" I mean to increase the parallelism of the
Flink job. If you double the parallelism each subtask will only have to
deal with half of the data before and this should speed up data processing
and checkpointing. state.backend.rocksdb.checkpoint.transfer.thread.num
might help but for a ~300MB checkpoint I guess we do not need to tune this.

Vidya Sagar Mula wrote on Tue, Dec 7, 2021 at 15:22:

> Hi Caizhi,
>
> Thank you for your response.
> I am attaching 3 screen shots for better understanding. Can you please
> take a look.
> Screenshot 1: Flink UI with the checkpointing history (Size and time taken
> are displayed)
> Screenshot 2: Flink UI for Task managers. I have one TM.
> Screenshot 3: Grafana UI for CPU and memory utilization.
>
> For point 1: It is incremental checkpointing. However, I am making my
> input data in such a way that the keys in the data records are not the
> same. So, it is going to be full checkpointing size.
>
>
> For point 2: If you notice the screen shot with Flink UI, when the
> checkpointing size is reached to 267MB, the time taken is almost 4 mins,
> which is definitely not expected. You mentioned increasing the parallelism.
> Can you please explain a little bit more on this?
> There is a RocksDB configurable parameter "
> *state.backend.rocksdb.checkpoint.transfer.thread.num"* (default=1). Are
> you referring to this parameter to increase the parallelism. Can you please
> elaborate on this?
>
>
> And also, there are some tunable RocksDB parameters as mentioned below. I
> am currently using the default values. Do I need to set some higher values
> to increase the checkpoint size with the increased input data size.
>
> Can you please clarify if I need to tune any of the configurable
> parameters to increase the checkpointing size and to reduce the time taken
> for each checkpoint.
>
> *state.backend.rocksdb.checkpoint.transfer.thread.num (default to 1)*
>
> *state.backend.rocksdb.writebuffer.size (RocksDB defaults to 4MB)*
>
> state.backend.rocksdb.writebuffer.count (RocksDB defaults to 2)
>
> state.backend.rocksdb.writebuffer.number-to-merge (RocksDB defaults to 1)
>
> *state.backend.rocksdb.block.blocksize (RocksDB defaults to 4KB)*
>
> state.backend.rocksdb.block.cache-size ( RocksDB defaults to 8MB)
>
>
>
>
> On Mon, Dec 6, 2021 at 6:05 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> the checkpointing size is not going beyond 300 MB
>>
>>
>> Is 300MB the total size of checkpoint or the incremental size of
>> checkpoint? If it is the latter one, Flink will only store necessary
>> information (for example the keys and the fields that are selected) in
>> checkpoint and it is compressed, so for 3~5GB input records it is
>> reasonable for the incremental checkpoint size to shrink to ~300MB. Of
>> course this depends on the detailed workflow.
>>
>> there must be something bottleneck with Flink RocksDB configurations.
>>>
>>
>> By "bottleneck" do you mean the checkpoint speed is too slow? If yes you
>> can try to increase the parallelism of the job so that there will be less
>> burden on each parallelism when making checkpoints.
>>
>> Vidya Sagar Mula wrote on Tue, Dec 7, 2021 at 08:04:
>>
>>> Hi,
>>>
>>> In my project, we are trying to configure the "Incremental
>>> checkpointing" with RocksDB in the backend.
>>>
>>> We are using Flink 1.11 and RocksDB with an AWS S3 backend
>>>
>>> Issue:
>>> --
>>> In my pipeline, my window size is 5 mins and the incremental
>>> checkpointing is happening for every 2 mins.
>>> I am pumping the data in such a way that the keys are not the same for
>>> each record. That means, the incremental checkpointing size should keep
>>> increasing for each checkpoint.
>>>
>>> So, the expectation here is that the size of the checkpointing should
>>> reach at least 3-5 GB with the amount of the data pumped in.
>>>
>>> However, the checkpointing size is not going beyond 300 MB and that too
>>> it is taking around 2 mins duration for taking this 300 MB checkpoint.
>>>
>>> In my set up, I am using
>>>
>>> Cluster: Cloud cluster with instance storage.
>>> Memory : 20 GB,
>>> Heap : 10 GB
>>> Flink Memory: 4.5 GB
>>> Flink Version : 1.11
>>> Back end: RocksDB with AWS S3 backend
>>>
>>>
>>> I would feel that, there must be something bottleneck with Flink RocksDB
>>> configurations.
>>> Can you please advise me?
>>>
>>> Thanks,
>>>
>>>
>>>
>>>
>>>


Re: How to write from Flink to a write throttled database?

2021-12-07 Thread Caizhi Weng
Hi!

Thanks for the clarification.

I'm not familiar with DynamoDB and you might want to modify this connector
a bit. Will a WriteRequest immediately send write requests to the database?
If yes you may want to instead cache the requests in memory and send them
only at snapshots. See [1] for the code to deal with incoming records and
[2] for snapshots.

[1]
https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L127
[2]
https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L202
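
To illustrate the "cache the requests in memory and send them only at snapshots" idea, here is a minimal plain-Java sketch. It is not the connector's code: the class SnapshotBufferingWriter, its invoke/snapshotState method names (chosen to mirror the usual sink callbacks) and the batchWriter callback are assumptions, and the actual database call is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: buffers records in memory and only hands them to the
// database writer when a snapshot (checkpoint) is taken.
final class SnapshotBufferingWriter<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> batchWriter; // stands in for the real database call
    private final List<T> pending = new ArrayList<>();

    SnapshotBufferingWriter(int maxBatchSize, Consumer<List<T>> batchWriter) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.batchWriter = batchWriter;
    }

    /** Called per incoming record: nothing is sent yet. */
    void invoke(T record) {
        pending.add(record);
    }

    /** Called at snapshot time: flush everything in bounded batches. */
    void snapshotState() {
        for (int from = 0; from < pending.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, pending.size());
            // Copy the slice so the writer never sees the internal buffer.
            batchWriter.accept(new ArrayList<>(pending.subList(from, to)));
        }
        // Only reached if every batch was written; if the writer throws, the
        // buffer is kept and the snapshot fails.
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The trade-off is that the buffer grows with the checkpoint interval, and a failed flush means the same records are written again after recovery, so the writes should be idempotent.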

Jing Lu wrote on Wed, Dec 8, 2021 at 10:22:

> Hi Caizhi,
>
> Thanks for your reply! The database is DynamoDB. The connector I use is
> https://github.com/klarna-incubator/flink-connector-dynamodb. My source
> is a continuous event stream. My Flink version is 1.12.
>
> Best,
> Jing
>
> On Tue, Dec 7, 2021 at 6:15 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> Which database are you referring to? If there is no officially supported
>> connector of this database you can create your own sink operator
>> with GenericWriteAheadSink.
>>
>> Note that if you're using Flink < 1.14 and if your source is bounded
>> (that is to say, your source will eventually come to an end and finishes
>> the job) you might lose the last bit of result. See [1] for detail.
>>
>> [1] https://lists.apache.org/thread/qffl2pvnng9kkd51z5xp65x7ssnnm526
>>
>> Jing Lu wrote on Wed, Dec 8, 2021 at 05:51:
>>
>>> Hi, community
>>>
>>> I have a Kafka stream and want to use Flink for 10 minutes aggregation.
>>> However, the number of events is large, and the writes are throttled for
>>> the output database for a few seconds during an hour. I was thinking to
>>> write from Flink to another Kafka stream and using another Flink app to
>>> write to a database. Will this smooth the writing? What should I do for the
>>> second Flink app?
>>>
>>>
>>> Thanks,
>>> Jing
>>>
>>


Which issue or commit should i merge in flink-1.13.3 for buffer debloating?

2021-12-07 Thread vtygoss
Hi community!


Because of a connector limitation, I can't upgrade Apache Flink from
version 1.13.3 to version 1.14.0. But I really need the buffer debloating
feature from 1.14.0 for heavy checkpoints under backpressure.


So which issue or commit should I merge into flink-1.13.3 for buffer debloating?


- [FLINK-24189] Perform buffer debloating per single gate
- PR-17024 mentioned in https://issues.apache.org/jira/browse/FLINK-23973


Thanks for any reply or suggestion.


Best Regards!

Re: How to write from Flink to a write throttled database?

2021-12-07 Thread Jing Lu
Hi Caizhi,

Here is my current configuration:

val dynamoDBSinkConfig: DynamoDBSinkConfig =
  (new DynamoDBSinkConfig.Builder).batchSize(50).queueLimit(20).build()

new FlinkDynamoDBSink[Row](
  dynamoDBBuilder,
  "tablename",
  dynamoDBSinkConfig,
  mapper
)


I think this is batch write.




On Tue, Dec 7, 2021 at 6:34 PM Caizhi Weng  wrote:

> Hi!
>
> Thanks for the clarification.
>
> I'm not familiar with DynamoDB and you might want to modify this connector
> a bit. Will a WriteRequest immediately send write requests to the database?
> If yes you may want to instead cache the requests in memory and send them
> only at snapshots. See [1] for the code to deal with incoming records and
> [2] for snapshots.
>
> [1]
> https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L127
> [2]
> https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L202
>
> Jing Lu wrote on Wed, Dec 8, 2021 at 10:22:
>
>> Hi Caizhi,
>>
>> Thanks for your reply! The database is DynamoDB. The connector I use is
>> https://github.com/klarna-incubator/flink-connector-dynamodb. My source
>> is a continuous event stream. My Flink version is 1.12.
>>
>> Best,
>> Jing
>>
>> On Tue, Dec 7, 2021 at 6:15 PM Caizhi Weng  wrote:
>>
>>> Hi!
>>>
>>> Which database are you referring to? If there is no officially supported
>>> connector of this database you can create your own sink operator
>>> with GenericWriteAheadSink.
>>>
>>> Note that if you're using Flink < 1.14 and if your source is bounded
>>> (that is to say, your source will eventually come to an end and finishes
>>> the job) you might lose the last bit of result. See [1] for detail.
>>>
>>> [1] https://lists.apache.org/thread/qffl2pvnng9kkd51z5xp65x7ssnnm526
>>>
>>> Jing Lu wrote on Wed, Dec 8, 2021 at 05:51:
>>>
>>>> Hi, community
>>>>
>>>> I have a Kafka stream and want to use Flink for 10 minutes aggregation.
>>>> However, the number of events is large, and the writes are throttled for
>>>> the output database for a few seconds during an hour. I was thinking to
>>>> write from Flink to another Kafka stream and using another Flink app to
>>>> write to a database. Will this smooth the writing? What should I do for the
>>>> second Flink app?
>>>>
>>>>
>>>> Thanks,
>>>> Jing
>>>>
>>>


Job Listener not working as expected

2021-12-07 Thread Puneet Duggal
Hi,

I have registered a job listener which notifies Slack with the JobId on
successful submission. It also notifies Slack on successful/failed execution.
This job listener works as expected when running in a local IDE, but behaves
unexpectedly when running on a cluster, i.e. both onJobSubmitted and
onJobExecuted are called simultaneously on submitting a real-time data
streaming job. Currently, jobs are deployed in session mode.

Thanks,
Puneet Duggal

Re: How to write from Flink to a write throttled database?

2021-12-07 Thread Jing Lu
Writes are throttled, but that should not cause data loss, right? I saw this
line in the producer:

https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L202




On Tue, Dec 7, 2021 at 7:48 PM Jing Lu  wrote:

> Hi Caizhi,
>
> Here is my current configuration:
>
> val dynamoDBSinkConfig: DynamoDBSinkConfig =
>   (new DynamoDBSinkConfig.Builder).batchSize(50).queueLimit(20).build()
>
> new FlinkDynamoDBSink[Row](
>   dynamoDBBuilder,
>   "tablename",
>   dynamoDBSinkConfig,
>   mapper
> )
>
>
> I think this is batch write.
>
>
>
>
> On Tue, Dec 7, 2021 at 6:34 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> Thanks for the clarification.
>>
>> I'm not familiar with DynamoDB and you might want to modify this
>> connector a bit. Will a WriteRequest immediately send write requests to the
>> database? If yes you may want to instead cache the requests in memory and
>> send them only at snapshots. See [1] for the code to deal with incoming
>> records and [2] for snapshots.
>>
>> [1]
>> https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L127
>> [2]
>> https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L202
>>
>> Jing Lu wrote on Wed, Dec 8, 2021 at 10:22:
>>
>>> Hi Caizhi,
>>>
>>> Thanks for your reply! The database is DynamoDB. The connector I use is
>>> https://github.com/klarna-incubator/flink-connector-dynamodb. My source
>>> is a continuous event stream. My Flink version is 1.12.
>>>
>>> Best,
>>> Jing
>>>
>>> On Tue, Dec 7, 2021 at 6:15 PM Caizhi Weng  wrote:
>>>
>>>> Hi!
>>>>
>>>> Which database are you referring to? If there is no officially
>>>> supported connector of this database you can create your own sink operator
>>>> with GenericWriteAheadSink.
>>>>
>>>> Note that if you're using Flink < 1.14 and if your source is bounded
>>>> (that is to say, your source will eventually come to an end and finishes
>>>> the job) you might lose the last bit of result. See [1] for detail.
>>>>
>>>> [1] https://lists.apache.org/thread/qffl2pvnng9kkd51z5xp65x7ssnnm526
>>>>
>>>> Jing Lu wrote on Wed, Dec 8, 2021 at 05:51:
>>>>
>>>>> Hi, community
>>>>>
>>>>> I have a Kafka stream and want to use Flink for 10 minutes
>>>>> aggregation. However, the number of events is large, and the writes are
>>>>> throttled for the output database for a few seconds during an hour. I was
>>>>> thinking to write from Flink to another Kafka stream and using another
>>>>> Flink app to write to a database. Will this smooth the writing? What
>>>>> should I do for the second Flink app?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Jing
>>>>>



Weird operator ID check restore from checkpoint fails

2021-12-07 Thread Dan Hill
Hi.  I noticed this warning has "operator 811d3b279c8b26ed99ff0883b7630242"
in it.  I assume this should be an operator uid or name.  It looks like
something else.  What is it?  Is something corrupted?


org.apache.flink.runtime.client.JobInitializationException: Could not
instantiate JobManager.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Failed to rollback to
checkpoint/savepoint
s3a://my-flink-state/checkpoints/ce9b90eafde97ca4629c13936c34426f/chk-626.
Cannot map checkpoint/savepoint state for operator
811d3b279c8b26ed99ff0883b7630242 to the new program, because the
operator is not available in the new program. If you want to allow to
skip this, you can set the --allowNonRestoredState option on the CLI.
    at org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:226)
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:190)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1627)
    at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
    ... 4 more


Re: Weird operator ID check restore from checkpoint fails

2021-12-07 Thread Dan Hill
I'm restoring the job with the same binary and same flags/args.

On Tue, Dec 7, 2021 at 8:48 PM Dan Hill  wrote:

> Hi.  I noticed this warning has "operator
> 811d3b279c8b26ed99ff0883b7630242" in it.  I assume this should be an
> operator uid or name.  It looks like something else.  What is it?  Is
> something corrupted?


Re: Weird operator ID check restore from checkpoint fails

2021-12-07 Thread Robert Metzger
Hi Dan,

When restoring a savepoint/checkpoint, Flink matches the state to the
operators based on each operator's ID (uid). The exception says that there
is some state that doesn't match any operator. So from Flink's perspective,
the operator is gone.
Here is more information:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids
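
The matching rule can be sketched as a simplified model in plain Java. This
is not Flink's actual code: the class and method names are made up for
illustration, and "op-a"/"op-b" are hypothetical operator IDs. The idea is
that every operator ID that carries state in the checkpoint has to exist in
the new job graph, unless skipping non-restored state is explicitly allowed.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified model of the restore check; not Flink's real implementation.
public class RestoreCheckSketch {

    // Operator IDs that have state in the checkpoint but no operator in the new job.
    static List<String> unmatchedOperators(Set<String> checkpointIds, Set<String> jobIds) {
        return checkpointIds.stream()
                .filter(id -> !jobIds.contains(id))
                .sorted()
                .collect(Collectors.toList());
    }

    // Fails on the first unmatched ID unless skipping is explicitly allowed.
    static void validate(Set<String> checkpointIds, Set<String> jobIds,
                         boolean allowNonRestoredState) {
        List<String> unmatched = unmatchedOperators(checkpointIds, jobIds);
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                    "Cannot map checkpoint/savepoint state for operator "
                            + unmatched.get(0) + " to the new program");
        }
    }

    public static void main(String[] args) {
        // Hypothetical IDs: the checkpoint knows an operator the job no longer has.
        Set<String> checkpoint = new HashSet<>(
                Arrays.asList("811d3b279c8b26ed99ff0883b7630242", "op-a"));
        Set<String> job = new HashSet<>(Arrays.asList("op-a", "op-b"));

        System.out.println("Unmatched: " + unmatchedOperators(checkpoint, job));
        validate(checkpoint, job, true); // allowed to skip, so no exception
    }
}
```

An extra operator in the job ("op-b" here) is harmless in this model; only
checkpoint state without a matching operator triggers the failure.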


Somehow something must have changed in your job: Did you change the Flink
version?

Hope this helps!

On Wed, Dec 8, 2021 at 5:49 AM Dan Hill  wrote:

> I'm restoring the job with the same binary and same flags/args.
>
> On Tue, Dec 7, 2021 at 8:48 PM Dan Hill  wrote:
>
>> Hi.  I noticed this warning has "operator
>> 811d3b279c8b26ed99ff0883b7630242" in it.  I assume this should be an
>> operator uid or name.  It looks like something else.  What is it?  Is
>> something corrupted?


Re: Weird operator ID check restore from checkpoint fails

2021-12-07 Thread Dan Hill
Nothing changed (as far as I know).  It's the same binary and the same
args.  It's Flink v1.12.3.  I'm going to switch away from auto-gen uids and
see if that helps.  The job randomly started failing to checkpoint.  I
cancelled the job and started it from the last successful checkpoint.

I'm confused why `811d3b279c8b26ed99ff0883b7630242` is used and not the
auto-generated uid.  That seems like a bug.

On Tue, Dec 7, 2021 at 10:40 PM Robert Metzger  wrote:

> Hi Dan,
>
> When restoring a savepoint/checkpoint, Flink is matching the state for the
> operators based on the uuid of the operator. The exception says that there
> is some state that doesn't match any operator. So from Flink's perspective,
> the operator is gone.
> Here is more information:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids
>
>
> Somehow something must have changed in your job: Did you change the Flink
> version?
>
> Hope this helps!
>
> On Wed, Dec 8, 2021 at 5:49 AM Dan Hill  wrote:
>
>> I'm restoring the job with the same binary and same flags/args.
>>
>> On Tue, Dec 7, 2021 at 8:48 PM Dan Hill  wrote:
>>
>>> Hi.  I noticed this warning has "operator
>>> 811d3b279c8b26ed99ff0883b7630242" in it.  I assume this should be an
>>> operator uid or name.  It looks like something else.  What is it?  Is
>>> something corrupted?


Re: Job Listener not working as expected

2021-12-07 Thread Robert Metzger
Hi Puneet,

Are you submitting the Flink jobs to a cluster in session mode using the
"/bin/flink" command line tool?
Maybe the command line tool is submitting the job to the cluster in a "fire
and forget" fashion, which would explain why the listeners fire immediately.
Can you try to use "env.executeAsync()" instead of "execute()"? (Sorry, I
don't have time right now to experiment with this myself). In either case,
the command line tool needs to stay connected to the cluster to listen to
the job status.
What probably works is using Application Mode instead of Session Mode.
In Application Mode, the main() method runs on the cluster.

Best,
Robert


On Wed, Dec 8, 2021 at 4:55 AM Puneet Duggal 
wrote:

> Hi,
>
> I have registered a job listener which notifies Slack with the JobId on
> successful submission. It also notifies Slack on successful/failed
> execution. This job listener works as expected when running in a
> local IDE, but behaves unexpectedly when running on a cluster,
> i.e. both *onJobSubmitted* and *onJobExecuted* are called
> simultaneously on submitting a real-time data streaming job. Currently,
> jobs are being deployed in session mode.
>
> Thanks,
> Puneet Duggal
>


Re: Weird operator ID check restore from checkpoint fails

2021-12-07 Thread Robert Metzger
811d3b279c8b26ed99ff0883b7630242 is the operator id.
If I'm not mistaken, running the job graph generation (e.g. the main
method) in DEBUG log level will show you all the IDs generated. This should
help you map this ID to your code.
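
To illustrate why the ID looks the way it does: an operator ID is a 128-bit
value printed as 32 hex characters, derived by hashing rather than stored as
a readable name. The sketch below only demonstrates that shape. It uses MD5
purely as a stand-in because it ships with the JDK and also yields 128 bits;
Flink's real hash function and its inputs are different, and the uid strings
are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustration only: MD5 is a stand-in, not the hash Flink uses.
public class OperatorIdSketch {

    // Hash a uid string to 16 bytes and render them as 32 lowercase hex characters.
    static String toHexId(String uid) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(uid.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The same uid always maps to the same ID; a different uid maps elsewhere.
        System.out.println(toHexId("my-mapper"));
        System.out.println(toHexId("my-mapper"));
        System.out.println(toHexId("my-sink"));
    }
}
```

The point of the sketch is that an ID derived from a fixed string stays the
same across runs, whereas an ID derived from the job's structure depends on
that structure staying the same.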

On Wed, Dec 8, 2021 at 7:52 AM Dan Hill  wrote:

> Nothing changed (as far as I know).  It's the same binary and the same
> args.  It's Flink v1.12.3.  I'm going to switch away from auto-gen uids and
> see if that helps.  The job randomly started failing to checkpoint.  I
> cancelled the job and started it from the last successful checkpoint.
>
> I'm confused why `811d3b279c8b26ed99ff0883b7630242` is used and not the
> auto-generated uid.  That seems like a bug.
>
> On Tue, Dec 7, 2021 at 10:40 PM Robert Metzger 
> wrote:
>
>> Hi Dan,
>>
>> When restoring a savepoint/checkpoint, Flink is matching the state for
>> the operators based on the uuid of the operator. The exception says that
>> there is some state that doesn't match any operator. So from Flink's
>> perspective, the operator is gone.
>> Here is more information:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids
>>
>>
>> Somehow something must have changed in your job: Did you change the Flink
>> version?
>>
>> Hope this helps!
>>
>> On Wed, Dec 8, 2021 at 5:49 AM Dan Hill  wrote:
>>
>>> I'm restoring the job with the same binary and same flags/args.
>>>
>>> On Tue, Dec 7, 2021 at 8:48 PM Dan Hill  wrote:
>>>
>>>> Hi.  I noticed this warning has "operator
>>>> 811d3b279c8b26ed99ff0883b7630242" in it.  I assume this should be an
>>>> operator uid or name.  It looks like something else.  What is it?  Is
>>>> something corrupted?




Re: Which issue or commit should i merge in flink-1.13.3 for buffer debloating?

2021-12-07 Thread Robert Metzger
Hi,

I guess all the commits mentioned in all the subtasks of this ticket will
give you the feature: https://issues.apache.org/jira/browse/FLINK-23451

However, I'm pretty sure that you can't just cherry-pick such a big feature
to an older Flink version.

I would rather try to fix the connector so that you can upgrade to 1.14.

On Wed, Dec 8, 2021 at 4:07 AM vtygoss  wrote:

> Hi community!
>
>
> Because of a connector limitation, I can't upgrade Apache Flink
> from version 1.13.3 to version 1.14.0. But I really need the buffer
> debloating feature from 1.14.0 for heavy checkpoints under
> backpressure.
>
>
> So which issues or commits should I merge into flink-1.13.3 for buffer
> debloating?
>
>
> - [FLINK-24189] Perform buffer debloating per single gate
>
> - PR-17024 mentioned in https://issues.apache.org/jira/browse/FLINK-23973
>
>
> Thanks for any reply or suggestion.
>
>
> Best Regards!
>


Re: Weird operator ID check restore from checkpoint fails

2021-12-07 Thread Dan Hill
Thanks!

On Tue, Dec 7, 2021, 22:55 Robert Metzger  wrote:

> 811d3b279c8b26ed99ff0883b7630242 is the operator id.
> If I'm not mistaken, running the job graph generation (e.g. the main
> method) in DEBUG log level will show you all the IDs generated. This should
> help you map this ID to your code.
>
> On Wed, Dec 8, 2021 at 7:52 AM Dan Hill  wrote:
>
>> Nothing changed (as far as I know).  It's the same binary and the same
>> args.  It's Flink v1.12.3.  I'm going to switch away from auto-gen uids and
>> see if that helps.  The job randomly started failing to checkpoint.  I
>> cancelled the job and started it from the last successful checkpoint.
>>
>> I'm confused why `811d3b279c8b26ed99ff0883b7630242` is used and not the
>> auto-generated uid.  That seems like a bug.
>>
>> On Tue, Dec 7, 2021 at 10:40 PM Robert Metzger 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> When restoring a savepoint/checkpoint, Flink is matching the state for
>>> the operators based on the uuid of the operator. The exception says that
>>> there is some state that doesn't match any operator. So from Flink's
>>> perspective, the operator is gone.
>>> Here is more information:
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids
>>>
>>>
>>> Somehow something must have changed in your job: Did you change the
>>> Flink version?
>>>
>>> Hope this helps!
>>>
>>> On Wed, Dec 8, 2021 at 5:49 AM Dan Hill  wrote:
>>>
>>>> I'm restoring the job with the same binary and same flags/args.
>>>>
>>>> On Tue, Dec 7, 2021 at 8:48 PM Dan Hill  wrote:
>>>>
>>>>> Hi.  I noticed this warning has "operator
>>>>> 811d3b279c8b26ed99ff0883b7630242" in it.  I assume this should be an
>>>>> operator uid or name.  It looks like something else.  What is it?  Is
>>>>> something corrupted?


Re: Customize Kafka client (module.yaml)

2021-12-07 Thread Robert Metzger
Hi Jérémy,

In my understanding of the StateFun docs, you need to pass custom
properties using "ingress.spec.properties".
For example:

ingresses:
  - ingress:
      meta:
        type: io.statefun.kafka/ingress
        id: project.A/input
      spec:
        properties:
          max.request.size: 11000

(or the nested variant?)
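
A quick sanity check on the numbers in this thread, as a sketch rather than
Kafka's own code: the producer rejects a record whose serialized size exceeds
max.request.size, so whatever value ends up in the configuration has to be
larger than the 6660172 bytes from the error message. As written, 11000 would
still be too small. Note also that max.request.size is a Kafka producer
setting, so it presumably has to reach the component that writes to Kafka;
where exactly StateFun expects it is not confirmed here. The class and method
names below are hypothetical.

```java
// Simplified size check; the real producer also accounts for record overhead.
public class RequestSizeCheck {

    // Kafka producer default for max.request.size (1 MiB).
    static final long KAFKA_DEFAULT_MAX_REQUEST_SIZE = 1_048_576L;

    // Message size reported in the RecordTooLargeException from this thread.
    static final long MESSAGE_BYTES = 6_660_172L;

    // A record fits when its serialized size does not exceed the limit.
    static boolean fits(long serializedBytes, long maxRequestSize) {
        return serializedBytes <= maxRequestSize;
    }

    public static void main(String[] args) {
        long[] candidates = {11_000L, KAFKA_DEFAULT_MAX_REQUEST_SIZE, 104_857_600L};
        for (long limit : candidates) {
            System.out.println("max.request.size=" + limit
                    + " fits=" + fits(MESSAGE_BYTES, limit));
        }
    }
}
```

Of the values mentioned in the thread, only 104857600 is large enough for
this message.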



On Tue, Dec 7, 2021 at 4:31 PM Jérémy Albrecht  wrote:

> Hi All,
>
> I encounter a blocking problem linked to exchanging messages between
> Stateful functions.
> The context is: I am sending a very large payload from a Stateful Function
> to a Kafka topic. I am blocked by the Kafka client (I think) because here
> is the output of the statefun-manager container:
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The
> message is 6660172 bytes when serialized which is larger than the maximum
> request size you have configured with the max.request.size configuration.
>
> Now if I take a look at the documentation (
> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/io/apache-kafka/),
> they refer to the Confluent doc to customize the configuration of the Kafka
> client. It is unclear how to implement this in the module.yaml file. I
> tried several ways:
>
> ingresses:
>   - ingress:
>       meta:
>         type: io.statefun.kafka/ingress
>         id: project.A/input
>       spec:
>         max:
>           request:
>             size: 104857600
>         max.request.size: 11000
>         message:
>           max:
>             bytes: 104857600
>         address: kafka:9092
>         consumerGroupId: my-consumer-group
>         startupPosition:
>           type: earliest
>         topics:
>           - topic: entry # used for retro-compatibility, to be removed in next release
>             valueType: project.A/Message
>             targets:
>               - project.redacted/Entry
>
>
> None of the above solutions seems to be working.
> Does anyone have the ability to clarify what I am not doing correctly ?
>
> Thanks in advance,
> Jérémy
>


Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-07 Thread Robert Metzger
Hi Ayush,

I couldn't find the documentation you've mentioned. Can you send me a link
to it?

On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan 
wrote:

> Hi,
>
> Can you please let me know the alternatives of isEndOfStream() as now
> according to docs this method will no longer be used to determine the end
> of the stream.
>
> --
>  Ayush Chauhan
>  Data Platform
>  [image: mobile-icon]  +91 9990747111
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
>


Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-07 Thread Ayush Chauhan
https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69



On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger  wrote:

> Hi Ayush,
>
> I couldn't find the documentation you've mentioned. Can you send me a link
> to it?
>
> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan 
> wrote:
>
>> Hi,
>>
>> Can you please let me know the alternatives of isEndOfStream() as now
>> according to docs this method will no longer be used to determine the end
>> of the stream.
>>
>> --
>>  Ayush Chauhan
>>  Data Platform
>>  [image: mobile-icon]  +91 9990747111
>>
>>
>> This email is intended only for the person or the entity to whom it is
>> addressed. If you are not the intended recipient, please delete this email
>> and contact the sender.
>>
>

-- 
 Ayush Chauhan
 Data Platform


Re: Scala Case Class Serialization

2021-12-07 Thread Roman Grebennikov
Hi,

I guess the problematic line where the Kryo fallback happens is this one:

  lazy val myState: MapState[String, TestCaseClass] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor[String, TestCaseClass](
        "test-state", classOf[String], ttestclass.getTypeClass))

MapStateDescriptor has multiple constructors, and some of them have a strong
Java smell :)
The one you've used here takes Class instances (classOf[String] and
ttestclass.getTypeClass), and it implicitly uses the Java TypeInformation
derivation under the hood, which has no idea about Scala.

MapStateDescriptor also has another constructor, which takes explicit
TypeInformation for the key and the value, like this:

  // createTypeInformation comes from "import org.apache.flink.api.scala._"
  val keyTypeInfo = createTypeInformation[String]
  val valueTypeInfo = createTypeInformation[TestCaseClass]
  new MapStateDescriptor[String, TestCaseClass]("test", keyTypeInfo, valueTypeInfo)

Then it won't try to be too smart: it won't derive the type info from
Class[_] and will use the one you provided.

with best regards,
Roman Grebennikov | g...@dfdx.me


On Tue, Dec 7, 2021, at 19:05, Lars Skjærven wrote:
> Thanks for quick response. Please find attached a minimal example 
> illustrating the issue. I've added implicit TypeInformation, and checked that 
> I'm importing the scala variant only. 
> 
> Matthias: Just my superficial impression from [1]. Will look into 
> TypeInfoFactory. 
> 
> Thanks again!
> 
> package com.mystuff
> import org.apache.flink.api.common.functions.RichFlatMapFunction
> import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
> import org.apache.flink.api.common.typeinfo.{TypeInformation}
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.util.Collector
>
> case class TestCaseClass(id: String, pos: Int)
>
> class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {
>   implicit val ttestclass: TypeInformation[TestCaseClass] =
>     createTypeInformation[TestCaseClass]
>
>   lazy val myState: MapState[String, TestCaseClass] =
>     getRuntimeContext.getMapState(
>       new MapStateDescriptor[String, TestCaseClass](
>         "test-state", classOf[String], ttestclass.getTypeClass)
>     )
>
>   override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
>     myState.put(value.id, value)
>     myState.get(value.id)
>     out.collect(value.id)
>   }
> }
>
> object TestJob {
>
>   def main(args: Array[String]): Unit = {
>
>     val env = StreamExecutionEnvironment.createLocalEnvironment()
>     env.getConfig.disableGenericTypes()
>
>     val s = Seq[TestCaseClass](
>       TestCaseClass(id = "1", pos = 1),
>       TestCaseClass(id = "2", pos = 2),
>       TestCaseClass(id = "3", pos = 3),
>     )
>
>     env
>       .fromCollection[TestCaseClass](s)
>       .keyBy(s => s.id)
>       .flatMap(new MyRichFlatMap)
>       .print()
>
>     env.execute("Test Job")
>   }
> }
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
> 
> On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov  wrote:
>> Hi Lars,
>>
>> can you please show a small reproducer of the way you construct the
>> DataStream, and which imports you use?
>>
>> We also often experience similar performance issues with Scala, but usually
>> they are related to accidental usage of the Flink Java API. A couple of hints
>> from my experience:
>> 1. Make sure that you always use the Scala DataStream, and not the Java one.
>> 2. All operations on the Scala DataStream require an implicit
>> TypeInformation[T] parameter, which is usually generated automatically for
>> you by the createTypeInformation[T] macro if you do an
>> "import org.apache.flink.api.scala._". So make sure you have this import
>> present.
>> 3. You can do an "env.getConfig.disableGenericTypes" and Flink will throw an
>> exception each time it has to fall back to generic Kryo serialization. The
>> backtrace will point you to the exact place in your code where it has to do
>> a Kryo fallback.
>>
>> Also, Flink will always revert to Kryo if you use sum types (ADTs, or
>> "sealed traits"). Shameless plug: we made a library to support that:
>> https://github.com/findify/flink-adt
>> 
>> Roman Grebennikov | g...@dfdx.me
>> 
>> 
>> On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
>>> Hi Lars,
>>> not sure about the out-of-the-box support for case classes with primitive 
>>> member types (could you refer to the section which made you conclude 
>>> this?). I haven't used Scala with Flink, yet. So maybe, others can give 
>>> more context.
>>> But have you looked into using the TypeInfoFactory to define the schema [1]?
>>> 
>>> Best,
>>> Matthias
>>> 
>>> [1] 
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-usi