From 3f06bf6b825d437a8964edf3108acd1e9d46de01 Mon Sep 17 00:00:00 2001 From: Norwin Roosen Date: Thu, 13 Feb 2020 23:57:56 +0100 Subject: [PATCH] refactor clean exit code so much work just to let cleanup run & to pause fluting :( --- main.go | 54 +++++++++++++++++++++--------------------------- pixelflut/api.go | 16 +++++++------- pixelflut/net.go | 3 +-- rpc/dottir.go | 7 +++---- rpc/ran.go | 26 +++++++++++------------ 5 files changed, 47 insertions(+), 59 deletions(-) diff --git a/main.go b/main.go index d9cb5aa..1cd9dc7 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,6 @@ import ( "os/signal" "runtime/pprof" "sync" - "time" "github.com/SpeckiJ/Hochwasser/pixelflut" "github.com/SpeckiJ/Hochwasser/rpc" @@ -22,7 +21,6 @@ var x = flag.Int("x", 0, "Offset of posted image from left border") var y = flag.Int("y", 0, "Offset of posted image from top border") var connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage") var address = flag.String("host", "127.0.0.1:1337", "Server address") -var runtime = flag.String("runtime", "60s", "exit after timeout") var shuffle = flag.Bool("shuffle", false, "pixel send ordering") var fetchImgPath = flag.String("fetch-image", "", "path to save the fetched pixel state to") var ránAddr = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port") @@ -42,6 +40,14 @@ func main() { defer pprof.StopCPUProfile() } + // :cleanExit setup + // stop chan is closed at end of main process, telling async tasks to stop. + // wg waits until async tasks gracefully stopped + wg := sync.WaitGroup{} + stopChan := make(chan bool) + interruptChan := make(chan os.Signal) + signal.Notify(interruptChan, os.Interrupt) + if *imgPath != "" { offset := image.Pt(*x, *y) img := readImage(*imgPath) @@ -55,21 +61,11 @@ func main() { if *ránAddr != "" { // run RPC server, tasking clients to flut - r := rpc.SummonRán(*ránAddr) + wg.Add(1) + r := rpc.SummonRán(*ránAddr, stopChan, &wg) r.SetTask(img, offset, *address, *connections) // @incomplete - select {} // block forever } else { - - // local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 - var wg sync.WaitGroup - defer wg.Wait() - stopChan := make(chan bool) - defer close(stopChan) - - wg.Add(1) // :cleanExit: is this WG needed? we only have one task running at a time? - go pixelflut.Flut(img, offset, *shuffle, *address, *connections, stopChan, &wg) - // fetch server state and save to file // @incomplete: make this available also when not fluting? if *fetchImgPath != "" { @@ -78,30 +74,26 @@ func main() { defer writeImage(*fetchImgPath, fetchedImg) } - // :cleanExit logic: - // notify all async tasks to stop on interrupt or after timeout, - // then wait for clean shutdown of all tasks before exiting - // TODO: make this available to all invocation types - - timer, err := time.ParseDuration(*runtime) - if err != nil { - log.Fatal("Invalid runtime specified: " + err.Error()) - } - - interruptChan := make(chan os.Signal) - signal.Notify(interruptChan, os.Interrupt) - select { - case <-time.After(timer): - case <-interruptChan: - } + // local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 + wg.Add(1) + go pixelflut.Flut(img, offset, *shuffle, *address, *connections, stopChan, &wg) } } else if *hevringAddr != "" { // connect to RPC server and execute their tasks rpc.ConnectHevring(*hevringAddr) - select {} // block forever } else { log.Fatal("must specify -image or -hevring") } + + // :cleanExit logic: + // notify all async tasks to stop on interrupt + // then wait for clean shutdown of all tasks before exiting + // TODO: make this available to all invocation types + select { + case <-interruptChan: + } + close(stopChan) + wg.Wait() } diff --git a/pixelflut/api.go b/pixelflut/api.go index 01b6f9a..99bf9b2 100644 --- a/pixelflut/api.go +++ b/pixelflut/api.go @@ -11,9 +11,9 @@ import ( ) // Flut asynchronously sends the given image to pixelflut server at `address` -// using `conns` connections. Pixels are sent row wise, unless `shuffle` is set. +// using `conns` connections. Pixels are sent column wise, unless `shuffle` +// is set. Stops when stop is closed. // @cleanup: use FlutTask{} as arg -// @incomplete :cleanExit func Flut(img image.Image, position image.Point, shuffle bool, address string, conns int, stop chan bool, wg *sync.WaitGroup) { cmds := commandsFromImage(img, position) if shuffle { @@ -27,14 +27,16 @@ func Flut(img image.Image, position image.Point, shuffle bool, address string, c go bombAddress(msg, address, stop, &bombWg) } bombWg.Wait() - wg.Done() + if wg != nil { + wg.Done() + } } // FetchImage asynchronously uses `conns` to fetch pixels within `bounds` from // a pixelflut server at `address`, and writes them into the returned Image. func FetchImage(bounds image.Rectangle, address string, conns int, stop chan bool) (img *image.NRGBA) { img = image.NewNRGBA(bounds) - // cmds := cmdsFetchImage(bounds).Chunk(conns) + cmds := cmdsFetchImage(bounds).Chunk(conns) for i := 0; i < conns; i++ { conn, err := net.Dial("tcp", address) @@ -42,18 +44,14 @@ func FetchImage(bounds image.Rectangle, address string, conns int, stop chan boo log.Fatal(err) } - // @cleanup: parsePixels calls conn.Close(), as deferring it here would - // instantly close it go readPixels(img, conn, stop) - // go bombConn(cmds[i], conn, stop) + go bombConn(cmds[i], conn, stop) } return img } func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) { - defer conn.Close() - reader := bufio.NewReader(conn) col := make([]byte, 3) for { diff --git a/pixelflut/net.go b/pixelflut/net.go index 8eb85c8..d222fbb 100644 --- a/pixelflut/net.go +++ b/pixelflut/net.go @@ -10,7 +10,7 @@ import ( // called as goroutines // bombAddress writes the given message via plain TCP to the given address, -// forever, as fast as possible. +// as fast as possible, until stop is closed. func bombAddress(message []byte, address string, stop chan bool, wg *sync.WaitGroup) { conn, err := net.Dial("tcp", address) if err != nil { @@ -25,7 +25,6 @@ func bombConn(message []byte, conn net.Conn, stop chan bool) { for { select { case <-stop: - log.Println("stopChan bombConn") return default: _, err := conn.Write(message) diff --git a/rpc/dottir.go b/rpc/dottir.go index b265f3b..d904cf9 100644 --- a/rpc/dottir.go +++ b/rpc/dottir.go @@ -6,7 +6,7 @@ import ( "log" "net" "net/rpc" - // "sync" + "os" "time" "github.com/SpeckiJ/Hochwasser/pixelflut" @@ -50,7 +50,6 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error { fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", task) h.task = task h.taskQuit = make(chan bool) - // @incomplete: async errorhandling go pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.Address, task.MaxConns, h.taskQuit, nil) reply.Ok = true @@ -64,7 +63,6 @@ func (h *Hevring) Status(x int, reply *FlutAck) error { } func (h *Hevring) Stop(x int, reply *FlutAck) error { - // @incomplete if (h.task != FlutTask{}) { fmt.Println("[hevring] stopping task") h.task = FlutTask{} @@ -78,7 +76,8 @@ func (h *Hevring) Stop(x int, reply *FlutAck) error { func (h *Hevring) Die(x int, reply *FlutAck) error { go func() { // @cleanup: hacky time.Sleep(100 * time.Millisecond) - log.Fatal("[hevring] Rán disconnected, stopping") + fmt.Println("[hevring] Rán disconnected, stopping") + os.Exit(0) }() reply.Ok = true return nil diff --git a/rpc/ran.go b/rpc/ran.go index 5b590fc..a521d56 100644 --- a/rpc/ran.go +++ b/rpc/ran.go @@ -9,8 +9,8 @@ import ( "net" "net/rpc" "os" - "os/signal" "strings" + "sync" "time" ) @@ -19,7 +19,10 @@ type Rán struct { task FlutTask } -func SummonRán(address string) *Rán { +// SummonRán sets up the RPC master, accepting connections at addres (":1234") +// Connects calls methods on each client's rpc provider, killing all clients +// when stopChan is closed. +func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán { r := new(Rán) l, err := net.Listen("tcp", address) @@ -58,8 +61,8 @@ func SummonRán(address string) *Rán { var clients []*rpc.Client for _, c := range r.clients { status := FlutAck{} - err3 := c.Call("Hevring.Status", 0, &status) - if err3 == nil || status.Ok { + err := c.Call("Hevring.Status", 0, &status) + if err == nil && status.Ok { clients = append(clients, c) } } @@ -100,21 +103,18 @@ func SummonRán(address string) *Rán { // kill clients on exit go func() { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt) - for { - <-sigChan - for _, c := range r.clients { - ack := FlutAck{} - c.Call("Hevring.Die", 0, &ack) // @speed: async - } - os.Exit(0) // @bug :cleanExit + <-stopChan + for _, c := range r.clients { + ack := FlutAck{} + c.Call("Hevring.Die", 0, &ack) // @speed: async } + wg.Done() }() return r } +// SetTask assigns a FlutTask to Rán, distributing it to all clients func (r *Rán) SetTask(img image.Image, offset image.Point, address string, maxConns int) { // @incomplete: smart task creation: // fetch server state & sample foreign activity in image regions. assign