From 3ccee958040886d54417e72b4a8249af727b8391 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Thu, 29 Jan 2026 21:44:39 -0800 Subject: [PATCH] Initial multicast library with IGMP query response and periodic advertisements Co-Authored-By: Claude Opus 4.5 --- go.mod | 10 +++ go.sum | 18 ++++ listener.go | 124 ++++++++++++++++++++++++++ multicast.go | 248 +++++++++++++++++++++++++++++++++++++++++++++++++++ querier.go | 89 ++++++++++++++++++ 5 files changed, 489 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 listener.go create mode 100644 multicast.go create mode 100644 querier.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..12140df --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module github.com/gopatchy/multicast + +go 1.23.0 + +require ( + github.com/google/gopacket v1.1.19 + golang.org/x/net v0.34.0 +) + +require golang.org/x/sys v0.29.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a0b9556 --- /dev/null +++ b/go.sum @@ -0,0 +1,18 @@ +github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= +github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..eb22ad4 --- /dev/null +++ b/listener.go @@ -0,0 +1,124 @@ +package multicast + +import ( + "context" + "net" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" +) + +type JoinLeaveHandler func(sourceIP, groupIP net.IP, join bool) + +type Listener struct { + iface *net.Interface + handle *pcap.Handle + joinHandler JoinLeaveHandler +} + +func NewListener(iface *net.Interface, joinHandler JoinLeaveHandler) (*Listener, error) { + handle, err := pcap.OpenLive(iface.Name, 65536, true, 5*time.Second) + if err != nil { + return nil, err + } + + if err := handle.SetBPFFilter("igmp"); err != nil { + handle.Close() + return nil, err + } + + return &Listener{ + iface: iface, + handle: handle, + joinHandler: joinHandler, + }, nil +} + +func (l *Listener) Run(ctx context.Context) { + defer l.handle.Close() + + packetSource := gopacket.NewPacketSource(l.handle, l.handle.LinkType()) + packets := packetSource.Packets() + + for { + select { + case <-ctx.Done(): + return + case packet, ok := <-packets: + if !ok { + return + } + l.handlePacket(packet) + } + } +} + +func (l *Listener) Close() { + l.handle.Close() +} + +func (l *Listener) handlePacket(packet gopacket.Packet) { + ipLayer := packet.Layer(layers.LayerTypeIPv4) + if ipLayer == nil { + return + } + ip := ipLayer.(*layers.IPv4) + sourceIP := ip.SrcIP + + igmpLayer := packet.Layer(layers.LayerTypeIGMP) + if igmpLayer == nil { + return + } + + switch igmp := igmpLayer.(type) { + case *layers.IGMPv1or2: + l.handleIGMPv1or2(sourceIP, igmp) + case *layers.IGMP: + l.handleIGMPv3(sourceIP, igmp) + } +} + +func (l *Listener) handleIGMPv1or2(sourceIP net.IP, igmp *layers.IGMPv1or2) { + switch igmp.Type { + case layers.IGMPMembershipReportV1, layers.IGMPMembershipReportV2: + groupIP := igmp.GroupAddress + if !groupIP.IsMulticast() || groupIP.IsLinkLocalMulticast() { + return + } + if l.joinHandler != nil { + l.joinHandler(sourceIP, groupIP, true) + } + + case layers.IGMPLeaveGroup: + groupIP := igmp.GroupAddress + if l.joinHandler != nil { + l.joinHandler(sourceIP, groupIP, false) + } + } +} + +func (l *Listener) handleIGMPv3(sourceIP net.IP, igmp *layers.IGMP) { + if igmp.Type != layers.IGMPMembershipReportV3 { + return + } + + for _, record := range igmp.GroupRecords { + groupIP := record.MulticastAddress + if !groupIP.IsMulticast() || groupIP.IsLinkLocalMulticast() { + continue + } + + switch record.Type { + case layers.IGMPIsEx, layers.IGMPToEx, layers.IGMPIsIn, layers.IGMPToIn: + if l.joinHandler != nil { + l.joinHandler(sourceIP, groupIP, true) + } + case layers.IGMPBlock: + if l.joinHandler != nil { + l.joinHandler(sourceIP, groupIP, false) + } + } + } +} diff --git a/multicast.go b/multicast.go new file mode 100644 index 0000000..05c4232 --- /dev/null +++ b/multicast.go @@ -0,0 +1,248 @@ +package multicast + +import ( + "context" + "math/rand" + "net" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" + "golang.org/x/net/ipv4" +) + +type Conn struct { + *ipv4.PacketConn + rawConn net.PacketConn + iface *net.Interface + groupIP net.IP + srcIP net.IP + srcMAC net.HardwareAddr + queryChan chan struct{} + ctx context.Context + cancel context.CancelFunc +} + +func ListenMulticastUDP(network string, iface *net.Interface, gaddr *net.UDPAddr) (*Conn, error) { + srcIP, _ := getInterfaceIPv4(iface) + + c, err := net.ListenPacket(network, gaddr.String()) + if err != nil { + return nil, err + } + + p := ipv4.NewPacketConn(c) + if iface != nil { + p.SetMulticastInterface(iface) + } + + if err := p.JoinGroup(iface, gaddr); err != nil { + c.Close() + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + + conn := &Conn{ + PacketConn: p, + rawConn: c, + iface: iface, + groupIP: gaddr.IP, + srcIP: srcIP, + srcMAC: iface.HardwareAddr, + queryChan: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, + } + + go conn.runAdvertiser() + go conn.listenForQueries() + + return conn, nil +} + +func (c *Conn) Close() error { + c.cancel() + c.PacketConn.LeaveGroup(c.iface, &net.UDPAddr{IP: c.groupIP}) + return c.rawConn.Close() +} + +func (c *Conn) RawConn() net.PacketConn { + return c.rawConn +} + +func (c *Conn) runAdvertiser() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + + c.sendReport() + + for { + select { + case <-c.ctx.Done(): + return + case <-c.queryChan: + delay := time.Duration(rand.Intn(1000)) * time.Millisecond + time.Sleep(delay) + c.sendReport() + case <-ticker.C: + c.sendReport() + } + } +} + +func (c *Conn) listenForQueries() { + handle, err := pcap.OpenLive(c.iface.Name, 65536, true, 5*time.Second) + if err != nil { + return + } + defer handle.Close() + + handle.SetBPFFilter("igmp") + + packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) + packets := packetSource.Packets() + + for { + select { + case <-c.ctx.Done(): + return + case packet, ok := <-packets: + if !ok { + return + } + if c.isQuery(packet) { + select { + case c.queryChan <- struct{}{}: + default: + } + } + } + } +} + +func (c *Conn) isQuery(packet gopacket.Packet) bool { + igmpLayer := packet.Layer(layers.LayerTypeIGMP) + if igmpLayer == nil { + return false + } + + switch igmp := igmpLayer.(type) { + case *layers.IGMPv1or2: + if igmp.Type == layers.IGMPMembershipQuery { + return igmp.GroupAddress.IsUnspecified() || igmp.GroupAddress.Equal(c.groupIP) + } + case *layers.IGMP: + if igmp.Type == layers.IGMPMembershipQuery { + return true + } + } + return false +} + +func (c *Conn) sendReport() { + if c.srcIP == nil { + return + } + + handle, err := pcap.OpenLive(c.iface.Name, 65536, true, pcap.BlockForever) + if err != nil { + return + } + defer handle.Close() + + eth := &layers.Ethernet{ + SrcMAC: c.srcMAC, + DstMAC: multicastIPToMAC(c.groupIP), + EthernetType: layers.EthernetTypeIPv4, + } + + ip := &layers.IPv4{ + Version: 4, + IHL: 6, + TTL: 1, + Protocol: layers.IPProtocolIGMP, + SrcIP: c.srcIP, + DstIP: c.groupIP, + Options: []layers.IPv4Option{{OptionType: 148, OptionLength: 4, OptionData: []byte{0, 0}}}, + } + + buf := gopacket.NewSerializeBuffer() + opts := gopacket.SerializeOptions{ComputeChecksums: true, FixLengths: true} + gopacket.SerializeLayers(buf, opts, eth, ip, gopacket.Payload(buildIGMPv2Report(c.groupIP))) + handle.WritePacketData(buf.Bytes()) +} + +func buildIGMPv2Report(groupIP net.IP) []byte { + data := make([]byte, 8) + data[0] = 0x16 + data[1] = 0 + + ip4 := groupIP.To4() + if ip4 != nil { + copy(data[4:8], ip4) + } + + checksum := igmpChecksum(data) + data[2] = byte(checksum >> 8) + data[3] = byte(checksum) + + return data +} + +func buildIGMPQuery() []byte { + data := make([]byte, 8) + data[0] = 0x11 + data[1] = 100 + + checksum := igmpChecksum(data) + data[2] = byte(checksum >> 8) + data[3] = byte(checksum) + + return data +} + +func igmpChecksum(data []byte) uint16 { + var sum uint32 + for i := 0; i < len(data)-1; i += 2 { + sum += uint32(data[i])<<8 | uint32(data[i+1]) + } + if len(data)%2 == 1 { + sum += uint32(data[len(data)-1]) << 8 + } + for sum > 0xffff { + sum = (sum & 0xffff) + (sum >> 16) + } + return ^uint16(sum) +} + +func multicastIPToMAC(ip net.IP) net.HardwareAddr { + ip4 := ip.To4() + if ip4 == nil { + return net.HardwareAddr{0x01, 0x00, 0x5e, 0x00, 0x00, 0x01} + } + return net.HardwareAddr{ + 0x01, 0x00, 0x5e, + ip4[1] & 0x7f, + ip4[2], + ip4[3], + } +} + +func getInterfaceIPv4(iface *net.Interface) (net.IP, error) { + addrs, err := iface.Addrs() + if err != nil { + return nil, err + } + + for _, addr := range addrs { + if ipNet, ok := addr.(*net.IPNet); ok { + if ip4 := ipNet.IP.To4(); ip4 != nil { + return ip4, nil + } + } + } + + return nil, nil +} diff --git a/querier.go b/querier.go new file mode 100644 index 0000000..7ecb7f7 --- /dev/null +++ b/querier.go @@ -0,0 +1,89 @@ +package multicast + +import ( + "context" + "net" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" +) + +type Querier struct { + iface *net.Interface + srcIP net.IP + srcMAC net.HardwareAddr +} + +func NewQuerier(iface *net.Interface) (*Querier, error) { + srcIP, err := getInterfaceIPv4(iface) + if err != nil { + return nil, err + } + if srcIP == nil { + return nil, nil + } + + return &Querier{ + iface: iface, + srcIP: srcIP, + srcMAC: iface.HardwareAddr, + }, nil +} + +func (q *Querier) Run(ctx context.Context) { + if q == nil { + return + } + + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + + q.SendQuery() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + q.SendQuery() + } + } +} + +func (q *Querier) SendQuery() { + handle, err := pcap.OpenLive(q.iface.Name, 65536, true, pcap.BlockForever) + if err != nil { + return + } + defer handle.Close() + + eth := &layers.Ethernet{ + SrcMAC: q.srcMAC, + DstMAC: net.HardwareAddr{0x01, 0x00, 0x5e, 0x00, 0x00, 0x01}, + EthernetType: layers.EthernetTypeIPv4, + } + + ip := &layers.IPv4{ + Version: 4, + IHL: 6, + TTL: 1, + Protocol: layers.IPProtocolIGMP, + SrcIP: q.srcIP, + DstIP: net.IPv4(224, 0, 0, 1), + Options: []layers.IPv4Option{{OptionType: 148, OptionLength: 4, OptionData: []byte{0, 0}}}, + } + + igmpPayload := buildIGMPQuery() + + buf := gopacket.NewSerializeBuffer() + opts := gopacket.SerializeOptions{ComputeChecksums: true, FixLengths: true} + + if err := gopacket.SerializeLayers(buf, opts, eth, ip, gopacket.Payload(igmpPayload)); err != nil { + return + } + + handle.WritePacketData(buf.Bytes()) +} +