Hi Henkka, The behavior of the text file source meets expectation. Flink will not keep your source task thread when it exit from it's invoke method. That means you should keep your source task alive. So to implement this, you should customize a text file source (implement SourceFunction interface).
For your requirement, you can check a no more data idle time, if expire, then exit, finally the job will stop. You can also refer the implementation of other source connectors. Thanks, vino. 2018-07-19 19:52 GMT+08:00 Henri Heiskanen <henri.heiska...@gmail.com>: > Hi, > > I've been looking into how to initialise large state and especially > checked this presentation by Lyft referenced in this group as well: > https://www.youtube.com/watch?v=WdMcyN5QZZQ > > In our use case we would like to load roughly 4 billion entries into this > state and I believe loading this data from s3, creating a savepoint and > then restarting in streaming mode from a savepoint would work very well. In > the presentation I get an impression that I could read from s3 and when all > done (without any custom termination detector etc) I could just make a > savepoint by calling the rest api from the app. However, I've noticed that > if I read data from files the job will auto-terminate when all data is read > and job appears not to be running even if I add the sleep in the main > program (very simple app attached below). > > I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job > from terminating and create the savepoint from outside the app, but that > would require termination detection etc and would make the solution less > clean. > > Has anyone more details how I could accomplish this? > > Br, > Henkka > > public class StreamingJob { > > public static void main(String[] args) throws Exception { > if (args.length == 0) { > args = "--initFile init.csv".split(" "); > } > > // set up the streaming execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > ParameterTool params = ParameterTool.fromArgs(args); > > String initFile = params.get("initFile"); > if (initFile != null) { > env.readTextFile(initFile).map(new MapFunction<String, > Tuple4<String, String, String, String>>() { > @Override > public Tuple4<String, String, String, String> map(String s) > throws Exception { > String[] data = s.split(","); > return new Tuple4<String, String, String, String>(data[0], > data[1], data[2], data[3]); > } > }).keyBy(0, 1).map(new ProfileInitMapper()); > } > > // execute program > env.execute("Flink Streaming Java API Skeleton"); > > // when all data read, save the state > Thread.sleep(10000); > } > } > > > >