wip: clean exit

start adding a mechanism for stopping async tasks,
so we can cleanly quit and stop fluting without quitting
This commit is contained in:
Norwin Roosen 2020-02-13 22:26:50 +01:00
parent ada015e90f
commit 9ab04b4f26
4 changed files with 82 additions and 44 deletions

32
main.go
View File

@ -6,7 +6,9 @@ import (
"log" "log"
"net" "net"
"os" "os"
"os/signal"
"runtime/pprof" "runtime/pprof"
"sync"
"time" "time"
"github.com/SpeckiJ/Hochwasser/pixelflut" "github.com/SpeckiJ/Hochwasser/pixelflut"
@ -60,22 +62,38 @@ func main() {
} else { } else {
// local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 // local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊
pixelflut.Flut(img, offset, *shuffle, *address, *connections) var wg sync.WaitGroup
defer wg.Wait()
stopChan := make(chan bool)
defer close(stopChan)
wg.Add(1) // :cleanExit: is this WG needed? we only have one task running at a time?
go pixelflut.Flut(img, offset, *shuffle, *address, *connections, stopChan, &wg)
// fetch server state and save to file // fetch server state and save to file
// @incomplete: make this available also when not fluting? // @incomplete: make this available also when not fluting?
if *fetchImgPath != "" { if *fetchImgPath != "" {
fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1) fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1, stopChan)
*connections -= 1 *connections -= 1
defer writeImage(*fetchImgPath, fetchedImg) defer writeImage(*fetchImgPath, fetchedImg)
} }
// Terminate after timeout to save resources // :cleanExit logic:
// notify all async tasks to stop on interrupt or after timeout,
// then wait for clean shutdown of all tasks before exiting
// TODO: make this available to all invocation types
timer, err := time.ParseDuration(*runtime) timer, err := time.ParseDuration(*runtime)
if err != nil { if err != nil {
log.Fatal("Invalid runtime specified: " + err.Error()) log.Fatal("Invalid runtime specified: " + err.Error())
} }
time.Sleep(timer)
interruptChan := make(chan os.Signal)
signal.Notify(interruptChan, os.Interrupt)
select {
case <-time.After(timer):
case <-interruptChan:
}
} }
} else if *hevringAddr != "" { } else if *hevringAddr != "" {
@ -87,9 +105,3 @@ func main() {
log.Fatal("must specify -image or -hevring") log.Fatal("must specify -image or -hevring")
} }
} }
/**
* @incomplete: clean exit
* to ensure cleanup is done (rpc disconnects, cpuprof, image writing, ...),
* we should catch signals and force-exit all goroutines (bomb, rpc). via channel?
*/

View File

@ -7,27 +7,34 @@ import (
"image/color" "image/color"
"log" "log"
"net" "net"
"sync"
) )
// Flut asynchronously sends the given image to pixelflut server at `address` // Flut asynchronously sends the given image to pixelflut server at `address`
// using `conns` connections. Pixels are sent row wise, unless `shuffle` is set. // using `conns` connections. Pixels are sent row wise, unless `shuffle` is set.
func Flut(img image.Image, position image.Point, shuffle bool, address string, conns int) { // @cleanup: use FlutTask{} as arg
// @incomplete :cleanExit
func Flut(img image.Image, position image.Point, shuffle bool, address string, conns int, stop chan bool, wg *sync.WaitGroup) {
cmds := commandsFromImage(img, position) cmds := commandsFromImage(img, position)
if shuffle { if shuffle {
cmds.Shuffle() cmds.Shuffle()
} }
messages := cmds.Chunk(conns) messages := cmds.Chunk(conns)
bombWg := sync.WaitGroup{}
for _, msg := range messages { for _, msg := range messages {
go bombAddress(msg, address) bombWg.Add(1)
go bombAddress(msg, address, stop, &bombWg)
} }
bombWg.Wait()
wg.Done()
} }
// FetchImage asynchronously uses `conns` to fetch pixels within `bounds` from // FetchImage asynchronously uses `conns` to fetch pixels within `bounds` from
// a pixelflut server at `address`, and writes them into the returned Image. // a pixelflut server at `address`, and writes them into the returned Image.
func FetchImage(bounds image.Rectangle, address string, conns int) (img *image.NRGBA) { func FetchImage(bounds image.Rectangle, address string, conns int, stop chan bool) (img *image.NRGBA) {
img = image.NewNRGBA(bounds) img = image.NewNRGBA(bounds)
cmds := cmdsFetchImage(bounds).Chunk(conns) // cmds := cmdsFetchImage(bounds).Chunk(conns)
for i := 0; i < conns; i++ { for i := 0; i < conns; i++ {
conn, err := net.Dial("tcp", address) conn, err := net.Dial("tcp", address)
@ -37,19 +44,24 @@ func FetchImage(bounds image.Rectangle, address string, conns int) (img *image.N
// @cleanup: parsePixels calls conn.Close(), as deferring it here would // @cleanup: parsePixels calls conn.Close(), as deferring it here would
// instantly close it // instantly close it
go readPixels(img, conn) go readPixels(img, conn, stop)
go bombConn(cmds[i], conn) // go bombConn(cmds[i], conn, stop)
} }
return img return img
} }
func readPixels(target *image.NRGBA, conn net.Conn) { func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) {
defer conn.Close() defer conn.Close()
reader := bufio.NewReader(conn) reader := bufio.NewReader(conn)
col := make([]byte, 3) col := make([]byte, 3)
for { for {
select {
case <-stop:
return
default:
res, err := reader.ReadSlice('\n') res, err := reader.ReadSlice('\n')
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -57,7 +69,7 @@ func readPixels(target *image.NRGBA, conn net.Conn) {
// parse response ("PX <x> <y> <col>\n") // parse response ("PX <x> <y> <col>\n")
colorStart := len(res) - 7 colorStart := len(res) - 7
xy := res[3:colorStart - 1] xy := res[3 : colorStart-1]
yStart := 0 yStart := 0
for yStart = len(xy) - 2; yStart >= 0; yStart-- { for yStart = len(xy) - 2; yStart >= 0; yStart-- {
if xy[yStart] == ' ' { if xy[yStart] == ' ' {
@ -65,16 +77,17 @@ func readPixels(target *image.NRGBA, conn net.Conn) {
} }
} }
x := asciiToInt(xy[:yStart]) x := asciiToInt(xy[:yStart])
y := asciiToInt(xy[yStart + 1:]) y := asciiToInt(xy[yStart+1:])
hex.Decode(col, res[colorStart:len(res) - 1]) hex.Decode(col, res[colorStart:len(res)-1])
target.SetNRGBA(x, y, color.NRGBA{ col[0], col[1], col[2], 255 }) target.SetNRGBA(x, y, color.NRGBA{col[0], col[1], col[2], 255})
}
} }
} }
func asciiToInt(buf []byte) (v int) { func asciiToInt(buf []byte) (v int) {
for _, c := range buf { for _, c := range buf {
v = v * 10 + int(c - '0') v = v*10 + int(c-'0')
} }
return v return v
} }

View File

@ -3,6 +3,7 @@ package pixelflut
import ( import (
"log" "log"
"net" "net"
"sync"
) )
// @speed: add some performance reporting mechanism on these functions when // @speed: add some performance reporting mechanism on these functions when
@ -10,21 +11,27 @@ import (
// bombAddress writes the given message via plain TCP to the given address, // bombAddress writes the given message via plain TCP to the given address,
// forever, as fast as possible. // forever, as fast as possible.
func bombAddress(message []byte, address string) { func bombAddress(message []byte, address string, stop chan bool, wg *sync.WaitGroup) {
conn, err := net.Dial("tcp", address) conn, err := net.Dial("tcp", address)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer conn.Close() defer conn.Close()
bombConn(message, conn, stop)
bombConn(message, conn) wg.Done()
} }
func bombConn(message []byte, conn net.Conn) { func bombConn(message []byte, conn net.Conn, stop chan bool) {
for { for {
select {
case <-stop:
log.Println("stopChan bombConn")
return
default:
_, err := conn.Write(message) _, err := conn.Write(message)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
}
} }

View File

@ -6,6 +6,7 @@ import (
"log" "log"
"net" "net"
"net/rpc" "net/rpc"
// "sync"
"time" "time"
"github.com/SpeckiJ/Hochwasser/pixelflut" "github.com/SpeckiJ/Hochwasser/pixelflut"
@ -25,6 +26,7 @@ func ConnectHevring(ránAddress string) {
type Hevring struct { type Hevring struct {
task FlutTask task FlutTask
taskQuit chan bool
} }
type FlutTask struct { type FlutTask struct {
@ -47,8 +49,10 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error {
fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", task) fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", task)
h.task = task h.task = task
h.taskQuit = make(chan bool)
// @incomplete: async errorhandling // @incomplete: async errorhandling
pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.Address, task.MaxConns)
go pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.Address, task.MaxConns, h.taskQuit, nil)
reply.Ok = true reply.Ok = true
return nil return nil
} }
@ -64,6 +68,8 @@ func (h *Hevring) Stop(x int, reply *FlutAck) error {
if (h.task != FlutTask{}) { if (h.task != FlutTask{}) {
fmt.Println("[hevring] stopping task") fmt.Println("[hevring] stopping task")
h.task = FlutTask{} h.task = FlutTask{}
close(h.taskQuit)
h.taskQuit = nil
reply.Ok = true reply.Ok = true
} }
return nil return nil