Hi, 你需要把你新增的Factory添加到 resources下的 META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
<[email protected]> 于2020年3月28日周六 下午5:38写道: > 各位大佬: > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > KafkaUpsertTableSink: > > KafkaUpsertTableSink > > KafkaUpsertTableSinkBase > > KafkaUpsertTableSourceSinkFactory > > KafkaUpsertTableSourceSinkFactoryBase > > MyKafkaValidator > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > 呢? > > > > > /** > * Searches for factories using Java service providers. > * > * @return all factories in the classpath > */ > private static List<TableFactory> discoverFactories(Optional<ClassLoader> > classLoader) { > try { > List<TableFactory> result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > //todo add > result.add(new KafkaUpsertTableSourceSinkFactory()); > return result; > } catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for table > factories.", e); > } > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > ------------------ > > Thanks > > venn > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [email protected]; [email protected]
