PATH:
usr
/
share
/
lve
/
dbgovernor
/
scripts
/
Editing: dbgovernor_watchdog.py
#!/opt/cloudlinux/venv/bin/python3
# coding:utf-8

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2024 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#

import sentry_sdk
import sentry_sdk_wrapper
import subprocess
import argparse
import datetime
import struct
import mmap
import sys
import os
from ctypes import sizeof, c_byte, c_char, c_int32, c_long, Structure, Union

WATCHDOG_LOG_FILE = "/var/log/dbgovernor-watchdog.log"
DBCTL_BIN = '/usr/share/lve/dbgovernor/utils/dbctl_orig'

sentry_sdk_wrapper.init()


def global_exception_handler(exc_type, exc_value, exc_traceback):
    """
    Report an unhandled exception to Sentry and, on a terminal, print it.

    Installed as ``sys.excepthook`` below. KeyboardInterrupt is passed
    straight to the default hook and is not reported.

    Args:
        exc_type (type): The exception class.
        exc_value (BaseException): The exception instance.
        exc_traceback (traceback): The traceback object.
    """
    # Ignore KeyboardInterrupt to allow graceful exit
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    # Report the exception to Sentry
    sentry_sdk.capture_exception((exc_type, exc_value, exc_traceback))

    try:
        interactive = (os.isatty(sys.stdin.fileno())
                       and os.isatty(sys.stderr.fileno()))
    except (OSError, ValueError, AttributeError):
        # fileno() raises ValueError on a closed stream, and the stream
        # may be None (AttributeError) when the process has no stdio.
        # Either way there is no terminal to print to.
        interactive = False

    if interactive:
        # Call the default exception handler if not running from Cron
        sys.__excepthook__(exc_type, exc_value, exc_traceback)


# Override the default exception handler behavior with a custom exception handler.
# This will ensure that any unhandled exceptions are reported to the Sentry.
sys.excepthook = global_exception_handler


def sentry_log(message, level="info", username=None):
    """
    Sends a message to Sentry via sentry_sdk.capture_message.

    Args:
        message (str): The message to be logged.
        level (str, optional): The level of the log. Defaults to "info".
        username (str, optional): If given, attached to the event as the
            "username" tag. Defaults to None.
    """
    with sentry_sdk.push_scope() as scope:
        if username:
            scope.set_tag("username", username)
        sentry_sdk.capture_message(message, level)


def log_message(level="info", message="", log_to_file=False, username=None):
    """
    Logs a message to Sentry (if it's an error) and, optionally, to a file.

    Nothing is printed to stdout/stderr. A failure to write the log file is
    itself reported to Sentry and never raised to the caller.

    Args:
        level (str, optional): The log level. Only "error" is forwarded to
            Sentry. Defaults to "info".
        message (str): The message to log.
        log_to_file (bool, optional): Whether to append the message to
            WATCHDOG_LOG_FILE. Defaults to False.
        username (str, optional): User the message relates to; added to the
            Sentry tags and to the log line. Defaults to None.
    """
    # Log to Sentry if it's an error
    if level == "error":
        sentry_log(message, level, username)

    if not log_to_file:
        return

    # Write to log file
    try:
        with open(WATCHDOG_LOG_FILE, "a+") as log_file:
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            log_entry = f"{timestamp} [{level}]"
            if username:
                log_entry += f" username={username},"
            log_entry += f" {message}\n"
            log_file.write(log_entry)
    except Exception as e:
        # Best effort: logging must never break the watchdog itself.
        sentry_log(f"Failed to log to file {WATCHDOG_LOG_FILE}: {str(e)}", "error")


def _dbctl_user_list(subcommand, log_to_file=False):
    """
    Runs ``dbctl <subcommand>`` and returns the first column of its output.

    The first output line is treated as a header and skipped, as are blank
    lines. Any failure is logged as an error and yields an empty list.

    Args:
        subcommand (str): dbctl subcommand to run.
        log_to_file (bool): If True, logs messages to a file.

    Returns:
        List of user names (first whitespace-separated field of each line).
    """
    if not os.path.exists(DBCTL_BIN):
        log_message("error", f"{DBCTL_BIN} does not exist", log_to_file)
        return []

    # Run the dbctl command and capture its output
    try:
        result = subprocess.run([DBCTL_BIN, subcommand], text=True,
                                check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        log_message("error", f"Error running dbctl: {e}", log_to_file)
        return []
    except OSError as e:
        # The binary exists but could not be launched (e.g. not executable).
        log_message("error", f"Failed to call dbctl command: {str(e)}", log_to_file)
        return []

    # Skip the header line and any empty lines; keep the first field only.
    return [line.split()[0]
            for line in result.stdout.splitlines()[1:]
            if line.strip()]


def get_restricted_user_list(log_to_file=False):
    """
    Retrieves restricted users by running dbctl list-restricted command.

    Args:
        log_to_file (bool): If True, logs messages to a file.

    Returns:
        List of restricted users (empty if dbctl is missing or fails).
    """
    return _dbctl_user_list('list-restricted', log_to_file)


def get_bad_user_list(log_to_file=False):
    """
    Retrieves restricted users by running dbctl list-restricted-shm command.

    Args:
        log_to_file (bool): If True, logs messages to a file.

    Returns:
        List of restricted users (empty if dbctl is missing or fails).
    """
    return _dbctl_user_list('list-restricted-shm', log_to_file)


def check_bad_user_list(log_to_file=False):
    """
    Checks for bad users not in restricted list and logs them.

    Args:
        log_to_file (bool): If True, logs messages to a file.
    """
    bad_user_list = get_bad_user_list(log_to_file)
    # Build the lookup set once instead of scanning a list per user.
    restricted_users = set(get_restricted_user_list(log_to_file))

    for username in bad_user_list:
        if username not in restricted_users:
            log_message("error", "user in bad list but not in list-restricted",
                        log_to_file, username=username)


def dbctl_check_governor(log_to_file=False):
    """
    Checks if the Governor is responsive by running 'dbctl list'.

    A non-zero exit status or a failure to launch the command is logged as
    an error. A missing dbctl binary is logged as a warning only.

    Args:
        log_to_file (bool): If True, logs messages to a file.
    """
    if not os.path.exists(DBCTL_BIN):
        log_message("warn", f"dbctl binary not found at {DBCTL_BIN}", log_to_file)
        return

    try:
        # check=True turns any non-zero exit status into CalledProcessError,
        # so no separate returncode test is needed afterwards.
        subprocess.run([DBCTL_BIN, 'list'], text=True, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        log_message("error",
                    f"Governor is not responsive: exit code={e.returncode}, "
                    f"stderr={e.stderr if e.stderr else 'none'}, "
                    f"stdout={e.stdout if e.stdout else 'none'}",
                    log_to_file)
    except Exception as e:
        log_message("error", f"Failed to call dbctl command: {str(e)}", log_to_file)


def main(argv):
    """
    Main function that checks if the Governor is responsive.

    Args:
        argv (list): List of command line arguments (without the program
            name). ``--file-log`` enables logging to WATCHDOG_LOG_FILE.
    """
    parser = argparse.ArgumentParser(description="Governor watchdog")
    parser.add_argument('--file-log', action='store_true', help='Enable logging to a file')
    args = parser.parse_args(argv)

    log_to_file = args.file_log

    dbctl_check_governor(log_to_file)
    # NOTE(review): the bad-user consistency check is disabled in the
    # original script; the reason is not visible here - confirm before
    # re-enabling.
    #check_bad_user_list(log_to_file)


if __name__ == "__main__":
    main(sys.argv[1:])
SAVE
CANCEL