forked from Narcissus/pylibmeshctrl
Fix race condition for run_commands
Fix run_console_command being oble to pick up run_command outputs
This commit is contained in:
@@ -1509,23 +1509,7 @@ class Session(object):
|
||||
elif (event["value"].startswith("Run commands")):
|
||||
continue
|
||||
result[node]["result"].append(event["value"])
|
||||
|
||||
# We create this task AFTER getting the first message, but I don't feel like implementing this twice, so we'll pass in the first message and have it parsed immediately
|
||||
async def _reply(responseid, start_data=None):
|
||||
# Returns True when all results are in, Falsey otherwise
|
||||
def _parse_event(event):
|
||||
node = match_nodeid(event["nodeid"], nodeids)
|
||||
if node:
|
||||
result.setdefault(node, {})["complete"] = True
|
||||
result[node]["result"].append(event["result"])
|
||||
if all(_["complete"] for key, _ in result.items()):
|
||||
return True
|
||||
if start_data is not None:
|
||||
if _parse_event(start_data):
|
||||
return
|
||||
async for event in self.events({"action": "msg", "type": "runcommands", "responseid": responseid}):
|
||||
if _parse_event(event):
|
||||
break
|
||||
|
||||
|
||||
async def __(command, tg, tasks):
|
||||
data = await self._send_command(command, "run_command", timeout=timeout)
|
||||
@@ -1555,10 +1539,20 @@ class Session(object):
|
||||
else:
|
||||
console_task.cancel()
|
||||
elif data.get("type", None) == "runcommands" and not ignore_output:
|
||||
tasks.append(tg.create_task(asyncio.wait_for(_reply(data["responseid"], start_data=data), timeout=timeout)))
|
||||
# Force this to run immediately? This might be odd; but we want to make sure we get don't lose the race condition with the srever.
|
||||
# Not sure if this actually works but I haven't yet seen it fail. *shrug*
|
||||
await asyncio.sleep(0)
|
||||
# Returns True when all results are in, Falsey otherwise
|
||||
def _parse_event(event):
|
||||
node = match_nodeid(event["nodeid"], nodeids)
|
||||
if node:
|
||||
result.setdefault(node, {})["complete"] = True
|
||||
result[node]["result"].append(event["result"])
|
||||
if all(_["complete"] for key, _ in result.items()):
|
||||
return True
|
||||
if data is not None:
|
||||
if _parse_event(data):
|
||||
return
|
||||
async for event in self.events({"action": "msg", "type": "runcommands", "responseid": data["responseid"]}):
|
||||
if _parse_event(event):
|
||||
break
|
||||
|
||||
tasks = []
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
@@ -1600,12 +1594,14 @@ class Session(object):
|
||||
result = {n: {"complete": False, "result": [], "command": command} for n in nodeids}
|
||||
async def _console():
|
||||
async for event in self.events({"action": "msg", "type": "console"}):
|
||||
node = match_nodeid(event["nodeid"], nodeids)
|
||||
if node:
|
||||
result[node]["result"].append(event["value"])
|
||||
result.setdefault(node, {})["complete"] = True
|
||||
if all(_["complete"] for key, _ in result.items()):
|
||||
break
|
||||
# We can pick up run commands here sometimes if they are run in quick succession. Try to avoid that.
|
||||
if (not event["value"].startswith("Run commands")):
|
||||
node = match_nodeid(event["nodeid"], nodeids)
|
||||
if node:
|
||||
result[node]["result"].append(event["value"])
|
||||
result.setdefault(node, {})["complete"] = True
|
||||
if all(_["complete"] for key, _ in result.items()):
|
||||
break
|
||||
async def __(command, tg, tasks):
|
||||
data = await self._send_command(command, "run_console_command", timeout=timeout)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user