diff --git a/adsbus/adsb.pb-c.c b/adsbus/adsb.pb-c.c index 1fbef9a..8381d08 100644 --- a/adsbus/adsb.pb-c.c +++ b/adsbus/adsb.pb-c.c @@ -136,6 +136,49 @@ void adsb__free_unpacked assert(message->base.descriptor == &adsb__descriptor); protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); } +void adsb_stream__init + (AdsbStream *message) +{ + static AdsbStream init_value = ADSB_STREAM__INIT; + *message = init_value; +} +size_t adsb_stream__get_packed_size + (const AdsbStream *message) +{ + assert(message->base.descriptor == &adsb_stream__descriptor); + return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message)); +} +size_t adsb_stream__pack + (const AdsbStream *message, + uint8_t *out) +{ + assert(message->base.descriptor == &adsb_stream__descriptor); + return protobuf_c_message_pack ((const ProtobufCMessage*)message, out); +} +size_t adsb_stream__pack_to_buffer + (const AdsbStream *message, + ProtobufCBuffer *buffer) +{ + assert(message->base.descriptor == &adsb_stream__descriptor); + return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer); +} +AdsbStream * + adsb_stream__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data) +{ + return (AdsbStream *) + protobuf_c_message_unpack (&adsb_stream__descriptor, + allocator, len, data); +} +void adsb_stream__free_unpacked + (AdsbStream *message, + ProtobufCAllocator *allocator) +{ + assert(message->base.descriptor == &adsb_stream__descriptor); + protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); +} static const ProtobufCFieldDescriptor adsb_header__field_descriptors[6] = { { @@ -380,3 +423,41 @@ const ProtobufCMessageDescriptor adsb__descriptor = (ProtobufCMessageInit) adsb__init, NULL,NULL,NULL /* reserved[123] */ }; +static const ProtobufCFieldDescriptor adsb_stream__field_descriptors[1] = +{ + { + "msg", + 1, + PROTOBUF_C_LABEL_REPEATED, + PROTOBUF_C_TYPE_MESSAGE, + offsetof(AdsbStream, n_msg), + offsetof(AdsbStream, msg), + &adsb__descriptor, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, +}; +static const unsigned adsb_stream__field_indices_by_name[] = { + 0, /* field[0] = msg */ +}; +static const ProtobufCIntRange adsb_stream__number_ranges[1 + 1] = +{ + { 1, 0 }, + { 0, 1 } +}; +const ProtobufCMessageDescriptor adsb_stream__descriptor = +{ + PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC, + "AdsbStream", + "AdsbStream", + "AdsbStream", + "", + sizeof(AdsbStream), + 1, + adsb_stream__field_descriptors, + adsb_stream__field_indices_by_name, + 1, adsb_stream__number_ranges, + (ProtobufCMessageInit) adsb_stream__init, + NULL,NULL,NULL /* reserved[123] */ +}; diff --git a/adsbus/adsb.pb-c.h b/adsbus/adsb.pb-c.h index 9b14fca..298b82c 100644 --- a/adsbus/adsb.pb-c.h +++ b/adsbus/adsb.pb-c.h @@ -18,6 +18,7 @@ PROTOBUF_C__BEGIN_DECLS typedef struct _AdsbHeader AdsbHeader; typedef struct _AdsbPacket AdsbPacket; typedef struct _Adsb Adsb; +typedef struct _AdsbStream AdsbStream; /* --- enums --- */ @@ -67,6 +68,17 @@ struct _Adsb , NULL, NULL, NULL } +struct _AdsbStream +{ + ProtobufCMessage base; + size_t n_msg; + Adsb **msg; +}; +#define ADSB_STREAM__INIT \ + { PROTOBUF_C_MESSAGE_INIT (&adsb_stream__descriptor) \ + , 0,NULL } + + /* AdsbHeader methods */ void adsb_header__init (AdsbHeader *message); @@ -124,6 +136,25 @@ Adsb * void adsb__free_unpacked (Adsb *message, ProtobufCAllocator *allocator); +/* AdsbStream methods */ +void adsb_stream__init + (AdsbStream *message); +size_t adsb_stream__get_packed_size + (const AdsbStream *message); +size_t adsb_stream__pack + (const AdsbStream *message, + uint8_t *out); +size_t adsb_stream__pack_to_buffer + (const AdsbStream *message, + ProtobufCBuffer *buffer); +AdsbStream * + adsb_stream__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data); +void adsb_stream__free_unpacked + (AdsbStream *message, + ProtobufCAllocator *allocator); /* --- per-message closures --- */ typedef void (*AdsbHeader_Closure) @@ -135,6 +166,9 @@ typedef void (*AdsbPacket_Closure) typedef void (*Adsb_Closure) (const Adsb *message, void *closure_data); +typedef void (*AdsbStream_Closure) + (const AdsbStream *message, + void *closure_data); /* --- services --- */ @@ -144,6 +178,7 @@ typedef void (*Adsb_Closure) extern const ProtobufCMessageDescriptor adsb_header__descriptor; extern const ProtobufCMessageDescriptor adsb_packet__descriptor; extern const ProtobufCMessageDescriptor adsb__descriptor; +extern const ProtobufCMessageDescriptor adsb_stream__descriptor; PROTOBUF_C__END_DECLS diff --git a/adsbus/proto.c b/adsbus/proto.c index d862a76..8352fde 100644 --- a/adsbus/proto.c +++ b/adsbus/proto.c @@ -16,10 +16,6 @@ #define PROTO_MAGIC "aDsB" -struct __attribute__((packed)) proto_header { - uint32_t length; -}; - struct proto_parser_state { struct packet_mlat_state mlat_state; uint16_t mlat_timestamp_mhz; @@ -31,15 +27,18 @@ struct proto_parser_state { static Adsb *proto_prev = NULL; static struct buf proto_hello_buf = BUF_INIT; -static void proto_obj_to_buf(Adsb *wrapper, struct buf *buf) { +static void proto_obj_to_buf(ProtobufCMessage *obj, struct buf *buf) { assert(!buf->length); - struct proto_header *header = (struct proto_header *) buf_at(buf, 0); - assert(sizeof(*header) <= BUF_LEN_MAX); - size_t msg_len = adsb__get_packed_size(wrapper); - buf->length = sizeof(*header) + msg_len; - assert(buf->length <= BUF_LEN_MAX); - assert(adsb__pack(wrapper, (uint8_t *) buf_at(buf, sizeof(*header))) == msg_len); - header->length = htonl(msg_len); + assert(protobuf_c_message_get_packed_size(obj) <= BUF_LEN_MAX); + buf->length = protobuf_c_message_pack(obj, buf_at(buf, 0)); + assert(buf->length); +} + +static void proto_wrap_to_buf(Adsb *msg, struct buf *buf) { + AdsbStream wrapper = ADSB_STREAM__INIT; + wrapper.n_msg = 1; + wrapper.msg = &msg; + proto_obj_to_buf((struct ProtobufCMessage *) &wrapper, buf); } static void proto_serialize_packet(struct packet *packet, AdsbPacket *out, size_t len) { @@ -59,17 +58,66 @@ static void proto_serialize_packet(struct packet *packet, AdsbPacket *out, size_ static void proto_serialize_mode_s_short(struct packet *packet, struct buf *buf) { AdsbPacket packet_out = ADSB_PACKET__INIT; proto_serialize_packet(packet, &packet_out, 7); - Adsb wrapper = ADSB__INIT; - wrapper.mode_s_short = &packet_out; - proto_obj_to_buf(&wrapper, buf); + Adsb msg = ADSB__INIT; + msg.mode_s_short = &packet_out; + proto_wrap_to_buf(&msg, buf); } static void proto_serialize_mode_s_long(struct packet *packet, struct buf *buf) { AdsbPacket packet_out = ADSB_PACKET__INIT; proto_serialize_packet(packet, &packet_out, 14); - Adsb wrapper = ADSB__INIT; - wrapper.mode_s_long = &packet_out; - proto_obj_to_buf(&wrapper, buf); + Adsb msg = ADSB__INIT; + msg.mode_s_long = &packet_out; + proto_wrap_to_buf(&msg, buf); +} + +static ssize_t proto_parse_varint(const struct buf *buf, size_t *start) { + uint64_t value = 0; + uint16_t shift = 0; + for (; *start < buf->length; (*start)++) { + uint8_t c = buf_chr(buf, *start); + value |= ((uint64_t) c & 0x7f) << shift; + if (value > SSIZE_MAX) { + return -1; + } + if (!(c & 0x80)) { + (*start)++; + return (ssize_t) value; + } + shift += 7; + if (shift > 21) { + return -1; + } + } + return -1; +} + +static ssize_t proto_unwrap(const struct buf *wrapper, Adsb **msg) { + if (wrapper->length < 1) { + return -1; + } + + // Field ID 1, encoding type 2 (length-prefixed blob) + if (buf_chr(wrapper, 0) != ((1 << 3) | 2)) { + return -1; + } + + size_t start = 1; + ssize_t len = proto_parse_varint(wrapper, &start); + if (len == -1) { + return -1; + } + size_t msg_len = (size_t) len; + if (start + msg_len > wrapper->length) { + return -1; + } + + *msg = adsb__unpack(NULL, msg_len, buf_at(wrapper, start)); + if (!*msg) { + return -1; + } + + return (ssize_t) (start + msg_len); } static bool proto_parse_header(AdsbHeader *header, struct packet *packet, struct proto_parser_state *state) { @@ -136,10 +184,10 @@ void proto_init() { header.mlat_timestamp_max = PACKET_MLAT_MAX; header.rssi_max = PACKET_RSSI_MAX; - Adsb wrapper = ADSB__INIT; - wrapper.header = &header; + Adsb msg = ADSB__INIT; + msg.header = &header; - proto_obj_to_buf(&wrapper, &proto_hello_buf); + proto_wrap_to_buf(&msg, &proto_hello_buf); } void proto_cleanup() { @@ -156,46 +204,38 @@ bool proto_parse(struct buf *buf, struct packet *packet, void *state_in) { proto_prev = NULL; } - struct proto_header *header = (struct proto_header *) buf_at(buf, 0); - if (buf->length < sizeof(*header)) { - return false; - } - uint32_t msg_len = ntohl(header->length); - if (buf->length < sizeof(*header) + msg_len) { + Adsb *msg; + ssize_t len = proto_unwrap(buf, &msg); + if (len == -1) { return false; } - Adsb *wrapper = adsb__unpack(NULL, msg_len, (uint8_t *) buf_at(buf, sizeof(*header))); - if (!wrapper) { - return false; - } - - if (wrapper->header) { - if (!proto_parse_header(wrapper->header, packet, state)) { - adsb__free_unpacked(wrapper, NULL); + if (msg->header) { + if (!proto_parse_header(msg->header, packet, state)) { + adsb__free_unpacked(msg, NULL); return false; } packet->type = PACKET_TYPE_NONE; - } else if (wrapper->mode_s_short) { - if (!proto_parse_packet(wrapper->mode_s_short, packet, state, 7)) { - adsb__free_unpacked(wrapper, NULL); + } else if (msg->mode_s_short) { + if (!proto_parse_packet(msg->mode_s_short, packet, state, 7)) { + adsb__free_unpacked(msg, NULL); return false; } packet->type = PACKET_TYPE_MODE_S_SHORT; - } else if (wrapper->mode_s_long) { - if (!proto_parse_packet(wrapper->mode_s_long, packet, state, 14)) { - adsb__free_unpacked(wrapper, NULL); + } else if (msg->mode_s_long) { + if (!proto_parse_packet(msg->mode_s_long, packet, state, 14)) { + adsb__free_unpacked(msg, NULL); return false; } packet->type = PACKET_TYPE_MODE_S_LONG; } else { // "oneof" is actually "zero or oneof" - adsb__free_unpacked(wrapper, NULL); + adsb__free_unpacked(msg, NULL); return false; } - proto_prev = wrapper; - buf_consume(buf, sizeof(*header) + msg_len); + proto_prev = msg; + buf_consume(buf, (size_t) len); return true; } diff --git a/proto/adsb.proto b/proto/adsb.proto index c7b05ec..e9bf4fc 100644 --- a/proto/adsb.proto +++ b/proto/adsb.proto @@ -21,3 +21,7 @@ message Adsb { AdsbPacket mode_s_long = 3; } } + +message AdsbStream { + repeated Adsb msg = 1; +}