package client import ( "fmt" "sync/atomic" "time" "github.com/theater/picomap/lib/halfsiphash" "github.com/theater/picomap/lib/msgpack" ) var HashKey = [8]byte{} type transport interface { Send(data []byte) error SetReadTimeout(timeout time.Duration) Recv() (data []byte, from string, err error) Broadcast() bool Close() error } type Client struct { transport transport timeout time.Duration nextID atomic.Uint32 } func (c *Client) Close() error { return c.transport.Close() } func (c *Client) send(msg any) (uint32, error) { id := c.nextID.Add(1) payload, err := msgpack.Marshal(msg) if err != nil { return 0, fmt.Errorf("encode inner: %w", err) } checksum := halfsiphash.Sum32(payload, HashKey) env := Envelope{MessageID: id, Checksum: checksum, Payload: payload} data, err := msgpack.Marshal(&env) if err != nil { return 0, fmt.Errorf("encode envelope: %w", err) } return id, c.transport.Send(data) } type Response[T any] struct { From string Value *T } func roundTrip[T any](c *Client, req any) ([]Response[T], error) { id, err := c.send(req) if err != nil { return nil, err } c.transport.SetReadTimeout(c.timeout) broadcast := c.transport.Broadcast() var results []Response[T] for { data, from, err := c.transport.Recv() if err != nil { break } var env Envelope if err := msgpack.Unmarshal(data, &env); err != nil { continue } if env.MessageID != id { continue } expected := halfsiphash.Sum32(env.Payload, HashKey) if env.Checksum != expected { continue } var inner any if err := msgpack.Unmarshal(env.Payload, &inner); err != nil { continue } if devErr, ok := inner.(*DeviceError); ok { return nil, devErr } if typed, ok := inner.(*T); ok { results = append(results, Response[T]{From: from, Value: typed}) if !broadcast { break } } } return results, nil } func first[T any](results []Response[T], err error) (*T, error) { if err != nil { return nil, err } if len(results) == 0 { return nil, fmt.Errorf("no response") } return results[0].Value, nil } func (c *Client) PICOBOOT() error { _, err := first(roundTrip[ResponsePICOBOOT](c, &RequestPICOBOOT{})) return err } func (c *Client) Info() (*ResponseInfo, error) { return first(roundTrip[ResponseInfo](c, &RequestInfo{})) } func (c *Client) InfoAll() ([]Response[ResponseInfo], error) { return roundTrip[ResponseInfo](c, &RequestInfo{}) } func (c *Client) Log() (*ResponseLog, error) { return first(roundTrip[ResponseLog](c, &RequestLog{})) } func (c *Client) Test(name string) (*ResponseTest, error) { return first(roundTrip[ResponseTest](c, &RequestTest{Name: name})) }