Re: How to reprocess certain events in Flink?

2019-12-17 Thread Rafi Aroch
Hi Pooja, Here's an implementation from Jamie Grier for de-duplication using in-memory cache with some expiration time: https://github.com/jgrier/FilteringExample/blob/master/src/main/java/com/dataartisans/DedupeFilteringJob.java If for your use-case you can limit the time period where duplicatio
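
The idea of an in-memory de-duplication cache with an expiration time can be sketched in plain Java. This is not taken from the linked job; the class name `ExpiringDedupeFilter`, its methods, and the use of a string id (for example a transaction_id) are assumptions made for illustration. It also assumes timestamps are passed in non-decreasing order, and it keeps nothing across restarts, so a real Flink job would more likely hold this information in keyed state.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: remembers ids for a limited time and reports
// whether an id is seen for the first time within that period.
final class ExpiringDedupeFilter {
    private final long ttlMillis;
    // Insertion-ordered map: entries with the oldest first-seen time come first.
    private final LinkedHashMap<String, Long> firstSeen = new LinkedHashMap<>();

    ExpiringDedupeFilter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true if the id was not seen within the last ttlMillis.
    // Assumes nowMillis never decreases between calls.
    boolean isFirstOccurrence(String id, long nowMillis) {
        evictExpired(nowMillis);
        if (firstSeen.containsKey(id)) {
            return false; // duplicate inside the retention period
        }
        firstSeen.put(id, nowMillis);
        return true;
    }

    // Drops entries whose first-seen time is at least ttlMillis old.
    private void evictExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= ttlMillis) {
                it.remove();
            } else {
                break; // remaining entries are newer
            }
        }
    }

    int size() {
        return firstSeen.size();
    }
}
```

The retention period bounds memory use, at the cost of letting a duplicate through if it arrives after its id has expired.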

Re: How to reprocess certain events in Flink?

2019-12-17 Thread Pooja Agrawal
Hey, I am sorry for the confusion. So, the value is not already present in the event. We are reading it from a static table (kind of data enrichment in flink job). Above event is an enriched event. If we say that this is some transaction event, the user would have done the transaction once and hen

Re: RichAsyncFunction Timeout

2019-12-17 Thread Biao Liu
Hi Polarisary, It's hard to tell what happened without further detail. Just some guesses. 1. Have you completed the "resultFuture" in "asyncInvoke"? Asking this is because there is only a part of "asyncInvoke" implementation, I can't see the completion part. 2. The capacity (10) of async waiting q
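
Guess 1 can be illustrated without Flink. The following is a plain-Java analogy using `CompletableFuture`, not the code from the original question; the class and method names are hypothetical. It shows that a callback which forgets to complete its future on the failure path leaves the caller waiting until the timeout, whereas completing it exceptionally surfaces the error at once. In a Flink `asyncInvoke`, the corresponding calls would be `resultFuture.complete(...)` and `resultFuture.completeExceptionally(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: every code path of an async callback must complete the future.
final class AsyncCompletionSketch {

    // Buggy variant: the failure path never completes the future.
    static CompletableFuture<String> lookupForgetful(String key, boolean fail) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            if (!fail) {
                result.complete("value-for-" + key);
            }
            // On failure nothing happens, so the caller can only time out.
        });
        return result;
    }

    // Safe variant: success and failure both complete the future.
    static CompletableFuture<String> lookupSafe(String key, boolean fail) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                if (fail) {
                    throw new IllegalStateException("lookup failed for " + key);
                }
                result.complete("value-for-" + key);
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    // Waits up to timeoutMillis and describes what happened.
    static String outcome(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return "ok:" + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "error:" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```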

Re: RichAsyncFunction Timeout

2019-12-17 Thread vino yang
Hi Polarisary, IMO, firstly, it would be better to monitor the OS and Flink/HBase metrics. For example: - Flink and HBase cluster Network I/O metrics; - Flink TM CPU/Memory/Backpressure metrics and so on; You can view these metrics to find some potential reasons. If you can not figure it

RichAsyncFunction Timeout

2019-12-17 Thread Polarisary
Hi all, When I use RichAsyncFunction to read data from HBase, it always times out after a few minutes, but the HBase connection is not closed, and it can still get data in the overridden timeout method. The code follows; does anybody know what triggers the timeout? ==

Re: Different jobName per Job when reporting Flink metrics to PushGateway

2019-12-17 Thread Zhu Zhu
Hi Sidney, "metrics.reporter.promgateway.jobName" is a Flink cluster wide config, so you will need to set it in flink-conf.yaml before launching the Flink cluster. An alternative is to use -D(or -yD for yarn) params to override the config when running a command to launch the Flink session cluster

Re: MiniCluster with ProcessingTimeTrigger

2019-12-17 Thread Biao Liu
Hi John, The root cause is the collection source exits too fast. The window would also exit without being triggered. You could verify that by waiting a second before releasing the window. For example, insert a map operator between source and window operator. Blocking a second or more in the "clos

Re: How to reprocess certain events in Flink?

2019-12-17 Thread Zhu Zhu
Hi Pooja, I'm a bit confused since in 1) it says that "If two events have same transaction_id, we can say that they are duplicates", and in 2) it says that "Since this is just a value change, the transaction_id will be same". Looks to me they are conflicting. Usually in case 2) scenarios, the valu

Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-17 Thread vino yang
Hi Ethan, Two things to share: - I have found that the "taskmanager.memory.preallocate" config option has been removed in the master codebase. - After researching the git history, I found that the description of "taskmanager.memory.preallocate" was written by @Chesnay Schepler (from 1.8 branch). So

Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-17 Thread Ethan Li
I didn’t realize we were not chatting in the mailing list :) I think it’s wrong because it kind of says full GC is triggered by reaching MaxDirectMemorySize. > On Dec 16, 2019, at 11:03 PM, Xintong Song wrote: > > Glad that helped. I'm also posting this conversation to the public mailing > li

Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-17 Thread KristoffSC
Thank you for your reply, Timo. Regarding point 2, I'm sorry for the delay. I reran my test and everything seems to be in order. The open method was called first. I guess it was a false alarm. Sorry for that. Regards, Krzysztof -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.

MapState with List Type for values

2019-12-17 Thread Aaron Langford
Hello Flink Community, I have a question about using MapState with lists as values. Here is a description of the use case: I have an operator over a keyed stream where for each record that comes, it needs to look into some state to determine if a value has arrived or not. If the value has not arr

Keyed stream, parallelism, load balancing and ensuring that the same key go to the same Task Manager and task slot

2019-12-17 Thread KristoffSC
Hi community, I'm trying to build a PoC pipeline for my project and I have a few questions regarding load balancing between task managers and ensuring that keyed stream events for the same key will go to the same Task Manager (hence the same task slot). Let's assume that we have 3 task managers, 3 t

MiniCluster with ProcessingTimeTrigger

2019-12-17 Thread John Morrow
Hi All, I'm trying to test a pipeline that consists of two Flink tasks with a MiniCluster. The 1st task has a WindowAll operator which groups items into batches every second, and the 2nd task does an async operation with each batch and flatMaps the result. I've whittled it down to the bare bon

Restore metrics on broadcast state after restart

2019-12-17 Thread Gaël Renoux
Hi everyone, I have a KeyedBroadcastProcessFunction with a broadcast state (a bunch of rules), and I have set up a few gauge metrics on that state (things such as number of known rules and timestamp of the last rule received). However, I have an issue when the server restarts from a checkpoint

Different jobName per Job when reporting Flink metrics to PushGateway

2019-12-17 Thread Sidney Feiner
I'm using Flink 1.9.1 with PrometheusPushGateway to report my metrics. The jobName the metrics are reported with is defined in the flink-conf.yaml file, which makes the jobName identical for all jobs that run on the cluster, but I want a different jobName to be reported for every running job. To d

Flink x GATE Integration

2019-12-17 Thread Austin Cawley-Edwards
Hi all, GATE (General Architecture for Text Engineering)[1] is a text processing framework that runs on the JVM. Cross-posting from the GATE community.[2] I'm wondering if anyone has tried to integrate GATE with Flink and, if so, how successful has the integration been? I'm completely new to GAT

Fwd: How to reprocess certain events in Flink?

2019-12-17 Thread Pooja Agrawal
Hi, I have a use case where we are reading events from a Kinesis stream. The event can look like this: Event { event_id, transaction_id, key1, key2, value, timestamp, some other fields... } We want to aggregate the values per key for all events we have seen till now (as simple as "select

Re: S3A "Data read has a different length than the expected" issue root cause

2019-12-17 Thread Kostas Kloudas
Thanks a lot for reporting this! I believe that this can be really useful for the community! Cheers, Kostas On Tue, Dec 17, 2019 at 1:29 PM spoganshev wrote: > > In case you experience an exception similar to the following: > > org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:

S3A "Data read has a different length than the expected" issue root cause

2019-12-17 Thread spoganshev
In case you experience an exception similar to the following: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Data read has a different length than the expected: dataLength=53562; expectedLength=65536; includeSkipped=true; in.getClass()=class org.apache.flink.fs.s3base.shaded.c

Re: Flink slot utilization

2019-12-17 Thread Robert Metzger
Hi, 1) By default, Flink's Kafka connector is polling data from Kafka every 100ms. There's a configuration key "flink.poll-timeout" to change the frequency. I don't have experience with these internal log messages from Kafka, but since they are on INFO level (and if you don't see any unexpected da
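
As a minimal sketch of point 1, the "flink.poll-timeout" key is set on the same `Properties` object that is handed to the Flink Kafka consumer. The class name, the helper method, and the example values used with it are assumptions for illustration; only the property key and its default of 100 ms come from the message above.

```java
import java.util.Properties;

// Hypothetical sketch: builds consumer properties with a custom poll timeout.
final class KafkaPollTimeoutSketch {

    static Properties consumerProperties(String bootstrapServers, String groupId, long pollTimeoutMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Key read by Flink's Kafka connector; values are in milliseconds.
        props.setProperty("flink.poll-timeout", Long.toString(pollTimeoutMillis));
        return props;
    }
}
```

The resulting `Properties` would then be passed to the Kafka consumer's constructor together with the topic and the deserialization schema.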

Re: Scala ListBuffer cannot be used as a POJO type in Flink

2019-12-17 Thread Andrey Zagrebin
Hi Utopia, There were already a couple of hints in comments to your Stack Overflow questions about immutability. In general, I would also recommend this because when you work with Flink state the general API contract is that if you update your state object (schoolDescriptor) you have to call

Re: Flink slot utilization

2019-12-17 Thread Andrés Garagiola
Thanks Robert, About your questions, I don't yet have a real estimate of the number of records received by the pipeline, but I guess that the pipeline could be idle for several minutes (I don't think for hours). My concern comes from two aspects: 1) I saw multiple lines in

Re: sink type error in scala

2019-12-17 Thread Timo Walther
Hi Fanbin, I think you are mixing different APIs together. We have a Scala and Java version of both DataStream and Table API. The error message indicates that `toRetractStream` is called on a Java Table API class because it returns org.apache.flink.api.java.tuple.Tuple2 but your sink is imple

Re: [ANNOUNCE] Weekly Community Update 2019/50

2019-12-17 Thread Hequn Cheng
Cool. I will do it in the next three weeks. Thanks a lot for your continued great work! Best, Hequn On Tue, Dec 17, 2019 at 6:16 PM Konstantin Knauf wrote: > Hi Hequn, > > thanks, and thanks for the offer. Of course, you can cover the holiday > break, i.e. the next three weeks. Looking forward

Re: [ANNOUNCE] Weekly Community Update 2019/50

2019-12-17 Thread Konstantin Knauf
Hi Hequn, thanks, and thanks for the offer. Of course, you can cover the holiday break, i.e. the next three weeks. Looking forward to your updates! Cheers, Konstantin On Mon, Dec 16, 2019 at 5:53 AM Hequn Cheng wrote: > Hi Konstantin, > > Happy holidays and thanks a lot for your great job on

Re: Flink slot utilization

2019-12-17 Thread Robert Metzger
Hi Andrés, sorry for the late reply. 1. The slots are released when the streaming pipeline ends. In principle, it is not a problem when a slot is allocated, even when not processing any incoming messages. So you are not doing anything wrong. How many records do you receive per pipeline? (are the