Yes, unfortunately the Elasticsearch IO connector is not built in a way that allows it to take inputs from a PCollection to issue reads. The most scalable way to support this would be to revisit the implementation of the Elasticsearch read transform and implement it as a SplittableDoFn instead.
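Very roughly, and purely as a hypothetical sketch (this is not the connector's actual code; NUM_SLICES and fetchSlice are placeholders standing in for something like Elasticsearch's sliced-scroll mechanics), an SDF-based read could look like:

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

    // Hypothetical sketch: each input element is a query; the restriction
    // lets the runner split and rebalance the work of reading its results.
    @DoFn.BoundedPerElement
    class ReadQueryViaSdf extends DoFn<String, String> {

      private static final int NUM_SLICES = 10; // placeholder split count

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element String query) {
        // One restriction unit per slice of the query's result set.
        return new OffsetRange(0, NUM_SLICES);
      }

      @ProcessElement
      public void processElement(
          @Element String query,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<String> out) {
        for (long slice = tracker.currentRestriction().getFrom();
            tracker.tryClaim(slice);
            ++slice) {
          // fetchSlice is a placeholder for running the query against
          // Elasticsearch (e.g. a sliced scroll) and returning the hits.
          for (String hit : fetchSlice(query, slice, NUM_SLICES)) {
            out.output(hit);
          }
        }
      }

      private Iterable<String> fetchSlice(String query, long slice, int maxSlices) {
        throw new UnsupportedOperationException("sketch only");
      }
    }

Applied as queries.apply(ParDo.of(new ReadQueryViaSdf())), each query could then be read in parallel slices. In the meantime, a simpler per-element workaround using the low-level REST client is sketched after the quoted thread below.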
On Wed, Apr 19, 2023 at 10:51 Shahar Frank <srf...@gmail.com> wrote:

> Hi Sean,
>
> I'm not an expert, but I think the .withQuery() function is part of the
> build stage rather than the runtime stage. That is, ElasticsearchIO is
> built so that the query can be set while the pipeline is being
> constructed, but not at runtime -- which means you cannot run the query
> dynamically based on an element processed within the pipeline.
>
> To do something like that, the transform must be designed more like
> FileIO, as in this example (from
> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/FileIO.html):
>
>> PCollection<KV<String, String>> filesAndContents = p
>>     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
>>     // withCompression can be omitted - by default compression is detected from the filename.
>>     .apply(FileIO.readMatches().withCompression(GZIP))
>>     .apply(MapElements
>>         // uses imports from TypeDescriptors
>>         .into(kvs(strings(), strings()))
>>         .via((ReadableFile f) -> {
>>           try {
>>             return KV.of(
>>                 f.getMetadata().resourceId().toString(),
>>                 f.readFullyAsUTF8String());
>>           } catch (IOException ex) {
>>             throw new RuntimeException("Failed to read the file", ex);
>>           }
>>         }));
>
> If you look at how FileIO.readMatches() works, it doesn't set the
> filename when building the pipeline but rather accepts it within the
> ProcessElement function.
>
> See here:
> https://github.com/apache/beam/blob/bd8950176db0116221a1b739a3916da26d822f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L873
>
> Does that make sense?
>
> Cheers,
> Shahar.
>
> ------------------------------
> Shahar Frank
> srf...@gmail.com
> +447799561438
> ------------------------------
>
> On Wed, 19 Apr 2023 at 18:05, Murphy, Sean P. via user <user@beam.apache.org> wrote:
>
>> I'm running into an issue using ElasticsearchIO.read() to handle more
>> than one instance of a query. My queries are built dynamically as a
>> PCollection based on an incoming group of values. I'm trying to see how
>> the .withQuery() parameter could be loaded per element, or any other
>> approach that provides this flexibility.
>>
>> The issue is that ElasticsearchIO.read() expects a PBegin input to start
>> a pipeline, but it seems like I need access outside of a pipeline context
>> somehow. PBegin represents the beginning of a pipeline, and it's required
>> to create a pipeline that can read data from Elasticsearch using
>> ElasticsearchIO.read().
>>
>> Can I wrap the ElasticsearchIO.read() call in a Create transform that
>> creates a PCollection with a single element (e.g., PBegin) to simulate
>> the beginning of a pipeline, or something similar?
>>
>> Here is my naive attempt, without accepting the reality of PBegin:
>>
>> PCollection<String> queries = ...; // a PCollection of Elasticsearch queries
>>
>> PCollection<String> queryResults = queries
>>     .apply(ParDo.of(new DoFn<String, String>() {
>>       @ProcessElement
>>       public void processElement(ProcessContext c) {
>>         String query = c.element();
>>         PCollection<String> results = c.pipeline()
>>             .apply(ElasticsearchIO.read()
>>                 .withConnectionConfiguration(
>>                     ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>>                 .withQuery(query));
>>         c.output(results);
>>       }
>>     }))
>>     .apply(Flatten.pCollections());
>>
>> In general I'm wondering, for any of the IO-related classes provided by
>> Beam that conform to a PBegin input, whether there is a means to
>> introduce a collection.
>>
>> Here is one approach that might be promising:
>>
>> // Define a ValueProvider for a List<String>
>> ValueProvider<List<String>> myListProvider =
>>     ValueProvider.StaticValueProvider.of(myList);
>>
>> // Use the ValueProvider to create a PCollection of Strings
>> PCollection<String> pcoll =
>>     pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of(StringUtf8Coder.of())));
>>
>> PCollection<String> partitionData = PBegin.in(pipeline)
>>     .apply("Read data from Elasticsearch",
>>         ElasticsearchIO.read()
>>             .withConnectionConfiguration(connConfig)
>>             .withQuery(/* ValueProvider<String> */ pcoll)
>>             .withScrollKeepalive("1m")
>>             .withBatchSize(50))
>>     .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
>>         opt.getMedTagVersion(), opt.getNoteType()));
>>
>> Any thoughts or ideas would be great. Thanks, ~Sean
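As mentioned above, until the connector supports per-element reads, one workaround for the question in the quoted thread is to bypass ElasticsearchIO for the dynamic part and issue the searches yourself from a plain DoFn using the Elasticsearch low-level REST client. A minimal sketch follows (host, port, and index name are illustrative; it emits each raw _search response body, with no scroll/pagination or splitting, so large result sets need more care):

    import java.io.IOException;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.http.HttpHost;
    import org.apache.http.util.EntityUtils;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    // Takes query JSON strings as input and emits the raw _search response
    // body for each; one client per DoFn instance.
    class SearchPerElementFn extends DoFn<String, String> {

      private transient RestClient client;

      @Setup
      public void setup() {
        // Illustrative host/port; point this at your cluster.
        client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
      }

      @ProcessElement
      public void processElement(@Element String query, OutputReceiver<String> out)
          throws IOException {
        Request request = new Request("GET", "/my-index/_search"); // illustrative index
        request.setJsonEntity(query);
        Response response = client.performRequest(request);
        out.output(EntityUtils.toString(response.getEntity()));
      }

      @Teardown
      public void teardown() throws IOException {
        if (client != null) {
          client.close();
        }
      }
    }

Then queries.apply(ParDo.of(new SearchPerElementFn())) gives you one response per query, to be parsed into individual hits downstream. You lose ElasticsearchIO's splitting and retry behavior, but you gain fully dynamic queries.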