I’m not a Janino expert, but it might be related to the fact that Janino does not fully support generic types (see http://unkrig.de/w/Janino under limitations). Maybe it works if you use the raw (untyped) MapFunction type.
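A minimal sketch of what that workaround could look like. To keep it runnable without Flink on the classpath, `MapFunction` here is a local stand-in for `org.apache.flink.api.common.functions.MapFunction`, and `RawMapFunctionSketch`, `rawMapper`, and the trimmed-down `RainPOJO` are hypothetical names used only for illustration. The anonymous class declares `map(Object)`, which is the erased signature the CompileException asks for, and casts inside the method.

```java
// ASSUMPTION: local stand-in for Flink's MapFunction so the sketch compiles
// on its own; the real interface lives in org.apache.flink.api.common.functions.
interface MapFunction<T, O> {
    O map(T value) throws Exception;
}

class RawMapFunctionSketch {

    /** Trimmed-down version of RainPOJO from the quoted program (one field only). */
    static class RainPOJO {
        private String rainfall;

        public String getRainfall() {
            return rainfall;
        }

        public void setRainfall(String rainfall) {
            this.rainfall = rainfall;
        }
    }

    /** Builds a mapper that uses the raw MapFunction type, with no type arguments. */
    @SuppressWarnings("rawtypes")
    static MapFunction rawMapper() {
        return new MapFunction() {
            int count = 0;

            // Parameter and return type are Object, matching the erased method
            // map(java.lang.Object) named in the error message.
            public Object map(Object obj) throws Exception {
                RainPOJO rain = (RainPOJO) obj; // manual cast replaces the type parameter
                count += 1;
                return count + ": " + rain.getRainfall();
            }
        };
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) throws Exception {
        RainPOJO rain = new RainPOJO();
        rain.setRainfall("3.2");
        MapFunction mapper = rawMapper();
        System.out.println(mapper.map(rain)); // prints "1: 3.2"
    }
}
```

With the real Flink interface, the raw type drops the generic information, so Flink may not be able to infer the result type on its own; that part is untested here.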
Cheers,
Till

On Sat, Oct 3, 2015 at 8:04 PM, Giacomo Licari <giacomo.lic...@gmail.com> wrote:

> Hi guys,
> I'm developing a dynamic Flink program composer, which receives a dataflow
> from a client and converts it into Flink code.
>
> I have tried to compile a test Flink program with Janino, but it fails;
> the error I receive is:
> org.codehaus.commons.compiler.CompileException: Line 66, Column 0:
> Non-abstract class "FlinkExecutor$1" must implement method "public abstract
> java.lang.Object
> org.apache.flink.api.common.functions.MapFunction.map(java.lang.Object)
> throws java.lang.Exception"
>
> It seems Janino doesn't recognize the MapFunction.
>
> If I put this code into a Java file and execute it with Eclipse,
> everything works fine.
>
> Here is the code I used:
>
> package Test;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import com.Flink.Operators.Source;
>
> public class FlinkExecutor {
>     public static class RainPOJO {
>         private String altitude;
>         private String city_name;
>         private String latitude;
>         private String longitude;
>         private String rainfall;
>         private String station_name;
>         private String time;
>         public String getAltitude() {
>             return altitude;
>         }
>         public void setAltitude(String Altitude) {
>             this.altitude = Altitude;
>         }
>         public String getCity_name() {
>             return city_name;
>         }
>         public void setCity_name(String City_name) {
>             this.city_name = City_name;
>         }
>         public String getLatitude() {
>             return latitude;
>         }
>         public void setLatitude(String Latitude) {
>             this.latitude = Latitude;
>         }
>         public String getLongitude() {
>             return longitude;
>         }
>         public void setLongitude(String Longitude) {
>             this.longitude = Longitude;
>         }
>         public String getRainfall() {
>             return rainfall;
>         }
>         public void setRainfall(String Rainfall) {
>             this.rainfall = Rainfall;
>         }
>         public String getStation_name() {
>             return station_name;
>         }
>         public void setStation_name(String Station_name) {
>             this.station_name = Station_name;
>         }
>         public String getTime() {
>             return time;
>         }
>         public void setTime(String Time) {
>             this.time = Time;
>         }
>     }
>     public FlinkExecutor() {}
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>         env.setDegreeOfParallelism(1);
>         Source Source = new Source("sensor", "rain");
>         String path_Source = Source.getCSVPath();
>         DataSet<RainPOJO> ds_s1 = env.readCsvFile("file://" + path_Source)
>             .ignoreFirstLine()
>             .pojoType(RainPOJO.class, "altitude", "city_name", "latitude",
>                 "longitude", "rainfall", "station_name", "time");
>         long size = ds_s1.count();
>         long startTime = System.currentTimeMillis();
>         ds_s1.map(new MapFunction<RainPOJO, String>() {
>             int count = 0;
>             @Override
>             public String map(RainPOJO obj) throws Exception {
>                 count += 1;
>                 long endTime = System.currentTimeMillis();
>                 double elapsed_time = endTime - startTime;
>                 if (count == size) {
>                     double d_seconds = elapsed_time / 1000;
>                     return "Elapsed time => " + elapsed_time + "(millis) " + d_seconds + " seconds";
>                 }
>                 return " " + count;
>             }
>         })
>         .print();
>     }
> }
>