Hochwasser/main.go

154 lines
4.0 KiB
Go

package main
import (
"flag"
"fmt"
"image"
"log"
"math/rand"
"os"
"os/signal"
"runtime/pprof"
"sync"
"time"
"github.com/SpeckiJ/Hochwasser/pixelflut"
"github.com/SpeckiJ/Hochwasser/render"
"github.com/SpeckiJ/Hochwasser/rpc"
)
var (
imgPath = flag.String("image", "", "Filepath of an image to flut")
ránAddr = flag.String("rán", "", "Start RPC server to distribute jobs, listening on the given address/port")
hevringAddr = flag.String("hevring", "", "Connect to RPC server at given address/port")
address = flag.String("host", ":1234", "Target server address")
connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage")
x = flag.Int("x", 0, "Offset of posted image from left border")
y = flag.Int("y", 0, "Offset of posted image from top border")
order = flag.String("order", "rtl", "Draw order (shuffle, ltr, rtl, ttb, btt)")
fetchImgPath = flag.String("fetch", "", "Enable fetching the screen area to the given local file, updating it each second")
hevringImgPath = flag.String("hevring-preview", "", "Write the current task image to the given PNG file")
cpuprofile = flag.String("cpuprofile", "", "Destination file for CPU Profile")
)
func main() {
flag.Parse()
rand.Seed(time.Now().UnixNano())
task := runWithExitHandler(taskFromFlags)
if *cpuprofile != "" {
runWithProfiler(*cpuprofile, task)
} else {
task()
}
}
func taskFromFlags(stop chan bool, wg *sync.WaitGroup) {
rán := *ránAddr
hev := *hevringAddr
startServer := rán != "" || (hev == "" && *imgPath != "")
startClient := hev != "" || (rán == "" && *imgPath != "")
fetchImg := *fetchImgPath != ""
if !(startServer || startClient || fetchImg) {
fmt.Println("Error: At least one of the following flags is needed:\n -image -rán -hevring\n")
flag.Usage()
os.Exit(1)
}
if startServer && startClient && rán == "" && hev == "" {
rán = fmt.Sprintf(":%d", rand.Intn(30000)+1000)
hev = rán
}
if startServer {
r := rpc.SummonRán(rán, stop, wg)
var img *image.NRGBA
if *imgPath != "" {
var err error
if img, err = render.ReadImage(*imgPath); err != nil {
log.Fatal(err)
}
}
r.SetTask(pixelflut.FlutTask{
FlutTaskOpts: pixelflut.FlutTaskOpts{
Address: *address,
MaxConns: *connections,
Offset: image.Pt(*x, *y),
RenderOrder: pixelflut.NewOrder(*order),
},
Img: img,
})
}
if startClient {
hevring := rpc.ConnectHevring(hev, stop, wg)
hevring.PreviewPath = *hevringImgPath
}
if fetchImg {
canvasToFile(*fetchImgPath, *address, time.Second, stop, wg)
}
}
func canvasToFile(filepath, server string, interval time.Duration, stop chan bool, wg *sync.WaitGroup) {
// async fetch the image
fetchedImg := pixelflut.FetchImage(nil, server, 1, stop)
// write it in a fixed interval
go func() {
wg.Add(1)
defer wg.Done()
for loop := true; loop; {
select {
case <-stop:
loop = false
case <-time.Tick(interval):
}
render.WriteImage(filepath, fetchedImg)
}
}()
}
// Takes a non-blocking function, and provides it an interface for graceful shutdown:
// stop chan is closed if the routine should be stopped. before quitting, wg is awaited.
func runWithExitHandler(task func(stop chan bool, wg *sync.WaitGroup)) func() {
return func() {
wg := sync.WaitGroup{}
stopChan := make(chan bool)
interruptChan := make(chan os.Signal)
signal.Notify(interruptChan, os.Interrupt)
task(stopChan, &wg)
// block until we get an interrupt, or somebody says we need to quit (by closing stopChan)
select {
case <-interruptChan:
case <-stopChan:
stopChan = nil
}
if stopChan != nil {
// notify all async tasks to stop on interrupt, if channel wasn't closed already
close(stopChan)
}
// then wait for clean shutdown of all tasks before exiting
wg.Wait()
}
}
func runWithProfiler(outfile string, task func()) {
f, err := os.Create(outfile)
if err != nil {
log.Fatal(err)
}
defer f.Close()
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
task()
}