Hi,

By state bootstrapping I mean loading the state with initial data before starting the actual job. For example, in our case I would like to load information like the registration date of our users (>5 years of data) so that I can enrich our event data in streaming (5 days retention).
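For illustration, the enrichment step could look roughly like the sketch below (class and field names are just placeholders, not our actual code): events keyed by user id, with the registration date held in keyed state that was bootstrapped beforehand.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

// Enriches (userId, payload) events with the user's registration date from keyed state.
public class RegistrationDateEnricher
        extends RichMapFunction<Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient ValueState<String> registrationDate;

    @Override
    public void open(Configuration parameters) {
        registrationDate = getRuntimeContext().getState(
                new ValueStateDescriptor<>("registrationDate", String.class));
    }

    @Override
    public Tuple3<String, String, String> map(Tuple2<String, String> event) throws Exception {
        // Expected to have been populated by the bootstrap job / restored savepoint;
        // a lookup to an external store (e.g. Cassandra) could serve as a fallback when empty.
        String date = registrationDate.value();
        return Tuple3.of(event.f0, event.f1, date);
    }
}

It would be used on a keyed stream, e.g. events.keyBy(0).map(new RegistrationDateEnricher()).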
Before watching the presentation by Lyft, I was loading this data per key from Cassandra in the mapper if the state was not found.

Br,
Henkka

On Fri, Jul 20, 2018 at 7:03 PM Vino yang <[email protected]> wrote:

> Hi Henkka,
>
> If you want to customize the DataStream text source for your purpose, you
> can use a read counter; if the value of the counter does not change within an
> interval, you can assume all the data has been read. Just an idea, you can
> choose another solution.
>
> About creating a savepoint automatically on job exit, that sounds like a good
> idea. I don't know of any plan for this; I will try to submit this idea
> to the community.
>
> And about "how to bootstrap a state", what does that mean? Can you explain
> this?
>
> Thanks, vino
>
>
> On 2018-07-20 20:00, Henri Heiskanen <[email protected]> wrote:
>
> Hi,
>
> Thanks. Just to clarify, where would you then invoke the savepoint
> creation? I basically need to know when all data is read, create a
> savepoint and then exit. I think I could just as well use
> PROCESS_CONTINUOUSLY, monitor the stream outside and then use the REST API to
> cancel with savepoint.
>
> Are there any plans for a feature where I could have Flink make a savepoint
> on job exit? I am also keen on hearing other ideas on how to bootstrap
> state. I was initially thinking of just reading data from Cassandra if no
> state is available.
>
> Br,
> Henkka
>
> On Thu, Jul 19, 2018 at 3:15 PM vino yang <[email protected]> wrote:
>
>> Hi Henkka,
>>
>> The behavior of the text file source meets expectations. Flink will not
>> keep your source task thread alive once it exits from its invoke method. That
>> means you should keep your source task alive. So to implement this, you
>> should customize a text file source (implement the SourceFunction interface).
>>
>> For your requirement, you can check for a no-more-data idle timeout; if it
>> expires, then exit, and finally the job will stop.
>>
>> You can also refer to the implementation of other source connectors.
>>
>> Thanks, vino.
>>
>> 2018-07-19 19:52 GMT+08:00 Henri Heiskanen <[email protected]>:
>>
>>> Hi,
>>>
>>> I've been looking into how to initialise large state and especially
>>> checked this presentation by Lyft referenced in this group as well:
>>> https://www.youtube.com/watch?v=WdMcyN5QZZQ
>>>
>>> In our use case we would like to load roughly 4 billion entries into
>>> this state, and I believe loading this data from S3, creating a savepoint
>>> and then restarting in streaming mode from the savepoint would work very
>>> well. From the presentation I got the impression that I could read from S3 and,
>>> when all is done (without any custom termination detector etc.), just
>>> make a savepoint by calling the REST API from the app. However, I've
>>> noticed that if I read data from files, the job will auto-terminate when all
>>> data is read, and the job appears not to be running even if I add a sleep in
>>> the main program (very simple app attached below).
>>>
>>> I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job
>>> from terminating and create the savepoint from outside the app, but that
>>> would require termination detection etc. and would make the solution less
>>> clean.
>>>
>>> Has anyone more details on how I could accomplish this?
>>>
>>> Br,
>>> Henkka
>>>
>>> public class StreamingJob {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         if (args.length == 0) {
>>>             args = "--initFile init.csv".split(" ");
>>>         }
>>>
>>>         // set up the streaming execution environment
>>>         final StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         ParameterTool params = ParameterTool.fromArgs(args);
>>>
>>>         String initFile = params.get("initFile");
>>>         if (initFile != null) {
>>>             env.readTextFile(initFile).map(new MapFunction<String,
>>>                     Tuple4<String, String, String, String>>() {
>>>                 @Override
>>>                 public Tuple4<String, String, String, String> map(String s)
>>>                         throws Exception {
>>>                     String[] data = s.split(",");
>>>                     return new Tuple4<String, String, String, String>(data[0],
>>>                             data[1], data[2], data[3]);
>>>                 }
>>>             }).keyBy(0, 1).map(new ProfileInitMapper());
>>>         }
>>>
>>>         // execute program
>>>         env.execute("Flink Streaming Java API Skeleton");
>>>
>>>         // when all data read, save the state
>>>         Thread.sleep(10000);
>>>     }
>>> }
>>>
>>
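For reference, vino's suggestion of a custom file source that only finishes after an idle timeout could look roughly like the sketch below. This is only an illustration under those assumptions; the class name, the timeout handling and the plain readLine() loop are made up and not taken from any connector.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.FileReader;

// Keeps the source task alive until no new data has appeared for idleTimeoutMs,
// then lets the source (and therefore the job) finish.
public class IdleTimeoutFileSource implements SourceFunction<String> {

    private final String path;
    private final long idleTimeoutMs;
    private volatile boolean running = true;

    public IdleTimeoutFileSource(String path, long idleTimeoutMs) {
        this.path = path;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long lastEmit = System.currentTimeMillis();
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            while (running) {
                String line = reader.readLine();
                if (line != null) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(line);
                    }
                    lastEmit = System.currentTimeMillis();
                } else if (System.currentTimeMillis() - lastEmit > idleTimeoutMs) {
                    // no new data within the idle timeout: exit and let the job stop
                    break;
                } else {
                    Thread.sleep(100); // wait a bit for more data to be appended
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

The idle window gives you time to trigger the savepoint from outside before the job terminates on its own.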

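And the "cancel with savepoint" call could be triggered through Flink's REST API with something like the following sketch. The endpoint (POST /jobs/:jobid/savepoints with cancel-job=true) is the one documented for Flink 1.5+; the host, port, target directory and job id here are assumptions, so check the REST API docs for your version.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Triggers a savepoint and cancels the job via the JobManager's REST API.
public class CancelWithSavepoint {

    public static void main(String[] args) throws Exception {
        String jobManager = "http://localhost:8081";           // assumed JobManager address
        String jobId = args.length > 0 ? args[0] : "<job-id>"; // pass the running job's id
        String body = "{\"target-directory\": \"s3://my-bucket/savepoints\", \"cancel-job\": true}";

        URL url = new URL(jobManager + "/jobs/" + jobId + "/savepoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        // 202 Accepted means the savepoint was triggered; completion can then be polled
        // via GET /jobs/<jobId>/savepoints/<request-id>.
        System.out.println("Response code: " + conn.getResponseCode());
    }
}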