Files
chronorunner/runner/modules/connect.py
2025-06-03 16:57:10 +02:00

59 lines
2.1 KiB
Python

import meshctrl
from json import dumps
from modules.utilities import transform
class connect:
@staticmethod
async def quit(session: meshctrl.Session) -> None: # Function to use when quitting, but gracefully.
await session.close()
@staticmethod
async def connect(hostname: str, username: str, password: str) -> meshctrl.Session:
session = meshctrl.Session(
hostname,
user=username,
password=password
)
await session.initialized.wait()
return session
@staticmethod
async def run(session: meshctrl.Session, command: str, nodeid: str) -> list[str]:
try:
response = await session.run_command(nodeids=nodeid,
command=command,
ignore_output=False,
timeout=900)
except Exception as error:
print("Run Command failed.", error)
return []
result = []
for device in response:
result = transform.process_shell_response(response[device]["result"])
return result
@staticmethod
async def list_online(session: meshctrl.Session) -> dict: # Default is return online devices, but function can also return the offline devices if specified.
raw_device_list = await session.list_devices()
complete_list = {}
complete_list["online_devices"] = []
complete_list["offline_devices"] = []
for raw_device in raw_device_list:
if raw_device.connected:
complete_list["online_devices"].append({
"name": raw_device.name,
"nodeid": raw_device.nodeid
})
else:
complete_list["offline_devices"].append({
"name": raw_device.name,
"nodeid": raw_device.nodeid
})
complete_list["total_devices"] = len(complete_list["online_devices"]) + len(complete_list["offline_devices"])
return complete_list