From e716ae8d25da6a1917a639739a0ee946469213ed Mon Sep 17 00:00:00 2001 From: Norwin Roosen Date: Wed, 12 Feb 2020 11:20:15 +0100 Subject: [PATCH] MVP distributed hochwasser --- main.go | 91 ++++++++++++++++++++++++++++++--------------------- rpc/dottir.go | 33 ++++++++++++++++--- rpc/ran.go | 71 ++++++++++++++++++++++++---------------- 3 files changed, 125 insertions(+), 70 deletions(-) diff --git a/main.go b/main.go index 8320caf..eaed2d8 100644 --- a/main.go +++ b/main.go @@ -15,29 +15,19 @@ import ( var err error var cpuprofile = flag.String("cpuprofile", "", "Destination file for CPU Profile") -var image_path = flag.String("image", "", "Absolute Path to image") -var image_offsetx = flag.Int("xoffset", 0, "Offset of posted image from left border") -var image_offsety = flag.Int("yoffset", 0, "Offset of posted image from top border") +var imgPath = flag.String("image", "", "Absolute Path to image") +var x = flag.Int("x", 0, "Offset of posted image from left border") +var y = flag.Int("y", 0, "Offset of posted image from top border") var connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage") var address = flag.String("host", "127.0.0.1:1337", "Server address") var runtime = flag.String("runtime", "60s", "exit after timeout") var shuffle = flag.Bool("shuffle", false, "pixel send ordering") var fetchImgPath = flag.String("fetch-image", "", "path to save the fetched pixel state to") -var rán = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port") -var hevring = flag.String("hevring", "", "connect to rán rpc server at given address") +var ránAddr = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port") +var hevringAddr = flag.String("hevring", "", "connect to rán rpc server at given address") func main() { flag.Parse() - if *image_path == "" { - log.Fatal("No image provided") - } - - // check connectivity by opening one test connection - conn, err := net.Dial("tcp", *address) - if err != nil { - log.Fatal(err) - } - conn.Close() // Start cpu profiling if wanted if *cpuprofile != "" { @@ -50,29 +40,56 @@ func main() { defer pprof.StopCPUProfile() } - if *rán != "" { // @fixme: should validate proper address? - rpc.SummonRán(*rán) - } - if *hevring != "" { // @fixme: should validate proper address? - rpc.ConnectHevring(*hevring) - } + if *imgPath != "" { + offset := image.Pt(*x, *y) + img := readImage(*imgPath) - offset := image.Pt(*image_offsetx, *image_offsety) - img := readImage(*image_path) + // check connectivity by opening one test connection + conn, err := net.Dial("tcp", *address) + if err != nil { + log.Fatal(err) + } + conn.Close() - if *fetchImgPath != "" { - fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1) - *connections -= 1 - defer writeImage(*fetchImgPath, fetchedImg) + if *ránAddr != "" { + // run RPC server, tasking clients to flut + r := rpc.SummonRán(*ránAddr) + r.SetTask(img, offset, *address, *connections) // @incomplete + select {} // block forever + + } else { + + // local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 + pixelflut.Flut(img, offset, *shuffle, *address, *connections) + + // fetch server state and save to file + // @incomplete: make this available also when not fluting? + if *fetchImgPath != "" { + fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1) + *connections -= 1 + defer writeImage(*fetchImgPath, fetchedImg) + } + + // Terminate after timeout to save resources + timer, err := time.ParseDuration(*runtime) + if err != nil { + log.Fatal("Invalid runtime specified: " + err.Error()) + } + time.Sleep(timer) + } + + } else if *hevringAddr != "" { + // connect to RPC server and execute their tasks + rpc.ConnectHevring(*hevringAddr) + select {} // block forever + + } else { + log.Fatal("must specify -image or -hevring") } - - // 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 - pixelflut.Flut(img, offset, *shuffle, *address, *connections) - - // Terminate after timeout to save resources - timer, err := time.ParseDuration(*runtime) - if err != nil { - log.Fatal("Invalid runtime specified: " + err.Error()) - } - time.Sleep(timer) } + +/** + * @incomplete: clean exit + * to ensure cleanup is done (rpc disconnects, cpuprof, image writing, ...), + * we should catch signals and force-exit all goroutines (bomb, rpc). via channel? + */ diff --git a/rpc/dottir.go b/rpc/dottir.go index 04b7214..31d2fc2 100644 --- a/rpc/dottir.go +++ b/rpc/dottir.go @@ -2,12 +2,14 @@ package rpc import ( "fmt" + "image" "log" "net" "net/rpc" -) + "time" -// const handshake_magick = "Sæl!" + "github.com/SpeckiJ/Hochwasser/pixelflut" +) func ConnectHevring(ránAddress string) { rpc.Register(new(Hevring)) @@ -21,17 +23,29 @@ func ConnectHevring(ránAddress string) { fmt.Printf("[hevring] awaiting task from Rán\n") } -type Hevring struct {} +type Hevring struct{} + +type FlutTask struct { + Address string + MaxConns int + Img *image.NRGBA // bug :imageType: should be image.Image, but can't be serialized. do conversion in task creation? + Offset image.Point + Shuffle bool +} type FlutAck struct{ Ok bool } -func (h *Hevring) Flut(job RánJob, reply *FlutAck) error { - fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", job) +func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error { + // @incomplete: async errorhandling + // @incomplete: stop old task if new task is received + fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", task) + pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.Address, task.MaxConns) reply.Ok = true return nil } func (h *Hevring) Status(x int, reply *FlutAck) error { + // @incomplete: provide performance metrics reply.Ok = true return nil } @@ -40,3 +54,12 @@ func (h *Hevring) Stop(x int, reply *FlutAck) error { reply.Ok = true return nil } + +func (h *Hevring) Die(x int, reply *FlutAck) error { + go func() { // @cleanup: hacky + time.Sleep(100 * time.Millisecond) + log.Fatal("[hevring] Rán disconnected, stopping") + }() + reply.Ok = true + return nil +} diff --git a/rpc/ran.go b/rpc/ran.go index 393db90..5ef63e1 100644 --- a/rpc/ran.go +++ b/rpc/ran.go @@ -9,8 +9,13 @@ import ( "time" ) -func SummonRán(address string) { - rán := new(Rán) +type Rán struct { + clients []*rpc.Client + task FlutTask +} + +func SummonRán(address string) *Rán { + r := new(Rán) l, err := net.Listen("tcp", address) if err != nil { @@ -18,56 +23,66 @@ func SummonRán(address string) { } fmt.Printf("[rán] rpc server listening on %s\n", l.Addr()) + // serve tcp port, handshake go func() { for { conn, err := l.Accept() if err != nil { log.Fatal(err) } - fmt.Printf("[rán] client connected (%v)\n", conn.RemoteAddr()) - client := rpc.NewClient(conn) - rán.clientConns = append(rán.clientConns, client) + r.clients = append(r.clients, client) + fmt.Printf("[rán] client connected (%v). current clients: %v\n", + conn.RemoteAddr(), len(r.clients)) - ack := FlutAck{} - err = client.Call("Hevring.Flut", RánJob{}, &ack) - if err != nil { - log.Fatal(err) + if (r.task != FlutTask{}) { + ack := FlutAck{} + err = client.Call("Hevring.Flut", r.task, &ack) + if err != nil || !ack.Ok { + log.Printf("[rán] client didn't accept task") + } } - fmt.Printf("[rán] client accepted job: %v\n", ack.Ok) } }() // poll clients go func() { for { - time.Sleep(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) var clients []*rpc.Client - for _, c := range rán.clientConns { + for _, c := range r.clients { status := FlutAck{} - c.Call("Hevring.Status", 0, &status) - if status.Ok { + err3 := c.Call("Hevring.Status", 0, &status) + if err3 == nil || status.Ok { clients = append(clients, c) } } - rán.clientConns = clients - fmt.Printf("[rán] current clients: %v\n", clients) - - // @incomplete: if clients changed, assign tasks anew + if len(r.clients) != len(clients) { + fmt.Printf("[rán] client disconnected. current clients: %v\n", len(clients)) + } + r.clients = clients } }() + + // @incomplete: REPL to change tasks without loosing clients + + return r } +func (r *Rán) SetTask(img image.Image, offset image.Point, address string, maxConns int) { + // @incomplete: smart task creation: + // fetch server state & sample foreign activity in image regions. assign + // subregions to clients (per connection), considering their bandwidth. -type Rán struct { - clientConns []*rpc.Client -} - -type RánJob struct { - Address string - MaxConns int - Img image.Image - Bounds image.Rectangle - Shuffle bool + // @bug :imageType + r.task = FlutTask{address, maxConns, img.(*image.NRGBA), offset, true} + for _, c := range r.clients { + ack := FlutAck{} + // @speed: should send tasks async + err := c.Call("Hevring.Flut", r.task, &ack) + if err != nil || !ack.Ok { + log.Printf("[rán] client didn't accept task") + } + } }