Source code for tcex.tcex

"""TcEx Framework"""

# standard library
import inspect
import logging
import os
import platform
import signal
import threading
from typing import TYPE_CHECKING, Dict, Optional, Union

# third-party
from redis import Redis
from requests import Session

# first-party
from tcex.api import API
from tcex.api.tc.utils.threat_intel_utils import ThreatIntelUtils
from tcex.api.tc.v2.v2 import V2
from tcex.api.tc.v3.v3 import V3
from tcex.app_config.install_json import InstallJson
from tcex.app_feature import AdvancedRequest
from tcex.backports import cached_property
from tcex.exit.exit import ExitCode, ExitService
from tcex.input.field_types.sensitive import Sensitive
from tcex.input.input import Input
from tcex.key_value_store import KeyValueApi, KeyValueMock, KeyValueRedis, RedisClient
from tcex.logger.logger import Logger  # pylint: disable=no-name-in-module
from tcex.playbook import Playbook
from tcex.pleb.proxies import proxies
from tcex.pleb.registry import registry
from tcex.pleb.scoped_property import scoped_property
from tcex.sessions.auth.tc_auth import TcAuth
from tcex.sessions.external_session import ExternalSession
from tcex.sessions.tc_session import TcSession
from tcex.tokens import Tokens
from tcex.utils import Utils
from tcex.utils.file_operations import FileOperations

if TYPE_CHECKING:
    # first-party
    from tcex.logger.trace_logger import TraceLogger  # pylint: disable=no-name-in-module
    from tcex.services.api_service import ApiService
    from tcex.services.common_service_trigger import CommonServiceTrigger
    from tcex.services.webhook_trigger_service import WebhookTriggerService
    from tcex.sessions.auth.hmac_auth import HmacAuth
    from tcex.sessions.auth.token_auth import TokenAuth


[docs]class TcEx: """Provides basic functionality for all types of TxEx Apps. Args: config (dict, kwargs): A dictionary containing configuration items typically used by external Apps. config_file (str, kwargs): A filename containing JSON configuration items typically used by external Apps. """ def __init__(self, **kwargs): """Initialize Class Properties.""" # catch interrupt signals specifically based on thread name signal.signal(signal.SIGINT, self._signal_handler) if platform.system() != 'Windows': signal.signal(signal.SIGHUP, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) # Property defaults self._config: dict = kwargs.get('config') or {} self._log = None self._jobs = None self._redis_client = None self._service = None self.ij = InstallJson() self.main_os_pid = os.getpid() # init inputs self.inputs = Input(self._config, kwargs.get('config_file')) # add methods to registry registry.add_method(self.inputs.resolve_variable) # add methods to registry registry.register(self) registry.add_service(Input, self.inputs) # log standard App info early so it shows at the top of the logfile self.logger.log_info(self.inputs.model_unresolved)
[docs] def _signal_handler(self, signal_interrupt: int, _): """Handle signal interrupt.""" call_file: str = os.path.basename(inspect.stack()[1][0].f_code.co_filename) call_module: str = inspect.stack()[1][0].f_globals['__name__'].lstrip('Functions.') call_line: int = inspect.stack()[1][0].f_lineno self.log.error( f'App interrupted - file: {call_file}, method: {call_module}, line: {call_line}.' ) exit_code = ExitCode.SUCCESS if threading.current_thread().name == 'MainThread' and signal_interrupt in (2, 15): exit_code = ExitCode.FAILURE self.exit(exit_code, 'The App received an interrupt signal and will now exit.')
@property def _user_agent(self): """Return a User-Agent string.""" return { 'User-Agent': ( f'TcEx/{__import__(__name__).__version__}, ' f'{self.ij.model.display_name}/{self.ij.model.program_version}' ) }
[docs] def advanced_request( self, session: Session, output_prefix: str, timeout: Optional[int] = 600, ) -> 'AdvancedRequest': """Return instance of AdvancedRequest. Args: session: An instance of requests.Session. output_prefix: A value to prepend to outputs. timeout: The number of second before timing out the request. """ return AdvancedRequest(self.inputs, self.playbook, session, output_prefix, timeout)
@property def api(self) -> 'API': """Return instance of Threat Intel Utils.""" return API(self.inputs, self.session_tc)
[docs] def exit(self, code: Optional[ExitCode] = None, msg: Optional[str] = None): """Application exit method with proper exit code The method will run the Python standard sys.exit() with the exit code previously defined via :py:meth:`~tcex.tcex.TcEx.exit_code` or provided during the call of this method. Args: code: The exit code value for the app. msg: A message to log and add to message tc output. """ # get correct code self.exit_service.exit(code, msg) # pylint: disable=no-member
@property def exit_code(self) -> 'ExitCode': """Return the current exit code.""" return self.exit_service.exit_code # pylint: disable=no-member @exit_code.setter def exit_code(self, code: 'ExitCode'): """Set the App exit code. For TC Exchange Apps there are 3 supported exit codes. * 0 indicates a normal exit * 1 indicates a failure during execution * 3 indicates a partial failure Args: code (int): The exit code value for the app. """ self.exit_service.exit_code = code @registry.factory(ExitService) @scoped_property def exit_service(self) -> 'ExitService': """Return an ExitService object.""" # TODO: [high] @cblades - inputs being required for exit prevents AOT from exiting return self.get_exit_service(self.inputs) @cached_property def file_operations(self) -> 'FileOperations': # pylint: disable=no-self-use """Include the Utils module.""" return FileOperations( out_path=self.inputs.model_unresolved.tc_out_path, temp_path=self.inputs.model_unresolved.tc_temp_path, )
[docs] @staticmethod def get_exit_service(inputs) -> 'ExitService': """Create an ExitService object.""" return ExitService(inputs)
[docs] def get_playbook( self, context: Optional[str] = None, output_variables: Optional[list] = None ) -> 'Playbook': """Return a new instance of playbook module. Args: context: The KV Store context/session_id. For PB Apps the context is provided on startup, but for service Apps each request gets a different context. output_variables: The requested output variables. For PB Apps outputs are provided on startup, but for service Apps each request gets different outputs. """ return Playbook(self.key_value_store, context, output_variables)
[docs] @staticmethod def get_redis_client( host: str, port: int, db: int = 0, blocking_pool: bool = False, **kwargs ) -> Redis: """Return a *new* instance of Redis client. For a full list of kwargs see https://redis-py.readthedocs.io/en/latest/#redis.Connection. Args: host: The REDIS host. Defaults to localhost. port: The REDIS port. Defaults to 6379. db: The REDIS db. Defaults to 0. blocking_pool: Use BlockingConnectionPool instead of ConnectionPool. **kwargs: Additional keyword arguments. Keyword Args: errors (str): The REDIS errors policy (e.g. strict). max_connections (int): The maximum number of connections to REDIS. password (Sensitive): The REDIS password. socket_timeout (int): The REDIS socket timeout. timeout (int): The REDIS Blocking Connection Pool timeout value. username (str): The REDIS username. """ # get value from Sensitive value before passing to Redis password = kwargs.get('password') kwargs['password'] = password.value if isinstance(password, Sensitive) else password return RedisClient( host=host, port=port, db=db, blocking_pool=blocking_pool, **kwargs ).client
[docs] def get_session_tc( self, auth: Optional[Union['HmacAuth', 'TokenAuth', 'TcAuth']] = None, base_url: Optional[str] = None, log_curl: Optional[bool] = None, proxies: Optional[Dict[str, str]] = None, # pylint: disable=redefined-outer-name proxies_enabled: Optional[bool] = None, verify: Optional[Union[bool, str]] = None, ) -> 'TcSession': """Return an instance of Requests Session configured for the ThreatConnect API. No args are required to get a working instance of TC Session instance. This method allows for getting a new instance of TC Session instance. This can be very useful when connecting between multiple TC instances (e.g., migrating data). """ if log_curl is None: log_curl = self.inputs.model_unresolved.tc_log_curl if proxies_enabled is None: proxies_enabled = self.inputs.model_unresolved.tc_proxy_tc if verify is None: verify = self.inputs.model_unresolved.tc_verify if self.ij.is_external_app is True: auth = auth or TcAuth( tc_api_access_id=self.inputs.model_unresolved.tc_api_access_id, tc_api_secret_key=self.inputs.model_unresolved.tc_api_secret_key, ) else: auth = auth or TcAuth( tc_api_access_id=self.inputs.model_unresolved.tc_api_access_id, tc_api_secret_key=self.inputs.model_unresolved.tc_api_secret_key, tc_token=self.token, ) return TcSession( auth=auth, base_url=base_url or self.inputs.model_unresolved.tc_api_path, log_curl=log_curl, proxies=proxies or self.proxies, proxies_enabled=proxies_enabled, user_agent=self._user_agent, verify=verify, )
[docs] def get_session_external(self) -> 'ExternalSession': """Return an instance of Requests Session configured for the ThreatConnect API.""" _session_external = ExternalSession() # add User-Agent to headers _session_external.headers.update(self._user_agent) # add proxy support if requested if self.inputs.model_unresolved.tc_proxy_external: _session_external.proxies = self.proxies self.log.info( f'Using proxy host {self.inputs.model_unresolved.tc_proxy_host}:' f'{self.inputs.model_unresolved.tc_proxy_port} for external session.' ) if self.inputs.model_unresolved.tc_log_curl: _session_external.log_curl = True return _session_external
# def get_ti(self) -> 'ThreatIntelligence': # """Include the Threat Intel Module.""" # return ThreatIntelligence(session=self.get_session_tc()) @registry.factory('KeyValueStore') @scoped_property def key_value_store(self) -> Union['KeyValueApi', 'KeyValueRedis']: """Return the correct KV store for this execution. The TCKeyValueAPI KV store is limited to two operations (create and read), while the Redis kvstore wraps a few other Redis methods. """ if self.inputs.model_unresolved.tc_kvstore_type == 'Redis': return KeyValueRedis(self.redis_client) if self.inputs.model_unresolved.tc_kvstore_type == 'TCKeyValueAPI': return KeyValueApi(self.session_tc) if self.inputs.model_unresolved.tc_kvstore_type == 'Mock': self.log.warning( 'Using mock key-value store. ' 'This should *never* happen when running in-platform.' ) return KeyValueMock() raise RuntimeError( f'Invalid KV Store Type: ({self.inputs.model_unresolved.tc_kvstore_type})' ) @property def log(self) -> 'TraceLogger': """Return a valid logger.""" if self._log is None: self._log = self.logger.log return self._log @log.setter def log(self, log: object): """Return a valid logger.""" if isinstance(log, logging.Logger): self._log = log @cached_property def logger(self) -> 'Logger': """Return logger.""" _logger = Logger(logger_name='tcex') # set logger to prevent recursion issue on get_session_tc self._log = _logger.log # add api handler if ( self.inputs.contents.get('tc_token') is not None and self.inputs.contents.get('tc_log_to_api') is True ): _logger.add_api_handler( session_tc=self.get_session_tc(), level=self.inputs.model_unresolved.tc_log_level ) # add rotating log handler _logger.add_rotating_file_handler( name='rfh', filename=self.inputs.model_unresolved.tc_log_file, path=self.inputs.model_unresolved.tc_log_path, backup_count=self.inputs.model_unresolved.tc_log_backup_count, max_bytes=self.inputs.model_unresolved.tc_log_max_bytes, level=self.inputs.model_unresolved.tc_log_level, ) # set logging level _logger.update_handler_level(level=self.inputs.model_unresolved.tc_log_level) _logger.log.setLevel(_logger.log_level(self.inputs.model_unresolved.tc_log_level)) # replay cached log events _logger.replay_cached_events(handler_name='cache') return _logger @registry.factory(Playbook) @scoped_property def playbook(self) -> 'Playbook': """Return an instance of Playbooks module. This property defaults context and outputvariables to arg values. Returns: tcex.playbook.Playbooks: An instance of Playbooks """ return self.get_playbook( context=self.inputs.model_unresolved.tc_playbook_kvstore_context, output_variables=self.inputs.model_unresolved.tc_playbook_out_variables, ) @cached_property def proxies(self) -> dict: """Format the proxy configuration for Python Requests module. Generates a dictionary for use with the Python Requests module format when proxy is required for remote connections. **Example Response** :: {"http": "http://user:[email protected]:3128/"} Returns: (dict): Dictionary of proxy settings """ return proxies( proxy_host=self.inputs.model_unresolved.tc_proxy_host, proxy_port=self.inputs.model_unresolved.tc_proxy_port, proxy_user=self.inputs.model_unresolved.tc_proxy_username, proxy_pass=self.inputs.model_unresolved.tc_proxy_password, ) @registry.factory(RedisClient) @scoped_property def redis_client(self) -> Redis: """Return redis client instance configure for Playbook/Service Apps.""" return self.get_redis_client( host=self.inputs.contents.get('tc_kvstore_host'), port=self.inputs.contents.get('tc_kvstore_port'), db=0, username=self.inputs.contents.get('tc_kvstore_user'), password=self.inputs.contents.get('tc_kvstore_pass'), )
[docs] def results_tc(self, key: str, value: str): """Write data to results_tc file in TcEX specified directory. The TcEx platform support persistent values between executions of the App. This method will store the values for TC to read and put into the Database. Args: key: The data key to be stored. value: The data value to be stored. """ if os.access(self.inputs.model_unresolved.tc_out_path, os.W_OK): results_file = f'{self.inputs.model_unresolved.tc_out_path}/results.tc' else: results_file = 'results.tc' new = True # ensure file exists open(results_file, 'a').close() # pylint: disable=consider-using-with with open(results_file, 'r+') as fh: results = '' for line in fh.read().strip().split('\n'): if not line: continue try: k, v = line.split(' = ') except ValueError: # handle null/empty value (e.g., "name =") k, v = line.split(' =') if k == key: v = value new = False if v is not None: results += f'{k} = {v}\n' if new and value is not None: # indicates the key/value pair didn't already exist results += f'{key} = {value}\n' fh.seek(0) fh.write(results) fh.truncate()
@cached_property def service(self) -> Union['ApiService', 'CommonServiceTrigger', 'WebhookTriggerService']: """Include the Service Module.""" if self.ij.model.is_api_service_app: from .services import ApiService as Service elif self.ij.model.is_trigger_app and not self.ij.model.is_webhook_trigger_app: from .services import CommonServiceTrigger as Service elif self.ij.model.is_webhook_trigger_app: from .services import WebhookTriggerService as Service else: self.exit(1, 'Could not determine the service type.') return Service(self) @registry.factory(TcSession) @scoped_property def session_tc(self) -> 'TcSession': """Return an instance of Requests Session configured for the ThreatConnect API.""" return self.get_session_tc() @scoped_property def session_external(self) -> 'ExternalSession': """Return an instance of Requests Session configured for the ThreatConnect API.""" return self.get_session_external()
[docs] def set_exit_code(self, exit_code: int): """Set the exit code (registry)""" self.exit_code = exit_code
@registry.factory(Tokens, singleton=True) @cached_property def token(self) -> 'Tokens': """Return token object.""" _proxies = None if self.inputs.model_unresolved.tc_proxy_tc is True: _proxies = self.proxies _tokens = Tokens( self.inputs.model_unresolved.tc_api_path, self.inputs.model_unresolved.tc_verify, _proxies, ) # register token for Apps that pass token on start if all( [self.inputs.model_unresolved.tc_token, self.inputs.model_unresolved.tc_token_expires] ): _tokens.register_token( key=threading.current_thread().name, token=self.inputs.model_unresolved.tc_token, expires=self.inputs.model_unresolved.tc_token_expires, ) return _tokens @property def ti_utils(self) -> 'ThreatIntelUtils': """Return instance of Threat Intel Utils.""" return ThreatIntelUtils(self.session_tc) @cached_property def utils(self) -> 'Utils': # pylint: disable=no-self-use """Include the Utils module.""" return Utils() @cached_property def v2(self) -> 'V2': """Return a case management instance.""" return V2(self.inputs, self.session_tc) @cached_property def v3(self) -> 'V3': """Return a case management instance.""" return V3(self.session_tc)