mirror of
https://github.com/HuFlungDu/pylibmeshctrl.git
synced 2026-02-20 13:42:11 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4eda4e6c08 | ||
|
|
ab2a4c40bc | ||
|
|
0a657cee48 | ||
|
|
03441161b2 | ||
|
|
24adf3baa5 |
@@ -30,7 +30,7 @@ class Session(object):
|
||||
domain (str): Domain to connect to
|
||||
password (str): Password with which to connect. Can also be password generated from token.
|
||||
loginkey (str|bytes): Key from already handled login. Overrides username/password.
|
||||
proxy (str): "url:port" to use for proxy server NOTE: This is currently not implemented due to a limitation of the undersying websocket library. Upvote the issue if you find this important.
|
||||
proxy (str): "url:port" to use for proxy server
|
||||
token (str): Login token. This appears to be superfluous
|
||||
ignore_ssl (bool): Ignore SSL errors
|
||||
auto_reconnect (bool): In case of server failure, attempt to auto reconnect. All outstanding requests will be killed.
|
||||
@@ -46,9 +46,19 @@ class Session(object):
|
||||
'''
|
||||
|
||||
def __init__(self, url, user=None, domain=None, password=None, loginkey=None, proxy=None, token=None, ignore_ssl=False, auto_reconnect=False):
|
||||
if len(url) < 5 or ((not url.startswith('wss://')) and (not url.startswith('ws://'))):
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
|
||||
if parsed.scheme not in ("wss", "ws"):
|
||||
raise ValueError("Invalid URL")
|
||||
|
||||
port = 80
|
||||
if parsed.port is None:
|
||||
if parsed.scheme == "wss":
|
||||
port = 443
|
||||
p = list(parsed)
|
||||
p[1] = f"{parsed.hostname}:{port}"
|
||||
url = urllib.parse.urlunparse(p)
|
||||
|
||||
if (not url.endswith('/')):
|
||||
url += '/'
|
||||
|
||||
@@ -130,13 +140,9 @@ class Session(object):
|
||||
token = self._token if self._token else b""
|
||||
headers['x-meshauth'] = (base64.b64encode(self._user.encode()) + b',' + base64.b64encode(self._password.encode()) + token).decode()
|
||||
|
||||
if self._proxy:
|
||||
proxy = Proxy.from_url(self._proxy)
|
||||
parsed = urllib.parse.urlparse(self.url)
|
||||
options["sock"] = await proxy.connect(dest_host=parsed.hostname, dest_port=parsed.port)
|
||||
|
||||
options["additional_headers"] = headers
|
||||
async for websocket in websockets.asyncio.client.connect(self.url, process_exception=util._process_websocket_exception, **options):
|
||||
async for websocket in util.proxy_connect(self.url, proxy_url=self._proxy, process_exception=util._process_websocket_exception, **options):
|
||||
self.alive = True
|
||||
self._socket_open.set()
|
||||
try:
|
||||
@@ -144,10 +150,10 @@ class Session(object):
|
||||
tg.create_task(self._listen_data_task(websocket))
|
||||
tg.create_task(self._send_data_task(websocket))
|
||||
except* websockets.ConnectionClosed as e:
|
||||
|
||||
self._socket_open.clear()
|
||||
if not self.auto_reconnect:
|
||||
raise
|
||||
|
||||
except* Exception as eg:
|
||||
self.alive = False
|
||||
self._socket_open.clear()
|
||||
@@ -1331,6 +1337,14 @@ class Session(object):
|
||||
if sysinfo is not None and sysinfo.get("node", None):
|
||||
# Node information came with system information
|
||||
node = sysinfo.get("node", None)
|
||||
for meshid, _nodes in nodes["nodes"].items():
|
||||
for _mesh in meshes:
|
||||
if _mesh.meshid == meshid:
|
||||
break
|
||||
else:
|
||||
break
|
||||
if meshid == node["meshid"]:
|
||||
node["mesh"] = _mesh
|
||||
else:
|
||||
# This device does not have system information, get node information from the nodes list.
|
||||
for meshid, _nodes in nodes["nodes"].items():
|
||||
|
||||
@@ -70,12 +70,7 @@ class Tunnel(object):
|
||||
|
||||
self.url = self._session.url.replace('/control.ashx', '/meshrelay.ashx?browser=1&p=' + str(self._protocol) + '&nodeid=' + self.node_id + '&id=' + self._tunnel_id + '&auth=' + authcookie["cookie"])
|
||||
|
||||
if self._session._proxy:
|
||||
proxy = Proxy.from_url(self._session._proxy)
|
||||
parsed = urllib.parse.urlparse(self.url)
|
||||
options["sock"] = await proxy.connect(dest_host=parsed.hostname, dest_port=parsed.port)
|
||||
|
||||
async for websocket in websockets.asyncio.client.connect(self.url, process_exception=util._process_websocket_exception, **options):
|
||||
async for websocket in util.proxy_connect(self.url, proxy_url=self._session._proxy, process_exception=util._process_websocket_exception, **options):
|
||||
self.alive = True
|
||||
self._socket_open.set()
|
||||
try:
|
||||
|
||||
@@ -9,6 +9,9 @@ import re
|
||||
import websockets
|
||||
import ssl
|
||||
import functools
|
||||
import urllib
|
||||
import python_socks
|
||||
from python_socks.async_.asyncio import Proxy
|
||||
from . import exceptions
|
||||
|
||||
def _encode_cookie(o, key):
|
||||
@@ -139,17 +142,36 @@ def compare_dict(dict1, dict2):
|
||||
def _check_socket(f):
|
||||
@functools.wraps(f)
|
||||
async def wrapper(self, *args, **kwargs):
|
||||
await self.initialized.wait()
|
||||
if not self.alive and self._main_loop_error is not None:
|
||||
raise self._main_loop_error
|
||||
elif not self.alive:
|
||||
raise exceptions.SocketError("Socket Closed")
|
||||
return await f(self, *args, **kwargs)
|
||||
try:
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.create_task(asyncio.wait_for(self.initialized.wait(), 10))
|
||||
tg.create_task(asyncio.wait_for(self._socket_open.wait(), 10))
|
||||
finally:
|
||||
if not self.alive and self._main_loop_error is not None:
|
||||
raise self._main_loop_error
|
||||
elif not self.alive:
|
||||
raise exceptions.SocketError("Socket Closed")
|
||||
return await f(self, *args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
def _process_websocket_exception(exc):
|
||||
tmp = websockets.asyncio.client.process_exception(exc)
|
||||
# SSLVerification error is a subclass of OSError, but doesn't make sense no retry, so we need to handle it separately.
|
||||
# SSLVerification error is a subclass of OSError, but doesn't make sense to retry, so we need to handle it separately.
|
||||
if isinstance(exc, (ssl.SSLCertVerificationError, TimeoutError)):
|
||||
return exc
|
||||
if isinstance(exc, python_socks._errors.ProxyError):
|
||||
return None
|
||||
return tmp
|
||||
|
||||
class proxy_connect(websockets.asyncio.client.connect):
|
||||
def __init__(self,*args, proxy_url=None, **kwargs):
|
||||
self.proxy = None
|
||||
if proxy_url is not None:
|
||||
self.proxy = Proxy.from_url(proxy_url)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def create_connection(self, *args, **kwargs):
|
||||
if self.proxy is not None:
|
||||
parsed = urllib.parse.urlparse(self.uri)
|
||||
self.connection_kwargs["sock"] = await self.proxy.connect(dest_host=parsed.hostname, dest_port=parsed.port)
|
||||
return await super().create_connection(*args, **kwargs)
|
||||
@@ -65,7 +65,12 @@ class TestEnvironment(object):
|
||||
# Destroy the env in case it wasn't killed correctly last time.
|
||||
subprocess.check_call(["docker", "compose", "down"], stdout=subprocess.DEVNULL, cwd=thisdir)
|
||||
self._subp = _docker_process = subprocess.Popen(["docker", "compose", "up", "--build", "--force-recreate", "--no-deps"], stdout=subprocess.DEVNULL, cwd=thisdir)
|
||||
timeout = 30
|
||||
if not self._wait_for_meshcentral():
|
||||
self.__exit__(None, None, None)
|
||||
raise Exception("Failed to create docker instance")
|
||||
return self
|
||||
|
||||
def _wait_for_meshcentral(self, timeout=30):
|
||||
start = time.time()
|
||||
while time.time() - start < timeout:
|
||||
try:
|
||||
@@ -82,10 +87,8 @@ class TestEnvironment(object):
|
||||
pass
|
||||
time.sleep(1)
|
||||
else:
|
||||
self.__exit__(None, None, None)
|
||||
raise Exception("Failed to create docker instance")
|
||||
return self
|
||||
|
||||
return False
|
||||
return True
|
||||
|
||||
def __exit__(self, exc_t, exc_v, exc_tb):
|
||||
pass
|
||||
@@ -93,6 +96,15 @@ class TestEnvironment(object):
|
||||
def create_agent(self, meshid):
|
||||
return Agent(meshid, self.mcurl, self.clienturl, self.dockerurl)
|
||||
|
||||
# Restart our docker instances, to test reconnect code.
|
||||
def restart_mesh(self):
|
||||
subprocess.check_call(["docker", "container", "restart", "meshctrl-meshcentral"], stdout=subprocess.DEVNULL, cwd=thisdir)
|
||||
assert self._wait_for_meshcentral(), "Failed to restart docker instance"
|
||||
|
||||
def restart_proxy(self):
|
||||
subprocess.check_call(["docker", "container", "restart", "meshctrl-squid"], stdout=subprocess.DEVNULL, cwd=thisdir)
|
||||
|
||||
|
||||
def _kill_docker_process():
|
||||
if _docker_process is not None:
|
||||
_docker_process.kill()
|
||||
|
||||
@@ -37,3 +37,20 @@ async def test_ssl(env):
|
||||
pass
|
||||
else:
|
||||
raise Exception("Invalid SSL certificate accepted")
|
||||
|
||||
async def test_urlparse():
|
||||
# This tests the url port adding necessitated by python-socks. Our test environment doesn't use 443, so this is just a quick sanity test.
|
||||
try:
|
||||
async with meshctrl.Session("wss://localhost", user="unprivileged", password="Not a real password", ignore_ssl=True) as s:
|
||||
pass
|
||||
except* TimeoutError:
|
||||
#We're not running a server, so timeout is our expected outcome
|
||||
pass
|
||||
|
||||
# This tests our check for wss/ws url schemes
|
||||
try:
|
||||
async with meshctrl.Session("https://localhost", user="unprivileged", password="Not a real password", ignore_ssl=True) as s:
|
||||
pass
|
||||
except* ValueError:
|
||||
#We're not running a server, so timeout is our expected outcome
|
||||
pass
|
||||
@@ -31,6 +31,35 @@ async def test_admin(env):
|
||||
assert len(admin_users) == len(env.users.keys()), "Admin cannot see correct number of users"
|
||||
assert len(admin_sessions) == 2, "Admin cannot see correct number of oser sessions"
|
||||
|
||||
async def test_auto_reconnect(env):
|
||||
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True, auto_reconnect=True) as admin_session:
|
||||
env.restart_mesh()
|
||||
await asyncio.sleep(10)
|
||||
await admin_session.ping(timeout=10)
|
||||
|
||||
# As above, but with proxy
|
||||
async with meshctrl.Session("wss://" + env.dockerurl, user="admin", password=env.users["admin"], ignore_ssl=True, auto_reconnect=True, proxy=env.proxyurl) as admin_session:
|
||||
env.restart_mesh()
|
||||
for i in range(3):
|
||||
try:
|
||||
await admin_session.ping(timeout=10)
|
||||
except:
|
||||
continue
|
||||
break
|
||||
else:
|
||||
raise Exception("Failed to reconnect")
|
||||
|
||||
env.restart_proxy()
|
||||
for i in range(3):
|
||||
try:
|
||||
await admin_session.ping(timeout=10)
|
||||
except* Exception as e:
|
||||
pass
|
||||
else:
|
||||
break
|
||||
else:
|
||||
raise Exception("Failed to reconnect")
|
||||
|
||||
|
||||
async def test_users(env):
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user