Thanks Nico. There are two ways to achieve this; which is better?
1st option -> dataStream.flatMap( ... ) -> this takes one input record and emits N outputs, depending on my key combinations. The same windowing logic is then applied to each of the outputs.

2nd option (the one you suggested) -> use keyBy to create N streams.

With the first option I would use an external config, which would let me change the number of combinations dynamically at runtime. Would that also be possible with the 2nd option? Can I modify or add a data stream at runtime without restarting?

On Wed, Aug 16, 2017 at 4:37 AM, Nico Kruber <n...@data-artisans.com> wrote:
> [back to the ml...]
>
> also including your other mail's additional content...
> > I have been able to do this with the following, repeating it for every
> > key + window combination. So in the above case there would be 8 blocks like
> > the one below (4 combinations and 2 window periods for each combination).
> >
> > modelDataStream.keyBy("campaignId", "adId")
> >     .timeWindow(Time.minutes(1))
> >     .trigger(CountTrigger.of(2))
> >     .reduce(..)
>
> As mentioned in my last email, I only see one way of reducing duplication
> (for the key combinations), but it involves more handling on your side and
> I'd probably not recommend it. Regarding the different windows, I do not see
> anything you could do differently here.
>
> Maybe Aljoscha (cc'd) has an idea of how to do this better.
>
>
> Nico
>
> On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:
> > Hi Nico,
> > Thank you. This is pretty much what I am doing; I was wondering if there is
> > a better way.
> >
> > If there are 10 dimensions on which I want to aggregate with 2 windows,
> > this would become about 20 different combinations.
> >
> > Thank you
> > Basanth
> >
> > On Mon, Aug 14, 2017 at 12:50 PM Nico Kruber <n...@data-artisans.com> wrote:
> > > Hi Basanth,
> > > Let's assume you have records of the form
> > > Record = {timestamp, country, state, city, value}
> > > Then you'd like to create aggregates, e.g.
> > > the average, for the following combinations?
> > > 1) avg per country
> > > 2) avg per state and country
> > > 3) avg per city and state and country
> > >
> > > * You could create three streams and aggregate each individually:
> > > DataStream<Record> ds = //...
> > > DataStream<Record> ds1 = ds.keyBy("country");
> > > DataStream<Record> ds2 = ds.keyBy("country","state");
> > > DataStream<Record> ds3 = ds.keyBy("country","state","city");
> > > // + your aggregation per stream ds1, ds2, ds3
> > >
> > > You probably want to do different things for each of the resulting
> > > aggregations anyway, so having separate streams is probably right for you.
> > >
> > > * Alternatively, you could go with ds1 only and create the per-state (2)
> > > and per-city (3) aggregates yourself in a stateful aggregation function,
> > > e.g. in a MapState [1]. At the end of your aggregation window, you could
> > > then emit them with different keys to be able to distinguish between them.
> > >
> > >
> > > Nico
> > >
> > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
> > >
> > > On Sunday, 13 August 2017 23:13:00 CEST Basanth Gowda wrote:
> > > > For example, this is a sample model from one of the Apache Apex
> > > > presentations.
> > > >
> > > > I would like to aggregate for different combinations and different time
> > > > buckets. What is the best way to do this in Flink?
> > > >
> > > > {"keys":[{"name":"campaignId","type":"integer"},
> > > >          {"name":"adId","type":"integer"},
> > > >          {"name":"creativeId","type":"integer"},
> > > >          {"name":"publisherId","type":"integer"},
> > > >          {"name":"adOrderId","type":"integer"}],
> > > >  "timeBuckets":["1h","1d"],
> > > >  "values":
> > > >    [{"name":"impressions","type":"integer","aggregators":["SUM"]},
> > > >     {"name":"clicks","type":"integer","aggregators":["SUM"]},
> > > >     {"name":"revenue","type":"integer"}],
> > > >  "dimensions":
> > > >    [{"combination":["campaignId","adId"]},
> > > >     {"combination":["creativeId","campaignId"]},
> > > >     {"combination":["campaignId"]},
> > > >     {"combination":["publisherId","adOrderId","campaignId"],
> > > >      "additionalValues":["revenue:SUM"]}]
> > > > }
> > > >
> > > >
> > > > thank you,
> > > > B
> > > >
> > > > On Sun, Aug 13, 2017 at 2:09 PM, Basanth Gowda <basanth.go...@gmail.com> wrote:
> > > > > Hi,
> > > > > I want to aggregate hits by Country, State, City. I would have these as
> > > > > tags in my sample data.
> > > > >
> > > > > How would I do aggregation at different levels? Input data would be a
> > > > > single record.
> > > > >
> > > > > Should I do a flatMap transformation first and create 3 records from 1
> > > > > input record, or is there a better way to do it?
> > > > >
> > > > > thank you,
> > > > > basanth
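The 1st option from the top of this thread (fan each record out once per configured dimension combination, then run a single keyBy/window/reduce pipeline on a composite key) can be sketched in plain Java, so it runs without a Flink dependency. In a Flink job, the body of `fanOut` would sit inside a `FlatMapFunction`'s `flatMap(value, out)`, calling `out.collect(...)` once per output, and the resulting stream would then be keyed on the composite key before windowing. The `AdEvent` and `Keyed` types, the `dim=value|dim=value` key format, and the hard-coded combination list (standing in for the external config) are assumptions for illustration; how the function would pick up config changes at runtime is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class DimensionFanOut {

    // Hypothetical input record: dimension values by name plus one SUM metric.
    record AdEvent(Map<String, Integer> dims, long impressions) {}

    // One fanned-out record: the composite key encodes the combination and its values.
    record Keyed(String key, long impressions) {}

    // Emits one Keyed record per configured combination (N outputs per input).
    // A dimension missing from the event shows up as "dim=null" in the key.
    static List<Keyed> fanOut(AdEvent e, List<List<String>> combinations) {
        List<Keyed> out = new ArrayList<>();
        for (List<String> combo : combinations) {
            StringJoiner key = new StringJoiner("|");
            for (String dim : combo) {
                key.add(dim + "=" + e.dims().get(dim));
            }
            out.add(new Keyed(key.toString(), e.impressions()));
        }
        return out;
    }

    // Stand-in for keyBy(compositeKey) + reduce within one window: SUM per key.
    static Map<String, Long> sumPerKey(List<Keyed> records) {
        Map<String, Long> sums = new HashMap<>();
        for (Keyed k : records) {
            sums.merge(k.key(), k.impressions(), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Combinations copied from the "dimensions" section of the sample model.
        List<List<String>> combos = List.of(
            List.of("campaignId", "adId"),
            List.of("creativeId", "campaignId"),
            List.of("campaignId"),
            List.of("publisherId", "adOrderId", "campaignId"));

        // Two events that fall into the same window.
        List<AdEvent> window = List.of(
            new AdEvent(Map.of("campaignId", 1, "adId", 10, "creativeId", 100,
                               "publisherId", 7, "adOrderId", 3), 5),
            new AdEvent(Map.of("campaignId", 1, "adId", 11, "creativeId", 100,
                               "publisherId", 7, "adOrderId", 3), 2));

        List<Keyed> fanned = new ArrayList<>();
        for (AdEvent e : window) {
            fanned.addAll(fanOut(e, combos));
        }
        System.out.println(sumPerKey(fanned));
    }
}
```

Because the combinations are data passed to one function rather than separate branches, changing them does not change the shape of the pipeline. With the 2nd option, each keyBy branch is part of the job graph built when the job is submitted, so adding a combination means redeploying the job.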
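Nico's alternative (keying by country only and building the per-state and per-city aggregates yourself inside one stateful function) can be sketched in plain Java. A `HashMap` stands in for Flink's `MapState`, and one `CountryWindowAggregator` instance stands in for the state of a single country key within one window; the `Hit` record, the `country:`/`state:`/`city:` key prefixes, and the class name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class CountryWindowAggregator {

    // Hypothetical record following Record = {timestamp, country, state, city, value}.
    // The timestamp would drive window assignment in Flink; it is unused here.
    record Hit(long timestamp, String country, String state, String city, double value) {}

    // Running sum and count for one aggregation key.
    static final class SumCount {
        double sum;
        long count;
    }

    // Stand-in for MapState<String, SumCount> scoped to one country key and one window.
    private final Map<String, SumCount> windowState = new HashMap<>();

    // Called once per record of this country that falls into the window:
    // updates the per-country (1), per-state (2) and per-city (3) aggregates.
    void add(Hit h) {
        update("country:" + h.country(), h.value());
        update("state:" + h.country() + "/" + h.state(), h.value());
        update("city:" + h.country() + "/" + h.state() + "/" + h.city(), h.value());
    }

    private void update(String key, double value) {
        SumCount sc = windowState.computeIfAbsent(key, k -> new SumCount());
        sc.sum += value;
        sc.count++;
    }

    // Called at the end of the window: one average per key, with the prefix
    // telling the three aggregation levels apart. Clears state for the next window.
    Map<String, Double> emitAverages() {
        Map<String, Double> result = new HashMap<>();
        windowState.forEach((k, sc) -> result.put(k, sc.sum / sc.count));
        windowState.clear();
        return result;
    }

    public static void main(String[] args) {
        CountryWindowAggregator agg = new CountryWindowAggregator();
        agg.add(new Hit(1L, "US", "CA", "SF", 10.0));
        agg.add(new Hit(2L, "US", "CA", "LA", 20.0));
        agg.add(new Hit(3L, "US", "NY", "NYC", 60.0));
        System.out.println(agg.emitAverages());
    }
}
```

The prefixes play the role of the "different keys" Nico mentions, so downstream operators can distinguish the levels. The trade-off is more bookkeeping in your own code, and all records of one country are processed by a single parallel instance.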