Re: What causes a buffer pool exception? How can I mitigate it?

2021-01-26 Thread Arvid Heise
Hi Marco, could you share your full task manager and job manager log? We double-checked and saw that the buffer pool is only released on cancellation or shutdown. So I'm assuming that there is another issue (e.g., Kafka cluster not reachable) and there is a race condition while shutting down. It

Re: Datadog reporter timeout & OOM issue

2021-01-26 Thread Juha Mynttinen
Hey, A few months back, I had a very similar problem with Datadog when I tried to do a proof of concept using it with Flink. I had quite a lot of user defined metrics. I got similar exceptions and the metrics didn't end up in Datadog. Without too much deeper analysis, I assumed Datadog was throttl

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2021-01-26 Thread Rex Fenley
Hello, I still don't have a good understanding of how UDAF in the Table API handles deletes. If every row aggregated into one groupBy(key) gets a retract, meaning nothing should be grouped by that key, will the state get deleted? Is there a way to delete the state for that row i.e. forward a retra

Re: Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Hello, I began reading https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows - *> Redistributing* streams (between *map()* and *keyBy/window*, as well as between *keyBy/window* and *sink*) change the partitioning of streams. Each *operator subtask*

Re: Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Our data arrives in order from Kafka, so we are hoping to use that same order for our processing. On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley wrote: > Going further, if "Flink provides no guarantees about the order of the > elements within a window" then with minibatch, which I assume uses a wind

Datadog reporter timeout & OOM issue

2021-01-26 Thread Xingcan Cui
Hi all, Recently, I tried to use the Datadog reporter to collect some user-defined metrics. Sometimes when reaching traffic peaks (which are also peaks for metrics), the HTTP client will throw the following exception: ``` [OkHttp https://app.datadoghq.com/...] WARN org.apache.flink.metrics.datad

Re: Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Going further, if "Flink provides no guarantees about the order of the elements within a window" then with minibatch, which I assume uses a window under the hood, any aggregates that expect rows to arrive in order will fail to keep their consistency. Is this correct? On Tue, Jan 26, 2021 at 5:36 P

Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Hello, We have a job from CDC to a large unbounded Flink plan to Elasticsearch. Currently, we have been relentlessly trying to reduce our record amplification which, when our Elasticsearch index is near fully populated, completely bottlenecks our write performance. We decided recently to try a ne

Re: Seeing Rocks Native Metrics in Data Dog

2021-01-26 Thread Rex Fenley
Oddly, I'm seeing them now. I'm not sure what has changed. Fwiw, we have modified the scopes per https://docs.datadoghq.com/integrations/flink/#metric-collection but their modifications ids as tags. We do need to modify them according to that documentation - "*Note*: The system scopes must be remap

Re: Initializing broadcast state

2021-01-26 Thread Jaffe, Julian
One thing to consider could be using a CoProcessFunction instead of a BroadcastProcessFunction, and calling .broadcast on the input stream you want every task manager to receive. Then you could follow the pattern you laid out in your sample code (e.g. initialize state in the initializeState func

Rocksdb - org.apache.flink.util.SerializedThrowable : bad entry in block

2021-01-26 Thread Deshpande, Omkar
Hello, I am using flink 1.9 with beam 2.26 and rocksdb state backend. I am getting this exception - org.apache.flink.util.SerializedThrowable: Caught exception while processing timer. at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(St

Re: Seeing Rocks Native Metrics in Data Dog

2021-01-26 Thread Chesnay Schepler
It is good to know that something from the task executors arrives at datadog. Do you see any metrics for a specific job, like the numRestarts metric of the JobManager? Are you using the default scope formats

Re: Initializing broadcast state

2021-01-26 Thread Nick Bendtner
Thanks a lot Guowei, that makes sense. I will go with the caching approach. Can you point me to any example which shows what is the most efficient way to cache elements. Thanks a ton for your help. Best, Nick On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma wrote: > Hi,Nick > I do not think you could

Re: Seeing Rocks Native Metrics in Data Dog

2021-01-26 Thread Rex Fenley
All taskmanager and jobmanager logs show up. Anything specific to an operator does not. For example, flink.taskmanager.Status.JVM.Memory.Heap.Used shows up, but I can't see stats on an individual operator. I mostly followed a combination of https://docs.datadoghq.com/integrations/flink/#metric-col

Re: windows and triggers

2021-01-26 Thread Marco Villalobos
Ah, this works as expected, since Flink documentation states this: By specifying a trigger using trigger() you are overwriting the default > trigger of a WindowAssigner. For example, if you specify a CountTrigger for > TumblingEventTimeWindows you will no longer get window firings based on the > p
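The behaviour described in this reply can be illustrated with a small model. The sketch below is plain Java and uses no Flink classes; `TriggerModel` and its `fireAtWindowEnd` flag are hypothetical names introduced only for illustration. It assumes that a count-only trigger ignores the passage of time, so a window holding fewer elements than the threshold emits nothing when the window ends.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of a time window with a purging count trigger. Not Flink code. */
public class TriggerModel {

    private final int maxCount;            // count threshold, e.g. 5
    private final boolean fireAtWindowEnd; // hypothetical "count OR time" variant
    private final List<String> buffer = new ArrayList<>();

    public TriggerModel(int maxCount, boolean fireAtWindowEnd) {
        this.maxCount = maxCount;
        this.fireAtWindowEnd = fireAtWindowEnd;
    }

    /** Called per element; returns the fired batch, or an empty list if nothing fires. */
    public List<String> onElement(String element) {
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            return fireAndPurge();
        }
        return Collections.emptyList();
    }

    /** Called when the window's time is up. */
    public List<String> onWindowEnd() {
        if (fireAtWindowEnd) {
            return fireAndPurge();
        }
        // Count-only model: leftover elements are discarded without any output.
        buffer.clear();
        return Collections.emptyList();
    }

    /** Emits a copy of the buffered elements and clears the buffer. */
    private List<String> fireAndPurge() {
        List<String> fired = new ArrayList<>(buffer);
        buffer.clear();
        return fired;
    }
}
```

In this model, sending 2 elements with a threshold of 5 produces no output in the count-only case, which matches the observation in the original question. Getting a firing at the end of the window as well would presumably need a custom trigger that reacts to both the count and the timer.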

windows and triggers

2021-01-26 Thread Marco Villalobos
I wrote this simple test: .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .trigger(PurgingTrigger.of(CountTrigger.of(5))) Thinking that if I send 2 elements of data, it would collect them after a minute. But that doesn't seem to be happening. Is my understanding of how windows and tri

Re: Seeing Rocks Native Metrics in Data Dog

2021-01-26 Thread Chesnay Schepler
Any metric that is shown in the Flink UI should also appear in DataDog. If this is not the case, then something is going wrong within the reporter. Is there anything suspicious in the Flink logs? Can you give some examples of metrics that /do/ show up in DataDog? On 1/26/2021 6:32 PM, Rex Fenley

Re: Annotating AggregateFunction accumulator type with @DataTypeHint

2021-01-26 Thread Seth Wiesman
Yes, the FunctionHint annotation has an accumulator field. There is an example in its JavaDoc. Seth On Tue, Jan 26, 2021 at 6:39 AM Yuval Itzchakov wrote: > Hi, I have an aggregate function of the form: > > class Foo extends AggregateFunction[Array[Json], util.List[Json]] > > I want to treat th

Seeing Rocks Native Metrics in Data Dog

2021-01-26 Thread Rex Fenley
Hi, I need to get a deeper dive into how rocks is performing so I turned on Rocks Native Metrics. However, I don't see any of the metrics in DataDog (though I have other Flink metrics in DataDog). I only see rocks metrics in the operator metrics in Flink UI, and unfortunately I can't really zoom i

Re: Using double quotes for SQL identifiers

2021-01-26 Thread Gyula Fóra
Thanks guys for the pointers, I will give it a try! Gyula On Tue, Jan 26, 2021 at 5:53 PM Timo Walther wrote: > Hi Gyula, > > the TableEnvironment.getConfig offers a setPlannerConfig. And > org.apache.flink.table.planner.calcite.CalciteConfigBuilder helps in > creating an object that implements

Re: Using double quotes for SQL identifiers

2021-01-26 Thread Timo Walther
Hi Gyula, the TableEnvironment.getConfig offers a setPlannerConfig. And org.apache.flink.table.planner.calcite.CalciteConfigBuilder helps in creating an object that implements this interface. You should be able to influence the Calcite parser config with this. However, I'm not sure how well

Re: Using double quotes for SQL identifiers

2021-01-26 Thread Sebastian Liu
Hi Gyula, AFAIK, except the sql-dialect, table API does not expose any parser related configuration to the user. But we still can change the config of quoting identifiers in parser with some code changing. You can reference this test class: org.apache.flink.sql.parser.FlinkDDLDataTypeTest.TestFact

Re: Difference between table.exec.source.idle-timeout and setIdleStateRetentionTime ?

2021-01-26 Thread Dcosta, Agnelo (HBO)
Hi Dawid, thanks for the clarification; it helps a lot. Replying to a couple of points: what is causing the state to grow? We are using Flink SQL and have 5 pattern match queries and 3 group-by tumble windows. State growth over time is primarily coming from the pattern match queries. Is it ever growing

Re: What causes a buffer pool exception? How can I mitigate it?

2021-01-26 Thread Marco Villalobos
Actually, the log I sent in my previous message, shows the only error that occurred before the buffer pool was destroyed. That intermittent warning: 2021-01-26 04:14:33,140 WARN org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] - Committing offsets to Kafka takes longer than th

Re: memory tuning

2021-01-26 Thread Marco Villalobos
Yes, I will do that. PRODUCTION 2021-01-26 04:03:50,804 INFO org.apache.flink.yarn.YarnTaskExecutorRunner [] - 2021-01-26 04:03:50,807 INFO org.apache.flink.yarn.YarnTaskExecutorRunner

Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-26 Thread Sebastián Magrí
Thanks a lot for looking into it Dawid, In the src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory file I only see org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory Even after applying the ServicesResourceTransformer. On Tue, 26 Jan 2021 at 11:58, Dawid Wy

[DISCUSS] Removal of flink-swift-fs-hadoop module

2021-01-26 Thread Robert Metzger
Hi all, during a security maintenance PR [1], Chesnay noticed that the flink-swift-fs-hadoop module is lacking test coverage [2]. Also, there hasn't been any substantial change since 2018, when it was introduced. On the user@ ML, I could not find any proof of significant use of the module (no one

Using double quotes for SQL identifiers

2021-01-26 Thread Gyula Fóra
Hi All! Is it possible in any way to configure the Table environments to allow double quotes (") to be used for identifiers instead of backticks (`). Thank you! Gyula

Annotating AggregateFunction accumulator type with @DataTypeHint

2021-01-26 Thread Yuval Itzchakov
Hi, I have an aggregate function of the form: class Foo extends AggregateFunction[Array[Json], util.List[Json]] I want to treat the accumulator as a "RAW" type, since Json is an abstract class and this fails at runtime. Is there any way to annotate the AggregateFunction accumulator type? All the

Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-26 Thread Dawid Wysakowicz
Hi, Unfortunately I am not familiar with the packaging of flink-connector-postgres-cdc. Maybe @Jark could help here? However, I think the connector cannot be found because its entry is missing from the resulting Manifest file. If there are overlapping classes maven does not e

Re: flink slot communication

2021-01-26 Thread Dawid Wysakowicz
Hi, If tasks end up in the same TaskManager, they use LocalInputChannel(s), which do not go through the network but read directly from local partitions. I am also pulling in @Piotr, who might give you some more insights or correct me if I am wrong. [1] https://ci.apache.org/projects/flink/flink-d

Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-26 Thread Sebastián Magrí
Hi! I've reported an issue with the postgresql-cdc connector apparently caused by the maven shade plugin excluding either the JDBC connector or the cdc connector due to overlapping classes. The issue for reference is here: https://github.com/ververica/flink-cdc-connectors/issues/90 In the meanti

Re: JobManager seems to be leaking temporary jar files

2021-01-26 Thread Maciek Próchniak
Hi Matthias, I think the problem lies somewhere in JarRunHandler, as this is the place where the files are created. I think these are not the files that are managed via BlobService, as they are not stored in BlobService folders (I made experiment changing default BlobServer folders). It se

Re: Difference between table.exec.source.idle-timeout and setIdleStateRetentionTime ?

2021-01-26 Thread Dawid Wysakowicz
Hi, The difference is that *table.exec.source.idle-timeout* is used for dealing with source idleness[1]. The problem is that a watermark cannot advance if some of the partitions become idle (do not produce any records). The watermark is always the minimum of the watermarks of all input partitions. The
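The minimum rule mentioned in this reply can be sketched in a few lines. This is a plain-Java model, not Flink's implementation; `WatermarkModel` and `combined` are hypothetical names, and the assumption is that a partition marked idle is simply left out of the minimum.

```java
import java.util.OptionalLong;

/** Plain-Java model of combining per-partition watermarks. Not Flink code. */
public class WatermarkModel {

    /**
     * Returns the minimum watermark over all partitions that are not idle,
     * or an empty result when every partition is idle.
     */
    public static OptionalLong combined(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // assumption: idle partitions do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

With watermarks 100, 5 and 80 and no idle partitions, the combined watermark in this model stays at 5. Once the second partition is treated as idle, it advances to 80.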

Re: A few questions about minibatch

2021-01-26 Thread Dawid Wysakowicz
I am pulling Jark and Godfrey who are more familiar with the planner internals. Best, Dawid On 22/01/2021 20:11, Rex Fenley wrote: > Hello, > > Does anyone have any more information here? > > Thanks! > > On Wed, Jan 20, 2021 at 9:13 PM Rex Fenley > wrote: > > Hi,

Re: Comment in source code of CoGroupedStreams

2021-01-26 Thread Dawid Wysakowicz
For the problem of the uid you can follow Guowei's advice. As for the comment, I think it means that all elements of a single key must fit into the memory when they're passed as iterators to the CoGroupFunction. Best, Dawid On 21/01/2021 21:32, Sudharsan R wrote: > Is this comment in the file >

Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-26 Thread Dawid Wysakowicz
I am pulling in Jark and Godfrey who are more familiar with the internals of the planner. On 21/01/2021 01:43, Rex Fenley wrote: > Just tested this and I couldn't restore from a savepoint. If I do a > new job from scratch, can I tune the minibatch parameters and restore > from a savepoint without

Re: DataStream API: Best way for reading csv file

2021-01-26 Thread Dawid Wysakowicz
Hi Jan, First of all I'd rather recommend Table API for processing structured data. However if you are convinced you want to use the DataStream API, the CsvInputFormat supports the java.sql.Date type. You can try that or what I would suggest is to parse the Date field as string and then parse it
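The second suggestion (read the date column as a string and convert it afterwards) might look roughly like the sketch below. It is plain Java without Flink classes; `CsvDateParsing` and `parseDateColumn` are hypothetical names, and the comma delimiter, the absence of quoted fields, and the `yyyy-MM-dd` pattern are all assumptions about the input file.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Sketch: read a CSV date column as a string, then convert it to a date. */
public class CsvDateParsing {

    // Assumed date pattern; adjust to whatever the CSV file actually contains.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Splits one CSV line on commas (no quoting support) and parses the given column. */
    public static LocalDate parseDateColumn(String line, int columnIndex) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        return LocalDate.parse(fields[columnIndex].trim(), FORMAT);
    }
}
```

In a DataStream job, a conversion like this would typically sit in a map step after the raw line or string field has been read.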

Re: JobManager seems to be leaking temporary jar files

2021-01-26 Thread Matthias Pohl
Hi Maciek, my understanding is that the jars in the JobManager should be cleaned up after the job is terminated (I assume that your jobs successfully finished). The jars are managed by the BlobService. The dispatcher will trigger the jobCleanup in [1] after job termination. Are there any suspicious

Re: memory tuning

2021-01-26 Thread Matthias Pohl
Hi Marco, Could you share the preconfiguration logs? They are printed at the beginning of the taskmanagers' logs and contain a summary of the memory configuration in use. Best, Matthias On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos wrote: > > I have a flink job that collects and aggregates tim