If I'm reading the Flink manual correctly (which is not easy, since it has no examples), this code should read a CSV file:
package flinkTest2;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkTest2 {
    public static void main(String[] args) throws Exception {
        // Batch mode, since the input is a bounded file.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        // Register the CSV file as a table.
        tEnv.executeSql("CREATE TABLE test_table (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file://C/temp/test4.txt', 'format.type' = 'csv')");

        // Count the rows and print the result.
        tEnv.sqlQuery("SELECT COUNT(*) AS some_value FROM test_table")
                .execute()
                .print();
    }
}
The job starts, but it just keeps running and never finishes.
I think Flink is not able to open the file. It does not seem to matter what I put in 'connector.path': with 'file://C/temp/test4.txt' Flink does not display any error, and something like 'file:blah/blah/blah' appears to be accepted just as well.
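To rule out the path itself, a small standalone check like the one below could be run before the Flink job. This is only a sketch: the class name PathCheck and its two helper methods are made up for illustration and are not part of Flink. It uses only the standard java.nio.file API to print the absolute file URI for a local path and to report whether a readable file exists there. For a Windows path such as C:/temp/test4.txt, a well-formed file URI has the form file:///C:/temp/test4.txt, which differs from the 'file://C/temp/test4.txt' used above.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PathCheck {
    // Converts a local path (absolute or relative) to an absolute file URI string.
    public static String toFileUri(String localPath) {
        Path path = Paths.get(localPath).toAbsolutePath().normalize();
        return path.toUri().toString();
    }

    // Returns true only if the path points at an existing, readable regular file.
    public static boolean isReadableFile(String localPath) {
        Path path = Paths.get(localPath);
        return Files.isRegularFile(path) && Files.isReadable(path);
    }

    public static void main(String[] args) {
        // The default path is the one from the question; pass another as the first argument.
        String localPath = args.length > 0 ? args[0] : "C:/temp/test4.txt";
        System.out.println("URI:      " + toFileUri(localPath));
        System.out.println("Readable: " + isReadableFile(localPath));
    }
}
```

If this reports that the file is not readable, the problem is with the path rather than with Flink. If it is readable, the printed URI is at least a syntactically valid candidate to try in 'connector.path'.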
Could anyone help me with this?
Thanks
Mike