Hi Sihua,

I will test keying by device ID. I was trying to implement this suggestion:
https://stackoverflow.com/a/49395606, but I guess that may be unnecessary
in my case.

Thanks,

Mike

On Wed, May 23, 2018 at 11:30 PM, sihua zhou <summerle...@163.com> wrote:

> Hi Mike,
> if I'm not misunderstand, you are doing aggregation for every device on
> the stream. You mentioned that, you want to use the MapState to store the
> state for each device ID? this is a bit confusing to me, I think what you
> need maybe a ValueState. In flink, every keyed state(Value, MapState,...so
> on) is already scoped to the key that you keyed. For example,
> *source.keyBy(deviceId).process(processFunction);*
> if you keyBy the source by deviceId, then in processFunction every keyed
> state is scoped to the deviceID internally, you don't need to use the
> MapState<DeviceID, ?> to maintance the device state yourself.
>
> Concerning to the TTL question. I think the tumbling windows & the
> per-window state is enough for you, than that is a better way to go
> currently.
>
> Best,
> Sihua
> On 05/24/2018 13:46,Mike Urbach<mikeurb...@gmail.com>
> <mikeurb...@gmail.com> wrote:
>
> Hi,
>
> I have a two-part question related to processing and storing large amounts
> of time-series data. The first part is related to the preferred way to keep
> state on the time-series data in an efficient way, and the second part is
> about how to further enrich the processed data and feed it back into the
> state.
>
> For the sake of discussion, let's say that I am tracking tens to hundreds
> of millions of IoT devices. This could grow but that's what I'm looking at
> right now. I will receive an initial event from each device, as well as an
> unknown number of subsequent events. I will need to aggregate together all
> the events related to one device for some period of time after the initial
> event, say 1 hour, at which point I can discard the state. After that, I
> will never hear from that device again. (I'm not actually working with IoT
> devices, but that is the gist. At any given point in time, I will have
> millions of active keys, and as some keys expire new keys are added).
>
> The output of my application should contain the full state for a given
> device, and a new output should be generated every time a new event comes
> in. This application must be fault tolerant. I am currently checkpointing
> my state using the RocksDB state backend.
>
> Part 1 of my question is how best to manage this state. This sounds like
> an excellent use case for State TTL (https://cwiki.apache.org/conf
> luence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively). Since
> this is still a pending feature under active discussion, I did some reading
> about how others have dealt with similar use-cases. What I gleaned boils
> down to this: naively storing everything in one large MapState keyed by
> device ID, and using Triggers to clear the state 1 hour after the initial
> event will lead to far to many Triggers to be efficient.
>
> An alternate approach is to bucket my devices into a far smaller amount of
> keys (not in the millions, maybe thousands), and maintain a MapState for
> each bucket. I can fire a Trigger every minute for every bucket, and
> iterate over the MapState to clear any state that has past its TTL.
>
> A similar, alternative approach is to use tumbling Windows to achieve the
> same effect. Every incoming event has a copy of the timestamp of the
> initial event for that device (think of it as when the device came online),
> so I can use that for event time, and let the watermarks lag by 1 hour. The
> devices are bucketed into some fixed amount of keys like above, so I will
> have a Window for each bucket, for each time slice. The Window has a
> Trigger that eagerly fires and purges each element, and a
> ProcessWindowFunction updates a MapState using per-window state, so that
> when a Window expires I can clear the state. I am currently using this
> approach, since it uses Flink's own Windowing and per-window state to clear
> old data, rather than manually doing it with Triggers.
>
> Other than waiting for the State TTL feature, is there a more efficient
> approach to maintain the aggregate state of all events related to one
> device, and output this every time a new event arrives?
>
> Part 2 of my question relates to how I can enrich the state I have
> accumulated before generating outputs. I have some set of enrichments I'd
> like to do using AsyncFunctions to call out to external services. The issue
> is some enrichments require data that may never be present on any one
> event; I need to work with the stream of aggregated data described above to
> be able to make some of those calls. Furthermore, some enrichments might
> need the data added by other enrichments. I would like to feed the enriched
> data back into the state.
>
> This initially sounded like a perfect use case for an IterativeStream,
> until I tried to use it and realized the only way to enable checkpointing
> was to force it using a feature that is deprecated in Flink 1.4.2. Is that
> approach a dead end? If checkpoints will never be supported for
> IterativeStream, I don't want to explore this route, but it would be nice
> if checkpointed IterativeStreams are on the roadmap, or at least a
> possibility.
>
> Now I'm kind of stumped. The only way I can think of aggregating together
> all the state *before* applying enrichments, and feeding the enriched data
> back into that state *after* the enrichments is to sink the enriched data
> to Kafka or something, and then create a source that reads it back and
> feeds into the operator that keeps the state. That works, but I'd prefer to
> keep all the data flowing within the Flink application if possible. Are
> there other approaches to creating feedback loops that play well with fault
> tolerance and checkpoints?
>
> I appreciate any suggestions related to the two points above.
>
> Thanks,
>
> Mike Urbach
>
>
> --
> Mike Urbach
>
>


-- 
Mike Urbach

Reply via email to