Fraud detection demo with Flink 1.14

2022-02-14 Thread Pramit Vamsi
Hi, Problem: the watermark does not move within the Dynamic Alert Function. I am implementing the ideas (as is) from this article: https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html Code: https://github.com/afedulov/fraud-detection-demo Pipeline: Kafka -> Dynamic Key Function -> Dynamic Alert Func

Re: TM OOMKilled

2022-02-14 Thread Xintong Song
Hi Alexey, You may want to double check if `state.backend.rocksdb.memory.managed` is configured to `true`. (This should be `true` by default.) Another question that may or may not be related: I noticed that you have configured 128MB of task off-heap memory, for which, IIRC, the default is 0. Could

Re: TM OOMKilled

2022-02-14 Thread Alexey Trenikhun
Hello, We use RocksDB, but there is no problem with the Java heap, which is limited to 3.523 GB; the problem is with total container memory. The pod is killed not due to an OutOfMemoryError, but because total container memory exceeds 10 GB. Thanks, Alexey From: Caizhi Weng Sen

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Gopi Krishna M
CheckpointedFunction docs mention the following - > The snapshotState(FunctionSnapshotContext) >

Re: There Is a Delay While Over Aggregation Sending Results

2022-02-14 Thread Caizhi Weng
Hi! Did you define a watermark on ts? If yes, the result will be produced only after the watermark exceeds its row time, thus causing the delay. See [1] for details. [1] https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-runtime/src/main/java/org/apa
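The delay described in this reply can be pictured with a small plain-Java simulation. This is only a sketch and not Flink's over-aggregation operator: the class name `RowTimeEmitSketch` and its methods are hypothetical, and it assumes a row is released as soon as the watermark reaches its row time (whether the comparison is strict or not is an assumption here).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical simulation: rows are held back until the watermark reaches their row time. */
final class RowTimeEmitSketch {
    // Buffered row times, smallest first.
    private final PriorityQueue<Long> bufferedRowTimes = new PriorityQueue<>();

    /** Buffers a row; nothing is emitted at this point. */
    void addRow(long rowTimeMillis) {
        bufferedRowTimes.add(rowTimeMillis);
    }

    /** Returns, in row-time order, the rows that the new watermark releases. */
    List<Long> advanceWatermark(long watermarkMillis) {
        List<Long> emitted = new ArrayList<>();
        // Assumption: a watermark equal to the row time is enough to release the row.
        while (!bufferedRowTimes.isEmpty() && bufferedRowTimes.peek() <= watermarkMillis) {
            emitted.add(bufferedRowTimes.poll());
        }
        return emitted;
    }
}
```

With rows at 1000, 2000 and 3000 ms buffered, a watermark of 1500 releases only the first row; the other two wait until the watermark reaches 3000, which is the kind of delay the original poster observed.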

Re: TM OOMKilled

2022-02-14 Thread Caizhi Weng
Hi! Heap memory usage depends heavily on your job and your state backend. Which state backend are you using, and if possible, could you share your user code or explain what operations your job is doing? On Tue, Feb 15, 2022 at 05:17, Alexey Trenikhun wrote: > Hello, > We run Flink 1.13.5 job in app mode in Kube

[statefun] Looking for a polyglot example

2022-02-14 Thread casel.chen
Hello, I am looking for a polyglot example of stateful functions to learn how to program functions in different languages and then deploy them together as a unit of an event-driven application. Examples like Fraud Detection, which use Python functions for ML while using Java functions to process data, etc. T

Re:Re: [statefun] Add new state failure in Greetings example

2022-02-14 Thread casel.chen
Oh, yes. I forgot to add my newly defined ValueSpec as a parameter to the withValueSpec() method. After adding it, it works. Thank you, Igal. At 2022-02-15 01:42:39, "Igal Shilman" wrote: Hello, Make sure that you have added the state when creating the function spec like in this example[1]

Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-14 Thread Fuyao Li
Hi Yun, Please ignore my question 2. I think the Sink part is the decisive factor to ensure end-to-end exactly once. If I want to implement an AT LEAST ONCE sink, which interface should I implement? Maybe RichSinkFunction

Re: Flink High-Availability and Job-Manager recovery

2022-02-14 Thread Koffman, Noa (Nokia - IL/Kfar Sava)
Hi, thanks for your reply, it was very helpful. We tried to go with the 2nd approach, enabling HA mode, and added these conf values:
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-noa-edge-infra:2181
high-availability.zookeeper.path.root: /flink
high-availabil

TM OOMKilled

2022-02-14 Thread Alexey Trenikhun
Hello, We run a Flink 1.13.5 job in app mode in Kubernetes, with 1 JM and 1 TM. We also have a Kubernetes cron job which takes a savepoint every 2 hours (14 */2 * * *). Once in a while (~1 per 2 days) the TM is OOMKilled; suspiciously, it happens on even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) bu
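For readers less familiar with cron syntax, the schedule `14 */2 * * *` fires at minute 14 of every second hour, starting from hour 0. The sketch below lists those daily fire times in plain Java; `SavepointScheduleSketch` and `dailyFireTimes` are hypothetical names, and it handles only expressions of the form `<minute> */<step> * * *`.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: daily fire times for a cron expression "<minute> */<hourStep> * * *". */
final class SavepointScheduleSketch {
    static List<LocalTime> dailyFireTimes(int minute, int hourStep) {
        List<LocalTime> times = new ArrayList<>();
        // "*/2" in the hour field matches hours 0, 2, 4, ..., 22.
        for (int hour = 0; hour < 24; hour += hourStep) {
            times.add(LocalTime.of(hour, minute));
        }
        return times;
    }
}
```

`dailyFireTimes(14, 2)` contains 04:14 and 12:14, so the reported kills at 4:18 and 12:18 fall about four minutes after a savepoint is triggered.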

Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-14 Thread saravana...@gmail.com
Hi Niklas, Thanks for your reply. Approach [1] works only if operators are chained (in other words, operators executed within the same task). Since the mapPartition operator's parallelism is different from the previous operator's parallelism, it doesn't fall under the same task (it is not chained). https://

Re: [statefun] Add new state failure in Greetings example

2022-02-14 Thread Igal Shilman
Hello, Make sure that you have added the state when creating the function spec like in this example[1] If that wasn't it, can you send your UserFn? [1] https://github.com/apache/flink-statefun-playground/blob/release-3.2/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/

Re: How to access Task.isBackPressured() from a SourceFunction?

2022-02-14 Thread Niklas Semmler
Hi Darren, No, you cannot access the Task from the operator. You can access some metrics via the RuntimeContext: getRuntimeContext().getMetricGroup(). How does the backpressure help you here? Backpressure can originate in any operator or network connection. If it's an operator further downstre

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Gopi Krishna M
On Mon, Feb 14, 2022 at 10:00 PM Niklas Semmler wrote:
> So, you want to send basically the last message before the barrier?
Yes.
> Can you not instead send the first message after the barrier? From a first glance this sounds easier.
I'm not sure if this will help me synchronize the sin

Unit test harness for Sources

2022-02-14 Thread James Sandys-Lumsdaine
Hi all, I've been using the test harness classes to unit test my stateful 1 and 2 stream functions. But I also have some stateful legacy Source classes I would like to unit test and can't find any documentation or example for that - is this possible? Thanks, James.

Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-14 Thread Niklas Semmler
Hi Saravanan, AFAIK the last record is not treated differently. Does the approach in [1] not work? Best regards, Niklas [1] https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, I get that, but I want to output that key so I can store it in Elastic grouped by the minute. I had explained with data examples above. But just to be sure: let's pretend the current wall time is 2022-02-14T11:38:01.123Z and I get the clicks below: event time here (ignored/not read)|cnn.co

"No operators defined in streaming topology" error when Flink app still starts successfully

2022-02-14 Thread Shane Bishop
Hi all, My team has started seeing the error "java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute." However, even with this error, the Flink application starts and runs fine, and the Flink job renders fine in the Flink Dashboard. Attached is the full sta

Re: table api watermarks, timestamps, outoforderness and head aches

2022-02-14 Thread Francesco Guardiani
Yep, every operator usually cleans up the state of records past a received watermark. On Mon, Feb 14, 2022 at 4:03 PM HG wrote: > Will keys that are outdated disappear? > > It is in fact a kind of session window that can start at any time. > Constantly new keys will appear. > > On Mon, Feb 14,

Re: unpredictable behaviour on KafkaSource deserialisation error

2022-02-14 Thread Niklas Semmler
Hi Frank, This sounds like an interesting issue. Can you share a minimal working example? Best regards, Niklas > On 9. Feb 2022, at 23:11, Frank Dekervel wrote: > > Hello, > > When trying to reproduce a bug, we made a DeserialisationSchema that throws > an exception when a malformed message

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Niklas Semmler
So, you want to send basically the last message before the barrier? Can you not instead send the first message after the barrier? From a first glance this sounds easier. Can you share what you are trying to accomplish? Best regards, Niklas > On 14. Feb 2022, at 17:04, Gopi Krishna M wrote: >

Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread Francesco Guardiani
Yep, that should do it. Would you perhaps be willing to contribute to that docs page by adding the import? :) On Mon, Feb 14, 2022 at 4:34 PM HG wrote: > The static was missing 😑😑😑 > > import static org.apache.flink.table.api.Expressions.*; > > On Mon, Feb 14, 2022 at 15:45, Francesco Guardian

Re: Need help on implementing custom`snapshotState` in KafkaSource & KafkaSourceReader

2022-02-14 Thread Niklas Semmler
Hi Santosh, It’s best to avoid cross-posting. Let’s keep the discussion to SO. Best regards, Niklas > On 12. Feb 2022, at 16:39, santosh joshi wrote: > > We are migrating to KafkaSource from FlinkKafkaConsumer. We have disabled > auto commit of offset and instead committing them manually to s

Statefun with no Protobuf ingress and egress

2022-02-14 Thread mrAlexTFB
Hi, I have a very simple schema where one Python statefun application reads from a Kafka topic and writes to another Kafka topic. Those topics are produced and consumed with another Python script, as is done in the Python Flink Walkthrough

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Gopi Krishna M
Thanks Niklas! This helps with synchronizing uploads across partitioned tasks. The next step is to pass the handle to this upload to the sink which should be part of the same checkpoint. Is it possible to do the following: 1. Keep reducing the events to keyedStore. 2. On snapshotState: upload the

Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread HG
The static was missing 😑😑😑 import static org.apache.flink.table.api.Expressions.*; On Mon, Feb 14, 2022 at 15:45, Francesco Guardiani < france...@ververica.com> wrote: > > symbol: method $(java.lang.String) > > location: class esl.job.cag.verwerkingstijden.CagVerwerkingsTijden > > What is

Save app-global cache used by RichAsyncFunction to Flink State?

2022-02-14 Thread Clayton Wohl
Is there any way to save a custom application-global cache into Flink state so that it is used with checkpoints + savepoints? This cache is used by a RichAsyncFunction that queries an external database, and RichAsyncFunction doesn't support the Flink state functionality directly. I asked this last

How to cogroup multiple streams?

2022-02-14 Thread Will Lauer
OK, here's what I hope is a stupid question: what's the most efficient way to co-group more than 2 DataStreams together? I'm looking at porting a pipeline from pig to flink, and in a couple of places I use Pig's COGROUP functionality to simultaneously group 3 or 4 and sometimes even more datasets o

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John, That is exactly what the window operator does for you. Can you please check the documentation[1] and let us know what part of the window operator alone does not suffice for the use case? Sincerely, Ali [1]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastre

Re: table api watermarks, timestamps, outoforderness and head aches

2022-02-14 Thread HG
Will keys that are outdated disappear? It is in fact a kind of session window that can start at any time. New keys will constantly appear. On Mon, Feb 14, 2022, 15:57 Francesco Guardiani wrote: > Hi, > > - bounded out of orderness: This means that you have essentially a stream > where eve

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Because I want to group them for the last X minutes. In this case last 1 minute. On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek wrote: > Hello John, > > Then may I ask you why you need to use a time attribute as part of your > key? > Why not just key by the fields like `mydomain.com` and `s

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John, Then may I ask you why you need to use a time attribute as part of your key? Why not just key by the fields like `mydomain.com` and `some-article` in your example and use only window operator for grouping elements based on time? Sincerely, Ali On Mon, Feb 14, 2022 at 3:55 PM John Sm
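Ali's suggestion, keying only by the non-time fields and letting the window do the time grouping, can be illustrated without Flink. The sketch below is a plain-Java stand-in and not the Flink window operator: `TumblingCountSketch` is a hypothetical class, the combined key string is an assumption for illustration, and arrival (processing) time is passed in explicitly so the behaviour is deterministic.

```java
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: count events per key and per tumbling window of arrival time. */
final class TumblingCountSketch {
    private final long windowSizeMillis;
    // "key@windowStart" -> count; a TreeMap keeps iteration order deterministic.
    private final Map<String, Long> counts = new TreeMap<>();

    TumblingCountSketch(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    /** Start of the tumbling window (aligned to the epoch) that contains the timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /** The key carries no time component; the window start supplies the time grouping. */
    void onEvent(String key, long arrivalTimeMillis) {
        long start = windowStart(arrivalTimeMillis, windowSizeMillis);
        counts.merge(key + "@" + Instant.ofEpochMilli(start), 1L, Long::sum);
    }

    Map<String, Long> counts() {
        return counts;
    }
}
```

With a one-minute window, two clicks for `mydomain.com|some-article` arriving at 11:38:01.123Z and 11:38:31.123Z land in the window starting at 2022-02-14T11:38:00Z, while a click at 11:39:01.123Z opens the next window. The window start can then be emitted alongside the count if a per-minute value is needed downstream.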

Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread HG
That's the main function. I have no idea what imports are missing. I am able to use the SQL API for the table. When I switch to .select (()) where(()) groupBy(()) I get this error. On Mon, Feb 14, 2022, 15:45 Francesco Guardiani wrote: > > symbol: method $(java.lang.String) > > location

Re: table api watermarks, timestamps, outoforderness and head aches

2022-02-14 Thread Francesco Guardiani
Hi, - bounded out of orderness: This means that you essentially have a stream where events can arrive late by up to a certain amount of time, compared to the "newest" event received. For example, with a bounded out of orderness of 5 minutes, you essentially tell Flink that your stream can receive an eve
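The bounded out-of-orderness idea boils down to simple arithmetic: the watermark trails the largest timestamp seen so far by the configured bound. The sketch below shows this in plain Java. `BoundedOutOfOrdernessSketch` is a hypothetical class, not Flink's own; the extra 1 ms subtraction follows, as far as I know, what Flink's built-in bounded out-of-orderness generator does, and the lateness check is an assumption for illustration.

```java
/** Hypothetical sketch of bounded out-of-orderness watermarking; not Flink's own class. */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Tracks the newest event time; older events never move the watermark back. */
    void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    /** The watermark trails the newest event by the allowed out-of-orderness. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis - 1;
    }

    /** Assumption: an event at or below the current watermark counts as late. */
    boolean isLate(long eventTimestampMillis) {
        return eventTimestampMillis <= currentWatermark();
    }
}
```

With a 5-minute bound, once an event stamped at minute 17 has been seen, the watermark sits just below minute 12: an event stamped at minute 12 is still on time, while one stamped at minute 11:40 would be late.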

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, thanks. As previously mentioned, processing time. So regardless of when the event was generated, I want to count all events I have right now (as soon as they are seen by the Flink job). On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek wrote: > Hello John, > > Currently you are grouping the e

Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread Francesco Guardiani
> symbol: method $(java.lang.String) > location: class esl.job.cag.verwerkingstijden.CagVerwerkingsTijden What is esl.job.cag.verwerkingstijden.CagVerwerkingsTijden? Sounds like a bad import? Also, have you checked you have Flink deps aligned? On Mon, Feb 14, 2022 at 3:17 PM HG wrote: > >

Re: Joining Flink tables with different watermark delay

2022-02-14 Thread Francesco Guardiani
Hi, So my understanding of your query is that you want to do a join first, and then group by a 60 minutes distance and aggregate them. Please correct me if I'm wrong. First of all, the query you've posted is incorrect and should fail, as its plan is invalid because it's using a regular join. Regu

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Niklas Semmler
Hi Gopi, You can implement CheckpointedFunction and use the method snapshotState(FunctionSnapshotContext) to upload state on each checkpoint. https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html Make sure, you d

select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread HG
Hi, When I do:
Table counts = t
    .groupBy($("transactionId"))
    .select($("transactionId"), $("handlingTime").sum().as("summedhandlingTime"));
The code below fails with: cannot find symbol .select($("transactionId"), $("handlingTime").sum()

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John, Currently you are grouping the elements two times based on some time attribute, one while keying - with event time - and one while windowing - with processing time. Therefore, the windowing mechanism produces a new window computation when you see an element with the same key but arrive