[ 
https://issues.apache.org/jira/browse/FLINK-20579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17248708#comment-17248708
 ] 

donglei commented on FLINK-20579:
---------------------------------

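Therefore, we took the following approach: in the beforeBulk callback of ElasticsearchSinkBase, all requests in the same batch are assigned the same routing value, so each bulk is written with a single route.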

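private class BulkProcessorListener implements BulkProcessor.Listener {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        if (routePreBulk) { // need to verify whether the upstream has already set a routing
            // one random routing value per bulk, so the whole bulk goes to a single shard
            String routing = UUID.randomUUID() + "_" + executionId;
            for (DocWriteRequest<?> x : request.requests()) {
                // only index requests are re-routed; updates/deletes keep their routing
                // so they still reach the shard that holds the document
                if (x instanceof IndexRequest) {
                    ((IndexRequest) x).routing(routing);
                }
            }
            LOG.info("start bulk actions: {}, routing: {}",
                    request.numberOfActions(), routing);
        }
    }

    // afterBulk(...) callbacks unchanged
}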

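The benefit is that when the ES index has many shards, each bulk carries a single routing value and is therefore sent to a single ES node, which saves the time ES spends splitting the bulk and persisting the data, and improves ES performance.
Our preliminary estimate is that this improves performance by more than 2x.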
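To illustrate why one routing value per bulk keeps the whole bulk on one shard, here is a minimal sketch of Elasticsearch-style shard selection. It assumes the default routing_partition_size of 1, where the target shard is a deterministic function of hash(_routing) and the number of primary shards (in its simplest form, hash(_routing) mod number_of_primary_shards). It uses String.hashCode() as a stand-in for the Murmur3 hash Elasticsearch actually uses, and the class name, method names and "doc-i" document ids are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class BulkRoutingDemo {

    // Simplified shard selection: hash(routing) mod number of primary shards.
    // String.hashCode() is a stand-in for the Murmur3 hash Elasticsearch actually uses.
    static int shardFor(String routing, int numPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    // Default behaviour: no explicit routing, so each document is routed by its _id
    // and one bulk fans out over many shards.
    static Set<Integer> shardsHitRoutedById(int docsInBulk, int numPrimaryShards) {
        Set<Integer> shards = new HashSet<>();
        for (int i = 0; i < docsInBulk; i++) {
            shards.add(shardFor("doc-" + i, numPrimaryShards));
        }
        return shards;
    }

    // Per-bulk routing as in beforeBulk: all documents share one routing value,
    // so the whole bulk targets exactly one shard.
    static Set<Integer> shardsHitWithBulkRouting(int docsInBulk, String bulkRouting, int numPrimaryShards) {
        Set<Integer> shards = new HashSet<>();
        for (int i = 0; i < docsInBulk; i++) {
            shards.add(shardFor(bulkRouting, numPrimaryShards));
        }
        return shards;
    }

    // Across many bulks, a fresh random routing value per bulk spreads bulks over the shards.
    static int[] bulksPerShard(int numBulks, int numPrimaryShards) {
        int[] counts = new int[numPrimaryShards];
        for (long executionId = 0; executionId < numBulks; executionId++) {
            String routing = UUID.randomUUID() + "_" + executionId;
            counts[shardFor(routing, numPrimaryShards)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int shards = 32;
        String routing = UUID.randomUUID() + "_" + 1L;
        System.out.println("shards hit per bulk, routed by _id: " + shardsHitRoutedById(1000, shards).size());
        System.out.println("shards hit per bulk, bulk routing:  " + shardsHitWithBulkRouting(1000, routing, shards).size());
        System.out.println("bulks per shard over 10,000 bulks:  " + Arrays.toString(bulksPerShard(10000, shards)));
    }
}
```

The concrete shard numbers will not match a real cluster because the hash differs, but the property the approach relies on holds for any deterministic hash: one routing value maps to one shard, while random per-bulk values still spread successive bulks across shards.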

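The discussion points are:
 # Can we use keyBy upstream instead?
Since we use this feature to improve performance, setting the same routing value after an upstream keyBy cannot guarantee that all of that data is sent in one batch: for example, 10,000 records may share one routing value, but there is no guarantee that those 10,000 records end up in the same batch.
 # How do we decide whether to add a routing value?
Since Oceanus cannot expose an external API for this, we suggest sampling: for example, check whether the requests in a batch carry a routing value; if none of them do, conclude that this sink does not need routing values.
 # Is the data evenly distributed?
This setup has been running in Eagle Eye (鹰眼) for a long time; because the bulks are roughly uniform, the ES data is evenly distributed.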
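Discussion point 2 suggests sampling batches to decide whether per-bulk routing should be applied. Below is a minimal sketch of that check, assuming the sink can see each request's routing value (null or empty when the upstream did not set one), for example via each request's routing() getter. RoutingSampler, observeBatch and routePerBulkEnabled are hypothetical names, and the fixed sample size is an assumption.

```java
import java.util.Arrays;
import java.util.List;

public class RoutingSampler {

    private final int batchesToSample; // number of batches to inspect before deciding
    private int sampledBatches = 0;
    private boolean sawUpstreamRouting = false;

    RoutingSampler(int batchesToSample) {
        this.batchesToSample = batchesToSample;
    }

    // Record the routing values of one batch (null or empty = no routing set upstream).
    void observeBatch(List<String> routings) {
        if (sampledBatches >= batchesToSample) {
            return; // sampling finished; the decision is fixed
        }
        sampledBatches++;
        for (String r : routings) {
            if (r != null && !r.isEmpty()) {
                sawUpstreamRouting = true;
                break;
            }
        }
    }

    // Enable per-bulk routing only after sampling, and only if no sampled
    // request carried an upstream routing value.
    boolean routePerBulkEnabled() {
        return sampledBatches >= batchesToSample && !sawUpstreamRouting;
    }

    public static void main(String[] args) {
        RoutingSampler noUpstreamRouting = new RoutingSampler(1);
        noUpstreamRouting.observeBatch(Arrays.<String>asList(null, null, null));
        System.out.println(noUpstreamRouting.routePerBulkEnabled()); // prints true

        RoutingSampler upstreamRouting = new RoutingSampler(1);
        upstreamRouting.observeBatch(Arrays.asList(null, "user-42", null));
        System.out.println(upstreamRouting.routePerBulkEnabled()); // prints false
    }
}
```

Sampling only the first batches is a heuristic: if the upstream starts setting routing values later, this check will not notice. A stricter variant could keep inspecting every batch and skip the per-bulk routing for any batch that already carries a routing value.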

> each es sink will have 
> -----------------------
>
>                 Key: FLINK-20579
>                 URL: https://issues.apache.org/jira/browse/FLINK-20579
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: donglei
>            Priority: Major
>
> BulkProcessorListener.beforeBulk should assign the same routing to every request
> in a bulk to speed up writes to ES
>  
> As we know, a bulk whose requests share the same routing is sent to only one ES
> node, with one network I/O and one disk I/O, so every
>  
> !http://km.oa.com/files/photos/captures/202007/1593922902_79_w1275_h710.png!
>  
> Therefore, we take the following approach: beforeBulk in ElasticsearchSinkBase
> assigns the same routing value to every request in the same bulk, like this:
> private class BulkProcessorListener implements BulkProcessor.Listener {
>     @Override
>     public void beforeBulk(long executionId, BulkRequest request) {
>         if (routePreBulk) { // Need to verify whether a routing is already set upstream
>             // One random routing value per bulk: the whole bulk goes to one shard
>             String routing = UUID.randomUUID() + "_" + executionId;
>             for (DocWriteRequest<?> x : request.requests()) {
>                 // Only index requests are re-routed; updates/deletes keep their routing
>                 if (x instanceof IndexRequest) {
>                     ((IndexRequest) x).routing(routing);
>                 }
>             }
>             LOG.info("start bulk actions: {}, routing: {}", request.numberOfActions(),
>                     routing);
>         }
>     }
>
>     // afterBulk(...) callbacks unchanged
> }
> The advantage is that when the index has many ES shards, every bulk carries a
> single routing value and is sent to a single ES node, which saves the time ES
> spends splitting the bulk and persisting the data, and improves ES performance.
> Our preliminary estimate is that this improves performance by more than 2x.
> The discussion points here are:
> Q: Can we use keyBy upstream and set the same routing value instead?
> A: Since we use this feature to improve performance, setting the same routing
> value after an upstream keyBy cannot guarantee that all of that data is sent in
> one batch: for example, 10,000 records may share one routing value, but there is
> no guarantee that those 10,000 records end up in the same batch.
> Q: How do we decide whether to add a routing value?
> A: Since Oceanus cannot provide an external API for this, we recommend sampling
> here: for example, check whether the requests in a batch carry a routing value;
> if none do, conclude that this sink does not need routing values.
> Q: Is the data evenly distributed?
> A: We have been running this setup for a long time. Because the routing values
> are uniform across bulks, the ES data is evenly distributed.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
