Compare commits

...

5 Commits

Author SHA1 Message Date
Josiah Baldwin
ab2a4c40bc Fixed auto-reconnect for proxy and created tests for auto-reconnect 2024-12-10 13:05:22 -08:00
Josiah Baldwin
0a657cee48 Added default port numbers to URL. This fixes an issue with proxy handling when you don't pass a port in the url. 2024-12-10 10:33:05 -08:00
Josiah Baldwin
03441161b2 Added timeout to check_socket decorator in case a connection fails to be made 2024-12-10 10:31:52 -08:00
Josiah Baldwin
24adf3baa5 Updated docs for proxy 2024-12-09 16:45:12 -08:00
Josiah Baldwin
1adaccabc0 Added proxy and tests for proxy 2024-12-09 16:42:32 -08:00
12 changed files with 9506 additions and 40 deletions

Binary file not shown.

View File

@@ -46,6 +46,7 @@ install_requires =
importlib-metadata
cryptography>=43.0.3
websockets>=13.1
python-socks[asyncio]
[options.packages.find]

View File

@@ -8,6 +8,8 @@ import json
import datetime
import io
import ssl
import urllib
from python_socks.async_.asyncio import Proxy
from . import constants
from . import exceptions
from . import util
@@ -28,7 +30,7 @@ class Session(object):
domain (str): Domain to connect to
password (str): Password with which to connect. Can also be password generated from token.
loginkey (str|bytes): Key from already handled login. Overrides username/password.
proxy (str): "url:port" to use for proxy server NOTE: This is currently not implemented due to a limitation of the undersying websocket library. Upvote the issue if you find this important.
proxy (str): "url:port" to use for proxy server
token (str): Login token. This appears to be superfluous
ignore_ssl (bool): Ignore SSL errors
auto_reconnect (bool): In case of server failure, attempt to auto reconnect. All outstanding requests will be killed.
@@ -44,9 +46,19 @@ class Session(object):
'''
def __init__(self, url, user=None, domain=None, password=None, loginkey=None, proxy=None, token=None, ignore_ssl=False, auto_reconnect=False):
if len(url) < 5 or ((not url.startswith('wss://')) and (not url.startswith('ws://'))):
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ("wss", "ws"):
raise ValueError("Invalid URL")
port = 80
if parsed.port is None:
if parsed.scheme == "wss":
port = 443
p = list(parsed)
p[1] = f"{parsed.hostname}:{port}"
url = urllib.parse.urlunparse(p)
if (not url.endswith('/')):
url += '/'
@@ -122,18 +134,15 @@ class Session(object):
ssl_context.verify_mode = ssl.CERT_NONE
options = { "ssl": ssl_context }
# Setup the HTTP proxy if needed
# if (self._proxy != None):
# options.agent = new https_proxy_agent(urllib.parse(self._proxy))
headers = websockets.datastructures.Headers()
if (self._password):
token = self._token if self._token else b""
headers['x-meshauth'] = (base64.b64encode(self._user.encode()) + b',' + base64.b64encode(self._password.encode()) + token).decode()
options["additional_headers"] = headers
async for websocket in websockets.asyncio.client.connect(self.url, process_exception=util._process_websocket_exception, **options):
async for websocket in util.proxy_connect(self.url, proxy_url=self._proxy, process_exception=util._process_websocket_exception, **options):
self.alive = True
self._socket_open.set()
try:
@@ -141,10 +150,10 @@ class Session(object):
tg.create_task(self._listen_data_task(websocket))
tg.create_task(self._send_data_task(websocket))
except* websockets.ConnectionClosed as e:
self._socket_open.clear()
if not self.auto_reconnect:
raise
except* Exception as eg:
self.alive = False
self._socket_open.clear()

View File

@@ -4,6 +4,8 @@ import websockets.asyncio
import websockets.asyncio.client
import asyncio
import ssl
from python_socks.async_.asyncio import Proxy
import urllib
from . import exceptions
from . import util
from . import constants
@@ -52,10 +54,6 @@ class Tunnel(object):
ssl_context.verify_mode = ssl.CERT_NONE
options = { "ssl": ssl_context }
# Setup the HTTP proxy if needed
# if (self._session._proxy != None):
# options.agent = new https_proxy_agent(urllib.parse(this._proxy))
if (self.node_id.split('/') != 3) and (self._session._currentDomain is not None):
self.node_id = f"node/{self._session._currentDomain}/{self.node_id}"
@@ -72,14 +70,7 @@ class Tunnel(object):
self.url = self._session.url.replace('/control.ashx', '/meshrelay.ashx?browser=1&p=' + str(self._protocol) + '&nodeid=' + self.node_id + '&id=' + self._tunnel_id + '&auth=' + authcookie["cookie"])
# headers = websockets.datastructures.Headers()
# if (self._password):
# token = self._token if self._token else b""
# headers['x-meshauth'] = (base64.b64encode(self._user.encode()) + b',' + base64.b64encode(self._password.encode()) + token).decode()
# options["additional_headers"] = headers
async for websocket in websockets.asyncio.client.connect(self.url, process_exception=util._process_websocket_exception, **options):
async for websocket in util.proxy_connect(self.url, proxy_url=self._session._proxy, process_exception=util._process_websocket_exception, **options):
self.alive = True
self._socket_open.set()
try:

View File

@@ -9,6 +9,9 @@ import re
import websockets
import ssl
import functools
import urllib
import python_socks
from python_socks.async_.asyncio import Proxy
from . import exceptions
def _encode_cookie(o, key):
@@ -139,17 +142,36 @@ def compare_dict(dict1, dict2):
def _check_socket(f):
@functools.wraps(f)
async def wrapper(self, *args, **kwargs):
await self.initialized.wait()
if not self.alive and self._main_loop_error is not None:
raise self._main_loop_error
elif not self.alive:
raise exceptions.SocketError("Socket Closed")
return await f(self, *args, **kwargs)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(asyncio.wait_for(self.initialized.wait(), 10))
tg.create_task(asyncio.wait_for(self._socket_open.wait(), 10))
finally:
if not self.alive and self._main_loop_error is not None:
raise self._main_loop_error
elif not self.alive:
raise exceptions.SocketError("Socket Closed")
return await f(self, *args, **kwargs)
return wrapper
def _process_websocket_exception(exc):
tmp = websockets.asyncio.client.process_exception(exc)
# SSLVerification error is a subclass of OSError, but doesn't make sense no retry, so we need to handle it separately.
# SSLVerification error is a subclass of OSError, but doesn't make sense to retry, so we need to handle it separately.
if isinstance(exc, (ssl.SSLCertVerificationError, TimeoutError)):
return exc
if isinstance(exc, python_socks._errors.ProxyError):
return None
return tmp
class proxy_connect(websockets.asyncio.client.connect):
    '''Drop-in subclass of websockets.asyncio.client.connect that can tunnel
    the underlying TCP connection through a proxy supported by python-socks.

    Args:
        proxy_url (str|None): Proxy URL understood by python_socks
            Proxy.from_url. If None, behaves exactly like the parent class.
        *args: Passed through to websockets.asyncio.client.connect.
        **kwargs: Passed through to websockets.asyncio.client.connect.
    '''
    def __init__(self,*args, proxy_url=None, **kwargs):
        # The Proxy object is built once here; the actual proxied TCP
        # connection is (re)established per attempt in create_connection(),
        # so reconnect loops (async for ... in connect) get a fresh socket.
        self.proxy = None
        if proxy_url is not None:
            self.proxy = Proxy.from_url(proxy_url)
        super().__init__(*args, **kwargs)

    async def create_connection(self, *args, **kwargs):
        # When a proxy is configured, open the socket through the proxy and
        # hand it to the parent via the "sock" connection kwarg so the
        # websocket handshake happens over the proxied socket.
        if self.proxy is not None:
            parsed = urllib.parse.urlparse(self.uri)
            # NOTE(review): parsed.port is None when the URL carries no
            # explicit port — callers appear to normalize the URL to include
            # a default port before reaching here; confirm against Session.
            self.connection_kwargs["sock"] = await self.proxy.connect(dest_host=parsed.hostname, dest_port=parsed.port)
        return await super().create_connection(*args, **kwargs)

View File

@@ -54,15 +54,23 @@ class TestEnvironment(object):
self._subp = None
self.mcurl = "wss://localhost:8086"
self.clienturl = "http://localhost:5000"
self._dockerurl = "host.docker.internal:8086"
self.dockerurl = "host.docker.internal:8086"
self.proxyurl = "http://localhost:3128"
def __enter__(self):
global _docker_process
if _docker_process is not None:
self._subp = _docker_process
return self
# Destroy the env in case it wasn't killed correctly last time.
subprocess.check_call(["docker", "compose", "down"], stdout=subprocess.DEVNULL, cwd=thisdir)
self._subp = _docker_process = subprocess.Popen(["docker", "compose", "up", "--build", "--force-recreate", "--no-deps"], stdout=subprocess.DEVNULL, cwd=thisdir)
timeout = 30
if not self._wait_for_meshcentral():
self.__exit__(None, None, None)
raise Exception("Failed to create docker instance")
return self
def _wait_for_meshcentral(self, timeout=30):
start = time.time()
while time.time() - start < timeout:
try:
@@ -79,16 +87,23 @@ class TestEnvironment(object):
pass
time.sleep(1)
else:
self.__exit__(None, None, None)
raise Exception("Failed to create docker instance")
return self
return False
return True
def __exit__(self, exc_t, exc_v, exc_tb):
pass
def create_agent(self, meshid):
return Agent(meshid, self.mcurl, self.clienturl, self._dockerurl)
return Agent(meshid, self.mcurl, self.clienturl, self.dockerurl)
# Restart our docker instances, to test reconnect code.
def restart_mesh(self):
subprocess.check_call(["docker", "container", "restart", "meshctrl-meshcentral"], stdout=subprocess.DEVNULL, cwd=thisdir)
assert self._wait_for_meshcentral(), "Failed to restart docker instance"
def restart_proxy(self):
subprocess.check_call(["docker", "container", "restart", "meshctrl-squid"], stdout=subprocess.DEVNULL, cwd=thisdir)
def _kill_docker_process():
if _docker_process is not None:

View File

@@ -19,9 +19,9 @@ services:
# # mongodb data-directory - A must for data persistence
# - ./meshcentral/mongodb_data:/data/db
networks:
- meshctrl
- meshctrl
extra_hosts:
- "host.docker.internal:host-gateway"
- "host.docker.internal:host-gateway"
meshcentral:
restart: always
@@ -49,4 +49,21 @@ services:
healthcheck:
test: curl -k --fail https://localhost:443/ || exit 1
interval: 5s
timeout: 120s
timeout: 120s
squid:
image: ubuntu/squid:latest
restart: unless-stopped
container_name: meshctrl-squid
ports:
- 3128:3128
networks:
- meshctrl
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- ./config/squid/conf.d:/etc/squid/conf.d
- ./config/squid/squid.conf:/etc/squid/squid.conf

View File

@@ -0,0 +1,11 @@
# Logs are managed by logrotate on Debian
logfile_rotate 0
acl all src all
acl Safe_ports port 8086
acl SSS_ports port 8086
http_access allow all
debug_options ALL,0 85,2 88,2
# Set max_filedescriptors to avoid using system's RLIMIT_NOFILE. See LP: #1978272
max_filedescriptors 1024

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,7 @@ import io
import random
async def test_commands(env):
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
async with meshctrl.Session("wss://" + env.dockerurl, user="admin", password=env.users["admin"], ignore_ssl=True, proxy=env.proxyurl) as admin_session:
mesh = await admin_session.add_device_group("test", description="This is a test group", amtonly=False, features=0, consent=0, timeout=10)
try:
with env.create_agent(mesh.short_meshid) as agent:
@@ -53,7 +53,7 @@ async def test_commands(env):
assert (await admin_session.remove_device_group(mesh.meshid, timeout=10)), "Failed to remove device group"
async def test_upload_download(env):
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
async with meshctrl.Session("wss://" + env.dockerurl, user="admin", password=env.users["admin"], ignore_ssl=True, proxy=env.proxyurl) as admin_session:
mesh = await admin_session.add_device_group("test", description="This is a test group", amtonly=False, features=0, consent=0, timeout=10)
try:
with env.create_agent(mesh.short_meshid) as agent:

View File

@@ -25,6 +25,10 @@ async def test_sanity(env):
print("\ninfo server_info: {}\n".format(await s.server_info()))
pass
async def test_proxy(env):
    '''Sanity check: a Session can log in when routed through the test proxy.'''
    # dockerurl (host.docker.internal) is used instead of localhost so the
    # proxy container can reach the server; proxyurl points at that proxy.
    async with meshctrl.Session("wss://" + env.dockerurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True, proxy=env.proxyurl) as s:
        pass
async def test_ssl(env):
try:
async with meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=False) as s:
@@ -32,4 +36,21 @@ async def test_ssl(env):
except* ssl.SSLCertVerificationError:
pass
else:
raise Exception("Invalid SSL certificate accepted")
raise Exception("Invalid SSL certificate accepted")
async def test_urlparse():
    '''Verify Session URL normalization: a default port is added when missing,
    and non-websocket schemes are rejected with ValueError.'''
    # This tests the url port adding necessitated by python-socks. Our test environment doesn't use 443, so this is just a quick sanity test.
    try:
        async with meshctrl.Session("wss://localhost", user="unprivileged", password="Not a real password", ignore_ssl=True) as s:
            pass
    except* TimeoutError:
        # We're not running a server, so timeout is our expected outcome
        pass
    # This tests our check for wss/ws url schemes
    try:
        async with meshctrl.Session("https://localhost", user="unprivileged", password="Not a real password", ignore_ssl=True) as s:
            pass
    except* ValueError:
        # https:// is not a valid websocket scheme, so ValueError is our expected outcome
        pass

View File

@@ -31,6 +31,35 @@ async def test_admin(env):
assert len(admin_users) == len(env.users.keys()), "Admin cannot see correct number of users"
assert len(admin_sessions) == 2, "Admin cannot see correct number of oser sessions"
async def test_auto_reconnect(env):
    '''Verify sessions with auto_reconnect=True survive a server restart,
    both over a direct connection and when connected through the proxy.'''
    async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True, auto_reconnect=True) as admin_session:
        env.restart_mesh()
        # Give the session time to notice the drop and reconnect.
        await asyncio.sleep(10)
        await admin_session.ping(timeout=10)

    # As above, but with proxy
    async with meshctrl.Session("wss://" + env.dockerurl, user="admin", password=env.users["admin"], ignore_ssl=True, auto_reconnect=True, proxy=env.proxyurl) as admin_session:
        env.restart_mesh()
        # Reconnection may not complete before the first ping; retry a few times.
        for i in range(3):
            try:
                await admin_session.ping(timeout=10)
            except:
                continue
            break
        else:
            raise Exception("Failed to reconnect")
        # Also restart the proxy itself to confirm reconnect through a dropped proxy.
        env.restart_proxy()
        for i in range(3):
            try:
                await admin_session.ping(timeout=10)
            except* Exception as e:
                pass
            else:
                break
        else:
            raise Exception("Failed to reconnect")
async def test_users(env):
try: