Hi Kat, thanks for the clarification about cases and traces.
Regarding the aggregation of traces: you can either do that in the same job that constructs the cases, or in a job that is decoupled from it by, for instance, Kafka.

If I got your requirements right, you need a mechanism for retraction. A case (id: 1, event-1, event-3) would result in a trace (event-1, event-3) and go into the corresponding aggregates (e.g., increment a count by 1). If the case with id: 1 receives another event (say event-2), its trace changes to (event-1, event-3, event-2), so the counter of (event-1, event-3) needs to be decreased and the counter of the new trace (event-1, event-3, event-2) needs to be increased.

You can solve this by keeping the latest version of each case in keyed state (keyBy(case-id).flatMap()). Whenever an update is received, you send out a retraction record for the old trace and an update record for the new trace. The aggregation would again be done on keyed state (keyBy(trace-hash).map()). You might want to write the result into some kind of datastore with a primary key (trace-hash) so you can update the results in place (updates caused by retraction and update records). This could be a relational DB (Postgres) or a compacted Kafka topic.
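To make that a bit more concrete, here is a rough, untested sketch of the two steps. The Case type (with a caseId field and a traceHash() helper over its ordered event sequence) and the TraceUpdate record are placeholders for your own types, and the exact ValueStateDescriptor constructor varies slightly between Flink versions:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Record emitted for each change of a case's trace: +1 adds a trace occurrence, -1 retracts it.
public class TraceUpdate {
    public String traceHash;
    public long delta;
    public TraceUpdate() {}
    public TraceUpdate(String traceHash, long delta) { this.traceHash = traceHash; this.delta = delta; }
}

// keyBy("caseId").flatMap(): remember the last trace of each case and emit retract/add records.
// "Case" is the placeholder type produced by your first job; traceHash() is assumed to hash
// the ordered event sequence of the case.
public class CaseToTraceUpdates extends RichFlatMapFunction<Case, TraceUpdate> {

    private transient ValueState<String> lastTraceHash;

    @Override
    public void open(Configuration parameters) {
        // newer Flink versions also offer a two-argument constructor without the default value
        lastTraceHash = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-trace-hash", String.class, null));
    }

    @Override
    public void flatMap(Case c, Collector<TraceUpdate> out) throws Exception {
        String newHash = c.traceHash();
        String oldHash = lastTraceHash.value();
        if (newHash.equals(oldHash)) {
            return;                                      // trace unchanged, nothing to emit
        }
        if (oldHash != null) {
            out.collect(new TraceUpdate(oldHash, -1L));  // retract the old trace
        }
        out.collect(new TraceUpdate(newHash, +1L));      // register the new trace
        lastTraceHash.update(newHash);
    }
}

// keyBy("traceHash").map(): maintain the running occurrence count per trace.
public class TraceCounter extends RichMapFunction<TraceUpdate, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
            new ValueStateDescriptor<>("trace-count", Long.class, null));
    }

    @Override
    public Tuple2<String, Long> map(TraceUpdate update) throws Exception {
        long current = count.value() == null ? 0L : count.value();
        long updated = current + update.delta;
        count.update(updated);
        return Tuple2.of(update.traceHash, updated);     // upsert this by trace-hash in the sink
    }
}

// wiring (sketch):
// cases.keyBy("caseId").flatMap(new CaseToTraceUpdates())
//      .keyBy("traceHash").map(new TraceCounter())
//      .addSink(/* upsert by trace hash, e.g. Postgres or a compacted Kafka topic */);

If you also need the min/max/average run times per trace, you can carry the case's run time in the retract/add records; note that a retraction can remove the current min or max, so you may need to keep the individual run times per trace in state rather than only the aggregate.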
Hope this helps.

Best, Fabian

2017-01-16 9:49 GMT+01:00 Kathleen Sharp <kathleen.sh...@signavio.com>:
> Hi Fabian,
>
> A case consists of all events sharing the same case id. This id is what we initially key the stream by.
>
> The order of these events is the trace.
>
> For example:
> caseid: case1, consisting of event1, event2, event3. Start time 11:00, end 11:05, run time 5 minutes
> caseid: case12, consisting of event1, event2, event3. Start time 11:00, end 11:15, run time 15 minutes
>
> These are 2 distinct cases with the same trace (event1, event2, event3). This trace would have 2 occurrences, with a min run time of 5 minutes, max 15 and average 10.
>
> I have implemented your 2nd suggestion for the first job. I hope I have made the traces clearer, as I am still unsure of the best approach here.
>
> Thanks a lot,
> Kat
>
> On Fri, Jan 13, 2017 at 10:45 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > One thing to add: the Flink KafkaProducer provides only at-least-once if flush-on-checkpoint is enabled [1].
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean-
> >
> > 2017-01-13 22:02 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> >> Hi Kat,
> >>
> >> I did not understand the difference between a case and a trace.
> >> If I got it right, the goal of your first job is to assemble the individual events into cases. Is a case here the last event for a case-id, or all events of a case-id?
> >> If a case is the collection of all events (which I assume), what is the difference from a trace, which is also the list of events (if I got that right)?
> >>
> >> In any case, I think your first job can also be solved without a session window (which is quite complex internally). There are two options:
> >> 1) Use a global window [1] with a custom trigger that fires for each arriving record. A global window never ends, which would be OK since your cases do not end either.
> >> 2) Use a MapFunction with key-partitioned operator state [2]. The map function would simply update the state for every new event and emit a new result.
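For illustration, here is a rough, untested sketch of option 2: a FlatMapFunction with keyed state that appends each event to its case and emits the updated case. Event and Case are placeholder POJOs for your actual types, not Flink classes.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Placeholder POJOs; replace with your actual event/case types.
public class Event {
    public String caseId;
    public String name;
    public Event() {}
}

public class Case {
    public String caseId;
    public List<Event> events;   // events in arrival order (the "trace")
    public Case() {}
    public Case(String caseId, List<Event> events) { this.caseId = caseId; this.events = events; }
}

// keyBy("caseId").flatMap(): append each event to its case and emit the updated case.
public class CaseAssembler extends RichFlatMapFunction<Event, Case> {

    private transient ValueState<Case> currentCase;

    @Override
    public void open(Configuration parameters) {
        currentCase = getRuntimeContext().getState(
            new ValueStateDescriptor<>("current-case", Case.class, null));
    }

    @Override
    public void flatMap(Event event, Collector<Case> out) throws Exception {
        Case c = currentCase.value();
        if (c == null) {
            c = new Case(event.caseId, new ArrayList<Event>());
        }
        c.events.add(event);          // keep the most up-to-date picture of the case
        currentCase.update(c);
        out.collect(c);               // emit the new version downstream
    }
}

// usage (sketch): events.keyBy("caseId").flatMap(new CaseAssembler()).addSink(...)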
> >> Regarding your concerns about losing data when writing to Kafka: Flink's KafkaProducer provides at-least-once guarantees, which means that data might be written more than once in case of a failure, but it won't be lost. If the Kafka topic is partitioned by case-id and you only need the last record per case-id, Kafka's log compaction should give you upsert semantics.
> >>
> >> Regarding your question "Is using state in this way a somewhat standard practice, or is state intended more for recovery?":
> >> Many streaming applications require state for their semantics (just like yours), i.e., they need to buffer data and wait for more data to arrive. In order to guarantee consistent result semantics of an application, the state must not be lost and must be recovered in case of a failure. So state is not intended for recovery; rather, recovery is needed to guarantee application semantics.
> >>
> >> As I said before, I did not get the difference between cases and traces, so I cannot really comment on the job to analyze traces.
> >>
> >> Hope this helps,
> >> Fabian
> >>
> >> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows
> >> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface
> >>
> >> 2017-01-13 11:04 GMT+01:00 Kathleen Sharp <kathleen.sh...@signavio.com>:
> >>> I have been playing around with Flink for a few weeks to try to ascertain whether or not it meets our use cases, and also what best practices we should be following. I have a few questions I would appreciate answers to.
> >>>
> >>> Our scenario is that we want to process a lot of event data into cases. A case is an in-order sequence of events; this event data could be quite old. We never know when a case is complete, so we just want to have the most up-to-date picture of what a case looks like.
> >>>
> >>> The in-order sequence of events of a case is called the trace. Many cases could have an identical trace. We would like to construct these traces and do some aggregations on them (case count, average/min/max life-cycle time).
> >>>
> >>> We then have further downstream processing we will do on a case, some of which would require additional inputs, either from side-inputs or from somehow joining data sources.
> >>>
> >>> We don't really care about event time at the moment, because we just want to build cases and traces with all the data we have received.
> >>>
> >>> The end results should be available for our web front end via a REST API.
> >>>
> >>> Based on the above I have the following idea for a first implementation:
> >>>
> >>> Kafka source -> key by case id -> session window with RocksDB state backend holding the case for that key -> Postgres sink
> >>>
> >>> The reason for a session window is that, as I mentioned above, we just want to build a group with all the data we have received into Kafka up until that point in time. We would experiment with what this gap time should be, and in future it might be specific to the type of group, but for a start a naive approach is acceptable. I think this could be better than just doing it, say, every 10 minutes, because we really don't know yet the frequency of the data received. Also, some inputs to Kafka come directly from a CSV upload, so we will get "firehose" periods and periods of nothing.
> >>>
> >>> In short: I think what we have closely matches session behaviour.
> >>>
> >>> We also have to implement a Postgres sink that is capable of doing upserts. The reason for Postgres is to service the REST front end.
> >>>
> >>> We then have to build our traces and can see two options for it:
> >>>
> >>> 1) The most obvious solution would be to use a Kafka sink for the keyed case stream, and to do the trace aggregations in a downstream Flink job with this Kafka topic as a source. However, I have some concerns over losing any data (i.e. how do we know whether or not an event has been successfully pushed into the Kafka stream?).
> >>>
> >>> 2) Another approach might be to use some other type of sink (perhaps Postgres), and to use this as a source for the traces job. This would help us guarantee data consistency.
> >>>
> >>> 3) Or, to somehow re-merge the keyed cases stream (is this a broadcast?), so:
> >>> Keyed cases stream -> broadcast -> key by tracehash with RocksDB state backend holding the trace for that tracehash -> perform aggregations -> Postgres sink
> >>> Is broadcast an option here? How costly is it?
> >>>
> >>> Which of these approaches (or any other) would you recommend?
> >>>
> >>> -------------------------------------
> >>>
> >>> Another question regarding the state:
> >>> As we never know when a case is complete, this means that the RocksDB backend could grow infinitely (!). Obviously we would need to get a bit smarter here.
> >>>
> >>> Is using state in this way a somewhat standard practice, or is state intended more for recovery?
> >>>
> >>> Managing growing state: I found some discussion regarding how to clear state here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-To-Live-Setting-for-State-StateDescriptor-td10391.html#a10402 which references https://issues.apache.org/jira/browse/FLINK-3946
> >>>
> >>> Thanks,
> >>> Kat
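P.S.: Regarding the upsert-capable Postgres sink mentioned above, a minimal sketch using plain JDBC and Postgres' ON CONFLICT clause (9.5+) could look like the code below. The trace_counts(trace_hash PRIMARY KEY, cnt) table, the connection settings, and the Postgres JDBC driver on the classpath are assumptions; for real use you would add batching and error/reconnect handling.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Upserts (trace-hash, count) records into Postgres, keyed by the trace_hash primary key.
public class TraceCountUpsertSink extends RichSinkFunction<Tuple2<String, Long>> {

    private transient Connection connection;
    private transient PreparedStatement upsert;

    @Override
    public void open(Configuration parameters) throws Exception {
        // placeholder URL and credentials
        connection = DriverManager.getConnection(
            "jdbc:postgresql://localhost:5432/traces", "user", "password");
        // Postgres 9.5+ upsert via ON CONFLICT on the primary key
        upsert = connection.prepareStatement(
            "INSERT INTO trace_counts (trace_hash, cnt) VALUES (?, ?) " +
            "ON CONFLICT (trace_hash) DO UPDATE SET cnt = EXCLUDED.cnt");
    }

    @Override
    public void invoke(Tuple2<String, Long> value) throws Exception {
        upsert.setString(1, value.f0);
        upsert.setLong(2, value.f1);
        upsert.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (upsert != null) { upsert.close(); }
        if (connection != null) { connection.close(); }
    }
}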