From 8a22c7bf2972324c68e05598653c76dd44354f54 Mon Sep 17 00:00:00 2001 From: Norwin Roosen Date: Fri, 14 Feb 2020 16:37:19 +0100 Subject: [PATCH] add performance reporting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit costs ~9% bomb performance; can be toggled in Rán REPL via 'metrics' --- pixelflut/net.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++-- rpc/dottir.go | 12 ++++++-- rpc/ran.go | 42 +++++++++++++++++++++++++-- 3 files changed, 121 insertions(+), 8 deletions(-) diff --git a/pixelflut/net.go b/pixelflut/net.go index d222fbb..fc0ec4c 100644 --- a/pixelflut/net.go +++ b/pixelflut/net.go @@ -1,13 +1,76 @@ package pixelflut import ( + "fmt" "log" "net" "sync" + "time" ) -// @speed: add some performance reporting mechanism on these functions when -// called as goroutines +// Performance contains pixelflut metrics +type Performance struct { + Enabled bool + Conns int + BytesPerSec int + BytesTotal int + + connsReporter chan int + bytesReporter chan int + bytes int +} + +func (p Performance) String() string { + return fmt.Sprintf("%v conns\t%v\t%v/s", + p.Conns, fmtBytes(p.BytesTotal), fmtBytes(p.BytesPerSec)) +} + +// https://yourbasic.org/golang/byte-count.go +func fmtBytes(b int) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %ciB", + float64(b)/float64(div), "KMGTPE"[exp]) +} + +// PerformanceReporter provides pixelflut performance metrics, when Enabled is true. +// @speed: Note that enabling costs ~9% bomb performance under high throughput. +var PerformanceReporter = initPerfReporter() + +// should be called only once +func initPerfReporter() *Performance { + r := new(Performance) + r.bytesReporter = make(chan int, 512) + r.connsReporter = make(chan int, 512) + + go func() { + for { + select { + case b := <-r.bytesReporter: + r.bytes += b + r.BytesTotal += b + case c := <-r.connsReporter: + r.Conns += c + } + } + }() + go func() { + for { + time.Sleep(time.Second) + r.BytesPerSec = r.bytes + r.bytes = 0 + } + }() + + return r +} // bombAddress writes the given message via plain TCP to the given address, // as fast as possible, until stop is closed. @@ -22,15 +85,21 @@ func bombAddress(message []byte, address string, stop chan bool, wg *sync.WaitGr } func bombConn(message []byte, conn net.Conn, stop chan bool) { + PerformanceReporter.connsReporter <- 1 + defer func() { PerformanceReporter.connsReporter <- -1 }() + for { select { case <-stop: return default: - _, err := conn.Write(message) + b, err := conn.Write(message) if err != nil { log.Fatal(err) } + if PerformanceReporter.Enabled { + PerformanceReporter.bytesReporter <- b + } } } } diff --git a/rpc/dottir.go b/rpc/dottir.go index 074c434..392fd10 100644 --- a/rpc/dottir.go +++ b/rpc/dottir.go @@ -39,6 +39,12 @@ type FlutTask struct { type FlutAck struct{ Ok bool } +type FlutStatus struct { + *pixelflut.Performance + Ok bool + Fluting bool +} + func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error { if (h.task != FlutTask{}) { // @incomplete: stop old task if new task is received @@ -56,9 +62,11 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error { return nil } -func (h *Hevring) Status(x int, reply *FlutAck) error { - // @incomplete: provide performance metrics +func (h *Hevring) Status(metrics bool, reply *FlutStatus) error { + pixelflut.PerformanceReporter.Enabled = metrics + reply.Performance = pixelflut.PerformanceReporter reply.Ok = true + reply.Fluting = h.taskQuit != nil return nil } diff --git a/rpc/ran.go b/rpc/ran.go index 03fbeb5..3e046f3 100644 --- a/rpc/ran.go +++ b/rpc/ran.go @@ -1,7 +1,6 @@ package rpc import ( - // "io" "bufio" "fmt" "image" @@ -12,11 +11,14 @@ import ( "strings" "sync" "time" + + "github.com/SpeckiJ/Hochwasser/pixelflut" ) type Rán struct { clients []*rpc.Client task FlutTask + metrics pixelflut.Performance } // SummonRán sets up the RPC master, accepting connections at addres (":1234") @@ -59,11 +61,19 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán { time.Sleep(100 * time.Millisecond) var clients []*rpc.Client + + r.metrics.Conns = 0 + r.metrics.BytesPerSec = 0 + r.metrics.BytesTotal = 0 + for _, c := range r.clients { - status := FlutAck{} - err := c.Call("Hevring.Status", 0, &status) + status := FlutStatus{} + err := c.Call("Hevring.Status", r.metrics.Enabled, &status) if err == nil && status.Ok { clients = append(clients, c) + r.metrics.Conns += status.Conns + r.metrics.BytesPerSec += status.BytesPerSec + r.metrics.BytesTotal += status.BytesTotal } } if len(r.clients) != len(clients) { @@ -73,6 +83,16 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán { } }() + // print performance + go func() { + for { + time.Sleep(5 * time.Second) + if r.metrics.Enabled { + fmt.Println(r.metrics) + } + } + }() + // REPL to change tasks without loosing clients go func() { scanner := bufio.NewScanner(os.Stdin) @@ -86,6 +106,18 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán { c.Call("Hevring.Stop", 0, &ack) // @speed: async } + } else if cmd == "start" { + if (r.task != FlutTask{}) { + for _, c := range r.clients { + ack := FlutAck{} + // @speed: should send tasks async + err := c.Call("Hevring.Flut", r.task, &ack) + if err != nil || !ack.Ok { + log.Printf("[rán] client didn't accept task") + } + } + } + } else if cmd == "img" && len(args) > 0 { // // @incomplete // path := args[0] @@ -97,6 +129,10 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán { // offset = image.Pt(x, y) // } // task := FlutTask{} + + } else if cmd == "metrics" { + r.metrics.Enabled = !r.metrics.Enabled + } } }()