In the meantime, I looked into the code of the avro-confluent format and compared it with the avro format.
The *avro* format implements *BulkWriterFormatFactory & BulkReaderFormatFactory* (see [1]) for the filesystem connector, and also *DeserializationFormatFactory & SerializationFormatFactory* for other connectors such as Kafka (see [2]). However, the *avro-confluent* format only implements *DeserializationFormatFactory & SerializationFormatFactory* (see [3]). The filesystem connector uses the bulk writer/reader format factories if they exist, and falls back to *DeserializationFormatFactory & SerializationFormatFactory* otherwise.

What is the main difference between (*BulkWriterFormatFactory, BulkReaderFormatFactory*) and (*DeserializationFormatFactory, SerializationFormatFactory*)? Is it performance? I'm wondering whether avro should only be used with the bulk writer/reader format for performance reasons, and whether that is why the Flink documentation claims the avro-confluent format can only be used with the Kafka connector.

On Thu, Oct 6, 2022 at 1:47 PM liuxiangcao <xiangcaohe...@gmail.com> wrote:

> Actually I got the previous option validation issue resolved after adding
> 'avro-confluent.url' = 'https://confluent-schema-registry-url',
> 'avro-confluent.subject' = 'xxxx'
>
> Now I'm one step closer to verifying whether the avro-confluent format
> works with the filesystem connector or not. Unfortunately, I'm getting an
> error when talking to our internal Confluent schema registry:
>
> Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to
> serialize schema registry.
>     at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:90)
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88)
>     ...
>     23 more
> Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException:
> unable to find valid certification path to requested target
>
> This is because of an issue with our internal schema registry host cert;
> internally we just skip the host check in the SSL connection in Java.
> Is there a way to configure skipping the host check for the avro-confluent
> SSL connection?
>
> On Thu, Oct 6, 2022 at 1:27 PM liuxiangcao <xiangcaohe...@gmail.com> wrote:
>
>> Typo in my first sentence: I actually also noticed the reference to the
>> filesystem connector in the avro-confluent format doc, which is confusing.
>>
>> On Thu, Oct 6, 2022 at 1:24 PM liuxiangcao <xiangcaohe...@gmail.com> wrote:
>>
>>> Hi Martin,
>>>
>>> Thank you for the reply. I actually also noticed the reference to
>>> avro-confluent.subject in the avro-confluent format doc, which is
>>> confusing.
>>>
>>> I tried using 'connector' = 'filesystem' and 'format' = 'avro-confluent',
>>> and it would fail with options validation when I tried to insert data
>>> into the table.
>>>
>>> If I do not specify 'url' in the WITH block, it gives:
>>> "org.apache.flink.table.api.ValidationException: One or more required
>>> options are missing.
>>> Missing required options are:
>>> url"
>>>
>>> If I specify 'url', it gives:
>>> "org.apache.flink.table.api.ValidationException: Unsupported options
>>> found for 'filesystem'.
>>> Unsupported options:
>>> url"
>>>
>>> I also tried to specify 'value.avro-confluent.url', and that fails with:
>>> "org.apache.flink.table.api.ValidationException: Unsupported options
>>> found for 'filesystem'.
>>> Unsupported options:
>>> value.avro-confluent.url"
>>>
>>> On Tue, Oct 4, 2022 at 1:30 PM Martijn Visser <martijnvis...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm wondering if the documentation is correct, because the documentation
>>>> also mentions the option to use 'avro-confluent.subject', which refers
>>>> to using it with filesystem [1].
>>>>
>>>> Have you tried to use this already? Did you get an error message?
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/avro-confluent/#avro-confluent-subject
>>>>
>>>> On Tue, Oct 4, 2022 at 3:18 PM liuxiangcao <xiangcaohe...@gmail.com> wrote:
>>>>
>>>>> Hi Flink developer community,
>>>>>
>>>>> According to the Flink docs, avro-confluent ([1]) is only supported
>>>>> for the Kafka SQL connector and the upsert-kafka SQL connector.
>>>>>
>>>>> I'm wondering if there is any reason this format is not supported for
>>>>> the filesystem SQL connector ([2])?
>>>>>
>>>>> We are looking to use the filesystem sink to write to S3 in avro
>>>>> format, and would like to keep the sink in sync with the avro schema
>>>>> registry. Are there any gotchas that we should be aware of in
>>>>> implementing the avro-confluent format for the filesystem sink?
>>>>>
>>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/avro-confluent/
>>>>> [2] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/
>>>>>
>>>>> --
>>>>> Best Wishes & Regards
>>>>> Shawn Xiangcao Liu

--
Best Wishes & Regards
Shawn Xiangcao Liu
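P.S. on the PKIX / host-check question above: as far as I know there is no avro-confluent option to skip certificate or hostname verification, so the clean fix is to import the internal registry's cert into a truststore the JVM trusts. If you really need to skip verification for internal testing, the sketch below shows the standard plain-JDK trust-all pattern (javax.net.ssl only, nothing Flink-specific). Whether the schema-registry client in your Flink/Confluent versions actually picks up these process-wide JVM defaults is an assumption you would need to verify; the class name TrustAllDemo is just for illustration.

```java
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.X509Certificate;

public class TrustAllDemo {

    // Build an SSLContext whose TrustManager accepts any certificate chain.
    // WARNING: this disables certificate validation entirely; do not use it
    // against hosts you do not control.
    public static SSLContext trustAllContext() throws Exception {
        TrustManager[] trustAll = new TrustManager[] {
            new X509TrustManager() {
                public X509Certificate[] getAcceptedIssuers() {
                    return new X509Certificate[0];
                }
                public void checkClientTrusted(X509Certificate[] chain, String authType) {
                    // accept everything
                }
                public void checkServerTrusted(X509Certificate[] chain, String authType) {
                    // accept everything
                }
            }
        };
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, trustAll, new java.security.SecureRandom());
        return ctx;
    }

    public static void main(String[] args) throws Exception {
        SSLContext ctx = trustAllContext();

        // Install as JVM-wide defaults. This affects every HTTPS client in the
        // process that uses HttpsURLConnection defaults, which (assumption) may
        // include the schema-registry client behind RegistryAvroSerializationSchema.
        HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());

        // Certificate trust and hostname verification are separate checks in the
        // JDK; skipping the "host check" also needs a permissive HostnameVerifier.
        HostnameVerifier acceptAll = (hostname, session) -> true;
        HttpsURLConnection.setDefaultHostnameVerifier(acceptAll);

        System.out.println(ctx.getProtocol());
    }
}
```

Note this is a JVM-level workaround, not a table option, so it would have to run inside the TaskManager JVMs (e.g. via a static initializer) rather than in the DDL.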