Peter Otten, Cameron Simpson, thank you for your detailed replies :-) I confess, I didn't quite understand all you were saying. (Still only an intermediate-level programmer.) But Cameron what you said questioning my use of decorators and maybe a class instead got me thinking. I realized what I needed was a function within a function. Couldn't have gotten there without your help. Working code:
#!/usr/bin/env python3 from google.cloud import firestore import firebase_admin from firebase_admin import credentials import json import mqtt from time import sleep def bridge(col_name): def on_snapshot(col_snapshot, changes, read_time): data = dict() for doc in col_snapshot: serial = doc.id data[serial] = json.dumps(doc.to_dict()['value']) for change in changes: serial = change.document.id mqtt_topic = col_name + '/outgoing/' + serial if change.type.name in ['ADDED', 'MODIFIED']: contents = data[serial] mqtt.publish(mqtt_topic, contents) elif change.type.name == 'REMOVED': mqtt.publish(mqtt_topic, '') @mqtt.incoming def mqtt_subscribe(serial, value): # TODO Passing a None entry to delete from MQTT doesn't trigger this # callback, so it doesn't delete from Firestore. Need this ugly # workaround 'clear_mqtt'. if value == 'clear_mqtt': value = None mqtt.publish(col_name + '/incoming/' + serial, None) mqtt.publish(col_name + '/outgoing/' + serial, None) db.collection(col_name).document(serial).set({'value': value}) col_watch = db.collection(col_name).on_snapshot(on_snapshot) mqtt.subscribe(col_name + '/incoming/#', mqtt_subscribe) return col_watch cred = credentials.Certificate("certs/firebase.json") firebase_admin.initialize_app(cred) db = firestore.Client() mqtt.connect() adapters = list() for collection in ['door_status', 'cpu_temp']: adapters.append(bridge(collection)) while True: sleep(1) for adapter in adapters: adapter.unsubscribe() Christopher de Vidal Would you consider yourself a good person? Have you ever taken the 'Good Person' test? It's a fascinating five minute quiz. Google it. On Fri, May 15, 2020 at 9:55 AM Peter Otten <__pete...@web.de> wrote: > Christopher de Vidal wrote: > > > Help please? Creating an MQTT-to-Firestore bridge and I know a decorator > > would help but I'm stumped how to create one. I've used decorators before > > but not with arguments. > > > > The Firestore collection.on_snapshot() method invokes a callback and > sends > > it three parameters (collection_snapshot, changes, and read_time). I need > > the callback to also know the name of the collection so that I can > publish > > to the equivalent MQTT topic name. I had thought to add a fourth > parameter > > and I believe a decorator is the right approach but am stumped how to add > > that fourth parameter. How would I do this with the code below? > > > > #!/usr/bin/env python3 > > from google.cloud import firestore > > import firebase_admin > > from firebase_admin import credentials > > import json > > import mqtt > > > > > > firebase_admin.initialize_app(credentials.Certificate("certs/firebase.json")) > > db = firestore.Client() > > mqtt.connect() > > > > > > def load_json(contents): > > try: > > return json.loads(contents) > > except (json.decoder.JSONDecodeError, TypeError): > > return contents > > > > > > def on_snapshot(col_name, col_snapshot, changes, read_time): > > data = dict() > > for doc in col_snapshot: > > serial = doc.id > > contents = load_json(doc.to_dict()['value']) > > data[serial] = contents > > for change in changes: > > serial = change.document.id > > mqtt_topic = col_name + '/' + serial > > contents = data[serial] > > if change.type.name in ['ADDED', 'MODIFIED']: > > mqtt.publish(mqtt_topic, contents) > > elif change.type.name == 'REMOVED': > > mqtt.publish(mqtt_topic, None) > > > > > > # Start repeated code section > > # TODO Better to use decorators but I was stumped on how to pass > arguments > > def door_status_on_snapshot(col_snapshot, changes, read_time): > > on_snapshot('door_status', col_snapshot, changes, read_time) > > > > > > door_status_col_ref = db.collection('door_status') > > door_status_col_watch = > > door_status_col_ref.on_snapshot(door_status_on_snapshot) > > > > # Repetition... > > def cpu_temp_on_snapshot(col_snapshot, changes, read_time): > > on_snapshot('cpu_temp', col_snapshot, changes, read_time) > > > > > > cpu_temp_col_ref = db.collection('cpu_temp') > > cpu_temp_col_watch = cpu_temp_col_ref.on_snapshot(cpu_temp_on_snapshot) > > # End repeated code section > > > > # Start repeated code section > > door_status_col_watch.unsubscribe() > > cpu_temp_col_watch.unsubscribe() > > # Repetition... > > # End repeated code section > > > > Christopher de Vidal > > You might also consider a contextmanager: > > https://docs.python.org/3/library/contextlib.html > > # untested > > @contextmanager > def subscribe(name, col_snapshot, changes, read_time): > def status_on_snapshot(col_snapshot, changes, read_time): > on_snapshot(name, col_snapshot, changes, read_time) > > status_col_ref = db.collection(name) > status_col_watch = status_col_ref.on_snapshot(door_status_on_snapshot) > try: > yield status_col_ref > finally: > status_col_watch.unsubscribe() > > > with subscribe("door_status", ...) as door_status_col_ref: > with subscribe("cpu_temp", ...) as cpu_temp_col_ref: > ... > > > If there are many uniform ones the nested with statements can be > generalized: > > NAMES = "door_status", "cpu_temp", ... > with ExitStack() as stack: > col_refs = [ > stack.enter_context(subscribe(name)) for name in NAMES > ] > > And if you like Camoron's suggestion or the subscribe() generator above > just > gets too unwieldy: a custom class can act as a contextmanager, too. > > https://docs.python.org/3/reference/compound_stmts.html#with > > -- > https://mail.python.org/mailman/listinfo/python-list > -- https://mail.python.org/mailman/listinfo/python-list