There are plans to also derive the table schema from an Avro schema, but we haven't decided on a syntax for this yet. For now, we only support this through catalogs such as the Confluent Schema Registry.
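
As a rough illustration (an untested sketch; the topic, field names, and
registry URL below are placeholders), a registry-backed table in 1.12 can be
declared like this, with the avro-confluent format fetching the Avro schema
from the registry for (de)serialization while the table schema is still
declared in the DDL:

CREATE TABLE my_table (
  my_key_field STRING,
  my_value_field BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = 'my-consumer-group',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://localhost:8081'
);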

Regards,
Timo


On 07.01.21 21:42, Aeden Jameson wrote:
 Brilliant, thank you. That will come in handy. I was looking through the docs hoping there was a way to still specify the schema, with no luck. Does such an option exist?

On Thu, Jan 7, 2021 at 2:33 AM Timo Walther <twal...@apache.org> wrote:

    Hi Aeden,

    `format.avro-schema` is not required anymore in the new design. The
    Avro schema is derived entirely from the table's schema.
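
    For example (an untested sketch; table and field names are placeholders),
    with a DDL like the following, the Avro schema is generated from the
    declared columns:

    CREATE TABLE my_table (
      my_key_field STRING,
      my_value_field BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',
      'properties.bootstrap.servers' = '...',
      'properties.group.id' = 'my-consumer-group',
      'value.format' = 'avro',
      'key.format' = 'avro',
      'key.fields' = 'my_key_field'
    );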

    Regards,
    Timo



    On 07.01.21 09:41, Aeden Jameson wrote:
     > Hi Timo,
     >
     > Thanks for responding. You're right. So I did update the properties.
     > From what I can tell, the new design you're referring to uses
     > KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
     > option, instead of KafkaTableSourceSinkFactoryBase, which doesn't
     > support those options. Is that right? So I updated my configuration to:
     >
     > connector = 'kafka'
     > topic = 'my-topic'
     > properties.group.id = 'my-consumer-group'
     > properties.bootstrap.servers = '...'
     > format = 'avro'
     > format.avro-schema = '....'
     > key.fields = 'my_key_field'
     >
     > However, the property format.avro-schema doesn't appear to be
     > supported by KafkaDynamicTableFactory. I get this exception:
     >
     > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
     > options found for connector 'kafka'.
     >
     > Unsupported options:
     >
     > format.avro-schema
     >
     > Supported options:
     >
     > connector
     > format
     > key.fields
     > key.fields-prefix
     > key.format
     > properties.bootstrap.servers
     > properties.group.id
     > property-version
     > scan.startup.mode
     > scan.startup.specific-offsets
     > scan.startup.timestamp-millis
     > scan.topic-partition-discovery.interval
     > sink.parallelism
     > sink.partitioner
     > sink.semantic
     > topic
     > topic-pattern
     > value.fields-include
     > value.format
     >          at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
     >          at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
     >          at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
     >          at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
     >          at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
     >          ... 21 more
     >
     > FAILURE: Build failed with an exception.
     >
     > The format.avro-schema property was supported in what looks to me like
     > the old design in KafkaTableSourceSinkFactoryBase with this line:
     >
     >      properties.add(FORMAT + ".*");
     >
     > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
     >
     > Does format.avro-schema need to be specified differently?
     >
     > Thank you,
     > Aeden
     >
     > On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <twal...@apache.org> wrote:
     >>
     >> Hi Aeden,
     >>
     >> we updated the connector property design in 1.11 [1]. The old
     >> translation layer exists for backwards compatibility and is indicated
     >> by `connector.type=kafka`.
     >>
     >> However, `connector = kafka` indicates the new property design, and
     >> `key.fields` is only available there. Please check all properties
     >> again when upgrading; they are mentioned here [2].
     >>
     >> Regards,
     >> Timo
     >>
     >>
     >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
     >> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
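     >>
     >> As a rough sketch of the mapping (based on FLIP-122; not exhaustive),
     >> the old keys translate to the new ones roughly like this:
     >>
     >> connector.type = 'kafka'         -->  connector = 'kafka'
     >> connector.version = 'universal'  -->  (folded into the 'kafka' identifier)
     >> connector.topic = '...'          -->  topic = '...'
     >> connector.properties.*           -->  properties.*
     >> format.type = 'avro'             -->  format = 'avro'
     >> format.avro-schema               -->  (dropped; derived from the table schema)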
     >>
     >>
     >> On 06.01.21 18:35, Aeden Jameson wrote:
     >>> Yes, I do have that dependency. I see it in the dependency view of
     >>> IntelliJ and directly in the uber jar. Thanks for responding.
     >>>
     >>> - Aeden
     >>>
     >>> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
     >>>>
     >>>> Hey,
     >>>>
     >>>> have you added Kafka connector as the dependency? [1]
     >>>>
     >>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
     >>>>
     >>>> Best,
     >>>> Piotrek
     >>>>
     >>>> On Wed, Jan 6, 2021 at 04:37 Aeden Jameson <aeden.jame...@gmail.com> wrote:
     >>>>>
     >>>>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
     >>>>> feature of the Kafka SQL Connector. My current connector is
     >>>>> configured as:
     >>>>>
     >>>>> connector.type    = 'kafka'
     >>>>> connector.version = 'universal'
     >>>>> connector.topic   = 'my-topic'
     >>>>> connector.properties.group.id = 'my-consumer-group'
     >>>>> connector.properties.bootstrap.servers = '...'
     >>>>> format.type = 'avro'
     >>>>> format.avro-schema = '....'
     >>>>>
     >>>>> I tried adding
     >>>>>
     >>>>> key.fields = 'my_key_field'
     >>>>>
     >>>>> as well as
     >>>>>
     >>>>> key.format = 'avro'
     >>>>> key.fields = 'my_key_field'
     >>>>>
     >>>>> but I get the exception
     >>>>>
     >>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
     >>>>> Could not find a suitable table factory for
     >>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
     >>>>> the classpath.
     >>>>>
     >>>>> Reason: No factory supports all properties.
     >>>>>
     >>>>> The matching candidates:
     >>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
     >>>>> Unsupported property keys:
     >>>>> key.fields
     >>>>> key.format
     >>>>>
     >>>>> The following factories have been considered:
     >>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
     >>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
     >>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
     >>>>>           at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
     >>>>>           at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
     >>>>>           at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
     >>>>>           at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
     >>>>>           at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
     >>>>>           ... 21 more
     >>>>>
     >>>>> I have validated that the uber jar clearly contains the 1.12
     >>>>> dependencies. What is the magic combination of properties to get
     >>>>> key.fields to work? Or is it not supported with Avro?
     >>>>>
     >>>>> --
     >>>>> Thank You,
     >>>>> Aeden