Hi, Tauseef.
I modified your code, and the following works. Can you try it?

```
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;

static class C1 {
    Metrics metrics;

    public C1(Metrics metrics) {
        this.metrics = metrics;
    }

    public Metrics getMetrics() {
        return metrics;
    }
}

static class Metrics {
    List<String> values;

    public Metrics(List<String> values) {
        this.values = values;
    }

    public List<String> getValues() {
        return values;
    }
}

static class C2 {
    String value;

    public C2(String value) {
        this.value = value;
    }
}

public static DataStream<C2> defineWorkflow(DataStream<C1> opentelSource) {
    DataStream<C2> ret =
            opentelSource
                    .map(input -> input.getMetrics().getValues())
                    // The lambda's generic result type is erased, so declare it explicitly.
                    .returns(new TypeHint<List<String>>() {}.getTypeInfo())
                    .flatMap(
                            (FlatMapFunction<List<String>, C2>)
                                    (list, collector) -> {
                                        for (String metric : list) {
                                            collector.collect(
                                                    new C2(metric + "_" + UUID.randomUUID().toString()));
                                        }
                                    })
                    // For a custom class, pass the class itself.
                    .returns(C2.class);
    return ret;
}
```

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap

--
Best!
Xuyang

At 2023-11-21 23:34:37, "Tauseef Janvekar" <tauseefjanve...@gmail.com> wrote:

Hi Xuyang,

I read the post that you shared, but unfortunately it uses String rather than a custom POJO. Also, it does not use the flatmap.returns method.
My Flink version is 1.17.1.

My code is as below:

    public static DataStream<GenericPipelineDTO> defineWorkflow(DataStream<OpenTelDTO> opentelSource) {
        DataStream<GenericPipelineDTO> ret = opentelSource
            .map(input -> input.getResourceMetrics())
            .flatMap((list, collector) -> {
                for (ResourceMetrics metric : list) {
                    String containerName = "";
                    String podName = "";
                    String hostName = "";
                    String namespaceName = "";
                    String cluster = "nyquist";
                    String tenant = "opis";
                    for (Attributes attr : metric.getResource().getAttributes()) {
                        if (attr.getKey().equalsIgnoreCase("k8s.container.name")) {
                            containerName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.pod.name")) {
                            podName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.namespace.name")) {
                            namespaceName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.node.name")) {
                            hostName = attr.getValue().getStringValue();
                        }
                    }
                    String unqKey = containerName + "_" + podName + "_" + hostName + "_" + namespaceName + "_" + cluster + tenant;
                    for (ScopeMetrics smtr : metric.getScopeMetrics()) {
                        for (Metrics mtr : smtr.getMetrics()) {
                            GenericPipelineDTO dto = new GenericPipelineDTO();
                            dto.setUniqueKey(unqKey);
                            try {
                                dto.setGeneratedTime(mtr.getGauge().getDataPoints().get(0).getTimeUnixNano());
                            } catch (Exception e) {
                                dto.setGeneratedTime(mtr.getSum().getDataPoints().get(0).getTimeUnixNano());
                            }
                            dto.setMetricName(mtr.getName());
                            dto.setMetricDescription(mtr.getDescription());
                            dto.setMetricUnit(mtr.getUnit());
                            try {
                                dto.setMetricValue(
                                    mtr.getGauge().getDataPoints().get(0).getAsDouble()
                                        + Integer.parseInt(mtr.getGauge().getDataPoints().get(0).getAsInt()));
                            } catch (Exception e) {
                                dto.setMetricValue(
                                    mtr.getSum().getDataPoints().get(0).getAsDouble()
                                        + Integer.parseInt(mtr.getSum().getDataPoints().get(0).getAsInt()));
                            }
                            dto.setUuid(UUID.fromString(unqKey + mtr.getName() + dto.getGeneratedTime()));
                            collector.collect(dto);
                        }
                    }
                }
            }).returns(????????); // What to put here?
        return ret;
    }

On Tue, 21 Nov 2023 at 20:57, Xuyang <xyzhong...@163.com> wrote:

Hi, Tauseef.

Here is an example of using a custom POJO with flatMap [1]. If possible, can you post your code and mention your Flink version?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap

--
Best!
Xuyang

At 2023-11-21 22:48:41, "Tauseef Janvekar" <tauseefjanve...@gmail.com> wrote:

Dear Team,

I am getting the following error while using flatMap.

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: The return type of function 'defineWorkflow(OtelTransformerJob.java:75)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

On further investigation I found that flatMap should end with .returns(Types.STRING) when the return type is a String. My question is: if my return type is a custom DTO named mypckg.MyCustomDTO, how do I pass parameters to the returns method?

Thanks,
Tauseef.
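
For anyone reading this thread later, the "type erasure" in the error message and the `new TypeHint<List<String>>() {}` idiom in the reply at the top can be illustrated without Flink. The sketch below is plain Java and is only an illustration of the general mechanism, not Flink's implementation: `ErasureDemo` and `TypeCapture` are hypothetical names made up for this example. It shows that an anonymous subclass keeps its generic type argument in class metadata, whereas a lambda implementing a generic interface does not, which is presumably why the lambda-based flatMap needs an explicit `returns(...)`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class ErasureDemo {

    /** Hypothetical stand-in for the idea behind a type hint; this is NOT Flink's TypeHint class. */
    abstract static class TypeCapture<T> {
        /** Reads T back from the generic superclass recorded for the concrete subclass. */
        Type captured() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** The anonymous subclass ({} after the constructor call) records List<String> in its metadata. */
    public static String capturedTypeName() {
        return new TypeCapture<List<String>>() {}.captured().getTypeName();
    }

    /** Checks whether a lambda's runtime class still exposes the type arguments of Function. */
    public static boolean lambdaKeepsTypeArguments() {
        Function<String, List<String>> split = s -> Arrays.asList(s.split(","));
        for (Type implemented : split.getClass().getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("anonymous subclass captured: " + capturedTypeName());
        System.out.println("lambda keeps type arguments: " + lambdaKeepsTypeArguments());
    }
}
```

Since the lambda carries no generic information, the element type has to be supplied by hand. In the reply at the top that is done with `.returns(C2.class)` for the custom class, so by analogy the original code would presumably pass `GenericPipelineDTO.class`.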