Hi Jaffe, I am also working on a similar type of problem.
I am receiving a set of events in Avro format on different topics. I want to consume these and write them to S3 in Parquet format. I have written a job that creates a separate stream for each event and fetches its schema from the Confluent Schema Registry to build a Parquet sink for that event. This works fine, but the problem I am facing is that whenever a new event starts arriving or a schema changes, I have to update the YAML config and restart the job. Is there any way to avoid restarting the job so that it picks up a new set of events automatically? Since you are handling schema evolution, can you help me with this and with how to handle new events? In the config I keep a mapping of events to schema subjects. Please share how you are solving this.

This is how I am doing it currently, but I would like to know a better way to handle it:

// One consumer for all topics, deserializing to GenericRecord via the schema registry
FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
        topics, new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), properties);
DataStream<GenericRecord> dataStream =
        streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    // One filtered stream and one Parquet sink per configured event type
    for (EventConfig eventConfig : eventTypesList) {
        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        DataStream<GenericRecord> outStream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
                genericRecord != null
                        && genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}

On Wed, Oct 14, 2020, 23:12 Jaffe, Julian <julianja...@activision.com> wrote:

> Thanks for the suggestion Piotr!
>
> The problem is that the sink needs to have access to the schema (so that it can write the schema only once per file instead of once per record) and thus needs to know when the schema has been updated. In this proposed architecture, I think the sink would still need to check each record to see if the current schema matches the new record or not? The main problem I encountered when playing around with broadcast state was that I couldn't figure out how to access the broadcast state within the sink, but perhaps I just haven't thought about it the right way. I'll meditate on the docs further 🙂
>
> Julian
>
> From: Piotr Nowojski <pnowoj...@apache.org>
> Date: Wednesday, October 14, 2020 at 6:35 AM
> To: "Jaffe, Julian" <julianja...@activision.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Broadcasting control messages to a sink
>
> Hi Julian,
>
> Have you seen Broadcast State [1]? I have never used it personally, but it sounds like something you want. Maybe your job should look like:
>
> 1. read raw messages from Kafka, without using the schema
> 2. read schema changes and broadcast them to 3. and 5.
> 3. deserialize kafka records in BroadcastProcessFunction by using combined 1. and 2.
> 4. do your logic
> 5. serialize records using schema in another BroadcastProcessFunction by using combined 4. and 2.
> 6. write raw records using BucketingSink
>
> ?
>
> Best,
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> On Wed, 14 Oct 2020 at 11:01, Jaffe, Julian <julianja...@activision.com> wrote:
>
> Hey all,
>
> I'm building a Flink app that pulls in messages from a Kafka topic and writes them out to disk using a custom bucketed sink. Each message needs to be parsed using a schema that is also needed when writing in the sink. This schema is read from a remote file on a distributed file system (it could also be fetched from a service). The schema will be updated very infrequently.
>
> In order to support schema evolution, I have created a custom source that occasionally polls for updates and, if it finds one, parses the new schema and sends a message containing the serialized schema. I've connected these two streams and then use a RichCoFlatMapFunction to flatten them back into a single output stream (schema events get used to update the parser, messages get parsed using the parser and emitted).
>
> However, I need some way to communicate the updated schema to every task of the sink. Simply emitting a control message that is ignored when writing to disk means that only one sink partition will receive the message and thus update the schema. I thought about sending the control message as side output and then broadcasting the resulting stream to the sink alongside the processed event input, but I couldn't figure out a way to do so. For now, I'm bundling the schema used to parse each event with the event, storing the schema in the sink, and then checking every event's schema against the stored schema, but this is fairly inefficient. Also, I'd like to eventually increase the types of control messages I can send to the sink, some of which may not be idempotent. Is there a better way to handle this pattern?
>
> (Bonus question: ideally, I'd like to be able to perform an action when all sink partitions have picked up the new schema. I'm not aware of any way to emit metadata of this sort from Flink tasks beyond abusing the metrics system. This approach still leaves open the possibility of tasks picking up the new schema and then crashing for unrelated reasons, thus inflating the count of tasks using a specific schema, and moreover requires tracking at least the current level of parallelism and probably also Flink task state outside of Flink. Are there any patterns for reporting metadata like this to the job manager?)
>
> I'm using Flink 1.8.
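
P.S. In case it helps anyone following this thread, here is a rough, untested sketch of the broadcast-state wiring Piotr outlines in steps 2 and 3 above, against Flink 1.8-era APIs. The class and method names, the "current" key, and the choice to ship the schema around as a plain JSON string are all made up for illustration; it only shows where the broadcast state is written and read, not the actual Avro decoding or the sink side.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class SchemaBroadcastSketch {

    // Broadcast state: the latest schema JSON, stored under a single fixed key.
    private static final MapStateDescriptor<String, String> SCHEMA_STATE =
            new MapStateDescriptor<>("schemas", Types.STRING, Types.STRING);

    // Steps 2 and 3 above: broadcast schema updates, then parse raw records with the current schema.
    public static DataStream<GenericRecord> parseWithBroadcastSchema(
            DataStream<byte[]> rawKafkaBytes,     // step 1: raw messages, no schema applied yet
            DataStream<String> schemaUpdates) {   // step 2: serialized schemas from the polling source

        BroadcastStream<String> schemaBroadcast = schemaUpdates.broadcast(SCHEMA_STATE);

        return rawKafkaBytes
                .connect(schemaBroadcast)
                .process(new BroadcastProcessFunction<byte[], String, GenericRecord>() {

                    @Override
                    public void processElement(byte[] raw, ReadOnlyContext ctx,
                                               Collector<GenericRecord> out) throws Exception {
                        String schemaJson = ctx.getBroadcastState(SCHEMA_STATE).get("current");
                        if (schemaJson == null) {
                            return; // or buffer until the first schema arrives
                        }
                        Schema schema = new Schema.Parser().parse(schemaJson);
                        // ... decode `raw` with `schema` and out.collect(...) the resulting record
                    }

                    @Override
                    public void processBroadcastElement(String schemaJson, Context ctx,
                                                        Collector<GenericRecord> out) throws Exception {
                        // Every parallel instance sees every update and stores it under the same key.
                        ctx.getBroadcastState(SCHEMA_STATE).put("current", schemaJson);
                    }
                });
    }
}

As far as I understand, the same connect-and-process pattern could be repeated just before the sink (Piotr's step 5) so that the sink only ever receives already-serialized bytes, but I have not tried that myself.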