My source function is intrinsically single-threaded. Is there a way to enforce this? I can't find a real difference between a RichParallelSourceFunction and a RichSourceFunction. Does the latter (RichSourceFunction) implicitly use parallelism = 1?

On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler <ches...@apache.org> wrote:

> It returns a list of states so that state can be re-distributed if the
> parallelism changes.
>
> If you hard-code the interface to return a single value then you're
> implicitly locking the parallelism.
> When you reduce the parallelism you'd no longer be able to restore all
> state, since you have fewer instances than stored state.
>
> On 19/06/2019 14:19, Flavio Pompermaier wrote:
>
> It's not clear to me why the source checkpoint returns a list of
> objects... when could it be useful to use a list instead of a single value?
> The documentation says "The returned list should contain one entry for
> redistributable unit of state" but this is not very clear to me.
>
> Best,
> Flavio
>
> On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> This looks fine to me.
>>
>> What exactly were you worried about?
>>
>> On 19/06/2019 12:33, Flavio Pompermaier wrote:
>> > Hi to all,
>> > in my use case I have to ingest data from a REST service, where I
>> > periodically poll the data (of course a queue would be a better choice
>> > but this doesn't depend on me).
>> >
>> > So I wrote a RichSourceFunction that starts a thread that polls for new
>> > data.
>> > However, I'd like to restart from the last "from" value (in the case
>> > the job is stopped).
>> >
>> > My initial thought was to write somewhere the last used date and, on
>> > job restart, read that date (from a file for example). However, Flink
>> > stateful source should be a better choice here...am I wrong?
>> > So I made my source function implement ListCheckpointed<String>:
>> >
>> > @Override
>> > public List<String> snapshotState(long checkpointId, long timestamp)
>> > throws Exception {
>> >     return Collections.singletonList(pollingThread.getDateFromAsString());
>> > }
>> > @Override
>> > public void restoreState(List<String> state) throws Exception {
>> >     for (String dateFrom : state) {
>> >         startDateStr = dateFrom;
>> >     }
>> > }
>> >
>> > @Override
>> > public void run(SourceContext<MyEvent> ctx) throws Exception {
>> >     final Object lock = ctx.getCheckpointLock();
>> >     Client httpClient = getHttpClient();
>> >     try {
>> >         pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)//
>> >             .setStartDate(startDateStr, datePatternStr)//
>> >             .build();
>> >         // start the polling thread
>> >         new Thread(pr).start();
>> >         .... (etc)
>> > }
>> >
>> > Is this the correct approach or did I misunderstand how stateful
>> > source functions work?
>> >
>> > Best,
>> > Flavio
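
To make Chesnay's point about redistributable list state concrete, here is a minimal plain-Java sketch. It does not use Flink and is not Flink's implementation. It assumes a simple round-robin split; the class name `ListStateRedistribution`, the `redistribute` helper, and the sample dates are all hypothetical. Each old subtask contributes a list of state entries, and on restore those entries are dealt out to however many subtasks the job has now.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListStateRedistribution {

    // Hypothetical helper (not a Flink API): merges the lists snapshotted by
    // the old subtasks and deals the entries out round-robin to the new ones.
    public static List<List<String>> redistribute(List<List<String>> snapshots,
                                                  int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        // Collect every redistributable unit of state into one flat list.
        List<String> all = new ArrayList<>();
        for (List<String> perSubtask : snapshots) {
            all.addAll(perSubtask);
        }
        // One (possibly empty) list per new subtask.
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            restored.get(i % newParallelism).add(all.get(i));
        }
        return restored;
    }

    public static void main(String[] args) {
        // Three old subtasks, each snapshotting a single "from" date
        // (sample values, made up for illustration).
        List<List<String>> snapshots = Arrays.asList(
                Arrays.asList("2019-06-19T10:00"),
                Arrays.asList("2019-06-19T11:00"),
                Arrays.asList("2019-06-19T12:00"));

        // Scale down: one subtask now has to restore two entries.
        System.out.println(redistribute(snapshots, 2));
        // Scale up: one subtask restores an empty list.
        System.out.println(redistribute(snapshots, 4));
    }
}
```

With a single value per subtask, the scale-down case above would have nowhere to put the extra entry, which is why the interface hands `restoreState` a list. For a source that always runs with one instance, the list holds exactly one entry, so the loop in the quoted `restoreState` simply picks it up.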