6 Commits

Author SHA1 Message Date
9d2999476d refac: added some type checking and fixed a duplicate bug (apparently) 2025-09-25 09:49:19 +02:00
DaanSelen
82cc31e0f6 Update executor.py 2025-09-16 10:37:13 +02:00
f34d1dc7ae refac(meshbook): rework the targeting module to now be case insensitive 2025-09-10 16:26:52 +02:00
DaanSelen
f0e9e40cca chore(docs): README correction 2025-09-10 13:28:42 +02:00
208e9c1223 chore(os-handling): added Debian 13 to the default Debian list 2025-08-26 13:51:18 +02:00
DaanSelen
a736a74af6 Added more robust code checking (#15)
Co-authored-by: Daan Selen <dselen@systemec.nl>
2025-08-07 13:56:38 +02:00
6 changed files with 173 additions and 124 deletions

View File

@@ -92,7 +92,7 @@ python3 meshbook.py --help
Meshbook configurations are written in YAML. Below is an overview of supported fields.
### ▶️ Group Targeting
### ▶️ Group Targeting (Primary*)
```yaml
---
@@ -110,7 +110,7 @@ tasks:
* `group`: MeshCentral group (aka "mesh"). Quotation marks required for multi-word names.
* `powershell`: Set `true` for PowerShell commands on Windows clients.
### ▶️ Device Targeting
### ▶️ Device Targeting (Secondary*)
You can also target a **specific device** rather than a group. See [`apt_update_example.yaml`](./examples/linux/apt_update_example.yaml) for reference.
@@ -128,6 +128,8 @@ tasks:
command: 'echo "{{ example_var }}"'
```
* Primary and Secondary mark the order in which will take prescendence
### ▶️ Tasks
Define multiple tasks:
@@ -277,4 +279,4 @@ meshbook/
## 📄 License
This project is licensed under the terms of the MIT License. See [LICENSE](./LICENSE).
This project is licensed under the terms of the GPL3 License. See [LICENSE](./LICENSE).

View File

@@ -43,117 +43,132 @@ async def init_connection(credentials: dict) -> meshctrl.Session:
async def gather_targets(args: argparse.Namespace,
meshbook: dict,
group_list: dict[str, list[dict]],
os_categories: dict) -> list[str]:
'''
Finds target devices based on meshbook criteria (device or group).
'''
os_categories: dict) -> dict:
"""
Finds target devices based on meshbook criteria (device, devices, group or groups).
"""
group_list = {k.lower(): v for k, v in group_list.items()} # Normalize keys
target_list = []
offline_list = []
target_os = meshbook.get("target_os")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
target_tag = meshbook.get("target_tag")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
assert target_os is not None, "target_os must be provided"
assert target_tag is not None, "target_tag must be provided"
async def add_processed_devices(processed):
"""Helper to update target and offline lists."""
if processed:
target_list.extend(processed.get("valid_devices", []))
offline_list.extend(processed.get("offline_devices", []))
async def process_device_helper(device):
processed = await utilities.process_device(
device,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag,
add_processed_devices
)
await add_processed_devices(processed)
async def process_group_helper(group):
processed = await utilities.filter_targets(
group, os_categories, target_os, ignore_categorisation, target_tag
)
await add_processed_devices(processed)
'''
Groups receive the first priority, then device targets.
'''
match meshbook:
case {"device": pseudo_target}: # Single device target
case {"group": pseudo_target}:
if isinstance(pseudo_target, str):
processed_devices = await utilities.process_device_or_group(pseudo_target,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
pseudo_target = pseudo_target.lower()
if pseudo_target in group_list:
await process_group_helper(group_list[pseudo_target])
elif pseudo_target not in group_list:
console.nice_print(
args,
console.text_color.yellow + "Targeted group not found on the MeshCentral server.",
True
)
elif isinstance(pseudo_target, list):
console.nice_print(
args,
console.text_color.yellow + "Please use groups (Notice the plural with 'S') for multiple groups.",
True
)
else:
console.nice_print(args,
console.text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True)
console.nice_print(
args,
console.text_color.yellow + "The 'group' key is being used, but an unknown data type was found, please check your values.",
True
)
case {"devices": pseudo_target}: # List of devices
case {"groups": pseudo_target}:
if isinstance(pseudo_target, list):
for sub_pseudo_device in pseudo_target:
processed_devices = await utilities.process_device_or_group(sub_pseudo_device,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag,)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
for sub_group in pseudo_target:
sub_group = sub_group.lower()
if sub_group in group_list:
await process_group_helper(group_list[sub_group])
elif isinstance(pseudo_target, str) and pseudo_target.lower() == "all":
for group in group_list.values():
await process_group_helper(group)
elif isinstance(pseudo_target, str):
console.nice_print(
args,
console.text_color.yellow + "The 'groups' key is being used, but only one string is given. Did you mean 'group'?",
True
)
else:
console.nice_print(args, console.text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True)
case {"group": pseudo_target}: # Single group target
if isinstance(pseudo_target, str) and pseudo_target in group_list:
processed_devices = await utilities.filter_targets(group_list[pseudo_target],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
elif pseudo_target not in group_list:
console.nice_print(args,
console.text_color.yellow + "Targeted group not found on the MeshCentral server.", True)
console.nice_print(
args,
console.text_color.yellow + "The 'groups' key is being used, but an unknown data type was found, please check your values.",
True
)
case {"device": pseudo_target}:
if isinstance(pseudo_target, str):
await process_device_helper(pseudo_target)
elif isinstance(pseudo_target, list):
console.nice_print(
args,
console.text_color.yellow + "Please use devices (Notice the plural with 'S') for multiple devices.",
True
)
else:
console.nice_print(args,
console.text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True)
console.nice_print(
args,
console.text_color.yellow + "The 'device' key is being used, but an unknown data type was found, please check your values.",
True
)
case {"groups": pseudo_target}: # List of groups
case {"devices": pseudo_target}:
if isinstance(pseudo_target, list):
for sub_pseudo_target in pseudo_target:
if sub_pseudo_target in group_list:
processed_devices = await utilities.filter_targets(group_list[sub_pseudo_target],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
elif pseudo_target.lower() == "all":
for group in group_list:
processed_devices = await utilities.filter_targets(group_list[group],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
for sub_device in pseudo_target:
await process_device_helper(sub_device)
elif isinstance(pseudo_target, str):
console.nice_print(
args,
console.text_color.yellow + "The 'devices' key is being used, but only one string is given. Did you mean 'device'?",
True
)
else:
console.nice_print(args,
console.text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True)
console.nice_print(
args,
console.text_color.yellow + "The 'devices' key is being used, but an unknown data type was found, please check your values.",
True
)
return {
"target_list": target_list,
"offline_list": offline_list
}
return {"target_list": target_list, "offline_list": offline_list}
async def main():
just_fix_windows_console()
@@ -199,10 +214,12 @@ async def main():
if args.group != "":
meshbook["group"] = args.group
del meshbook["device"]
if "device" in meshbook:
del meshbook["device"]
elif args.device != "":
meshbook["device"] = args.device
del meshbook["group"]
if "group" in meshbook:
del meshbook["group"]
'''
The following section mainly displays used variables and first steps of the program to the console.
@@ -289,7 +306,7 @@ async def main():
group_list = await transform.compile_group_list(session)
compiled_device_list = await gather_targets(args, meshbook, group_list, os_categories)
if len(compiled_device_list["target_list"]) == 0:
if "target_list" not in compiled_device_list or len(compiled_device_list["target_list"]) == 0:
console.nice_print(args,
console.text_color.red + "No targets found or targets unreachable, quitting.", True)
@@ -313,6 +330,10 @@ async def main():
case {"devices": candidate_target_name}:
target_name = str(candidate_target_name)
case _:
target_name = ""
console.nice_print(args,
console.text_color.yellow + "Executing meshbook on the target(s): " + console.text_color.green + target_name + console.text_color.yellow + ".")
@@ -336,7 +357,7 @@ async def main():
except OSError as message:
console.nice_print(args,
console.text_color.red + message, True)
console.text_color.red + f'{message}', True)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -14,6 +14,7 @@ class console:
italic = "\x1B[3m"
reset = "\x1B[0m"
@staticmethod
def nice_print(args: argparse.Namespace, message: str, final: bool=False):
'''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.

View File

@@ -8,9 +8,10 @@ from time import sleep
from modules.console import console
from modules.utilities import transform
intertask_delay = 0.5
intertask_delay = 1
class executor:
@staticmethod
async def execute_meshbook(args: argparse.Namespace, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> None:
'''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
@@ -28,7 +29,7 @@ class executor:
if "powershell" in meshbook and meshbook["powershell"]:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900)
else:
response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900)
response = await session.run_command(nodeids=targets, command=task["command"],powershell=False,ignore_output=False,timeout=900)
task_batch = []
for device in response:
@@ -62,4 +63,4 @@ class executor:
else:
console.nice_print(args,
json.dumps(responses_list), True)
json.dumps(responses_list), True)

View File

@@ -10,6 +10,7 @@ Creation and compilation of the MeshCentral nodes list (list of all nodes availa
'''
class utilities:
@staticmethod
async def load_config(args: argparse.Namespace,
segment: str = 'meshcentral-account') -> dict:
'''
@@ -32,17 +33,20 @@ class utilities:
print(f'Segment "{segment}" not found in config file {conf_file}.')
os._exit(1)
return config[segment]
return dict(config[segment])
async def compile_book(meshbook_file: dict) -> dict:
@staticmethod
async def compile_book(meshbook_file: str) -> dict:
'''
Simple function that opens the file and replaces placeholders through the next function. After that just return it.
'''
meshbook = open(meshbook_file, 'r')
with open(meshbook_file, 'r') as f:
meshbook = f.read()
meshbook = await transform.replace_placeholders(yaml.safe_load(meshbook))
return meshbook
@staticmethod
def get_os_variants(target_category: str,
os_map: dict) -> set:
'''
@@ -65,17 +69,19 @@ class utilities:
return set()
@staticmethod
async def filter_targets(devices: list[dict],
os_categories: dict,
target_os: str = None,
target_os: str = "",
ignore_categorisation: bool = False,
target_tag: str = None) -> dict:
target_tag: str = "") -> dict:
'''
Filters devices based on reachability and optional OS criteria, supporting nested OS categories.
'''
valid_devices = []
offline_devices = []
allowed_os = set()
# Identify correct OS filtering scope
for key in os_categories:
@@ -109,28 +115,42 @@ class utilities:
"offline_devices": offline_devices
}
async def process_device_or_group(pseudo_target: str,
group_list: dict,
os_categories: dict,
target_os: str,
ignore_categorisation: bool,
target_tag: str) -> dict:
'''
Helper function to process devices or groups.
'''
@staticmethod
async def process_device(device: str,
group_list: dict,
os_categories: dict,
target_os: str,
ignore_categorisation: bool,
target_tag: str,
add_processed_devices=None) -> dict:
"""
Processes a single device or pseudo-target against group_list,
filters matches by OS and tags, and adds processed devices.
"""
matched_devices = []
for group in group_list:
for device in group_list[group]:
if device["device_name"] == pseudo_target:
matched_devices.append(device)
pseudo_target = device.lower()
# Find devices that match the pseudo_target
for group in group_list:
for dev in group_list[group]:
if dev["device_name"].lower() == pseudo_target:
matched_devices.append(dev)
# If matches found, filter them and add processed devices
if matched_devices:
return await utilities.filter_targets(matched_devices, os_categories, target_os, ignore_categorisation, target_tag)
return []
processed = await utilities.filter_targets(
matched_devices, os_categories, target_os, ignore_categorisation, target_tag
)
if add_processed_devices:
await add_processed_devices(processed)
return processed
# No matches found
return {"valid_devices": [], "offline_devices": []}
import shlex
class transform:
@staticmethod
def process_shell_response(shlex_enable: bool, meshbook_result: dict) -> dict:
for task_name, task_data in meshbook_result.items():
if task_name == "Offline": # Failsafe
@@ -152,6 +172,7 @@ class transform:
node_responses["result"] = clean_output
return meshbook_result
@staticmethod
async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str:
'''
Simple function that looks up nodeid to the human-readable name if existent - otherwise return None.
@@ -161,8 +182,9 @@ class transform:
for device in group_list[group]:
if device["device_id"] == target_id:
return device["device_name"]
return None
return ""
@staticmethod
async def replace_placeholders(meshbook: dict) -> dict:
'''
Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list.
@@ -196,6 +218,7 @@ class transform:
return meshbook
@staticmethod
async def compile_group_list(session: meshctrl.Session) -> dict:
'''
Function that retrieves the devices from MeshCentral and compiles it into a efficient list.

View File

@@ -2,6 +2,7 @@
{
"Linux": {
"Debian": [
"Debian GNU/Linux 13 (trixie)",
"Debian GNU/Linux 12 (bookworm)",
"Debian GNU/Linux 11 (bullseye)"
],