json input support, some shutdown fixes. json needs cleanup.
This commit is contained in:
@@ -116,8 +116,12 @@ int main(int argc, char *argv[]) {
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
assert(!close(1));
|
||||
|
||||
peer_loop();
|
||||
|
||||
json_cleanup();
|
||||
|
||||
receive_cleanup();
|
||||
send_cleanup();
|
||||
incoming_cleanup();
|
||||
@@ -127,7 +131,8 @@ int main(int argc, char *argv[]) {
|
||||
resolve_cleanup();
|
||||
wakeup_cleanup();
|
||||
|
||||
assert(!close(1));
|
||||
peer_cleanup();
|
||||
|
||||
assert(!close(2));
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
|
||||
@@ -67,7 +67,7 @@ static bool airspy_adsb_parse_mode_s_short(struct buf *buf, struct packet *packe
|
||||
if (!airspy_adsb_parse_common(&short_overlay->common, packet, state)) {
|
||||
return false;
|
||||
}
|
||||
packet->type = MODE_S_SHORT;
|
||||
packet->type = PACKET_TYPE_MODE_S_SHORT;
|
||||
hex_to_bin(packet->payload, short_overlay->payload, sizeof(short_overlay->payload) / 2);
|
||||
buf_consume(buf, sizeof(*short_overlay));
|
||||
return true;
|
||||
@@ -85,7 +85,7 @@ static bool airspy_adsb_parse_mode_s_long(struct buf *buf, struct packet *packet
|
||||
if (!airspy_adsb_parse_common(&long_overlay->common, packet, state)) {
|
||||
return false;
|
||||
}
|
||||
packet->type = MODE_S_LONG;
|
||||
packet->type = PACKET_TYPE_MODE_S_LONG;
|
||||
hex_to_bin(packet->payload, long_overlay->payload, sizeof(long_overlay->payload) / 2);
|
||||
buf_consume(buf, sizeof(*long_overlay));
|
||||
return true;
|
||||
@@ -147,11 +147,14 @@ void airspy_adsb_serialize(struct packet *packet, struct buf *buf) {
|
||||
}
|
||||
|
||||
switch (packet->type) {
|
||||
case MODE_S_SHORT:
|
||||
case PACKET_TYPE_NONE:
|
||||
break;
|
||||
|
||||
case PACKET_TYPE_MODE_S_SHORT:
|
||||
airspy_adsb_serialize_mode_s_short(packet, buf);
|
||||
break;
|
||||
|
||||
case MODE_S_LONG:
|
||||
case PACKET_TYPE_MODE_S_LONG:
|
||||
airspy_adsb_serialize_mode_s_long(packet, buf);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -85,7 +85,7 @@ static bool beast_parse_mode_s_short(struct buf *buf, struct packet *packet, str
|
||||
return false;
|
||||
}
|
||||
struct beast_mode_s_short_overlay *overlay = (struct beast_mode_s_short_overlay *) buf_at(&buf2, 0);
|
||||
packet->type = MODE_S_SHORT;
|
||||
packet->type = PACKET_TYPE_MODE_S_SHORT;
|
||||
uint64_t source_mlat = beast_parse_mlat(overlay->mlat_timestamp);
|
||||
packet->mlat_timestamp = packet_mlat_timestamp_scale_in(source_mlat, UINT64_C(0xffffffffffff), 12, &state->mlat_state);
|
||||
packet->rssi = packet_rssi_scale_in(overlay->rssi, UINT8_MAX);
|
||||
@@ -101,7 +101,7 @@ static bool beast_parse_mode_s_long(struct buf *buf, struct packet *packet, stru
|
||||
return false;
|
||||
}
|
||||
struct beast_mode_s_long_overlay *overlay = (struct beast_mode_s_long_overlay *) buf_at(&buf2, 0);
|
||||
packet->type = MODE_S_LONG;
|
||||
packet->type = PACKET_TYPE_MODE_S_LONG;
|
||||
uint64_t source_mlat = beast_parse_mlat(overlay->mlat_timestamp);
|
||||
packet->mlat_timestamp = packet_mlat_timestamp_scale_in(source_mlat, UINT64_C(0xffffffffffff), 12, &state->mlat_state);
|
||||
packet->rssi = packet_rssi_scale_in(overlay->rssi == UINT8_MAX ? 0 : overlay->rssi, UINT8_MAX);
|
||||
@@ -133,7 +133,7 @@ bool beast_parse(struct buf *buf, struct packet *packet, void *state_in) {
|
||||
return beast_parse_mode_s_long(buf, packet, state);
|
||||
|
||||
default:
|
||||
fprintf(stderr, "unknown beast type %x\n", overlay->type);
|
||||
fprintf(stderr, "R %s: Unknown beast type %x\n", packet->source_id, overlay->type);
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
@@ -185,11 +185,14 @@ void beast_serialize(struct packet *packet, struct buf *buf) {
|
||||
}
|
||||
|
||||
switch (packet->type) {
|
||||
case MODE_S_SHORT:
|
||||
case PACKET_TYPE_NONE:
|
||||
break;
|
||||
|
||||
case PACKET_TYPE_MODE_S_SHORT:
|
||||
beast_serialize_mode_s_short(packet, buf);
|
||||
break;
|
||||
|
||||
case MODE_S_LONG:
|
||||
case PACKET_TYPE_MODE_S_LONG:
|
||||
beast_serialize_mode_s_long(packet, buf);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -68,7 +68,9 @@ static void incoming_handler(struct peer *peer) {
|
||||
}
|
||||
|
||||
static void incoming_del(struct incoming *incoming) {
|
||||
assert(!close(incoming->peer.fd));
|
||||
if (incoming->peer.fd >= 0) {
|
||||
assert(!close(incoming->peer.fd));
|
||||
}
|
||||
free(incoming->node);
|
||||
free(incoming->service);
|
||||
free(incoming);
|
||||
|
||||
154
adsbus/json.c
154
adsbus/json.c
@@ -14,6 +14,16 @@
|
||||
|
||||
#include "json.h"
|
||||
|
||||
struct json_parser_state {
|
||||
struct packet_mlat_state mlat_state;
|
||||
uint16_t mlat_timestamp_mhz;
|
||||
uint64_t mlat_timestamp_max;
|
||||
uint32_t rssi_max;
|
||||
bool have_header;
|
||||
};
|
||||
|
||||
static json_t *json_prev = NULL;
|
||||
|
||||
static void json_serialize_to_buf(json_t *obj, struct buf *buf) {
|
||||
assert(json_dump_callback(obj, json_buf_append_callback, buf, 0) == 0);
|
||||
json_decref(obj);
|
||||
@@ -60,7 +70,36 @@ static void json_serialize_mode_s_long(struct packet *packet, struct buf *buf) {
|
||||
json_serialize_to_buf(out, buf);
|
||||
}
|
||||
|
||||
static bool json_parse_common(json_t *in, struct packet *packet, struct json_parser_state *state) {
|
||||
if (!state->have_header) {
|
||||
return false;
|
||||
}
|
||||
|
||||
json_t *source_id = json_object_get(in, "source_id");
|
||||
if (!source_id || !json_is_string(source_id)) {
|
||||
return false;
|
||||
}
|
||||
packet->source_id = json_string_value(source_id);
|
||||
|
||||
json_t *mlat_timestamp = json_object_get(in, "mlat_timestamp");
|
||||
if (mlat_timestamp && json_is_integer(mlat_timestamp)) {
|
||||
packet->mlat_timestamp = packet_mlat_timestamp_scale_in(
|
||||
json_integer_value(mlat_timestamp),
|
||||
state->mlat_timestamp_max,
|
||||
state->mlat_timestamp_mhz,
|
||||
&state->mlat_state);
|
||||
}
|
||||
|
||||
json_t *rssi = json_object_get(in, "rssi");
|
||||
if (rssi && json_is_integer(rssi)) {
|
||||
packet->rssi = packet_rssi_scale_in(json_integer_value(rssi), state->rssi_max);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void json_init() {
|
||||
assert(sizeof(struct json_parser_state) <= PARSER_STATE_LEN);
|
||||
assert(JSON_INTEGER_IS_LONG_LONG);
|
||||
|
||||
size_t seed;
|
||||
@@ -68,6 +107,114 @@ void json_init() {
|
||||
json_object_seed(seed);
|
||||
}
|
||||
|
||||
void json_cleanup() {
|
||||
if (json_prev) {
|
||||
json_decref(json_prev);
|
||||
}
|
||||
}
|
||||
|
||||
bool json_parse(struct buf *buf, struct packet *packet, void *state_in) {
|
||||
struct json_parser_state *state = (struct json_parser_state *) state_in;
|
||||
|
||||
if (json_prev) {
|
||||
json_decref(json_prev);
|
||||
json_prev = NULL;
|
||||
}
|
||||
|
||||
json_error_t err;
|
||||
json_t *in = json_loadb(buf_at(buf, 0), buf->length, JSON_DISABLE_EOF_CHECK | JSON_REJECT_DUPLICATES, &err);
|
||||
if (!in) {
|
||||
return false;
|
||||
}
|
||||
if (!json_is_object(in)) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
json_t *type = json_object_get(in, "type");
|
||||
if (!type || !json_is_string(type)) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
const char *type_str = json_string_value(type);
|
||||
if (!strcmp(type_str, "header")) {
|
||||
json_t *mlat_timestamp_mhz = json_object_get(in, "mlat_timestamp_mhz");
|
||||
if (!mlat_timestamp_mhz || !json_is_integer(mlat_timestamp_mhz)) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
state->mlat_timestamp_mhz = json_integer_value(mlat_timestamp_mhz);
|
||||
|
||||
json_t *mlat_timestamp_max = json_object_get(in, "mlat_timestamp_max");
|
||||
if (!mlat_timestamp_max || !json_is_integer(mlat_timestamp_max)) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
state->mlat_timestamp_max = json_integer_value(mlat_timestamp_max);
|
||||
|
||||
json_t *rssi_max = json_object_get(in, "rssi_max");
|
||||
if (!rssi_max || !json_is_integer(rssi_max)) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
state->rssi_max = json_integer_value(rssi_max);
|
||||
|
||||
json_t *json_server_id = json_object_get(in, "server_id");
|
||||
if (!json_server_id || !json_is_string(json_server_id)) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
const char *server_id_str = json_string_value(json_server_id);
|
||||
if (!strcmp(server_id_str, server_id)) {
|
||||
fprintf(stderr, "R %s: Attempt to receive json data from our own server ID (%s); loop!\n", packet->source_id, server_id);
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
|
||||
fprintf(stderr, "R %s: Connected to server ID: %s\n", packet->source_id, server_id_str);
|
||||
|
||||
state->have_header = true;
|
||||
packet->type = PACKET_TYPE_NONE;
|
||||
} else if (!strcmp(type_str, "Mode-S short")) {
|
||||
if (!json_parse_common(in, packet, state)) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
|
||||
json_t *payload = json_object_get(in, "payload");
|
||||
if (!payload || !json_is_string(payload) || json_string_length(payload) != 14) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
|
||||
hex_to_bin(packet->payload, json_string_value(payload), 7);
|
||||
packet->type = PACKET_TYPE_MODE_S_SHORT;
|
||||
} else if (!strcmp(type_str, "Mode-S long")) {
|
||||
if (!json_parse_common(in, packet, state)) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
|
||||
json_t *payload = json_object_get(in, "payload");
|
||||
if (!payload || !json_is_string(payload) || json_string_length(payload) != 28) {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
|
||||
hex_to_bin(packet->payload, json_string_value(payload), 14);
|
||||
packet->type = PACKET_TYPE_MODE_S_LONG;
|
||||
} else {
|
||||
json_decref(in);
|
||||
return false;
|
||||
}
|
||||
|
||||
buf_consume(buf, err.position);
|
||||
while (buf->length && (buf_chr(buf, 0) == '\r' || buf_chr(buf, 0) == '\n')) {
|
||||
buf_consume(buf, 1);
|
||||
}
|
||||
json_prev = in;
|
||||
return true;
|
||||
}
|
||||
|
||||
void json_serialize(struct packet *packet, struct buf *buf) {
|
||||
if (!packet) {
|
||||
json_hello(buf);
|
||||
@@ -75,13 +222,16 @@ void json_serialize(struct packet *packet, struct buf *buf) {
|
||||
}
|
||||
|
||||
switch (packet->type) {
|
||||
case MODE_S_SHORT:
|
||||
case PACKET_TYPE_MODE_S_SHORT:
|
||||
json_serialize_mode_s_short(packet, buf);
|
||||
break;
|
||||
|
||||
case MODE_S_LONG:
|
||||
case PACKET_TYPE_MODE_S_LONG:
|
||||
json_serialize_mode_s_long(packet, buf);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdbool.h>
|
||||
|
||||
struct buf;
|
||||
struct packet;
|
||||
|
||||
void json_init();
|
||||
void json_cleanup();
|
||||
bool json_parse(struct buf *, struct packet *, void *);
|
||||
void json_serialize(struct packet *, struct buf *);
|
||||
|
||||
int json_buf_append_callback(const char *, size_t, void *);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "incoming.h"
|
||||
#include "outgoing.h"
|
||||
@@ -49,7 +50,7 @@ bool opts_add_dump(char *arg) {
|
||||
if (!serializer) {
|
||||
return false;
|
||||
}
|
||||
send_add(1, serializer);
|
||||
send_add(dup(1), serializer);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#include "packet.h"
|
||||
|
||||
char *packet_type_names[] = {
|
||||
"INVALID",
|
||||
"Mode-S short",
|
||||
"Mode-S long",
|
||||
};
|
||||
|
||||
@@ -4,10 +4,11 @@
|
||||
|
||||
#define PACKET_DATA_LEN_MAX 14
|
||||
struct packet {
|
||||
char *source_id;
|
||||
const char *source_id;
|
||||
enum {
|
||||
MODE_S_SHORT,
|
||||
MODE_S_LONG,
|
||||
PACKET_TYPE_NONE,
|
||||
PACKET_TYPE_MODE_S_SHORT,
|
||||
PACKET_TYPE_MODE_S_LONG,
|
||||
} type;
|
||||
#define NUM_TYPES 2
|
||||
uint8_t payload[PACKET_DATA_LEN_MAX];
|
||||
|
||||
@@ -48,6 +48,10 @@ void peer_init() {
|
||||
signal(SIGINT, peer_cancel);
|
||||
}
|
||||
|
||||
void peer_cleanup() {
|
||||
assert(!close(peer_epoll_fd));
|
||||
}
|
||||
|
||||
void peer_epoll_add(struct peer *peer, uint32_t events) {
|
||||
struct epoll_event ev = {
|
||||
.events = events,
|
||||
@@ -80,5 +84,4 @@ void peer_loop() {
|
||||
|
||||
wakeup_dispatch();
|
||||
}
|
||||
assert(!close(peer_epoll_fd));
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ struct peer {
|
||||
peer_event_handler event_handler;
|
||||
};
|
||||
void peer_init();
|
||||
void peer_cleanup();
|
||||
void peer_epoll_add(struct peer *, uint32_t);
|
||||
void peer_epoll_del(struct peer *);
|
||||
void peer_loop();
|
||||
|
||||
11
adsbus/raw.c
11
adsbus/raw.c
@@ -34,7 +34,7 @@ static bool raw_parse_mode_s_short(struct buf *buf, struct packet *packet) {
|
||||
(overlay->cr_lf != '\r' || overlay->lf != '\n'))) {
|
||||
return false;
|
||||
}
|
||||
packet->type = MODE_S_SHORT;
|
||||
packet->type = PACKET_TYPE_MODE_S_SHORT;
|
||||
hex_to_bin(packet->payload, overlay->payload, sizeof(overlay->payload) / 2);
|
||||
buf_consume(buf, overlay->cr_lf == '\r' ? sizeof(*overlay) : sizeof(*overlay) - 1);
|
||||
return true;
|
||||
@@ -49,7 +49,7 @@ static bool raw_parse_mode_s_long(struct buf *buf, struct packet *packet) {
|
||||
(overlay->cr_lf != '\r' || overlay->lf != '\n'))) {
|
||||
return false;
|
||||
}
|
||||
packet->type = MODE_S_LONG;
|
||||
packet->type = PACKET_TYPE_MODE_S_LONG;
|
||||
hex_to_bin(packet->payload, overlay->payload, sizeof(overlay->payload) / 2);
|
||||
buf_consume(buf, overlay->cr_lf == '\r' ? sizeof(*overlay) : sizeof(*overlay) - 1);
|
||||
return true;
|
||||
@@ -90,11 +90,14 @@ void raw_serialize(struct packet *packet, struct buf *buf) {
|
||||
}
|
||||
|
||||
switch (packet->type) {
|
||||
case MODE_S_SHORT:
|
||||
case PACKET_TYPE_NONE:
|
||||
break;
|
||||
|
||||
case PACKET_TYPE_MODE_S_SHORT:
|
||||
raw_serialize_mode_s_short(packet, buf);
|
||||
break;
|
||||
|
||||
case MODE_S_LONG:
|
||||
case PACKET_TYPE_MODE_S_LONG:
|
||||
raw_serialize_mode_s_long(packet, buf);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
#include "airspy_adsb.h"
|
||||
#include "beast.h"
|
||||
#include "buf.h"
|
||||
#include "json.h"
|
||||
#include "packet.h"
|
||||
#include "peer.h"
|
||||
#include "raw.h"
|
||||
@@ -42,6 +43,10 @@ struct parser {
|
||||
.name = "beast",
|
||||
.parse = beast_parse,
|
||||
},
|
||||
{
|
||||
.name = "json",
|
||||
.parse = json_parse,
|
||||
},
|
||||
{
|
||||
.name = "raw",
|
||||
.parse = raw_parse,
|
||||
@@ -69,6 +74,7 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac
|
||||
}
|
||||
|
||||
static void receive_del(struct receive *receive) {
|
||||
peer_epoll_del((struct peer *) receive);
|
||||
assert(!close(receive->peer.fd));
|
||||
if (receive->prev) {
|
||||
receive->prev->next = receive->next;
|
||||
@@ -94,6 +100,9 @@ static void receive_read(struct peer *peer) {
|
||||
.source_id = receive->id,
|
||||
};
|
||||
while (receive->parser_wrapper(receive, &packet)) {
|
||||
if (packet.type == PACKET_TYPE_NONE) {
|
||||
continue;
|
||||
}
|
||||
send_write(&packet);
|
||||
}
|
||||
|
||||
|
||||
@@ -64,7 +64,8 @@ static void send_hangup(struct send *send) {
|
||||
if (send->next) {
|
||||
send->next->prev = send->prev;
|
||||
}
|
||||
close(send->peer.fd);
|
||||
peer_epoll_del((struct peer *) send);
|
||||
assert(!close(send->peer.fd));
|
||||
free(send);
|
||||
}
|
||||
|
||||
@@ -94,6 +95,8 @@ void send_cleanup() {
|
||||
struct send *send = serializer->send_head;
|
||||
while (send) {
|
||||
struct send *next = send->next;
|
||||
peer_epoll_del((struct peer *) send);
|
||||
assert(!close(send->peer.fd));
|
||||
free(send);
|
||||
send = next;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user