Source code for tcex.logger.logger

"""TcEx Framework Logger module"""
# standard library
import logging
import os
import pathlib
import platform
import sys
from typing import TYPE_CHECKING, Optional, Union
from urllib.parse import urlsplit

# third-party
from certifi import where

# first-party
# pylint: disable=no-name-in-module
from tcex.app_config.install_json import InstallJson
from tcex.logger.api_handler import ApiHandler, ApiHandlerFormatter
from tcex.logger.cache_handler import CacheHandler
from tcex.logger.pattern_file_handler import PatternFileHandler
from tcex.logger.rotating_file_handler_custom import RotatingFileHandlerCustom
from tcex.logger.thread_file_handler import ThreadFileHandler
from tcex.logger.trace_logger import TraceLogger

if TYPE_CHECKING:
    # third-party
    from pydantic import BaseModel
    from requests import Session


class Logger:
    """Framework logger module.

    Wraps a named ``logging`` logger and provides helpers to add, remove and
    re-level named handlers, plus helpers that log App and system details.
    """

    def __init__(self, logger_name: str):
        """Initialize Class Properties.

        Args:
            logger_name: The name passed to ``logging.getLogger``.
        """
        self.logger_name = logger_name

        # properties
        self.ij = InstallJson()

    @property
    def _logger(self) -> 'logging.Logger':
        """Return the logger. The inputs.model property is not available in init."""
        # the logger class is set before getLogger() so a newly created
        # logger is built from TraceLogger
        logging.setLoggerClass(TraceLogger)
        logger = logging.getLogger(self.logger_name)
        logger.setLevel(logging.TRACE)
        return logger

    @property
    def _formatter(self):
        """Return the default log formatter (no thread name)."""
        tx_format = (
            '%(asctime)s - %(name)s - %(levelname)8s - %(message)s '
            '(%(filename)s:%(funcName)s:%(lineno)d)'
        )
        return logging.Formatter(tx_format)

    @property
    def _formatter_thread_name(self):
        """Return the log formatter that also includes the thread name."""
        tx_format = (
            '%(asctime)s - %(name)s - %(levelname)8s - %(message)s '
            '(%(filename)s:%(funcName)s:%(lineno)d:%(threadName)s)'
        )
        return logging.Formatter(tx_format)

    def handler_exist(self, handler_name: str) -> bool:
        """Check whether a handler with the given name is attached to the logger.

        Args:
            handler_name: The handler name to look for.

        Returns:
            bool: True if a handler with that name currently exists.
        """
        return any(h.get_name() == handler_name for h in self._logger.handlers)

    @property
    def log(self) -> 'logging.Logger':
        """Return logger."""
        return self._logger

    @staticmethod
    def log_level(level: Optional[Union[int, str]]) -> int:
        """Return proper level from string.

        Args:
            level: The logging level name (case-insensitive). A falsy value
                falls back to ``debug``. An int is returned unchanged, since
                several ``add_*_handler`` methods are annotated to take one.

        Returns:
            int: The logging level as an int.
        """
        level = level or 'debug'
        if isinstance(level, int):
            # already numeric; calling .upper() on it would raise AttributeError
            return level
        return logging.getLevelName(level.upper())

    def remove_handler_by_name(self, handler_name: str):
        """Remove the first handler that has the given name.

        Args:
            handler_name: The handler name to remove.
        """
        for h in self._logger.handlers:
            if h.get_name() == handler_name:
                self._logger.removeHandler(h)
                # stop right after mutating the list being iterated
                break

    def replay_cached_events(self, handler_name: Optional[str] = 'cache'):
        """Replay cached log events and remove handler.

        Args:
            handler_name: The name of the cache handler holding the events.
        """
        for h in self._logger.handlers:
            if h.get_name() == handler_name:
                events = h.events
                # detach the cache handler first so the replayed events are
                # only processed by the remaining handlers
                self._logger.removeHandler(h)
                for event in events:
                    self._logger.handle(event)
                break

        # remove the cache handler
        self.remove_handler_by_name(handler_name=handler_name)

    def shutdown(self):
        """Remove all handlers from the logger.

        The handlers are only detached from the logger; ``close()`` is not
        called on them here.
        """
        # iterate over a copy: removeHandler() mutates the underlying list and
        # iterating it directly would skip every other handler
        for h in list(self._logger.handlers):
            self._logger.removeHandler(h)

    def update_handler_level(self, level: str):
        """Update all handlers log level.

        Args:
            level: The logging level.
        """
        level = self.log_level(level)

        # update all handler logging levels
        for h in self._logger.handlers:
            h.setLevel(level)

    #
    # handlers
    #

    def add_api_handler(
        self, session_tc: 'Session', name: Optional[str] = 'api', level: Optional[str] = None
    ):
        """Add API logging handler.

        Any existing handler with the same name is removed first.

        Args:
            session_tc: An configured instance of request.Session with TC API Auth.
            name: The name of the handler.
            level: The level value as a string.
        """
        self.remove_handler_by_name(name)
        api = ApiHandler(session_tc)
        api.set_name(name)
        api.setLevel(self.log_level(level))
        api.setFormatter(ApiHandlerFormatter())
        self._logger.addHandler(api)

    def add_cache_handler(self, name: str):
        """Add cache logging handler.

        Any existing handler with the same name is removed first.

        Args:
            name: The name of the handler.
        """
        self.remove_handler_by_name(name)
        cache = CacheHandler()
        cache.set_name(name)
        # set logging level to TRACE so every event is cached; these events
        # will typically be only those that happen before args are processed
        cache.setLevel(self.log_level('trace'))
        cache.setFormatter(self._formatter)
        self._logger.addHandler(cache)

    def add_pattern_file_handler(
        self,
        name: str,
        filename: str,
        level: int,
        path: str,
        pattern: str,
        formatter: Optional[logging.Formatter] = None,
        handler_key: Optional[str] = None,
        max_log_count: Optional[int] = 100,
        thread_key: Optional[str] = None,
    ):
        """Add custom file logging handler.

        This handler is intended for service Apps that need to log events based on the
        current session id. All log event would be in context to a single playbook execution.
        Any existing handler with the same name is removed first.

        Args:
            name: The name of the handler.
            filename: The name of the logfile.
            level: The logging level.
            path: The path for the logfile.
            pattern: The pattern used to match the log files.
            formatter: The logging formatter to use; defaults to the standard formatter.
            handler_key: Additional properties for handler to thread condition.
            max_log_count: The maximum number of logs to keep that match the provided pattern.
            thread_key: Additional properties for handler to thread condition.
        """
        self.remove_handler_by_name(name)
        formatter = formatter or self._formatter
        # create customized handler
        fh = PatternFileHandler(
            filename=os.path.join(path, filename), pattern=pattern, max_log_count=max_log_count
        )
        fh.set_name(name)
        fh.setFormatter(formatter)
        fh.setLevel(self.log_level(level))
        # add keys for handler emit method conditional
        fh.handler_key = handler_key
        fh.thread_key = thread_key
        self._logger.addHandler(fh)

    def add_rotating_file_handler(
        self,
        name: str,
        filename: str,
        path: str,
        backup_count: int,
        max_bytes: int,
        level: str,
        formatter: Optional[logging.Formatter] = None,
        mode: Optional[str] = 'a',
    ):
        """Add custom file logging handler.

        Any existing handler with the same name is removed first.

        Args:
            name: The name of the handler.
            filename: The name of the logfile.
            path: The path for the logfile.
            backup_count: The maximum # of backup files.
            max_bytes: The max file size before rotating.
            level: The logging level.
            formatter: The logging formatter to use; defaults to the
                formatter that includes the thread name.
            mode: The write mode for the file.
        """
        self.remove_handler_by_name(name)
        formatter = formatter or self._formatter_thread_name
        # create customized handler
        fh = RotatingFileHandlerCustom(
            os.path.join(path, filename), backupCount=backup_count, maxBytes=max_bytes, mode=mode
        )
        fh.set_name(name)
        fh.setFormatter(formatter)
        fh.setLevel(self.log_level(level))
        self._logger.addHandler(fh)

    def add_stream_handler(
        self,
        name: Optional[str] = 'sh',
        formatter: Optional[logging.Formatter] = None,
        level: Optional[int] = None,
    ):
        """Add stream logging handler.

        Any existing handler with the same name is removed first.

        Args:
            name: The name of the handler.
            formatter: The logging formatter to use; defaults to the standard formatter.
            level: The logging level.
        """
        self.remove_handler_by_name(name)
        formatter = formatter or self._formatter
        # create handler
        sh = logging.StreamHandler()
        sh.set_name(name)
        sh.setFormatter(formatter)
        sh.setLevel(self.log_level(level))
        self._logger.addHandler(sh)

    def add_thread_file_handler(
        self,
        name: str,
        filename: str,
        level: str,
        path: str,
        backup_count: Optional[int] = 0,
        formatter: Optional[logging.Formatter] = None,
        handler_key: Optional[str] = None,
        max_bytes: Optional[int] = 0,
        mode: Optional[str] = 'a',
        thread_key: Optional[str] = None,
    ):
        """Add custom file logging handler.

        This handler is intended for service Apps that need to log events based on the
        current trigger id. All log events would be in context to a single playbook.
        Any existing handler with the same name is removed first.

        Args:
            name: The name of the handler.
            filename: The name of the logfile.
            level: The logging level.
            path: The path for the logfile.
            backup_count: The maximum # of backup files.
            formatter: The logging formatter to use; defaults to the
                formatter that includes the thread name.
            handler_key: Additional properties for handler to thread condition.
            max_bytes: The max file size before rotating.
            mode: The write mode for the file.
            thread_key: Additional properties for handler to thread condition.
        """
        self.remove_handler_by_name(name)
        formatter = formatter or self._formatter_thread_name
        # create customized handler
        fh = ThreadFileHandler(
            os.path.join(path, filename), backupCount=backup_count, maxBytes=max_bytes, mode=mode
        )
        fh.set_name(name)
        fh.setFormatter(formatter)
        fh.setLevel(self.log_level(level))
        # add keys for handler emit method conditional
        fh.handler_key = handler_key
        fh.thread_key = thread_key
        self._logger.addHandler(fh)

    #
    # App info logging
    #

    def log_info(self, inputs: 'BaseModel'):
        """Send System and App data to logs.

        Args:
            inputs: The inputs model.
        """
        self._log_app_data()
        self._log_platform()
        self._log_python_version()
        self._log_environment()
        self._log_tcex_version()
        self._log_tc_proxy(inputs)

    def _log_app_data(self):
        """Log the App data information as a best effort."""
        try:
            self.log.info(f'app-name="{self.ij.model.display_name}"')
            if self.ij.model.app_id is not None:
                self.log.info(f'app-id={self.ij.model.app_id}')
            if self.ij.model.features:
                self.log.info(f'''app-features={','.join(self.ij.model.features)}''')
            self.log.info(f'app-minimum-threatconnect-version={self.ij.model.min_server_version}')
            self.log.info(f'app-runtime-level={self.ij.model.runtime_level}')
            app_version = f'app-version={self.ij.model.program_version}'
            if self.ij.model.commit_hash is not None:
                app_version += f', app-commit-hash={self.ij.model.commit_hash}'
            self.log.info(app_version)  # version and commit hash
        except Exception:  # nosec; pragma: no cover
            # deliberately best effort: a failure to read the App metadata
            # must not prevent the App from running
            pass

    def _log_environment(self):
        """Log the current environment variables.

        Proxy URLs are passed through ``_sanitize_proxy_url`` before logging.
        """
        self.log.info(f'''env-all-proxy={self._sanitize_proxy_url(os.getenv('ALL_PROXY'))}''')
        self.log.info(f'''env-http-proxy={self._sanitize_proxy_url(os.getenv('HTTP_PROXY'))}''')
        self.log.info(f'''env-https-proxy={self._sanitize_proxy_url(os.getenv('HTTPS_PROXY'))}''')
        self.log.info(f'''env-no-proxy={os.getenv('NO_PROXY')}''')
        self.log.info(f'''env-curl-ca-bundle={os.getenv('CURL_CA_BUNDLE')}''')
        self.log.info(f'''env-request-ca-bundle={os.getenv('REQUESTS_CA_BUNDLE')}''')
        self.log.info(f'''env-ssl-cert-file={os.getenv('SSL_CERT_FILE')}''')

        # log certifi where
        self.log.info(f'''certifi-where={where()}''')

    def _log_platform(self):
        """Log the current Platform and process id."""
        self.log.info(f'platform="{platform.platform()}"')
        self.log.info(f'pid={os.getpid()}')

    def _log_python_version(self):
        """Log the current Python version."""
        self.log.info(
            f'python-version={sys.version_info.major}.'
            f'{sys.version_info.minor}.'
            f'{sys.version_info.micro}'
        )

    def _log_tc_proxy(self, inputs: 'BaseModel'):
        """Log the proxy settings.

        Nothing is logged unless ``inputs.tc_proxy_tc`` is truthy.

        Args:
            inputs: The inputs model.
        """
        if inputs.tc_proxy_tc:
            self.log.info(f'proxy-server-tc={inputs.tc_proxy_host}:{inputs.tc_proxy_port}')

    def _log_tcex_version(self):
        """Log the current TcEx version number."""
        app_path = str(pathlib.Path().parent.absolute())
        full_path = str(pathlib.Path(__file__).parent.absolute())
        # strip the current working directory prefix from this module's path
        tcex_path = os.path.dirname(full_path.replace(app_path, ''))
        self.log.info(f'tcex-version={__import__(__name__).__version__}, tcex-path="{tcex_path}"')

    @staticmethod
    def _sanitize_proxy_url(url: Optional[str]) -> Optional[str]:
        """Sanitize a proxy url, masking username and password.

        Args:
            url: The proxy URL, or None when no proxy is configured.

        Returns:
            The URL with ``username:password`` replaced by ``***:***`` when a
            password is present, otherwise the URL unchanged (None stays None).
        """
        if url is None:
            return None

        url_data = urlsplit(url)
        # mask whenever a password is present, even with an empty username
        # (e.g. "http://:secret@host"), so the secret never reaches the logs
        if url_data.password:
            credentials = f'{url_data.username or ""}:{url_data.password}'
            url = url.replace(credentials, '***:***')
        return url