From f34d1dc7ae949ac9436e032c51c7ceb1b018daa7 Mon Sep 17 00:00:00 2001 From: Daan Selen Date: Wed, 10 Sep 2025 16:26:52 +0200 Subject: [PATCH] refac(meshbook): rework the targeting module to now be case insensitive --- README.md | 6 +- meshbook.py | 194 +++++++++++++++++++++++-------------------- modules/utilities.py | 44 ++++++---- 3 files changed, 135 insertions(+), 109 deletions(-) diff --git a/README.md b/README.md index 5cbade7..45d2c9d 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,7 @@ python3 meshbook.py --help Meshbook configurations are written in YAML. Below is an overview of supported fields. -### ▶️ Group Targeting +### ▶️ Group Targeting (Primary*) ```yaml --- @@ -110,7 +110,7 @@ tasks: * `group`: MeshCentral group (aka "mesh"). Quotation marks required for multi-word names. * `powershell`: Set `true` for PowerShell commands on Windows clients. -### ▶️ Device Targeting +### ▶️ Device Targeting (Secondary*) You can also target a **specific device** rather than a group. See [`apt_update_example.yaml`](./examples/linux/apt_update_example.yaml) for reference. @@ -128,6 +128,8 @@ tasks: command: 'echo "{{ example_var }}"' ``` +* Primary and Secondary mark the order in which will take prescendence + ### ▶️ Tasks Define multiple tasks: diff --git a/meshbook.py b/meshbook.py index 391b517..4da5eaa 100644 --- a/meshbook.py +++ b/meshbook.py @@ -43,117 +43,129 @@ async def init_connection(credentials: dict) -> meshctrl.Session: async def gather_targets(args: argparse.Namespace, meshbook: dict, group_list: dict[str, list[dict]], - os_categories: dict) -> list[str]: - ''' - Finds target devices based on meshbook criteria (device or group). - ''' + os_categories: dict) -> dict: + """ + Finds target devices based on meshbook criteria (device, devices, group or groups). + """ + group_list = {k.lower(): v for k, v in group_list.items()} # Normalize keys target_list = [] offline_list = [] + target_os = meshbook.get("target_os") ignore_categorisation = meshbook.get("ignore_categorisation", False) target_tag = meshbook.get("target_tag") + async def add_processed_devices(processed): + """Helper to update target and offline lists.""" + if processed: + target_list.extend(processed.get("valid_devices", [])) + offline_list.extend(processed.get("offline_devices", [])) + + async def process_device_helper(device): + processed = await utilities.process_device( + device, + group_list, + os_categories, + target_os, + ignore_categorisation, + target_tag, + add_processed_devices + ) + await add_processed_devices(processed) + + async def process_group_helper(group): + processed = await utilities.filter_targets( + group, os_categories, target_os, ignore_categorisation, target_tag + ) + await add_processed_devices(processed) + + ''' + Groups receive the first priority, then device targets. + ''' match meshbook: - case {"device": pseudo_target}: # Single device target + case {"group": pseudo_target}: if isinstance(pseudo_target, str): - processed_devices = await utilities.process_device_or_group(pseudo_target, - group_list, - os_categories, - target_os, - ignore_categorisation, - target_tag) - if len(processed_devices) > 0: - matched_devices = processed_devices["valid_devices"] - target_list.extend(matched_devices) - if len(processed_devices) > 0: - offline_devices = processed_devices["offline_devices"] - offline_list.extend(offline_devices) + pseudo_target = pseudo_target.lower() + if pseudo_target in group_list: + await process_group_helper(group_list[pseudo_target]) + + elif pseudo_target not in group_list: + console.nice_print( + args, + console.text_color.yellow + "Targeted group not found on the MeshCentral server.", + True + ) + elif isinstance(pseudo_target, list): + console.nice_print( + args, + console.text_color.yellow + "Please use groups (Notice the plural with 'S') for multiple groups.", + True + ) else: - console.nice_print(args, - console.text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True) + console.nice_print( + args, + console.text_color.yellow + "The 'group' key is being used, but an unknown data type was found, please check your values.", + True + ) - case {"devices": pseudo_target}: # List of devices + case {"groups": pseudo_target}: if isinstance(pseudo_target, list): - for sub_pseudo_device in pseudo_target: - processed_devices = await utilities.process_device_or_group(sub_pseudo_device, - group_list, - os_categories, - target_os, - ignore_categorisation, - target_tag,) - if len(processed_devices) > 0: - matched_devices = processed_devices["valid_devices"] - target_list.extend(matched_devices) - if len(processed_devices) > 0: - offline_devices = processed_devices["offline_devices"] - offline_list.extend(offline_devices) - + for sub_group in pseudo_target: + sub_group = sub_group.lower() + if sub_group in group_list: + await process_group_helper(group_list[sub_group]) + elif isinstance(pseudo_target, str) and pseudo_target.lower() == "all": + for group in group_list.values(): + await process_group_helper(group) + elif isintance(pseudo_target, str): + console.nice_print( + args, + console.text_color.yellow + "The 'groups' key is being used, but only one string is given. Did you mean 'group'?", + True + ) else: - console.nice_print(args, console.text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True) - - case {"group": pseudo_target}: # Single group target - if isinstance(pseudo_target, str) and pseudo_target in group_list: - processed_devices = await utilities.filter_targets(group_list[pseudo_target], - os_categories, - target_os, - ignore_categorisation, - target_tag) - if len(processed_devices) > 0: - matched_devices = processed_devices["valid_devices"] - target_list.extend(matched_devices) - if len(processed_devices) > 0: - offline_devices = processed_devices["offline_devices"] - offline_list.extend(offline_devices) - - elif pseudo_target not in group_list: - console.nice_print(args, - console.text_color.yellow + "Targeted group not found on the MeshCentral server.", True) + console.nice_print( + args, + console.text_color.yellow + "The 'groups' key is being used, but an unknown data type was found, please check your values.", + True + ) + case {"device": pseudo_target}: + if isinstance(pseudo_target, str): + await process_device_helper(pseudo_target) + elif isinstance(pseudo_target, list): + console.nice_print( + args, + console.text_color.yellow + "Please use devices (Notice the plural with 'S') for multiple devices.", + True + ) else: - console.nice_print(args, - console.text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True) + console.nice_print( + args, + console.text_color.yellow + "The 'device' key is being used, but an unknown data type was found, please check your values.", + True + ) - - case {"groups": pseudo_target}: # List of groups + case {"devices": pseudo_target}: if isinstance(pseudo_target, list): - for sub_pseudo_target in pseudo_target: - if sub_pseudo_target in group_list: - processed_devices = await utilities.filter_targets(group_list[sub_pseudo_target], - os_categories, - target_os, - ignore_categorisation, - target_tag) - if len(processed_devices) > 0: - matched_devices = processed_devices["valid_devices"] - target_list.extend(matched_devices) - if len(processed_devices) > 0: - offline_devices = processed_devices["offline_devices"] - offline_list.extend(offline_devices) - - elif pseudo_target.lower() == "all": - for group in group_list: - processed_devices = await utilities.filter_targets(group_list[group], - os_categories, - target_os, - ignore_categorisation, - target_tag) - if len(processed_devices) > 0: - matched_devices = processed_devices["valid_devices"] - target_list.extend(matched_devices) - if len(processed_devices) > 0: - offline_devices = processed_devices["offline_devices"] - offline_list.extend(offline_devices) - + for sub_device in pseudo_target: + await process_device_helper(sub_device) + elif isinstance(pseudo_target, str): + console.nice_print( + args, + console.text_color.yellow + "The 'devices' key is being used, but only one string is given. Did you mean 'device'?", + True + ) else: - console.nice_print(args, - console.text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True) + console.nice_print( + args, + console.text_color.yellow + "The 'devices' key is being used, but an unknown data type was found, please check your values.", + True + ) - return { - "target_list": target_list, - "offline_list": offline_list - } + return {"target_list": target_list, "offline_list": offline_list} async def main(): just_fix_windows_console() diff --git a/modules/utilities.py b/modules/utilities.py index 57d00d7..8437d1c 100644 --- a/modules/utilities.py +++ b/modules/utilities.py @@ -109,25 +109,37 @@ class utilities: "offline_devices": offline_devices } - async def process_device_or_group(pseudo_target: str, - group_list: dict, - os_categories: dict, - target_os: str, - ignore_categorisation: bool, - target_tag: str) -> dict: - ''' - Helper function to process devices or groups. - ''' - + async def process_device(device: str, + group_list: dict, + os_categories: dict, + target_os: str, + ignore_categorisation: bool, + target_tag: str, + add_processed_devices=None) -> dict: + """ + Processes a single device or pseudo-target against group_list, + filters matches by OS and tags, and adds processed devices. + """ matched_devices = [] - for group in group_list: - for device in group_list[group]: - if device["device_name"] == pseudo_target: - matched_devices.append(device) + pseudo_target = device.lower() + # Find devices that match the pseudo_target + for group in group_list: + for dev in group_list[group]: + if dev["device_name"].lower() == pseudo_target: + matched_devices.append(dev) + + # If matches found, filter them and add processed devices if matched_devices: - return await utilities.filter_targets(matched_devices, os_categories, target_os, ignore_categorisation, target_tag) - return [] + processed = await utilities.filter_targets( + matched_devices, os_categories, target_os, ignore_categorisation, target_tag + ) + if add_processed_devices: + await add_processed_devices(processed) + return processed + + # No matches found + return {"valid_devices": [], "offline_devices": []} import shlex class transform: