From 474901fbd561b0426d846ab23c98e5b40ba7f56c Mon Sep 17 00:00:00 2001 From: Norwin Roosen Date: Thu, 31 Dec 2020 18:44:56 +0100 Subject: [PATCH] move FlutTask to pixelflut pkg --- main.go | 10 +++- pixelflut/{api.go => canvas.go} | 55 ----------------- pixelflut/flut.go | 102 ++++++++++++++++++++++++++++++++ rpc/dottir.go | 27 ++------- rpc/ran.go | 27 +++------ rpc/repl.go | 30 +++++----- 6 files changed, 139 insertions(+), 112 deletions(-) rename pixelflut/{api.go => canvas.go} (52%) create mode 100644 pixelflut/flut.go diff --git a/main.go b/main.go index 2b2e103..df37052 100644 --- a/main.go +++ b/main.go @@ -70,7 +70,15 @@ func taskFromFlags(stop chan bool, wg *sync.WaitGroup) { } } - r.SetTask(img, image.Pt(*x, *y), *address, *connections) + r.SetTask(pixelflut.FlutTask{ + FlutTaskOpts: pixelflut.FlutTaskOpts{ + Address: *address, + MaxConns: *connections, + Offset: image.Pt(*x, *y), + Shuffle: true, + }, + Img: img, + }) } if startClient { diff --git a/pixelflut/api.go b/pixelflut/canvas.go similarity index 52% rename from pixelflut/api.go rename to pixelflut/canvas.go index 77c7282..6f89971 100644 --- a/pixelflut/api.go +++ b/pixelflut/canvas.go @@ -7,63 +7,8 @@ import ( "image/color" "log" "net" - "sync" - "time" - - "github.com/SpeckiJ/Hochwasser/render" ) -// Flut asynchronously sends the given image to pixelflut server at `address` -// using `conns` connections. Pixels are sent column wise, unless `shuffle` -// is set. Stops when stop is closed. -// @cleanup: use FlutTask{} as arg -func Flut(img *image.NRGBA, position image.Point, shuffle, rgbsplit, randoffset bool, address string, conns int, stop chan bool, wg *sync.WaitGroup) { - var cmds commands - if rgbsplit { - // do a RGB split of white - imgmod := render.ImgColorFilter(img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0xff, 0, 0, 0xff}) - cmds = append(cmds, commandsFromImage(imgmod, image.Pt(position.X-10, position.Y-10))...) - imgmod = render.ImgColorFilter(img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0xff, 0, 0xff}) - cmds = append(cmds, commandsFromImage(imgmod, image.Pt(position.X+10, position.Y))...) - imgmod = render.ImgColorFilter(img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0, 0xff, 0xff}) - cmds = append(cmds, commandsFromImage(imgmod, image.Pt(position.X-10, position.Y+10))...) - cmds = append(cmds, commandsFromImage(img, position)...) - } else { - cmds = commandsFromImage(img, position) - } - - if shuffle { - cmds.Shuffle() - } - - var messages [][]byte - var maxOffsetX, maxOffsetY int - if randoffset { - maxX, maxY := CanvasSize(address) - maxOffsetX = maxX - img.Bounds().Canon().Dx() - maxOffsetY = maxY - img.Bounds().Canon().Dy() - messages = cmds.Chunk(1) // each connection should send the full img - } else { - messages = cmds.Chunk(conns) - } - - bombWg := sync.WaitGroup{} - for i := 0; i < conns; i++ { - msg := messages[0] - if len(messages) > i { - msg = messages[i] - } - - time.Sleep(66 * time.Millisecond) // avoid crashing the server - - go bombAddress(msg, address, maxOffsetX, maxOffsetY, stop, &bombWg) - } - bombWg.Wait() - if wg != nil { - wg.Done() - } -} - // CanvasSize returns the size of the canvas as returned by the server func CanvasSize(address string) (int, int) { conn, err := net.Dial("tcp", address) diff --git a/pixelflut/flut.go b/pixelflut/flut.go new file mode 100644 index 0000000..afe3b1c --- /dev/null +++ b/pixelflut/flut.go @@ -0,0 +1,102 @@ +package pixelflut + +import ( + "fmt" + "image" + "image/color" + "sync" + "time" + + "github.com/SpeckiJ/Hochwasser/render" +) + +// FlutTask contains all data that is needed to flut +type FlutTask struct { + FlutTaskOpts + Img FlutTaskData +} + +// FlutTaskOpts specifies parameters of the flut +type FlutTaskOpts struct { + Address string + MaxConns int + Offset image.Point + Paused bool + Shuffle bool + RGBSplit bool + RandOffset bool +} + +// FlutTaskData contains the actual pixeldata to flut, separated because of size +type FlutTaskData = *image.NRGBA + +func (t FlutTask) String() string { + img := "nil" + if t.Img != nil { + img = t.Img.Bounds().Size().String() + } + return fmt.Sprintf( + " %d conns @ %s\n img %v offset %v\n shuffle %v rgbsplit %v randoffset %v paused %v", + t.MaxConns, t.Address, img, t.Offset, t.Shuffle, t.RGBSplit, t.RandOffset, t.Paused, + ) +} + +// IsFlutable indicates if a task is properly initialized & not paused +func (t FlutTask) IsFlutable() bool { + return t.Img != nil && t.MaxConns > 0 && t.Address != "" && !t.Paused +} + +// Flut asynchronously sends the given image to pixelflut server at `address` +// using `conns` connections. Pixels are sent column wise, unless `shuffle` +// is set. Stops when stop is closed. +// @cleanup: use FlutTask{} as arg +func Flut(t FlutTask, stop chan bool, wg *sync.WaitGroup) { + if !t.IsFlutable() { + return // @robustness: actually return an error here? + } + + var cmds commands + if t.RGBSplit { + // do a RGB split of white + imgmod := render.ImgColorFilter(t.Img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0xff, 0, 0, 0xff}) + cmds = append(cmds, commandsFromImage(imgmod, image.Pt(t.Offset.X-10, t.Offset.Y-10))...) + imgmod = render.ImgColorFilter(t.Img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0xff, 0, 0xff}) + cmds = append(cmds, commandsFromImage(imgmod, image.Pt(t.Offset.X+10, t.Offset.Y))...) + imgmod = render.ImgColorFilter(t.Img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0, 0xff, 0xff}) + cmds = append(cmds, commandsFromImage(imgmod, image.Pt(t.Offset.X-10, t.Offset.Y+10))...) + cmds = append(cmds, commandsFromImage(t.Img, t.Offset)...) + } else { + cmds = commandsFromImage(t.Img, t.Offset) + } + + if t.Shuffle { + cmds.Shuffle() + } + + var messages [][]byte + var maxOffsetX, maxOffsetY int + if t.RandOffset { + maxX, maxY := CanvasSize(t.Address) + maxOffsetX = maxX - t.Img.Bounds().Canon().Dx() + maxOffsetY = maxY - t.Img.Bounds().Canon().Dy() + messages = cmds.Chunk(1) // each connection should send the full img + } else { + messages = cmds.Chunk(t.MaxConns) + } + + bombWg := sync.WaitGroup{} + for i := 0; i < t.MaxConns; i++ { + msg := messages[0] + if len(messages) > i { + msg = messages[i] + } + + time.Sleep(66 * time.Millisecond) // avoid crashing the server + + go bombAddress(msg, t.Address, maxOffsetX, maxOffsetY, stop, &bombWg) + } + bombWg.Wait() + if wg != nil { + wg.Done() + } +} diff --git a/rpc/dottir.go b/rpc/dottir.go index 2485f49..ed55b96 100644 --- a/rpc/dottir.go +++ b/rpc/dottir.go @@ -2,7 +2,6 @@ package rpc import ( "fmt" - "image" "log" "net" "net/rpc" @@ -40,30 +39,12 @@ func ConnectHevring(ránAddress string, stop chan bool, wg *sync.WaitGroup) { } type Hevring struct { - task FlutTask + task pixelflut.FlutTask taskQuit chan bool quit chan bool wg *sync.WaitGroup } -type FlutTask struct { - Address string - MaxConns int - Img *image.NRGBA - Offset image.Point - Paused bool - Shuffle bool // TODO: refactor these as RenderOpts bitfield - RGBSplit bool - RandOffset bool -} - -func (t FlutTask) String() string { - return fmt.Sprintf( - " %d conns @ %s\n img %v offset %v\n shuffle %v rgbsplit %v randoffset %v paused %v", - t.MaxConns, t.Address, t.Img.Bounds().Size(), t.Offset, t.Shuffle, t.RGBSplit, t.RandOffset, t.Paused, - ) -} - type FlutAck struct{ Ok bool } type FlutStatus struct { @@ -72,7 +53,7 @@ type FlutStatus struct { Fluting bool } -func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error { +func (h *Hevring) Flut(task pixelflut.FlutTask, reply *FlutAck) error { // stop old task if new task is received if h.taskQuit != nil { close(h.taskQuit) @@ -82,7 +63,7 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error { h.task = task h.taskQuit = make(chan bool) - go pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.RGBSplit, task.RandOffset, task.Address, task.MaxConns, h.taskQuit, nil) + go pixelflut.Flut(task, h.taskQuit, nil) reply.Ok = true return nil } @@ -98,7 +79,7 @@ func (h *Hevring) Status(metrics bool, reply *FlutStatus) error { func (h *Hevring) Stop(x int, reply *FlutAck) error { if h.taskQuit != nil { fmt.Println("[hevring] stopping task") - h.task = FlutTask{} + h.task = pixelflut.FlutTask{} close(h.taskQuit) h.taskQuit = nil reply.Ok = true diff --git a/rpc/ran.go b/rpc/ran.go index 602f364..0f2928c 100644 --- a/rpc/ran.go +++ b/rpc/ran.go @@ -2,7 +2,6 @@ package rpc import ( "fmt" - "image" "log" "net" "net/rpc" @@ -16,7 +15,7 @@ import ( // Implements `Fluter` type Rán struct { clients []*rpc.Client - task FlutTask + task pixelflut.FlutTask metrics pixelflut.Performance } @@ -44,7 +43,7 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán { fmt.Printf("[rán] client connected (%v). current clients: %v\n", conn.RemoteAddr(), len(r.clients)) - if (r.task != FlutTask{}) { + if r.task.IsFlutable() { ack := FlutAck{} err = client.Call("Hevring.Flut", r.task, &ack) if err != nil || !ack.Ok { @@ -98,18 +97,15 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán { return r } -func (r *Rán) getTask() FlutTask { return r.task } +func (r *Rán) getTask() pixelflut.FlutTask { return r.task } func (r *Rán) toggleMetrics() { r.metrics.Enabled = !r.metrics.Enabled } -func (r *Rán) applyTask(t FlutTask) { - if (t == FlutTask{}) { // @robustness: FlutTask should provide .IsValid() - return - } +func (r *Rán) applyTask(t pixelflut.FlutTask) { r.task = t - if t.Paused { + if !t.IsFlutable() { return } for i, c := range r.clients { @@ -139,17 +135,10 @@ func (r *Rán) handleExit(stopChan <-chan bool, wg *sync.WaitGroup) { } } -// SetTask assigns a FlutTask to Rán, distributing it to all clients -func (r *Rán) SetTask(img *image.NRGBA, offset image.Point, address string, maxConns int) { +// SetTask assigns a pixelflut.FlutTask to Rán, distributing it to all clients +func (r *Rán) SetTask(t pixelflut.FlutTask) { // @incomplete: smart task creation: // fetch server state & sample foreign activity in image regions. assign // subregions to clients (per connection), considering their bandwidth. - - r.applyTask(FlutTask{ - Address: address, - MaxConns: maxConns, - Img: img, - Offset: offset, - Shuffle: true, - }) + r.applyTask(t) } diff --git a/rpc/repl.go b/rpc/repl.go index 4377d08..6e93484 100644 --- a/rpc/repl.go +++ b/rpc/repl.go @@ -10,13 +10,14 @@ import ( "strconv" "strings" + "github.com/SpeckiJ/Hochwasser/pixelflut" "github.com/SpeckiJ/Hochwasser/render" ) // Fluter implements flut operations that can be triggered via a REPL type Fluter interface { - getTask() FlutTask - applyTask(FlutTask) + getTask() pixelflut.FlutTask + applyTask(pixelflut.FlutTask) stopTask() toggleMetrics() } @@ -122,6 +123,7 @@ func RunREPL(f Fluter) { path := strings.Join(args, " ") if img, err := render.ReadImage(path); err != nil { fmt.Println(err) + continue } else { t.Img = img } @@ -147,19 +149,19 @@ func RunREPL(f Fluter) { func printHelp() { fmt.Println(`available commands: - start start fluting - stop pause fluting - conns set number of connections per client - addr : set target server - offset set top-left offset - offset rand random offset for each draw - metrics toggle bandwidth reporting (may cost some performance) + start start fluting + stop pause fluting + conns set number of connections per client + addr : set target server + offset set top-left offset + offset rand random offset for each draw + metrics toggle bandwidth reporting (may cost some performance) - img set image - txt send text - txt [ [ []] enter interactive text mode - rgbsplit toggle RGB split effect - shuffle toggle between column-wise & randomized draw order`) + img set image + txt send text + txt [ [ []] enter interactive text mode + rgbsplit toggle RGB split effect + shuffle toggle between column-wise & randomized draw order`) } // try to parse as hex-encoded RGB color,