Hi Henkka,

If you want to customize the DataStream text source for your purpose, you can use a read counter: if the value of the counter does not change within an interval, you can assume that all the data has been read. That is just one idea; you may choose another solution. A rough sketch of it is below.

About creating a savepoint automatically on job exit: that sounds like a good idea. I do not know of any plan for this; I would suggest raising the idea with the community.

And about "how to bootstrap a state", what does that mean? Can you explain it?

Thanks, vino
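(A minimal sketch of the read-counter idea, assuming a custom SourceFunction; pollNextRecord() and the 10 s check interval are hypothetical stand-ins for the actual read logic.)

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.concurrent.atomic.AtomicLong;

public class CountingIdleSource implements SourceFunction<String> {

    private static final long CHECK_INTERVAL_MS = 10_000; // assumed interval

    private final AtomicLong readCounter = new AtomicLong();
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long lastSeen = -1;
        while (running) {
            String record = pollNextRecord();
            if (record != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
                readCounter.incrementAndGet();
            } else if (readCounter.get() == lastSeen) {
                // The counter has not moved since the last check:
                // guess that all data has been read and let the task exit.
                running = false;
            } else {
                lastSeen = readCounter.get();
                Thread.sleep(CHECK_INTERVAL_MS);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // Hypothetical helper standing in for the real read logic;
    // returning null means "no record available right now".
    private String pollNextRecord() {
        return null;
    }
}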
On 2018-07-20 20:00, Henri Heiskanen wrote:

Hi,

Thanks. Just to clarify, where would you then invoke the savepoint creation? I basically need to know when all the data has been read, create a savepoint and then exit. I think I could just as well use PROCESS_CONTINUOUSLY, monitor the stream from outside and then use the REST API to cancel with a savepoint (a sketch of that call is below). Are there any plans for a feature where I could have Flink create a savepoint on job exit?

I am also keen on hearing other ideas on how to bootstrap the state. I was initially thinking of just reading the data from Cassandra if no state is available.

Br, Henkka
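(A sketch of the "cancel with savepoint" REST call mentioned above, assuming Flink's monitoring REST API (POST /jobs/:jobid/savepoints, available since Flink 1.5); the host/port, job id and target directory are placeholders.)

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CancelWithSavepoint {

    public static void main(String[] args) throws Exception {
        String jobId = "00000000000000000000000000000000"; // placeholder job id
        URL url = new URL("http://localhost:8081/jobs/" + jobId + "/savepoints");

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        // "cancel-job": true triggers the savepoint and then cancels the job.
        String body = "{\"target-directory\": \"s3://bucket/savepoints\", \"cancel-job\": true}";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("HTTP " + conn.getResponseCode());
        // The response contains a trigger id; the savepoint status (and final
        // path) can then be polled at GET /jobs/:jobid/savepoints/:triggerid.
    }
}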
On Thu, Jul 19, 2018 at 3:15 PM vino yang <yanghua1...@gmail.com> wrote:

Hi Henkka,

The behavior of the text file source meets expectations: Flink will not keep your source task thread alive after it returns from its invoke method, so you have to keep the source task alive yourself. To implement this, you should customize the text file source (implement the SourceFunction interface). For your requirement, you can check for a "no more data" idle timeout; when it expires, exit, and the job will then stop. A sketch of such a source follows. You can also refer to the implementations of the other source connectors.

Thanks, vino.
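(A minimal sketch of the suggestion above, assuming a single plain-text file: emit every line, then keep the source alive for an idle period so the job stays RUNNING long enough for a savepoint to be taken, and only then exit. The idle timeout is a constructor parameter chosen by the caller.)

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.FileReader;

public class IdleAwareTextFileSource implements SourceFunction<String> {

    private final String path;
    private final long idleTimeoutMs; // how long to stay alive after the last line
    private volatile boolean running = true;

    public IdleAwareTextFileSource(String path, long idleTimeoutMs) {
        this.path = path;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
            }
        }
        // All data emitted; idle instead of returning immediately so the job
        // does not finish before a savepoint can be triggered externally.
        long deadline = System.currentTimeMillis() + idleTimeoutMs;
        while (running && System.currentTimeMillis() < deadline) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Usage would be e.g. env.addSource(new IdleAwareTextFileSource("init.csv", 600_000)) in place of env.readTextFile(initFile).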
2018-07-19 19:52 GMT+08:00 Henri Heiskanen <henri.heiska...@gmail.com>:

Hi,

I've been looking into how to initialise large state, and in particular checked this presentation by Lyft, referenced in this group as well: https://www.youtube.com/watch?v=WdMcyN5QZZQ

In our use case we would like to load roughly 4 billion entries into this state, and I believe that loading this data from S3, creating a savepoint and then restarting in streaming mode from that savepoint would work very well. From the presentation I got the impression that I could read from S3 and, when all the data is read (without any custom termination detector etc.), just take a savepoint by calling the REST API from the app. However, I've noticed that if I read data from files, the job auto-terminates once all the data is read, and the job appears not to be running even if I add a sleep to the main program (very simple app attached below). I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job from terminating and create the savepoint from outside the app, but that would require termination detection etc. and would make the solution less clean (a readFile sketch is appended after the code below). Has anyone more details on how I could accomplish this?

Br, Henkka
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = "--initFile init.csv".split(" ");
        }

        // set up the streaming execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool params = ParameterTool.fromArgs(args);
        String initFile = params.get("initFile");
        if (initFile != null) {
            env.readTextFile(initFile)
               .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
                   @Override
                   public Tuple4<String, String, String, String> map(String s) throws Exception {
                       String[] data = s.split(",");
                       return new Tuple4<String, String, String, String>(data[0], data[1], data[2], data[3]);
                   }
               })
               .keyBy(0, 1)
               // ProfileInitMapper is the user's own stateful mapper (not included in the mail)
               .map(new ProfileInitMapper());
        }

        // execute program
        env.execute("Flink Streaming Java API Skeleton");

        // when all data read, save the state
        Thread.sleep(10000);
    }
}
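(For reference, a minimal sketch of the PROCESS_CONTINUOUSLY alternative mentioned above, using Flink's readFile API; the 1000 ms monitoring interval is an arbitrary choice. In this mode the source keeps re-scanning the path, so the job never auto-terminates on its own.)

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

// Inside main(), in place of env.readTextFile(initFile):
TextInputFormat format = new TextInputFormat(new Path(initFile));
DataStream<String> lines = env.readFile(
        format,
        initFile,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1000L); // re-scan the path every 1000 ms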
