move FlutTask to pixelflut pkg

This commit is contained in:
Norwin Roosen 2020-12-31 18:44:56 +01:00
parent f191386a75
commit 230a3d3f24
No known key found for this signature in database
GPG Key ID: 24BC059DE24C43A3
6 changed files with 139 additions and 112 deletions

10
main.go
View File

@ -70,7 +70,15 @@ func taskFromFlags(stop chan bool, wg *sync.WaitGroup) {
}
}
r.SetTask(img, image.Pt(*x, *y), *address, *connections)
r.SetTask(pixelflut.FlutTask{
FlutTaskOpts: pixelflut.FlutTaskOpts{
Address: *address,
MaxConns: *connections,
Offset: image.Pt(*x, *y),
Shuffle: true,
},
Img: img,
})
}
if startClient {

View File

@ -7,63 +7,8 @@ import (
"image/color"
"log"
"net"
"sync"
"time"
"github.com/SpeckiJ/Hochwasser/render"
)
// Flut asynchronously sends the given image to pixelflut server at `address`
// using `conns` connections. Pixels are sent column wise, unless `shuffle`
// is set. Stops when stop is closed.
// @cleanup: use FlutTask{} as arg
func Flut(img *image.NRGBA, position image.Point, shuffle, rgbsplit, randoffset bool, address string, conns int, stop chan bool, wg *sync.WaitGroup) {
var cmds commands
if rgbsplit {
// do a RGB split of white
imgmod := render.ImgColorFilter(img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0xff, 0, 0, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(position.X-10, position.Y-10))...)
imgmod = render.ImgColorFilter(img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0xff, 0, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(position.X+10, position.Y))...)
imgmod = render.ImgColorFilter(img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0, 0xff, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(position.X-10, position.Y+10))...)
cmds = append(cmds, commandsFromImage(img, position)...)
} else {
cmds = commandsFromImage(img, position)
}
if shuffle {
cmds.Shuffle()
}
var messages [][]byte
var maxOffsetX, maxOffsetY int
if randoffset {
maxX, maxY := CanvasSize(address)
maxOffsetX = maxX - img.Bounds().Canon().Dx()
maxOffsetY = maxY - img.Bounds().Canon().Dy()
messages = cmds.Chunk(1) // each connection should send the full img
} else {
messages = cmds.Chunk(conns)
}
bombWg := sync.WaitGroup{}
for i := 0; i < conns; i++ {
msg := messages[0]
if len(messages) > i {
msg = messages[i]
}
time.Sleep(66 * time.Millisecond) // avoid crashing the server
go bombAddress(msg, address, maxOffsetX, maxOffsetY, stop, &bombWg)
}
bombWg.Wait()
if wg != nil {
wg.Done()
}
}
// CanvasSize returns the size of the canvas as returned by the server
func CanvasSize(address string) (int, int) {
conn, err := net.Dial("tcp", address)

102
pixelflut/flut.go Normal file
View File

@ -0,0 +1,102 @@
package pixelflut
import (
"fmt"
"image"
"image/color"
"sync"
"time"
"github.com/SpeckiJ/Hochwasser/render"
)
// FlutTask contains all data that is needed to flut
type FlutTask struct {
FlutTaskOpts
Img FlutTaskData
}
// FlutTaskOpts specifies parameters of the flut
type FlutTaskOpts struct {
Address string
MaxConns int
Offset image.Point
Paused bool
Shuffle bool
RGBSplit bool
RandOffset bool
}
// FlutTaskData contains the actual pixeldata to flut, separated because of size
type FlutTaskData = *image.NRGBA
func (t FlutTask) String() string {
img := "nil"
if t.Img != nil {
img = t.Img.Bounds().Size().String()
}
return fmt.Sprintf(
" %d conns @ %s\n img %v offset %v\n shuffle %v rgbsplit %v randoffset %v paused %v",
t.MaxConns, t.Address, img, t.Offset, t.Shuffle, t.RGBSplit, t.RandOffset, t.Paused,
)
}
// IsFlutable indicates if a task is properly initialized & not paused
func (t FlutTask) IsFlutable() bool {
return t.Img != nil && t.MaxConns > 0 && t.Address != "" && !t.Paused
}
// Flut asynchronously sends the given image to pixelflut server at `address`
// using `conns` connections. Pixels are sent column wise, unless `shuffle`
// is set. Stops when stop is closed.
// @cleanup: use FlutTask{} as arg
func Flut(t FlutTask, stop chan bool, wg *sync.WaitGroup) {
if !t.IsFlutable() {
return // @robustness: actually return an error here?
}
var cmds commands
if t.RGBSplit {
// do a RGB split of white
imgmod := render.ImgColorFilter(t.Img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0xff, 0, 0, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(t.Offset.X-10, t.Offset.Y-10))...)
imgmod = render.ImgColorFilter(t.Img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0xff, 0, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(t.Offset.X+10, t.Offset.Y))...)
imgmod = render.ImgColorFilter(t.Img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0, 0xff, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(t.Offset.X-10, t.Offset.Y+10))...)
cmds = append(cmds, commandsFromImage(t.Img, t.Offset)...)
} else {
cmds = commandsFromImage(t.Img, t.Offset)
}
if t.Shuffle {
cmds.Shuffle()
}
var messages [][]byte
var maxOffsetX, maxOffsetY int
if t.RandOffset {
maxX, maxY := CanvasSize(t.Address)
maxOffsetX = maxX - t.Img.Bounds().Canon().Dx()
maxOffsetY = maxY - t.Img.Bounds().Canon().Dy()
messages = cmds.Chunk(1) // each connection should send the full img
} else {
messages = cmds.Chunk(t.MaxConns)
}
bombWg := sync.WaitGroup{}
for i := 0; i < t.MaxConns; i++ {
msg := messages[0]
if len(messages) > i {
msg = messages[i]
}
time.Sleep(66 * time.Millisecond) // avoid crashing the server
go bombAddress(msg, t.Address, maxOffsetX, maxOffsetY, stop, &bombWg)
}
bombWg.Wait()
if wg != nil {
wg.Done()
}
}

View File

@ -2,7 +2,6 @@ package rpc
import (
"fmt"
"image"
"log"
"net"
"net/rpc"
@ -40,30 +39,12 @@ func ConnectHevring(ránAddress string, stop chan bool, wg *sync.WaitGroup) {
}
type Hevring struct {
task FlutTask
task pixelflut.FlutTask
taskQuit chan bool
quit chan bool
wg *sync.WaitGroup
}
type FlutTask struct {
Address string
MaxConns int
Img *image.NRGBA
Offset image.Point
Paused bool
Shuffle bool // TODO: refactor these as RenderOpts bitfield
RGBSplit bool
RandOffset bool
}
func (t FlutTask) String() string {
return fmt.Sprintf(
" %d conns @ %s\n img %v offset %v\n shuffle %v rgbsplit %v randoffset %v paused %v",
t.MaxConns, t.Address, t.Img.Bounds().Size(), t.Offset, t.Shuffle, t.RGBSplit, t.RandOffset, t.Paused,
)
}
type FlutAck struct{ Ok bool }
type FlutStatus struct {
@ -72,7 +53,7 @@ type FlutStatus struct {
Fluting bool
}
func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error {
func (h *Hevring) Flut(task pixelflut.FlutTask, reply *FlutAck) error {
// stop old task if new task is received
if h.taskQuit != nil {
close(h.taskQuit)
@ -82,7 +63,7 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error {
h.task = task
h.taskQuit = make(chan bool)
go pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.RGBSplit, task.RandOffset, task.Address, task.MaxConns, h.taskQuit, nil)
go pixelflut.Flut(task, h.taskQuit, nil)
reply.Ok = true
return nil
}
@ -98,7 +79,7 @@ func (h *Hevring) Status(metrics bool, reply *FlutStatus) error {
func (h *Hevring) Stop(x int, reply *FlutAck) error {
if h.taskQuit != nil {
fmt.Println("[hevring] stopping task")
h.task = FlutTask{}
h.task = pixelflut.FlutTask{}
close(h.taskQuit)
h.taskQuit = nil
reply.Ok = true

View File

@ -2,7 +2,6 @@ package rpc
import (
"fmt"
"image"
"log"
"net"
"net/rpc"
@ -16,7 +15,7 @@ import (
// Implements `Fluter`
type Rán struct {
clients []*rpc.Client
task FlutTask
task pixelflut.FlutTask
metrics pixelflut.Performance
}
@ -44,7 +43,7 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
fmt.Printf("[rán] client connected (%v). current clients: %v\n",
conn.RemoteAddr(), len(r.clients))
if (r.task != FlutTask{}) {
if r.task.IsFlutable() {
ack := FlutAck{}
err = client.Call("Hevring.Flut", r.task, &ack)
if err != nil || !ack.Ok {
@ -98,18 +97,15 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
return r
}
func (r *Rán) getTask() FlutTask { return r.task }
func (r *Rán) getTask() pixelflut.FlutTask { return r.task }
func (r *Rán) toggleMetrics() {
r.metrics.Enabled = !r.metrics.Enabled
}
func (r *Rán) applyTask(t FlutTask) {
if (t == FlutTask{}) { // @robustness: FlutTask should provide .IsValid()
return
}
func (r *Rán) applyTask(t pixelflut.FlutTask) {
r.task = t
if t.Paused {
if !t.IsFlutable() {
return
}
for i, c := range r.clients {
@ -139,17 +135,10 @@ func (r *Rán) handleExit(stopChan <-chan bool, wg *sync.WaitGroup) {
}
}
// SetTask assigns a FlutTask to Rán, distributing it to all clients
func (r *Rán) SetTask(img *image.NRGBA, offset image.Point, address string, maxConns int) {
// SetTask assigns a pixelflut.FlutTask to Rán, distributing it to all clients
func (r *Rán) SetTask(t pixelflut.FlutTask) {
// @incomplete: smart task creation:
// fetch server state & sample foreign activity in image regions. assign
// subregions to clients (per connection), considering their bandwidth.
r.applyTask(FlutTask{
Address: address,
MaxConns: maxConns,
Img: img,
Offset: offset,
Shuffle: true,
})
r.applyTask(t)
}

View File

@ -10,13 +10,14 @@ import (
"strconv"
"strings"
"github.com/SpeckiJ/Hochwasser/pixelflut"
"github.com/SpeckiJ/Hochwasser/render"
)
// Fluter implements flut operations that can be triggered via a REPL
type Fluter interface {
getTask() FlutTask
applyTask(FlutTask)
getTask() pixelflut.FlutTask
applyTask(pixelflut.FlutTask)
stopTask()
toggleMetrics()
}
@ -122,6 +123,7 @@ func RunREPL(f Fluter) {
path := strings.Join(args, " ")
if img, err := render.ReadImage(path); err != nil {
fmt.Println(err)
continue
} else {
t.Img = img
}