Dynamically merge multiple upstream sources

2020-06-01 Thread uuuuuu
Hi all, I have a use case where I want to merge several upstream data sources (Kafka topics). The data are essentially the same, but they have different field names. I guess I can say my problem is not so much about Flink itself. It is about how to deserialize data and merge different data effect
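A minimal sketch of the deserialize-and-normalize step the question describes: map each topic's field names onto one shared schema, after which the streams can simply be unioned. The topics and field names below are hypothetical, not from the thread.

    import json

    # Hypothetical per-topic mappings from source field names to one shared schema.
    FIELD_MAPS = {
        "topic_a": {"userId": "user_id", "ts": "event_time"},
        "topic_b": {"uid": "user_id", "timestamp": "event_time"},
    }

    def normalize(topic, raw_bytes):
        """Deserialize one record and rename its fields to the shared schema."""
        record = json.loads(raw_bytes)
        mapping = FIELD_MAPS[topic]
        return {mapping.get(name, name): value for name, value in record.items()}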

Re: Native kubernetes integration for standalone job clusters.

2020-06-01 Thread Yang Wang
Hi Akshay, Currently, the ZooKeeper HA service can be used for both session clusters and job clusters when deploying Flink on K8s. If you mean using the native K8s HA (i.e. native leader election and a ConfigMap to store metadata)[1], I think it is not supported yet. [1]. https://issues.apache.org
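For reference, the ZooKeeper HA service Yang Wang mentions is enabled through flink-conf.yaml; a minimal sketch, with placeholder quorum addresses and paths:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
    high-availability.storageDir: hdfs:///flink/ha/
    high-availability.cluster-id: /my-job-cluster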

Re: kerberos integration with flink

2020-06-01 Thread Yangze Guo
It sounds good to me. If your job keeps running (longer than the expiration time), I think it implies that Krb5LoginModule will use your newly generated cache. It's my pleasure to help you. Best, Yangze Guo On Mon, Jun 1, 2020 at 10:47 PM Nick Bendtner wrote: > > Hi Guo, > The auto renewal happe

RE: History Server Not Showing Any Jobs - File Not Found?

2020-06-01 Thread Hailu, Andreas
So I created a new HDFS directory with just 1 archive and pointed the server to monitor that directory, et voila - I'm able to see the applications in the UI. So it must have been really churning trying to fetch all of those initial archives :) I have a couple of follow-up questions if you plea
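For reference, the directories involved are configured in flink-conf.yaml; a minimal sketch with placeholder paths:

    # where the JobManager uploads archives of completed jobs
    jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs/
    # comma-separated list of directories the history server monitors
    historyserver.archive.fs.dir: hdfs:///flink/completed-jobs/
    # polling interval, in milliseconds
    historyserver.archive.fs.refresh-interval: 10000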

Native kubernetes integration for standalone job clusters.

2020-06-01 Thread Akshay Iyangar
Hi So we actually run Flink in Kubernetes as a standalone Flink job for each Flink pipeline currently. We wanted to take advantage of Flink HA using K8s, but it looks like it only supports Flink session clusters currently for version 1.10. Any idea when it will have support for standalone job cl

Re: kerberos integration with flink

2020-06-01 Thread Nick Bendtner
Hi Guo, The auto renewal happens fine; however, I want to generate a new ticket with a new renew-until period so that the job can run longer than 7 days. I am talking about the second paragraph of your email. I have set a custom cache by setting KRB5CCNAME. Just want to make sure that Krb5LoginModule
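A sketch of the approach under discussion: point KRB5CCNAME at a dedicated cache and re-issue a renewable ticket from a keytab before the old renew-until is reached. Paths, principal, and the 7-day lifetime are placeholders.

    export KRB5CCNAME=/tmp/krb5cc_flink
    # recreate the cache with a fresh renew-until window (here 7 days)
    kinit -r 7d -k -t /etc/security/user.keytab user@EXAMPLE.COM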

Re: pyflink: format problem when using a UDF to build a JSON string and send it to Kafka

2020-06-01 Thread Xingbo Huang
You're welcome, happy to keep learning from each other 😀 Best, Xingbo On Mon, Jun 1, 2020 at 9:07 PM, jack wrote: > Thanks a lot for clearing this up. I've just started using pyflink and am not very familiar with it yet; any further guidance is much appreciated. > > On 2020-06-01 20:50:53, "Xingbo Huang" wrote: > > Hi, > This is actually the effect of the CSV connector's optional > quote_character parameter. Its default value is ", so every field gets wrapped in an extra layer; if you set this parameter to \0 you will get the output you want. > st_env.connect( > Kafka() >

Re: pyflink: format problem when using a UDF to build a JSON string and send it to Kafka

2020-06-01 Thread jack
Thanks a lot for clearing this up. I've just started using pyflink and am not very familiar with it yet; any further guidance is much appreciated. On 2020-06-01 20:50:53, "Xingbo Huang" wrote: Hi, this is actually the effect of the CSV connector's optional quote_character parameter. Its default value is ", so every field gets wrapped in an extra layer; if you set this parameter to \0 you will get the output you want. st_env.connect( Kafka() .version("0.11") .topic("logSink")

Re: pyflink: format problem when using a UDF to build a JSON string and send it to Kafka

2020-06-01 Thread Xingbo Huang
Hi, this is actually the effect of the CSV connector's optional quote_character parameter. Its default value is ", so every field gets wrapped in an extra layer; if you set this parameter to \0 you will get the output you want. st_env.connect( Kafka() .version("0.11") .topic("logSink") .start_from_earliest() .property("zookeeper.connect", "localhost:2181")
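A sketch of the suggested fix in the PyFlink 1.10 descriptor API, assuming the Csv format descriptor exposes quote_character as in the Java API; the topic, addresses, and field name follow the snippet above:

    from pyflink.table.descriptors import Csv, Kafka, Schema
    from pyflink.table import DataTypes

    st_env.connect(
        Kafka()
            .version("0.11")
            .topic("logSink")
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
    ).with_format(
        Csv()
            .derive_schema()
            .quote_character("\0")  # suppress the default '"' wrapping
    ).with_schema(
        Schema().field("log", DataTypes.STRING())
    ).in_append_mode() \
     .create_temporary_table("kafka_sink")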

Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Leonard Xu
I think @brat is right, I didn't know the Kafka property 'auto.create.topics.enable'. You can pass the property to the Kafka producer; that should work. Best, Leonard Xu > On Jun 1, 2020, at 18:33, satya brat wrote: > > Prasanna, > You might want to check the kafka broker configs where > 'auto.create.topi

Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread satya brat
Prasanna, You might want to check the Kafka broker configs, where 'auto.create.topics.enable' helps with creating a new topic whenever a message is published to a non-existent topic. https://kafka.apache.org/documentation/#brokerconfigs I am not too sure about pitfalls, if any. On Mon, Jun 1, 2
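For reference, this is a broker-side setting in server.properties (stock Kafka defaults it to true; managed services often disable it):

    # let the broker create topics on first reference by a producer/consumer
    auto.create.topics.enable=true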

[ANNOUNCE] Weekly Community Update 2020/22

2020-06-01 Thread Konstantin Knauf
Dear community, happy to share this week's community update with an update on the upcoming Apache Flink releases: Flink 1.11 and Flink Stateful Functions 2.1. With the community focused on release testing the dev@ mailing list remains relatively quiet. Flink Development == * [releas

Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Leonard Xu
Hi, kumar Sorry for missing the original question. I think we cannot create topics dynamically at the moment: creating a topic should belong to the control flow rather than the data flow, and users may have custom configurations for the topic, from my understanding. Maybe you need to implement the logic of check/cre
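A minimal sketch of that check/create step done in the control flow, here with the kafka-python AdminClient (an illustrative choice, not from the thread) before the Flink job starts:

    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

    def ensure_topic(name, partitions=1, replication=1):
        # create the topic with explicit settings; a no-op if it already exists
        try:
            admin.create_topics([NewTopic(name=name,
                                          num_partitions=partitions,
                                          replication_factor=replication)])
        except TopicAlreadyExistsError:
            pass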

pyflink: format problem when using a UDF to build a JSON string and send it to Kafka

2020-06-01 Thread jack
Hi all, I'm using pyflink to consume JSON string data from Kafka, process the Kafka messages by one field, and send them back to Kafka. Input data: {"topic": "logSource", "message": "x=1,y=1,z=1"} The data that arrives in Kafka comes out as: "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}" wrapped in an extra layer of double quotes. My UDF uses the json module for processing; could anyone show me how to turn this into a normal JSON string? @udf(input_types=[DataTypes.STRING()
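A sketch of what such a UDF could look like, reconstructed around the truncated decorator; the parsing of the message field is hypothetical (and note the extra quotes come from the sink's CSV format, not from the UDF itself, as the replies explain):

    import json
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
    def transform(line):
        # parse the incoming JSON, split "x=1,y=1,z=1" into a dict, re-serialize
        record = json.loads(line)
        record["message"] = dict(kv.split("=") for kv in record["message"].split(","))
        return json.dumps(record)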

Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Prasanna kumar
Leonard, Thanks for the reply; I will look into those options. But as for the original question, could we create a topic dynamically when required? Prasanna. On Mon, Jun 1, 2020 at 2:18 PM Leonard Xu wrote: > Hi, kumar > > Flink supports consuming/producing from/to multiple Kafka topics[1]; in

Re: State expiration in Flink

2020-06-01 Thread Yun Tang
Hi Vasily, As far as I know, the current state TTL lacks that kind of trigger; perhaps using onTimer, or processing a specific event as the trigger, could help in your scenario. Best Yun Tang. From: Vasily Melnik Sent: Monday, June 1, 2020 14:13 To: Yun Tang Cc: user Subject:
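A sketch of the onTimer route in PyFlink's DataStream API (added in releases after this thread; the fixed 60-second expiration is illustrative):

    from pyflink.datastream.functions import KeyedProcessFunction

    class ManualTtl(KeyedProcessFunction):
        def process_element(self, value, ctx):
            # arm an expiration timer a fixed interval after each element
            now = ctx.timer_service().current_processing_time()
            ctx.timer_service().register_processing_time_timer(now + 60 * 1000)
            yield value

        def on_timer(self, timestamp, ctx):
            # expiration hook: clear per-key state or emit a tombstone here
            pass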

Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Leonard Xu
Hi, kumar Flink supports consuming/producing from/to multiple Kafka topics[1]. In your case you can implement KeyedSerializationSchema (legacy interface) or KafkaSerializationSchema[2] to make one producer instance send data to multiple topics. There is an ITCase you can reference[3]. Best,
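The per-record topic routing those interfaces enable, sketched outside Flink with a plain Kafka producer (kafka-python here; the topic naming rule is hypothetical):

    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9092",
                             value_serializer=lambda v: json.dumps(v).encode("utf-8"))

    def emit(event):
        # derive the target topic from the record itself, as
        # KeyedSerializationSchema.getTargetTopic would inside Flink
        producer.send("{}-{}".format(event["type"], event["customer"]), event)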

Re: Flink s3 streaming performance

2020-06-01 Thread Jörn Franke
I think S3 is the wrong storage backend for these volumes of small messages. Try to use a NoSQL database, or write multiple messages into one file in S3 (1 or 10). If you still want to go with your scenario, then try a network-optimized instance, use s3a in Flink, and configure S3 entropy.
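The entropy injection mentioned at the end is configured in flink-conf.yaml; a sketch using the documented marker and length:

    # replace the _entropy_ marker in checkpoint paths with 4 random characters
    s3.entropy.key: _entropy_
    s3.entropy.length: 4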

Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Prasanna kumar
Hi, I have a use case where I read events from a single Kafka stream comprising JSON messages. The requirement is to split the stream into multiple output streams based on some criteria, say based on the type of event, or based on the type and the customer associated with the event. We could achieve the splittin

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-06-01 Thread Yu Yang
Thanks for the suggestion, Yun! On Sun, May 31, 2020 at 11:15 PM Yun Gao wrote: > Hi Yu, > > I think when the serializer returns *null*, the following operator should > still receive a record of null. A possible thought is that the following > operator may be counting the number of null records rece

Re: How to create schema for flexible json data in Flink SQL

2020-06-01 Thread Guodong Wang
Hi Jark, You totally got my point. Actually, the perfect solution in my opinion is to support schema evolution in one query. Although classic SQL needs to know the schema before doing any computation, when integrating a NoSQL data source into the Flink datastream, if schema evolution is possible, it will s