From ab2a4c40bc748698c2a1e519482eafa77cc46f0f Mon Sep 17 00:00:00 2001 From: Josiah Baldwin Date: Tue, 10 Dec 2024 12:49:50 -0800 Subject: [PATCH] Fixed auto-reconnect for proxy and created tests for auto-reconnect --- src/meshctrl/session.py | 8 ++------ src/meshctrl/tunnel.py | 7 +------ src/meshctrl/util.py | 37 ++++++++++++++++++++++++++--------- tests/environment/__init__.py | 22 ++++++++++++++++----- tests/test_session.py | 29 +++++++++++++++++++++++++++ 5 files changed, 77 insertions(+), 26 deletions(-) diff --git a/src/meshctrl/session.py b/src/meshctrl/session.py index 88c240a..55e7df1 100644 --- a/src/meshctrl/session.py +++ b/src/meshctrl/session.py @@ -140,13 +140,9 @@ class Session(object): token = self._token if self._token else b"" headers['x-meshauth'] = (base64.b64encode(self._user.encode()) + b',' + base64.b64encode(self._password.encode()) + token).decode() - if self._proxy: - proxy = Proxy.from_url(self._proxy) - parsed = urllib.parse.urlparse(self.url) - options["sock"] = await proxy.connect(dest_host=parsed.hostname, dest_port=parsed.port) options["additional_headers"] = headers - async for websocket in websockets.asyncio.client.connect(self.url, process_exception=util._process_websocket_exception, **options): + async for websocket in util.proxy_connect(self.url, proxy_url=self._proxy, process_exception=util._process_websocket_exception, **options): self.alive = True self._socket_open.set() try: @@ -154,10 +150,10 @@ class Session(object): tg.create_task(self._listen_data_task(websocket)) tg.create_task(self._send_data_task(websocket)) except* websockets.ConnectionClosed as e: - self._socket_open.clear() if not self.auto_reconnect: raise + except* Exception as eg: self.alive = False self._socket_open.clear() diff --git a/src/meshctrl/tunnel.py b/src/meshctrl/tunnel.py index b832566..9a68c75 100644 --- a/src/meshctrl/tunnel.py +++ b/src/meshctrl/tunnel.py @@ -70,12 +70,7 @@ class Tunnel(object): self.url = self._session.url.replace('/control.ashx', '/meshrelay.ashx?browser=1&p=' + str(self._protocol) + '&nodeid=' + self.node_id + '&id=' + self._tunnel_id + '&auth=' + authcookie["cookie"]) - if self._session._proxy: - proxy = Proxy.from_url(self._session._proxy) - parsed = urllib.parse.urlparse(self.url) - options["sock"] = await proxy.connect(dest_host=parsed.hostname, dest_port=parsed.port) - - async for websocket in websockets.asyncio.client.connect(self.url, process_exception=util._process_websocket_exception, **options): + async for websocket in util.proxy_connect(self.url, proxy_url=self._session._proxy, process_exception=util._process_websocket_exception, **options): self.alive = True self._socket_open.set() try: diff --git a/src/meshctrl/util.py b/src/meshctrl/util.py index dfb35b5..532d3a8 100644 --- a/src/meshctrl/util.py +++ b/src/meshctrl/util.py @@ -9,6 +9,9 @@ import re import websockets import ssl import functools +import urllib +import python_socks +from python_socks.async_.asyncio import Proxy from . import exceptions def _encode_cookie(o, key): @@ -139,20 +142,36 @@ def compare_dict(dict1, dict2): def _check_socket(f): @functools.wraps(f) async def wrapper(self, *args, **kwargs): - await asyncio.wait_for(self.initialized.wait(), 10) - if not self.alive and self._main_loop_error is not None: - raise self._main_loop_error - elif not self.alive: - raise exceptions.SocketError("Socket Closed") - return await f(self, *args, **kwargs) + try: + async with asyncio.TaskGroup() as tg: + tg.create_task(asyncio.wait_for(self.initialized.wait(), 10)) + tg.create_task(asyncio.wait_for(self._socket_open.wait(), 10)) + finally: + if not self.alive and self._main_loop_error is not None: + raise self._main_loop_error + elif not self.alive: + raise exceptions.SocketError("Socket Closed") + return await f(self, *args, **kwargs) return wrapper def _process_websocket_exception(exc): tmp = websockets.asyncio.client.process_exception(exc) - # SSLVerification error is a subclass of OSError, but doesn't make sense no retry, so we need to handle it separately. + # SSLVerification error is a subclass of OSError, but doesn't make sense to retry, so we need to handle it separately. if isinstance(exc, (ssl.SSLCertVerificationError, TimeoutError)): return exc + if isinstance(exc, python_socks._errors.ProxyError): + return None return tmp -class Proxy(object): - pass \ No newline at end of file +class proxy_connect(websockets.asyncio.client.connect): + def __init__(self,*args, proxy_url=None, **kwargs): + self.proxy = None + if proxy_url is not None: + self.proxy = Proxy.from_url(proxy_url) + super().__init__(*args, **kwargs) + + async def create_connection(self, *args, **kwargs): + if self.proxy is not None: + parsed = urllib.parse.urlparse(self.uri) + self.connection_kwargs["sock"] = await self.proxy.connect(dest_host=parsed.hostname, dest_port=parsed.port) + return await super().create_connection(*args, **kwargs) \ No newline at end of file diff --git a/tests/environment/__init__.py b/tests/environment/__init__.py index f783990..1badb22 100644 --- a/tests/environment/__init__.py +++ b/tests/environment/__init__.py @@ -65,7 +65,12 @@ class TestEnvironment(object): # Destroy the env in case it wasn't killed correctly last time. subprocess.check_call(["docker", "compose", "down"], stdout=subprocess.DEVNULL, cwd=thisdir) self._subp = _docker_process = subprocess.Popen(["docker", "compose", "up", "--build", "--force-recreate", "--no-deps"], stdout=subprocess.DEVNULL, cwd=thisdir) - timeout = 30 + if not self._wait_for_meshcentral(): + self.__exit__(None, None, None) + raise Exception("Failed to create docker instance") + return self + + def _wait_for_meshcentral(self, timeout=30): start = time.time() while time.time() - start < timeout: try: @@ -82,10 +87,8 @@ class TestEnvironment(object): pass time.sleep(1) else: - self.__exit__(None, None, None) - raise Exception("Failed to create docker instance") - return self - + return False + return True def __exit__(self, exc_t, exc_v, exc_tb): pass @@ -93,6 +96,15 @@ class TestEnvironment(object): def create_agent(self, meshid): return Agent(meshid, self.mcurl, self.clienturl, self.dockerurl) + # Restart our docker instances, to test reconnect code. + def restart_mesh(self): + subprocess.check_call(["docker", "container", "restart", "meshctrl-meshcentral"], stdout=subprocess.DEVNULL, cwd=thisdir) + assert self._wait_for_meshcentral(), "Failed to restart docker instance" + + def restart_proxy(self): + subprocess.check_call(["docker", "container", "restart", "meshctrl-squid"], stdout=subprocess.DEVNULL, cwd=thisdir) + + def _kill_docker_process(): if _docker_process is not None: _docker_process.kill() diff --git a/tests/test_session.py b/tests/test_session.py index 5747261..484ea6a 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -31,6 +31,35 @@ async def test_admin(env): assert len(admin_users) == len(env.users.keys()), "Admin cannot see correct number of users" assert len(admin_sessions) == 2, "Admin cannot see correct number of oser sessions" +async def test_auto_reconnect(env): + async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True, auto_reconnect=True) as admin_session: + env.restart_mesh() + await asyncio.sleep(10) + await admin_session.ping(timeout=10) + + # As above, but with proxy + async with meshctrl.Session("wss://" + env.dockerurl, user="admin", password=env.users["admin"], ignore_ssl=True, auto_reconnect=True, proxy=env.proxyurl) as admin_session: + env.restart_mesh() + for i in range(3): + try: + await admin_session.ping(timeout=10) + except: + continue + break + else: + raise Exception("Failed to reconnect") + + env.restart_proxy() + for i in range(3): + try: + await admin_session.ping(timeout=10) + except* Exception as e: + pass + else: + break + else: + raise Exception("Failed to reconnect") + async def test_users(env): try: