start distributed hochwasser

only RPC plumbing for now.

naming: https://en.wikipedia.org/wiki/Nine_Daughters_of_%C3%86gir_and_R%C3%A1n
This commit is contained in:
Norwin Roosen 2020-02-07 12:20:53 +01:00
parent 3f0acd9694
commit d86e769e9a
3 changed files with 94 additions and 0 deletions

10
main.go
View File

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/SpeckiJ/Hochwasser/pixelflut" "github.com/SpeckiJ/Hochwasser/pixelflut"
"github.com/SpeckiJ/Hochwasser/rpc"
) )
var err error var err error
@ -22,6 +23,8 @@ var address = flag.String("host", "127.0.0.1:1337", "Server address")
var runtime = flag.String("runtime", "60s", "exit after timeout") var runtime = flag.String("runtime", "60s", "exit after timeout")
var shuffle = flag.Bool("shuffle", false, "pixel send ordering") var shuffle = flag.Bool("shuffle", false, "pixel send ordering")
var fetchImgPath = flag.String("fetch-image", "", "path to save the fetched pixel state to") var fetchImgPath = flag.String("fetch-image", "", "path to save the fetched pixel state to")
var rán = flag.String("rán", "", "enable rpc server to distribute jobs, listening on the given address/port")
var hevring = flag.String("hevring", "", "connect to rán rpc server at given address")
func main() { func main() {
flag.Parse() flag.Parse()
@ -47,6 +50,13 @@ func main() {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
if *rán != "" { // @fixme: should validate proper address?
rpc.StartRán(*rán)
}
if *hevring != "" { // @fixme: should validate proper address?
rpc.ConnectHevring(*hevring)
}
offset := image.Pt(*image_offsetx, *image_offsety) offset := image.Pt(*image_offsetx, *image_offsety)
img := readImage(*image_path) img := readImage(*image_path)

28
rpc/dottir.go Normal file
View File

@ -0,0 +1,28 @@
package rpc
import (
"fmt"
"log"
"net/rpc"
)
func ConnectHevring(ránAddress string) {
fmt.Printf("[hevring] connecting to %s\n", ránAddress)
client, err := rpc.Dial("tcp", ránAddress)
if err != nil {
log.Fatal(err)
}
job := RánJob{}
err = client.Call("Rán.Hello", RánHelloReq{}, &job)
if err != nil {
log.Fatal(err)
}
if (job == RánJob{}) {
fmt.Printf("[hevring] Rán has no job for us. :(\n")
} else {
fmt.Printf("[hevring] Rán gave us /w o r k/! %v\n", job)
}
}

56
rpc/rán.go Normal file
View File

@ -0,0 +1,56 @@
package rpc
import (
"fmt"
"image"
"log"
"net"
"net/rpc"
)
func StartRán(address string) {
rán := new(Rán)
rpc.Register(rán)
l, err := net.Listen("tcp", address)
if err != nil {
log.Fatal(err)
}
fmt.Printf("[rán] rpc server listening on %s\n", l.Addr())
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
rán.clientAddresses = append(rán.clientAddresses, conn.RemoteAddr())
fmt.Printf("[rán] client connected (%v)\n", rán.clientAddresses)
// @incomplete: detect client disconnect, update clients.
go rpc.ServeConn(conn)
// @bug: second connection does not send Hello..?!
}
}()
}
type Rán struct {
clientAddresses []net.Addr
}
type RánHelloReq struct{}
type RánJob struct {
Address string
MaxConns int
Img image.Image
Bounds image.Rectangle
Shuffle bool
}
func (z *Rán) Hello(args RánHelloReq, reply *RánJob) error {
fmt.Printf("[rán] a client said hello!\n")
reply = nil
return nil
}