43 Commits
v1.0.1 ... v1.3

Author SHA1 Message Date
Daan Selen
f857b79d82 Added dedicated var 2025-04-25 16:09:36 +02:00
dselen
58598d8d17 Rewrote Meshbook with submodules with classes (#12)
* Massive update, bringing a lot of QoL. RC

* 0.5 seconds delay between tasks.
    insignificant for humans.
    Like a thousand years for computers to get ready.

* Added new features such as ignore_categorisation.

* Version 1.3 RC

* Added defaults.
2025-04-25 16:03:07 +02:00
Daan Selen
ac4dd8994c Removed more prints 2025-04-07 16:55:58 +02:00
Daan Selen
7a60cd7280 hotfix 2025-04-07 16:52:19 +02:00
Daan
e2eca57a0a bump libmeshctrl 2025-04-01 22:17:20 +02:00
dselen
a4b6062c0e Merge pull request #10 from DaanSelen/tag_test
Added targeting tags.
2025-03-27 11:23:00 +01:00
Daan Selen
de4fe0258c added doc note about target_tag 2025-03-27 11:20:51 +01:00
Daan Selen
1d4b89a2ed Targeting tags seems to work well. Needing doc update. 2025-03-27 11:19:04 +01:00
Daan Selen
b2bf899d42 Renamed config due to autocorrect. 2025-03-27 10:15:21 +01:00
Daan Selen
0a211da4d6 Slight modification. 2025-03-27 10:11:33 +01:00
Daan Selen
1450416d62 Test new filtering logic. 2025-03-27 10:01:49 +01:00
Daan Selen
b0f34e9ea0 add target_tag in function parameters. 2025-03-26 16:58:12 +01:00
Daan Selen
47eef4cfb0 Added 'all' option to the groups option. 2025-03-04 11:57:41 +01:00
Daan Selen
ba74e038f7 Renamed meshcentral.conf -> meshbook.conf 2025-03-03 14:01:41 +01:00
Daan Selen
f1df522f61 Modified dir name in gitignore and changed disk info example. 2025-02-28 15:18:57 +01:00
Daan Selen
f5453353fe +el 2025-02-28 14:52:16 +01:00
Daan Selen
898098105c Small logic change. Added extra check and error handling. 2025-02-28 14:19:40 +01:00
Daan
6f945d30d7 Hotfix in powershell checking code. 2025-02-27 21:58:09 +01:00
Daan
a722c024f5 Expansion of previous commit. 2025-02-27 21:47:55 +01:00
Daan
db7ff19bfb changed .gitignore
Added examples directory back if you want to customize them: cp examples meshbooks (which is ignored).
Added powershell support into the yaml (for Windows).
Added extra check in console output segment.
Updated readme.md
updated os_categories with Windows examples
Added more examples.
2025-02-27 21:45:47 +01:00
Daan Selen
9caa52f59e pb -> mb reference. (M)esh(B)ook 2025-02-27 14:52:56 +01:00
Daan Selen
d62d80fb16 Changed hostname value. 2025-02-27 14:52:25 +01:00
Daan Selen
ff6e1f6cb7 Added submodule to meshbook (public) 2025-02-27 09:46:30 +01:00
Daan Selen
a4335ce8ac Expanded logic in replace_placeholder() function so this is easier to work with. 2025-02-25 10:39:07 +01:00
Daan Selen
30d49059c5 Bumped libmeshctrl after: https://github.com/HuFlungDu/pylibmeshctrl/issues/35 2025-02-18 08:43:08 +01:00
dselen
50e413581a Freeze websockets at 14.2 due to breakage. 2025-02-17 16:53:43 +01:00
Daan Selen
64bf28c565 Freeze websockets at 14.2 due to breakage. 2025-02-17 16:53:01 +01:00
dselen
4359a40eb3 Added slight change to fix some Windows JSON formatting.
Merge pull request #6 from DaanSelen/dev
2025-02-14 13:54:51 +01:00
Daan Selen
193cb546f4 Added slight change to fix some Windows JSON formatting. 2025-02-14 13:30:44 +01:00
dselen
aa1d6a1a97 Fix Windows terminal outputs.
Merge pull request #5 from DaanSelen/dev
2025-02-14 13:13:45 +01:00
Daan Selen
e69ad445e2 Fix Windows terminal outputs. 2025-02-14 13:03:35 +01:00
dselen
bd833456d0 Merge pull request #4 from DaanSelen/dev
Tidying up everything and adding slight QoL changes.
2025-02-13 15:37:29 +01:00
Daan Selen
7f0159a8fa Tidying up everything and adding slight QoL changes. 2025-02-13 15:21:18 +01:00
dselen
03683976a8 Added targeting of multiple groups and devices through a new 'keyword'
Through a new sorting method, all cool and logically formatted.
2025-02-13 14:27:02 +01:00
Daan Selen
4f75969ed8 Added multiple group and device targeting with 'groups' and 'devices'. Also more error handling. 2025-02-13 14:22:03 +01:00
Daan Selen
77b1cfc73c Changed documentation according to feedback. 2025-02-13 13:44:23 +01:00
dselen
ad220e8c1a Update operating_system_filtering.md
reformat sentence. (a whole commit for that yes.)
2025-02-13 12:30:58 +01:00
Daan Selen
b7b9fdaea7 Added minor changes (documentation) and indentation for readability.
Readme correct referencing to new docs directory its file.
    couple comments and indentations to improve readability.
    Added a dummy Ubuntu 22 and MacOS categories as an example.
2025-02-13 12:29:06 +01:00
Daan Selen
ab1105b058 Added OS filtering doc and supported (1 level deep) nested definition. 2025-02-13 12:20:03 +01:00
Daan Selen
9494aa14c9 Changed logic in device filtering and made code a little more readable with newlines. 2025-02-13 11:44:53 +01:00
Daan Selen
6a7ec78fe9 Updated OS categorisation and bumped libmeshctrl to 1.1.1 2025-02-12 16:58:20 +01:00
dselen
1779025a97 Update README.md 2025-02-11 16:27:30 +01:00
dselen
e0de06c57e Update requirements.txt 2025-02-11 16:26:33 +01:00
21 changed files with 780 additions and 194 deletions

4
.gitignore vendored
View File

@@ -1,10 +1,8 @@
*.conf
venv
books
.vscode
# temporary or to prevent big commits
examples/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

View File

@@ -11,46 +11,50 @@ And many people will be comfortable with YAML configurations! It's almost like J
The quickest way to start is to grab a template from the templates folder in this repository.<br>
Make sure to correctly pass the MeshCentral websocket API as `wss://<MeshCentral-Host>`.<br>
And make sure to fill in the credentails of an account which has `Remote Commands` permissions on the targeted devices or groups.<br>
And make sure to fill in the credentails of an account which has `Remote Commands`, `Details` and `Agent Console` permissions on the targeted devices or groups.<br>
> I did this through a "Global Service" group which I added the meshbook account to!
> I did this through a "Service account" with rights on the device group.
Then make a yaml with a target and some commands! See below examples as a guideline. And do not forget to look at the bottom's notice.<br>
To install, follow the following commands:<br>
### Linux setup:
```shell
```bash
git clone https://github.com/daanselen/meshbook
cd ./meshbook
python3 -m venv ./venv
source ./venv/bin/activate
pip3 install -r ./requirements.txt
cp ./templates/meshcentral.conf.template ./meshcentral.conf
```
### Windows setup:
### Windows setup (PowerShell, not cmd):
```shell
git clone https://github.com/daanselen/meshbook
cd ./meshbook
python3 -m venv ./venv
python -m venv ./venv # or python3 when done through the Microsoft Store.
.\venv\Scripts\activate # Make sure to check the terminal prefix.
pip3 install -r ./requirements.txt
cp .\templates\meshcentral.conf.template .\meshcentral.conf
```
Now copy the configuration template from ./templates and fill it in with the correct details. The url should start with `wss://`.<br>
Now copy the configuration template from ./templates and fill it in with the correct details (remove .template from the file) this is shown in the last step of the setup(s).<br>
The url should start with `wss://`.<br>
You can check pre-made examples in the examples directory, make sure the values are set to your situation.<br>
After this you can use meshbook, for example:
### Linux run:
```shell
```bash
python3 .\meshbook.py -pb .\examples\echo.yaml
```
### Windows run:
```shell
.\venv\Scripts\python.exe .\meshbook.py -pb .\examples\echo.yaml
.\venv\Scripts\python.exe .\meshbook.py -pb .\examples\echo_example.yaml
```
### How to check if everything is okay?
@@ -80,6 +84,8 @@ So to target for example a mesh/group in MeshCentral called: "Nerthus" do:
---
name: example configuration
group: "Nerthus"
#target_os: "Linux" # <--- according to os_categories.json.
powershell: True # <--- this can be important for Windows clients.
variables:
- name: var1
value: "This is the first variable"
@@ -102,14 +108,21 @@ The tasks you want to run should be contained under the `tasks:` with two fields
The name field is for the user of meshbook, to clarify what the following command does in a summary.<br>
The command field actually gets executed on the end-point.<br>
### Granual Operating System control:
### Windows Client Extra-information:
I have made the program so it can have a basic filter with the Operating systems. If you have a mixed group, then you need to match the image below like this:
If you want to launch commands at Windows machines, make sure you have your `os_categories.conf` up-to-date with the correct reported Windows versions.<br>
And then make sure to create compatible commands, see: [windows examples](./examples/windows)<br>
Related is the yaml option: `powershell: True`.
<img src="./assets/meshcentral_os.png" alt="MeshCentral Operating System Version" width="600"/><br>
<img src="./assets/meshbook_yaml_os.png" alt="MeshBook yaml example" width="600"/>
### Granual Operating System filtering:
This will filter the nodes/machines in the MeshCentral group to the ones matching this. Very basic and open for feedback. This must match the full string, not case sensitive.
I have made the program so it can have a filter with the Operating systems. If you have a mixed group, please read:
[This explanation](./docs/operating_system_filtering.md)
### Tag filtering:
Filtering on MeshCentral tags is also possible with `target_tag` inside the meshbook. This string is case-sensitive, lower- and uppercase must match.<br>
This is done because its human made and therefor needs to be keps well administrated.
# Example:
@@ -122,6 +135,7 @@ You can expand the command chain as follows:<br>
---
name: Echo a string to the terminal through the meshbook example.
group: "Dev"
#target_os: "Linux" # <--- according to os_categories.json
variables:
- name: file
value: "/etc/os-release"
@@ -133,33 +147,46 @@ tasks:
The following response it received when executing the first yaml of the above files (without the `-s` parameters, which just outputs the below JSON).
```shell
python3 meshbook.py -pb examples/echo_example.yaml
$ python3 meshbook.py -mb books/aggregate_example.yaml -i --nograce -pr
----------------------------------------
meshbook: books/aggregate_example.yaml
Operating System Categorisation file: ./os_categories.json
Configuration file: ./config.conf
Target Operating System category given: Linux
Target group: Systemec Development
Grace: False
Silent: False
----------------------------------------
Trying to load the MeshCentral account credential file...
Trying to load the Playbook yaml file and compile it into something workable...
Trying to load the meshbook yaml file and compile it into something workable...
Trying to load the Operating System categorisation JSON file...
Connecting to MeshCentral and establish a session using variables from previous credential file.
Generating group list with nodes and reference the targets from that.
----------------------------------------
Executing playbook on the targets.
1. Running: Echo!
Executing playbook on the target(s): Development.
----------------------------------------
1. Running: Ping!
----------------------------------------
{
"Task 1": [
"Task 1": {
"task_name": "Ping Quad9 DNS",
"data": [
{
"complete": true,
"result": "PRETTY_NAME=\"Debian GNU/Linux 12 (bookworm)\" NAME=\"Debian GNU/Linux\" VERSION_ID=\"12\" VERSION=\"12 (bookworm)\" VERSION_CODENAME=bookworm ID=debian HOME_URL=\"https://www.debian.org/\" SUPPORT_URL=\"https://www.debian.org/support\" BUG_REPORT_URL=\"https://bugs.debian.org/\"\n",
"command": "echo $(cat /etc/os-release)",
"device_id": "<Node-Unique>",
"device_name": "raspberrypi5"
},
{
"complete": true,
"result": "PRETTY_NAME=\"Debian GNU/Linux 12 (bookworm)\" NAME=\"Debian GNU/Linux\" VERSION_ID=\"12\" VERSION=\"12 (bookworm)\" VERSION_CODENAME=bookworm ID=debian HOME_URL=\"https://www.debian.org/\" SUPPORT_URL=\"https://www.debian.org/support\" BUG_REPORT_URL=\"https://bugs.debian.org/\"\n",
"command": "echo $(cat /etc/os-release)",
"device_id": "<Node-Unique>",
"device_name": "Cubic"
"result": [
"PING 9.9.9.9 (9.9.9.9) 56(84) bytes of data.",
"64 bytes from 9.9.9.9: icmp_seq=1 ttl=61 time=26.8 ms",
"--- 9.9.9.9 ping statistics ---",
"1 packets transmitted, 1 received, 0% packet loss, time 0ms",
"rtt min/avg/max/mdev = 26.809/26.809/26.809/0.000 ms"
],
"command": "ping 9.9.9.9 -c 1",
"device_id": "yourn nodeip",
"device_name": "yournodename"
}
]
}
}
```
The above without `-s` is quite verbose. use `--help` to read about parameters and getting a minimal response for example.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 68 KiB

View File

@@ -0,0 +1,65 @@
# **Understanding the OS Filtering Mechanism**
## **Overview**
This function filters devices based on their **reachability** and an optional **OS category filter**. It supports:
- **Broad OS categories** (e.g., `"Linux"` includes all OS versions under `"Linux"`)
- **Specific OS categories** (e.g., `"Debian"` only includes OS versions under `"Linux" -> "Debian"`)
- **Single category selection** (Only `target_os="Linux"` OR `target_os="Debian"` is used, never both at once)
---
## **How It Works (Simplified)**
### **1. OS Category Expansion**
The function first expands the `target_os` category by retrieving all valid OS names under it.
#### **Example OS Category Structure:**
```json
{
"Linux": {
"Debian": [
"Debian GNU/Linux 12 (bookworm)",
"Debian GNU/Linux 11 (bullseye)"
],
"Ubuntu": [
"Ubuntu 24.04.1 LTS"
]
}
}
```
#### **Expanding Different `target_os` Values:**
| `target_os` | Expanded OS Versions |
|--------------|---------------------------------------------------|
| `"Linux"` | `{ "Debian GNU/Linux 12 (bookworm)", "Debian GNU/Linux 11 (bullseye)", "Ubuntu 24.04.1 LTS" }` |
| `"Debian"` | `{ "Debian GNU/Linux 12 (bookworm)", "Debian GNU/Linux 11 (bullseye)" }` |
---
### **2. Device Filtering**
Once the function has the allowed OS versions, it checks each device:
#### **Example Device List:**
```json
[
{"device_id": "A1", "device_os": "Debian GNU/Linux 12 (bookworm)", "reachable": true},
{"device_id": "A2", "device_os": "Ubuntu 24.04.1 LTS", "reachable": true},
{"device_id": "A3", "device_os": "Windows 11", "reachable": true},
{"device_id": "A4", "device_os": "Debian GNU/Linux 11 (bullseye)", "reachable": false}
]
```
#### **Filtering Behavior:**
| Device ID | Device OS | Reachable | Matches `target_os="Linux"` | Matches `target_os="Debian"` |
|-----------|----------------------------------|-----------|-------------------------------|-------------------------------|
| A1 | Debian GNU/Linux 12 (bookworm) | ✅ | ✅ | ✅ |
| A2 | Ubuntu 24.04.1 LTS | ✅ | ✅ | ❌ |
| A3 | Windows 11 | ✅ | ❌ | ❌ |
| A4 | Debian GNU/Linux 11 (bullseye) | ❌ | ❌ (Unreachable) | ❌ (Unreachable) |
#### **Final Output:**
- If `target_os="Linux"`: `["A1", "A2"]`
- If `target_os="Debian"`: `["A1"]`
- If `target_os=None` or `target_os` is undefined: `["A1", "A2", "A3"]`

View File

@@ -1,6 +1,7 @@
---
name: Ping Multiple Points
group: "Kubernetes"
#target_os: "Debian"
variables:
- name: host1
value: "1.1.1.1"

View File

@@ -1,6 +1,7 @@
---
name: Echo some text in the terminal of the device
group: "Dev"
group: "Development"
target_os: "Linux" # <----
variables:
- name: package_manager
value: "apt"
@@ -28,4 +29,4 @@ tasks:
command: "ping {{ google_dns }} -c 4"
- name: Ping Quad9 DNS
command: "ping {{ quad9_dns }} -c 4"
command: "ping {{ quad9_dns }} -c 4"

View File

@@ -1,12 +1,13 @@
---
name: Refresh the apt cache
device: Cubic
device: "<Device-Name>"
#target_os: "Linux"
variables:
- name: package_manager
value: "apt"
tasks:
- name: refresh the cache
- name: refresh the {{ package_manager }} cache
command: "{{ package_manager }} update"
- name: display available upgrades
- name: display available upgrades with {{ package_manager }}
command: "{{ package_manager }} list --upgradable"

View File

@@ -1,6 +1,7 @@
---
name: Refresh the apt cache
group: "Dev"
#target_os: "Linux"
variables:
- name: package_manager
value: "apt"

View File

@@ -0,0 +1,8 @@
---
name: Use DF to get drive information in JSON.
group: Systemec Development
target_os: "Linux"
tasks:
- name: Get disk-info with df returning JSON.
command: >
df -Th -x overlay -x tmpfs -x devtmpfs | awk 'NR>1 {printf "%s{\"size\":\"%s\",\"used\":\"%s\",\"available\":\"%s\",\"mount_point\":\"%s\",\"type\":\"%s\"}", (NR==2?"[":","), $3, $4, $5, $7, $2} END {print "]"}'

View File

@@ -1,7 +1,7 @@
---
name: Echo a string to the terminal through the meshbook example.
group: "Dev"
target_os: Debian GnU/Linux 12 (bookworm)
group: "Development"
target_os: "Linux"
variables:
- name: file
value: "/etc/os-release"

View File

@@ -0,0 +1,11 @@
---
name: Echo a string to the terminal through the meshbook example.
group: "Endpoint"
target_os: "Windows"
powershell: True
#variables:
# - name: file
# value: "/etc/os-release"
tasks:
- name: Echo!
command: "Get-ComputerInfo | Select-Object CsName, OsName, OsArchitecture, OsLastBootUpTime | Write-Output"

View File

@@ -0,0 +1,8 @@
---
name: Echo a string to the terminal through the meshbook example.
group: "Endpoint"
target_os: "Windows"
powershell: True
tasks:
- name: Get some update information
command: "Get-HotFix | Select-Object PSComputerName, HotFixID, InstalledOn"

View File

@@ -0,0 +1,16 @@
---
name: Echo a string to the terminal through the meshbook example.
group: "Endpoint"
target_os: "Windows"
powershell: True
#variables:
# - name: file
# value: "/etc/os-release"
tasks:
- name: Echo!
command: >
$systemInfo = Get-ComputerInfo | Select-Object CsName, OsName, OsArchitecture, OsLastBootUpTime;
$systemInfo | Format-Table -AutoSize;
Write-Output "I like monkeys.";
Get-SystemLanguage | Write-Output;
Get-ComputerInfo | Format-List;

View File

@@ -1,197 +1,313 @@
#!/bin/python3
# Public Python libraries
import argparse
import asyncio
from base64 import b64encode
from configparser import ConfigParser
from colorama import just_fix_windows_console
import json
import math
import meshctrl
import os
import yaml
'''
Script utilities are handled in the following section.
'''
# Local Python libraries/modules
from modules.console import *
from modules.executor import *
from modules.utilities import *
class ScriptEndTrigger(Exception):
pass
def output_text(message: str, required=False):
if required:
print(message)
elif not args.silent:
print(message)
async def load_config(conf_file: str = './api.conf', segment: str = 'meshcentral-account') -> ConfigParser:
if not os.path.exists(conf_file):
raise ScriptEndTrigger(f'Missing config file {conf_file}. Provide an alternative path.')
config = ConfigParser()
try:
config.read(conf_file)
except Exception as err:
raise ScriptEndTrigger(f"Error reading configuration file '{conf_file}': {err}")
if segment not in config:
raise ScriptEndTrigger(f'Segment "{segment}" not found in config file {conf_file}.')
return config[segment]
meshbook_version = 1.3
grace_period = 3 # Grace period will last for x (by default 3) second(s).
async def init_connection(credentials: dict) -> meshctrl.Session:
'''
Use the libmeshctrl library to initiate a Secure Websocket (wss) connection to the MeshCentral instance.
'''
session = meshctrl.Session(
credentials['websocket_url'],
credentials['hostname'],
user=credentials['username'],
password=credentials['password']
)
await session.initialized.wait()
return session
async def translate_id_to_name(target_id: str, group_list: dict) -> str:
for group in group_list:
for device in group_list[group]:
if device["device_id"] == target_id:
return device["device_name"]
async def gather_targets(args: argparse.Namespace,
meshbook: dict,
group_list: dict[str, list[dict]],
os_categories: dict) -> list[str]:
'''
Finds target devices based on meshbook criteria (device or group).
'''
'''
Creation and compilation happends in the following section, where the yaml gets read in, and edited accordingly.
'''
async def compile_book(playbook_file: dict) -> dict:
playbook = open(playbook_file, 'r')
playbook = await replace_placeholders(yaml.safe_load(playbook))
return playbook
async def replace_placeholders(playbook: dict) -> dict:
variables = {var["name"]: var["value"] for var in playbook.get("variables", [])}
for task in playbook.get("tasks", []):
command = task.get("command", "")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}" # Create the placeholder string like "{{ host1 }}"
command = command.replace(placeholder, var_value)
task["command"] = command
return playbook
'''
Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section.
'''
async def compile_group_list(session: meshctrl.Session) -> dict:
devices_response = await session.list_devices(details=False, timeout=10)
local_device_list = {}
for device in devices_response:
if device.meshname not in local_device_list:
local_device_list[device.meshname] = []
local_device_list[device.meshname].append({
"device_id": device.nodeid,
"device_name": device.name,
"device_os": device.os_description,
"device_tags": device.tags,
"reachable": device.connected
})
return local_device_list
async def gather_targets(playbook: dict, group_list: dict) -> dict:
target_list = []
offline_list = []
target_os = meshbook.get("target_os")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
target_tag = meshbook.get("target_tag")
if "device" in playbook and "group" not in playbook:
pseudo_target = playbook["device"]
match meshbook:
case {"device": pseudo_target}: # Single device target
if isinstance(pseudo_target, str):
processed_devices = await utilities.process_device_or_group(pseudo_target,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
for group in group_list:
for device in group_list[group]:
if device["reachable"] and pseudo_target == device["device_name"]:
if "target_os" in playbook and str(playbook["target_os"]).lower() == str(device["device_os"]).lower():
target_list.append(device["device_id"])
elif "target_os" not in playbook:
target_list.append(device["device_id"])
else:
console.nice_print(args,
console.text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True)
elif "group" in playbook and "device" not in playbook:
pseudo_target = playbook["group"]
case {"devices": pseudo_target}: # List of devices
if isinstance(pseudo_target, list):
for sub_pseudo_device in pseudo_target:
processed_devices = await utilities.process_device_or_group(sub_pseudo_device,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag,)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
for group in group_list:
if pseudo_target == group:
for device in group_list[group]:
if device["reachable"]:
if "target_os" in playbook and str(playbook["target_os"]).lower() == str(device["device_os"]).lower():
target_list.append(device["device_id"])
elif "target_os" not in playbook:
target_list.append(device["device_id"])
else:
console.nice_print(args, console.text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True)
return target_list
case {"group": pseudo_target}: # Single group target
if isinstance(pseudo_target, str) and pseudo_target in group_list:
processed_devices = await utilities.filter_targets(group_list[pseudo_target],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
async def execute_playbook(session: meshctrl.Session, targets: dict, playbook: dict, group_list: dict) -> None:
responses_list = {}
round = 1
for task in playbook["tasks"]:
output_text(("\033[1m\033[92m" + str(round) + ". Running: " + task["name"] + "\033[0m"), False)
response = await session.run_command(nodeids=targets, command=task["command"], ignore_output=False, timeout=900)
task_batch = []
for device in response:
device_result = response[device]["result"]
response[device]["result"] = device_result.replace("Run commands completed.", "")
response[device]["device_id"] = device
response[device]["device_name"] = await translate_id_to_name(device, group_list)
task_batch.append(response[device])
elif pseudo_target not in group_list:
console.nice_print(args,
console.text_color.yellow + "Targeted group not found on the MeshCentral server.", True)
responses_list["Task " + str(round)] = {
"task_name": task["name"],
"data": task_batch
}
round += 1
output_text(("-" * 40), False)
output_text((json.dumps(responses_list,indent=4)), True)
else:
console.nice_print(args,
console.text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True)
case {"groups": pseudo_target}: # List of groups
if isinstance(pseudo_target, list):
for sub_pseudo_target in pseudo_target:
if sub_pseudo_target in group_list:
processed_devices = await utilities.filter_targets(group_list[sub_pseudo_target],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
elif pseudo_target.lower() == "all":
for group in group_list:
processed_devices = await utilities.filter_targets(group_list[group],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
else:
console.nice_print(args,
console.text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True)
return {
"target_list": target_list,
"offline_list": offline_list
}
async def main():
parser = argparse.ArgumentParser(description="Process command-line arguments")
parser.add_argument("-pb", "--playbook", type=str, help="Path to the playbook file.", required=True)
parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./api.conf).", required=False)
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the playbook.", required=False)
parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output", required=False)
just_fix_windows_console()
'''
Main function where the program starts. Place from which all comands originate (eventually).
'''
parser = argparse.ArgumentParser(description="Process command-line arguments")
parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.")
parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", default="./os_categories.json")
parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", default="./config.conf")
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.")
parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.")
parser.add_argument("-r", "--raw-result", action="store_true", help="Print the raw result.")
parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output.")
parser.add_argument("--shlex", action="store_true", help="Shlex the lines.")
parser.add_argument("-v", "--version", action="store_true", help="Show the Meshbook version.")
global args
args = parser.parse_args()
local_categories_file = "./os_categories.json"
if args.version:
console.nice_print(args,
console.text_color.reset + "MeshBook Version: " + console.text_color.yellow + str(meshbook_version))
return
if not args.meshbook:
parser.print_help()
return
try:
output_text(("-" * 40), False)
output_text(("\x1B[3mTrying to load the MeshCentral account credential file...\x1B[0m"), False)
output_text(("\x1B[3mTrying to load the Playbook yaml file and compile it into something workable...\x1B[0m"), False)
with open(local_categories_file, "r") as file:
os_categories = json.load(file)
credentials, playbook = await asyncio.gather(
(load_config() if args.conf is None else load_config(args.conf)),
(compile_book(args.playbook))
credentials, meshbook = await asyncio.gather(
(utilities.load_config(args)),
(utilities.compile_book(args.meshbook))
)
output_text(("\x1B[3mConnecting to MeshCentral and establish a session using variables from previous credential file.\x1B[0m"), False)
session = await init_connection(credentials)
output_text(("\x1B[3mGenerating group list with nodes and reference the targets from that.\x1B[0m"), False)
group_list = await compile_group_list(session)
targets_list = await gather_targets(playbook, group_list)
'''
The following section mainly displays used variables and first steps of the program to the console.
'''
if len(targets_list) == 0:
output_text(("\033[91mNo targets found or targets unreachable, quitting.\x1B[0m"), True)
# INIT ARGUMENTS PRINTING
console.nice_print(args,
console.text_color.reset + ("-" * 40))
console.nice_print(args,
"meshbook: " + console.text_color.yellow + args.meshbook)
console.nice_print(args,
"Operating System Categorisation file: " + console.text_color.yellow + args.oscategories)
console.nice_print(args,
"Configuration file: " + console.text_color.yellow + args.conf)
# TARGET OS PRINTING
if "target_os" in meshbook:
console.nice_print(args,
"Target Operating System category given: " + console.text_color.yellow + meshbook["target_os"])
else:
output_text(("-" * 40), False)
target_name = playbook["group"] if "group" in playbook else playbook["device"] # Quickly get the name.
output_text(("\033[91mExecuting playbook on the target(s): " + target_name + ".\x1B[0m"), False)
console.nice_print(args,
"Target Operating System category given: " + console.text_color.yellow + "All")
# Should Meshbook ignore categorisation?
if "ignore_categorisation" in meshbook:
console.nice_print(args,
"Ignore the OS Categorisation file: " + console.text_color.yellow + str(meshbook["ignore_categorisation"]))
if meshbook["ignore_categorisation"]:
console.nice_print(args,
console.text_color.red + "!!!!\n" +
console.text_color.yellow +
"Ignore categorisation is True.\nThis means that the program checks if the target Operating System is somewhere in the reported device Operating System." +
console.text_color.red + "\n!!!!")
else:
console.nice_print(args,
"Ignore the OS Categorisation file: " + console.text_color.yellow + "False")
# TARGET TAG PRINTING
if "target_tag" in meshbook:
console.nice_print(args,
"Target Device tag given: " + console.text_color.yellow + meshbook["target_tag"])
else:
console.nice_print(args,
"Target Device tag given: " + console.text_color.yellow + "All")
# TARGET PRINTING
if "device" in meshbook:
console.nice_print(args,
"Target device: " + console.text_color.yellow + str(meshbook["device"]))
elif "devices" in meshbook:
console.nice_print(args,
"Target devices: " + console.text_color.yellow + str(meshbook["devices"]))
elif "group" in meshbook:
console.nice_print(args,
"Target group: " + console.text_color.yellow + str(meshbook["group"]))
elif "groups" in meshbook:
console.nice_print(args,
"Target groups: " + console.text_color.yellow + str(meshbook["groups"]))
# RUNNING PARAMETERS PRINTING
console.nice_print(args, "Grace: " + console.text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation
console.nice_print(args, "Silent: " + console.text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed.
session = await init_connection(credentials)
# PROCESS PRINTING aka what its doing in the moment...
console.nice_print(args,
console.text_color.reset + ("-" * 40))
console.nice_print(args,
console.text_color.italic + "Trying to load the MeshCentral account credential file...")
console.nice_print(args,
console.text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...")
console.nice_print(args,
console.text_color.italic + "Trying to load the Operating System categorisation JSON file...")
console.nice_print(args,
console.text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.")
console.nice_print(args,
console.text_color.italic + "Generating group list with nodes and reference the targets from that.")
'''
End of the main information displaying section.
'''
group_list = await transform.compile_group_list(session)
compiled_device_list = await gather_targets(args, meshbook, group_list, os_categories)
if len(compiled_device_list["target_list"]) == 0:
console.nice_print(args, console.text_color.red + "No targets found or targets unreachable, quitting.", True)
console.nice_print(args, console.text_color.reset + ("-" * 40), True)
else:
console.nice_print(args, console.text_color.reset + ("-" * 40))
match meshbook:
case {"group": candidate_target_name}:
target_name = candidate_target_name
case {"groups": candidate_target_name}:
target_name = str(candidate_target_name)
case {"device": candidate_target_name}:
target_name = candidate_target_name
case {"devices": candidate_target_name}:
target_name = str(candidate_target_name)
console.nice_print(args, console.text_color.yellow + "Executing meshbook on the target(s): " + console.text_color.green + target_name + ".")
if not args.nograce:
output_text(("\033[91mInitiating grace-period...\x1B[0m"), False)
for x in range(3):
output_text(("\033[91m{}...\x1B[0m".format(x+1)), False)
console.nice_print(args, console.text_color.yellow + "Initiating grace-period...")
for x in range(grace_period):
console.nice_print(args, console.text_color.yellow + "{}...".format(x+1)) # Countdown!
await asyncio.sleep(1)
output_text(("-" * 40), False)
await execute_playbook(session, targets_list, playbook, group_list)
console.nice_print(args, console.text_color.reset + ("-" * 40))
await executor.execute_meshbook(args,
session,
compiled_device_list,
meshbook,
group_list)
await session.close()
except OSError as message:
output_text(message, True)
console.nice_print(args, console.text_color.red + message, True)
if __name__ == "__main__":
asyncio.run(main())

25
modules/console.py Normal file
View File

@@ -0,0 +1,25 @@
# Public Python libraries
import argparse
class console:
class text_color:
black = "\033[30m"
red = "\033[31m"
green = "\033[32m"
yellow = "\033[33m"
blue = "\033[34m"
magenta = "\033[35m"
cyan = "\033[36m"
white = "\033[37m"
italic = "\x1B[3m"
reset = "\x1B[0m"
def nice_print(args: argparse.Namespace, message: str, final: bool=False):
'''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.
'''
if final:
print(message) # Assuming final message, there is no need for clearing.
elif not args.silent:
print(message + console.text_color.reset)

65
modules/executor.py Normal file
View File

@@ -0,0 +1,65 @@
# Public Python libraries
import argparse
import json
import meshctrl
from time import sleep
# Local Python libraries/modules
from modules.console import console
from modules.utilities import transform
intertask_delay = 0.5
class executor:
async def execute_meshbook(args: argparse.Namespace, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> None:
'''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
'''
responses_list = {}
targets = compiled_device_list["target_list"]
offline = compiled_device_list["offline_list"]
round = 1
for task in meshbook["tasks"]:
console.nice_print(args,
console.text_color.green + str(round) + ". Running: " + task["name"])
if "powershell" in meshbook and meshbook["powershell"]:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900)
else:
response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900)
task_batch = []
for device in response:
device_result = response[device]["result"]
response[device]["result"] = device_result.replace("Run commands completed.", "")
response[device]["device_id"] = device
response[device]["device_name"] = await transform.translate_nodeid_to_name(device, group_list)
task_batch.append(response[device])
responses_list["Task " + str(round)] = {
"task_name": task["name"],
"data": task_batch
}
round += 1
sleep(intertask_delay) # Sleep for x amount of time.
for index, device in enumerate(offline): # Replace Device_id with actual human readable name
device_name = await transform.translate_nodeid_to_name(device, group_list)
offline[index] = device_name
responses_list["Offline"] = offline
console.nice_print(args,
console.text_color.reset + ("-" * 40))
if args.indent:
if not args.raw_result:
responses_list = transform.process_shell_response(args.shlex, responses_list)
console.nice_print(args,
json.dumps(responses_list,indent=4), True)
else:
console.nice_print(args,
json.dumps(responses_list), True)

219
modules/utilities.py Normal file
View File

@@ -0,0 +1,219 @@
# Public Python libraries
import argparse
from configparser import ConfigParser
import meshctrl
import os
import yaml
'''
Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section.
'''
class utilities:
async def load_config(args: argparse.Namespace,
segment: str = 'meshcentral-account') -> dict:
'''
Function that loads the segment from the config.conf (by default) file and returns the it in a dict.
'''
conf_file = args.conf
if not os.path.exists(conf_file):
print(f'Missing config file {conf_file}. Provide an alternative path.')
os._exit(1)
config = ConfigParser()
try:
config.read(conf_file)
except Exception as err:
print(f"Error reading configuration file '{conf_file}': {err}")
os._exit(1)
if segment not in config:
print(f'Segment "{segment}" not found in config file {conf_file}.')
os._exit(1)
return config[segment]
async def compile_book(meshbook_file: dict) -> dict:
'''
Simple function that opens the file and replaces placeholders through the next function. After that just return it.
'''
meshbook = open(meshbook_file, 'r')
meshbook = await transform.replace_placeholders(yaml.safe_load(meshbook))
return meshbook
def get_os_variants(target_category: str,
os_map: dict) -> set:
'''
Extracts all OS names under a given category if it exists.
'''
for key, value in os_map.items():
if key == target_category:
if isinstance(value, dict): # Expand nested categories
os_set = set()
for sub_target_cat in value:
os_set.update(utilities.get_os_variants(sub_target_cat, value))
return os_set
elif isinstance(value, list): # Direct OS list
return set(value)
return set()
async def filter_targets(devices: list[dict],
os_categories: dict,
target_os: str = None,
ignore_categorisation: bool = False,
target_tag: str = None) -> dict:
'''
Filters devices based on reachability and optional OS criteria, supporting nested OS categories.
'''
valid_devices = []
offline_devices = []
# Identify correct OS filtering scope
for key in os_categories:
if key == target_os:
allowed_os = utilities.get_os_variants(target_os, os_categories)
break # Stop searching once a match is found
if isinstance(os_categories[key], dict) and target_os in os_categories[key]:
allowed_os = utilities.get_os_variants(target_os, os_categories[key])
break # Stop searching once a match is found
for device in devices: # Filter out unwanted or unreachable devices.
if target_tag and target_tag not in device["device_tags"]:
continue
if not ignore_categorisation:
if device["device_os"] not in allowed_os:
continue
else:
if target_os not in device["device_os"]:
continue
if not device["reachable"]:
offline_devices.append(device["device_id"])
continue
valid_devices.append(device["device_id"])
return {
"valid_devices": valid_devices,
"offline_devices": offline_devices
}
async def process_device_or_group(pseudo_target: str,
group_list: dict,
os_categories: dict,
target_os: str,
ignore_categorisation: bool,
target_tag: str) -> dict:
'''
Helper function to process devices or groups.
'''
matched_devices = []
for group in group_list:
for device in group_list[group]:
if device["device_name"] == pseudo_target:
matched_devices.append(device)
if matched_devices:
return await utilities.filter_targets(matched_devices, os_categories, target_os, ignore_categorisation, target_tag)
return []
import shlex
class transform:
def process_shell_response(shlex_enable: bool, meshbook_result: dict) -> dict:
for task_name, task_data in meshbook_result.items():
if task_name == "Offline": # Failsafe
continue
for node_responses in task_data["data"]:
task_result = node_responses["result"].splitlines()
if shlex_enable:
for index, line in enumerate(task_result):
line = shlex.split(line)
task_result[index] = line
clean_output = []
for line in task_result:
if len(line) > 0:
clean_output.append(line)
node_responses["result"] = clean_output
return meshbook_result
async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str:
'''
Simple function that looks up nodeid to the human-readable name if existent - otherwise return None.
'''
for group in group_list:
for device in group_list[group]:
if device["device_id"] == target_id:
return device["device_name"]
return None
async def replace_placeholders(meshbook: dict) -> dict:
'''
Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list.
'''
variables = {}
if "variables" in meshbook and isinstance(meshbook["variables"], list):
for var in meshbook["variables"]:
var_name = var["name"]
var_value = var["value"]
variables[var_name] = var_value
else:
return meshbook
for task in meshbook.get("tasks", []):
task_name = task.get("name")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}"
task_name = task_name.replace(placeholder, var_value)
task["name"] = task_name
command = task.get("command")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}" # Create the placeholder string like "{{ host1 }}"
command = command.replace(placeholder, var_value)
task["command"] = command
return meshbook
async def compile_group_list(session: meshctrl.Session) -> dict:
'''
Function that retrieves the devices from MeshCentral and compiles it into a efficient list.
'''
devices_response = await session.list_devices(details=False, timeout=10)
local_device_list = {}
for device in devices_response:
if device.meshname not in local_device_list:
local_device_list[device.meshname] = []
local_device_list[device.meshname].append({
"device_id": device.nodeid,
"device_name": device.name,
"device_os": device.os_description,
"device_tags": device.tags,
"reachable": device.connected
})
return local_device_list

25
os_categories.json Normal file
View File

@@ -0,0 +1,25 @@
{
"Linux": {
"Debian": [
"Debian GNU/Linux 12 (bookworm)",
"Debian GNU/Linux 11 (bullseye)"
],
"Ubuntu": [
"Ubuntu 24.04.1 LTS",
"Ubuntu 22.04.5 LTS",
"Ubuntu 20.04.6 LTS"
]
},
"MacOS": {
"Sequoia": [
"macOS 15.0.1"
]
},
"Windows": {
"11": [
"Microsoft Windows 11 Home - 24H2/26100",
"Microsoft Windows 11 Pro - 24H2/26100"
]
}
}

View File

@@ -1,4 +1,3 @@
asyncio==3.4.3
configparser==7.1.0
colorama==0.4.6
pyyaml==6.0.2
libmeshctrl==1.1.0
libmeshctrl==1.2.0

View File

@@ -1,4 +1,4 @@
[meshcentral-account]
websocket_url =
hostname =
username =
password =