Hi Jaffe,

I am also working on a similar type of problem.

I am receiving a set of events in Avro format on different topics. I want
to consume these and write them to S3 in Parquet format.
I have written a job that creates a separate stream for each event and
fetches the event's schema from the Confluent Schema Registry to build a
Parquet sink for it.
This is working fine, but the one problem I am facing is that whenever a new
event starts coming in, or a schema changes, I have to update the YAML
config and restart the job. Is there a way to avoid restarting the job so
that it picks up a new set of events automatically?

Since you are handling schema evolution, could you help me with this, and
also with how to handle new events? In the config I keep a mapping of
events to schema subjects (roughly as in the sketch below). Please share how
you are solving this.
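
For reference, each entry in that mapping corresponds to a small POJO along
the lines of the sketch below (simplified here; the field names match the
getters used in the job further down):

public class EventConfig {
    private String event_name;      // value of the EVENT_NAME field in the record
    private String schema_subject;  // Schema Registry subject to fetch the schema from
    private String sink_id;         // operator name for this event's Parquet sink

    public String getEvent_name() { return event_name; }
    public String getSchema_subject() { return schema_subject; }
    public String getSink_id() { return sink_id; }
}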


This is how I am doing it currently, but I would like to know a better way
to handle it:

FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
        topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

DataStream<GenericRecord> dataStream = streamExecutionEnvironment
        .addSource(flinkKafkaConsumer)
        .name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {

        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // Build a Parquet sink for this event, using the Avro schema fetched
        // from the Schema Registry for the event's subject.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Keep only the records belonging to this event type.
        DataStream<GenericRecord> outStream = dataStream
                .filter((FilterFunction<GenericRecord>) genericRecord ->
                        genericRecord != null &&
                        genericRecord.get(EVENT_NAME).toString()
                                .equals(eventConfig.getEvent_name()));

        outStream.addSink(sink)
                .name(eventConfig.getSink_id())
                .setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}


On Wed, Oct 14, 2020, 23:12 Jaffe, Julian <julianja...@activision.com>
wrote:

> Thanks for the suggestion Piotr!
>
>
>
> The problem is that the sink needs to have access to the schema (so that
> it can write the schema only once per file instead of record) and thus
> needs to know when the schema has been updated. In this proposed
> architecture, I think the sink would still need to check each record to see
> if the current schema matches the new record or not? The main problem I
> encountered when playing around with broadcast state was that I couldn’t
> figure out how to access the broadcast state within the sink, but perhaps I
> just haven’t thought about it the right way. I’ll meditate on the docs
> further  🙂
>
>
>
> Julian
>
>
>
> *From: *Piotr Nowojski <pnowoj...@apache.org>
> *Date: *Wednesday, October 14, 2020 at 6:35 AM
> *To: *"Jaffe, Julian" <julianja...@activision.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Broadcasting control messages to a sink
>
>
>
> Hi Julian,
>
>
>
> Have you seen Broadcast State [1]? I have never used it personally, but it
> sounds like something you want. Maybe your job should look like:
>
>
>
> 1. read raw messages from Kafka, without using the schema
>
> 2. read schema changes and broadcast them to 3. and 5.
>
> 3. deserialize kafka records in BroadcastProcessFunction by using combined
> 1. and 2.
>
> 4. do your logic
>
> 5. serialize records using schema in another BroadcastProcessFunction by
> using combined 4. and 2.
>
> 6. write raw records using BucketingSink
>
> ?
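>
> (Untested sketch of how steps 2. and 3. could be wired up with Broadcast
> State; all variable names below are made up for illustration:)
>
> MapStateDescriptor<String, String> schemaDescriptor =
>         new MapStateDescriptor<>("schemas", Types.STRING, Types.STRING);
>
> // 2. broadcast schema changes to the downstream operators
> BroadcastStream<String> schemaBroadcast = schemaChangeStream.broadcast(schemaDescriptor);
>
> // 3. deserialize the raw Kafka records using the latest broadcast schema
> DataStream<GenericRecord> deserialized = rawKafkaStream
>         .connect(schemaBroadcast)
>         .process(new BroadcastProcessFunction<byte[], String, GenericRecord>() {
>             @Override
>             public void processElement(byte[] raw, ReadOnlyContext ctx,
>                                        Collector<GenericRecord> out) throws Exception {
>                 String schemaJson = ctx.getBroadcastState(schemaDescriptor).get("current");
>                 // deserialize 'raw' with the schema and emit the resulting record
>             }
>
>             @Override
>             public void processBroadcastElement(String schemaJson, Context ctx,
>                                                 Collector<GenericRecord> out) throws Exception {
>                 ctx.getBroadcastState(schemaDescriptor).put("current", schemaJson);
>             }
>         });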
>
>
>
> Best,
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
>
>
> On Wed, Oct 14, 2020 at 11:01 Jaffe, Julian <julianja...@activision.com>
> wrote:
>
> Hey all,
>
>
>
> I’m building a Flink app that pulls in messages from a Kafka topic and
> writes them out to disk using a custom bucketed sink. Each message needs to
> be parsed using a schema that is also needed when writing in the sink. This
> schema is read from a remote file on a distributed file system (it could
> also be fetched from a service). The schema will be updated very
> infrequently.
>
>
>
> In order to support schema evolution, I have created a custom source that
> occasionally polls for updates and if it finds one parses the new schema
> and sends a message containing the serialized schema. I’ve connected these
> two streams and then use a RichCoFlatMapFunction to flatten them back into
> a single output stream (schema events get used to update the parser,
> messages get parsed using the parser and emitted).
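>
> (Schematically, that wiring looks something like the snippet below —
> ParsedEvent, Parser, and SchemaPollingSource are placeholders rather than
> my actual classes:)
>
> DataStream<byte[]> messages = env.addSource(kafkaSource);
> DataStream<String> schemaUpdates = env.addSource(new SchemaPollingSource());
>
> DataStream<ParsedEvent> parsed = messages
>         .connect(schemaUpdates)
>         .flatMap(new RichCoFlatMapFunction<byte[], String, ParsedEvent>() {
>             private transient Parser parser;
>
>             @Override
>             public void flatMap1(byte[] raw, Collector<ParsedEvent> out) throws Exception {
>                 out.collect(parser.parse(raw));        // parse with the current schema
>             }
>
>             @Override
>             public void flatMap2(String newSchema, Collector<ParsedEvent> out) {
>                 parser = Parser.fromSchema(newSchema); // schema event: update the parser
>             }
>         });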
>
>
>
> However, I need some way to communicate the updated schema to every task
> of the sink. Simply emitting a control message that is ignored when writing
> to disk means that only one sink partition will receive the message and
> thus update the schema. I thought about sending the control message as side
> output and then broadcasting the resulting stream to the sink alongside the
> processed event input but I couldn’t figure out a way to do so. For now,
> I’m bundling the schema used to parse each event with the event, storing
> the schema in the sink, and then checking every event’s schema against the
> stored schema but this is fairly inefficient. Also, I’d like to eventually
> increase the types of control messages I can send to the sink, some of
> which may not be idempotent. Is there a better way to handle this pattern?
>
>
> (Bonus question: ideally, I’d like to be able to perform an action when
> all sink partitions have picked up the new schema. I’m not aware of any way
> to emit metadata of this sort from Flink tasks beyond abusing the metrics
> system. This approach still leaves open the possibility of tasks picking up
> the new schema and then crashing for unrelated reasons thus inflating the
> count of tasks using a specific schema and moreover requires tracking at
> least the current level of parallelism and probably also Flink task state
> outside of Flink. Are there any patterns for reporting metadata like this
> to the job manager?)
>
>
>
> I’m using Flink 1.8.
>
>
