59 lines
2.1 KiB
Python
59 lines
2.1 KiB
Python
import meshctrl
|
|
from json import dumps
|
|
|
|
from modules.utilities import transform
|
|
|
|
class connect:
|
|
@staticmethod
|
|
async def quit(session: meshctrl.Session) -> None: # Function to use when quitting, but gracefully.
|
|
await session.close()
|
|
|
|
@staticmethod
|
|
async def connect(hostname: str, username: str, password: str) -> meshctrl.Session:
|
|
session = meshctrl.Session(
|
|
hostname,
|
|
user=username,
|
|
password=password
|
|
)
|
|
await session.initialized.wait()
|
|
return session
|
|
|
|
@staticmethod
|
|
async def run(session: meshctrl.Session, command: str, nodeid: str) -> list[str]:
|
|
try:
|
|
response = await session.run_command(nodeids=nodeid,
|
|
command=command,
|
|
ignore_output=False,
|
|
timeout=900)
|
|
except Exception as error:
|
|
print("Run Command failed.", error)
|
|
return []
|
|
|
|
result = []
|
|
for device in response:
|
|
result = transform.process_shell_response(response[device]["result"])
|
|
return result
|
|
|
|
@staticmethod
|
|
async def list_online(session: meshctrl.Session) -> dict: # Default is return online devices, but function can also return the offline devices if specified.
|
|
|
|
raw_device_list = await session.list_devices()
|
|
|
|
complete_list = {}
|
|
complete_list["online_devices"] = []
|
|
complete_list["offline_devices"] = []
|
|
|
|
for raw_device in raw_device_list:
|
|
if raw_device.connected:
|
|
complete_list["online_devices"].append({
|
|
"name": raw_device.name,
|
|
"nodeid": raw_device.nodeid
|
|
})
|
|
else:
|
|
complete_list["offline_devices"].append({
|
|
"name": raw_device.name,
|
|
"nodeid": raw_device.nodeid
|
|
})
|
|
complete_list["total_devices"] = len(complete_list["online_devices"]) + len(complete_list["offline_devices"])
|
|
|
|
return complete_list |