There are plans to also derive the table schema from Avro schema. But we
haven't decided on a syntax for this yet. For now, we only support this
through catalogs such as Confluent schema registry.
Regards,
Timo
On 07.01.21 21:42, Aeden Jameson wrote:
Brilliant, thank you. That will come in handy. I was looking through
docs hoping there was a way to still specify the schema with no luck.
Does such an option exist?
On Thu, Jan 7, 2021 at 2:33 AM Timo Walther <twal...@apache.org
<mailto:twal...@apache.org>> wrote:
Hi Aeden,
`format.avro-schema` is not required anymore in the new design. The
Avro
schema is derived entirely from the table's schema.
Regards,
Timo
On 07.01.21 09:41, Aeden Jameson wrote:
> Hi Timo,
>
> Thanks for responding. You're right. So I did update the properties.
>>From what I can tell the new design you're referring to uses the
> KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
> options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
> support those options. Is that right? So I updated my
configuration to
>
> connector = 'kafka'
> topic = 'my-topic'
> properties.group.id <http://properties.group.id> =
'my-consumer-group'
> properties.bootstrap.servers = '...'
> format = 'avro'
> format.avro-schema = '....'
> key.fields = 'my_key_field'
>
> However, the property format.avro-schema doesn't appear to be
> supported by KafkaDynamicTableFactory. I get this exception.
>
> Caused by: org.apache.flink.table.api.ValidationException:
Unsupported
> options found for connector 'kafka'.
>
> Unsupported options:
>
> format.avro-schema
>
> Supported options:
>
> connector
> format
> key.fields
> key.fields-prefix
> key.format
> properties.bootstrap.servers
> properties.group.id <http://properties.group.id>
> property-version
> scan.startup.mode
> scan.startup.specific-offsets
> scan.startup.timestamp-millis
> scan.topic-partition-discovery.interval
> sink.parallelism
> sink.partitioner
> sink.semantic
> topic
> topic-pattern
> value.fields-include
> value.format
> at
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
> at
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
> at
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
> at
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
> at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ... 21 more
>
> FAILURE: Build failed with an exception.
>
>
>
>
> The format.avro-schema property was supported it what looks to me the
> old design in in KafkaTableSourceSinkFactoryBase with this line,
>
> properties.add(FORMAT + ".*");
>
>
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160>
>
> Does format.avro-schema need to be specified differently?
>
> Thank you,
> Aeden
>
> On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <twal...@apache.org
<mailto:twal...@apache.org>> wrote:
>>
>> Hi Aeden,
>>
>> we updated the connector property design in 1.11 [1]. The old
>> translation layer exists for backwards compatibility and is
indicated by
>> `connector.type=kafka`.
>>
>> However, `connector = kafka` indicates the new property design and
>> `key.fields` is only available there. Please check all
properties again
>> when upgrading, they are mentioned here [2].
>>
>> Regards,
>> Timo
>>
>>
>> [1]
>>
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory>
>> [2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/>
>>
>>
>> On 06.01.21 18:35, Aeden Jameson wrote:
>>> Yes, I do have that dependency. I see it in the dependency view of
>>> intellij and directly. in the uber jar. Thanks for responding.
>>>
>>> - Aeden
>>>
>>> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski
<pnowoj...@apache.org <mailto:pnowoj...@apache.org>> wrote:
>>>>
>>>> Hey,
>>>>
>>>> have you added Kafka connector as the dependency? [1]
>>>>
>>>> [1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies>
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> śr., 6 sty 2021 o 04:37 Aeden Jameson <aeden.jame...@gmail.com
<mailto:aeden.jame...@gmail.com>> napisał(a):
>>>>>
>>>>> I've upgraded from 1.11.1 to 1.12 in hopes of using the
key.fields
>>>>> feature of the Kafa SQL Connector. My current connector is
configured
>>>>> as ,
>>>>>
>>>>> connector.type = 'kafka'
>>>>> connector.version = 'universal'
>>>>> connector.topic = 'my-topic'
>>>>> connector.properties.group.id
<http://connector.properties.group.id> = 'my-consumer-group'
>>>>> connector.properties.bootstrap.servers = '...'
>>>>> format.type = 'avro'
>>>>> format.avro-schema = '....'
>>>>>
>>>>> I tried adding
>>>>>
>>>>> key.fields = 'my_key_field'
>>>>>
>>>>> as well as
>>>>>
>>>>> key.format = 'avro'
>>>>> key.fields = 'my_key_field'
>>>>>
>>>>> but I get the exception
>>>>>
>>>>> Caused by:
org.apache.flink.table.api.NoMatchingTableFactoryException:
>>>>> Could not find a suitable table factory for
>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>> the classpath.
>>>>>
>>>>> Reason: No factory supports all properties.
>>>>>
>>>>> The matching candidates:
>>>>>
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>> Unsupported property keys:
>>>>> key.fields
>>>>> key.format
>>>>>
>>>>> The following factories have been considered:
>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>>
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>> at
org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
>>>>> at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
>>>>> at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>> at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>> at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
>>>>> ... 21 more
>>>>>
>>>>> I have validated that the uber jar clearly contains the 1.12
>>>>> dependencies. What is that magic combination of properties to get
>>>>> key.fields to work? Or is it not supported with avro?
>>>>>
>>>>> --
>>>>> Thank You,
>>>>> Aeden
>>>
>>>
>>>
>>
>