diff --git a/meshbook.py b/meshbook.py index 57ae3f0..86318e5 100644 --- a/meshbook.py +++ b/meshbook.py @@ -9,13 +9,37 @@ import json import meshctrl # Local Python libraries/modules -from modules.console import * -from modules.executor import * -from modules.utilities import * +from modules.console import Console +from modules.executor import Executor +from modules.history import History +from modules.utilities import Transform, Utilities -meshbook_version = "1.3.1" +meshbook_version = "1.3.2" grace_period = 3 # Grace period will last for x (by default 3) second(s). +def define_cmdargs() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Process command-line arguments") + + parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.") + + parser.add_argument("--historydir", type=str, help="Define a custom history log directory (default: ./history).", default="./history") + parser.add_argument("--nohistory", action="store_true", help="Disable the logging of the history into a local log (text) file inside './history'.") + parser.add_argument("--flushhistory", action="store_true", help="Clear old history logs before running the Meshbook.") + + parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", default="./os_categories.json") + parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", default="./api.conf") + parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.") + + parser.add_argument("-g", "--group", type=str, help="Specify a manual override for the group.", default="") + parser.add_argument("-d", "--device", type=str, help="Specify a manual override for a device.", default="") + parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", default=False) + parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output.", default=False) + parser.add_argument("--shlex", action="store_true", help="Shlex the lines. (SHell LEXical Analysis)", default=False) + + parser.add_argument("--version", action="store_true", help="Show the Meshbook version.") + + return parser + async def init_connection(credentials: dict) -> meshctrl.Session: ''' Use the libmeshctrl library to initiate a Secure Websocket (wss) connection to the MeshCentral instance. @@ -40,159 +64,21 @@ async def init_connection(credentials: dict) -> meshctrl.Session: await session.initialized.wait() return session -async def gather_targets(args: argparse.Namespace, - meshbook: dict, - group_list: dict[str, list[dict]], - os_categories: dict) -> dict: - """ - Finds target devices based on meshbook criteria (device, devices, group or groups). - """ - - group_list = {k.lower(): v for k, v in group_list.items()} # Normalize keys - target_list = [] - offline_list = [] - - target_os = meshbook.get("target_os") - target_tag = meshbook.get("target_tag") - ignore_categorisation = meshbook.get("ignore_categorisation", False) - - async def add_processed_devices(processed): - """Helper to update target and offline lists.""" - if processed: - target_list.extend(processed.get("valid_devices", [])) - offline_list.extend(processed.get("offline_devices", [])) - - async def process_device_helper(device): - processed = await utilities.process_device( - device, - group_list, - os_categories, - target_os, - ignore_categorisation, - target_tag - ) - await add_processed_devices(processed) - - async def process_group_helper(group): - processed = await utilities.filter_targets( - group, os_categories, target_os, ignore_categorisation, target_tag - ) - await add_processed_devices(processed) - - ''' - Groups receive the first priority, then device targets. - ''' - match meshbook: - case {"group": pseudo_target}: - if isinstance(pseudo_target, str): - pseudo_target = pseudo_target.lower() - - if pseudo_target in group_list: - await process_group_helper(group_list[pseudo_target]) - - elif pseudo_target not in group_list: - console.nice_print( - args, - console.text_color.yellow + "Targeted group not found on the MeshCentral server.", - True - ) - elif isinstance(pseudo_target, list): - console.nice_print( - args, - console.text_color.yellow + "Please use groups (Notice the plural with 'S') for multiple groups.", - True - ) - else: - console.nice_print( - args, - console.text_color.yellow + "The 'group' key is being used, but an unknown data type was found, please check your values.", - True - ) - - case {"groups": pseudo_target}: - if isinstance(pseudo_target, list): - for sub_group in pseudo_target: - sub_group = sub_group.lower() - if sub_group in group_list: - await process_group_helper(group_list[sub_group]) - elif isinstance(pseudo_target, str) and pseudo_target.lower() == "all": - for group in group_list.values(): - await process_group_helper(group) - elif isinstance(pseudo_target, str): - console.nice_print( - args, - console.text_color.yellow + "The 'groups' key is being used, but only one string is given. Did you mean 'group'?", - True - ) - else: - console.nice_print( - args, - console.text_color.yellow + "The 'groups' key is being used, but an unknown data type was found, please check your values.", - True - ) - - case {"device": pseudo_target}: - if isinstance(pseudo_target, str): - await process_device_helper(pseudo_target) - elif isinstance(pseudo_target, list): - console.nice_print( - args, - console.text_color.yellow + "Please use devices (Notice the plural with 'S') for multiple devices.", - True - ) - else: - console.nice_print( - args, - console.text_color.yellow + "The 'device' key is being used, but an unknown data type was found, please check your values.", - True - ) - - case {"devices": pseudo_target}: - if isinstance(pseudo_target, list): - for sub_device in pseudo_target: - await process_device_helper(sub_device) - elif isinstance(pseudo_target, str): - console.nice_print( - args, - console.text_color.yellow + "The 'devices' key is being used, but only one string is given. Did you mean 'device'?", - True - ) - else: - console.nice_print( - args, - console.text_color.yellow + "The 'devices' key is being used, but an unknown data type was found, please check your values.", - True - ) - - return {"target_list": target_list, "offline_list": offline_list} - async def main(): + local_categories_file = "./os_categories.json" + just_fix_windows_console() ''' Main function where the program starts. Place from which all comands originate (eventually). ''' - parser = argparse.ArgumentParser(description="Process command-line arguments") - parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.") - - parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", default="./os_categories.json") - parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", default="./api.conf") - parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.") - parser.add_argument("-g", "--group", type=str, help="Specify a manual override for the group.", default="") - parser.add_argument("-d", "--device", type=str, help="Specify a manual override for a device", default="") - parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", default=False) - parser.add_argument("-r", "--raw-result", action="store_true", help="Print the raw result.", default=False) - parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output.", default=False) - parser.add_argument("--shlex", action="store_true", help="Shlex the lines.", default=False) - - parser.add_argument("--version", action="store_true", help="Show the Meshbook version.") - + # Define the cmd arguments + parser = define_cmdargs() args = parser.parse_args() - local_categories_file = "./os_categories.json" if args.version: - console.nice_print(args, - console.text_color.reset + "MeshBook Version: " + console.text_color.yellow + str(meshbook_version)) + Console.print_text(args.silent, + Console.text_color.reset + "MeshBook Version: " + Console.text_color.yellow + str(meshbook_version)) return if not args.meshbook: @@ -203,9 +89,14 @@ async def main(): with open(local_categories_file, "r") as file: os_categories = json.load(file) + if not Utilities.path_exist(args.meshbook) or Utilities.path_type(args.meshbook) != "File": + Console.print_text(args.silent, + Console.text_color.red + "The given meshbook path is either not present on the filesystem or not a file.") + return + credentials, meshbook = await asyncio.gather( - (utilities.load_config(args)), - (utilities.compile_book(args.meshbook)) + (Utilities.load_config(args)), + (Utilities.compile_book(args.meshbook)) ) if args.group != "": @@ -218,142 +109,170 @@ async def main(): del meshbook["group"] ''' - The following section mainly displays used variables and first steps of the program to the console. + The following section mainly displays used variables and first steps of the program to the Console. ''' # INIT ARGUMENTS PRINTING - console.nice_print(args, - console.text_color.reset + ("-" * 40)) - console.nice_print(args, - "meshbook: " + console.text_color.yellow + args.meshbook + console.text_color.reset + ".") - console.nice_print(args, - "Operating System Categorisation file: " + console.text_color.yellow + args.oscategories + console.text_color.reset + ".") - console.nice_print(args, - "Configuration file: " + console.text_color.yellow + args.conf + console.text_color.reset + ".") + Console.print_line(args.silent) + Console.print_text(args.silent, + "meshbook: " + Console.text_color.yellow + args.meshbook + Console.text_color.reset + ".") + Console.print_text(args.silent, + "Operating System Categorisation file: " + Console.text_color.yellow + args.oscategories + Console.text_color.reset + ".") + Console.print_text(args.silent, + "Configuration file: " + Console.text_color.yellow + args.conf + Console.text_color.reset + ".") # TARGET OS PRINTING if "target_os" in meshbook: - console.nice_print(args, - "Target Operating System category given: " + console.text_color.yellow + meshbook["target_os"] + console.text_color.reset + ".") + Console.print_text(args.silent, + "Target Operating System category given: " + Console.text_color.yellow + meshbook["target_os"] + Console.text_color.reset + ".") else: - console.nice_print(args, - "Target Operating System category given: " + console.text_color.yellow + "All" + console.text_color.reset + ".") + Console.print_text(args.silent, + "Target Operating System category given: " + Console.text_color.yellow + "All" + Console.text_color.reset + ".") # Should Meshbook ignore categorisation? if "ignore_categorisation" in meshbook: - console.nice_print(args, - "Ignore the OS Categorisation file: " + console.text_color.yellow + str(meshbook["ignore_categorisation"]) + console.text_color.reset + ".") + Console.print_text(args.silent, + "Ignore the OS Categorisation file: " + Console.text_color.yellow + str(meshbook["ignore_categorisation"]) + Console.text_color.reset + ".") if meshbook["ignore_categorisation"]: - console.nice_print(args, - console.text_color.red + "!!!!\n" + - console.text_color.yellow + + Console.print_text(args.silent, + Console.text_color.red + "!!!!\n" + + Console.text_color.yellow + "Ignore categorisation is True.\nThis means that the program checks if the target Operating System is somewhere in the reported device Operating System." + - console.text_color.red + "\n!!!!") + Console.text_color.red + "\n!!!!") else: - console.nice_print(args, - "Ignore the OS Categorisation file: " + console.text_color.yellow + "False" + console.text_color.reset + ".") + Console.print_text(args.silent, + "Ignore the OS Categorisation file: " + Console.text_color.yellow + "False" + Console.text_color.reset + ".") # TARGET TAG PRINTING if "target_tag" in meshbook: - console.nice_print(args, - "Target Device tag given: " + console.text_color.yellow + meshbook["target_tag"] + console.text_color.reset + ".") + Console.print_text(args.silent, + "Target Device tag given: " + Console.text_color.yellow + meshbook["target_tag"] + Console.text_color.reset + ".") else: - console.nice_print(args, - "Target Device tag given: " + console.text_color.yellow + "All" + console.text_color.reset + ".") + Console.print_text(args.silent, + "Target Device tag given: " + Console.text_color.yellow + "All" + Console.text_color.reset + ".") # TARGET PRINTING if "device" in meshbook: - console.nice_print(args, - "Target device: " + console.text_color.yellow + str(meshbook["device"]) + console.text_color.reset + ".") + Console.print_text(args.silent, + "Target device: " + Console.text_color.yellow + str(meshbook["device"]) + Console.text_color.reset + ".") elif "devices" in meshbook: - console.nice_print(args, - "Target devices: " + console.text_color.yellow + str(meshbook["devices"]) + console.text_color.reset + ".") + Console.print_text(args.silent, + "Target devices: " + Console.text_color.yellow + str(meshbook["devices"]) + Console.text_color.reset + ".") elif "group" in meshbook: - console.nice_print(args, - "Target group: " + console.text_color.yellow + str(meshbook["group"]) + console.text_color.reset + ".") + Console.print_text(args.silent, + "Target group: " + Console.text_color.yellow + str(meshbook["group"]) + Console.text_color.reset + ".") elif "groups" in meshbook: - console.nice_print(args, - "Target groups: " + console.text_color.yellow + str(meshbook["groups"]) + console.text_color.reset + ".") + Console.print_text(args.silent, + "Target groups: " + Console.text_color.yellow + str(meshbook["groups"]) + Console.text_color.reset + ".") # RUNNING PARAMETERS PRINTING - console.nice_print(args, "Grace: " + console.text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation - console.nice_print(args, "Silent: " + console.text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed. + Console.print_text(args.silent, "Grace: " + Console.text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation + Console.print_text(args.silent, "Silent: " + Console.text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed. session = await init_connection(credentials) # PROCESS PRINTING aka what its doing in the moment... - console.nice_print(args, - console.text_color.reset + ("-" * 40)) - console.nice_print(args, - console.text_color.italic + "Trying to load the MeshCentral account credential file...") - console.nice_print(args, - console.text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...") - console.nice_print(args, - console.text_color.italic + "Trying to load the Operating System categorisation JSON file...") - console.nice_print(args, - console.text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.") - console.nice_print(args, - console.text_color.italic + "Generating group list with nodes and reference the targets from that.") + Console.print_line(args.silent) + Console.print_text(args.silent, + Console.text_color.italic + "Trying to load the MeshCentral account credential file...") + Console.print_text(args.silent, + Console.text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...") + Console.print_text(args.silent, + Console.text_color.italic + "Trying to load the Operating System categorisation JSON file...") + Console.print_text(args.silent, + Console.text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.") + Console.print_text(args.silent, + Console.text_color.italic + "Generating group list with nodes and reference the targets from that.") ''' End of the main information displaying section. ''' - group_list = await transform.compile_group_list(session) - compiled_device_list = await gather_targets(args, meshbook, group_list, os_categories) + group_list = await Transform.compile_group_list(session) + compiled_device_list = await Utilities.gather_targets(args, meshbook, group_list, os_categories) + # Check if we have reachable targets on the MeshCentral host if "target_list" not in compiled_device_list or len(compiled_device_list["target_list"]) == 0: - console.nice_print(args, - console.text_color.red + "No targets found or targets unreachable, quitting.", True) + Console.print_text(args.silent, + Console.text_color.red + "No targets found or targets unreachable, quitting.") - console.nice_print(args, - console.text_color.reset + ("-" * 40), True) + Console.print_line(args.silent) + return + Console.print_line(args.silent) + + match meshbook: + case {"group": candidate_target_name}: + target_name = candidate_target_name + + case {"groups": candidate_target_name}: + target_name = str(candidate_target_name) + + case {"device": candidate_target_name}: + target_name = candidate_target_name + + case {"devices": candidate_target_name}: + target_name = str(candidate_target_name) + + case _: + target_name = "" + + # Initialize the history / logging functions class (whatever you want to name it) + history = History(args.silent, args.historydir, args.flushhistory) + + # Conclude history initlialization + Console.print_line(args.silent) + + # From here on the actual exection happens + Console.print_text(args.silent, + Console.text_color.yellow + "Executing meshbook on the target(s): " + Console.text_color.green + target_name + Console.text_color.yellow + ".") + + if not args.nograce: + Console.print_text(args.silent, + Console.text_color.yellow + "Initiating grace-period...") + + for x in range(grace_period): + Console.print_text(args.silent, + Console.text_color.yellow + "{}...".format(x+1)) # Countdown! + await asyncio.sleep(1) + + Console.print_line(args.silent) + complete_log = await Executor.execute_meshbook(args.silent, + args.shlex, + session, + compiled_device_list, + meshbook, + group_list) + Console.print_line(args.silent) + + indent = None + if args.indent: indent = 4 + + formatted_history = json.dumps(complete_log,indent=indent) + + Console.print_text(args.silent, formatted_history, 9) + + # Pass the output of the whole program to the history class + if args.nohistory: + Console.print_text(args.silent, "Not writing to file.") else: - console.nice_print(args, - console.text_color.reset + ("-" * 40)) - - match meshbook: - case {"group": candidate_target_name}: - target_name = candidate_target_name - - case {"groups": candidate_target_name}: - target_name = str(candidate_target_name) - - case {"device": candidate_target_name}: - target_name = candidate_target_name - - case {"devices": candidate_target_name}: - target_name = str(candidate_target_name) - - case _: - target_name = "" - - - console.nice_print(args, - console.text_color.yellow + "Executing meshbook on the target(s): " + console.text_color.green + target_name + console.text_color.yellow + ".") - - if not args.nograce: - console.nice_print(args, - console.text_color.yellow + "Initiating grace-period...") - - for x in range(grace_period): - console.nice_print(args, - console.text_color.yellow + "{}...".format(x+1)) # Countdown! - await asyncio.sleep(1) - - console.nice_print(args, console.text_color.reset + ("-" * 40)) - await executor.execute_meshbook(args, - session, - compiled_device_list, - meshbook, - group_list) + Console.print_text(args.silent, "Writing to file...") + history.write_history(formatted_history) await session.close() except OSError as message: - console.nice_print(args, - console.text_color.red + f'{message}', True) + Console.print_text(args.silent, + Console.text_color.red + f'{message}') + + except asyncio.CancelledError: + Console.print_text(args.silent, + Console.text_color.red + "Received SIGINT, Aborting - (Tasks may still be running on targets).") + await session.close() + raise if __name__ == "__main__": - asyncio.run(main()) + try: + asyncio.run(main()) + except KeyboardInterrupt: + Console.print_text(False, Console.text_color.red + "Cancelled execution.") \ No newline at end of file diff --git a/modules/console.py b/modules/console.py index 659cba1..88e0246 100644 --- a/modules/console.py +++ b/modules/console.py @@ -1,26 +1,54 @@ # Public Python libraries import argparse +from datetime import datetime -class console: +class Console: class text_color: - black = "\033[30m" - red = "\033[31m" - green = "\033[32m" - yellow = "\033[33m" - blue = "\033[34m" - magenta = "\033[35m" - cyan = "\033[36m" - white = "\033[37m" - italic = "\x1B[3m" - reset = "\x1B[0m" + black = "\033[30m" + red = "\033[31m" + green = "\033[32m" + yellow = "\033[33m" + blue = "\033[34m" + magenta = "\033[35m" + cyan = "\033[36m" + white = "\033[37m" + italic = "\x1B[3m" + reset = "\x1B[0m" @staticmethod - def nice_print(args: argparse.Namespace, message: str, final: bool=False): + def print_text(silent: bool, message: str, prefix_select: int = 0) -> None: ''' Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time. - ''' - if final: - print(message) # Assuming final message, there is no need for clearing. - elif not args.silent: - print(message + console.text_color.reset) \ No newline at end of file + int tag_select legend: + 0 / default = timestamp + 1 = info + 2 = warn + 3 = err + 4 = fatal + 9 = nothing + ''' + match prefix_select: + case 1: + tag_prefix = "[INFO] " + case 2: + tag_prefix = "[WARN] " + case 3: + tag_prefix = "[ERROR] " + case 4: + tag_prefix = "[FATAL] " + case 9: + tag_prefix = "" + case _: + tag_prefix = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} " + + if not silent: + print(tag_prefix + message + Console.text_color.reset) + + @staticmethod + def print_line(silent: bool, special: bool = False) -> None: + if not silent: + if special: + print("-=-" * 40) + else: + print(("-" * 40)) \ No newline at end of file diff --git a/modules/executor.py b/modules/executor.py index 63da539..087e4aa 100644 --- a/modules/executor.py +++ b/modules/executor.py @@ -5,26 +5,26 @@ import meshctrl from time import sleep # Local Python libraries/modules -from modules.console import console -from modules.utilities import transform +from modules.console import Console +from modules.utilities import Transform intertask_delay = 1 -class executor: +class Executor: @staticmethod - async def execute_meshbook(args: argparse.Namespace, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> None: + async def execute_meshbook(silent: bool, enable_shlex: bool, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> dict: ''' Actual function that handles meshbook execution, also responsible for formatting the resulting JSON. ''' - responses_list = {} + complete_log = {} targets = compiled_device_list["target_list"] offline = compiled_device_list["offline_list"] round = 1 for task in meshbook["tasks"]: - console.nice_print(args, - console.text_color.green + str(round) + ". Running: " + task["name"]) + Console.print_text(silent, + Console.text_color.green + str(round) + ". Running: " + task["name"]) if "powershell" in meshbook and meshbook["powershell"]: response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=1800) @@ -36,10 +36,10 @@ class executor: device_result = response[device]["result"] response[device]["result"] = device_result.replace("Run commands completed.", "") response[device]["device_id"] = device - response[device]["device_name"] = await transform.translate_nodeid_to_name(device, group_list) + response[device]["device_name"] = await Transform.translate_nodeid_to_name(device, group_list) task_batch.append(response[device]) - responses_list["task_" + str(round)] = { + complete_log["task_" + str(round)] = { "task_name": task["name"], "data": task_batch } @@ -47,20 +47,9 @@ class executor: sleep(intertask_delay) # Sleep for x amount of time. for index, device in enumerate(offline): # Replace Device_id with actual human readable name - device_name = await transform.translate_nodeid_to_name(device, group_list) + device_name = await Transform.translate_nodeid_to_name(device, group_list) offline[index] = device_name - responses_list["Offline"] = offline + complete_log["Offline"] = offline - console.nice_print(args, - console.text_color.reset + ("-" * 40)) - - if args.indent: - if not args.raw_result: - responses_list = transform.process_shell_response(args.shlex, responses_list) - console.nice_print(args, - json.dumps(responses_list,indent=4), True) - - - else: - console.nice_print(args, - json.dumps(responses_list), True) + # Return the result + return Transform.process_shell_response(enable_shlex, complete_log) \ No newline at end of file diff --git a/modules/history.py b/modules/history.py new file mode 100644 index 0000000..e5dcecb --- /dev/null +++ b/modules/history.py @@ -0,0 +1,48 @@ +import os +from datetime import datetime + +from modules.console import Console + +class History(): + def __init__(self, silent: bool, history_directory: str, flush_history: bool) -> None: + ''' + Init function to declare some stuff and make sure we are good to go, mostly the directory. + ''' + self.silent = silent + self.history_directory = history_directory + + if not os.path.exists(history_directory): + Console.print_text(silent, "Directory absent, trying to create it now...") + + try: + os.mkdir(history_directory) + + except PermissionError: + Console.print_text(silent, Console.text_color.red + f"Failed to create directory, permission error.") + return + + history_items = os.listdir(history_directory) + if len(history_items) == 1: + Console.print_text(silent, f"There is {len(history_items)} history item.") + else: + Console.print_text(silent, f"There are {len(history_items)} history items.") + + if flush_history: + self.remove_history(history_items) + + def remove_history(self, history_items: list[str]) -> None: + if not os.access(self.history_directory, os.W_OK): + Console.print_text(self.silent, Console.text_color.red + "Unable to flush history logs, no write access.") + return + + for item in history_items: + stitched_path = f"{self.history_directory}/{item}" + + Console.print_text(self.silent, f"Removing: {item}.") + os.remove(stitched_path) + + def write_history(self, history: dict) -> bool: + stitched_file = f"{self.history_directory}/meshbook_run_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.log" + + with open(stitched_file, "x") as f: + f.write(history) \ No newline at end of file diff --git a/modules/utilities.py b/modules/utilities.py index 1d9efd0..7f60be3 100644 --- a/modules/utilities.py +++ b/modules/utilities.py @@ -3,13 +3,14 @@ import argparse from configparser import ConfigParser import meshctrl import os +import shlex import yaml ''' Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section. ''' -class utilities: +class Utilities: @staticmethod async def load_config(args: argparse.Namespace, segment: str = 'meshcentral-account') -> dict: @@ -43,9 +44,127 @@ class utilities: with open(meshbook_file, 'r') as f: meshbook = f.read() - meshbook = await transform.replace_placeholders(yaml.safe_load(meshbook)) + meshbook = await Transform.replace_placeholders(yaml.safe_load(meshbook)) return meshbook - + + @staticmethod + async def gather_targets(args: argparse.Namespace, + meshbook: dict, + group_list: dict[str, list[dict]], + os_categories: dict) -> dict: + """ + Finds target devices based on meshbook criteria (device, devices, group or groups). + """ + + group_list = {k.lower(): v for k, v in group_list.items()} # Normalize keys + target_list = [] + offline_list = [] + + target_os = meshbook.get("target_os") + target_tag = meshbook.get("target_tag") + ignore_categorisation = meshbook.get("ignore_categorisation", False) + + async def add_processed_devices(processed): + """Helper to update target and offline lists.""" + if processed: + target_list.extend(processed.get("valid_devices", [])) + offline_list.extend(processed.get("offline_devices", [])) + + async def process_device_helper(device): + processed = await Utilities.process_device( + device, + group_list, + os_categories, + target_os, + ignore_categorisation, + target_tag + ) + await add_processed_devices(processed) + + async def process_group_helper(group): + processed = await Utilities.filter_targets( + group, os_categories, target_os, ignore_categorisation, target_tag + ) + await add_processed_devices(processed) + + ''' + Groups receive the first priority, then device targets. + ''' + match meshbook: + case {"group": pseudo_target}: + if isinstance(pseudo_target, str): + pseudo_target = pseudo_target.lower() + + if pseudo_target in group_list: + await process_group_helper(group_list[pseudo_target]) + + elif pseudo_target not in group_list: + console.nice_print( + args, + console.text_color.yellow + "Targeted group not found on the MeshCentral server." + ) + elif isinstance(pseudo_target, list): + console.nice_print( + args, + console.text_color.yellow + "Please use groups (Notice the plural with 'S') for multiple groups." + ) + else: + console.nice_print( + args, + console.text_color.yellow + "The 'group' key is being used, but an unknown data type was found, please check your values." + ) + + case {"groups": pseudo_target}: + if isinstance(pseudo_target, list): + for sub_group in pseudo_target: + sub_group = sub_group.lower() + if sub_group in group_list: + await process_group_helper(group_list[sub_group]) + elif isinstance(pseudo_target, str) and pseudo_target.lower() == "all": + for group in group_list.values(): + await process_group_helper(group) + elif isinstance(pseudo_target, str): + console.nice_print( + args, + console.text_color.yellow + "The 'groups' key is being used, but only one string is given. Did you mean 'group'?" + ) + else: + console.nice_print( + args, + console.text_color.yellow + "The 'groups' key is being used, but an unknown data type was found, please check your values." + ) + + case {"device": pseudo_target}: + if isinstance(pseudo_target, str): + await process_device_helper(pseudo_target) + elif isinstance(pseudo_target, list): + console.nice_print( + args, + console.text_color.yellow + "Please use devices (Notice the plural with 'S') for multiple devices." + ) + else: + console.nice_print( + args, + console.text_color.yellow + "The 'device' key is being used, but an unknown data type was found, please check your values." + ) + + case {"devices": pseudo_target}: + if isinstance(pseudo_target, list): + for sub_device in pseudo_target: + await process_device_helper(sub_device) + elif isinstance(pseudo_target, str): + console.nice_print( + args, + console.text_color.yellow + "The 'devices' key is being used, but only one string is given. Did you mean 'device'?" + ) + else: + console.nice_print( + args, + console.text_color.yellow + "The 'devices' key is being used, but an unknown data type was found, please check your values." + ) + + return {"target_list": target_list, "offline_list": offline_list} + @staticmethod def get_os_variants(target_category: str, os_map: dict) -> set: @@ -60,7 +179,7 @@ class utilities: os_set = set() for sub_target_cat in value: - os_set.update(utilities.get_os_variants(sub_target_cat, value)) + os_set.update(Utilities.get_os_variants(sub_target_cat, value)) return os_set @@ -86,11 +205,11 @@ class utilities: # Identify correct OS filtering scope for key in os_categories: if key == target_os: - allowed_os = utilities.get_os_variants(target_os, os_categories) + allowed_os = Utilities.get_os_variants(target_os, os_categories) break # Stop searching once a match is found if isinstance(os_categories[key], dict) and target_os in os_categories[key]: - allowed_os = utilities.get_os_variants(target_os, os_categories[key]) + allowed_os = Utilities.get_os_variants(target_os, os_categories[key]) break # Stop searching once a match is found for device in devices: # Filter out unwanted or unreachable devices. @@ -137,7 +256,7 @@ class utilities: # If matches found, filter them and add processed devices if matched_devices: - processed = await utilities.filter_targets( + processed = await Utilities.filter_targets( matched_devices, os_categories, target_os, ignore_categorisation, target_tag ) return processed @@ -145,18 +264,31 @@ class utilities: # No matches found return {"valid_devices": [], "offline_devices": []} -import shlex -class transform: @staticmethod - def process_shell_response(shlex_enable: bool, meshbook_result: dict) -> dict: + def path_exist(path: str) -> bool: + return os.path.exists(path) + + @staticmethod + def path_type(path: str) -> str: + if os.path.isfile(path): + return "File" + if os.path.isdir(path): + return "Dir" + if os.path.islink(path): + return "Link" + return "Undefined" + +class Transform: + @staticmethod + def process_shell_response(enable_shlex: bool, meshbook_result: dict) -> dict: for task_name, task_data in meshbook_result.items(): - if task_name == "Offline": # Failsafe + if task_name == "Offline": # Failsafe do not parse Offline section, its simple continue for node_responses in task_data["data"]: task_result = node_responses["result"].splitlines() - if shlex_enable: + if enable_shlex: for index, line in enumerate(task_result): line = shlex.split(line) task_result[index] = line diff --git a/os_categories.json b/os_categories.json index 5ac5b58..02c8f78 100644 --- a/os_categories.json +++ b/os_categories.json @@ -8,7 +8,7 @@ "Debian GNU/Linux 10 (buster)" ], "Ubuntu": [ - "Ubuntu 24.04.1 LTS", + "Ubuntu 24.04.3 LTS", "Ubuntu 22.04.5 LTS", "Ubuntu 20.04.6 LTS" ]