Hi Jaffe,

Can you please help me out with some sample code showing how you wrote the custom sink and how you are using this broadcast pattern to update the schema at run time? It would be a big help.

--
Thanks & Regards,
Anuj Jain
Mob.: +91-8588817877
Skype: anuj.jain07
<http://www.oracle.com/> <http://www.cse.iitm.ac.in/%7Eanujjain/>
On Sat, Oct 17, 2020 at 1:55 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Julian,

Glad to hear it worked! And thanks for coming back to us :)

Best,
Piotrek

On Sat, 17 Oct 2020 at 04:22, Jaffe, Julian <julianja...@activision.com> wrote:

Hey Piotr,

Thanks for your help! The main thing I was missing was the .broadcast partition operation on a stream (searching for "broadcasting" obviously brought up the broadcast state pattern). This, coupled with my misreading of an error in my code as an error in Flink code, resulted in me making this a much harder problem than it needed to be.

For anyone who may find this in the future, Piotr's suggestion is pretty spot-on. I wound up broadcasting (as in the partitioning strategy) my schema stream and connecting it to my event stream. I then processed those using a CoProcessFunction, using the schema messages to update the parsing for the events. I also emitted a side-output message whenever I processed a new schema, using the same type as my main output messages. I once again broadcast-as-in-partitioning the side-output stream, unioned it with the processed output from the CoProcessFunction, and passed it to my sink, making sure to handle control messages before attempting to do any bucketing.

In poor ASCII art, it looks something like this:

     _______________       ______________
    | Schema Source |     | Event Source |
     ---------------       --------------
            |                    |
        Broadcast                |
            |     ___________    |
             --->| Processor |<---
                  -----------
                    |      |
                    |      Control message side output
                    |      |
                    |      Broadcast
                    |      |
                  Union <---
                    |
                  ______
                 | Sink |
                  ------

I hope this is helpful to someone.

Julian
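A minimal sketch of the topology Julian describes, using the Flink 1.8-era Java DataStream API. The types and helpers here (SchemaMessage, ControlOrRecord, Parser, SchemaAwareBucketingSink, schemaStream, eventStream) are hypothetical stand-ins, since his actual code isn't shown in the thread:

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Side output carrying control messages, typed the same as the main output.
    final OutputTag<ControlOrRecord> controlTag =
        new OutputTag<ControlOrRecord>("control") {};

    // Broadcast (as a partitioning strategy) so every processor subtask
    // sees every schema update, then connect with the event stream.
    SingleOutputStreamOperator<ControlOrRecord> processed = schemaStream
        .broadcast()
        .connect(eventStream)
        .process(new CoProcessFunction<SchemaMessage, byte[], ControlOrRecord>() {
            private transient Parser parser; // rebuilt whenever a new schema arrives

            @Override
            public void processElement1(SchemaMessage schema, Context ctx,
                                        Collector<ControlOrRecord> out) {
                parser = Parser.forSchema(schema);                       // update parsing
                ctx.output(controlTag, ControlOrRecord.control(schema)); // notify sinks
            }

            @Override
            public void processElement2(byte[] raw, Context ctx,
                                        Collector<ControlOrRecord> out) {
                out.collect(ControlOrRecord.record(parser.parse(raw)));
            }
        });

    // Broadcast the control side output to every sink subtask, union it with
    // the data, and let the sink handle control messages before any bucketing.
    processed
        .union(processed.getSideOutput(controlTag).broadcast())
        .addSink(new SchemaAwareBucketingSink());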
From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, October 14, 2020 at 11:22 PM
To: "Jaffe, Julian" <julianja...@activision.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Broadcasting control messages to a sink

Hi Julian,

I think the problem is that the BroadcastProcessFunction and the SinkFunction will be executed by separate operators, so they won't be able to share state. If you cannot split your logic in two, I think you will have to work around this problem differently:

1. Rely on operator chaining and wire both of them together. If you set up your BroadcastProcessFunction and SinkFunction one after another, with the same parallelism, with the default chaining, and without any rebalance/keyBy in between, you can be sure they will be chained together. The record type between the BroadcastProcessFunction and the SinkFunction can then be a union type of a) your actual payload and b) the broadcasted message. Upon initialization/before processing the first record, if you have any broadcast state, you would need to forward its content to the downstream SinkFunction as well.

2. Alternatively, maybe you can embed the SinkFunction inside the BroadcastProcessFunction? This will require some careful proxying and wrapping of calls.

3. As always, you can also write a custom operator that does the same thing.

For 2. and 3. I'm not entirely sure whether there are some gotchas I haven't thought through (state handling?), so if you can make 1. work for you, it will probably be a safer route.

Best,
Piotrek

On Wed, 14 Oct 2020 at 19:42, Jaffe, Julian <julianja...@activision.com> wrote:

Thanks for the suggestion Piotr!

The problem is that the sink needs to have access to the schema (so that it can write the schema only once per file instead of once per record) and thus needs to know when the schema has been updated. In this proposed architecture, I think the sink would still need to check each record to see whether the current schema matches the new record or not? The main problem I encountered when playing around with broadcast state was that I couldn't figure out how to access the broadcast state within the sink, but perhaps I just haven't thought about it the right way. I'll meditate on the docs further 🙂

Julian

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, October 14, 2020 at 6:35 AM
To: "Jaffe, Julian" <julianja...@activision.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Broadcasting control messages to a sink

Hi Julian,

Have you seen Broadcast State [1]? I have never used it personally, but it sounds like something you want. Maybe your job should look like this:

1. Read raw messages from Kafka, without using the schema.
2. Read schema changes and broadcast them to 3. and 5.
3. Deserialize the Kafka records in a BroadcastProcessFunction, combining 1. and 2.
4. Apply your logic.
5. Serialize the records using the schema in another BroadcastProcessFunction, combining 4. and 2.
6. Write the raw records using a BucketingSink.

Best,
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
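In code, the broadcast-state wiring for steps 1–3 might look roughly like this (Java; the state key "current", the Event.parse helper, and the input streams rawKafkaStream and schemaChangeStream are assumptions, not code from the thread):

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    // Broadcast state holding the latest serialized schema under a single key.
    final MapStateDescriptor<String, String> schemaDescriptor =
        new MapStateDescriptor<>("schema", Types.STRING, Types.STRING);

    BroadcastStream<String> schemaChanges =
        schemaChangeStream.broadcast(schemaDescriptor);

    DataStream<Event> deserialized = rawKafkaStream
        .connect(schemaChanges)
        .process(new BroadcastProcessFunction<byte[], String, Event>() {
            @Override
            public void processElement(byte[] raw, ReadOnlyContext ctx,
                                       Collector<Event> out) throws Exception {
                // Read the current schema from the (read-only) broadcast state.
                String schema = ctx.getBroadcastState(schemaDescriptor).get("current");
                out.collect(Event.parse(raw, schema)); // hypothetical parse helper
            }

            @Override
            public void processBroadcastElement(String newSchema, Context ctx,
                                                Collector<Event> out) throws Exception {
                // Every parallel subtask receives and stores the new schema.
                ctx.getBroadcastState(schemaDescriptor).put("current", newSchema);
            }
        });

The catch, as noted earlier in the thread, is that this state lives in the operator running the BroadcastProcessFunction and cannot be read from a separate SinkFunction.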
On Wed, 14 Oct 2020 at 11:01, Jaffe, Julian <julianja...@activision.com> wrote:

Hey all,

I'm building a Flink app that pulls messages from a Kafka topic and writes them out to disk using a custom bucketed sink. Each message needs to be parsed using a schema that is also needed when writing in the sink. This schema is read from a remote file on a distributed file system (it could also be fetched from a service). The schema will be updated very infrequently.

In order to support schema evolution, I have created a custom source that occasionally polls for updates and, if it finds one, parses the new schema and sends a message containing the serialized schema. I've connected these two streams and then used a RichCoFlatMapFunction to flatten them back into a single output stream (schema events get used to update the parser; messages get parsed using the parser and emitted).

However, I need some way to communicate the updated schema to every task of the sink. Simply emitting a control message that is ignored when writing to disk means that only one sink partition will receive the message and thus update the schema. I thought about sending the control message as a side output and then broadcasting the resulting stream to the sink alongside the processed event input, but I couldn't figure out a way to do so. For now, I'm bundling the schema used to parse each event with the event, storing the schema in the sink, and then checking every event's schema against the stored schema, but this is fairly inefficient. Also, I'd like to eventually increase the types of control messages I can send to the sink, some of which may not be idempotent. Is there a better way to handle this pattern?

(Bonus question: ideally, I'd like to be able to perform an action when all sink partitions have picked up the new schema. I'm not aware of any way to emit metadata of this sort from Flink tasks beyond abusing the metrics system. That approach still leaves open the possibility of tasks picking up the new schema and then crashing for unrelated reasons, thus inflating the count of tasks using a specific schema, and moreover it requires tracking at least the current level of parallelism, and probably also Flink task state, outside of Flink. Are there any patterns for reporting metadata like this to the job manager?)

I'm using Flink 1.8.
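The polling schema source Julian mentions could be sketched along these lines (a non-parallel SourceFunction, so only one task polls; the class name, fetch helper, and 60-second interval are all assumptions):

    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    // Re-reads the schema from the remote file and emits it only when it changes.
    public class SchemaPollingSource extends RichSourceFunction<String> {
        private volatile boolean running = true;
        private String lastSchema;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                String schema = fetchSchemaFromRemoteFile(); // hypothetical helper
                if (!schema.equals(lastSchema)) {
                    lastSchema = schema;
                    // Emit under the checkpoint lock so emission doesn't race checkpoints.
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(schema);
                    }
                }
                Thread.sleep(60_000); // poll interval: an assumption, not from the thread
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        private String fetchSchemaFromRemoteFile() {
            // Read from the DFS file or schema service; details aren't given in the thread.
            throw new UnsupportedOperationException("not shown");
        }
    }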