Hi Quynh,

The same code in my last reply showed how to set the UID for the source
operator generated using the Table API. I meant that you could first
create a source using the Table API, then convert it to a DataStream and
set the UID for the source operator using the same code above, and then
perform operations with the DataStream API.

Regards,
Dian
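A minimal sketch of that end-to-end flow, assuming a hypothetical `orders`
topic (table name, columns and broker address are placeholders) and the
`set_uid_for_source` helper quoted further down in this thread:

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Hypothetical Kafka source table; topic, brokers and columns are placeholders.
t_env.execute_sql("""
    CREATE TABLE orders (
        id BIGINT,
        amount DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

# Convert the Table to a DataStream, then pin the source UID with the
# hacky helper from Dian's reply quoted below (assumed to be defined
# earlier in the same script).
ds = t_env.to_data_stream(t_env.from_path("orders"))
set_uid_for_source(ds, "kafka-orders-source")

ds.print()
env.execute("table-source-with-uid")
```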
On Mon, Apr 25, 2022 at 9:27 PM lan tran <indigoblue7...@gmail.com> wrote:

> Hi Dian,
>
> Thanks again for the fast response.
>
> Following your suggestion above, we can set the UID only for the
> DataStream part (as you suggested, by converting from Table to
> DataStream).
>
> However, in the first phase, which is collecting the data from Kafka (in
> Debezium format), the UID cannot be set since we are using the Table API
> (it auto-generates the UID).
>
> Therefore, if there is a crash or we need to revert using a savepoint,
> we cannot do so for the first phase since we cannot set its UID. So how
> can we revert it?
>
> As a result, we want to use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job, to be able
> to use the savepoint for the whole flow.
>
> Best,
> Quynh
>
> *From: *Dian Fu <dian0511...@gmail.com>
> *Sent: *Monday, April 25, 2022 7:46 PM
> *To: *lan tran <indigoblue7...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
> Hi Quynh,
>
> You could try the following code (note that it may be a little hacky):
>
> ```
> def set_uid_for_source(ds: DataStream, uid: str):
>     transformation = ds._j_data_stream.getTransformation()
>
>     source_transformation = transformation
>     while not source_transformation.getInputs().isEmpty():
>         source_transformation = source_transformation.getInputs().get(0)
>
>     source_transformation.setUid(uid)
> ```
>
> Besides, could you describe your use case a bit and also how you want to
> use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that
> sources with these formats will send UPDATE messages to downstream
> operators.
>
> Regards,
> Dian
>
> On Mon, Apr 25, 2022 at 12:31 PM lan tran <indigoblue7...@gmail.com>
> wrote:
>
> Yeah, I already tried that way. However, if we did not use DataStream at
> first, we cannot use savepoints, since according to the docs, if we use
> the Table API (SQL API), the UID is generated automatically, which means
> we cannot revert if the system crashes.
>
> Best,
> Quynh
>
> *From: *Dian Fu <dian0511...@gmail.com>
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran <indigoblue7...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in the
> Python DataStream API.
>
> Taking a further look at the Java implementations of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema,
> the result type is RowData instead of Row, so it would not be that easy
> to support them directly in the Python DataStream API. However,
> conversion between the Table API and the DataStream API is supported[1].
> Could you first create a Table which consumes data from Kafka and then
> convert it to a DataStream?
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
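For Debezium-format input specifically, the Table-to-DataStream route
would go through a changelog stream, since such a source emits UPDATE
messages. A rough sketch, assuming `to_changelog_stream` is available in
your PyFlink version and using a hypothetical `debezium-json` table
(topic, brokers and columns are placeholders):

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Hypothetical CDC source table in Debezium JSON format.
t_env.execute_sql("""
    CREATE TABLE customers (
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbserver.inventory.customers',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-json'
    )
""")

# to_changelog_stream keeps the change flags (+I/-U/+U/-D) on each Row,
# so downstream DataStream operators also see the UPDATE messages.
ds = t_env.to_changelog_stream(t_env.from_path("customers"))
ds.print()
env.execute("debezium-table-to-changelog-stream")
```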
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <dian0511...@gmail.com> wrote:
>
> Yes, we should support them.
>
> For now, if you want to use them, you could create them in your own
> project. You could refer to AvroRowDeserializationSchema[1] as an
> example. It should not be complicated, as it's simply a wrapper of the
> Java implementation.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran <indigoblue7...@gmail.com>
> wrote:
>
> Thanks Dian!! I really appreciate this.
>
> However, I have another question related to this. In the current version
> or any future update, does the DataStream API support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink? I looked at the
> documentation and it seems they are not supported yet.
>
> Best,
> Quynh
>
> *From: *Dian Fu <dian0511...@gmail.com>
> *Sent: *Friday, April 22, 2022 9:36 PM
> *To: *lan tran <indigoblue7...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
> Hi Quynh,
>
> I have added an example on how to use AvroRowDeserializationSchema in
> the Python DataStream API in [1]. Please take a look and see if it
> helps~
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
>
> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <dian0511...@gmail.com> wrote:
>
> Hi Quynh,
>
> Could you show some sample code on how you use it?
>
> Regards,
> Dian
>
> On Fri, Apr 22, 2022 at 1:42 PM lan tran <indigoblue7...@gmail.com>
> wrote:
>
> I wonder if this is a bug or not, but if I use
> *AvroRowDeserializationSchema*, the following error still occurs in
> PyFlink:
>
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
> org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
> org.apache.avro.Schema$RecordSchema]) does not exist
>
> Therefore, please help check. Thanks.
>
> Best,
> Quynh
>
> *From: *lan tran <indigoblue7...@gmail.com>
> *Sent: *Thursday, April 21, 2022 1:43 PM
> *To: *user@flink.apache.org
> *Subject: *AvroRowDeserializationSchema
>
> Hi team,
>
> I want to implement AvroRowDeserializationSchema when consuming data
> from Kafka, but from the documentation I did not understand what
> avro_schema_string and record_class are. It would be great if you could
> give me an example of this (I only have the example in Java; however, I
> am doing it using PyFlink).
>
> As I understand it, is avro_schema_string the schema_registry_url? Does
> it support something like
> 'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' as
> in the Table API?
>
> Best,
> Quynh
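On that last question: avro_schema_string is the Avro schema itself as a
JSON string, not a schema-registry URL (the registry option belongs to
the Table API's debezium-avro-confluent format). A minimal sketch of how
AvroRowDeserializationSchema can be wired to a Kafka consumer, with the
schema, topic and broker address as placeholder assumptions; the import
path follows the 1.14/1.15 pyflink.common.serialization module linked
above and may differ in newer releases:

```
from pyflink.common.serialization import AvroRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# The Avro format ships as a separate jar; adjust the path and version.
# env.add_jars("file:///path/to/flink-sql-avro-1.15.0.jar")

# avro_schema_string is the Avro schema JSON itself, not a registry URL.
deserialization_schema = AvroRowDeserializationSchema(
    avro_schema_string="""
    {
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"}
        ]
    }
    """
)

# Topic and broker address are placeholders.
kafka_consumer = FlinkKafkaConsumer(
    topics='users',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'demo'}
)

env.add_source(kafka_consumer).print()
env.execute("avro-row-deserialization-example")
```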