From 3d20bf4c33d06fa539f3cfdc07bef8dd478890d0 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Tue, 7 Apr 2026 22:30:26 +0900 Subject: [PATCH] Per-packet source tracking in transport, via/from in info output --- cmd/picomap/main.go | 11 ++++++----- lib/client/client.go | 29 ++++++++++++++++++----------- lib/client/serial.go | 17 ++++++++++++----- lib/client/udp.go | 21 +++++---------------- 4 files changed, 41 insertions(+), 37 deletions(-) diff --git a/cmd/picomap/main.go b/cmd/picomap/main.go index 6d595eb..03f3762 100644 --- a/cmd/picomap/main.go +++ b/cmd/picomap/main.go @@ -50,9 +50,10 @@ type deviceResult struct { err error } -func printInfo(dev string, info *client.ResponseInfo) { +func printInfo(via string, from string, info *client.ResponseInfo) { slog.Info("device", - "dev", dev, + "via", via, + "from", from, "board_id", hex.EncodeToString(info.BoardID[:]), "mac", net.HardwareAddr(info.MAC[:]).String(), "ip", net.IP(info.IP[:]).String(), @@ -86,8 +87,8 @@ func cmdInfo(args []string) error { if len(infos) == 0 { return fmt.Errorf("no devices responded") } - for _, info := range infos { - printInfo(net.IP(info.IP[:]).String(), info) + for _, r := range infos { + printInfo(*udpAddr, r.From, r.Value) } return nil } @@ -126,7 +127,7 @@ func cmdInfo(args []string) error { slog.Error("device error", "dev", r.dev, "err", r.err) continue } - printInfo(r.dev, r.info) + printInfo(r.dev, r.dev, r.info) } return nil diff --git a/lib/client/client.go b/lib/client/client.go index 18bc917..031f0aa 100644 --- a/lib/client/client.go +++ b/lib/client/client.go @@ -7,7 +7,6 @@ import ( "github.com/theater/picomap/lib/halfsiphash" "github.com/theater/picomap/lib/msgpack" - "io" ) var HashKey = [8]byte{} @@ -15,7 +14,7 @@ var HashKey = [8]byte{} type transport interface { Send(data []byte) error SetReadTimeout(timeout time.Duration) - Reader() io.Reader + Recv() (data []byte, from string, err error) Broadcast() bool Close() error } @@ -45,20 +44,28 @@ func (c *Client) send(msg any) (uint32, error) { return id, c.transport.Send(data) } -func roundTrip[T any](c *Client, req any) ([]*T, error) { +type Response[T any] struct { + From string + Value *T +} + +func roundTrip[T any](c *Client, req any) ([]Response[T], error) { id, err := c.send(req) if err != nil { return nil, err } c.transport.SetReadTimeout(c.timeout) - dec := msgpack.NewDecoder(c.transport.Reader()) broadcast := c.transport.Broadcast() - var results []*T + var results []Response[T] for { - var env Envelope - if err := dec.Decode(&env); err != nil { + data, from, err := c.transport.Recv() + if err != nil { break } + var env Envelope + if err := msgpack.Unmarshal(data, &env); err != nil { + continue + } if env.MessageID != id { continue } @@ -74,7 +81,7 @@ func roundTrip[T any](c *Client, req any) ([]*T, error) { return nil, devErr } if typed, ok := inner.(*T); ok { - results = append(results, typed) + results = append(results, Response[T]{From: from, Value: typed}) if !broadcast { break } @@ -83,14 +90,14 @@ func roundTrip[T any](c *Client, req any) ([]*T, error) { return results, nil } -func first[T any](results []*T, err error) (*T, error) { +func first[T any](results []Response[T], err error) (*T, error) { if err != nil { return nil, err } if len(results) == 0 { return nil, fmt.Errorf("no response") } - return results[0], nil + return results[0].Value, nil } func (c *Client) PICOBOOT() error { @@ -102,7 +109,7 @@ func (c *Client) Info() (*ResponseInfo, error) { return first(roundTrip[ResponseInfo](c, &RequestInfo{})) } -func (c *Client) InfoAll() ([]*ResponseInfo, error) { +func (c *Client) InfoAll() ([]Response[ResponseInfo], error) { return roundTrip[ResponseInfo](c, &RequestInfo{}) } diff --git a/lib/client/serial.go b/lib/client/serial.go index 9ac0503..3512cf3 100644 --- a/lib/client/serial.go +++ b/lib/client/serial.go @@ -2,11 +2,11 @@ package client import ( "fmt" - "io" "slices" "strings" "time" + "github.com/theater/picomap/lib/msgpack" "go.bug.st/serial" "go.bug.st/serial/enumerator" ) @@ -37,7 +37,9 @@ func ListSerial() ([]string, error) { } type serialTransport struct { - port serial.Port + port serial.Port + portName string + dec *msgpack.Decoder } func NewSerial(portName string, timeout time.Duration) (*Client, error) { @@ -45,7 +47,8 @@ func NewSerial(portName string, timeout time.Duration) (*Client, error) { if err != nil { return nil, fmt.Errorf("opening %s: %w", portName, err) } - return &Client{transport: &serialTransport{port: port}, timeout: timeout}, nil + t := &serialTransport{port: port, portName: portName, dec: msgpack.NewDecoder(port)} + return &Client{transport: t, timeout: timeout}, nil } func (t *serialTransport) Send(data []byte) error { @@ -57,8 +60,12 @@ func (t *serialTransport) SetReadTimeout(timeout time.Duration) { t.port.SetReadTimeout(timeout) } -func (t *serialTransport) Reader() io.Reader { - return t.port +func (t *serialTransport) Recv() ([]byte, string, error) { + var raw msgpack.RawMessage + if err := t.dec.Decode(&raw); err != nil { + return nil, "", err + } + return []byte(raw), t.portName, nil } func (t *serialTransport) Broadcast() bool { return false } diff --git a/lib/client/udp.go b/lib/client/udp.go index 9985bc4..e9dc0ec 100644 --- a/lib/client/udp.go +++ b/lib/client/udp.go @@ -1,9 +1,7 @@ package client import ( - "bytes" "fmt" - "io" "net" "syscall" "time" @@ -17,7 +15,6 @@ type udpTransport struct { conn *net.UDPConn addr *net.UDPAddr broadcast bool - buf bytes.Buffer } func interfaceIPv4Net(name string) (net.IP, *net.IPNet, error) { @@ -117,21 +114,13 @@ func (t *udpTransport) SetReadTimeout(timeout time.Duration) { t.conn.SetReadDeadline(time.Now().Add(timeout)) } -func (t *udpTransport) Reader() io.Reader { - return t -} - -func (t *udpTransport) Read(p []byte) (int, error) { - if t.buf.Len() > 0 { - return t.buf.Read(p) - } - pkt := make([]byte, 1500) - n, err := t.conn.Read(pkt) +func (t *udpTransport) Recv() ([]byte, string, error) { + buf := make([]byte, 1500) + n, addr, err := t.conn.ReadFromUDP(buf) if err != nil { - return 0, err + return nil, "", err } - t.buf.Write(pkt[:n]) - return t.buf.Read(p) + return buf[:n], addr.IP.String(), nil } func (t *udpTransport) Broadcast() bool { return t.broadcast }