You're correct that ElasticsearchIO does not currently support queries
with aggregations. There's a large difference between scrolling over large
sets of documents (which Elasticsearch exposes through a common interface)
vs. aggregations, where user code in the query determines the output
fields. Another major difference is that queries with aggs effectively have
a single output (the result of the aggs) rather than a potentially huge
number of documents as "hits". However, aggs could produce thousands of
buckets, each of which you might want as an element in a PCollection, so it
could be useful to support this use case in the future.
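The "each bucket becomes an element" idea can be made concrete with a composite aggregation like the one quoted below. Elasticsearch pages those results: you resend the request with `after` set to the previous response's `after_key` until no buckets come back. The following is a minimal sketch under stated assumptions. It drains the pages and passes every bucket to a callback, which plays the role a DoFn's `output()` would. `CompositeAggPager`, `Page`, `drain`, and `inMemoryFetcher` are hypothetical names. The fetcher is an in-memory stand-in for the real RestClient request and JSON parsing, and buckets are simplified to just their key maps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical sketch: drain a paged composite aggregation, handing each bucket
 * to a callback the way a DoFn would call output() once per bucket.
 */
final class CompositeAggPager {

  /** One parsed page of a composite aggregation response (simplified). */
  static final class Page {
    final List<Map<String, Object>> buckets;
    final Map<String, Object> afterKey; // null when the response has no after_key

    Page(List<Map<String, Object>> buckets, Map<String, Object> afterKey) {
      this.buckets = buckets;
      this.afterKey = afterKey;
    }
  }

  /**
   * fetchPage sends the aggregation with "after" set to the given key (null on
   * the first call) and parses one page; emit receives every bucket.
   * Returns the number of requests issued.
   */
  static int drain(Function<Map<String, Object>, Page> fetchPage,
                   Consumer<Map<String, Object>> emit) {
    Map<String, Object> after = null;
    int requests = 0;
    while (true) {
      Page page = fetchPage.apply(after);
      requests++;
      page.buckets.forEach(emit);
      // An empty page or a missing after_key means there is nothing left to fetch.
      if (page.buckets.isEmpty() || page.afterKey == null) {
        return requests;
      }
      after = page.afterKey;
    }
  }

  /** Hypothetical in-memory stand-in for Elasticsearch: serves user_id buckets pageSize at a time. */
  static Function<Map<String, Object>, Page> inMemoryFetcher(List<String> userIds, int pageSize) {
    return after -> {
      int start = after == null ? 0 : userIds.indexOf((String) after.get("user_id")) + 1;
      int end = Math.min(start + pageSize, userIds.size());
      List<Map<String, Object>> buckets = new ArrayList<>();
      for (String id : userIds.subList(start, end)) {
        buckets.add(Collections.<String, Object>singletonMap("user_id", id));
      }
      // Like a typical after_key, this is the key of the last bucket on the page.
      Map<String, Object> afterKey = buckets.isEmpty() ? null : buckets.get(buckets.size() - 1);
      return new Page(buckets, afterKey);
    };
  }

  public static void main(String[] args) {
    List<Map<String, Object>> emitted = new ArrayList<>();
    int requests = drain(
        inMemoryFetcher(Arrays.asList("u1", "u2", "u3", "u4", "u5"), 2), emitted::add);
    System.out.println(emitted.size() + " buckets in " + requests + " requests");
  }
}
```

Running `main` prints `5 buckets in 4 requests`. The fourth request returns an empty page, and that ends the loop.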

To work around the missing aggregation support, two options come to mind:

   1. Move the filtering logic into Beam: read all the documents you might
   be interested in using ElasticsearchIO with a plain search (no aggs),
   and implement the per-user grouping and filtering in Beam with
   PTransforms.
   2. Write your own Source (SplittableDoFn?) that issues the query above
   (with aggs) using the Elasticsearch RestClient, parses the response, and
   outputs the parsed elements one by one to a PCollection for further
   processing. This would also be the approach for supporting your desired
   functionality within ElasticsearchIO itself.
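Option 1 can be illustrated by computing per user what the query's min/max/bucket_selector aggs compute, but outside Elasticsearch. Below is a plain-Java sketch over already-fetched hits, so it runs without Beam or Elasticsearch. In a real pipeline you would likely parse the JSON documents from `ElasticsearchIO.read()` into (user_id, play_time) pairs, compute min/max per key with `Combine.perKey`, and drop keys with `Filter.by`. That mapping is an assumption, not a tested pipeline. `PlayTimeSpread`, `Hit`, and `usersWithSpreadAbove` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
 * Hypothetical plain-Java sketch of option 1's per-user logic. In Beam the same
 * steps would roughly be: parse hit -> KV<user_id, play_time>, Combine.perKey
 * for min/max, then Filter.by on the spread.
 */
final class PlayTimeSpread {

  /** Simplified search hit holding only the fields the query uses. */
  static final class Hit {
    final String userId;
    final long playTime;

    Hit(String userId, long playTime) {
      this.userId = userId;
      this.playTime = playTime;
    }
  }

  /** Returns user_ids (sorted) whose max(play_time) - min(play_time) > threshold. */
  static List<String> usersWithSpreadAbove(List<Hit> hits, long threshold) {
    // Group by user and collect min/max per user (the role of the min/max aggs).
    Map<String, LongSummaryStatistics> statsByUser = hits.stream()
        .collect(Collectors.groupingBy(
            (Hit h) -> h.userId,
            TreeMap::new,
            Collectors.summarizingLong((Hit h) -> h.playTime)));
    // Same predicate as the bucket_selector script: params.max - params.min > 5000.
    return statsByUser.entrySet().stream()
        .filter(e -> e.getValue().getMax() - e.getValue().getMin() > threshold)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Hit> hits = Arrays.asList(
        new Hit("u1", 1000), new Hit("u1", 7000),  // spread 6000: kept
        new Hit("u2", 2000), new Hit("u2", 4000),  // spread 2000: dropped
        new Hit("u3", 0), new Hit("u3", 5000));    // spread 5000: dropped (not > 5000)
    System.out.println(usersWithSpreadAbove(hits, 5000));
  }
}
```

With this sample data, `main` prints `[u1]`. The trade-off compared with option 2 is that every matching document is transferred to Beam rather than only the aggregated buckets.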

- Evan

On Wed, Mar 9, 2022 at 3:52 AM Nick Pan <ashfur1...@gmail.com> wrote:

> Hello,
>
> I have a use case where I need to first compute an aggregation for each
> key, then filter out keys based on some criteria, and finally feed the
> matched keys as input to a PCollection using ElasticsearchIO read. But
> ElasticsearchIO does not seem to support queries that contain
> aggregations:
>
> Error message from worker: org.elasticsearch.client.ResponseException: method [GET], host [https://...], URI [...], status line [HTTP/1.1 400 Bad Request]
> {"error":{"root_cause":[{"type":"parsing_exception","reason":"request does not support [aggregations]","line":1,"col":135}],"type":"parsing_exception","reason":"request does not support [aggregations]","line":1,"col":135},"status":400}
> org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
> org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
> org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.queryCount(ElasticsearchIO.java:780)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:762)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.split(ElasticsearchIO.java:710)
>
>
> Here is an example of the Elasticsearch query I am trying to do:
>
> {
>   "aggs": {
>     "user_id": {
>       "composite": {
>         "sources": [
>           { "user_id": { "terms": { "field": "user_id" } } }
>         ]
>       },
>       "aggs": {
>         "min": {
>           "min": {
>             "field": "play_time"
>           }
>         },
>         "max": {
>           "max": {
>             "field": "play_time"
>           }
>         },
>         "diff": {
>           "bucket_selector": {
>             "buckets_path": {
>               "min": "min",
>               "max": "max"
>             },
>             "script": "params.max - params.min > 5000"
>           }
>         }
>       }
>     }
>   }
> }
>
> Are Elasticsearch queries that contain aggregations not supported in
> ElasticsearchIO? If not, is there a way to work around this?
>
>
> Thanks,
>
> Nick
>
