Thanks FG.
On Friday, February 18, 2022, 02:54:44 AM EST, Francesco Guardiani <france...@ververica.com> wrote:
 
 Hi,
Filesystem source directory watching is going to be available from 1.15: 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
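
For readers on 1.15 or later, here is a rough sketch of what the linked
directory-watching section describes: setting 'source.monitor-interval' in the
WITH clause should make the filesystem source keep scanning the path for new
files instead of reading it once and finishing. The helper class
MonitoredEventsDdl, its buildDdl method, and the '10s' interval are
illustrative assumptions, not part of the original application; the table
definition is taken from the question quoted below.

```java
// Sketch only: builds the DDL string. Class and method names are hypothetical.
final class MonitoredEventsDdl {

    // Returns a CREATE TABLE statement for a filesystem source that checks
    // `path` for new files every `monitorInterval` (for example "10s").
    static String buildDdl(String path, String monitorInterval) {
        return String.join("\n",
                "CREATE TEMPORARY TABLE events (",
                "  `event_id` STRING",
                ") WITH (",
                "  'connector' = 'filesystem',",
                "  'path' = '" + path + "',",
                "  'format' = 'json',",
                // Documented for Flink 1.15+; if unset, the path is scanned once.
                "  'source.monitor-interval' = '" + monitorInterval + "'",
                ")");
    }

    public static void main(String[] args) {
        // In the application from this thread, this string would be passed
        // to tEnv.executeSql(...) in place of the original DDL.
        System.out.println(buildDdl("./src/main/resources/events/", "10s"));
    }
}
```

The interval value is a trade-off: a shorter one picks up new files sooner but
lists the directory more often. Going by the reply above, this is only
expected to work from 1.15 onward, not on 1.14.3.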

FG

On Fri, Feb 18, 2022 at 1:28 AM M Singh <mans2si...@yahoo.com> wrote:

Hi:  
I have a simple application that uses the filesystem connector to monitor a
directory and print to the console (using the DataStream API). However, the
application stops after reading the file in the directory (at the moment I
have a single file in the directory). I am using Apache Flink version 1.14.3.
I believe there is a configuration option to be used in the 'with' clause, but
I could not find the right config - I tried 'streaming-source.enable' = 'true'
but that results in an exception.
I have also tried using EnvironmentSettings in streaming mode (as shown below)
but the application still stops after reading the file in the directory.
Here is the code segment:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(see, settings);

        tEnv.executeSql(
                "  CREATE TEMPORARY TABLE events (" +
                "  `event_id` STRING" +
                ")" +
                "WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = './src/main/resources/events/'," +
                "  'format' = 'json'" +
                ")"
        );

        Table events = tEnv.sqlQuery(
                "SELECT * from events"
        );
        tEnv.toDataStream(events).print("events");

        see.execute();
    }
}
Here is the console output:
events:7> +I[8b8fabde-45f5-4e94-b6af-7cd1396a11e9]
Process finished with exit code 0

Thanks
  
