[ https://issues.apache.org/jira/browse/FLINK-13025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16936142#comment-16936142 ]
Victor commented on FLINK-13025:
--------------------------------

[~yanghua]

I've tested the basic ES sink in a live project running Flink 1.8.1 and Elasticsearch 7.3.1. I had to get flink-connector-elasticsearch-base_2.12 version 1.8.1 separately, since there isn't a 1.10.x version yet for the connector you've created.

The basic sink works fine, as intended, without specifying the "type" property when creating the IndexRequest:
{code:java}
// Typeless request: only the index name is passed, no mapping type
new IndexRequest(msg.esIndexName)
    .create(false)
    .id(msg.esId)
    .source(msg.esDocumentBody, XContentType.JSON)
{code}

I also noticed that the API bridge now correctly pings the ES cluster with the default request options instead of failing with a 7.3.x ES client as before, so that's good too, and it lets the Flink job start cleanly.
[https://github.com/apache/flink/pull/9720/files#diff-f2f261e881f5618405a827796728a6a4R82]

What I'm not sure about is whether it would be better to add restrictions or validation that remove the ability to set a "type" name when creating the IndexRequest, since it's no longer supported. IDEs will show the classic "deprecated" underlining, but for new users I think it is a weird experience for the connector not to complain and then to see this log entry when the job gets executed:
{code:java}
WARNING: request [POST http://localhost:9200/_bulk?timeout=1m] returned 1 warnings: [299 Elasticsearch-7.3.1-4749ba6 "[types removal] Specifying types in bulk requests is deprecated."]
{code}

If you think that's out of the scope of the connector, that's fair too, but I do think the example at least should then omit the use of the "type" property in the builder:
[https://github.com/apache/flink/pull/9720/files#diff-d131eab50d3d9f225c9e05ccca5a6ea4R139]

The part I haven't used yet, and am not sure how to test, is the UpsertTableSink. I would like to test it, since I can see "String docType" fields there, and if a user provides those it might trigger a "java.lang.IllegalArgumentException" on the ES side.
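To make the validation idea concrete, here is a minimal sketch of a fail-fast check the connector could run on a user-supplied docType. It is only an illustration: the class DocTypeCheck and the method requireTypeless are hypothetical names that do not exist in the PR, and the sketch assumes the target index uses the default "_doc" type, as in the stack trace quoted in this comment.

```java
/** Hypothetical helper, NOT part of the Flink connector or the PR. */
final class DocTypeCheck {

    /** Default mapping type name of typeless Elasticsearch 7 indices (assumption stated above). */
    static final String DEFAULT_TYPE = "_doc";

    private DocTypeCheck() {}

    /**
     * Returns normally when docType is absent (null or empty) or equals "_doc".
     * Throws otherwise, so a misconfigured job fails when the sink is built
     * instead of when Elasticsearch rejects the mapping update at runtime.
     */
    static void requireTypeless(String docType) {
        if (docType == null || docType.isEmpty() || DEFAULT_TYPE.equals(docType)) {
            return;
        }
        throw new IllegalArgumentException(
            "Elasticsearch 7 does not accept custom mapping types, but got: '" + docType + "'");
    }

    public static void main(String[] args) {
        requireTypeless(null);    // accepted: no type given
        requireTypeless("_doc");  // accepted: default type
        try {
            requireTypeless("mytype");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Whether to throw or only log a warning is a design choice for the connector; throwing is shown here because it surfaces the problem before the job starts.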
The docType fields I mean are here:
[https://github.com/apache/flink/pull/9720/files#diff-63da77546b576d0bd0b788d522fdad6fR244]
[https://github.com/apache/flink/pull/9720/files#diff-63da77546b576d0bd0b788d522fdad6fR232]

The IllegalArgumentException I have in mind comes with a message like this:
{code:java}
elasticsearch | "stacktrace": ["java.lang.IllegalArgumentException: Rejecting mapping update to [my-index] as the final mapping would have more than 1 type: [_doc, mytype]",
elasticsearch | "at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:272) ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:238) ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:687) ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:310) ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:210) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:688) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]",
elasticsearch | "at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]",
{code}

But as I said, I'm not sure how to test that in a live project, so if anyone can point me to a code example or can test it too, that would be awesome. I'd figure it out myself, but unfortunately I can't commit a lot of time at the moment.

> Elasticsearch 7.x support
> -------------------------
>
> Key: FLINK-13025
> URL: https://issues.apache.org/jira/browse/FLINK-13025
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / ElasticSearch
> Affects Versions: 1.8.0
> Reporter: Keegan Standifer
> Priority: Major
> Labels: pull-request-available
> Attachments: flink-connector-elasticsearch7_2.12-1.10-SNAPSHOT.jar
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Elasticsearch 7.0.0 was released in April of 2019:
> [https://www.elastic.co/blog/elasticsearch-7-0-0-released]
> The latest elasticsearch connector is
> [flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)