Re: [DISCUSS] Drop connectors for 5.x and restart the flink es source connector

2020-07-02 Thread Robert Metzger
The discussion on dropping the ES5 connector was not conclusive when we discussed it in February 2020. We wanted to revisit it for the 1.12 release. From Maven Central, we have the following download numbers: ES2: 500 downloads; ES5: 10500 downloads (the es5_2.10:1.3.1 had 8000 downloads last mont

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread Xintong Song
Ok, I see your problem. And yes, keeping a map of metrics should work. Just for double checking, I assume there's an upper bound of your map keys (table names)? Because if not, an infinitely increasing in-memory map that is not managed by Flink's state might become problematic. Thank you~ Xinton
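
A minimal sketch of the "map of metrics" idea discussed in this thread, written in plain Python rather than against the Flink API. The `Meter` and `MeterCache` classes and the `max_keys` limit are hypothetical stand-ins introduced only for illustration; in a real job the factory would be whatever call registers a meter on the operator's metric group. The point is that each meter is created once, on first use, and that the map refuses to grow past a known upper bound.

```python
class Meter:
    """Hypothetical stand-in for a Flink Meter; it only counts events."""

    def __init__(self, name):
        self.name = name
        self.count = 0

    def mark_event(self):
        self.count += 1


class MeterCache:
    """Creates one meter per key on first use and reuses it afterwards."""

    def __init__(self, factory, max_keys):
        self._factory = factory    # called once per new key
        self._max_keys = max_keys  # assumed upper bound on distinct keys
        self._meters = {}

    def get(self, table_name):
        meter = self._meters.get(table_name)
        if meter is None:
            if len(self._meters) >= self._max_keys:
                # Fail loudly instead of letting the map grow without limit.
                raise RuntimeError("too many distinct table names")
            meter = self._factory(table_name)
            self._meters[table_name] = meter
        return meter

    def __len__(self):
        return len(self._meters)


# Usage: the loop plays the role of repeated `invoke` calls.
cache = MeterCache(Meter, max_keys=2)
for table_name in ["orders", "users", "orders"]:
    cache.get(table_name).mark_event()
```

Raising on overflow is only one possible policy; logging and falling back to a shared "other" meter would also keep the map bounded.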

Re: Question about RocksDB performance tunning

2020-07-02 Thread Peter Huang
Hi Yun, Thanks for the info. These materials help a lot. Best Regards Peter Huang On Thu, Jul 2, 2020 at 11:36 PM Yun Tang wrote: > Hi Peter > > This is a general problem and you could refer to RocksDB's tuning > guides[1][2], you could also refer to Flink built-in PredefinedOptions.java > [3

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn
Seems there's no direct solution. Perhaps I can implement this by initializing a HashMap with all the possible values of tableName in the `open` method and getting the corresponding Meter according to tableName in the `invoke` method. Thanks, Lei wangl...@geekplus.com.cn Sender: wangl...@geekp

Re: Question about RocksDB performance tunning

2020-07-02 Thread Yun Tang
Hi Peter This is a general problem and you could refer to RocksDB's tuning guides[1][2], you could also refer to Flink built-in PredefinedOptions.java [3]. Generally speaking, increase write buffer size to reduce write amplification, increase the parallelism of keyed operator to share the pressu

Re: Checkpoint is disable, will history data in rocksdb be leak when job restart?

2020-07-02 Thread SmileSmile
Hi Yun Tang, I don't enable checkpointing, so when my job restarts, how does Flink clean up the historical state? My pod is only killed after the job has restarted again and again; in this case, I have to rebuild the Flink cluster. | | a511955993 | | Email: a511955...@163.com | Signature customized by NetEase Mail Master On 07/03/2020 14:

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn
Hi Xintong, Yes, initializing the metric in the `open` method works, but it doesn't solve my problem. I want to initialize the metric with a name that is extracted from the record content. I can do that only in the `invoke` method. Actually my scenario is as follows. The record is MySQL binlog

Re: Timeout when using RockDB to handle large state in a stream app

2020-07-02 Thread Yun Tang
Hi Felipe, I noticed my previous mail has a typo: RocksDB is executed in the task main thread, which is not responsible for responding to heartbeats. Sorry for the previous typo; the key point I want to clarify is that RocksDB should have nothing to do with the heartbeat problem. Best Yun Tang __

Re: Checkpoint is disable, will history data in rocksdb be leak when job restart?

2020-07-02 Thread Yun Tang
Hi, If your job does not need checkpoints, why would you still restore your job from checkpoints? Actually, I did not totally understand what you want. Are you afraid that the state restored from the last checkpoint would not be cleared? Since the event timer is also stored in the checkpoint, after you re

Checkpoint is disable, will history data in rocksdb be leak when job restart?

2020-07-02 Thread SmileSmile
Hi, My job runs on Flink 1.10.1 with event time. Container memory usage rises by 2G after one restart, and the pod is killed by the OS after several restarts. I find that history data is cleared when new data arrives, by calling the function onEventTime() to clearAllState. But my job does not need Checkpo

Question about RocksDB performance tunning

2020-07-02 Thread Peter Huang
Hi, I have a stateful Flink job with 500k QPS. The job basically counts the message number on a combination key with 10 minutes tumbling window. If I use memory state backend, the job can run without lag but periodically fails due to OOM. If I turn up RocksDB state backend, it will have a high Ka

Re: Avro from avrohugger still invalid

2020-07-02 Thread Georg Heiler
But would it be possible to somehow use AvroSerializer for now? Best, Georg On Thu, Jul 2, 2020 at 23:44, Georg Heiler < georg.kf.hei...@gmail.com> wrote: > What is the suggested workaround for now? > > > Thanks! > > Aljoscha Krettek wrote on Thu, Jul 2, 2020 at > 20:55: > >> Hi Georg,

Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread Xintong Song
Hi Lei, I think you should initialize the metric in the `open` method. Then you can save the initialized metric as a class field, and update it in the `invoke` method for each record. Thank you~ Xintong Song On Fri, Jul 3, 2020 at 11:50 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn>

Re: Flink AskTimeoutException killing the jobs

2020-07-02 Thread Xintong Song
The configuration option you're looking for is `akka.ask.timeout`. However, I'm not sure increasing this configuration would help in your case. The error message shows that there is a timeout on a local message. It is weird that a local message does not get a reply within 10 sec. I would suggest to lo

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Si-li Liu
Hi, Thanks for your help. The checkpoint configuration is checkpoint.intervalMS=30 checkpoint.timeoutMS=30 The error callstack is from the JM's log, and it happened in every checkpoint. Currently I don't have a successful checkpoint yet. Khachatryan Roman wrote on Fri, Jul 3, 2020 at 3:50 AM: > Hi, > > Thanks for the detai

Re: Heartbeat of TaskManager timed out.

2020-07-02 Thread Xintong Song
I agree with Roman's suggestion for increasing heap size. It seems that the heap grows faster than it is freed. Thus eventually the Full GC is triggered, taking more than 50s and causing the timeout. However, even the full GC frees only 2GB space out of the 28GB max size. That probably suggests that the

How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn
In one flink operator, i want to initialize multiple flink metrics according to message content. As the code below. public void invoke(ObjectNode node, Context context) throws Exception { String tableName = node.get("metadata").get("topic").asText(); Meter meter = getRuntimeContext().g

Re: Avro from avrohugger still invalid

2020-07-02 Thread Georg Heiler
What is the suggested workaround for now? Thanks! Aljoscha Krettek wrote on Thu, Jul 2, 2020 at 20:55: > Hi Georg, > > unfortunately, it seems I only fixed the issue for AvroSerializer and > not for AvroDeserializationSchema. I created a new issue (which is a > clone of the old one) to track

Parquet data stream group converter error

2020-07-02 Thread Jesse Lord
I am trying to read a parquet file into a datastream and then register that stream as a temporary table. This file is created by spark 2.4 in HDFS on AWS EMR. I am using flink version 1.10.0 with EMR 5.30. I am getting the following error: Caused by: org.apache.flink.streaming.runtime.tasks.Asy

Flink Logging on EMR

2020-07-02 Thread mars
Hi, I am running my Flink jobs on EMR and I didn't include any log4j.properties as part of my JAR. I am using slf4j (and included the dependent jars in the uber jar I created) for logging in my app. When I run my job, everything runs fine except I cannot find my application logs anywhere

Flink AskTimeoutException killing the jobs

2020-07-02 Thread M Singh
Hi: I am using Flink 1.10 on an AWS EMR cluster. We are getting AskTimeoutExceptions, which are causing the Flink jobs to die. Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-1602864959]] after [1 ms]. Message of type [org.apache.flink.run

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Khachatryan Roman
Hi, Thanks for the details. However, I was not able to reproduce the issue. I used parallelism levels 4, file system backend and tried different timings for checkpointing, windowing and source. Do you encounter this problem deterministically, is it always 1st checkpoint? What checkpointing interva

Re: Flink Logging on EMR

2020-07-02 Thread Ken Krugler
Hi Sateesh, Note that there are three classes of log files, when running Flink on EMR: 1. The output from the main class. Since I typically run the job by sshing onto the master and using the CLI from there, I have control over where that output goes. E.g. nohup bin/flink run -m yarn-cluster -

Re: Avro from avrohugger still invalid

2020-07-02 Thread Aljoscha Krettek
Hi Georg, unfortunately, it seems I only fixed the issue for AvroSerializer and not for AvroDeserializationSchema. I created a new issue (which is a clone of the old one) to track this [1]. The fix should be very simple since it's the same issue. Best, Aljoscha [1] https://issues.apache.org

Stateful Functions: Routing to remote functions

2020-07-02 Thread Jan Brusch
Hi, based on Gordon's excellent advice on how to handle JSON messages with remote functions (https://www.mail-archive.com/user@flink.apache.org/msg34385.html) I was able to: 1) Deserialize JSON Messages from a Kafka Stream 2) Route the message to an embedded StatefulFunction 3) Serialize th

Re: Purpose of starting LeaderRetrievalService in DefaultDispatcherResourceManagerComponentFactory#create

2020-07-02 Thread Andrey Zagrebin
Hi Linlin, There may be a historic confusion in terminology. We often refer to 'JobManager' as a component which manages a single job. Names of all related classes usually contain 'JobManager'. At the same time, we can refer to it as a master process in Flink's cluster, potentially running multipl

Re: Heartbeat of TaskManager timed out.

2020-07-02 Thread Ori Popowski
Thank you very much for your analysis. When I said there was no memory leak - I meant that from the specific TaskManager I monitored in real-time using JProfiler. Unfortunately, this problem occurs only in 1 of the TaskManager and you cannot anticipate which. So when you pick a TM to profile at ra

Re: Flink Kafka connector in Python

2020-07-02 Thread Xingbo Huang
Hi, Manas You need to define the schema. You can refer to the following example: t_env.connect( Kafka() .version('0.11') .topic(INPUT_TOPIC) .property("bootstrap.servers", PROD_KAFKA) .property("zookeeper.connect", "localhost:2181") .start_from_latest()

Re: Heartbeat of TaskManager timed out.

2020-07-02 Thread Khachatryan Roman
Thanks, Ori. From the log, it looks like there IS a memory leak. At 10:12:53 there was the last "successful" GC, when 13 GB was freed in 0.4653809 secs: [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)] Then the heap grew from 10G to 2
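
The "13 GB freed" figure can be checked directly from the `Heap:` part of the quoted G1 log line. The snippet below is a small sketch of that arithmetic; the regular expression and the helper name `heap_freed_mb` are illustrative assumptions, and the pattern only handles sizes printed in megabytes (the `M` suffix), as in the line quoted in this message.

```python
import re

# Matches "Heap: <used>M(<cap>M)-><used>M(<cap>M)"; assumes MB units only.
HEAP_RE = re.compile(r"Heap: ([\d.]+)M\([\d.]+M\)->([\d.]+)M\(([\d.]+)M\)")


def heap_freed_mb(line):
    """Return (MB freed by the GC, heap occupancy after the GC as a fraction)."""
    match = HEAP_RE.search(line)
    if match is None:
        raise ValueError("no Heap: section with MB units found")
    before, after, capacity = (float(g) for g in match.groups())
    return before - after, after / capacity


line = ("[Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M "
        "Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]")
freed, occupancy = heap_freed_mb(line)
print(f"freed {freed:.1f} MB, occupancy after GC {occupancy:.0%}")
```

For the quoted line this gives 23280.3 MB minus 10047.0 MB, i.e. roughly 13,233 MB freed, with the heap about a third full afterwards.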

Re: Flink Kafka connector in Python

2020-07-02 Thread Manas Kale
Hi, I'm trying to get a simple consumer/producer running using the following code referred from the provided links : from pyflink.dataset import ExecutionEnvironment from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, S

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Si-li Liu
Hi, this is our production code so I have to modify it a little bit, such as variable names and function names. I think the 3 classes I provide here are enough. I try to join two streams, but I don't want to use the default join function, because I want to send the joined log immediately and remove it fr

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Khachatryan Roman
Thanks for the clarification. Can you also share the code of other parts, particularly MyFunction? Regards, Roman On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu wrote: > Rocksdb backend has the same problem > > Khachatryan Roman wrote on Thu, Jul 2, 2020 at 6:11 PM: > >> Thanks for reporting this. >> >> Looks l

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Si-li Liu
Rocksdb backend has the same problem Khachatryan Roman wrote on Thu, Jul 2, 2020 at 6:11 PM: > Thanks for reporting this. > > Looks like the window namespace was replaced by VoidNamespace in state > entry. > I've created https://issues.apache.org/jira/browse/FLINK-18464 to further > investigate it. > > Regards

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Khachatryan Roman
Thanks for reporting this. Looks like the window namespace was replaced by VoidNamespace in state entry. I've created https://issues.apache.org/jira/browse/FLINK-18464 to further investigate it. Regards, Roman On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu wrote: > I'm using flink 1.9 on Mesos and

Dockerised Flink 1.8 with Hadoop S3 FS support

2020-07-02 Thread Lorenzo Nicora
Hi I need to set up a dockerized *session cluster* using Flink *1.8.2* for development and troubleshooting. We are bound to 1.8.2 as we are deploying to AWS Kinesis Data Analytics for Flink. I am using an image based on the semi-official flink:1.8-scala_2.11 I need to add to my dockerized cluster

Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Si-li Liu
I'm using flink 1.9 on Mesos and I try to use my own trigger and evictor. The state is stored to memory. input.setParallelism(processParallelism) .assignTimestampsAndWatermarks(new UETimeAssigner) .keyBy(_.key) .window(TumblingEventTimeWindows.of(Time.minutes(20)))

Re: Dynamic source and sink.

2020-07-02 Thread Paul Lam
Hi Doinesh, I think the problem you meet is quite common. But with the current Flink architecture, operators must be determined at compile time (when you submit your job). This is by design IIUC. Suppose the operators are changeable, then Flink would need to go through the compile-optimize-sch