PATH:
usr
/
share
/
lve
/
dbgovernor
/
scripts
/
Editing: sentry_daemon.py
#!/opt/cloudlinux/venv/bin/python3
# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2024 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import os
import sys
import time
import signal
import sentry_sdk
import sentry_sdk_wrapper
import logging
import glob
import re

# Seconds to sleep between depot scans
DAEMON_INTERVAL = 10

# On-disk "depot" layout: db_governor and mysqld drop *.txt files here for forwarding to Sentry
SENTRY_DEPOT_ROOT = "/var/lve/dbgovernor/logging/sentry-depot"
SENTRY_DEPOT_DB_GOVERNOR = SENTRY_DEPOT_ROOT + "/db_governor"
SENTRY_DEPOT_MYSQLD = SENTRY_DEPOT_ROOT + "/mysqld"
SENTRY_DEPOT_EXT = ".txt"
DB_GOVERNOR_LOGS_WILDCARD = SENTRY_DEPOT_DB_GOVERNOR + "/*" + SENTRY_DEPOT_EXT
MYSQLD_LOGS_WILDCARD = SENTRY_DEPOT_MYSQLD + "/*" + SENTRY_DEPOT_EXT


class SentryDaemon:
    """
    A daemon process to forward 'db_governor' and extended 'mysqld' logs to Sentry.
    """

    def __init__(self, db_governor_logs_wildcard, mysqld_logs_wildcard):
        """
        Initializes SentryDaemon with given log path wildcards.

        Args:
            db_governor_logs_wildcard (str): wildcard path for Sentry log files from db_governor.
            mysqld_logs_wildcard (str): wildcard path for Sentry log files from mysqld.
        """
        self.db_governor_logs_wildcard = db_governor_logs_wildcard
        self.mysqld_logs_wildcard = mysqld_logs_wildcard
        sentry_sdk_wrapper.init()
        self.internal_logger = logging.getLogger("sentry_daemon")  # for internal, non-forwarded events
        self.preface_sent = False  # the one-time "preface" event hasn't been emitted yet

    class TerminateException(Exception):
        """Raised from the SIGTERM handler to unwind into the daemon shutdown path."""
        pass  # no custom state needed; the redundant __init__ override was removed

    @staticmethod
    def handle_sigterm(signum, frame):
        """
        On SIGTERM signal, throw an exception caught in the outermost code and triggering daemon shutdown.
        Make sure this type of exception is re-thrown in all of inner try/except's.

        Args:
            signum (int): signal number.
            frame (frame): current stack frame.
        """
        raise SentryDaemon.TerminateException()

    @staticmethod
    def print(s):
        """Print a message to stderr (the daemon's local, non-Sentry output channel)."""
        print(s, file=sys.stderr)

    @staticmethod
    def print_sentry_transport_status(prompt):
        """Debug helper: dump the internal sentry_sdk transport queue size and health."""
        # use it for debugging, if you want to reconsider our interaction with Sentry transport
        # NOTE: relies on private sentry_sdk internals (_worker._queue) - may break on SDK upgrade
        SentryDaemon.print(f"{prompt}: queue size={sentry_sdk.Hub.current.client.transport._worker._queue.qsize()}, healthy={sentry_sdk_wrapper.is_healthy()}")

    def run(self):
        """
        Starts the daemon to read log files and send logs to Sentry.

        Loops forever: scans both depot wildcards, forwards each found file as a
        Sentry event via process_message(), deletes forwarded files, and sleeps.
        If event loss is detected (unhealthy transport), regular forwarding is
        suspended until a one-time loss report has been reliably delivered.

        Raises:
            SentryDaemon.TerminateException: propagated from the SIGTERM handler.
        """
        self.print(f"Started reading log files in {self.db_governor_logs_wildcard} and {self.mysqld_logs_wildcard}")
        events_ever_lost, loss_report_sent, loss_reporting_complete = False, False, False
        while True:
            send_logs = True  # by default, all found logs will be sent to Sentry

            # Handle loss reporting
            if events_ever_lost and not loss_reporting_complete:
                # Since the detection of event loss, we're struggling for reporting it. This reporting takes place only once per daemon session.
                send_logs = False  # For this period, we suspend transmission of regular log files - otherwise they can choke us again and leave us no chance to render the loss on Sentry server.
                healthy = sentry_sdk_wrapper.is_healthy()  # It's vital to make no movements while this is False. 'sentry_sdk' is fragile enough under high load, and we need to report the loss reliably.
                self.print(f"Event loss reporting phase, regular log files are being skipped; transport healthy: {healthy}")
                if healthy:  # No state change while unhealthy. We wait for health _before_ sending the loss report, and once again _after_ sending it.
                    if loss_report_sent:
                        loss_reporting_complete = True  # We're healthy after the loss report transmission. Loss reporting is over.
                        self.print("Event loss reporting complete")  # On the next iteration we shall return to normal log file processing.
                    else:
                        self.internal_logger.warning("Errors possibly lost")  # We're healthy, but haven't yet sent the loss report. Send it now.
                        loss_report_sent = True  # We send it only once per daemon session.
                        self.print("Event loss report sent")

            # Scan for log files
            report = ""
            report_nonzero = False
            for wildcard, logger in [
                    (self.db_governor_logs_wildcard, "db_governor"),
                    (self.mysqld_logs_wildcard, "mysqld")]:
                try:
                    logs = glob.glob(wildcard)
                except SentryDaemon.TerminateException:  # handling it separately spares us of knowing possible exception types from glob()
                    raise
                except Exception as e:
                    self.internal_logger.error(f"Failed to scan '{wildcard}': {e}")  # bug, can't normally happen -> print locally + report to Sentry (loggers are intercepted by 'sentry_sdk')
                    logs = []  # FIX: previously 'logs' stayed unbound (or stale from the prior wildcard) here, crashing the loop below with NameError
                n_sent, n_deleted = 0, 0
                for log in logs:
                    if not os.path.exists(log):
                        self.print(f"Disappeared file '{log}'")  # can be caused by races with sentry_cleaner.sh -> print only locally
                        continue
                    # MySQL version can be empty - it's not always available inside 'db_governor',
                    # and never available in 'mysqld'.
                    # The latter sounds so ridiculous, we surely have to fix it soon.
                    match = re.match(r"(.*)-mysql\.", os.path.basename(log))  # basename() must not throw - 'log' is a valid path, proven above
                    if not match:
                        self.internal_logger.error(f"Invalid file name '{log}'")  # bug -> print + Sentry
                    else:
                        ver_mysql = match.group(1)
                        if send_logs:
                            message = None
                            try:
                                with open(log, 'r') as f:
                                    try:
                                        message = f.read()
                                    except SentryDaemon.TerminateException:
                                        raise
                                    except Exception as e:
                                        self.print(f"Failed to read {log}: {e}")  # races -> print only
                            except SentryDaemon.TerminateException:
                                raise
                            except Exception as e:
                                self.print(f"Failed to open {log}: {e}")  # races -> print only
                            if message is not None:
                                self.process_message(logger, message.strip(), ver_mysql)
                                n_sent += 1
                                # Delete only after the message was handed to Sentry;
                                # unsent files stay in the depot for the next run.
                                try:
                                    os.remove(log)
                                    n_deleted += 1
                                except SentryDaemon.TerminateException:
                                    raise
                                except Exception as e:
                                    self.print(f"Failed to delete {log}: {e}")  # races -> print only
                if len(report):
                    report += "; "
                report += f"{logger}: {n_sent} sent, {n_deleted} deleted"
                if n_sent or n_deleted:
                    report_nonzero = True
            if report_nonzero:  # CLOS-2885, don't bloat log under zero Sentry activity
                self.print(report)  # how many files were sent and deleted

            # Detect event loss
            if not events_ever_lost and not sentry_sdk_wrapper.is_healthy():
                events_ever_lost = True  # this could trigger due to Rate Limiting response from Sentry server, or due to local queue overflow, or other internal problem
                self.print("Event loss first detected")

            # Sleep for a bit before checking the log files again
            time.sleep(DAEMON_INTERVAL)

    def process_message(self, logger, message, ver_mysql):
        """
        Processes a single message received from the client.

        Parses the structured C-side log line, extracts tags and source-code
        location, and forwards it to Sentry as a manually-built 'error' event.

        Args:
            message (str): log message received.
            logger (str): logger to use for sending the message to Sentry.
            ver_mysql (str): MySQL version parsed from the log file name (may be empty).
        """
        # The purpose of this preface Sentry event is to guarantee that we see the complete Python attributes, like module list, at least once -
        # because we strip them from the following forwarded events.
        if not self.preface_sent:
            self.internal_logger.warning("Hello, bad news, errors follow...")
            self.preface_sent = True
            self.print("Preface sent")

        norsqr = r"([^]]+)"  # anything without right square bracket
        insqr = rf"\[{norsqr}\]"  # anything in square brackets
        # [timestamp] [pid:tid] [src_file:src_line:src_func] [tags] text
        message_format = rf"\s*{insqr}\s*\[(\d+):(\d+)\][\s!]*\[{norsqr}:(\d+):{norsqr}\]\s*{insqr}\s*(.*)$"
        match = re.match(message_format, message)
        if match:
            timestamp, process, thread, src_file, src_line, src_func, tags, text = match.groups()
            try:
                process, thread, src_line = int(process), int(thread), int(src_line)
            except ValueError:
                match = None  # use 'match' as a generic validity marker
            tags = tags.split(":")
            if not all(tags):  # empty tags not permitted
                match = None
            tags = [t for t in tags if t != "ERRSENTRY"]  # omit this one - it's always present in Sentry-reported log messages (unless we use some cryptic internal-use-only file flags)
            src_func += "()"
        if not match:
            self.internal_logger.error(f"Invalid message format in '{logger}' log: '{message}'")  # sends to Sentry and prints locally
            return

        with sentry_sdk.push_scope() as scope:
            # set message-specific tags
            scope.set_tag("mysql.version", sentry_sdk_wrapper.VIS(ver_mysql))
            scope.set_tag("actual_time", timestamp)
            scope.set_tag("process", process)
            scope.set_tag("thread", thread)
            for tag in tags:
                scope.set_tag(tag, True)

            # Extract every "key=val", replace with "key=<...>", and add "val" as a Sentry tag.
            def substitute_one_match(match):
                key, val = match.groups()
                # "specific." prefix - to easily distinguish them visually and to avoid clashes with common tags
                scope.set_tag("specific." + key, val.strip("'"))
                return f"{key}=<...>"
            text = re.sub(r"\b([a-zA-Z]\w*)=('.+?'|\w+\b)", substitute_one_match, text)

            # Sentry server overrides the transmitted value of 'event.type' and sets it to 'error' only if it finds the actual error cause - the exception.
            # 'sentry_sdk' is designed to catch and report exceptions in its native language environment - Python in our case.
            # To emulate an error event with the appropriate type, logger name and source code attributes, I found no easier way than building it manually:
            event = {
                "level": "error",
                "logger": logger,
                "exception": {  # We need to trigger an error somehow. Alternatively, we could use 'threads'->'stacktrace', but it has its downsides.
                    "values": [
                        {
                            "type": text,  # SIC! This is shown as an event title in case of exceptions.
                            "value": "",
                            "thread_id": thread,
                            "stacktrace": {
                                "frames": [
                                    {
                                        "function": src_func,
                                        "lineno": src_line,
                                        "filename": src_file
                                    }
                                ]
                            }
                        }
                    ]
                }
            }
            sentry_sdk_wrapper.strip_event_pythonicity = True  # tell before_send() to remove event attributes that are irrelevant for an event forwarded from C code
            sentry_sdk.capture_event(event)
            sentry_sdk_wrapper.strip_event_pythonicity = False

    def cleanup(self):
        """
        Cleans up the resources used by the daemon.
        Does not clean the log files, so that they could be transmitted to Sentry on the next daemon run.
        """
        pass


if __name__ == "__main__":
    daemon = SentryDaemon(DB_GOVERNOR_LOGS_WILDCARD, MYSQLD_LOGS_WILDCARD)
    signal.signal(signal.SIGTERM, SentryDaemon.handle_sigterm)
    try:
        daemon.run()
    except SentryDaemon.TerminateException:
        # Unfortunately, we shouldn't print() here, because it often leads to errors - e.g., BrokenPipe.
        # stdout and stderr seem to be in complicated state during SIGTERM handling.
        pass  # FIX: cleanup() was called here AND in 'finally' - the single 'finally' call below covers all exit paths
    except KeyboardInterrupt:
        pass
    finally:
        daemon.cleanup()
SAVE
CANCEL