From fa6725e77c07e99a2b048fc11acf6f4e7e4b3795 Mon Sep 17 00:00:00 2001 From: Daan Selen Date: Tue, 3 Jun 2025 16:57:10 +0200 Subject: [PATCH] Working prototype --- runner/modules/connect.py | 11 ++-- runner/modules/utilities.py | 3 +- runner/runner.py | 15 ++--- server/src/modules/database/handlers.go | 15 ++++- server/src/modules/database/statements.go | 13 ++-- server/src/modules/restapi/handlers.go | 1 + server/src/modules/timekeeper/routine.go | 72 +++++++++++++++++---- server/src/modules/timekeeper/timekeeper.go | 7 +- server/src/modules/wrapper/python.go | 2 +- 9 files changed, 100 insertions(+), 39 deletions(-) diff --git a/runner/modules/connect.py b/runner/modules/connect.py index afe560b..530d023 100644 --- a/runner/modules/connect.py +++ b/runner/modules/connect.py @@ -19,19 +19,20 @@ class connect: return session @staticmethod - async def run(session: meshctrl.Session, command: str, nodeids: str) -> None: + async def run(session: meshctrl.Session, command: str, nodeid: str) -> list[str]: try: - response = await session.run_command(nodeids=nodeids, + response = await session.run_command(nodeids=nodeid, command=command, ignore_output=False, timeout=900) except Exception as error: print("Run Command failed.", error) - return + return [] + result = [] for device in response: - response[device]["result"] = transform.process_shell_response(response[device]["result"]) - print(dumps(response[device]["result"])) + result = transform.process_shell_response(response[device]["result"]) + return result @staticmethod async def list_online(session: meshctrl.Session) -> dict: # Default is return online devices, but function can also return the offline devices if specified. diff --git a/runner/modules/utilities.py b/runner/modules/utilities.py index ba8c7f0..77467bd 100644 --- a/runner/modules/utilities.py +++ b/runner/modules/utilities.py @@ -43,5 +43,4 @@ class transform: if len(line) > 0: clean_output.append(line) - print(clean_output) - return clean_output + return clean_output \ No newline at end of file diff --git a/runner/runner.py b/runner/runner.py index 4a8c02b..7ecfe7f 100644 --- a/runner/runner.py +++ b/runner/runner.py @@ -33,26 +33,25 @@ async def main() -> None: session = await connect.connect(credentials["hostname"], credentials["username"], credentials["password"]) - + if args.list_online: online_devices = await connect.list_online(session) if args.indent: print(dumps(online_devices,indent=4)) else: print(dumps(online_devices)) - return await connect.quit(session) # Exit gracefully. Because python. if args.run: - if not args.command or not args.nodeid: - print("When using run, also use --command and --nodeid") - return await connect.quit(session) # Exit gracefully. Because python. - command = args.command nodeid = args.nodeid nodeid = await prepare_command(command, nodeid) - await connect.run(session, command, nodeid) - + result = await connect.run(session, command, nodeid) + if args.indent: + print(dumps(result,indent=4)) + else: + print(dumps(result)) + await session.close() if __name__ == "__main__": diff --git a/server/src/modules/database/handlers.go b/server/src/modules/database/handlers.go index 0bae70e..97bb57f 100644 --- a/server/src/modules/database/handlers.go +++ b/server/src/modules/database/handlers.go @@ -94,9 +94,20 @@ func RetrieveTokenNames() []string { func InsertTask(name, command string, nodeids []string, date string) error { for _, singleNodeid := range nodeids { - _, err := db.Exec(declStat.CreateTask, name, command, singleNodeid, date) + var count int + err := db.QueryRow(declStat.CountDuplTasks, command, singleNodeid).Scan(&count) if err != nil { - return fmt.Errorf("failed to create task: %w", err) + return fmt.Errorf("failed to count the task occurence: %w", err) + } + + if count > 0 { + log.Println(utilities.WarnTag, "Skipped creation of a task because its a duplicate.") + log.Println(utilities.WarnTag, fmt.Sprintf("Details: NodeID: %s, Task name: %s", singleNodeid, name)) + } else { + _, err = db.Exec(declStat.CreateTask, name, command, singleNodeid, date) + if err != nil { + return fmt.Errorf("failed to create task: %w", err) + } } } return nil diff --git a/server/src/modules/database/statements.go b/server/src/modules/database/statements.go index 17f33e4..247edf0 100644 --- a/server/src/modules/database/statements.go +++ b/server/src/modules/database/statements.go @@ -10,10 +10,11 @@ type Statements struct { RetrieveTokens string RetrieveTokenNames string - CreateTask string - DeleteTask string - ListAllTasks string - CountTasks string + CreateTask string + DeleteTask string + ListAllTasks string + CountTasks string + CountDuplTasks string } var declStat = Statements{ @@ -67,4 +68,8 @@ var declStat = Statements{ SELECT COUNT(*) FROM tasks WHERE name = ?;`, + CountDuplTasks: ` + SELECT COUNT(*) + FROM tasks + WHERE command = ? AND nodeid = ?;`, } diff --git a/server/src/modules/restapi/handlers.go b/server/src/modules/restapi/handlers.go index b62a0d9..8b974c2 100644 --- a/server/src/modules/restapi/handlers.go +++ b/server/src/modules/restapi/handlers.go @@ -156,6 +156,7 @@ func createTaskHandler(hmacKey []byte) http.HandlerFunc { } data.Details.Name = strings.ToLower(data.Details.Name) //Transform to lower + if err := createTask(data.Details.Name, data.Details.Command, data.Details.Nodeids); err != nil { log.Println(utilities.ErrTag, "createTask failed:", err) http.Error(w, "Task creation failed", http.StatusInternalServerError) diff --git a/server/src/modules/timekeeper/routine.go b/server/src/modules/timekeeper/routine.go index 1f0fb0a..7a5db56 100644 --- a/server/src/modules/timekeeper/routine.go +++ b/server/src/modules/timekeeper/routine.go @@ -6,33 +6,55 @@ import ( "ghostrunner-server/modules/utilities" "ghostrunner-server/modules/wrapper" "log" + "slices" "strings" + "sync" ) func routine(venvName string, pyListArgs []string) { d := listDevices(venvName, pyListArgs) // Retrieve the Online devices. curTasks := database.RetrieveTasks() + var wg sync.WaitGroup + var offDevices []string + for index, task := range curTasks { relevantNodeid := task.Nodeid - log.Printf("Processing Task %d", index) + if slices.Contains(offDevices, relevantNodeid) { + continue + } + + log.Println(utilities.InfoTag, fmt.Sprintf("Processing Task %d", index)) if isNodeOnline(relevantNodeid, d.OnlineDevices) { log.Printf("Node online: %s", relevantNodeid) - result := forgeAndExec(venvName, relevantNodeid, task.Command) // Forge the Python command and execute it with the Python libraries. - log.Println(result) - log.Println() + wg.Add(1) - //generateResult() + go func(task utilities.InternalQTaskData) { + defer wg.Done() - log.Println("Removing Task from database...") - database.RemoveTask(task.Name, task.Nodeid) + log.Println(utilities.InfoTag, fmt.Sprintf("Node online. NodeID: %s", task.Nodeid)) + + rawResult := forgeAndExec(venvName, task.Nodeid, task.Command) + pResult := generateResult(rawResult) + + for idx, item := range pResult { + log.Println(idx, item) + } + + log.Println(utilities.InfoTag, "Removing Task from database...") + database.RemoveTask(task.Name, task.Nodeid) + + }(task) } else { - log.Printf("Node offline %s", relevantNodeid) // Just a debug line to tell the user that the node is offline. + log.Println(utilities.InfoTag, fmt.Sprintf("Node offline all further tasks with this NodeID will be defered. NodeID: %s", relevantNodeid)) // Just a debug line to tell the user that the node is offline. + offDevices = append(offDevices, relevantNodeid) } } + log.Println(utilities.InfoTag, "Waiting for tasks to finish.") + wg.Wait() } func listDevices(venvName string, pyArgs []string) utilities.PyOnlineDevices { @@ -41,7 +63,6 @@ func listDevices(venvName string, pyArgs []string) utilities.PyOnlineDevices { log.Println(utilities.ErrTag, err) } - log.Println(onDevices) return onDevices } @@ -54,11 +75,34 @@ func isNodeOnline(nodeid string, onlineDevices []utilities.Device) bool { return false } -func forgeAndExec(venvName string, nodeid, command string) string { - log.Printf("Triggered %s, on %s", command, nodeid) +func forgeAndExec(venvName string, nodeid string, cmdStr string) string { + // Example args: ["--node", nodeid, "--command", cmdStr] + pyArgs := []string{"--run", "--nodeid", nodeid, "--command", cmdStr} - pyArgs := strings.Fields(fmt.Sprintf("--run --nodeid %s --command", nodeid)) - pyArgs = append(pyArgs, command) + //log.Printf("[DEBUG] forgeAndExec: Preparing to run ExecTask for node %s", nodeid) + output := wrapper.ExecTask(venvName, pyArgs) + //log.Printf("[DEBUG] forgeAndExec: Output received for node %s", nodeid) - return wrapper.ExecTask(venvName, pyArgs) + return output +} + +func generateResult(rawOut string) []string { + // Trim the brackets if they exist + rawOut = strings.TrimSpace(rawOut) + rawOut = strings.TrimPrefix(rawOut, "[") + rawOut = strings.TrimSuffix(rawOut, "]") + + // Split on comma + items := strings.Split(rawOut, ",") + + var result []string + for _, item := range items { + item = strings.TrimSpace(item) + item = strings.Trim(item, `' "`) // Remove surrounding single quotes and extra spaces + if item != "" { + result = append(result, item) + } + } + + return result } diff --git a/server/src/modules/timekeeper/timekeeper.go b/server/src/modules/timekeeper/timekeeper.go index 311c9d8..cec11f6 100644 --- a/server/src/modules/timekeeper/timekeeper.go +++ b/server/src/modules/timekeeper/timekeeper.go @@ -17,9 +17,10 @@ func KeepTime(interval int, venvName string) { ticker := time.NewTicker(transInterval) defer ticker.Stop() - for t := range ticker.C { - log.Println(utilities.InfoTag, "Tick at:", t) - log.Println(utilities.InfoTag, "Starting Routine.") + for range ticker.C { + log.Println(utilities.InfoTag, "----------------------------------------") + log.Println(utilities.InfoTag, "Routine Started.") routine(venvName, pyListArgs) + log.Println(utilities.InfoTag, "Routine Ended.") } } diff --git a/server/src/modules/wrapper/python.go b/server/src/modules/wrapper/python.go index 08dd97f..738695e 100644 --- a/server/src/modules/wrapper/python.go +++ b/server/src/modules/wrapper/python.go @@ -15,8 +15,8 @@ const ( func pyExec(venvName string, pyArgs []string) ([]byte, error) { pyBin := fmt.Sprintf("./../runner/%s/bin/python", venvName) - runtimeArgs := append([]string{pyFile}, pyArgs...) + runtimeArgs := append([]string{pyFile}, pyArgs...) cmd := exec.Command(pyBin, runtimeArgs...) return cmd.CombinedOutput()