One thing to add: the Flink KafkaProducer provides at-least-once guarantees only if flush-on-checkpoint is enabled [1].
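Roughly, enabling it looks like this (a minimal sketch against the Flink 1.1 / Kafka 0.9 connector; the topic name, broker address and String schema are just placeholders):

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class CaseSinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // flush-on-checkpoint only takes effect if checkpointing is enabled
            env.enableCheckpointing(10000);

            // stand-in for the real (serialized) case stream
            DataStream<String> cases = env.fromElements("case-1", "case-2");

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

            FlinkKafkaProducer09<String> producer =
                    new FlinkKafkaProducer09<>("cases", new SimpleStringSchema(), props);
            // without this, records still buffered in the Kafka client can be lost
            // on a failure, so the sink is not at-least-once
            producer.setFlushOnCheckpoint(true);

            cases.addSink(producer);
            env.execute("case sink");
        }
    }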
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean-

2017-01-13 22:02 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> Hi Kat,
>
> I did not understand the difference between a case and a trace.
> If I got it right, the goal of your first job is to assemble the
> individual events into cases. Is a case here the last event for a case-id
> or all events of a case-id?
> If a case is the collection of all events (which I assume), what is the
> difference to a trace, which is also the list of events (if I got it right)?
>
> In any case, I think your first job can also be solved without a session
> window (which is quite complex internally).
> There are two options:
> 1) Use a global window [1] with a custom trigger that fires for each
> arriving record. A global window never ends, which is OK since your cases
> do not end either.
> 2) Use a MapFunction with key-partitioned operator state [2]. The map
> function would simply update the state for every new event and emit a new
> result.
>
> Regarding your concern about losing data when writing to Kafka: Flink's
> KafkaProducer provides at-least-once guarantees, which means that data
> might be written more than once in case of a failure but won't be lost. If
> the Kafka topic is partitioned by case-id and you only need the last record
> per case-id, Kafka's log compaction should give you upsert semantics.
>
> Regarding your question "Is using state in this way a somewhat standard
> practice, or is state intended more for recovery?":
> Many streaming applications require state for their semantics (just like
> yours), i.e., they need to buffer data and wait for more data to arrive. In
> order to guarantee consistent result semantics of an application, the state
> must not be lost and must be recoverable in case of a failure. So state is
> not intended for recovery, but recovery is needed to guarantee application
> semantics.
>
> As I said before, I did not get the difference between cases and traces, so
> I cannot really comment on the job to analyze traces.
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface
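To make option 2) concrete, here is a rough sketch of building up a case with key-partitioned state. I used a RichFlatMapFunction rather than a plain MapFunction so the updated case can be emitted through a Collector; the Event and Case POJOs and their fields are invented for illustration:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Event and Case would each live in their own file as public Flink POJOs:
    public class Event {
        public String caseId;
        public long timestamp;
        public String activity;
        public Event() {}
    }

    public class Case {
        public String caseId;
        public List<Event> events = new ArrayList<>();
        public Case() {}
        public Case(String caseId) { this.caseId = caseId; }
    }

    public class CaseBuilder extends RichFlatMapFunction<Event, Case> {

        private transient ValueState<Case> caseState;

        @Override
        public void open(Configuration parameters) {
            // one Case per key (case-id), kept in the configured state backend (e.g. RocksDB)
            caseState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("case", Case.class, null));
        }

        @Override
        public void flatMap(Event event, Collector<Case> out) throws Exception {
            Case current = caseState.value();
            if (current == null) {
                current = new Case(event.caseId);
            }
            current.events.add(event);   // append the new event
            caseState.update(current);   // persist the updated case
            out.collect(current);        // emit the latest picture of the case
        }
    }

    // usage, assuming a DataStream<Event> named events:
    //   events.keyBy("caseId").flatMap(new CaseBuilder()).addSink(...);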
> 2017-01-13 11:04 GMT+01:00 Kathleen Sharp <kathleen.sh...@signavio.com>:
>
>> I have been playing around with Flink for a few weeks to try to
>> ascertain whether or not it meets our use cases, and also what best
>> practices we should be following. I have a few questions I would
>> appreciate answers to.
>>
>> Our scenario is that we want to process a lot of event data into
>> cases. A case is an in-order sequence of events; this event data could
>> be quite old. We never know when a case is complete, so we just want
>> to have the most up-to-date picture of what a case looks like.
>>
>> The in-order sequence of events of a case is called the trace. Many
>> cases could have an identical trace. We would like to construct these
>> traces and do some aggregations on them (case count, average/min/max
>> life-cycle time).
>>
>> We then have further downstream processing we will do on a case, some
>> of which would require additional inputs, either from side-inputs or by
>> somehow joining data sources.
>>
>> We don't really care about event time at the moment, because we just
>> want to build cases and traces with all the data we have received.
>>
>> The end results should be available for our web front end via a REST API.
>>
>> Based on the above, I have the following idea for a first implementation:
>>
>> Kafka source -> key by case id -> session window with RocksDB state
>> backend holding the case for that key -> Postgres sink
>>
>> The reason for a session window is that, as I mentioned above, we just
>> want to build a group with all the data we have received into Kafka up
>> until that point in time. We would experiment with what this gap time
>> should be, and in future it might be specific to the type of group,
>> but for a start a naive approach is acceptable. I think this could
>> be better than just doing it, say, every 10 minutes, because we really
>> don't know yet the frequency of the data received. Also, some inputs
>> to Kafka come directly from a CSV upload, so we will get "firehose"
>> periods and periods of nothing.
>>
>> In short: I think what we have closely matches session behaviour.
>>
>> We also have to implement a Postgres sink that is capable of doing
>> upserts. The reason for Postgres is to serve the REST front end.
>>
>> We then have to build our traces and can see three options for it:
>>
>> 1) The most obvious solution would be to use a Kafka sink for the
>> keyed case stream and to do the trace aggregations in a downstream
>> Flink job with this Kafka topic as a source. However, I have some
>> concerns over losing data (i.e. how do we know whether or not an
>> event has been successfully pushed into the Kafka stream?).
>>
>> 2) Another approach might be to use some other type of sink (perhaps
>> Postgres) and to use this as a source for the traces job. This would
>> help us guarantee data consistency.
>>
>> 3) Or, to somehow re-merge the keyed cases stream (is this a broadcast?),
>> so:
>>
>> Keyed cases stream -> broadcast -> key by tracehash with RocksDB
>> state backend holding the trace for that tracehash -> perform
>> aggregations -> Postgres sink
>>
>> Is broadcast an option here? How costly is it?
>>
>> Which of these approaches (or any other) would you recommend?
>>
>> -------------------------------------
>>
>> Another question regarding state:
>>
>> As we never know when a case is complete, the RocksDB backend could
>> grow infinitely (!). Obviously we would need to get a bit smarter here.
>>
>> Is using state in this way a somewhat standard practice, or is state
>> intended more for recovery?
>>
>> Managing growing state: I found some discussion regarding how to clear
>> state here:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-To-Live-Setting-for-State-StateDescriptor-td10391.html#a10402
>> which references https://issues.apache.org/jira/browse/FLINK-3946
>>
>> Thanks,
>>
>> Kat
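Regarding the upsert-capable Postgres sink mentioned above: a minimal sketch using plain JDBC and Postgres' INSERT ... ON CONFLICT (available from Postgres 9.5 on) could look like the following. The table name, columns, connection settings and the toJson() helper are invented for illustration, and the Case type is the same made-up POJO as in the earlier sketch. Because the write is an idempotent upsert per case_id, the duplicates that at-least-once delivery can produce after a failure are simply overwritten:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class CaseUpsertSink extends RichSinkFunction<Case> {

        private transient Connection connection;
        private transient PreparedStatement upsert;

        @Override
        public void open(Configuration parameters) throws Exception {
            // placeholder connection settings
            connection = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/cases", "flink", "secret");
            upsert = connection.prepareStatement(
                    "INSERT INTO cases (case_id, payload) VALUES (?, ?) " +
                    "ON CONFLICT (case_id) DO UPDATE SET payload = EXCLUDED.payload");
        }

        @Override
        public void invoke(Case value) throws Exception {
            upsert.setString(1, value.caseId);
            upsert.setString(2, toJson(value)); // however the case is serialized
            upsert.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (upsert != null) upsert.close();
            if (connection != null) connection.close();
        }

        private String toJson(Case value) {
            // stand-in for whatever serialization is used for the payload column
            return value.caseId + ":" + value.events.size();
        }
    }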