Re: Flink Kafka connector in Python

2020-07-14 Thread Xingbo Huang
Hi Manas, If you want to return a RowType from a Python UDF, you can use the Row class, which extends the Python tuple. You can import Row with the following statement: from pyflink.table import Row Best, Xingbo Manas Kale wrote on Mon, Jul 6, 2020 at 8:08 PM: > I also tried doing this by using a User Defined Func
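
A minimal sketch of the parsing step behind the suggestion above, assuming the JSON keys '0001' and '0002' map to the fields 'feature1' and 'feature2' as described in the thread. The helper name extract_features and the FIELD_KEYS tuple are hypothetical, and a plain tuple stands in for Row so the snippet runs without PyFlink installed:

```python
import json

# Hypothetical key order: '0001' -> feature1, '0002' -> feature2,
# following the pattern described in the thread.
FIELD_KEYS = ("0001", "0002")


def extract_features(str_data, keys=FIELD_KEYS):
    """Parse a JSON string and return the selected values as a tuple, in field order."""
    data = json.loads(str_data)
    # Missing keys become None so the tuple always has one slot per field.
    return tuple(data.get(k) for k in keys)


print(extract_features('{"0001": 1.5, "0002": 2.5}'))
```

Since Row extends tuple, the eval method of the UDF could presumably wrap these values in a Row (for example Row(*values)) and declare a matching row result type; that wiring is an assumption here and should be checked against the PyFlink version in use.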

Re: Flink Kafka connector in Python

2020-07-06 Thread Manas Kale
I also tried doing this by using a User Defined Function. class DataConverter(ScalarFunction): def eval(self, str_data): data = json.loads(str_data) return ?? # I want to return data['0001'] in field 'feature1', data['0002'] in field 'feature2' etc. t_env.register_

Re: Flink Kafka connector in Python

2020-07-03 Thread Manas Kale
Hi Xingbo, Thanks for the reply. I didn't know that a table schema also needs to be declared after the connector, but I understand now. I have another question: how do I write the parsing schema for a field that is itself a valid JSON string? For example: { "monitorId": 865, "deviceId": "9
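
To illustrate the situation in the question, here is a rough sketch in plain Python of a message whose field value is itself a JSON string, decoded in two passes. The field name "data" and its contents are hypothetical (the example in the message is cut off); only monitorId is taken from the thread. This shows the data shape, not a PyFlink schema declaration:

```python
import json

# Hypothetical message: the "data" field name and its contents are made up
# for illustration; only monitorId appears in the thread.
raw = json.dumps({
    "monitorId": 865,
    "data": json.dumps({"0001": 105.0, "0002": 1.21}),
})


def parse_nested(message, nested_field):
    """Decode the outer JSON, then decode the string-valued nested field."""
    outer = json.loads(message)
    # The nested field arrives as a str, so it needs a second json.loads.
    outer[nested_field] = json.loads(outer[nested_field])
    return outer


parsed = parse_nested(raw, "data")
print(parsed["data"]["0001"])
```

One option under this assumption is to declare such a field as a plain string in the table schema and do the second decoding step inside a UDF.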

Re: Flink Kafka connector in Python

2020-07-02 Thread Xingbo Huang
Hi Manas, You need to define the schema. You can refer to the following example: t_env.connect( Kafka() .version('0.11') .topic(INPUT_TOPIC) .property("bootstrap.servers", PROD_KAFKA) .property("zookeeper.connect", "localhost:2181") .start_from_latest()

Re: Flink Kafka connector in Python

2020-07-02 Thread Manas Kale
Hi, I'm trying to get a simple consumer/producer running using the following code, adapted from the provided links: from pyflink.dataset import ExecutionEnvironment from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, S

Re: Flink Kafka connector in Python

2020-06-29 Thread Manas Kale
Hi Xingbo, Thank you for the information, it certainly helps! Regards, Manas On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang wrote: > Hi Manas, > > Since Flink 1.9, the entire architecture of PyFlink has been redesigned. > So the method described in the link won't work. > But you can use more conv

Re: Flink Kafka connector in Python

2020-06-29 Thread Xingbo Huang
Hi Manas, Since Flink 1.9, the entire architecture of PyFlink has been redesigned, so the method described in the link won't work. But you can use the more convenient DDL[1] or descriptor API[2] to read Kafka data. You can also refer to the common questions about PyFlink[3] [1] https://ci.apache.org/