arthurdm commented on issue #2116:
URL: https://github.com/apache/camel-k/issues/2116#issuecomment-797047908
Thanks @oscerd! I think a very useful sample would show how to create a
`DataSet` that takes the incoming payload into account. Here's the
snippet I was trying to get to work [1]. I couldn't get my `DataSet`
registered; I keep getting this error [2].
**[1]**
```java
public class FlinkSample extends RouteBuilder implements Processor {

    @Override
    public void configure() throws Exception {
        // Receive a msg from the configured Kafka topic
        from("timer:foo")
            .log("Received body: ${body}")
            .split(body().tokenize("\n"))
            .process(this)
            .to("flink:dataSet?dataSet=#myDataSet&dataSetCallback=#myDataSetCallback")
            .log("Response body: ${body}");
    }

    @BindToRegistry
    public DataSetCallback<String> myDataSetCallback() {
        return new DataSetCallback<String>() {
            @Override
            public String onDataSet(DataSet ds, Object... payloads) {
                return "this is the result";
            }
        };
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> myDataSet = env.fromElements(exchange.getIn().toString());
        exchange.getContext().getRegistry().bind("myDataSet", DataSet.class, myDataSet);
    }
}
```
**[2]**
```
Failed to start application (with profile prod): org.apache.camel.NoSuchBeanException: No bean could be found in the registry for: myDataSet of type: org.apache.flink.api.java.DataSet
    at org.apache.camel.support.CamelContextHelper.mandatoryLookupAndConvert(CamelContextHelper.java:253)
    at org.apache.camel.support.EndpointHelper.resolveReferenceParameter(EndpointHelper.java:290)
    at org.apache.camel.support.EndpointHelper.resolveReferenceParameter(EndpointHelper.java:250)
    at org.apache.camel.support.component.PropertyConfigurerSupport.property(PropertyConfigurerSupport.java:53)
    at org.apache.camel.component.flink.FlinkEndpointConfigurer.configure(FlinkEndpointConfigurer.java:26)
    at org.apache.camel.support.PropertyBindingSupport.setSimplePropertyViaConfigurer(PropertyBindingSupport.java:703)
```
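
Reading the trace, the lookup of `#myDataSet` seems to happen while the endpoint is being configured at startup ("Failed to start application", `FlinkEndpointConfigurer.configure`), so before any exchange has reached `process` and called `bind`. Here is a toy plain-Java sketch of that ordering as I understand it. The `Map`-based registry and the `mandatoryLookup` and `process` helpers are hypothetical stand-ins for illustration, not Camel's actual API:

```java
import java.util.HashMap;
import java.util.Map;

public class EagerLookupSketch {

    // Hypothetical stand-in for the Camel registry (assumption, not the real API).
    static final Map<String, Object> registry = new HashMap<>();

    // Mimics a mandatory lookup of a "#name" reference: fails if nothing is bound yet.
    static Object mandatoryLookup(String name) {
        Object bean = registry.get(name);
        if (bean == null) {
            throw new IllegalStateException(
                "No bean could be found in the registry for: " + name);
        }
        return bean;
    }

    // Mimics the per-message processor, which only binds the bean once a message arrives.
    static void process(String body) {
        registry.put("myDataSet", body);
    }

    public static void main(String[] args) {
        // "Startup": the endpoint parameter is resolved before any message was processed.
        try {
            mandatoryLookup("myDataSet");
        } catch (IllegalStateException e) {
            System.out.println("startup failed: " + e.getMessage());
        }

        // "Runtime": once a message has gone through process(), the lookup succeeds.
        process("hello");
        System.out.println("after process: " + mandatoryLookup("myDataSet"));
    }
}
```

If that reading is right, anything referenced as `#myDataSet` in the endpoint URI has to be in the registry before the route starts, which is why a sample that builds the `DataSet` from the incoming payload would help.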