Merge pull request #4 from edgebox-iot/loop_loop_execution

Sysctl Loop execution
pull/5/head
Paulo Truta 2021-03-04 13:27:38 +01:00 committed by GitHub
commit 2e92ff53fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 799 additions and 13 deletions

6
.gitignore vendored
View File

@ -6,6 +6,9 @@
*.dylib
main
# Local .env file for overwriting info when doing local dev.
.env
# Test binary, built with `go test -c`
*.test
@ -18,3 +21,6 @@ main
# Build
/bin
# Go Sum files
*.sum

View File

@ -2,28 +2,40 @@ package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
//"syscall"
"github.com/edgebox-iot/sysctl/internal/diagnostics"
"fmt"
"github.com/edgebox-iot/sysctl/internal/tasks"
"github.com/edgebox-iot/sysctl/internal/utils"
)
const defaultNotReadySleepTime time.Duration = time.Second * 60
const defaultSleepTime time.Duration = time.Second
func main() {
// load command line arguments
version := flag.Bool("version", false, "Get the version info")
name := flag.String("name", "edgebox", "name for the service")
db := flag.Bool("database", false, "Get database connection info")
name := flag.String("name", "edgebox", "Name for the service")
flag.Parse()
if *version {
printVersion()
os.Exit(0)
}
}
if *db {
printDbDetails()
os.Exit(0)
}
log.Printf("Starting Sysctl service for %s", *name)
@ -31,7 +43,7 @@ func main() {
sigs := make(chan os.Signal, 1)
// catch all signals since not explicitly listing
signal.Notify(sigs)
signal.Notify(sigs, syscall.SIGQUIT)
// Cathing specific signals can be done with:
//signal.Notify(sigs,syscall.SIGQUIT)
@ -44,15 +56,25 @@ func main() {
os.Exit(1)
}()
printVersion()
printDbDetails()
tick := 0
// infinite loop
for {
log.Printf("Executing instruction %s", *name)
if isSystemReady() {
tick++ // Tick is an int, so eventually will "go out of ticks?" Maybe we want to reset the ticks every once in a while, to avoid working with big numbers...
systemIterator(name, tick)
} else {
// Wait about 60 seconds before trying again.
log.Printf("System not ready. Next try will be executed in 60 seconds")
time.Sleep(defaultNotReadySleepTime)
}
// wait random number of milliseconds
Nsecs := 1000
log.Printf("Next instruction executed in %dms", Nsecs)
time.Sleep(time.Millisecond * time.Duration(Nsecs))
}
}
@ -64,7 +86,44 @@ func appCleanup() {
func printVersion() {
fmt.Printf(
"version: %s\ncommit: %s\nbuild time: %s",
"\nversion: %s\ncommit: %s\nbuild time: %s\n",
diagnostics.Version, diagnostics.Commit, diagnostics.BuildDate,
)
}
func printDbDetails() {
fmt.Printf(
"\n\nDatabase Connection Information:\n %s\n\n",
utils.GetMySQLDbConnectionDetails(),
)
}
// IsSystemReady : Checks hability of the service to execute commands (Only after "edgebox --build" is ran at least once via SSH, or if built for distribution)
func isSystemReady() bool {
_, err := os.Stat("/home/system/components/ws/.ready")
return !os.IsNotExist(err)
}
// IsDatabaseReady : Checks is it can successfully connect to the task queue db
func isDatabaseReady() bool {
return false
}
func systemIterator(name *string, tick int) {
log.Printf("Tick is %d", tick)
tasks.ExecuteSchedules(tick)
nextTask := tasks.GetNextTask()
if nextTask.Task != "" {
log.Printf("Executing task %s / Args: %s", nextTask.Task, nextTask.Args)
tasks.ExecuteTask(nextTask)
} else {
log.Printf("No tasks to execute.")
}
// Wait about 1 second before resumming operations.
log.Printf("Next instruction will be executed 1 second")
time.Sleep(defaultSleepTime)
}

9
go.mod
View File

@ -1,4 +1,9 @@
module github.com/edgebox-iot/sysctl
go 1.15
go 1.15
require (
github.com/go-sql-driver/mysql v1.5.0
github.com/joho/godotenv v1.3.0
gopkg.in/yaml.v2 v2.4.0
)

View File

@ -0,0 +1,272 @@
package edgeapps
import (
"io/ioutil"
"log"
"os"
"strings"
"time"
"github.com/joho/godotenv"
"github.com/edgebox-iot/sysctl/internal/utils"
)
// EdgeApp : Struct representing an EdgeApp in the system
type EdgeApp struct {
ID string `json:"id"`
Name string `json:"name"`
Status EdgeAppStatus `json:"status"`
Services []EdgeAppService `json:"services"`
InternetAccessible bool `json:"internet_accessible"`
NetworkURL string `json:"network_url"`
InternetURL string `json:"internet_url"`
}
// MaybeEdgeApp : Boolean flag for validation of edgeapp existance
type MaybeEdgeApp struct {
EdgeApp EdgeApp `json:"edge_app"`
Valid bool `json:"valid"`
}
// EdgeAppStatus : Struct representing possible EdgeApp statuses (code + description)
type EdgeAppStatus struct {
ID int `json:"id"`
Description string `json:"description"`
}
// EdgeAppService : Struct representing a single container that can be part of an EdgeApp package
type EdgeAppService struct {
ID string `json:"id"`
IsRunning bool `json:"is_running"`
}
const configFilename = "/edgebox-compose.yml"
const envFilename = "/edgebox.env"
const myEdgeAppServiceEnvFilename = "/myedgeapp.env"
const defaultContainerOperationSleepTime time.Duration = time.Second * 10
// GetEdgeApp : Returns a EdgeApp struct with the current application information
func GetEdgeApp(ID string) MaybeEdgeApp {
result := MaybeEdgeApp{
EdgeApp: EdgeApp{},
Valid: false,
}
_, err := os.Stat(utils.GetPath("edgeAppsPath") + ID + configFilename)
if !os.IsNotExist(err) {
// File exists. Start digging!
edgeAppName := ID
edgeAppEnv, err := godotenv.Read(utils.GetPath("edgeAppsPath") + ID + envFilename)
if err != nil {
log.Println("Error loading .env file for edgeapp " + edgeAppName)
} else {
if edgeAppEnv["EDGEAPP_NAME"] != "" {
edgeAppName = edgeAppEnv["EDGEAPP_NAME"]
}
}
edgeAppInternetAccessible := false
edgeAppInternetURL := ""
myEdgeAppServiceEnv, err := godotenv.Read(utils.GetPath("edgeAppsPath") + ID + myEdgeAppServiceEnvFilename)
if err != nil {
log.Println("No myedge.app environment file found. Status is Network-Only")
} else {
if myEdgeAppServiceEnv["INTERNET_URL"] != "" {
edgeAppInternetAccessible = true
edgeAppInternetURL = myEdgeAppServiceEnv["INTERNET_URL"]
}
}
result = MaybeEdgeApp{
EdgeApp: EdgeApp{
ID: ID,
Name: edgeAppName,
Status: GetEdgeAppStatus(ID),
Services: GetEdgeAppServices(ID),
InternetAccessible: edgeAppInternetAccessible,
NetworkURL: ID + ".edgebox.local",
InternetURL: edgeAppInternetURL,
},
Valid: true,
}
}
return result
}
// GetEdgeApps : Returns a list of all available EdgeApps in structs filled with information
func GetEdgeApps() []EdgeApp {
var edgeApps []EdgeApp
// Building list of available edgeapps in the system with their status
files, err := ioutil.ReadDir(utils.GetPath("edgeAppsPath"))
if err != nil {
log.Fatal(err)
}
for _, f := range files {
if f.IsDir() {
// It is a folder that most probably contains an EdgeApp.
// To be fully sure, test that edgebox-compose.yml file exists in the target directory.
maybeEdgeApp := GetEdgeApp(f.Name())
if maybeEdgeApp.Valid {
edgeApp := maybeEdgeApp.EdgeApp
edgeApps = append(edgeApps, edgeApp)
}
}
}
// return edgeApps
return edgeApps
}
// GetEdgeAppStatus : Returns a struct representing the current status of the EdgeApp
func GetEdgeAppStatus(ID string) EdgeAppStatus {
// Possible states of an EdgeApp:
// - All services running = EdgeApp running
// - Some services running = Problem detected, needs restart
// - No service running = EdgeApp is off
runningServices := 0
status := EdgeAppStatus{0, "off"}
services := GetEdgeAppServices(ID)
for _, edgeAppService := range services {
if edgeAppService.IsRunning {
runningServices++
}
}
if runningServices > 0 && runningServices != len(services) {
status = EdgeAppStatus{2, "error"}
}
if runningServices == len(services) {
status = EdgeAppStatus{1, "on"}
}
return status
}
// GetEdgeAppServices : Returns a
func GetEdgeAppServices(ID string) []EdgeAppService {
cmdArgs := []string{"-r", ".services | keys[]", utils.GetPath("edgeAppsPath") + ID + configFilename}
servicesString := utils.Exec("yq", cmdArgs)
serviceSlices := strings.Split(servicesString, "\n")
serviceSlices = utils.DeleteEmptySlices(serviceSlices)
var edgeAppServices []EdgeAppService
for _, serviceID := range serviceSlices {
cmdArgs = []string{"-f", utils.GetPath("wsPath") + "/docker-compose.yml", "exec", "-T", serviceID, "echo", "'Service Check'"}
cmdResult := utils.Exec("docker-compose", cmdArgs)
isRunning := false
if cmdResult != "" {
isRunning = true
}
edgeAppServices = append(edgeAppServices, EdgeAppService{ID: serviceID, IsRunning: isRunning})
}
return edgeAppServices
}
// RunEdgeApp : Run an EdgeApp and return its most current status
func RunEdgeApp(ID string) EdgeAppStatus {
services := GetEdgeAppServices(ID)
cmdArgs := []string{}
for _, service := range services {
cmdArgs = []string{"-f", utils.GetPath("wsPath") + "/docker-compose.yml", "start", service.ID}
utils.Exec("docker-compose", cmdArgs)
}
// Wait for it to settle up before continuing...
time.Sleep(defaultContainerOperationSleepTime)
return GetEdgeAppStatus(ID)
}
// StopEdgeApp : Stops an EdgeApp and return its most current status
func StopEdgeApp(ID string) EdgeAppStatus {
services := GetEdgeAppServices(ID)
cmdArgs := []string{}
for _, service := range services {
cmdArgs = []string{"-f", utils.GetPath("wsPath") + "/docker-compose.yml", "stop", service.ID}
utils.Exec("docker-compose", cmdArgs)
}
// Wait for it to settle up before continuing...
time.Sleep(defaultContainerOperationSleepTime)
return GetEdgeAppStatus(ID)
}
// EnableOnline : Write environment file and rebuild the necessary containers. Rebuilds containers in project (in case of change only)
func EnableOnline(ID string, InternetURL string) MaybeEdgeApp {
maybeEdgeApp := GetEdgeApp(ID)
if maybeEdgeApp.Valid { // We're only going to do this operation if the EdgeApp actually exists.
// Create the myedgeapp.env file and add the InternetURL entry to it
envFilePath := utils.GetPath("edgeAppsPath") + ID + myEdgeAppServiceEnvFilename
env, _ := godotenv.Unmarshal("INTERNET_URL=" + InternetURL)
_ = godotenv.Write(env, envFilePath)
}
buildFrameworkContainers()
return GetEdgeApp(ID) // Return refreshed information
}
// DisableOnline : Removes env files necessary for system external access config. Rebuilds containers in project (in case of change only).
func DisableOnline(ID string) MaybeEdgeApp {
envFilePath := utils.GetPath("edgeAppsPath") + ID + myEdgeAppServiceEnvFilename
_, err := godotenv.Read(envFilePath)
if err != nil {
log.Println("myedge.app environment file for " + ID + " not found. No need to delete.")
} else {
cmdArgs := []string{envFilePath}
utils.Exec("rm", cmdArgs)
}
buildFrameworkContainers()
return GetEdgeApp(ID)
}
func buildFrameworkContainers() {
cmdArgs := []string{utils.GetPath("wsPath") + "ws", "--build"}
utils.ExecAndStream("sh", cmdArgs)
time.Sleep(defaultContainerOperationSleepTime)
}

View File

@ -0,0 +1,313 @@
package tasks
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"strconv"
"github.com/edgebox-iot/sysctl/internal/diagnostics"
"github.com/edgebox-iot/sysctl/internal/edgeapps"
"github.com/edgebox-iot/sysctl/internal/utils"
_ "github.com/go-sql-driver/mysql" // Mysql Driver
)
// Task : Struct for Task type
type Task struct {
ID int `json:"id"`
Task string `json:"task"`
Args string `json:"args"`
Status string `json:"status"`
Result sql.NullString `json:"result"` // Database fields that can be null must use the sql.NullString type
Created string `json:"created"`
Updated string `json:"updated"`
}
type taskSetupTunnelArgs struct {
BootnodeAddress string `json:"bootnode_address"`
BootnodeToken string `json:"bootnode_token"`
AssignedAddress string `json:"assigned_address"`
NodeName string `json:"node_name"`
}
type taskStartEdgeAppArgs struct {
ID string `json:"id"`
}
type taskStopEdgeAppArgs struct {
ID string `json:"id"`
}
type taskEnableOnlineArgs struct {
ID string `json:"id"`
InternetURL string `json:"internet_url"`
}
type taskDisableOnlineArgs struct {
ID string `json:"id"`
}
// GetNextTask : Performs a MySQL query over the device's Edgebox API
func GetNextTask() Task {
// Will try to connect to API database, which should be running locally under WS.
db, err := sql.Open("mysql", utils.GetMySQLDbConnectionDetails())
// if there is an error opening the connection, handle it
if err != nil {
panic(err.Error())
}
// defer the close till after the main function has finished executing
defer db.Close()
// perform a db.Query insert
results, err := db.Query("SELECT * FROM tasks WHERE status = 0 ORDER BY created ASC LIMIT 1;")
// if there is an error inserting, handle it
if err != nil {
panic(err.Error())
}
var task Task
for results.Next() {
// for each row, scan the result into our task composite object
err = results.Scan(&task.ID, &task.Task, &task.Args, &task.Status, &task.Result, &task.Created, &task.Updated)
if err != nil {
panic(err.Error()) // proper error handling instead of panic in your app
}
}
// be careful deferring Queries if you are using transactions
defer results.Close()
return task
}
// ExecuteTask : Performs execution of the given task, updating the task status as it goes, and publishing the task result
func ExecuteTask(task Task) Task {
db, err := sql.Open("mysql", utils.GetMySQLDbConnectionDetails())
if err != nil {
panic(err.Error())
}
defer db.Close()
_, err = db.Query("UPDATE tasks SET status = 1 WHERE ID = " + strconv.Itoa(task.ID))
if err != nil {
panic(err.Error())
}
if diagnostics.Version == "dev" {
log.Printf("Dev environemnt. Not executing tasks.")
} else {
log.Println("Task: " + task.Task)
switch task.Task {
case "setup_tunnel":
log.Println("Setting up bootnode connection...")
var args taskSetupTunnelArgs
err := json.Unmarshal([]byte(task.Args), &args)
if err != nil {
log.Printf("Error reading arguments of setup_bootnode task: %s", err)
} else {
taskResult := taskSetupTunnel(args)
task.Result = sql.NullString{String: taskResult, Valid: true}
}
case "start_edgeapp":
log.Println("Starting EdgeApp...")
var args taskStartEdgeAppArgs
err := json.Unmarshal([]byte(task.Args), &args)
if err != nil {
log.Printf("Error reading arguments of start_edgeapp task: %s", err)
} else {
taskResult := taskStartEdgeApp(args)
task.Result = sql.NullString{String: taskResult, Valid: true}
}
case "stop_edgeapp":
log.Println("Stopping EdgeApp...")
var args taskStopEdgeAppArgs
err := json.Unmarshal([]byte(task.Args), &args)
if err != nil {
log.Printf("Error reading arguments of stop_edgeapp task: %s", err)
} else {
taskResult := taskStopEdgeApp(args)
task.Result = sql.NullString{String: taskResult, Valid: true}
}
case "enable_online":
log.Println("Enabling online access to EdgeApp...")
var args taskEnableOnlineArgs
err := json.Unmarshal([]byte(task.Args), &args)
if err != nil {
log.Printf("Error reading arguments of enable_online task: %s", err)
} else {
taskResult := taskEnableOnline(args)
task.Result = sql.NullString{String: taskResult, Valid: true}
}
case "disable_online":
log.Println("Disabling online access to EdgeApp...")
var args taskDisableOnlineArgs
err := json.Unmarshal([]byte(task.Args), &args)
if err != nil {
log.Printf("Error reading arguments of enable_online task: %s", err)
} else {
taskResult := taskDisableOnline(args)
task.Result = sql.NullString{String: taskResult, Valid: true}
}
}
}
if task.Result.Valid {
db.Query("Update tasks SET status = 2, result = '" + task.Result.String + "' WHERE ID = " + strconv.Itoa(task.ID) + ";")
} else {
db.Query("Update tasks SET status = 3, result = 'Error' WHERE ID = " + strconv.Itoa(task.ID) + ";")
}
if err != nil {
panic(err.Error())
}
returnTask := task
return returnTask
}
// ExecuteSchedules - Run Specific tasks without input each multiple x of ticks.
func ExecuteSchedules(tick int) {
if tick == 1 {
// Executing on startup (first tick). Schedules run before tasks in the SystemIterator
log.Println(taskGetEdgeApps())
}
if tick%30 == 0 {
// Executing every 30 ticks
log.Println(taskGetEdgeApps())
}
if tick%60 == 0 {
// Every 60 ticks...
}
// Just add a schedule here if you need a custom one (every "tick hour", every "tick day", etc...)
}
func taskSetupTunnel(args taskSetupTunnelArgs) string {
fmt.Println("Executing taskSetupTunnel")
cmdargs := []string{"gen", "--name", args.NodeName, "--token", args.BootnodeToken, args.BootnodeAddress + ":8655", "--prefix", args.AssignedAddress}
utils.Exec("tinc-boot", cmdargs)
cmdargs = []string{"start", "tinc@dnet"}
utils.Exec("systemctl", cmdargs)
cmdargs = []string{"enable", "tinc@dnet"}
utils.Exec("systemctl", cmdargs)
output := "OK" // Better check / logging of command execution result.
return output
}
func taskStartEdgeApp(args taskStartEdgeAppArgs) string {
fmt.Println("Executing taskStartEdgeApp for " + args.ID)
result := edgeapps.RunEdgeApp(args.ID)
resultJSON, _ := json.Marshal(result)
taskGetEdgeApps() // This task will imediatelly update the entry in the api database.
return string(resultJSON)
}
func taskStopEdgeApp(args taskStopEdgeAppArgs) string {
fmt.Println("Executing taskStopEdgeApp for " + args.ID)
result := edgeapps.StopEdgeApp(args.ID)
resultJSON, _ := json.Marshal(result)
taskGetEdgeApps() // This task will imediatelly update the entry in the api database.
return string(resultJSON)
}
func taskEnableOnline(args taskEnableOnlineArgs) string {
fmt.Println("Executing taskEnableOnline for " + args.ID)
result := edgeapps.EnableOnline(args.ID, args.InternetURL)
resultJSON, _ := json.Marshal(result)
taskGetEdgeApps()
return string(resultJSON)
}
func taskDisableOnline(args taskDisableOnlineArgs) string {
fmt.Println("Executing taskDisableOnline for " + args.ID)
result := edgeapps.DisableOnline(args.ID)
resultJSON, _ := json.Marshal(result)
taskGetEdgeApps()
return string(resultJSON)
}
func taskGetEdgeApps() string {
fmt.Println("Executing taskGetEdgeApps")
edgeApps := edgeapps.GetEdgeApps()
edgeAppsJSON, _ := json.Marshal(edgeApps)
db, err := sql.Open("mysql", utils.GetMySQLDbConnectionDetails())
if err != nil {
panic(err.Error())
}
defer db.Close()
_, err = db.Query("REPLACE into options (name, value) VALUES ('EDGEAPPS_LIST','" + string(edgeAppsJSON) + "');")
if err != nil {
panic(err.Error())
}
return string(edgeAppsJSON)
}

View File

@ -0,0 +1,131 @@
package utils
import (
"bytes"
"fmt"
"io"
"log"
"os"
"os/exec"
"github.com/joho/godotenv"
)
// ExecAndStream : Runs a terminal command, but streams progress instead of outputting. Ideal for long lived process that need to be logged.
func ExecAndStream(command string, args []string) {
cmd := exec.Command(command, args...)
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)
cmd.Dir = GetPath("wsPath")
err := cmd.Run()
if err != nil {
log.Fatalf("cmd.Run() failed with %s\n", err)
}
outStr, errStr := string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())
fmt.Printf("\nout:\n%s\nerr:\n%s\n", outStr, errStr)
}
// Exec : Runs a terminal Command, catches and logs errors, returns the result.
func Exec(command string, args []string) string {
cmd := exec.Command(command, args...)
var out bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &stderr
cmd.Dir = GetPath("wsPath")
err := cmd.Run()
if err != nil {
// TODO: Deal with possibility of error in command, allow explicit error handling and return proper formatted stderr
// log.Println(fmt.Sprint(err) + ": " + stderr.String()) // ... Silence...
}
// log.Println("Result: " + out.String()) // ... Silence ...
return out.String()
}
// DeleteEmptySlices : Given a string array, delete empty entries.
func DeleteEmptySlices(s []string) []string {
var r []string
for _, str := range s {
if str != "" {
r = append(r, str)
}
}
return r
}
// GetMySQLDbConnectionDetails : Returns the necessary string as connection info for SQL.db()
func GetMySQLDbConnectionDetails() string {
var apiEnv map[string]string
apiEnv, err := godotenv.Read(GetPath("apiEnvFileLocation"))
if err != nil {
log.Fatal("Error loading .env file")
}
Dbhost := "127.0.0.1:" + apiEnv["HOST_MACHINE_MYSQL_PORT"]
Dbname := apiEnv["MYSQL_DATABASE"]
Dbuser := apiEnv["MYSQL_USER"]
Dbpass := apiEnv["MYSQL_PASSWORD"]
return Dbuser + ":" + Dbpass + "@tcp(" + Dbhost + ")/" + Dbname
}
// GetPath : Returns either the hardcoded path, or a overwritten value via .env file at project root. Register paths here for seamless working code between dev and prod environments ;)
func GetPath(pathKey string) string {
// Read whole of .env file to map.
var env map[string]string
env, err := godotenv.Read()
targetPath := ""
if err != nil {
// log.Println("Project .env file not found withing project root. Using only hardcoded path variables.")
// Do Nothing...
}
switch pathKey {
case "apiEnvFileLocation":
if env["API_ENV_FILE_LOCATION"] != "" {
targetPath = env["API_ENV_FILE_LOCATION"]
} else {
targetPath = "/home/system/components/api/edgebox.env"
}
case "edgeAppsPath":
if env["EDGEAPPS_PATH"] != "" {
targetPath = env["EDGEAPPS_PATH"]
} else {
targetPath = "/home/system/components/apps/"
}
case "wsPath":
if env["WS_PATH"] != "" {
targetPath = env["WS_PATH"]
} else {
targetPath = "/home/system/components/ws/"
}
default:
log.Printf("path_key %s nonexistant in GetPath().\n", pathKey)
}
return targetPath
}