Brilliant, thank you. That will come in handy. I was looking through the docs
hoping there was still a way to specify the schema, but had no luck. Does such
an option exist?

On Thu, Jan 7, 2021 at 2:33 AM Timo Walther <twal...@apache.org> wrote:

> Hi Aeden,
>
> `format.avro-schema` is not required anymore in the new design. The Avro
> schema is derived entirely from the table's schema.
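>
> A minimal sketch of what that could look like, assuming a hypothetical
> table `my_table` with made-up columns `my_key_field` and `my_value_field`
> (names and types are illustrative only and not taken from this thread).
> Under the new property keys the Avro schema would follow from the declared
> columns, so no `format.avro-schema` entry appears:
>
> ```sql
> -- Hypothetical table and column names; only the option keys and the
> -- topic/group values come from this thread.
> CREATE TABLE my_table (
>   my_key_field   STRING,
>   my_value_field BIGINT
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'my-topic',
>   'properties.group.id' = 'my-consumer-group',
>   'properties.bootstrap.servers' = '...',
>   -- Key and value are both serialized as Avro.
>   'key.format' = 'avro',
>   'key.fields' = 'my_key_field',
>   'value.format' = 'avro'
> );
> ```
>
> Every option key used here appears in the "Supported options" list of the
> exception quoted below.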
>
> Regards,
> Timo
>
>
>
> On 07.01.21 09:41, Aeden Jameson wrote:
> > Hi Timo,
> >
> > Thanks for responding. You're right. So I did update the properties.
> > From what I can tell the new design you're referring to uses the
> > KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
> > options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
> > support those options. Is that right? So I updated my configuration to
> >
> > connector    = 'kafka'
> > topic   = 'my-topic'
> > properties.group.id = 'my-consumer-group'
> > properties.bootstrap.servers = '...'
> > format = 'avro'
> > format.avro-schema = '....'
> > key.fields = 'my_key_field'
> >
> > However, the property format.avro-schema doesn't appear to be
> > supported by KafkaDynamicTableFactory. I get this exception.
> >
> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> > options found for connector 'kafka'.
> >
> > Unsupported options:
> >
> > format.avro-schema
> >
> > Supported options:
> >
> > connector
> > format
> > key.fields
> > key.fields-prefix
> > key.format
> > properties.bootstrap.servers
> > properties.group.id
> > property-version
> > scan.startup.mode
> > scan.startup.specific-offsets
> > scan.startup.timestamp-millis
> > scan.topic-partition-discovery.interval
> > sink.parallelism
> > sink.partitioner
> > sink.semantic
> > topic
> > topic-pattern
> > value.fields-include
> > value.format
> >          at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
> >          at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
> >          at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
> >          at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
> >          at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> >          ... 21 more
> >
> > FAILURE: Build failed with an exception.
> >
> >
> >
> >
> > The format.avro-schema property was supported in what looks to me like
> > the old design, in KafkaTableSourceSinkFactoryBase, with this line,
> >
> >      properties.add(FORMAT + ".*");
> >
> >
> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
> >
> > Does format.avro-schema need to be specified differently?
> >
> > Thank you,
> > Aeden
> >
> > On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <twal...@apache.org> wrote:
> >>
> >> Hi Aeden,
> >>
> >> We updated the connector property design in 1.11 [1]. The old
> >> translation layer exists for backwards compatibility and is indicated by
> >> `connector.type=kafka`.
> >>
> >> However, `connector = kafka` indicates the new property design, and
> >> `key.fields` is only available there. Please check all properties again
> >> when upgrading; they are listed here [2].
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> >> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
> >>
> >>
> >> On 06.01.21 18:35, Aeden Jameson wrote:
> >>> Yes, I do have that dependency. I see it in the dependency view of
> >>> IntelliJ and directly in the uber jar. Thanks for responding.
> >>>
> >>> - Aeden
> >>>
> >>> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>>>
> >>>> Hey,
> >>>>
> >>>> Have you added the Kafka connector as a dependency? [1]
> >>>>
> >>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
> >>>>
> >>>> Best,
> >>>> Piotrek
> >>>>
> >>>> On Wed, Jan 6, 2021 at 04:37, Aeden Jameson <aeden.jame...@gmail.com> wrote:
> >>>>>
> >>>>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
> >>>>> feature of the Kafka SQL Connector. My current connector is configured
> >>>>> as follows:
> >>>>>
> >>>>> connector.type    = 'kafka'
> >>>>> connector.version = 'universal'
> >>>>> connector.topic   = 'my-topic'
> >>>>> connector.properties.group.id = 'my-consumer-group'
> >>>>> connector.properties.bootstrap.servers = '...'
> >>>>> format.type = 'avro'
> >>>>> format.avro-schema = '....'
> >>>>>
> >>>>> I tried adding
> >>>>>
> >>>>> key.fields = 'my_key_field'
> >>>>>
> >>>>> as well as
> >>>>>
> >>>>> key.format = 'avro'
> >>>>> key.fields = 'my_key_field'
> >>>>>
> >>>>> but I get the exception
> >>>>>
> >>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> >>>>> Could not find a suitable table factory for
> >>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
> >>>>> the classpath.
> >>>>>
> >>>>> Reason: No factory supports all properties.
> >>>>>
> >>>>> The matching candidates:
> >>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>>>> Unsupported property keys:
> >>>>> key.fields
> >>>>> key.format
> >>>>>
> >>>>> The following factories have been considered:
> >>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> >>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>>>>           at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> >>>>>           at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
> >>>>>           at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> >>>>>           at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> >>>>>           at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
> >>>>>           ... 21 more
> >>>>>
> >>>>> I have validated that the uber jar clearly contains the 1.12
> >>>>> dependencies. What is the magic combination of properties to get
> >>>>> key.fields to work? Or is it not supported with Avro?
> >>>>>
> >>>>> --
> >>>>> Thank You,
> >>>>> Aeden
> >>>
> >>>
> >>>
> >>
> >
>
>
