move FlutTask to pixelflut pkg

This commit is contained in:
Norwin Roosen 2020-12-31 18:44:56 +01:00
parent 5f55530e3e
commit 474901fbd5
No known key found for this signature in database
GPG Key ID: 24BC059DE24C43A3
6 changed files with 139 additions and 112 deletions

10
main.go
View File

@ -70,7 +70,15 @@ func taskFromFlags(stop chan bool, wg *sync.WaitGroup) {
} }
} }
r.SetTask(img, image.Pt(*x, *y), *address, *connections) r.SetTask(pixelflut.FlutTask{
FlutTaskOpts: pixelflut.FlutTaskOpts{
Address: *address,
MaxConns: *connections,
Offset: image.Pt(*x, *y),
Shuffle: true,
},
Img: img,
})
} }
if startClient { if startClient {

View File

@ -7,63 +7,8 @@ import (
"image/color" "image/color"
"log" "log"
"net" "net"
"sync"
"time"
"github.com/SpeckiJ/Hochwasser/render"
) )
// Flut asynchronously sends the given image to pixelflut server at `address`
// using `conns` connections. Pixels are sent column wise, unless `shuffle`
// is set. Stops when stop is closed.
// @cleanup: use FlutTask{} as arg
func Flut(img *image.NRGBA, position image.Point, shuffle, rgbsplit, randoffset bool, address string, conns int, stop chan bool, wg *sync.WaitGroup) {
var cmds commands
if rgbsplit {
// do a RGB split of white
imgmod := render.ImgColorFilter(img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0xff, 0, 0, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(position.X-10, position.Y-10))...)
imgmod = render.ImgColorFilter(img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0xff, 0, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(position.X+10, position.Y))...)
imgmod = render.ImgColorFilter(img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0, 0xff, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(position.X-10, position.Y+10))...)
cmds = append(cmds, commandsFromImage(img, position)...)
} else {
cmds = commandsFromImage(img, position)
}
if shuffle {
cmds.Shuffle()
}
var messages [][]byte
var maxOffsetX, maxOffsetY int
if randoffset {
maxX, maxY := CanvasSize(address)
maxOffsetX = maxX - img.Bounds().Canon().Dx()
maxOffsetY = maxY - img.Bounds().Canon().Dy()
messages = cmds.Chunk(1) // each connection should send the full img
} else {
messages = cmds.Chunk(conns)
}
bombWg := sync.WaitGroup{}
for i := 0; i < conns; i++ {
msg := messages[0]
if len(messages) > i {
msg = messages[i]
}
time.Sleep(66 * time.Millisecond) // avoid crashing the server
go bombAddress(msg, address, maxOffsetX, maxOffsetY, stop, &bombWg)
}
bombWg.Wait()
if wg != nil {
wg.Done()
}
}
// CanvasSize returns the size of the canvas as returned by the server // CanvasSize returns the size of the canvas as returned by the server
func CanvasSize(address string) (int, int) { func CanvasSize(address string) (int, int) {
conn, err := net.Dial("tcp", address) conn, err := net.Dial("tcp", address)

102
pixelflut/flut.go Normal file
View File

@ -0,0 +1,102 @@
package pixelflut
import (
"fmt"
"image"
"image/color"
"sync"
"time"
"github.com/SpeckiJ/Hochwasser/render"
)
// FlutTask contains all data that is needed to flut
type FlutTask struct {
FlutTaskOpts
Img FlutTaskData
}
// FlutTaskOpts specifies parameters of the flut
type FlutTaskOpts struct {
Address string
MaxConns int
Offset image.Point
Paused bool
Shuffle bool
RGBSplit bool
RandOffset bool
}
// FlutTaskData contains the actual pixeldata to flut, separated because of size
type FlutTaskData = *image.NRGBA
func (t FlutTask) String() string {
img := "nil"
if t.Img != nil {
img = t.Img.Bounds().Size().String()
}
return fmt.Sprintf(
" %d conns @ %s\n img %v offset %v\n shuffle %v rgbsplit %v randoffset %v paused %v",
t.MaxConns, t.Address, img, t.Offset, t.Shuffle, t.RGBSplit, t.RandOffset, t.Paused,
)
}
// IsFlutable indicates if a task is properly initialized & not paused
func (t FlutTask) IsFlutable() bool {
return t.Img != nil && t.MaxConns > 0 && t.Address != "" && !t.Paused
}
// Flut asynchronously sends the given image to pixelflut server at `address`
// using `conns` connections. Pixels are sent column wise, unless `shuffle`
// is set. Stops when stop is closed.
// @cleanup: use FlutTask{} as arg
func Flut(t FlutTask, stop chan bool, wg *sync.WaitGroup) {
if !t.IsFlutable() {
return // @robustness: actually return an error here?
}
var cmds commands
if t.RGBSplit {
// do a RGB split of white
imgmod := render.ImgColorFilter(t.Img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0xff, 0, 0, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(t.Offset.X-10, t.Offset.Y-10))...)
imgmod = render.ImgColorFilter(t.Img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0xff, 0, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(t.Offset.X+10, t.Offset.Y))...)
imgmod = render.ImgColorFilter(t.Img, color.NRGBA{0xff, 0xff, 0xff, 0xff}, color.NRGBA{0, 0, 0xff, 0xff})
cmds = append(cmds, commandsFromImage(imgmod, image.Pt(t.Offset.X-10, t.Offset.Y+10))...)
cmds = append(cmds, commandsFromImage(t.Img, t.Offset)...)
} else {
cmds = commandsFromImage(t.Img, t.Offset)
}
if t.Shuffle {
cmds.Shuffle()
}
var messages [][]byte
var maxOffsetX, maxOffsetY int
if t.RandOffset {
maxX, maxY := CanvasSize(t.Address)
maxOffsetX = maxX - t.Img.Bounds().Canon().Dx()
maxOffsetY = maxY - t.Img.Bounds().Canon().Dy()
messages = cmds.Chunk(1) // each connection should send the full img
} else {
messages = cmds.Chunk(t.MaxConns)
}
bombWg := sync.WaitGroup{}
for i := 0; i < t.MaxConns; i++ {
msg := messages[0]
if len(messages) > i {
msg = messages[i]
}
time.Sleep(66 * time.Millisecond) // avoid crashing the server
go bombAddress(msg, t.Address, maxOffsetX, maxOffsetY, stop, &bombWg)
}
bombWg.Wait()
if wg != nil {
wg.Done()
}
}

View File

@ -2,7 +2,6 @@ package rpc
import ( import (
"fmt" "fmt"
"image"
"log" "log"
"net" "net"
"net/rpc" "net/rpc"
@ -40,30 +39,12 @@ func ConnectHevring(ránAddress string, stop chan bool, wg *sync.WaitGroup) {
} }
type Hevring struct { type Hevring struct {
task FlutTask task pixelflut.FlutTask
taskQuit chan bool taskQuit chan bool
quit chan bool quit chan bool
wg *sync.WaitGroup wg *sync.WaitGroup
} }
type FlutTask struct {
Address string
MaxConns int
Img *image.NRGBA
Offset image.Point
Paused bool
Shuffle bool // TODO: refactor these as RenderOpts bitfield
RGBSplit bool
RandOffset bool
}
func (t FlutTask) String() string {
return fmt.Sprintf(
" %d conns @ %s\n img %v offset %v\n shuffle %v rgbsplit %v randoffset %v paused %v",
t.MaxConns, t.Address, t.Img.Bounds().Size(), t.Offset, t.Shuffle, t.RGBSplit, t.RandOffset, t.Paused,
)
}
type FlutAck struct{ Ok bool } type FlutAck struct{ Ok bool }
type FlutStatus struct { type FlutStatus struct {
@ -72,7 +53,7 @@ type FlutStatus struct {
Fluting bool Fluting bool
} }
func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error { func (h *Hevring) Flut(task pixelflut.FlutTask, reply *FlutAck) error {
// stop old task if new task is received // stop old task if new task is received
if h.taskQuit != nil { if h.taskQuit != nil {
close(h.taskQuit) close(h.taskQuit)
@ -82,7 +63,7 @@ func (h *Hevring) Flut(task FlutTask, reply *FlutAck) error {
h.task = task h.task = task
h.taskQuit = make(chan bool) h.taskQuit = make(chan bool)
go pixelflut.Flut(task.Img, task.Offset, task.Shuffle, task.RGBSplit, task.RandOffset, task.Address, task.MaxConns, h.taskQuit, nil) go pixelflut.Flut(task, h.taskQuit, nil)
reply.Ok = true reply.Ok = true
return nil return nil
} }
@ -98,7 +79,7 @@ func (h *Hevring) Status(metrics bool, reply *FlutStatus) error {
func (h *Hevring) Stop(x int, reply *FlutAck) error { func (h *Hevring) Stop(x int, reply *FlutAck) error {
if h.taskQuit != nil { if h.taskQuit != nil {
fmt.Println("[hevring] stopping task") fmt.Println("[hevring] stopping task")
h.task = FlutTask{} h.task = pixelflut.FlutTask{}
close(h.taskQuit) close(h.taskQuit)
h.taskQuit = nil h.taskQuit = nil
reply.Ok = true reply.Ok = true

View File

@ -2,7 +2,6 @@ package rpc
import ( import (
"fmt" "fmt"
"image"
"log" "log"
"net" "net"
"net/rpc" "net/rpc"
@ -16,7 +15,7 @@ import (
// Implements `Fluter` // Implements `Fluter`
type Rán struct { type Rán struct {
clients []*rpc.Client clients []*rpc.Client
task FlutTask task pixelflut.FlutTask
metrics pixelflut.Performance metrics pixelflut.Performance
} }
@ -44,7 +43,7 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
fmt.Printf("[rán] client connected (%v). current clients: %v\n", fmt.Printf("[rán] client connected (%v). current clients: %v\n",
conn.RemoteAddr(), len(r.clients)) conn.RemoteAddr(), len(r.clients))
if (r.task != FlutTask{}) { if r.task.IsFlutable() {
ack := FlutAck{} ack := FlutAck{}
err = client.Call("Hevring.Flut", r.task, &ack) err = client.Call("Hevring.Flut", r.task, &ack)
if err != nil || !ack.Ok { if err != nil || !ack.Ok {
@ -98,18 +97,15 @@ func SummonRán(address string, stopChan chan bool, wg *sync.WaitGroup) *Rán {
return r return r
} }
func (r *Rán) getTask() FlutTask { return r.task } func (r *Rán) getTask() pixelflut.FlutTask { return r.task }
func (r *Rán) toggleMetrics() { func (r *Rán) toggleMetrics() {
r.metrics.Enabled = !r.metrics.Enabled r.metrics.Enabled = !r.metrics.Enabled
} }
func (r *Rán) applyTask(t FlutTask) { func (r *Rán) applyTask(t pixelflut.FlutTask) {
if (t == FlutTask{}) { // @robustness: FlutTask should provide .IsValid()
return
}
r.task = t r.task = t
if t.Paused { if !t.IsFlutable() {
return return
} }
for i, c := range r.clients { for i, c := range r.clients {
@ -139,17 +135,10 @@ func (r *Rán) handleExit(stopChan <-chan bool, wg *sync.WaitGroup) {
} }
} }
// SetTask assigns a FlutTask to Rán, distributing it to all clients // SetTask assigns a pixelflut.FlutTask to Rán, distributing it to all clients
func (r *Rán) SetTask(img *image.NRGBA, offset image.Point, address string, maxConns int) { func (r *Rán) SetTask(t pixelflut.FlutTask) {
// @incomplete: smart task creation: // @incomplete: smart task creation:
// fetch server state & sample foreign activity in image regions. assign // fetch server state & sample foreign activity in image regions. assign
// subregions to clients (per connection), considering their bandwidth. // subregions to clients (per connection), considering their bandwidth.
r.applyTask(t)
r.applyTask(FlutTask{
Address: address,
MaxConns: maxConns,
Img: img,
Offset: offset,
Shuffle: true,
})
} }

View File

@ -10,13 +10,14 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/SpeckiJ/Hochwasser/pixelflut"
"github.com/SpeckiJ/Hochwasser/render" "github.com/SpeckiJ/Hochwasser/render"
) )
// Fluter implements flut operations that can be triggered via a REPL // Fluter implements flut operations that can be triggered via a REPL
type Fluter interface { type Fluter interface {
getTask() FlutTask getTask() pixelflut.FlutTask
applyTask(FlutTask) applyTask(pixelflut.FlutTask)
stopTask() stopTask()
toggleMetrics() toggleMetrics()
} }
@ -122,6 +123,7 @@ func RunREPL(f Fluter) {
path := strings.Join(args, " ") path := strings.Join(args, " ")
if img, err := render.ReadImage(path); err != nil { if img, err := render.ReadImage(path); err != nil {
fmt.Println(err) fmt.Println(err)
continue
} else { } else {
t.Img = img t.Img = img
} }