Source code for tcex.services.webhook_trigger_service

"""TcEx Framework Webhook Service Trigger module."""
# standard library
import base64
import json
import traceback
from typing import Any, Callable, Optional, Union

from .common_service_trigger import CommonServiceTrigger


[docs]class WebhookTriggerService(CommonServiceTrigger): """TcEx Framework Webhook Service Trigger module.""" def __init__(self, tcex: object): """Initialize the Class properties. Args: tcex: Instance of TcEx. """ super().__init__(tcex) # config callbacks self.webhook_event_callback = None self.webhook_marshall_event_callback = None
[docs] def callback_response_handler(self, callback_response: Any, message: dict): """Handle the different types of callback responses. # Webhook App (default) * Dict - Playbook will not be launched and provided data will be used in the response to the client. * True - Playbook will be launched. * Else - Playbook will NOT be launched. # webhookResponseMarshall Feature App * Callable - Playbook will be launched and if marshall callback will be set to response. * True - Playbook will be launched. * Else - Playbook will NOT be launched. # webhookServiceEndpoint Feature App For this feature the callback method must fire the event on it's own. * Dict - Playbook will not be launched and provided data will be used in the response to the client. * Else - Response will be set to default of statusCode=200, body=None, and headers=[]. Args: callback_response: The response from the webhook callback method. message: The message payload from the server topic. """ if self.ij.has_feature('webhookserviceendpoint'): self.callback_response_service_endpoint(callback_response, message) elif self.ij.has_feature('webhookresponsemarshall'): self.callback_response_marshall(callback_response, message) else: self.callback_response_webhook(callback_response, message)
[docs] def callback_response_webhook(self, callback_response: Any, message: dict): """Handle the different types of callback responses. * Dict - Playbook will not be launched and provided data will be used in the response to the client. * True - Playbook will be launched. * Else - Playbook will NOT be launched. Args: callback_response: The response from the webhook callback method. message: The message payload from the server topic. """ if isinstance(callback_response, dict): # webhook responses are for providers that require a subscription req/resp. self.publish_webhook_event_response(message, callback_response) elif callback_response is True: self.increment_metric('Hits') self.fire_event_publish( message.get('triggerId'), self.session_id, message.get('requestKey') ) # only required for testing in tcex framework self._tcex_testing(self.session_id, message.get('triggerId')) # capture fired status for testing framework self._tcex_testing_fired_events(self.session_id, True) else: self.increment_metric('Misses') # capture fired status for testing framework self._tcex_testing_fired_events(self.session_id, False)
[docs] def callback_response_marshall(self, callback_response: Any, message: dict): """Handle the different types of callback responses. # webhookResponseMarshall Feature App * Callable - Playbook will be launched and if marshall callback will be set to response. * True - Playbook will be launched. * Else - Playbook will NOT be launched. Args: callback_response: The response from the webhook callback method. message: The message payload from the server topic. """ fire_trigger = False if callable(callback_response): self.webhook_marshall_event_callback = callback_response fire_trigger = True # handle response the same a normal response self.callback_response_webhook(fire_trigger, message)
[docs] def callback_response_service_endpoint(self, callback_response: Any, message: dict): """Handle the different types of callback responses. # webhookServiceEndpoint Feature App For this feature the callback method must fire the event on it's own. * Dict - Playbook will not be launched and provided data will be used in the response to the client. * Else - Response will be set to default of statusCode=200, body=None, and headers=[]. Args: callback_response: The response from the webhook callback method. message: The message payload from the server topic. """ response = { 'body': None, 'headers': [], 'statusCode': 200, } if isinstance(callback_response, dict): # webhook responses are for providers that require a subscription req/resp. response.update(callback_response) self.publish_webhook_event_response(message, callback_response)
@property def command_map(self) -> dict: """Return the command map for the current Service type.""" command_map: dict = super().command_map command_map.update({'webhookevent': self.process_webhook_event_command}) command_map.update({'webhookmarshallevent': self.process_webhook_marshall_event_command}) return command_map
[docs] def process_webhook_event_command(self, message: dict): """Process the WebhookEvent command. .. code-block:: python :linenos: :lineno-start: 1 { "appId": 387, "command": "WebhookEvent", "triggerId": 1, "requestKey": "cd8c7a3a-7968-4b97-80c9-68b83a8ef1a1", "method": "GET", "headers": [ { "name": "Accept-Encoding", "value": "gzip, deflate, br" } ], "queryParams": [ { "name": "registration", "value": "true" } ] } Args: message: The message payload from the server topic. """ self._create_logging_handler() self.log.trace( f'feature=webhook-trigger-service, event=process-webhook-event, message={message}' ) # acknowledge webhook event (nothing is currently done with this message on the core side) self.publish_webhook_event_acknowledge(message) # get config using triggerId passed in WebhookEvent data config = None outputs = {} if not self.ij.has_feature('webhookserviceendpoint'): config: dict = self.configs.get(message.get('triggerId')) if config is None: self.log.error( '''feature=webhook-trigger-service, event=missing-config, ''' f'''trigger-id={message.get('triggerId')}''' ) return # get an instance of playbooks for App outputs: Union[list, str] = config.get('tc_playbook_out_variables') or [] if isinstance(outputs, str): outputs = outputs.split(',') # get a context aware pb instance for the App callback method playbook: object = self.tcex.pb(context=self.session_id, output_variables=outputs) try: body: Any = self.key_value_store.read(message.get('requestKey'), 'request.body') if body is not None: body = base64.b64decode(body).decode() # pylint: disable=not-callable callback_data = { 'body': body, 'headers': message.get('headers'), 'method': message.get('method'), 'params': message.get('queryParams'), } if self.ij.has_feature('webhookresponsemarshall') or self.ij.has_feature( 'webhookserviceendpoint' ): # add request_key arg when marshall or services endpoints feature is set (kwarg) callback_data.update({'request_key': message.get('requestKey')}) elif not self.ij.has_feature('webhookserviceendpoint'): # add optional inputs for "standard" and "marshall" webhook trigger callback_data.update( { 'config': config, 'playbook': playbook, 'trigger_id': message.get('triggerId'), } ) callback_response: Union[bool, Callable[..., Any], dict] = self.webhook_event_callback( **callback_data ) self.callback_response_handler(callback_response, message) except Exception as e: self.increment_metric('Errors') self.log.error( 'feature=webhook-trigger-service, event=webhook-callback-exception, ' f'error="""{e}"""' ) self.log.trace(traceback.format_exc()) finally: self.logger.remove_handler_by_name(self.thread_name)
[docs] def process_webhook_marshall_event_command(self, message: dict): """Process the WebhookMarshallEvent command. .. code-block:: python :linenos: :lineno-start: 1 { "appId": 95, "bodyVariable": "request.body", "command": "WebhookMarshallEvent", "headers": [ { "name": "Accept", "value": "*/*" } ], "requestKey": "c29927c8-b94d-4116-a397-e6eb7002f41c", "statusCode": 200, "triggerId": 1234 } Args: message: The message payload from the server topic. """ self._create_logging_handler() self.log.trace( 'feature=webhook-trigger-service, event=process-webhook-marshall-event, ' f'message={message}' ) # acknowledge webhook event (nothing is currently done with this message on the core side) self.publish_webhook_marshall_event_acknowledge(message) # get config using triggerId passed in WebhookMarshallEvent data config: dict = self.configs.get(message.get('triggerId')) if config is None: self.log.error( '''feature=webhook-trigger-service, event=missing-config, ''' f'''trigger-id={message.get('triggerId')}''' ) return body = None request_key: str = message.get('requestKey') try: body: Any = self.key_value_store.read(request_key, 'request.body') if body is not None: body = base64.b64decode(body).decode() except Exception as e: self.increment_metric('Errors') self.log.error( 'feature=webhook-trigger-service, event=webhook-marshall-callback-exception, ' f'error="""{e}"""' ) self.log.trace(traceback.format_exc()) # set default value for callback response to the data returned from the playbook response = { 'body': body, 'headers': message.get('headers'), 'status_code': message.get('statusCode'), } if callable(self.webhook_marshall_event_callback): try: # call callback method # pylint: disable=not-callable callback_response: Optional[dict] = self.webhook_marshall_event_callback( body=body, headers=message.get('headers'), request_key=request_key, status_code=message.get('statusCode'), trigger_id=message.get('triggerId'), ) if isinstance(callback_response, dict): response.update(callback_response) except Exception as e: self.increment_metric('Errors') self.log.error( 'feature=webhook-trigger-service, event=webhook-marshall-callback-exception, ' f'error="""{e}"""' ) self.log.trace(traceback.format_exc()) finally: self.logger.remove_handler_by_name(self.thread_name) # webhook responses are for providers that require a subscription req/resp. self.publish_webhook_event_response(message, response)
[docs] def publish_webhook_event_acknowledge(self, message: dict): """Publish the WebhookEventResponse message. Args: message: The message from the broker. """ self.message_broker.publish( json.dumps( { 'command': 'Acknowledged', 'requestKey': message.get('requestKey'), 'triggerId': message.get('triggerId'), 'type': 'WebhookEvent', } ), self.args.tc_svc_client_topic, )
[docs] def publish_webhook_marshall_event_acknowledge(self, message: dict): """Publish the WebhookEventResponse message. .. code-block:: python :linenos: :lineno-start: 1 { "command": "Acknowledged", "requestKey": "cd8c7a3a-7968-4b97-80c9-68b83a8ef1a1", "triggerId": 1, "type": "WebhookMarshallResponse" } Args: message: The message from the broker. """ self.message_broker.publish( json.dumps( { 'command': 'Acknowledged', 'requestKey': message.get('requestKey'), 'triggerId': message.get('triggerId'), 'type': 'WebhookMarshallEvent', } ), self.args.tc_svc_client_topic, )
[docs] def publish_webhook_event_response(self, message: dict, callback_response: dict): """Publish the WebhookEventResponse message. Args: message: The message from the broker. callback_response: The data from the callback method. playbook: Configure instance of Playbook used to write body. """ playbook: object = self.tcex.pb(context=self.session_id, output_variables=[]) # write response body to redis if callback_response.get('body') is not None: playbook.create_string( 'response.body', base64.b64encode(callback_response.get('body').encode('utf-8')).decode('utf-8'), ) # publish response self.message_broker.publish( json.dumps( { 'sessionId': self.session_id, # session/context 'requestKey': message.get('requestKey'), 'command': 'WebhookEventResponse', 'triggerId': message.get('triggerId'), 'bodyVariable': 'response.body', 'headers': callback_response.get('headers', []), 'statusCode': callback_response.get('status_code', 200), } ), self.args.tc_svc_client_topic, )