[ https://issues.apache.org/jira/browse/BEAM-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479258#comment-17479258 ]
Etienne Chauchot commented on BEAM-10945:
-----------------------------------------

The 1024 cap comes from the limit imposed by the Elasticsearch slice API.

> ElasticsearchIO performs 0 division on DirectRunner
> ---------------------------------------------------
>
> Key: BEAM-10945
> URL: https://issues.apache.org/jira/browse/BEAM-10945
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch, runner-direct
> Affects Versions: 2.23.0
> Environment: Beam 2.23
> Java 1.8.0_265
> Ubuntu 16.04
> Elastic version of cluster 7.9.1, cross cluster setup
> Parallelism of direct runner 8
> Reporter: Milan Nikl
> Priority: P3
>
> h1. Environment configuration
> In my company we use an [Elasticsearch cross cluster|https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cross-cluster-search.html#ccs-supported-apis] setup for search. The cluster version is 7.9.1.
> I intended to use ElasticsearchIO to read application logs and then produce some aggregated data.
> h1. Problem description
> # In a cross-cluster ES setup, the {{/<index>/_stats}} API is not available, so [ElasticsearchIO#getEstimatedSizeBytes|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L692] cannot be computed properly.
> # The {{statsJson}} returned by the cluster looks like this:
> {noformat}
> {
>   "_shards" : { ... },
>   "_all" : {
>     "primaries" : { },
>     "total" : { }
>   },
>   "indices" : { }
> }
> {noformat}
> # As a result, the {{totalCount}} value cannot be parsed from the JSON and is set to {{0}}.
> # Consequently, the {{estimatedByteSize}} value [is set to 1|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L707] (which is itself a workaround for a similar issue).
> # {{ElasticsearchIO#getEstimatedSizeBytes}} is used in [BoundedReadEvaluatorFactory#getInitialInputs|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java#L212], which does not check the value and divides it by the target parallelism using {{long}} (integer) division. With an estimate of {{1}}, the result is {{0}} for any {{targetParallelism > 1}}.
> # Then [ElasticsearchIO#split|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L665] is called with {{indexSize = 1}} and {{desiredBundleSizeBytes = 0}}, which sets the {{nbBundlesFloat}} value to infinity.
> # Even though the number of bundles is capped at {{1024}}, reading from 1024 BoundedElasticsearchSources concurrently makes ElasticsearchIO virtually impossible to use on the direct runner.
> h1. Resolution suggestion
> I haven't yet tested reading from ElasticsearchIO on a proper runner (we use Flink 1.10.2), so I can neither confirm nor deny that it works with our Elasticsearch setup. For now, I'm only suggesting a few checks of the input values so that the division by zero and the unnecessary parallelism are eliminated on the direct runner.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
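The arithmetic chain described in the problem steps can be reproduced in plain Java, outside of Beam. This is a minimal sketch: the class `BundleMath` and its methods are hypothetical names, and the formulas follow the issue's description (integer division in the DirectRunner, then float division, ceiling and a 1024 cap in `split`) rather than being copied from the Beam sources. The guarded variant is only one possible shape of the input checks suggested in the issue, not the fix that was actually adopted.

```java
// Hypothetical reproduction of the BEAM-10945 arithmetic; names are illustrative only.
final class BundleMath {

  // Stand-in for the DirectRunner step as described in the issue:
  // estimated size divided by target parallelism with long (integer) division.
  static long bytesPerBundle(long estimatedBytes, int targetParallelism) {
    return estimatedBytes / targetParallelism;
  }

  // Stand-in for the split computation as described in the issue:
  // float division, ceiling, then a cap at 1024 (the ES slice limit).
  static int nbBundles(long indexSize, long desiredBundleSizeBytes) {
    // 1.0f / 0 evaluates to Infinity in Java; no ArithmeticException is thrown.
    float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes;
    // (int) of Infinity saturates to Integer.MAX_VALUE.
    int n = (int) Math.ceil(nbBundlesFloat);
    return Math.min(n, 1024);
  }

  // Hypothetical guarded variant: fall back to a single bundle when the
  // desired bundle size is not a usable positive number.
  static int nbBundlesGuarded(long indexSize, long desiredBundleSizeBytes) {
    if (desiredBundleSizeBytes <= 0) {
      return 1;
    }
    return nbBundles(indexSize, desiredBundleSizeBytes);
  }

  public static void main(String[] args) {
    long estimated = 1;                             // estimate used when _stats yields no count
    long perBundle = bytesPerBundle(estimated, 8);  // 1 / 8 truncates to 0
    System.out.println("bytesPerBundle = " + perBundle);
    System.out.println("nbBundles = " + nbBundles(estimated, perBundle));
    System.out.println("nbBundlesGuarded = " + nbBundlesGuarded(estimated, perBundle));
  }
}
```

With a parallelism of 8, as in the reported environment, `bytesPerBundle` truncates to 0, the unguarded computation saturates at the 1024 cap, and the guarded variant reads the whole source as a single bundle.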