From 9ab04b4f2691b1627e3892cfb7c3f43b1a94a3d9 Mon Sep 17 00:00:00 2001 From: Norwin Roosen Date: Thu, 13 Feb 2020 22:26:50 +0100 Subject: [PATCH] wip: clean exit start adding a mechanism for stopping async tasks, so we can cleanly quit and stop fluting without quitting --- main.go | 32 ++++++++++++++++-------- pixelflut/api.go | 63 +++++++++++++++++++++++++++++------------------- pixelflut/net.go | 21 ++++++++++------ rpc/dottir.go | 10 ++++++-- 4 files changed, 82 insertions(+), 44 deletions(-) diff --git a/main.go b/main.go index eaed2d8..d9cb5aa 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,9 @@ import ( "log" "net" "os" + "os/signal" "runtime/pprof" + "sync" "time" "github.com/SpeckiJ/Hochwasser/pixelflut" @@ -60,22 +62,38 @@ func main() { } else { // local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 - pixelflut.Flut(img, offset, *shuffle, *address, *connections) + var wg sync.WaitGroup + defer wg.Wait() + stopChan := make(chan bool) + defer close(stopChan) + + wg.Add(1) // :cleanExit: is this WG needed? we only have one task running at a time? + go pixelflut.Flut(img, offset, *shuffle, *address, *connections, stopChan, &wg) // fetch server state and save to file // @incomplete: make this available also when not fluting? if *fetchImgPath != "" { - fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1) + fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1, stopChan) *connections -= 1 defer writeImage(*fetchImgPath, fetchedImg) } - // Terminate after timeout to save resources + // :cleanExit logic: + // notify all async tasks to stop on interrupt or after timeout, + // then wait for clean shutdown of all tasks before exiting + // TODO: make this available to all invocation types + timer, err := time.ParseDuration(*runtime) if err != nil { log.Fatal("Invalid runtime specified: " + err.Error()) } - time.Sleep(timer) + + interruptChan := make(chan os.Signal) + signal.Notify(interruptChan, os.Interrupt) + select { + case <-time.After(timer): + case <-interruptChan: + } } } else if *hevringAddr != "" { @@ -87,9 +105,3 @@ func main() { log.Fatal("must specify -image or -hevring") } } - -/** - * @incomplete: clean exit - * to ensure cleanup is done (rpc disconnects, cpuprof, image writing, ...), - * we should catch signals and force-exit all goroutines (bomb, rpc). via channel? - */ diff --git a/pixelflut/api.go b/pixelflut/api.go index 0ed768a..01b6f9a 100644 --- a/pixelflut/api.go +++ b/pixelflut/api.go @@ -7,27 +7,34 @@ import ( "image/color" "log" "net" + "sync" ) // Flut asynchronously sends the given image to pixelflut server at `address` // using `conns` connections. Pixels are sent row wise, unless `shuffle` is set. -func Flut(img image.Image, position image.Point, shuffle bool, address string, conns int) { +// @cleanup: use FlutTask{} as arg +// @incomplete :cleanExit +func Flut(img image.Image, position image.Point, shuffle bool, address string, conns int, stop chan bool, wg *sync.WaitGroup) { cmds := commandsFromImage(img, position) if shuffle { cmds.Shuffle() } - messages := cmds.Chunk(conns) + + bombWg := sync.WaitGroup{} for _, msg := range messages { - go bombAddress(msg, address) + bombWg.Add(1) + go bombAddress(msg, address, stop, &bombWg) } + bombWg.Wait() + wg.Done() } // FetchImage asynchronously uses `conns` to fetch pixels within `bounds` from // a pixelflut server at `address`, and writes them into the returned Image. -func FetchImage(bounds image.Rectangle, address string, conns int) (img *image.NRGBA) { +func FetchImage(bounds image.Rectangle, address string, conns int, stop chan bool) (img *image.NRGBA) { img = image.NewNRGBA(bounds) - cmds := cmdsFetchImage(bounds).Chunk(conns) + // cmds := cmdsFetchImage(bounds).Chunk(conns) for i := 0; i < conns; i++ { conn, err := net.Dial("tcp", address) @@ -37,44 +44,50 @@ func FetchImage(bounds image.Rectangle, address string, conns int) (img *image.N // @cleanup: parsePixels calls conn.Close(), as deferring it here would // instantly close it - go readPixels(img, conn) - go bombConn(cmds[i], conn) + go readPixels(img, conn, stop) + // go bombConn(cmds[i], conn, stop) } return img } -func readPixels(target *image.NRGBA, conn net.Conn) { +func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) { defer conn.Close() reader := bufio.NewReader(conn) col := make([]byte, 3) for { - res, err := reader.ReadSlice('\n') - if err != nil { - log.Fatal(err) - } + select { + case <-stop: + return - // parse response ("PX \n") - colorStart := len(res) - 7 - xy := res[3:colorStart - 1] - yStart := 0 - for yStart = len(xy) - 2; yStart >= 0; yStart-- { - if xy[yStart] == ' ' { - break + default: + res, err := reader.ReadSlice('\n') + if err != nil { + log.Fatal(err) } - } - x := asciiToInt(xy[:yStart]) - y := asciiToInt(xy[yStart + 1:]) - hex.Decode(col, res[colorStart:len(res) - 1]) - target.SetNRGBA(x, y, color.NRGBA{ col[0], col[1], col[2], 255 }) + // parse response ("PX \n") + colorStart := len(res) - 7 + xy := res[3 : colorStart-1] + yStart := 0 + for yStart = len(xy) - 2; yStart >= 0; yStart-- { + if xy[yStart] == ' ' { + break + } + } + x := asciiToInt(xy[:yStart]) + y := asciiToInt(xy[yStart+1:]) + hex.Decode(col, res[colorStart:len(res)-1]) + + target.SetNRGBA(x, y, color.NRGBA{col[0], col[1], col[2], 255}) + } } } func asciiToInt(buf []byte) (v int) { for _, c := range buf { - v = v * 10 + int(c - '0') + v = v*10 + int(c-'0') } return v } diff --git a/pixelflut/net.go b/pixelflut/net.go index a09aaf6..8eb85c8 100644 --- a/pixelflut/net.go +++ b/pixelflut/net.go @@ -3,6 +3,7 @@ package pixelflut import ( "log" "net" + "sync" ) // @speed: add some performance reporting mechanism on these functions when @@ -10,21 +11,27 @@ import ( // bombAddress writes the given message via plain TCP to the given address, // forever, as fast as possible. -func bombAddress(message []byte, address string) { +func bombAddress(message []byte, address string, stop chan bool, wg *sync.WaitGroup) { conn, err := net.Dial("tcp", address) if err != nil { log.Fatal(err) } defer conn.Close() - - bombConn(message, conn) + bombConn(message, conn, stop) + wg.Done() } -func bombConn(message []byte, conn net.Conn) { +func bombConn(message []byte, conn net.Conn, stop chan bool) { for { - _, err := conn.Write(message) - if err != nil { - log.Fatal(err) + select { + case <-stop: + log.Println("stopChan bombConn") + return + default: + _, err := conn.Write(message) + if err != nil { + log.Fatal(err) + } } } } diff --git a/rpc/dottir.go b/rpc/dottir.go index 14e9b83..b265f3b 100644 --- a/rpc/dottir.go +++ b/rpc/dottir.go @@ -6,6 +6,7 @@ import ( "log" "net" "net/rpc" + // "sync" "time" "github.com/SpeckiJ/Hochwasser/pixelflut" @@ -24,7 +25,8 @@ func ConnectHevring(rÑnAddress string) { } type Hevring struct { - task FlutTask + task FlutTask + taskQuit chan bool } type FlutTask struct { @@ -47,8 +49,10 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error { fmt.Printf("[hevring] RÑn gave us /w o r k/! %v\n", task) h.task = task + h.taskQuit = make(chan bool) // @incomplete: async errorhandling - pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.Address, task.MaxConns) + + go pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.Address, task.MaxConns, h.taskQuit, nil) reply.Ok = true return nil } @@ -64,6 +68,8 @@ func (h *Hevring) Stop(x int, reply *FlutAck) error { if (h.task != FlutTask{}) { fmt.Println("[hevring] stopping task") h.task = FlutTask{} + close(h.taskQuit) + h.taskQuit = nil reply.Ok = true } return nil