I didn't expect that answer =)) Anyway, I really appreciate everything you did. Thanks again for helping me out.
Best,
Quynh.

Hi Dian,
Sorry for missing your mail. If I follow your suggestion and Flink somehow crashes and we have to restart the service, does the Flink job know the offset from which it should resume reading from Kafka?
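For reference, when checkpointing is enabled the Kafka source keeps the offsets it has read as part of Flink's checkpointed state, so a job restored from a checkpoint or savepoint resumes from those offsets. A minimal sketch of enabling that in a standard PyFlink DataStream job (interval and paths are placeholders):

```
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Checkpoint every 60 seconds; the Kafka source records the partition offsets
# it has consumed as part of this checkpointed state.
env.enable_checkpointing(60000)

# If the job is later restarted from a checkpoint or savepoint, e.g.
#   flink run -s <savepoint-path> ... job.py
# the source resumes from the offsets stored in that state rather than from
# the configured startup offsets.
```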
Hi Quynh,

The same code in my last reply showed how to set the UID for the source operator generated by the Table API. I meant that you could first create a source using the Table API, then convert it to a DataStream and set the uid for the source operator using the same code from that reply, and then perform your operations with the DataStream API.
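A minimal sketch of that flow (the table name and uid are placeholders, and set_uid_for_source is the helper defined further down in this thread):

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# 1. Define the source with the Table API; "source_table" stands in for a
#    Kafka table registered via DDL elsewhere in the job.
table = t_env.from_path("source_table")

# 2. Convert the table to a DataStream (for changelog formats such as
#    Debezium, to_changelog_stream would be used instead).
ds = t_env.to_data_stream(table)

# 3. Set an explicit uid on the underlying source operator, using the
#    set_uid_for_source helper shown later in this thread.
set_uid_for_source(ds, "kafka-source-uid")

# 4. Continue with DataStream API operations.
ds.print()

env.execute("table-source-with-uid")
```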
Regards,
Dian

Hi Dian,
Thanks again for the fast response.
Following your suggestion above, we can set the UID only for the DataStream part (by converting from a Table to a DataStream, as you suggested).
However, for the first phase, which collects the data from Kafka (in Debezium format), the UID cannot be set since we are using the Table API (which auto-generates the UID).
Therefore, if there is a crash or we need to roll back using a savepoint, we cannot do that for the first phase since we cannot set the UID for it => so how can we revert it?
Because of that, we want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job so that we can use savepoints for the whole flow.
Best,
Quynh

Hi Quynh,
You could try the following code (though it may be a little hacky):

```
from pyflink.datastream import DataStream

def set_uid_for_source(ds: DataStream, uid: str):
    # Walk from the stream's transformation down through its inputs until the
    # source transformation (the one with no inputs) is reached, then set an
    # explicit uid on it.
    transformation = ds._j_data_stream.getTransformation()
    source_transformation = transformation
    while not source_transformation.getInputs().isEmpty():
        source_transformation = source_transformation.getInputs().get(0)
    source_transformation.setUid(uid)
```
Besides, could you describe your use case a bit, and also how you want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job? Note that sources with these formats will send UPDATE messages to downstream operators.
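To illustrate that last point: when a table backed by one of these changelog formats is brought into the DataStream API (for example via to_changelog_stream), each Row carries a RowKind, which is how the UPDATE messages become visible to downstream operators. A minimal sketch, with the table name as a placeholder:

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# "cdc_table" is a placeholder for a table declared with a Debezium format
# ('debezium-json' or 'debezium-avro-confluent') via Table API DDL.
changelog_stream = t_env.to_changelog_stream(t_env.from_path("cdc_table"))

# Each Row carries a RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE),
# so downstream DataStream operators see the update messages explicitly.
changelog_stream.map(lambda row: (str(row.get_row_kind()), row)).print()

env.execute("changelog-rowkind-sketch")
```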
Regards,
Dian

Yeah, I already tried that way. However, if we do not use the DataStream API from the start, we cannot make use of savepoints, since according to the docs, if we use the Table API (SQL API), the uid is generated automatically, which means we cannot restore if the system crashes.
Best,
Quynh

Thanks Dian!! I really appreciate this.
However, I have another question related to this. In the current version, or in any future update, does the DataStream API support DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in PyFlink? I looked at the documentation and it seems they are not supported yet.
Best,
Quynh

Hi Quynh,
Could you show some sample code on how you use it?
Regards,
Dian

I wonder if this is a bug or not, but when I use AvroRowDeserializationSchema in PyFlink, the following error still occurs:
py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.formats.avro.AvroRowDeserializationSchema.
Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.formats.avro.AvroRowDeserializationSchema([class org.apache.avro.Schema$RecordSchema]) does not exist
Please help check this. Thanks!

Best,
Quynh

Hi team,
I want to use AvroRowDeserializationSchema when consuming data from Kafka; however, from the documentation, I did not understand what avro_schema_string and record_class are. It would be great if you could give me an example of this (I only have the Java example, but I am doing it in PyFlink).
In my understanding, is avro_schema_string the schema_registry_url? Does it support something like 'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' as in the Table API?
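For illustration, a minimal sketch of how the documented parameters are meant to be used: avro_schema_string is the Avro schema definition itself as a JSON string, not a schema registry URL (the Confluent registry options belong to the Table API formats). The topic name, Kafka properties, and schema below are placeholders, and the import paths may differ between PyFlink versions:

```
from pyflink.common.serialization import AvroRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# The Avro format jars (e.g. flink-sql-avro) must be on the classpath, for
# example via env.add_jars("file:///path/to/flink-sql-avro-<version>.jar").

# avro_schema_string is the Avro schema definition (JSON), not a registry URL.
avro_schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}
"""

deserialization_schema = AvroRowDeserializationSchema(avro_schema_string=avro_schema)

# "user-topic" and the Kafka properties are placeholders for this sketch.
consumer = FlinkKafkaConsumer(
    topics="user-topic",
    deserialization_schema=deserialization_schema,
    properties={"bootstrap.servers": "localhost:9092", "group.id": "demo"},
)

env.add_source(consumer).print()
env.execute("avro-deserialization-sketch")
```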
Best,
Quynh.