You're correct in your assessment that ElasticsearchIO does not currently support queries with aggregations. There's a large difference between scrolling over large sets of documents (for which ES provides a common interface) and aggregations, where user code in the query affects the output fields. Another major difference is that queries with aggs effectively have a single output (the result of the aggs) rather than a potentially huge number of documents as "hits". However, aggs could produce thousands of buckets, where each bucket is desired to be an element in a PCollection, so it could be useful to support this use case in the future.
In order to work around this, there are 2 options that jump to mind:

1. Transfer the filtering logic to Beam: Read all the documents that you might be interested in using ElasticsearchIO with just a search (no aggs) and implement the grouping per user + filtering within Beam using PTransforms.

2. Write your own Source (SplittableDoFn?) which makes your above query (with aggs) using the Elasticsearch RestClient, parses the response, and outputs the parsed elements one-by-one to a PCollection for further processing. This would effectively be the approach to supporting your desired functionality within ElasticsearchIO as well.

- Evan

On Wed, Mar 9, 2022 at 3:52 AM Nick Pan <ashfur1...@gmail.com> wrote:

> Hello,
>
> I have a use case where I need to first compute an aggregation for each
> key, and then filter out the keys based on some criteria. And finally
> feed the matched keys as an input to PCollection using ElasticsearchIO
> read. But ElasticsearchIO does not seem to support query that contains
> aggregation:
>
> Error message from worker: org.elasticsearch.client.ResponseException:
> method [GET], host [https://...], URI [...], status line [HTTP/1.1 400
> Bad Request]
> {"error":{"root_cause":[{"type":"parsing_exception","reason":"request does
> not support
> [aggregations]","line":1,"col":135}],"type":"parsing_exception","reason":"request
> does not support [aggregations]","line":1,"col":135},"status":400}
> org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
> org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
> org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.queryCount(ElasticsearchIO.java:780)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:762)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.split(ElasticsearchIO.java:710)
>
> Here is an example of the Elasticsearch query I am trying to do:
>
> {
>   "aggs": {
>     "user_id": {
>       "composite": {
>         "sources": [
>           { "user_id": { "terms": { "field": "user_id" } } }
>         ]
>       },
>       "aggs": {
>         "min": {
>           "min": {
>             "field": "play_time"
>           }
>         },
>         "max": {
>           "max": {
>             "field": "play_time"
>           }
>         },
>         "diff": {
>           "bucket_selector": {
>             "buckets_path": {
>               "min": "min",
>               "max": "max"
>             },
>             "script": "params.max - params.min > 5000"
>           }
>         }
>       }
>     }
>   }
> }
>
> Is Elasticsearch query that contains aggregation not supported in
> ElasticsearchIO? If not, is there a way to work around this?
>
> Thanks,
>
> Nick
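
For reference, here is a rough sketch of the grouping and filtering that option 1 would move out of the query: track the min and max play_time per user_id, then keep only the users where max - min exceeds 5000, mirroring the bucket_selector script. In a pipeline this would presumably sit in a per-key combine followed by a filter; the sketch uses plain Java collections so that it runs without the Beam SDK, and it is not Beam API. The names PlayEvent, PlayTimeSpreadFilter and usersWithSpreadAbove are hypothetical, and treating play_time as a long is an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical document shape: only the two fields the query uses. */
final class PlayEvent {
    final String userId;
    final long playTime; // assumption: play_time is an integral number

    PlayEvent(String userId, long playTime) {
        this.userId = userId;
        this.playTime = playTime;
    }
}

/** Plain-Java stand-in for the per-key combine + filter step (not Beam API). */
final class PlayTimeSpreadFilter {

    /** Returns the user ids whose (max play_time - min play_time) exceeds the threshold. */
    static Set<String> usersWithSpreadAbove(List<PlayEvent> events, long threshold) {
        // Per user: slot 0 holds the running min, slot 1 the running max.
        Map<String, long[]> minMax = new HashMap<>();
        for (PlayEvent e : events) {
            long[] mm = minMax.get(e.userId);
            if (mm == null) {
                minMax.put(e.userId, new long[] {e.playTime, e.playTime});
            } else {
                mm[0] = Math.min(mm[0], e.playTime);
                mm[1] = Math.max(mm[1], e.playTime);
            }
        }
        // Same condition as the bucket_selector script "params.max - params.min > 5000".
        Set<String> matched = new TreeSet<>();
        for (Map.Entry<String, long[]> entry : minMax.entrySet()) {
            long[] mm = entry.getValue();
            if (mm[1] - mm[0] > threshold) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<PlayEvent> events = Arrays.asList(
            new PlayEvent("u1", 1000L),
            new PlayEvent("u1", 7000L),
            new PlayEvent("u2", 2000L),
            new PlayEvent("u2", 3000L));
        // u1 has a spread of 6000 and is kept; u2 has a spread of 1000 and is dropped.
        System.out.println(usersWithSpreadAbove(events, 5000L));
    }
}
```

The comparison is strict, as in the original script, so a user whose spread is exactly 5000 is not matched.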