这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 
KafkaRetractTableSourceSinkFactory 写了一遍
但这个应该怎样改才合适呢?

    137         private static <T extends TableFactory> T findSingleInternal(
    138                         Class<T> factoryClass,
    139                         Map<String, String> properties,
    140                         Optional<ClassLoader> classLoader) {
    141
    142                 List<TableFactory> tableFactories = 
discoverFactories(classLoader);
    143                 List<T> filtered = filter(tableFactories, factoryClass, 
properties);
    144
    145                 if (filtered.size() > 1) {
    146                         throw new AmbiguousTableFactoryException(
    147                                 filtered,
    148                                 factoryClass,
    149                                 tableFactories,
    150                                 properties);
    151                 } else {
    152                         return filtered.get(0);
    153                 }
    154         }


谢谢,
王磊


[email protected] 

 
Sender: [email protected]
Send Time: 2020-03-31 10:50
Receiver: user-zh
Subject: Re: RE: 实现 KafkaUpsertTableSink
 
我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:
 
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
        at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
        at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
        at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
        ... 3 more
 
这个改怎样解决呢?
 
谢谢,
王磊
 
 
 
[email protected] 
 
Sender: [email protected]
Send Time: 2020-03-29 10:32
Receiver: [email protected]
Subject: RE: 实现 KafkaUpsertTableSink
Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
-----Original Message-----
From: [email protected] 
<[email protected]> On Behalf Of Benchao 
Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh <[email protected]>
Subject: Re: 实现 KafkaUpsertTableSink
Hi,
你需要把你新增的Factory添加到 resources下的
META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
<[email protected]> 于2020年3月28日周六 下午5:38写道:
> 各位大佬:
>
>                 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> KafkaUpsertTableSink:
>
>                 KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> 呢?
>
>
>
>
> /**
> * Searches for factories using Java service providers.
> *
> * @return all factories in the classpath */ private static 
> List<TableFactory> discoverFactories(Optional<ClassLoader>
> classLoader) {
>    try {
>       List<TableFactory> result = new LinkedList<>();
>       ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>       ServiceLoader
>          .load(TableFactory.class, cl)
>          .iterator()
>          .forEachRemaining(result::add);
>       //todo add
>       result.add(new KafkaUpsertTableSourceSinkFactory());
>       return result;
>    } catch (ServiceConfigurationError e) {
>       LOG.error("Could not load service provider for table factories.", e);
>       throw new TableException("Could not load service provider for 
> table factories.", e);
>    }
>
> }
>
>
>
>
>
> 直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
> 以成功运行的。
>
> 非常感谢
>
>
>
>
>
> ------------------
>
> Thanks
>
> venn
>
>
>
>
-- 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]

回复