您好,我目前在调研Flink对于即席查询场景的支持程度,打算通过Flink查询HDFS中的数据,对查询实效性要求高,查询平均时延要求在秒级。 我调研了Flink集群的多种部署模式,发现Standalone on k8s 模式下的 Flink Session集群<https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/deployment/resource-providers/standalone/kubernetes/#kubernetes-%e4%b8%8a%e7%9a%84-flink-session-%e9%9b%86%e7%be%a4> 最满足这种需求,因此搭建了该种模式的Flink集群,打算通过我们自研的Java项目集成Flink API提交查询SQL到Flink集群执行。
目前我发现通过Java项目往Flink提交SQL有两种方式: 方式一:通过Flink Table API<https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/hive/overview/#%e8%bf%9e%e6%8e%a5%e5%88%b0hive> 的方式 这种方式需要将集成Flink Table API的代码打成jar包,放在我们Java项目服务的服务器上,然后在Java项目内通过调用启动脚本的方式往Flink集群提交任务,类似:flink run -m {host}:{port} xxx.jar。 这种方式的缺点是main()方法在客户端执行,而且涉及到客户端往JobManager、JobManager往TaskManager分发jar包的过程,时延较高,一般至少需要十秒以上,不太满足即席查询对时延的要求。 方式二:采用类似SQL客户端<https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/> 的方式 这种方式没有分发jar包的过程,相对第一种方式而言时延较低,问题就在于Java项目该如何集成SQL客户端?我研究了相关代码,打算通过我们自研的Java项目直接调用Flink SqlClient的相关方法,类似:SQL客户端提交SQL demo<https://github.com/apache/flink/blob/release-1.14.3/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java#L176>。但是这种方式我们接收到的返回内容是字符串,而不是结构化的Java对象,不像Table API封装的那么好,需要自行做反序列化处理,而且我个人觉得这种方式不太合适。 综上,我想请教下您两个问题: 问题一:Flink Standalone集群其实就是常驻进程了,类似Presto这种引擎,上述方式一有没有可能Java项目集成Flink Table API时,直接在Java项目内运行这段代码,相当于该Java服务作为客户端,直接往Flink集群提交SQL,而不是绕了一次,先打好jar包再通过 flink run提交jar包的方式提交SQL。 我想让这段代码直接在Java项目提供的服务内直接运行,不知道目前能不能做到。 [cid:image001.png@01D89778.222ACE50] 问题二:除了问题一的解决方案,还有没有其他方式能满足目前我们这种需求? 抱歉打扰您了,万分感谢!