From 0b09f648215013a6dec7fc9b0e87b9a56faafc21 Mon Sep 17 00:00:00 2001 From: Josiah Baldwin Date: Sat, 27 Sep 2025 18:12:21 -0700 Subject: [PATCH] Fix race condition for run_commands Fix run_console_command being oble to pick up run_command outputs --- src/meshctrl/session.py | 50 +++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/src/meshctrl/session.py b/src/meshctrl/session.py index 2c7292f..cec6bd8 100644 --- a/src/meshctrl/session.py +++ b/src/meshctrl/session.py @@ -1509,23 +1509,7 @@ class Session(object): elif (event["value"].startswith("Run commands")): continue result[node]["result"].append(event["value"]) - - # We create this task AFTER getting the first message, but I don't feel like implementing this twice, so we'll pass in the first message and have it parsed immediately - async def _reply(responseid, start_data=None): - # Returns True when all results are in, Falsey otherwise - def _parse_event(event): - node = match_nodeid(event["nodeid"], nodeids) - if node: - result.setdefault(node, {})["complete"] = True - result[node]["result"].append(event["result"]) - if all(_["complete"] for key, _ in result.items()): - return True - if start_data is not None: - if _parse_event(start_data): - return - async for event in self.events({"action": "msg", "type": "runcommands", "responseid": responseid}): - if _parse_event(event): - break + async def __(command, tg, tasks): data = await self._send_command(command, "run_command", timeout=timeout) @@ -1555,10 +1539,20 @@ class Session(object): else: console_task.cancel() elif data.get("type", None) == "runcommands" and not ignore_output: - tasks.append(tg.create_task(asyncio.wait_for(_reply(data["responseid"], start_data=data), timeout=timeout))) - # Force this to run immediately? This might be odd; but we want to make sure we get don't lose the race condition with the srever. - # Not sure if this actually works but I haven't yet seen it fail. *shrug* - await asyncio.sleep(0) + # Returns True when all results are in, Falsey otherwise + def _parse_event(event): + node = match_nodeid(event["nodeid"], nodeids) + if node: + result.setdefault(node, {})["complete"] = True + result[node]["result"].append(event["result"]) + if all(_["complete"] for key, _ in result.items()): + return True + if data is not None: + if _parse_event(data): + return + async for event in self.events({"action": "msg", "type": "runcommands", "responseid": data["responseid"]}): + if _parse_event(event): + break tasks = [] async with asyncio.TaskGroup() as tg: @@ -1600,12 +1594,14 @@ class Session(object): result = {n: {"complete": False, "result": [], "command": command} for n in nodeids} async def _console(): async for event in self.events({"action": "msg", "type": "console"}): - node = match_nodeid(event["nodeid"], nodeids) - if node: - result[node]["result"].append(event["value"]) - result.setdefault(node, {})["complete"] = True - if all(_["complete"] for key, _ in result.items()): - break + # We can pick up run commands here sometimes if they are run in quick succession. Try to avoid that. + if (not event["value"].startswith("Run commands")): + node = match_nodeid(event["nodeid"], nodeids) + if node: + result[node]["result"].append(event["value"]) + result.setdefault(node, {})["complete"] = True + if all(_["complete"] for key, _ in result.items()): + break async def __(command, tg, tasks): data = await self._send_command(command, "run_console_command", timeout=timeout)