Working prototype

This commit is contained in:
Daan Selen
2025-06-03 16:57:10 +02:00
parent 46f3eb735f
commit fa6725e77c
9 changed files with 100 additions and 39 deletions

View File

@@ -94,9 +94,20 @@ func RetrieveTokenNames() []string {
func InsertTask(name, command string, nodeids []string, date string) error {
for _, singleNodeid := range nodeids {
_, err := db.Exec(declStat.CreateTask, name, command, singleNodeid, date)
var count int
err := db.QueryRow(declStat.CountDuplTasks, command, singleNodeid).Scan(&count)
if err != nil {
return fmt.Errorf("failed to create task: %w", err)
return fmt.Errorf("failed to count the task occurence: %w", err)
}
if count > 0 {
log.Println(utilities.WarnTag, "Skipped creation of a task because its a duplicate.")
log.Println(utilities.WarnTag, fmt.Sprintf("Details: NodeID: %s, Task name: %s", singleNodeid, name))
} else {
_, err = db.Exec(declStat.CreateTask, name, command, singleNodeid, date)
if err != nil {
return fmt.Errorf("failed to create task: %w", err)
}
}
}
return nil

View File

@@ -10,10 +10,11 @@ type Statements struct {
RetrieveTokens string
RetrieveTokenNames string
CreateTask string
DeleteTask string
ListAllTasks string
CountTasks string
CreateTask string
DeleteTask string
ListAllTasks string
CountTasks string
CountDuplTasks string
}
var declStat = Statements{
@@ -67,4 +68,8 @@ var declStat = Statements{
SELECT COUNT(*)
FROM tasks
WHERE name = ?;`,
CountDuplTasks: `
SELECT COUNT(*)
FROM tasks
WHERE command = ? AND nodeid = ?;`,
}

View File

@@ -156,6 +156,7 @@ func createTaskHandler(hmacKey []byte) http.HandlerFunc {
}
data.Details.Name = strings.ToLower(data.Details.Name) //Transform to lower
if err := createTask(data.Details.Name, data.Details.Command, data.Details.Nodeids); err != nil {
log.Println(utilities.ErrTag, "createTask failed:", err)
http.Error(w, "Task creation failed", http.StatusInternalServerError)

View File

@@ -6,33 +6,55 @@ import (
"ghostrunner-server/modules/utilities"
"ghostrunner-server/modules/wrapper"
"log"
"slices"
"strings"
"sync"
)
func routine(venvName string, pyListArgs []string) {
d := listDevices(venvName, pyListArgs) // Retrieve the Online devices.
curTasks := database.RetrieveTasks()
var wg sync.WaitGroup
var offDevices []string
for index, task := range curTasks {
relevantNodeid := task.Nodeid
log.Printf("Processing Task %d", index)
if slices.Contains(offDevices, relevantNodeid) {
continue
}
log.Println(utilities.InfoTag, fmt.Sprintf("Processing Task %d", index))
if isNodeOnline(relevantNodeid, d.OnlineDevices) {
log.Printf("Node online: %s", relevantNodeid)
result := forgeAndExec(venvName, relevantNodeid, task.Command) // Forge the Python command and execute it with the Python libraries.
log.Println(result)
log.Println()
wg.Add(1)
//generateResult()
go func(task utilities.InternalQTaskData) {
defer wg.Done()
log.Println("Removing Task from database...")
database.RemoveTask(task.Name, task.Nodeid)
log.Println(utilities.InfoTag, fmt.Sprintf("Node online. NodeID: %s", task.Nodeid))
rawResult := forgeAndExec(venvName, task.Nodeid, task.Command)
pResult := generateResult(rawResult)
for idx, item := range pResult {
log.Println(idx, item)
}
log.Println(utilities.InfoTag, "Removing Task from database...")
database.RemoveTask(task.Name, task.Nodeid)
}(task)
} else {
log.Printf("Node offline %s", relevantNodeid) // Just a debug line to tell the user that the node is offline.
log.Println(utilities.InfoTag, fmt.Sprintf("Node offline all further tasks with this NodeID will be defered. NodeID: %s", relevantNodeid)) // Just a debug line to tell the user that the node is offline.
offDevices = append(offDevices, relevantNodeid)
}
}
log.Println(utilities.InfoTag, "Waiting for tasks to finish.")
wg.Wait()
}
func listDevices(venvName string, pyArgs []string) utilities.PyOnlineDevices {
@@ -41,7 +63,6 @@ func listDevices(venvName string, pyArgs []string) utilities.PyOnlineDevices {
log.Println(utilities.ErrTag, err)
}
log.Println(onDevices)
return onDevices
}
@@ -54,11 +75,34 @@ func isNodeOnline(nodeid string, onlineDevices []utilities.Device) bool {
return false
}
func forgeAndExec(venvName string, nodeid, command string) string {
log.Printf("Triggered %s, on %s", command, nodeid)
func forgeAndExec(venvName string, nodeid string, cmdStr string) string {
// Example args: ["--node", nodeid, "--command", cmdStr]
pyArgs := []string{"--run", "--nodeid", nodeid, "--command", cmdStr}
pyArgs := strings.Fields(fmt.Sprintf("--run --nodeid %s --command", nodeid))
pyArgs = append(pyArgs, command)
//log.Printf("[DEBUG] forgeAndExec: Preparing to run ExecTask for node %s", nodeid)
output := wrapper.ExecTask(venvName, pyArgs)
//log.Printf("[DEBUG] forgeAndExec: Output received for node %s", nodeid)
return wrapper.ExecTask(venvName, pyArgs)
return output
}
func generateResult(rawOut string) []string {
// Trim the brackets if they exist
rawOut = strings.TrimSpace(rawOut)
rawOut = strings.TrimPrefix(rawOut, "[")
rawOut = strings.TrimSuffix(rawOut, "]")
// Split on comma
items := strings.Split(rawOut, ",")
var result []string
for _, item := range items {
item = strings.TrimSpace(item)
item = strings.Trim(item, `' "`) // Remove surrounding single quotes and extra spaces
if item != "" {
result = append(result, item)
}
}
return result
}

View File

@@ -17,9 +17,10 @@ func KeepTime(interval int, venvName string) {
ticker := time.NewTicker(transInterval)
defer ticker.Stop()
for t := range ticker.C {
log.Println(utilities.InfoTag, "Tick at:", t)
log.Println(utilities.InfoTag, "Starting Routine.")
for range ticker.C {
log.Println(utilities.InfoTag, "----------------------------------------")
log.Println(utilities.InfoTag, "Routine Started.")
routine(venvName, pyListArgs)
log.Println(utilities.InfoTag, "Routine Ended.")
}
}

View File

@@ -15,8 +15,8 @@ const (
func pyExec(venvName string, pyArgs []string) ([]byte, error) {
pyBin := fmt.Sprintf("./../runner/%s/bin/python", venvName)
runtimeArgs := append([]string{pyFile}, pyArgs...)
runtimeArgs := append([]string{pyFile}, pyArgs...)
cmd := exec.Command(pyBin, runtimeArgs...)
return cmd.CombinedOutput()