refactor main
cleaner code, more flexible flag usage, graceful exit for hevring, improved image fetching
This commit is contained in:
parent
19dbfa34bf
commit
bb7ffbddfc
194
main.go
194
main.go
|
@ -2,105 +2,141 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
|
"fmt"
|
||||||
"image"
|
"image"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime/pprof"
|
"runtime/pprof"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/SpeckiJ/Hochwasser/pixelflut"
|
"github.com/SpeckiJ/Hochwasser/pixelflut"
|
||||||
"github.com/SpeckiJ/Hochwasser/render"
|
"github.com/SpeckiJ/Hochwasser/render"
|
||||||
"github.com/SpeckiJ/Hochwasser/rpc"
|
"github.com/SpeckiJ/Hochwasser/rpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
var err error
|
var (
|
||||||
var cpuprofile = flag.String("cpuprofile", "", "Destination file for CPU Profile")
|
imgPath = flag.String("image", "", "Filepath of an image to flut")
|
||||||
var imgPath = flag.String("image", "", "Absolute Path to image")
|
ránAddr = flag.String("rán", "", "Start RPC server to distribute jobs, listening on the given address/port")
|
||||||
var x = flag.Int("x", 0, "Offset of posted image from left border")
|
hevringAddr = flag.String("hevring", "", "Connect to PRC server at given address/port")
|
||||||
var y = flag.Int("y", 0, "Offset of posted image from top border")
|
address = flag.String("host", ":1234", "Target server address")
|
||||||
var connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage")
|
connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage")
|
||||||
var address = flag.String("host", "127.0.0.1:1337", "Server address")
|
x = flag.Int("x", 0, "Offset of posted image from left border")
|
||||||
var shuffle = flag.Bool("shuffle", false, "pixel send ordering")
|
y = flag.Int("y", 0, "Offset of posted image from top border")
|
||||||
var fetchImgPath = flag.String("fetch-image", "", "path to save the fetched pixel state to")
|
fetchImgPath = flag.String("fetch", "", "Enable fetching the screen area to the given local file, updating it each second")
|
||||||
var ránAddr = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port")
|
cpuprofile = flag.String("cpuprofile", "", "Destination file for CPU Profile")
|
||||||
var hevringAddr = flag.String("hevring", "", "connect to rán rpc server at given address")
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
task := runWithExitHandler(taskFromFlags)
|
||||||
// Start cpu profiling if wanted
|
|
||||||
if *cpuprofile != "" {
|
if *cpuprofile != "" {
|
||||||
f, err := os.Create(*cpuprofile)
|
runWithProfiler(*cpuprofile, task)
|
||||||
|
} else {
|
||||||
|
task()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func taskFromFlags(stop chan bool, wg *sync.WaitGroup) {
|
||||||
|
rán := *ránAddr
|
||||||
|
hev := *hevringAddr
|
||||||
|
|
||||||
|
startServer := rán != "" || (hev == "" && *imgPath != "")
|
||||||
|
startClient := hev != "" || (rán == "" && *imgPath != "")
|
||||||
|
fetchImg := *fetchImgPath != ""
|
||||||
|
|
||||||
|
if !(startServer || startClient || fetchImg) {
|
||||||
|
fmt.Println("Error: At least one of the following flags is needed:\n -image -rán -hevring\n")
|
||||||
|
flag.Usage()
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
if startServer {
|
||||||
|
if rán == "" {
|
||||||
|
rán = ":5555"
|
||||||
|
}
|
||||||
|
r := rpc.SummonRán(rán, stop, wg)
|
||||||
|
|
||||||
|
var img *image.NRGBA
|
||||||
|
if *imgPath != "" {
|
||||||
|
imgTmp, err := render.ReadImage(*imgPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
img = render.ImgToNRGBA(imgTmp)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.SetTask(img, image.Pt(*x, *y), *address, *connections)
|
||||||
|
}
|
||||||
|
|
||||||
|
if startClient {
|
||||||
|
if hev == "" {
|
||||||
|
hev = ":5555"
|
||||||
|
}
|
||||||
|
rpc.ConnectHevring(hev, stop, wg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if fetchImg {
|
||||||
|
canvasToFile(*fetchImgPath, *address, time.Second, stop, wg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func canvasToFile(filepath, server string, interval time.Duration, stop chan bool, wg *sync.WaitGroup) {
|
||||||
|
// async fetch the image
|
||||||
|
fetchedImg := pixelflut.FetchImage(nil, server, 1, stop)
|
||||||
|
|
||||||
|
// write it in a fixed interval
|
||||||
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for loop := true; loop; {
|
||||||
|
select {
|
||||||
|
case <-stop:
|
||||||
|
loop = false
|
||||||
|
case <-time.Tick(interval):
|
||||||
|
}
|
||||||
|
render.WriteImage(filepath, fetchedImg)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Takes a non-blocking function, and provides it an interface for graceful shutdown:
|
||||||
|
// stop chan is closed if the routine should be stopped. before quitting, wg is awaited.
|
||||||
|
func runWithExitHandler(task func(stop chan bool, wg *sync.WaitGroup)) func() {
|
||||||
|
return func() {
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
stopChan := make(chan bool)
|
||||||
|
interruptChan := make(chan os.Signal)
|
||||||
|
signal.Notify(interruptChan, os.Interrupt)
|
||||||
|
|
||||||
|
task(stopChan, &wg)
|
||||||
|
|
||||||
|
// block until we get an interrupt, or somebody says we need to quit (by closing stopChan)
|
||||||
|
select {
|
||||||
|
case <-interruptChan:
|
||||||
|
case <-stopChan:
|
||||||
|
stopChan = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if stopChan != nil {
|
||||||
|
// notify all async tasks to stop on interrupt, if channel wasn't closed already
|
||||||
|
close(stopChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// then wait for clean shutdown of all tasks before exiting
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWithProfiler(outfile string, task func()) {
|
||||||
|
f, err := os.Create(outfile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
pprof.StartCPUProfile(f)
|
pprof.StartCPUProfile(f)
|
||||||
defer pprof.StopCPUProfile()
|
defer pprof.StopCPUProfile()
|
||||||
}
|
task()
|
||||||
|
|
||||||
// :cleanExit setup
|
|
||||||
// stop chan is closed at end of main process, telling async tasks to stop.
|
|
||||||
// wg waits until async tasks gracefully stopped
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
stopChan := make(chan bool)
|
|
||||||
interruptChan := make(chan os.Signal)
|
|
||||||
signal.Notify(interruptChan, os.Interrupt)
|
|
||||||
|
|
||||||
if *imgPath != "" {
|
|
||||||
offset := image.Pt(*x, *y)
|
|
||||||
imgTmp, err := render.ReadImage(*imgPath)
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
img := render.ImgToNRGBA(imgTmp)
|
|
||||||
|
|
||||||
// check connectivity by opening one test connection
|
|
||||||
conn, err := net.Dial("tcp", *address)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
conn.Close()
|
|
||||||
|
|
||||||
if *ránAddr != "" {
|
|
||||||
// run RPC server, tasking clients to flut
|
|
||||||
wg.Add(1)
|
|
||||||
r := rpc.SummonRán(*ránAddr, stopChan, &wg)
|
|
||||||
// TODO: startup without a task, but init params
|
|
||||||
r.SetTask(img, offset, *address, *connections)
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// fetch server state and save to file
|
|
||||||
// @incomplete: make this available also when not fluting?
|
|
||||||
if *fetchImgPath != "" {
|
|
||||||
fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1, stopChan)
|
|
||||||
*connections -= 1
|
|
||||||
defer render.WriteImage(*fetchImgPath, fetchedImg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊
|
|
||||||
wg.Add(1)
|
|
||||||
go pixelflut.Flut(img, offset, *shuffle, false, false, *address, *connections, stopChan, &wg)
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if *hevringAddr != "" {
|
|
||||||
// connect to RPC server and execute their tasks
|
|
||||||
rpc.ConnectHevring(*hevringAddr)
|
|
||||||
|
|
||||||
} else {
|
|
||||||
log.Fatal("must specify -image or -hevring")
|
|
||||||
}
|
|
||||||
|
|
||||||
// :cleanExit logic:
|
|
||||||
// notify all async tasks to stop on interrupt
|
|
||||||
// then wait for clean shutdown of all tasks before exiting
|
|
||||||
// TODO: make this available to all invocation types
|
|
||||||
select {
|
|
||||||
case <-interruptChan:
|
|
||||||
}
|
|
||||||
close(stopChan)
|
|
||||||
wg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,9 +83,15 @@ func CanvasSize(address string) (int, int) {
|
||||||
|
|
||||||
// FetchImage asynchronously uses `conns` to fetch pixels within `bounds` from
|
// FetchImage asynchronously uses `conns` to fetch pixels within `bounds` from
|
||||||
// a pixelflut server at `address`, and writes them into the returned Image.
|
// a pixelflut server at `address`, and writes them into the returned Image.
|
||||||
func FetchImage(bounds image.Rectangle, address string, conns int, stop chan bool) (img *image.NRGBA) {
|
// If bounds is nil, the server's entire canvas is fetched.
|
||||||
img = image.NewNRGBA(bounds)
|
func FetchImage(bounds *image.Rectangle, address string, conns int, stop chan bool) (img *image.NRGBA) {
|
||||||
cmds := cmdsFetchImage(bounds).Chunk(conns)
|
if bounds == nil {
|
||||||
|
x, y := CanvasSize(address)
|
||||||
|
bounds = &image.Rectangle{Max: image.Pt(x, y)}
|
||||||
|
}
|
||||||
|
|
||||||
|
img = image.NewNRGBA(*bounds)
|
||||||
|
cmds := cmdsFetchImage(*bounds).Chunk(conns)
|
||||||
|
|
||||||
for i := 0; i < conns; i++ {
|
for i := 0; i < conns; i++ {
|
||||||
conn, err := net.Dial("tcp", address)
|
conn, err := net.Dial("tcp", address)
|
||||||
|
@ -102,7 +108,7 @@ func FetchImage(bounds image.Rectangle, address string, conns int, stop chan boo
|
||||||
|
|
||||||
func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) {
|
func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) {
|
||||||
reader := bufio.NewReader(conn)
|
reader := bufio.NewReader(conn)
|
||||||
col := make([]byte, 3)
|
col := make([]byte, 4)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
|
@ -114,12 +120,13 @@ func readPixels(target *image.NRGBA, conn net.Conn, stop chan bool) {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse response ("PX <x> <y> <col>\n")
|
// parse response ("PX <x> <y> <rrggbbaa>\n")
|
||||||
colorStart := len(res) - 7
|
// NOTE: shoreline sends alpha, pixelnuke does not!
|
||||||
x, y := parseXY(res[3 : colorStart-1])
|
colorStart := len(res) - 9
|
||||||
|
x, y := parseXY(res[3:colorStart])
|
||||||
hex.Decode(col, res[colorStart:len(res)-1])
|
hex.Decode(col, res[colorStart:len(res)-1])
|
||||||
|
|
||||||
target.SetNRGBA(x, y, color.NRGBA{col[0], col[1], col[2], 255})
|
target.SetNRGBA(x, y, color.NRGBA{col[0], col[1], col[2], col[3]})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,14 +6,15 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/rpc"
|
"net/rpc"
|
||||||
"os"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/SpeckiJ/Hochwasser/pixelflut"
|
"github.com/SpeckiJ/Hochwasser/pixelflut"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ConnectHevring(ránAddress string) {
|
func ConnectHevring(ránAddress string, stop chan bool, wg *sync.WaitGroup) {
|
||||||
rpc.Register(new(Hevring))
|
h := new(Hevring)
|
||||||
|
rpc.Register(h)
|
||||||
|
|
||||||
fmt.Printf("[hevring] greeting Rán at %s\n", ránAddress)
|
fmt.Printf("[hevring] greeting Rán at %s\n", ránAddress)
|
||||||
conn, err := net.Dial("tcp", ránAddress)
|
conn, err := net.Dial("tcp", ránAddress)
|
||||||
|
@ -22,11 +23,27 @@ func ConnectHevring(ránAddress string) {
|
||||||
}
|
}
|
||||||
go rpc.ServeConn(conn)
|
go rpc.ServeConn(conn)
|
||||||
fmt.Printf("[hevring] awaiting task from Rán\n")
|
fmt.Printf("[hevring] awaiting task from Rán\n")
|
||||||
|
|
||||||
|
h.quit = stop
|
||||||
|
h.wg = wg
|
||||||
|
h.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-h.quit:
|
||||||
|
}
|
||||||
|
if h.taskQuit != nil {
|
||||||
|
close(h.taskQuit)
|
||||||
|
h.taskQuit = nil
|
||||||
|
}
|
||||||
|
h.wg.Done()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
type Hevring struct {
|
type Hevring struct {
|
||||||
task FlutTask
|
task FlutTask
|
||||||
taskQuit chan bool
|
taskQuit chan bool
|
||||||
|
quit chan bool
|
||||||
|
wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
type FlutTask struct {
|
type FlutTask struct {
|
||||||
|
@ -61,7 +78,7 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error {
|
||||||
close(h.taskQuit)
|
close(h.taskQuit)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("[hevring] Rán gave us /w o r k/!\n%v\n", task)
|
fmt.Printf("[hevring] Rán gave us work!\n%v\n", task)
|
||||||
h.task = task
|
h.task = task
|
||||||
h.taskQuit = make(chan bool)
|
h.taskQuit = make(chan bool)
|
||||||
|
|
||||||
|
@ -93,9 +110,9 @@ func (h *Hevring) Die(x int, reply *FlutAck) error {
|
||||||
// @robustness: waiting for reply to be sent via timeout
|
// @robustness: waiting for reply to be sent via timeout
|
||||||
// @incomplete: should try to reconnect for a bit first
|
// @incomplete: should try to reconnect for a bit first
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
fmt.Println("[hevring] Rán disconnected, stopping")
|
fmt.Println("[hevring] Rán disconnected, stopping")
|
||||||
os.Exit(0)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
close(h.quit)
|
||||||
}()
|
}()
|
||||||
reply.Ok = true
|
reply.Ok = true
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -93,7 +93,7 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go RunREPL(r)
|
go RunREPL(r)
|
||||||
go r.killClients(stopChan, wg)
|
go r.handleExit(stopChan, wg)
|
||||||
|
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
@ -129,13 +129,14 @@ func (r *Rán) stopTask() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Rán) killClients(stopChan <-chan bool, wg *sync.WaitGroup) {
|
func (r *Rán) handleExit(stopChan <-chan bool, wg *sync.WaitGroup) {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
<-stopChan
|
<-stopChan
|
||||||
for _, c := range r.clients {
|
for _, c := range r.clients {
|
||||||
ack := FlutAck{}
|
ack := FlutAck{}
|
||||||
c.Call("Hevring.Die", 0, &ack) // @speed: async
|
c.Call("Hevring.Die", 0, &ack) // @speed: async
|
||||||
}
|
}
|
||||||
wg.Done()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetTask assigns a FlutTask to Rán, distributing it to all clients
|
// SetTask assigns a FlutTask to Rán, distributing it to all clients
|
||||||
|
|
Loading…
Reference in New Issue