Hi Tauseef,

We cannot write POJO types directly into Elasticsearch. You can try serializing the TopologyDTO into a JSON string with a library such as Jackson before writing it, for example:

public static void main(String[] args) throws IOException {
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(HttpHost.create("http://127.0.0.1:9200")))) {
        TopologyDTO data = new TopologyDTO();
        // Serialize the POJO to a JSON string so the request carries a type Elasticsearch can write.
        String json = new ObjectMapper().writeValueAsString(data);
        IndexRequest request = Requests.indexRequest()
                .index("topology")
                .id(data.getUuid()) // here uuid is String
                .source(json, XContentType.JSON);
        // The 7.x high-level client expects RequestOptions as the second argument.
        client.index(request, RequestOptions.DEFAULT);
    }
}

Best,
Jiabao

On 2024/01/25 13:00:58 Tauseef Janvekar wrote:
> Hi Team,
>
> We get the below error message when we try to add an elastick sink
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>     ... 23 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>     ... 27 more
> Caused by: java.lang.IllegalArgumentException: cannot write xcontent for unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.
>
> The code written for the same is here
>
> workflow(filterItems(openTelSrc)).sinkTo(new Elasticsearch7SinkBuilder<TopologyDTO>().setBulkFlushMaxActions(1)
>     .setHosts(new HttpHost("elastic-host.com", 9200, "https"))
>     .setConnectionPassword("password").setConnectionUsername("elastic")
>     .setEmitter((element, context, indexer) -> indexer.add(createIndexRequest(element))).build())
>     .name("topology_sink");
>
> private static IndexRequest createIndexRequest(TopologyDTO data) {
>     Map<String, TopologyDTO> json = new HashMap<>();
>     json.put("data", data);
>     return Requests.indexRequest()
>         .index("topology")
>         .id(data.getUuid()) //here uuid is String
>         .source(json);
> }
>
> Any help would be greatly appreciated.
>
> Thanks,
> Tauseef
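
A note on the quoted createIndexRequest: the map passed to source() holds the TopologyDTO object itself as a value, and that is the type the "cannot write xcontent for unknown value" message complains about. Besides serializing to a JSON string, another option may be to put only simple values (strings, numbers, booleans, nested maps or lists) into the map. Below is a rough sketch using just the JDK. The TopologyDTO here is a hypothetical stand-in, since the real class is not shown in this thread; the fields name and nodeCount and the helper toSource are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopologySourceSketch {

    // Hypothetical stand-in for the real TopologyDTO; only getUuid() appears in the thread.
    static final class TopologyDTO {
        private final String uuid;
        private final String name;   // assumed field
        private final int nodeCount; // assumed field

        TopologyDTO(String uuid, String name, int nodeCount) {
            this.uuid = uuid;
            this.name = name;
            this.nodeCount = nodeCount;
        }

        String getUuid() { return uuid; }
        String getName() { return name; }
        int getNodeCount() { return nodeCount; }
    }

    // Copies the DTO's fields into a map of simple values (String, Integer),
    // so no POJO instance is left as a map value.
    static Map<String, Object> toSource(TopologyDTO data) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("uuid", data.getUuid());
        source.put("name", data.getName());
        source.put("nodeCount", data.getNodeCount());
        return source;
    }

    public static void main(String[] args) {
        TopologyDTO dto = new TopologyDTO("abc-123", "edge-cluster", 4);
        // Prints the flattened map that would be handed to the index request.
        System.out.println(toSource(dto));
    }
}
```

In createIndexRequest you would then pass .source(toSource(data)) instead of .source(json). For a DTO with many or nested fields, the Jackson approach above is likely less work to maintain.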