Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-05-31 Thread Yun Gao
Hi Yu, I think when the serializer returns null, the downstream operator should still receive a null record. One possible approach is for the downstream operator to count the number of null records it receives and publish that count as a metric to a monitoring system, and the monitoring system prom
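A minimal plain-Python sketch of the approach Yun describes: the downstream step receives the null (`None`) records, counts them, and forwards only the valid ones. `NullCountingFilter` and its `null_count` attribute are hypothetical names. In a real Flink job the counter would be registered through the operator's metric group rather than kept as a plain attribute.

```python
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


class NullCountingFilter:
    """Hypothetical downstream step: drops None records and counts them."""

    def __init__(self) -> None:
        # Stand-in for a metric counter that a monitoring system would read.
        self.null_count = 0

    def process(self, records: Iterable[Optional[T]]) -> Iterator[T]:
        for record in records:
            if record is None:
                # A corrupted record that the serializer turned into None.
                self.null_count += 1
                continue
            yield record


f = NullCountingFilter()
valid = list(f.process(["a", None, "b", None, None]))
print(valid, f.null_count)  # ['a', 'b'] 3
```

Alerting would then be driven by the rate of change of the counter rather than by individual records.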

Re: State expiration in Flink

2020-05-31 Thread Vasily Melnik
Thanks, Yun! One more question: is it possible to register some kind of handler that runs when state is cleaned up? For example, I want to flush state to external storage (e.g. HBase) before cleanup. Right now we do this manually in the onTimer method, but is there another way? On Mon, 1 Jun 2020 at 05:28, Yun Tang
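The manual approach Vasily mentions (registering a timer and flushing state in `onTimer` before clearing it) can be simulated in plain Python. This is not the Flink API: `TimedKeyedState`, `advance_to` and the `sink` callback are hypothetical stand-ins for keyed state, timers, and an external store such as HBase.

```python
from typing import Callable, Dict, List, Tuple


class TimedKeyedState:
    """Hypothetical keyed state that flushes each entry to a sink before clearing it."""

    def __init__(self, ttl: int, sink: Callable[[str, int], None]) -> None:
        self.ttl = ttl
        self.sink = sink
        self.values: Dict[str, int] = {}
        self.timers: Dict[str, int] = {}  # key -> time at which its timer fires

    def update(self, key: str, value: int, now: int) -> None:
        self.values[key] = value
        # Replaces any earlier deadline for this key; in Flink the old timer
        # would have to be deleted or ignored inside onTimer.
        self.timers[key] = now + self.ttl

    def advance_to(self, now: int) -> None:
        # Fire every due timer: the equivalent of onTimer.
        due = [k for k, t in self.timers.items() if t <= now]
        for key in due:
            self.sink(key, self.values[key])  # flush to external storage first
            del self.values[key]              # then clear the state
            del self.timers[key]


flushed: List[Tuple[str, int]] = []
state = TimedKeyedState(ttl=10, sink=lambda k, v: flushed.append((k, v)))
state.update("a", 1, now=0)
state.update("b", 2, now=5)
state.advance_to(12)
print(flushed, state.values)  # [('a', 1)] {'b': 2}
```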

Re: Tumbling windows - increasing checkpoint size over time

2020-05-31 Thread Guowei Ma
Hi, 1. I am not a RocksDB expert. However, I think state garbage collection depends on RocksDB compaction, especially if the checkpoint interval is 2s. This is because the window elements remain in the SST files even after the window has been triggered. 2. Have you tried a checkpoint interval of 15

Rest Api body size

2020-05-31 Thread snack white
Hi, When I use the REST API to submit a job, my POST body size is 25.83 KB (my jar had already been uploaded to the JobManager). The body was cut off in the Flink JobManager. Can someone tell me how to change the POST body length limit? Thanks, White

Re: kerberos integration with flink

2020-05-31 Thread Yangze Guo
Hi, Nick. Do you mean that you manually execute "kinit -R" to renew the ticket cache? If that is the case, Flink already sets "renewTGT" to true, so if everything is OK you do not need to do it yourself. However, it seems this mechanism has a bug that is not fixed in all JDK versions. Pl

Re: StateFun remote/embedded polyglot example

2020-05-31 Thread Tzu-Li (Gordon) Tai
Hi, On Mon, Jun 1, 2020 at 5:47 AM Omid Bakhshandeh wrote: > Hi, > > I'm very confused about StateFun 2.0 new architecture. > > Is it possible to have both remote and embedded functions in the same > deployment? > Yes that is possible. Embedded functions simply run within the Flink StateFun wor

Re: How to create schema for flexible json data in Flink SQL

2020-05-31 Thread Jark Wu
Hi all, This is an interesting topic. Schema inference is planned as the next big feature in the next release. I added a link to this thread in FLINK-16420. I think Guodong's case is about schema evolution, which is related to schema inference. I don't have a clear idea for

Re: Flink Dashboard UI Tasks hard limit

2020-05-31 Thread Xintong Song
Hi Vijay, The error message suggests that another task manager (10.127.106.54) is not responding. This could happen when the remote task manager has failed or is under severe GC pressure. You would need to find the log of the remote task manager to understand what is happening. Thank you~ Xintong S

Re: pyflink Table API: error when using a UDF: Given parameters do not match any signature.

2020-05-31 Thread Dian Fu
The input types should be as follows: input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())] Regards, Dian > On June 1, 2020, at 10:49 AM, 刘亚坤 wrote: > > I'm currently learning the pyflink Table API and would like to ask about an error with a pyflink UDF: > > @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) > def drop_f
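Following Dian's reply, a minimal sketch of a function body that matches `input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]`: the second argument arrives as a single list, so the variadic `*fields` becomes a plain `fields` parameter. The decorator is shown only as a comment so the snippet runs without PyFlink installed; using `pop(field, None)` to tolerate missing keys is an added assumption, not part of the original question.

```python
import json

# With PyFlink installed, the function would be declared as (not executed here):
# @udf(input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())],
#      result_type=DataTypes.STRING())
def drop_fields(message, fields):
    """Remove each key in `fields` from a JSON object string and re-serialize it."""
    obj = json.loads(message)
    for field in fields:
        obj.pop(field, None)  # ignore absent fields instead of raising KeyError
    return json.dumps(obj)


print(drop_fields('{"a": 1, "b": 2, "c": 3}', ["a", "c"]))  # {"b": 2}
```

The caller then passes the field names as one array value rather than as separate arguments, which is what the `ARRAY(STRING())` input type describes.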

pyflink Table API: error when using a UDF: Given parameters do not match any signature.

2020-05-31 Thread 刘亚坤
I'm currently learning the pyflink Table API and would like to ask about an error with a pyflink UDF: @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def drop_fields(message, *fields): import json message = json.loads(message) for field in fields: message.pop(field) return json.dumps(messa

Re: State expiration in Flink

2020-05-31 Thread Yun Tang
Hi Vasily, Since Flink 1.10, state is cleaned up periodically because CleanupInBackground is enabled by default. Thus, even if you never access a specific state entry, that entry can still be cleaned up. Best, Yun Tang From: Vasily Melnik Sent: Saturday,
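To illustrate Yun Tang's point, here is a plain-Python model (not Flink's implementation) of TTL state with two cleanup paths: lazy removal when an expired entry is read, and a periodic background sweep that also removes expired entries that are never accessed. `TtlStore`, `get` and `background_cleanup` are hypothetical names.

```python
from typing import Dict, Optional, Tuple


class TtlStore:
    """Hypothetical TTL map: entries expire `ttl` time units after their last write."""

    def __init__(self, ttl: int) -> None:
        self.ttl = ttl
        self.entries: Dict[str, Tuple[str, int]] = {}  # key -> (value, write time)

    def put(self, key: str, value: str, now: int) -> None:
        self.entries[key] = (value, now)

    def _expired(self, written_at: int, now: int) -> bool:
        return now - written_at >= self.ttl

    def get(self, key: str, now: int) -> Optional[str]:
        # Lazy cleanup: an expired entry is removed only when it is read.
        item = self.entries.get(key)
        if item is None:
            return None
        value, written_at = item
        if self._expired(written_at, now):
            del self.entries[key]
            return None
        return value

    def background_cleanup(self, now: int) -> None:
        # Background cleanup: sweep all entries, including ones never read again.
        for key in [k for k, (_, t) in self.entries.items() if self._expired(t, now)]:
            del self.entries[key]


store = TtlStore(ttl=10)
store.put("read_later", "x", now=0)
store.put("never_read", "y", now=0)
print(store.get("read_later", now=20))  # None (removed lazily on access)
store.background_cleanup(now=20)
print(store.entries)  # {} (never_read was removed without being accessed)
```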

best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-05-31 Thread Yu Yang
Hi all, To deal with corrupted messages that can leak into the data source once in a while, we implemented a custom DefaultKryoSerializer class, as below, that catches exceptions. The custom serializer returns null from its read(...) method when it encounters an exception while reading. With this implementation,
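The same catch-and-return-null pattern, sketched in Python with `json` standing in for Kryo (a substitution made so the example runs on its own): a decoding error is caught and the record becomes `None`, which is what the downstream operator then receives. `safe_deserialize` is a hypothetical name.

```python
import json
import logging
from typing import Any, Optional

log = logging.getLogger(__name__)


def safe_deserialize(payload: bytes) -> Optional[Any]:
    """Decode a record, returning None instead of raising on corrupted input."""
    try:
        return json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Log and swallow the error so one bad message does not fail the job.
        log.warning("dropping corrupted record: %s", exc)
        return None


print(safe_deserialize(b'{"id": 1}'))     # {'id': 1}
print(safe_deserialize(b"\xff\x00junk"))  # None
```

Catching only decoding errors, rather than every exception, keeps genuine programming bugs visible instead of silently turning them into nulls.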

StateFun remote/embedded polyglot example

2020-05-31 Thread Omid Bakhshandeh
Hi, I'm very confused about the new StateFun 2.0 architecture. Is it possible to have both remote and embedded functions in the same deployment? Is there a tutorial that shows how to deploy the two types in the same Kubernetes cluster alongside Flink (possibly in Python and Java)? Also, is t

Re: Flink s3 streaming performance

2020-05-31 Thread venkata sateesh` kolluru
Hi David, The average size of each file is around 30 KB, and my checkpoint interval is 5 minutes. Some files are as small as 1 KB; because of checkpointing, some files are merged into one big file of around 300 MB. With 120 million files and 4 TB, if the transfer rate is 300 files per minute, it is taking weeks to writ
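A back-of-the-envelope check of the numbers in this message, assuming decimal units (1 TB = 10^12 bytes) and a single writer sustaining 300 files per minute:

```python
FILES = 120_000_000
TOTAL_BYTES = 4 * 10**12  # 4 TB in decimal units (assumption)
FILES_PER_MINUTE = 300    # observed single-writer rate from the message

avg_file_kb = TOTAL_BYTES / FILES / 1000
minutes = FILES / FILES_PER_MINUTE
days = minutes / (60 * 24)

print(f"average file size: {avg_file_kb:.1f} KB")  # average file size: 33.3 KB
print(f"time to write all files: {days:.1f} days")  # time to write all files: 277.8 days
```

That is roughly 40 weeks at the observed rate, and the computed average agrees with the ~30 KB file size reported above.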

Re: Flink s3 streaming performance

2020-05-31 Thread David Magalhães
Hi Venkata. 300 requests per minute works out to about 200 ms per request, which is a normal response time for sending a file if there isn't any speed limitation (how big are the files?). Have you increased the parallelism above 1? I also recommend limiting the source parallelism, be
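David's 200 ms figure follows from 60 s / 300 requests. A minimal sketch of why raising the sink parallelism helps, assuming each parallel writer keeps the same per-request latency and S3 does not throttle (both are assumptions; real throughput depends on request limits and file sizes). The function names are hypothetical.

```python
def per_request_ms(requests_per_minute: float) -> float:
    """Average time per request if requests are sent one after another."""
    return 60_000 / requests_per_minute


def days_to_upload(files: int, latency_ms: float, parallelism: int) -> float:
    """Total upload time assuming `parallelism` independent writers at `latency_ms` each."""
    files_per_ms = parallelism / latency_ms
    return files / files_per_ms / (1000 * 60 * 60 * 24)


latency = per_request_ms(300)
print(latency)  # 200.0
for p in (1, 10, 50):
    print(p, round(days_to_upload(120_000_000, latency, p), 1))
```

Under these assumptions the total time scales inversely with parallelism, which is why increasing it beyond 1 is the first thing to check.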