Hi Team,

I have a requirement where I want MQTT broker to validate the records as
per the schema registered in the registry and if it matches then write to
the kafka topic or park it in a separate kafka topic.

So I was wondering if I can do something like this part of the code with
any of the MQTT brokers like EMQ, Mosquitto and Waterstream.

schema_registry_conf = {'url': args.schema_registry}
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    string_serializer = StringSerializer('utf8')
    protobuf_serializer = ProtobufSerializer(user_pb2.User,
                                             schema_registry_client,
                                             {'use.deprecated.format':
False})

    producer_conf = {'bootstrap.servers': args.bootstrap_servers}

    producer = Producer(producer_conf)

producer.produce(topic=topic, partition=0,
                             key=string_serializer(str(uuid4())),
                             value=protobuf_serializer(user,
SerializationContext(topic, MessageField.VALUE)),
                             on_delivery=delivery_report)

I didn't get any such relevant information w.r.t MQTT broker so any hints
would also help.

Thanks

Reply via email to