I also tried doing this with a user-defined function (UDF):
import json

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf


class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        # ??? I want to return data['0001'] in field 'feature1',
        # data['0002'] in field 'feature2', etc.
        return ...


t_env.register_function("data_converter", udf(DataConverter(),
                                              input_types=[DataTypes.STRING()],
                                              result_type=DataTypes.ROW([
                                                  DataTypes.FIELD("feature1", DataTypes.STRING())
                                              ])))

# "data" here is the "data" field from the previous mail
t_env.from_path(INPUT_TABLE) \
    .select("data_converter(data)") \
    .insert_into(OUTPUT_TABLE)
I used a ROW to hold multiple values, but I can't figure out how to return
a populated ROW object from the eval() method. Where is the method to
construct a row/field object and return it?
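The parsing step inside eval() can be tried out in plain Python before wiring it into Flink. The sketch below is a minimal illustration, not PyFlink API: `FIELD_MAP` and `extract_features` are hypothetical names, and it assumes the keys '0001', '0002', ... map in order to feature1, feature2, ... as described in the comment above. It returns the values as a tuple whose order must match the fields declared in the ROW result_type; in PyFlink those values would presumably be wrapped in the `Row` class from `pyflink.table.types` (check your PyFlink version for where `Row` lives and whether ROW results from Python UDFs are supported).

```python
import json

# Hypothetical mapping from keys inside the "data" string to output field
# names; extend it to cover '0003'..'0006' as needed.
FIELD_MAP = {
    "0001": "feature1",
    "0002": "feature2",
}


def extract_features(str_data, field_map=FIELD_MAP):
    """Parse the JSON string and return the mapped values as a tuple.

    The tuple order follows field_map's insertion order, which must match
    the field order declared in the ROW result type. Missing keys give None.
    """
    data = json.loads(str_data)
    return tuple(data.get(key) for key in field_map)


if __name__ == "__main__":
    sample = '{"0001":105.0,"0002":1.21,"0003":0.69}'
    print(extract_features(sample))  # (105.0, 1.21)
```

Note that the sample values are numbers (105.0, 1.21, ...), so the ROW fields would likely need a numeric type such as DataTypes.DOUBLE() rather than DataTypes.STRING().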
Thanks!
On Fri, Jul 3, 2020 at 12:40 PM Manas Kale <[email protected]> wrote:
> Hi Xingbo,
> Thanks for the reply. I didn't know that a table schema also needs to be
> declared after the connect() call, but I understand now.
> I have another question: how do I write the parsing schema for a field
> whose value is itself a valid JSON string? For example:
> {
> "monitorId": 865,
> "deviceId": "94:54:93:49:96:13",
> "data":
> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
> "state": 2,
> "time": 1593687809180
> }
> The field "data" is a string containing valid JSON with string:number
> pairs. I'm currently trying to use a JSON schema object and DataTypes.ROW,
> but I am getting deserialization errors.
>
> .with_format(
>     Json()
>     .json_schema(
>         """
>         {
>             "type": "object",
>             "properties": {
>                 "monitorId": {
>                     "type": "string"
>                 },
>                 "deviceId": {
>                     "type": "string"
>                 },
>                 "data": {
>                     "type": "object"
>                 },
>                 "state": {
>                     "type": "integer"
>                 },
>                 "time": {
>                     "type": "string"
>                 }
>             }
>         }
>         """
>     )
> ) \
> .with_schema(
>     Schema()
>     .field("monitorId", DataTypes.STRING())
>     .field("deviceId", DataTypes.STRING())
>     .field("data", DataTypes.ROW())
> )
>
> Regards,
>
> Manas
>
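Comparing the sample message with the JSON schema above shows several top-level type mismatches: monitorId and time are JSON numbers but are declared as "string", and data holds a string (that happens to contain JSON), not an object. The stdlib-only sketch below checks a sample against the declared top-level types and then decodes the nested string in a separate step. `type_mismatches` and `decode_nested` are hypothetical debugging helpers; they only approximate JSON Schema's `type` keyword and say nothing about how Flink's JSON format validates records internally.

```python
import json

# Python types that json.loads produces for each JSON Schema primitive type name.
_JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "object": dict,
    "array": list,
    "boolean": bool,
}


def type_mismatches(message, schema):
    """Return (field, declared_type, actual_type_name) for every top-level
    property whose value does not match the declared JSON Schema type.
    Fields absent from the message are skipped."""
    problems = []
    for field, spec in schema.get("properties", {}).items():
        if field not in message:
            continue
        value = message[field]
        declared = spec.get("type")
        expected = _JSON_TYPES.get(declared)
        ok = expected is not None and isinstance(value, expected)
        # bool is a subclass of int in Python, so reject it for numeric types.
        if declared in ("integer", "number") and isinstance(value, bool):
            ok = False
        if not ok:
            problems.append((field, declared, type(value).__name__))
    return problems


def decode_nested(message, field="data"):
    """Second parsing step: the field holds JSON text, so decode it separately."""
    return json.loads(message[field])


if __name__ == "__main__":
    raw = r'{"monitorId": 865, "deviceId": "94:54:93:49:96:13", "data": "{\"0001\":105.0,\"0002\":1.21}", "state": 2, "time": 1593687809180}'
    declared_schema = {
        "type": "object",
        "properties": {
            "monitorId": {"type": "string"},
            "deviceId": {"type": "string"},
            "data": {"type": "object"},
            "state": {"type": "integer"},
            "time": {"type": "string"},
        },
    }
    message = json.loads(raw)
    print(type_mismatches(message, declared_schema))
    # [('monitorId', 'string', 'int'), ('data', 'object', 'str'), ('time', 'string', 'int')]
    print(decode_nested(message))
    # {'0001': 105.0, '0002': 1.21}
```

One option this suggests (an assumption, not something confirmed in this thread) is to declare data as a string in both the JSON schema and the table schema (DataTypes.STRING()) and decode it afterwards, for example in a UDF.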
>
> On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang <[email protected]> wrote:
>
>> Hi, Manas
>> You need to declare the table schema as well. You can refer to the following example:
>> t_env.connect(
>>     Kafka()
>>     .version('0.11')
>>     .topic(INPUT_TOPIC)
>>     .property("bootstrap.servers", PROD_KAFKA)
>>     .property("zookeeper.connect", "localhost:2181")
>>     .start_from_latest()
>> ) \
>>     .with_format(
>>         Json()
>>         .json_schema(
>>             "{"
>>             "  type: 'object',"
>>             "  properties: {"
>>             "    lon: {"
>>             "      type: 'number'"
>>             "    },"
>>             "    rideTime: {"
>>             "      type: 'string',"
>>             "      format: 'date-time'"
>>             "    }"
>>             "  }"
>>             "}"
>>         )
>>     ) \
>>     .with_schema(  # declare the schema of the table
>>         Schema()
>>         .field("lon", DataTypes.DECIMAL(20, 10))
>>         .field("rideTime", DataTypes.TIMESTAMP(6))
>>     ).register_table_source(INPUT_TABLE)
>>
>> Best,
>> Xingbo
>>
>> On Thu, Jul 2, 2020 at 7:59 PM Manas Kale <[email protected]> wrote:
>>
>>> Hi,
>>> I'm trying to get a simple consumer/producer running using the following
>>> code, adapted from the provided links:
>>>
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import (TableConfig, BatchTableEnvironment, DataTypes,
>>>                            StreamTableEnvironment)
>>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>
>>> t_config = TableConfig()
>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>
>>> INPUT_TOPIC = 'xyz'
>>> INPUT_TABLE = 'raw_message'
>>> PROD_ZOOKEEPER = '...'
>>> PROD_KAFKA = '...'
>>>
>>> OUTPUT_TOPIC = 'summary_output'
>>> OUTPUT_TABLE = 'feature_summary'
>>> LOCAL_ZOOKEEPER = 'localhost:2181'
>>> LOCAL_KAFKA = 'localhost:9092'
>>>
>>>
>>> t_env.connect(
>>>     Kafka()
>>>     .version('universal')
>>>     .topic(INPUT_TOPIC)
>>>     .property("bootstrap.servers", PROD_KAFKA)
>>>     .start_from_latest()
>>> ) \
>>>     .with_format(
>>>         Json()
>>>         .json_schema(
>>>             "{"
>>>             "  type: 'object',"
>>>             "  properties: {"
>>>             "    lon: {"
>>>             "      type: 'number'"
>>>             "    },"
>>>             "    rideTime: {"
>>>             "      type: 'string',"
>>>             "      format: 'date-time'"
>>>             "    }"
>>>             "  }"
>>>             "}"
>>>         )
>>>     ).register_table_source(INPUT_TABLE)
>>>
>>> t_env.connect(
>>>     Kafka()
>>>     .version('universal')
>>>     .topic(OUTPUT_TOPIC)
>>>     .property("bootstrap.servers", LOCAL_KAFKA)
>>>     .start_from_latest()
>>> ) \
>>>     .with_format(
>>>         Json()
>>>         .json_schema(
>>>             "{"
>>>             "  type: 'object',"
>>>             "  properties: {"
>>>             "    lon: {"
>>>             "      type: 'number'"
>>>             "    },"
>>>             "    rideTime: {"
>>>             "      type: 'string',"
>>>             "      format: 'date-time'"
>>>             "    }"
>>>             "  }"
>>>             "}"
>>>         )
>>>     ).register_table_sink(OUTPUT_TABLE)
>>>
>>> t_env.from_path(INPUT_TABLE) \
>>>     .insert_into(OUTPUT_TABLE)
>>>
>>> t_env.execute('IU pyflink job')
>>>
>>> *However, I am getting the following exception:*
>>>
>>> Traceback (most recent call last):
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>     return f(*a, **kw)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>     format(target_id, ".", name), value)
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
>>> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
>>>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
>>>     at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>>>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>>>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>>>     at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>>>     ... 13 more
>>>
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
>>>     ).register_table_source(INPUT_TABLE)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
>>>     self._j_connect_table_descriptor.registerTableSource(name)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>     answer, self.gateway_client, self.target_id, self.name)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
>>>     raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>>> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>>>
>>>
>>> The relevant part seems to be *Caused by:
>>> org.apache.flink.table.api.ValidationException: Could not find the required
>>> schema in property 'schema'.*
>>>
>>> This is probably a basic error, but I can't figure out how to find out
>>> what's wrong with the schema. Is the schema not declared properly? Is some
>>> field missing?
>>>
>>> FWIW, I have placed the JSON and Kafka connector JARs in the required
>>> location.
>>>
>>>
>>> Regards,
>>> Manas
>>>
>>>
>>> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <[email protected]> wrote:
>>>
>>>> Hi Xingbo,
>>>> Thank you for the information, it certainly helps!
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <[email protected]> wrote:
>>>>
>>>>> Hi Manas,
>>>>>
>>>>> Since Flink 1.9, the entire architecture of PyFlink has been
>>>>> redesigned, so the method described in the link won't work.
>>>>> Instead, you can use the more convenient DDL [1] or the descriptor API [2]
>>>>> to read Kafka data. You can also refer to the common questions about
>>>>> PyFlink [3].
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>>>
>>>>> Best,
>>>>> Xingbo
>>>>>
>>>> On Mon, Jun 29, 2020 at 8:10 PM Manas Kale <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I want to consume from and write to Kafka using Flink's Python API.
>>>>>>
>>>>>> The only way I found to do this was through this
>>>>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>>>>> question on SO, where the user essentially copies the Flink Kafka
>>>>>> connector JARs into the Flink runtime's lib/ directory.
>>>>>>
>>>>>> - Is this the recommended method? If not, what is?
>>>>>> - Is there any official documentation for using Kafka with PyFlink?
>>>>>> Is this officially supported?
>>>>>> - How does the method described in the link work? Does the Flink
>>>>>> runtime load and expose all JARs in lib/ to the Python script? Can I
>>>>>> write custom operators in Java and use them from Python?
>>>>>>
>>>>>> Thanks,
>>>>>> Manas
>>>>>>
>>>>>