Hi Julian,

Glad to hear it worked! And thanks for coming back to us :)

Best,
Piotrek

On Sat, 17 Oct 2020 at 04:22, Jaffe, Julian <julianja...@activision.com> wrote:

> Hey Piotr,
>
> Thanks for your help! The main thing I was missing was the .broadcast
> partition operation on a stream (searching for “broadcasting” obviously
> brought up the broadcast state pattern). This, coupled with my
> misunderstanding of an error in my code as being an error in Flink code,
> resulted in me making this a much harder problem than it needed to be.
>
> For anyone who may find this in the future, Piotr’s suggestion is pretty
> spot-on. I wound up broadcasting (as in the partitioning strategy) my
> schema stream and connecting it to my event stream. I then processed those
> using a CoProcessFunction, using the schema messages to update the parsing
> for the events. I also emitted a side output message when I processed a new
> schema, using the same type as my main output messages. I once again
> broadcast-as-in-partitioning the side output stream, unioned it with my
> processed output from the CoProcessFunction and passed it to my sink,
> making sure to handle control messages before attempting to do any
> bucketing.
>
> In poor ASCII art, it looks something like the below:
>
>   _______________           ______________
>  | Schema Source |         | Event Source |
>   ---------------           --------------
>          |                         |
>      Broadcast                     |
>          |        ___________      |
>          ------- | Processor | -----
>                   -----------
>                    |       |
>                    |   Control message side output
>                    |       |
>                    |   Broadcast
>                    |       |
>                  Union -----
>                    |
>                  ______
>                 | Sink |
>                  ------
>
> I hope this is helpful to someone.
>
> Julian
>
> From: Piotr Nowojski <pnowoj...@apache.org>
> Date: Wednesday, October 14, 2020 at 11:22 PM
> To: "Jaffe, Julian" <julianja...@activision.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Broadcasting control messages to a sink
>
> Hi Julian,
>
> I think the problem is that BroadcastProcessFunction and SinkFunction will
> be executed by separate operators, so they won't be able to share state. If
> you cannot split your logic into two, I think you will have to work around
> this problem differently.
>
> 1. Rely on operator chaining and wire both of them together.
>
> If you set up your BroadcastProcessFunction and SinkFunction one after
> another, with the same parallelism, with the default chaining, without any
> rebalance/keyBy in between, you can be sure they will be chained together.
> So the output type of your record between BroadcastProcessFunction and
> SinkFunction can be a Union type of a) your actual payload and b) the
> broadcast message. Upon initialization/before processing the first record,
> if you have any broadcast state, you would need to forward its content to
> the downstream SinkFunction as well.
>
> 2. Another solution is that maybe you can try to embed SinkFunction inside
> the BroadcastProcessFunction? This will require some careful proxying and
> wrapping calls.
>
> 3. As always, you can also write a custom operator that will be doing the
> same thing.
>
> For 2. and 3., I'm not entirely sure if there are some gotchas that I
> haven't thought through (state handling?), so if you can make 1. work for
> you, it will probably be a safer route.
>
> Best,
> Piotrek
>
> On Wed, 14 Oct 2020 at 19:42, Jaffe, Julian <julianja...@activision.com> wrote:
>
> Thanks for the suggestion Piotr!
>
> The problem is that the sink needs to have access to the schema (so that
> it can write the schema only once per file instead of once per record) and
> thus needs to know when the schema has been updated. In this proposed
> architecture, I think the sink would still need to check each record to see
> if the current schema matches the new record or not? The main problem I
> encountered when playing around with broadcast state was that I couldn’t
> figure out how to access the broadcast state within the sink, but perhaps I
> just haven’t thought about it the right way. I’ll meditate on the docs
> further 🙂
>
> Julian
>
> From: Piotr Nowojski <pnowoj...@apache.org>
> Date: Wednesday, October 14, 2020 at 6:35 AM
> To: "Jaffe, Julian" <julianja...@activision.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Broadcasting control messages to a sink
>
> Hi Julian,
>
> Have you seen Broadcast State [1]? I have never used it personally, but it
> sounds like something you want. Maybe your job should look like:
>
> 1. read raw messages from Kafka, without using the schema
> 2. read schema changes and broadcast them to 3. and 5.
> 3. deserialize Kafka records in BroadcastProcessFunction by using combined
>    1. and 2.
> 4. do your logic
> 5. serialize records using schema in another BroadcastProcessFunction by
>    using combined 4. and 2.
> 6. write raw records using BucketingSink
>
> ?
>
> Best,
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> On Wed, 14 Oct 2020 at 11:01, Jaffe, Julian <julianja...@activision.com> wrote:
>
> Hey all,
>
> I’m building a Flink app that pulls in messages from a Kafka topic and
> writes them out to disk using a custom bucketed sink. Each message needs to
> be parsed using a schema that is also needed when writing in the sink. This
> schema is read from a remote file on a distributed file system (it could
> also be fetched from a service). The schema will be updated very
> infrequently.
>
> In order to support schema evolution, I have created a custom source that
> occasionally polls for updates and, if it finds one, parses the new schema
> and sends a message containing the serialized schema. I’ve connected these
> two streams and then use a RichCoFlatMapFunction to flatten them back into
> a single output stream (schema events get used to update the parser,
> messages get parsed using the parser and emitted).
>
> However, I need some way to communicate the updated schema to every task
> of the sink. Simply emitting a control message that is ignored when writing
> to disk means that only one sink partition will receive the message and
> thus update the schema. I thought about sending the control message as side
> output and then broadcasting the resulting stream to the sink alongside the
> processed event input but I couldn’t figure out a way to do so.
> For now, I’m bundling the schema used to parse each event with the event,
> storing the schema in the sink, and then checking every event’s schema
> against the stored schema but this is fairly inefficient. Also, I’d like to
> eventually increase the types of control messages I can send to the sink,
> some of which may not be idempotent. Is there a better way to handle this
> pattern?
>
> (Bonus question: ideally, I’d like to be able to perform an action when
> all sink partitions have picked up the new schema. I’m not aware of any way
> to emit metadata of this sort from Flink tasks beyond abusing the metrics
> system. This approach still leaves open the possibility of tasks picking up
> the new schema and then crashing for unrelated reasons thus inflating the
> count of tasks using a specific schema and moreover requires tracking at
> least the current level of parallelism and probably also Flink task state
> outside of Flink. Are there any patterns for reporting metadata like this
> to the job manager?)
>
> I’m using Flink 1.8.
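
For readers skimming the thread, the difference between sending a schema update down the normal data path and broadcasting it to every sink task can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions and uses no Flink classes: `ControlBroadcastModel`, `Message`, `SinkTask`, `sendToOne` and `sendToAll` are hypothetical names invented for illustration, round-robin stands in for whatever partitioning the real job uses, and the single-threaded loop says nothing about how control and event messages interleave in a running job.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (no Flink classes) of control messages reaching parallel sink tasks. */
final class ControlBroadcastModel {

    /** One envelope type for both events and control messages, as in Julian's follow-up. */
    static final class Message {
        final boolean control; // true: body is a new schema; false: body is a parsed event
        final String body;

        Message(boolean control, String body) {
            this.control = control;
            this.body = body;
        }

        static Message event(String body) { return new Message(false, body); }
        static Message schema(String body) { return new Message(true, body); }
    }

    /** Stand-in for one parallel sink subtask. */
    static final class SinkTask {
        String schema = "v1";                       // schema this task currently writes with
        final List<String> written = new ArrayList<>();

        void invoke(Message m) {
            if (m.control) {                        // handle control messages before any bucketing
                schema = m.body;
                return;
            }
            written.add(schema + ":" + m.body);     // stand-in for writing into a bucket file
        }
    }

    /** Data path: each message goes to exactly one subtask (round-robin here). */
    static void sendToOne(List<SinkTask> sinks, Message m, int seq) {
        sinks.get(seq % sinks.size()).invoke(m);
    }

    /** Broadcast path: every subtask receives its own copy of the message. */
    static void sendToAll(List<SinkTask> sinks, Message m) {
        for (SinkTask s : sinks) {
            s.invoke(m);
        }
    }

    /** Sends one event, then a schema update, then three more events. */
    static List<SinkTask> run(int parallelism, boolean broadcastControl) {
        List<SinkTask> sinks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            sinks.add(new SinkTask());
        }
        int seq = 0;
        sendToOne(sinks, Message.event("a"), seq++);
        Message update = Message.schema("v2");
        if (broadcastControl) {
            sendToAll(sinks, update);
        } else {
            sendToOne(sinks, update, seq++);
        }
        for (String e : new String[] {"b", "c", "d"}) {
            sendToOne(sinks, Message.event(e), seq++);
        }
        return sinks;
    }

    /** Counts the subtasks whose current schema equals the given one. */
    static int countWithSchema(List<SinkTask> sinks, String schema) {
        int n = 0;
        for (SinkTask s : sinks) {
            if (s.schema.equals(schema)) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println("broadcast:  " + countWithSchema(run(3, true), "v2") + " of 3 tasks on v2");
        System.out.println("data path:  " + countWithSchema(run(3, false), "v2") + " of 3 tasks on v2");
    }
}
```

With three sink tasks, the non-broadcast run leaves only one task on the new schema, which is the behaviour described in the original question; the broadcast run updates all three. In both runs the control message is consumed before the write path, so it never ends up in a bucket.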