"""TcEx Framework MQTT message broker module."""
# standard library
import logging
import ssl
import time
import traceback
from typing import TYPE_CHECKING, List, Optional
# third-party
import paho.mqtt.client as mqtt
if TYPE_CHECKING:
# first-party
from tcex.input.field_types.sensitive import Sensitive
# get tcex logger
logger = logging.getLogger('tcex')
[docs]class MqttMessageBroker:
"""TcEx Framework MQTT message broker module."""
def __init__(
self,
broker_host: str,
broker_port: int,
broker_timeout: int,
broker_token: Optional['Sensitive'] = None,
broker_cacert: Optional[str] = None,
):
"""Initialize the Class properties.
Args:
broker_host: The MQTT server host (IP).
broker_port: The MQTT server port.
broker_timeout: The MQTT connection timeout.
broker_token: The MQTT connect token.
broker_cacert: The CA certfile for connection.
logger: A logging instance
"""
self.broker_host = broker_host
self.broker_port = int(broker_port)
self.broker_timeout = int(broker_timeout)
self.broker_token = broker_token
self.broker_cacert = broker_cacert
# properties
self._client = None
self._connected = False
self._on_connect_callbacks: List[callable] = []
self._on_disconnect_callbacks: List[callable] = []
self._on_log_callbacks: List[callable] = []
self._on_message_callbacks: List[callable] = []
self._on_publish_callbacks: List[callable] = []
self._on_subscribe_callbacks: List[callable] = []
self._on_unsubscribe_callbacks: List[callable] = []
self.log = logger
self.shutdown = False # used in service App for shutdown flag
[docs] def add_on_connect_callback(self, callback: callable, index: Optional[int] = None):
"""Add a callback for on_connect events.
Args:
callback: A callback that matches signature of an on_connect event.
index: The index value to insert the callback into the list.
"""
index = index or len(self._on_connect_callbacks)
self._on_connect_callbacks.insert(index, callback)
[docs] def add_on_disconnect_callback(self, callback: callable, index: Optional[int] = None):
"""Add a callback for on_disconnect events.
Args:
callback: A callback that matches signature of an on_disconnect event.
index: The index value to insert the callback into the list.
"""
index = index or len(self._on_disconnect_callbacks)
self._on_disconnect_callbacks.insert(callback)
[docs] def add_on_log_callback(self, callback: callable, index: Optional[int] = None):
"""Add a callback for on_log events.
Args:
callback: A callback that matches signature of an on_log event.
index: The index value to insert the callback into the list.
"""
index = index or len(self._on_log_callbacks)
self._on_log_callbacks.insert(callback)
[docs] def add_on_message_callback(
self, callback: callable, index: Optional[int] = None, topics: Optional[List[str]] = None
):
"""Add a callback for on_message events.
Args:
callback: A callback that matches signature of an on_message event.
index: The index value to insert the callback into the list.
topics: A optional list of topics to call callback. If value is None then callback
will always be called.
"""
index = index or len(self._on_message_callbacks)
self._on_message_callbacks.insert(index, {'callback': callback, 'topics': topics})
[docs] def add_on_publish_callback(self, callback: callable, index: Optional[int] = None):
"""Add a callback for on_publish events.
Args:
callback: A callback that matches signature of an on_publish event.
index: The index value to insert the callback into the list.
"""
index = index or len(self._on_publish_callbacks)
self._on_publish_callbacks.insert(index, callback)
[docs] def add_on_subscribe_callback(self, callback: callable, index: Optional[int] = None):
"""Add a callback for on_subscribe events.
Args:
callback: A callback that matches signature of an on_subscribe event.
index: The index value to insert the callback into the list.
"""
index = index or len(self._on_subscribe_callbacks)
self._on_subscribe_callbacks.insert(index, callback)
[docs] def add_on_unsubscribe_callback(self, callback: callable, index: Optional[int] = None):
"""Add a callback for on_unsubscribe events.
Args:
callback: A callback that matches signature of an on_unsubscribe event.
index: The index value to insert the callback into the list.
"""
index = index or len(self._on_unsubscribe_callbacks)
self._on_unsubscribe_callbacks.insert(index, callback)
@property
def client(self) -> object:
"""Return MQTT client."""
if self._client is None:
try:
self._client = mqtt.Client(client_id='', clean_session=True)
self._client.reconnect_delay_set(min_delay=1, max_delay=5)
self._client.connect(self.broker_host, self.broker_port, self.broker_timeout)
if self.broker_cacert is not None:
self._client.tls_set(
ca_certs=self.broker_cacert,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2,
)
self._client.tls_insecure_set(False)
# add logger when logging in TRACE
if self.log.getEffectiveLevel() == 5:
self._client.enable_logger(logger=self.log)
# username must be a empty string
if self.broker_token is not None:
self._client.username_pw_set('', password=self.broker_token.value)
except Exception as e:
self.log.error(f'feature=message-broker, event=failed-connection, error="""{e}"""')
self.log.trace(traceback.format_exc())
self.shutdown = True
return self._client
[docs] def connect(self):
"""Listen for message coming from broker."""
try:
# handle connection issues by not using loop_forever. give the service X seconds to
# connect to message broker, else timeout and log generic connection error.
self.client.loop_start()
deadline = time.time() + self.broker_timeout
while True:
if not self._connected and deadline < time.time():
self.client.loop_stop()
raise ConnectionError(
f'failed to connect to message broker host '
f'{self.broker_host} on port '
f'{self.broker_port}.'
)
time.sleep(1)
except Exception as e:
self.log.trace(f'feature=message-broker, event=connection-error, error="""{e}"""')
self.log.error(traceback.format_exc())
[docs] def on_connect(self, client, userdata, flags, rc):
"""Handle MQTT on_connect events."""
self.log.info(f'feature=message-broker, event=broker-connect, status={str(rc)}')
self._connected = True
for callback in self._on_connect_callbacks:
callback(client, userdata, flags, rc)
[docs] def on_disconnect(self, client, userdata, rc):
"""Handle MQTT on_disconnect events."""
self.log.info(f'feature=message-broker, event=broker-disconnect, status={str(rc)}')
for callback in self._on_disconnect_callbacks:
callback(client, userdata, rc)
[docs] def on_log(self, client, userdata, level, buf):
"""Handle MQTT on_log events."""
# self.log.trace(f'feature=message-broker, event=on_log, buf={buf}, level={level}')
for callback in self._on_log_callbacks:
callback(client, userdata, level, buf)
[docs] def on_message(self, client, userdata, message):
"""Handle MQTT on_message events."""
mp = message.payload.decode().replace('\n', '')
self.log.trace(
f'''feature=message-broker, message-topic={message.topic}, message-payload={mp}'''
)
for cd in self._on_message_callbacks:
topics = cd.get('topics')
if topics is None or message.topic in topics:
cd.get('callback')(client, userdata, message)
[docs] def on_publish(self, client, userdata, result):
"""Handle MQTT on_publish events."""
# self.log.trace(f'feature=message-broker, event=on_publish, result={result}')
for callback in self._on_publish_callbacks:
callback(client, userdata, result)
[docs] def on_subscribe(self, client, userdata, mid, granted_qos):
"""Handle MQTT on_subscribe events."""
# self.log.trace(
# f'feature=message-broker, event=on_subscribe, mid={mid}, granted_qos={granted_qos}'
# )
for callback in self._on_subscribe_callbacks:
callback(client, userdata, mid, granted_qos)
[docs] def on_unsubscribe(self, client, userdata, mid):
"""Handle MQTT on_unsubscribe events."""
# self.log.trace(f'feature=message-broker, event=on_subscribe, mid={mid}')
for callback in self._on_unsubscribe_callbacks:
callback(client, userdata, mid)
[docs] def publish(self, message: str, topic: str):
"""Publish a message on client topic.
Args:
message: The message to be sent on client topic.
topic: The broker topic.
"""
r: object = self.client.publish(topic, message)
self.log.debug(
f'feature=service, event=publish-message, topic="{topic}", '
f'message={message}, response={r}'
)
[docs] def register_callbacks(self):
"""Register all the message broker callbacks."""
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_log = self.on_log
self.client.on_message = self.on_message
self.client.on_publish = self.on_publish
self.client.on_subscribe = self.on_subscribe
self.client.on_unsubscribe = self.on_unsubscribe