Hi Tauseef,

We cannot write POJO types directly into Elasticsearch.
You can try serializing the TopologyDTO into a JSON string with Jackson before
writing it, for example:

    public static void main(String[] args) throws IOException {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(HttpHost.create("http://127.0.0.1:9200")))) {
            TopologyDTO data = new TopologyDTO();

            // Serialize the POJO to a JSON string and pass it as the document source.
            IndexRequest request = Requests.indexRequest()
                    .index("topology")
                    .id(data.getUuid()) // here uuid is a String
                    .source(new ObjectMapper().writeValueAsString(data),
                            XContentType.JSON);

            client.index(request, RequestOptions.DEFAULT);
        }
    }
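
Applied to your Flink sink, the same idea means building the IndexRequest from the
serialized JSON string rather than from a Map containing the POJO. Here is a rough
sketch of your createIndexRequest adjusted accordingly (assuming TopologyDTO is
Jackson-serializable; the ObjectMapper is kept in a static field so the emitter
lambda stays serializable):

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private static IndexRequest createIndexRequest(TopologyDTO data) {
        try {
            // Write the DTO as a JSON string instead of putting the POJO into a Map.
            return Requests.indexRequest()
                    .index("topology")
                    .id(data.getUuid()) // here uuid is a String
                    .source(MAPPER.writeValueAsString(data), XContentType.JSON);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize TopologyDTO", e);
        }
    }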

Best,
Jiabao


On 2024/01/25 13:00:58 Tauseef Janvekar wrote:
> Hi Team,
> 
> We get the below error message when we try to add an Elasticsearch sink:
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 23 more
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 27 more
> Caused by: java.lang.IllegalArgumentException: cannot write xcontent for
> unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.
> 
> The code written for this sink is here:
> 
> workflow(filterItems(openTelSrc))
>         .sinkTo(new Elasticsearch7SinkBuilder<TopologyDTO>()
>                 .setBulkFlushMaxActions(1)
>                 .setHosts(new HttpHost("elastic-host.com", 9200, "https"))
>                 .setConnectionPassword("password")
>                 .setConnectionUsername("elastic")
>                 .setEmitter((element, context, indexer) ->
>                         indexer.add(createIndexRequest(element)))
>                 .build())
>         .name("topology_sink");
>
> private static IndexRequest createIndexRequest(TopologyDTO data) {
>     Map<String, TopologyDTO> json = new HashMap<>();
>     json.put("data", data);
>
>     return Requests.indexRequest()
>             .index("topology")
>             .id(data.getUuid()) //here uuid is String
>             .source(json);
> }
> 
> Any help would be greatly appreciated.
> 
> Thanks,
> Tauseef
> 
