Hi guys,
I'm developing a dynamic Flink program composer, which receives a dataflow
from a client and converts it into Flink code.

I have tried to compile a test Flink program with Janino, but it fails. The
error I receive is:
org.codehaus.commons.compiler.CompileException: Line 66, Column 0:
Non-abstract class "FlinkExecutor$1" must implement method "public abstract
java.lang.Object
org.apache.flink.api.common.functions.MapFunction.map(java.lang.Object)
throws java.lang.Exception"

It seems Janino doesn't recognize the MapFunction.

If I put this code into a Java file and run it in Eclipse,
everything works fine.

Here is the code I used:

package Test;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import com.Flink.Operators.Source;

public class FlinkExecutor {
public static class RainPOJO {
private String altitude;
private String city_name;
private String latitude;
private String longitude;
private String rainfall;
private String station_name;
private String time;
public String getAltitude() {
return altitude;
}
public void setAltitude(String Altitude) {
this.altitude = Altitude;
}
public String getCity_name() {
return city_name;
}
public void setCity_name(String City_name) {
this.city_name = City_name;
}
public String getLatitude() {
return latitude;
}
public void setLatitude(String Latitude) {
this.latitude = Latitude;
}
public String getLongitude() {
return longitude;
}
public void setLongitude(String Longitude) {
this.longitude = Longitude;
}
public String getRainfall() {
return rainfall;
}
public void setRainfall(String Rainfall) {
this.rainfall = Rainfall;
}
public String getStation_name() {
return station_name;
}
public void setStation_name(String Station_name) {
this.station_name = Station_name;
}
public String getTime() {
return time;
}
public void setTime(String Time) {
this.time = Time;
}
}
public FlinkExecutor() {}
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
Source Source = new Source("sensor", "rain");
String path_Source = Source.getCSVPath();
DataSet < RainPOJO > ds_s1 = env.readCsvFile("file://" + path_Source)
.ignoreFirstLine()
.pojoType(RainPOJO.class, "altitude", "city_name", "latitude", "longitude",
"rainfall", "station_name", "time");
long size = ds_s1.count();
long startTime = System.currentTimeMillis();
ds_s1.map(new MapFunction < RainPOJO, String > () {
int count = 0;@Override
public String map(RainPOJO obj) throws Exception {
count += 1;
long endTime = System.currentTimeMillis();
double elapsed_time = endTime - startTime;
if (count == size) {
double d_seconds = elapsed_time / 1000;
return "Elapsed time => " + elapsed_time + "(millis) " + d_seconds + "
seconds";
}
return " " + count;
}
})
.print();
}
}

Reply via email to