"""TcEx Framework API Service module."""
# standard library
import concurrent.futures
import json
import sys
import threading
import traceback
from functools import reduce
from io import BytesIO
from typing import Any, Callable, Optional
# first-party
from tcex.input.field_types.sensitive import Sensitive
from tcex.services.common_service import CommonService
class ApiService(CommonService):
    """TcEx Framework API Service module.

    Translates ``RunService`` broker messages into WSGI-style calls against the
    registered ``api_event_callback`` and publishes the resulting status and
    headers back on the client topic. The response body is exchanged through
    the key/value store rather than through the broker message itself.
    """

    def __init__(self, tcex: object):
        """Initialize the Class properties.

        Args:
            tcex: Instance of TcEx.
        """
        super().__init__(tcex)

        # properties
        self._metrics = {'Errors': 0, 'Requests': 0, 'Responses': 0}

        # config callbacks
        self.api_event_callback = None

        # thread pool to handle HTTP requests
        self.request_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)

    @property
    def command_map(self) -> dict:
        """Return the command map for the current Service type.

        Extends the parent's map with the ``runservice`` command handler.
        """
        command_map = super().command_map
        command_map.update({'runservice': self.process_run_service_command})
        return command_map

    def process_run_service_response(self, *args, **kwargs):
        """Handle service event responses.

        Blocks (up to 30 seconds) on the thread event supplied in ``kwargs`` so
        the acknowledgement is not published before the response body has been
        written, then publishes the status and headers on the client topic.

        The positional args follow the WSGI ``start_response`` shape, e.g.:

            ('200 OK', [('content-type', 'application/json'), ('content-length', '103')])

        Args:
            *args: ``args[0]`` is the status line ("<code> <reason>") and
                ``args[1]`` is the list of response headers.
            **kwargs: Must contain ``event`` (the ``threading.Event`` set once the
                body is written) and ``request_key`` (echoed back as ``requestKey``).
        """
        self.log.info('feature=api-service, event=response-received, status=waiting-for-body')
        kwargs.get('event').wait(30)  # wait for thread event - (set on body write)
        self.log.trace(f'feature=api-service, event=response, args={args}')
        try:
            # status line is "<code> <reason>", e.g. "200 OK"
            status_code, status = args[0].split(' ', 1)
            response = {
                'bodyVariable': 'response.body',
                'command': 'Acknowledged',
                'headers': self.format_response_headers(args[1]),
                'requestKey': kwargs.get('request_key'),
                'status': status,
                'statusCode': status_code,
                'type': 'RunService',
            }
            self.log.info('feature=api-service, event=response-sent')
            self.message_broker.publish(json.dumps(response), self.args.tc_svc_client_topic)
            self.increment_metric('Responses')
        except Exception as e:
            self.log.error(
                f'feature=api-service, event=failed-creating-response-body, error="""{e}"""'
            )
            self.log.trace(traceback.format_exc())
            self.increment_metric('Errors')

    def process_run_service_command(self, message: dict):
        """Process the RunService command.

        .. code-block:: python
            :linenos:
            :lineno-start: 1

            {
                "command": "RunService",
                "apiToken": "abc123",
                "bodyVariable": "request.body",
                "headers": [ { key/value pairs } ],
                "method": "GET",
                "queryParams": [ { key/value pairs } ],
                "requestKey": "123abc",
                "userConfig": [{
                    "name": "tlpExportSetting",
                    "value": "TLP:RED"
                }],
            }

        The API token is registered for the current thread before any other
        work (including logging) and is always unregistered again, even when
        request handling returns early or raises.

        Args:
            message: The message payload from the server topic.
        """
        # register config apiToken (before any logging)
        self.token.register_token(
            self.thread_name, Sensitive(message.get('apiToken')), message.get('expireSeconds')
        )
        try:
            self._handle_run_service_request(message)
        finally:
            # unregister config apiToken on every exit path (early return or
            # exception) so a registration is never left behind for this thread
            self.token.unregister_token(self.thread_name)

    def _handle_run_service_request(self, message: dict):
        """Build the WSGI environ for a RunService message and invoke the API callback.

        Reads the optional request body from the key/value store, builds the
        environ dict, calls ``api_event_callback`` and writes any returned body
        to the key/value store under ``response.body``.

        Args:
            message: The message payload from the server topic. The keys
                ``bodyVariable``, ``headers``, ``method``, ``queryParams`` and
                ``path`` are removed from it while processing.
        """
        self.log.info(f'feature=api-service, event=runservice-command, message="{message}"')

        # thread event used to block response until body is written
        event = threading.Event()

        # process message
        request_key: str = message.get('requestKey')
        body: Any = None
        try:
            # read body from redis
            body_variable: str = message.pop('bodyVariable', None)
            if body_variable is not None:
                body = self.key_value_store.read(request_key, body_variable)
                if body is not None:
                    # for API service the data in Redis is not b64 encoded
                    body = BytesIO(body)
        except Exception as e:
            # best effort: the request is still processed, just without a body stream
            self.log.error(f'feature=api-service, event=failed-reading-body, error="""{e}"""')
            self.log.trace(traceback.format_exc())

        # these keys are required; a missing key raises KeyError to the caller
        headers: dict = self.format_request_headers(message.pop('headers'))
        method: str = message.pop('method')
        params: dict = message.pop('queryParams')
        path: str = message.pop('path')

        try:
            environ = {
                'wsgi.errors': sys.stderr,
                'wsgi.input': body,
                'wsgi.multithread': True,
                'wsgi.multiprocess': False,
                'wsgi.run_once': True,
                'wsgi.url_scheme': 'https',
                'wsgi.version': (1, 0),
                'PATH_INFO': path,
                'QUERY_STRING': self.format_query_string(params),
                'REMOTE_ADDR': message.get('remoteAddress', ''),
                # 'REMOTE_HOST': message.get('remoteAddress', ''),
                'REQUEST_METHOD': method.upper(),
                'SCRIPT_NAME': '/',
                'SERVER_NAME': '',
                'SERVER_PORT': '',
                'SERVER_PROTOCOL': 'HTTP/1.1',
            }

            # Add user config for TAXII or other service that supports the data type
            environ['user_config'] = message.get('userConfig', [])

            # content-type and content-length get dedicated keys (no HTTP_ prefix)
            if headers.get('content-type') is not None:
                environ['CONTENT_TYPE'] = headers.pop('content-type')

            # add content length
            if headers.get('content-length') is not None:
                environ['CONTENT_LENGTH'] = headers.pop('content-length')

            # every remaining header is exposed as HTTP_<NAME>
            for header, value in headers.items():
                environ[f'HTTP_{header}'.upper()] = value

            # make values from message available in env in snake
            # case (e.g., falcon -> req.env.get('request_url'))
            for key, value in message.items():
                snake_key = self.tcex.utils.camel_to_snake(key)
                if key not in environ and snake_key not in environ:
                    environ[snake_key] = value

            self.log.trace(f'feature=api-service, environ={environ}')
            self.increment_metric('Requests')
        except Exception as e:
            self.log.error(f'feature=api-service, event=failed-building-environ, error="""{e}"""')
            self.log.trace(traceback.format_exc())
            self.increment_metric('Errors')
            return  # stop processing

        def response_handler(*args, **kwargs):  # pylint: disable=unused-argument
            """Handle WSGI Response"""
            kwargs['event'] = event  # add event to kwargs for blocking
            kwargs['request_key'] = request_key
            self.service_thread(
                name='response-handler',
                target=self.process_run_service_response,
                args=args,
                kwargs=kwargs,
            )

        if callable(self.api_event_callback):
            try:
                body_data: Any = self.api_event_callback(  # pylint: disable=not-callable
                    environ, response_handler
                )

                if body_data:
                    # collapse the iterable of body chunks into a single value
                    body_data = reduce(lambda a, b: a + b, body_data)

                # write body to Redis
                if body_data:
                    self.key_value_store.create(request_key, 'response.body', body_data)

                # set thread event to True to trigger response
                self.log.info('feature=api-service, event=response-body-written')

                # release event lock
                event.set()
            except Exception as e:
                # NOTE(review): the event is not set on this path, so a response
                # thread that was already started waits out its full timeout
                # before publishing - confirm this is intended.
                self.log.error(
                    f'feature=api-service, event=api-event-callback-failed, error="""{e}""".'
                )
                self.log.trace(traceback.format_exc())
                self.increment_metric('Errors')

    def process_shutdown_command(self, message: dict):
        """Handle shutdown command.

        Runs the parent's shutdown handling, then shuts down the request thread pool.

        Args:
            message: The message payload from the server topic.
        """
        super().process_shutdown_command(message)
        self.request_thread_pool.shutdown()

    def service_thread(
        self,
        name: str,
        target: Callable[[], bool],
        args: Optional[tuple] = None,
        kwargs: Optional[dict] = None,
        session_id: Optional[str] = None,
        trigger_id: Optional[int] = None,
    ):
        """If this is a run-service command, run it with the thread pool.

        For everything else, use parent's implementation.

        Args:
            name: The name of the thread.
            target: The method to call for the thread.
            args: The args to pass to the target method.
            kwargs: Additional args.
            session_id: The current session id.
            trigger_id: The current trigger id.
        """
        args = args or ()
        kwargs = kwargs or {}

        def _thread_wrapper():
            """Tag the pool thread with request metadata, then run the target."""
            t = threading.current_thread()
            t.name = name
            t.session_id = session_id
            t.trigger_id = trigger_id
            target(*args, **kwargs)

        # Compare with ``==`` rather than ``is``: a new bound-method object is
        # created on every attribute access, so an identity check against
        # ``self.process_run_service_command`` can never succeed.
        if target == self.process_run_service_command:
            self.log.info(f'feature=service, event=request-thread-creation, name={name}')
            self.request_thread_pool.submit(_thread_wrapper)
        else:
            super().service_thread(name, target, args, kwargs, session_id, trigger_id)