Yes, unfortunately the Elasticsearch IO connector is not built to take its
inputs from a PCollection when issuing reads. The most scalable way to
support this would be to revisit the implementation of the Elasticsearch
Read transform and reimplement it as a SplittableDoFn.
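
Until the connector is reworked, a possible stopgap is to issue the search
yourself, once per element, from inside a DoFn's @ProcessElement method.
That gives up the parallel splitting that ElasticsearchIO.read() does for
you. The sketch below is only an illustration using the JDK's own HTTP
classes (Java 11+): PerElementSearch, requestFor and requestsFor are
hypothetical names, not part of Beam or ElasticsearchIO, and the base URL
and index name are assumptions. It builds one _search request per query
string but does not send it; a real DoFn would also need a client created
in @Setup, scroll handling, and error handling.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: models "the query arrives as an element at runtime"
// instead of "the query is fixed while the pipeline is being built".
final class PerElementSearch {
    private final String baseUrl; // assumed, e.g. "http://localhost:9200"
    private final String index;   // assumed index name

    PerElementSearch(String baseUrl, String index) {
        this.baseUrl = baseUrl;
        this.index = index;
    }

    // Builds (but does not send) a POST to <baseUrl>/<index>/_search whose
    // body is the query JSON, e.g. {"query":{"match_all":{}}}.
    // In a DoFn this would be called with the current element.
    HttpRequest requestFor(String queryJson) {
        URI uri = URI.create(baseUrl + "/" + index + "/_search");
        return HttpRequest.newBuilder(uri)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(queryJson))
            .build();
    }

    // One request per incoming query, mirroring one call per element.
    List<HttpRequest> requestsFor(List<String> queries) {
        return queries.stream()
            .map(this::requestFor)
            .collect(Collectors.toList());
    }
}
```

The point of the sketch is only the shape: the connection details are fixed
when the helper is constructed, while the query is an argument supplied per
element, which is what withQuery() on the current Read transform does not
allow.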

On Wed, Apr 19, 2023 at 10:51 Shahar Frank <srf...@gmail.com> wrote:

> Hi Sean,
>
> I'm not an expert, but I think the .withQuery() function takes effect at
> pipeline construction time rather than at runtime.
> This means that ElasticsearchIO lets you set the query while the
> pipeline is being built, but not at runtime, so you cannot run a query
> dynamically based on the element being processed within the pipeline.
>
> To do something like that, the transform would have to be designed more
> like FileIO in this example (from
> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/FileIO.html
> ):
>
>>  PCollection<KV<String, String>> filesAndContents = p
>>      .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
>>      // withCompression can be omitted - by default compression is
>>      // detected from the filename.
>>      .apply(FileIO.readMatches().withCompression(GZIP))
>>      .apply(MapElements
>>          // uses imports from TypeDescriptors
>>          .into(kvs(strings(), strings()))
>>          .via((ReadableFile f) -> {
>>            try {
>>              return KV.of(
>>                  f.getMetadata().resourceId().toString(),
>>                  f.readFullyAsUTF8String());
>>            } catch (IOException ex) {
>>              throw new RuntimeException("Failed to read the file", ex);
>>            }
>>          }));
>>
>>
> If you look at how FileIO.readMatches() works - it doesn't set the
> filename when building the pipeline but rather accepts that within the
> ProcessElement function.
>
> See here
> <https://github.com/apache/beam/blob/bd8950176db0116221a1b739a3916da26d822f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L873>
>
> Does that make sense?
>
> Cheers,
> Shahar.
>
> ------------------------------
>
> Shahar Frank
>
> srf...@gmail.com
>
> +447799561438
>
> ------------------------------
>
> On Wed, 19 Apr 2023 at 18:05, Murphy, Sean P. via user <
> user@beam.apache.org> wrote:
>
>> I'm running into an issue using ElasticsearchIO.read() to handle more
>> than one query. My queries are built dynamically as a PCollection based
>> on an incoming group of values. I'm trying to work out how to supply the
>> .withQuery() parameter dynamically, or to find any other approach that
>> provides this flexibility.
>>
>>
>>
>> The issue is that the ElasticsearchIO.read() method expects a PBegin
>> input to start a pipeline, but it seems like I need access outside of a
>> pipeline context somehow. PBegin represents the beginning of a pipeline,
>> and it's required to create a pipeline that can read data from
>> Elasticsearch using ElasticsearchIO.read().
>>
>>
>>
>> Can I wrap the ElasticsearchIO.read() call in a Create transform that
>> creates a PCollection with a single element (e.g., PBegin) to simulate the
>> beginning of a pipeline or something similar?
>>
>>
>>
>> Here is my naive attempt without accepting the reality of PBegin:
>>
>>     PCollection<String> queries = ... // a PCollection of Elasticsearch queries
>>
>>     PCollection<String> queryResults = queries.apply(
>>         ParDo.of(new DoFn<String, String>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext c) {
>>                 String query = c.element();
>>                 PCollection<String> results = c.pipeline()
>>                     .apply(ElasticsearchIO.read()
>>                         .withConnectionConfiguration(
>>                             ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>>                         .withQuery(query));
>>                 c.output(results);
>>             }
>>         })
>>     .apply(Flatten.pCollections()));
>>
>>
>>
>>
>>
>> In general, I'm wondering whether, for any of the IO-related classes
>> provided by Beam that expect a PBegin input, there is a means to feed in
>> a collection instead.
>>
>>
>>
>> Here is one approach that might be promising:
>>
>> // Define a ValueProvider for a List<String>
>> ValueProvider<List<String>> myListProvider =
>>     ValueProvider.StaticValueProvider.of(myList);
>>
>> // Use the ValueProvider to create a PCollection of Strings
>> PCollection<String> pcoll =
>>     pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));
>>
>> PCollection<String> partitionData = PBegin.in(pipeline)
>>         .apply("Read data from Elasticsearch",
>>             ElasticsearchIO.read()
>>                 .withConnectionConfiguration(connConfig)
>>                 .withQuery(ValueProvider<String> pcoll)
>>                 .withScrollKeepalive("1m")
>>                 .withBatchSize(50))
>>         .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
>>             opt.getMedTagVersion(), opt.getNoteType()));
>>
>>
>>
>> Any thoughts or ideas would be great.   Thanks, ~Sean
>>
>>
>>
>
