Rewrote Meshbook with submodules with classes (#12)

* Massive update, bringing a lot of QoL. RC

* 0.5 seconds delay between tasks.
    insignificant for humans.
    Like a thousand years for computers to get ready.

* Added new features such as ignore_categorisation.

* Version 1.3 RC

* Added defaults.
This commit is contained in:
dselen
2025-04-25 16:03:07 +02:00
committed by GitHub
parent ac4dd8994c
commit 58598d8d17
5 changed files with 527 additions and 298 deletions

View File

@@ -147,30 +147,47 @@ tasks:
The following response it received when executing the first yaml of the above files (without the `-s` parameters, which just outputs the below JSON).
```shell
~/meshbook$ python3 meshbook.py -pb examples/echo_example.yaml
$ python3 meshbook.py -mb books/aggregate_example.yaml -i --nograce -pr
----------------------------------------
Playbook: examples/echo_example.yaml
meshbook: books/aggregate_example.yaml
Operating System Categorisation file: ./os_categories.json
Congiguration file: ./meshcentral.conf
Target group: Development
Grace: True
Configuration file: ./config.conf
Target Operating System category given: Linux
Target group: Systemec Development
Grace: False
Silent: False
----------------------------------------
Trying to load the MeshCentral account credential file...
Trying to load the Playbook yaml file and compile it into something workable...
Trying to load the meshbook yaml file and compile it into something workable...
Trying to load the Operating System categorisation JSON file...
Connecting to MeshCentral and establish a session using variables from previous credential file.
Generating group list with nodes and reference the targets from that.
----------------------------------------
Executing playbook on the target(s): Development.
Initiating grace-period...
1...
2...
3...
----------------------------------------
1. Running: Echo!
1. Running: Ping!
----------------------------------------
{"Task 1": "ALL THE DATA"} # Not sharing due to PID
{
"Task 1": {
"task_name": "Ping Quad9 DNS",
"data": [
{
"complete": true,
"result": [
"PING 9.9.9.9 (9.9.9.9) 56(84) bytes of data.",
"64 bytes from 9.9.9.9: icmp_seq=1 ttl=61 time=26.8 ms",
"--- 9.9.9.9 ping statistics ---",
"1 packets transmitted, 1 received, 0% packet loss, time 0ms",
"rtt min/avg/max/mdev = 26.809/26.809/26.809/0.000 ms"
],
"command": "ping 9.9.9.9 -c 1",
"device_id": "yourn nodeip",
"device_name": "yournodename"
}
]
}
}
```
The above without `-s` is quite verbose. use `--help` to read about parameters and getting a minimal response for example.

View File

@@ -1,65 +1,20 @@
#!/bin/python3
# Public Python libraries
import argparse
import asyncio
from base64 import b64encode
from colorama import just_fix_windows_console
from configparser import ConfigParser
import json
import meshctrl
import os
import yaml
# Local Python libraries/modules
from modules.console import *
from modules.executor import *
from modules.utilities import *
meshbook_version = 1.3
grace_period = 3 # Grace period will last for x (by default 3) second(s).
'''
Script utilities are handled in the following section.
'''
class ScriptEndTrigger(Exception):
pass
class text_color:
black = "\033[30m"
red = "\033[31m"
green = "\033[32m"
yellow = "\033[33m"
blue = "\033[34m"
magenta = "\033[35m"
cyan = "\033[36m"
white = "\033[37m"
italic = "\x1B[3m"
reset = "\x1B[0m"
def console(message: str, final: bool=False):
'''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.
'''
if final:
print(message) # Assuming final message, there is no need for clearing.
elif not args.silent:
print(message + text_color.reset)
async def load_config(segment: str = 'meshcentral-account') -> dict:
'''
Function that loads the segment from the config.conf (by default) file and returns the it in a dict.
'''
conf_file = args.conf
if not os.path.exists(conf_file):
raise ScriptEndTrigger(f'Missing config file {conf_file}. Provide an alternative path.')
config = ConfigParser()
try:
config.read(conf_file)
except Exception as err:
raise ScriptEndTrigger(f"Error reading configuration file '{conf_file}': {err}")
if segment not in config:
raise ScriptEndTrigger(f'Segment "{segment}" not found in config file {conf_file}.')
return config[segment]
async def init_connection(credentials: dict) -> meshctrl.Session:
'''
Use the libmeshctrl library to initiate a Secure Websocket (wss) connection to the MeshCentral instance.
@@ -73,234 +28,120 @@ async def init_connection(credentials: dict) -> meshctrl.Session:
await session.initialized.wait()
return session
async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str:
'''
Simple function that looks up nodeid to the human-readable name if existent - otherwise return None.
'''
for group in group_list:
for device in group_list[group]:
if device["device_id"] == target_id:
return device["device_name"]
return None
'''
Creation and compilation happends in the following section, where the yaml gets read in, and edited accordingly.
'''
async def compile_book(meshbook_file: dict) -> dict:
'''
Simple function that opens the file and replaces placeholders through the next function. After that just return it.
'''
meshbook = open(meshbook_file, 'r')
meshbook = await replace_placeholders(yaml.safe_load(meshbook))
return meshbook
async def replace_placeholders(meshbook: dict) -> dict:
'''
Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list.
'''
variables = {}
if "variables" in meshbook and isinstance(meshbook["variables"], list):
for var in meshbook["variables"]:
var_name = var["name"]
var_value = var["value"]
variables[var_name] = var_value
else:
return meshbook
for task in meshbook.get("tasks", []):
task_name = task.get("name")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}"
task_name = task_name.replace(placeholder, var_value)
task["name"] = task_name
command = task.get("command")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}" # Create the placeholder string like "{{ host1 }}"
command = command.replace(placeholder, var_value)
task["command"] = command
return meshbook
'''
Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section.
'''
async def compile_group_list(session: meshctrl.Session) -> dict:
'''
Function that retrieves the devices from MeshCentral and compiles it into a efficient list.
'''
devices_response = await session.list_devices(details=False, timeout=10)
local_device_list = {}
for device in devices_response:
if device.meshname not in local_device_list:
local_device_list[device.meshname] = []
local_device_list[device.meshname].append({
"device_id": device.nodeid,
"device_name": device.name,
"device_os": device.os_description,
"device_tags": device.tags,
"reachable": device.connected
})
return local_device_list
async def filter_targets(devices: list[dict], os_categories: dict, target_os: str = None, target_tag: str = None) -> list[str]:
'''
Filters devices based on reachability and optional OS criteria, supporting nested OS categories.
'''
valid_devices = []
def get_os_variants(category: str, os_map: dict) -> set:
'''
Extracts all OS names under a given category if it exists.
'''
for key, value in os_map.items():
if key == category:
if isinstance(value, dict): # Expand nested categories
os_set = set()
for subcat in value:
os_set.update(get_os_variants(subcat, value))
return os_set
elif isinstance(value, list): # Direct OS list
return set(value)
return set()
allowed_os = set()
# Identify correct OS filtering scope
for key in os_categories:
if key == target_os:
allowed_os = get_os_variants(target_os, os_categories)
break # Stop searching once a match is found
if isinstance(os_categories[key], dict) and target_os in os_categories[key]:
allowed_os = get_os_variants(target_os, os_categories[key])
break # Stop searching once a match is found
# Filter out unwanted or unreachable devices.
for device in devices:
if not device["reachable"]:
continue # Skip unreachable devices.
if target_tag and target_tag not in device["device_tags"]:
continue
if device["device_os"] not in allowed_os:
continue
valid_devices.append(device["device_id"])
return valid_devices
async def gather_targets(meshbook: dict, group_list: dict[str, list[dict]], os_categories: dict) -> list[str]:
async def gather_targets(args: argparse.Namespace,
meshbook: dict,
group_list: dict[str, list[dict]],
os_categories: dict) -> list[str]:
'''
Finds target devices based on meshbook criteria (device or group).
'''
target_list = []
offline_list = []
target_os = meshbook.get("target_os")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
target_tag = meshbook.get("target_tag")
async def process_device_or_group(pseudo_target, group_list, os_categories, target_os) -> list[str]:
'''
Helper function to process devices or groups.
'''
matched_devices = []
for group in group_list:
for device in group_list[group]:
if device["device_name"] == pseudo_target:
matched_devices.append(device)
if matched_devices:
return await filter_targets(matched_devices, os_categories, target_os, target_tag)
return []
match meshbook:
case {"device": pseudo_target}: # Single device target
if isinstance(pseudo_target, str):
matched_devices = await process_device_or_group(pseudo_target, group_list, os_categories, target_os)
target_list.extend(matched_devices)
processed_devices = await utilities.process_device_or_group(pseudo_target,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
else:
console(text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True)
console.nice_print(args,
console.text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True)
case {"devices": pseudo_target}: # List of devices
if isinstance(pseudo_target, list):
for sub_pseudo_device in pseudo_target:
matched_devices = await process_device_or_group(sub_pseudo_device, group_list, os_categories, target_os)
processed_devices = await utilities.process_device_or_group(sub_pseudo_device,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag,)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
else:
console(text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True)
console.nice_print(args, console.text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True)
case {"group": pseudo_target}: # Single group target
if isinstance(pseudo_target, str) and pseudo_target in group_list:
matched_devices = await filter_targets(group_list[pseudo_target], os_categories, target_os, target_tag)
target_list.extend(matched_devices)
processed_devices = await utilities.filter_targets(group_list[pseudo_target],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
elif pseudo_target not in group_list:
console(text_color.yellow + "Targeted group not found on the MeshCentral server.", True)
console.nice_print(args,
console.text_color.yellow + "Targeted group not found on the MeshCentral server.", True)
else:
console(text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True)
console.nice_print(args,
console.text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True)
case {"groups": pseudo_target}: # List of groups
if isinstance(pseudo_target, list):
for sub_pseudo_target in pseudo_target:
if sub_pseudo_target in group_list:
matched_devices = await filter_targets(group_list[sub_pseudo_target], os_categories, target_os, target_tag)
processed_devices = await utilities.filter_targets(group_list[sub_pseudo_target],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if pseudo_target.lower() == "all":
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
elif pseudo_target.lower() == "all":
for group in group_list:
matched_devices = await filter_targets(group_list[group], os_categories, target_os, target_tag)
target_list.extend(matched_devices)
processed_devices = await utilities.filter_targets(group_list[group],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
else:
console(text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True)
console.nice_print(args,
console.text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True)
return target_list
async def execute_meshbook(session: meshctrl.Session, targets: dict, meshbook: dict, group_list: dict) -> None:
'''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
'''
responses_list = {}
round = 1
for task in meshbook["tasks"]:
console(text_color.green + str(round) + ". Running: " + task["name"])
if "powershell" in meshbook and meshbook["powershell"]:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900)
else:
response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900)
task_batch = []
for device in response:
device_result = response[device]["result"]
response[device]["result"] = device_result.replace("Run commands completed.", "")
response[device]["device_id"] = device
response[device]["device_name"] = await translate_nodeid_to_name(device, group_list)
task_batch.append(response[device])
responses_list["Task " + str(round)] = {
"task_name": task["name"],
"data": task_batch
}
round += 1
console(text_color.reset + ("-" * 40))
if args.indent:
console((json.dumps(responses_list,indent=4)), True)
else:
console(json.dumps(responses_list), True)
return {
"target_list": target_list,
"offline_list": offline_list
}
async def main():
just_fix_windows_console()
@@ -309,70 +150,130 @@ async def main():
'''
parser = argparse.ArgumentParser(description="Process command-line arguments")
parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.", required=True)
parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.")
parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", required=False, default="./os_categories.json")
parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", required=False, default="./config.conf")
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.", required=False)
parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", required=False)
parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output", required=False)
parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", default="./os_categories.json")
parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", default="./config.conf")
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.")
parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.")
parser.add_argument("-r", "--raw-result", action="store_true", help="Print the raw result.")
parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output.")
parser.add_argument("--shlex", action="store_true", help="Shlex the lines.")
parser.add_argument("-v", "--version", action="store_true", help="Show the Meshbook version.")
global args
args = parser.parse_args()
local_categories_file = "./os_categories.json"
if args.version:
console.nice_print(args,
console.text_color.reset + "MeshBook Version: " + console.text_color.yellow + str(meshbook_version))
return
if not args.meshbook:
parser.print_help()
return
try:
with open(local_categories_file, "r") as file:
os_categories = json.load(file)
credentials, meshbook = await asyncio.gather(
(load_config()),
(compile_book(args.meshbook))
(utilities.load_config(args)),
(utilities.compile_book(args.meshbook))
)
'''
The following section mainly displays used variables and first steps of the program to the console.
'''
console(text_color.reset + ("-" * 40))
console("meshbook: " + text_color.yellow + args.meshbook)
console("Operating System Categorisation file: " + text_color.yellow + args.oscategories)
console("Configuration file: " + text_color.yellow + args.conf)
# INIT ARGUMENTS PRINTING
console.nice_print(args,
console.text_color.reset + ("-" * 40))
console.nice_print(args,
"meshbook: " + console.text_color.yellow + args.meshbook)
console.nice_print(args,
"Operating System Categorisation file: " + console.text_color.yellow + args.oscategories)
console.nice_print(args,
"Configuration file: " + console.text_color.yellow + args.conf)
# TARGET OS PRINTING
if "target_os" in meshbook:
console("Target Operating System category given: " + text_color.yellow + meshbook["target_os"])
console.nice_print(args,
"Target Operating System category given: " + console.text_color.yellow + meshbook["target_os"])
else:
console("Target Operating System category given: " + text_color.yellow + "All")
console.nice_print(args,
"Target Operating System category given: " + console.text_color.yellow + "All")
# Should Meshbook ignore categorisation?
if "ignore_categorisation" in meshbook:
console.nice_print(args,
"Ignore the OS Categorisation file: " + console.text_color.yellow + str(meshbook["ignore_categorisation"]))
if meshbook["ignore_categorisation"]:
console.nice_print(args,
console.text_color.red + "!!!!\n" +
console.text_color.yellow +
"Ignore categorisation is True.\nThis means that the program checks if the target Operating System is somewhere in the reported device Operating System." +
console.text_color.red + "\n!!!!")
else:
console.nice_print(args,
"Ignore the OS Categorisation file: " + console.text_color.yellow + "False")
# TARGET TAG PRINTING
if "target_tag" in meshbook:
console.nice_print(args,
"Target Device tag given: " + console.text_color.yellow + meshbook["target_tag"])
else:
console.nice_print(args,
"Target Device tag given: " + console.text_color.yellow + "All")
# TARGET PRINTING
if "device" in meshbook:
console("Target device: " + text_color.yellow + str(meshbook["device"]))
console.nice_print(args,
"Target device: " + console.text_color.yellow + str(meshbook["device"]))
elif "devices" in meshbook:
console.nice_print(args,
"Target devices: " + console.text_color.yellow + str(meshbook["devices"]))
elif "group" in meshbook:
console("Target group: " + text_color.yellow + str(meshbook["group"]))
console.nice_print(args,
"Target group: " + console.text_color.yellow + str(meshbook["group"]))
elif "groups" in meshbook:
console.nice_print(args,
"Target groups: " + console.text_color.yellow + str(meshbook["groups"]))
console("Grace: " + text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation
console("Silent: " + text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed.
# RUNNING PARAMETERS PRINTING
console.nice_print(args, "Grace: " + console.text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation
console.nice_print(args, "Silent: " + console.text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed.
session = await init_connection(credentials)
console(text_color.reset + ("-" * 40))
console(text_color.italic + "Trying to load the MeshCentral account credential file...")
console(text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...")
console(text_color.italic + "Trying to load the Operating System categorisation JSON file...")
console(text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.")
console(text_color.italic + "Generating group list with nodes and reference the targets from that.")
# PROCESS PRINTING aka what its doing in the moment...
console.nice_print(args,
console.text_color.reset + ("-" * 40))
console.nice_print(args,
console.text_color.italic + "Trying to load the MeshCentral account credential file...")
console.nice_print(args,
console.text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...")
console.nice_print(args,
console.text_color.italic + "Trying to load the Operating System categorisation JSON file...")
console.nice_print(args,
console.text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.")
console.nice_print(args,
console.text_color.italic + "Generating group list with nodes and reference the targets from that.")
'''
End of the main information displaying section.
'''
group_list = await compile_group_list(session)
targets_list = await gather_targets(meshbook, group_list, os_categories)
group_list = await transform.compile_group_list(session)
compiled_device_list = await gather_targets(args, meshbook, group_list, os_categories)
if len(targets_list) == 0:
console(text_color.red + "No targets found or targets unreachable, quitting.", True)
console(text_color.reset + ("-" * 40), True)
if len(compiled_device_list["target_list"]) == 0:
console.nice_print(args, console.text_color.red + "No targets found or targets unreachable, quitting.", True)
console.nice_print(args, console.text_color.reset + ("-" * 40), True)
else:
console(text_color.reset + ("-" * 40))
console.nice_print(args, console.text_color.reset + ("-" * 40))
match meshbook:
case {"group": candidate_target_name}:
@@ -387,22 +288,26 @@ async def main():
case {"devices": candidate_target_name}:
target_name = str(candidate_target_name)
console(text_color.yellow + "Executing meshbook on the target(s): " + text_color.green + target_name + ".")
console.nice_print(args, console.text_color.yellow + "Executing meshbook on the target(s): " + console.text_color.green + target_name + ".")
if not args.nograce:
console(text_color.yellow + "Initiating grace-period...")
console.nice_print(args, console.text_color.yellow + "Initiating grace-period...")
for x in range(grace_period):
console(text_color.yellow + "{}...".format(x+1)) # Countdown!
console.nice_print(args, console.text_color.yellow + "{}...".format(x+1)) # Countdown!
await asyncio.sleep(1)
console(text_color.reset + ("-" * 40))
await execute_meshbook(session, targets_list, meshbook, group_list)
console.nice_print(args, console.text_color.reset + ("-" * 40))
await executor.execute_meshbook(args,
session,
compiled_device_list,
meshbook,
group_list)
await session.close()
except OSError as message:
console(text_color.red + message, True)
console.nice_print(args, console.text_color.red + message, True)
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())

25
modules/console.py Normal file
View File

@@ -0,0 +1,25 @@
# Public Python libraries
import argparse
class console:
class text_color:
black = "\033[30m"
red = "\033[31m"
green = "\033[32m"
yellow = "\033[33m"
blue = "\033[34m"
magenta = "\033[35m"
cyan = "\033[36m"
white = "\033[37m"
italic = "\x1B[3m"
reset = "\x1B[0m"
def nice_print(args: argparse.Namespace, message: str, final: bool=False):
'''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.
'''
if final:
print(message) # Assuming final message, there is no need for clearing.
elif not args.silent:
print(message + console.text_color.reset)

63
modules/executor.py Normal file
View File

@@ -0,0 +1,63 @@
# Public Python libraries
import argparse
import json
import meshctrl
from time import sleep
# Local Python libraries/modules
from modules.console import console
from modules.utilities import transform
class executor:
async def execute_meshbook(args: argparse.Namespace, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> None:
'''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
'''
responses_list = {}
targets = compiled_device_list["target_list"]
offline = compiled_device_list["offline_list"]
round = 1
for task in meshbook["tasks"]:
console.nice_print(args,
console.text_color.green + str(round) + ". Running: " + task["name"])
if "powershell" in meshbook and meshbook["powershell"]:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900)
else:
response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900)
task_batch = []
for device in response:
device_result = response[device]["result"]
response[device]["result"] = device_result.replace("Run commands completed.", "")
response[device]["device_id"] = device
response[device]["device_name"] = await transform.translate_nodeid_to_name(device, group_list)
task_batch.append(response[device])
responses_list["Task " + str(round)] = {
"task_name": task["name"],
"data": task_batch
}
round += 1
sleep(0.5) # Sleep for 0.5 seconds.
for index, device in enumerate(offline): # Replace Device_id with actual human readable name
device_name = await transform.translate_nodeid_to_name(device, group_list)
offline[index] = device_name
responses_list["Offline"] = offline
console.nice_print(args,
console.text_color.reset + ("-" * 40))
if args.indent:
if not args.raw_result:
responses_list = transform.process_shell_response(args.shlex, responses_list)
console.nice_print(args,
json.dumps(responses_list,indent=4), True)
else:
console.nice_print(args,
json.dumps(responses_list), True)

219
modules/utilities.py Normal file
View File

@@ -0,0 +1,219 @@
# Public Python libraries
import argparse
from configparser import ConfigParser
import meshctrl
import os
import yaml
'''
Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section.
'''
class utilities:
async def load_config(args: argparse.Namespace,
segment: str = 'meshcentral-account') -> dict:
'''
Function that loads the segment from the config.conf (by default) file and returns the it in a dict.
'''
conf_file = args.conf
if not os.path.exists(conf_file):
print(f'Missing config file {conf_file}. Provide an alternative path.')
os._exit(1)
config = ConfigParser()
try:
config.read(conf_file)
except Exception as err:
print(f"Error reading configuration file '{conf_file}': {err}")
os._exit(1)
if segment not in config:
print(f'Segment "{segment}" not found in config file {conf_file}.')
os._exit(1)
return config[segment]
async def compile_book(meshbook_file: dict) -> dict:
'''
Simple function that opens the file and replaces placeholders through the next function. After that just return it.
'''
meshbook = open(meshbook_file, 'r')
meshbook = await transform.replace_placeholders(yaml.safe_load(meshbook))
return meshbook
def get_os_variants(target_category: str,
os_map: dict) -> set:
'''
Extracts all OS names under a given category if it exists.
'''
for key, value in os_map.items():
if key == target_category:
if isinstance(value, dict): # Expand nested categories
os_set = set()
for sub_target_cat in value:
os_set.update(utilities.get_os_variants(sub_target_cat, value))
return os_set
elif isinstance(value, list): # Direct OS list
return set(value)
return set()
async def filter_targets(devices: list[dict],
os_categories: dict,
target_os: str = None,
ignore_categorisation: bool = False,
target_tag: str = None) -> dict:
'''
Filters devices based on reachability and optional OS criteria, supporting nested OS categories.
'''
valid_devices = []
offline_devices = []
# Identify correct OS filtering scope
for key in os_categories:
if key == target_os:
allowed_os = utilities.get_os_variants(target_os, os_categories)
break # Stop searching once a match is found
if isinstance(os_categories[key], dict) and target_os in os_categories[key]:
allowed_os = utilities.get_os_variants(target_os, os_categories[key])
break # Stop searching once a match is found
for device in devices: # Filter out unwanted or unreachable devices.
if target_tag and target_tag not in device["device_tags"]:
continue
if not ignore_categorisation:
if device["device_os"] not in allowed_os:
continue
else:
if target_os not in device["device_os"]:
continue
if not device["reachable"]:
offline_devices.append(device["device_id"])
continue
valid_devices.append(device["device_id"])
return {
"valid_devices": valid_devices,
"offline_devices": offline_devices
}
async def process_device_or_group(pseudo_target: str,
group_list: dict,
os_categories: dict,
target_os: str,
ignore_categorisation: bool,
target_tag: str) -> dict:
'''
Helper function to process devices or groups.
'''
matched_devices = []
for group in group_list:
for device in group_list[group]:
if device["device_name"] == pseudo_target:
matched_devices.append(device)
if matched_devices:
return await utilities.filter_targets(matched_devices, os_categories, target_os, ignore_categorisation, target_tag)
return []
import shlex
class transform:
def process_shell_response(shlex_enable: bool, meshbook_result: dict) -> dict:
for task_name, task_data in meshbook_result.items():
if task_name == "Offline": # Failsafe
continue
for node_responses in task_data["data"]:
task_result = node_responses["result"].splitlines()
if shlex_enable:
for index, line in enumerate(task_result):
line = shlex.split(line)
task_result[index] = line
clean_output = []
for line in task_result:
if len(line) > 0:
clean_output.append(line)
node_responses["result"] = clean_output
return meshbook_result
async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str:
'''
Simple function that looks up nodeid to the human-readable name if existent - otherwise return None.
'''
for group in group_list:
for device in group_list[group]:
if device["device_id"] == target_id:
return device["device_name"]
return None
async def replace_placeholders(meshbook: dict) -> dict:
'''
Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list.
'''
variables = {}
if "variables" in meshbook and isinstance(meshbook["variables"], list):
for var in meshbook["variables"]:
var_name = var["name"]
var_value = var["value"]
variables[var_name] = var_value
else:
return meshbook
for task in meshbook.get("tasks", []):
task_name = task.get("name")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}"
task_name = task_name.replace(placeholder, var_value)
task["name"] = task_name
command = task.get("command")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}" # Create the placeholder string like "{{ host1 }}"
command = command.replace(placeholder, var_value)
task["command"] = command
return meshbook
async def compile_group_list(session: meshctrl.Session) -> dict:
'''
Function that retrieves the devices from MeshCentral and compiles it into a efficient list.
'''
devices_response = await session.list_devices(details=False, timeout=10)
local_device_list = {}
for device in devices_response:
if device.meshname not in local_device_list:
local_device_list[device.meshname] = []
local_device_list[device.meshname].append({
"device_id": device.nodeid,
"device_name": device.name,
"device_os": device.os_description,
"device_tags": device.tags,
"reachable": device.connected
})
return local_device_list