I implemented a KafkaRetractTableSink in the same way, packaged it as a jar, put it in the lib directory, and started sql-client:
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
... 3 more
How should I solve this?
Thanks,
王磊
[email protected]
Sender: [email protected]
Send Time: 2020-03-29 10:32
Receiver: [email protected]
Subject: RE: Implementing KafkaUpsertTableSink
Benchao, thank you very much. I did not know that the Factory had to be added to that file; after adding it, everything runs normally.
-----Original Message-----
From: [email protected] <[email protected]> On Behalf Of Benchao Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh <[email protected]>
Subject: Re: Implementing KafkaUpsertTableSink
Hi,
You need to add your new Factory to the file
META-INF/services/org.apache.flink.table.factories.TableFactory under resources. Have you already done this step?
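The registration step works through the standard Java ServiceLoader mechanism, which is what discoverFactories relies on. Below is a minimal, self-contained sketch of that mechanism using only the JDK. It does not use Flink: DemoTableFactory and DemoKafkaUpsertFactory are hypothetical stand-ins for TableFactory and KafkaUpsertTableSourceSinkFactory, and a temporary directory plays the role of the jar root. It is meant only to illustrate why a factory is invisible until its class name is listed in the META-INF/services file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class ServiceFileDemo {

    /** Hypothetical stand-in for org.apache.flink.table.factories.TableFactory. */
    public interface DemoTableFactory {
        String name();
    }

    /** Hypothetical stand-in for a custom factory such as KafkaUpsertTableSourceSinkFactory. */
    public static class DemoKafkaUpsertFactory implements DemoTableFactory {
        // ServiceLoader needs a public no-argument constructor.
        public DemoKafkaUpsertFactory() {}

        @Override
        public String name() {
            return "kafka-upsert-demo";
        }
    }

    /** Returns the names of all factories ServiceLoader discovers through the given class loader. */
    static List<String> discover(ClassLoader cl) {
        List<String> names = new ArrayList<>();
        for (DemoTableFactory factory : ServiceLoader.load(DemoTableFactory.class, cl)) {
            names.add(factory.name());
        }
        return names;
    }

    /**
     * Builds a class loader over a temp directory that acts as a jar root.
     * When registerFactory is true, the directory contains the provider file
     * META-INF/services/<service interface name> listing the factory class.
     */
    static ClassLoader loaderWithServiceFile(boolean registerFactory) {
        try {
            Path root = Files.createTempDirectory("fake-jar-root");
            if (registerFactory) {
                Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
                // File name: fully qualified name of the service interface.
                Path providerFile = services.resolve(DemoTableFactory.class.getName());
                // File content: one fully qualified implementation class name per line.
                String content = DemoKafkaUpsertFactory.class.getName() + "\n";
                Files.write(providerFile, content.getBytes(StandardCharsets.UTF_8));
            }
            return new URLClassLoader(
                    new URL[] {root.toUri().toURL()}, ServiceFileDemo.class.getClassLoader());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Without the provider file the factory is not discovered.
        System.out.println(discover(loaderWithServiceFile(false)));
        // With the provider file the factory is discovered.
        System.out.println(discover(loaderWithServiceFile(true)));
    }
}
```

In a real project the provider file would normally live under src/main/resources so that it ends up inside the jar, and it would list the fully qualified class name of the actual factory (the package name depends on your own code).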
<[email protected]> wrote on Sat, Mar 28, 2020 at 5:38 PM:
> Hi all,
>
> The current KafkaTableSink does not support SQL that contains GROUP BY, so,
> following the logic of KafkaTableSink and HBaseUpsertTableSink, I implemented
> a KafkaUpsertTableSink made up of these classes:
>
> KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> However, TableFactoryService.discoverFactories does not load the
> KafkaUpsertTableSourceSinkFactory I defined. Does it need to be registered
> somewhere?
>
>
>
>
> /**
>  * Searches for factories using Java service providers.
>  *
>  * @return all factories in the classpath
>  */
> private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
>     try {
>         List<TableFactory> result = new LinkedList<>();
>         ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
>         ServiceLoader
>             .load(TableFactory.class, cl)
>             .iterator()
>             .forEachRemaining(result::add);
>         // todo add
>         result.add(new KafkaUpsertTableSourceSinkFactory());
>         return result;
>     } catch (ServiceConfigurationError e) {
>         LOG.error("Could not load service provider for table factories.", e);
>         throw new TableException("Could not load service provider for table factories.", e);
>     }
> }
>
>
>
>
>
> Adding the KafkaUpsertTableSourceSinkFactory directly to the returned result
> makes it run successfully.
>
> Many thanks
>
>
>
>
>
> ------------------
>
> Thanks
>
> venn
>
>
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]