Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
Ok, that sounds like it confirms my expectations. So I tried running my above code and had to slightly edit it to use Java's Tuple2 because our execution environment stuff is all in Java. class CompactionAggregate extends AggregateFunction[ Tuple2[java.lang.Boolean, Row], Tuple2[java.lang.Boolean, R

Re: Deduplicating record amplification

2021-01-27 Thread Arvid Heise
Hi Rex, whether your keyBy (and likewise join/grouping/windowing) is random or not depends on the relationship of the join/grouping key with your Kafka partitioning key. Say your partitioning key is document_id. Then any join/grouping key that is composed of (or is exactly) document_id will retain the o
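The routing argument can be illustrated with a small plain-Java sketch. It is a simplification, not Flink's partitioner: Flink assigns keys through key groups (KeyGroupRangeAssignment), whereas the hypothetical subtaskFor below just takes floorMod of the key's hashCode. The point it shows is that all records with the same key land on the same downstream subtask in arrival order, assuming they all come from one upstream partition (e.g. one Kafka partition).

```java
import java.util.ArrayList;
import java.util.List;

final class KeyRoutingSketch {
    // Hypothetical stand-in for Flink's key-group assignment (Flink murmur-hashes
    // the key's hashCode into key groups); here: floorMod(hashCode, parallelism).
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes {key, value} records to per-subtask queues. Records are appended in
    // arrival order, so two records with the same key keep their relative order.
    static List<List<String[]>> route(List<String[]> records, int parallelism) {
        List<List<String[]>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (String[] record : records) {
            subtasks.get(subtaskFor(record[0], parallelism)).add(record);
        }
        return subtasks;
    }
}
```

A join/grouping key that differs from the partitioning key would, in this picture, pull records for one document_id from several upstream partitions, and their interleaving is then no longer guaranteed.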

Re: memory tuning

2021-01-27 Thread Matthias Pohl
Thanks for sharing the logs. The configuration looks fine. Have you analyzed the memory usage? On Tue, Jan 26, 2021 at 5:02 PM Marco Villalobos wrote: > Yes, I will do that. > > PRODUCTION > > 2021-01-26 04:03:50,804 INFO org.apache.flink.yarn.YarnTaskExecutorRunner > [] - > ---

Re: Difference between table.exec.source.idle-timeout and setIdleStateRetentionTime ?

2021-01-27 Thread Dcosta, Agnelo (HBO)
Hi Dawid, Thanks for the tip on the time constraint. We are using WITHIN in our MATCH_RECOGNIZE clause; it is set to 3 minutes. The problem of the increasing checkpoint size still persists. Thanks for adding comments to FLINK-15160. I will take a look at the changes you suggested. P.S.: I initially meant to ask wh

Re: JobManager seems to be leaking temporary jar files

2021-01-27 Thread Maciek Próchniak
Hi Chesnay, thanks for the reply. I wonder if FLINK-21164 will help without FLINK-9844: if the jar file is not closed, it won't be successfully deleted, will it? As for FLINK-9844 - I understand that having code like if (userClassLoader instanceof Closeable) { ((Closeable) userClassLoader).close(); } i
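The guarded close discussed here can be written as a small plain-Java helper. This is a sketch, not Flink's code; whether it is safe to call at a given point in Flink's job lifecycle is exactly what FLINK-9844 is about. URLClassLoader has implemented Closeable since Java 7, and closing it releases the jar files it opened.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

final class ClassLoaderCleanup {
    // Closes the loader if it implements Closeable (URLClassLoader does since Java 7).
    // Closing releases the jar files it opened so they can be deleted afterwards;
    // classes already loaded stay usable, but new classes/resources cannot be loaded.
    static boolean closeIfCloseable(ClassLoader loader) {
        if (loader instanceof Closeable) {
            try {
                ((Closeable) loader).close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return true;
        }
        return false;
    }
}
```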

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
In addition to those questions, assuming that keyed streams are in order, I've come up with the following solution to compact our records and only pick the most recent one per id before sending to the ES sink. The first item in the Row is the document ID / primary key which we want to compact reco
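The per-id compaction can be sketched independently of Flink as a last-write-wins pass over a batch of changes. Assumptions: changes for the same id arrive in order (the premise of the question), and the Change class is hypothetical; in the real job this logic would live inside the CompactionAggregate or a keyed operator rather than over an in-memory list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CompactLatest {
    // Hypothetical change record; `upsert` mirrors the Boolean flag of a
    // retract stream (true = add/update, false = retract).
    static final class Change {
        final String id;
        final boolean upsert;
        final String payload;

        Change(String id, boolean upsert, String payload) {
            this.id = id;
            this.upsert = upsert;
            this.payload = payload;
        }
    }

    // Keeps only the last change per id. Removing before re-inserting makes the
    // output follow the order of each id's last change.
    static List<Change> compact(List<Change> batch) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change c : batch) {
            latest.remove(c.id);
            latest.put(c.id, c);
        }
        return new ArrayList<>(latest.values());
    }
}
```

With a retract stream, an update to "a" typically arrives as a retraction of the old row followed by an add of the new one, so only the final add survives compaction.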

Is Flink able to parse strings into dynamic JSON?

2021-01-27 Thread Devin Bost
I'd like to know whether it's possible in Flink to parse strings into a dynamic JSON object that doesn't require me to know the primitive type details at compile time. We have over 300 event types to process, and I need a way to load the types at runtime. I only need to know if certain fields exist o

Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-27 Thread Rex Fenley
Thanks for the clarification. On Wed, Jan 27, 2021 at 7:24 PM Jark Wu wrote: > Hi Rex, > > Currently, it is not state compatible, because we will add a new node > called MiniBatchAssigner after the source, which changes the JobGraph; thus > the uid is different. > > Best, > Jark > > On Tue, 26 Jan 2

Re: A few questions about minibatch

2021-01-27 Thread Rex Fenley
Thanks, that all makes sense! On Wed, Jan 27, 2021 at 7:00 PM Jark Wu wrote: > Hi Rex, > > Could you share your query here? It would be helpful to identify the root > cause if we have the query. > > 1) watermark > The framework automatically adds a node (the MiniBatchAssigner) to > generate wate

Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-27 Thread Jark Wu
Hi Rex, Currently, it is not state compatible, because we will add a new node called MiniBatchAssigner after the source, which changes the JobGraph; thus the uid is different. Best, Jark On Tue, 26 Jan 2021 at 18:33, Dawid Wysakowicz wrote: > I am pulling in Jark and Godfrey who are more familiar

Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-27 Thread Jark Wu
Hi Sebastián, I think Dawid is right. Could you share the pom file? I also tried to package flink-connector-postgres-cdc with ServicesResourceTransformer, and the Factory file contains com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory Best, Jark On Tue, 26 Jan 2021 a

Re: Stateful Functions - accessing the state aside of normal processing

2021-01-27 Thread Tzu-Li (Gordon) Tai
Hi Stephan, Great to hear about your experience with StateFun so far! I think what you are looking for is a way to read StateFun checkpoints, which are basically an immutable consistent point-in-time snapshot of all the states across all your functions, and run some computation or simply to explo

importing types doesn't fix “could not find implicit value for evidence parameter of type …TypeInformation”

2021-01-27 Thread Devin Bost
I posted this problem on Stack Overflow here: https://stackoverflow.com/questions/65930023/flink-importing-types-doesnt-fix-could-not-find-implicit-value-for-evidence Basically, I can't even get a basic map to work like this: object AmplitudeExample { def main(args: Array[String]) { import

Re: A few questions about minibatch

2021-01-27 Thread Jark Wu
Hi Rex, Could you share your query here? It would be helpful to identify the root cause if we have the query. 1) watermark The framework automatically adds a node (the MiniBatchAssigner) to generate watermark events as the mini-batch id to broadcast and trigger mini-batch in the pipeline. 2) Min
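To see why mini-batching reduces downstream updates, here is a simplified plain-Java sketch of a mini-batched count. Records are buffered per key and folded into state only when the batch fires, producing one update per touched key instead of one per record. This is an illustration, not Flink's mini-batch operator; the trigger that the MiniBatchAssigner's events provide in Flink is modelled here simply as a call to flush().

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class MiniBatchCountSketch {
    // Deltas buffered since the last flush, in first-arrival order per key.
    private final Map<String, Long> pending = new LinkedHashMap<>();
    // Accumulated count per key (the operator's state).
    private final Map<String, Long> state = new HashMap<>();

    // Per record: only buffer, emit nothing.
    void onRecord(String key) {
        pending.merge(key, 1L, Long::sum);
    }

    // Mini-batch trigger: fold buffered deltas into state and emit one update per touched key.
    Map<String, Long> flush() {
        Map<String, Long> updates = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            long total = state.merge(e.getKey(), e.getValue(), Long::sum);
            updates.put(e.getKey(), total);
        }
        pending.clear();
        return updates;
    }
}
```

Four records for keys a, a, b, a produce two updates on flush (a=3, b=1) rather than four separate updates.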

Flink and Amazon EMR

2021-01-27 Thread Marco Villalobos
Just curious, has anybody had success with Amazon EMR with RocksDB and checkpointing in S3? That's the configuration I am trying to set up, but my system is running more slowly than expected.

stopping with save points

2021-01-27 Thread Marco Villalobos
When I try to stop with a savepoint, I usually get the error below. I have not been able to create a single savepoint. Please advise. I am using Flink 1.11.0. Draining job "ed51084378323a7d9fb1c4c97c2657df" with a savepoint. The progr

Flink 1.11.2 test cases fail with Scala 2.12.12

2021-01-27 Thread Sourabh Mokhasi
Hi, We have several flink applications written with Flink 1.9.1 and Scala 2.11.12 and we are in the process of upgrading to Flink 1.11.2 and Scala 2.12.12. We are using maven to manage our application dependencies. After updating the pom.xml file to use the upgraded versions of Scala and Flink as

Integration with Apache AirFlow

2021-01-27 Thread Flavio Pompermaier
Hello everybody, is there any suggested way/pointer to schedule Flink jobs using Apache AirFlow? What I'd like to achieve is the submission (using the REST API of AirFlow) of 2 jobs, where the second one can be executed only if the first one succeeds. Thanks in advance, Flavio

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
This is great info, thanks! My question then becomes, what constitutes a random shuffle? Currently we're using the Table API with minibatch on flink v1.11.3. Do our joins output a keyed stream of records by join key or is this random? I imagine that they'd have to have a key for retracts and accum

Re: Deduplicating record amplification

2021-01-27 Thread Arvid Heise
Hi Rex, indeed these two statements look like they contradict each other, but they are looking at two sides of the same coin. Flink simply puts records into windows in FIFO order. That is, there is no ordering on event time if there are late events. So if your elements arrive ordered, the orderin
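The FIFO behaviour can be pictured with a simplified tumbling-window assigner in plain Java: each event goes to the window containing its timestamp and is appended in arrival order, with no sorting by event time. Assumptions for the sketch: events are represented only by their timestamps, timestamps are non-negative, and the window offset is 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FifoWindowSketch {
    // Start of the tumbling window containing a non-negative timestamp (offset 0).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Appends each event to its window's list in arrival order; nothing is re-sorted.
    static Map<Long, List<Long>> assign(long[] timestampsInArrivalOrder, long size) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestampsInArrivalOrder) {
            windows.computeIfAbsent(windowStart(ts, size), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }
}
```

For arrivals 5, 2, 12, 7 with a window size of 10, window [0, 10) holds 5, 2, 7 in that order: ordered input stays ordered, out-of-order input stays out of order.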

Flink running in k8s pod - pod is able to access S3 bucket, flink does not

2021-01-27 Thread Oran Shuster
So I've been really stumped on this for a couple of days now. Some general info: Flink version 1.12.1, using the k8s HA service. The k8s cluster is self-managed on AWS. Our checkpoints and savepoints are on S3; I created a new bucket just for them and granted the proper permissions to the k8s node. The job manager is wor

How to maintain output order of events by execution initiation time.

2021-01-27 Thread narasimha
Hi, Below is my dataflow DataStream stream ... stream.process(new ProcessFunction()) .sink(...) class ProcessFunction ...{ MapState time ...; processElement(...){ //add Element to MapState by eventtime // register eventtime+60 seconds } // Reason for maintainin
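The approach in the question (buffer elements by event time, register a timer at event time + 60 s, emit when the timer fires) can be sketched in plain Java. A TreeMap stands in for the MapState and an explicit onWatermark call stands in for timer firing. Assumptions: timers fire once the watermark reaches their timestamp, elements with equal timestamps keep arrival order, and the class is hypothetical rather than Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class EventTimeReorderBuffer {
    static final long DELAY_MS = 60_000; // the "eventtime + 60 seconds" from the question
    // Stand-in for MapState: event time -> elements with that timestamp, kept sorted by time.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    void add(long eventTime, String element) {
        buffer.computeIfAbsent(eventTime, k -> new ArrayList<>()).add(element);
    }

    // Emits, in event-time order, every element whose timer (eventTime + DELAY_MS) <= watermark.
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.firstKey() + DELAY_MS <= watermark) {
            out.addAll(buffer.pollFirstEntry().getValue());
        }
        return out;
    }
}
```

Elements added as c (3 s), a (1 s), b (2 s) come out as a, b once the watermark reaches 62 s, and c follows at 63 s.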

TaskManager crash. Zookeeper timeout

2021-01-27 Thread Colletta, Edward
Using Flink 1.11.2 on Java 11, a session cluster with 16 jobs running on AWS ECS instances. The cluster has 3 JMs and 3 TMs; a separate ZooKeeper cluster has 3 nodes. One of our taskmanagers crashed today with what seems to be rooted in a ZooKeeper timeout. We are wondering if there is any tuning that

Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler
Actually, if the parallelism is 1 then it works as it should. sigh On 1/27/2021 6:52 PM, Chesnay Schepler wrote: Note that while this does fix the issue of timers not firing while the job is running, it seems to be firing too many timers... On 1/27/2021 6:49 PM, Chesnay Schepler wrote: My

Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler
Note that while this does fix the issue of timers not firing while the job is running, it seems to be firing too many timers... On 1/27/2021 6:49 PM, Chesnay Schepler wrote: My bad, I was still using the custom WatermarkStrategy that emits a watermark for each event. .assignTimestampsAndWaterma

Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler
My bad, I was still using the custom WatermarkStrategy that emits a watermark for each event. .assignTimestampsAndWatermarks( new WatermarkStrategy() { @Override public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new BoundedOutOfOrdernessWa

Re: Timers not firing until stream end

2021-01-27 Thread Pilgrim Beart
I am calling env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); I am using Flink 1.11.1 (because I need to run on AWS Kinesis Data Analytics). -Pilgrim -- Learn more at https://devicepilot.com @devicepilot

Proctime consistency

2021-01-27 Thread Rex Fenley
Hello, I'm looking at ways to deduplicate data and found [1], but does proctime get committed with operators? How does this work against clock skew on different machines? Thanks [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication -- Rex Fe
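Semantically, the keep-first-row deduplication in [1] (ROW_NUMBER() ordered by proctime ascending, keeping row 1) forwards only the first row that the deduplicating operator sees for each key. A plain-Java sketch of that semantics follows; it deliberately does not model how proctime values are assigned on different machines, and treating "arrival order at the operator" as "proctime ascending" is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class KeepFirstRowDedup {
    // rows are {key, value} pairs in the order the deduplicating operator receives them
    // (assumed here to correspond to ORDER BY proctime ASC on a single subtask).
    static List<String[]> keepFirst(List<String[]> rowsInArrivalOrder) {
        Set<String> seenKeys = new HashSet<>(); // stands in for the per-key dedup state
        List<String[]> out = new ArrayList<>();
        for (String[] row : rowsInArrivalOrder) {
            if (seenKeys.add(row[0])) { // add() returns false if the key was already seen
                out.add(row);
            }
        }
        return out;
    }
}
```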

Re: Timers not firing until stream end

2021-01-27 Thread Aljoscha Krettek
On 2021/01/27 15:09, Chesnay Schepler wrote: Put another way, if you use any of the built-in WatermarkGenerators and use event-time, then it appears that you *must* set this interval. This behavior is...less than ideal I must admit, and it does not appear to be properly documented. Setting t

Re: Timers not firing until stream end

2021-01-27 Thread Pilgrim Beart
Chesnay, Thanks for this - I've made the change you suggested (setAutoWatermarkInterval) but it hasn't changed the behaviour - timers still get processed only on stream end. I have pushed a new version, with this change, and also emitting some information in a .log field. If you search for "!!!" in

Stateful Functions - accessing the state aside of normal processing

2021-01-27 Thread Stephan Pelikan
Hi, We are trying to use Statefuns for our tool and it seems to be a good fit. I already adopted it and it works quite well. However, we have millions of different states (all the same FunctionType but different ids) and each state consists of several @Persisted values (values and tables). We w

Re: rocksdb block cache usage

2021-01-27 Thread Yun Tang
Hi, If managed memory is enabled, all RocksDB instances within one slot share the same block cache, so all flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage metrics in the same slot report the same value. Best Yun Tang
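Following that explanation, summing the per-operator pinned-usage values over-counts when several operators share one slot's cache. Below is a plain-Java sketch of a per-slot aggregation over hypothetical metric samples; the Sample class and slot identifiers are assumptions, and how you obtain a slot identifier depends on the tags your metrics system exposes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PinnedUsageAggregation {
    // Hypothetical metric sample: reporting slot, reporting operator, value in bytes.
    static final class Sample {
        final String slotId;
        final String operator;
        final long bytes;

        Sample(String slotId, String operator, long bytes) {
            this.slotId = slotId;
            this.operator = operator;
            this.bytes = bytes;
        }
    }

    // Over-counts: the shared cache is counted once per operator.
    static long naiveSum(List<Sample> samples) {
        long total = 0;
        for (Sample s : samples) {
            total += s.bytes;
        }
        return total;
    }

    // Counts each slot's shared cache once (max, in case samples were taken at slightly different times).
    static long perSlotSum(List<Sample> samples) {
        Map<String, Long> bySlot = new HashMap<>();
        for (Sample s : samples) {
            bySlot.merge(s.slotId, s.bytes, Long::max);
        }
        long total = 0;
        for (long v : bySlot.values()) {
            total += v;
        }
        return total;
    }
}
```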

Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Xingcan Cui
Hi Juha and Chesnay, I do appreciate your prompt responses! I'll also continue to investigate this issue. Best, Xingcan On Wed, Jan 27, 2021, 04:32 Chesnay Schepler wrote: > (setting this field is currently not possible from a Flink user > perspective; it is something I will investigate) > > >

Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler
You were right that it is an issue with the watermarks; apart from when the job was stopped, they were never emitted downstream, so no timer was ever triggered. It appears that you need to set the auto-watermark interval in the ExecutionConfig via env.getConfig().setAutoWatermarkInte
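Why the interval matters can be illustrated with a plain-Java imitation of the bounded-out-of-orderness logic, modelled on Flink's BoundedOutOfOrdernessWatermarks but not the class itself. Per-event calls only track the maximum timestamp; a watermark is produced only from the periodic hook, which Flink drives at the auto-watermark interval. If that hook is never called, a bounded job only sees the final watermark at end of input, which matches the "timers fire only at stream end" symptom.

```java
final class PeriodicWatermarkSketch {
    private final long outOfOrdernessMs;
    private long maxTimestamp;

    PeriodicWatermarkSketch(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Start so that the first emitted watermark is Long.MIN_VALUE instead of underflowing.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    // Per-event hook: only remembers the largest timestamp seen; emits nothing.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Periodic hook: the only place a watermark is produced. If nothing calls it,
    // downstream event-time timers never see the watermark advance.
    long onPeriodicEmit() {
        return maxTimestamp - outOfOrdernessMs - 1;
    }
}
```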

Flink sql problem

2021-01-27 Thread Jiazhi
Hi all, After grouping by user, message A arrives. If message B arrives later, and the time of message B is less than that of message A, within 10 minutes, mark message A by setting its Tag field to True. How can I achieve this? Thanks Jiazhi

Re: Flink Job Manager & Task Manager heap size

2021-01-27 Thread Chesnay Schepler
Generally I see 2 options: a) There's a memory leak somewhere. It would be good to know how the baseline heap usage during idleness evolves over time. Are the same 20 jobs running continuously or are they (or others) periodically re-submitted? b) The JVM just doesn't feel like running garbage

Re: rocksdb block cache usage

2021-01-27 Thread Chesnay Schepler
I don't quite understand the question; all 3 metrics you listed are the same one? On 1/27/2021 9:23 AM, ?? wrote: hi, all I've enabled the state.backend.rocksdb.metrics.block-pinned-usage metric, and the flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinne

Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler
Based on your description you aren't doing anything obviously wrong. Would it be possible for you to share the code with us? On 1/27/2021 1:02 PM, Pilgrim Beart wrote: A newbie question: I've created a basic Flink DataStream job for an IoT use-case, with file source and sink for testing. I ke

Timers not firing until stream end

2021-01-27 Thread Pilgrim Beart
A newbie question: I've created a basic Flink DataStream job for an IoT use-case, with file source and sink for testing. I key by device ID, then in a ProcessFunction set an EventTime Timer to fire if a device falls silent, i.e. a timeout, which I cancel if another message arrives from that device
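The timeout pattern described here can be sketched without Flink: per device, keep one pending timer at last-seen event time + timeout, replace it when a newer message arrives (the equivalent of deleting and re-registering an event-time timer), and fire timers only when the watermark passes them. Class and method names are hypothetical. The sketch also makes visible that nothing fires unless the watermark advances.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class SilentDeviceDetector {
    private final long timeoutMs;
    // One pending "timer" per device: the event time at which it counts as silent.
    private final Map<String, Long> timerByDevice = new HashMap<>();

    SilentDeviceDetector(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Like processElement: replace the device's pending timer with one at eventTime + timeout
    // (Long::max keeps the later timer if messages arrive out of order).
    void onMessage(String deviceId, long eventTime) {
        timerByDevice.merge(deviceId, eventTime + timeoutMs, Long::max);
    }

    // Like the timer service on a watermark: fire and drop every timer <= watermark.
    List<String> onWatermark(long watermark) {
        List<String> silent = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = timerByDevice.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= watermark) {
                silent.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(silent); // deterministic output order for the example
        return silent;
    }
}
```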

Re: JobManager seems to be leaking temporary jar files

2021-01-27 Thread Chesnay Schepler
The problem of submitted jar files not being closed is a known one: https://issues.apache.org/jira/browse/FLINK-9844 IIRC it's not exactly trivial to fix since class-loading is involved. It's not strictly related to the REST API; it also occurs in the CLI but is less noticeable since jars are us

Re: Flink upgrade to Flink-1.12

2021-01-27 Thread Aljoscha Krettek
I'm afraid I also don't know more than that. But I agree with Ufuk that it should just work. I think the best way would be to try it in a test environment and then go forward with upgrading the production jobs/cluster. Best, Aljoscha On 2021/01/25 18:59, Ufuk Celebi wrote: Thanks for reachi

Overhead when using map state

2021-01-27 Thread Lasse Nedergaard
Hi, We use RocksDB for storing state and run on Flink 1.10. We have followed best practices and used map state instead of a map in value state. We have seen problems with OOM exceptions and investigated them by creating a job with n keys in a keyBy, where each key had a map either stored in map

Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Chesnay Schepler
(setting this field is currently not possible from a Flink user perspective; it is something I will investigate) On 1/27/2021 10:30 AM, Chesnay Schepler wrote: Yes, I could see how the memory issue can occur. However, it should be limited to buffering 64 requests; this is the default limit t

Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Chesnay Schepler
Yes, I could see how the memory issue can occur. However, it should be limited to buffering 64 requests; this is the default limit that okhttp imposes on concurrent calls. Maybe lowering this value already does the trick. On 1/27/2021 5:52 AM, Xingcan Cui wrote: Hi all, Recently, I tried to

Re: Seeing Rocks Native Metrics in Data Dog

2021-01-27 Thread Chesnay Schepler
AFAIK all IDs (and in fact all variables except <host>) are exposed as tags. (The <host> is transmitted separately, and I would've thought Datadog automatically provides similar functionality for it.) On 1/27/2021 2:11 AM, Rex Fenley wrote: Oddly, I'm seeing them now. I'm not sure what has changed. Fwiw, we

Re: Initializing broadcast state

2021-01-27 Thread Wei Jiang
Hi guys, I ran into the same problem, but I use a different way to initialize it: ``` val list = ... //i use jdbc to get the init data val dimensionInitStream = env.fromCollection(list) //the main stream and the `dimensionStream` is a stream from flink cdc val dimension = dimensionStream.union(dimensionInit

Re: Difference between table.exec.source.idle-timeout and setIdleStateRetentionTime ?

2021-01-27 Thread Dawid Wysakowicz
Hey, As for the MATCH_RECOGNIZE clause, I highly recommend applying a time constraint[1]. The idle state retention time does not apply to the MATCH_RECOGNIZE, but you can think of the time constraint as something similar, but it is closer to the actual query logic. If you are hitting FLINK-15160

rocksdb block cache usage

2021-01-27 Thread ??????
hi, all I've enabled the state.backend.rocksdb.metrics.block-pinned-usage metric, and the flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage metric is exposed. I'm confused about whether the total memory used for pinned blocks in the block cache is the sum of flink_taskmanager_job_task