add performance reporting

costs ~9% bomb performance;
can be toggled in Rán REPL via 'metrics'
This commit is contained in:
Norwin Roosen 2020-02-14 16:37:19 +01:00
parent 2ec417da6a
commit 8a22c7bf29
3 changed files with 121 additions and 8 deletions

View File

@ -1,13 +1,76 @@
package pixelflut
import (
"fmt"
"log"
"net"
"sync"
"time"
)
// @speed: add some performance reporting mechanism on these functions when
// called as goroutines
// Performance contains pixelflut metrics
type Performance struct {
Enabled bool
Conns int
BytesPerSec int
BytesTotal int
connsReporter chan int
bytesReporter chan int
bytes int
}
func (p Performance) String() string {
return fmt.Sprintf("%v conns\t%v\t%v/s",
p.Conns, fmtBytes(p.BytesTotal), fmtBytes(p.BytesPerSec))
}
// https://yourbasic.org/golang/byte-count.go
func fmtBytes(b int) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %ciB",
float64(b)/float64(div), "KMGTPE"[exp])
}
// PerformanceReporter provides pixelflut performance metrics, when Enabled is true.
// @speed: Note that enabling costs ~9% bomb performance under high throughput.
var PerformanceReporter = initPerfReporter()
// should be called only once
func initPerfReporter() *Performance {
r := new(Performance)
r.bytesReporter = make(chan int, 512)
r.connsReporter = make(chan int, 512)
go func() {
for {
select {
case b := <-r.bytesReporter:
r.bytes += b
r.BytesTotal += b
case c := <-r.connsReporter:
r.Conns += c
}
}
}()
go func() {
for {
time.Sleep(time.Second)
r.BytesPerSec = r.bytes
r.bytes = 0
}
}()
return r
}
// bombAddress writes the given message via plain TCP to the given address,
// as fast as possible, until stop is closed.
@ -22,15 +85,21 @@ func bombAddress(message []byte, address string, stop chan bool, wg *sync.WaitGr
}
func bombConn(message []byte, conn net.Conn, stop chan bool) {
PerformanceReporter.connsReporter <- 1
defer func() { PerformanceReporter.connsReporter <- -1 }()
for {
select {
case <-stop:
return
default:
_, err := conn.Write(message)
b, err := conn.Write(message)
if err != nil {
log.Fatal(err)
}
if PerformanceReporter.Enabled {
PerformanceReporter.bytesReporter <- b
}
}
}
}

View File

@ -39,6 +39,12 @@ type FlutTask struct {
type FlutAck struct{ Ok bool }
type FlutStatus struct {
*pixelflut.Performance
Ok bool
Fluting bool
}
func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error {
if (h.task != FlutTask{}) {
// @incomplete: stop old task if new task is received
@ -56,9 +62,11 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error {
return nil
}
func (h *Hevring) Status(x int, reply *FlutAck) error {
// @incomplete: provide performance metrics
func (h *Hevring) Status(metrics bool, reply *FlutStatus) error {
pixelflut.PerformanceReporter.Enabled = metrics
reply.Performance = pixelflut.PerformanceReporter
reply.Ok = true
reply.Fluting = h.taskQuit != nil
return nil
}

View File

@ -1,7 +1,6 @@
package rpc
import (
// "io"
"bufio"
"fmt"
"image"
@ -12,11 +11,14 @@ import (
"strings"
"sync"
"time"
"github.com/SpeckiJ/Hochwasser/pixelflut"
)
type Rán struct {
clients []*rpc.Client
task FlutTask
metrics pixelflut.Performance
}
// SummonRán sets up the RPC master, accepting connections at addres (":1234")
@ -59,11 +61,19 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
time.Sleep(100 * time.Millisecond)
var clients []*rpc.Client
r.metrics.Conns = 0
r.metrics.BytesPerSec = 0
r.metrics.BytesTotal = 0
for _, c := range r.clients {
status := FlutAck{}
err := c.Call("Hevring.Status", 0, &status)
status := FlutStatus{}
err := c.Call("Hevring.Status", r.metrics.Enabled, &status)
if err == nil && status.Ok {
clients = append(clients, c)
r.metrics.Conns += status.Conns
r.metrics.BytesPerSec += status.BytesPerSec
r.metrics.BytesTotal += status.BytesTotal
}
}
if len(r.clients) != len(clients) {
@ -73,6 +83,16 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
}
}()
// print performance
go func() {
for {
time.Sleep(5 * time.Second)
if r.metrics.Enabled {
fmt.Println(r.metrics)
}
}
}()
// REPL to change tasks without loosing clients
go func() {
scanner := bufio.NewScanner(os.Stdin)
@ -86,6 +106,18 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
c.Call("Hevring.Stop", 0, &ack) // @speed: async
}
} else if cmd == "start" {
if (r.task != FlutTask{}) {
for _, c := range r.clients {
ack := FlutAck{}
// @speed: should send tasks async
err := c.Call("Hevring.Flut", r.task, &ack)
if err != nil || !ack.Ok {
log.Printf("[rán] client didn't accept task")
}
}
}
} else if cmd == "img" && len(args) > 0 {
// // @incomplete
// path := args[0]
@ -97,6 +129,10 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
// offset = image.Pt(x, y)
// }
// task := FlutTask{}
} else if cmd == "metrics" {
r.metrics.Enabled = !r.metrics.Enabled
}
}
}()