Brilliant, thank you. That will come in handy. I looked through the docs for a way to still specify the schema explicitly, but had no luck. Does such an option exist?

On Thu, Jan 7, 2021 at 2:33 AM Timo Walther <twal...@apache.org> wrote:
> Hi Aeden,
>
> `format.avro-schema` is not required anymore in the new design. The Avro
> schema is derived entirely from the table's schema.
>
> Regards,
> Timo
>
>
> On 07.01.21 09:41, Aeden Jameson wrote:
> > Hi Timo,
> >
> > Thanks for responding. You're right. So I did update the properties.
> > From what I can tell the new design you're referring to uses the
> > KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
> > options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
> > support those options. Is that right? So I updated my configuration to
> >
> > connector = 'kafka'
> > topic = 'my-topic'
> > properties.group.id = 'my-consumer-group'
> > properties.bootstrap.servers = '...'
> > format = 'avro'
> > format.avro-schema = '....'
> > key.fields = 'my_key_field'
> >
> > However, the property format.avro-schema doesn't appear to be
> > supported by KafkaDynamicTableFactory. I get this exception.
> >
> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> > options found for connector 'kafka'.
> >
> > Unsupported options:
> >
> > format.avro-schema
> >
> > Supported options:
> >
> > connector
> > format
> > key.fields
> > key.fields-prefix
> > key.format
> > properties.bootstrap.servers
> > properties.group.id
> > property-version
> > scan.startup.mode
> > scan.startup.specific-offsets
> > scan.startup.timestamp-millis
> > scan.topic-partition-discovery.interval
> > sink.parallelism
> > sink.partitioner
> > sink.semantic
> > topic
> > topic-pattern
> > value.fields-include
> > value.format
> > at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
> > at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
> > at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
> > at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
> > at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> > ... 21 more
> >
> > FAILURE: Build failed with an exception.
> >
> > The format.avro-schema property was supported in what looks to me like
> > the old design in KafkaTableSourceSinkFactoryBase with this line,
> >
> > properties.add(FORMAT + ".*");
> >
> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
> >
> > Does format.avro-schema need to be specified differently?
> >
> > Thank you,
> > Aeden
> >
> > On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <twal...@apache.org> wrote:
> >>
> >> Hi Aeden,
> >>
> >> we updated the connector property design in 1.11 [1]. The old
> >> translation layer exists for backwards compatibility and is indicated by
> >> `connector.type=kafka`.
> >>
> >> However, `connector = kafka` indicates the new property design and
> >> `key.fields` is only available there. Please check all properties again
> >> when upgrading, they are mentioned here [2].
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> >> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
> >>
> >>
> >> On 06.01.21 18:35, Aeden Jameson wrote:
> >>> Yes, I do have that dependency. I see it in the dependency view of
> >>> IntelliJ and directly in the uber jar. Thanks for responding.
> >>>
> >>> - Aeden
> >>>
> >>> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>>>
> >>>> Hey,
> >>>>
> >>>> have you added Kafka connector as the dependency? [1]
> >>>>
> >>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
> >>>>
> >>>> Best,
> >>>> Piotrek
> >>>>
> >>>> On Wed, Jan 6, 2021 at 04:37, Aeden Jameson <aeden.jame...@gmail.com> wrote:
> >>>>>
> >>>>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
> >>>>> feature of the Kafka SQL Connector. My current connector is configured
> >>>>> as,
> >>>>>
> >>>>> connector.type = 'kafka'
> >>>>> connector.version = 'universal'
> >>>>> connector.topic = 'my-topic'
> >>>>> connector.properties.group.id = 'my-consumer-group'
> >>>>> connector.properties.bootstrap.servers = '...'
> >>>>> format.type = 'avro'
> >>>>> format.avro-schema = '....'
> >>>>>
> >>>>> I tried adding
> >>>>>
> >>>>> key.fields = 'my_key_field'
> >>>>>
> >>>>> as well as
> >>>>>
> >>>>> key.format = 'avro'
> >>>>> key.fields = 'my_key_field'
> >>>>>
> >>>>> but I get the exception
> >>>>>
> >>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> >>>>> Could not find a suitable table factory for
> >>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
> >>>>> the classpath.
> >>>>>
> >>>>> Reason: No factory supports all properties.
> >>>>>
> >>>>> The matching candidates:
> >>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>>>> Unsupported property keys:
> >>>>> key.fields
> >>>>> key.format
> >>>>>
> >>>>> The following factories have been considered:
> >>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> >>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>>>> at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> >>>>> at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
> >>>>> at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> >>>>> at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> >>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
> >>>>> ... 21 more
> >>>>>
> >>>>> I have validated that the uber jar clearly contains the 1.12
> >>>>> dependencies. What is the magic combination of properties to get
> >>>>> key.fields to work? Or is it not supported with avro?
> >>>>>
> >>>>> --
> >>>>> Thank You,
> >>>>> Aeden
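
For anyone hitting the same pair of exceptions, the key renames discussed in this thread can be sketched as a small helper. This is only an illustration, not part of Flink: `migrate_options` and `uses_legacy_design` are hypothetical names, and the mapping covers only the keys that appear in the two configurations quoted above. It assumes `connector.version` and `format.avro-schema` can simply be dropped, since neither shows up in the "Supported options" list printed by KafkaDynamicTableFactory and, per Timo, the Avro schema is derived from the table's schema.

```python
# Hypothetical helper: translates the legacy Kafka connector keys quoted in
# this thread into the new-style keys. Not a Flink API.

# Direct renames seen in the thread (old key -> new key).
LEGACY_TO_NEW = {
    "connector.type": "connector",
    "connector.topic": "topic",
    "format.type": "format",
}

# Assumption: these legacy keys have no counterpart in the new design,
# because they are absent from the factory's "Supported options" list.
DROPPED = {"connector.version", "format.avro-schema"}

# Legacy prefix for Kafka client properties (group.id, bootstrap.servers).
LEGACY_PROPERTIES_PREFIX = "connector.properties."


def uses_legacy_design(options):
    """Return True if the options select the old translation layer."""
    return "connector.type" in options


def migrate_options(options):
    """Return a new dict with the legacy keys rewritten or dropped."""
    migrated = {}
    for key, value in options.items():
        if key in DROPPED:
            continue
        if key in LEGACY_TO_NEW:
            migrated[LEGACY_TO_NEW[key]] = value
        elif key.startswith(LEGACY_PROPERTIES_PREFIX):
            suffix = key[len(LEGACY_PROPERTIES_PREFIX):]
            migrated["properties." + suffix] = value
        else:
            # Keys such as key.fields exist only in the new design; keep them.
            migrated[key] = value
    return migrated


if __name__ == "__main__":
    legacy = {
        "connector.type": "kafka",
        "connector.version": "universal",
        "connector.topic": "my-topic",
        "connector.properties.group.id": "my-consumer-group",
        "connector.properties.bootstrap.servers": "...",
        "format.type": "avro",
        "format.avro-schema": "....",
        "key.fields": "my_key_field",
    }
    for key, value in migrate_options(legacy).items():
        print(f"{key} = '{value}'")
```

Mixing the two styles is what triggered the first exception: `connector.type` routes the table to the old factory, which rejects `key.fields`. The second exception came from carrying `format.avro-schema` over to the new factory.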