Compare commits

...

28 Commits

Author SHA1 Message Date
Josiah Baldwin
7cefd24a9d Added release description for 1.3.2 2025-10-22 20:10:38 -07:00
Josiah Baldwin
cbc1f9223f Merge pull request #62 from HuFlungDu/fix/runcommands-race-condition
fix #46
2025-10-22 20:05:56 -07:00
Josiah Baldwin
3fa1ca2e32 Added handling for weird response from meshcentral for run_command and run_console_command 2025-10-22 19:59:17 -07:00
Josiah Baldwin
ee812220fb Attempt to fix race condition in run_commands 2025-10-15 12:17:48 -07:00
Josiah Baldwin
002f652c8c Bump version 2025-09-27 18:13:48 -07:00
Josiah Baldwin
0b09f64821 Fix race condition for run_commands
Fix run_console_command being oble to pick up run_command outputs
2025-09-27 18:12:21 -07:00
Josiah Baldwin
12a3040f89 Reapply "Feat/run console commands"
This reverts commit 4cda54ab60.
2025-09-27 14:48:55 -07:00
Josiah Baldwin
e0694f980c Merge branch 'release/1.3.0' 2025-09-26 15:58:06 -07:00
Josiah Baldwin
61053549f2 Fixed test for remove device 2025-09-26 15:54:25 -07:00
Josiah Baldwin
fb3d043431 Added Daan to contributors 2025-09-26 15:39:57 -07:00
Josiah Baldwin
c13985739b Added release notes for 1.3.0 2025-09-26 15:37:55 -07:00
Josiah Baldwin
db1914c87b Merge pull request #54 from DaanSelen/feat-remove-dev
feat: remove devices function

resolves #52
2025-09-26 15:32:09 -07:00
Daan Selen
b0d071d87f feat: add remove_device function 2025-09-26 15:29:52 -07:00
Josiah Baldwin
3bcedf5610 Kinda added a test for remove device 2025-09-26 15:20:25 -07:00
Josiah Baldwin
9c7a8c39b0 Modified some implementation details 2025-09-26 15:19:57 -07:00
Daan Selen
7ba6989325 refac: I lied, this is the last... 2025-09-26 14:50:25 -07:00
Daan Selen
748e39d5b4 refac: remove nodeid parameter 2025-09-26 14:50:25 -07:00
Daan Selen
6dae40eb40 refac: copy other style 2025-09-26 14:50:25 -07:00
Daan Selen
c7d628716e refac: renamed and added device class impl 2025-09-26 14:50:25 -07:00
Daan Selen
1f9979ddd1 feat: add remove_device function 2025-09-26 14:50:25 -07:00
d4b9524814 feat(lib): draft function for remove_device 2025-09-26 14:50:25 -07:00
Josiah Baldwin
bc1db8f2b3 Update documentation for files.rm
Resolves #53
2025-09-26 14:40:53 -07:00
Josiah Baldwin
403c0cd0ec Merge branch 'development' of github.com:HuFlungDu/pylibmeshctrl into development 2025-09-26 14:38:34 -07:00
Josiah Baldwin
0b0029563a Maybe fix race condition when using multiple nodes in run_command 2025-09-26 14:38:10 -07:00
Josiah Baldwin
0b32896c88 Merge pull request #60 from HuFlungDu/feat/run_console_commands
Add run_console_command function

resolve #55
2025-09-26 14:17:44 -07:00
Josiah Baldwin
2304810ee6 Merge pull request #59 from HuFlungDu/revert-58-feat/run_console_commands
Revert "Feat/run console commands"

Seriously, I don't want to default merge to main, github.
2025-09-26 14:16:47 -07:00
Josiah Baldwin
4cda54ab60 Revert "Feat/run console commands" 2025-09-26 14:16:18 -07:00
Josiah Baldwin
87fad5aa13 Merge pull request #58 from HuFlungDu/feat/run_console_commands
Add run_console_command function

resolve #55
2025-09-26 14:16:12 -07:00
6 changed files with 133 additions and 34 deletions

View File

@@ -3,3 +3,4 @@ Contributors
============ ============
* Josiah Baldwin <jbaldwin8889@gmail.com> * Josiah Baldwin <jbaldwin8889@gmail.com>
* Daan Selen <https://github.com/DaanSelen>

View File

@@ -2,6 +2,29 @@
Changelog Changelog
========= =========
version 1.3.2
=============
Improvments:
* Fix race condition that could occur when running `run_command` or `run_console_command`
version 1.3.1
=============
Improvments:
* Basically just everything in 1.3.0, this is a release fix
version 1.3.0
=============
Improvements:
* Improved how run_commands was handled (#51)
* Added remove device functionality (#52)
* Added run_console_commands functionality (#55)
Bugs:
* Silly documentation being wrong (#53)
version 1.2.2 version 1.2.2
============= =============

View File

@@ -295,6 +295,23 @@ class Device(object):
''' '''
return await self._session.reset_devices(self.nodeid, timeout=timeout) return await self._session.reset_devices(self.nodeid, timeout=timeout)
async def remove(self, timeout=None):
'''
Remove device from MeshCentral
Args:
nodeids (str|list[str]): nodeid(s) of the device(s) that have to be removed
timeout (int): duration in seconds to wait for a response before throwing an error
Returns:
bool: True on success, raise otherwise
Raises:
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
return self._session.remove_devices(self.nodeid, timeout)
async def sleep(self, timeout=None): async def sleep(self, timeout=None):
''' '''
Sleep device Sleep device

View File

@@ -157,7 +157,7 @@ class Files(tunnel.Tunnel):
async def rm(self, path, files, recursive=False, timeout=None): async def rm(self, path, files, recursive=False, timeout=None):
""" """
Create a directory on the device. This API doesn't error if the file doesn't exist. Remove a set of files or directories from the device. This API doesn't error if the file doesn't exist.
Args: Args:
path (str): Directory from which to delete files path (str): Directory from which to delete files

View File

@@ -240,23 +240,28 @@ class Session(object):
async def __aexit__(self, exc_t, exc_v, exc_tb): async def __aexit__(self, exc_t, exc_v, exc_tb):
await self.close() await self.close()
@util._check_socket def _generate_response_id(self, name):
async def _send_command(self, data, name, timeout=None): responseid = f"meshctrl_{name}_{self._get_command_id()}"
id = f"meshctrl_{name}_{self._get_command_id()}"
# This fixes a very theoretical bug with hash colisions in the case of an infinite int of requests. Now the bug will only happen if there are currently 2**32-1 of the same type of request going out at the same time # This fixes a very theoretical bug with hash colisions in the case of an infinite int of requests. Now the bug will only happen if there are currently 2**32-1 of the same type of request going out at the same time
while id in self._inflight: while responseid in self._inflight:
id = f"meshctrl_{name}_{self._get_command_id()}" responseid = f"meshctrl_{name}_{self._get_command_id()}"
return responseid
self._inflight.add(id) @util._check_socket
async def _send_command(self, data, name, timeout=None, responseid=None):
if responseid is None:
responseid = self._generate_response_id(name)
self._inflight.add(responseid)
responded = asyncio.Event() responded = asyncio.Event()
response = None response = None
async def _(data): async def _(data):
self._inflight.remove(id) self._inflight.remove(responseid)
nonlocal response nonlocal response
response = data response = data
responded.set() responded.set()
self._eventer.once(id, _) self._eventer.once(responseid, _)
await self._message_queue.put(json.dumps(data | {"tag": id, "responseid": id})) await self._message_queue.put(json.dumps(data | {"tag": responseid, "responseid": responseid}))
await asyncio.wait_for(responded.wait(), timeout=timeout) await asyncio.wait_for(responded.wait(), timeout=timeout)
if isinstance(response, Exception): if isinstance(response, Exception):
raise response raise response
@@ -1062,6 +1067,30 @@ class Session(object):
raise exceptions.ServerError(data["result"]) raise exceptions.ServerError(data["result"])
return True return True
async def remove_devices(self, nodeids, timeout=None):
'''
Remove device(s) from MeshCentral
Args:
nodeids (str|list[str]): nodeid(s) of the device(s) that have to be removed
timeout (int): duration in seconds to wait for a response before throwing an error
Returns:
bool: True on success, raise otherwise
Raises:
:py:class:`~meshctrl.exceptions.ServerError`: Error text from server if there is a failure
:py:class:`~meshctrl.exceptions.SocketError`: Info about socket closure
asyncio.TimeoutError: Command timed out
'''
if isinstance(nodeids, str):
nodeids = [nodeids]
data = await self._send_command({ "action": 'removedevices', "nodeids": nodeids}, "remove_devices", timeout=timeout)
if data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"])
return True
async def add_device_group(self, name, description="", amtonly=False, features=0, consent=0, timeout=None): async def add_device_group(self, name, description="", amtonly=False, features=0, consent=0, timeout=None):
''' '''
@@ -1435,7 +1464,7 @@ class Session(object):
async def run_command(self, nodeids, command, powershell=False, runasuser=False, runasuseronly=False, ignore_output=False, timeout=None): async def run_command(self, nodeids, command, powershell=False, runasuser=False, runasuseronly=False, ignore_output=False, timeout=None):
''' '''
Run a command on any number of nodes. WARNING: Non namespaced call. Calling this function again before it returns may cause unintended consequences. Run a command on any number of nodes. WARNING: Non namespaced call on older versions of meshcentral (<1.0.22). Calling this function on those versions again before it returns may cause unintended consequences.
Args: Args:
nodeids (str|list[str]): Unique ids of nodes on which to run the command nodeids (str|list[str]): Unique ids of nodes on which to run the command
@@ -1472,45 +1501,52 @@ class Session(object):
if (f"node//{nid}" == id): if (f"node//{nid}" == id):
return nid return nid
result = {n: {"complete": False, "result": [], "command": command} for n in nodeids} result = None
console_result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
reply_result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
async def _console(): async def _console():
async for event in self.events({"action": "msg", "type": "console"}): async for event in self.events({"action": "msg", "type": "console"}):
node = match_nodeid(event["nodeid"], nodeids) node = match_nodeid(event["nodeid"], nodeids)
if node: if node:
if event["value"] == "Run commands completed.": if event["value"] == "Run commands completed.":
result.setdefault(node, {})["complete"] = True console_result.setdefault(node, {})["complete"] = True
if all(_["complete"] for key, _ in result.items()): if all(_["complete"] for key, _ in console_result.items()):
break break
continue continue
elif (event["value"].startswith("Run commands")): elif (event["value"].startswith("Run commands")):
continue continue
result[node]["result"].append(event["value"]) console_result[node]["result"].append(event["value"])
# We create this task AFTER getting the first message, but I don't feel like implementing this twice, so we'll pass in the first message and have it parsed immediately async def _reply(responseid, data=None):
async def _reply(responseid, start_data=None):
# Returns True when all results are in, Falsey otherwise # Returns True when all results are in, Falsey otherwise
def _parse_event(event): def _parse_event(event):
node = match_nodeid(event["nodeid"], nodeids) node = match_nodeid(event["nodeid"], nodeids)
if node: if node:
result.setdefault(node, {})["complete"] = True reply_result.setdefault(node, {})["complete"] = True
result[node]["result"].append(event["result"]) reply_result[node]["result"].append(event["result"])
if all(_["complete"] for key, _ in result.items()): if all(_["complete"] for key, _ in reply_result.items()):
return True return True
if start_data is not None: if data is not None:
if _parse_event(start_data): if _parse_event(data):
return return
async for event in self.events({"action": "msg", "type": "runcommands", "responseid":responseid}): async for event in self.events({"action": "msg", "type": "runcommands", "responseid":responseid}):
if _parse_event(event): if _parse_event(event):
break break
async def __(command, tg, tasks): async def __(command, tg, tasks):
data = await self._send_command(command, "run_command", timeout=timeout) nonlocal result
responseid = self._generate_response_id("run_command")
if not ignore_output:
reply_task = tg.create_task(asyncio.wait_for(_reply(responseid), timeout=timeout))
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
data = await self._send_command(command, "run_command", timeout=timeout, responseid=responseid)
if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok": if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"]) raise exceptions.ServerError(data["result"])
elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok": elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok":
reply_task.cancel()
result = console_result
expect_response = False expect_response = False
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
if not ignore_output: if not ignore_output:
userid = (await self.user_info())["_id"] userid = (await self.user_info())["_id"]
for n in nodeids: for n in nodeids:
@@ -1531,7 +1567,14 @@ class Session(object):
else: else:
console_task.cancel() console_task.cancel()
elif data.get("type", None) == "runcommands" and not ignore_output: elif data.get("type", None) == "runcommands" and not ignore_output:
tasks.append(tg.create_task(asyncio.wait_for(_reply(data["responseid"], start_data=data), timeout=timeout))) result = reply_result
console_task.cancel()
tasks.append(reply_task)
else:
if not ignore_output:
console_task.cancel()
reply_task.cancel()
raise exceptions.ServerError(f"Unrecognized response: {data}")
tasks = [] tasks = []
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
@@ -1573,6 +1616,8 @@ class Session(object):
result = {n: {"complete": False, "result": [], "command": command} for n in nodeids} result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
async def _console(): async def _console():
async for event in self.events({"action": "msg", "type": "console"}): async for event in self.events({"action": "msg", "type": "console"}):
# We can pick up run commands here sometimes if they are run in quick succession. Try to avoid that.
if (not event["value"].startswith("Run commands")):
node = match_nodeid(event["nodeid"], nodeids) node = match_nodeid(event["nodeid"], nodeids)
if node: if node:
result[node]["result"].append(event["value"]) result[node]["result"].append(event["value"])
@@ -1580,13 +1625,14 @@ class Session(object):
if all(_["complete"] for key, _ in result.items()): if all(_["complete"] for key, _ in result.items()):
break break
async def __(command, tg, tasks): async def __(command, tg, tasks):
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
data = await self._send_command(command, "run_console_command", timeout=timeout) data = await self._send_command(command, "run_console_command", timeout=timeout)
if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok": if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok":
raise exceptions.ServerError(data["result"]) raise exceptions.ServerError(data["result"])
elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok": elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok":
expect_response = False expect_response = False
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
if not ignore_output: if not ignore_output:
userid = (await self.user_info())["_id"] userid = (await self.user_info())["_id"]
for n in nodeids: for n in nodeids:
@@ -1606,6 +1652,9 @@ class Session(object):
tasks.append(console_task) tasks.append(console_task)
else: else:
console_task.cancel() console_task.cancel()
else:
console_task.cancel()
raise exceptions.ServerError(f"Unrecognized response: {data}")
tasks = [] tasks = []
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:

View File

@@ -336,6 +336,15 @@ async def test_mesh_device(env):
r = await admin_session.remove_users_from_device_group((await privileged_session.user_info())["_id"], mesh.meshid, timeout=10) r = await admin_session.remove_users_from_device_group((await privileged_session.user_info())["_id"], mesh.meshid, timeout=10)
print("\ninfo remove_users_from_device_group: {}\n".format(r)) print("\ninfo remove_users_from_device_group: {}\n".format(r))
assert (r[(await privileged_session.user_info())["_id"]]["success"]), "Failed to remove user from device group" assert (r[(await privileged_session.user_info())["_id"]]["success"]), "Failed to remove user from device group"
await admin_session.remove_devices(agent2.nodeid, timeout=10)
try:
await admin_session.device_info(agent2.nodeid, timeout=10)
except ValueError:
pass
else:
raise Exception("Device not deleted")
assert (await admin_session.remove_users_from_device(agent.nodeid, (await unprivileged_session.user_info())["_id"], timeout=10)), "Failed to remove user from device" assert (await admin_session.remove_users_from_device(agent.nodeid, (await unprivileged_session.user_info())["_id"], timeout=10)), "Failed to remove user from device"