44 Commits
v1.2 ... main

Author SHA1 Message Date
e710dadfcd chore: bump libmeshctrl 2026-02-19 09:01:01 +01:00
Daan Selen
00e63e1a96 chore: remove old notice 2026-02-06 17:42:02 +01:00
Daan Selen
10057b5f91 correct text 2026-02-06 17:41:31 +01:00
Daan Selen
2cfd7ad06e chore: add caution notice 2026-02-06 17:40:35 +01:00
Daan Selen
6374ea50b7 docs: update README 2026-02-06 17:38:50 +01:00
f58e06ddec chore: edit readme 2026-01-29 16:48:44 +01:00
30026a18bd fix: KeyError due to previous commit 2026-01-15 13:40:21 +01:00
ea77ea1904 chore: add del to groups and devices 2026-01-15 13:38:31 +01:00
cc454dff40 chore: add more ends of sentences 2026-01-06 08:48:33 +01:00
cd10645056 chore: catch wrong keywords better 2026-01-02 15:54:04 +01:00
a652ea99d3 chore: fix apparent missing reference 2026-01-02 15:01:15 +01:00
e2cc746517 chore: add newline after content of meshbook in file 2026-01-02 09:40:37 +01:00
DaanSelen
9716a2376c release version 1.4 (#21)
* chore: init working on history functions
rename classes

* refac: remove raw-result option. I cannot find a use ( I have found it later on )

* chore: add basic history directory checking

* chore: further expansion on the code. Now its christmas

* chore: rework some code and make a logging/history feature workable

---------

Co-authored-by: DaanSelen <dselen@systemec.nl>
2025-12-29 10:55:57 +01:00
3a0fc215d7 fix: increase default timeout 2025-11-17 09:01:16 +01:00
dd1c97c56c chore: add debian version 2025-11-17 08:57:58 +01:00
9d49032857 fix: when not specifying any, grab all 2025-11-05 15:53:38 +01:00
dependabot[bot]
a7601a302a Bump libmeshctrl from 1.2.2 to 1.3.2 (#19)
Bumps [libmeshctrl](https://github.com/HuFlungDu/pylibmeshctrl) from 1.2.2 to 1.3.2.
- [Release notes](https://github.com/HuFlungDu/pylibmeshctrl/releases)
- [Changelog](https://github.com/HuFlungDu/pylibmeshctrl/blob/main/CHANGELOG.rst)
- [Commits](https://github.com/HuFlungDu/pylibmeshctrl/compare/1.2.2...1.3.2)

---
updated-dependencies:
- dependency-name: libmeshctrl
  dependency-version: 1.3.2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-23 13:45:28 +02:00
4a92eb986c fix: increase consistency due to regression 2025-10-15 15:27:07 +02:00
dependabot[bot]
d248f0bcbe Bump libmeshctrl from 1.2.2 to 1.3.1 (#16)
Bumps [libmeshctrl](https://github.com/HuFlungDu/pylibmeshctrl) from 1.2.2 to 1.3.1.
- [Release notes](https://github.com/HuFlungDu/pylibmeshctrl/releases)
- [Changelog](https://github.com/HuFlungDu/pylibmeshctrl/blob/main/CHANGELOG.rst)
- [Commits](https://github.com/HuFlungDu/pylibmeshctrl/compare/1.2.2...1.3.1)

---
updated-dependencies:
- dependency-name: libmeshctrl
  dependency-version: 1.3.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-30 15:56:17 +02:00
dependabot[bot]
2fae74d600 Bump pyyaml from 6.0.2 to 6.0.3 (#17)
Bumps [pyyaml](https://github.com/yaml/pyyaml) from 6.0.2 to 6.0.3.
- [Release notes](https://github.com/yaml/pyyaml/releases)
- [Changelog](https://github.com/yaml/pyyaml/blob/6.0.3/CHANGES)
- [Commits](https://github.com/yaml/pyyaml/compare/6.0.2...6.0.3)

---
updated-dependencies:
- dependency-name: pyyaml
  dependency-version: 6.0.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-30 15:53:46 +02:00
465905a6ec feat: enable dependabot 2025-09-30 15:51:26 +02:00
80ad7f865a refac: remove seemingly unneeded function 2025-09-25 15:40:58 +02:00
9d2999476d refac: added some type checking and fixed a duplicate bug (apparently) 2025-09-25 09:49:19 +02:00
DaanSelen
82cc31e0f6 Update executor.py 2025-09-16 10:37:13 +02:00
f34d1dc7ae refac(meshbook): rework the targeting module to now be case insensitive 2025-09-10 16:26:52 +02:00
DaanSelen
f0e9e40cca chore(docs): README correction 2025-09-10 13:28:42 +02:00
208e9c1223 chore(os-handling): added Debian 13 to the default Debian list 2025-08-26 13:51:18 +02:00
DaanSelen
a736a74af6 Added more robust code checking (#15)
Co-authored-by: Daan Selen <dselen@systemec.nl>
2025-08-07 13:56:38 +02:00
DaanSelen
733136c1ab Version 1.3.1 (#14)
* Looks to be manual overriding, testing now.

* Working

* Added a nice way of handling it intern

---------

Co-authored-by: Daan Selen <dselen@systemec.nl>
2025-08-07 11:14:36 +02:00
Daan Selen
7e10b98c3b Edited template for totp_secret 2025-07-21 23:10:00 +02:00
Daan Selen
615a438003 Changed compatibility 2025-07-21 22:42:16 +02:00
Daan Selen
07d0b99c47 added pyotp in requirements 2025-07-21 22:29:09 +02:00
Daan Selen
2447f65599 Changed Readme. (With the help of some AI) 2025-07-16 20:43:45 +02:00
Daan Selen
e729c72c6a added support for totp 2025-07-11 16:24:21 +02:00
Daan Selen
b20d56170e bumped libmeshctrl 2025-06-30 20:51:11 +02:00
Daan Selen
f52464909a bump libmeshctrl 2025-06-15 21:27:16 +02:00
Daan Selen
4b741c8089 badge addition 2025-06-05 15:49:21 +02:00
Daan Selen
89a57e0a1b Made it more json friendly by removing spaces. 2025-05-02 15:16:49 +02:00
Daan Selen
764ed1ef10 Minor indentation change 2025-04-30 16:58:48 +02:00
Daan Selen
f857b79d82 Added dedicated var 2025-04-25 16:09:36 +02:00
dselen
58598d8d17 Rewrote Meshbook with submodules with classes (#12)
* Massive update, bringing a lot of QoL. RC

* 0.5 seconds delay between tasks.
    insignificant for humans.
    Like a thousand years for computers to get ready.

* Added new features such as ignore_categorisation.

* Version 1.3 RC

* Added defaults.
2025-04-25 16:03:07 +02:00
Daan Selen
ac4dd8994c Removed more prints 2025-04-07 16:55:58 +02:00
Daan Selen
7a60cd7280 hotfix 2025-04-07 16:52:19 +02:00
Daan
e2eca57a0a bump libmeshctrl 2025-04-01 22:17:20 +02:00
11 changed files with 988 additions and 482 deletions

11
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@
venv
books
.vscode
important/
# Byte-compiled / optimized / DLL files
__pycache__/

343
README.md
View File

@@ -1,188 +1,273 @@
> [!NOTE]
> *If you experience issues or have suggestions, submit an issue! https://github.com/DaanSelen/meshbook/issues I'll respond ASAP!*
# Meshbook
A way to programmatically manage MeshCentral-managed machines, inspired by applications like [Ansible](https://github.com/ansible/ansible).<br>
What problem does it solve? Well, what I wanted to be able to do is to automate system updates through [MeshCentral](https://github.com/ylianst/meshcentral). And some machines are behind unmanaged or 3rd party managed firewalls.<br>
And many people will be comfortable with YAML configurations! It's almost like JSON, but different!<br>
[![CodeQL Advanced](https://github.com/DaanSelen/meshbook/actions/workflows/codeql.yaml/badge.svg)](https://github.com/DaanSelen/meshbook/actions/workflows/codeql.yaml)
# Quick-start:
> [!NOTE]
> 💬 If you experience issues or have suggestions, [submit an issue](https://github.com/DaanSelen/meshbook/issues) — I'll respond ASAP!
The quickest way to start is to grab a template from the templates folder in this repository.<br>
Make sure to correctly pass the MeshCentral websocket API as `wss://<MeshCentral-Host>`.<br>
And make sure to fill in the credentails of an account which has `Remote Commands`, `Details` and `Agent Console` permissions on the targeted devices or groups.<br>
Meshbook is a tool to **programmatically manage MeshCentral-managed machines**, inspired by tools like [Ansible](https://github.com/ansible/ansible).
> I did this through a "Service account" with rights on the device group.
## What problem does it solve?
Then make a yaml with a target and some commands! See below examples as a guideline. And do not forget to look at the bottom's notice.<br>
To install, follow the following commands:<br>
Meshbook is designed to:
### Linux setup:
* Automate system updates or commands across multiple systems via [MeshCentral](https://github.com/Ylianst/MeshCentral), even behind third-party-managed firewalls.
* Allow configuration using simple and readable **YAML files** (like Ansible playbooks).
* Simplify the use of **group-based** or **tag-based** device targeting.
## 🏁 Quick Start
### ✅ Prerequisites
* Python 3
* Access to a MeshCentral instance and credentials with:
* `Remote Commands`
* `Details`
* `Agent Console` permissions
A service account with access to the relevant device groups is recommended.
### 🔧 Installation
#### Linux
```bash
git clone https://github.com/daanselen/meshbook
cd ./meshbook
python3 -m venv ./venv
source ./venv/bin/activate
pip3 install -r ./requirements.txt
cp ./templates/meshcentral.conf.template ./meshcentral.conf
pip install -r requirements.txt
cp ./templates/api.conf.template ./api.conf
```
### Windows setup (PowerShell, not cmd):
Next, make sure to fill in the following file:
```shell
```
nano ./api.conf
```
#### Windows (PowerShell)
```powershell
git clone https://github.com/daanselen/meshbook
cd ./meshbook
python -m venv ./venv # or python3 when done through the Microsoft Store.
.\venv\Scripts\activate # Make sure to check the terminal prefix.
pip3 install -r ./requirements.txt
cp .\templates\meshcentral.conf.template .\meshcentral.conf
cd .\meshbook
python -m venv .\venv
.\venv\Scripts\activate
pip install -r .\requirements.txt
cp .\templates\api.conf.template .\api.conf
```
Now copy the configuration template from ./templates and fill it in with the correct details (remove .template from the file) this is shown in the last step of the setup(s).<br>
The url should start with `wss://`.<br>
You can check pre-made examples in the examples directory, make sure the values are set to your situation.<br>
After this you can use meshbook, for example:
Also here, make sure to fill in the `./api.conf` file.
### Linux run:
> [!CAUTION]
> Meshbook will not work without a properly filled in `api.conf` file.
## 🚀 Running Meshbook
Once installed and configured, run a playbook like this:
### Linux:
```bash
python3 .\meshbook.py -pb .\examples\echo.yaml
python3 meshbook.py -mb ./examples/echo_example.yaml
```
### Windows run:
### Windows:
```shell
.\venv\Scripts\python.exe .\meshbook.py -pb .\examples\echo_example.yaml
```powershell
.\venv\Scripts\python.exe .\meshbook.py -mb .\examples\echo_example.yaml
```
### How to check if everything is okay?
Use `--help` to explore available command-line options:
The python virtual environment can get messed up, therefore...<br>
To check if everything is in working order, make sure that the lists from the following commands are aligned:
```
python3 -m pip list
pip3 list
```bash
python3 meshbook.py --help
```
If not, perhaps you are using the wrong executable, the wrong environment and so on...
## 🛠️ Creating Configurations
# How to create a configuration?
Meshbook configurations are written in YAML. Below is an overview of supported fields.
This paragraph explains how the program interprets certain information.
### Targeting:
MeshCentral has `meshes` or `groups`, in this program they are called `group(s)`. Because of the way I designed this.<br>
So to target for example a mesh/group in MeshCentral called: "Nerthus" do:
> If your group has multiple words, then you need to use `"` to group the words.
### ▶️ Group Targeting (Primary*)
```yaml
---
name: example configuration
group: "Nerthus"
#target_os: "Linux" # <--- according to os_categories.json.
powershell: True # <--- this can be important for Windows clients.
name: My Configuration
group: "Dev Machines"
powershell: true
variables:
- name: var1
value: "This is the first variable"
- name: message
value: "Hello from Meshbook"
tasks:
- name: echo the first variable!
command: 'echo "{{ var1 }}"'
- name: Echo a message
command: 'echo "{{ message }}"'
```
It is also possible to target a single device, as seen in: [here](./examples/apt_update_example.yaml).<br>
* `group`: MeshCentral group (aka "mesh"). Quotation marks required for multi-word names.
* `powershell`: Set `true` for PowerShell commands on Windows clients.
### Variables:
### ▶️ Device Targeting (Secondary*)
Variables are done by replacing the placeholders just before the runtime (the Python program does this, not you).<br>
So if you have var1 declared, then the value of that declaration is placed wherever it finds {{ var1 }}.<br>
This is done to imitate popular methods. See below [from the example](./examples/variable_usage_example.yaml).<br>
You can also target a **specific device** rather than a group. See [`apt_update_example.yaml`](./examples/linux/apt_update_example.yaml) for reference.
### Tasks:
### ▶️ Variables
The tasks you want to run should be contained under the `tasks:` with two fields, `name` and `command`.<br>
The name field is for the user of meshbook, to clarify what the following command does in a summary.<br>
The command field actually gets executed on the end-point.<br>
### Windows Client Extra-information:
If you want to launch commands at Windows machines, make sure you have your `os_categories.conf` up-to-date with the correct reported Windows versions.<br>
And then make sure to create compatible commands, see: [windows examples](./examples/windows)<br>
Related is the yaml option: `powershell: True`.
### Granual Operating System filtering:
I have made the program so it can have a filter with the Operating systems. If you have a mixed group, please read:
[This explanation](./docs/operating_system_filtering.md)
### Tag filtering:
Filtering on MeshCentral tags is also possible with `target_tag` inside the meshbook. This string is case-sensitive, lower- and uppercase must match.<br>
This is done because its human made and therefor needs to be keps well administrated.
# Example:
For the example, I used the following yaml file (you can find more in [this directory](./examples/)):
The below group: `Dev` has three devices, of which one is offline, Meshbook checks if the device is reachable.<br>
You can expand the command chain as follows:<br>
Variables are replaced by Meshbook before execution. Syntax:
```yaml
---
name: Echo a string to the terminal through the meshbook example.
variables:
- name: example_var
value: "Example value"
tasks:
- name: Use the variable
command: 'echo "{{ example_var }}"'
```
* Primary and Secondary mark the order in which will take prescendence
### ▶️ Tasks
Define multiple tasks:
```yaml
tasks:
- name: Show OS info
command: "cat /etc/os-release"
```
Each task must include:
* `name`: Description for human readability.
* `command`: The actual shell or PowerShell command.
## 🪟 Windows Client Notes
* Keep your `os_categories.json` up to date for proper OS filtering.
* Ensure Windows commands are compatible (use `powershell: true` if needed).
* Examples are available in [`examples/windows`](./examples/windows).
## 🔎 OS & Tag Filtering
### Filter by OS
You can limit commands to specific OS types:
```yaml
target_os: "Linux" # As defined in os_categories.json
```
See [docs/operating\_system\_filtering.md](./docs/operating_system_filtering.md) for details.
### Filter by Tag
You can also filter using MeshCentral tags:
```yaml
target_tag: "Production"
```
> ⚠️ Tag values are **case-sensitive**.
## 📋 Example Playbook
```yaml
name: Echo OS Info
group: "Dev"
#target_os: "Linux" # <--- according to os_categories.json
target_os: "Linux"
variables:
- name: file
value: "/etc/os-release"
tasks:
- name: Echo!
- name: Show contents of os-release
command: "echo $(cat {{ file }})"
```
The following response it received when executing the first yaml of the above files (without the `-s` parameters, which just outputs the below JSON).
Sample output:
```shell
~/meshbook$ python3 meshbook.py -pb examples/echo_example.yaml
----------------------------------------
Playbook: examples/echo_example.yaml
Operating System Categorisation file: ./os_categories.json
Congiguration file: ./meshcentral.conf
Target group: Development
Grace: True
Silent: False
----------------------------------------
Trying to load the MeshCentral account credential file...
Trying to load the Playbook yaml file and compile it into something workable...
Trying to load the Operating System categorisation JSON file...
Connecting to MeshCentral and establish a session using variables from previous credential file.
Generating group list with nodes and reference the targets from that.
----------------------------------------
Executing playbook on the target(s): Development.
Initiating grace-period...
1...
2...
3...
----------------------------------------
1. Running: Echo!
----------------------------------------
{"Task 1": "ALL THE DATA"} # Not sharing due to PID
```json
{
"task 1": {
"task_name": "Show contents of os-release",
"data": [
{
"command": "echo $(cat /etc/os-release)",
"result": [
"NAME=\"Ubuntu\"",
"VERSION=\"22.04.4 LTS (Jammy Jellyfish)\""
],
"complete": true,
"device_name": "dev-host1"
}
]
}
}
```
The above without `-s` is quite verbose. use `--help` to read about parameters and getting a minimal response for example.
# Important Notice:
## ⚠ Blocking Commands Warning
If you want to use this, make sure to use `NON-BLOCKING` commands. MeshCentral does not work if you send it commands that wait.<br>
A couple examples of `BLOCKING COMMANDS` which will never get back to the main MeshCentral server, and Meshbook will quit after the timeout but the agent will not come back:
Avoid using commands that **block indefinitely** — MeshCentral requires **non-blocking** execution.
```shell
apt upgrade # without -y.
🚫 Examples of bad (blocking) commands:
sleep infinity
ping 1.1.1.1 # without a -c flag (because it pings forever).
```bash
apt upgrade # Without -y
sleep infinity # Will never return
ping 1.1.1.1 # Without -c
```
✅ Use instead:
```bash
apt upgrade -y
sleep 3s
ping 1.1.1.1 -c 1
```
## 🧪 Check Python Environment
Sometimes the wrong Python interpreter or environment is used. To verify:
```bash
python3 -m pip list
pip3 list
```
The lists should match. If not, make sure the correct environment is activated.
## 📂 Project Structure (excerpt)
```bash
meshbook/
├── books/
│ ├── apt-update.yaml
│ └── rdp.yaml
├── examples/
│ ├── linux/
│ │ ├── apt_update_example.yaml
│ │ └── ...
│ └── windows/
│ ├── get_sys_info.yaml
│ └── ...
├── modules/
│ ├── executor.py
│ └── utilities.py
├── meshbook.py
├── os_categories.json
├── requirements.txt
├── templates/
│ └── api.conf.template
```
## 📄 License
This project is licensed under the terms of the GPL3 License. See [LICENSE](./LICENSE).

View File

@@ -1,411 +1,286 @@
#!/bin/python3
# Public Python libraries
import argparse
import asyncio
from base64 import b64encode
from colorama import just_fix_windows_console
from configparser import ConfigParser
import pyotp
import json
import meshctrl
import os
import yaml
# Local Python libraries/modules
from modules.console import Console
from modules.executor import Executor
from modules.history import History
from modules.utilities import Transform, Utilities
meshbook_version = "1.3.2"
grace_period = 3 # Grace period will last for x (by default 3) second(s).
'''
Script utilities are handled in the following section.
'''
def define_cmdargs() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Process command-line arguments")
class ScriptEndTrigger(Exception):
pass
parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.")
class text_color:
black = "\033[30m"
red = "\033[31m"
green = "\033[32m"
yellow = "\033[33m"
blue = "\033[34m"
magenta = "\033[35m"
cyan = "\033[36m"
white = "\033[37m"
italic = "\x1B[3m"
reset = "\x1B[0m"
parser.add_argument("--historydir", type=str, help="Define a custom history log directory (default: ./history).", default="./history")
parser.add_argument("--nohistory", action="store_true", help="Disable the logging of the history into a local log (text) file inside './history'.")
parser.add_argument("--flushhistory", action="store_true", help="Clear old history logs before running the Meshbook.")
def console(message: str, final: bool=False):
'''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.
'''
if final:
print(message) # Assuming final message, there is no need for clearing.
elif not args.silent:
print(message + text_color.reset)
parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", default="./os_categories.json")
parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", default="./api.conf")
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.")
async def load_config(segment: str = 'meshcentral-account') -> dict:
'''
Function that loads the segment from the config.conf (by default) file and returns the it in a dict.
'''
parser.add_argument("-g", "--group", type=str, help="Specify a manual override for the group.", default="")
parser.add_argument("-d", "--device", type=str, help="Specify a manual override for a device.", default="")
parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", default=False)
parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output.", default=False)
parser.add_argument("--shlex", action="store_true", help="Shlex the lines. (SHell LEXical Analysis)", default=False)
conf_file = args.conf
if not os.path.exists(conf_file):
raise ScriptEndTrigger(f'Missing config file {conf_file}. Provide an alternative path.')
parser.add_argument("--version", action="store_true", help="Show the Meshbook version.")
config = ConfigParser()
try:
config.read(conf_file)
except Exception as err:
raise ScriptEndTrigger(f"Error reading configuration file '{conf_file}': {err}")
if segment not in config:
raise ScriptEndTrigger(f'Segment "{segment}" not found in config file {conf_file}.')
return config[segment]
return parser
async def init_connection(credentials: dict) -> meshctrl.Session:
'''
Use the libmeshctrl library to initiate a Secure Websocket (wss) connection to the MeshCentral instance.
'''
session = meshctrl.Session(
credentials['hostname'],
user=credentials['username'],
password=credentials['password']
)
if "totp_secret" in credentials:
totp = pyotp.TOTP(credentials["totp_secret"])
otp = totp.now()
session = meshctrl.Session(
credentials['hostname'],
user=credentials['username'],
password=credentials['password'],
token=otp
)
else:
session = meshctrl.Session(
credentials['hostname'],
user=credentials['username'],
password=credentials['password']
)
await session.initialized.wait()
return session
async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str:
'''
Simple function that looks up nodeid to the human-readable name if existent - otherwise return None.
'''
for group in group_list:
for device in group_list[group]:
if device["device_id"] == target_id:
return device["device_name"]
return None
'''
Creation and compilation happends in the following section, where the yaml gets read in, and edited accordingly.
'''
async def compile_book(meshbook_file: dict) -> dict:
'''
Simple function that opens the file and replaces placeholders through the next function. After that just return it.
'''
meshbook = open(meshbook_file, 'r')
meshbook = await replace_placeholders(yaml.safe_load(meshbook))
return meshbook
async def replace_placeholders(meshbook: dict) -> dict:
'''
Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list.
'''
variables = {}
if "variables" in meshbook and isinstance(meshbook["variables"], list):
for var in meshbook["variables"]:
var_name = var["name"]
var_value = var["value"]
variables[var_name] = var_value
else:
return meshbook
for task in meshbook.get("tasks", []):
task_name = task.get("name")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}"
task_name = task_name.replace(placeholder, var_value)
task["name"] = task_name
command = task.get("command")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}" # Create the placeholder string like "{{ host1 }}"
command = command.replace(placeholder, var_value)
task["command"] = command
return meshbook
'''
Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section.
'''
async def compile_group_list(session: meshctrl.Session) -> dict:
'''
Function that retrieves the devices from MeshCentral and compiles it into a efficient list.
'''
devices_response = await session.list_devices(details=False, timeout=10)
local_device_list = {}
for device in devices_response:
if device.meshname not in local_device_list:
local_device_list[device.meshname] = []
local_device_list[device.meshname].append({
"device_id": device.nodeid,
"device_name": device.name,
"device_os": device.os_description,
"device_tags": device.tags,
"reachable": device.connected
})
return local_device_list
async def filter_targets(devices: list[dict], os_categories: dict, target_os: str = None, target_tag: str = None) -> list[str]:
'''
Filters devices based on reachability and optional OS criteria, supporting nested OS categories.
'''
valid_devices = []
def get_os_variants(category: str, os_map: dict) -> set:
'''
Extracts all OS names under a given category if it exists.
'''
for key, value in os_map.items():
if key == category:
if isinstance(value, dict): # Expand nested categories
os_set = set()
for subcat in value:
os_set.update(get_os_variants(subcat, value))
return os_set
elif isinstance(value, list): # Direct OS list
return set(value)
return set()
allowed_os = set()
# Identify correct OS filtering scope
for key in os_categories:
if key == target_os:
allowed_os = get_os_variants(target_os, os_categories)
break # Stop searching once a match is found
if isinstance(os_categories[key], dict) and target_os in os_categories[key]:
allowed_os = get_os_variants(target_os, os_categories[key])
break # Stop searching once a match is found
# Filter out unwanted or unreachable devices.
for device in devices:
if not device["reachable"]:
continue # Skip unreachable devices.
print(target_tag)
print(device["device_tags"])
if target_tag and target_tag not in device["device_tags"]:
continue
if device["device_os"] not in allowed_os:
continue
valid_devices.append(device["device_id"])
return valid_devices
async def gather_targets(meshbook: dict, group_list: dict[str, list[dict]], os_categories: dict) -> list[str]:
'''
Finds target devices based on meshbook criteria (device or group).
'''
target_list = []
target_os = meshbook.get("target_os")
target_tag = meshbook.get("target_tag")
async def process_device_or_group(pseudo_target, group_list, os_categories, target_os) -> list[str]:
'''
Helper function to process devices or groups.
'''
matched_devices = []
for group in group_list:
for device in group_list[group]:
if device["device_name"] == pseudo_target:
matched_devices.append(device)
if matched_devices:
return await filter_targets(matched_devices, os_categories, target_os, target_tag)
return []
match meshbook:
case {"device": pseudo_target}: # Single device target
if isinstance(pseudo_target, str):
matched_devices = await process_device_or_group(pseudo_target, group_list, os_categories, target_os)
target_list.extend(matched_devices)
else:
console(text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True)
case {"devices": pseudo_target}: # List of devices
if isinstance(pseudo_target, list):
for sub_pseudo_device in pseudo_target:
matched_devices = await process_device_or_group(sub_pseudo_device, group_list, os_categories, target_os)
target_list.extend(matched_devices)
else:
console(text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True)
case {"group": pseudo_target}: # Single group target
if isinstance(pseudo_target, str) and pseudo_target in group_list:
matched_devices = await filter_targets(group_list[pseudo_target], os_categories, target_os, target_tag)
target_list.extend(matched_devices)
elif pseudo_target not in group_list:
console(text_color.yellow + "Targeted group not found on the MeshCentral server.", True)
else:
console(text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True)
case {"groups": pseudo_target}: # List of groups
if isinstance(pseudo_target, list):
for sub_pseudo_target in pseudo_target:
if sub_pseudo_target in group_list:
matched_devices = await filter_targets(group_list[sub_pseudo_target], os_categories, target_os, target_tag)
target_list.extend(matched_devices)
if pseudo_target.lower() == "all":
for group in group_list:
matched_devices = await filter_targets(group_list[group], os_categories, target_os, target_tag)
target_list.extend(matched_devices)
else:
console(text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True)
return target_list
async def execute_meshbook(session: meshctrl.Session, targets: dict, meshbook: dict, group_list: dict) -> None:
'''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
'''
responses_list = {}
round = 1
for task in meshbook["tasks"]:
console(text_color.green + str(round) + ". Running: " + task["name"])
if "powershell" in meshbook and meshbook["powershell"]:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900)
else:
response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900)
task_batch = []
for device in response:
device_result = response[device]["result"]
response[device]["result"] = device_result.replace("Run commands completed.", "")
response[device]["device_id"] = device
response[device]["device_name"] = await translate_nodeid_to_name(device, group_list)
task_batch.append(response[device])
responses_list["Task " + str(round)] = {
"task_name": task["name"],
"data": task_batch
}
round += 1
console(text_color.reset + ("-" * 40))
if args.indent:
console((json.dumps(responses_list,indent=4)), True)
else:
console(json.dumps(responses_list), True)
async def main():
local_categories_file = "./os_categories.json"
just_fix_windows_console()
'''
Main function where the program starts. Place from which all comands originate (eventually).
'''
parser = argparse.ArgumentParser(description="Process command-line arguments")
parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.", required=True)
parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", required=False, default="./os_categories.json")
parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", required=False, default="./config.conf")
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.", required=False)
parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", required=False)
parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output", required=False)
global args
# Define the cmd arguments
parser = define_cmdargs()
args = parser.parse_args()
local_categories_file = "./os_categories.json"
if args.version:
Console.print_text(args.silent,
Console.text_color.reset + "MeshBook Version: " + Console.text_color.yellow + str(meshbook_version))
return
if not args.meshbook:
parser.print_help()
return
try:
with open(local_categories_file, "r") as file:
os_categories = json.load(file)
if not Utilities.path_exist(args.meshbook) or Utilities.path_type(args.meshbook) != "File":
Console.print_text(args.silent,
Console.text_color.red + "The given meshbook path is either not present on the filesystem or not a file.")
return
credentials, meshbook = await asyncio.gather(
(load_config()),
(compile_book(args.meshbook))
(Utilities.load_config(args)),
(Utilities.compile_book(args.meshbook))
)
if args.group != "":
meshbook["group"] = args.group
if "device" in meshbook:
del meshbook["device"]
if "devices" in meshbook:
del meshbook["devices"]
elif args.device != "":
meshbook["device"] = args.device
if "group" in meshbook:
del meshbook["group"]
if "groups" in meshbook:
del meshbook["groups"]
'''
The following section mainly displays used variables and first steps of the program to the console.
The following section mainly displays used variables and first steps of the program to the Console.
'''
console(text_color.reset + ("-" * 40))
console("meshbook: " + text_color.yellow + args.meshbook)
console("Operating System Categorisation file: " + text_color.yellow + args.oscategories)
console("Configuration file: " + text_color.yellow + args.conf)
# INIT ARGUMENTS PRINTING
Console.print_line(args.silent)
Console.print_text(args.silent,
"meshbook: " + Console.text_color.yellow + args.meshbook + Console.text_color.reset + ".")
Console.print_text(args.silent,
"Operating System Categorisation file: " + Console.text_color.yellow + args.oscategories + Console.text_color.reset + ".")
Console.print_text(args.silent,
"Configuration file: " + Console.text_color.yellow + args.conf + Console.text_color.reset + ".")
# TARGET OS PRINTING
if "target_os" in meshbook:
console("Target Operating System category given: " + text_color.yellow + meshbook["target_os"])
Console.print_text(args.silent,
"Target Operating System category given: " + Console.text_color.yellow + meshbook["target_os"] + Console.text_color.reset + ".")
else:
console("Target Operating System category given: " + text_color.yellow + "All")
Console.print_text(args.silent,
"Target Operating System category given: " + Console.text_color.yellow + "All" + Console.text_color.reset + ".")
# Should Meshbook ignore categorisation?
if "ignore_categorisation" in meshbook:
Console.print_text(args.silent,
"Ignore the OS Categorisation file: " + Console.text_color.yellow + str(meshbook["ignore_categorisation"]) + Console.text_color.reset + ".")
if meshbook["ignore_categorisation"]:
Console.print_text(args.silent,
Console.text_color.red + "!!!!\n" +
Console.text_color.yellow +
"Ignore categorisation is True.\nThis means that the program checks if the target Operating System is somewhere in the reported device Operating System." +
Console.text_color.red + "\n!!!!")
else:
Console.print_text(args.silent,
"Ignore the OS Categorisation file: " + Console.text_color.yellow + "False" + Console.text_color.reset + ".")
# TARGET TAG PRINTING
if "target_tag" in meshbook:
Console.print_text(args.silent,
"Target Device tag given: " + Console.text_color.yellow + meshbook["target_tag"] + Console.text_color.reset + ".")
else:
Console.print_text(args.silent,
"Target Device tag given: " + Console.text_color.yellow + "All" + Console.text_color.reset + ".")
# TARGET PRINTING
if "device" in meshbook:
console("Target device: " + text_color.yellow + str(meshbook["device"]))
Console.print_text(args.silent,
"Target device: " + Console.text_color.yellow + str(meshbook["device"]) + Console.text_color.reset + ".")
elif "devices" in meshbook:
Console.print_text(args.silent,
"Target devices: " + Console.text_color.yellow + str(meshbook["devices"]) + Console.text_color.reset + ".")
elif "group" in meshbook:
console("Target group: " + text_color.yellow + str(meshbook["group"]))
Console.print_text(args.silent,
"Target group: " + Console.text_color.yellow + str(meshbook["group"]) + Console.text_color.reset + ".")
elif "groups" in meshbook:
Console.print_text(args.silent,
"Target groups: " + Console.text_color.yellow + str(meshbook["groups"]) + Console.text_color.reset + ".")
console("Grace: " + text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation
console("Silent: " + text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed.
# RUNNING PARAMETERS PRINTING
Console.print_text(args.silent, "Grace: " + Console.text_color.yellow + str(not args.nograce) + Console.text_color.reset + ".") # Negation of bool for correct explanation
Console.print_text(args.silent, "Silent: " + Console.text_color.yellow + "False" + Console.text_color.reset + ".") # Can be pre-defined because if silent flag was passed then none of this would be printed.
session = await init_connection(credentials)
console(text_color.reset + ("-" * 40))
console(text_color.italic + "Trying to load the MeshCentral account credential file...")
console(text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...")
console(text_color.italic + "Trying to load the Operating System categorisation JSON file...")
console(text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.")
console(text_color.italic + "Generating group list with nodes and reference the targets from that.")
# PROCESS PRINTING aka what its doing in the moment...
Console.print_line(args.silent)
Console.print_text(args.silent,
Console.text_color.italic + "Trying to load the MeshCentral account credential file...")
Console.print_text(args.silent,
Console.text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...")
Console.print_text(args.silent,
Console.text_color.italic + "Trying to load the Operating System categorisation JSON file...")
Console.print_text(args.silent,
Console.text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.")
Console.print_text(args.silent,
Console.text_color.italic + "Generating group list with nodes and reference the targets from that.")
'''
End of the main information displaying section.
'''
group_list = await compile_group_list(session)
targets_list = await gather_targets(meshbook, group_list, os_categories)
group_list = await Transform.compile_group_list(session)
compiled_device_list = await Utilities.gather_targets(args.silent, meshbook, group_list, os_categories)
if len(targets_list) == 0:
console(text_color.red + "No targets found or targets unreachable, quitting.", True)
console(text_color.reset + ("-" * 40), True)
# Check if we have reachable targets on the MeshCentral host
if "target_list" not in compiled_device_list or len(compiled_device_list["target_list"]) == 0:
Console.print_text(args.silent,
Console.text_color.red + "No targets found or targets unreachable, quitting.")
Console.print_line(args.silent)
return
Console.print_line(args.silent)
match meshbook:
case {"group": candidate_target_name}:
target_name = candidate_target_name
case {"groups": candidate_target_name}:
target_name = str(candidate_target_name)
case {"device": candidate_target_name}:
target_name = candidate_target_name
case {"devices": candidate_target_name}:
target_name = str(candidate_target_name)
case _:
target_name = ""
# Initialize the history / logging functions class (whatever you want to name it)
history = History(args.silent, args.historydir, args.flushhistory)
# Conclude history initlialization
Console.print_line(args.silent)
# From here on the actual exection happens
Console.print_text(args.silent,
Console.text_color.yellow + "Executing meshbook on the target(s): " + Console.text_color.green + target_name + Console.text_color.yellow + ".")
if not args.nograce:
Console.print_text(args.silent,
Console.text_color.yellow + "Initiating grace-period...")
for x in range(grace_period):
Console.print_text(args.silent,
Console.text_color.yellow + "{}...".format(x+1)) # Countdown!
await asyncio.sleep(1)
Console.print_line(args.silent)
complete_log = await Executor.execute_meshbook(args.silent,
args.shlex,
session,
compiled_device_list,
meshbook,
group_list)
Console.print_line(args.silent)
indent = None
if args.indent: indent = 4
formatted_history = json.dumps(complete_log,indent=indent)
Console.print_text(args.silent, formatted_history, 9)
# Pass the output of the whole program to the history class
if args.nohistory:
Console.print_text(args.silent, "Not writing to file.")
else:
console(text_color.reset + ("-" * 40))
match meshbook:
case {"group": candidate_target_name}:
target_name = candidate_target_name
case {"groups": candidate_target_name}:
target_name = str(candidate_target_name)
case {"device": candidate_target_name}:
target_name = candidate_target_name
case {"devices": candidate_target_name}:
target_name = str(candidate_target_name)
console(text_color.yellow + "Executing meshbook on the target(s): " + text_color.green + target_name + ".")
if not args.nograce:
console(text_color.yellow + "Initiating grace-period...")
for x in range(grace_period):
console(text_color.yellow + "{}...".format(x+1)) # Countdown!
await asyncio.sleep(1)
console(text_color.reset + ("-" * 40))
print(json.dumps(targets_list,indent=4))
#await execute_meshbook(session, targets_list, meshbook, group_list)
await session.close()
Console.print_text(args.silent, "Writing to file...")
history.write_history(formatted_history)
except OSError as message:
console(text_color.red + message, True)
Console.print_text(
args.silent,
Console.text_color.red + f'{message}'
)
except asyncio.CancelledError:
Console.print_text(
args.silent,
Console.text_color.red + "Received SIGINT, Aborting - (Tasks may still be running on targets)."
)
raise
finally:
await session.close()
if __name__ == "__main__":
asyncio.run(main())
try:
asyncio.run(main())
except KeyboardInterrupt:
Console.print_text(False, Console.text_color.red + "Cancelled execution.")

54
modules/console.py Normal file
View File

@@ -0,0 +1,54 @@
# Public Python libraries
import argparse
from datetime import datetime
class Console:
class text_color:
black = "\033[30m"
red = "\033[31m"
green = "\033[32m"
yellow = "\033[33m"
blue = "\033[34m"
magenta = "\033[35m"
cyan = "\033[36m"
white = "\033[37m"
italic = "\x1B[3m"
reset = "\x1B[0m"
@staticmethod
def print_text(silent: bool, message: str, prefix_select: int = 0) -> None:
'''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.
int tag_select legend:
0 / default = timestamp
1 = info
2 = warn
3 = err
4 = fatal
9 = nothing
'''
match prefix_select:
case 1:
tag_prefix = "[INFO] "
case 2:
tag_prefix = "[WARN] "
case 3:
tag_prefix = "[ERROR] "
case 4:
tag_prefix = "[FATAL] "
case 9:
tag_prefix = ""
case _:
tag_prefix = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} "
if not silent:
print(tag_prefix + message + Console.text_color.reset)
@staticmethod
def print_line(silent: bool, special: bool = False) -> None:
if not silent:
if special:
print("-=-" * 40)
else:
print(("-" * 40))

55
modules/executor.py Normal file
View File

@@ -0,0 +1,55 @@
# Public Python libraries
import argparse
import json
import meshctrl
from time import sleep
# Local Python libraries/modules
from modules.console import Console
from modules.utilities import Transform
intertask_delay = 1
class Executor:
@staticmethod
async def execute_meshbook(silent: bool, enable_shlex: bool, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> dict:
'''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
'''
complete_log = {}
targets = compiled_device_list["target_list"]
offline = compiled_device_list["offline_list"]
round = 1
for task in meshbook["tasks"]:
Console.print_text(silent,
Console.text_color.green + str(round) + ". Running: " + task["name"])
if "powershell" in meshbook and meshbook["powershell"]:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=1800)
else:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=False,ignore_output=False,timeout=1800)
task_batch = []
for device in response:
device_result = response[device]["result"]
response[device]["result"] = device_result.replace("Run commands completed.", "")
response[device]["device_id"] = device
response[device]["device_name"] = await Transform.translate_nodeid_to_name(device, group_list)
task_batch.append(response[device])
complete_log["task_" + str(round)] = {
"task_name": task["name"],
"data": task_batch
}
round += 1
sleep(intertask_delay) # Sleep for x amount of time.
for index, device in enumerate(offline): # Replace Device_id with actual human readable name
device_name = await Transform.translate_nodeid_to_name(device, group_list)
offline[index] = device_name
complete_log["Offline"] = offline
# Return the result
return Transform.process_shell_response(enable_shlex, complete_log)

48
modules/history.py Normal file
View File

@@ -0,0 +1,48 @@
import os
from datetime import datetime
from modules.console import Console
class History():
def __init__(self, silent: bool, history_directory: str, flush_history: bool) -> None:
'''
Init function to declare some stuff and make sure we are good to go, mostly the directory.
'''
self.silent = silent
self.history_directory = history_directory
if not os.path.exists(history_directory):
Console.print_text(silent, "Directory absent, trying to create it now...")
try:
os.mkdir(history_directory)
except PermissionError:
Console.print_text(silent, Console.text_color.red + f"Failed to create directory, permission error.")
return
history_items = os.listdir(history_directory)
if len(history_items) == 1:
Console.print_text(silent, f"There is {len(history_items)} history item.")
else:
Console.print_text(silent, f"There are {len(history_items)} history items.")
if flush_history:
self.remove_history(history_items)
def remove_history(self, history_items: list[str]) -> None:
if not os.access(self.history_directory, os.W_OK):
Console.print_text(self.silent, Console.text_color.red + "Unable to flush history logs, no write access.")
return
for item in history_items:
stitched_path = f"{self.history_directory}/{item}"
Console.print_text(self.silent, f"Removing: {item}.")
os.remove(stitched_path)
def write_history(self, history: dict) -> bool:
stitched_file = f"{self.history_directory}/meshbook_run_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.log"
with open(stitched_file, "x") as f:
f.write(history + "\n")

373
modules/utilities.py Normal file
View File

@@ -0,0 +1,373 @@
# Public Python libraries
import argparse
from configparser import ConfigParser
import meshctrl
import os
import shlex
import yaml
from modules.console import Console
'''
Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section.
'''
class Utilities:
@staticmethod
async def load_config(args: argparse.Namespace,
segment: str = 'meshcentral-account') -> dict:
'''
Function that loads the segment from the config.conf (by default) file and returns the it in a dict.
'''
conf_file = args.conf
if not os.path.exists(conf_file):
print(f'Missing config file {conf_file}. Provide an alternative path.')
os._exit(1)
config = ConfigParser()
try:
config.read(conf_file)
except Exception as err:
print(f"Error reading configuration file '{conf_file}': {err}")
os._exit(1)
if segment not in config:
print(f'Segment "{segment}" not found in config file {conf_file}.')
os._exit(1)
return dict(config[segment])
@staticmethod
async def compile_book(meshbook_file: str) -> dict:
'''
Simple function that opens the file and replaces placeholders through the next function. After that just return it.
'''
with open(meshbook_file, 'r') as f:
meshbook = f.read()
meshbook = await Transform.replace_placeholders(yaml.safe_load(meshbook))
return meshbook
@staticmethod
async def gather_targets(silent: bool,
meshbook: dict,
group_list: dict[str, list[dict]],
os_categories: dict) -> dict:
"""
Finds target devices based on meshbook criteria (device, devices, group or groups).
"""
group_list = {k.lower(): v for k, v in group_list.items()} # Normalize keys
target_list = []
offline_list = []
target_os = meshbook.get("target_os")
target_tag = meshbook.get("target_tag")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
async def add_processed_devices(processed):
"""Helper to update target and offline lists."""
if processed:
target_list.extend(processed.get("valid_devices", []))
offline_list.extend(processed.get("offline_devices", []))
async def process_device_helper(device):
processed = await Utilities.process_device(
device,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag
)
await add_processed_devices(processed)
async def process_group_helper(group):
processed = await Utilities.filter_targets(
group, os_categories, target_os, ignore_categorisation, target_tag
)
await add_processed_devices(processed)
'''
Groups receive the first priority, then device targets.
'''
match meshbook:
case {"group": pseudo_target}:
if isinstance(pseudo_target, str):
pseudo_target = pseudo_target.lower()
if pseudo_target in group_list:
await process_group_helper(group_list[pseudo_target])
elif pseudo_target not in group_list:
Console.print_text(
silent,
Console.text_color.yellow + "Targeted group not found on the MeshCentral server."
)
elif isinstance(pseudo_target, list):
Console.print_text(
silent,
Console.text_color.yellow + "Please use groups (Notice the plural with 'S') for multiple groups."
)
else:
Console.print_text(
silent,
Console.text_color.yellow + "The 'group' key is being used, but an unknown data type was found, please check your values."
)
case {"groups": pseudo_target}:
if isinstance(pseudo_target, str) or (isinstance(pseudo_target, list) and len(pseudo_target) == 1):
Console.print_text(
silent,
Console.text_color.yellow + "The 'groups' key is being used, but only one group seems to be given. Did you mean 'group'?"
)
elif isinstance(pseudo_target, list):
for sub_group in pseudo_target:
sub_group = sub_group.lower()
if sub_group in group_list:
await process_group_helper(group_list[sub_group])
elif isinstance(pseudo_target, str) and pseudo_target.lower() == "all":
for group in group_list.values():
await process_group_helper(group)
else:
Console.print_text(
silent,
Console.text_color.yellow + "The 'groups' key is being used, but an unknown data type was found, please check your values."
)
case {"device": pseudo_target}:
if isinstance(pseudo_target, str):
await process_device_helper(pseudo_target)
elif isinstance(pseudo_target, list):
Console.print_text(
silent,
Console.text_color.yellow + "Please use devices (Notice the plural with 'S') for multiple devices."
)
else:
Console.print_text(
silent,
Console.text_color.yellow + "The 'device' key is being used, but an unknown data type was found, please check your values."
)
case {"devices": pseudo_target}:
if isinstance(pseudo_target, str) or (isinstance(pseudo_target, list) and len(pseudo_target) == 1):
Console.print_text(
silent,
Console.text_color.yellow + "The 'devices' key is being used, but only one device seems to be given. Did you mean 'device'?"
)
elif isinstance(pseudo_target, list):
for sub_device in pseudo_target:
await process_device_helper(sub_device)
else:
Console.print_text(
silent,
Console.text_color.yellow + "The 'devices' key is being used, but an unknown data type was found, please check your values."
)
return {"target_list": target_list, "offline_list": offline_list}
@staticmethod
def get_os_variants(target_category: str,
os_map: dict) -> set:
'''
Extracts all OS names under a given category if it exists.
'''
for key, value in os_map.items():
if key == target_category:
if isinstance(value, dict): # Expand nested categories
os_set = set()
for sub_target_cat in value:
os_set.update(Utilities.get_os_variants(sub_target_cat, value))
return os_set
elif isinstance(value, list): # Direct OS list
return set(value)
return set()
@staticmethod
async def filter_targets(devices: list[dict],
os_categories: dict,
target_os: str = "",
ignore_categorisation: bool = False,
target_tag: str = "") -> dict:
'''
Filters devices based on reachability and optional OS criteria, supporting nested OS categories.
'''
valid_devices = []
offline_devices = []
allowed_os = set()
# Identify correct OS filtering scope
for key in os_categories:
if key == target_os:
allowed_os = Utilities.get_os_variants(target_os, os_categories)
break # Stop searching once a match is found
if isinstance(os_categories[key], dict) and target_os in os_categories[key]:
allowed_os = Utilities.get_os_variants(target_os, os_categories[key])
break # Stop searching once a match is found
for device in devices: # Filter out unwanted or unreachable devices.
if target_tag and target_tag not in device["device_tags"]:
continue
if not ignore_categorisation:
if device["device_os"] not in allowed_os:
continue
else:
if target_os not in device["device_os"]:
continue
if not device["reachable"]:
offline_devices.append(device["device_id"])
continue
valid_devices.append(device["device_id"])
return {
"valid_devices": valid_devices,
"offline_devices": offline_devices
}
@staticmethod
async def process_device(device: str,
group_list: dict,
os_categories: dict,
target_os: str,
ignore_categorisation: bool,
target_tag: str) -> dict:
"""
Processes a single device or pseudo-target against group_list,
filters matches by OS and tags, and adds processed devices.
"""
matched_devices = []
pseudo_target = device.lower()
# Find devices that match the pseudo_target
for group in group_list:
for dev in group_list[group]:
if dev["device_name"].lower() == pseudo_target:
matched_devices.append(dev)
# If matches found, filter them and add processed devices
if matched_devices:
processed = await Utilities.filter_targets(
matched_devices, os_categories, target_os, ignore_categorisation, target_tag
)
return processed
# No matches found
return {"valid_devices": [], "offline_devices": []}
@staticmethod
def path_exist(path: str) -> bool:
return os.path.exists(path)
@staticmethod
def path_type(path: str) -> str:
if os.path.isfile(path):
return "File"
if os.path.isdir(path):
return "Dir"
if os.path.islink(path):
return "Link"
return "Undefined"
class Transform:
@staticmethod
def process_shell_response(enable_shlex: bool, meshbook_result: dict) -> dict:
for task_name, task_data in meshbook_result.items():
if task_name == "Offline": # Failsafe do not parse Offline section, its simple
continue
for node_responses in task_data["data"]:
task_result = node_responses["result"].splitlines()
if enable_shlex:
for index, line in enumerate(task_result):
line = shlex.split(line)
task_result[index] = line
clean_output = []
for line in task_result:
if len(line) > 0:
clean_output.append(line)
node_responses["result"] = clean_output
return meshbook_result
@staticmethod
async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str:
'''
Simple function that looks up nodeid to the human-readable name if existent - otherwise return None.
'''
for group in group_list:
for device in group_list[group]:
if device["device_id"] == target_id:
return device["device_name"]
return ""
@staticmethod
async def replace_placeholders(meshbook: dict) -> dict:
'''
Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list.
'''
variables = {}
if "variables" in meshbook and isinstance(meshbook["variables"], list):
for var in meshbook["variables"]:
var_name = var["name"]
var_value = var["value"]
variables[var_name] = var_value
else:
return meshbook
for task in meshbook.get("tasks", []):
task_name = task.get("name")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}"
task_name = task_name.replace(placeholder, var_value)
task["name"] = task_name
command = task.get("command")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}" # Create the placeholder string like "{{ host1 }}"
command = command.replace(placeholder, var_value)
task["command"] = command
return meshbook
@staticmethod
async def compile_group_list(session: meshctrl.Session) -> dict:
'''
Function that retrieves the devices from MeshCentral and compiles it into a efficient list.
'''
devices_response = await session.list_devices(details=False, timeout=10)
local_device_list = {}
for device in devices_response:
if device.meshname not in local_device_list:
local_device_list[device.meshname] = []
local_device_list[device.meshname].append({
"device_id": device.nodeid,
"device_name": device.name,
"device_os": device.os_description,
"device_tags": device.tags,
"reachable": device.connected
})
return local_device_list

View File

@@ -2,11 +2,13 @@
{
"Linux": {
"Debian": [
"Debian GNU/Linux 13 (trixie)",
"Debian GNU/Linux 12 (bookworm)",
"Debian GNU/Linux 11 (bullseye)"
"Debian GNU/Linux 11 (bullseye)",
"Debian GNU/Linux 10 (buster)"
],
"Ubuntu": [
"Ubuntu 24.04.1 LTS",
"Ubuntu 24.04.3 LTS",
"Ubuntu 22.04.5 LTS",
"Ubuntu 20.04.6 LTS"
]

View File

@@ -1,3 +1,4 @@
colorama==0.4.6
pyyaml==6.0.2
libmeshctrl==1.1.2
pyyaml==6.0.3
libmeshctrl==1.3.3
pyotp==2.9.0

View File

@@ -1,4 +1,5 @@
[meshcentral-account]
hostname =
username =
password =
password =
totp_secret =