If I'm reading the Flink manual correctly (which is not simple, since it has no examples), this code should read a CSV file:
 
 
package flinkTest2;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
 
public class flinkTest2 {

    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE test_table (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file://C/temp/test4.txt', 'format.type' = 'csv')");

        tEnv.sqlQuery("SELECT COUNT(*) AS some_value FROM test_table")
                .execute()
                .print();
    }
}
 
The job just keeps running and never finishes.
I think Flink is not able to open the file. It does not seem to matter what the path is: with 'connector.path' = 'file://C/temp/test4.txt', Flink does not display any error or other message, and 'connector.path' = 'file:blah/blah/blah' appears to be accepted just as well.
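As a sanity check on the path strings themselves, independent of Flink, here is a minimal sketch using only java.net.URI from the JDK. The class name FileUriCheck and the third form, file:///C:/temp/test4.txt, are my own assumptions for illustration; I have not confirmed that Flink's filesystem connector parses the path the same way java.net.URI does.

```java
import java.net.URI;

// Hypothetical helper (not part of Flink): shows how java.net.URI
// splits the path strings used in the 'connector.path' option.
public class FileUriCheck {

    // Returns the host and path components of the given URI string.
    static String describe(String uriText) {
        URI uri = URI.create(uriText);
        return "host=" + uri.getHost() + " path=" + uri.getPath();
    }

    public static void main(String[] args) {
        // Two slashes: "C" is parsed as a host name, not a drive letter.
        System.out.println(describe("file://C/temp/test4.txt"));
        // prints "host=C path=/temp/test4.txt"

        // No slash after the scheme: an opaque URI with no path at all.
        System.out.println(describe("file:blah/blah/blah"));
        // prints "host=null path=null"

        // Three slashes plus a drive letter (assumed intended form).
        System.out.println(describe("file:///C:/temp/test4.txt"));
        // prints "host=null path=/C:/temp/test4.txt"
    }
}
```

If the connector treats the string in a similar way, neither of the two paths I tried would point at C:\temp\test4.txt, though that alone would not explain why no error is shown.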
 
Could anyone help me with that?
Thanks
Mike
 
