Updater logic for the demo ()

Updater allows concurrent processes to update different fields on a json object
which is serialized and passed to a func([]byte). In the demo, different SetFunc
for different fields on the base json object will be passed to the relevant components
(clients, director, game servers, etc). These components will run and pass their state
to the updater.

This updater will be combined with bytesub by passing bytesub's AnnounceLatest
method into the base updater New. This way the demo state of each component
will be passed to all current dashboard viewers.
This commit is contained in:
Scott Redig
2019-07-02 16:12:31 -07:00
committed by GitHub
parent 043ffd69e3
commit 6b1b84c54e
2 changed files with 268 additions and 0 deletions
examples/demo/updater

@ -0,0 +1,137 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package updater provides the ability for concurrently running demo pieces to
// update a shared json object.
package updater
import (
"context"
"encoding/json"
)
// Updater is like a json object, with each field allowed to be updated
// concurrently by a different process. After processing updates, Updater will
// call a provided method with the json serialized value of all of its fields.
type Updater struct {
ctx context.Context
children map[string]*json.RawMessage
updates chan update
set SetFunc
}
// SetFunc serializes the value passed in into json and sets the associated field
// to that value. If nil is passed (BUT NOT a nil value of an interface), the
// field will be removed from the Updater's json object.
type SetFunc func(v interface{})
// New creates an Updater. Set is called when fields update, using the json
// sererialized value of Updater's tree. All updates after ctx is canceled are
// ignored.
func New(ctx context.Context, set func([]byte)) *Updater {
f := func(v interface{}) {
set([]byte(*forceMarshalJson(v)))
}
return NewNested(ctx, SetFunc(f))
}
// NewNested creates an updater based on a field in another updater. This
// allows for grouping of related demo pieces into a single conceptual group.
func NewNested(ctx context.Context, set SetFunc) *Updater {
u := create(ctx, set)
go u.start()
return u
}
func create(ctx context.Context, set SetFunc) *Updater {
return &Updater{
ctx: ctx,
children: make(map[string]*json.RawMessage),
updates: make(chan update),
set: set,
}
}
// ForField returns a function to set the latest value of that demo piece.
func (u *Updater) ForField(field string) SetFunc {
return SetFunc(func(v interface{}) {
var r *json.RawMessage
if v != nil {
r = forceMarshalJson(v)
}
select {
case <-u.ctx.Done():
case u.updates <- update{field, r}:
}
})
}
func (u *Updater) start() {
for {
u.set(u.children)
select {
case <-u.ctx.Done():
u.set(nil)
return
case up := <-u.updates:
if up.value == nil {
delete(u.children, up.field)
} else {
u.children[up.field] = up.value
}
}
applyAllWaitingUpdates:
for {
select {
case up := <-u.updates:
if up.value == nil {
delete(u.children, up.field)
} else {
u.children[up.field] = up.value
}
default:
break applyAllWaitingUpdates
}
}
}
}
type update struct {
field string
value *json.RawMessage
}
// forceMarshalJson is like json.Marshal, but cannot fail. It will instead
// encode any error encountered into the json object on the field Error.
func forceMarshalJson(v interface{}) *json.RawMessage {
b, err := json.Marshal(v)
if err != nil {
e := struct {
Error string
}{
err.Error(),
}
b, err = json.Marshal(e)
if err != nil {
b = []byte("{\"Error\":\"There was an error encoding the json message, additional there was an error encoding that error message.\"}")
}
}
r := json.RawMessage(b)
return &r
}

@ -0,0 +1,131 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package updater
import (
"context"
"encoding/json"
"strconv"
"sync"
"testing"
)
func TestUpdater(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
latest := make(chan string)
base := New(ctx, func(b []byte) {
latest <- string(b)
})
l := <-latest
if l != "{}" {
t.Errorf("Got %s, expected %s", l, "{}")
}
child := NewNested(ctx, base.ForField("Foo"))
l = <-latest
if l != "{\"Foo\":{}}" {
t.Errorf("Got %s, expected %s", l, "{\"Foo\":{}}")
}
child.ForField("Bar")(interface{}((*int)(nil)))
l = <-latest
if l != "{\"Foo\":{\"Bar\":null}}" {
t.Errorf("Got %s, expected %s", l, "{\"Foo\":{\"Bar\":null}}")
}
child.ForField("Bar")(nil)
l = <-latest
if l != "{\"Foo\":{}}" {
t.Errorf("Got %s, expected %s", l, "{\"Foo\":{}}")
}
}
// Fully testing the updater's logic is difficult because it combines multiple
// calls. This test method creates 100 different go routines all trying to
// update a value to force the logic to be invoked.
func TestUpdaterInternal(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(100)
go func() {
wg.Wait()
cancel()
}()
latest := ""
set := SetFunc(func(v interface{}) {
if v != nil {
latest = string(*forceMarshalJson(v))
}
})
u := create(ctx, set)
for i := 0; i < 100; i++ {
set := u.ForField(strconv.Itoa(i))
go func() {
set("Hi")
wg.Done()
}()
}
// Blocking call ensures that canceling the context will clean up the internal go routine.
u.start()
expectedMap := make(map[string]string)
for i := 0; i < 100; i++ {
expectedMap[strconv.Itoa(i)] = "Hi"
}
// Not using forceMashal because it by design hides errors, and is used in the
// code being tested.
expectedB, err := json.Marshal(expectedMap)
if err != nil {
t.Fatal(err)
}
expected := string(expectedB)
if latest != expected {
t.Errorf("latest value is wrong. Expected '%s', got '%s'", expected, latest)
}
}
var marshalTests = []struct {
in interface{}
out string
}{
{map[string]int{"hi": 1}, "{\"hi\":1}"},
{make(chan int), "{\"Error\":\"json: unsupported type: chan int\"}"},
}
func TestForceMarshalJson(t *testing.T) {
for _, tt := range marshalTests {
t.Run(tt.out, func(t *testing.T) {
s := string(*forceMarshalJson(tt.in))
if s != tt.out {
t.Errorf("got %s, want %s", s, tt.out)
}
})
}
}