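Hi all,
The current KafkaTableSink does not support SQL queries that contain
GROUP BY, so I implemented a KafkaUpsertTableSink modeled on the logic of
KafkaTableSink and HBaseUpsertTableSink. It consists of these classes: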
KafkaUpsertTableSink
KafkaUpsertTableSinkBase
KafkaUpsertTableSourceSinkFactory
KafkaUpsertTableSourceSinkFactoryBase
MyKafkaValidator
However, TableFactoryService.discoverFactories does not load my
KafkaUpsertTableSourceSinkFactory. Does the factory need to be registered
somewhere?
/**
 * Searches for factories using Java service providers.
 *
 * @return all factories in the classpath
 */
private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
    try {
        List<TableFactory> result = new LinkedList<>();
        ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
        ServiceLoader
            .load(TableFactory.class, cl)
            .iterator()
            .forEachRemaining(result::add);
        // TODO: added manually
        result.add(new KafkaUpsertTableSourceSinkFactory());
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for table factories.", e);
        throw new TableException("Could not load service provider for table factories.", e);
    }
}
Adding KafkaUpsertTableSourceSinkFactory directly to the returned result,
as in the snippet above, makes the job run successfully.
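For reference, java.util.ServiceLoader only returns implementations that are listed in a provider-configuration file named META-INF/services/<fully qualified service interface name> on the class path. The sketch below is a minimal, self-contained illustration of that mechanism, not Flink code: DemoFactory and DemoUpsertFactory are hypothetical stand-ins for TableFactory and a custom factory, and the registration file is written to a temporary directory purely for demonstration. Whether a missing file of this kind (for Flink it would be META-INF/services/org.apache.flink.table.factories.TableFactory) is what is going wrong in the setup above is an assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class ServiceLoaderSketch {

    /** Hypothetical stand-in for a service interface such as TableFactory. */
    public interface DemoFactory {
        String name();
    }

    /** Hypothetical stand-in for a custom factory implementation. */
    public static class DemoUpsertFactory implements DemoFactory {
        @Override
        public String name() {
            return "demo-upsert";
        }
    }

    /** Returns the names of all DemoFactory providers visible through the given class loader. */
    static List<String> discover(ClassLoader cl) {
        List<String> names = new ArrayList<>();
        for (DemoFactory factory : ServiceLoader.load(DemoFactory.class, cl)) {
            names.add(factory.name());
        }
        return names;
    }

    /** Builds a class loader whose class path contains a registration file for DemoUpsertFactory. */
    static ClassLoader loaderWithRegistration() {
        try {
            Path root = Files.createTempDirectory("spi-demo");
            Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
            // File name: binary name of the service interface.
            // File content: one implementation class name per line.
            Files.write(
                services.resolve(DemoFactory.class.getName()),
                (DemoUpsertFactory.class.getName() + "\n").getBytes(StandardCharsets.UTF_8));
            return new URLClassLoader(
                new URL[] {root.toUri().toURL()},
                ServiceLoaderSketch.class.getClassLoader());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ClassLoader plain = ServiceLoaderSketch.class.getClassLoader();
        System.out.println("without registration: " + discover(plain));
        System.out.println("with registration:    " + discover(loaderWithRegistration()));
    }
}
```

In this sketch the implementation class is on the class path in both cases; it is only discovered once the registration file is visible to the class loader passed to ServiceLoader.load. In a real project such a file would normally live under src/main/resources so that it is packaged into the jar.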
Many thanks.
------------------
Thanks
venn