Per-packet source tracking in transport, via/from in info output

This commit is contained in:
Ian Gulliver
2026-04-07 22:30:26 +09:00
parent f96ed20aa0
commit 3d20bf4c33
4 changed files with 41 additions and 37 deletions

View File

@@ -50,9 +50,10 @@ type deviceResult struct {
err error err error
} }
func printInfo(dev string, info *client.ResponseInfo) { func printInfo(via string, from string, info *client.ResponseInfo) {
slog.Info("device", slog.Info("device",
"dev", dev, "via", via,
"from", from,
"board_id", hex.EncodeToString(info.BoardID[:]), "board_id", hex.EncodeToString(info.BoardID[:]),
"mac", net.HardwareAddr(info.MAC[:]).String(), "mac", net.HardwareAddr(info.MAC[:]).String(),
"ip", net.IP(info.IP[:]).String(), "ip", net.IP(info.IP[:]).String(),
@@ -86,8 +87,8 @@ func cmdInfo(args []string) error {
if len(infos) == 0 { if len(infos) == 0 {
return fmt.Errorf("no devices responded") return fmt.Errorf("no devices responded")
} }
for _, info := range infos { for _, r := range infos {
printInfo(net.IP(info.IP[:]).String(), info) printInfo(*udpAddr, r.From, r.Value)
} }
return nil return nil
} }
@@ -126,7 +127,7 @@ func cmdInfo(args []string) error {
slog.Error("device error", "dev", r.dev, "err", r.err) slog.Error("device error", "dev", r.dev, "err", r.err)
continue continue
} }
printInfo(r.dev, r.info) printInfo(r.dev, r.dev, r.info)
} }
return nil return nil

View File

@@ -7,7 +7,6 @@ import (
"github.com/theater/picomap/lib/halfsiphash" "github.com/theater/picomap/lib/halfsiphash"
"github.com/theater/picomap/lib/msgpack" "github.com/theater/picomap/lib/msgpack"
"io"
) )
var HashKey = [8]byte{} var HashKey = [8]byte{}
@@ -15,7 +14,7 @@ var HashKey = [8]byte{}
type transport interface { type transport interface {
Send(data []byte) error Send(data []byte) error
SetReadTimeout(timeout time.Duration) SetReadTimeout(timeout time.Duration)
Reader() io.Reader Recv() (data []byte, from string, err error)
Broadcast() bool Broadcast() bool
Close() error Close() error
} }
@@ -45,20 +44,28 @@ func (c *Client) send(msg any) (uint32, error) {
return id, c.transport.Send(data) return id, c.transport.Send(data)
} }
func roundTrip[T any](c *Client, req any) ([]*T, error) { type Response[T any] struct {
From string
Value *T
}
func roundTrip[T any](c *Client, req any) ([]Response[T], error) {
id, err := c.send(req) id, err := c.send(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.transport.SetReadTimeout(c.timeout) c.transport.SetReadTimeout(c.timeout)
dec := msgpack.NewDecoder(c.transport.Reader())
broadcast := c.transport.Broadcast() broadcast := c.transport.Broadcast()
var results []*T var results []Response[T]
for { for {
var env Envelope data, from, err := c.transport.Recv()
if err := dec.Decode(&env); err != nil { if err != nil {
break break
} }
var env Envelope
if err := msgpack.Unmarshal(data, &env); err != nil {
continue
}
if env.MessageID != id { if env.MessageID != id {
continue continue
} }
@@ -74,7 +81,7 @@ func roundTrip[T any](c *Client, req any) ([]*T, error) {
return nil, devErr return nil, devErr
} }
if typed, ok := inner.(*T); ok { if typed, ok := inner.(*T); ok {
results = append(results, typed) results = append(results, Response[T]{From: from, Value: typed})
if !broadcast { if !broadcast {
break break
} }
@@ -83,14 +90,14 @@ func roundTrip[T any](c *Client, req any) ([]*T, error) {
return results, nil return results, nil
} }
func first[T any](results []*T, err error) (*T, error) { func first[T any](results []Response[T], err error) (*T, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(results) == 0 { if len(results) == 0 {
return nil, fmt.Errorf("no response") return nil, fmt.Errorf("no response")
} }
return results[0], nil return results[0].Value, nil
} }
func (c *Client) PICOBOOT() error { func (c *Client) PICOBOOT() error {
@@ -102,7 +109,7 @@ func (c *Client) Info() (*ResponseInfo, error) {
return first(roundTrip[ResponseInfo](c, &RequestInfo{})) return first(roundTrip[ResponseInfo](c, &RequestInfo{}))
} }
func (c *Client) InfoAll() ([]*ResponseInfo, error) { func (c *Client) InfoAll() ([]Response[ResponseInfo], error) {
return roundTrip[ResponseInfo](c, &RequestInfo{}) return roundTrip[ResponseInfo](c, &RequestInfo{})
} }

View File

@@ -2,11 +2,11 @@ package client
import ( import (
"fmt" "fmt"
"io"
"slices" "slices"
"strings" "strings"
"time" "time"
"github.com/theater/picomap/lib/msgpack"
"go.bug.st/serial" "go.bug.st/serial"
"go.bug.st/serial/enumerator" "go.bug.st/serial/enumerator"
) )
@@ -37,7 +37,9 @@ func ListSerial() ([]string, error) {
} }
type serialTransport struct { type serialTransport struct {
port serial.Port port serial.Port
portName string
dec *msgpack.Decoder
} }
func NewSerial(portName string, timeout time.Duration) (*Client, error) { func NewSerial(portName string, timeout time.Duration) (*Client, error) {
@@ -45,7 +47,8 @@ func NewSerial(portName string, timeout time.Duration) (*Client, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("opening %s: %w", portName, err) return nil, fmt.Errorf("opening %s: %w", portName, err)
} }
return &Client{transport: &serialTransport{port: port}, timeout: timeout}, nil t := &serialTransport{port: port, portName: portName, dec: msgpack.NewDecoder(port)}
return &Client{transport: t, timeout: timeout}, nil
} }
func (t *serialTransport) Send(data []byte) error { func (t *serialTransport) Send(data []byte) error {
@@ -57,8 +60,12 @@ func (t *serialTransport) SetReadTimeout(timeout time.Duration) {
t.port.SetReadTimeout(timeout) t.port.SetReadTimeout(timeout)
} }
func (t *serialTransport) Reader() io.Reader { func (t *serialTransport) Recv() ([]byte, string, error) {
return t.port var raw msgpack.RawMessage
if err := t.dec.Decode(&raw); err != nil {
return nil, "", err
}
return []byte(raw), t.portName, nil
} }
func (t *serialTransport) Broadcast() bool { return false } func (t *serialTransport) Broadcast() bool { return false }

View File

@@ -1,9 +1,7 @@
package client package client
import ( import (
"bytes"
"fmt" "fmt"
"io"
"net" "net"
"syscall" "syscall"
"time" "time"
@@ -17,7 +15,6 @@ type udpTransport struct {
conn *net.UDPConn conn *net.UDPConn
addr *net.UDPAddr addr *net.UDPAddr
broadcast bool broadcast bool
buf bytes.Buffer
} }
func interfaceIPv4Net(name string) (net.IP, *net.IPNet, error) { func interfaceIPv4Net(name string) (net.IP, *net.IPNet, error) {
@@ -117,21 +114,13 @@ func (t *udpTransport) SetReadTimeout(timeout time.Duration) {
t.conn.SetReadDeadline(time.Now().Add(timeout)) t.conn.SetReadDeadline(time.Now().Add(timeout))
} }
func (t *udpTransport) Reader() io.Reader { func (t *udpTransport) Recv() ([]byte, string, error) {
return t buf := make([]byte, 1500)
} n, addr, err := t.conn.ReadFromUDP(buf)
func (t *udpTransport) Read(p []byte) (int, error) {
if t.buf.Len() > 0 {
return t.buf.Read(p)
}
pkt := make([]byte, 1500)
n, err := t.conn.Read(pkt)
if err != nil { if err != nil {
return 0, err return nil, "", err
} }
t.buf.Write(pkt[:n]) return buf[:n], addr.IP.String(), nil
return t.buf.Read(p)
} }
func (t *udpTransport) Broadcast() bool { return t.broadcast } func (t *udpTransport) Broadcast() bool { return t.broadcast }