28 Commits

Author SHA1 Message Date
e710dadfcd chore: bump libmeshctrl 2026-02-19 09:01:01 +01:00
Daan Selen
00e63e1a96 chore: remove old notice 2026-02-06 17:42:02 +01:00
Daan Selen
10057b5f91 correct text 2026-02-06 17:41:31 +01:00
Daan Selen
2cfd7ad06e chore: add caution notice 2026-02-06 17:40:35 +01:00
Daan Selen
6374ea50b7 docs: update README 2026-02-06 17:38:50 +01:00
f58e06ddec chore: edit readme 2026-01-29 16:48:44 +01:00
30026a18bd fix: KeyError due to previous commit 2026-01-15 13:40:21 +01:00
ea77ea1904 chore: add del to groups and devices 2026-01-15 13:38:31 +01:00
cc454dff40 chore: add more ends of sentences 2026-01-06 08:48:33 +01:00
cd10645056 chore: catch wrong keywords better 2026-01-02 15:54:04 +01:00
a652ea99d3 chore: fix apparent missing reference 2026-01-02 15:01:15 +01:00
e2cc746517 chore: add newline after content of meshbook in file 2026-01-02 09:40:37 +01:00
DaanSelen
9716a2376c release version 1.4 (#21)
* chore: init working on history functions
rename classes

* refac: remove raw-result option. I cannot find a use ( I have found it later on )

* chore: add basic history directory checking

* chore: further expansion on the code. Now its christmas

* chore: rework some code and make a logging/history feature workable

---------

Co-authored-by: DaanSelen <dselen@systemec.nl>
2025-12-29 10:55:57 +01:00
3a0fc215d7 fix: increase default timeout 2025-11-17 09:01:16 +01:00
dd1c97c56c chore: add debian version 2025-11-17 08:57:58 +01:00
9d49032857 fix: when not specifying any, grab all 2025-11-05 15:53:38 +01:00
dependabot[bot]
a7601a302a Bump libmeshctrl from 1.2.2 to 1.3.2 (#19)
Bumps [libmeshctrl](https://github.com/HuFlungDu/pylibmeshctrl) from 1.2.2 to 1.3.2.
- [Release notes](https://github.com/HuFlungDu/pylibmeshctrl/releases)
- [Changelog](https://github.com/HuFlungDu/pylibmeshctrl/blob/main/CHANGELOG.rst)
- [Commits](https://github.com/HuFlungDu/pylibmeshctrl/compare/1.2.2...1.3.2)

---
updated-dependencies:
- dependency-name: libmeshctrl
  dependency-version: 1.3.2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-23 13:45:28 +02:00
4a92eb986c fix: increase consistency due to regression 2025-10-15 15:27:07 +02:00
dependabot[bot]
d248f0bcbe Bump libmeshctrl from 1.2.2 to 1.3.1 (#16)
Bumps [libmeshctrl](https://github.com/HuFlungDu/pylibmeshctrl) from 1.2.2 to 1.3.1.
- [Release notes](https://github.com/HuFlungDu/pylibmeshctrl/releases)
- [Changelog](https://github.com/HuFlungDu/pylibmeshctrl/blob/main/CHANGELOG.rst)
- [Commits](https://github.com/HuFlungDu/pylibmeshctrl/compare/1.2.2...1.3.1)

---
updated-dependencies:
- dependency-name: libmeshctrl
  dependency-version: 1.3.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-30 15:56:17 +02:00
dependabot[bot]
2fae74d600 Bump pyyaml from 6.0.2 to 6.0.3 (#17)
Bumps [pyyaml](https://github.com/yaml/pyyaml) from 6.0.2 to 6.0.3.
- [Release notes](https://github.com/yaml/pyyaml/releases)
- [Changelog](https://github.com/yaml/pyyaml/blob/6.0.3/CHANGES)
- [Commits](https://github.com/yaml/pyyaml/compare/6.0.2...6.0.3)

---
updated-dependencies:
- dependency-name: pyyaml
  dependency-version: 6.0.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-30 15:53:46 +02:00
465905a6ec feat: enable dependabot 2025-09-30 15:51:26 +02:00
80ad7f865a refac: remove seemingly unneeded function 2025-09-25 15:40:58 +02:00
9d2999476d refac: added some type checking and fixed a duplicate bug (apparently) 2025-09-25 09:49:19 +02:00
DaanSelen
82cc31e0f6 Update executor.py 2025-09-16 10:37:13 +02:00
f34d1dc7ae refac(meshbook): rework the targeting module to now be case insensitive 2025-09-10 16:26:52 +02:00
DaanSelen
f0e9e40cca chore(docs): README correction 2025-09-10 13:28:42 +02:00
208e9c1223 chore(os-handling): added Debian 13 to the default Debian list 2025-08-26 13:51:18 +02:00
DaanSelen
a736a74af6 Added more robust code checking (#15)
Co-authored-by: Daan Selen <dselen@systemec.nl>
2025-08-07 13:56:38 +02:00
10 changed files with 525 additions and 353 deletions

11
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@
venv venv
books books
.vscode .vscode
important/
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@@ -2,11 +2,9 @@
[![CodeQL Advanced](https://github.com/DaanSelen/meshbook/actions/workflows/codeql.yaml/badge.svg)](https://github.com/DaanSelen/meshbook/actions/workflows/codeql.yaml) [![CodeQL Advanced](https://github.com/DaanSelen/meshbook/actions/workflows/codeql.yaml/badge.svg)](https://github.com/DaanSelen/meshbook/actions/workflows/codeql.yaml)
> \[!NOTE] > [!NOTE]
> 💬 If you experience issues or have suggestions, [submit an issue](https://github.com/DaanSelen/meshbook/issues) — I'll respond ASAP! > 💬 If you experience issues or have suggestions, [submit an issue](https://github.com/DaanSelen/meshbook/issues) — I'll respond ASAP!
---
Meshbook is a tool to **programmatically manage MeshCentral-managed machines**, inspired by tools like [Ansible](https://github.com/ansible/ansible). Meshbook is a tool to **programmatically manage MeshCentral-managed machines**, inspired by tools like [Ansible](https://github.com/ansible/ansible).
## What problem does it solve? ## What problem does it solve?
@@ -17,14 +15,11 @@ Meshbook is designed to:
* Allow configuration using simple and readable **YAML files** (like Ansible playbooks). * Allow configuration using simple and readable **YAML files** (like Ansible playbooks).
* Simplify the use of **group-based** or **tag-based** device targeting. * Simplify the use of **group-based** or **tag-based** device targeting.
---
## 🏁 Quick Start ## 🏁 Quick Start
### ✅ Prerequisites ### ✅ Prerequisites
* Python 3.7+ * Python 3
* Git
* Access to a MeshCentral instance and credentials with: * Access to a MeshCentral instance and credentials with:
* `Remote Commands` * `Remote Commands`
@@ -33,8 +28,6 @@ Meshbook is designed to:
A service account with access to the relevant device groups is recommended. A service account with access to the relevant device groups is recommended.
---
### 🔧 Installation ### 🔧 Installation
#### Linux #### Linux
@@ -45,7 +38,13 @@ cd ./meshbook
python3 -m venv ./venv python3 -m venv ./venv
source ./venv/bin/activate source ./venv/bin/activate
pip install -r requirements.txt pip install -r requirements.txt
cp ./templates/meshcentral.conf.template ./meshcentral.conf cp ./templates/api.conf.template ./api.conf
```
Next, make sure to fill in the following file:
```
nano ./api.conf
``` ```
#### Windows (PowerShell) #### Windows (PowerShell)
@@ -56,13 +55,14 @@ cd .\meshbook
python -m venv .\venv python -m venv .\venv
.\venv\Scripts\activate .\venv\Scripts\activate
pip install -r .\requirements.txt pip install -r .\requirements.txt
cp .\templates\meshcentral.conf.template .\meshcentral.conf cp .\templates\api.conf.template .\api.conf
``` ```
> 📌 Rename `meshcentral.conf.template` to `meshcentral.conf` and fill in your actual connection details. Also here, make sure to fill in the `./api.conf` file.
> The URL must start with `wss://<MeshCentral-Host>`.
---
> [!CAUTION]
> Meshbook will not work without a properly filled in `api.conf` file.
## 🚀 Running Meshbook ## 🚀 Running Meshbook
@@ -71,13 +71,13 @@ Once installed and configured, run a playbook like this:
### Linux: ### Linux:
```bash ```bash
python3 meshbook.py -pb ./examples/echo_example.yaml python3 meshbook.py -mb ./examples/echo_example.yaml
``` ```
### Windows: ### Windows:
```powershell ```powershell
.\venv\Scripts\python.exe .\meshbook.py -pb .\examples\echo_example.yaml .\venv\Scripts\python.exe .\meshbook.py -mb .\examples\echo_example.yaml
``` ```
Use `--help` to explore available command-line options: Use `--help` to explore available command-line options:
@@ -86,16 +86,14 @@ Use `--help` to explore available command-line options:
python3 meshbook.py --help python3 meshbook.py --help
``` ```
---
## 🛠️ Creating Configurations ## 🛠️ Creating Configurations
Meshbook configurations are written in YAML. Below is an overview of supported fields. Meshbook configurations are written in YAML. Below is an overview of supported fields.
### ▶️ Group Targeting ### ▶️ Group Targeting (Primary*)
```yaml ```yaml
---
name: My Configuration name: My Configuration
group: "Dev Machines" group: "Dev Machines"
powershell: true powershell: true
@@ -110,7 +108,7 @@ tasks:
* `group`: MeshCentral group (aka "mesh"). Quotation marks required for multi-word names. * `group`: MeshCentral group (aka "mesh"). Quotation marks required for multi-word names.
* `powershell`: Set `true` for PowerShell commands on Windows clients. * `powershell`: Set `true` for PowerShell commands on Windows clients.
### ▶️ Device Targeting ### ▶️ Device Targeting (Secondary*)
You can also target a **specific device** rather than a group. See [`apt_update_example.yaml`](./examples/linux/apt_update_example.yaml) for reference. You can also target a **specific device** rather than a group. See [`apt_update_example.yaml`](./examples/linux/apt_update_example.yaml) for reference.
@@ -128,6 +126,8 @@ tasks:
command: 'echo "{{ example_var }}"' command: 'echo "{{ example_var }}"'
``` ```
* Primary and Secondary mark the order in which will take prescendence
### ▶️ Tasks ### ▶️ Tasks
Define multiple tasks: Define multiple tasks:
@@ -143,7 +143,7 @@ Each task must include:
* `name`: Description for human readability. * `name`: Description for human readability.
* `command`: The actual shell or PowerShell command. * `command`: The actual shell or PowerShell command.
---
## 🪟 Windows Client Notes ## 🪟 Windows Client Notes
@@ -151,7 +151,7 @@ Each task must include:
* Ensure Windows commands are compatible (use `powershell: true` if needed). * Ensure Windows commands are compatible (use `powershell: true` if needed).
* Examples are available in [`examples/windows`](./examples/windows). * Examples are available in [`examples/windows`](./examples/windows).
---
## 🔎 OS & Tag Filtering ## 🔎 OS & Tag Filtering
@@ -175,12 +175,10 @@ target_tag: "Production"
> ⚠️ Tag values are **case-sensitive**. > ⚠️ Tag values are **case-sensitive**.
---
## 📋 Example Playbook ## 📋 Example Playbook
```yaml ```yaml
---
name: Echo OS Info name: Echo OS Info
group: "Dev" group: "Dev"
target_os: "Linux" target_os: "Linux"
@@ -196,7 +194,7 @@ Sample output:
```json ```json
{ {
"Task 1": { "task 1": {
"task_name": "Show contents of os-release", "task_name": "Show contents of os-release",
"data": [ "data": [
{ {
@@ -213,9 +211,7 @@ Sample output:
} }
``` ```
--- ## ⚠ Blocking Commands Warning
## ⚠️ Blocking Commands Warning
Avoid using commands that **block indefinitely** — MeshCentral requires **non-blocking** execution. Avoid using commands that **block indefinitely** — MeshCentral requires **non-blocking** execution.
@@ -223,7 +219,7 @@ Avoid using commands that **block indefinitely** — MeshCentral requires **non-
```bash ```bash
apt upgrade # Without -y apt upgrade # Without -y
sleep infinity sleep infinity # Will never return
ping 1.1.1.1 # Without -c ping 1.1.1.1 # Without -c
``` ```
@@ -231,10 +227,11 @@ ping 1.1.1.1 # Without -c
```bash ```bash
apt upgrade -y apt upgrade -y
sleep 3s
ping 1.1.1.1 -c 1 ping 1.1.1.1 -c 1
``` ```
---
## 🧪 Check Python Environment ## 🧪 Check Python Environment
@@ -247,8 +244,6 @@ pip3 list
The lists should match. If not, make sure the correct environment is activated. The lists should match. If not, make sure the correct environment is activated.
---
## 📂 Project Structure (excerpt) ## 📂 Project Structure (excerpt)
```bash ```bash
@@ -270,11 +265,9 @@ meshbook/
├── os_categories.json ├── os_categories.json
├── requirements.txt ├── requirements.txt
├── templates/ ├── templates/
│ └── config.conf.template │ └── api.conf.template
``` ```
---
## 📄 License ## 📄 License
This project is licensed under the terms of the MIT License. See [LICENSE](./LICENSE). This project is licensed under the terms of the GPL3 License. See [LICENSE](./LICENSE).

View File

@@ -9,13 +9,37 @@ import json
import meshctrl import meshctrl
# Local Python libraries/modules # Local Python libraries/modules
from modules.console import * from modules.console import Console
from modules.executor import * from modules.executor import Executor
from modules.utilities import * from modules.history import History
from modules.utilities import Transform, Utilities
meshbook_version = "1.3.1" meshbook_version = "1.3.2"
grace_period = 3 # Grace period will last for x (by default 3) second(s). grace_period = 3 # Grace period will last for x (by default 3) second(s).
def define_cmdargs() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Process command-line arguments")
parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.")
parser.add_argument("--historydir", type=str, help="Define a custom history log directory (default: ./history).", default="./history")
parser.add_argument("--nohistory", action="store_true", help="Disable the logging of the history into a local log (text) file inside './history'.")
parser.add_argument("--flushhistory", action="store_true", help="Clear old history logs before running the Meshbook.")
parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", default="./os_categories.json")
parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", default="./api.conf")
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.")
parser.add_argument("-g", "--group", type=str, help="Specify a manual override for the group.", default="")
parser.add_argument("-d", "--device", type=str, help="Specify a manual override for a device.", default="")
parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", default=False)
parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output.", default=False)
parser.add_argument("--shlex", action="store_true", help="Shlex the lines. (SHell LEXical Analysis)", default=False)
parser.add_argument("--version", action="store_true", help="Show the Meshbook version.")
return parser
async def init_connection(credentials: dict) -> meshctrl.Session: async def init_connection(credentials: dict) -> meshctrl.Session:
''' '''
Use the libmeshctrl library to initiate a Secure Websocket (wss) connection to the MeshCentral instance. Use the libmeshctrl library to initiate a Secure Websocket (wss) connection to the MeshCentral instance.
@@ -40,148 +64,21 @@ async def init_connection(credentials: dict) -> meshctrl.Session:
await session.initialized.wait() await session.initialized.wait()
return session return session
async def gather_targets(args: argparse.Namespace,
meshbook: dict,
group_list: dict[str, list[dict]],
os_categories: dict) -> list[str]:
'''
Finds target devices based on meshbook criteria (device or group).
'''
target_list = []
offline_list = []
target_os = meshbook.get("target_os")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
target_tag = meshbook.get("target_tag")
match meshbook:
case {"device": pseudo_target}: # Single device target
if isinstance(pseudo_target, str):
processed_devices = await utilities.process_device_or_group(pseudo_target,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
else:
console.nice_print(args,
console.text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True)
case {"devices": pseudo_target}: # List of devices
if isinstance(pseudo_target, list):
for sub_pseudo_device in pseudo_target:
processed_devices = await utilities.process_device_or_group(sub_pseudo_device,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag,)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
else:
console.nice_print(args, console.text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True)
case {"group": pseudo_target}: # Single group target
if isinstance(pseudo_target, str) and pseudo_target in group_list:
processed_devices = await utilities.filter_targets(group_list[pseudo_target],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
elif pseudo_target not in group_list:
console.nice_print(args,
console.text_color.yellow + "Targeted group not found on the MeshCentral server.", True)
else:
console.nice_print(args,
console.text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True)
case {"groups": pseudo_target}: # List of groups
if isinstance(pseudo_target, list):
for sub_pseudo_target in pseudo_target:
if sub_pseudo_target in group_list:
processed_devices = await utilities.filter_targets(group_list[sub_pseudo_target],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
elif pseudo_target.lower() == "all":
for group in group_list:
processed_devices = await utilities.filter_targets(group_list[group],
os_categories,
target_os,
ignore_categorisation,
target_tag)
if len(processed_devices) > 0:
matched_devices = processed_devices["valid_devices"]
target_list.extend(matched_devices)
if len(processed_devices) > 0:
offline_devices = processed_devices["offline_devices"]
offline_list.extend(offline_devices)
else:
console.nice_print(args,
console.text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True)
return {
"target_list": target_list,
"offline_list": offline_list
}
async def main(): async def main():
local_categories_file = "./os_categories.json"
just_fix_windows_console() just_fix_windows_console()
''' '''
Main function where the program starts. Place from which all comands originate (eventually). Main function where the program starts. Place from which all comands originate (eventually).
''' '''
parser = argparse.ArgumentParser(description="Process command-line arguments") # Define the cmd arguments
parser.add_argument("-mb", "--meshbook", type=str, help="Path to the meshbook yaml file.") parser = define_cmdargs()
parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", default="./os_categories.json")
parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./config.conf).", default="./api.conf")
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.")
parser.add_argument("-g", "--group", type=str, help="Specify a manual override for the group.", default="")
parser.add_argument("-d", "--device", type=str, help="Specify a manual override for a device", default="")
parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", default=False)
parser.add_argument("-r", "--raw-result", action="store_true", help="Print the raw result.", default=False)
parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output.", default=False)
parser.add_argument("--shlex", action="store_true", help="Shlex the lines.", default=False)
parser.add_argument("--version", action="store_true", help="Show the Meshbook version.")
args = parser.parse_args() args = parser.parse_args()
local_categories_file = "./os_categories.json"
if args.version: if args.version:
console.nice_print(args, Console.print_text(args.silent,
console.text_color.reset + "MeshBook Version: " + console.text_color.yellow + str(meshbook_version)) Console.text_color.reset + "MeshBook Version: " + Console.text_color.yellow + str(meshbook_version))
return return
if not args.meshbook: if not args.meshbook:
@@ -192,151 +89,198 @@ async def main():
with open(local_categories_file, "r") as file: with open(local_categories_file, "r") as file:
os_categories = json.load(file) os_categories = json.load(file)
if not Utilities.path_exist(args.meshbook) or Utilities.path_type(args.meshbook) != "File":
Console.print_text(args.silent,
Console.text_color.red + "The given meshbook path is either not present on the filesystem or not a file.")
return
credentials, meshbook = await asyncio.gather( credentials, meshbook = await asyncio.gather(
(utilities.load_config(args)), (Utilities.load_config(args)),
(utilities.compile_book(args.meshbook)) (Utilities.compile_book(args.meshbook))
) )
if args.group != "": if args.group != "":
meshbook["group"] = args.group meshbook["group"] = args.group
del meshbook["device"] if "device" in meshbook:
del meshbook["device"]
if "devices" in meshbook:
del meshbook["devices"]
elif args.device != "": elif args.device != "":
meshbook["device"] = args.device meshbook["device"] = args.device
del meshbook["group"] if "group" in meshbook:
del meshbook["group"]
if "groups" in meshbook:
del meshbook["groups"]
''' '''
The following section mainly displays used variables and first steps of the program to the console. The following section mainly displays used variables and first steps of the program to the Console.
''' '''
# INIT ARGUMENTS PRINTING # INIT ARGUMENTS PRINTING
console.nice_print(args, Console.print_line(args.silent)
console.text_color.reset + ("-" * 40)) Console.print_text(args.silent,
console.nice_print(args, "meshbook: " + Console.text_color.yellow + args.meshbook + Console.text_color.reset + ".")
"meshbook: " + console.text_color.yellow + args.meshbook + console.text_color.reset + ".") Console.print_text(args.silent,
console.nice_print(args, "Operating System Categorisation file: " + Console.text_color.yellow + args.oscategories + Console.text_color.reset + ".")
"Operating System Categorisation file: " + console.text_color.yellow + args.oscategories + console.text_color.reset + ".") Console.print_text(args.silent,
console.nice_print(args, "Configuration file: " + Console.text_color.yellow + args.conf + Console.text_color.reset + ".")
"Configuration file: " + console.text_color.yellow + args.conf + console.text_color.reset + ".")
# TARGET OS PRINTING # TARGET OS PRINTING
if "target_os" in meshbook: if "target_os" in meshbook:
console.nice_print(args, Console.print_text(args.silent,
"Target Operating System category given: " + console.text_color.yellow + meshbook["target_os"] + console.text_color.reset + ".") "Target Operating System category given: " + Console.text_color.yellow + meshbook["target_os"] + Console.text_color.reset + ".")
else: else:
console.nice_print(args, Console.print_text(args.silent,
"Target Operating System category given: " + console.text_color.yellow + "All" + console.text_color.reset + ".") "Target Operating System category given: " + Console.text_color.yellow + "All" + Console.text_color.reset + ".")
# Should Meshbook ignore categorisation? # Should Meshbook ignore categorisation?
if "ignore_categorisation" in meshbook: if "ignore_categorisation" in meshbook:
console.nice_print(args, Console.print_text(args.silent,
"Ignore the OS Categorisation file: " + console.text_color.yellow + str(meshbook["ignore_categorisation"]) + console.text_color.reset + ".") "Ignore the OS Categorisation file: " + Console.text_color.yellow + str(meshbook["ignore_categorisation"]) + Console.text_color.reset + ".")
if meshbook["ignore_categorisation"]: if meshbook["ignore_categorisation"]:
console.nice_print(args, Console.print_text(args.silent,
console.text_color.red + "!!!!\n" + Console.text_color.red + "!!!!\n" +
console.text_color.yellow + Console.text_color.yellow +
"Ignore categorisation is True.\nThis means that the program checks if the target Operating System is somewhere in the reported device Operating System." + "Ignore categorisation is True.\nThis means that the program checks if the target Operating System is somewhere in the reported device Operating System." +
console.text_color.red + "\n!!!!") Console.text_color.red + "\n!!!!")
else: else:
console.nice_print(args, Console.print_text(args.silent,
"Ignore the OS Categorisation file: " + console.text_color.yellow + "False" + console.text_color.reset + ".") "Ignore the OS Categorisation file: " + Console.text_color.yellow + "False" + Console.text_color.reset + ".")
# TARGET TAG PRINTING # TARGET TAG PRINTING
if "target_tag" in meshbook: if "target_tag" in meshbook:
console.nice_print(args, Console.print_text(args.silent,
"Target Device tag given: " + console.text_color.yellow + meshbook["target_tag"] + console.text_color.reset + ".") "Target Device tag given: " + Console.text_color.yellow + meshbook["target_tag"] + Console.text_color.reset + ".")
else: else:
console.nice_print(args, Console.print_text(args.silent,
"Target Device tag given: " + console.text_color.yellow + "All" + console.text_color.reset + ".") "Target Device tag given: " + Console.text_color.yellow + "All" + Console.text_color.reset + ".")
# TARGET PRINTING # TARGET PRINTING
if "device" in meshbook: if "device" in meshbook:
console.nice_print(args, Console.print_text(args.silent,
"Target device: " + console.text_color.yellow + str(meshbook["device"]) + console.text_color.reset + ".") "Target device: " + Console.text_color.yellow + str(meshbook["device"]) + Console.text_color.reset + ".")
elif "devices" in meshbook: elif "devices" in meshbook:
console.nice_print(args, Console.print_text(args.silent,
"Target devices: " + console.text_color.yellow + str(meshbook["devices"]) + console.text_color.reset + ".") "Target devices: " + Console.text_color.yellow + str(meshbook["devices"]) + Console.text_color.reset + ".")
elif "group" in meshbook: elif "group" in meshbook:
console.nice_print(args, Console.print_text(args.silent,
"Target group: " + console.text_color.yellow + str(meshbook["group"]) + console.text_color.reset + ".") "Target group: " + Console.text_color.yellow + str(meshbook["group"]) + Console.text_color.reset + ".")
elif "groups" in meshbook: elif "groups" in meshbook:
console.nice_print(args, Console.print_text(args.silent,
"Target groups: " + console.text_color.yellow + str(meshbook["groups"]) + console.text_color.reset + ".") "Target groups: " + Console.text_color.yellow + str(meshbook["groups"]) + Console.text_color.reset + ".")
# RUNNING PARAMETERS PRINTING # RUNNING PARAMETERS PRINTING
console.nice_print(args, "Grace: " + console.text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation Console.print_text(args.silent, "Grace: " + Console.text_color.yellow + str(not args.nograce) + Console.text_color.reset + ".") # Negation of bool for correct explanation
console.nice_print(args, "Silent: " + console.text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed. Console.print_text(args.silent, "Silent: " + Console.text_color.yellow + "False" + Console.text_color.reset + ".") # Can be pre-defined because if silent flag was passed then none of this would be printed.
session = await init_connection(credentials) session = await init_connection(credentials)
# PROCESS PRINTING aka what its doing in the moment... # PROCESS PRINTING aka what its doing in the moment...
console.nice_print(args, Console.print_line(args.silent)
console.text_color.reset + ("-" * 40)) Console.print_text(args.silent,
console.nice_print(args, Console.text_color.italic + "Trying to load the MeshCentral account credential file...")
console.text_color.italic + "Trying to load the MeshCentral account credential file...") Console.print_text(args.silent,
console.nice_print(args, Console.text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...")
console.text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...") Console.print_text(args.silent,
console.nice_print(args, Console.text_color.italic + "Trying to load the Operating System categorisation JSON file...")
console.text_color.italic + "Trying to load the Operating System categorisation JSON file...") Console.print_text(args.silent,
console.nice_print(args, Console.text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.")
console.text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.") Console.print_text(args.silent,
console.nice_print(args, Console.text_color.italic + "Generating group list with nodes and reference the targets from that.")
console.text_color.italic + "Generating group list with nodes and reference the targets from that.")
''' '''
End of the main information displaying section. End of the main information displaying section.
''' '''
group_list = await transform.compile_group_list(session) group_list = await Transform.compile_group_list(session)
compiled_device_list = await gather_targets(args, meshbook, group_list, os_categories) compiled_device_list = await Utilities.gather_targets(args.silent, meshbook, group_list, os_categories)
if len(compiled_device_list["target_list"]) == 0: # Check if we have reachable targets on the MeshCentral host
console.nice_print(args, if "target_list" not in compiled_device_list or len(compiled_device_list["target_list"]) == 0:
console.text_color.red + "No targets found or targets unreachable, quitting.", True) Console.print_text(args.silent,
Console.text_color.red + "No targets found or targets unreachable, quitting.")
console.nice_print(args, Console.print_line(args.silent)
console.text_color.reset + ("-" * 40), True) return
Console.print_line(args.silent)
match meshbook:
case {"group": candidate_target_name}:
target_name = candidate_target_name
case {"groups": candidate_target_name}:
target_name = str(candidate_target_name)
case {"device": candidate_target_name}:
target_name = candidate_target_name
case {"devices": candidate_target_name}:
target_name = str(candidate_target_name)
case _:
target_name = ""
# Initialize the history / logging functions class (whatever you want to name it)
history = History(args.silent, args.historydir, args.flushhistory)
# Conclude history initlialization
Console.print_line(args.silent)
# From here on the actual exection happens
Console.print_text(args.silent,
Console.text_color.yellow + "Executing meshbook on the target(s): " + Console.text_color.green + target_name + Console.text_color.yellow + ".")
if not args.nograce:
Console.print_text(args.silent,
Console.text_color.yellow + "Initiating grace-period...")
for x in range(grace_period):
Console.print_text(args.silent,
Console.text_color.yellow + "{}...".format(x+1)) # Countdown!
await asyncio.sleep(1)
Console.print_line(args.silent)
complete_log = await Executor.execute_meshbook(args.silent,
args.shlex,
session,
compiled_device_list,
meshbook,
group_list)
Console.print_line(args.silent)
indent = None
if args.indent: indent = 4
formatted_history = json.dumps(complete_log,indent=indent)
Console.print_text(args.silent, formatted_history, 9)
# Pass the output of the whole program to the history class
if args.nohistory:
Console.print_text(args.silent, "Not writing to file.")
else: else:
console.nice_print(args, Console.print_text(args.silent, "Writing to file...")
console.text_color.reset + ("-" * 40)) history.write_history(formatted_history)
match meshbook:
case {"group": candidate_target_name}:
target_name = candidate_target_name
case {"groups": candidate_target_name}:
target_name = str(candidate_target_name)
case {"device": candidate_target_name}:
target_name = candidate_target_name
case {"devices": candidate_target_name}:
target_name = str(candidate_target_name)
console.nice_print(args,
console.text_color.yellow + "Executing meshbook on the target(s): " + console.text_color.green + target_name + console.text_color.yellow + ".")
if not args.nograce:
console.nice_print(args,
console.text_color.yellow + "Initiating grace-period...")
for x in range(grace_period):
console.nice_print(args,
console.text_color.yellow + "{}...".format(x+1)) # Countdown!
await asyncio.sleep(1)
console.nice_print(args, console.text_color.reset + ("-" * 40))
await executor.execute_meshbook(args,
session,
compiled_device_list,
meshbook,
group_list)
await session.close()
except OSError as message: except OSError as message:
console.nice_print(args, Console.print_text(
console.text_color.red + message, True) args.silent,
Console.text_color.red + f'{message}'
)
except asyncio.CancelledError:
Console.print_text(
args.silent,
Console.text_color.red + "Received SIGINT, Aborting - (Tasks may still be running on targets)."
)
raise
finally:
await session.close()
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) try:
asyncio.run(main())
except KeyboardInterrupt:
Console.print_text(False, Console.text_color.red + "Cancelled execution.")

View File

@@ -1,25 +1,54 @@
# Public Python libraries # Public Python libraries
import argparse import argparse
from datetime import datetime
class console: class Console:
class text_color: class text_color:
black = "\033[30m" black = "\033[30m"
red = "\033[31m" red = "\033[31m"
green = "\033[32m" green = "\033[32m"
yellow = "\033[33m" yellow = "\033[33m"
blue = "\033[34m" blue = "\033[34m"
magenta = "\033[35m" magenta = "\033[35m"
cyan = "\033[36m" cyan = "\033[36m"
white = "\033[37m" white = "\033[37m"
italic = "\x1B[3m" italic = "\x1B[3m"
reset = "\x1B[0m" reset = "\x1B[0m"
def nice_print(args: argparse.Namespace, message: str, final: bool=False): @staticmethod
def print_text(silent: bool, message: str, prefix_select: int = 0) -> None:
''' '''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time. Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.
'''
if final: int tag_select legend:
print(message) # Assuming final message, there is no need for clearing. 0 / default = timestamp
elif not args.silent: 1 = info
print(message + console.text_color.reset) 2 = warn
3 = err
4 = fatal
9 = nothing
'''
match prefix_select:
case 1:
tag_prefix = "[INFO] "
case 2:
tag_prefix = "[WARN] "
case 3:
tag_prefix = "[ERROR] "
case 4:
tag_prefix = "[FATAL] "
case 9:
tag_prefix = ""
case _:
tag_prefix = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} "
if not silent:
print(tag_prefix + message + Console.text_color.reset)
@staticmethod
def print_line(silent: bool, special: bool = False) -> None:
if not silent:
if special:
print("-=-" * 40)
else:
print(("-" * 40))

View File

@@ -5,40 +5,41 @@ import meshctrl
from time import sleep from time import sleep
# Local Python libraries/modules # Local Python libraries/modules
from modules.console import console from modules.console import Console
from modules.utilities import transform from modules.utilities import Transform
intertask_delay = 0.5 intertask_delay = 1
class executor: class Executor:
async def execute_meshbook(args: argparse.Namespace, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> None: @staticmethod
async def execute_meshbook(silent: bool, enable_shlex: bool, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> dict:
''' '''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON. Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
''' '''
responses_list = {} complete_log = {}
targets = compiled_device_list["target_list"] targets = compiled_device_list["target_list"]
offline = compiled_device_list["offline_list"] offline = compiled_device_list["offline_list"]
round = 1 round = 1
for task in meshbook["tasks"]: for task in meshbook["tasks"]:
console.nice_print(args, Console.print_text(silent,
console.text_color.green + str(round) + ". Running: " + task["name"]) Console.text_color.green + str(round) + ". Running: " + task["name"])
if "powershell" in meshbook and meshbook["powershell"]: if "powershell" in meshbook and meshbook["powershell"]:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900) response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=1800)
else: else:
response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900) response = await session.run_command(nodeids=targets, command=task["command"],powershell=False,ignore_output=False,timeout=1800)
task_batch = [] task_batch = []
for device in response: for device in response:
device_result = response[device]["result"] device_result = response[device]["result"]
response[device]["result"] = device_result.replace("Run commands completed.", "") response[device]["result"] = device_result.replace("Run commands completed.", "")
response[device]["device_id"] = device response[device]["device_id"] = device
response[device]["device_name"] = await transform.translate_nodeid_to_name(device, group_list) response[device]["device_name"] = await Transform.translate_nodeid_to_name(device, group_list)
task_batch.append(response[device]) task_batch.append(response[device])
responses_list["task_" + str(round)] = { complete_log["task_" + str(round)] = {
"task_name": task["name"], "task_name": task["name"],
"data": task_batch "data": task_batch
} }
@@ -46,20 +47,9 @@ class executor:
sleep(intertask_delay) # Sleep for x amount of time. sleep(intertask_delay) # Sleep for x amount of time.
for index, device in enumerate(offline): # Replace Device_id with actual human readable name for index, device in enumerate(offline): # Replace Device_id with actual human readable name
device_name = await transform.translate_nodeid_to_name(device, group_list) device_name = await Transform.translate_nodeid_to_name(device, group_list)
offline[index] = device_name offline[index] = device_name
responses_list["Offline"] = offline complete_log["Offline"] = offline
console.nice_print(args, # Return the result
console.text_color.reset + ("-" * 40)) return Transform.process_shell_response(enable_shlex, complete_log)
if args.indent:
if not args.raw_result:
responses_list = transform.process_shell_response(args.shlex, responses_list)
console.nice_print(args,
json.dumps(responses_list,indent=4), True)
else:
console.nice_print(args,
json.dumps(responses_list), True)

48
modules/history.py Normal file
View File

@@ -0,0 +1,48 @@
import os
from datetime import datetime
from modules.console import Console
class History():
def __init__(self, silent: bool, history_directory: str, flush_history: bool) -> None:
'''
Init function to declare some stuff and make sure we are good to go, mostly the directory.
'''
self.silent = silent
self.history_directory = history_directory
if not os.path.exists(history_directory):
Console.print_text(silent, "Directory absent, trying to create it now...")
try:
os.mkdir(history_directory)
except PermissionError:
Console.print_text(silent, Console.text_color.red + f"Failed to create directory, permission error.")
return
history_items = os.listdir(history_directory)
if len(history_items) == 1:
Console.print_text(silent, f"There is {len(history_items)} history item.")
else:
Console.print_text(silent, f"There are {len(history_items)} history items.")
if flush_history:
self.remove_history(history_items)
def remove_history(self, history_items: list[str]) -> None:
if not os.access(self.history_directory, os.W_OK):
Console.print_text(self.silent, Console.text_color.red + "Unable to flush history logs, no write access.")
return
for item in history_items:
stitched_path = f"{self.history_directory}/{item}"
Console.print_text(self.silent, f"Removing: {item}.")
os.remove(stitched_path)
def write_history(self, history: dict) -> bool:
stitched_file = f"{self.history_directory}/meshbook_run_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.log"
with open(stitched_file, "x") as f:
f.write(history + "\n")

View File

@@ -3,13 +3,17 @@ import argparse
from configparser import ConfigParser from configparser import ConfigParser
import meshctrl import meshctrl
import os import os
import shlex
import yaml import yaml
from modules.console import Console
''' '''
Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section. Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section.
''' '''
class utilities: class Utilities:
@staticmethod
async def load_config(args: argparse.Namespace, async def load_config(args: argparse.Namespace,
segment: str = 'meshcentral-account') -> dict: segment: str = 'meshcentral-account') -> dict:
''' '''
@@ -32,17 +36,138 @@ class utilities:
print(f'Segment "{segment}" not found in config file {conf_file}.') print(f'Segment "{segment}" not found in config file {conf_file}.')
os._exit(1) os._exit(1)
return config[segment] return dict(config[segment])
async def compile_book(meshbook_file: dict) -> dict: @staticmethod
async def compile_book(meshbook_file: str) -> dict:
''' '''
Simple function that opens the file and replaces placeholders through the next function. After that just return it. Simple function that opens the file and replaces placeholders through the next function. After that just return it.
''' '''
meshbook = open(meshbook_file, 'r') with open(meshbook_file, 'r') as f:
meshbook = await transform.replace_placeholders(yaml.safe_load(meshbook)) meshbook = f.read()
meshbook = await Transform.replace_placeholders(yaml.safe_load(meshbook))
return meshbook return meshbook
@staticmethod
async def gather_targets(silent: bool,
meshbook: dict,
group_list: dict[str, list[dict]],
os_categories: dict) -> dict:
"""
Finds target devices based on meshbook criteria (device, devices, group or groups).
"""
group_list = {k.lower(): v for k, v in group_list.items()} # Normalize keys
target_list = []
offline_list = []
target_os = meshbook.get("target_os")
target_tag = meshbook.get("target_tag")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
async def add_processed_devices(processed):
"""Helper to update target and offline lists."""
if processed:
target_list.extend(processed.get("valid_devices", []))
offline_list.extend(processed.get("offline_devices", []))
async def process_device_helper(device):
processed = await Utilities.process_device(
device,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag
)
await add_processed_devices(processed)
async def process_group_helper(group):
processed = await Utilities.filter_targets(
group, os_categories, target_os, ignore_categorisation, target_tag
)
await add_processed_devices(processed)
'''
Groups receive the first priority, then device targets.
'''
match meshbook:
case {"group": pseudo_target}:
if isinstance(pseudo_target, str):
pseudo_target = pseudo_target.lower()
if pseudo_target in group_list:
await process_group_helper(group_list[pseudo_target])
elif pseudo_target not in group_list:
Console.print_text(
silent,
Console.text_color.yellow + "Targeted group not found on the MeshCentral server."
)
elif isinstance(pseudo_target, list):
Console.print_text(
silent,
Console.text_color.yellow + "Please use groups (Notice the plural with 'S') for multiple groups."
)
else:
Console.print_text(
silent,
Console.text_color.yellow + "The 'group' key is being used, but an unknown data type was found, please check your values."
)
case {"groups": pseudo_target}:
if isinstance(pseudo_target, str) or (isinstance(pseudo_target, list) and len(pseudo_target) == 1):
Console.print_text(
silent,
Console.text_color.yellow + "The 'groups' key is being used, but only one group seems to be given. Did you mean 'group'?"
)
elif isinstance(pseudo_target, list):
for sub_group in pseudo_target:
sub_group = sub_group.lower()
if sub_group in group_list:
await process_group_helper(group_list[sub_group])
elif isinstance(pseudo_target, str) and pseudo_target.lower() == "all":
for group in group_list.values():
await process_group_helper(group)
else:
Console.print_text(
silent,
Console.text_color.yellow + "The 'groups' key is being used, but an unknown data type was found, please check your values."
)
case {"device": pseudo_target}:
if isinstance(pseudo_target, str):
await process_device_helper(pseudo_target)
elif isinstance(pseudo_target, list):
Console.print_text(
silent,
Console.text_color.yellow + "Please use devices (Notice the plural with 'S') for multiple devices."
)
else:
Console.print_text(
silent,
Console.text_color.yellow + "The 'device' key is being used, but an unknown data type was found, please check your values."
)
case {"devices": pseudo_target}:
if isinstance(pseudo_target, str) or (isinstance(pseudo_target, list) and len(pseudo_target) == 1):
Console.print_text(
silent,
Console.text_color.yellow + "The 'devices' key is being used, but only one device seems to be given. Did you mean 'device'?"
)
elif isinstance(pseudo_target, list):
for sub_device in pseudo_target:
await process_device_helper(sub_device)
else:
Console.print_text(
silent,
Console.text_color.yellow + "The 'devices' key is being used, but an unknown data type was found, please check your values."
)
return {"target_list": target_list, "offline_list": offline_list}
@staticmethod
def get_os_variants(target_category: str, def get_os_variants(target_category: str,
os_map: dict) -> set: os_map: dict) -> set:
''' '''
@@ -56,7 +181,7 @@ class utilities:
os_set = set() os_set = set()
for sub_target_cat in value: for sub_target_cat in value:
os_set.update(utilities.get_os_variants(sub_target_cat, value)) os_set.update(Utilities.get_os_variants(sub_target_cat, value))
return os_set return os_set
@@ -65,26 +190,28 @@ class utilities:
return set() return set()
@staticmethod
async def filter_targets(devices: list[dict], async def filter_targets(devices: list[dict],
os_categories: dict, os_categories: dict,
target_os: str = None, target_os: str = "",
ignore_categorisation: bool = False, ignore_categorisation: bool = False,
target_tag: str = None) -> dict: target_tag: str = "") -> dict:
''' '''
Filters devices based on reachability and optional OS criteria, supporting nested OS categories. Filters devices based on reachability and optional OS criteria, supporting nested OS categories.
''' '''
valid_devices = [] valid_devices = []
offline_devices = [] offline_devices = []
allowed_os = set()
# Identify correct OS filtering scope # Identify correct OS filtering scope
for key in os_categories: for key in os_categories:
if key == target_os: if key == target_os:
allowed_os = utilities.get_os_variants(target_os, os_categories) allowed_os = Utilities.get_os_variants(target_os, os_categories)
break # Stop searching once a match is found break # Stop searching once a match is found
if isinstance(os_categories[key], dict) and target_os in os_categories[key]: if isinstance(os_categories[key], dict) and target_os in os_categories[key]:
allowed_os = utilities.get_os_variants(target_os, os_categories[key]) allowed_os = Utilities.get_os_variants(target_os, os_categories[key])
break # Stop searching once a match is found break # Stop searching once a match is found
for device in devices: # Filter out unwanted or unreachable devices. for device in devices: # Filter out unwanted or unreachable devices.
@@ -109,37 +236,61 @@ class utilities:
"offline_devices": offline_devices "offline_devices": offline_devices
} }
async def process_device_or_group(pseudo_target: str, @staticmethod
group_list: dict, async def process_device(device: str,
os_categories: dict, group_list: dict,
target_os: str, os_categories: dict,
ignore_categorisation: bool, target_os: str,
target_tag: str) -> dict: ignore_categorisation: bool,
''' target_tag: str) -> dict:
Helper function to process devices or groups. """
''' Processes a single device or pseudo-target against group_list,
filters matches by OS and tags, and adds processed devices.
"""
matched_devices = [] matched_devices = []
pseudo_target = device.lower()
# Find devices that match the pseudo_target
for group in group_list: for group in group_list:
for device in group_list[group]: for dev in group_list[group]:
if device["device_name"] == pseudo_target: if dev["device_name"].lower() == pseudo_target:
matched_devices.append(device) matched_devices.append(dev)
# If matches found, filter them and add processed devices
if matched_devices: if matched_devices:
return await utilities.filter_targets(matched_devices, os_categories, target_os, ignore_categorisation, target_tag) processed = await Utilities.filter_targets(
return [] matched_devices, os_categories, target_os, ignore_categorisation, target_tag
)
return processed
import shlex # No matches found
class transform: return {"valid_devices": [], "offline_devices": []}
def process_shell_response(shlex_enable: bool, meshbook_result: dict) -> dict:
@staticmethod
def path_exist(path: str) -> bool:
return os.path.exists(path)
@staticmethod
def path_type(path: str) -> str:
if os.path.isfile(path):
return "File"
if os.path.isdir(path):
return "Dir"
if os.path.islink(path):
return "Link"
return "Undefined"
class Transform:
@staticmethod
def process_shell_response(enable_shlex: bool, meshbook_result: dict) -> dict:
for task_name, task_data in meshbook_result.items(): for task_name, task_data in meshbook_result.items():
if task_name == "Offline": # Failsafe if task_name == "Offline": # Failsafe do not parse Offline section, its simple
continue continue
for node_responses in task_data["data"]: for node_responses in task_data["data"]:
task_result = node_responses["result"].splitlines() task_result = node_responses["result"].splitlines()
if shlex_enable: if enable_shlex:
for index, line in enumerate(task_result): for index, line in enumerate(task_result):
line = shlex.split(line) line = shlex.split(line)
task_result[index] = line task_result[index] = line
@@ -152,6 +303,7 @@ class transform:
node_responses["result"] = clean_output node_responses["result"] = clean_output
return meshbook_result return meshbook_result
@staticmethod
async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str: async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str:
''' '''
Simple function that looks up nodeid to the human-readable name if existent - otherwise return None. Simple function that looks up nodeid to the human-readable name if existent - otherwise return None.
@@ -161,8 +313,9 @@ class transform:
for device in group_list[group]: for device in group_list[group]:
if device["device_id"] == target_id: if device["device_id"] == target_id:
return device["device_name"] return device["device_name"]
return None return ""
@staticmethod
async def replace_placeholders(meshbook: dict) -> dict: async def replace_placeholders(meshbook: dict) -> dict:
''' '''
Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list. Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list.
@@ -196,6 +349,7 @@ class transform:
return meshbook return meshbook
@staticmethod
async def compile_group_list(session: meshctrl.Session) -> dict: async def compile_group_list(session: meshctrl.Session) -> dict:
''' '''
Function that retrieves the devices from MeshCentral and compiles it into a efficient list. Function that retrieves the devices from MeshCentral and compiles it into a efficient list.

View File

@@ -2,11 +2,13 @@
{ {
"Linux": { "Linux": {
"Debian": [ "Debian": [
"Debian GNU/Linux 13 (trixie)",
"Debian GNU/Linux 12 (bookworm)", "Debian GNU/Linux 12 (bookworm)",
"Debian GNU/Linux 11 (bullseye)" "Debian GNU/Linux 11 (bullseye)",
"Debian GNU/Linux 10 (buster)"
], ],
"Ubuntu": [ "Ubuntu": [
"Ubuntu 24.04.1 LTS", "Ubuntu 24.04.3 LTS",
"Ubuntu 22.04.5 LTS", "Ubuntu 22.04.5 LTS",
"Ubuntu 20.04.6 LTS" "Ubuntu 20.04.6 LTS"
] ]

View File

@@ -1,4 +1,4 @@
colorama==0.4.6 colorama==0.4.6
pyyaml==6.0.2 pyyaml==6.0.3
libmeshctrl==1.2.2 libmeshctrl==1.3.3
pyotp==2.9.0 pyotp==2.9.0