"""TcEx Framework Service Trigger Common module."""
# standard library
import json
import os
import threading
import traceback
from typing import Any, Callable, Optional, Union
# first-party
from tcex.pleb.registry import registry
from tcex.services.common_service import CommonService
[docs]class CommonServiceTrigger(CommonService):
"""TcEx Framework Service Trigger Common module.
Shared service logic between the supported service types:
* Custom Trigger Service
* Webhook Trigger Service
"""
def __init__(self, tcex: object):
"""Initialize the Class properties.
Args:
tcex: Instance of TcEx.
"""
super().__init__(tcex)
# properties
self._metrics = {'Active Playbooks': 0, 'Errors': 0, 'Hits': 0, 'Misses': 0}
self.configs = {}
self.config_thread = None
# config callbacks
self.create_config_callback = None
self.delete_config_callback = None
self.trigger_input_model = None
[docs] def _tcex_testing(self, session_id: str, trigger_id: int):
"""Write data required for testing framework to Redis.
Args:
session_id: The context/session id value for the current operation.
trigger_id: The trigger ID for the current playbook.
"""
if self.args.tcex_testing_context is not None:
_context_tracker: str = (
self.redis_client.hget(self.args.tcex_testing_context, '_context_tracker') or '[]'
)
_context_tracker = json.loads(_context_tracker)
_context_tracker.append(session_id)
self.redis_client.hset(
self.args.tcex_testing_context,
'_context_tracker',
json.dumps(_context_tracker),
)
self.redis_client.hset(session_id, '_trigger_id', trigger_id)
# log
self.log.info(
'feature=service, event=testing-context-tracker, '
f'context={session_id}, trigger-id={trigger_id}'
)
[docs] def _tcex_testing_fired_events(self, session_id: str, fired: bool):
"""Write fired event data to KV Store to be used in test validation.
Args:
session_id: The context/session id value for the current operation.
fired: The value to increment the count by.
"""
if self.args.tcex_testing_context is not None:
self.redis_client.hset(
session_id, '#Trigger:9876:_fired!String', json.dumps(str(fired).lower())
)
@property
def command_map(self) -> dict:
"""Return the command map for the current Service type."""
command_map = super().command_map
command_map.update(
{
'createconfig': self.process_create_config_command,
'deleteconfig': self.process_delete_config_command,
}
)
return command_map
[docs] def create_config(self, trigger_id: int, message: str, status: bool):
"""Add config item to service config object.
Args:
trigger_id: The trigger ID for the current config.
message: A simple message for the action.
status: The passed/fail status for the App handling of config.
logfile: The CreateConfig logfile to return in response ack.
"""
try:
if status is not True and self.configs.get(str(trigger_id)) is not None:
# add config to configs
del self.configs[str(trigger_id)]
# send ack response
self.message_broker.publish(
json.dumps(
{
'command': 'Acknowledged',
'logFile': os.path.join(
os.path.basename(os.path.dirname(self.trigger_logfile)),
os.path.basename(self.trigger_logfile),
),
'message': message,
'status': 'Success' if status is True else 'Failed',
'type': 'CreateConfig',
'triggerId': trigger_id,
}
),
self.args.tc_svc_client_topic,
)
except Exception as e:
self.log.error(
'feature=service, event=create-config-callback-exception, '
f'trigger-id={trigger_id}, error="""{e}"""'
)
self.log.trace(traceback.format_exc())
[docs] def delete_config(self, trigger_id: int, message: str, status: str):
"""Delete config item from config object.
Args:
trigger_id: The trigger ID for the current config.
message: A simple message for the action.
status: The passed/fail status for the App handling of config.
"""
try:
# always delete config from configs dict, even when status is False
del self.configs[trigger_id]
# send ack response
self.message_broker.publish(
json.dumps(
{
'command': 'Acknowledged',
'message': message,
'status': 'Success' if status is True else 'Failed',
'type': 'DeleteConfig',
'triggerId': trigger_id,
}
),
self.args.tc_svc_client_topic,
)
except Exception as e:
self.log.error(
'feature=service, event=delete-config-callback-exception, '
f'trigger-id={trigger_id}, error="""{e}"""'
)
# self.log.trace(traceback.format_exc())
[docs] def fire_event(self, callback: Callable[[], bool], **kwargs):
"""Trigger a FireEvent command.
Args:
callback: The trigger method in the App to call.
trigger_ids: A list of trigger ids to trigger.
"""
if not callable(callback):
raise RuntimeError('Callback method (callback) is not a callable.')
# get developer passed trigger_ids
trigger_ids: Optional[list] = kwargs.pop('trigger_ids', None)
for trigger_id, config in list(self.configs.items()):
if trigger_ids is not None and trigger_id not in trigger_ids:
# skip config that don't match developer provided trigger ids
continue
try:
# get a session_id specifically for this thread
session_id: str = self.create_session_id()
# only required for testing in tcex framework
self._tcex_testing(session_id, trigger_id)
# get an instance of PB module with current
# session_id and outputs to pass to callback
outputs: Union[list, str] = config.tc_playbook_out_variables or []
if isinstance(outputs, str):
outputs = outputs.split(',')
playbook: object = self.tcex.get_playbook(
context=session_id, output_variables=outputs
)
self.log.info(f'feature=trigger-service, event=fire-event, trigger-id={session_id}')
# current thread has session_id as name
self.service_thread(
name=session_id,
target=self.fire_event_trigger,
args=(
callback,
playbook,
session_id,
trigger_id,
config,
),
kwargs=kwargs,
session_id=session_id,
trigger_id=trigger_id,
)
except Exception:
self.log.trace(traceback.format_exc())
[docs] def update_trigger_value(self, trigger_id: int, input_name: str, new_value: Any):
"""Send UpdateTriggerValue command.
Args:
trigger_id: the ID of the trigger to update.
input_name: the name of the input to update.
new_value: the new value for the input.
"""
msg = {
'command': 'UpdateTriggerValue',
'triggerId': trigger_id,
'inputName': input_name,
'inputValue': new_value,
}
self.log.info(f'feature=service, event=update-trigger-value, msg={msg}')
self.message_broker.publish(json.dumps(msg), self.args.tc_svc_client_topic)
[docs] def fire_event_publish(
self, trigger_id: int, session_id: str, request_key: Optional[str] = None
):
"""Send FireEvent command.
Args:
trigger_id: The ID of the trigger.
session_id: The generated session for this fired event.
request_key: The request key for this response.
"""
msg = {
'command': 'FireEvent',
'triggerId': trigger_id, # reference to single playbook
'sessionId': session_id, # session for the playbook execution
}
if request_key is not None:
msg['requestKey'] = request_key # reference for a specific playbook execution
self.log.info(f'feature=service, event=fire-event, msg={msg}')
# publish FireEvent command to client topic
self.message_broker.publish(json.dumps(msg), self.args.tc_svc_client_topic)
[docs] def fire_event_trigger(
self,
callback: Callable[[], bool],
playbook: object,
session_id: str,
trigger_id: int,
config: dict,
**kwargs: str,
):
"""Fire event for trigger.
Args:
callback: The App callback method for firing an event.
playbook: A configure playbook instance for using to interact with KvStore.
session_id: The current session Id.
trigger_id: The current trigger Id.
config: A dict containing the configuration information.
"""
self._create_logging_handler()
self.log.info('feature=trigger-service, event=fire-event-trigger')
try:
if callback(playbook, trigger_id, config, **kwargs):
self.increment_metric('Hits')
self.fire_event_publish(trigger_id, session_id)
# capture fired status for testing framework
self._tcex_testing_fired_events(session_id, True)
else:
self.increment_metric('Misses')
self.log.info(
'feature=trigger-service, event=fire-event-callback-miss, '
f'trigger-id={trigger_id}'
)
# capture fired status for testing framework
self._tcex_testing_fired_events(session_id, False)
except Exception as e:
self.increment_metric('Errors')
self.log.error(
f'feature=trigger-service, event=fire-event-callback-exception, error="""{e}"""'
)
self.log.trace(traceback.format_exc())
finally:
self.logger.remove_handler_by_name(self.thread_name)
[docs] def log_config(self, trigger_id: str, config: dict):
"""Log the config while hiding encrypted values.
Args:
trigger_id: The current trigger Id.
config: The configuration to be logged.
"""
logged_config = config.copy()
for param in self.ij.model.params:
if param.encrypt and config.__contains__(param.name):
logged_config[param.get('name')] = '***'
self.log.info(
f'feature=service, event=create-config, trigger_id={trigger_id}, config={logged_config}'
)
[docs] def process_create_config_command(self, message: dict):
"""Process the CreateConfig command.
.. code-block:: python
:linenos:
:lineno-start: 1
{
"appId": 387,
"command": "CreateConfig",
"triggerId": 1,
"config": {
"password": "test-pass",
"username": "test-user",
"cc_action": "pass",
"tc_playbook_out_variables": "#Trigger:1:testing.body!String"
},
"apiToken": "SVC:8:QQuyIp:1596817138182:387:+9vBOAT8Y56caHRcjLa4IwAqABoatsYOU ... ",
"expireSeconds": 1596817138
}
Args:
message: The message payload from the server topic.
"""
config: dict = message.get('config')
status = True
trigger_id = int(message.get('triggerId'))
# create trigger id logging filehandler
self.logger.add_thread_file_handler(
backup_count=1, # required for logs to be rotated
name=self.thread_name,
filename=self.trigger_logfile,
level=self.args.tc_log_level,
max_bytes=1048576, # 1Mb
path=self.args.tc_log_path,
handler_key=trigger_id,
thread_key='trigger_id',
)
# log the config
self.log_config(trigger_id, config)
# register config apiToken
self.token.register_token(trigger_id, message.get('apiToken'), message.get('expireSeconds'))
# temporarily add config, will be removed if callback fails
self.configs[trigger_id] = config
msg = 'Create Config'
if callable(self.create_config_callback):
kwargs = {}
if self.ij.model.is_webhook_trigger_app:
# only webhook triggers get and require the PB url
kwargs['url'] = message.get('url')
try:
# Resolve any variables in config
updated_config = {
k: (
registry.inputs.resolve_variable(v)
if registry.inputs.utils.is_tc_variable(v)
else v
)
for k, v in config.items()
}
updated_config['trigger_id'] = trigger_id
# Instantiate trigger input model
# pylint: disable=not-callable
config_input = self.trigger_input_model(**updated_config)
self.configs[trigger_id] = config_input
# call callback for create config and handle exceptions to protect thread
# pylint: disable=not-callable
response: Optional[dict] = self.create_config_callback(config_input, **kwargs)
if isinstance(response, dict):
status = response.get('status', False)
msg = response.get('msg')
# if callback does not return a boolean value assume it worked
if not isinstance(status, bool):
status = True
except Exception as e:
status = False
msg = str(e)
self.log.error(
f'feature=service, event=create-config-callback-exception, error="""{e}"""'
)
self.log.error(message)
self.log.trace(traceback.format_exc())
# create config after callback to report status and message
self.create_config(trigger_id, msg, status)
[docs] def process_delete_config_command(self, message: dict):
"""Process the DeleteConfig command.
.. code-block:: python
:linenos:
:lineno-start: 1
{
"appId": 387,
"command": "DeleteConfig",
"triggerId": 1
}
Args:
message: The message payload from the server topic.
"""
status = True
trigger_id = int(message.get('triggerId'))
self.log.info(f'feature=service, event=delete-config, trigger_id={trigger_id}')
# unregister config apiToken
self.token.unregister_token(trigger_id)
msg = 'Delete Config'
if callable(self.delete_config_callback):
try:
# call callback for delete config and handle exceptions to protect thread
# pylint: disable=not-callable
status: Optional[bool] = self.delete_config_callback(trigger_id)
# if callback does not return a boolean value assume it worked
if not isinstance(status, bool):
status = True
except Exception as e:
self.log.error(
f'feature=service, event=delete-config-callback-exception, error="""{e}"""'
)
self.log.trace(traceback.format_exc())
status = False
# delete config
self.delete_config(trigger_id, msg, status)
# remove temporary logging file handler
self.logger.remove_handler_by_name(self.thread_name)
@property
def trigger_logfile(self) -> str:
"""Return the logfile name based on date and thread name."""
return f'''trigger-id-{self.thread_trigger_id}.log'''
@property
def thread_trigger_id(self) -> Optional[str]:
"""Return the current thread trigger id."""
trigger_id = None
if hasattr(threading.current_thread(), 'trigger_id'):
trigger_id = threading.current_thread().trigger_id
return trigger_id