[ https://issues.apache.org/jira/browse/FLINK-20579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17248708#comment-17248708 ]
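donglei commented on FLINK-20579:
---------------------------------

Therefore, we take the following approach: in the beforeBulk callback of ElasticsearchSinkBase, every request in the same batch is given the same routing value, so the batch is written as a single bulk.

private class BulkProcessorListener implements BulkProcessor.Listener {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        if (routePreBulk) { // TODO: verify whether a route was already set upstream
            // One routing value per bulk, so the whole bulk goes to the same ES node.
            String routing = UUID.randomUUID() + "_" + executionId;
            List<ActionRequest> requests = request.requests();
            requests.forEach(x -> {
                // Guard the cast: a bulk may also contain non-index requests.
                if (x instanceof IndexRequest) {
                    ((IndexRequest) x).routing(routing);
                }
            });
            LOG.info("start bulk actions: {}, routing: {}", request.numberOfActions(), routing);
        }
    }
    // ... afterBulk callbacks omitted
}

The advantage is that when the index has many shards, every request in a bulk carries the same route and is sent to the same ES node. This saves the time ES spends splitting the bulk and persisting the data, and improves ES performance.

A preliminary estimate is that this improves performance by more than 2x.

The discussion points here are:

1. Can an upstream keyBy be used instead?
   Since we use this feature to improve performance, setting the same route value after an upstream keyBy cannot guarantee that all of the data is sent in one batch. For example, 10,000 records may share one route value, but there is no guarantee that those 10,000 records end up in the same batch.

2. How do we decide whether to add a route value?
   Since Oceanus cannot provide an external API, we suggest sampling here: for example, check whether any request in a batch has a route. If none does, treat this sink as not needing a route value.

3. Is the data evenly distributed?
   鹰眼 (Eagle Eye) has been running with this setting for a long time. Because the bulks are roughly uniform, the data in ES is evenly distributed.

> eash es sink will have
> -----------------------
>
>                 Key: FLINK-20579
>                 URL: https://issues.apache.org/jira/browse/FLINK-20579
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: donglei
>            Priority: Major
>
> The requests in BulkProcessorListener.beforeBulk must share the same route to speed up writes to ES.
>
> As we know, a bulk whose requests share the same route is sent to only one ES node, with one network I/O and one disk I/O, so every
>
> !http://km.oa.com/files/photos/captures/202007/1593922902_79_w1275_h710.png!
>
> Therefore, we take the following approach: beforeBulk in ElasticsearchSinkBase assigns the same route to every request in the same batch,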
> like this:
>
> private class BulkProcessorListener implements BulkProcessor.Listener {
>     @Override
>     public void beforeBulk(long executionId, BulkRequest request) {
>         if (routePreBulk) { // TODO: verify whether a route was already set upstream
>             String routing = UUID.randomUUID() + "_" + executionId;
>             List<ActionRequest> requests = request.requests();
>             requests.forEach(x -> {
>                 // Guard the cast: a bulk may also contain non-index requests.
>                 if (x instanceof IndexRequest) {
>                     ((IndexRequest) x).routing(routing);
>                 }
>             });
>             LOG.info("start bulk actions: {}, routing: {}", request.numberOfActions(), routing);
>         }
>     }
>     // ... afterBulk callbacks omitted
> }
>
> The advantage of this is that when the index has many ES shards, every request in a bulk carries the same route and is sent to the same ES node. This saves the time ES spends splitting the bulk and persisting the data, and improves ES performance.
> A preliminary estimate is that this improves performance by more than 2x.
>
> The discussion points here are:
>
> Q: Can we use keyBy upstream and set the same route value there?
> A: Since we use this feature to improve performance, setting the same route value after an upstream keyBy cannot guarantee that all of the data is sent in one batch. For example, 10,000 records may share one route value, but there is no guarantee that those 10,000 records end up in the same batch.
>
> Q: How do we decide whether to add a route value?
> A: Since Oceanus cannot provide an external API, we recommend sampling here: for example, check whether any request in a batch has a route. If none does, treat this sink as not needing a route value.
>
> Q: Is the data evenly distributed?
> A: We have been running this for a long time. With this setting, because the bulks are roughly uniform, the ES data is evenly distributed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
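
The per-bulk routing idea and the sampling check from the second discussion point can be sketched with the Java standard library alone. This is only an illustration under assumptions: `Request` is a hypothetical stand-in for an Elasticsearch `IndexRequest`, and `anyRouted` and `routePerBulk` are made-up names, not part of Flink or Elasticsearch. Leaving a batch untouched when it already carries upstream routing is one possible reading of the sampling suggestion, not something the issue specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Stdlib-only sketch of per-bulk routing with a sampling check. */
class PerBulkRouting {

    /** Hypothetical stand-in for a bulk item: a document id plus an optional routing value. */
    static final class Request {
        final String id;
        String routing; // null means no routing was set upstream

        Request(String id, String routing) {
            this.id = id;
            this.routing = routing;
        }
    }

    /** Sampling check: does any request in this batch already carry a route? */
    static boolean anyRouted(List<Request> batch) {
        for (Request r : batch) {
            if (r.routing != null) {
                return true;
            }
        }
        return false;
    }

    /**
     * Assigns one fresh routing value to every request in the batch, unless the
     * batch is empty or already contains upstream routing.
     * Returns the routing value used, or null if the batch was left untouched.
     */
    static String routePerBulk(long executionId, List<Request> batch) {
        if (batch.isEmpty() || anyRouted(batch)) {
            return null;
        }
        // Same scheme as in the issue: random UUID plus the bulk execution id.
        String routing = UUID.randomUUID() + "_" + executionId;
        for (Request r : batch) {
            r.routing = routing;
        }
        return routing;
    }

    public static void main(String[] args) {
        List<Request> batch = new ArrayList<>();
        batch.add(new Request("a", null));
        batch.add(new Request("b", null));
        String routing = routePerBulk(1L, batch);
        System.out.println("assigned routing: " + routing);
    }
}
```

In a real sink the same decision would be made inside `beforeBulk`, with the check applied to the actual bulk items instead of the stand-in `Request` class.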