refactor clean exit code

so much work just to let cleanup run & to pause fluting :(
This commit is contained in:
Norwin Roosen 2020-02-13 23:57:56 +01:00
parent 9ab04b4f26
commit 3f06bf6b82
5 changed files with 47 additions and 59 deletions

54
main.go
View File

@ -9,7 +9,6 @@ import (
"os/signal" "os/signal"
"runtime/pprof" "runtime/pprof"
"sync" "sync"
"time"
"github.com/SpeckiJ/Hochwasser/pixelflut" "github.com/SpeckiJ/Hochwasser/pixelflut"
"github.com/SpeckiJ/Hochwasser/rpc" "github.com/SpeckiJ/Hochwasser/rpc"
@ -22,7 +21,6 @@ var x = flag.Int("x", 0, "Offset of posted image from left border")
var y = flag.Int("y", 0, "Offset of posted image from top border") var y = flag.Int("y", 0, "Offset of posted image from top border")
var connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage") var connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage")
var address = flag.String("host", "127.0.0.1:1337", "Server address") var address = flag.String("host", "127.0.0.1:1337", "Server address")
var runtime = flag.String("runtime", "60s", "exit after timeout")
var shuffle = flag.Bool("shuffle", false, "pixel send ordering") var shuffle = flag.Bool("shuffle", false, "pixel send ordering")
var fetchImgPath = flag.String("fetch-image", "", "path to save the fetched pixel state to") var fetchImgPath = flag.String("fetch-image", "", "path to save the fetched pixel state to")
var ránAddr = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port") var ránAddr = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port")
@ -42,6 +40,14 @@ func main() {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
// :cleanExit setup
// stop chan is closed at end of main process, telling async tasks to stop.
// wg waits until async tasks gracefully stopped
wg := sync.WaitGroup{}
stopChan := make(chan bool)
interruptChan := make(chan os.Signal)
signal.Notify(interruptChan, os.Interrupt)
if *imgPath != "" { if *imgPath != "" {
offset := image.Pt(*x, *y) offset := image.Pt(*x, *y)
img := readImage(*imgPath) img := readImage(*imgPath)
@ -55,21 +61,11 @@ func main() {
if *ránAddr != "" { if *ránAddr != "" {
// run RPC server, tasking clients to flut // run RPC server, tasking clients to flut
r := rpc.SummonRán(*ránAddr) wg.Add(1)
r := rpc.SummonRán(*ránAddr, stopChan, &wg)
r.SetTask(img, offset, *address, *connections) // @incomplete r.SetTask(img, offset, *address, *connections) // @incomplete
select {} // block forever
} else { } else {
// local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊
var wg sync.WaitGroup
defer wg.Wait()
stopChan := make(chan bool)
defer close(stopChan)
wg.Add(1) // :cleanExit: is this WG needed? we only have one task running at a time?
go pixelflut.Flut(img, offset, *shuffle, *address, *connections, stopChan, &wg)
// fetch server state and save to file // fetch server state and save to file
// @incomplete: make this available also when not fluting? // @incomplete: make this available also when not fluting?
if *fetchImgPath != "" { if *fetchImgPath != "" {
@ -78,30 +74,26 @@ func main() {
defer writeImage(*fetchImgPath, fetchedImg) defer writeImage(*fetchImgPath, fetchedImg)
} }
// :cleanExit logic: // local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊
// notify all async tasks to stop on interrupt or after timeout, wg.Add(1)
// then wait for clean shutdown of all tasks before exiting go pixelflut.Flut(img, offset, *shuffle, *address, *connections, stopChan, &wg)
// TODO: make this available to all invocation types
timer, err := time.ParseDuration(*runtime)
if err != nil {
log.Fatal("Invalid runtime specified: " + err.Error())
}
interruptChan := make(chan os.Signal)
signal.Notify(interruptChan, os.Interrupt)
select {
case <-time.After(timer):
case <-interruptChan:
}
} }
} else if *hevringAddr != "" { } else if *hevringAddr != "" {
// connect to RPC server and execute their tasks // connect to RPC server and execute their tasks
rpc.ConnectHevring(*hevringAddr) rpc.ConnectHevring(*hevringAddr)
select {} // block forever
} else { } else {
log.Fatal("must specify -image or -hevring") log.Fatal("must specify -image or -hevring")
} }
// :cleanExit logic:
// notify all async tasks to stop on interrupt
// then wait for clean shutdown of all tasks before exiting
// TODO: make this available to all invocation types
select {
case <-interruptChan:
}
close(stopChan)
wg.Wait()
} }

View File

@ -11,9 +11,9 @@ import (
) )
// Flut asynchronously sends the given image to pixelflut server at `address` // Flut asynchronously sends the given image to pixelflut server at `address`
// using `conns` connections. Pixels are sent row wise, unless `shuffle` is set. // using `conns` connections. Pixels are sent column wise, unless `shuffle`
// is set. Stops when stop is closed.
// @cleanup: use FlutTask{} as arg // @cleanup: use FlutTask{} as arg
// @incomplete :cleanExit
func Flut(img image.Image, position image.Point, shuffle bool, address string, conns int, stop chan bool, wg *sync.WaitGroup) { func Flut(img image.Image, position image.Point, shuffle bool, address string, conns int, stop chan bool, wg *sync.WaitGroup) {
cmds := commandsFromImage(img, position) cmds := commandsFromImage(img, position)
if shuffle { if shuffle {
@ -27,14 +27,16 @@ func Flut(img image.Image, position image.Point, shuffle bool, address string, c
go bombAddress(msg, address, stop, &bombWg) go bombAddress(msg, address, stop, &bombWg)
} }
bombWg.Wait() bombWg.Wait()
wg.Done() if wg != nil {
wg.Done()
}
} }
// FetchImage asynchronously uses `conns` to fetch pixels within `bounds` from // FetchImage asynchronously uses `conns` to fetch pixels within `bounds` from
// a pixelflut server at `address`, and writes them into the returned Image. // a pixelflut server at `address`, and writes them into the returned Image.
func FetchImage(bounds image.Rectangle, address string, conns int, stop chan bool) (img *image.NRGBA) { func FetchImage(bounds image.Rectangle, address string, conns int, stop chan bool) (img *image.NRGBA) {
img = image.NewNRGBA(bounds) img = image.NewNRGBA(bounds)
// cmds := cmdsFetchImage(bounds).Chunk(conns) cmds := cmdsFetchImage(bounds).Chunk(conns)
for i := 0; i < conns; i++ { for i := 0; i < conns; i++ {
conn, err := net.Dial("tcp", address) conn, err := net.Dial("tcp", address)
@ -42,18 +44,14 @@ func FetchImage(bounds image.Rectangle, address string, conns int, stop chan boo
log.Fatal(err) log.Fatal(err)
} }
// @cleanup: parsePixels calls conn.Close(), as deferring it here would
// instantly close it
go readPixels(img, conn, stop) go readPixels(img, conn, stop)
// go bombConn(cmds[i], conn, stop) go bombConn(cmds[i], conn, stop)
} }
return img return img
} }
func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) { func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) {
defer conn.Close()
reader := bufio.NewReader(conn) reader := bufio.NewReader(conn)
col := make([]byte, 3) col := make([]byte, 3)
for { for {

View File

@ -10,7 +10,7 @@ import (
// called as goroutines // called as goroutines
// bombAddress writes the given message via plain TCP to the given address, // bombAddress writes the given message via plain TCP to the given address,
// forever, as fast as possible. // as fast as possible, until stop is closed.
func bombAddress(message []byte, address string, stop chan bool, wg *sync.WaitGroup) { func bombAddress(message []byte, address string, stop chan bool, wg *sync.WaitGroup) {
conn, err := net.Dial("tcp", address) conn, err := net.Dial("tcp", address)
if err != nil { if err != nil {
@ -25,7 +25,6 @@ func bombConn(message []byte, conn net.Conn, stop chan bool) {
for { for {
select { select {
case <-stop: case <-stop:
log.Println("stopChan bombConn")
return return
default: default:
_, err := conn.Write(message) _, err := conn.Write(message)

View File

@ -6,7 +6,7 @@ import (
"log" "log"
"net" "net"
"net/rpc" "net/rpc"
// "sync" "os"
"time" "time"
"github.com/SpeckiJ/Hochwasser/pixelflut" "github.com/SpeckiJ/Hochwasser/pixelflut"
@ -50,7 +50,6 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error {
fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", task) fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", task)
h.task = task h.task = task
h.taskQuit = make(chan bool) h.taskQuit = make(chan bool)
// @incomplete: async errorhandling
go pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.Address, task.MaxConns, h.taskQuit, nil) go pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.Address, task.MaxConns, h.taskQuit, nil)
reply.Ok = true reply.Ok = true
@ -64,7 +63,6 @@ func (h *Hevring) Status(x int, reply *FlutAck) error {
} }
func (h *Hevring) Stop(x int, reply *FlutAck) error { func (h *Hevring) Stop(x int, reply *FlutAck) error {
// @incomplete
if (h.task != FlutTask{}) { if (h.task != FlutTask{}) {
fmt.Println("[hevring] stopping task") fmt.Println("[hevring] stopping task")
h.task = FlutTask{} h.task = FlutTask{}
@ -78,7 +76,8 @@ func (h *Hevring) Stop(x int, reply *FlutAck) error {
func (h *Hevring) Die(x int, reply *FlutAck) error { func (h *Hevring) Die(x int, reply *FlutAck) error {
go func() { // @cleanup: hacky go func() { // @cleanup: hacky
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
log.Fatal("[hevring] Rán disconnected, stopping") fmt.Println("[hevring] Rán disconnected, stopping")
os.Exit(0)
}() }()
reply.Ok = true reply.Ok = true
return nil return nil

View File

@ -9,8 +9,8 @@ import (
"net" "net"
"net/rpc" "net/rpc"
"os" "os"
"os/signal"
"strings" "strings"
"sync"
"time" "time"
) )
@ -19,7 +19,10 @@ type Rán struct {
task FlutTask task FlutTask
} }
func SummonRán(address string) *Rán { // SummonRán sets up the RPC master, accepting connections at addres (":1234")
// Connects calls methods on each client's rpc provider, killing all clients
// when stopChan is closed.
func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
r := new(Rán) r := new(Rán)
l, err := net.Listen("tcp", address) l, err := net.Listen("tcp", address)
@ -58,8 +61,8 @@ func SummonRán(address string) *Rán {
var clients []*rpc.Client var clients []*rpc.Client
for _, c := range r.clients { for _, c := range r.clients {
status := FlutAck{} status := FlutAck{}
err3 := c.Call("Hevring.Status", 0, &status) err := c.Call("Hevring.Status", 0, &status)
if err3 == nil || status.Ok { if err == nil && status.Ok {
clients = append(clients, c) clients = append(clients, c)
} }
} }
@ -100,21 +103,18 @@ func SummonRán(address string) *Rán {
// kill clients on exit // kill clients on exit
go func() { go func() {
sigChan := make(chan os.Signal, 1) <-stopChan
signal.Notify(sigChan, os.Interrupt) for _, c := range r.clients {
for { ack := FlutAck{}
<-sigChan c.Call("Hevring.Die", 0, &ack) // @speed: async
for _, c := range r.clients {
ack := FlutAck{}
c.Call("Hevring.Die", 0, &ack) // @speed: async
}
os.Exit(0) // @bug :cleanExit
} }
wg.Done()
}() }()
return r return r
} }
// SetTask assigns a FlutTask to Rán, distributing it to all clients
func (r *Rán) SetTask(img image.Image, offset image.Point, address string, maxConns int) { func (r *Rán) SetTask(img image.Image, offset image.Point, address string, maxConns int) {
// @incomplete: smart task creation: // @incomplete: smart task creation:
// fetch server state & sample foreign activity in image regions. assign // fetch server state & sample foreign activity in image regions. assign