From 9494aa14c9a89f2dc27f09a64aa16b33b665cf1c Mon Sep 17 00:00:00 2001 From: Daan Selen Date: Thu, 13 Feb 2025 11:44:53 +0100 Subject: [PATCH] Changed logic in device filtering and made code a little more readable with newlines. --- examples/aggregate_example.yaml | 3 +- meshbook.py | 69 +++++++++++++++++++++++---------- 2 files changed, 50 insertions(+), 22 deletions(-) diff --git a/examples/aggregate_example.yaml b/examples/aggregate_example.yaml index d8758be..b43ded5 100644 --- a/examples/aggregate_example.yaml +++ b/examples/aggregate_example.yaml @@ -1,6 +1,7 @@ --- name: Echo some text in the terminal of the device -group: "Dev" +group: "Development" +target_os: "Debian" variables: - name: package_manager value: "apt" diff --git a/meshbook.py b/meshbook.py index a36b49f..05a32ae 100644 --- a/meshbook.py +++ b/meshbook.py @@ -95,18 +95,21 @@ async def compile_group_list(session: meshctrl.Session) -> dict: }) return local_device_list -async def filter_devices(devices: list[dict], os_categories: dict, target_os: str = None) -> list[str]: +async def filter_targets(devices: list[dict], os_categories: dict, target_os: str = None) -> list[str]: """Filters devices based on reachability and optional OS criteria.""" valid_devices = [] for device in devices: if not device["reachable"]: - continue # Skip unreachable devices + continue # Skip unreachable devices. - if target_os: - if target_os in os_categories: - if device["device_os"] not in os_categories[target_os]: - continue # Skip if the device's OS is not in the allowed OS category + match target_os: + case None: # No filtering needed. + pass + case _ if target_os in os_categories and device["device_os"] in os_categories[target_os]: + pass # Allowed OS, continue processing. + case _: + continue # Skip if the OS is not in the allowed category. valid_devices.append(device["device_id"]) @@ -124,14 +127,14 @@ async def gather_targets(playbook: dict, group_list: dict[str, list[dict]], os_c for group in group_list: for device in group_list[group]: if device["device_name"] == pseudo_target: - matched_devices = await filter_devices([device], os_categories, target_os) + matched_devices = await filter_targets([device], os_categories, target_os) target_list.extend(matched_devices) elif "group" in playbook: pseudo_target = playbook["group"] if pseudo_target in group_list: - matched_devices = await filter_devices(group_list[pseudo_target], os_categories, target_os) + matched_devices = await filter_targets(group_list[pseudo_target], os_categories, target_os) target_list.extend(matched_devices) return target_list @@ -139,6 +142,7 @@ async def gather_targets(playbook: dict, group_list: dict[str, list[dict]], os_c async def execute_playbook(session: meshctrl.Session, targets: dict, playbook: dict, group_list: dict) -> None: responses_list = {} round = 1 + for task in playbook["tasks"]: console(("\033[1m\033[92m" + str(round) + ". Running: " + task["name"] + "\033[0m")) response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900) @@ -158,7 +162,11 @@ async def execute_playbook(session: meshctrl.Session, targets: dict, playbook: d round += 1 console(("-" * 40)) - console((json.dumps(responses_list,indent=4)), True) + if args.indent: + console((json.dumps(responses_list,indent=4)), True) + + else: + console(json.dumps(responses_list), True) async def main(): parser = argparse.ArgumentParser(description="Process command-line arguments") @@ -167,23 +175,13 @@ async def main(): parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", required=False, default="./os_categories.json") parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./meshcentral.conf).", required=False, default="./meshcentral.conf") parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the playbook.", required=False) + parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", required=False) parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output", required=False) global args args = parser.parse_args() local_categories_file = "./os_categories.json" - console(("-" * 40)) - console("Playbook: " + args.playbook) - console("Operating System Categorisation file: " + args.oscategories) - console("Congiguration file: " + args.conf) - console("Grace: " + str((not args.nograce))) # Negation of bool for correct explanation - console("Silent: False") # Can be pre-defined because if silent flag was passed then none of this would be printed. - - console(("-" * 40)) - console(("\x1B[3mTrying to load the MeshCentral account credential file...\x1B[0m")) - console(("\x1B[3mTrying to load the Playbook yaml file and compile it into something workable...\x1B[0m")) - console(("\x1B[3mTrying to load the Operating System categorisation JSON file...\x1B[0m")) try: with open(local_categories_file, "r") as file: os_categories = json.load(file) @@ -193,8 +191,33 @@ async def main(): (compile_book(args.playbook)) ) - console(("\x1B[3mConnecting to MeshCentral and establish a session using variables from previous credential file.\x1B[0m")) + ''' + The following section mainly displays used variables and first steps of the program to the console. + ''' + + console(("-" * 40)) + console("Playbook: " + args.playbook) + console("Operating System Categorisation file: " + args.oscategories) + console("Congiguration file: " + args.conf) + if "device" in playbook: + console("Target device: " + playbook["device"]) + + elif "group" in playbook: + console("Target group: " + playbook["group"]) + + console("Grace: " + str((not args.nograce))) # Negation of bool for correct explanation + console("Silent: False") # Can be pre-defined because if silent flag was passed then none of this would be printed. + session = await init_connection(credentials) + console(("-" * 40)) + console(("\x1B[3mTrying to load the MeshCentral account credential file...\x1B[0m")) + console(("\x1B[3mTrying to load the Playbook yaml file and compile it into something workable...\x1B[0m")) + console(("\x1B[3mTrying to load the Operating System categorisation JSON file...\x1B[0m")) + console(("\x1B[3mConnecting to MeshCentral and establish a session using variables from previous credential file.\x1B[0m")) + + ''' + End of the main information displaying section. + ''' console(("\x1B[3mGenerating group list with nodes and reference the targets from that.\x1B[0m")) group_list = await compile_group_list(session) @@ -202,6 +225,8 @@ async def main(): if len(targets_list) == 0: console(("\033[91mNo targets found or targets unreachable, quitting.\x1B[0m"), True) + console(("-" * 40), True) + else: console(("-" * 40)) target_name = playbook["group"] if "group" in playbook else playbook["device"] # Quickly get the name. @@ -209,9 +234,11 @@ async def main(): if not args.nograce: console(("\033[91mInitiating grace-period...\x1B[0m")) + for x in range(3): console(("\033[91m{}...\x1B[0m".format(x+1))) await asyncio.sleep(1) + console(("-" * 40)) await execute_playbook(session, targets_list, playbook, group_list)