Hochwasser/rpc/ran.go

146 lines
3.3 KiB
Go
Raw Normal View History

2020-02-11 14:46:58 +01:00
package rpc
import (
"fmt"
"log"
"net"
"net/rpc"
"sync"
2020-02-11 14:46:58 +01:00
"time"
"github.com/SpeckiJ/Hochwasser/pixelflut"
2020-02-11 14:46:58 +01:00
)
2020-02-14 22:29:58 +01:00
// Rán represents the RPC hub, used to coordinate `Hevring` clients.
// Implements `Fluter`
2020-02-12 11:20:15 +01:00
type Rán struct {
clients []*rpc.Client
2020-12-31 18:44:56 +01:00
task pixelflut.FlutTask
metrics pixelflut.Performance
2020-02-12 11:20:15 +01:00
}
// SummonRán sets up the RPC master, accepting connections at addres (":1234")
// Connects calls methods on each client's rpc provider, killing all clients
// when stopChan is closed.
func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
2020-02-12 11:20:15 +01:00
r := new(Rán)
2020-02-11 14:46:58 +01:00
l, err := net.Listen("tcp", address)
if err != nil {
log.Fatal(err)
}
fmt.Printf("[rán] rpc server listening on %s\n", l.Addr())
2020-02-12 11:20:15 +01:00
// serve tcp port, handshake
2020-02-11 14:46:58 +01:00
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
client := rpc.NewClient(conn)
2020-02-12 11:20:15 +01:00
r.clients = append(r.clients, client)
fmt.Printf("[rán] client connected (%v). current clients: %v\n",
conn.RemoteAddr(), len(r.clients))
2020-02-11 14:46:58 +01:00
2020-12-31 18:44:56 +01:00
if r.task.IsFlutable() {
2020-02-12 11:20:15 +01:00
ack := FlutAck{}
err = client.Call("Hevring.Flut", r.task, &ack)
if err != nil || !ack.Ok {
log.Printf("[rán] client didn't accept task")
}
2020-02-11 14:46:58 +01:00
}
}
}()
// poll clients
go func() {
for {
2020-02-12 11:20:15 +01:00
time.Sleep(100 * time.Millisecond)
2020-02-11 14:46:58 +01:00
var clients []*rpc.Client
r.metrics.Conns = 0
r.metrics.BytesPerSec = 0
r.metrics.BytesTotal = 0
2020-02-12 11:20:15 +01:00
for _, c := range r.clients {
status := FlutStatus{}
err := c.Call("Hevring.Status", r.metrics.Enabled, &status)
if err == nil && status.Ok {
2020-02-11 14:46:58 +01:00
clients = append(clients, c)
r.metrics.Conns += status.Conns
r.metrics.BytesPerSec += status.BytesPerSec
r.metrics.BytesTotal += status.BytesTotal
2020-02-11 14:46:58 +01:00
}
}
2020-02-12 11:20:15 +01:00
if len(r.clients) != len(clients) {
fmt.Printf("[rán] client disconnected. current clients: %v\n", len(clients))
}
r.clients = clients
2020-02-11 14:46:58 +01:00
}
}()
// print performance
go func() {
for {
time.Sleep(5 * time.Second)
if r.metrics.Enabled {
fmt.Println(r.metrics)
}
}
}()
2020-02-14 22:29:58 +01:00
go RunREPL(r)
go r.handleExit(stopChan, wg)
2020-02-12 13:55:04 +01:00
2020-02-14 22:29:58 +01:00
return r
}
2020-12-31 18:44:56 +01:00
// getTask returns the task currently stored on the hub.
func (r *Rán) getTask() pixelflut.FlutTask {
	current := r.task
	return current
}
2020-02-14 22:29:58 +01:00
// toggleMetrics flips the Enabled flag of the hub's metrics.
func (r *Rán) toggleMetrics() {
	m := &r.metrics
	m.Enabled = !m.Enabled
}
2020-02-12 13:55:04 +01:00
2020-12-31 18:44:56 +01:00
func (r *Rán) applyTask(t pixelflut.FlutTask) {
2020-02-14 22:29:58 +01:00
r.task = t
2020-12-31 18:44:56 +01:00
if !t.IsFlutable() {
return
}
for i, c := range r.clients {
2020-02-14 22:29:58 +01:00
ack := FlutAck{}
err := c.Call("Hevring.Flut", r.task, &ack)
if err != nil || !ack.Ok {
log.Printf("[rán] client %d didn't accept task", i)
2020-02-12 13:55:04 +01:00
}
2020-02-14 22:29:58 +01:00
}
}
2020-02-11 14:46:58 +01:00
2020-02-14 22:29:58 +01:00
// stopTask asks every connected client to stop its current task by calling
// "Hevring.Stop" on each of them in turn. A client whose call fails is logged
// with its index, and the remaining clients are still contacted.
func (r *Rán) stopTask() {
	for i, c := range r.clients {
		ack := FlutAck{}
		// @speed: async
		if err := c.Call("Hevring.Stop", 0, &ack); err != nil {
			log.Printf("[rán] client %d failed to stop task: %v", i, err)
		}
	}
}
func (r *Rán) handleExit(stopChan <-chan bool, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
2020-02-14 22:29:58 +01:00
<-stopChan
for _, c := range r.clients {
ack := FlutAck{}
c.Call("Hevring.Die", 0, &ack) // @speed: async
}
// FIXME: why the fuck are we quitting before this loop is complete?
2020-02-11 14:46:58 +01:00
}
2020-12-31 18:44:56 +01:00
// SetTask assigns a pixelflut.FlutTask to Rán, distributing it to all clients
func (r *Rán) SetTask(t pixelflut.FlutTask) {
2020-02-12 11:20:15 +01:00
// @incomplete: smart task creation:
// fetch server state & sample foreign activity in image regions. assign
// subregions to clients (per connection), considering their bandwidth.
2020-12-31 18:44:56 +01:00
r.applyTask(t)
2020-02-11 14:46:58 +01:00
}