Hello all,

I am using the Apache Beam Java SDK 2.19 and Elasticsearch IO 6.x.
I keep getting the following exception while writing streaming logs to ES:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    at org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1266)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    at org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    at org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1472)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1270)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:813)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)

My code to write to ES is pretty simple:

    input
        .apply("Convert_PCollection<POJO:AppAccessSessionLog> to PCollection<String>",
            new AppAccessSessionLogToString())
        .apply("Write_To_Elastic_Search",
            ElasticsearchIO.write()
                .withConnectionConfiguration(
                    ElasticsearchIO.ConnectionConfiguration
                        .create(elasticsearchEndPoint, "indexname", "_doc")
                        .withUsername(username)
                        .withPassword(password))
                .withRetryConfiguration(
                    ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(60000)))
                .withUsePartialUpdate(true)
                .withIdFn(new ElasticExtractNameFieldIdFn("id")));

Any idea how to resolve this?

Thanks and regards,
Mohil