In the meantime, I looked into the code of the avro-confluent format and
compared it with the avro format.

The *avro* format implements *BulkWriterFormatFactory* and
*BulkReaderFormatFactory* (see [1]) for the filesystem connector, and
also *DeserializationFormatFactory* and *SerializationFormatFactory* for
other connectors such as Kafka (see [2]). However, the *avro-confluent*
format only implements *DeserializationFormatFactory* and
*SerializationFormatFactory* (see [3]). The filesystem connector uses
the bulk writer/reader format factories if they exist, and falls back to
*DeserializationFormatFactory* and *SerializationFormatFactory*
otherwise.
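
For context, here is roughly the kind of table definition I have been
testing (the schema, path, registry URL, and subject below are
placeholders I made up, not a verified working configuration):

```sql
CREATE TABLE avro_sink (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/avro-sink/',       -- placeholder path
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'https://confluent-schema-registry-url',
  'avro-confluent.subject' = 'my-subject'     -- placeholder subject
);
```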

What is the main difference between (*BulkWriterFormatFactory*,
*BulkReaderFormatFactory*) and (*DeserializationFormatFactory*,
*SerializationFormatFactory*)? Is it performance? I'm wondering whether
avro should only be used with the bulk writer/reader formats for
performance reasons, and whether that is why the Flink documentation
claims the avro-confluent format can only be used with the Kafka
connector.
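
To make my question concrete, here is my mental model of the difference,
as a simplified, hypothetical sketch in plain Java. The two interfaces
below are stand-ins I made up, not the real Flink ones: a record-level
serialization schema produces one self-contained byte[] per record,
while a bulk writer owns the whole file stream and can write per-file
state (such as the Avro container-file header) exactly once.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Simplified stand-ins for the two format contracts -- NOT the real
// Flink interfaces, just the shape of the difference as I understand it.
public class FormatStyles {

    // Record-level contract (what a SerializationFormatFactory provides):
    // each record becomes an independent, self-contained byte[].
    interface RecordSerializer<T> {
        byte[] serialize(T record);
    }

    // File-level contract (what a BulkWriterFormatFactory provides): the
    // writer owns the whole output stream, so per-file state like a
    // schema header is written once, not per record.
    interface BulkWriter<T> {
        void addElement(T record);
        void finish();
    }

    static final class SketchBulkWriter implements BulkWriter<String> {
        private final ByteArrayOutputStream out;

        SketchBulkWriter(ByteArrayOutputStream out) {
            this.out = out;
            // One file-wide header, written once (stand-in for the Avro
            // container-file schema block).
            out.writeBytes("#header\n".getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void addElement(String record) {
            out.writeBytes((record + "\n").getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void finish() { /* real writers flush blocks/footers here */ }
    }

    // Record-level path: total payload is just the sum of the per-record
    // payloads; any file-wide header would have to be repeated per record.
    static int recordLevelBytes(List<String> records) {
        RecordSerializer<String> ser = s -> s.getBytes(StandardCharsets.UTF_8);
        int total = 0;
        for (String r : records) {
            total += ser.serialize(r).length;
        }
        return total;
    }

    // File-level path: one header for the whole file, then all records.
    static String bulkFile(List<String> records) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BulkWriter<String> writer = new SketchBulkWriter(out);
        for (String r : records) {
            writer.addElement(r);
        }
        writer.finish();
        return out.toString(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(recordLevelBytes(List.of("a", "b", "c"))); // 3
        System.out.println(bulkFile(List.of("a", "b", "c")));
    }
}
```

If this model is right, it would also explain why a format designed
around per-record payloads does not slot directly into a file sink, but
I'd appreciate a correction from someone who knows the internals.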


On Thu, Oct 6, 2022 at 1:47 PM liuxiangcao <xiangcaohe...@gmail.com> wrote:

> Actually I got the previous option validation issue resolved after adding
>   'avro-confluent.url' = 'https://confluent-schema-registry-url',
>   'avro-confluent.subject' = 'xxxx'
>
> Now I'm one step closer to verifying whether the avro-confluent format
> works with the filesystem connector. Unfortunately, I'm getting an
> error when talking to our internal Confluent schema registry:
> Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to
> serialize schema registry.
> at org.apache.flink.formats.avro.RegistryAvroSerializationSchema
> .serialize(RegistryAvroSerializationSchema.java:90)
> at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(
> AvroRowDataSerializationSchema.java:88)
> ... 23 more
> Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building
> failed: sun.security.provider.certpath.SunCertPathBuilderException:
> unable to find valid certification path to requested target
>
> This is because of an issue with our internal schema registry host
> cert; internally we just skip the host check in the SSL connection in
> Java. Is there a way to configure the avro-confluent SSL connection to
> skip the host check?
>
>
>
> On Thu, Oct 6, 2022 at 1:27 PM liuxiangcao <xiangcaohe...@gmail.com>
> wrote:
>
>> Typo in my first sentence: I actually also noticed the reference to
>> the filesystem connector in the avro-confluent format doc, which is
>> confusing.
>>
>> On Thu, Oct 6, 2022 at 1:24 PM liuxiangcao <xiangcaohe...@gmail.com>
>> wrote:
>>
>>> Hi Martin,
>>>
>>> Thank you for the reply. I actually also noticed the reference to
>>> avro-confluent.subject in the avro-confluent format doc, which is
>>> confusing.
>>>
>>> I tried using 'connector' = 'filesystem' and 'format' =
>>> 'avro-confluent', and it fails option validation when I try to
>>> insert data into the table.
>>>
>>> If I do not specify 'url' in the WITH block, it gives
>>> "org.apache.flink.table.api.ValidationException: One or more required
>>> options are missing.
>>> Missing required options are:
>>> url
>>> "
>>>
>>> If I specify 'url', it gives
>>> "org.apache.flink.table.api.ValidationException: Unsupported options
>>> found for 'filesystem'.
>>> Unsupported options:
>>> url"
>>>
>>> I also tried specifying 'value.avro-confluent.url', and it fails with
>>> "org.apache.flink.table.api.ValidationException: Unsupported options
>>> found for 'filesystem'.
>>> Unsupported options:
>>> value.avro-confluent.url"
>>>
>>> On Tue, Oct 4, 2022 at 1:30 PM Martijn Visser <martijnvis...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm wondering if the documentation is correct, because it also
>>>> mentions the option 'avro-confluent.subject', which refers to using
>>>> the format with the filesystem connector [1].
>>>>
>>>> Have you tried to use this already? Did you get an error message?
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> [1]
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/avro-confluent/#avro-confluent-subject
>>>>
>>>> On Tue, Oct 4, 2022 at 3:18 PM liuxiangcao <xiangcaohe...@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi Flink developer community,
>>>> >
>>>> > According to the Flink docs, the avro-confluent format ([1]) is
>>>> > only supported for the Kafka SQL connector and the upsert Kafka
>>>> > SQL connector.
>>>> >
>>>> > I'm wondering if there is any reason this format is not supported
>>>> > for the filesystem SQL connector ([2])?
>>>> >
>>>> > We are looking to use the filesystem sink to write to S3 in Avro
>>>> > format, and would like to keep the sink in sync with the Avro
>>>> > schema registry. Are there any gotchas that we should be aware of
>>>> > in implementing the avro-confluent format for the filesystem sink?
>>>> >
>>>> > [1]
>>>> >
>>>> >
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/avro-confluent/
>>>> > [2]
>>>> >
>>>> >
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/
>>>> >
>>>> >
>>>> > --
>>>> > Best Wishes & Regards
>>>> > Shawn Xiangcao Liu
>>>> >
>>>>
>>>
>>>
>>>
>>
>>
>>
>
>
>


-- 
Best Wishes & Regards
Shawn Xiangcao Liu
