mirror of
https://github.com/HuFlungDu/pylibmeshctrl.git
synced 2026-02-20 21:52:16 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7cefd24a9d | ||
|
|
cbc1f9223f | ||
|
|
3fa1ca2e32 | ||
|
|
ee812220fb |
@@ -2,6 +2,12 @@
|
|||||||
Changelog
|
Changelog
|
||||||
=========
|
=========
|
||||||
|
|
||||||
|
version 1.3.2
|
||||||
|
=============
|
||||||
|
|
||||||
|
Improvments:
|
||||||
|
* Fix race condition that could occur when running `run_command` or `run_console_command`
|
||||||
|
|
||||||
version 1.3.1
|
version 1.3.1
|
||||||
=============
|
=============
|
||||||
|
|
||||||
|
|||||||
@@ -240,23 +240,28 @@ class Session(object):
|
|||||||
async def __aexit__(self, exc_t, exc_v, exc_tb):
|
async def __aexit__(self, exc_t, exc_v, exc_tb):
|
||||||
await self.close()
|
await self.close()
|
||||||
|
|
||||||
@util._check_socket
|
def _generate_response_id(self, name):
|
||||||
async def _send_command(self, data, name, timeout=None):
|
responseid = f"meshctrl_{name}_{self._get_command_id()}"
|
||||||
id = f"meshctrl_{name}_{self._get_command_id()}"
|
|
||||||
# This fixes a very theoretical bug with hash colisions in the case of an infinite int of requests. Now the bug will only happen if there are currently 2**32-1 of the same type of request going out at the same time
|
# This fixes a very theoretical bug with hash colisions in the case of an infinite int of requests. Now the bug will only happen if there are currently 2**32-1 of the same type of request going out at the same time
|
||||||
while id in self._inflight:
|
while responseid in self._inflight:
|
||||||
id = f"meshctrl_{name}_{self._get_command_id()}"
|
responseid = f"meshctrl_{name}_{self._get_command_id()}"
|
||||||
|
return responseid
|
||||||
|
|
||||||
self._inflight.add(id)
|
@util._check_socket
|
||||||
|
async def _send_command(self, data, name, timeout=None, responseid=None):
|
||||||
|
if responseid is None:
|
||||||
|
responseid = self._generate_response_id(name)
|
||||||
|
|
||||||
|
self._inflight.add(responseid)
|
||||||
responded = asyncio.Event()
|
responded = asyncio.Event()
|
||||||
response = None
|
response = None
|
||||||
async def _(data):
|
async def _(data):
|
||||||
self._inflight.remove(id)
|
self._inflight.remove(responseid)
|
||||||
nonlocal response
|
nonlocal response
|
||||||
response = data
|
response = data
|
||||||
responded.set()
|
responded.set()
|
||||||
self._eventer.once(id, _)
|
self._eventer.once(responseid, _)
|
||||||
await self._message_queue.put(json.dumps(data | {"tag": id, "responseid": id}))
|
await self._message_queue.put(json.dumps(data | {"tag": responseid, "responseid": responseid}))
|
||||||
await asyncio.wait_for(responded.wait(), timeout=timeout)
|
await asyncio.wait_for(responded.wait(), timeout=timeout)
|
||||||
if isinstance(response, Exception):
|
if isinstance(response, Exception):
|
||||||
raise response
|
raise response
|
||||||
@@ -1459,7 +1464,7 @@ class Session(object):
|
|||||||
|
|
||||||
async def run_command(self, nodeids, command, powershell=False, runasuser=False, runasuseronly=False, ignore_output=False, timeout=None):
|
async def run_command(self, nodeids, command, powershell=False, runasuser=False, runasuseronly=False, ignore_output=False, timeout=None):
|
||||||
'''
|
'''
|
||||||
Run a command on any number of nodes. WARNING: Non namespaced call. Calling this function again before it returns may cause unintended consequences.
|
Run a command on any number of nodes. WARNING: Non namespaced call on older versions of meshcentral (<1.0.22). Calling this function on those versions again before it returns may cause unintended consequences.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
nodeids (str|list[str]): Unique ids of nodes on which to run the command
|
nodeids (str|list[str]): Unique ids of nodes on which to run the command
|
||||||
@@ -1496,29 +1501,52 @@ class Session(object):
|
|||||||
if (f"node//{nid}" == id):
|
if (f"node//{nid}" == id):
|
||||||
return nid
|
return nid
|
||||||
|
|
||||||
result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
|
result = None
|
||||||
|
console_result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
|
||||||
|
reply_result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
|
||||||
async def _console():
|
async def _console():
|
||||||
async for event in self.events({"action": "msg", "type": "console"}):
|
async for event in self.events({"action": "msg", "type": "console"}):
|
||||||
node = match_nodeid(event["nodeid"], nodeids)
|
node = match_nodeid(event["nodeid"], nodeids)
|
||||||
if node:
|
if node:
|
||||||
if event["value"] == "Run commands completed.":
|
if event["value"] == "Run commands completed.":
|
||||||
result.setdefault(node, {})["complete"] = True
|
console_result.setdefault(node, {})["complete"] = True
|
||||||
if all(_["complete"] for key, _ in result.items()):
|
if all(_["complete"] for key, _ in console_result.items()):
|
||||||
break
|
break
|
||||||
continue
|
continue
|
||||||
elif (event["value"].startswith("Run commands")):
|
elif (event["value"].startswith("Run commands")):
|
||||||
continue
|
continue
|
||||||
result[node]["result"].append(event["value"])
|
console_result[node]["result"].append(event["value"])
|
||||||
|
|
||||||
|
async def _reply(responseid, data=None):
|
||||||
|
# Returns True when all results are in, Falsey otherwise
|
||||||
|
def _parse_event(event):
|
||||||
|
node = match_nodeid(event["nodeid"], nodeids)
|
||||||
|
if node:
|
||||||
|
reply_result.setdefault(node, {})["complete"] = True
|
||||||
|
reply_result[node]["result"].append(event["result"])
|
||||||
|
if all(_["complete"] for key, _ in reply_result.items()):
|
||||||
|
return True
|
||||||
|
if data is not None:
|
||||||
|
if _parse_event(data):
|
||||||
|
return
|
||||||
|
async for event in self.events({"action": "msg", "type": "runcommands", "responseid":responseid}):
|
||||||
|
if _parse_event(event):
|
||||||
|
break
|
||||||
|
|
||||||
async def __(command, tg, tasks):
|
async def __(command, tg, tasks):
|
||||||
data = await self._send_command(command, "run_command", timeout=timeout)
|
nonlocal result
|
||||||
|
responseid = self._generate_response_id("run_command")
|
||||||
|
if not ignore_output:
|
||||||
|
reply_task = tg.create_task(asyncio.wait_for(_reply(responseid), timeout=timeout))
|
||||||
|
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
|
||||||
|
data = await self._send_command(command, "run_command", timeout=timeout, responseid=responseid)
|
||||||
|
|
||||||
if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok":
|
if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok":
|
||||||
raise exceptions.ServerError(data["result"])
|
raise exceptions.ServerError(data["result"])
|
||||||
elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok":
|
elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok":
|
||||||
|
reply_task.cancel()
|
||||||
|
result = console_result
|
||||||
expect_response = False
|
expect_response = False
|
||||||
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
|
|
||||||
if not ignore_output:
|
if not ignore_output:
|
||||||
userid = (await self.user_info())["_id"]
|
userid = (await self.user_info())["_id"]
|
||||||
for n in nodeids:
|
for n in nodeids:
|
||||||
@@ -1539,20 +1567,14 @@ class Session(object):
|
|||||||
else:
|
else:
|
||||||
console_task.cancel()
|
console_task.cancel()
|
||||||
elif data.get("type", None) == "runcommands" and not ignore_output:
|
elif data.get("type", None) == "runcommands" and not ignore_output:
|
||||||
# Returns True when all results are in, Falsey otherwise
|
result = reply_result
|
||||||
def _parse_event(event):
|
console_task.cancel()
|
||||||
node = match_nodeid(event["nodeid"], nodeids)
|
tasks.append(reply_task)
|
||||||
if node:
|
else:
|
||||||
result.setdefault(node, {})["complete"] = True
|
if not ignore_output:
|
||||||
result[node]["result"].append(event["result"])
|
console_task.cancel()
|
||||||
if all(_["complete"] for key, _ in result.items()):
|
reply_task.cancel()
|
||||||
return True
|
raise exceptions.ServerError(f"Unrecognized response: {data}")
|
||||||
if data is not None:
|
|
||||||
if _parse_event(data):
|
|
||||||
return
|
|
||||||
async for event in self.events({"action": "msg", "type": "runcommands", "responseid": data["responseid"]}):
|
|
||||||
if _parse_event(event):
|
|
||||||
break
|
|
||||||
|
|
||||||
tasks = []
|
tasks = []
|
||||||
async with asyncio.TaskGroup() as tg:
|
async with asyncio.TaskGroup() as tg:
|
||||||
@@ -1603,13 +1625,14 @@ class Session(object):
|
|||||||
if all(_["complete"] for key, _ in result.items()):
|
if all(_["complete"] for key, _ in result.items()):
|
||||||
break
|
break
|
||||||
async def __(command, tg, tasks):
|
async def __(command, tg, tasks):
|
||||||
|
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
|
||||||
data = await self._send_command(command, "run_console_command", timeout=timeout)
|
data = await self._send_command(command, "run_console_command", timeout=timeout)
|
||||||
|
|
||||||
if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok":
|
if data.get("type", None) != "runcommands" and data.get("result", "ok").lower() != "ok":
|
||||||
raise exceptions.ServerError(data["result"])
|
raise exceptions.ServerError(data["result"])
|
||||||
elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok":
|
elif data.get("type", None) != "runcommands" and data.get("result", "ok").lower() == "ok":
|
||||||
expect_response = False
|
expect_response = False
|
||||||
console_task = tg.create_task(asyncio.wait_for(_console(), timeout=timeout))
|
|
||||||
if not ignore_output:
|
if not ignore_output:
|
||||||
userid = (await self.user_info())["_id"]
|
userid = (await self.user_info())["_id"]
|
||||||
for n in nodeids:
|
for n in nodeids:
|
||||||
@@ -1629,6 +1652,9 @@ class Session(object):
|
|||||||
tasks.append(console_task)
|
tasks.append(console_task)
|
||||||
else:
|
else:
|
||||||
console_task.cancel()
|
console_task.cancel()
|
||||||
|
else:
|
||||||
|
console_task.cancel()
|
||||||
|
raise exceptions.ServerError(f"Unrecognized response: {data}")
|
||||||
|
|
||||||
tasks = []
|
tasks = []
|
||||||
async with asyncio.TaskGroup() as tg:
|
async with asyncio.TaskGroup() as tg:
|
||||||
|
|||||||
Reference in New Issue
Block a user