Compare commits

..

9 Commits

Author SHA1 Message Date
Josiah Baldwin
ab2a4c40bc Fixed auto-reconnect for proxy and created tests for auto-reconnect 2024-12-10 13:05:22 -08:00
Josiah Baldwin
0a657cee48 Added default port numbers to URL. This fixes an issue with proxy handling mhen you don't pass a port in the url. 2024-12-10 10:33:05 -08:00
Josiah Baldwin
03441161b2 Added timeout to check_socket decorator in case a connection fails to be made 2024-12-10 10:31:52 -08:00
Josiah Baldwin
24adf3baa5 Updated docs for proxy 2024-12-09 16:45:12 -08:00
Josiah Baldwin
1adaccabc0 Added proxy and tests for proxy 2024-12-09 16:42:32 -08:00
Josiah Baldwin
20843dbea7 Changed download file APIs so the stream returns at the position where it was passed in 2024-12-04 13:40:49 -08:00
Josiah Baldwin
af6c020506 Kinda fixed auto reconnect 2024-12-04 13:29:17 -08:00
Josiah Baldwin
b870aa25bd Added ping and raw raw_events; Removed some debug stuff 2024-12-03 17:46:57 -08:00
Josiah Baldwin
c63604f624 Removed test users file 2024-12-02 18:22:04 -08:00
14 changed files with 9622 additions and 99 deletions

Binary file not shown.

View File

@@ -46,6 +46,7 @@ install_requires =
importlib-metadata
cryptography>=43.0.3
websockets>=13.1
python-socks[asyncio]
[options.packages.find]

View File

@@ -8,6 +8,8 @@ import json
import datetime
import io
import ssl
import urllib
from python_socks.async_.asyncio import Proxy
from . import constants
from . import exceptions
from . import util
@@ -28,9 +30,10 @@ class Session(object):
domain (str): Domain to connect to
password (str): Password with which to connect. Can also be password generated from token.
loginkey (str|bytes): Key from already handled login. Overrides username/password.
proxy (str): "url:port" to use for proxy server NOTE: This is currently not implemented due to a limitation of the undersying websocket library. Upvote the issue if you find this important.
proxy (str): "url:port" to use for proxy server
token (str): Login token. This appears to be superfluous
ignore_ssl (bool): Ignore SSL errors
auto_reconnect (bool): In case of server failure, attempt to auto reconnect. All outstanding requests will be killed.
Returns:
:py:class:`Session`: Session connected to url
@@ -43,9 +46,19 @@ class Session(object):
'''
def __init__(self, url, user=None, domain=None, password=None, loginkey=None, proxy=None, token=None, ignore_ssl=False, auto_reconnect=False):
if len(url) < 5 or ((not url.startswith('wss://')) and (not url.startswith('ws://'))):
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ("wss", "ws"):
raise ValueError("Invalid URL")
port = 80
if parsed.port is None:
if parsed.scheme == "wss":
port = 443
p = list(parsed)
p[1] = f"{parsed.hostname}:{port}"
url = urllib.parse.urlunparse(p)
if (not url.endswith('/')):
url += '/'
@@ -92,6 +105,7 @@ class Session(object):
self._inflight = set()
self._file_tunnels = {}
self._ignore_ssl = ignore_ssl
self.auto_reconnect = auto_reconnect
self._eventer = util.Eventer()
@@ -120,18 +134,15 @@ class Session(object):
ssl_context.verify_mode = ssl.CERT_NONE
options = { "ssl": ssl_context }
# Setup the HTTP proxy if needed
# if (self._proxy != None):
# options.agent = new https_proxy_agent(urllib.parse(self._proxy))
headers = websockets.datastructures.Headers()
if (self._password):
token = self._token if self._token else b""
headers['x-meshauth'] = (base64.b64encode(self._user.encode()) + b',' + base64.b64encode(self._password.encode()) + token).decode()
options["additional_headers"] = headers
async for websocket in websockets.asyncio.client.connect(self.url, process_exception=util._process_websocket_exception, **options):
async for websocket in util.proxy_connect(self.url, proxy_url=self._proxy, process_exception=util._process_websocket_exception, **options):
self.alive = True
self._socket_open.set()
try:
@@ -139,10 +150,10 @@ class Session(object):
tg.create_task(self._listen_data_task(websocket))
tg.create_task(self._send_data_task(websocket))
except* websockets.ConnectionClosed as e:
self._socket_open.clear()
if not self.auto_reconnect:
raise
except* Exception as eg:
self.alive = False
self._socket_open.clear()
@@ -159,13 +170,20 @@ class Session(object):
async def _send_data_task(self, websocket):
while True:
message = await self._message_queue.get()
print(f"{self._user} send: {message}\n")
await websocket.send(message)
async def _listen_data_task(self, websocket):
async for message in websocket:
print(f"{self._user} recv: {message}\n")
data = json.loads(message)
await self._eventer.emit("raw", message)
# Meshcentral does pong wrong and breaks our parsing, so fix it here.
if message == '{action:"pong"}':
message = '{"action":"pong"}'
# Can't process non-json data, don't even try
try:
data = json.loads(message)
except SyntaxError:
continue
action = data.get("action", None)
await self._eventer.emit("server_event", data)
if action == "close":
@@ -234,14 +252,14 @@ class Session(object):
return response
@util._check_socket
async def _send_command_no_response_id(self, data, timeout=None):
async def _send_command_no_response_id(self, data, action_override=None, timeout=None):
responded = asyncio.Event()
response = None
async def _(data):
nonlocal response
response = data
responded.set()
self._eventer.once(data["action"], _)
self._eventer.once(action_override if action_override is not None else data["action"], _)
await self._message_queue.put(json.dumps(data))
await asyncio.wait_for(responded.wait(), timeout=timeout)
if isinstance(response, Exception):
@@ -268,6 +286,23 @@ class Session(object):
"""
return self._user_info
async def ping(self, timeout=None):
'''
Ping the server. WARNING: Non namespaced call. Calling this function again before it returns may cause unintended consequences.
Args:
timeout (int): duration in seconds to wait for a response before throwing an error
Returns:
dict: {"action": "pong"}
Raises:
:py:class:`~meshctrl.exceptions.ServerError`: Error from server
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command_no_response_id({"action": "ping"}, action_override="pong", timeout=timeout)
return data
async def list_device_groups(self, timeout=None):
'''
@@ -284,7 +319,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command({"action": "meshes"}, "list_device_groups", timeout)
data = await self._send_command({"action": "meshes"}, "list_device_groups", timeout=timeout)
return [mesh.Mesh(m["_id"], self, **m) for m in data["meshes"]]
@@ -323,7 +358,7 @@ class Session(object):
op["name"] = name
if message:
op["msg"] = message
data = await self._send_command(op, "send_invite_email", timeout)
data = await self._send_command(op, "send_invite_email", timeout=timeout)
if ("result" in data and data["result"].lower() != "ok"):
raise exceptions.ServerError(data["result"])
return True
@@ -359,7 +394,7 @@ class Session(object):
op["meshname"] = group
if flags != None:
op["flags"] = flags
data = await self._send_command(op, "generate_invite_link", timeout)
data = await self._send_command(op, "generate_invite_link", timeout=timeout)
if ("result" in data and data["result"].lower() != "ok"):
raise exceptions.ServerError(data["result"])
del data["tag"]
@@ -382,7 +417,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command({"action": "users"}, "list_users", timeout)
data = await self._send_command({"action": "users"}, "list_users", timeout=timeout)
if ("result" in data and data["result"].lower() != "ok"):
raise exceptions.ServerError(data["result"])
return data["users"]
@@ -401,7 +436,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
return (await self._send_command({"action": "wssessioncount"}, "list_user_sessions", timeout))["wssessions"]
return (await self._send_command({"action": "wssessioncount"}, "list_user_sessions", timeout=timeout))["wssessions"]
async def list_devices(self, details=False, group=None, meshid=None, timeout=None):
@@ -426,14 +461,14 @@ class Session(object):
tasks = []
async with asyncio.TaskGroup() as tg:
if details:
tasks.append(tg.create_task(self._send_command_no_response_id({"action": "getDeviceDetails", "type":"json"}, timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({"action": "getDeviceDetails", "type":"json"}, timeout=timeout)))
elif group:
tasks.append(tg.create_task(self._send_command({ "action": 'nodes', "meshname": group}, "list_devices", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes', "meshname": group}, "list_devices", timeout=timeout)))
elif meshid:
tasks.append(tg.create_task(self._send_command({ "action": 'nodes', "meshid": meshid}, "list_devices", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes', "meshid": meshid}, "list_devices", timeout=timeout)))
else:
tasks.append(tg.create_task(self._send_command({ "action": 'meshes' }, "list_devices", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes' }, "list_devices", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'meshes' }, "list_devices", timeout=timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes' }, "list_devices", timeout=timeout)))
res0 = tasks[0].result()
if "result" in res0:
@@ -478,6 +513,24 @@ class Session(object):
node["mesh"] = mesh.Mesh(node.get("meshid"), self)
return [device.Device(n["_id"], self, **n) for n in nodes]
async def raw_messages(self):
'''
Listen to raw messages from the server. These will be strings that have not been parsed at all. Consider this an emergency fallback if meshcentral sends something odd. You will get every message from the websocket.
Returns:
generator(data): A generator which will generate every message the server sends
'''
event_queue = asyncio.Queue()
async def _(data):
await event_queue.put(data)
self._eventer.on("raw", _)
try:
while True:
data = await event_queue.get()
yield data
finally:
self._eventer.off("server_event", _)
async def events(self, filter=None):
'''
Listen to events from the server
@@ -535,7 +588,7 @@ class Session(object):
if limit:
cmd["limit"] = limit
data = await self._send_command(cmd, "list_events", timeout)
data = await self._send_command(cmd, "list_events", timeout=timeout)
return data["events"]
async def list_login_tokens(self, timeout=None):
@@ -552,7 +605,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
return (await self._send_command_no_response_id({"action": "loginTokens"}, timeout))["loginTokens"]
return (await self._send_command_no_response_id({"action": "loginTokens"}, timeout=timeout))["loginTokens"]
async def add_login_token(self, name, expire=None, timeout=None):
'''
@@ -571,7 +624,7 @@ class Session(object):
asyncio.TimeoutError: Command timed out
'''
cmd = { "action": 'createLoginToken', "name": name, "expire": 0 if not expire else expire }
data = await self._send_command_no_response_id(cmd, timeout)
data = await self._send_command_no_response_id(cmd, timeout=timeout)
del data["action"]
return data
@@ -603,7 +656,7 @@ class Session(object):
name = token["tokenUser"]
break
realnames.append(name)
return (await self._send_command_no_response_id({ "action": 'loginTokens', "remove": realnames }, timeout))["loginTokens"]
return (await self._send_command_no_response_id({ "action": 'loginTokens', "remove": realnames }, timeout=timeout))["loginTokens"]
async def add_user(self, name, password=None, randompass=False, domain=None, email=None, emailverified=False, resetpass=False, realname=None, phone=None, rights=None, timeout=None):
'''
@@ -651,7 +704,7 @@ class Session(object):
if isinstance(realname, str):
op["realname"] = realname
data = await self._send_command(op, "add_user", timeout)
data = await self._send_command(op, "add_user", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
return True
@@ -706,7 +759,7 @@ class Session(object):
op["realname"] = realname
if realname is True:
op["realname"] = ''
data = await self._send_command(op, "edit_user", timeout)
data = await self._send_command(op, "edit_user", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
return True
@@ -732,7 +785,7 @@ class Session(object):
elif (self._domain is not None) and ("/" not in userid):
userid = f"user/{self._domain}/{userid}"
data = await self._send_command({ "action": 'deleteuser', "userid": userid }, "remove_user", timeout)
data = await self._send_command({ "action": 'deleteuser', "userid": userid }, "remove_user", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
return True
@@ -760,7 +813,7 @@ class Session(object):
op["domain"] = self._domain
elif self._domain is not None:
op["domain"] = self._domain
data = await self._send_command(op, "add_user_group", timeout)
data = await self._send_command(op, "add_user_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -795,7 +848,7 @@ class Session(object):
if (not groupid.startswith("ugrp/")):
groupid = f"ugrp//{groupid}"
data = await self._send_command({ "action": 'deleteusergroup', "ugrpid": groupid }, "remove_user_group", timeout)
data = await self._send_command({ "action": 'deleteusergroup', "ugrpid": groupid }, "remove_user_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
return True
@@ -814,7 +867,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
r = await self._send_command({"action": "usergroups"}, "list_user_groups", timeout)
r = await self._send_command({"action": "usergroups"}, "list_user_groups", timeout=timeout)
groups = []
for key, val in r["ugroups"].items():
val["_id"] = key
@@ -888,7 +941,7 @@ class Session(object):
async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(asyncio.wait_for(_(tg), timeout=timeout)))
tasks.append(tg.create_task(asyncio.wait_for(__(tg), timeout=timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'addusertousergroup', "ugrpid": groupid, "usernames": usernames}, "add_users_to_user_group", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'addusertousergroup', "ugrpid": groupid, "usernames": usernames}, "add_users_to_user_group", timeout=timeout)))
res = tasks[2].result()
@@ -922,7 +975,7 @@ class Session(object):
if (not groupid.startswith("ugrp/")):
groupid = f"ugrp//{groupid}"
data = await self._send_command({ "action": 'removeuserfromusergroup', "ugrpid": groupid, "userid": userid }, "remove_from_user_group", timeout)
data = await self._send_command({ "action": 'removeuserfromusergroup', "ugrpid": groupid, "userid": userid }, "remove_from_user_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -953,7 +1006,7 @@ class Session(object):
if rights is None:
rights = 0
data = await self._send_command({ "action": 'adddeviceuser', "nodeid": nodeid, "userids": userids, "rights": rights}, "add_users_to_device", timeout)
data = await self._send_command({ "action": 'adddeviceuser', "nodeid": nodeid, "userids": userids, "rights": rights}, "add_users_to_device", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -981,7 +1034,7 @@ class Session(object):
userids = [f"user//{u}" if not u.startswith("user//") else u for u in userids]
data = await self._send_command({ "action": 'adddeviceuser', "nodeid": nodeid, "usernames": userids, "rights": 0, "remove": True }, "remove_users_from_device", timeout)
data = await self._send_command({ "action": 'adddeviceuser', "nodeid": nodeid, "usernames": userids, "rights": 0, "remove": True }, "remove_users_from_device", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1018,7 +1071,7 @@ class Session(object):
if consent:
op["consent"] = consent
data = await self._send_command(op, "add_device_group", timeout)
data = await self._send_command(op, "add_device_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1053,7 +1106,7 @@ class Session(object):
op["meshname"] = meshid
del op["meshid"]
data = await self._send_command(op, "remove_device_group", timeout)
data = await self._send_command(op, "remove_device_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1107,7 +1160,7 @@ class Session(object):
if consent is not None:
op["consent"] = consent
data = await self._send_command(op, "edit_device_group", timeout)
data = await self._send_command(op, "edit_device_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1138,7 +1191,7 @@ class Session(object):
op["meshname"] = meshid
del op["meshid"]
data = await self._send_command(op, "move_to_device_group", timeout)
data = await self._send_command(op, "move_to_device_group", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1171,7 +1224,7 @@ class Session(object):
op["meshname"] = meshid
del op["meshid"]
data = await self._send_command(op, "add_user_to_device_group", timeout)
data = await self._send_command(op, "add_user_to_device_group", timeout=timeout)
results = data["result"].split(",")
out = {}
for i, result in enumerate(results):
@@ -1213,7 +1266,7 @@ class Session(object):
tasks = []
async with asyncio.TaskGroup() as tg:
for userid in userids:
tasks.append(tg.create_task(self._send_command({ "action": 'removemeshuser', "userid": userid } | id_obj, "remove_users_from_device_group", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'removemeshuser', "userid": userid } | id_obj, "remove_users_from_device_group", timeout=timeout)))
out = {}
for i, task in enumerate(tasks):
@@ -1247,7 +1300,7 @@ class Session(object):
if userid:
op["userid"] = userid
data = await self._send_command(op, "broadcast", timeout)
data = await self._send_command(op, "broadcast", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1271,10 +1324,10 @@ class Session(object):
'''
tasks = []
async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(self._send_command({ "action": 'nodes' }, "device_info", timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({ "action": 'getnetworkinfo', "nodeid": nodeid }, timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({ "action": 'lastconnect', "nodeid": nodeid }, timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'getsysinfo', "nodeid": nodeid, "nodeinfo": True }, "device_info", timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'nodes' }, "device_info", timeout=timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({ "action": 'getnetworkinfo', "nodeid": nodeid }, timeout=timeout)))
tasks.append(tg.create_task(self._send_command_no_response_id({ "action": 'lastconnect', "nodeid": nodeid }, timeout=timeout)))
tasks.append(tg.create_task(self._send_command({ "action": 'getsysinfo', "nodeid": nodeid, "nodeinfo": True }, "device_info", timeout=timeout)))
tasks.append(tg.create_task(self.list_device_groups(timeout=timeout)))
nodes, network, lastconnect, sysinfo, meshes = (_.result() for _ in tasks)
@@ -1344,7 +1397,7 @@ class Session(object):
if consent is not None:
op["consent"] = consent
data = await self._send_command(op, "edit_device", timeout)
data = await self._send_command(op, "edit_device", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1402,7 +1455,7 @@ class Session(object):
continue
result[node]["result"].append(event["value"])
async def __(command):
data = await self._send_command(command, "run_command", timeout)
data = await self._send_command(command, "run_command", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1479,7 +1532,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
return await self._send_command({ "action": 'wakedevices', "nodeids": nodeids }, "wake_devices", timeout)
return await self._send_command({ "action": 'wakedevices', "nodeids": nodeids }, "wake_devices", timeout=timeout)
async def reset_devices(self, nodeids, timeout=None):
'''
@@ -1500,7 +1553,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 3 }, "reset_devices", timeout)
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 3 }, "reset_devices", timeout=timeout)
async def sleep_devices(self, nodeids, timeout=None):
'''
@@ -1521,7 +1574,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 4 }, "sleep_devices", timeout)
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 4 }, "sleep_devices", timeout=timeout)
async def power_off_devices(self, nodeids, timeout=None):
'''
@@ -1542,7 +1595,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 2 }, "power_off_devices", timeout)
return await self._send_command({ "action": 'poweraction', "nodeids": nodeids, "actiontype": 2 }, "power_off_devices", timeout=timeout)
async def list_device_shares(self, nodeid, timeout=None):
'''
@@ -1559,7 +1612,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command_no_response_id({ "action": 'deviceShares', "nodeid": nodeid }, timeout)
data = await self._send_command_no_response_id({ "action": 'deviceShares', "nodeid": nodeid }, timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1604,7 +1657,7 @@ class Session(object):
end = int(start.timestamp())
if end <= start:
raise ValueError("End time must be ahead of start time")
data = await self._send_command({ "action": 'createDeviceShareLink', "nodeid": nodeid, "guestname": name, "p": constants.SharingTypeEnum[type], "consent": consent, "start": start, "end": end }, "add_device_share", timeout)
data = await self._send_command({ "action": 'createDeviceShareLink', "nodeid": nodeid, "guestname": name, "p": constants.SharingTypeEnum[type], "consent": consent, "start": start, "end": end }, "add_device_share", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1633,7 +1686,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command({ "action": 'removeDeviceShare', "nodeid": nodeid, "publicid": shareid }, "remove_device_share", timeout)
data = await self._send_command({ "action": 'removeDeviceShare', "nodeid": nodeid, "publicid": shareid }, "remove_device_share", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1669,7 +1722,7 @@ class Session(object):
tasks = []
async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(asyncio.wait_for(_(), timeout=timeout)))
tasks.append({ "action": 'msg', "type": 'openUrl', "nodeid": nodeid, "url": url }, "device_open_url", timeout)
tasks.append({ "action": 'msg', "type": 'openUrl', "nodeid": nodeid, "url": url }, "device_open_url", timeout=timeout)
res = tasks[1].result()
success = tasks[2].result()
@@ -1701,7 +1754,7 @@ class Session(object):
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
data = await self._send_command({ "action": 'msg', "type": 'messagebox', "nodeid": nodeid, "title": title, "msg": message }, "device_message", timeout)
data = await self._send_command({ "action": 'msg', "type": 'messagebox', "nodeid": nodeid, "title": title, "msg": message }, "device_message", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1731,7 +1784,7 @@ class Session(object):
if isinstance(nodeids, str):
nodeids = [nodeids]
data = self._send_command({ "action": 'toast', "nodeids": nodeids, "title": "MeshCentral", "msg": message }, "device_toast", timeout)
data = self._send_command({ "action": 'toast', "nodeids": nodeids, "title": "MeshCentral", "msg": message }, "device_toast", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
@@ -1817,17 +1870,20 @@ class Session(object):
:py:class:`~meshctrl.exceptions.FileTransferCancelled`: File transfer cancelled. Info available on the `stats` property
Returns:
io.IOBase: The stream which has been downloaded into. Cursor will be at the end of the stream.
io.IOBase: The stream which has been downloaded into. Cursor will be at the beginning of where the file is downloaded.
'''
if target is None:
target = io.BytesIO()
start = target.tell()
if unique_file_tunnel:
async with self.file_explorer(nodeid) as files:
await files.download(source, target)
target.seek(start)
return target
else:
files = await self._cached_file_explorer(nodeid, nodeid)
await files.download(source, target, timeout=timeout)
target.seek(start)
return target
async def download_file(self, nodeid, source, filepath, unique_file_tunnel=False, timeout=None):
@@ -1845,10 +1901,10 @@ class Session(object):
:py:class:`~meshctrl.exceptions.FileTransferCancelled`: File transfer cancelled. Info available on the `stats` property
Returns:
io.IOBase: The stream which has been downloaded into. Cursor will be at the end of the stream.
None
'''
with open(filepath, "wb") as f:
return await self.download(nodeid, source, f, unique_file_tunnel, timeout=timeout)
await self.download(nodeid, source, f, unique_file_tunnel, timeout=timeout)
async def _cached_file_explorer(self, nodeid, _id):
if (_id not in self._file_tunnels or not self._file_tunnels[_id].alive):

View File

@@ -4,6 +4,8 @@ import websockets.asyncio
import websockets.asyncio.client
import asyncio
import ssl
from python_socks.async_.asyncio import Proxy
import urllib
from . import exceptions
from . import util
from . import constants
@@ -52,10 +54,6 @@ class Tunnel(object):
ssl_context.verify_mode = ssl.CERT_NONE
options = { "ssl": ssl_context }
# Setup the HTTP proxy if needed
# if (self._session._proxy != None):
# options.agent = new https_proxy_agent(urllib.parse(this._proxy))
if (self.node_id.split('/') != 3) and (self._session._currentDomain is not None):
self.node_id = f"node/{self._session._currentDomain}/{self.node_id}"
@@ -72,14 +70,7 @@ class Tunnel(object):
self.url = self._session.url.replace('/control.ashx', '/meshrelay.ashx?browser=1&p=' + str(self._protocol) + '&nodeid=' + self.node_id + '&id=' + self._tunnel_id + '&auth=' + authcookie["cookie"])
# headers = websockets.datastructures.Headers()
# if (self._password):
# token = self._token if self._token else b""
# headers['x-meshauth'] = (base64.b64encode(self._user.encode()) + b',' + base64.b64encode(self._password.encode()) + token).decode()
# options["additional_headers"] = headers
async for websocket in websockets.asyncio.client.connect(self.url, process_exception=util._process_websocket_exception, **options):
async for websocket in util.proxy_connect(self.url, proxy_url=self._session._proxy, process_exception=util._process_websocket_exception, **options):
self.alive = True
self._socket_open.set()
try:

View File

@@ -9,6 +9,9 @@ import re
import websockets
import ssl
import functools
import urllib
import python_socks
from python_socks.async_.asyncio import Proxy
from . import exceptions
def _encode_cookie(o, key):
@@ -139,17 +142,36 @@ def compare_dict(dict1, dict2):
def _check_socket(f):
@functools.wraps(f)
async def wrapper(self, *args, **kwargs):
await self.initialized.wait()
if not self.alive and self._main_loop_error is not None:
raise self._main_loop_error
elif not self.alive:
raise exceptions.SocketError("Socket Closed")
return await f(self, *args, **kwargs)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(asyncio.wait_for(self.initialized.wait(), 10))
tg.create_task(asyncio.wait_for(self._socket_open.wait(), 10))
finally:
if not self.alive and self._main_loop_error is not None:
raise self._main_loop_error
elif not self.alive:
raise exceptions.SocketError("Socket Closed")
return await f(self, *args, **kwargs)
return wrapper
def _process_websocket_exception(exc):
tmp = websockets.asyncio.client.process_exception(exc)
# SSLVerification error is a subclass of OSError, but doesn't make sense no retry, so we need to handle it separately.
# SSLVerification error is a subclass of OSError, but doesn't make sense to retry, so we need to handle it separately.
if isinstance(exc, (ssl.SSLCertVerificationError, TimeoutError)):
return exc
if isinstance(exc, python_socks._errors.ProxyError):
return None
return tmp
class proxy_connect(websockets.asyncio.client.connect):
def __init__(self,*args, proxy_url=None, **kwargs):
self.proxy = None
if proxy_url is not None:
self.proxy = Proxy.from_url(proxy_url)
super().__init__(*args, **kwargs)
async def create_connection(self, *args, **kwargs):
if self.proxy is not None:
parsed = urllib.parse.urlparse(self.uri)
self.connection_kwargs["sock"] = await self.proxy.connect(dest_host=parsed.hostname, dest_port=parsed.port)
return await super().create_connection(*args, **kwargs)

3
tests/.gitignore vendored
View File

@@ -1 +1,2 @@
/data
data
environment/scripts/meshcentral/users.json

View File

@@ -54,15 +54,23 @@ class TestEnvironment(object):
self._subp = None
self.mcurl = "wss://localhost:8086"
self.clienturl = "http://localhost:5000"
self._dockerurl = "host.docker.internal:8086"
self.dockerurl = "host.docker.internal:8086"
self.proxyurl = "http://localhost:3128"
def __enter__(self):
global _docker_process
if _docker_process is not None:
self._subp = _docker_process
return self
# Destroy the env in case it wasn't killed correctly last time.
subprocess.check_call(["docker", "compose", "down"], stdout=subprocess.DEVNULL, cwd=thisdir)
self._subp = _docker_process = subprocess.Popen(["docker", "compose", "up", "--build", "--force-recreate", "--no-deps"], stdout=subprocess.DEVNULL, cwd=thisdir)
timeout = 30
if not self._wait_for_meshcentral():
self.__exit__(None, None, None)
raise Exception("Failed to create docker instance")
return self
def _wait_for_meshcentral(self, timeout=30):
start = time.time()
while time.time() - start < timeout:
try:
@@ -79,16 +87,23 @@ class TestEnvironment(object):
pass
time.sleep(1)
else:
self.__exit__(None, None, None)
raise Exception("Failed to create docker instance")
return self
return False
return True
def __exit__(self, exc_t, exc_v, exc_tb):
pass
def create_agent(self, meshid):
return Agent(meshid, self.mcurl, self.clienturl, self._dockerurl)
return Agent(meshid, self.mcurl, self.clienturl, self.dockerurl)
# Restart our docker instances, to test reconnect code.
def restart_mesh(self):
subprocess.check_call(["docker", "container", "restart", "meshctrl-meshcentral"], stdout=subprocess.DEVNULL, cwd=thisdir)
assert self._wait_for_meshcentral(), "Failed to restart docker instance"
def restart_proxy(self):
subprocess.check_call(["docker", "container", "restart", "meshctrl-squid"], stdout=subprocess.DEVNULL, cwd=thisdir)
def _kill_docker_process():
if _docker_process is not None:

View File

@@ -19,9 +19,9 @@ services:
# # mongodb data-directory - A must for data persistence
# - ./meshcentral/mongodb_data:/data/db
networks:
- meshctrl
- meshctrl
extra_hosts:
- "host.docker.internal:host-gateway"
- "host.docker.internal:host-gateway"
meshcentral:
restart: always
@@ -49,4 +49,21 @@ services:
healthcheck:
test: curl -k --fail https://localhost:443/ || exit 1
interval: 5s
timeout: 120s
timeout: 120s
squid:
image: ubuntu/squid:latest
restart: unless-stopped
container_name: meshctrl-squid
ports:
- 3128:3128
networks:
- meshctrl
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- ./config/squid/conf.d:/etc/squid/conf.d
- ./config/squid/squid.conf:/etc/squid/squid.conf

View File

@@ -0,0 +1,11 @@
# Logs are managed by logrotate on Debian
logfile_rotate 0
acl all src all
acl Safe_ports port 8086
acl SSS_ports port 8086
http_access allow all
debug_options ALL,0 85,2 88,2
# Set max_filedescriptors to avoid using system's RLIMIT_NOFILE. See LP: #1978272
max_filedescriptors 1024

File diff suppressed because it is too large Load Diff

View File

@@ -1 +0,0 @@
{"admin": "vt9BbctCg59vcuxKPh8v2tbDjudjwyeX", "privileged": "BWl1vhSe0j0lBfoAkx1JLXLBOwIWc0st", "unprivileged": "vCuatfTQGq8bL2pxdvrNzF+Dc4xBq+5Z"}

View File

@@ -7,7 +7,7 @@ import io
import random
async def test_commands(env):
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
async with meshctrl.Session("wss://" + env.dockerurl, user="admin", password=env.users["admin"], ignore_ssl=True, proxy=env.proxyurl) as admin_session:
mesh = await admin_session.add_device_group("test", description="This is a test group", amtonly=False, features=0, consent=0, timeout=10)
try:
with env.create_agent(mesh.short_meshid) as agent:
@@ -53,7 +53,7 @@ async def test_commands(env):
assert (await admin_session.remove_device_group(mesh.meshid, timeout=10)), "Failed to remove device group"
async def test_upload_download(env):
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
async with meshctrl.Session("wss://" + env.dockerurl, user="admin", password=env.users["admin"], ignore_ssl=True, proxy=env.proxyurl) as admin_session:
mesh = await admin_session.add_device_group("test", description="This is a test group", amtonly=False, features=0, consent=0, timeout=10)
try:
with env.create_agent(mesh.short_meshid) as agent:

View File

@@ -9,10 +9,26 @@ import requests
async def test_sanity(env):
async with meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as s:
got_pong = asyncio.Event()
async def _():
async for raw in s.raw_messages():
if raw == '{action:"pong"}':
got_pong.set()
break
ping_task = None
async with asyncio.TaskGroup() as tg:
tg.create_task(asyncio.wait_for(_(), timeout=5))
tg.create_task(asyncio.wait_for(got_pong.wait(), timeout=5))
ping_task = tg.create_task(s.ping(timeout=10))
print("\ninfo ping: {}\n".format(ping_task.result()))
print("\ninfo user_info: {}\n".format(await s.user_info()))
print("\ninfo server_info: {}\n".format(await s.server_info()))
pass
async def test_proxy(env):
async with meshctrl.Session("wss://" + env.dockerurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True, proxy=env.proxyurl) as s:
pass
async def test_ssl(env):
try:
async with meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=False) as s:
@@ -20,4 +36,21 @@ async def test_ssl(env):
except* ssl.SSLCertVerificationError:
pass
else:
raise Exception("Invalid SSL certificate accepted")
raise Exception("Invalid SSL certificate accepted")
async def test_urlparse():
# This tests the url port adding necessitated by python-socks. Our test environment doesn't use 443, so this is just a quick sanity test.
try:
async with meshctrl.Session("wss://localhost", user="unprivileged", password="Not a real password", ignore_ssl=True) as s:
pass
except* TimeoutError:
#We're not running a server, so timeout is our expected outcome
pass
# This tests our check for wss/ws url schemes
try:
async with meshctrl.Session("https://localhost", user="unprivileged", password="Not a real password", ignore_ssl=True) as s:
pass
except* ValueError:
#We're not running a server, so timeout is our expected outcome
pass

View File

@@ -31,6 +31,35 @@ async def test_admin(env):
assert len(admin_users) == len(env.users.keys()), "Admin cannot see correct number of users"
assert len(admin_sessions) == 2, "Admin cannot see correct number of oser sessions"
async def test_auto_reconnect(env):
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True, auto_reconnect=True) as admin_session:
env.restart_mesh()
await asyncio.sleep(10)
await admin_session.ping(timeout=10)
# As above, but with proxy
async with meshctrl.Session("wss://" + env.dockerurl, user="admin", password=env.users["admin"], ignore_ssl=True, auto_reconnect=True, proxy=env.proxyurl) as admin_session:
env.restart_mesh()
for i in range(3):
try:
await admin_session.ping(timeout=10)
except:
continue
break
else:
raise Exception("Failed to reconnect")
env.restart_proxy()
for i in range(3):
try:
await admin_session.ping(timeout=10)
except* Exception as e:
pass
else:
break
else:
raise Exception("Failed to reconnect")
async def test_users(env):
try:
@@ -396,11 +425,9 @@ async def test_session_files(env):
assert r["size"] == len(randdata), "Uploaded wrong number of bytes"
s = await admin_session.download(agent.nodeid, f"{pwd}/test", timeout=5)
s.seek(0)
assert s.read() == randdata, "Downloaded bad data"
await admin_session.download(agent.nodeid, f"{pwd}/test", downfilestream, timeout=5)
downfilestream.seek(0)
assert downfilestream.read() == randdata, "Downloaded bad data"
await admin_session.download_file(agent.nodeid, f"{pwd}/test2", os.path.join(thisdir, "data", "test"), timeout=5)