Hi Manas,
If you want to return a RowType in a Python UDF, you can use the Row class,
which extends Python's tuple.
You can import it with the following statement: from pyflink.table import Row
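For example, here is a rough sketch for your earlier question (the field
names 'feature1'/'feature2', the DOUBLE types, and the udf()/register_function
calls assume the Flink 1.11 Python API; adjust to your actual schema):

import json

from pyflink.table import DataTypes, Row
from pyflink.table.udf import ScalarFunction, udf

class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        # Field order must match the ROW result_type declared below.
        return Row(data['0001'], data['0002'])

# Declaring the result as a ROW puts each value into its own named field.
data_converter = udf(
    DataConverter(),
    input_types=[DataTypes.STRING()],
    result_type=DataTypes.ROW([
        DataTypes.FIELD('feature1', DataTypes.DOUBLE()),
        DataTypes.FIELD('feature2', DataTypes.DOUBLE())]))

t_env.register_function('data_converter', data_converter)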
Best,
Xingbo
Manas Kale wrote on Mon, Jul 6, 2020 at 8:08 PM:
> I also tried doing this by using a User Defined Func
I also tried doing this by using a User Defined Function.
class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        return ??  # I want to return data['0001'] in field 'feature1',
                   # data['0002'] in field 'feature2', etc.
t_env.register_
Hi Xingbo,
Thanks for the reply, I didn't know that a table schema also needs to be
declared after the connector, but I understand now.
I have another question: how do I write the parsing schemas for a field
that itself is a valid JSON string? For example:
{
    "monitorId": 865,
    "deviceId": "9
Hi, Manas
You need to define the schema. You can refer to the following example:
t_env.connect(
    Kafka()
    .version('0.11')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .property("zookeeper.connect", "localhost:2181")
    .start_from_latest()
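To make that end to end, here is a rough sketch of how such a connect() chain
can continue with a format and a schema (the Json/Schema descriptors, field
names, and table name are illustrative, assuming the 1.11 descriptor API):

from pyflink.table import DataTypes
from pyflink.table.descriptors import Kafka, Json, Schema

t_env.connect(
    Kafka()
    .version('0.11')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .property("zookeeper.connect", "localhost:2181")
    .start_from_latest()) \
    .with_format(
        Json()
        .fail_on_missing_field(True)) \
    .with_schema(
        Schema()
        .field('monitorId', DataTypes.BIGINT())
        .field('deviceId', DataTypes.STRING())
        .field('data', DataTypes.STRING())) \
    .create_temporary_table('input_table')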
Hi,
I'm trying to get a simple consumer/producer running using the following
code, based on the provided links:
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, S
Hi Xingbo,
Thank you for the information, it certainly helps!
Regards,
Manas
On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang wrote:
> Hi Manas,
>
> Since Flink 1.9, the entire architecture of PyFlink has been redesigned.
> So the method described in the link won't work.
> But you can use more conv
Hi Manas,
Since Flink 1.9, the entire architecture of PyFlink has been redesigned. So
the method described in the link won't work.
But you can use the more convenient DDL[1] or descriptor[2] API to read Kafka data.
Besides, you can refer to the common questions about PyFlink[3].
[1]
https://ci.apache.org/
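For the DDL route, here is a rough sketch of a Kafka source definition (this
assumes Flink 1.11's execute_sql and the newer 'connector' = 'kafka' options;
the table, topic, and option values are placeholders, see [1] for the exact
syntax matching your version):

t_env.execute_sql("""
    CREATE TABLE kafka_source (
        monitorId BIGINT,
        deviceId STRING,
        data STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'input-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'test-group',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
""")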