add igmp querier for multicast group membership tracking

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-22 23:46:56 -08:00
parent 395180493a
commit 7bced7b350
5 changed files with 378 additions and 17 deletions

View File

@@ -11,11 +11,13 @@ func main() {
noARP := flag.Bool("no-arp", false, "disable ARP discovery") noARP := flag.Bool("no-arp", false, "disable ARP discovery")
noLLDP := flag.Bool("no-lldp", false, "disable LLDP discovery") noLLDP := flag.Bool("no-lldp", false, "disable LLDP discovery")
noSNMP := flag.Bool("no-snmp", false, "disable SNMP discovery") noSNMP := flag.Bool("no-snmp", false, "disable SNMP discovery")
noIGMP := flag.Bool("no-igmp", false, "disable IGMP querier")
logEvents := flag.Bool("log-events", false, "log node events") logEvents := flag.Bool("log-events", false, "log node events")
logNodes := flag.Bool("log-nodes", false, "log full node details on changes") logNodes := flag.Bool("log-nodes", false, "log full node details on changes")
debugARP := flag.Bool("debug-arp", false, "debug ARP discovery") debugARP := flag.Bool("debug-arp", false, "debug ARP discovery")
debugLLDP := flag.Bool("debug-lldp", false, "debug LLDP discovery") debugLLDP := flag.Bool("debug-lldp", false, "debug LLDP discovery")
debugSNMP := flag.Bool("debug-snmp", false, "debug SNMP discovery") debugSNMP := flag.Bool("debug-snmp", false, "debug SNMP discovery")
debugIGMP := flag.Bool("debug-igmp", false, "debug IGMP querier")
flag.Parse() flag.Parse()
t := tendrils.New() t := tendrils.New()
@@ -23,10 +25,12 @@ func main() {
t.DisableARP = *noARP t.DisableARP = *noARP
t.DisableLLDP = *noLLDP t.DisableLLDP = *noLLDP
t.DisableSNMP = *noSNMP t.DisableSNMP = *noSNMP
t.DisableIGMP = *noIGMP
t.LogEvents = *logEvents t.LogEvents = *logEvents
t.LogNodes = *logNodes t.LogNodes = *logNodes
t.DebugARP = *debugARP t.DebugARP = *debugARP
t.DebugLLDP = *debugLLDP t.DebugLLDP = *debugLLDP
t.DebugSNMP = *debugSNMP t.DebugSNMP = *debugSNMP
t.DebugIGMP = *debugIGMP
t.Run() t.Run()
} }

228
igmp.go Normal file
View File

@@ -0,0 +1,228 @@
package tendrils
import (
"context"
"log"
"net"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
)
func (t *Tendrils) listenIGMP(ctx context.Context, iface net.Interface) {
handle, err := pcap.OpenLive(iface.Name, 65536, true, 5*time.Second)
if err != nil {
log.Printf("[ERROR] failed to open interface %s for igmp: %v", iface.Name, err)
return
}
defer handle.Close()
if err := handle.SetBPFFilter("igmp"); err != nil {
log.Printf("[ERROR] failed to set igmp filter on %s: %v", iface.Name, err)
return
}
go t.runIGMPQuerier(ctx, iface)
packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
packets := packetSource.Packets()
for {
select {
case <-ctx.Done():
return
case packet, ok := <-packets:
if !ok {
return
}
t.handleIGMPPacket(iface.Name, packet)
}
}
}
func (t *Tendrils) handleIGMPPacket(ifaceName string, packet gopacket.Packet) {
ipLayer := packet.Layer(layers.LayerTypeIPv4)
if ipLayer == nil {
return
}
ip := ipLayer.(*layers.IPv4)
sourceIP := ip.SrcIP
igmpLayer := packet.Layer(layers.LayerTypeIGMP)
if igmpLayer == nil {
return
}
switch igmp := igmpLayer.(type) {
case *layers.IGMPv1or2:
t.handleIGMPv1or2(ifaceName, sourceIP, igmp)
case *layers.IGMP:
t.handleIGMPv3(ifaceName, sourceIP, igmp)
}
}
func (t *Tendrils) handleIGMPv1or2(ifaceName string, sourceIP net.IP, igmp *layers.IGMPv1or2) {
switch igmp.Type {
case layers.IGMPMembershipReportV1, layers.IGMPMembershipReportV2:
groupIP := igmp.GroupAddress
if !groupIP.IsMulticast() || groupIP.IsLinkLocalMulticast() {
return
}
if t.DebugIGMP {
log.Printf("[igmp] %s: join %s -> %s", ifaceName, sourceIP, groupIP)
}
t.nodes.UpdateMulticastMembership(sourceIP, groupIP)
case layers.IGMPLeaveGroup:
groupIP := igmp.GroupAddress
if t.DebugIGMP {
log.Printf("[igmp] %s: leave %s -> %s", ifaceName, sourceIP, groupIP)
}
t.nodes.RemoveMulticastMembership(sourceIP, groupIP)
}
}
func (t *Tendrils) handleIGMPv3(ifaceName string, sourceIP net.IP, igmp *layers.IGMP) {
if igmp.Type != layers.IGMPMembershipReportV3 {
return
}
for _, record := range igmp.GroupRecords {
groupIP := record.MulticastAddress
if !groupIP.IsMulticast() || groupIP.IsLinkLocalMulticast() {
continue
}
switch record.Type {
case layers.IGMPIsEx, layers.IGMPToEx, layers.IGMPIsIn, layers.IGMPToIn:
if t.DebugIGMP {
log.Printf("[igmp] %s: v3 join %s -> %s", ifaceName, sourceIP, groupIP)
}
t.nodes.UpdateMulticastMembership(sourceIP, groupIP)
case layers.IGMPBlock:
if t.DebugIGMP {
log.Printf("[igmp] %s: v3 leave %s -> %s", ifaceName, sourceIP, groupIP)
}
t.nodes.RemoveMulticastMembership(sourceIP, groupIP)
}
}
}
func (t *Tendrils) runIGMPQuerier(ctx context.Context, iface net.Interface) {
addrs, err := iface.Addrs()
if err != nil {
return
}
var srcIP net.IP
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
srcIP = ipnet.IP.To4()
break
}
}
if srcIP == nil {
return
}
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
t.sendIGMPQuery(iface.Name, srcIP)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
t.sendIGMPQuery(iface.Name, srcIP)
}
}
}
func (t *Tendrils) sendIGMPQuery(ifaceName string, srcIP net.IP) {
handle, err := pcap.OpenLive(ifaceName, 65536, true, pcap.BlockForever)
if err != nil {
return
}
defer handle.Close()
eth := &layers.Ethernet{
SrcMAC: t.getInterfaceMAC(ifaceName),
DstMAC: net.HardwareAddr{0x01, 0x00, 0x5e, 0x00, 0x00, 0x01},
EthernetType: layers.EthernetTypeIPv4,
}
ip := &layers.IPv4{
Version: 4,
IHL: 6,
TTL: 1,
Protocol: layers.IPProtocolIGMP,
SrcIP: srcIP,
DstIP: net.IPv4(224, 0, 0, 1),
Options: []layers.IPv4Option{{OptionType: 148, OptionLength: 4, OptionData: []byte{0, 0}}},
}
igmpPayload := buildIGMPQuery()
buf := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{ComputeChecksums: true, FixLengths: true}
if err := gopacket.SerializeLayers(buf, opts, eth, ip, gopacket.Payload(igmpPayload)); err != nil {
if t.DebugIGMP {
log.Printf("[igmp] %s: failed to serialize query: %v", ifaceName, err)
}
return
}
if err := handle.WritePacketData(buf.Bytes()); err != nil {
if t.DebugIGMP {
log.Printf("[igmp] %s: failed to send query: %v", ifaceName, err)
}
return
}
if t.DebugIGMP {
log.Printf("[igmp] %s: sent general query", ifaceName)
}
}
func buildIGMPQuery() []byte {
data := make([]byte, 8)
data[0] = 0x11
data[1] = 100
data[4] = 0
data[5] = 0
data[6] = 0
data[7] = 0
checksum := igmpChecksum(data)
data[2] = byte(checksum >> 8)
data[3] = byte(checksum)
return data
}
func igmpChecksum(data []byte) uint16 {
var sum uint32
for i := 0; i < len(data)-1; i += 2 {
sum += uint32(data[i])<<8 | uint32(data[i+1])
}
if len(data)%2 == 1 {
sum += uint32(data[len(data)-1]) << 8
}
for sum > 0xffff {
sum = (sum & 0xffff) + (sum >> 16)
}
return ^uint16(sum)
}
func (t *Tendrils) getInterfaceMAC(ifaceName string) net.HardwareAddr {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {
return net.HardwareAddr{0, 0, 0, 0, 0, 0}
}
return iface.HardwareAddr
}

123
nodes.go
View File

@@ -133,12 +133,41 @@ func (n *Node) String() string {
return joinParts(parts) return joinParts(parts)
} }
type MulticastGroup struct {
IP net.IP
}
func (g *MulticastGroup) Name() string {
ip := g.IP.To4()
if ip == nil {
return g.IP.String()
}
if ip[0] == 239 && ip[1] == 255 {
universe := int(ip[2])*256 + int(ip[3])
return fmt.Sprintf("sacn:%d", universe)
}
return g.IP.String()
}
type MulticastMembership struct {
Node *Node
LastSeen time.Time
}
type MulticastGroupMembers struct {
Group *MulticastGroup
Members map[string]*MulticastMembership // source IP -> membership
}
type Nodes struct { type Nodes struct {
mu sync.RWMutex mu sync.RWMutex
nodes map[int]*Node nodes map[int]*Node
ipIndex map[string]int ipIndex map[string]int
macIndex map[string]int macIndex map[string]int
nodeCancel map[int]context.CancelFunc nodeCancel map[int]context.CancelFunc
multicastGroups map[string]*MulticastGroupMembers // group IP string -> group with members
nextID int nextID int
t *Tendrils t *Tendrils
ctx context.Context ctx context.Context
@@ -152,6 +181,7 @@ func NewNodes(t *Tendrils) *Nodes {
ipIndex: map[string]int{}, ipIndex: map[string]int{},
macIndex: map[string]int{}, macIndex: map[string]int{},
nodeCancel: map[int]context.CancelFunc{}, nodeCancel: map[int]context.CancelFunc{},
multicastGroups: map[string]*MulticastGroupMembers{},
nextID: 1, nextID: 1,
t: t, t: t,
ctx: ctx, ctx: ctx,
@@ -433,6 +463,52 @@ func (n *Nodes) UpdateMACTable(node *Node, peerMAC net.HardwareAddr, ifaceName s
node.MACTable[peerMAC.String()] = ifaceName node.MACTable[peerMAC.String()] = ifaceName
} }
func (n *Nodes) UpdateMulticastMembership(sourceIP, groupIP net.IP) {
n.mu.Lock()
defer n.mu.Unlock()
node := n.getNodeByIPLocked(sourceIP)
groupKey := groupIP.String()
sourceKey := sourceIP.String()
gm := n.multicastGroups[groupKey]
if gm == nil {
gm = &MulticastGroupMembers{
Group: &MulticastGroup{IP: groupIP},
Members: map[string]*MulticastMembership{},
}
n.multicastGroups[groupKey] = gm
}
gm.Members[sourceKey] = &MulticastMembership{
Node: node,
LastSeen: time.Now(),
}
}
func (n *Nodes) RemoveMulticastMembership(sourceIP, groupIP net.IP) {
n.mu.Lock()
defer n.mu.Unlock()
groupKey := groupIP.String()
sourceKey := sourceIP.String()
if gm := n.multicastGroups[groupKey]; gm != nil {
delete(gm.Members, sourceKey)
if len(gm.Members) == 0 {
delete(n.multicastGroups, groupKey)
}
}
}
func (n *Nodes) getNodeByIPLocked(ip net.IP) *Node {
if id, exists := n.ipIndex[ip.String()]; exists {
return n.nodes[id]
}
return nil
}
func (n *Nodes) logNode(node *Node) { func (n *Nodes) logNode(node *Node) {
name := node.Name name := node.Name
if name == "" { if name == "" {
@@ -508,6 +584,53 @@ func (n *Nodes) LogAll() {
log.Printf("[sigusr1] %s", link) log.Printf("[sigusr1] %s", link)
} }
} }
n.expireMulticastMemberships()
if len(n.multicastGroups) > 0 {
var groups []*MulticastGroupMembers
for _, gm := range n.multicastGroups {
groups = append(groups, gm)
}
sort.Slice(groups, func(i, j int) bool {
return sortorder.NaturalLess(groups[i].Group.Name(), groups[j].Group.Name())
})
log.Printf("[sigusr1] ================ %d multicast groups ================", len(groups))
for _, gm := range groups {
var memberNames []string
for sourceIP, membership := range gm.Members {
var name string
if membership.Node != nil {
name = membership.Node.Name
if name == "" {
name = sourceIP
}
} else {
name = sourceIP
}
memberNames = append(memberNames, name)
}
sort.Slice(memberNames, func(i, j int) bool {
return sortorder.NaturalLess(memberNames[i], memberNames[j])
})
log.Printf("[sigusr1] %s: %v", gm.Group.Name(), memberNames)
}
}
}
func (n *Nodes) expireMulticastMemberships() {
expireTime := time.Now().Add(-5 * time.Minute)
for groupKey, gm := range n.multicastGroups {
for sourceKey, membership := range gm.Members {
if membership.LastSeen.Before(expireTime) {
delete(gm.Members, sourceKey)
}
}
if len(gm.Members) == 0 {
delete(n.multicastGroups, groupKey)
}
}
} }
type Link struct { type Link struct {

View File

@@ -517,3 +517,4 @@ func (t *Tendrils) getInterfaceNames(snmp *gosnmp.GoSNMP) map[int]string {
return names return names
} }

View File

@@ -18,11 +18,13 @@ type Tendrils struct {
DisableARP bool DisableARP bool
DisableLLDP bool DisableLLDP bool
DisableSNMP bool DisableSNMP bool
DisableIGMP bool
LogEvents bool LogEvents bool
LogNodes bool LogNodes bool
DebugARP bool DebugARP bool
DebugLLDP bool DebugLLDP bool
DebugSNMP bool DebugSNMP bool
DebugIGMP bool
} }
func New() *Tendrils { func New() *Tendrils {
@@ -178,4 +180,7 @@ func (t *Tendrils) startInterface(ctx context.Context, iface net.Interface) {
if !t.DisableLLDP { if !t.DisableLLDP {
go t.listenLLDP(ctx, iface) go t.listenLLDP(ctx, iface)
} }
if !t.DisableIGMP {
go t.listenIGMP(ctx, iface)
}
} }