rework rpc

This commit is contained in:
Norwin Roosen 2020-02-11 14:46:58 +01:00
parent 067bef55f6
commit cf15da0732
5 changed files with 133 additions and 77 deletions

10
main.go
View File

@ -51,7 +51,7 @@ func main() {
}
if *rán != "" { // @fixme: should validate proper address?
rpc.StartRán(*rán)
rpc.SummonRán(*rán)
}
if *hevring != "" { // @fixme: should validate proper address?
rpc.ConnectHevring(*hevring)
@ -60,10 +60,10 @@ func main() {
offset := image.Pt(*image_offsetx, *image_offsety)
img := readImage(*image_path)
var fetchedImg *image.NRGBA
if *fetchImgPath != "" {
fetchedImg = pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1)
fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1)
*connections -= 1
defer writeImage(*fetchImgPath, fetchedImg)
}
// 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊
@ -75,8 +75,4 @@ func main() {
log.Fatal("Invalid runtime specified: " + err.Error())
}
time.Sleep(timer)
if *fetchImgPath != "" {
writeImage(*fetchImgPath, fetchedImg)
}
}

View File

@ -3,26 +3,40 @@ package rpc
import (
"fmt"
"log"
"net"
"net/rpc"
)
// const handshake_magick = "Sæl!"
func ConnectHevring(ránAddress string) {
fmt.Printf("[hevring] connecting to %s\n", ránAddress)
rpc.Register(new(Hevring))
client, err := rpc.Dial("tcp", ránAddress)
fmt.Printf("[hevring] greeting Rán at %s\n", ránAddress)
conn, err := net.Dial("tcp", ránAddress)
if err != nil {
log.Fatal(err)
}
job := RánJob{}
err = client.Call("Rán.Hello", RánHelloReq{}, &job)
if err != nil {
log.Fatal(err)
}
if (job == RánJob{}) {
fmt.Printf("[hevring] Rán has no job for us. :(\n")
} else {
fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", job)
}
go rpc.ServeConn(conn)
fmt.Printf("[hevring] awaiting task from Rán\n")
}
type Hevring struct {}
type FlutAck struct{ Ok bool }
func (h *Hevring) Flut(job RánJob, reply *FlutAck) error {
fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", job)
reply.Ok = true
return nil
}
func (h *Hevring) Status(x int, reply *FlutAck) error {
reply.Ok = true
return nil
}
func (h *Hevring) Stop(x int, reply *FlutAck) error {
reply.Ok = true
return nil
}

73
rpc/ran.go Normal file
View File

@ -0,0 +1,73 @@
package rpc
import (
"fmt"
"image"
"log"
"net"
"net/rpc"
"time"
)
func SummonRán(address string) {
rán := new(Rán)
l, err := net.Listen("tcp", address)
if err != nil {
log.Fatal(err)
}
fmt.Printf("[rán] rpc server listening on %s\n", l.Addr())
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
fmt.Printf("[rán] client connected (%v)\n", conn.RemoteAddr())
client := rpc.NewClient(conn)
rán.clientConns = append(rán.clientConns, client)
ack := FlutAck{}
err = client.Call("Hevring.Flut", RánJob{}, &ack)
if err != nil {
log.Fatal(err)
}
fmt.Printf("[rán] client accepted job: %v\n", ack.Ok)
}
}()
// poll clients
go func() {
for {
time.Sleep(500 * time.Millisecond)
var clients []*rpc.Client
for _, c := range rán.clientConns {
status := FlutAck{}
c.Call("Hevring.Status", 0, &status)
if status.Ok {
clients = append(clients, c)
}
}
rán.clientConns = clients
fmt.Printf("[rán] current clients: %v\n", clients)
// @incomplete: if clients changed, assign tasks anew
}
}()
}
type Rán struct {
clientConns []*rpc.Client
}
type RánJob struct {
Address string
MaxConns int
Img image.Image
Bounds image.Rectangle
Shuffle bool
}

View File

@ -1,56 +0,0 @@
package rpc
import (
"fmt"
"image"
"log"
"net"
"net/rpc"
)
func StartRán(address string) {
rán := new(Rán)
rpc.Register(rán)
l, err := net.Listen("tcp", address)
if err != nil {
log.Fatal(err)
}
fmt.Printf("[rán] rpc server listening on %s\n", l.Addr())
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
rán.clientAddresses = append(rán.clientAddresses, conn.RemoteAddr())
fmt.Printf("[rán] client connected (%v)\n", rán.clientAddresses)
// @incomplete: detect client disconnect, update clients.
go rpc.ServeConn(conn)
// @bug: second connection does not send Hello..?!
}
}()
}
type Rán struct {
clientAddresses []net.Addr
}
type RánHelloReq struct{}
type RánJob struct {
Address string
MaxConns int
Img image.Image
Bounds image.Rectangle
Shuffle bool
}
func (z *Rán) Hello(args RánHelloReq, reply *RánJob) error {
fmt.Printf("[rán] a client said hello!\n")
reply = nil
return nil
}

29
rpc/test.sh Executable file
View File

@ -0,0 +1,29 @@
#!/bin/bash
# runs two instances of hochwasser (rpc server & client each) against a fake (iperf)
# pixelflut server for a few seconds, and opens pprof afterwar
runtime=${1:-"2s"}
function cleanup {
kill ${pids[@]} > /dev/null 2>&1
}
trap cleanup EXIT
wd=$(dirname "$0")
pids=()
iperf -p 1337 -s > /dev/null 2>&1 &
pids+=($!)
go run . -image $wd/../benchmarks/black_small.png -rán :1234 &
pids+=($!)
sleep 0.2
go run . -image $wd/../benchmarks/white_small.png -hevring :1234 -runtime "$runtime" -cpuprofile hevring.prof
pids+=($!)
cleanup
go tool pprof -http :8080 Hochwasser hevring.prof
#go tool pprof -http :8081 Hochwasser ran.prof