Hi Daniel, The answer to you original question is you can just keyBy[1] by e.g. the machineId and then computations on KeyedStream are applied independently for each key.
Best, Dawid [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/#datastream-transformations On 31/01/2019 12:55, Daniel Krenn wrote: > I don't get what happened here. Did Selvaraj just hijack this > question? Or what is going on? > > Am Di., 29. Jan. 2019 um 17:01 Uhr schrieb Selvaraj chennappan > <selvarajchennap...@gmail.com <mailto:selvarajchennap...@gmail.com>>: > > I think there is misunderstanding . I want to compare raw json and > transformed record . > Hence I need two consumer and merge the stream for comparison. > I have pipeline defined . pipeline does source(kafka) > ,transformation,dedup and persisting to DB . > image.png > > Before reaching to DB task lots of transformation is applied on > the pipeline Therefore want to validate the record with raw json > message which is available in kafka with the transformed record. > > Hence I want to know How to do that in flink. > > > On Tue, Jan 29, 2019 at 8:54 PM Puneet Kinra > <puneet.ki...@customercentria.com > <mailto:puneet.ki...@customercentria.com>> wrote: > > Hi Selvaraj > > In your pojo add data member as status or something like > that,now set it error in case it is invaild .pass the output > of flatmap > to split opertor there you can split the stream > > On Tue, Jan 29, 2019 at 6:39 PM Selvaraj chennappan > <selvarajchennap...@gmail.com > <mailto:selvarajchennap...@gmail.com>> wrote: > > UseCase:- We have kafka consumer to read messages(json ) > then it applies to flatmap for transformation based on > the rules ( rules are complex ) and convert it to pojo . > We want to verify the record(pojo) is valid by checking > field by field of that record .if record is invalid due to > transformation rules then move to error topic otherwise > send to DB. > > I thought of Implementing like adding another consumer to > read json message and compare json message attributes > with transformed record attributes . > > Hence I need to join/coprocess these two streams to > validate then decide whether persist to db or sending to > error topic. > > Please let me know if you need more information. > > On Tue, Jan 29, 2019 at 6:21 PM miki haiat > <miko5...@gmail.com <mailto:miko5...@gmail.com>> wrote: > > Im not sure if i got your question correctly, can you > elaborate more on your use case > > > > -- > > > > > > Regards, > Selvaraj C > > > > -- > *Cheers * > * > * > *Puneet Kinra* > * > * > > *Mobile:+918800167808 | Skype : > puneet.ki...@customercentria.com > <mailto:puneet.ki...@customercentria.com>* > > *e-mail :puneet.ki...@customercentria.com > <mailto:puneet.ki...@customercentria.com>* > > > > > -- > > > > > > Regards, > Selvaraj C >
signature.asc
Description: OpenPGP digital signature