diff --git a/adsbus/adsbus.c b/adsbus/adsbus.c index 1cf9ab2..e491e86 100644 --- a/adsbus/adsbus.c +++ b/adsbus/adsbus.c @@ -116,8 +116,12 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } + assert(!close(1)); + peer_loop(); + json_cleanup(); + receive_cleanup(); send_cleanup(); incoming_cleanup(); @@ -127,7 +131,8 @@ int main(int argc, char *argv[]) { resolve_cleanup(); wakeup_cleanup(); - assert(!close(1)); + peer_cleanup(); + assert(!close(2)); return EXIT_SUCCESS; diff --git a/adsbus/airspy_adsb.c b/adsbus/airspy_adsb.c index 2652c87..3bfba92 100644 --- a/adsbus/airspy_adsb.c +++ b/adsbus/airspy_adsb.c @@ -67,7 +67,7 @@ static bool airspy_adsb_parse_mode_s_short(struct buf *buf, struct packet *packe if (!airspy_adsb_parse_common(&short_overlay->common, packet, state)) { return false; } - packet->type = MODE_S_SHORT; + packet->type = PACKET_TYPE_MODE_S_SHORT; hex_to_bin(packet->payload, short_overlay->payload, sizeof(short_overlay->payload) / 2); buf_consume(buf, sizeof(*short_overlay)); return true; @@ -85,7 +85,7 @@ static bool airspy_adsb_parse_mode_s_long(struct buf *buf, struct packet *packet if (!airspy_adsb_parse_common(&long_overlay->common, packet, state)) { return false; } - packet->type = MODE_S_LONG; + packet->type = PACKET_TYPE_MODE_S_LONG; hex_to_bin(packet->payload, long_overlay->payload, sizeof(long_overlay->payload) / 2); buf_consume(buf, sizeof(*long_overlay)); return true; @@ -147,11 +147,14 @@ void airspy_adsb_serialize(struct packet *packet, struct buf *buf) { } switch (packet->type) { - case MODE_S_SHORT: + case PACKET_TYPE_NONE: + break; + + case PACKET_TYPE_MODE_S_SHORT: airspy_adsb_serialize_mode_s_short(packet, buf); break; - case MODE_S_LONG: + case PACKET_TYPE_MODE_S_LONG: airspy_adsb_serialize_mode_s_long(packet, buf); break; } diff --git a/adsbus/beast.c b/adsbus/beast.c index f057099..160cff9 100644 --- a/adsbus/beast.c +++ b/adsbus/beast.c @@ -85,7 +85,7 @@ static bool beast_parse_mode_s_short(struct buf *buf, struct packet *packet, str return false; } struct beast_mode_s_short_overlay *overlay = (struct beast_mode_s_short_overlay *) buf_at(&buf2, 0); - packet->type = MODE_S_SHORT; + packet->type = PACKET_TYPE_MODE_S_SHORT; uint64_t source_mlat = beast_parse_mlat(overlay->mlat_timestamp); packet->mlat_timestamp = packet_mlat_timestamp_scale_in(source_mlat, UINT64_C(0xffffffffffff), 12, &state->mlat_state); packet->rssi = packet_rssi_scale_in(overlay->rssi, UINT8_MAX); @@ -101,7 +101,7 @@ static bool beast_parse_mode_s_long(struct buf *buf, struct packet *packet, stru return false; } struct beast_mode_s_long_overlay *overlay = (struct beast_mode_s_long_overlay *) buf_at(&buf2, 0); - packet->type = MODE_S_LONG; + packet->type = PACKET_TYPE_MODE_S_LONG; uint64_t source_mlat = beast_parse_mlat(overlay->mlat_timestamp); packet->mlat_timestamp = packet_mlat_timestamp_scale_in(source_mlat, UINT64_C(0xffffffffffff), 12, &state->mlat_state); packet->rssi = packet_rssi_scale_in(overlay->rssi == UINT8_MAX ? 0 : overlay->rssi, UINT8_MAX); @@ -133,7 +133,7 @@ bool beast_parse(struct buf *buf, struct packet *packet, void *state_in) { return beast_parse_mode_s_long(buf, packet, state); default: - fprintf(stderr, "unknown beast type %x\n", overlay->type); + fprintf(stderr, "R %s: Unknown beast type %x\n", packet->source_id, overlay->type); return false; } return false; @@ -185,11 +185,14 @@ void beast_serialize(struct packet *packet, struct buf *buf) { } switch (packet->type) { - case MODE_S_SHORT: + case PACKET_TYPE_NONE: + break; + + case PACKET_TYPE_MODE_S_SHORT: beast_serialize_mode_s_short(packet, buf); break; - case MODE_S_LONG: + case PACKET_TYPE_MODE_S_LONG: beast_serialize_mode_s_long(packet, buf); break; } diff --git a/adsbus/incoming.c b/adsbus/incoming.c index c50116f..a69b384 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -68,7 +68,9 @@ static void incoming_handler(struct peer *peer) { } static void incoming_del(struct incoming *incoming) { - assert(!close(incoming->peer.fd)); + if (incoming->peer.fd >= 0) { + assert(!close(incoming->peer.fd)); + } free(incoming->node); free(incoming->service); free(incoming); diff --git a/adsbus/json.c b/adsbus/json.c index 06599e2..d664c4e 100644 --- a/adsbus/json.c +++ b/adsbus/json.c @@ -14,6 +14,16 @@ #include "json.h" +struct json_parser_state { + struct packet_mlat_state mlat_state; + uint16_t mlat_timestamp_mhz; + uint64_t mlat_timestamp_max; + uint32_t rssi_max; + bool have_header; +}; + +static json_t *json_prev = NULL; + static void json_serialize_to_buf(json_t *obj, struct buf *buf) { assert(json_dump_callback(obj, json_buf_append_callback, buf, 0) == 0); json_decref(obj); @@ -60,7 +70,36 @@ static void json_serialize_mode_s_long(struct packet *packet, struct buf *buf) { json_serialize_to_buf(out, buf); } +static bool json_parse_common(json_t *in, struct packet *packet, struct json_parser_state *state) { + if (!state->have_header) { + return false; + } + + json_t *source_id = json_object_get(in, "source_id"); + if (!source_id || !json_is_string(source_id)) { + return false; + } + packet->source_id = json_string_value(source_id); + + json_t *mlat_timestamp = json_object_get(in, "mlat_timestamp"); + if (mlat_timestamp && json_is_integer(mlat_timestamp)) { + packet->mlat_timestamp = packet_mlat_timestamp_scale_in( + json_integer_value(mlat_timestamp), + state->mlat_timestamp_max, + state->mlat_timestamp_mhz, + &state->mlat_state); + } + + json_t *rssi = json_object_get(in, "rssi"); + if (rssi && json_is_integer(rssi)) { + packet->rssi = packet_rssi_scale_in(json_integer_value(rssi), state->rssi_max); + } + + return true; +} + void json_init() { + assert(sizeof(struct json_parser_state) <= PARSER_STATE_LEN); assert(JSON_INTEGER_IS_LONG_LONG); size_t seed; @@ -68,6 +107,114 @@ void json_init() { json_object_seed(seed); } +void json_cleanup() { + if (json_prev) { + json_decref(json_prev); + } +} + +bool json_parse(struct buf *buf, struct packet *packet, void *state_in) { + struct json_parser_state *state = (struct json_parser_state *) state_in; + + if (json_prev) { + json_decref(json_prev); + json_prev = NULL; + } + + json_error_t err; + json_t *in = json_loadb(buf_at(buf, 0), buf->length, JSON_DISABLE_EOF_CHECK | JSON_REJECT_DUPLICATES, &err); + if (!in) { + return false; + } + if (!json_is_object(in)) { + json_decref(in); + return false; + } + json_t *type = json_object_get(in, "type"); + if (!type || !json_is_string(type)) { + json_decref(in); + return false; + } + const char *type_str = json_string_value(type); + if (!strcmp(type_str, "header")) { + json_t *mlat_timestamp_mhz = json_object_get(in, "mlat_timestamp_mhz"); + if (!mlat_timestamp_mhz || !json_is_integer(mlat_timestamp_mhz)) { + json_decref(in); + return false; + } + state->mlat_timestamp_mhz = json_integer_value(mlat_timestamp_mhz); + + json_t *mlat_timestamp_max = json_object_get(in, "mlat_timestamp_max"); + if (!mlat_timestamp_max || !json_is_integer(mlat_timestamp_max)) { + json_decref(in); + return false; + } + state->mlat_timestamp_max = json_integer_value(mlat_timestamp_max); + + json_t *rssi_max = json_object_get(in, "rssi_max"); + if (!rssi_max || !json_is_integer(rssi_max)) { + json_decref(in); + return false; + } + state->rssi_max = json_integer_value(rssi_max); + + json_t *json_server_id = json_object_get(in, "server_id"); + if (!json_server_id || !json_is_string(json_server_id)) { + json_decref(in); + return false; + } + const char *server_id_str = json_string_value(json_server_id); + if (!strcmp(server_id_str, server_id)) { + fprintf(stderr, "R %s: Attempt to receive json data from our own server ID (%s); loop!\n", packet->source_id, server_id); + json_decref(in); + return false; + } + + fprintf(stderr, "R %s: Connected to server ID: %s\n", packet->source_id, server_id_str); + + state->have_header = true; + packet->type = PACKET_TYPE_NONE; + } else if (!strcmp(type_str, "Mode-S short")) { + if (!json_parse_common(in, packet, state)) { + json_decref(in); + return false; + } + + json_t *payload = json_object_get(in, "payload"); + if (!payload || !json_is_string(payload) || json_string_length(payload) != 14) { + json_decref(in); + return false; + } + + hex_to_bin(packet->payload, json_string_value(payload), 7); + packet->type = PACKET_TYPE_MODE_S_SHORT; + } else if (!strcmp(type_str, "Mode-S long")) { + if (!json_parse_common(in, packet, state)) { + json_decref(in); + return false; + } + + json_t *payload = json_object_get(in, "payload"); + if (!payload || !json_is_string(payload) || json_string_length(payload) != 28) { + json_decref(in); + return false; + } + + hex_to_bin(packet->payload, json_string_value(payload), 14); + packet->type = PACKET_TYPE_MODE_S_LONG; + } else { + json_decref(in); + return false; + } + + buf_consume(buf, err.position); + while (buf->length && (buf_chr(buf, 0) == '\r' || buf_chr(buf, 0) == '\n')) { + buf_consume(buf, 1); + } + json_prev = in; + return true; +} + void json_serialize(struct packet *packet, struct buf *buf) { if (!packet) { json_hello(buf); @@ -75,13 +222,16 @@ void json_serialize(struct packet *packet, struct buf *buf) { } switch (packet->type) { - case MODE_S_SHORT: + case PACKET_TYPE_MODE_S_SHORT: json_serialize_mode_s_short(packet, buf); break; - case MODE_S_LONG: + case PACKET_TYPE_MODE_S_LONG: json_serialize_mode_s_long(packet, buf); break; + + default: + break; } } diff --git a/adsbus/json.h b/adsbus/json.h index ecd4ed2..bd63fff 100644 --- a/adsbus/json.h +++ b/adsbus/json.h @@ -1,9 +1,13 @@ #pragma once +#include + struct buf; struct packet; void json_init(); +void json_cleanup(); +bool json_parse(struct buf *, struct packet *, void *); void json_serialize(struct packet *, struct buf *); int json_buf_append_callback(const char *, size_t, void *); diff --git a/adsbus/opts.c b/adsbus/opts.c index f026b0d..86a678d 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -1,5 +1,6 @@ #include #include +#include #include "incoming.h" #include "outgoing.h" @@ -49,7 +50,7 @@ bool opts_add_dump(char *arg) { if (!serializer) { return false; } - send_add(1, serializer); + send_add(dup(1), serializer); return true; } diff --git a/adsbus/packet.c b/adsbus/packet.c index 7127275..284e688 100644 --- a/adsbus/packet.c +++ b/adsbus/packet.c @@ -1,6 +1,7 @@ #include "packet.h" char *packet_type_names[] = { + "INVALID", "Mode-S short", "Mode-S long", }; diff --git a/adsbus/packet.h b/adsbus/packet.h index 50c3f8e..07bd8a6 100644 --- a/adsbus/packet.h +++ b/adsbus/packet.h @@ -4,10 +4,11 @@ #define PACKET_DATA_LEN_MAX 14 struct packet { - char *source_id; + const char *source_id; enum { - MODE_S_SHORT, - MODE_S_LONG, + PACKET_TYPE_NONE, + PACKET_TYPE_MODE_S_SHORT, + PACKET_TYPE_MODE_S_LONG, } type; #define NUM_TYPES 2 uint8_t payload[PACKET_DATA_LEN_MAX]; diff --git a/adsbus/peer.c b/adsbus/peer.c index e338bd3..6d51825 100644 --- a/adsbus/peer.c +++ b/adsbus/peer.c @@ -48,6 +48,10 @@ void peer_init() { signal(SIGINT, peer_cancel); } +void peer_cleanup() { + assert(!close(peer_epoll_fd)); +} + void peer_epoll_add(struct peer *peer, uint32_t events) { struct epoll_event ev = { .events = events, @@ -80,5 +84,4 @@ void peer_loop() { wakeup_dispatch(); } - assert(!close(peer_epoll_fd)); } diff --git a/adsbus/peer.h b/adsbus/peer.h index 11025c5..976084e 100644 --- a/adsbus/peer.h +++ b/adsbus/peer.h @@ -10,6 +10,7 @@ struct peer { peer_event_handler event_handler; }; void peer_init(); +void peer_cleanup(); void peer_epoll_add(struct peer *, uint32_t); void peer_epoll_del(struct peer *); void peer_loop(); diff --git a/adsbus/raw.c b/adsbus/raw.c index 679f16a..79c83e7 100644 --- a/adsbus/raw.c +++ b/adsbus/raw.c @@ -34,7 +34,7 @@ static bool raw_parse_mode_s_short(struct buf *buf, struct packet *packet) { (overlay->cr_lf != '\r' || overlay->lf != '\n'))) { return false; } - packet->type = MODE_S_SHORT; + packet->type = PACKET_TYPE_MODE_S_SHORT; hex_to_bin(packet->payload, overlay->payload, sizeof(overlay->payload) / 2); buf_consume(buf, overlay->cr_lf == '\r' ? sizeof(*overlay) : sizeof(*overlay) - 1); return true; @@ -49,7 +49,7 @@ static bool raw_parse_mode_s_long(struct buf *buf, struct packet *packet) { (overlay->cr_lf != '\r' || overlay->lf != '\n'))) { return false; } - packet->type = MODE_S_LONG; + packet->type = PACKET_TYPE_MODE_S_LONG; hex_to_bin(packet->payload, overlay->payload, sizeof(overlay->payload) / 2); buf_consume(buf, overlay->cr_lf == '\r' ? sizeof(*overlay) : sizeof(*overlay) - 1); return true; @@ -90,11 +90,14 @@ void raw_serialize(struct packet *packet, struct buf *buf) { } switch (packet->type) { - case MODE_S_SHORT: + case PACKET_TYPE_NONE: + break; + + case PACKET_TYPE_MODE_S_SHORT: raw_serialize_mode_s_short(packet, buf); break; - case MODE_S_LONG: + case PACKET_TYPE_MODE_S_LONG: raw_serialize_mode_s_long(packet, buf); break; } diff --git a/adsbus/receive.c b/adsbus/receive.c index d135309..78a8142 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -7,6 +7,7 @@ #include "airspy_adsb.h" #include "beast.h" #include "buf.h" +#include "json.h" #include "packet.h" #include "peer.h" #include "raw.h" @@ -42,6 +43,10 @@ struct parser { .name = "beast", .parse = beast_parse, }, + { + .name = "json", + .parse = json_parse, + }, { .name = "raw", .parse = raw_parse, @@ -69,6 +74,7 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac } static void receive_del(struct receive *receive) { + peer_epoll_del((struct peer *) receive); assert(!close(receive->peer.fd)); if (receive->prev) { receive->prev->next = receive->next; @@ -94,6 +100,9 @@ static void receive_read(struct peer *peer) { .source_id = receive->id, }; while (receive->parser_wrapper(receive, &packet)) { + if (packet.type == PACKET_TYPE_NONE) { + continue; + } send_write(&packet); } diff --git a/adsbus/send.c b/adsbus/send.c index 9081ce2..34ac7fb 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -64,7 +64,8 @@ static void send_hangup(struct send *send) { if (send->next) { send->next->prev = send->prev; } - close(send->peer.fd); + peer_epoll_del((struct peer *) send); + assert(!close(send->peer.fd)); free(send); } @@ -94,6 +95,8 @@ void send_cleanup() { struct send *send = serializer->send_head; while (send) { struct send *next = send->next; + peer_epoll_del((struct peer *) send); + assert(!close(send->peer.fd)); free(send); send = next; }