From bb7ffbddfcd86af013d205aabf4adb24015954db Mon Sep 17 00:00:00 2001 From: Norwin Roosen Date: Thu, 31 Dec 2020 07:15:16 +0100 Subject: [PATCH] refactor main cleaner code, more flexible flag usage, graceful exit for hevring, improved image fetching --- main.go | 198 ++++++++++++++++++++++++++++------------------- pixelflut/api.go | 25 +++--- rpc/dottir.go | 29 +++++-- rpc/ran.go | 7 +- 4 files changed, 160 insertions(+), 99 deletions(-) diff --git a/main.go b/main.go index 2420ccf..5d3115a 100644 --- a/main.go +++ b/main.go @@ -2,105 +2,141 @@ package main import ( "flag" + "fmt" "image" "log" - "net" "os" "os/signal" "runtime/pprof" "sync" + "time" "github.com/SpeckiJ/Hochwasser/pixelflut" "github.com/SpeckiJ/Hochwasser/render" "github.com/SpeckiJ/Hochwasser/rpc" ) -var err error -var cpuprofile = flag.String("cpuprofile", "", "Destination file for CPU Profile") -var imgPath = flag.String("image", "", "Absolute Path to image") -var x = flag.Int("x", 0, "Offset of posted image from left border") -var y = flag.Int("y", 0, "Offset of posted image from top border") -var connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage") -var address = flag.String("host", "127.0.0.1:1337", "Server address") -var shuffle = flag.Bool("shuffle", false, "pixel send ordering") -var fetchImgPath = flag.String("fetch-image", "", "path to save the fetched pixel state to") -var ránAddr = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port") -var hevringAddr = flag.String("hevring", "", "connect to rán rpc server at given address") +var ( + imgPath = flag.String("image", "", "Filepath of an image to flut") + ránAddr = flag.String("rán", "", "Start RPC server to distribute jobs, listening on the given address/port") + hevringAddr = flag.String("hevring", "", "Connect to PRC server at given address/port") + address = flag.String("host", ":1234", "Target server address") + connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage") + x = flag.Int("x", 0, "Offset of posted image from left border") + y = flag.Int("y", 0, "Offset of posted image from top border") + fetchImgPath = flag.String("fetch", "", "Enable fetching the screen area to the given local file, updating it each second") + cpuprofile = flag.String("cpuprofile", "", "Destination file for CPU Profile") +) func main() { flag.Parse() - - // Start cpu profiling if wanted + task := runWithExitHandler(taskFromFlags) if *cpuprofile != "" { - f, err := os.Create(*cpuprofile) - if err != nil { - log.Fatal(err) - } - defer f.Close() - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() - } - - // :cleanExit setup - // stop chan is closed at end of main process, telling async tasks to stop. - // wg waits until async tasks gracefully stopped - wg := sync.WaitGroup{} - stopChan := make(chan bool) - interruptChan := make(chan os.Signal) - signal.Notify(interruptChan, os.Interrupt) - - if *imgPath != "" { - offset := image.Pt(*x, *y) - imgTmp, err := render.ReadImage(*imgPath) - if err != nil { - log.Println(err) - return - } - img := render.ImgToNRGBA(imgTmp) - - // check connectivity by opening one test connection - conn, err := net.Dial("tcp", *address) - if err != nil { - log.Fatal(err) - } - conn.Close() - - if *ránAddr != "" { - // run RPC server, tasking clients to flut - wg.Add(1) - r := rpc.SummonRán(*ránAddr, stopChan, &wg) - // TODO: startup without a task, but init params - r.SetTask(img, offset, *address, *connections) - - } else { - // fetch server state and save to file - // @incomplete: make this available also when not fluting? - if *fetchImgPath != "" { - fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1, stopChan) - *connections -= 1 - defer render.WriteImage(*fetchImgPath, fetchedImg) - } - - // local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 - wg.Add(1) - go pixelflut.Flut(img, offset, *shuffle, false, false, *address, *connections, stopChan, &wg) - } - - } else if *hevringAddr != "" { - // connect to RPC server and execute their tasks - rpc.ConnectHevring(*hevringAddr) - + runWithProfiler(*cpuprofile, task) } else { - log.Fatal("must specify -image or -hevring") + task() + } +} + +func taskFromFlags(stop chan bool, wg *sync.WaitGroup) { + rán := *ránAddr + hev := *hevringAddr + + startServer := rán != "" || (hev == "" && *imgPath != "") + startClient := hev != "" || (rán == "" && *imgPath != "") + fetchImg := *fetchImgPath != "" + + if !(startServer || startClient || fetchImg) { + fmt.Println("Error: At least one of the following flags is needed:\n -image -rán -hevring\n") + flag.Usage() + os.Exit(1) } - // :cleanExit logic: - // notify all async tasks to stop on interrupt - // then wait for clean shutdown of all tasks before exiting - // TODO: make this available to all invocation types - select { - case <-interruptChan: + if startServer { + if rán == "" { + rán = ":5555" + } + r := rpc.SummonRán(rán, stop, wg) + + var img *image.NRGBA + if *imgPath != "" { + imgTmp, err := render.ReadImage(*imgPath) + if err != nil { + log.Fatal(err) + } + img = render.ImgToNRGBA(imgTmp) + } + + r.SetTask(img, image.Pt(*x, *y), *address, *connections) + } + + if startClient { + if hev == "" { + hev = ":5555" + } + rpc.ConnectHevring(hev, stop, wg) + } + + if fetchImg { + canvasToFile(*fetchImgPath, *address, time.Second, stop, wg) } - close(stopChan) - wg.Wait() +} + +func canvasToFile(filepath, server string, interval time.Duration, stop chan bool, wg *sync.WaitGroup) { + // async fetch the image + fetchedImg := pixelflut.FetchImage(nil, server, 1, stop) + + // write it in a fixed interval + go func() { + wg.Add(1) + defer wg.Done() + + for loop := true; loop; { + select { + case <-stop: + loop = false + case <-time.Tick(interval): + } + render.WriteImage(filepath, fetchedImg) + } + }() +} + +// Takes a non-blocking function, and provides it an interface for graceful shutdown: +// stop chan is closed if the routine should be stopped. before quitting, wg is awaited. +func runWithExitHandler(task func(stop chan bool, wg *sync.WaitGroup)) func() { + return func() { + wg := sync.WaitGroup{} + stopChan := make(chan bool) + interruptChan := make(chan os.Signal) + signal.Notify(interruptChan, os.Interrupt) + + task(stopChan, &wg) + + // block until we get an interrupt, or somebody says we need to quit (by closing stopChan) + select { + case <-interruptChan: + case <-stopChan: + stopChan = nil + } + + if stopChan != nil { + // notify all async tasks to stop on interrupt, if channel wasn't closed already + close(stopChan) + } + + // then wait for clean shutdown of all tasks before exiting + wg.Wait() + } +} + +func runWithProfiler(outfile string, task func()) { + f, err := os.Create(outfile) + if err != nil { + log.Fatal(err) + } + defer f.Close() + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + task() } diff --git a/pixelflut/api.go b/pixelflut/api.go index 954087e..77c7282 100644 --- a/pixelflut/api.go +++ b/pixelflut/api.go @@ -82,10 +82,16 @@ func CanvasSize(address string) (int, int) { } // FetchImage asynchronously uses `conns` to fetch pixels within `bounds` from -// a pixelflut server at `address`, and writes them into the returned Image. -func FetchImage(bounds image.Rectangle, address string, conns int, stop chan bool) (img *image.NRGBA) { - img = image.NewNRGBA(bounds) - cmds := cmdsFetchImage(bounds).Chunk(conns) +// a pixelflut server at `address`, and writes them into the returned Image. +// If bounds is nil, the server's entire canvas is fetched. +func FetchImage(bounds *image.Rectangle, address string, conns int, stop chan bool) (img *image.NRGBA) { + if bounds == nil { + x, y := CanvasSize(address) + bounds = &image.Rectangle{Max: image.Pt(x, y)} + } + + img = image.NewNRGBA(*bounds) + cmds := cmdsFetchImage(*bounds).Chunk(conns) for i := 0; i < conns; i++ { conn, err := net.Dial("tcp", address) @@ -102,7 +108,7 @@ func FetchImage(bounds image.Rectangle, address string, conns int, stop chan boo func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) { reader := bufio.NewReader(conn) - col := make([]byte, 3) + col := make([]byte, 4) for { select { case <-stop: @@ -114,12 +120,13 @@ func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) { log.Fatal(err) } - // parse response ("PX \n") - colorStart := len(res) - 7 - x, y := parseXY(res[3 : colorStart-1]) + // parse response ("PX \n") + // NOTE: shoreline sends alpha, pixelnuke does not! + colorStart := len(res) - 9 + x, y := parseXY(res[3:colorStart]) hex.Decode(col, res[colorStart:len(res)-1]) - target.SetNRGBA(x, y, color.NRGBA{col[0], col[1], col[2], 255}) + target.SetNRGBA(x, y, color.NRGBA{col[0], col[1], col[2], col[3]}) } } } diff --git a/rpc/dottir.go b/rpc/dottir.go index b69cc9f..2485f49 100644 --- a/rpc/dottir.go +++ b/rpc/dottir.go @@ -6,14 +6,15 @@ import ( "log" "net" "net/rpc" - "os" + "sync" "time" "github.com/SpeckiJ/Hochwasser/pixelflut" ) -func ConnectHevring(ránAddress string) { - rpc.Register(new(Hevring)) +func ConnectHevring(ránAddress string, stop chan bool, wg *sync.WaitGroup) { + h := new(Hevring) + rpc.Register(h) fmt.Printf("[hevring] greeting Rán at %s\n", ránAddress) conn, err := net.Dial("tcp", ránAddress) @@ -22,11 +23,27 @@ func ConnectHevring(ránAddress string) { } go rpc.ServeConn(conn) fmt.Printf("[hevring] awaiting task from Rán\n") + + h.quit = stop + h.wg = wg + h.wg.Add(1) + go func() { + select { + case <-h.quit: + } + if h.taskQuit != nil { + close(h.taskQuit) + h.taskQuit = nil + } + h.wg.Done() + }() } type Hevring struct { task FlutTask taskQuit chan bool + quit chan bool + wg *sync.WaitGroup } type FlutTask struct { @@ -61,7 +78,7 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error { close(h.taskQuit) } - fmt.Printf("[hevring] Rán gave us /w o r k/!\n%v\n", task) + fmt.Printf("[hevring] Rán gave us work!\n%v\n", task) h.task = task h.taskQuit = make(chan bool) @@ -93,9 +110,9 @@ func (h *Hevring) Die(x int, reply *FlutAck) error { // @robustness: waiting for reply to be sent via timeout // @incomplete: should try to reconnect for a bit first go func() { - time.Sleep(100 * time.Millisecond) fmt.Println("[hevring] Rán disconnected, stopping") - os.Exit(0) + time.Sleep(100 * time.Millisecond) + close(h.quit) }() reply.Ok = true return nil diff --git a/rpc/ran.go b/rpc/ran.go index 8655051..602f364 100644 --- a/rpc/ran.go +++ b/rpc/ran.go @@ -93,7 +93,7 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán { }() go RunREPL(r) - go r.killClients(stopChan, wg) + go r.handleExit(stopChan, wg) return r } @@ -129,13 +129,14 @@ func (r *Rán) stopTask() { } } -func (r *Rán) killClients(stopChan <-chan bool, wg *sync.WaitGroup) { +func (r *Rán) handleExit(stopChan <-chan bool, wg *sync.WaitGroup) { + wg.Add(1) + defer wg.Done() <-stopChan for _, c := range r.clients { ack := FlutAck{} c.Call("Hevring.Die", 0, &ack) // @speed: async } - wg.Done() } // SetTask assigns a FlutTask to Rán, distributing it to all clients