From cf15da0732ee9c4d2912c01317362198b90c050d Mon Sep 17 00:00:00 2001 From: Norwin Roosen Date: Tue, 11 Feb 2020 14:46:58 +0100 Subject: [PATCH] rework rpc --- main.go | 10 +++---- rpc/dottir.go | 42 +++++++++++++++++++---------- rpc/ran.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++ rpc/rán.go | 56 --------------------------------------- rpc/test.sh | 29 ++++++++++++++++++++ 5 files changed, 133 insertions(+), 77 deletions(-) create mode 100644 rpc/ran.go delete mode 100644 rpc/rán.go create mode 100755 rpc/test.sh diff --git a/main.go b/main.go index 4d31c69..8320caf 100644 --- a/main.go +++ b/main.go @@ -51,7 +51,7 @@ func main() { } if *rán != "" { // @fixme: should validate proper address? - rpc.StartRán(*rán) + rpc.SummonRán(*rán) } if *hevring != "" { // @fixme: should validate proper address? rpc.ConnectHevring(*hevring) @@ -60,10 +60,10 @@ func main() { offset := image.Pt(*image_offsetx, *image_offsety) img := readImage(*image_path) - var fetchedImg *image.NRGBA if *fetchImgPath != "" { - fetchedImg = pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1) + fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1) *connections -= 1 + defer writeImage(*fetchImgPath, fetchedImg) } // 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 @@ -75,8 +75,4 @@ func main() { log.Fatal("Invalid runtime specified: " + err.Error()) } time.Sleep(timer) - - if *fetchImgPath != "" { - writeImage(*fetchImgPath, fetchedImg) - } } diff --git a/rpc/dottir.go b/rpc/dottir.go index b23bf1c..04b7214 100644 --- a/rpc/dottir.go +++ b/rpc/dottir.go @@ -3,26 +3,40 @@ package rpc import ( "fmt" "log" + "net" "net/rpc" ) +// const handshake_magick = "Sæl!" + func ConnectHevring(ránAddress string) { - fmt.Printf("[hevring] connecting to %s\n", ránAddress) + rpc.Register(new(Hevring)) - client, err := rpc.Dial("tcp", ránAddress) + fmt.Printf("[hevring] greeting Rán at %s\n", ránAddress) + conn, err := net.Dial("tcp", ránAddress) if err != nil { log.Fatal(err) } - - job := RánJob{} - err = client.Call("Rán.Hello", RánHelloReq{}, &job) - if err != nil { - log.Fatal(err) - } - - if (job == RánJob{}) { - fmt.Printf("[hevring] Rán has no job for us. :(\n") - } else { - fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", job) - } + go rpc.ServeConn(conn) + fmt.Printf("[hevring] awaiting task from Rán\n") +} + +type Hevring struct {} + +type FlutAck struct{ Ok bool } + +func (h *Hevring) Flut(job RánJob, reply *FlutAck) error { + fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", job) + reply.Ok = true + return nil +} + +func (h *Hevring) Status(x int, reply *FlutAck) error { + reply.Ok = true + return nil +} + +func (h *Hevring) Stop(x int, reply *FlutAck) error { + reply.Ok = true + return nil } diff --git a/rpc/ran.go b/rpc/ran.go new file mode 100644 index 0000000..393db90 --- /dev/null +++ b/rpc/ran.go @@ -0,0 +1,73 @@ +package rpc + +import ( + "fmt" + "image" + "log" + "net" + "net/rpc" + "time" +) + +func SummonRán(address string) { + rán := new(Rán) + + l, err := net.Listen("tcp", address) + if err != nil { + log.Fatal(err) + } + fmt.Printf("[rán] rpc server listening on %s\n", l.Addr()) + + go func() { + for { + conn, err := l.Accept() + if err != nil { + log.Fatal(err) + } + fmt.Printf("[rán] client connected (%v)\n", conn.RemoteAddr()) + + client := rpc.NewClient(conn) + rán.clientConns = append(rán.clientConns, client) + + ack := FlutAck{} + err = client.Call("Hevring.Flut", RánJob{}, &ack) + if err != nil { + log.Fatal(err) + } + fmt.Printf("[rán] client accepted job: %v\n", ack.Ok) + } + }() + + // poll clients + go func() { + for { + time.Sleep(500 * time.Millisecond) + + var clients []*rpc.Client + for _, c := range rán.clientConns { + status := FlutAck{} + c.Call("Hevring.Status", 0, &status) + if status.Ok { + clients = append(clients, c) + } + } + rán.clientConns = clients + fmt.Printf("[rán] current clients: %v\n", clients) + + // @incomplete: if clients changed, assign tasks anew + } + }() +} + + +type Rán struct { + clientConns []*rpc.Client +} + +type RánJob struct { + Address string + MaxConns int + Img image.Image + Bounds image.Rectangle + Shuffle bool +} diff --git a/rpc/rán.go b/rpc/rán.go deleted file mode 100644 index 9d8ca41..0000000 --- a/rpc/rán.go +++ /dev/null @@ -1,56 +0,0 @@ -package rpc - -import ( - "fmt" - "image" - "log" - "net" - "net/rpc" -) - -func StartRán(address string) { - rán := new(Rán) - rpc.Register(rán) - - l, err := net.Listen("tcp", address) - if err != nil { - log.Fatal(err) - } - fmt.Printf("[rán] rpc server listening on %s\n", l.Addr()) - - go func() { - for { - conn, err := l.Accept() - if err != nil { - log.Fatal(err) - } - rán.clientAddresses = append(rán.clientAddresses, conn.RemoteAddr()) - fmt.Printf("[rán] client connected (%v)\n", rán.clientAddresses) - - // @incomplete: detect client disconnect, update clients. - go rpc.ServeConn(conn) - - // @bug: second connection does not send Hello..?! - } - }() -} - -type Rán struct { - clientAddresses []net.Addr -} - -type RánHelloReq struct{} - -type RánJob struct { - Address string - MaxConns int - Img image.Image - Bounds image.Rectangle - Shuffle bool -} - -func (z *Rán) Hello(args RánHelloReq, reply *RánJob) error { - fmt.Printf("[rán] a client said hello!\n") - reply = nil - return nil -} diff --git a/rpc/test.sh b/rpc/test.sh new file mode 100755 index 0000000..d3b307a --- /dev/null +++ b/rpc/test.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# runs two instances of hochwasser (rpc server & client each) against a fake (iperf) +# pixelflut server for a few seconds, and opens pprof afterwar + +runtime=${1:-"2s"} + +function cleanup { + kill ${pids[@]} > /dev/null 2>&1 +} +trap cleanup EXIT + +wd=$(dirname "$0") +pids=() + +iperf -p 1337 -s > /dev/null 2>&1 & +pids+=($!) + +go run . -image $wd/../benchmarks/black_small.png -rán :1234 & +pids+=($!) +sleep 0.2 + +go run . -image $wd/../benchmarks/white_small.png -hevring :1234 -runtime "$runtime" -cpuprofile hevring.prof +pids+=($!) + +cleanup + +go tool pprof -http :8080 Hochwasser hevring.prof +#go tool pprof -http :8081 Hochwasser ran.prof