Hi Tauseef,

This error is because your Class
com.hds.alta.pipeline.model.TopologyDTO cannot be serialized by ES
xcontent util.

The following solutions may fix it.
1. convert your TopologyDTO class data to a Map, and avoid using some
custom Class that cannot be serialized by ES.
or 2. make your TopologyDTO extend the ToXContent[1] interface, and
implement the toXContent method.

1. 
https://artifacts.elastic.co/javadoc/org/elasticsearch/elasticsearch-x-content/6.3.2/org/elasticsearch/common/xcontent/ToXContent.html

Best regards,
Jiadong Lu

On 2024/1/25 21:00, Tauseef Janvekar wrote:
> Hi Team,
>
> We get the below error message when we try to add an elastick sink
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 23 more
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 27 more
> Caused by: java.lang.IllegalArgumentException: cannot write xcontent for
> unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.
>
> The code written for the same is here
>
> workflow(filterItems(openTelSrc)).sinkTo(new
> Elasticsearch7SinkBuilder<TopologyDTO>().setBulkFlushMaxActions(1)
>
> .setHosts(new HttpHost("elastic-host.com <http://elastic-host.com>",
> 9200, "https"))
>
> .setConnectionPassword("password").setConnectionUsername("elastic")
>
> .setEmitter((element, context, indexer) ->
> indexer.add(createIndexRequest(element))).build())
>
> .name("topology_sink");
>
>
> private static IndexRequest createIndexRequest(TopologyDTO data) {
>
> Map<String, TopologyDTO> json = new HashMap<>();
>
> json.put("data", data);
>
> return Requests.indexRequest()
>
> .index("topology")
>
> .id(data.getUuid()) //here uuid is String
>
> .source(json);
>
> }
>
>
> Any help would be greatly appreciated.
>
> Thanks,
> Tauseef

Reply via email to