Hi Julian,

Glad to hear it worked! And thanks for coming back to us :)

Best,
Piotrek

Sat, 17 Oct 2020 at 04:22 Jaffe, Julian <julianja...@activision.com>
wrote:

> Hey Piotr,
>
>
>
> Thanks for your help! The main thing I was missing was the .broadcast
> partition operation on a stream (searching for “broadcasting” obviously
> brought up the broadcast state pattern). That, coupled with my misreading
> of an error in my own code as an error in Flink code, made this a much
> harder problem than it needed to be.
>
>
>
> For anyone who may find this in the future, Piotr’s suggestion is pretty
> spot-on. I wound up broadcasting (as in the partitioning strategy) my
> schema stream and connecting it to my event stream. I then processed those
> using a CoProcessFunction, using the schema messages to update the parsing
> of the events. I also emitted a side output message whenever I processed a
> new schema, using the same type as my main output messages. I once again
> broadcast (as in partitioning) the side output stream, unioned it with the
> processed output from the CoProcessFunction, and passed the result to my
> sink, making sure to handle control messages before attempting any
> bucketing.
>
>
>
> In poor ASCII art, it looks something like the below:
>
>
>
>
>
>  _______________         ______________
> | Schema Source |       | Event Source |
>  ---------------         --------------
>         |                       |
>     Broadcast                   |
>         |       ___________     |
>          ----- | Processor | ----
>                 -----------
>                  |       |
>                  |        ----- Control message side output
>                  |                    |
>                  |                Broadcast
>                  |                    |
>                Union ------------------
>                  |
>                ______
>               | Sink |
>                ------
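>
> In code, the wiring looks roughly like the sketch below. This is a
> minimal outline rather than the actual job: the stream names (events,
> schemas), the type names (RawEvent, SchemaUpdate, Output), the Parser
> helper, and the sink are all placeholders.
>
> OutputTag<Output> CONTROL = new OutputTag<Output>("control") {};
>
> SingleOutputStreamOperator<Output> main = events
>     .connect(schemas.broadcast())  // broadcast-as-in-partitioning
>     .process(new CoProcessFunction<RawEvent, SchemaUpdate, Output>() {
>         // null until the first schema arrives; the real job loads an
>         // initial schema before processing
>         private transient Parser parser;
>
>         @Override
>         public void processElement1(RawEvent e, Context ctx,
>                                     Collector<Output> out) {
>             out.collect(parser.parse(e));  // parse with current schema
>         }
>
>         @Override
>         public void processElement2(SchemaUpdate s, Context ctx,
>                                     Collector<Output> out) {
>             parser = Parser.fromSchema(s);           // update the parser
>             ctx.output(CONTROL, Output.control(s));  // control message
>         }
>     });
>
> main.getSideOutput(CONTROL)
>     .broadcast()       // every sink subtask sees every control message
>     .union(main)
>     .addSink(mySink);  // sink handles control messages before bucketing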
>
>
>
> I hope this is helpful to someone.
>
>
>
> Julian
>
>
>
> *From: *Piotr Nowojski <pnowoj...@apache.org>
> *Date: *Wednesday, October 14, 2020 at 11:22 PM
> *To: *"Jaffe, Julian" <julianja...@activision.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Broadcasting control messages to a sink
>
>
>
> Hi Julian,
>
>
>
> I think the problem is that the BroadcastProcessFunction and SinkFunction
> will be executed by separate operators, so they won't be able to share
> state. If you cannot split your logic in two, I think you will have to
> work around this problem differently.
>
>
>
> 1. Rely on operator chaining and wire both of them together.
>
>
>
> If you set up your BroadcastProcessFunction and SinkFunction one after
> another, with the same parallelism, with the default chaining, and without
> any rebalance/keyBy in between, you can be sure they will be chained
> together. The record type between the BroadcastProcessFunction and the
> SinkFunction can then be a union type of a) your actual payload and b) the
> broadcasted message. Upon initialization/before processing the first
> record, if you have any broadcast state, you would need to forward its
> contents to the downstream SinkFunction as well (a sketch follows below).
>
>
>
> 2. Another option: maybe you can embed the SinkFunction inside the
> BroadcastProcessFunction? This will require some careful proxying and
> wrapping of calls.
>
> 3. As always, you can also write a custom operator that does the same
> thing.
>
>
>
> For 2. and 3. I'm not entirely sure whether there are gotchas I haven't
> thought through (state handling?), so if you can make 1. work for you, it
> will probably be the safer route.
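>
> Roughly, option 1 could look like the sketch below. It's just an untested
> outline, assuming your two input streams are events and schemas: the types
> (RawEvent, Payload, SchemaUpdate) and the parse/write logic are
> placeholders, and I'm using Flink's Either as the union type.
>
> MapStateDescriptor<Void, SchemaUpdate> desc = new MapStateDescriptor<>(
>     "schema", Types.VOID, TypeInformation.of(SchemaUpdate.class));
>
> DataStream<Either<Payload, SchemaUpdate>> tagged = events
>     .connect(schemas.broadcast(desc))
>     .process(new BroadcastProcessFunction<RawEvent, SchemaUpdate,
>             Either<Payload, SchemaUpdate>>() {
>         @Override
>         public void processElement(RawEvent e, ReadOnlyContext ctx,
>                 Collector<Either<Payload, SchemaUpdate>> out) {
>             out.collect(Either.Left(parse(e)));
>         }
>
>         @Override
>         public void processBroadcastElement(SchemaUpdate s, Context ctx,
>                 Collector<Either<Payload, SchemaUpdate>> out) {
>             out.collect(Either.Right(s));  // forward schema downstream
>         }
>     });
>
> // Same parallelism and no repartitioning in between, so with default
> // chaining the sink is chained to the process function above.
> tagged.addSink(new SinkFunction<Either<Payload, SchemaUpdate>>() {
>     @Override
>     public void invoke(Either<Payload, SchemaUpdate> v, Context ctx) {
>         if (v.isRight()) { /* update the sink-side schema */ }
>         else { /* write v.left() using the current schema */ }
>     }
> });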
>
>
>
> Best,
>
> Piotrek
>
>
>
>
>
>
>
>
>
> Wed, 14 Oct 2020 at 19:42 Jaffe, Julian <julianja...@activision.com>
> wrote:
>
> Thanks for the suggestion Piotr!
>
>
>
> The problem is that the sink needs access to the schema (so that it can
> write the schema only once per file instead of once per record) and thus
> needs to know when the schema has been updated. In this proposed
> architecture, I think the sink would still need to check each record to
> see whether the current schema matches the record or not? The main problem
> I encountered when playing around with broadcast state was that I couldn’t
> figure out how to access the broadcast state within the sink, but perhaps
> I just haven’t thought about it the right way. I’ll meditate on the docs
> further  🙂
>
>
>
> Julian
>
>
>
> *From: *Piotr Nowojski <pnowoj...@apache.org>
> *Date: *Wednesday, October 14, 2020 at 6:35 AM
> *To: *"Jaffe, Julian" <julianja...@activision.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Broadcasting control messages to a sink
>
>
>
> Hi Julian,
>
>
>
> Have you seen Broadcast State [1]? I have never used it personally, but it
> sounds like something you want. Maybe your job should look like:
>
>
>
> 1. read raw messages from Kafka, without using the schema
>
> 2. read schema changes and broadcast them to 3. and 5.
>
> 3. deserialize the Kafka records in a BroadcastProcessFunction, combining
> 1. and 2.
>
> 4. do your logic
>
> 5. serialize the records using the schema in another
> BroadcastProcessFunction, combining 4. and 2.
>
> 6. write the raw records using a BucketingSink
>
> ?
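>
> In rough code the wiring could look like the below. Only a sketch: the
> source and function names are placeholders, and the two process functions
> are assumed to read the current schema from the broadcast state.
>
> BroadcastStream<SchemaUpdate> schemas = env
>     .addSource(schemaChangeSource)
>     .broadcast(schemaStateDescriptor);               // 2.
>
> DataStream<Event> parsed = env
>     .addSource(rawKafkaSource)                       // 1.
>     .connect(schemas)
>     .process(new DeserializingBroadcastFunction());  // 3.
>
> parsed
>     .map(new YourLogic())                            // 4.
>     .connect(schemas)
>     .process(new SerializingBroadcastFunction())     // 5.
>     .addSink(bucketingSink);                         // 6.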
>
>
>
> Best,
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
>
>
> Wed, 14 Oct 2020 at 11:01 Jaffe, Julian <julianja...@activision.com>
> wrote:
>
> Hey all,
>
>
>
> I’m building a Flink app that pulls in messages from a Kafka topic and
> writes them out to disk using a custom bucketed sink. Each message needs to
> be parsed using a schema that is also needed when writing in the sink. This
> schema is read from a remote file on a distributed file system (it could
> also be fetched from a service). The schema will be updated very
> infrequently.
>
>
>
> In order to support schema evolution, I have created a custom source that
> occasionally polls for updates and, if it finds one, parses the new schema
> and sends a message containing the serialized schema. I’ve connected these
> two streams and then used a RichCoFlatMapFunction to flatten them back into
> a single output stream (schema events get used to update the parser,
> messages get parsed using the parser and emitted).
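>
> Concretely (with placeholder type names and a hypothetical Parser helper),
> the current setup looks roughly like:
>
> DataStream<Output> parsed = kafkaEvents
>     .connect(schemaUpdates)
>     .flatMap(new RichCoFlatMapFunction<RawEvent, SchemaUpdate, Output>() {
>         private transient Parser parser;
>
>         @Override
>         public void open(Configuration conf) {
>             parser = Parser.loadInitial();  // read the schema file once
>         }
>
>         @Override
>         public void flatMap1(RawEvent e, Collector<Output> out) {
>             out.collect(parser.parse(e));   // parse with current schema
>         }
>
>         @Override
>         public void flatMap2(SchemaUpdate s, Collector<Output> out) {
>             parser = Parser.fromSchema(s);  // schema event updates parser
>         }
>     });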
>
>
>
> However, I need some way to communicate the updated schema to every task
> of the sink. Simply emitting a control message that is ignored when writing
> to disk means that only one sink partition will receive the message and
> thus update the schema. I thought about sending the control message as a
> side output and then broadcasting the resulting stream to the sink
> alongside the processed event input, but I couldn’t figure out a way to do
> so. For now, I’m bundling the schema used to parse each event with the
> event, storing the schema in the sink, and then checking every event’s
> schema against the stored schema, but this is fairly inefficient. Also, I’d
> like to eventually increase the types of control messages I can send to the
> sink, some of which may not be idempotent. Is there a better way to handle
> this pattern?
>
>
> (Bonus question: ideally, I’d like to be able to perform an action when
> all sink partitions have picked up the new schema. I’m not aware of any way
> to emit metadata of this sort from Flink tasks beyond abusing the metrics
> system. That approach still leaves open the possibility of tasks picking up
> the new schema and then crashing for unrelated reasons, thus inflating the
> count of tasks using a specific schema, and it moreover requires tracking
> at least the current level of parallelism and probably also Flink task
> state outside of Flink. Are there any patterns for reporting metadata like
> this to the job manager?)
>
>
>
> I’m using Flink 1.8.
>
>
