Move adsbus into its own dir.

This commit is contained in:
Ian Gulliver
2016-02-18 09:36:53 -08:00
parent 6ac29e9704
commit cf6eef6118
26 changed files with 0 additions and 3 deletions
+15
View File
@@ -0,0 +1,15 @@
CC ?= clang
CFLAGS ?= -Wall -Werror -O4 -g --std=gnu11 --pedantic-errors -fPIE -pie -fstack-protector-strong
LDFLAGS ?= $(CFLAGS) -Wl,-z,relro -Wl,-z,now
LIBS ?= -luuid -ljansson
all: adsbus
clean:
rm -f *.o adsbus
%.o: %.c *.h
$(CC) -c $(CFLAGS) $< -o $@
adsbus: adsbus.o receive.o send.o incoming.o outgoing.o airspy_adsb.o beast.o json.o raw.o stats.o wakeup.o common.o
$(CC) $(LDFLAGS) -o adsbus adsbus.o receive.o send.o incoming.o outgoing.o airspy_adsb.o beast.o json.o raw.o stats.o wakeup.o common.o $(LIBS)
+13
View File
@@ -0,0 +1,13 @@
# adsbus
adsbus is a hub and protocol translator for [ADS-B](https://en.wikipedia.org/wiki/Automatic_dependent_surveillance_%E2%80%93_broadcast) messages.
## Building
```bash
sudo apt-get -y install uuid-dev libjansson-dev
git clone https://github.com/flamingcowtv/adsb-tools.git
cd adsb-tools
make
```
+196
View File
@@ -0,0 +1,196 @@
#include <stdlib.h>
#include <stdio.h>
#include <getopt.h>
#include <string.h>
#include "common.h"
#include "wakeup.h"
#include "receive.h"
#include "send.h"
#include "incoming.h"
#include "outgoing.h"
#include "airspy_adsb.h"
#include "beast.h"
#include "json.h"
#include "stats.h"
static void print_usage(const char *name) {
fprintf(stderr,
"\n"
"Usage: %s [OPTION]...\n"
"\n"
"Options:\n"
"\t--help\n"
"\t--dump=FORMAT\n"
"\t--connect-receive=HOST/PORT\n"
"\t--connect-send=FORMAT=HOST/PORT\n"
"\t--listen-receive=[HOST/]PORT\n"
"\t--listen-send=FORMAT=[HOST/]PORT\n"
, name);
receive_print_usage();
send_print_usage();
}
static bool add_dump(char *arg) {
struct serializer *serializer = send_get_serializer(arg);
if (!serializer) {
fprintf(stderr, "Unknown --dump=FORMAT: %s\n", arg);
return false;
}
send_add(1, serializer);
return true;
}
static bool add_connect_receive(char *arg) {
char *port = strrchr(arg, '/');
if (!port) {
fprintf(stderr, "Invalid --connect-receive=HOST/PORT (missing \"/\"): %s\n", arg);
return false;
}
*(port++) = '\0';
outgoing_new(arg, port, receive_new, NULL);
return true;
}
static bool add_connect_send(char *arg) {
char *host_port = strchr(arg, '=');
if (!host_port) {
fprintf(stderr, "Invalid --connect-send=FORMAT=HOST/PORT (missing \"=\"): %s\n", arg);
return false;
}
*(host_port++) = '\0';
struct serializer *serializer = send_get_serializer(arg);
if (!serializer) {
fprintf(stderr, "Unknown --connect-send=FORMAT=HOST/PORT format: %s\n", arg);
return false;
}
char *port = strrchr(host_port, '/');
if (!port) {
fprintf(stderr, "Invalid --connect-send=FORMAT=HOST/PORT (missing \"/\"): %s\n", host_port);
return false;
}
*(port++) = '\0';
incoming_new(host_port, port, send_add_wrapper, serializer);
return true;
}
static bool add_listen_receive(char *arg){
char *port = strrchr(arg, '/');
if (port) {
*(port++) = '\0';
incoming_new(arg, port, receive_new, NULL);
} else {
incoming_new(NULL, arg, receive_new, NULL);
}
return true;
}
static bool add_listen_send(char *arg) {
char *host_port = strchr(arg, '=');
if (!host_port) {
fprintf(stderr, "Invalid --listen-send=FORMAT=[HOST/]PORT (missing \"=\"): %s\n", arg);
return false;
}
*(host_port++) = '\0';
struct serializer *serializer = send_get_serializer(arg);
if (!serializer) {
fprintf(stderr, "Unknown --listen-send=FORMAT=[HOST/]PORT format: %s\n", arg);
return false;
}
char *port = strrchr(host_port, '/');
if (port) {
*(port++) = '\0';
incoming_new(host_port, port, send_add_wrapper, serializer);
} else {
incoming_new(NULL, host_port, send_add_wrapper, serializer);
}
return true;
}
static bool parse_opts(int argc, char *argv[]) {
static struct option long_options[] = {
{"dump", required_argument, 0, 'd'},
{"connect-receive", required_argument, 0, 'c'},
{"connect-send", required_argument, 0, 's'},
{"listen-receive", required_argument, 0, 'l'},
{"listen-send", required_argument, 0, 'm'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0 },
};
int opt;
while ((opt = getopt_long_only(argc, argv, "", long_options, NULL)) != -1) {
bool (*handler)(char *) = NULL;
switch (opt) {
case 'd':
handler = add_dump;
break;
case 'c':
handler = add_connect_receive;
break;
case 's':
handler = add_connect_send;
break;
case 'l':
handler = add_listen_receive;
break;
case 'm':
handler = add_listen_send;
break;
case 'h':
default:
print_usage(argv[0]);
return false;
}
if (handler) {
if (!handler(optarg)) {
print_usage(argv[0]);
return false;
}
}
}
if (optind != argc) {
fprintf(stderr, "Not a flag: %s\n", argv[optind]);
print_usage(argv[0]);
return false;
}
return true;
}
int main(int argc, char *argv[]) {
hex_init();
peer_init();
wakeup_init();
send_init();
airspy_adsb_init();
beast_init();
json_init();
stats_init();
if (!parse_opts(argc, argv)) {
return EXIT_FAILURE;
}
peer_loop();
return EXIT_SUCCESS;
}
+154
View File
@@ -0,0 +1,154 @@
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include "common.h"
#include "receive.h"
#include "airspy_adsb.h"
#define SEND_MHZ 20
struct __attribute__((packed)) airspy_adsb_common_overlay {
char mlat_timestamp[8];
char semicolon1;
char mlat_precision[2];
char semicolon2;
char rssi[4];
char semicolon3;
};
struct __attribute__((packed)) airspy_adsb_mode_s_short_overlay {
char asterisk;
char payload[14];
char semicolon;
struct airspy_adsb_common_overlay common;
char cr;
char lf;
};
struct __attribute__((packed)) airspy_adsb_mode_s_long_overlay {
char asterisk;
char payload[28];
char semicolon;
struct airspy_adsb_common_overlay common;
char cr;
char lf;
};
struct airspy_adsb_parser_state {
struct mlat_state mlat_state;
};
static bool airspy_adsb_parse_common(const struct airspy_adsb_common_overlay *overlay, struct packet *packet, struct airspy_adsb_parser_state *state) {
if (overlay->semicolon1 != ';' ||
overlay->semicolon2 != ';' ||
overlay->semicolon3 != ';') {
return false;
}
uint16_t mlat_mhz = 2 * hex_to_int(overlay->mlat_precision, sizeof(overlay->mlat_precision) / 2);
packet->mlat_timestamp = mlat_timestamp_scale_in(hex_to_int(overlay->mlat_timestamp, sizeof(overlay->mlat_timestamp) / 2), UINT32_MAX, mlat_mhz, &state->mlat_state);
packet->rssi = rssi_scale_in(hex_to_int(overlay->rssi, sizeof(overlay->rssi) / 2), UINT16_MAX);
return true;
}
static bool airspy_adsb_parse_mode_s_short(struct buf *buf, struct packet *packet, struct airspy_adsb_parser_state *state) {
struct airspy_adsb_mode_s_short_overlay *short_overlay = (struct airspy_adsb_mode_s_short_overlay *) buf_at(buf, 0);
if (buf->length < sizeof(*short_overlay) ||
short_overlay->asterisk != '*' ||
short_overlay->semicolon != ';' ||
short_overlay->cr != '\r' ||
short_overlay->lf != '\n') {
return false;
}
if (!airspy_adsb_parse_common(&short_overlay->common, packet, state)) {
return false;
}
packet->type = MODE_S_SHORT;
hex_to_bin(packet->payload, short_overlay->payload, sizeof(short_overlay->payload) / 2);
buf_consume(buf, sizeof(*short_overlay));
return true;
}
static bool airspy_adsb_parse_mode_s_long(struct buf *buf, struct packet *packet, struct airspy_adsb_parser_state *state) {
struct airspy_adsb_mode_s_long_overlay *long_overlay = (struct airspy_adsb_mode_s_long_overlay *) buf_at(buf, 0);
if (buf->length < sizeof(*long_overlay) ||
long_overlay->asterisk != '*' ||
long_overlay->semicolon != ';' ||
long_overlay->cr != '\r' ||
long_overlay->lf != '\n') {
return false;
}
if (!airspy_adsb_parse_common(&long_overlay->common, packet, state)) {
return false;
}
packet->type = MODE_S_LONG;
hex_to_bin(packet->payload, long_overlay->payload, sizeof(long_overlay->payload) / 2);
buf_consume(buf, sizeof(*long_overlay));
return true;
}
static void airspy_adsb_fill_common(struct packet *packet, struct airspy_adsb_common_overlay *overlay) {
overlay->semicolon1 = overlay->semicolon2 = overlay->semicolon3 = ';';
hex_from_int(
overlay->mlat_timestamp,
mlat_timestamp_scale_out(packet->mlat_timestamp, UINT32_MAX, SEND_MHZ),
sizeof(overlay->mlat_timestamp) / 2);
hex_from_int(overlay->mlat_precision, SEND_MHZ / 2, sizeof(overlay->mlat_precision) / 2);
hex_from_int(overlay->rssi, rssi_scale_out(packet->rssi, UINT16_MAX), sizeof(overlay->rssi) / 2);
}
static void airspy_adsb_serialize_mode_s_short(struct packet *packet, struct buf *buf) {
struct airspy_adsb_mode_s_short_overlay *overlay = (struct airspy_adsb_mode_s_short_overlay *) buf_at(buf, 0);
overlay->asterisk = '*';
overlay->semicolon = ';';
overlay->cr = '\r';
overlay->lf = '\n';
hex_from_bin(overlay->payload, packet->payload, sizeof(overlay->payload) / 2);
airspy_adsb_fill_common(packet, &overlay->common);
buf->length = sizeof(*overlay);
}
static void airspy_adsb_serialize_mode_s_long(struct packet *packet, struct buf *buf) {
struct airspy_adsb_mode_s_long_overlay *overlay = (struct airspy_adsb_mode_s_long_overlay *) buf_at(buf, 0);
overlay->asterisk = '*';
overlay->semicolon = ';';
overlay->cr = '\r';
overlay->lf = '\n';
hex_from_bin(overlay->payload, packet->payload, sizeof(overlay->payload) / 2);
airspy_adsb_fill_common(packet, &overlay->common);
buf->length = sizeof(*overlay);
}
void airspy_adsb_init() {
assert(sizeof(struct airspy_adsb_parser_state) <= PARSER_STATE_LEN);
assert(sizeof(struct airspy_adsb_mode_s_short_overlay) < BUF_LEN_MAX);
assert(sizeof(struct airspy_adsb_mode_s_long_overlay) < BUF_LEN_MAX);
}
bool airspy_adsb_parse(struct buf *buf, struct packet *packet, void *state_in) {
struct airspy_adsb_parser_state *state = (struct airspy_adsb_parser_state *) state_in;
return (
airspy_adsb_parse_mode_s_short(buf, packet, state) ||
airspy_adsb_parse_mode_s_long(buf, packet, state));
}
void airspy_adsb_serialize(struct packet *packet, struct buf *buf) {
if (!packet) {
return;
}
switch (packet->type) {
case MODE_S_SHORT:
airspy_adsb_serialize_mode_s_short(packet, buf);
break;
case MODE_S_LONG:
airspy_adsb_serialize_mode_s_long(packet, buf);
break;
}
}
+10
View File
@@ -0,0 +1,10 @@
#pragma once
#include <stdbool.h>
struct buf;
struct packet;
void airspy_adsb_init();
bool airspy_adsb_parse(struct buf *, struct packet *, void *);
void airspy_adsb_serialize(struct packet *, struct buf *);
+194
View File
@@ -0,0 +1,194 @@
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include "common.h"
#include "receive.h"
#include "beast.h"
struct __attribute__((packed)) beast_common_overlay {
uint8_t one_a;
uint8_t type;
};
struct __attribute__((packed)) beast_mode_s_short_overlay {
struct beast_common_overlay common;
uint8_t mlat_timestamp[6];
uint8_t rssi;
uint8_t payload[7];
};
struct __attribute__((packed)) beast_mode_s_long_overlay {
struct beast_common_overlay common;
uint8_t mlat_timestamp[6];
uint8_t rssi;
uint8_t payload[14];
};
struct beast_parser_state {
struct mlat_state mlat_state;
};
static uint64_t beast_parse_mlat(uint8_t *mlat_timestamp) {
return (
((uint64_t) mlat_timestamp[0]) << 40 |
((uint64_t) mlat_timestamp[1]) << 32 |
((uint64_t) mlat_timestamp[2]) << 24 |
((uint64_t) mlat_timestamp[3]) << 16 |
((uint64_t) mlat_timestamp[4]) << 8 |
((uint64_t) mlat_timestamp[5]));
}
static void beast_write_mlat(uint64_t timestamp, uint8_t *mlat_timestamp) {
mlat_timestamp[0] = (timestamp >> 40) & 0xff;
mlat_timestamp[1] = (timestamp >> 32) & 0xff;
mlat_timestamp[2] = (timestamp >> 24) & 0xff;
mlat_timestamp[3] = (timestamp >> 16) & 0xff;
mlat_timestamp[4] = (timestamp >> 8) & 0xff;
mlat_timestamp[5] = (timestamp) & 0xff;
}
static ssize_t beast_unescape(struct buf *out, const struct buf *in, size_t out_bytes) {
int o = 0, i = 0;
for (; i < in->length && o < out_bytes; i++, o++) {
if (i > 0 && buf_chr(in, i) == 0x1a) {
if (i == in->length - 1 || buf_chr(in, i + 1) != 0x1a) {
return -1;
}
i++;
}
buf_chr(out, o) = buf_chr(in, i);
}
if (o == out_bytes) {
return i;
} else {
return -1;
}
}
static void beast_escape(struct buf *out, const struct buf *in) {
for (int i = 0; i < in->length; i++, out->length++) {
buf_chr(out, out->length) = buf_chr(in, i);
if (i > 0 && buf_chr(in, i) == 0x1a) {
buf_chr(out, ++(out->length)) = 0x1a;
}
}
}
static bool beast_parse_mode_s_short(struct buf *buf, struct packet *packet, struct beast_parser_state *state) {
struct buf buf2 = BUF_INIT;
ssize_t in_bytes = beast_unescape(&buf2, buf, sizeof(struct beast_mode_s_short_overlay));
if (in_bytes < 0) {
return false;
}
struct beast_mode_s_short_overlay *overlay = (struct beast_mode_s_short_overlay *) buf_at(&buf2, 0);
packet->type = MODE_S_SHORT;
uint64_t source_mlat = beast_parse_mlat(overlay->mlat_timestamp);
packet->mlat_timestamp = mlat_timestamp_scale_in(source_mlat, UINT64_C(0xffffffffffff), 12, &state->mlat_state);
packet->rssi = rssi_scale_in(overlay->rssi, UINT8_MAX);
memcpy(packet->payload, overlay->payload, sizeof(overlay->payload));
buf_consume(buf, in_bytes);
return true;
}
static bool beast_parse_mode_s_long(struct buf *buf, struct packet *packet, struct beast_parser_state *state) {
struct buf buf2 = BUF_INIT;
ssize_t in_bytes = beast_unescape(&buf2, buf, sizeof(struct beast_mode_s_long_overlay));
if (in_bytes < 0) {
return false;
}
struct beast_mode_s_long_overlay *overlay = (struct beast_mode_s_long_overlay *) buf_at(&buf2, 0);
packet->type = MODE_S_LONG;
uint64_t source_mlat = beast_parse_mlat(overlay->mlat_timestamp);
packet->mlat_timestamp = mlat_timestamp_scale_in(source_mlat, UINT64_C(0xffffffffffff), 12, &state->mlat_state);
packet->rssi = rssi_scale_in(overlay->rssi == UINT8_MAX ? 0 : overlay->rssi, UINT8_MAX);
memcpy(packet->payload, overlay->payload, sizeof(overlay->payload));
buf_consume(buf, in_bytes);
return true;
}
void beast_init() {
assert(sizeof(struct beast_parser_state) <= PARSER_STATE_LEN);
assert(sizeof(struct beast_mode_s_short_overlay) * 2 <= BUF_LEN_MAX);
assert(sizeof(struct beast_mode_s_long_overlay) * 2 <= BUF_LEN_MAX);
}
bool beast_parse(struct buf *buf, struct packet *packet, void *state_in) {
struct beast_parser_state *state = (struct beast_parser_state *) state_in;
if (buf->length < sizeof(struct beast_common_overlay) ||
buf_chr(buf, 0) != 0x1a) {
return false;
}
struct beast_common_overlay *overlay = (struct beast_common_overlay *) buf_at(buf, 0);
switch (overlay->type) {
case 0x32:
return beast_parse_mode_s_short(buf, packet, state);
case 0x33:
return beast_parse_mode_s_long(buf, packet, state);
default:
fprintf(stderr, "unknown beast type %x\n", overlay->type);
return false;
}
return false;
}
void beast_serialize_mode_s_short(struct packet *packet, struct buf *buf) {
struct buf buf2 = BUF_INIT;
struct beast_mode_s_short_overlay *overlay = (struct beast_mode_s_short_overlay *) buf_at(&buf2, 0);
overlay->common.one_a = 0x1a;
overlay->common.type = 0x32;
memcpy(overlay->payload, packet->payload, sizeof(overlay->payload));
beast_write_mlat(
mlat_timestamp_scale_out(packet->mlat_timestamp, UINT64_C(0xffffffffffff), 12),
overlay->mlat_timestamp);
if (packet->rssi) {
overlay->rssi = rssi_scale_out(packet->rssi, UINT8_MAX);
} else {
overlay->rssi = UINT8_MAX;
}
buf2.length = sizeof(*overlay);
beast_escape(buf, &buf2);
}
void beast_serialize_mode_s_long(struct packet *packet, struct buf *buf) {
struct buf buf2 = BUF_INIT;
struct beast_mode_s_long_overlay *overlay = (struct beast_mode_s_long_overlay *) buf_at(&buf2, 0);
overlay->common.one_a = 0x1a;
overlay->common.type = 0x33;
memcpy(overlay->payload, packet->payload, sizeof(overlay->payload));
beast_write_mlat(
mlat_timestamp_scale_out(packet->mlat_timestamp, UINT64_C(0xffffffffffff), 12),
overlay->mlat_timestamp);
if (packet->rssi) {
overlay->rssi = rssi_scale_out(packet->rssi, UINT8_MAX);
} else {
overlay->rssi = UINT8_MAX;
}
buf2.length = sizeof(*overlay);
beast_escape(buf, &buf2);
}
void beast_serialize(struct packet *packet, struct buf *buf) {
if (!packet) {
return;
}
switch (packet->type) {
case MODE_S_SHORT:
beast_serialize_mode_s_short(packet, buf);
break;
case MODE_S_LONG:
beast_serialize_mode_s_long(packet, buf);
break;
}
}
+10
View File
@@ -0,0 +1,10 @@
#pragma once
#include <stdbool.h>
struct buf;
struct packet;
void beast_init();
bool beast_parse(struct buf *, struct packet *, void *);
void beast_serialize(struct packet *, struct buf *);
+179
View File
@@ -0,0 +1,179 @@
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uuid/uuid.h>
#include "common.h"
int epoll_fd;
void peer_init() {
epoll_fd = epoll_create1(0);
assert(epoll_fd >= 0);
}
void peer_epoll_add(struct peer *peer, uint32_t events) {
struct epoll_event ev = {
.events = events,
.data = {
.ptr = peer,
},
};
assert(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev) == 0);
}
void peer_epoll_del(struct peer *peer) {
assert(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL) == 0);
}
void peer_loop() {
while (1) {
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
assert(nfds >= 0);
for (int n = 0; n < nfds; n++) {
struct peer *peer = events[n].data.ptr;
peer->event_handler(peer);
}
}
}
void buf_init(struct buf *buf) {
buf->start = 0;
buf->length = 0;
}
ssize_t buf_fill(struct buf *buf, int fd) {
if (buf->start + buf->length == BUF_LEN_MAX) {
assert(buf->start > 0);
memmove(buf->buf, buf_at(buf, 0), buf->length);
buf->start = 0;
}
size_t space = BUF_LEN_MAX - buf->length - buf->start;
ssize_t in = read(fd, buf_at(buf, buf->length), space);
if (in <= 0) {
return in;
}
buf->length += in;
return in;
}
void buf_consume(struct buf *buf, size_t length) {
assert(buf->length >= length);
buf->length -= length;
if (buf->length) {
buf->start += length;
} else {
buf->start = 0;
}
}
char *packet_type_names[] = {
"Mode-S short",
"Mode-S long",
};
static uint64_t mlat_timestamp_scale_mhz_in(uint64_t timestamp, uint32_t mhz) {
return timestamp * (MLAT_MHZ / mhz);
}
static uint64_t mlat_timestamp_scale_width_in(uint64_t timestamp, uint64_t max, struct mlat_state *state) {
if (timestamp < state->timestamp_last) {
// Counter reset
state->timestamp_generation += max;
}
state->timestamp_last = timestamp;
return state->timestamp_generation + timestamp;
}
uint64_t mlat_timestamp_scale_in(uint64_t timestamp, uint64_t max, uint16_t mhz, struct mlat_state *state) {
return mlat_timestamp_scale_mhz_in(mlat_timestamp_scale_width_in(timestamp, max, state), mhz);
}
static uint64_t mlat_timestamp_scale_mhz_out(uint64_t timestamp, uint64_t mhz) {
return timestamp / (MLAT_MHZ / mhz);
}
static uint64_t mlat_timestamp_scale_width_out(uint64_t timestamp, uint64_t max) {
return timestamp % max;
}
uint64_t mlat_timestamp_scale_out(uint64_t timestamp, uint64_t max, uint16_t mhz) {
return mlat_timestamp_scale_width_out(mlat_timestamp_scale_mhz_out(timestamp, mhz), max);
}
uint32_t rssi_scale_in(uint32_t value, uint32_t max) {
return value * (RSSI_MAX / max);
}
uint32_t rssi_scale_out(uint32_t value, uint32_t max) {
return value / (RSSI_MAX / max);
}
static uint8_t hex_table[256] = {0};
static char hex_char_table[16] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', };
void hex_init() {
for (int i = '0'; i <= '9'; i++) {
hex_table[i] = i - '0';
}
for (int i = 'a'; i <= 'f'; i++) {
hex_table[i] = 10 + i - 'a';
}
for (int i = 'A'; i <= 'F'; i++) {
hex_table[i] = 10 + i - 'A';
}
}
void hex_to_bin(uint8_t *out, const char *in, size_t bytes) {
const uint8_t *in2 = (uint8_t *) in;
for (size_t i = 0, j = 0; i < bytes; i++, j += 2) {
out[i] = (hex_table[in2[j]] << 4) | hex_table[in2[j + 1]];
}
}
uint64_t hex_to_int(const char *in, size_t bytes) {
const uint8_t *in2 = (uint8_t *) in;
uint64_t ret = 0;
bytes *= 2;
for (size_t i = 0; i < bytes; i++) {
ret <<= 4;
ret |= hex_table[in2[i]];
}
return ret;
}
void hex_from_bin(char *out, const uint8_t *in, size_t bytes) {
for (size_t i = 0, j = 0; i < bytes; i++, j += 2) {
out[j] = hex_char_table[in[i] >> 4];
out[j + 1] = hex_char_table[in[i] & 0xf];
}
}
void hex_from_int(char *out, uint64_t in, size_t bytes) {
bytes *= 2;
for (int o = bytes - 1; o >= 0; o--) {
out[o] = hex_char_table[in & 0xf];
in >>= 4;
}
}
void uuid_gen(char *out) {
uuid_t uuid;
uuid_generate(uuid);
uuid_unparse(uuid, out);
}
+94
View File
@@ -0,0 +1,94 @@
#pragma once
#include <stdint.h>
#include <sys/epoll.h>
//////// peer
// All specific peer structs must be castable to this.
struct peer;
typedef void (*peer_event_handler)(struct peer *);
struct peer {
int fd;
peer_event_handler event_handler;
};
extern int epoll_fd;
void peer_init();
void peer_epoll_add(struct peer *, uint32_t);
void peer_epoll_del(struct peer *);
void peer_loop();
//////// buf
#define BUF_LEN_MAX 256
struct buf {
char buf[BUF_LEN_MAX];
size_t start;
size_t length;
};
#define BUF_INIT { \
.start = 0, \
.length = 0, \
}
#define buf_chr(buff, at) ((buff)->buf[(buff)->start + (at)])
#define buf_at(buff, at) (&buf_chr(buff, at))
void buf_init(struct buf *);
ssize_t buf_fill(struct buf *, int);
void buf_consume(struct buf *, size_t);
//////// packet
#define DATA_LEN_MAX 14
struct packet {
enum {
MODE_S_SHORT,
MODE_S_LONG,
} type;
#define NUM_TYPES 2
uint8_t payload[DATA_LEN_MAX];
uint64_t mlat_timestamp;
uint32_t rssi;
};
extern char *packet_type_names[];
//////// mlat
#define MLAT_MHZ 120
// Use the signed max to avoid problems with some consumers; it's large enough to not matter.
#define MLAT_MAX INT64_MAX
#define RSSI_MAX UINT32_MAX
struct mlat_state {
uint64_t timestamp_last;
uint64_t timestamp_generation;
};
uint64_t mlat_timestamp_scale_in(uint64_t, uint64_t, uint16_t, struct mlat_state *);
uint64_t mlat_timestamp_scale_out(uint64_t, uint64_t, uint16_t);
//////// rssi
uint32_t rssi_scale_in(uint32_t, uint32_t);
uint32_t rssi_scale_out(uint32_t, uint32_t);
//////// hex
void hex_init();
void hex_to_bin(uint8_t *, const char *, size_t);
uint64_t hex_to_int(const char *, size_t);
void hex_from_bin(char *, const uint8_t *, size_t);
void hex_from_int(char *, uint64_t, size_t);
///////// uuid
#define UUID_LEN 37
void uuid_gen(char *);
+107
View File
@@ -0,0 +1,107 @@
#define _GNU_SOURCE
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include "common.h"
#include "incoming.h"
struct incoming {
struct peer peer;
char id[UUID_LEN];
const char *node;
const char *service;
incoming_connection_handler handler;
void *passthrough;
};
static void incoming_handler(struct peer *peer) {
struct incoming *incoming = (struct incoming *) peer;
struct sockaddr peer_addr, local_addr;
socklen_t peer_addrlen = sizeof(peer_addr), local_addrlen = sizeof(local_addr);
int fd = accept4(incoming->peer.fd, &peer_addr, &peer_addrlen, SOCK_NONBLOCK);
if (fd == -1) {
fprintf(stderr, "I %s: Failed to accept new connection on %s/%s: %s\n", incoming->id, incoming->node, incoming->service, strerror(errno));
return;
}
char peer_hbuf[NI_MAXHOST], local_hbuf[NI_MAXHOST], peer_sbuf[NI_MAXSERV], local_sbuf[NI_MAXSERV];
assert(getsockname(fd, &local_addr, &local_addrlen) == 0);
assert(getnameinfo(&peer_addr, peer_addrlen, peer_hbuf, sizeof(peer_hbuf), peer_sbuf, sizeof(peer_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
assert(getnameinfo(&local_addr, local_addrlen, local_hbuf, sizeof(local_hbuf), local_sbuf, sizeof(local_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "I %s: New incoming connection on %s/%s (%s/%s) from %s/%s\n",
incoming->id,
incoming->node, incoming->service,
local_hbuf, local_sbuf,
peer_hbuf, peer_sbuf);
incoming->handler(fd, incoming->passthrough);
}
void incoming_new(const char *node, const char *service, incoming_connection_handler handler, void *passthrough) {
struct incoming *incoming = malloc(sizeof(*incoming));
incoming->peer.event_handler = incoming_handler;
uuid_gen(incoming->id);
incoming->node = node;
incoming->service = service;
incoming->handler = handler;
incoming->passthrough = passthrough;
fprintf(stderr, "I %s: Resolving %s/%s...\n", incoming->id, incoming->node, incoming->service);
struct addrinfo hints = {
.ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM,
.ai_flags = AI_PASSIVE | AI_V4MAPPED | AI_ADDRCONFIG,
};
struct addrinfo *addrs;
int gai_err = getaddrinfo(incoming->node, incoming->service, &hints, &addrs);
if (gai_err) {
fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(gai_err));
free(incoming);
return;
}
struct addrinfo *addr;
for (addr = addrs; addr; addr = addr->ai_next) {
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "I %s: Listening on %s/%s...\n", incoming->id, hbuf, sbuf);
incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
assert(incoming->peer.fd >= 0);
int optval = 1;
setsockopt(incoming->peer.fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) {
fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno));
close(incoming->peer.fd);
continue;
}
assert(listen(incoming->peer.fd, 255) == 0);
break;
}
freeaddrinfo(addrs);
if (addr == NULL) {
fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service);
free(incoming);
return;
}
peer_epoll_add((struct peer *) incoming, EPOLLIN);
}
+4
View File
@@ -0,0 +1,4 @@
#pragma once
typedef void (*incoming_connection_handler)(int fd, void *);
void incoming_new(const char *, const char *, incoming_connection_handler, void *);
+81
View File
@@ -0,0 +1,81 @@
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <jansson.h>
#include "receive.h"
#include "send.h"
#include "json.h"
static void json_serialize_to_buf(json_t *obj, struct buf *buf) {
assert(json_dump_callback(obj, json_buf_append_callback, buf, 0) == 0);
json_decref(obj);
buf_chr(buf, buf->length++) = '\n';
}
static void json_hello(struct buf *buf) {
json_t *hello = json_pack("{sIsIsI}",
"mlat_timestamp_mhz", (json_int_t) MLAT_MHZ,
"mlat_timestamp_max", (json_int_t) MLAT_MAX,
"rssi_max", (json_int_t) RSSI_MAX);
json_serialize_to_buf(hello, buf);
}
static void json_add_common(struct packet *packet, json_t *obj) {
json_object_set_new(obj, "type", json_string(packet_type_names[packet->type]));
if (packet->mlat_timestamp) {
json_object_set_new(obj, "mlat_timestamp", json_integer(packet->mlat_timestamp));
}
if (packet->rssi) {
json_object_set_new(obj, "rssi", json_integer(packet->rssi));
}
}
static void json_serialize_mode_s_short(struct packet *packet, struct buf *buf) {
assert(packet->mlat_timestamp < MLAT_MAX);
char hexbuf[14];
hex_from_bin(hexbuf, packet->payload, 7);
json_t *out = json_pack("{ss#}", "payload", hexbuf, 14);
json_add_common(packet, out);
json_serialize_to_buf(out, buf);
}
static void json_serialize_mode_s_long(struct packet *packet, struct buf *buf) {
assert(packet->mlat_timestamp < MLAT_MAX);
char hexbuf[28];
hex_from_bin(hexbuf, packet->payload, 14);
json_t *out = json_pack("{ss#}", "payload", hexbuf, 28);
json_add_common(packet, out);
json_serialize_to_buf(out, buf);
}
void json_init() {
assert(JSON_INTEGER_IS_LONG_LONG);
}
void json_serialize(struct packet *packet, struct buf *buf) {
if (!packet) {
json_hello(buf);
return;
}
switch (packet->type) {
case MODE_S_SHORT:
json_serialize_mode_s_short(packet, buf);
break;
case MODE_S_LONG:
json_serialize_mode_s_long(packet, buf);
break;
}
}
int json_buf_append_callback(const char *buffer, size_t size, void *data) {
struct buf *buf = data;
if (buf->length + size + 1 > BUF_LEN_MAX) {
return -1;
}
memcpy(buf_at(buf, buf->length), buffer, size);
buf->length += size;
return 0;
}
+6
View File
@@ -0,0 +1,6 @@
#pragma once
void json_init();
void json_serialize(struct packet *, struct buf *);
int json_buf_append_callback(const char *, size_t, void *);
+121
View File
@@ -0,0 +1,121 @@
#include <stdio.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include "common.h"
#include "outgoing.h"
struct outgoing {
struct peer peer;
char id[UUID_LEN];
const char *node;
const char *service;
struct addrinfo *addrs;
struct addrinfo *addr;
outgoing_connection_handler handler;
void *passthrough;
};
static void outgoing_connect_result(struct outgoing *, int);
static void outgoing_resolve(struct outgoing *);
static void outgoing_connect_next(struct outgoing *outgoing) {
if (outgoing->addr == NULL) {
freeaddrinfo(outgoing->addrs);
fprintf(stderr, "O %s: Can't connect to any addresses of %s/%s\n", outgoing->id, outgoing->node, outgoing->service);
// TODO: timed retry
return;
}
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "O %s: Connecting to %s/%s...\n", outgoing->id, hbuf, sbuf);
outgoing->peer.fd = socket(outgoing->addr->ai_family, outgoing->addr->ai_socktype | SOCK_NONBLOCK, outgoing->addr->ai_protocol);
assert(outgoing->peer.fd >= 0);
int result = connect(outgoing->peer.fd, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen);
outgoing_connect_result(outgoing, result == 0 ? result : errno);
}
static void outgoing_connect_handler(struct peer *peer) {
struct outgoing *outgoing = (struct outgoing *) peer;
peer_epoll_del(&outgoing->peer);
int error;
socklen_t len = sizeof(error);
assert(getsockopt(outgoing->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0);
outgoing_connect_result(outgoing, error);
}
static void outgoing_disconnect_handler(struct peer *peer) {
struct outgoing *outgoing = (struct outgoing *) peer;
close(outgoing->peer.fd);
fprintf(stderr, "O %s: Peer disconnected; reconnecting...\n", outgoing->id);
outgoing_resolve(outgoing);
}
static void outgoing_connect_result(struct outgoing *outgoing, int result) {
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
switch (result) {
case 0:
fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf);
freeaddrinfo(outgoing->addrs);
// We listen for just hangup on this fd. We pass a duplicate to the handler, so our epoll setings are independent.
outgoing->peer.event_handler = outgoing_disconnect_handler;
peer_epoll_add((struct peer *) outgoing, EPOLLRDHUP);
outgoing->handler(dup(outgoing->peer.fd), outgoing->passthrough);
break;
case EINPROGRESS:
outgoing->peer.event_handler = outgoing_connect_handler;
peer_epoll_add((struct peer *) outgoing, EPOLLOUT);
break;
default:
fprintf(stderr, "O %s: Can't connect to %s/%s: %s\n", outgoing->id, hbuf, sbuf, strerror(result));
close(outgoing->peer.fd);
outgoing->addr = outgoing->addr->ai_next;
// Tail recursion :/
outgoing_connect_next(outgoing);
break;
}
}
static void outgoing_resolve(struct outgoing *outgoing) {
fprintf(stderr, "O %s: Resolving %s/%s...\n", outgoing->id, outgoing->node, outgoing->service);
struct addrinfo hints = {
.ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM,
};
int gai_err = getaddrinfo(outgoing->node, outgoing->service, &hints, &outgoing->addrs);
if (gai_err) {
fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(gai_err));
// TODO: timed retry
return;
}
outgoing->addr = outgoing->addrs;
outgoing_connect_next(outgoing);
}
void outgoing_new(const char *node, const char *service, outgoing_connection_handler handler, void *passthrough) {
struct outgoing *outgoing = malloc(sizeof(*outgoing));
uuid_gen(outgoing->id);
outgoing->node = node;
outgoing->service = service;
outgoing->handler = handler;
outgoing->passthrough = passthrough;
outgoing_resolve(outgoing);
}
+4
View File
@@ -0,0 +1,4 @@
#pragma once
typedef void (*outgoing_connection_handler)(int fd, void *);
void outgoing_new(const char *, const char *, outgoing_connection_handler, void *);
+93
View File
@@ -0,0 +1,93 @@
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include "common.h"
#include "raw.h"
struct __attribute__((packed)) raw_mode_s_short_overlay {
char asterisk;
char payload[14];
char semicolon;
char lf;
};
struct __attribute__((packed)) raw_mode_s_long_overlay {
char asterisk;
char payload[28];
char semicolon;
char lf;
};
static bool raw_parse_mode_s_short(struct buf *buf, struct packet *packet) {
struct raw_mode_s_short_overlay *short_overlay = (struct raw_mode_s_short_overlay *) buf_at(buf, 0);
if (buf->length < sizeof(*short_overlay) ||
short_overlay->asterisk != '*' ||
short_overlay->semicolon != ';' ||
short_overlay->lf != '\n') {
return false;
}
packet->type = MODE_S_SHORT;
hex_to_bin(packet->payload, short_overlay->payload, sizeof(short_overlay->payload) / 2);
buf_consume(buf, sizeof(*short_overlay));
return true;
}
static bool raw_parse_mode_s_long(struct buf *buf, struct packet *packet) {
struct raw_mode_s_long_overlay *long_overlay = (struct raw_mode_s_long_overlay *) buf_at(buf, 0);
if (buf->length < sizeof(*long_overlay) ||
long_overlay->asterisk != '*' ||
long_overlay->semicolon != ';' ||
long_overlay->lf != '\n') {
return false;
}
packet->type = MODE_S_LONG;
hex_to_bin(packet->payload, long_overlay->payload, sizeof(long_overlay->payload) / 2);
buf_consume(buf, sizeof(*long_overlay));
return true;
}
static void raw_serialize_mode_s_short(struct packet *packet, struct buf *buf) {
struct raw_mode_s_short_overlay *overlay = (struct raw_mode_s_short_overlay *) buf_at(buf, 0);
overlay->asterisk = '*';
overlay->semicolon = ';';
overlay->lf = '\n';
hex_from_bin(overlay->payload, packet->payload, sizeof(overlay->payload) / 2);
buf->length = sizeof(*overlay);
}
static void raw_serialize_mode_s_long(struct packet *packet, struct buf *buf) {
struct raw_mode_s_long_overlay *overlay = (struct raw_mode_s_long_overlay *) buf_at(buf, 0);
overlay->asterisk = '*';
overlay->semicolon = ';';
overlay->lf = '\n';
hex_from_bin(overlay->payload, packet->payload, sizeof(overlay->payload) / 2);
buf->length = sizeof(*overlay);
}
void raw_init() {
assert(sizeof(struct raw_mode_s_short_overlay) < BUF_LEN_MAX);
assert(sizeof(struct raw_mode_s_long_overlay) < BUF_LEN_MAX);
}
bool raw_parse(struct buf *buf, struct packet *packet, void *state_in) {
return (
raw_parse_mode_s_short(buf, packet) ||
raw_parse_mode_s_long(buf, packet));
}
void raw_serialize(struct packet *packet, struct buf *buf) {
if (!packet) {
return;
}
switch (packet->type) {
case MODE_S_SHORT:
raw_serialize_mode_s_short(packet, buf);
break;
case MODE_S_LONG:
raw_serialize_mode_s_long(packet, buf);
break;
}
}
+10
View File
@@ -0,0 +1,10 @@
#pragma once
#include <stdbool.h>
struct buf;
struct packet;
void raw_init();
bool raw_parse(struct buf *, struct packet *, void *);
void raw_serialize(struct packet *, struct buf *);
+104
View File
@@ -0,0 +1,104 @@
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include "airspy_adsb.h"
#include "beast.h"
#include "raw.h"
#include "send.h"
#include "receive.h"
struct receive;
typedef bool (*parser_wrapper)(struct receive *, struct packet *);
typedef bool (*parser)(struct buf *, struct packet *, void *state);
struct receive {
struct peer peer;
char id[UUID_LEN];
struct buf buf;
char parser_state[PARSER_STATE_LEN];
parser_wrapper parser_wrapper;
parser parser;
};
struct parser {
char *name;
parser parse;
} parsers[] = {
{
.name = "airspy_adsb",
.parse = airspy_adsb_parse,
},
{
.name = "beast",
.parse = beast_parse,
},
{
.name = "raw",
.parse = raw_parse,
},
};
#define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers))
static bool receive_parse_wrapper(struct receive *receive, struct packet *packet) {
return receive->parser(&receive->buf, packet, receive->parser_state);
}
static bool receive_autodetect_parse(struct receive *receive, struct packet *packet) {
struct buf *buf = &receive->buf;
void *state = receive->parser_state;
for (int i = 0; i < NUM_PARSERS; i++) {
if (parsers[i].parse(buf, packet, state)) {
fprintf(stderr, "R %s: Detected input format %s\n", receive->id, parsers[i].name);
receive->parser_wrapper = receive_parse_wrapper;
receive->parser = parsers[i].parse;
return true;
}
}
return false;
}
static void receive_read(struct peer *peer) {
struct receive *receive = (struct receive *) peer;
if (buf_fill(&receive->buf, receive->peer.fd) <= 0) {
fprintf(stderr, "R %s: Connection closed by peer\n", receive->id);
close(receive->peer.fd);
return;
}
struct packet packet = { 0 };
while (receive->parser_wrapper(receive, &packet)) {
send_write(&packet);
}
if (receive->buf.length == BUF_LEN_MAX) {
fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id);
close(receive->peer.fd);
return;
}
}
void receive_new(int fd, void *unused) {
struct receive *receive = malloc(sizeof(*receive));
assert(receive);
uuid_gen(receive->id);
receive->peer.fd = fd;
buf_init(&receive->buf);
memset(receive->parser_state, 0, PARSER_STATE_LEN);
receive->parser_wrapper = receive_autodetect_parse;
receive->peer.event_handler = receive_read;
peer_epoll_add((struct peer *) receive, EPOLLIN);
fprintf(stderr, "R %s: New receive connection\n", receive->id);
}
void receive_print_usage() {
fprintf(stderr, "\nSupported receive formats (auto-detected):\n");
for (int i = 0; i < NUM_PARSERS; i++) {
fprintf(stderr, "\t%s\n", parsers[i].name);
}
}
+10
View File
@@ -0,0 +1,10 @@
#pragma once
#include <stdbool.h>
#include "common.h"
#define PARSER_STATE_LEN 256
void receive_new(int, void *);
void receive_print_usage();
+155
View File
@@ -0,0 +1,155 @@
#include <stdbool.h>
#include <stdlib.h>
#include <assert.h>
#include <strings.h>
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include "common.h"
#include "send.h"
#include "airspy_adsb.h"
#include "beast.h"
#include "json.h"
#include "raw.h"
#include "stats.h"
struct send {
struct peer peer;
char id[UUID_LEN];
struct serializer *serializer;
struct send *prev;
struct send *next;
};
typedef void (*serializer)(struct packet *, struct buf *);
struct serializer {
char *name;
serializer serialize;
struct send *send_head;
} serializers[] = {
{
.name = "airspy_adsb",
.serialize = airspy_adsb_serialize,
},
{
.name = "beast",
.serialize = beast_serialize,
},
{
.name = "json",
.serialize = json_serialize,
},
{
.name = "raw",
.serialize = raw_serialize,
},
{
.name = "stats",
.serialize = stats_serialize,
},
};
#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers))
static void send_hangup(struct send *send) {
fprintf(stderr, "S %s (%s): Peer disconnected\n", send->id, send->serializer->name);
if (send->prev) {
send->prev->next = send->next;
} else {
send->serializer->send_head = send->next;
}
if (send->next) {
send->next->prev = send->prev;
}
close(send->peer.fd);
free(send);
}
static void send_hangup_wrapper(struct peer *peer) {
send_hangup((struct send *) peer);
}
static bool send_hello(int fd, struct serializer *serializer) {
struct buf buf = BUF_INIT;
serializer->serialize(NULL, &buf);
if (buf.length == 0) {
return true;
}
if (write(fd, buf_at(&buf, 0), buf.length) != buf.length) {
return false;
}
return true;
}
void send_init() {
signal(SIGPIPE, SIG_IGN);
}
struct serializer *send_get_serializer(char *name) {
for (int i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) {
return &serializers[i];
}
}
return NULL;
}
void send_add(int fd, struct serializer *serializer) {
if (!send_hello(fd, serializer)) {
fprintf(stderr, "S xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx: Failed to write hello\n");
return;
}
struct send *send = malloc(sizeof(*send));
assert(send);
send->peer.fd = fd;
send->peer.event_handler = send_hangup_wrapper;
uuid_gen(send->id);
send->serializer = serializer;
send->prev = NULL;
send->next = serializer->send_head;
serializer->send_head = send;
peer_epoll_add((struct peer *) send, EPOLLRDHUP);
fprintf(stderr, "S %s (%s): New send connection\n", send->id, serializer->name);
}
void send_add_wrapper(int fd, void *passthrough) {
send_add(fd, (struct serializer *) passthrough);
}
void send_write(struct packet *packet) {
for (int i = 0; i < NUM_SERIALIZERS; i++) {
struct serializer *serializer = &serializers[i];
if (serializer->send_head == NULL) {
continue;
}
struct buf buf = BUF_INIT;
serializer->serialize(packet, &buf);
if (buf.length == 0) {
continue;
}
struct send *send = serializer->send_head;
while (send) {
if (write(send->peer.fd, buf_at(&buf, 0), buf.length) == buf.length) {
send = send->next;
} else {
struct send *next = send->next;
send_hangup(send);
send = next;
}
}
}
}
void send_print_usage() {
fprintf(stderr, "\nSupported send formats:\n");
for (int i = 0; i < NUM_SERIALIZERS; i++) {
fprintf(stderr, "\t%s\n", serializers[i].name);
}
}
+10
View File
@@ -0,0 +1,10 @@
#pragma once
#include "common.h"
void send_init();
struct serializer *send_get_serializer(char *);
void send_add(int, struct serializer *);
void send_add_wrapper(int, void *);
void send_write(struct packet *);
void send_print_usage();
+39
View File
@@ -0,0 +1,39 @@
#include <assert.h>
#include <time.h>
#include <jansson.h>
#include "common.h"
#include "json.h"
#include "stats.h"
static struct stats_state {
uint64_t total_count;
uint64_t type_count[NUM_TYPES];
struct timespec start;
} stats_state = { 0 };
void stats_init() {
assert(clock_gettime(CLOCK_MONOTONIC, &stats_state.start) == 0);
}
void stats_serialize(struct packet *packet, struct buf *buf) {
if (packet) {
stats_state.total_count++;
stats_state.type_count[packet->type]++;
}
if (stats_state.total_count % 1000 != 0) {
return;
}
json_t *counts = json_object();
for (int i = 0; i < NUM_TYPES; i++) {
json_object_set_new(counts, packet_type_names[i], json_integer(stats_state.type_count[i]));
}
struct timespec now;
assert(clock_gettime(CLOCK_MONOTONIC, &now) == 0);
json_t *out = json_pack("{sIso}",
"uptime_seconds", (json_int_t) (now.tv_sec - stats_state.start.tv_sec),
"packet_counts", counts);
assert(json_dump_callback(out, json_buf_append_callback, buf, 0) == 0);
json_decref(out);
buf_chr(buf, buf->length++) = '\n';
}
+7
View File
@@ -0,0 +1,7 @@
#pragma once
struct packet;
struct buf;
void stats_init();
void stats_serialize(struct packet *, struct buf *);
+14
View File
@@ -0,0 +1,14 @@
#include <assert.h>
#include <unistd.h>
#include "common.h"
#include "wakeup.h"
void wakeup_init() {
int pipefd[2];
assert(!pipe(pipefd));
}
void wakeup_add(struct peer *peer, int delay_ms) {
}
+6
View File
@@ -0,0 +1,6 @@
#pragma once
struct peer;
void wakeup_init();
void wakeup_add(struct peer *, int);