diff --git a/README.md b/README.md index 96a45a9..ff65a42 100644 --- a/README.md +++ b/README.md @@ -147,30 +147,47 @@ tasks: The following response it received when executing the first yaml of the above files (without the `-s` parameters, which just outputs the below JSON). ```shell -~/meshbook$ python3 meshbook.py -pb examples/echo_example.yaml +$ python3 meshbook.py -mb books/aggregate_example.yaml -i --nograce -pr + ---------------------------------------- -Playbook: examples/echo_example.yaml +meshbook: books/aggregate_example.yaml Operating System Categorisation file: ./os_categories.json -Congiguration file: ./meshcentral.conf -Target group: Development -Grace: True +Configuration file: ./config.conf +Target Operating System category given: Linux +Target group: Systemec Development +Grace: False Silent: False ---------------------------------------- Trying to load the MeshCentral account credential file... -Trying to load the Playbook yaml file and compile it into something workable... +Trying to load the meshbook yaml file and compile it into something workable... Trying to load the Operating System categorisation JSON file... Connecting to MeshCentral and establish a session using variables from previous credential file. Generating group list with nodes and reference the targets from that. ---------------------------------------- Executing playbook on the target(s): Development. -Initiating grace-period... -1... -2... -3... ---------------------------------------- -1. Running: Echo! +1. Running: Ping! ---------------------------------------- -{"Task 1": "ALL THE DATA"} # Not sharing due to PID +{ + "Task 1": { + "task_name": "Ping Quad9 DNS", + "data": [ + { + "complete": true, + "result": [ + "PING 9.9.9.9 (9.9.9.9) 56(84) bytes of data.", + "64 bytes from 9.9.9.9: icmp_seq=1 ttl=61 time=26.8 ms", + "--- 9.9.9.9 ping statistics ---", + "1 packets transmitted, 1 received, 0% packet loss, time 0ms", + "rtt min/avg/max/mdev = 26.809/26.809/26.809/0.000 ms" + ], + "command": "ping 9.9.9.9 -c 1", + "device_id": "yourn nodeip", + "device_name": "yournodename" + } + ] + } +} ``` The above without `-s` is quite verbose. use `--help` to read about parameters and getting a minimal response for example. diff --git a/meshbook.py b/meshbook.py index b2c8c7c..2704234 100644 --- a/meshbook.py +++ b/meshbook.py @@ -1,65 +1,20 @@ #!/bin/python3 +# Public Python libraries import argparse import asyncio -from base64 import b64encode from colorama import just_fix_windows_console -from configparser import ConfigParser import json import meshctrl -import os -import yaml +# Local Python libraries/modules +from modules.console import * +from modules.executor import * +from modules.utilities import * + +meshbook_version = 1.3 grace_period = 3 # Grace period will last for x (by default 3) second(s). -''' -Script utilities are handled in the following section. -''' - -class ScriptEndTrigger(Exception): - pass - -class text_color: - black = "\033[30m" - red = "\033[31m" - green = "\033[32m" - yellow = "\033[33m" - blue = "\033[34m" - magenta = "\033[35m" - cyan = "\033[36m" - white = "\033[37m" - italic = "\x1B[3m" - reset = "\x1B[0m" - -def console(message: str, final: bool=False): - ''' - Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time. - ''' - if final: - print(message) # Assuming final message, there is no need for clearing. - elif not args.silent: - print(message + text_color.reset) - -async def load_config(segment: str = 'meshcentral-account') -> dict: - ''' - Function that loads the segment from the config.conf (by default) file and returns the it in a dict. - ''' - - conf_file = args.conf - if not os.path.exists(conf_file): - raise ScriptEndTrigger(f'Missing config file {conf_file}. Provide an alternative path.') - - config = ConfigParser() - try: - config.read(conf_file) - except Exception as err: - raise ScriptEndTrigger(f"Error reading configuration file '{conf_file}': {err}") - - if segment not in config: - raise ScriptEndTrigger(f'Segment "{segment}" not found in config file {conf_file}.') - - return config[segment] - async def init_connection(credentials: dict) -> meshctrl.Session: ''' Use the libmeshctrl library to initiate a Secure Websocket (wss) connection to the MeshCentral instance. @@ -73,234 +28,120 @@ async def init_connection(credentials: dict) -> meshctrl.Session: await session.initialized.wait() return session -async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str: - ''' - Simple function that looks up nodeid to the human-readable name if existent - otherwise return None. - ''' - - for group in group_list: - for device in group_list[group]: - if device["device_id"] == target_id: - return device["device_name"] - return None - -''' -Creation and compilation happends in the following section, where the yaml gets read in, and edited accordingly. -''' - -async def compile_book(meshbook_file: dict) -> dict: - ''' - Simple function that opens the file and replaces placeholders through the next function. After that just return it. - ''' - - meshbook = open(meshbook_file, 'r') - meshbook = await replace_placeholders(yaml.safe_load(meshbook)) - return meshbook - -async def replace_placeholders(meshbook: dict) -> dict: - ''' - Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list. - ''' - - variables = {} - if "variables" in meshbook and isinstance(meshbook["variables"], list): - for var in meshbook["variables"]: - var_name = var["name"] - var_value = var["value"] - variables[var_name] = var_value - else: - return meshbook - - for task in meshbook.get("tasks", []): - task_name = task.get("name") - for var_name, var_value in variables.items(): - placeholder = f"{{{{ {var_name} }}}}" - task_name = task_name.replace(placeholder, var_value) - task["name"] = task_name - - command = task.get("command") - for var_name, var_value in variables.items(): - placeholder = f"{{{{ {var_name} }}}}" # Create the placeholder string like "{{ host1 }}" - command = command.replace(placeholder, var_value) - task["command"] = command - - return meshbook - -''' -Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section. -''' - -async def compile_group_list(session: meshctrl.Session) -> dict: - ''' - Function that retrieves the devices from MeshCentral and compiles it into a efficient list. - ''' - - devices_response = await session.list_devices(details=False, timeout=10) - - local_device_list = {} - for device in devices_response: - if device.meshname not in local_device_list: - local_device_list[device.meshname] = [] - - local_device_list[device.meshname].append({ - "device_id": device.nodeid, - "device_name": device.name, - "device_os": device.os_description, - "device_tags": device.tags, - "reachable": device.connected - }) - return local_device_list - -async def filter_targets(devices: list[dict], os_categories: dict, target_os: str = None, target_tag: str = None) -> list[str]: - ''' - Filters devices based on reachability and optional OS criteria, supporting nested OS categories. - ''' - - valid_devices = [] - - def get_os_variants(category: str, os_map: dict) -> set: - ''' - Extracts all OS names under a given category if it exists. - ''' - - for key, value in os_map.items(): - if key == category: - if isinstance(value, dict): # Expand nested categories - os_set = set() - for subcat in value: - os_set.update(get_os_variants(subcat, value)) - return os_set - elif isinstance(value, list): # Direct OS list - return set(value) - return set() - - allowed_os = set() - - # Identify correct OS filtering scope - for key in os_categories: - if key == target_os: - allowed_os = get_os_variants(target_os, os_categories) - break # Stop searching once a match is found - - if isinstance(os_categories[key], dict) and target_os in os_categories[key]: - allowed_os = get_os_variants(target_os, os_categories[key]) - break # Stop searching once a match is found - - # Filter out unwanted or unreachable devices. - for device in devices: - if not device["reachable"]: - continue # Skip unreachable devices. - - if target_tag and target_tag not in device["device_tags"]: - continue - - if device["device_os"] not in allowed_os: - continue - - valid_devices.append(device["device_id"]) - - return valid_devices - -async def gather_targets(meshbook: dict, group_list: dict[str, list[dict]], os_categories: dict) -> list[str]: +async def gather_targets(args: argparse.Namespace, + meshbook: dict, + group_list: dict[str, list[dict]], + os_categories: dict) -> list[str]: ''' Finds target devices based on meshbook criteria (device or group). ''' target_list = [] + offline_list = [] target_os = meshbook.get("target_os") + ignore_categorisation = meshbook.get("ignore_categorisation", False) target_tag = meshbook.get("target_tag") - async def process_device_or_group(pseudo_target, group_list, os_categories, target_os) -> list[str]: - ''' - Helper function to process devices or groups. - ''' - - matched_devices = [] - for group in group_list: - for device in group_list[group]: - if device["device_name"] == pseudo_target: - matched_devices.append(device) - - if matched_devices: - return await filter_targets(matched_devices, os_categories, target_os, target_tag) - return [] - match meshbook: case {"device": pseudo_target}: # Single device target if isinstance(pseudo_target, str): - matched_devices = await process_device_or_group(pseudo_target, group_list, os_categories, target_os) - target_list.extend(matched_devices) + processed_devices = await utilities.process_device_or_group(pseudo_target, + group_list, + os_categories, + target_os, + ignore_categorisation, + target_tag) + if len(processed_devices) > 0: + matched_devices = processed_devices["valid_devices"] + target_list.extend(matched_devices) + if len(processed_devices) > 0: + offline_devices = processed_devices["offline_devices"] + offline_list.extend(offline_devices) + else: - console(text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True) + console.nice_print(args, + console.text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True) case {"devices": pseudo_target}: # List of devices if isinstance(pseudo_target, list): for sub_pseudo_device in pseudo_target: - matched_devices = await process_device_or_group(sub_pseudo_device, group_list, os_categories, target_os) + processed_devices = await utilities.process_device_or_group(sub_pseudo_device, + group_list, + os_categories, + target_os, + ignore_categorisation, + target_tag,) + if len(processed_devices) > 0: + matched_devices = processed_devices["valid_devices"] target_list.extend(matched_devices) + if len(processed_devices) > 0: + offline_devices = processed_devices["offline_devices"] + offline_list.extend(offline_devices) else: - console(text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True) + console.nice_print(args, console.text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True) case {"group": pseudo_target}: # Single group target if isinstance(pseudo_target, str) and pseudo_target in group_list: - matched_devices = await filter_targets(group_list[pseudo_target], os_categories, target_os, target_tag) - target_list.extend(matched_devices) + processed_devices = await utilities.filter_targets(group_list[pseudo_target], + os_categories, + target_os, + ignore_categorisation, + target_tag) + if len(processed_devices) > 0: + matched_devices = processed_devices["valid_devices"] + target_list.extend(matched_devices) + if len(processed_devices) > 0: + offline_devices = processed_devices["offline_devices"] + offline_list.extend(offline_devices) + elif pseudo_target not in group_list: - console(text_color.yellow + "Targeted group not found on the MeshCentral server.", True) + console.nice_print(args, + console.text_color.yellow + "Targeted group not found on the MeshCentral server.", True) + else: - console(text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True) + console.nice_print(args, + console.text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True) + case {"groups": pseudo_target}: # List of groups if isinstance(pseudo_target, list): for sub_pseudo_target in pseudo_target: if sub_pseudo_target in group_list: - matched_devices = await filter_targets(group_list[sub_pseudo_target], os_categories, target_os, target_tag) + processed_devices = await utilities.filter_targets(group_list[sub_pseudo_target], + os_categories, + target_os, + ignore_categorisation, + target_tag) + if len(processed_devices) > 0: + matched_devices = processed_devices["valid_devices"] target_list.extend(matched_devices) - if pseudo_target.lower() == "all": + if len(processed_devices) > 0: + offline_devices = processed_devices["offline_devices"] + offline_list.extend(offline_devices) + + elif pseudo_target.lower() == "all": for group in group_list: - matched_devices = await filter_targets(group_list[group], os_categories, target_os, target_tag) - target_list.extend(matched_devices) + processed_devices = await utilities.filter_targets(group_list[group], + os_categories, + target_os, + ignore_categorisation, + target_tag) + if len(processed_devices) > 0: + matched_devices = processed_devices["valid_devices"] + target_list.extend(matched_devices) + if len(processed_devices) > 0: + offline_devices = processed_devices["offline_devices"] + offline_list.extend(offline_devices) + else: - console(text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True) + console.nice_print(args, + console.text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True) - return target_list - -async def execute_meshbook(session: meshctrl.Session, targets: dict, meshbook: dict, group_list: dict) -> None: - ''' - Actual function that handles meshbook execution, also responsible for formatting the resulting JSON. - ''' - - responses_list = {} - round = 1 - - for task in meshbook["tasks"]: - console(text_color.green + str(round) + ". Running: " + task["name"]) - if "powershell" in meshbook and meshbook["powershell"]: - response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900) - else: - response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900) - - task_batch = [] - for device in response: - device_result = response[device]["result"] - response[device]["result"] = device_result.replace("Run commands completed.", "") - response[device]["device_id"] = device - response[device]["device_name"] = await translate_nodeid_to_name(device, group_list) - task_batch.append(response[device]) - - responses_list["Task " + str(round)] = { - "task_name": task["name"], - "data": task_batch - } - round += 1 - - console(text_color.reset + ("-" * 40)) - if args.indent: - console((json.dumps(responses_list,indent=4)), True) - - else: - console(json.dumps(responses_list), True) + return { + "target_list": target_list, + "offline_list": offline_list + } async def main(): just_fix_windows_console() @@ -309,70 +150,130 @@ async def main(): ''' parser = argparse.ArgumentParser(description="Process command-line arguments") - parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.", required=True) + parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.") - parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", required=False, default="./os_categories.json") - parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", required=False, default="./config.conf") - parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.", required=False) - parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", required=False) - parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output", required=False) + parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", default="./os_categories.json") + parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", default="./config.conf") + parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.") + parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.") + parser.add_argument("-r", "--raw-result", action="store_true", help="Print the raw result.") + parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output.") + parser.add_argument("--shlex", action="store_true", help="Shlex the lines.") + + parser.add_argument("-v", "--version", action="store_true", help="Show the Meshbook version.") - global args args = parser.parse_args() local_categories_file = "./os_categories.json" + if args.version: + console.nice_print(args, + console.text_color.reset + "MeshBook Version: " + console.text_color.yellow + str(meshbook_version)) + return + + if not args.meshbook: + parser.print_help() + return + try: with open(local_categories_file, "r") as file: os_categories = json.load(file) credentials, meshbook = await asyncio.gather( - (load_config()), - (compile_book(args.meshbook)) + (utilities.load_config(args)), + (utilities.compile_book(args.meshbook)) ) ''' The following section mainly displays used variables and first steps of the program to the console. ''' - console(text_color.reset + ("-" * 40)) - console("meshbook: " + text_color.yellow + args.meshbook) - console("Operating System Categorisation file: " + text_color.yellow + args.oscategories) - console("Configuration file: " + text_color.yellow + args.conf) + # INIT ARGUMENTS PRINTING + console.nice_print(args, + console.text_color.reset + ("-" * 40)) + console.nice_print(args, + "meshbook: " + console.text_color.yellow + args.meshbook) + console.nice_print(args, + "Operating System Categorisation file: " + console.text_color.yellow + args.oscategories) + console.nice_print(args, + "Configuration file: " + console.text_color.yellow + args.conf) + + # TARGET OS PRINTING if "target_os" in meshbook: - console("Target Operating System category given: " + text_color.yellow + meshbook["target_os"]) + console.nice_print(args, + "Target Operating System category given: " + console.text_color.yellow + meshbook["target_os"]) else: - console("Target Operating System category given: " + text_color.yellow + "All") + console.nice_print(args, + "Target Operating System category given: " + console.text_color.yellow + "All") + + # Should Meshbook ignore categorisation? + if "ignore_categorisation" in meshbook: + console.nice_print(args, + "Ignore the OS Categorisation file: " + console.text_color.yellow + str(meshbook["ignore_categorisation"])) + if meshbook["ignore_categorisation"]: + console.nice_print(args, + console.text_color.red + "!!!!\n" + + console.text_color.yellow + + "Ignore categorisation is True.\nThis means that the program checks if the target Operating System is somewhere in the reported device Operating System." + + console.text_color.red + "\n!!!!") + else: + console.nice_print(args, + "Ignore the OS Categorisation file: " + console.text_color.yellow + "False") + + # TARGET TAG PRINTING + if "target_tag" in meshbook: + console.nice_print(args, + "Target Device tag given: " + console.text_color.yellow + meshbook["target_tag"]) + else: + console.nice_print(args, + "Target Device tag given: " + console.text_color.yellow + "All") + # TARGET PRINTING if "device" in meshbook: - console("Target device: " + text_color.yellow + str(meshbook["device"])) - + console.nice_print(args, + "Target device: " + console.text_color.yellow + str(meshbook["device"])) + elif "devices" in meshbook: + console.nice_print(args, + "Target devices: " + console.text_color.yellow + str(meshbook["devices"])) elif "group" in meshbook: - console("Target group: " + text_color.yellow + str(meshbook["group"])) + console.nice_print(args, + "Target group: " + console.text_color.yellow + str(meshbook["group"])) + elif "groups" in meshbook: + console.nice_print(args, + "Target groups: " + console.text_color.yellow + str(meshbook["groups"])) - console("Grace: " + text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation - console("Silent: " + text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed. + # RUNNING PARAMETERS PRINTING + console.nice_print(args, "Grace: " + console.text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation + console.nice_print(args, "Silent: " + console.text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed. session = await init_connection(credentials) - console(text_color.reset + ("-" * 40)) - console(text_color.italic + "Trying to load the MeshCentral account credential file...") - console(text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...") - console(text_color.italic + "Trying to load the Operating System categorisation JSON file...") - console(text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.") - console(text_color.italic + "Generating group list with nodes and reference the targets from that.") + + # PROCESS PRINTING aka what its doing in the moment... + console.nice_print(args, + console.text_color.reset + ("-" * 40)) + console.nice_print(args, + console.text_color.italic + "Trying to load the MeshCentral account credential file...") + console.nice_print(args, + console.text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...") + console.nice_print(args, + console.text_color.italic + "Trying to load the Operating System categorisation JSON file...") + console.nice_print(args, + console.text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.") + console.nice_print(args, + console.text_color.italic + "Generating group list with nodes and reference the targets from that.") ''' End of the main information displaying section. ''' - group_list = await compile_group_list(session) - targets_list = await gather_targets(meshbook, group_list, os_categories) + group_list = await transform.compile_group_list(session) + compiled_device_list = await gather_targets(args, meshbook, group_list, os_categories) - if len(targets_list) == 0: - console(text_color.red + "No targets found or targets unreachable, quitting.", True) - console(text_color.reset + ("-" * 40), True) + if len(compiled_device_list["target_list"]) == 0: + console.nice_print(args, console.text_color.red + "No targets found or targets unreachable, quitting.", True) + console.nice_print(args, console.text_color.reset + ("-" * 40), True) else: - console(text_color.reset + ("-" * 40)) + console.nice_print(args, console.text_color.reset + ("-" * 40)) match meshbook: case {"group": candidate_target_name}: @@ -387,22 +288,26 @@ async def main(): case {"devices": candidate_target_name}: target_name = str(candidate_target_name) - console(text_color.yellow + "Executing meshbook on the target(s): " + text_color.green + target_name + ".") + console.nice_print(args, console.text_color.yellow + "Executing meshbook on the target(s): " + console.text_color.green + target_name + ".") if not args.nograce: - console(text_color.yellow + "Initiating grace-period...") + console.nice_print(args, console.text_color.yellow + "Initiating grace-period...") for x in range(grace_period): - console(text_color.yellow + "{}...".format(x+1)) # Countdown! + console.nice_print(args, console.text_color.yellow + "{}...".format(x+1)) # Countdown! await asyncio.sleep(1) - console(text_color.reset + ("-" * 40)) - await execute_meshbook(session, targets_list, meshbook, group_list) + console.nice_print(args, console.text_color.reset + ("-" * 40)) + await executor.execute_meshbook(args, + session, + compiled_device_list, + meshbook, + group_list) await session.close() except OSError as message: - console(text_color.red + message, True) + console.nice_print(args, console.text_color.red + message, True) if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/modules/console.py b/modules/console.py new file mode 100644 index 0000000..be8407c --- /dev/null +++ b/modules/console.py @@ -0,0 +1,25 @@ +# Public Python libraries +import argparse + +class console: + class text_color: + black = "\033[30m" + red = "\033[31m" + green = "\033[32m" + yellow = "\033[33m" + blue = "\033[34m" + magenta = "\033[35m" + cyan = "\033[36m" + white = "\033[37m" + italic = "\x1B[3m" + reset = "\x1B[0m" + + def nice_print(args: argparse.Namespace, message: str, final: bool=False): + ''' + Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time. + ''' + + if final: + print(message) # Assuming final message, there is no need for clearing. + elif not args.silent: + print(message + console.text_color.reset) \ No newline at end of file diff --git a/modules/executor.py b/modules/executor.py new file mode 100644 index 0000000..d19adc9 --- /dev/null +++ b/modules/executor.py @@ -0,0 +1,63 @@ +# Public Python libraries +import argparse +import json +import meshctrl +from time import sleep + +# Local Python libraries/modules +from modules.console import console +from modules.utilities import transform + +class executor: + async def execute_meshbook(args: argparse.Namespace, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> None: + ''' + Actual function that handles meshbook execution, also responsible for formatting the resulting JSON. + ''' + + responses_list = {} + targets = compiled_device_list["target_list"] + offline = compiled_device_list["offline_list"] + round = 1 + + for task in meshbook["tasks"]: + console.nice_print(args, + console.text_color.green + str(round) + ". Running: " + task["name"]) + + if "powershell" in meshbook and meshbook["powershell"]: + response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900) + else: + response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900) + + task_batch = [] + for device in response: + device_result = response[device]["result"] + response[device]["result"] = device_result.replace("Run commands completed.", "") + response[device]["device_id"] = device + response[device]["device_name"] = await transform.translate_nodeid_to_name(device, group_list) + task_batch.append(response[device]) + + responses_list["Task " + str(round)] = { + "task_name": task["name"], + "data": task_batch + } + round += 1 + sleep(0.5) # Sleep for 0.5 seconds. + + for index, device in enumerate(offline): # Replace Device_id with actual human readable name + device_name = await transform.translate_nodeid_to_name(device, group_list) + offline[index] = device_name + responses_list["Offline"] = offline + + console.nice_print(args, + console.text_color.reset + ("-" * 40)) + + if args.indent: + if not args.raw_result: + responses_list = transform.process_shell_response(args.shlex, responses_list) + console.nice_print(args, + json.dumps(responses_list,indent=4), True) + + + else: + console.nice_print(args, + json.dumps(responses_list), True) \ No newline at end of file diff --git a/modules/utilities.py b/modules/utilities.py new file mode 100644 index 0000000..57d00d7 --- /dev/null +++ b/modules/utilities.py @@ -0,0 +1,219 @@ +# Public Python libraries +import argparse +from configparser import ConfigParser +import meshctrl +import os +import yaml + +''' +Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section. +''' + +class utilities: + async def load_config(args: argparse.Namespace, + segment: str = 'meshcentral-account') -> dict: + ''' + Function that loads the segment from the config.conf (by default) file and returns the it in a dict. + ''' + + conf_file = args.conf + if not os.path.exists(conf_file): + print(f'Missing config file {conf_file}. Provide an alternative path.') + os._exit(1) + + config = ConfigParser() + try: + config.read(conf_file) + except Exception as err: + print(f"Error reading configuration file '{conf_file}': {err}") + os._exit(1) + + if segment not in config: + print(f'Segment "{segment}" not found in config file {conf_file}.') + os._exit(1) + + return config[segment] + + async def compile_book(meshbook_file: dict) -> dict: + ''' + Simple function that opens the file and replaces placeholders through the next function. After that just return it. + ''' + + meshbook = open(meshbook_file, 'r') + meshbook = await transform.replace_placeholders(yaml.safe_load(meshbook)) + return meshbook + + def get_os_variants(target_category: str, + os_map: dict) -> set: + ''' + Extracts all OS names under a given category if it exists. + ''' + + for key, value in os_map.items(): + if key == target_category: + + if isinstance(value, dict): # Expand nested categories + os_set = set() + + for sub_target_cat in value: + os_set.update(utilities.get_os_variants(sub_target_cat, value)) + + return os_set + + elif isinstance(value, list): # Direct OS list + return set(value) + + return set() + + async def filter_targets(devices: list[dict], + os_categories: dict, + target_os: str = None, + ignore_categorisation: bool = False, + target_tag: str = None) -> dict: + ''' + Filters devices based on reachability and optional OS criteria, supporting nested OS categories. + ''' + + valid_devices = [] + offline_devices = [] + + # Identify correct OS filtering scope + for key in os_categories: + if key == target_os: + allowed_os = utilities.get_os_variants(target_os, os_categories) + break # Stop searching once a match is found + + if isinstance(os_categories[key], dict) and target_os in os_categories[key]: + allowed_os = utilities.get_os_variants(target_os, os_categories[key]) + break # Stop searching once a match is found + + for device in devices: # Filter out unwanted or unreachable devices. + if target_tag and target_tag not in device["device_tags"]: + continue + + if not ignore_categorisation: + if device["device_os"] not in allowed_os: + continue + else: + if target_os not in device["device_os"]: + continue + + if not device["reachable"]: + offline_devices.append(device["device_id"]) + continue + + valid_devices.append(device["device_id"]) + + return { + "valid_devices": valid_devices, + "offline_devices": offline_devices + } + + async def process_device_or_group(pseudo_target: str, + group_list: dict, + os_categories: dict, + target_os: str, + ignore_categorisation: bool, + target_tag: str) -> dict: + ''' + Helper function to process devices or groups. + ''' + + matched_devices = [] + for group in group_list: + for device in group_list[group]: + if device["device_name"] == pseudo_target: + matched_devices.append(device) + + if matched_devices: + return await utilities.filter_targets(matched_devices, os_categories, target_os, ignore_categorisation, target_tag) + return [] + +import shlex +class transform: + def process_shell_response(shlex_enable: bool, meshbook_result: dict) -> dict: + for task_name, task_data in meshbook_result.items(): + if task_name == "Offline": # Failsafe + continue + + for node_responses in task_data["data"]: + task_result = node_responses["result"].splitlines() + + if shlex_enable: + for index, line in enumerate(task_result): + line = shlex.split(line) + task_result[index] = line + + clean_output = [] + for line in task_result: + if len(line) > 0: + clean_output.append(line) + + node_responses["result"] = clean_output + return meshbook_result + + async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str: + ''' + Simple function that looks up nodeid to the human-readable name if existent - otherwise return None. + ''' + + for group in group_list: + for device in group_list[group]: + if device["device_id"] == target_id: + return device["device_name"] + return None + + async def replace_placeholders(meshbook: dict) -> dict: + ''' + Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list. + ''' + + variables = {} + if "variables" in meshbook and isinstance(meshbook["variables"], list): + for var in meshbook["variables"]: + var_name = var["name"] + var_value = var["value"] + variables[var_name] = var_value + + else: + return meshbook + + for task in meshbook.get("tasks", []): + task_name = task.get("name") + + for var_name, var_value in variables.items(): + placeholder = f"{{{{ {var_name} }}}}" + task_name = task_name.replace(placeholder, var_value) + + task["name"] = task_name + + command = task.get("command") + for var_name, var_value in variables.items(): + placeholder = f"{{{{ {var_name} }}}}" # Create the placeholder string like "{{ host1 }}" + command = command.replace(placeholder, var_value) + + task["command"] = command + + return meshbook + + async def compile_group_list(session: meshctrl.Session) -> dict: + ''' + Function that retrieves the devices from MeshCentral and compiles it into a efficient list. + ''' + + devices_response = await session.list_devices(details=False, timeout=10) + + local_device_list = {} + for device in devices_response: + if device.meshname not in local_device_list: + local_device_list[device.meshname] = [] + + local_device_list[device.meshname].append({ + "device_id": device.nodeid, + "device_name": device.name, + "device_os": device.os_description, + "device_tags": device.tags, + "reachable": device.connected + }) + + return local_device_list \ No newline at end of file