Hi, Tauseef.
I modified your code, and the following version works. Can you try it?
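```
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;

// The classes and method below are members of the enclosing job class.

// Simplified stand-in for the input DTO.
static class C1 {
    Metrics metrics;

    public C1(Metrics metrics) {
        this.metrics = metrics;
    }

    public Metrics getMetrics() {
        return metrics;
    }
}

static class Metrics {
    List<String> values;

    public Metrics(List<String> values) {
        this.values = values;
    }

    public List<String> getValues() {
        return values;
    }
}

// Simplified stand-in for the output DTO.
static class C2 {
    String value;

    public C2(String value) {
        this.value = value;
    }
}

public static DataStream<C2> defineWorkflow(DataStream<C1> opentelSource) {
    DataStream<C2> ret =
            opentelSource
                    .map(input -> input.getMetrics().getValues())
                    // The lambda loses List<String> to erasure, so declare it explicitly.
                    .returns(new TypeHint<List<String>>() {}.getTypeInfo())
                    .flatMap(
                            (FlatMapFunction<List<String>, C2>)
                                    (list, collector) -> {
                                        for (String metric : list) {
                                            collector.collect(
                                                    new C2(metric + "_" + UUID.randomUUID()));
                                        }
                                    })
                    // For a non-generic custom class, passing the class itself is enough.
                    .returns(C2.class);
    return ret;
}
```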
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap
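For background on why the hints are needed: below is a minimal plain-Java sketch (no Flink dependency) of the type erasure that the error message refers to. The class `ErasureDemo` and the helper `declaredOutputType` are hypothetical names made up for this illustration, and the reflection shown is only a simplification of what a framework can inspect, not Flink's actual type extraction. It compares what is visible at runtime for a lambda versus an anonymous class implementing the same generic interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class ErasureDemo {

    // Hypothetical helper: returns the output type argument that the function's
    // class declares for java.util.function.Function, or null if the class only
    // exposes the raw interface (that is, the generic information is not available).
    static Type declaredOutputType(Function<?, ?> fn) {
        for (Type iface : fn.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) iface;
                if (p.getRawType() == Function.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        return null;
    }

    // Written as a lambda: the generated class implements the raw Function interface.
    static Function<String, List<String>> asLambda() {
        return s -> Collections.singletonList(s);
    }

    // Written as an anonymous class: the compiler records the full generic signature.
    static Function<String, List<String>> asAnonymousClass() {
        return new Function<String, List<String>>() {
            @Override
            public List<String> apply(String s) {
                return Collections.singletonList(s);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("lambda:    " + declaredOutputType(asLambda()));
        System.out.println("anonymous: " + declaredOutputType(asAnonymousClass()));
    }
}
```

The lambda exposes no type arguments, while the anonymous class keeps them. That is why a lambda needs an explicit `returns(...)` hint, and why writing the function as an anonymous or named class is another way to avoid the error.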
--
Best!
Xuyang
At 2023-11-21 23:34:37, "Tauseef Janvekar" <[email protected]> wrote:
Hi Xuyang,
I read the post that you shared, but unfortunately it uses String rather than
a custom POJO.
It also does not use the flatMap(...).returns(...) method.
My Flink version is 1.17.1.
My code is below:
public static DataStream<GenericPipelineDTO>
defineWorkflow(DataStream<OpenTelDTO> opentelSource) {
    DataStream<GenericPipelineDTO> ret = opentelSource
        .map(input -> input.getResourceMetrics())
        .flatMap((list,collector) -> {
            for(ResourceMetrics metric : list) {
                String containerName = "";
                String podName = "";
                String hostName = "";
                String namespaceName = "";
                String cluster = "nyquist";
                String tenant = "opis";
                for(Attributes attr : metric.getResource().getAttributes()) {
                    if(attr.getKey().equalsIgnoreCase("k8s.container.name")) {
                        containerName = attr.getValue().getStringValue();
                    } else if (attr.getKey().equalsIgnoreCase("k8s.pod.name")) {
                        podName = attr.getValue().getStringValue();
                    } else if (attr.getKey().equalsIgnoreCase("k8s.namespace.name")) {
                        namespaceName = attr.getValue().getStringValue();
                    } else if (attr.getKey().equalsIgnoreCase("k8s.node.name")) {
                        hostName = attr.getValue().getStringValue();
                    }
                }
                String unqKey = containerName
                    +"_"+podName+"_"+hostName+"_"+namespaceName+"_"+cluster+tenant;
                for(ScopeMetrics smtr:metric.getScopeMetrics()) {
                    for(Metrics mtr : smtr.getMetrics()) {
                        GenericPipelineDTO dto = new GenericPipelineDTO();
                        dto.setUniqueKey(unqKey);
                        try {
                            dto.setGeneratedTime(mtr.getGauge().getDataPoints().get(0).getTimeUnixNano());
                        } catch( Exception e) {
                            dto.setGeneratedTime(mtr.getSum().getDataPoints().get(0).getTimeUnixNano());
                        }
                        dto.setMetricName(mtr.getName());
                        dto.setMetricDescription(mtr.getDescription());
                        dto.setMetricUnit(mtr.getUnit());
                        try {
                            dto.setMetricValue(
                                mtr.getGauge().getDataPoints().get(0).getAsDouble() +
                                Integer.parseInt(mtr.getGauge().getDataPoints().get(0).getAsInt())
                            );
                        } catch( Exception e) {
                            dto.setMetricValue(
                                mtr.getSum().getDataPoints().get(0).getAsDouble() +
                                Integer.parseInt(mtr.getSum().getDataPoints().get(0).getAsInt())
                            );
                        }
                        dto.setUuid(UUID.fromString(unqKey+mtr.getName()+dto.getGeneratedTime()));
                        collector.collect(dto);
                    }
                }
            }
        }).returns(????????);//What to put here ?
    return ret;
}
On Tue, 21 Nov 2023 at 20:57, Xuyang <[email protected]> wrote:
Hi, Tauseef.
Here is an example of using a custom POJO with flatMap [1]. If possible, could
you post your code and mention your Flink version?
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap
--
Best!
Xuyang
At 2023-11-21 22:48:41, "Tauseef Janvekar" <[email protected]> wrote:
Dear Team,
I am getting the following error while using flatMap.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: The return type of function
'defineWorkflow(OtelTransformerJob.java:75)' could not be determined
automatically, due to type erasure. You can give type information hints by
using the returns(...) method on the result of the transformation call, or by
letting your function implement the 'ResultTypeQueryable' interface.
On further investigation, I found that the flatMap call should end with
.returns(Types.STRING) when the return type is String.
My question is: if my return type is a custom DTO named mypckg.MyCustomDTO,
what do I pass to the returns method?
Thanks,
Tauseef.