Patrick,
I'm not sure what is going on. Could you try the attached? It should log
some additional information about the queue
Thanks. - Rich
On Monday, 24 June 2019 01:20:35 UTC-4, Patrick Mendiuk wrote:
>
> The original topic has been queued and processed without issue for over 24
> hours. I added some other topics that are queued but never processed:
>
> Jun 23 21:51:08 MITX-6930 weewx[8383]: MQTTSubscribeService: Queue was
> empty
> Jun 23 21:51:08 MITX-6930 weewx[8383]: MQTTSubscribeService: Packet after
> update is: 2019-06-23 21:51:07 PDT (1561351867) dateTime: 1561351867,
> usUnits: 17, windDir: 0, windSpeed: 0.0
> Jun 23 21:51:08 MITX-6930 weewx[8383]: MQTTSubscribeService: Packet prior
> to update is: 2019-06-23 21:51:07 PDT (1561351867) dateTime: 1561351867,
> usUnits: 17, windDir: 0, windSpeed: 0.0
> Jun 23 21:51:08 MITX-6930 weewx[8383]: MQTTSubscribeService: Processing
> interval: 1561351864.000000 1561351867.000000
> Jun 23 21:51:08 MITX-6930 weewx[8383]: MQTTSubscribe: TopicManager queue
> size is: 0
> Jun 23 21:51:08 MITX-6930 weewx[8383]: MQTTSubscribeService: Queue was
> empty
> Jun 23 21:51:08 MITX-6930 weewx[8383]: MQTTSubscribeService: Packet after
> update is: 2019-06-23 21:51:07 PDT (1561351867) dateTime: 1561351867,
> usUnits: 17, windDir: 0, windSpeed: 0.0
> Jun 23 21:51:08 MITX-6930 weewx[8383]: restx: MQTT: Published record 2019-
> 06-23 21:51:07 PDT (1561351867)
> Jun 23 21:51:09 MITX-6930 weewx[8383]: MQTTSubscribe:
> MessageCallbackProvider For sensor/bathroom/Humidity has QOS of 0 and
> retain of 0 received: {"dateTime":1561351868.0,"extraHumid1":63.0}
> Jun 23 21:51:09 MITX-6930 weewx[8383]: MQTTSubscribe: TopicManager Added
> to queue sensor/bathroom/Humidity sensor/bathroom/Humidity 2019-06-23 21:
> 51:08 PDT (1561351868): dateTime: 1561351868.0, extraHumid1: 63.0, usUnits
> : 1
> Jun 23 21:51:11 MITX-6930 weewx[8383]: weatherflowudp: MainThread: raw
> packet: {'serial_number': 'SK-00015052', 'type': 'rapid_wind', 'ob': [
> 1561351870, 0.0, 0], 'hub_sn': 'HB-00011764'}
> Jun 23 21:51:11 MITX-6930 weewx[8383]: weatherflowudp: MainThread: parsed
> packet: {'serial_number.SK_00015052.rapid_wind': 'SK-00015052',
> 'type.SK_00015052.rapid_wind': 'rapid_wind',
> 'hub_sn.SK_00015052.rapid_wind': 'HB-00011764',
> 'time_epoch.SK_00015052.rapid_wind': 1561351870,
> 'ob.SK_00015052.rapid_wind': [1561351870, 0.0, 0],
> 'wind_speed.SK_00015052.rapid_wind': 0.0,
> 'wind_direction.SK_00015052.rapid_wind': 0, 'time_epoch': 1561351870}
> Jun 23 21:51:11 MITX-6930 weewx[8383]: weatherflowudp: MainThread: Loop
> packet: {'windDir': 0, 'windSpeed': 0.0, 'usUnits': 17, 'dateTime':
> 1561351870}
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Packet prior
> to update is: 2019-06-23 21:51:10 PDT (1561351870) dateTime: 1561351870,
> usUnits: 17, windDir: 0, windSpeed: 0.0
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Processing
> interval: 1561351867.000000 1561351870.000000
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribe: TopicManager queue
> size is: 7
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Queue was
> empty
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Packet after
> update is: 2019-06-23 21:51:10 PDT (1561351870) dateTime: 1561351870,
> usUnits: 17, windDir: 0, windSpeed: 0.0
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Packet prior
> to update is: 2019-06-23 21:51:10 PDT (1561351870) dateTime: 1561351870,
> usUnits: 17, windDir: 0, windSpeed: 0.0
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Processing
> interval: 1561351867.000000 1561351870.000000
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribe: TopicManager queue
> size is: 16
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Queue was
> empty
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Packet after
> update is: 2019-06-23 21:51:10 PDT (1561351870) dateTime: 1561351870,
> usUnits: 17, windDir: 0, windSpeed: 0.0
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Packet prior
> to update is: 2019-06-23 21:51:10 PDT (1561351870) dateTime: 1561351870,
> usUnits: 17, windDir: 0, windSpeed: 0.0
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribeService: Processing
> interval: 1561351867.000000 1561351870.000000
> Jun 23 21:51:11 MITX-6930 weewx[8383]: MQTTSubscribe: TopicManager queue
> size is: 0
>
> -Patrick
>
>
--
You received this message because you are subscribed to the Google Groups
"weewx-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/weewx-user/78d8cf6a-6582-429d-9c4b-1b39b6f46da7%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
"""
WeeWX driver and service that subscribes to MQTT topics and
creates/updates loop packets/archive records.
Installation:
Put this file in the bin/user directory.
Update weewx.conf [MQTTSubscribeService] as needed to configure the service.
Update weewx.conf [MQTTSubscribeDriver] as needed to configure the driver.
Update weewx.conf [Accumulator] for any custom fields.
Overview:
The MQTT loop_start is used to run a separate thread to manage the MQTT subscriptions.
The payloads are put on a queue to be processed by the driver or service.
The service binds to either the NEW_LOOP_PACKET or NEW_ARCHIVE_RECORD event.
On this event, it processes the queue of MQTT payloads and updates the packet or record
The driver processes the queue and generates a packet for each element currently in the queue.
A topic can be desinated as an 'archive topic'. Data in this topic is returned as an archive record.
Configuration:
[MQTTSubscribeService] or [MQTTSubscribeDriver]
# The MQTT server.
# Default is: localhost
host = localhost
# The port to connect to.
# Default is: 1883
port = 1883
# Maximum period in seconds allowed between communications with the broker.
# Default is: 60
keepalive = 60
# username for broker authentication.
# Default is: None
username =
# password for broker authentication.
# Default is: None
password =
# Controls the MQTT logging.
# Default is: false
log = false
# The clientid to connect with.
# Service default is: MQTTSubscribeService-xxxx
# Driver default is: MQTTSubscribeDriver-xxxx
# where xxxx is a random number between 1000 and 9999
clientid =
# The topic to subscribe to.
# DEPRECATED - use [[topics]]
topic =
# Turn the service on and off.
# Default is: true
# Only used by the service.
enable = false
# The amount of time in seconds to overlap the start time when processing the MQTT queue.
# When the time of the MQTT payload is less than the end time of the previous packet/record,
# the MQTT payload is ignored. When overlap is set, MQTT payloads within this number of seconds
# of the previous end time will be processed.
# It is best to keep this 0 and only set it if 'Ignoring record outside of interval' messages are seen.
# This option maybe removed in the future.
# Default is: 0
# Only used by the service.
overlap = 0
# The binding, loop or archive.
# Default is: loop
# Only used by the service.
binding = loop
# When the MQTT queue has no data, the amount of time in seconds to wait
# before checking again.
# Default is: 2
# Only used by the driver
wait_before_retry = 2
# Payload in this topic is processed like an archive record.
# Default is: None
# Only used by the driver.
archive_topic =
# Configuration for the message callback.
[[message_callback]]
# The format of the MQTT payload.
# Currently support: individual, json, keyword
# Must be specified.
type = REPLACE_ME
# When True, the full topic (weather/outTemp) is used as the fieldname.
# When false, the topic furthest to the right is used.
# Valid values: True, False
# Default is: False
# Only used when type is 'individual'.
# The delimiter between fieldname and value pairs. (field1=value1, field2=value2).
# Default is: ,
keyword_delimiter = ,
# The separator between fieldname and value pairs. (field1=value1, field2=value2).
# Default is: =
keyword_separator = =
# Mapping to WeeWX names.
[[[label_map]]]
temp1 = extraTemp1
# The topics to subscribe to.
[[topics]
# Units for MQTT payloads without unit value.
# Valid values: US, METRIC, METRICWX
# Default is: US
unit_system = US
# Todo - think about default size
# The maximum queue size.
# When the queue is larger than this value, the oldest element is removed.
# In general the queue should not grow large, but it might if the time
# between the driver creating packets is large and the MQTT broker publishes frequently.
# Or if subscribing to 'individual' payloads with wildcards. This results in many topic
# in a single queue.
# Default is: six.MAXSIZE
max_queue = six.MAXSIZE
[[[first/topic]]]
[[[second/one]]]
"""
from __future__ import with_statement
import six
import configobj
import syslog
import paho.mqtt.client as mqtt
import json
import random
import time
#import weeutil.weeutil
import weeutil
from weeutil.weeutil import to_bool, to_float, to_int, to_sorted_string
import weewx
import weewx.drivers
from weewx.engine import StdService
from collections import deque
VERSION='1.1.3-a'
DRIVER_NAME = 'MQTTSubscribeDriver'
DRIVER_VERSION = VERSION
class Logger:
def __init__(self, console):
self.console = console
def logmsg(self, dst, prefix, msg):
syslog.syslog(dst, '%s: %s' % (prefix, msg))
if self.console:
print('%s: %s' % (prefix, msg))
def logdbg(self, prefix, msg):
self.logmsg(syslog.LOG_DEBUG, prefix, msg)
def loginf(self, prefix, msg):
self.logmsg(syslog.LOG_INFO, prefix, msg)
def logerr(self, prefix, msg):
self.logmsg(syslog.LOG_ERR, prefix, msg)
class CollectData:
def __init__(self, fields):
self.fields = fields
self.data = {}
def add_data(self, in_data):
old_data = {}
for field in self.fields:
# ToDo - might be a better way to determine the fieldname
if field in in_data:
if field in self.data:
old_data = dict(self.data)
self.data = {}
self.data[field] = in_data[field]
self.data['usUnits'] = in_data['usUnits']
self.data['dateTime'] = in_data['dateTime']
return old_data
def get_data(self):
return self.data
class TopicManager:
def __init__(self, config, logger):
self.logger=logger
if len(config.sections) < 1:
raise ValueError("At least one topic must be configured.")
self.logger.loginf("MQTTSubscribe", "TopicManager config is %s" % config)
default_qos = to_int(config.get('qos', 0))
default_unit_system_name = config.get('unit_system', 'US').strip().upper()
if default_unit_system_name not in weewx.units.unit_constants:
raise ValueError("MQTTSubscribe: Unknown unit system: %s" % default_unit_system_name)
unit_system = weewx.units.unit_constants[default_unit_system_name]
max_queue = config.get('max_queue', six.MAXSIZE)
self.wind_fields = ['windGust', 'windGustDir', 'windDir', 'windSpeed']
self.topics = {}
self.subscribed_topics = {}
for topic in config.sections:
topic_dict = config.get(topic, {})
qos = to_int(topic_dict.get('qos', default_qos))
unit_system_name = topic_dict.get('unit_system', default_unit_system_name).strip().upper()
if unit_system_name not in weewx.units.unit_constants:
raise ValueError("MQTTSubscribe: Unknown unit system: %s" % unit_system_name)
unit_system = weewx.units.unit_constants[unit_system_name]
self.subscribed_topics[topic] = {}
self.subscribed_topics[topic]['unit_system'] = unit_system
self.subscribed_topics[topic]['qos'] = qos
self.subscribed_topics[topic]['max_queue'] = topic_dict.get('max_queue',max_queue)
self.subscribed_topics[topic]['queue'] = deque()
self.subscribed_topics[topic]['queue_wind'] = deque()
def append_data(self, topic, in_data, fieldname=None):
data = dict(in_data)
payload = {}
payload['wind_data'] = False
if fieldname in self.wind_fields:
payload['wind_data'] = True
queue = self._get_queue(topic)
self._queue_size_check(queue, self._get_max_queue(topic))
if 'dateTime' not in data:
data['dateTime'] = time.time()
if 'usUnits' not in data:
data['usUnits'] = self._get_unit_system(topic)
self.logger.logdbg("MQTTSubscribe", "TopicManager Added to queue %s %s %s: %s" %(topic, self._lookup_topic(topic), weeutil.weeutil.timestamp_to_string(data['dateTime']), to_sorted_string(data)))
payload['data'] = data
queue.append(payload,)
def get_data(self, topic, end_ts=six.MAXSIZE):
queue = self._get_queue(topic)
self.logger.logdbg("MQTTSubscribe", "TopicManager starting queue %s size is: %i" %(topic, len(queue)))
collector = CollectData(self.wind_fields)
while len(queue) > 0:
if queue[0]['data']['dateTime'] > end_ts:
self.logger.logdbg("MQTTSubscribe", "TopicManager leaving queue: %s size: %i content: %s" %(topic, len(queue), queue[0]))
break
payload = queue.popleft()
wind_data = payload['wind_data']
if wind_data:
self.logger.logdbg("MQTTSubscribe", "TopicManager processing wind data.")
temp_data = payload['data']
data = collector.add_data(temp_data)
else:
data = payload['data']
if data:
self.logger.logdbg("MQTTSubscribe", "TopicManager retrieved queue %s %s: %s" %(topic, weeutil.weeutil.timestamp_to_string(data['dateTime']), to_sorted_string(data)))
yield data
data = collector.get_data()
if data:
self.logger.logdbg("MQTTSubscribe", "TopicManager retrieved wind queue final %s %s: %s" %(topic, weeutil.weeutil.timestamp_to_string(data['dateTime']), to_sorted_string(data)))
yield data
def get_accumulated_data(self, topic, start_ts, end_ts, units):
self.logger.logdbg("MQTTSubscribeService", "Processing interval: %f %f" %(start_ts, end_ts))
accumulator = weewx.accum.Accum(weeutil.weeutil.TimeSpan(start_ts, end_ts))
for data in self.get_data(topic, end_ts):
if data:
try:
self.logger.logdbg("MQTTSubscribeService", "Data to accumulate: %s %s" % (weeutil.weeutil.timestamp_to_string(data['dateTime']), to_sorted_string(data)))
accumulator.addRecord(data)
except weewx.accum.OutOfSpan:
self.logger.loginf("MQTTSubscribeService", "Ignoring record outside of interval %f %f %f %s"
%(start_ts, end_ts, data['dateTime'], (to_sorted_string(data))))
else:
break
target_data = {}
if not accumulator.isEmpty:
aggregate_data = accumulator.getRecord()
self.logger.logdbg("MQTTSubscribeService", "Data prior to conversion is: %s %s" % (weeutil.weeutil.timestamp_to_string(aggregate_data['dateTime']), to_sorted_string(aggregate_data)))
target_data = weewx.units.to_std_system(aggregate_data, units)
self.logger.logdbg("MQTTSubscribeService", "Data after to conversion is: %s %s" % (weeutil.weeutil.timestamp_to_string(target_data['dateTime']), to_sorted_string(target_data)))
else:
self.logger.logdbg("MQTTSubscribeService", "Queue was empty")
return target_data
def _queue_size_check(self, queue, max_queue):
while len(queue) >= max_queue:
element = queue.popleft()
self.logger.logerr("MQTTSubscribe", "TopicManager queue limit %i reached. Removing: %s" %(max_queue, element))
def get_qos(self, topic):
return self._get_value('qos', topic)
def _get_unit_system(self, topic):
return self._get_value('unit_system', topic)
def _get_max_queue(self, topic):
return self._get_value('max_queue', topic)
def _get_queue(self, topic):
return self._get_value('queue', topic)
def _get_wind_queue(self, topic):
return self._get_value('queue_wind', topic)
def _get_value(self, value, topic):
subscribed_topic = self._lookup_topic(topic)
return self.subscribed_topics[subscribed_topic][value]
def _lookup_topic(self, topic):
if topic in self.topics:
return self.topics[topic]
for subscribed_topic in self.subscribed_topics:
if mqtt.topic_matches_sub(subscribed_topic, topic):
self.topics[topic] = subscribed_topic
return subscribed_topic
class MessageCallbackProvider:
def __init__(self, config, logger, topic_manager):
self.logger = logger
self.topic_manager = topic_manager
self._setup_callbacks()
self.type = config.get('type', None)
self.keyword_delimiter = config.get('keyword_delimiter', ',')
self.keyword_separator = config.get('keyword_separator', '=')
self.label_map = config.get('label_map', {})
self.full_topic_fieldname = to_bool(config.get('full_topic_fieldname', False))
if self.type not in self.callbacks:
raise ValueError("Invalid type configured: %s" % self.type)
def get_callback(self):
return self.callbacks[self.type]
def _setup_callbacks(self):
self.callbacks = {}
self.callbacks['individual'] = self._on_message_individual
self.callbacks['json'] = self._on_message_json
self.callbacks['keyword'] = self._on_message_keyword
def _byteify(self, data, ignore_dicts = False):
# if this is a unicode string, return its string representation
if isinstance(data, unicode):
return data.encode('utf-8')
# if this is a list of values, return list of byteified values
if isinstance(data, list):
return [ self._byteify(item, ignore_dicts=True) for item in data ]
# if this is a dictionary, return dictionary of byteified keys and values
# but only if we haven't already byteified it
if isinstance(data, dict) and not ignore_dicts:
data2 = {}
for key, value in data.items():
key2 = self._byteify(key, ignore_dicts=True)
data2[self.label_map.get(key2,key2)] = self._byteify(value, ignore_dicts=True)
return data2
# if it's anything else, return it in its original form
return data
def _log_message(self, msg):
self.logger.logdbg("MQTTSubscribe", "MessageCallbackProvider For %s has QOS of %i and retain of %s received: %s" %(msg.topic, msg.qos, msg.retain, msg.payload))
def _log_exception(self, exception, msg):
self.logger.logerr("MQTTSubscribe", "MessageCallbackProvider on_message_keyword failed with: %s" % exception)
self.logger.logerr("MQTTSubscribe", "**** MessageCallbackProvider Ignoring topic=%s and payload=%s" % (msg.topic, msg.payload))
def _on_message_keyword(self, client, userdata, msg):
# Wrap all the processing in a try, so it doesn't crash and burn on any error
try:
self._log_message(msg)
fields = msg.payload.split(self.keyword_delimiter)
data = {}
for field in fields:
eq_index = field.find(self.keyword_separator)
# Ignore all fields that do not have the separator
if eq_index == -1:
self.logger.logerr("MQTTSubscribe", "MessageCallbackProvider on_message_keyword failed to find separator: %s" % self.keyword_separator)
self.logger.logerr("MQTTSubscribe", "**** MessageCallbackProvider Ignoring field=%s " % field)
continue
name = field[:eq_index].strip()
value = field[eq_index + 1:].strip()
data[self.label_map.get(name, name)] = to_float(value)
if data:
self.topic_manager.append_data(msg.topic, data)
else:
self.logger.logerr("MQTTSubscribe", "MessageCallbackProvider on_message_keyword failed to find data in: topic=%s and payload=%s" % (msg.topic, msg.payload))
except Exception as exception:
self._log_exception(exception, msg)
def _on_message_json(self, client, userdata, msg):
# Wrap all the processing in a try, so it doesn't crash and burn on any error
try:
self._log_message(msg)
# ToDo - better way?
if six.PY2:
data = self._byteify(
json.loads(msg.payload, object_hook=self._byteify),
ignore_dicts=True)
else:
data = json.loads(msg.payload.decode("utf-8"))
self.topic_manager.append_data(msg.topic, data)
except Exception as exception:
self._log_exception(exception, msg)
def _on_message_individual(self, client, userdata, msg):
wind_fields = ['windGust', 'windGustDir', 'windDir', 'windSpeed']
# Wrap all the processing in a try, so it doesn't crash and burn on any error
try:
self._log_message(msg)
if self.full_topic_fieldname:
key = msg.topic.encode('ascii', 'ignore') # ToDo - research
else:
tkey = msg.topic.rpartition('/')[2]
key = tkey.encode('ascii', 'ignore') # ToDo - research
fieldname = self.label_map.get(key,key)
data = {}
data[fieldname] = to_float(msg.payload)
self.topic_manager.append_data(msg.topic, data)
except Exception as exception:
self._log_exception(exception, msg)
# Class to manage MQTT subscriptions
class MQTTSubscribe():
def __init__(self, service_dict, logger):
self.logger = logger
message_callback_config = service_dict.get('message_callback', None)
if message_callback_config is None:
raise ValueError("[[message_callback]] is required.")
message_callback_provider_name = service_dict.get('message_callback_provider', 'user.MQTTSubscribe.MessageCallbackProvider')
self.manager = TopicManager(service_dict.get('topics', {}), self.logger)
clientid = service_dict.get('clientid',
'MQTTSubscribe-' + str(random.randint(1000, 9999)))
clean_session = to_bool(service_dict.get('clean_session', True))
host = service_dict.get('host', 'localhost')
keepalive = to_int(service_dict.get('keepalive', 60))
port = to_int(service_dict.get('port', 1883))
username = service_dict.get('username', None)
password = service_dict.get('password', None)
log = to_bool(service_dict.get('log', False))
self.archive_topic = service_dict.get('archive_topic', None)
if self.archive_topic and self.archive_topic not in service_dict['topics']:
raise ValueError("Archive topic %s must be in [[topics]]" % self.archive_topic)
self.logger.loginf("MQTTSubscribe", "Message callback config is %s" % message_callback_config)
self.logger.loginf("MQTTSubscribe", "Message callback provider is %s" % message_callback_provider_name)
self.logger.loginf("MQTTSubscribe", "Client id is %s" % clientid)
self.logger.loginf("MQTTSubscribe", "Clean session is %s" % clean_session)
self.logger.loginf("MQTTSubscribe", "MQTTSubscribe version is %s" % VERSION)
self.logger.loginf("MQTTSubscribe", "Host is %s" % host)
self.logger.loginf("MQTTSubscribe", "Port is %s" % port)
self.logger.loginf("MQTTSubscribe", "Keep alive is %s" % keepalive)
self.logger.loginf("MQTTSubscribe", "Username is %s" % username)
if password is not None:
self.logger.loginf("MQTTSubscribe", "Password is set")
else:
self.logger.loginf("MQTTSubscribe", "Password is not set")
self.logger.loginf("MQTTSubscribe", "Archive topic is %s" % self.archive_topic)
self.mqtt_logger = {
mqtt.MQTT_LOG_INFO: self.logger.loginf,
mqtt.MQTT_LOG_NOTICE: self.logger.loginf,
mqtt.MQTT_LOG_WARNING: self.logger.loginf,
mqtt.MQTT_LOG_ERR: self.logger.logdbg,
mqtt.MQTT_LOG_DEBUG: self.logger.logdbg
}
self.client = mqtt.Client(client_id=clientid, clean_session=clean_session)
if log:
self.client.on_log = self._on_log
MessageCallbackProvider_class = weeutil.weeutil._get_object(message_callback_provider_name)
message_callback_provider = MessageCallbackProvider_class(message_callback_config,
self.logger,
self.manager)
self.client.on_message = message_callback_provider.get_callback()
self.client.on_subscribe = self._on_subscribe
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
if username is not None and password is not None:
self.client.username_pw_set(username, password)
self.client.connect(host, port, keepalive)
@property
def Subscribed_topics(self):
return self.manager.subscribed_topics
def get_data(self, topic, end_ts=six.MAXSIZE):
return self.manager.get_data(topic, end_ts)
def get_accumulated_data(self, topic, start_ts, end_ts, units):
return self.manager.get_accumulated_data(topic, start_ts, end_ts, units)
# start subscribing to the topics
def start(self):
self.logger.logdbg("MQTTSubscribe", "Starting loop")
self.client.loop_start()
# shut it down
def disconnect(self):
self.client.disconnect()
def _on_connect(self, client, userdata, flags, rc):
# https://pypi.org/project/paho-mqtt/#on-connect
# rc:
# 0: Connection successful
# 1: Connection refused - incorrect protocol version
# 2: Connection refused - invalid client identifier
# 3: Connection refused - server unavailable
# 4: Connection refused - bad username or password
# 5: Connection refused - not authorised
# 6-255: Currently unused.
self.logger.logdbg("MQTTSubscribe", "Connected with result code %i" % rc)
self.logger.logdbg("MQTTSubscribe", "Connected flags %s" % str(flags))
for topic in self.manager.subscribed_topics:
(result, mid) = client.subscribe(topic, self.manager.get_qos(topic))
self.logger.logdbg("MQTTSubscribe","Subscribe to %s has a mid %i and rc %i" %(topic, mid, result))
def _on_disconnect(self, client, userdata, rc):
self.logger.logdbg("MQTTSubscribe", "Disconnected with result code %i" %rc)
def _on_subscribe(self, client, userdata, mid, granted_qos):
self.logger.logdbg("MQTTSubscribe", "Subscribed to topic mid: %i is size %i has a QOS of %i" %(mid, len(granted_qos), granted_qos[0]))
def _on_log(self, client, userdata, level, msg):
self.mqtt_logger[level]("MQTTSubscribe/MQTT", msg)
class MQTTSubscribeService(StdService):
def __init__(self, engine, config_dict):
super(MQTTSubscribeService, self).__init__(engine, config_dict)
service_dict = config_dict.get('MQTTSubscribeService', {})
console = to_bool(service_dict.get('console', False))
self.logger = Logger(console)
self.enable = to_bool(service_dict.get('enable', True))
if not self.enable:
self.logger.loginf("MQTTSubscribeService", "Service is not enabled, exiting.")
return
self.overlap = to_float(service_dict.get('overlap', 0))
binding = service_dict.get('binding', 'loop')
self.logger.loginf("MQTTSubscribeService", "Binding is %s" % binding)
self.logger.loginf("MQTTSubscribeService", "Overlap is %s" % self.overlap)
if 'archive_topic' in service_dict:
raise ValueError("archive_topic, %s, is invalid when running as a service" % service_dict['archive_topic'])
self.end_ts = 0 # prime for processing loop packet
self.wind_fields = ['windGust', 'windGustDir', 'windDir', 'windSpeed']
self.subscriber = MQTTSubscribe(service_dict, self.logger)
self.subscriber.start()
if (binding == 'archive'):
self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
elif (binding == 'loop'):
self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)
else:
raise ValueError("MQTTSubscribeService: Unknown binding: %s" % binding)
def shutDown(self):
self.subscriber.disconnect()
def new_loop_packet(self, event):
# packet has traveled back in time
if self.end_ts - self.overlap > event.packet['dateTime']:
self.logger.logerr("MQTTSubscribeService", "Ignoring packet has dateTime of %f which is prior to previous packet %f" %(event.packet['dateTime'], self.end_ts))
else:
start_ts = self.end_ts - self.overlap
self.end_ts = event.packet['dateTime']
for topic in self.subscriber.Subscribed_topics: # investigate that topics might not be cached.. therefore use subscribed
self.logger.logdbg("MQTTSubscribeService", "Packet prior to update is: %s %s" % (weeutil.weeutil.timestamp_to_string(event.packet['dateTime']), to_sorted_string(event.packet)))
target_data = self.subscriber.get_accumulated_data(topic, start_ts, self.end_ts, event.packet['usUnits'])
event.packet.update(target_data)
self.logger.logdbg("MQTTSubscribeService", "Packet after update is: %s %s" % (weeutil.weeutil.timestamp_to_string(event.packet['dateTime']), to_sorted_string(event.packet)))
# this works for hardware generation, but software generation does not 'quality control'
# the archive record, so this data is not 'QC' in this case.
# If this is important, bind to the loop packet.
def new_archive_record(self, event):
end_ts = event.record['dateTime']
start_ts = end_ts - event.record['interval'] * 60 - self.overlap
for topic in self.subscriber.Subscribed_topics: # investigate that topics might not be cached.. therefore use subscribed
self.logger.logdbg("MQTTSubscribeService", "Record prior to update is: %s %s" % (weeutil.weeutil.timestamp_to_string(event.record['dateTime']), to_sorted_string(event.record)))
target_data = self.subscriber.get_accumulated_data(topic, start_ts, self.end_ts, event.record['usUnits'])
event.record.update(target_data)
self.logger.logdbg("MQTTSubscribeService", "Record after update is: %s %s" % (weeutil.weeutil.timestamp_to_string(event.record['dateTime']), to_sorted_string(event.record)))
def loader(config_dict, engine):
config = configobj.ConfigObj(config_dict)
return MQTTSubscribeDriver(**config[DRIVER_NAME])
def confeditor_loader():
return MQTTSubscribeDriverConfEditor()
class MQTTSubscribeDriver(weewx.drivers.AbstractDevice):
"""weewx driver that reads data from MQTT"""
def __init__(self, **stn_dict):
console = to_bool(stn_dict.get('console', False))
self.logger = Logger(console)
self.wait_before_retry = float(stn_dict.get('wait_before_retry', 2))
self._archive_interval = to_int(stn_dict.get('archive_interval', 300))
self.archive_topic = stn_dict.get('archive_topic', None)
self.logger.loginf("MQTTSubscribeDriver", "Wait before retry is %i" % self.wait_before_retry)
self.wind_fields = ['windGust', 'windGustDir', 'windDir', 'windSpeed']
self.subscriber = MQTTSubscribe(stn_dict, self.logger)
self.subscriber.start()
@property
def hardware_name(self):
return "MQTTSubscribeDriver"
@property
def archive_interval(self):
return self._archive_interval
def closePort(self):
self.subscriber.disconnect()
def genLoopPackets(self):
while True:
for topic in self.subscriber.Subscribed_topics: # investigate that topics might not be cached.. therefore use subscribed
if topic == self.archive_topic:
continue
for data in self.subscriber.get_data(topic):
if data:
yield data
else:
break
self.logger.logdbg("MQTTSubscribeDriver", "Queues are empty.")
time.sleep(self.wait_before_retry)
def genArchiveRecords(self, lastgood_ts):
if not self.archive_topic:
self.logger.logdbg("MQTTSubscribeDriver", "No archive topic configured.")
raise NotImplementedError
else:
for data in self.subscriber.get_data(self.archive_topic, lastgood_ts):
if data:
yield data
else:
break
class MQTTSubscribeDriverConfEditor(weewx.drivers.AbstractConfEditor): # pragma: no cover
@property
def default_stanza(self):
return """
[MQTTSubscribeDriver]
# This section is for the MQTTSubscribe driver.
# The driver to use:
driver = user.MQTTSubscribe
# The MQTT server.
# Default is: localhost
host = localhost
# The port to connect to.
# Default is: 1883
port = 1883
# Maximum period in seconds allowed between communications with the broker.
# Default is: 60
keepalive = 60
# Units for MQTT payloads without unit value.
# Valid values: US, METRIC, METRICWX
# Default is: US
unit_system = US
# Configuration for the message callback.
[[message_callback]]
# The format of the MQTT payload.
# Currently support: individual, json, keyword
# Must be specified.
type = REPLACE_ME
# The topics to subscribe to.
[[topics]]
[[[FIRST/REPLACE_ME]]]
[[[SECOND/REPLACE_ME]]]
"""
def prompt_for_settings(self):
settings = {}
settings['message_callback'] = {}
settingd['topics'] = {}
print("Enter the host.")
settings['host'] = self._prompt('host', 'localhost')
print("Enter the port on the host.")
settings['port'] = self._prompt('port', '1883')
print("Enter the maximum period in seconds allowed between communications with the broker.")
settings['keepalive'] = self._prompt('keepalive', '60')
print("Enter the units for MQTT payloads without unit value: US|METRIC|METRICWX")
settings['topics']['unit_system'] = self._prompt('unit_system', 'US', ['US', 'METRIC', 'METRICWX'])
print("Enter the MQTT paylod type: individual|json|keyword")
settings['message_callback']['type'] = self._prompt('type', 'json', ['individual', 'json', 'keyword'])
return settings
# To Run
# setup.py install:
# PYTHONPATH=/home/weewx/bin python /home/weewx/bin/user/MQTTSubscribe.py
#
# rpm or deb package install:
# PYTHONPATH=/usr/share/weewx python /usr/share/weewx/user/MQTTSubscribe.py
if __name__ == '__main__': # pragma: no cover
import optparse
import os
import sys
from weewx.engine import StdEngine
usage = """MQTTSubscribeService --help
wee_config CONFIG_FILE
[--records=RECORD_COUNT]
[--interval=INTERVAL]
[--delay=DELAY]
[--units=US|METRIC|METRICWX]
[--binding=archive|loop]
[--type=driver|service]
[--verbose]
[--console]
"""
def main():
parser = optparse.OptionParser(usage=usage)
parser.add_option('--records', dest='record_count', type=int,
help='The number of archive records to create.',
default=2)
parser.add_option('--interval', dest='interval', type=int,
help='The archive interval in seconds.',
default=300)
parser.add_option('--delay', dest='delay', type=int,
help='The archive delay in seconds.',
default=15)
parser.add_option("--units", choices=["US", "METRIC", "METRICWX"],
help="The default units if not in MQTT payload.",
default="US")
parser.add_option("--binding", choices=["archive", "loop"],
help="The type of binding.",
default="archive")
parser.add_option("--type", choices=["driver", "service"],
help="The simulation type.",
default="driver")
parser.add_option("--verbose", action="store_true", dest="verbose",
help="Log extra output (debug=1).")
parser.add_option("--console", action="store_true", dest="console",
help="Log to console in addition to syslog.")
(options, args) = parser.parse_args()
simulation_type = options.type
binding = options.binding
record_count = options.record_count
interval = options.interval
delay = options.delay
units= weewx.units.unit_constants[options.units]
syslog.openlog('wee_MQTTSS', syslog.LOG_PID | syslog.LOG_CONS)
if options.verbose:
syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_DEBUG))
else:
syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_INFO))
config_path = os.path.abspath(args[0])
config_dict = configobj.ConfigObj(config_path, file_error=True)
min_config_dict = {
'Station': {
'altitude': [0, 'foot'],
'latitude': 0,
'station_type': 'Simulator',
'longitude': 0
},
'Simulator': {
'driver': 'weewx.drivers.simulator',
},
'Engine': {
'Services': {}
}
}
print("Simulation is %s" % simulation_type)
print("Creating %i %s records" % (record_count, binding))
print("Interval is %i seconds" % interval)
print("Delay is %i seconds" % delay)
engine = StdEngine(min_config_dict)
weewx.accum.initialize(config_dict)
# override the configured binding with the parameter value
weeutil.weeutil.merge_config(config_dict,
{'MQTTSubscribeService': {'binding': binding}})
# if specified, override the console logging
if options.console:
weeutil.weeutil.merge_config(config_dict,
{'MQTTSubscribeService': {'console': True}})
weeutil.weeutil.merge_config(config_dict,
{'MQTTSubscribeDriver': {'console': True}})
if simulation_type =="service":
simulate_service(engine, config_dict, binding, record_count, interval, delay, units)
elif simulation_type == "driver":
driver = "user.MQTTSubscribe"
__import__(driver)
# This is a bit of Python wizardry. First, find the driver module
# in sys.modules.
driver_module = sys.modules[driver]
# Find the function 'loader' within the module:
loader_function = getattr(driver_module, 'loader')
driver = loader_function(config_dict, engine)
if binding == "archive":
simulate_driver_archive(driver, record_count, interval, delay)
elif binding == "loop":
simulate_driver_packet(driver, record_count)
def simulate_driver_archive(driver, record_count, interval, delay):
i = 0
while i < record_count:
current_time = int(time.time() + 0.5)
end_period_ts = (int(current_time / interval) + 1) * interval
end_delay_ts = end_period_ts + delay
sleep_amount = end_delay_ts - current_time
print("Sleeping %i seconds" % sleep_amount)
time.sleep(sleep_amount)
for record in driver.genArchiveRecords(end_period_ts):
print("Record is: %s %s" % (weeutil.weeutil.timestamp_to_string(record['dateTime']), to_sorted_string(record)))
i += 1
def simulate_driver_packet(driver, record_count):
i = 0
for packet in driver.genLoopPackets():
print("Packet is: %s %s" % (weeutil.weeutil.timestamp_to_string(packet['dateTime']), to_sorted_string(packet)))
i += 1
if i >= record_count:
break
def simulate_service(engine, config_dict, binding, record_count, interval, delay, units):
service = MQTTSubscribeService(engine, config_dict)
i = 0
while i < record_count:
current_time = int(time.time() + 0.5)
end_period_ts = (int(current_time / interval) + 1) * interval
end_delay_ts = end_period_ts + delay
sleep_amount = end_delay_ts - current_time
print("Sleeping %i seconds" % sleep_amount)
time.sleep(sleep_amount)
data = {}
data['dateTime'] = end_period_ts
data['usUnits'] = units
if binding == 'archive':
data['interval'] = interval / 60
new_archive_record_event = weewx.Event(weewx.NEW_ARCHIVE_RECORD,
record=data,
origin='hardware')
engine.dispatchEvent(new_archive_record_event)
print("Archive Record is: %s %s" % (weeutil.weeutil.timestamp_to_string(new_archive_record_event.record['dateTime']), to_sorted_string(new_archive_record_event.record)))
elif binding == 'loop':
new_loop_packet_event = weewx.Event(weewx.NEW_LOOP_PACKET,
packet=data)
engine.dispatchEvent(new_loop_packet_event)
print("Loop packet is: %s %s" % (weeutil.weeutil.timestamp_to_string(new_loop_packet_event.packet['dateTime']), to_sorted_string(new_loop_packet_event.packet)))
else:
pass
i += 1
service.shutDown()
main()