I’m not a Janino expert but it might be related to the fact that Janino not
fully supports generic types (see http://unkrig.de/w/Janino under
limitations). Maybe it works of you use the untyped MapFunction type.

Cheers,
Till
​

On Sat, Oct 3, 2015 at 8:04 PM, Giacomo Licari <giacomo.lic...@gmail.com>
wrote:

> Hi guys,
> I'm developing a dynamic Flink program composer, which receive a dataflow
> from a client and convert it into Flink code.
>
> I have tried to compile a test Flink program with Janino, but it fails,
> the error I receive is:
> org.codehaus.commons.compiler.CompileException: Line 66, Column 0:
> Non-abstract class "FlinkExecutor$1" must implement method "public abstract
> java.lang.Object
> org.apache.flink.api.common.functions.MapFunction.map(java.lang.Object)
> throws java.lang.Exception"
>
> It seems Janino doesn't recognize the MapFunction.
>
> If I put this code into a java file and I execute it with Eclipse,
> everything works good.
>
> Here the code I used:
>
> package Test;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import com.Flink.Operators.Source;
>
> public class FlinkExecutor {
> public static class RainPOJO {
> private String altitude;
> private String city_name;
> private String latitude;
> private String longitude;
> private String rainfall;
> private String station_name;
> private String time;
> public String getAltitude() {
> return altitude;
> }
> public void setAltitude(String Altitude) {
> this.altitude = Altitude;
> }
> public String getCity_name() {
> return city_name;
> }
> public void setCity_name(String City_name) {
> this.city_name = City_name;
> }
> public String getLatitude() {
> return latitude;
> }
> public void setLatitude(String Latitude) {
> this.latitude = Latitude;
> }
> public String getLongitude() {
> return longitude;
> }
> public void setLongitude(String Longitude) {
> this.longitude = Longitude;
> }
> public String getRainfall() {
> return rainfall;
> }
> public void setRainfall(String Rainfall) {
> this.rainfall = Rainfall;
> }
> public String getStation_name() {
> return station_name;
> }
> public void setStation_name(String Station_name) {
> this.station_name = Station_name;
> }
> public String getTime() {
> return time;
> }
> public void setTime(String Time) {
> this.time = Time;
> }
> }
> public FlinkExecutor() {}
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> env.setDegreeOfParallelism(1);
> Source Source = new Source("sensor", "rain");
> String path_Source = Source.getCSVPath();
> DataSet < RainPOJO > ds_s1 = env.readCsvFile("file://" + path_Source)
> .ignoreFirstLine()
> .pojoType(RainPOJO.class, "altitude", "city_name", "latitude",
> "longitude", "rainfall", "station_name", "time");
> long size = ds_s1.count();
> long startTime = System.currentTimeMillis();
> ds_s1.map(new MapFunction < RainPOJO, String > () {
> int count = 0;@Override
> public String map(RainPOJO obj) throws Exception {
> count += 1;
> long endTime = System.currentTimeMillis();
> double elapsed_time = endTime - startTime;
> if (count == size) {
> double d_seconds = elapsed_time / 1000;
> return "Elapsed time => " + elapsed_time + "(millis) " + d_seconds + "
> seconds";
> }
> return " " + count;
> }
> })
> .print();
> }
> }
>

Reply via email to