Compare commits

..

2 Commits

Author SHA1 Message Date
Josiah Baldwin
b870aa25bd Added ping and raw raw_events; Removed some debug stuff 2024-12-03 17:46:57 -08:00
Josiah Baldwin
c63604f624 Removed test users file 2024-12-02 18:22:04 -08:00
4 changed files with 108 additions and 54 deletions

View File

@@ -159,13 +159,20 @@ class Session(object):
async def _send_data_task(self, websocket):
while True:
message = await self._message_queue.get()
print(f"{self._user} send: {message}\n")
await websocket.send(message)
async def _listen_data_task(self, websocket):
async for message in websocket:
print(f"{self._user} recv: {message}\n")
data = json.loads(message)
await self._eventer.emit("raw", message)
# Meshcentral does pong wrong and breaks our parsing, so fix it here.
if message == '{action:"pong"}':
message = '{"action":"pong"}'
# Can't process non-json data, don't even try
try:
data = json.loads(message)
except SyntaxError:
continue
action = data.get("action", None)
await self._eventer.emit("server_event", data)
if action == "close":
@@ -234,14 +241,14 @@ class Session(object):
return response
@util._check_socket
async def _send_command_no_response_id(self, data, timeout=None):
async def _send_command_no_response_id(self, data, action_override=None, timeout=None):
responded = asyncio.Event()
response = None
async def _(data):
nonlocal response
response = data
responded.set()
self._eventer.once(data["action"], _)
self._eventer.once(action_override if action_override is not None else data["action"], _)
await self._message_queue.put(json.dumps(data))
await asyncio.wait_for(responded.wait(), timeout=timeout)
if isinstance(response, Exception):
@@ -268,6 +275,23 @@ class Session(object):
"""
return self._user_info
async def ping(self, timeout=None):
'''
Ping the server. WARNING: Non namespaced call. Calling this function again before it returns may cause unintended consequences.
Args:
timeout (int): duration in seconds to wait for a response before throwing an error
Returns:
dict: {"action": "pong"}
Raises:
:py:class:`~meshctrl.exceptions.ServerError`: Error from server
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command_no_response_id({"action": "ping"}, action_override="pong", timeout=timeout)
return data
async def list_device_groups(self, timeout=None):
'''
@@ -284,7 +308,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command({"action": "meshes"}, "list_device_groups", timeout)
data = await self._send_command({"action": "meshes"}, "list_device_groups", timeout=timeout)
return [mesh.Mesh(m["_id"], self, **m) for m in data["meshes"]]
@@ -323,7 +347,7 @@ class Session(object):
op["name"] = name
if message:
op["msg"] = message
data = await self._send_command(op, "send_invite_email", timeout)
data = await self._send_command(op, "send_invite_email", timeout=timeout)
if ("result" in data and data["result"].lower() != "ok"):
raise exceptions.ServerError(data["result"])
return True
@@ -359,7 +383,7 @@ class Session(object):
op["meshname"] = group
if flags != None:
op["flags"] = flags
data = await self._send_command(op, "generate_invite_link", timeout)
data = await self._send_command(op, "generate_invite_link", timeout=timeout)
if ("result" in data and data["result"].lower() != "ok"):
raise exceptions.ServerError(data["result"])
del data["tag"]
@@ -382,7 +406,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command({"action": "users"}, "list_users", timeout)
data = await self._send_command({"action": "users"}, "list_users", timeout=timeout)
if ("result" in data and data["result"].lower() != "ok"):
raise exceptions.ServerError(data["result"])
return data["users"]
@@ -401,7 +425,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
return (await self._send_command({"action": "wssessioncount"}, "list_user_sessions", timeout))["wssessions"]
return (await self._send_command({"action": "wssessioncount"}, "list_user_sessions", timeout=timeout))["wssessions"]
async def list_devices(self, details=False, group=None, meshid=None, timeout=None):
@@ -426,14 +450,14 @@ class Session(object):
tasks = []
async with asyncio.TaskGroup() as tg:
if details:
tasks.append(tg.create_task(self._send_command_no_response_id({"action": "getDeviceDetails", "type":"json"}, timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({"action": "getDeviceDetails", "type":"json"}, timeout=timeout)))
elif group:
tasks.append(tg.create_task(self._send_command({ "action": 'nodes', "meshname": group}, "list_devices", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes', "meshname": group}, "list_devices", timeout=timeout)))
elif meshid:
tasks.append(tg.create_task(self._send_command({ "action": 'nodes', "meshid": meshid}, "list_devices", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes', "meshid": meshid}, "list_devices", timeout=timeout)))
else:
tasks.append(tg.create_task(self._send_command({ "action": 'meshes' }, "list_devices", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes' }, "list_devices", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'meshes' }, "list_devices", timeout=timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes' }, "list_devices", timeout=timeout)))
res0 = tasks[0].result()
if "result" in res0:
@@ -478,6 +502,24 @@ class Session(object):
node["mesh"] = mesh.Mesh(node.get("meshid"), self)
return [device.Device(n["_id"], self, **n) for n in nodes]
async def raw_messages(self):
'''
Listen to raw messages from the server. These will be strings that have not been parsed at all. Consider this an emergency fallback if meshcentral sends something odd. You will get every message from the websocket.
Returns:
generator(data): A generator which will generate every message the server sends
'''
event_queue = asyncio.Queue()
async def _(data):
await event_queue.put(data)
self._eventer.on("raw", _)
try:
while True:
data = await event_queue.get()
yield data
finally:
self._eventer.off("server_event", _)
async def events(self, filter=None):
'''
Listen to events from the server
@@ -535,7 +577,7 @@ class Session(object):
if limit:
cmd["limit"] = limit
data = await self._send_command(cmd, "list_events", timeout)
data = await self._send_command(cmd, "list_events", timeout=timeout)
return data["events"]
async def list_login_tokens(self, timeout=None):
@@ -552,7 +594,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
return (await self._send_command_no_response_id({"action": "loginTokens"}, timeout))["loginTokens"]
return (await self._send_command_no_response_id({"action": "loginTokens"}, timeout=timeout))["loginTokens"]
async def add_login_token(self, name, expire=None, timeout=None):
'''
@@ -571,7 +613,7 @@ class Session(object):
asyncio.TimeoutError: Command timed out
'''
cmd = { "action": 'createLoginToken', "name": name, "expire": 0 if not expire else expire }
data = await self._send_command_no_response_id(cmd, timeout)
data = await self._send_command_no_response_id(cmd, timeout=timeout)
del data["action"]
return data
@@ -603,7 +645,7 @@ class Session(object):
name = token["tokenUser"]
break
realnames.append(name)
return (await self._send_command_no_response_id({ "action": 'loginTokens', "remove": realnames }, timeout))["loginTokens"]
return (await self._send_command_no_response_id({ "action": 'loginTokens', "remove": realnames }, timeout=timeout))["loginTokens"]
async def add_user(self, name, password=None, randompass=False, domain=None, email=None, emailverified=False, resetpass=False, realname=None, phone=None, rights=None, timeout=None):
'''
@@ -651,7 +693,7 @@ class Session(object):
if isinstance(realname, str):
op["realname"] = realname
data = await self._send_command(op, "add_user", timeout)
data = await self._send_command(op, "add_user", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
return True
@@ -706,7 +748,7 @@ class Session(object):
op["realname"] = realname
if realname is True:
op["realname"] = ''
data = await self._send_command(op, "edit_user", timeout)
data = await self._send_command(op, "edit_user", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
return True
@@ -732,7 +774,7 @@ class Session(object):
elif (self._domain is not None) and ("/" not in userid):
userid = f"user/{self._domain}/{userid}"
data = await self._send_command({ "action": 'deleteuser', "userid": userid }, "remove_user", timeout)
data = await self._send_command({ "action": 'deleteuser', "userid": userid }, "remove_user", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
return True
@@ -760,7 +802,7 @@ class Session(object):
op["domain"] = self._domain
elif self._domain is not None:
op["domain"] = self._domain
data = await self._send_command(op, "add_user_group", timeout)
data = await self._send_command(op, "add_user_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -795,7 +837,7 @@ class Session(object):
if (not groupid.startswith("ugrp/")):
groupid = f"ugrp//{groupid}"
data = await self._send_command({ "action": 'deleteusergroup', "ugrpid": groupid }, "remove_user_group", timeout)
data = await self._send_command({ "action": 'deleteusergroup', "ugrpid": groupid }, "remove_user_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
return True
@@ -814,7 +856,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
r = await self._send_command({"action": "usergroups"}, "list_user_groups", timeout)
r = await self._send_command({"action": "usergroups"}, "list_user_groups", timeout=timeout)
groups = []
for key, val in r["ugroups"].items():
val["_id"] = key
@@ -888,7 +930,7 @@ class Session(object):
async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(asyncio.wait_for(_(tg), timeout=timeout)))
tasks.append(tg.create_task(asyncio.wait_for(__(tg), timeout=timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'addusertousergroup', "ugrpid": groupid, "usernames": usernames}, "add_users_to_user_group", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'addusertousergroup', "ugrpid": groupid, "usernames": usernames}, "add_users_to_user_group", timeout=timeout)))
res = tasks[2].result()
@@ -922,7 +964,7 @@ class Session(object):
if (not groupid.startswith("ugrp/")):
groupid = f"ugrp//{groupid}"
data = await self._send_command({ "action": 'removeuserfromusergroup', "ugrpid": groupid, "userid": userid }, "remove_from_user_group", timeout)
data = await self._send_command({ "action": 'removeuserfromusergroup', "ugrpid": groupid, "userid": userid }, "remove_from_user_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -953,7 +995,7 @@ class Session(object):
if rights is None:
rights = 0
data = await self._send_command({ "action": 'adddeviceuser', "nodeid": nodeid, "userids": userids, "rights": rights}, "add_users_to_device", timeout)
data = await self._send_command({ "action": 'adddeviceuser', "nodeid": nodeid, "userids": userids, "rights": rights}, "add_users_to_device", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -981,7 +1023,7 @@ class Session(object):
userids = [f"user//{u}" if not u.startswith("user//") else u for u in userids]
data = await self._send_command({ "action": 'adddeviceuser', "nodeid": nodeid, "usernames": userids, "rights": 0, "remove": True }, "remove_users_from_device", timeout)
data = await self._send_command({ "action": 'adddeviceuser', "nodeid": nodeid, "usernames": userids, "rights": 0, "remove": True }, "remove_users_from_device", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1018,7 +1060,7 @@ class Session(object):
if consent:
op["consent"] = consent
data = await self._send_command(op, "add_device_group", timeout)
data = await self._send_command(op, "add_device_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1053,7 +1095,7 @@ class Session(object):
op["meshname"] = meshid
del op["meshid"]
data = await self._send_command(op, "remove_device_group", timeout)
data = await self._send_command(op, "remove_device_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1107,7 +1149,7 @@ class Session(object):
if consent is not None:
op["consent"] = consent
data = await self._send_command(op, "edit_device_group", timeout)
data = await self._send_command(op, "edit_device_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1138,7 +1180,7 @@ class Session(object):
op["meshname"] = meshid
del op["meshid"]
data = await self._send_command(op, "move_to_device_group", timeout)
data = await self._send_command(op, "move_to_device_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1171,7 +1213,7 @@ class Session(object):
op["meshname"] = meshid
del op["meshid"]
data = await self._send_command(op, "add_user_to_device_group", timeout)
data = await self._send_command(op, "add_user_to_device_group", timeout=timeout)
results = data["result"].split(",")
out = {}
for i, result in enumerate(results):
@@ -1213,7 +1255,7 @@ class Session(object):
tasks = []
async with asyncio.TaskGroup() as tg:
for userid in userids:
tasks.append(tg.create_task(self._send_command({ "action": 'removemeshuser', "userid": userid } | id_obj, "remove_users_from_device_group", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'removemeshuser', "userid": userid } | id_obj, "remove_users_from_device_group", timeout=timeout)))
out = {}
for i, task in enumerate(tasks):
@@ -1247,7 +1289,7 @@ class Session(object):
if userid:
op["userid"] = userid
data = await self._send_command(op, "broadcast", timeout)
data = await self._send_command(op, "broadcast", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1271,10 +1313,10 @@ class Session(object):
'''
tasks = []
async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(self._send_command({ "action": 'nodes' }, "device_info", timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({ "action": 'getnetworkinfo', "nodeid": nodeid }, timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({ "action": 'lastconnect', "nodeid": nodeid }, timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'getsysinfo', "nodeid": nodeid, "nodeinfo": True }, "device_info", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes' }, "device_info", timeout=timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({ "action": 'getnetworkinfo', "nodeid": nodeid }, timeout=timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({ "action": 'lastconnect', "nodeid": nodeid }, timeout=timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'getsysinfo', "nodeid": nodeid, "nodeinfo": True }, "device_info", timeout=timeout)))
tasks.append(tg.create_task(self.list_device_groups(timeout=timeout)))
nodes, network, lastconnect, sysinfo, meshes = (_.result() for _ in tasks)
@@ -1344,7 +1386,7 @@ class Session(object):
if consent is not None:
op["consent"] = consent
data = await self._send_command(op, "edit_device", timeout)
data = await self._send_command(op, "edit_device", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1402,7 +1444,7 @@ class Session(object):
continue
result[node]["result"].append(event["value"])
async def __(command):
data = await self._send_command(command, "run_command", timeout)
data = await self._send_command(command, "run_command", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1479,7 +1521,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
return await self._send_command({ "action": 'wakedevices', "nodeids": nodeids }, "wake_devices", timeout)
return await self._send_command({ "action": 'wakedevices', "nodeids": nodeids }, "wake_devices", timeout=timeout)
async def reset_devices(self, nodeids, timeout=None):
'''
@@ -1500,7 +1542,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 3 }, "reset_devices", timeout)
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 3 }, "reset_devices", timeout=timeout)
async def sleep_devices(self, nodeids, timeout=None):
'''
@@ -1521,7 +1563,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 4 }, "sleep_devices", timeout)
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 4 }, "sleep_devices", timeout=timeout)
async def power_off_devices(self, nodeids, timeout=None):
'''
@@ -1542,7 +1584,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 2 }, "power_off_devices", timeout)
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 2 }, "power_off_devices", timeout=timeout)
async def list_device_shares(self, nodeid, timeout=None):
'''
@@ -1559,7 +1601,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command_no_response_id({ "action": 'deviceShares', "nodeid": nodeid }, timeout)
data = await self._send_command_no_response_id({ "action": 'deviceShares', "nodeid": nodeid }, timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1604,7 +1646,7 @@ class Session(object):
end = int(start.timestamp())
if end <= start:
raise ValueError("End time must be ahead of start time")
data = await self._send_command({ "action": 'createDeviceShareLink', "nodeid": nodeid, "guestname": name, "p": constants.SharingTypeEnum[type], "consent": consent, "start": start, "end": end }, "add_device_share", timeout)
data = await self._send_command({ "action": 'createDeviceShareLink', "nodeid": nodeid, "guestname": name, "p": constants.SharingTypeEnum[type], "consent": consent, "start": start, "end": end }, "add_device_share", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1633,7 +1675,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command({ "action": 'removeDeviceShare', "nodeid": nodeid, "publicid": shareid }, "remove_device_share", timeout)
data = await self._send_command({ "action": 'removeDeviceShare', "nodeid": nodeid, "publicid": shareid }, "remove_device_share", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1669,7 +1711,7 @@ class Session(object):
tasks = []
async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(asyncio.wait_for(_(), timeout=timeout)))
tasks.append({ "action": 'msg', "type": 'openUrl', "nodeid": nodeid, "url": url }, "device_open_url", timeout)
tasks.append({ "action": 'msg', "type": 'openUrl', "nodeid": nodeid, "url": url }, "device_open_url", timeout=timeout)
res = tasks[1].result()
success = tasks[2].result()
@@ -1701,7 +1743,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command({ "action": 'msg', "type": 'messagebox', "nodeid": nodeid, "title": title, "msg": message }, "device_message", timeout)
data = await self._send_command({ "action": 'msg', "type": 'messagebox', "nodeid": nodeid, "title": title, "msg": message }, "device_message", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1731,7 +1773,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
data = self._send_command({ "action": 'toast', "nodeids": nodeids, "title": "MeshCentral", "msg": message }, "device_toast", timeout)
data = self._send_command({ "action": 'toast', "nodeids": nodeids, "title": "MeshCentral", "msg": message }, "device_toast", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])

3
tests/.gitignore vendored
View File

@@ -1 +1,2 @@
/data
data
environment/scripts/meshcentral/users.json

View File

@@ -1 +0,0 @@
{"admin": "vt9BbctCg59vcuxKPh8v2tbDjudjwyeX", "privileged": "BWl1vhSe0j0lBfoAkx1JLXLBOwIWc0st", "unprivileged": "vCuatfTQGq8bL2pxdvrNzF+Dc4xBq+5Z"}

View File

@@ -9,6 +9,18 @@ import requests
async def test_sanity(env):
async with meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as s:
got_pong = asyncio.Event()
async def _():
async for raw in s.raw_messages():
if raw == '{action:"pong"}':
got_pong.set()
break
ping_task = None
async with asyncio.TaskGroup() as tg:
tg.create_task(asyncio.wait_for(_(), timeout=5))
tg.create_task(asyncio.wait_for(got_pong.wait(), timeout=5))
ping_task = tg.create_task(s.ping(timeout=10))
print("\ninfo ping: {}\n".format(ping_task.result()))
print("\ninfo user_info: {}\n".format(await s.user_info()))
print("\ninfo server_info: {}\n".format(await s.server_info()))
pass