MVP distributed hochwasser

Norwin Roosen 2020-02-12 11:20:15 +01:00
parent cf15da0732
commit e716ae8d25
3 changed files with 125 additions and 70 deletions

main.go (69 lines changed)

@@ -15,29 +15,19 @@ import (
 var err error
 var cpuprofile = flag.String("cpuprofile", "", "Destination file for CPU Profile")
-var image_path = flag.String("image", "", "Absolute Path to image")
-var image_offsetx = flag.Int("xoffset", 0, "Offset of posted image from left border")
-var image_offsety = flag.Int("yoffset", 0, "Offset of posted image from top border")
+var imgPath = flag.String("image", "", "Absolute Path to image")
+var x = flag.Int("x", 0, "Offset of posted image from left border")
+var y = flag.Int("y", 0, "Offset of posted image from top border")
 var connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage")
 var address = flag.String("host", "127.0.0.1:1337", "Server address")
 var runtime = flag.String("runtime", "60s", "exit after timeout")
 var shuffle = flag.Bool("shuffle", false, "pixel send ordering")
 var fetchImgPath = flag.String("fetch-image", "", "path to save the fetched pixel state to")
 
-var rán = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port")
-var hevring = flag.String("hevring", "", "connect to rán rpc server at given address")
+var ránAddr = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port")
+var hevringAddr = flag.String("hevring", "", "connect to rán rpc server at given address")
 
 func main() {
 	flag.Parse()
-	if *image_path == "" {
-		log.Fatal("No image provided")
-	}
-
-	// check connectivity by opening one test connection
-	conn, err := net.Dial("tcp", *address)
-	if err != nil {
-		log.Fatal(err)
-	}
-	conn.Close()
 
 	// Start cpu profiling if wanted
 	if *cpuprofile != "" {
@@ -50,29 +40,56 @@ func main() {
 		defer pprof.StopCPUProfile()
 	}
 
-	if *rán != "" { // @fixme: should validate proper address?
-		rpc.SummonRán(*rán)
-	}
-	if *hevring != "" { // @fixme: should validate proper address?
-		rpc.ConnectHevring(*hevring)
-	}
-
-	offset := image.Pt(*image_offsetx, *image_offsety)
-	img := readImage(*image_path)
-
-	if *fetchImgPath != "" {
-		fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1)
-		*connections -= 1
-		defer writeImage(*fetchImgPath, fetchedImg)
-	}
-
-	// 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊
-	pixelflut.Flut(img, offset, *shuffle, *address, *connections)
-
-	// Terminate after timeout to save resources
-	timer, err := time.ParseDuration(*runtime)
-	if err != nil {
-		log.Fatal("Invalid runtime specified: " + err.Error())
-	}
-	time.Sleep(timer)
+	if *imgPath != "" {
+		offset := image.Pt(*x, *y)
+		img := readImage(*imgPath)
+
+		// check connectivity by opening one test connection
+		conn, err := net.Dial("tcp", *address)
+		if err != nil {
+			log.Fatal(err)
+		}
+		conn.Close()
+
+		if *ránAddr != "" {
+			// run RPC server, tasking clients to flut
+			r := rpc.SummonRán(*ránAddr)
+			r.SetTask(img, offset, *address, *connections) // @incomplete
+			select {} // block forever
+		} else {
+			// local 🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊
+			pixelflut.Flut(img, offset, *shuffle, *address, *connections)
+
+			// fetch server state and save to file
+			// @incomplete: make this available also when not fluting?
+			if *fetchImgPath != "" {
+				fetchedImg := pixelflut.FetchImage(img.Bounds().Add(offset), *address, 1)
+				*connections -= 1
+				defer writeImage(*fetchImgPath, fetchedImg)
+			}
+
+			// Terminate after timeout to save resources
+			timer, err := time.ParseDuration(*runtime)
+			if err != nil {
+				log.Fatal("Invalid runtime specified: " + err.Error())
+			}
+			time.Sleep(timer)
+		}
+	} else if *hevringAddr != "" {
+		// connect to RPC server and execute their tasks
+		rpc.ConnectHevring(*hevringAddr)
+		select {} // block forever
+	} else {
+		log.Fatal("must specify -image or -hevring")
+	}
 }
+
+/**
+ * @incomplete: clean exit
+ * to ensure cleanup is done (rpc disconnects, cpuprof, image writing, ...),
+ * we should catch signals and force-exit all goroutines (bomb, rpc). via channel?
+ */
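
The @incomplete note above asks for signal-based cleanup. A minimal sketch of how that could look, assuming a hypothetical waitForExit helper and a cleanup callback that would wrap the work currently done via defer (pprof.StopCPUProfile, writeImage, RPC teardown); this is illustrative only and not part of the commit:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// waitForExit blocks until SIGINT/SIGTERM arrives or the runtime timeout fires,
// then runs cleanup exactly once before returning.
func waitForExit(timeout time.Duration, cleanup func()) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	select {
	case <-sigs:
		log.Println("caught signal, cleaning up")
	case <-time.After(timeout):
		log.Println("runtime elapsed, cleaning up")
	}
	cleanup()
}

func main() {
	waitForExit(60*time.Second, func() {
		// teardown would go here: stop CPU profiling, write fetched image, close RPC conns
	})
}

Funneling both the -runtime timeout and an interrupt through one exit path would ensure the deferred writes are not skipped when the process is killed early.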

rpc/hevring.go

@@ -2,12 +2,14 @@ package rpc
 
 import (
 	"fmt"
+	"image"
 	"log"
 	"net"
 	"net/rpc"
-)
+	"time"
 
-// const handshake_magick = "Sæl!"
+	"github.com/SpeckiJ/Hochwasser/pixelflut"
+)
 
 func ConnectHevring(ránAddress string) {
 	rpc.Register(new(Hevring))
@@ -21,17 +23,29 @@ func ConnectHevring(ránAddress string) {
 	fmt.Printf("[hevring] awaiting task from Rán\n")
 }
 
-type Hevring struct {}
+type Hevring struct{}
+
+type FlutTask struct {
+	Address  string
+	MaxConns int
+	Img      *image.NRGBA // @bug :imageType: should be image.Image, but can't be serialized. do conversion in task creation?
+	Offset   image.Point
+	Shuffle  bool
+}
 
 type FlutAck struct{ Ok bool }
 
-func (h *Hevring) Flut(job RánJob, reply *FlutAck) error {
-	fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", job)
+func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error {
+	// @incomplete: async errorhandling
+	// @incomplete: stop old task if new task is received
+	fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", task)
+	pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.Address, task.MaxConns)
 	reply.Ok = true
 	return nil
 }
 
 func (h *Hevring) Status(x int, reply *FlutAck) error {
+	// @incomplete: provide performance metrics
 	reply.Ok = true
 	return nil
 }
@@ -40,3 +54,12 @@ func (h *Hevring) Stop(x int, reply *FlutAck) error {
 	reply.Ok = true
 	return nil
 }
+
+func (h *Hevring) Die(x int, reply *FlutAck) error {
+	go func() { // @cleanup: hacky
+		time.Sleep(100 * time.Millisecond)
+		log.Fatal("[hevring] Rán disconnected, stopping")
+	}()
+	reply.Ok = true
+	return nil
+}
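
Rán and Hevring invert the usual TCP roles: Hevring dials out but serves the RPC methods, while Rán accepts the connection and issues calls by wrapping it in rpc.NewClient. A self-contained sketch of that pattern with Go's net/rpc; the Worker/Ack names and the port are placeholders, not Hochwasser code:

package main

import (
	"log"
	"net"
	"net/rpc"
	"time"
)

// Worker plays the Hevring role: it dials out, then *serves* RPCs on that connection.
type Worker struct{}

type Ack struct{ Ok bool }

func (w *Worker) Ping(_ int, reply *Ack) error {
	reply.Ok = true
	return nil
}

func main() {
	// listening side (Rán role): accepts the TCP connection but acts as the RPC client
	l, err := net.Listen("tcp", "127.0.0.1:8765")
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		conn, err := l.Accept()
		if err != nil {
			log.Fatal(err)
		}
		c := rpc.NewClient(conn) // wrap the accepted conn, as SummonRán does
		ack := Ack{}
		if err := c.Call("Worker.Ping", 0, &ack); err != nil || !ack.Ok {
			log.Fatal("worker did not answer")
		}
		log.Println("worker acknowledged")
	}()

	// dialing side (Hevring role): registers its methods and serves on the dialed conn
	rpc.Register(new(Worker))
	conn, err := net.Dial("tcp", "127.0.0.1:8765")
	if err != nil {
		log.Fatal(err)
	}
	go rpc.ServeConn(conn)

	time.Sleep(200 * time.Millisecond) // give the demo call time to complete
}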

rpc/rán.go

@@ -9,8 +9,13 @@ import (
 	"time"
 )
 
-func SummonRán(address string) {
-	rán := new(Rán)
+type Rán struct {
+	clients []*rpc.Client
+	task    FlutTask
+}
+
+func SummonRán(address string) *Rán {
+	r := new(Rán)
 
 	l, err := net.Listen("tcp", address)
 	if err != nil {
@@ -18,56 +23,66 @@
 	}
 	fmt.Printf("[rán] rpc server listening on %s\n", l.Addr())
 
-	// serve tcp port, handshake
 	go func() {
 		for {
 			conn, err := l.Accept()
 			if err != nil {
 				log.Fatal(err)
 			}
-			fmt.Printf("[rán] client connected (%v)\n", conn.RemoteAddr())
 			client := rpc.NewClient(conn)
-			rán.clientConns = append(rán.clientConns, client)
+			r.clients = append(r.clients, client)
+			fmt.Printf("[rán] client connected (%v). current clients: %v\n",
+				conn.RemoteAddr(), len(r.clients))
 
-			ack := FlutAck{}
-			err = client.Call("Hevring.Flut", RánJob{}, &ack)
-			if err != nil {
-				log.Fatal(err)
+			if (r.task != FlutTask{}) {
+				ack := FlutAck{}
+				err = client.Call("Hevring.Flut", r.task, &ack)
+				if err != nil || !ack.Ok {
+					log.Printf("[rán] client didn't accept task")
+				}
 			}
-			fmt.Printf("[rán] client accepted job: %v\n", ack.Ok)
 		}
 	}()
 
 	// poll clients
 	go func() {
 		for {
-			time.Sleep(500 * time.Millisecond)
+			time.Sleep(100 * time.Millisecond)
 			var clients []*rpc.Client
-			for _, c := range rán.clientConns {
+			for _, c := range r.clients {
 				status := FlutAck{}
-				c.Call("Hevring.Status", 0, &status)
-				if status.Ok {
+				err3 := c.Call("Hevring.Status", 0, &status)
+				if err3 == nil || status.Ok {
 					clients = append(clients, c)
 				}
 			}
-			rán.clientConns = clients
-			fmt.Printf("[rán] current clients: %v\n", clients)
-
-			// @incomplete: if clients changed, assign tasks anew
+			if len(r.clients) != len(clients) {
+				fmt.Printf("[rán] client disconnected. current clients: %v\n", len(clients))
+			}
+			r.clients = clients
 		}
 	}()
+
+	// @incomplete: REPL to change tasks without losing clients
+	return r
 }
 
-type Rán struct {
-	clientConns []*rpc.Client
-}
-
-type RánJob struct {
-	Address  string
-	MaxConns int
-	Img      image.Image
-	Bounds   image.Rectangle
-	Shuffle  bool
-}
+func (r *Rán) SetTask(img image.Image, offset image.Point, address string, maxConns int) {
+	// @incomplete: smart task creation:
+	//   fetch server state & sample foreign activity in image regions. assign
+	//   subregions to clients (per connection), considering their bandwidth.
+
+	// @bug :imageType
+	r.task = FlutTask{address, maxConns, img.(*image.NRGBA), offset, true}
+	for _, c := range r.clients {
+		ack := FlutAck{}
+		// @speed: should send tasks async
+		err := c.Call("Hevring.Flut", r.task, &ack)
+		if err != nil || !ack.Ok {
+			log.Printf("[rán] client didn't accept task")
+		}
+	}
+}
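
The @bug :imageType note flags that FlutTask.Img is a concrete *image.NRGBA (so it can be serialized for net/rpc), while SetTask asserts img.(*image.NRGBA), which panics for other decoded formats such as the *image.YCbCr a JPEG yields. The "conversion in task creation" the comment suggests could look like this hypothetical toNRGBA helper, a sketch rather than part of the commit:

package main

import (
	"image"
	"image/draw"
)

// toNRGBA returns img unchanged if it already is *image.NRGBA, and otherwise
// copies it into a freshly allocated NRGBA buffer with the same bounds.
func toNRGBA(img image.Image) *image.NRGBA {
	if nrgba, ok := img.(*image.NRGBA); ok {
		return nrgba
	}
	b := img.Bounds()
	dst := image.NewNRGBA(b)
	draw.Draw(dst, b, img, b.Min, draw.Src)
	return dst
}

func main() {
	// e.g. a decoded JPEG arrives as *image.YCbCr; converting it avoids the panic
	// a bare type assertion would cause.
	var src image.Image = image.NewYCbCr(image.Rect(0, 0, 8, 8), image.YCbCrSubsampleRatio420)
	_ = toNRGBA(src)
}

draw.Draw copies any image.Image into the NRGBA buffer, so the task would always carry a concrete, serialization-friendly pixel type.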