Compare commits

...

7 Commits

Author SHA1 Message Date
Josiah Baldwin
7cefd24a9d Added release description for 1.3.2 2025-10-22 20:10:38 -07:00
Josiah Baldwin
cbc1f9223f Merge pull request #62 from HuFlungDu/fix/runcommands-race-condition
fix #46
2025-10-22 20:05:56 -07:00
Josiah Baldwin
3fa1ca2e32 Added handling for weird response from meshcentral for run_command and run_console_command 2025-10-22 19:59:17 -07:00
Josiah Baldwin
ee812220fb Attempt to fix race condition in run_commands 2025-10-15 12:17:48 -07:00
Josiah Baldwin
002f652c8c Bump version 2025-09-27 18:13:48 -07:00
Josiah Baldwin
0b09f64821 Fix race condition for run_commands
Fix run_console_command being able to pick up run_command outputs
2025-09-27 18:12:21 -07:00
Josiah Baldwin
12a3040f89 Reapply "Feat/run console commands"
This reverts commit 4cda54ab60.
2025-09-27 14:48:55 -07:00
9 changed files with 189 additions and 49 deletions

View File

@@ -2,6 +2,18 @@
Changelog Changelog
========= =========
version 1.3.2
=============
Improvements:
* Fix race condition that could occur when running `run_command` or `run_console_command`
version 1.3.1
=============
Improvements:
* Basically just everything in 1.3.0, this is a release fix
version 1.3.0 version 1.3.0
============= =============

View File

@@ -184,7 +184,7 @@ class Session(object):
async def _listen_data_task(self, websocket): async def _listen_data_task(self, websocket):
async for message in websocket: async for message in websocket:
await self._eventer.emit("raw", message) await self._eventer.emit("raw", message)
# Meshcentral does pong wrong and breaks our parsing, so fix it here. # Meshcentral does pong wrong and breaks our parsing, so fix it here. This is fixed now, but we want compatibility with old versions.
if message == '{action:"pong"}': if message == '{action:"pong"}':
message = '{"action":"pong"}' message = '{"action":"pong"}'
@@ -240,23 +240,28 @@ class Session(object):
async def __aexit__(self, exc_t, exc_v, exc_tb): async def __aexit__(self, exc_t, exc_v, exc_tb):
await self.close() await self.close()
@util._check_socket def _generate_response_id(self, name):
async def _send_command(self, data, name, timeout=None): responseid = f"meshctrl_{name}_{self._get_command_id()}"
id = f"meshctrl_{name}_{self._get_command_id()}"
# This fixes a very theoretical bug with hash colisions in the case of an infinite int of requests. Now the bug will only happen if there are currently 2**32-1 of the same type of request going out at the same time # This fixes a very theoretical bug with hash colisions in the case of an infinite int of requests. Now the bug will only happen if there are currently 2**32-1 of the same type of request going out at the same time
while id in self._inflight: while responseid in self._inflight:
id = f"meshctrl_{name}_{self._get_command_id()}" responseid = f"meshctrl_{name}_{self._get_command_id()}"
return responseid
self._inflight.add(id) @util._check_socket
async def _send_command(self, data, name, timeout=None, responseid=None):
if responseid is None:
responseid = self._generate_response_id(name)
self._inflight.add(responseid)
responded = asyncio.Event() responded = asyncio.Event()
response = None response = None
async def _(data): async def _(data):
self._inflight.remove(id) self._inflight.remove(responseid)
nonlocal response nonlocal response
response = data response = data
responded.set() responded.set()
self._eventer.once(id, _) self._eventer.once(responseid, _)
await self._message_queue.put(json.dumps(data | {"tag": id, "responseid": id})) await self._message_queue.put(json.dumps(data | {"tag": responseid, "responseid": responseid}))
await asyncio.wait_for(responded.wait(), timeout=timeout) await asyncio.wait_for(responded.wait(), timeout=timeout)
if isinstance(response, Exception): if isinstance(response, Exception):
raise response raise response
@@ -1459,7 +1464,7 @@ class Session(object):
async def run_command(self, nodeids, command, powershell=False, runasuser=False, runasuseronly=False, ignore_output=False, timeout=None): async def run_command(self, nodeids, command, powershell=False, runasuser=False, runasuseronly=False, ignore_output=False, timeout=None):
''' '''
Run a command on any number of nodes. WARNING: Non namespaced call. Calling this function again before it returns may cause unintended consequences. Run a command on any number of nodes. WARNING: Non namespaced call on older versions of meshcentral (<1.0.22). Calling this function on those versions again before it returns may cause unintended consequences.
Args: Args:
nodeids (str|list[str]): Unique ids of nodes on which to run the command nodeids (str|list[str]): Unique ids of nodes on which to run the command
@@ -1496,27 +1501,52 @@ class Session(object):
if (f"node//{nid}" == id): if (f"node//{nid}" == id):
return nid return nid
result = {n: {"complete": False, "result": [], "command": command} for n in nodeids} result = None
async def _(): console_result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
reply_result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
async def _console():
async for event in self.events({"action": "msg", "type": "console"}): async for event in self.events({"action": "msg", "type": "console"}):
node = match_nodeid(event["nodeid"], nodeids) node = match_nodeid(event["nodeid"], nodeids)
if node: if node:
if event["value"] == "Run commands completed.": if event["value"] == "Run commands completed.":
result.setdefault(node, {})["complete"] = True console_result.setdefault(node, {})["complete"] = True
if all(_["complete"] for key, _ in result.items()): if all(_["complete"] for key, _ in console_result.items()):
break break
continue continue
elif (event["value"].startswith("Run commands")): elif (event["value"].startswith("Run commands")):
continue continue
result[node]["result"].append(event["value"]) console_result[node]["result"].append(event["value"])
async def __(command):
data = await self._send_command(command, "run_command", timeout=timeout) async def _reply(responseid, data=None):
# Returns True when all results are in, Falsey otherwise
def _parse_event(event):
node = match_nodeid(event["nodeid"], nodeids)
if node:
reply_result.setdefault(node, {})["complete"] = True
reply_result[node]["result"].append(event["result"])
if all(_["complete"] for key, _ in reply_result.items()):
return True
if data is not None:
if _parse_event(data):
return
async for event in self.events({"action": "msg", "type": "runcommands", "responseid":responseid}):
if _parse_event(event):
break
async def __(command, tg, tasks):
nonlocal result
responseid = self._generate_response_id("run_command")
if not ignore_output:
reply_task = tg.create_task(asyncio.wait_for(_reply(responseid), timeout=timeout))
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
data = await self._send_command(command, "run_command", timeout=timeout, responseid=responseid)
if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok": if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"]) raise exceptions.ServerError(data["result"])
elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok": elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok":
reply_task.cancel()
result = console_result
expect_response = False expect_response = False
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
if not ignore_output: if not ignore_output:
userid = (await self.user_info())["_id"] userid = (await self.user_info())["_id"]
for n in nodeids: for n in nodeids:
@@ -1537,16 +1567,98 @@ class Session(object):
else: else:
console_task.cancel() console_task.cancel()
elif data.get("type", None) == "runcommands" and not ignore_output: elif data.get("type", None) == "runcommands" and not ignore_output:
tasks.append(tg.create_task(asyncio.wait_for(_reply(data["responseid"], start_data=data), timeout=timeout))) result = reply_result
# Force this to run immediately? This might be odd; but we want to make sure we get don't lose the race condition with the srever. console_task.cancel()
# Not sure if this actually works but I haven't yet seen it fail. *shrug* tasks.append(reply_task)
await asyncio.sleep(0) else:
if not ignore_output:
console_task.cancel()
reply_task.cancel()
raise exceptions.ServerError(f"Unrecognized response: {data}")
tasks = [] tasks = []
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(__({ "action": 'runcommands', "nodeids": nodeids, "type": (2 if powershell else 0), "cmds": command, "runAsUser": runAsUser, "reply": not ignore_output}, tg, tasks)))
return {n: v | {"result": "".join(v["result"])} for n,v in result.items()}
async def run_console_command(self, nodeids, command, powershell=False, runasuser=False, runasuseronly=False, ignore_output=False, timeout=None):
'''
Run a mesh console command on any number of nodes. WARNING: Non namespaced call. Calling this function again before it returns may cause unintended consequences.
Args:
nodeids (str|list[str]): Unique ids of nodes on which to run the command
command (str): Command to run
ignore_output (bool): Don't bother trying to get the output. Every device will return an empty string for its result.
timeout (int): duration in seconds to wait for a response before throwing an error
Returns:
dict[str, ~meshctrl.types.RunCommandResponse]: Dict containing mapped output of the commands by device
Raises:
:py:class:`~meshctrl.exceptions.ServerError`: Error text from server if there is a failure
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
ValueError: `Invalid device id` if device is not found
asyncio.TimeoutError: Command timed out
'''
if isinstance(nodeids, str):
nodeids = [nodeids]
def match_nodeid(id, ids):
for nid in ids:
if (nid == id):
return nid
if (nid[6:] == id):
return nid
if (f"node//{nid}" == id):
return nid
result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
async def _console():
async for event in self.events({"action": "msg", "type": "console"}):
# We can pick up run commands here sometimes if they are run in quick succession. Try to avoid that.
if (not event["value"].startswith("Run commands")):
node = match_nodeid(event["nodeid"], nodeids)
if node:
result[node]["result"].append(event["value"])
result.setdefault(node, {})["complete"] = True
if all(_["complete"] for key, _ in result.items()):
break
async def __(command, tg, tasks):
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
data = await self._send_command(command, "run_console_command", timeout=timeout)
if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok":
expect_response = False
if not ignore_output:
userid = (await self.user_info())["_id"]
for n in nodeids:
device_info = await self.device_info(n, timeout=timeout)
try:
permissions = device_info.mesh.links.get(userid, {}).get("rights",constants.DeviceRights.norights)\
# This should work for device rights, but it only seems to work for mesh rights. Not sure why, but I can't get the events to show up when the user only has individual device rights
# |device_info.get("links", {}).get(userid, {}).get("rights", constants.DeviceRights.norights)
# If we don't have agentconsole rights, we won't be able to read the output, so fill in blanks on this node
if not permissions&constants.DeviceRights.agentconsole:
result[n]["complete"] = True
else:
expect_response = True
except AttributeError:
result[n]["complete"] = True
if expect_response: if expect_response:
tasks.append(tg.create_task(asyncio.wait_for(_(), timeout=timeout))) tasks.append(console_task)
tasks.append(tg.create_task(__({ "action": 'runcommands', "nodeids": nodeids, "type": (2 if powershell else 0), "cmds": command, "runAsUser": runAsUser }))) else:
console_task.cancel()
else:
console_task.cancel()
raise exceptions.ServerError(f"Unrecognized response: {data}")
tasks = []
async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(__({ "action": 'runcommands', "nodeids": nodeids, "type": 4, "cmds": command}, tg, tasks)))
return {n: v | {"result": "".join(v["result"])} for n,v in result.items()} return {n: v | {"result": "".join(v["result"])} for n,v in result.items()}

View File

@@ -37,7 +37,9 @@ class Agent(object):
self._clienturl = clienturl self._clienturl = clienturl
self._dockerurl = dockerurl self._dockerurl = dockerurl
r = requests.post(f"{self._clienturl}/add-agent", json={"url": f"{self._dockerurl}", "meshid": self.meshid}) r = requests.post(f"{self._clienturl}/add-agent", json={"url": f"{self._dockerurl}", "meshid": self.meshid})
self.nodeid = r.json()["id"] agent_json = r.json()
self.nodeid = agent_json["id"]
self.nodehex = agent_json["hex"]
def __enter__(self): def __enter__(self):
return self return self

View File

@@ -1,4 +1,4 @@
FROM python:3.12 FROM python:3.13
WORKDIR /usr/local/app WORKDIR /usr/local/app
# Install the application dependencies # Install the application dependencies

View File

@@ -1,8 +1,8 @@
FROM ghcr.io/ylianst/meshcentral:latest FROM ghcr.io/ylianst/meshcentral:1.1.50
RUN apk add curl RUN apk add curl
RUN apk add python3 RUN apk add python3
WORKDIR /opt/meshcentral/ WORKDIR /opt/meshcentral/
COPY ./scripts/meshcentral ./scripts COPY ./scripts/meshcentral ./scripts
COPY ./config/meshcentral/data /opt/meshcentral/meshcentral-data COPY ./config/meshcentral/data /opt/meshcentral/meshcentral-data
COPY ./config/meshcentral/overrides /opt/meshcentral/meshcentral COPY ./config/meshcentral/overrides /opt/meshcentral/meshcentral
CMD ["python3", "/opt/meshcentral/scripts/create_users.py"] ENTRYPOINT ["python3", "/opt/meshcentral/scripts/create_users.py"]

View File

@@ -53,7 +53,7 @@ def add_agent():
time.sleep(.1) time.sleep(.1)
else: else:
raise Exception(f"Failed to start agent: {text}") raise Exception(f"Failed to start agent: {text}")
return {"id": agent_id} return {"id": agent_id, "hex": agent_hex}
@api.route('/remove-agent/<agentid>', methods=['POST']) @api.route('/remove-agent/<agentid>', methods=['POST'])
def remove_agent(agentid): def remove_agent(agentid):

View File

@@ -7,9 +7,9 @@ thisdir = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(thisdir, "users.json")) as infile: with open(os.path.join(thisdir, "users.json")) as infile:
users = json.load(infile) users = json.load(infile)
for username, password in users.items(): for username, password in users.items():
subprocess.check_output(["node", "/opt/meshcentral/meshcentral", "--createaccount", username, "--pass", password, "--name", username]) print(subprocess.check_output(["node", "/opt/meshcentral/meshcentral", "--createaccount", username, "--pass", password, "--name", username]))
subprocess.check_output(["node", "/opt/meshcentral/meshcentral", "--adminaccount", "admin"]) print(subprocess.check_output(["node", "/opt/meshcentral/meshcentral", "--adminaccount", "admin"]))
subprocess.call(["bash", "/opt/meshcentral/startup.sh"]) subprocess.call(["bash", "/opt/meshcentral/entrypoint.sh"])

View File

@@ -9,16 +9,7 @@ import requests
async def test_sanity(env): async def test_sanity(env):
async with meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as s: async with meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as s:
got_pong = asyncio.Event()
async def _():
async for raw in s.raw_messages():
if raw == '{action:"pong"}':
got_pong.set()
break
ping_task = None
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
tg.create_task(asyncio.wait_for(_(), timeout=5))
tg.create_task(asyncio.wait_for(got_pong.wait(), timeout=5))
ping_task = tg.create_task(s.ping(timeout=10)) ping_task = tg.create_task(s.ping(timeout=10))
print("\ninfo ping: {}\n".format(ping_task.result())) print("\ninfo ping: {}\n".format(ping_task.result()))
print("\ninfo user_info: {}\n".format(await s.user_info())) print("\ninfo user_info: {}\n".format(await s.user_info()))

View File

@@ -251,7 +251,28 @@ async def test_mesh_device(env):
assert "Run commands completed." not in r[agent2.nodeid]["result"], "Didn't parse run command ending correctly" assert "Run commands completed." not in r[agent2.nodeid]["result"], "Didn't parse run command ending correctly"
assert "meshagent" in (await privileged_session.run_command(agent.nodeid, "ls", timeout=10))[agent.nodeid]["result"], "ls gave incorrect data" assert "meshagent" in (await privileged_session.run_command(agent.nodeid, "ls", timeout=10))[agent.nodeid]["result"], "ls gave incorrect data"
# Test run commands with ndividual device permissions # Test run_commands missing device
try:
await admin_session.run_command([agent.nodeid, "notanid"], "ls", timeout=10)
except* (meshctrl.exceptions.ServerError, ValueError):
pass
else:
raise Exception("Run command on a device that doesn't exist did not raise an exception")
r = await admin_session.run_console_command([agent.nodeid, agent2.nodeid], "info", timeout=10)
print("\ninfo run_console_command: {}\n".format(r))
assert agent.nodeid in r[agent.nodeid]["result"], "Run console command gave bad response"
assert agent2.nodeid in r[agent2.nodeid]["result"], "Run console command gave bad response"
# Test run_commands missing device
try:
await admin_session.run_console_command([agent.nodeid, "notanid"], "info", timeout=10)
except* (meshctrl.exceptions.ServerError, ValueError):
pass
else:
raise Exception("Run console command on a device that doesn't exist did not raise an exception")
# Test run commands with individual device permissions
try: try:
await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10) await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10)
except* (meshctrl.exceptions.ServerError, ValueError): except* (meshctrl.exceptions.ServerError, ValueError):
@@ -266,7 +287,7 @@ async def test_mesh_device(env):
else: else:
raise Exception("Unprivileged user has access to device it should not") raise Exception("Unprivileged user has access to device it should not")
assert (await admin_session.add_users_to_device((await unprivileged_session.user_info())["_id"], agent.nodeid, meshctrl.constants.MeshRights.norights)), "Failed to add user to device" assert (await admin_session.add_users_to_device((await unprivileged_session.user_info())["_id"], agent.nodeid, meshctrl.constants.DeviceRights.norights)), "Failed to add user to device"
try: try:
await unprivileged_session.run_command(agent.nodeid, "ls", ignore_output=True, timeout=10) await unprivileged_session.run_command(agent.nodeid, "ls", ignore_output=True, timeout=10)
@@ -284,12 +305,14 @@ async def test_mesh_device(env):
assert r.links[(await unprivileged_session.user_info())["_id"]]["rights"] == meshctrl.constants.DeviceRights.norights, "Unprivileged user has too many rights!" assert r.links[(await unprivileged_session.user_info())["_id"]]["rights"] == meshctrl.constants.DeviceRights.norights, "Unprivileged user has too many rights!"
assert (await admin_session.add_users_to_device([(await unprivileged_session.user_info())["_id"]], agent.nodeid, meshctrl.constants.DeviceRights.remotecontrol|meshctrl.constants.DeviceRights.agentconsole|meshctrl.constants.DeviceRights.remotecommands)), "Failed to modify user's permissions" assert (await admin_session.add_users_to_device([(await unprivileged_session.user_info())["_id"]], agent.nodeid, meshctrl.constants.DeviceRights.fullrights)), "Failed to modify user's permissions"
assert (await unprivileged_session.device_info(agent.nodeid, timeout=10)).links[(await unprivileged_session.user_info())["_id"]]["rights"] == meshctrl.constants.DeviceRights.remotecontrol|meshctrl.constants.DeviceRights.agentconsole|meshctrl.constants.DeviceRights.remotecommands, "Adding permissions did not update unprivileged user." assert (await unprivileged_session.device_info(agent.nodeid, timeout=10)).links[(await unprivileged_session.user_info())["_id"]]["rights"] == meshctrl.constants.DeviceRights.fullrights, "Adding permissions did not update unprivileged user."
# For now, this expects no response. If we ever figure out why the server isn't sending console information te us when it should, fix this. # For now, this expects no response. If we ever figure out why the server isn't sending console information to us when it should, fix this.
# assert "meshagent" in (await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10))[agent.nodeid]["result"], "ls gave incorrect data" # assert "meshagent" in (await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10))[agent.nodeid]["result"], "ls gave incorrect data"
# Meshcentral has a 10 second cache on user perms.
#await asyncio.sleep(15)
await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10) await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10)
assert await admin_session.move_to_device_group(agent.nodeid, mesh2.meshid, timeout=5), "Failed to move mesh to new device group" assert await admin_session.move_to_device_group(agent.nodeid, mesh2.meshid, timeout=5), "Failed to move mesh to new device group"
@@ -303,7 +326,7 @@ async def test_mesh_device(env):
assert await admin_session.move_to_device_group([agent.nodeid], mesh.name, isname=True, timeout=5), "Failed to move mesh to new device group by name" assert await admin_session.move_to_device_group([agent.nodeid], mesh.name, isname=True, timeout=5), "Failed to move mesh to new device group by name"
# For now, this expe namects no response. If we ever figure out why the server isn't sending console information te us when it should, fix this. # For now, this expects no response. If we ever figure out why the server isn't sending console information te us when it should, fix this.
# assert "meshagent" in (await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10))[agent.nodeid]["result"], "ls gave incorrect data" # assert "meshagent" in (await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10))[agent.nodeid]["result"], "ls gave incorrect data"
try: try:
await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10) await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10)