Re: Json deserialisation with .jsonValue vs format=json in Table API

2022-02-16 Thread Илья Соин
Thank you, Francesco

> On 3 Feb 2022, at 18:21, Francesco Guardiani  wrote:
> 
> Hi,
> 
> I think the more stable option would be the first one, as it also gives you 
> more flexibility. Reading the row as string and then parsing it in a query 
> definitely costs more, and makes it less straightforward to use the other schema 
> features of Table, such as watermark definition, primary keys, etc.
> 
> I guess you can implement it straightforwardly by subclassing the existing JSON 
> format provided by Flink, in particular JsonRowDataDeserializationSchema.
> 
> A third solution would be to create a SplitFunction, like the one you 
> created, which directly performs the parsing, outputting rows rather than 
> strings. This removes the double parsing issue, but still creates problems 
> when interacting with other schema features (see the sketch at the end of this thread).
> 
> Hope it helps,
> FG
> 
> On Thu, Feb 3, 2022 at 3:56 PM Илья Соин wrote:
> Hi, 
> 
> I’m using the Table / SQL API. 
> 
> I have a stream of strings, where each message contains several JSON strings 
> separated by "\n". 
> For example:
> {"timestamp": "2021-01-01T00:00:00", "person": {"name": "Vasya"}}\n 
> {"timestamp": "2021-01-01T01:00:00", "person": {"name": "Max"}}
> 
> I would like to split each message by "\n", parse each string as a JSON 
> object and get some of the fields. 
> 
> AFAIK there are 2 ways to do it:
> 
> 1) Write a custom deserialiser and provide it in the source table DDL, i.e. 
> CREATE TABLE source (
>   `timestamp` STRING,
>   person ROW<name STRING>
> )
> WITH ('format' = 'multiline-json', ...);
> 
> 2) Use 'format' = 'raw' and extract the needed fields using .jsonValue, i.e.
> 
> CREATE TABLE source (
>   `row` STRING
> );
> 
> env.from("source")
>     .joinLateral(
>         call(SplitFunction.class, $("row"), "\n").as("msg")
>     )
>     .select(
>         $("msg").jsonValue("$.timestamp", DataTypes.STRING()),
>         $("msg").jsonValue("$.person.name", DataTypes.STRING()).as("name")
>     );
> 
> In 2), will each call of .jsonValue parse the string all over again or will 
> it reuse the same JsonNode object internally? Which option better fits my 
> problem?
> 
> __
> Best, Ilya
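
A minimal sketch of the third option mentioned above: a table function that both splits
the raw message on "\n" and parses each line with Jackson, emitting typed rows instead of
strings, so each document is parsed only once. The class name, the ROW field names and the
Jackson-based parsing are illustrative assumptions, not code from this thread:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Emits one row per JSON document found in the incoming message.
@FunctionHint(output = @DataTypeHint("ROW<ts STRING, name STRING>"))
public class SplitAndParseFunction extends TableFunction<Row> {

    private transient ObjectMapper mapper;

    @Override
    public void open(FunctionContext context) {
        mapper = new ObjectMapper();
    }

    public void eval(String message) throws Exception {
        // Each incoming message may contain several JSON documents separated by "\n".
        for (String line : message.split("\n")) {
            JsonNode node = mapper.readTree(line);
            collect(Row.of(
                    node.get("timestamp").asText(),
                    node.at("/person/name").asText()));
        }
    }
}

Such a function could then be used with joinLateral in place of the SplitFunction +
jsonValue combination, keeping a plain schema available for watermarks and primary keys
downstream.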



Re: Failed to serialize the result for RPC call : requestMultipleJobDetails after Upgrading to Flink 1.14.3

2022-02-16 Thread Chesnay Schepler
There are no side-effects; it just means that certain pages of the UI / 
REST API aren't working (i.e., the overview over all jobs).


On 16/02/2022 06:15, Chirag Dewan wrote:
Ah, should have looked better. I think 
https://issues.apache.org/jira/browse/FLINK-25732 causes this.


Are there any side effects of this? How can I avoid this problem so 
that it doesn't affect my processing?


Thanks

On Wednesday, 16 February, 2022, 10:19:12 am IST, Chirag Dewan 
 wrote:



Hi,

We are running a Flink cluster with 2 JMs in HA and 2 TMs on a 
standalone K8 cluster. After migrating to 1.14.3, we started to see 
some exceptions in the JM logs:


2022-02-15 11:30:00,100 ERROR 
org.apache.flink.runtime.rest.handler.job.JobIdsHandler     [] 
POD_NAME: eric-bss-em-sm-streamserver-jobmanager-868fd68b5d-zs9pv - 
Unhandled exception.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed 
to serialize the result for RPC call : requestMultipleJobDetails.
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) 
~[?:1.8.0_321]
        at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848) 
~[?:1.8.0_321]
        at 
java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) 
~[?:1.8.0_321]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
        at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
[?:1.8.0_321]
        at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) 
[?:1.8.0_321]
        at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) 
[?:1.8.0_321]
        at 
java.util.concurrent.ForkJ

Re: Flink 1.12.8 release

2022-02-16 Thread Martijn Visser
Hi Joey,

Since the Flink community only supports the latest and previous minor
release [1] (currently Flink 1.14 and 1.13), I'm not expecting another
release of Flink 1.12.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82

[1] https://flink.apache.org/downloads.html#update-policy-for-old-releases


On Wed, 16 Feb 2022 at 08:54, Joey L  wrote:

> Hi,
>
> Is there a planned release date for 1.12.8 or scheduled release cycle for
> minor versions?
>
> Regards,
> J
>


Python Function for Datastream Transformation in Flink Java Job

2022-02-16 Thread Jesry Pandawa
Hello,

Currently, Flink already supports adding a Python UDF and using it in a Flink
Java job. It can be used with the Table API. Can we do the same by creating a
custom Python function for DataStream transformations and using it in a Flink
Java job?

Regards,

Jesry


getting "original" ingestion timestamp after using a TimestampAssigner

2022-02-16 Thread Frank Dekervel

Hello,

I'm getting messages from a Kafka stream. The messages are JSON records 
with a "timestamp" key in the JSON. This timestamp key contains the time 
at which the message was generated. Now I'd like to know whether these messages 
had a delivery delay (e.g. the delay between message generation and arrival in 
Kafka). So I don't want the "full" delay (e.g. the difference between 
generation time and processing time), just the delivery delay.


In my timestamp assigner I get a "long" with the original timestamp as 
an argument, but I cannot yield an updated record from the timestamp 
assigner (e.g. with an extra field "deliveryDelay" or so).


So I guess my only option is to not specify the timestamp/watermark 
extractor in env.fromSource, first map the stream to add a 
lateness field, and only after that reassign timestamps/watermarks... is 
that right?


Thanks!

Greetings,
Frank
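
A minimal sketch of that approach, under these assumptions: the JSON has already been
deserialized into a hypothetical Event POJO with an eventTime field (the generation time
from the payload), `env` and a KafkaSource named `kafkaSource` already exist, and the Kafka
source attaches each record's Kafka timestamp to the element, so ctx.timestamp() in a
downstream function returns the arrival time in Kafka:

DataStream<Event> withDelay =
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")
                .process(new ProcessFunction<Event, Event>() {
                    @Override
                    public void processElement(Event e, Context ctx, Collector<Event> out) {
                        long arrivalInKafka = ctx.timestamp(); // Kafka record timestamp
                        e.deliveryDelay = arrivalInKafka - e.eventTime;
                        out.collect(e);
                    }
                });

// Only now assign event-time timestamps/watermarks from the generation time in the payload.
DataStream<Event> withEventTime = withDelay.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(2))
                .withTimestampAssigner((event, ts) -> event.eventTime));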





Re: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-16 Thread Yun Gao
Hi Fuyao,

Very sorry for the late reply. 

For question 1, I think it would not cause data corruption: in Flink the 
checkpoint is achieved via inserting barriers into the stream of normal records, 
and the snapshot is taken in the same thread as the record processing. Thus the 
snapshot of the operators is always taken at the boundary of records.

For question 3, if the outside system does not support transactions, there 
might be two other ways to implement exactly-once semantics:

1. If the record always has a key and the external system supports 
deduplication, then it might be possible to use an AT_LEAST_ONCE sink and let 
the external system deduplicate the records (see the sketch at the end of this reply).
2. Another possible method to reduce the requirements on the external system 
is to use a WAL sink: the records are first written into some external system 
(like a file system) as a kind of log. Once a checkpoint succeeds, we could 
then write the records before this checkpoint into the external system. Note 
that writing these records into the external system must also be retriable: 
the Flink job might still fail during writing, and after a restart the writing 
should resume exactly from the next record. This requires the external system 
to have some way to query the offset of the currently written records.

For an AT_LEAST_ONCE sink, RichSinkFunction should also work, but if possible we 
still recommend using the new sink API~

Best,
Yun
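
A minimal sketch of option 1 above (an AT_LEAST_ONCE sink plus deduplication in the
external system), built on RichSinkFunction. KeyedWriter and the key/payload tuple layout
are illustrative assumptions; a real client (e.g. an OSS producer) would sit behind that
interface:

import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// A keyed, idempotent writer; in practice this would wrap the external system's client.
interface KeyedWriter extends Serializable {
    void putByKey(String key, String payload);
}

// AT_LEAST_ONCE sink: records may be re-emitted after a restart, but because the
// external system overwrites by key, replays do not create duplicates.
public class DedupAtLeastOnceSink extends RichSinkFunction<Tuple2<String, String>> {

    private final KeyedWriter writer;

    public DedupAtLeastOnceSink(KeyedWriter writer) {
        this.writer = writer;
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) {
        // f0 = record key, f1 = payload; the write must be idempotent per key.
        writer.putByKey(value.f0, value.f1);
    }
}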



 --Original Mail --
Sender:Fuyao Li 
Send Date:Tue Feb 15 08:26:32 2022
Recipients:Yun Gao , user 
Subject:Re: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

Hi Yun,

Please ignore my question 2. I think the Sink part is the decisive factor to 
ensure end to end exactly once.

If I want to implement an AT_LEAST_ONCE sink, which interface should I 
implement? Maybe RichSinkFunction would be enough? Any suggestions on this?

For question 3, maybe I can add some deduplication code on the consumer side to 
take up the messages produced by the AT_LEAST_ONCE sink. If OSS doesn't support 
exactly-once semantics, it seems impossible for me to handle it on the Flink side.

Thanks,
Fuyao

From:Fuyao Li 
Date: Thursday, February 10, 2022 at 15:48
To: Yun Gao , user 
Subject: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hello Yun,

Thanks for the quick response. This is really helpful.

I have confirmed with Oracle Streaming Service (OSS) that they currently don't 
support EXACTLY_ONCE semantics; only AT_LEAST_ONCE works. They suggest adding 
some deduplication mechanism at the sink to mitigate the issue. 

Question 1:
So the scenario should look like this:
When the Flink application restarts after a failure, it will start from the 
checkpoint offset. The messages that were processed after the checkpoint and 
before the failure will be processed twice after the restart. Is there any 
chance of data corruption here, for example breaking a window and sending out 
incomplete records? I am using some session windows based on DataStream 
event-time timers.

Question 2:
For the KafkaSource, I noticed that we don't have a place to configure the 
semantics. Maybe enabling checkpointing with EXACTLY_ONCE should guarantee the 
source's exactly-once semantics here? Please correct me if I am wrong.

Question 3:
To guarantee end-to-end exactly once, I think we must make sure the sink is 
exactly once, right? Since OSS has such a limitation, is it possible to achieve 
effective EXACTLY_ONCE semantics through additional logic on the Flink side, 
since I can't do much on the OSS side? Or is it technically impossible?
If possible, I think I should implement the Sink you mentioned.

Thank you very much for the help!
Fuyao


From:Yun Gao 
Date: Wednesday, February 9, 2022 at 23:17
To: Fuyao Li , user 
Subject: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hi Fuyao,
 
Logically, if a system wants to support end-to-end exactly once,
it should support transactions:
1. The transactions hold the records; they get pre-committed on snapshot state
and committed once the checkpoint succeeds.
2. The transaction should still be able to be aborted after being pre-committed.
3. Once pre-committed, the transactions must be able to be committed, even if
the Flink job fails between pre-commit and commit; after the job is restarted
these transactions should be able to be committed again. 
 
If the external system meets such conditions, to implement an exactly-once sink, 
option b) would be the more recommended one. However, these interfaces are newly 
added in the upcoming 1.15 and it might take about 1.5 months before release.
 
An early version of option b) is org.apache.flink.api.connector.sink.Sink. 
It is very similar to option b) and has been supported since 1

Re: Flink 1.12.8 release

2022-02-16 Thread Joey L
Hey Martijn,

Thanks for the response. That's unfortunate; I assumed there would be a
1.12.8 release, since there are many Flink issues in JIRA marked with `Fix
Versions: 1.12.8` and I can see that there are many unreleased commits in the
release-1.12 branch.

Any chance that they would be released at all?

Regards,
J

On Wed, 16 Feb 2022 at 19:39, Martijn Visser  wrote:

> Hi Joey,
>
> Since the Flink community only supports the latest and previous minor
> release [1] (currently Flink 1.14 and 1.13), I'm not expecting another
> release of Flink 1.12.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1] https://flink.apache.org/downloads.html#update-policy-for-old-releases
>
>
> On Wed, 16 Feb 2022 at 08:54, Joey L  wrote:
>
>> Hi,
>>
>> Is there a planned release date for 1.12.8 or scheduled release cycle for
>> minor versions?
>>
>> Regards,
>> J
>>
>


Re: Log4j2 configuration

2022-02-16 Thread Chesnay Schepler
Hmm, yes, then it is indeed weird that it can't find the logger, but 
their error messages are notorious for being misleading in my experience.
Can you set the log4j2.debug system property (to any value, even an 
empty string) and try again?

If that doesn't reveal anything I would try the following:
* run a job without bundling any logging classes
    * if that works, add one jar at a time to the fat jar and see if it 
continues to work
* check what happens if the logging classes are added to lib/ instead of 
being bundled in the fat jar


You can also test all of this with a standalone cluster and a dummy job 
if that makes things easier; I don't think there is anything 
Kubernetes-specific going on.


On 15/02/2022 18:18, jonas eyob wrote:

1. Ok, thanks!
2. We are using application mode. No changes to the distribution other 
than updating the log4j-console.properties file.


content of /lib/:

* flink-csv-1.14.3.jar
* flink-json-1.14.3.jar
* flink-table_2.12-1.14.3.jar
* log4j-api-2.17.1.jar
* log4j-slf4j-impl-2.17.1.jar
* flink-dist_2.12-1.14.3.jar
* flink-shaded-zookeeper-3.4.14.jar
* log4j-1.2-api-2.17.1.jar
* log4j-core-2.17.1.jar

Den tis 15 feb. 2022 kl 16:30 skrev Chesnay Schepler :

1) You either need to modify the log4j-console.properties file, or
explicitly set the log4j.configurationFile property to point to
your .xml file.
2)
Have you made modifications to the distribution (e.g., removing
other logging jars from the lib directory)?
Are you using application mode, or session clusters?

On 15/02/2022 16:41, jonas eyob wrote:

Hey,

We are deploying our Flink Cluster on a standalone Kubernetes
with the longrunning job written in scala.

We recently upgraded our Flink cluster from 1.12 to 1.14.3 -
after which we started seeing a few problems related to logging
which I have been struggling to fix for the past days).
Relatedly, we are also attempting to add
a Sentry integration for our error logs.

PROBLEM 1 - Error logs not being sent to Sentry.
We are bundling our code and dependencies into a FAT jar, which
includes a log4j2.xml specifying the Sentry Appender. But if I
understand the documentation correctly, our log4j2.xml won't be
picked up by Flink, as it already defines a set of default logging
configuration files (e.g. log4j and logback).

Q: How does Flink resolve logging configurations to use?

I can see the following JVM override params provided when running
in our dockerized version locally.


-Dlog.file=/opt/flink/log/flink--taskexecutor-0-thoros-taskmanager-6b9785d4df-c28n4.log
2022-02-15 10:01:59,826 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] -
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,827 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] -
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,830 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] -
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml

Content of the log4j2.xml (path: src/main/resources):
[log4j2.xml contents were stripped by the mailing list archive]

For our kubernetes deployment we have followed the reference
example here

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions.

My assumption is that I would need to also provide the
Sentry-related configuration to the "log4j-console.properties" for
it to be picked up by the TaskManager and JobManager?

PROBLEM 2:
ERROR StatusLogger Log4j2 could not find a logging implementation.
Please add log4j-core to the classpath. Using SimpleLogger to log
to the console

I am not sure what's going on here. The following dependencies are
bundled with the FAT jar:
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion,
"org.slf4j" % "slf4j-api" % "1.7.33",
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.17.0",
"org.apache.logging.log4j" % "log4j-core" % "2.17.0",
"org.apache.logging.log4j" %% "log4j-api-scala" % "12.0",
"io.sentry" % "sentry-log4j2" % "5.6.0",
I'm confused about what is going on here; possibly this is not a
Flink-related matter, but I am not sure. Any tips on how to best
debug this would be much appreciated.
-- 
*Thanks,*

*Jonas*





--
*With kind regards*
/Jonas Eyob/




AWS Kinesis Flink vs K8s

2022-02-16 Thread Puneet Duggal
Hi,

Just wanted to ask the community various pros and cons of deploying flink using 
AWS Kinesis vs using K8s application mode. Currently we are deploying flink 
cluster in HA session standalone mode and planning to switch to application 
deployment mode.

Regards, 
Puneet

Implement watermark buffering with Process Function

2022-02-16 Thread Ruibin Xing
Hi,

I'm trying to implement customized state logic with KeyedProcessFunction.
But I'm not quite sure how to implement the correct watermark behavior when
late data is involved.

According to the answer on Stack Overflow:
https://stackoverflow.com/questions/59468154/how-to-sort-an-out-of-order-event-time-stream-using-flink
, there should be a state buffering all events until the watermark passes the
expected time, and an event-time trigger will fetch from the state and do the
calculation. The buffer type should be Map<T, List<E>> where T is the
timestamp and E is the event type.

However, the interface provided by Flink currently is only a MapState<K, V>.
If V is a List that buffers all events, every time an event comes in
Flink will do ser/deser of the whole list, which could be very expensive when
throughput is huge.

I checked the built-in window implementation, which implements the watermark
buffering. It seems that WindowOperator consists of some internal states,
whose signatures use the window as the namespace (or key), if I understand
correctly. But internal states are not available to Flink users.

So my question is: is there an efficient way to simulate watermark
buffering using process function for Flink users?

Thanks.
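
A minimal sketch of the buffering pattern from the linked answer, written as a
KeyedProcessFunction: events are appended to a MapState keyed by their timestamp, an
event-time timer is registered per timestamp, and the buffered events are released once the
watermark passes. Event is an assumed POJO, timestamps are assumed to be assigned upstream,
and the List ser/deser cost mentioned in the question is still paid here:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class WatermarkBufferingFunction extends KeyedProcessFunction<String, Event, Event> {

    private transient MapState<Long, List<Event>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "buffer", Types.LONG, Types.LIST(TypeInformation.of(Event.class))));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
        long ts = ctx.timestamp();
        if (ts <= ctx.timerService().currentWatermark()) {
            return; // too late: drop it (or route it to a side output)
        }
        List<Event> events = buffer.get(ts);
        if (events == null) {
            events = new ArrayList<>();
        }
        events.add(e);
        buffer.put(ts, events); // re-serializes the whole list for this timestamp
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        // The watermark has passed `timestamp`: emit the buffered events for it.
        for (Event e : buffer.get(timestamp)) {
            out.collect(e);
        }
        buffer.remove(timestamp);
    }
}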


Re: Log4j2 configuration

2022-02-16 Thread Tamir Sagi
Hey

I encountered the same issue while upgrading from v1.12.2 to 1.14.2 a few weeks 
ago.
Starting from v1.13.0, the deployment has been changed in K8s native mode 
(its flow is detailed in [1]).

User-input properties get overridden in flink-console.sh, which means that pointing 
to an XML file (-Dlog4j2) gets ignored.

I created a ticket [2] and opened an MR with a fix [3].
Unfortunately, the E2E tests keep failing (not sure whether it's related to the change or not; 
to be honest, I did not have time to check the E2E pipeline).

[1] https://lists.apache.org/thread/b24g1nd00q5pln5h9w2mh1s3ocxwb61b

[2] https://issues.apache.org/jira/browse/FLINK-25762

[3] https://github.com/apache/flink/pull/18447/files#

Hope it helps,

Tamir


From: Chesnay Schepler 
Sent: Wednesday, February 16, 2022 11:15 AM
To: jonas eyob 
Cc: user 
Subject: Re: Log4j2 configuration


EXTERNAL EMAIL


Hmm, yes, then it is indeed weird that it can't find the logger, but their 
error messages are notorious for being misleading in my experience.
Can you set the log4j2.debug system property (to any value, even an empty 
string) and try again?
If that doesn't reveal anything I would try the following:
* run a job without bundling any logging classes
* if that works, add one jar at a time to the fat jar and see if it 
continues to work
* check what happens if the logging classes are added to lib/ instead of being 
bundled in the fat jar

You can also test all of this with a standalone cluster and a dummy job if that 
makes things easier; I don't think there is anything Kubernetes-specific going 
on.

On 15/02/2022 18:18, jonas eyob wrote:
1. Ok, thanks!
2. We are using application mode. No changes to the distribution other than 
updating the log4j-console.properties file.

content of /lib/:

* flink-csv-1.14.3.jar
* flink-json-1.14.3.jar
* flink-table_2.12-1.14.3.jar
* log4j-api-2.17.1.jar
* log4j-slf4j-impl-2.17.1.jar
* flink-dist_2.12-1.14.3.jar
* flink-shaded-zookeeper-3.4.14.jar
* log4j-1.2-api-2.17.1.jar
* log4j-core-2.17.1.jar

Den tis 15 feb. 2022 kl 16:30 skrev Chesnay Schepler 
mailto:ches...@apache.org>>:
1) You either need to modify the log4j-console.properties file, or explicitly 
set the log4j.configurationFile property to point to your .xml file.
2)
Have you made modifications to the distribution (e.g., removing other logging 
jars from the lib directory)?
Are you using application mode, or session clusters?

On 15/02/2022 16:41, jonas eyob wrote:
Hey,

We are deploying our Flink Cluster on a standalone Kubernetes with the 
longrunning job written in scala.

We recently upgraded our Flink cluster from 1.12 to 1.14.3 - after which we 
started seeing a few problems related to logging which I have been struggling 
to fix for the past days).
Relatedly, we are also attempting to add a Sentry 
integration for our error logs.

PROBLEM 1 - Error logs not being sent to Sentry.
We are bundling our code and dependencies into a FAT jar, which includes a 
log4j2.xml specifying the Sentry Appender. But if I understand the 
documentation correctly, our log4j2.xml won't be picked up by Flink as it already 
defines a set of default logging configuration files (e.g. log4j and logback).

Q: How does Flink resolve logging configurations to use?

I can see the following JVM override params provided when running in our 
dockerized version locally.

-Dlog.file=/opt/flink/log/flink--taskexecutor-0-thoros-taskmanager-6b9785d4df-c28n4.log
2022-02-15 10:01:59,826 INFO 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,827 INFO 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - 
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,830 INFO 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml

Content of the log4j2.xml (path: src/main/resources):
[log4j2.xml contents were stripped by the mailing list archive]
For our kubernetes deployment we have followed the reference example here 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions.
My assumption is that I would need to also provide the Sentry-related 
configuration to the "log4j-console.properties" for it to be picked up by the 
TaskManager and JobManager?

PROBLEM 2:
ERROR StatusLogger Log4j2 could not find a logging implementation.
Please add log4j-core to the classpath. Using SimpleLogger to log to the console

I am not sure what's going on here. The following dependencies are bundled with the 
FAT jar:

"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion,
"org.slf4j" % "slf4j-api" % "1

Re: Performance Issues in Source Operator while migrating to Flink-1.14 from 1.9

2022-02-16 Thread Arujit Pradhan
Hey Martijn,

Thanks a lot for getting back to us. To give you a little bit more context,
we do maintain an open-source project around Flink, dagger, which is a wrapper
for proto processing. As
part of the upgrade to the latest version, we did some refactoring and
moved to KafkaSource since the older FlinkKafkaConsumer was getting
deprecated.

So we currently do not have any set up to test the hypothesis. Also just
increasing the resources by a bit fixes it and it does happen with a small
set of jobs during high traffic.

We would love to get some input from the community as it might cause errors
in some of the jobs in production.

Thanks and regards,
//arujit

On Tue, Feb 15, 2022 at 8:48 PM Martijn Visser 
wrote:

> Hi Arujit,
>
> I'm also looping in some contributors from the connector and runtime
> perspective in this thread. Did you also test the upgrade first by only
> upgrading to Flink 1.14 and keeping the FlinkKafkaConsumer? That would
> offer a better way to determine if a regression is caused by the upgrade of
> Flink or because of the change in connector.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
>
> On Tue, 15 Feb 2022 at 13:07, Arujit Pradhan 
> wrote:
>
>> Hey team,
>>
>> We are migrating our Flink codebase and a bunch of jobs from Flink-1.9 to
>> Flink-1.14. To ensure uniformity in performance we ran a bunch of jobs for
>> a week both in 1.9 and 1.14 simultaneously with the same resources and
>> configurations and monitored them.
>>
>> Though most of the jobs are running fine, we have significant performance
>> degradation in some of the high throughput jobs during peak hours. As a
>> result, we can see high lag and data drops while processing messages from
>> Kafka in some of the jobs in 1.14 while in 1.9 they are working just fine.
>> Now we are debugging and trying to understand the potential reason for it.
>>
>> One of the hypotheses that we can think of is the change in the sequence
>> of processing in the source-operator. To explain this, adding screenshots
>> for the problematic tasks below.
>> The first one is for 1.14 and the second is for 1.9. Upon inspection, it
>> can be seen the sequence of processing 1.14 is -
>>
>> data_streams_0 -> Timestamps/Watermarks -> Filter -> Select.
>>
>> While in 1.9 it was,
>>
>> data_streams_0 -> Filter -> Timestamps/Watermarks -> Select.
>>
>> In 1.14 we are using KafkaSource API while in the older version it was
>> FlinkKafkaConsumer API. Wanted to understand if it can cause potential
>> performance decline as all other configurations/resources for both of the
>> jobs are identical and if so then how to avoid it. Also, we can not see any
>> unusual behaviour for the CPU/Memory while monitoring the affected jobs.
>>
>> Source Operator in 1.14 :
>> [image: image.png]
>> Source Operator in 1.9 :
>> [image: image.png]
>> Thanks in advance,
>> //arujit
>>
>>
>>
>>
>>
>>
>>


SQL / Table Api lag() over partition by ... and windowing

2022-02-16 Thread HG
Hello all

I need to calculate the difference in time between ordered rows per
transactionId. All events should arrive within the timeframe set by the
out-of-orderness ( a couple of minutes). Events outside this time should be
ignored.

In SQL this would be:
select transactionId, handlingTime, handlingTime - lag(handlingTime, 1)
  over (partition by transactionId order by handlingTime) as elapsedTime
from table

When I code:
Table result = tableEnv.sqlQuery(
    "select transactionId, handlingTime, handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by handlingTime), handlingTime) as elapsedTime from tupled3DsTableVw");

Will this obey the watermark strategy of the original DataStream? (see
further below)
I have also tried to use the Table API with a session window like:
Table t = tupled3DsTable
    .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
    .groupBy($("transactionId"), $("w"))
    .select($("handlingTime"), $("transactionId"), $("originalEvent"),
            $("handlingTime").max().over($("w")));
This gives:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Could not resolve over call.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)

and also:
Table t = tupled3DsTable
    .window(Over.partitionBy($("transactionId")).orderBy($("handlingTime")).as("w"))
    .select($("handlingTime"), $("transactionId"), $("originalEvent"),
            $("handlingTime").lag().as("previousHandlingTime"));
Which does not work since it cannot find the lag function :-(

In Java I have the following setup:
WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy = WatermarkStrategy
    .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
    .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
    .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
        @Override
        public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) {
            return element.f0;
        }
    });

DataStream<Tuple3<Long, String, String>> tuple3dswm =
    tuple3ds.assignTimestampsAndWatermarks(wmstrategy);

Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds,
    Schema.newBuilder()
        .column("f0", "TIMESTAMP_LTZ(3)")
        .column("f1", "STRING")
        .column("f2", "STRING")
        .watermark("f0", "SOURCE_WATERMARK()")
        .build())
    .as("handlingTime", "transactionId", "originalEvent");


Re: Performance Issues in Source Operator while migrating to Flink-1.14 from 1.9

2022-02-16 Thread Piotr Nowojski
Hi,

Unfortunately the new KafkaSource was contributed without good benchmarks,
and so far you are the first one that noticed and reported this issue.
Without a more direct comparison (as Martijn suggested), it's hard for us to
help right away. It would be a tremendous help for us if you could, for
example, provide us with steps to reproduce this exact issue. Another thing
you could do is attach a code profiler to both the Flink 1.9 and 1.14
versions and compare the results of the source task threads from both (Flink
task threads are named after the task name, so they are easy to
distinguish).

Also have you observed some degradation in metrics reported by Flink? Like
the records processing rate between those two versions?

Best,
Piotrek

śr., 16 lut 2022 o 13:24 Arujit Pradhan 
napisał(a):

> Hey Martijn,
>
> Thanks a lot for getting back to us. To give you a little bit more
> context, we do maintain an open-source project around Flink, dagger,
> which is a wrapper for proto processing.
> As part of the upgrade to the latest version, we did some refactoring and
> moved to KafkaSource since the older FlinkKafkaConsumer was getting
> deprecated.
>
> So we currently do not have any set up to test the hypothesis. Also just
> increasing the resources by a bit fixes it and it does happen with a small
> set of jobs during high traffic.
>
> We would love to get some input from the community as it might cause
> errors in some of the jobs in production.
>
> Thanks and regards,
> //arujit
>
> On Tue, Feb 15, 2022 at 8:48 PM Martijn Visser 
> wrote:
>
>> Hi Arujit,
>>
>> I'm also looping in some contributors from the connector and runtime
>> perspective in this thread. Did you also test the upgrade first by only
>> upgrading to Flink 1.14 and keeping the FlinkKafkaConsumer? That would
>> offer a better way to determine if a regression is caused by the upgrade of
>> Flink or because of the change in connector.
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>>
>>
>> On Tue, 15 Feb 2022 at 13:07, Arujit Pradhan 
>> wrote:
>>
>>> Hey team,
>>>
>>> We are migrating our Flink codebase and a bunch of jobs from Flink-1.9
>>> to Flink-1.14. To ensure uniformity in performance we ran a bunch of jobs
>>> for a week both in 1.9 and 1.14 simultaneously with the same resources and
>>> configurations and monitored them.
>>>
>>> Though most of the jobs are running fine, we have significant
>>> performance degradation in some of the high throughput jobs during peak
>>> hours. As a result, we can see high lag and data drops while processing
>>> messages from Kafka in some of the jobs in 1.14 while in 1.9 they are
>>> working just fine.
>>> Now we are debugging and trying to understand the potential reason for
>>> it.
>>>
>>> One of the hypotheses that we can think of is the change in the sequence
>>> of processing in the source-operator. To explain this, adding screenshots
>>> for the problematic tasks below.
>>> The first one is for 1.14 and the second is for 1.9. Upon inspection, it
>>> can be seen the sequence of processing 1.14 is -
>>>
>>> data_streams_0 -> Timestamps/Watermarks -> Filter -> Select.
>>>
>>> While in 1.9 it was,
>>>
>>> data_streams_0 -> Filter -> Timestamps/Watermarks -> Select.
>>>
>>> In 1.14 we are using KafkaSource API while in the older version it was
>>> FlinkKafkaConsumer API. Wanted to understand if it can cause potential
>>> performance decline as all other configurations/resources for both of the
>>> jobs are identical and if so then how to avoid it. Also, we can not see any
>>> unusual behaviour for the CPU/Memory while monitoring the affected jobs.
>>>
>>> Source Operator in 1.14 :
>>> [image: image.png]
>>> Source Operator in 1.9 :
>>> [image: image.png]
>>> Thanks in advance,
>>> //arujit
>>>
>>>
>>>
>>>
>>>
>>>
>>>


Re: Change column names Pyflink Table/Datastream API

2022-02-16 Thread Dian Fu
Hi Francis,

There should be multiple ways to achieve this. Do you mean that all these
methods don't work for you? If so, could you show the sample code? Besides,
another way you may try is `inputmetrics.alias("timestamp, device, name,
value")`.

Regards,
Dian

On Wed, Feb 16, 2022 at 8:14 AM Francis Conroy 
wrote:

> Hi all,
>
> I'm hoping to be able to change the column names when creating a table
> from a datastream, the flatmap function generating the stream is returning
> a Tuple4.
>
> It's currently working as follows:
>
> inputmetrics = table_env.from_data_stream(ds, Schema.new_builder()
>   .column("f0", "BIGINT")
>   .column("f1", "STRING")
>   .column("f2", "STRING")
>   .column("f3", "DOUBLE")
>   .build())
>
> I'm trying to rename the columns f0, f1, f2, f3 to proper names e.g.
> timestamp, device, name, value. So far I've tried using from_fields, and
>
> column_by_expression("timestamp", "f0")
>
> I'd prefer not to change the output type of my previous flatMapFunction
> (to say a named Row) for performance purposes.
>
> Thanks,
> Francis
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>


Re: [statefun] Looking for a polyglot example

2022-02-16 Thread Igal Shilman
Hello,
You can take a look at the Flink Stateful Functions playground [1], where you
can find a handful of examples in all the supported
SDKs; in addition, for each language you will find a walkthrough that shows
how to use the individual SDK features.
Furthermore, take a look at the documentation [2][3].

Generally, remote functions from different languages can be mixed: you can
deploy a Python function and a Java function
that send messages to each other using the built-in type system [4].

[1] https://github.com/apache/flink-statefun-playground/tree/release-3.2
[2]
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/http-endpoint/
[3]
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview/
[4]
https://github.com/apache/flink-statefun-playground/blob/release-3.2/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part1/types/TypeSystemShowcaseFn.java


Good luck,
Igal.
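
For illustration, a hedged sketch of that kind of cross-SDK messaging from the Java side:
a remote function that receives a built-in string message and sends another string to a
function address that could just as well be served by the Python SDK. The namespaces,
names and greeting logic are made up for this example:

import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.apache.flink.statefun.sdk.java.message.MessageBuilder;

public class GreeterFn implements StatefulFunction {

    // Illustrative type names; the target could be implemented with any remote SDK.
    static final TypeName TYPENAME = TypeName.typeNameOf("example", "greeter");
    static final TypeName PYTHON_SCORER = TypeName.typeNameOf("example", "scorer");

    @Override
    public CompletableFuture<Void> apply(Context context, Message message) {
        if (message.isUtf8String()) {
            String name = message.asUtf8String();
            // Built-in types (here a plain string) are understood by every SDK,
            // so the receiving function can be written in Python, Go, JS, ...
            context.send(
                    MessageBuilder.forAddress(PYTHON_SCORER, name)
                            .withValue("Hello " + name)
                            .build());
        }
        return context.done();
    }
}

Because the payload is a built-in type, the addressed function can decode it with any
SDK's type system.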

On Tue, Feb 15, 2022 at 3:32 AM casel.chen  wrote:

> Hello,
> I am looking for polyglot example of stateful functions and learn how to
> program functions with different language then deploy them together as a
> unit of event driven application.
> Examples like Fraud Detection which use python functions in ML while use
> java funtions to process data etc. Thanks!
>
>
>
>


Re: Statefun with no Protobuf ingress and egress

2022-02-16 Thread Igal Shilman
Hello,
I've noticed that you've linked to a very old release of Stateful Functions
(2.0), whereas statefun 3.2, which is the latest, added support for exactly
that. You are no longer required to use Protobuf, and you can simply send
strings and even JSON.
Check out the following repository for some examples [1]

[1]
https://github.com/apache/flink-statefun-playground/blob/release-3.2/python/greeter/functions.py#L26

Good luck,
Igal

On Mon, Feb 14, 2022 at 5:07 PM mrAlexTFB  wrote:

> Hi,
>
> I have a very simple schema where one Python statefun application reads
> from a Kafka topic and writes to another Kafka topic; those topics are
> produced and consumed with another Python script, as is done in the Python
> Flink Walkthrough.
> Is there a way to read and write a plain string (such as JSON) in those topics
> and not use Protobuf?
>
> More concretely:
> I'm trying to use statefun because some libraries that exist in Python are not
> available in Java or Scala. I'm trying to combine embedded Flink applications
> with statefun applications, using Docker with a master and a worker for the
> embedded applications (JobManager and TaskManager). All the embedded
> applications communicate using JSON; now that I want to use a statefun
> application in between, is there a way to communicate using JSON and not
> Protobuf?
>
> Thanks in advance.
>


Re: Problem with kafka with key=None using pyhton-kafka module

2022-02-16 Thread Igal Shilman
Hello,
The default Kafka ingress for remote functions does require a key
component. The key is translated to the 'id' part of the
receiving function's address.
If your functions are stateless, or the id doesn't have a meaning for you,
you can simply provide a random id.

I hope that helps,
Igal.

On Thu, Feb 10, 2022 at 3:08 PM mrAlexTFB  wrote:

> Hello,
>
> I am following the example in Python Walkthrough
> ,
> I downloaded the zip file with the project skeleton. I'm having a problem
> when changing the key attribute in the function producer.send to none.
> From:
>
> def produce():
>     if len(sys.argv) == 2:
>         delay_seconds = int(sys.argv[1])
>     else:
>         delay_seconds = 1
>     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
>     for request in random_requests():
>         key = request.name.encode('utf-8')
>         val = request.SerializeToString()
>         producer.send(topic='names', key=key, value=val)
>         producer.flush()
>         time.sleep(delay_seconds)
>
> To:
>
> def produce():
>     if len(sys.argv) == 2:
>         delay_seconds = int(sys.argv[1])
>     else:
>         delay_seconds = 1
>     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
>     for request in random_requests():
>         key = request.name.encode('utf-8')
>         val = request.SerializeToString()
>         producer.send(topic='names', key=None, value=val)
>         producer.flush()
>         time.sleep(delay_seconds)
>
> After doing this the consumer is not displaying anything.
>
> I modified the Python code so that the arriving message is printed, and it is not
> being printed here. I suppose the problem could be a bad configuration
> in the module.yaml file?
>
> I understand that by putting key=None the topic partition will be chosen
> randomly; that was the behaviour I was aiming for, as I do not need any
> ordering of the messages.
>
> Do I need any additional configuration in this walkthrough to achieve this?
>
> Thank you very much in advance.
>
>
>


Re: Exception Help

2022-02-16 Thread Francesco Guardiani
Hi,

From what I understand, you're creating a scalar function taking a string
with JSON and then converting it to a map using a custom function.

Assuming I understood correctly, I think the problem here is that you're
using internal data types for UDFs, which is discouraged in most of the use
cases. Rather than using StringData, MapData, ArrayData etc you should just
use Java's String, Map and arrays. Check out this particular paragraph of
our docs that shows using complex types for scalar functions:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
Please try to convert your function to use these external types.
Looking only at the exception you provide here, it definitely seems like a
wrong usage of the internal data types, like that Tuple2 inserted into a
GenericMapData. There are no checks in GenericMapData to check that you're
constructing it with the correct types, and since Tuple2 is not a correct
type, the serializer just fails hard.

Please correct me if I misunderstood what you're doing, and in that case provide
more info about what your goal is and how you've implemented the job.

FG
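
For illustration, a minimal sketch of the external-type style described above for a scalar
function: plain String in, plain java.util.Map out, with the output type declared via a
hint so Flink handles the conversion to its internal representation. The class name is
made up and the actual JSON parsing is elided:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class JsonToMapFunction extends ScalarFunction {

    @DataTypeHint("MAP<STRING, ARRAY<STRING>>")
    public Map<String, String[]> eval(String json) {
        // Parse `json` here with your favourite library and fill the map;
        // returning external Java types lets Flink do the internal conversion.
        Map<String, String[]> result = new HashMap<>();
        result.put("ARCHIVE_TASKING", new String[] {"example-value"});
        return result;
    }
}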

On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver 
wrote:

> I've narrowed it down to a TableSource that is returning a MAP type as a
> column. Only errors when the column is referenced, and not on the first
> row, but somewhere in the stream of rows.
>
> On 1.15 master branch (I need the new JSON features in 1.15 for this
> project so riding the daily snapshot during development)
>
> In catalog column is defined as
> .column("vc", DataTypes.MAP(DataTypes.STRING(),
> DataTypes.ARRAY(DataTypes.STRING(
>
> My TableFunction is returning the following for the column
>
>   return new GenericMapData(
>   fields.toJavaMap(
>   v ->
>   new Tuple2(
>   StringData.fromString(v.getKey()),
>   new GenericArrayData(
>   v.getValue().isArray()
>   ? List.ofAll(() -> ((ArrayNode)
> v.getValue()).elements())
>   .map(vv ->
> StringData.fromString(vv.asText()))
>
> .toJavaArray(StringData[]::new)
>   :
> List.of(StringData.fromString(v.getValue().asText()))
>
> .toJavaArray(StringData[]::new);
> });
>
> Where it's basically looping over a jackson JsonNode parsed from a DB
> table and returning as a MAP (the keys and values are sparse amongst
> hundreds of possibilities). The values in the Json are either a single text
> value, or an array of text values so I'm just turning all values into an
> array.
>
> There are around ~190 key-values in the map on average.
>
> The SQL that references the column is just
>
> COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>
> So looks up a specific key and uses it if it exists, otherwise coalesces
> to a generic string.
>
> And I keep getting this exception during the processing on a random row.
>
> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=24,
> numBytes=8, address=16, targetAddress=16
> at
> org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
> at
> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
> at
> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
> at
> org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
> at
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
> at
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
> at
> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
> at
> org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
> at
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
> at
> org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
> at
> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
> at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
> at TableCalcMapFunction$130.flatMap(Unknown Source)
> at
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>
> Is that enough context or is there something else I can give you all?
>
> Thanks!
>
>
>
>
> On Tue, Feb 15, 2022 at 1:24 PM Sid Kal  wrote:
>
>> Hi Jonathan,
>>
>> It would be better if you describe your scenario along with the code. It
>> would be easier for the community to help.
>>
>> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver, 
>> wrote:
>>
>>> I'm getting the following exception 

Re: Exception Help

2022-02-16 Thread Jonathan Weaver
No, I'm creating a custom SQL lookup table (which uses
AsyncTableFunction) which requires the internal types.

I implement
the LookupTableSource, AsyncTableFunction, DynamicTableSourceFactory
trio as per the examples in the docs.

My construction is the equivalent of this, and it still errors with that
exception when using exactly this.

  Map<StringData, ArrayData> foo = new HashMap<>();
  foo.put(
      StringData.fromString("foo"),
      new GenericArrayData(new Object[] {StringData.fromString("bar")}));
  MapData mapColumn = new GenericMapData(foo);

  return GenericRowData.of(mapColumn);




On Wed, Feb 16, 2022 at 8:02 AM Francesco Guardiani 
wrote:

> Hi,
>
> From what I understand, you're creating a scalar function taking a string
> with json and then converting it to a map using a custom function.
>
> Assuming I understood correctly, I think the problem here is that you're
> using internal data types for UDFs, which is discouraged in most of the use
> cases. Rather than using StringData, MapData, ArrayData etc you should just
> use Java's String, Map and arrays. Check out this particular paragraph of
> our docs that shows using complex types for scalar functions:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
> Please try to convert
> Looking only at the exception you provide here, it definitely seems like a
> wrong usage of the internal data types, like that Tuple2 inserted into a
> GenericMapData. There are no checks in GenericMapData to check that you're
> constructing it with the correct types, and since Tuple2 is not a correct
> type, the serializer just fails hard.
>
> Please correct me if I misunderstood what you're doing, and in case
> provide more info about what your goal and how you've implemented the job.
>
> FG
>
> On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver 
> wrote:
>
>> I've narrowed it down to a TableSource that is returning a MAP type as a
>> column. Only errors when the column is referenced, and not on the first
>> row, but somewhere in the stream of rows.
>>
>> On 1.15 master branch (I need the new JSON features in 1.15 for this
>> project so riding the daily snapshot during development)
>>
>> In catalog column is defined as
>> .column("vc", DataTypes.MAP(DataTypes.STRING(),
>> DataTypes.ARRAY(DataTypes.STRING(
>>
>> My TableFunction is returning the following for the column
>>
>>   return new GenericMapData(
>>   fields.toJavaMap(
>>   v ->
>>   new Tuple2(
>>   StringData.fromString(v.getKey()),
>>   new GenericArrayData(
>>   v.getValue().isArray()
>>   ? List.ofAll(() -> ((ArrayNode)
>> v.getValue()).elements())
>>   .map(vv ->
>> StringData.fromString(vv.asText()))
>>
>> .toJavaArray(StringData[]::new)
>>   :
>> List.of(StringData.fromString(v.getValue().asText()))
>>
>> .toJavaArray(StringData[]::new);
>> });
>>
>> Where it's basically looping over a jackson JsonNode parsed from a DB
>> table and returning as a MAP (the keys and values are sparse amongst
>> hundreds of possibilities). The values in the Json are either a single text
>> value, or an array of text values so I'm just turning all values into an
>> array.
>>
>> There are around ~190 key-values in the map on average.
>>
>> The SQL that references the column is just
>>
>> COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>>
>> So looks up a specific key and uses it if it exists, otherwise coalesces
>> to a generic string.
>>
>> And I keep getting this exception during the processing on a random row.
>>
>> Caused by: java.lang.IndexOutOfBoundsException: offset=0,
>> targetOffset=24, numBytes=8, address=16, targetAddress=16
>> at
>> org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>> at
>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>> at
>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
>> at
>> org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
>> at
>> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
>> at
>> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
>> at
>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
>> at
>> org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>> at
>> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>> at
>

Re: Joining Flink tables with different watermark delay

2022-02-16 Thread Francesco Guardiani
> We plan to use this parquet source to create a Hybrid Source later.
Hence, we had to use a File Source.

FYI there is an open issue for this:
https://issues.apache.org/jira/browse/FLINK-22793, but for the other points
it makes sense to create the data stream directly, as it circumvents the
intrinsic limitations of CREATE TABLE.

> Does it mean that the output of the join will be flushed to the sink at
> the period defined by the minimum watermark? That is, 60 minutes in the
> above case?
> Also, I read here that
> Flink will remove old data from its state in case of interval joins. Does
> this mean that data present in both tables will be removed after the
> minimum watermark delay (60 minutes in this case)?

The output "interval" of the aforementioned query is still governed by the
last windowing operation. But when the query begins, you might need to wait
long as the first watermark from the second stream is going to take time to
be generated.

> Also, I read here that
> Flink will remove old data from its state in case of interval joins. Does
> this mean that data present in both tables will be removed after the
> minimum watermark delay (60 minutes in this case)?

The interval join will clean up state that it doesn't need anymore, looking
at its "internal clock", whose value is always the minimum of the two last
watermarks received from the input streams. So once this clock moves to time
10, every cached event (from either of the two input streams) with event time
< 10 will be cleared from the operator state.

Also, please note that when we talk about time in this specific query we're
talking about event time and not the natural flow of time in the "real
world" (which in Flink is called processing time).

FG
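
For illustration only (this is not the poster's actual query), a minimal interval join
whose state follows the cleanup rule above; tables A and B are assumed to have an `id`
column and an event-time attribute `ts` with watermarks defined:

Table joined = tableEnv.sqlQuery(
        "SELECT a.id, a.ts AS a_ts, b.ts AS b_ts " +
        "FROM A a JOIN B b ON a.id = b.id " +
        "AND a.ts BETWEEN b.ts - INTERVAL '60' MINUTE AND b.ts");

Once the operator's internal clock (the minimum of the two watermarks) has advanced far
enough that a cached row can no longer fall inside the BETWEEN bound of any future row
from the other side, that row is dropped from state.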


On Wed, Feb 16, 2022 at 8:08 AM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hi Francesco,
>
> Thank you so much for your reply. This was really helpful. In reply to
> your tips:
>
> > As described here, we have deprecated the syntax `GROUP BY WINDOW`; you
> > should use windowing TVFs instead
> Yes, we are trying to move towards windowing TVFs as well. Some of our
> existing jobs still use Group Window Aggregation and hence we are still
> using it.
>
> > You can directly use Window joins as
> > well for your query, as they're meant exactly to cover your use case
> Thanks. Looks like it is used along with Windowing TVFs though. But I will
> try to explore this.
>
> > Any particular reason you're creating the input tables from DataStream
> > instead of creating them directly from Table API using either CREATE TABLE
> > or TableDescriptor?
> We are creating a File Source which can read parquet files from a remote
> GCS (Google Cloud Storage) bucket. We had evaluated this to
> create a table but we faced the following challenges:
>
>- We plan to use this parquet source to create a Hybrid Source later.
>Hence, we had to use a File Source.
>- A call to GCS returns files in lexicographic order. We wanted a high
>level deterministic order in which files are picked for reading and hence
>we resorted to using a File Source with a custom Split Assigner to assign
>the files to the source readers in some order.
>- Creating the table
>
> 

Re: AWS Kinesis Flink vs K8s

2022-02-16 Thread Danny Cranmer
+Jeremy who can help answer this question.

Thanks,

On Wed, Feb 16, 2022 at 10:26 AM Puneet Duggal 
wrote:

> Hi,
>
> Just wanted to ask the community various pros and cons of deploying flink
> using AWS Kinesis vs using K8s application mode. Currently we are deploying
> flink cluster in HA session standalone mode and planning to switch to
> application deployment mode.
>
> Regards,
> Puneet


Re: Job manager slots are in bad state.

2022-02-16 Thread Piotr Nowojski
Hi Josson,

Would you be able to reproduce this issue on a more recent version of
Flink? I'm afraid that we won't be able to help with this issue as this
affects a Flink version that has not been supported for quite some time and,
moreover, `SlotSharingManager` has been completely removed in Flink 1.13.

Can you upgrade to a more recent Flink version and try it out? I would
assume the bug should be gone in 1.13.x or 1.14.x branches. If not, you can
also try out Flink 1.11.4, as maybe it has fixed this issue as well.

Best,
Piotrek

śr., 16 lut 2022 o 08:16 Josson Paul  napisał(a):

> We are using Flink version 1.11.2.
> At times, if task managers are restarted for some reason, the job manager
> throws the exception that I attached here. It is an IllegalStateException.
> We never had this issue with Flink 1.8. It started happening after
> upgrading to Flink 1.11.2.
>
> Why are the slots not released if they are in a bad state? The issue doesn't
> get resolved even if I restart all the task managers. It only gets resolved
> if I restart the Job Manager.
>
> java.util.concurrent.CompletionException: java.util.concurrent.
> CompletionException: java.lang.IllegalStateException
> at org.apache.flink.runtime.jobmaster.slotpool.
> SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:433)
> at java.base/java.util.concurrent.CompletableFuture.uniHandle(
> CompletableFuture.java:930)
> at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(
> CompletableFuture.java:907)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(
> CompletableFuture.java:506)
> at java.base/java.util.concurrent.CompletableFuture
> .completeExceptionally(CompletableFuture.java:2088)
> at org.apache.flink.runtime.concurrent.FutureUtils
> .lambda$forwardTo$21(FutureUtils.java:1132)
> at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(
> CompletableFuture.java:859)
> at java.base/java.util.concurrent.CompletableFuture
> .uniWhenCompleteStage(CompletableFuture.java:883)
> at java.base/java.util.concurrent.CompletableFuture.whenComplete(
> CompletableFuture.java:2251)
> at org.apache.flink.runtime.concurrent.FutureUtils.forward(FutureUtils
> .java:1100)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager
> .createRootSlot(SlotSharingManager.java:155)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateMultiTaskSlot(SchedulerImpl.java:477)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateSharedSlot(SchedulerImpl.java:311)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .internalAllocateSlot(SchedulerImpl.java:160)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateSlotInternal(SchedulerImpl.java:143)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateSlot(SchedulerImpl.java:113)
> at org.apache.flink.runtime.executiongraph.
> SlotProviderStrategy$NormalSlotProviderStrategy.allocateSlot(
> SlotProviderStrategy.java:115)
> at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator
> .lambda$allocateSlotsFor$0(DefaultExecutionSlotAllocator.java:104)
> at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(
> CompletableFuture.java:1106)
> at java.base/java.util.concurrent.CompletableFuture.thenCompose(
> CompletableFuture.java:2235)
> at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator
> .allocateSlotsFor(DefaultExecutionSlotAllocator.java:102)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlots(
> DefaultScheduler.java:339)
> at org.apache.flink.runtime.scheduler.DefaultScheduler
> .allocateSlotsAndDeploy(DefaultScheduler.java:312)
> at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy
> .allocateSlotsAndDeploy(EagerSchedulingStrategy.java:76)
> at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy
> .restartTasks(EagerSchedulingStrategy.java:57)
> at org.apache.flink.runtime.scheduler.DefaultScheduler
> .lambda$restartTasks$2(DefaultScheduler.java:265)
> at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(
> CompletableFuture.java:783)
> at java.base/java.util.concurrent.CompletableFuture$Completion.run(
> CompletableFuture.java:478)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(
> AkkaRpcActor.java:402)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
> AkkaRpcActor.java:195)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
> .handleRpcMessage(FencedAkkaRpcActor.java:74)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
> AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.ja

Re: Python Function for Datastream Transformation in Flink Java Job

2022-02-16 Thread Piotr Nowojski
Hi,

As far as I can tell the answer is unfortunately no. With Table API (SQL)
things are much simpler, as you have a restricted number of types of
columns that you need to support and you don't need to support arbitrary
Java classes as the records.

I'm shooting blindly here, but maybe you can use your Python UDF in Table
API and then convert a Table to DataStream? [1]

Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/
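
If it helps, here is a rough, untested sketch of that workaround in a Java job. The module
name, function name, paths and column names are assumptions; it also needs flink-python on
the classpath and a Python environment with apache-flink installed (configurable via
python.executable).

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// ship the Python module that defines the UDF (path and module name are assumptions)
tEnv.getConfig().getConfiguration().setString("python.files", "/path/to/my_udfs.py");

// register the Python UDF so the Java job can call it through SQL
tEnv.executeSql(
    "CREATE TEMPORARY SYSTEM FUNCTION normalize AS 'my_udfs.normalize' LANGUAGE PYTHON");

// DataStream -> Table, apply the Python UDF, then back to a DataStream
DataStream<String> input = env.fromElements("a", "b", "c");
tEnv.createTemporaryView("input_table", input);
Table transformed = tEnv.sqlQuery("SELECT normalize(f0) AS v FROM input_table");
DataStream<Row> result = tEnv.toDataStream(transformed);

result.print();
env.execute();

The round trip of course only works for records the Table API type system can represent,
which is exactly the restriction mentioned above.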

śr., 16 lut 2022 o 09:46 Jesry Pandawa 
napisał(a):

> Hello,
>
> Currently, Flink already supports adding Python UDF and using that on
> Flink Java job. It can be used on Table API. Can we do the same for
> creating custom python function for Datastream transformation and use that
> on Flink Java job?
>
> Regards,
>
> Jesry
>


Re: How to proper hashCode() for keys.

2022-02-16 Thread Ali Bahadir Zeybek
Hello John,

The requirement you have can be achieved by having a process window function
in order to enrich the aggregate data with metadata information of the
window.
Please have a look at the training example[1] to see how to access the
window
information within a process window function.

Sincerely,

Ali

[1]:
https://github.com/ververica/flink-training/blob/master/troubleshooting/introduction/src/main/java/com/ververica/flink/training/exercises/TroubledStreamingJob.java#L155
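
For instance, a minimal sketch along those lines. The Tuple2 click type and names are just
placeholders for your parsed events; with a real unbounded source the one-minute
processing-time windows fire continuously, and ctx.window().getStart() gives you the
wall-clock minute to put into the output instead of keying on a rounded timestamp.

import java.time.Instant;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedClickCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (domain, uri) pairs standing in for the parsed click events
        DataStream<Tuple2<String, String>> clicks = env.fromElements(
                Tuple2.of("cnn.com", "/some-article"),
                Tuple2.of("cnn.com", "/some-article"),
                Tuple2.of("cnn.com", "/another-article"));

        clicks
            .keyBy(click -> click.f0 + "|" + click.f1)
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .process(new EmitCountWithWindowStart())
            .print();

        env.execute();
    }

    /** Emits "windowStart|domain|uri count=N", using the window start as the rounded wall time. */
    public static class EmitCountWithWindowStart
            extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx,
                            Iterable<Tuple2<String, String>> elements, Collector<String> out) {
            long count = 0;
            for (Tuple2<String, String> ignored : elements) {
                count++;
            }
            // ctx.window().getStart() is already aligned to the minute boundary
            out.collect(Instant.ofEpochMilli(ctx.window().getStart()) + "|" + key + " count=" + count);
        }
    }
}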

On Mon, Feb 14, 2022 at 5:43 PM John Smith  wrote:

> Hi, I get that but I want to output that key so I can store it in Elastic
> grouped by the minute.
>
> I had explained with data examples above. But just to be sure
>
> Let's pretend the current WALL time is 2022-02-14T11:38:01.123Z and I get
> the below clicks
>
> event time here (ignored/not read)|cnn.com|/some-article
> event time here (ignored/not read)|cnn.com|/some-article
> event time here (ignored/not read)|cnn.com|/another-article
> event time here (ignored/not read)|cnn.com|/some-article
>
> The output should be...
>
> 2022-02-14T11:38:00.000Z (this is the wall time rounded to the minute)|
> cnn.com|some-article  count = 3
> 2022-02-14T11:38:00.000Z( this is the wall time rounded to the minute)|
> cnn.com|another-article  count = 1
>
>
>
>
>
> On Mon, Feb 14, 2022 at 10:08 AM Ali Bahadir Zeybek 
> wrote:
>
>> Hello John,
>>
>> That is what exactly the window operator does for you. Can you please
>> check the
>> documentation[1] and let us know what part of the window operator alone
>> does
>> not suffice for the use case?
>>
>> Sincerely,
>>
>> Ali
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows
>>
>> On Mon, Feb 14, 2022 at 4:03 PM John Smith 
>> wrote:
>>
>>> Because I want to group them for the last X minutes. In this case last 1
>>> minute.
>>>
>>> On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek 
>>> wrote:
>>>
 Hello John,

 Then may I ask you why you need to use a time attribute as part of your
 key?
 Why not just key by the fields like `mydomain.com` and `some-article`
 in your
 example and use only window operator for grouping elements based on
 time?

 Sincerely,

 Ali

 On Mon, Feb 14, 2022 at 3:55 PM John Smith 
 wrote:

> Hi, thanks. As previously mentioned, processing time. So
> regardless of when the event was generated, I want to count all events I have
> right now (as soon as they are seen by the Flink job).
>
> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
> wrote:
>
>> Hello John,
>>
>> Currently you are grouping the elements two times based on some time
>> attribute, one while keying - with event time - and one while
>> windowing - with
>> processing time. Therefore, the windowing mechanism produces a new
>> window
>> computation when you see an element with the same key but arrived
>> later from
>> the previous window start and end timestamps. Can you please clarify
>> with
>> which notion of time you would like to handle the stream of data?
>>
>> Sincerely,
>>
>> Ali
>>
>> On Fri, Feb 11, 2022 at 6:43 PM John Smith 
>> wrote:
>>
>>> Ok I used the method suggested by Ali. The error is gone. But now I
>>> see multiple counts emitted for the same key...
>>>
>>> DataStream slStream = env.fromSource(kafkaSource, 
>>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>>> .uid(kafkaTopic).name(kafkaTopic)
>>> .setParallelism(kafkaParallelism)
>>> .flatMap(new MapToMyEvent("my-event", windowSizeMins, 
>>> "message")) <-- Timestamp in GMT created here rounded to the 
>>> closest minute down.
>>> .uid("map-json-logs").name("map-json-logs");
>>>
>>> slStream.keyBy(new MinutesKeySelector())
>>> 
>>> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
>>> < Tumbling window of 1 minute.
>>>
>>>
>>>
>>> So below you will see a new count was emitted at 16:51 and 16:55
>>>
>>> {"countId":"2022-02-11T16:50:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":3542}
>>> -
>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":16503}
>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":70}
>>> -
>>>
>>> {"countId":"2022-02-11T16:52:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":16037}
>>> {"countId":"2022

Re: SQL / Table Api lag() over partition by ... and windowing

2022-02-16 Thread Francesco Guardiani
> Which does not work since it cannot find lag function :-(

lag and over are not supported at the moment with Table, so you need to use
SQL for that.

> *Will this obey the watermark strategy of the original Datastream? (see
further below)*

Yes. The code at the end of the mail is correct and should work fine. I
have just one comment, if you're using this DataStream only to create the
Table instance, you could also just define the watermark using the Schema
builder itself, as described here:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
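
As for the SQL form of the query itself, a rough, untested sketch (LAG returns NULL for the
first row of each transactionId, and TIMESTAMPDIFF keeps the result a plain number of
seconds):

Table result = tableEnv.sqlQuery(
    "SELECT transactionId, handlingTime, " +
    "       TIMESTAMPDIFF(SECOND, " +
    "                     LAG(handlingTime, 1) OVER (PARTITION BY transactionId ORDER BY handlingTime), " +
    "                     handlingTime) AS elapsedSeconds " +
    "FROM tupled3DsTableVw");

Note the ORDER BY column of a streaming OVER window must be the table's time attribute,
which is the case here because of the watermark you defined on handlingTime.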

On Wed, Feb 16, 2022 at 2:35 PM HG  wrote:

> Hello all
>
> I need to calculate the difference in time between ordered rows per
> transactionId. All events should arrive within the timeframe set by the
> out-of-orderness ( a couple of minutes). Events outside this time should be
> ignored.
>
> In SQL this would be :
> select transactionId  , handlingTime , *handlingTime -
> lag(handlingTime,1) over (partition by transactionId order by handlingTime)
> as elapsedTime*
> from table
>
> When I code :
> Table result = tableEnv.sqlQuery("select transactionId, handlingTime, 
> *handlingTime
> - if(null(lag(handlingTime) over (partition by transactionId order by
> handlingTime),handlingTime) as elapsedTime* from tupled3DsTableVw")
>
> *Will this obey the watermark strategy of the original Datastream? (see
> further below)*
> I have also tried to use the Table Api with a session window like :
> Table t = tupled3DsTable
>.window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"
> )).groupBy($("transactionId"), $("w"))
>.select($("handlingTime"), $("transactionId"), $("originalEvent"), $(
> "handlingTime").max().over($("w")));
> This gives:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Could not resolve over call.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>
> and also :
> Table t = tupled3DsTable
> .window(Over.partitionby($("transactionId")).orderBy($(
> "handlingTime")).as("w")).select($("handlingTime"), $("transactionId"), $(
> "originalEvent"), $("handlingTime").lag().as("previousHandlingTime"));
> Which does not work since it cannot find lag function :-(
>
> In java I have the following setup:
> WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy =
> WatermarkStrategy
> .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
> .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
> .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
> @Override
> public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) {
> return element.f0;
>  }});
>
> DataStream<Tuple3<Long, String, String>> tuple3dswm = tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>
> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, 
> Schema.newBuilder().column("f0","TIMESTAMP_LTZ(3)").column("f1","STRING").column("f2","STRING").watermark("f0",
>  "SOURCE_WATERMARK()")
> .build()).as("handlingTime", "transactionId", "originalEvent");
>
>
>
>
>
>


Basic questions about resuming stateful Flink jobs

2022-02-16 Thread James Sandys-Lumsdaine
Hi all,

I have a 1.14 Flink streaming workflow with many stateful functions that has a 
FsStateBackend and checkpointed enabled, although I haven't set a location for 
the checkpointed state.

I've really struggled to understand how I can stop my Flink job and restart it 
and ensure it carries off exactly where is left off by using the state or 
checkpoints or savepoints. This is not clearly explained in the book or the web 
documentation.

Since I have no control over my Flink job id I assume I can not force Flink to 
pick up the state recorded under the jobId directory for the FsStateBackend. 
Therefore I think​ Flink should read back in the last checkpointed data but I 
don't understand how to force my program to read this in? Do I use retained 
checkpoints or not? How can I force my program either use the last checkpointed 
state (e.g. when running from my IDE, starting and stopping the program) or 
maybe force it not to read in the state and start completely fresh?

The web documentation talks about bin/flink but I am running from my IDE so I 
want my Java code to control this progress using the Flink API in Java.

Can anyone give me some basic pointers as I'm obviously missing something 
fundamental on how to allow my program to be stopped and started without losing 
all the state.

Many thanks,

James.



Re: Exception Help

2022-02-16 Thread Francesco Guardiani
Are you sure you're always matching the output row type provided by
DynamicTableFactory

?

Also looking at the javadocs it seems like you can use both internal and
external types, depending on your preference:

* AsyncTableFunction

* AsyncTableFunctionProvider


Not sure how I can help more without looking at the full code, perhaps can
you provide a fully working reproducible?

FG


On Wed, Feb 16, 2022 at 4:15 PM Jonathan Weaver 
wrote:

> No, I'm creating a custom SQL lookup table (which uses
> AsyncTableFunction) which requires the internal types.
>
> I implement
> the LookupTableSource, AsyncTableFunction, DynamicTableSourceFactory
> trio as per the examples in the docs.
>
> My construction is the equivalent of this, and it still errors with that
> exception when using exactly this.
>
>   Map<StringData, ArrayData> foo = new HashMap<StringData, ArrayData>();
>   foo.put(
>       StringData.fromString("foo"),
>       new GenericArrayData(new Object[] {StringData.fromString("bar")}));
>   MapData mapColumn = new GenericMapData(foo);
>
>   return (RowData) new GenericRowData(new Object[] { mapColumn });
>
>
>
>
> On Wed, Feb 16, 2022 at 8:02 AM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> Hi,
>>
>> From what I understand, you're creating a scalar function taking a string
>> with json and then converting it to a map using a custom function.
>>
>> Assuming I understood correctly, I think the problem here is that you're
>> using internal data types for UDFs, which is discouraged in most of the use
>> cases. Rather than using StringData, MapData, ArrayData etc you should just
>> use Java's String, Map and arrays. Check out this particular paragraph of
>> our docs that shows using complex types for scalar functions:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
>> Please try to convert
>> Looking only at the exception you provide here, it definitely seems like
>> a wrong usage of the internal data types, like that Tuple2 inserted into a
>> GenericMapData. There are no checks in GenericMapData to check that you're
>> constructing it with the correct types, and since Tuple2 is not a correct
>> type, the serializer just fails hard.
>>
>> Please correct me if I misunderstood what you're doing, and in case
>> provide more info about what your goal and how you've implemented the job.
>>
>> FG
>>
>> On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver 
>> wrote:
>>
>>> I've narrowed it down to a TableSource that is returning a MAP type as a
>>> column. Only errors when the column is referenced, and not on the first
>>> row, but somewhere in the stream of rows.
>>>
>>> On 1.15 master branch (I need the new JSON features in 1.15 for this
>>> project so riding the daily snapshot during development)
>>>
>>> In catalog column is defined as
>>> .column("vc", DataTypes.MAP(DataTypes.STRING(),
>>> DataTypes.ARRAY(DataTypes.STRING(
>>>
>>> My TableFunction is returning the following for the column
>>>
>>>   return new GenericMapData(
>>>   fields.toJavaMap(
>>>   v ->
>>>   new Tuple2(
>>>   StringData.fromString(v.getKey()),
>>>   new GenericArrayData(
>>>   v.getValue().isArray()
>>>   ? List.ofAll(() ->
>>> ((ArrayNode) v.getValue()).elements())
>>>   .map(vv ->
>>> StringData.fromString(vv.asText()))
>>>
>>> .toJavaArray(StringData[]::new)
>>>   :
>>> List.of(StringData.fromString(v.getValue().asText()))
>>>
>>> .toJavaArray(StringData[]::new);
>>> });
>>>
>>> Where it's basically looping over a jackson JsonNode parsed from a DB
>>> table and returning as a MAP (the keys and values are sparse amongst
>>> hundreds of possibilities). The values in the Json are either a single text
>>> value, or an array of text values so I'm just turning all values into an
>>> array.
>>>
>>> There are around ~190 key-values in the map on average.
>>>
>>> The SQL that references the column is just
>>>
>>> COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>>>
>>> So looks up a specific key and uses it if it exists, otherwise coalesces
>>

Re: getting "original" ingestion timestamp after using a TimestampAssigner

2022-02-16 Thread Piotr Nowojski
Hi Frank,

I'm not sure exactly what you are trying to accomplish, but yes. In
the TimestampAssigner you can only return what should be the new timestamp
for the given record.

If you want to use "ingestion time" - "true event time" as some kind of
delay metric, you will indeed need to have both of them calculated
somewhere. You could:
1. As you described, use first ingestion time assigner, a mapper function
to extract this to a separate field, re assign the true event time, and
calculate the delay
2. Or you could simply assign the correct event time and in a simple single
mapper, chained directly to the source, use for example
`System.currentTimeMillis() - eventTime` to calculate this delay in a
single step. After all, that's more or less what Flink is doing to
calculate the ingestion time [1]

Best, Piotrek

[1]
https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/eventtime/IngestionTimeAssigner.java
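
A minimal sketch of the second option, assuming env, a KafkaSource<MyEvent> called
kafkaSource, and a MyEvent POJO with eventTime and deliveryDelayMs fields parsed from the
JSON payload (all names are illustrative):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<MyEvent> withDelay = env
    .fromSource(
        kafkaSource,
        WatermarkStrategy
            .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
            .withTimestampAssigner((event, kafkaTimestamp) -> event.eventTime),
        "kafka-source")
    .map(event -> {
        // roughly "ingestion time" minus "true event time", measured right after the source
        event.deliveryDelayMs = System.currentTimeMillis() - event.eventTime;
        return event;
    })
    .name("add-delivery-delay");

This keeps the watermarks based on the real event time while still recording the delivery
delay as an extra field, without the re-assignment step of the first option.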

śr., 16 lut 2022 o 09:46 Frank Dekervel  napisał(a):

> Hello,
>
> I'm getting messages from a kafka stream. The messages are JSON records
> with a "timestamp" key in the json. This timestamp key contains the time
> at which the message was generated. Now I'd like to know the delivery delay of
> these messages (e.g. the delay between message generation and arrival in
> Kafka). So I don't want the "full" delay (e.g. the difference between
> generation time and processing time), just the delivery delay.
>
> In my timestamp assigner i get a "long" with the original timestamp as
> an argument, but i cannot yield an updated record from the timestamp
> assigner (eg with an extra field "deliveryDelay" or so).
>
> So I guess my only option is to not specify the timestamp/watermark
> extractor in env.fromSource, then first map the stream to add a
> lateness field, and only after that reassign timestamps/watermarks ... is
> that right?
>
> Thanks!
>
> Greetings,
> Frank
>
>
>
>


Re: Basic questions about resuming stateful Flink jobs

2022-02-16 Thread Piotr Nowojski
Hi James,

Sure! The basic idea of checkpoints is that they are fully owned by the
running job and used for failure recovery. Thus by default if you stopped
the job, checkpoints are being removed. If you want to stop a job and then
later resume working from the same point that it has previously stopped,
you most likely want to use savepoints [1]. You can stop the job with a
savepoint and later you can restart another job from that savepoint.

Regarding the externalised checkpoints. Technically you could use them in
the similar way, but there is no command like "take a checkpoint and stop
the job". Nevertheless you might consider enabling them as this allows you
to manually cancel the job if it enters an endless recovery/failure
loop, fix the underlying issue, and restart the job from the externalised
checkpoint.

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/

śr., 16 lut 2022 o 16:44 James Sandys-Lumsdaine 
napisał(a):

> Hi all,
>
> I have a 1.14 Flink streaming workflow with many stateful functions that
> has a FsStateBackend and checkpointed enabled, although I haven't set a
> location for the checkpointed state.
>
> I've really struggled to understand how I can stop my Flink job and
> restart it and ensure it carries off exactly where is left off by using the
> state or checkpoints or savepoints. This is not clearly explained in the
> book or the web documentation.
>
> Since I have no control over my Flink job id I assume I can not force
> Flink to pick up the state recorded under the jobId directory for the
> FsStateBackend. Therefore I *think*​ Flink should read back in the last
> checkpointed data but I don't understand how to force my program to read
> this in? Do I use retained checkpoints or not? How can I force my program
> either use the last checkpointed state (e.g. when running from my IDE,
> starting and stopping the program) or maybe force it *not *to read in the
> state and start completely fresh?
>
> The web documentation talks about bin/flink but I am running from my IDE
> so I want my Java code to control this progress using the Flink API in Java.
>
> Can anyone give me some basic pointers as I'm obviously missing something
> fundamental on how to allow my program to be stopped and started without
> losing all the state.
>
> Many thanks,
>
> James.
>
>


Fwd: How to get memory specific metrics for tasknodes

2022-02-16 Thread Diwakar Jha
Hello,

Could someone please help! I'm trying  to publish only these three metrics
per tasknode
Status.JVM.Memory.Heap.Used
Status.JVM.Memory.Heap.Committed
Status.JVM.Memory.NonHeap.Max

But, with my current setting I see all Flink metrics getting published.
Please let me know if I need to provide any other information.

Thank you!


-- Forwarded message -
From: Diwakar Jha 
Date: Tue, Feb 15, 2022 at 1:31 PM
Subject: How to get memory specific metrics for tasknodes
To: user 


Hello,

I'm running Flink 1.11 on AWS EMR using the Yarn application. I'm trying to
access memory metrics(Heap.Max, Heap.Used) per tasknode in CloudWatch. I
have 50 tasknodes and it creates Millions of metrics(including per
operator) though I need only a few metrics per tasknode (Heap.Max,
Heap.Used). It is way more than my current CloudWatch limit, and I also
don't need so many metrics.
Could someone please help me how to get only the tasknode memory specific
metrics ?
I'm referring to this doc :
https://nightlies.apache.org/flink/flink-docs-release-1.7/monitoring/metrics.html#memory

I used the following approach to enable Flink metrics.
1. Enable Flink Metrics
copy /opt/flink-metrics-statsd-x.x.jar into the /lib folder of your Flink
distribution
2.  Add StatsD metric reporter in Flink-conf to send to CloudWatch Agent's
StatsD interface
metrics.reporters: stsd
metrics.reporter.stsd.factory.class:
org.apache.flink.metrics.statsd.StatsDReporterFactory
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
3. Setup tasknode scope
metrics.scope.tm: taskmanager
4. setup Cloudwatch agent to publish the metrics
"metrics":{
  "namespace": "CustomeNamespace/FlinkMemoryMetrics",
  "metrics_collected":{
 "statsd":{
"service_address":":8125",
"metrics_collection_interval":60,
"metrics_aggregation_interval":300
 }
  }
  },

Thanks!


Re: Failed to serialize the result for RPC call : requestMultipleJobDetails after Upgrading to Flink 1.14.3

2022-02-16 Thread Chirag Dewan
Thanks Chesnay. So it should be the /jobs API only? If that's the case I can 
disable my dashboards. 

Sent from Yahoo Mail on Android 
 
  On Wed, 16 Feb 2022 at 2:01 pm, Chesnay Schepler wrote:   
 There are no side-effects; it just means that certain pages of the UI / REST 
API aren't working (i.e., the overview over all jobs).
  
  On 16/02/2022 06:15, Chirag Dewan wrote:
  
 
 Ah, should have looked better. I think 
https://issues.apache.org/jira/browse/FLINK-25732 causes this. 
  Are there any side effects of this? How can I avoid this problem so that it 
doesn't affect my processing? 
  Thanks 
  On Wednesday, 16 February, 2022, 10:19:12 am IST, Chirag Dewan 
 wrote:  
  
  Hi, 
  We are running a Flink cluster with 2 JMs in HA and 2 TMs on a standalone K8 
cluster. After migrating to 1.14.3, we started to see some exceptions in the JM 
logs: 
2022-02-15 11:30:00,100 ERROR 
org.apache.flink.runtime.rest.handler.job.JobIdsHandler      [] POD_NAME: 
eric-bss-em-sm-streamserver-jobmanager-868fd68b5d-zs9pv - Unhandled exception. 
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to 
serialize the result for RPC call : requestMultipleJobDetails.         
atorg.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         
atorg.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         
atjava.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) 
~[?:1.8.0_321]         
atjava.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
 ~[?:1.8.0_321]         
atjava.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) 
~[?:1.8.0_321]         
atorg.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         
atorg.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         
atorg.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         
atorg.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         
atorg.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
akka.actor.Actor.aroundReceive(Actor.scala:537)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
akka.actor.Actor.aroundReceive$(Actor.scala:535)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)~[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
akka.actor.ActorCell.invoke(ActorCell.scala:548)[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]
         at akka.dispatch.Mailbox.run(Mailbox.scala:231) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]         at 
akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
[flink-rpc-akka_e7ab3036-42d0-412d-9a01-e55694061b27.jar:1.14.3]         at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_321]   
      
atjava.util.concurren

RE: Basic questions about resuming stateful Flink jobs

2022-02-16 Thread Sandys-Lumsdaine, James
Thanks for your reply, Piotr.

Some follow on questions:
>". Nevertheless you might consider enabling them as this allows you to 
>manually cancel the job if it enters an endless recovery/failure loop, fix the 
>underlying issue, and restart the job from the externalised checkpoint.

How is this done? Are you saying the retained checkpoint (i.e. the last 
checkpoint that isn’t deleted) can somehow be used when restarting the Flink 
application? If I am running in my IDE and just using the local streaming 
environment, how can I test my recovery code either with a retained checkpoint? 
All my attempts so far just say “No checkpoint found during restore.” Do I copy 
the checkpoint into a savepoint directory and treat it like a savepoint?

On the topic of savepoints, that web page [1] says I need to use “bin/flink 
savepoint” or “bin/flink stop --savepointPath” – but again, if I’m currently 
not running in a real cluster how else can I create and recover from the save 
points?

From what I’ve read there is state, checkpoints and save points – all of them 
hold state - and currently I can’t get any of these to restore when developing 
in an IDE and the program builds up all state from scratch. So what else do I 
need to do in my Java code to tell Flink to load a savepoint?

Thanks,

James.


From: Piotr Nowojski 
Sent: 16 February 2022 16:36
To: James Sandys-Lumsdaine 
Cc: user@flink.apache.org
Subject: Re: Basic questions about resuming stateful Flink jobs

CAUTION: External email. The email originated outside of our company
Hi James,

Sure! The basic idea of checkpoints is that they are fully owned by the running 
job and used for failure recovery. Thus by default if you stopped the job, 
checkpoints are being removed. If you want to stop a job and then later resume 
working from the same point that it has previously stopped, you most likely 
want to use savepoints [1]. You can stop the job with a savepoint and later you 
can restart another job from that savepoint.

Regarding the externalised checkpoints. Technically you could use them in the 
similar way, but there is no command like "take a checkpoint and stop the job". 
Nevertheless you might consider enabling them as this allows you to manually 
cancel the job if it enters an endless recovery/failure loop, fix the 
underlying issue, and restart the job from the externalised checkpoint.

Best,
Piotrek

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/

śr., 16 lut 2022 o 16:44 James Sandys-Lumsdaine 
mailto:jas...@hotmail.com>> napisał(a):
Hi all,

I have a 1.14 Flink streaming workflow with many stateful functions that has a 
FsStateBackend and checkpointed enabled, although I haven't set a location for 
the checkpointed state.

I've really struggled to understand how I can stop my Flink job and restart it 
and ensure it carries off exactly where is left off by using the state or 
checkpoints or savepoints. This is not clearly explained in the book or the web 
documentation.

Since I have no control over my Flink job id I assume I can not force Flink to 
pick up the state recorded under the jobId directory for the FsStateBackend. 
Therefore I think​ Flink should read back in the last checkpointed data but I 
don't understand how to force my program to read this in? Do I use retained 
checkpoints or not? How can I force my program either use the last checkpointed 
state (e.g. when running from my IDE, starting and stopping the program) or 
maybe force it not to read in the state and start completely fresh?

The web documentation talks about bin/flink but I am running from my IDE so I 
want my Java code to control this progress using the Flink API in Java.

Can anyone give me some basic pointers as I'm obviously missing something 
fundamental on how to allow my program to be stopped and started without losing 
all the state.

Many thanks,

James.



Re: SQL / Table Api lag() over partition by ... and windowing

2022-02-16 Thread HG
Thanks

Would the option for datastream be to write a MapPartitionFunction?

Op wo 16 feb. 2022 om 16:35 schreef Francesco Guardiani <
france...@ververica.com>:

> > Which does not work since it cannot find lag function :-(
>
> lag and over are not supported at the moment with Table, so you need to
> use SQL for that.
>
> > *Will this obey the watermark strategy of the original Datastream? (see
> further below)*
>
> Yes. The code at the end of the mail is correct and should work fine. I
> have just one comment, if you're using this DataStream only to create the
> Table instance, you could also just define the watermark using the Schema
> builder itself, as described here:
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>
> On Wed, Feb 16, 2022 at 2:35 PM HG  wrote:
>
>> Hello all
>>
>> I need to calculate the difference in time between ordered rows per
>> transactionId. All events should arrive within the timeframe set by the
>> out-of-orderness ( a couple of minutes). Events outside this time should be
>> ignored.
>>
>> In SQL this would be :
>> select transactionId  , handlingTime , *handlingTime -
>> lag(handlingTime,1) over (partition by transactionId order by handlingTime)
>> as elapsedTime*
>> from table
>>
>> When I code :
>> Table result = tableEnv.sqlQuery("select transactionId, handlingTime, 
>> *handlingTime
>> - if(null(lag(handlingTime) over (partition by transactionId order by
>> handlingTime),handlingTime) as elapsedTime* from tupled3DsTableVw")
>>
>> *Will this obey the watermark strategy of the original Datastream? (see
>> further below)*
>> I have also tried to use the Table Api with a session window like :
>> Table t = tupled3DsTable
>>.window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"
>> )).groupBy($("transactionId"), $("w"))
>>.select($("handlingTime"), $("transactionId"), $("originalEvent"), $(
>> "handlingTime").max().over($("w")));
>> This gives:
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Could not resolve over call.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>
>> and also :
>> Table t = tupled3DsTable
>> .window(Over.partitionby($("transactionId")).orderBy($(
>> "handlingTime")).as("w")).select($("handlingTime"), $("transactionId"),
>> $("originalEvent"), $("handlingTime").lag().as("previousHandlingTime"));
>> Which does not work since it cannot find lag function :-(
>>
>> In java I have the following setup:
>> WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy =
>> WatermarkStrategy
>> .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>> .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>> .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
>> @Override
>> public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) {
>> return element.f0;
>>  }});
>>
>> DataStream<Tuple3<Long, String, String>> tuple3dswm = tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>>
>> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, 
>> Schema.newBuilder().column("f0","TIMESTAMP_LTZ(3)").column("f1","STRING").column("f2","STRING").watermark("f0",
>>  "SOURCE_WATERMARK()")
>> .build()).as("handlingTime", "transactionId", "originalEvent");
>>
>>
>>
>>
>>
>>


Re: Basic questions about resuming stateful Flink jobs

2022-02-16 Thread Cristian Constantinescu
Hi James,

I literally just went through what you're doing at my job. While I'm using
Apache Beam and not the Flink api directly, the concepts still apply.
TL;DR: it works as expected.

What I did is I set up a kafka topic listener that always throws an
exception if the last received message's timestamp is less than 5 minutes
from when the processing happens (basically simulating a code fix after 5
minutes). Then I let the pipeline execute the normal processing and I'd
send a message on the exception topic.

I have set up flink to retry twice, Beam offers a flag
(numberOfExecutionRetries) [1] but it boils down to one of the Flink flags
here [2]. What that does is that once Flink encounters an exception, say
for example like my exception throwing topic, it will restore itself from
the last checkpoint which includes kafka offsets and other things that
transforms might have in there. Effectively this replays the messages after
the checkpoint, and of course, my exception is thrown again when it tries
to reprocess that message. After the second try, Flink will give up and the
Flink job will stop (just like if you cancel it). If ran in an IDE, process
will stop, if ran on a Flink cluster, the job will stop.

When a Flink job stops, it usually clears up its checkpoints, unless you
externalize them, for Beam it's the externalizedCheckpointsEnabled flag set
to true. Check the docs to see what that maps to.

Then, when you restart the flink job, just add the -s Flink flag followed
by the latest checkpoint path. If you're running from an IDE, say IntelliJ,
you can still pass the -s flag to Main method launcher.

We use a bash script to restart our Flink jobs in our UAT/PROD boxes for
now, you can use this command: find "$PATH_WHERE_YOU_SAVE_STATE" -name
"_metadata" -print0 | xargs -r -0 ls -1 -t | head -1 to find the latest
checkpoint in that path. And you know where PATH_WHERE_YOU_SAVE_STATE is,
because you have to specify it when you initially start the flink job. For
Beam, that's the stateBackendStoragePath flag. This is going to pick up the
latest checkpoint before the pipeline stopped and will continue from it
with your updated jar that handles the exception properly.

Also note that I think you can set all these flags with Java code. In Beam
it's just adding to the Main method args parameter or adding them to the
PipelineOptions once you build that object from args. I've never used the
Flink libs, just the runner, but from [1] and [3] it looks like you can
configure things in code if you prefer that.

Hope it helps,
Cristian

[1] https://beam.apache.org/documentation/runners/flink/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/#configuration
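
To make the Java-only part concrete, a minimal sketch for a local/IDE run on Flink
1.13+/1.14 (paths are illustrative; the same savepoint path option also accepts a retained
checkpoint's chk-N directory, and it is worth verifying against the exact version in use):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// keep checkpoints around when the job is cancelled, so they can be resumed from
conf.setString("execution.checkpointing.externalized-checkpoint-retention",
        "RETAIN_ON_CANCELLATION");
// resume from a specific retained checkpoint or savepoint (the equivalent of the CLI's -s flag)
conf.setString("execution.savepoint.path",
        "file:///tmp/flink-checkpoints/<previous-job-id>/chk-42");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.enableCheckpointing(10_000);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
// ... build the pipeline as usual, then env.execute();

The <previous-job-id> directory is whatever the earlier run wrote out; the find/_metadata
one-liner above is one way to locate the newest checkpoint inside it.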


On Wed, Feb 16, 2022 at 12:28 PM Sandys-Lumsdaine, James <
james.sandys-lumsda...@systematica.com> wrote:

> Thanks for your reply, Piotr.
>
>
>
> Some follow on questions:
>
> >". Nevertheless you might consider enabling them as this allows you to
> manually cancel the job if it enters an endless recovery/failure loop, fix
> the underlying issue, and restart the job from the externalised checkpoint.
>
>
>
> How is this done? Are you saying the retained checkpoint (i.e. the last
> checkpoint that isn’t deleted) can somehow be used when restarting the
> Flink application? If I am running in my IDE and just using the local
> streaming environment, how can I test my recovery code either with a
> retained checkpoint? All my attempts so far just say “No checkpoint found
> during restore.” Do I copy the checkpoint into a savepoint directory and
> treat it like a savepoint?
>
>
>
> On the topic of savepoints, that web page [1] says I need to use
> “bin/flink savepoint” or “bin/flink stop --savepointPath” – but again, if
> I’m currently not running in a real cluster how else can I create and
> recover from the save points?
>
>
>
> From what I’ve read there is state, checkpoints and save points – all of
> them hold state - and currently I can’t get any of these to restore when
> developing in an IDE and the program builds up all state from scratch. So
> what else do I need to do in my Java code to tell Flink to load a savepoint?
>
>
>
> Thanks,
>
>
>
> James.
>
>
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* 16 February 2022 16:36
> *To:* James Sandys-Lumsdaine 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Basic questions about resuming stateful Flink jobs
>
>
>
> *CAUTION: External email. The email originated outside of our company *
>
> Hi James,
>
>
>
> Sure! The basic idea of checkpoints is that they are fully owned by the
> running job and used for failure recovery. Thus by default if you stopped
> the job, checkpoints are being removed. If you want to stop a job and then
> later resume working from the same point that it has previously stopped,
> you most likely want to use savepoints [1]. You can stop the job with a
> savepoint and lat

Task manager errors with Flink ZooKeeper High Availability

2022-02-16 Thread Koffman, Noa (Nokia - IL/Kfar Sava)

Hi,
We are currently running flink in session deployment on k8s cluster, with 1 
job-manager and 3 task-managers
To support recovery from job-manager failure, following a different mail thread,
We have enabled zookeeper high availability using a k8s Persistent Volume

To achieve this, we’ve added these conf values:
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-noa-edge-infra:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: /flink_state
high-availability.jobmanager.port: 6150
for the storageDir, we are using a k8s persistent volume with ReadWriteOnce

Recovery of job-manager failure is working now, but it looks like there are 
issues with the task-managers:
The same configuration file is used in the task-managers as well, and there are 
a lot of errors in the task-manager’s logs –
java.io.FileNotFoundException: 
/flink_state/flink/blob/job_9f4be579c7ab79817e25ed56762b7623/blob_p-5cf39313e388d9120c235528672fd267105be0e0-938e4347a98aa6166dc2625926fdab56
 (No such file or directory)

It seems that the task-managers are trying to access the job-manager’s storage 
dir – can this be avoided?
The task manager does not have access to the job manager persistent volume – is 
this mandatory?
If we don’t have the option to use shared storage, is there a way to make 
zookeeper hold and manage the job states, instead of using the shared storage?

Thanks
Noa




Re: Implement watermark buffering with Process Function

2022-02-16 Thread David Anderson
I'm afraid not. The DataStream window implementation uses internal APIs to
manipulate the state backend namespace, which isn't possible to do with the
public-facing API. And without this, you can't implement this as
efficiently.

David

On Wed, Feb 16, 2022 at 12:04 PM Ruibin Xing  wrote:

> Hi,
>
> I'm trying to implement customized state logic with KeyedProcessFunction.
> But I'm not quite sure how to implement the correct watermark behavior when
> late data is involved.
>
> According to the answer on stackoverflow:
> https://stackoverflow.com/questions/59468154/how-to-sort-an-out-of-order-event-time-stream-using-flink
> , there should be a state buffering all events until watermark passed the
> expected time and an event time trigger will fetch from the state and do the
> calculation. The buffer type should be Map<T, List<E>> where T is the
> timestamp and E is the event type.
>
> However, the interface provided by Flink currently is only a MapState<K, V>. If the V is a List that buffers all events, every
> time an event comes in Flink will do a ser/deser of the whole list, which could be very expensive when throughput
> is huge.
>
> I checked the built-in window implementation which implements the
> watermark buffering.  It seems that WindowOperator consists of some
> InternalStates,  of which signature is where window is namespace or key, if
> I understand correctly. But internal states are not available for Flink
> users.
>
> So my question is: is there an efficient way to simulate watermark
> buffering using process function for Flink users?
>
> Thanks.
>


Re: Change column names Pyflink Table/Datastream API

2022-02-16 Thread Francis Conroy
Hi Dian,

Using .alias ended up working for me. Thanks for getting back to me.


On Thu, 17 Feb 2022 at 01:15, Dian Fu  wrote:

> Hi Francis,
>
> There should be multiple ways to achieve this. Do you mean that all these
> methods don't work for you? If so, could you show the sample code? Besides,
> another way you may try is `inputmetrics.alias("timestamp, device, name,
> value")`.
>
> Regards,
> Dian
>
> On Wed, Feb 16, 2022 at 8:14 AM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> Hi all,
>>
>> I'm hoping to be able to change the column names when creating a table
>> from a datastream, the flatmap function generating the stream is returning
>> a Tuple4.
>>
>> It's currently working as follows:
>>
>> inputmetrics = table_env.from_data_stream(ds, Schema.new_builder()
>>   .column("f0", "BIGINT")
>>   .column("f1", "STRING")
>>   .column("f2", "STRING")
>>   .column("f3", "DOUBLE")
>>   .build())
>>
>> I'm trying to rename the columns f0, f1, f2, f3 to proper names e.g.
>> timestamp, device, name, value. So far I've tried using from_fields, and
>>
>> column_by_expression("timestamp", "f0")
>>
>> I'd prefer not to change the output type of my previous flatMapFunction
>> (to say a named Row) for performance purposes.
>>
>> Thanks,
>> Francis
>>
>>
>



Re:Re: [statefun] Looking for a polyglot example

2022-02-16 Thread casel.chen
Thank you Igal. The remote functions from different languages can be "mixed" by 
deploying the different unit services in the Dockerfile, and they can exchange messages 
using common message types like JSON, Protobuf etc., or even be connected together by 
in/out Kafka topics, right? 
















At 2022-02-16 22:30:27, "Igal Shilman"  wrote:

Hello,
You can take a look at the flink stateful functions playground[1] where you can 
find  a handful of examples, in all the supported
SDKs, in addition for each language you will find a walkthrough that shows how 
to use the individual SDK features.
Furthermore take a look at the documentations [2][3].


Generally, remote functions from different languages can be mixed, you can 
deploy a Python function and a Java function
that can send messages to each other using the built-in type system[4].



[1] https://github.com/apache/flink-statefun-playground/tree/release-3.2
[2] 
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/http-endpoint/
[3] 
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview/
[4] 
https://github.com/apache/flink-statefun-playground/blob/release-3.2/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part1/types/TypeSystemShowcaseFn.java




Good luck,
Igal.



On Tue, Feb 15, 2022 at 3:32 AM casel.chen  wrote:

Hello,
I am looking for polyglot example of stateful functions and learn how to 
program functions with different language then deploy them together as a unit 
of event driven application.
Examples like Fraud Detection which use python functions in ML while use java 
funtions to process data etc. Thanks!




 

Apache Flink - User Defined Functions - Exception when passing all arguments

2022-02-16 Thread M Singh
Hi:
I have a simple concatenate UDF (for testing purpose) defined as:
    public static class ConcatenateFunction extends ScalarFunction {
        public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... inputs) {
            return Arrays.stream(inputs).map(i -> i.toString())
                    .collect(Collectors.joining(","));
        }
    }

and register it with the streaming table env:
        tEnv.createTemporarySystemFunction("concatenateFunction", 
ConcatenateFunction.class);

However, when I call the function as shown below, I get an exception indicating 
that the '*' is an unknown identifier.
        Table concat = tEnv.sqlQuery(
                "SELECT concatenateFunction(*) " +
                "FROM test_table"
        );

I am printing the rows at the end of the test application.
The exception is:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. At line 1, column 29: Unknown identifier '*'


The document (User-defined Functions) shows how to call the function with all 
args using scala/java :
env.from("MyTable").select(call(MyConcatFunction.class, $("*")));
But I could not find how to call the UDF using SQL syntax as shown above 
(select concatenateFunction(*) from test_table).
Can you please let me know if there is a way to pass all arguments to a UDF in SQL?
Thanks

Re: Fwd: How to get memory specific metrics for tasknodes

2022-02-16 Thread Chesnay Schepler

It is currently not possible to select metrics.

What you can do however is create a custom reporter that wraps the 
StatsD reporter which does this filtering.
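
A rough sketch of such a wrapper (the class name is hypothetical; the only real logic is the
filter in notifyOfAddedMetric, and it assumes the default '.' delimiter in the metric
identifier). On 1.11 you would register it via metrics.reporter.stsd.class and put the jar
next to the StatsD reporter jar in lib/.

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.statsd.StatsDReporter;

// hypothetical wrapper: only forwards the three JVM memory metrics to StatsD
public class FilteringStatsDReporter extends StatsDReporter {

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        String fullName = group.getMetricIdentifier(metricName);
        if (fullName.contains("Status.JVM.Memory.Heap.Used")
                || fullName.contains("Status.JVM.Memory.Heap.Committed")
                || fullName.contains("Status.JVM.Memory.NonHeap.Max")) {
            super.notifyOfAddedMetric(metric, metricName, group);
        }
        // everything else is never registered with the StatsD reporter, so it is not sent
    }
}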


On 16/02/2022 17:41, Diwakar Jha wrote:


Hello,

Could someone please help! I'm trying  to publish only these three 
metrics per tasknode

Status.JVM.Memory.Heap.Used
Status.JVM.Memory.Heap.Committed
Status.JVM.Memory.NonHeap.Max

But, with my current setting I see all Flink metrics getting 
published. Please let me know if I need to provide any other information.


Thank you!


-- Forwarded message -
From: *Diwakar Jha* 
Date: Tue, Feb 15, 2022 at 1:31 PM
Subject: How to get memory specific metrics for tasknodes
To: user 


Hello,

I'm running Flink 1.11 on AWS EMR using the Yarn application. I'm 
trying to access memory metrics(Heap.Max, Heap.Used) per tasknode in 
CloudWatch. I have 50 tasknodes and it creates Millions of 
metrics(including per operator) though I need only a few metrics per 
tasknode (Heap.Max, Heap.Used). It is way more than my current 
CloudWatch limit, and I also don't need so many metrics.
Could someone please help me how to get only the tasknode memory 
specific metrics ?
I'm referring to this doc : 
https://nightlies.apache.org/flink/flink-docs-release-1.7/monitoring/metrics.html#memory


I used the following approach to enable Flink metrics.
1. Enable Flink Metrics
copy |/opt/flink-metrics-statsd-x.x.jar| into the |/lib| folder of 
your Flink distribution
2.  Add StatsD metric reporter in Flink-conf to send to CloudWatch 
Agent's StatsD interface

            metrics.reporters: stsd
            metrics.reporter.stsd.factory.class: 
org.apache.flink.metrics.statsd.StatsDReporterFactory

            metrics.reporter.stsd.host: localhost
            metrics.reporter.stsd.port: 8125
3. Setup tasknode scope
metrics.scope.tm : taskmanager
4. setup Cloudwatch agent to publish the metrics
"metrics":{
              "namespace": "CustomeNamespace/FlinkMemoryMetrics",
              "metrics_collected":{
                 "statsd":{
                    "service_address":":8125",
                    "metrics_collection_interval":60,
                    "metrics_aggregation_interval":300
                 }
              }
          },

Thanks!




Re: Implement watermark buffering with Process Function

2022-02-16 Thread David Anderson
I've done some work on this with Nico Kruber.

In our benchmarking, the performance loss (from not being able to use the
namespace) was roughly a factor of two, so it is significant. We prototyped
an API extension that addresses this particular concern but without
exposing the namespace directly, which I believe there is some reluctance
to do. I've been thinking of turning this into a FLIP, but it needs more
work first.

Another direction that could be explored would be to use finer-grained
timestamps. E.g., with nanosecond-precision timestamps the number of
colliding events will be dramatically smaller.

David
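
For reference, a minimal sketch of the MapState<Long, List<E>> buffering described in the
question (SensorReading is an assumed POJO whose event-time timestamps are assigned
upstream). As discussed, the whole list for a timestamp is re-serialized on every update,
which is the overhead the built-in window operator avoids through its internal namespace
handling.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class WatermarkBuffer extends KeyedProcessFunction<String, SensorReading, SensorReading> {

    private transient MapState<Long, List<SensorReading>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "buffer",
                Types.LONG,
                TypeInformation.of(new TypeHint<List<SensorReading>>() {})));
    }

    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out)
            throws Exception {
        long ts = ctx.timestamp();
        if (ts <= ctx.timerService().currentWatermark()) {
            return; // late event: drop it (or route it to a side output)
        }
        List<SensorReading> atSameTime = buffer.get(ts);
        if (atSameTime == null) {
            atSameTime = new ArrayList<>();
        }
        atSameTime.add(value);
        buffer.put(ts, atSameTime);                    // the full list is re-serialized here
        ctx.timerService().registerEventTimeTimer(ts); // fires once the watermark passes ts
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SensorReading> out)
            throws Exception {
        for (SensorReading buffered : buffer.get(timestamp)) {
            out.collect(buffered);
        }
        buffer.remove(timestamp);
    }
}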

On Wed, Feb 16, 2022 at 10:17 PM David Anderson 
wrote:

> I'm afraid not. The DataStream window implementation uses internal APIs to
> manipulate the state backend namespace, which isn't possible to do with the
> public-facing API. And without this, you can't implement this as
> efficiently.
>
> David
>
> On Wed, Feb 16, 2022 at 12:04 PM Ruibin Xing  wrote:
>
>> Hi,
>>
>> I'm trying to implement customized state logic with KeyedProcessFunction.
>> But I'm not quite sure how to implement the correct watermark behavior when
>> late data is involved.
>>
>> According to the answer on stackoverflow:
>> https://stackoverflow.com/questions/59468154/how-to-sort-an-out-of-order-event-time-stream-using-flink
>> , there should be a state buffering all events until watermark passed the
>> expected time and an event time trigger will fetch from the state and do the
>> calculation. The buffer type should be Map<T, List<E>> where T is the
>> timestamp and E is the event type.
>>
>> However, the interface provided by Flink currently is only a MapState<K, V>. If the V is a List that buffers all events, every
>> time an event comes in Flink will do a ser/deser of the whole list, which could be very expensive when throughput
>> is huge.
>>
>> I checked the built-in window implementation which implements the
>> watermark buffering.  It seems that WindowOperator consists of some
>> InternalStates,  of which signature is where window is namespace or key, if
>> I understand correctly. But internal states are not available for Flink
>> users.
>>
>> So my question is: is there an efficient way to simulate watermark
>> buffering using process function for Flink users?
>>
>> Thanks.
>>
>