Hi Sihua,

I will test keying by device ID. I was trying to implement this suggestion:
https://stackoverflow.com/a/49395606, but I guess that may be unnecessary in
my case.
Thanks,
Mike

On Wed, May 23, 2018 at 11:30 PM, sihua zhou <summerle...@163.com> wrote:

> Hi Mike,
>
> If I'm not misunderstanding, you are doing an aggregation for every device
> on the stream. You mentioned that you want to use MapState to store the
> state for each device ID? This is a bit confusing to me; I think what you
> need may be a ValueState. In Flink, every keyed state (ValueState,
> MapState, and so on) is already scoped to the key that you keyed by. For
> example:
>
> *source.keyBy(deviceId).process(processFunction);*
>
> If you keyBy the source by deviceId, then in processFunction every keyed
> state is scoped to the deviceId internally; you don't need to use a
> MapState<DeviceID, ?> to maintain the per-device state yourself.
>
> Concerning the TTL question: I think tumbling windows and per-window
> state are enough for you, and that is the better way to go currently.
>
> Best,
> Sihua
>
> On 05/24/2018 13:46, Mike Urbach <mikeurb...@gmail.com> wrote:
>
> Hi,
>
> I have a two-part question related to processing and storing large amounts
> of time-series data. The first part is about the preferred way to keep
> state on the time-series data efficiently, and the second part is about
> how to further enrich the processed data and feed it back into the state.
>
> For the sake of discussion, let's say that I am tracking tens to hundreds
> of millions of IoT devices. This could grow, but that's what I'm looking
> at right now. I will receive an initial event from each device, as well as
> an unknown number of subsequent events. I will need to aggregate together
> all the events related to one device for some period of time after the
> initial event, say 1 hour, at which point I can discard the state. After
> that, I will never hear from that device again. (I'm not actually working
> with IoT devices, but that is the gist. At any given point in time, I will
> have millions of active keys, and as some keys expire, new keys are
> added.)
> The output of my application should contain the full state for a given
> device, and a new output should be generated every time a new event comes
> in. This application must be fault tolerant. I am currently checkpointing
> my state using the RocksDB state backend.
>
> Part 1 of my question is how best to manage this state. This sounds like
> an excellent use case for State TTL
> (https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively).
> Since this is still a pending feature under active discussion, I did some
> reading about how others have dealt with similar use cases. What I gleaned
> boils down to this: naively storing everything in one large MapState keyed
> by device ID, and using Triggers to clear the state 1 hour after the
> initial event, will lead to far too many Triggers to be efficient.
>
> An alternate approach is to bucket my devices into a far smaller number of
> keys (not in the millions, maybe thousands) and maintain a MapState for
> each bucket. I can fire a Trigger every minute for every bucket and
> iterate over the MapState to clear any state that has passed its TTL.
>
> A similar, alternative approach is to use tumbling Windows to achieve the
> same effect. Every incoming event has a copy of the timestamp of the
> initial event for that device (think of it as when the device came
> online), so I can use that for event time and let the watermarks lag by 1
> hour. The devices are bucketed into some fixed number of keys like above,
> so I will have a Window for each bucket, for each time slice. The Window
> has a Trigger that eagerly fires and purges each element, and a
> ProcessWindowFunction updates a MapState using per-window state, so that
> when a Window expires I can clear the state. I am currently using this
> approach, since it uses Flink's own windowing and per-window state to
> clear old data, rather than manually doing it with Triggers.
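[Editor's note: the bucket-and-sweep approach described above can be sketched
outside of Flink in a few lines of plain Java. This is a hypothetical
illustration of the core idea only — one map per bucket holding deviceId →
expiry timestamp, swept once a minute per bucket instead of once per device —
not the actual ProcessWindowFunction from the thread; the class and method
names are invented for the sketch.]

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// One instance per bucket key; in Flink this map would live in a MapState.
public class BucketState {
    private static final long TTL_MS = 60L * 60 * 1000; // 1 hour, per the thread

    private final Map<String, Long> expiryByDevice = new HashMap<>();

    public void onEvent(String deviceId, long initialEventTs) {
        // Every event carries the timestamp of the device's initial event,
        // so the expiry is fixed the first time the device is seen.
        expiryByDevice.putIfAbsent(deviceId, initialEventTs + TTL_MS);
    }

    // Fired by a single per-bucket Trigger (e.g. once a minute), rather than
    // one Trigger per device; returns how many devices were expired.
    public int sweep(long now) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = expiryByDevice.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() <= now) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public int size() {
        return expiryByDevice.size();
    }
}
```

With thousands of buckets over millions of devices, this trades per-device
timer overhead for a linear scan of each bucket once per sweep interval.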
> Other than waiting for the State TTL feature, is there a more efficient
> approach to maintain the aggregate state of all events related to one
> device and output it every time a new event arrives?
>
> Part 2 of my question relates to how I can enrich the state I have
> accumulated before generating outputs. I have some set of enrichments I'd
> like to do using AsyncFunctions to call out to external services. The
> issue is that some enrichments require data that may never be present on
> any one event; I need to work with the stream of aggregated data described
> above to be able to make some of those calls. Furthermore, some
> enrichments might need the data added by other enrichments. I would like
> to feed the enriched data back into the state.
>
> This initially sounded like a perfect use case for an IterativeStream,
> until I tried to use it and realized the only way to enable checkpointing
> was to force it using a feature that is deprecated in Flink 1.4.2. Is that
> approach a dead end? If checkpoints will never be supported for
> IterativeStream, I don't want to explore this route, but it would be nice
> if checkpointed IterativeStreams are on the roadmap, or at least a
> possibility.
>
> Now I'm kind of stumped. The only way I can think of to aggregate together
> all the state *before* applying enrichments, and to feed the enriched data
> back into that state *after* the enrichments, is to sink the enriched data
> to Kafka or something, and then create a source that reads it back and
> feeds into the operator that keeps the state. That works, but I'd prefer
> to keep all the data flowing within the Flink application if possible. Are
> there other approaches to creating feedback loops that play well with
> fault tolerance and checkpoints?
>
> I appreciate any suggestions related to the two points above.
>
> Thanks,
>
> Mike Urbach
>
> --
> Mike Urbach

--
Mike Urbach
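[Editor's note: Sihua's point that keyed state is automatically scoped per
key — making a hand-maintained MapState<DeviceID, ?> unnecessary — can be
sketched as a Flink KeyedProcessFunction. This is a sketch under assumptions:
DeviceEvent, DeviceAggregate, and their getInitialTimestamp()/add() methods
are hypothetical names, and note that it registers one event-time timer per
device, which is exactly the per-key timer cost the thread worries about at
millions of active keys.]

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AggregatePerDevice
        extends KeyedProcessFunction<String, DeviceEvent, DeviceAggregate> {

    private static final long TTL_MS = 60L * 60 * 1000; // 1 hour

    // Scoped to the current key (the device ID) automatically by Flink;
    // no MapState<DeviceID, ?> is needed.
    private transient ValueState<DeviceAggregate> aggregate;

    @Override
    public void open(Configuration parameters) {
        aggregate = getRuntimeContext().getState(
                new ValueStateDescriptor<>("aggregate", DeviceAggregate.class));
    }

    @Override
    public void processElement(DeviceEvent event, Context ctx,
                               Collector<DeviceAggregate> out) throws Exception {
        DeviceAggregate agg = aggregate.value();
        if (agg == null) {
            agg = new DeviceAggregate();
            // Discard the state 1 hour after the initial event.
            ctx.timerService().registerEventTimeTimer(
                    event.getInitialTimestamp() + TTL_MS);
        }
        agg.add(event);
        aggregate.update(agg);
        out.collect(agg); // emit the full state on every new event
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<DeviceAggregate> out) {
        aggregate.clear(); // the device will never be heard from again
    }
}
```

Wired up as in Sihua's reply, e.g.
`source.keyBy(DeviceEvent::getDeviceId).process(new AggregatePerDevice());`.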