lxb6476 commented on issue #518: URL: https://github.com/apache/rocketmq-connect/issues/518#issuecomment-1918701833
遇到了同样的问题,对着源码看了半天,可能有理解不到位的地方,但是至少解决了创建主题这一步的问题,以下是解决方案: 源码仓库中:模块`rocketmq-connect-runtime`中 `org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask#maybeCreateAndGetTopic`方法: ```Java private String maybeCreateAndGetTopic(ConnectRecord record) { String topic = overwriteTopicFromRecord(record); topic = taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME); // 这里把判断条件取消掉,直接从配置中获取主题名 // if (StringUtils.isBlank(topic)) { // // topic from config // topic = taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME); // } if (StringUtils.isBlank(topic)) { throw new ConnectException("source connect lack of topic config"); } if (!workerConfig.isAutoCreateTopicEnable() || topicCache.contains(topic)) { return topic; } if (!ConnectUtil.isTopicExist(workerConfig, topic)) { ConnectUtil.createTopic(workerConfig, new TopicConfig(topic)); } topicCache.add(topic); return topic; } ``` 这里前半部分`overwriteTopicFromRecord`方法没有细看,但是大概理解是将插入数据的 数据库服务名.数据库名.数据库表 名作为主题名,但是因为出现了`.`符号,所以至少在RocketMQ5.x版本会出现创建主题失败的问题,代码这里对于主题名的修改,在传入的配置参数中,优先级比`overwriteTopicFromRecord`方法生成的低,出现配置了"connect.topicname"但是没有生效的问题,不知道后续是否会优化。。。 ---- Connect感觉是个比较有用的工具,但是文档中举例的demo,可能是限于个人水平,只运行成功了文件的同步的例子,在做其他demo的时候遇到不少问题,对着源码一点点打断点,也没能完全解决,由于时间原因,最后只能放弃使用。未来还是希望项目能够热度再高一些,维护得更好一些。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org