Hi,

With state bootstrapping I mean loading the state with initial data before
starting the actual job. For example, in our case I would like to load
information like registration date of our users (>5 years of data) so that
I can enrich our event data in streaming (5 days retention).

Before watching the presentation by Lyft, I was loading this data per key
from Cassandra DB in the mapper if the state was not found.

Br,
Henkka

Br,
Henkka

On Fri, Jul 20, 2018 at 7:03 PM Vino yang <[email protected]> wrote:

> Hi Henkka,
>
> If you want to customize the datastream text source for your purpose. You
> can use a read counter, if the value of counter would not change in a
> interval you can guess all the data has been read. Just a idea, you can
> choose other solution.
>
> About creating a savepoint automatically on job exists, it sounds a good
> idea. I did not know any plan about this, I would try to submit this idea
> to the community.
>
> And about "how to bootstrap a state", what does that mean? can you explain
> this?
>
> Thank, vino
>
>
> On 2018-07-20 20:00 , Henri Heiskanen <[email protected]> Wrote:
>
> Hi,
>
> Thanks. Just to clarify, where would you then invoke the savepoint
> creation? I basically need to know when all data is read, create a
> savepoint and then exit. I think I could just as well use the
> PROCESS_CONTINUOSLY, monitor the stream outside and then use rest api to
> cancel with savepoint.
>
> Any plans to have feature where I could choose Flink to make a savepoint
> on job exists? I am also keen on hearing other ideas how to bootstrap a
> state. I was initially thinking of just reading data from Cassandra if no
> state available.
>
> Br,
> Henkka
>
> On Thu, Jul 19, 2018 at 3:15 PM vino yang <[email protected]> wrote:
>
>> Hi Henkka,
>>
>> The behavior of the text file source meets expectation. Flink will not
>> keep your source task thread when it exit from it's invoke method. That
>> means you should keep your source task alive. So to implement this, you
>> should customize a text file source (implement SourceFunction interface).
>>
>>  For your requirement, you can check a no more data idle time, if expire,
>> then exit, finally the job will stop.
>>
>> You can also refer the implementation of other source connectors.
>>
>> Thanks, vino.
>>
>> 2018-07-19 19:52 GMT+08:00 Henri Heiskanen <[email protected]>:
>>
>>> Hi,
>>>
>>> I've been looking into how to initialise large state and especially
>>> checked this presentation by Lyft referenced in this group as well:
>>> https://www.youtube.com/watch?v=WdMcyN5QZZQ
>>>
>>> In our use case we would like to load roughly 4 billion entries into
>>> this state and I believe loading this data from s3, creating a savepoint
>>> and then restarting in streaming mode from a savepoint would work very
>>> well. In the presentation I get an impression that I could read from s3 and
>>> when all done (without any custom termination detector etc) I could just
>>> make a savepoint by calling the rest api from the app. However, I've
>>> noticed that if I read data from files the job will auto-terminate when all
>>> data is read and job appears not to be running even if I add the sleep in
>>> the main program (very simple app attached below).
>>>
>>> I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job
>>> from terminating and create the savepoint from outside the app, but that
>>> would require termination detection etc and would make the solution less
>>> clean.
>>>
>>> Has anyone more details how I could accomplish this?
>>>
>>> Br,
>>> Henkka
>>>
>>> public class StreamingJob {
>>>
>>>    public static void main(String[] args) throws Exception {
>>>       if (args.length == 0) {
>>>          args = "--initFile init.csv".split(" ");
>>>       }
>>>
>>>       // set up the streaming execution environment
>>>       final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>       ParameterTool params = ParameterTool.fromArgs(args);
>>>
>>>       String initFile = params.get("initFile");
>>>       if (initFile != null) {
>>>          env.readTextFile(initFile).map(new MapFunction<String, 
>>> Tuple4<String, String, String, String>>() {
>>>             @Override
>>>             public Tuple4<String, String, String, String> map(String s) 
>>> throws Exception {
>>>                String[] data = s.split(",");
>>>                return new Tuple4<String, String, String, String>(data[0], 
>>> data[1], data[2], data[3]);
>>>             }
>>>          }).keyBy(0, 1).map(new ProfileInitMapper());
>>>       }
>>>
>>>       // execute program
>>>       env.execute("Flink Streaming Java API Skeleton");
>>>
>>>       // when all data read, save the state
>>>       Thread.sleep(10000);
>>>    }
>>> }
>>>
>>>
>>>
>>>
>>

Reply via email to