Split backend and outgoing. Rename backend -> receive, client -> send.

This commit is contained in:
Ian Gulliver
2016-02-17 17:19:57 -08:00
parent ae5b4c1ce4
commit 5671f37f8a
16 changed files with 464 additions and 429 deletions

View File

@@ -11,5 +11,5 @@ clean:
%.o: %.c *.h
$(CC) -c $(CFLAGS) $< -o $@
adsbus: adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o
$(CC) $(LDFLAGS) -o adsbus adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS)
adsbus: adsbus.o receive.o send.o incoming.o outgoing.o airspy_adsb.o beast.o json.o raw.o stats.o common.o
$(CC) $(LDFLAGS) -o adsbus adsbus.o receive.o send.o incoming.o outgoing.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS)

123
adsbus.c
View File

@@ -4,9 +4,12 @@
#include <string.h>
#include "common.h"
#include "backend.h"
#include "client.h"
#include "receive.h"
#include "send.h"
#include "incoming.h"
#include "outgoing.h"
#include "airspy_adsb.h"
#include "beast.h"
@@ -20,106 +23,134 @@ static void print_usage(const char *name) {
"\n"
"Options:\n"
"\t--help\n"
"\t--backend=HOST/PORT\n"
"\t--dump=FORMAT\n"
"\t--incoming=[HOST/]PORT\n"
"\t--listen=FORMAT=[HOST/]PORT\n"
"\t--connect-receive=HOST/PORT\n"
"\t--connect-send=FORMAT=HOST/PORT\n"
"\t--listen-receive=[HOST/]PORT\n"
"\t--listen-send=FORMAT=[HOST/]PORT\n"
, name);
backend_print_usage();
client_print_usage();
receive_print_usage();
send_print_usage();
}
static bool add_dump(char *arg) {
struct serializer *serializer = client_get_serializer(arg);
struct serializer *serializer = send_get_serializer(arg);
if (!serializer) {
fprintf(stderr, "Unknown --dump=FORMAT: %s\n", arg);
return false;
}
client_add(1, serializer);
send_add(1, serializer);
return true;
}
static bool add_backend(char *arg) {
static bool add_connect_receive(char *arg) {
char *port = strrchr(arg, '/');
if (!port) {
fprintf(stderr, "Invalid --backend=HOST/PORT (missing \"/\"): %s\n", arg);
fprintf(stderr, "Invalid --connect-receive=HOST/PORT (missing \"/\"): %s\n", arg);
return false;
}
*(port++) = '\0';
backend_new(arg, port);
outgoing_new(arg, port, receive_new, NULL);
return true;
}
static bool add_incoming(char *arg){
char *port = strrchr(arg, '/');
if (port) {
*(port++) = '\0';
incoming_new(arg, port, backend_new_fd_wrapper, NULL);
} else {
incoming_new(NULL, arg, backend_new_fd_wrapper, NULL);
}
return true;
}
static bool add_listener(char *arg) {
static bool add_connect_send(char *arg) {
char *host_port = strchr(arg, '=');
if (!host_port) {
fprintf(stderr, "Invalid --listener=FORMAT=[HOST/]PORT (missing \"=\"): %s\n", arg);
fprintf(stderr, "Invalid --connect-send=FORMAT=HOST/PORT (missing \"=\"): %s\n", arg);
return false;
}
*(host_port++) = '\0';
struct serializer *serializer = client_get_serializer(arg);
struct serializer *serializer = send_get_serializer(arg);
if (!serializer) {
fprintf(stderr, "Unknown --listener=FORMAT=[HOST/]PORT format: %s\n", arg);
fprintf(stderr, "Unknown --connect-send=FORMAT=HOST/PORT format: %s\n", arg);
return false;
}
char *port = strrchr(host_port, '/');
if (!port) {
fprintf(stderr, "Invalid --connect-send=FORMAT=HOST/PORT (missing \"/\"): %s\n", host_port);
return false;
}
*(port++) = '\0';
incoming_new(host_port, port, send_add_wrapper, serializer);
return true;
}
static bool add_listen_receive(char *arg){
char *port = strrchr(arg, '/');
if (port) {
*(port++) = '\0';
incoming_new(arg, port, receive_new, NULL);
} else {
incoming_new(NULL, arg, receive_new, NULL);
}
return true;
}
static bool add_listen_send(char *arg) {
char *host_port = strchr(arg, '=');
if (!host_port) {
fprintf(stderr, "Invalid --listen-send=FORMAT=[HOST/]PORT (missing \"=\"): %s\n", arg);
return false;
}
*(host_port++) = '\0';
struct serializer *serializer = send_get_serializer(arg);
if (!serializer) {
fprintf(stderr, "Unknown --listen-send=FORMAT=[HOST/]PORT format: %s\n", arg);
return false;
}
char *port = strrchr(host_port, '/');
if (port) {
*(port++) = '\0';
incoming_new(host_port, port, client_add_wrapper, serializer);
incoming_new(host_port, port, send_add_wrapper, serializer);
} else {
incoming_new(NULL, host_port, client_add_wrapper, serializer);
incoming_new(NULL, host_port, send_add_wrapper, serializer);
}
return true;
}
static bool parse_opts(int argc, char *argv[]) {
static struct option long_options[] = {
{"backend", required_argument, 0, 'b'},
{"dump", required_argument, 0, 'd'},
{"incoming", required_argument, 0, 'i'},
{"listen", required_argument, 0, 'l'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0 },
{"dump", required_argument, 0, 'd'},
{"connect-receive", required_argument, 0, 'c'},
{"connect-send", required_argument, 0, 's'},
{"listen-receive", required_argument, 0, 'l'},
{"listen-send", required_argument, 0, 'm'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0 },
};
int opt;
while ((opt = getopt_long_only(argc, argv, "", long_options, NULL)) != -1) {
bool (*handler)(char *) = NULL;
switch (opt) {
case 'b':
handler = add_backend;
break;
case 'd':
handler = add_dump;
break;
case 'h':
print_usage(argv[0]);
return false;
case 'c':
handler = add_connect_receive;
break;
case 'i':
handler = add_incoming;
case 's':
handler = add_connect_send;
break;
case 'l':
handler = add_listener;
handler = add_listen_receive;
break;
case 'm':
handler = add_listen_send;
break;
case 'h':
default:
print_usage(argv[0]);
return false;
@@ -145,7 +176,7 @@ static bool parse_opts(int argc, char *argv[]) {
int main(int argc, char *argv[]) {
peer_init();
hex_init();
client_init();
send_init();
airspy_adsb_init();
beast_init();
json_init();

View File

@@ -3,7 +3,7 @@
#include <string.h>
#include "common.h"
#include "backend.h"
#include "receive.h"
#include "airspy_adsb.h"
struct __attribute__((packed)) airspy_adsb_common_overlay {

207
backend.c
View File

@@ -1,207 +0,0 @@
#include <stdio.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include "airspy_adsb.h"
#include "beast.h"
#include "raw.h"
#include "client.h"
#include "backend.h"
typedef bool (*parser_wrapper)(struct backend *, struct packet *);
typedef bool (*parser)(struct buf *, struct packet *, void *state);
struct backend {
struct peer peer;
char id[UUID_LEN];
const char *node;
const char *service;
struct addrinfo *addrs;
struct addrinfo *addr;
struct buf buf;
char parser_state[PARSER_STATE_LEN];
parser_wrapper parser_wrapper;
parser parser;
};
static void backend_connect_result(struct backend *, int);
struct parser {
char *name;
parser parse;
} parsers[] = {
{
.name = "airspy_adsb",
.parse = airspy_adsb_parse,
},
{
.name = "beast",
.parse = beast_parse,
},
{
.name = "raw",
.parse = raw_parse,
},
};
#define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers))
static bool backend_parse_wrapper(struct backend *backend, struct packet *packet) {
return backend->parser(&backend->buf, packet, backend->parser_state);
}
static bool backend_autodetect_parse(struct backend *backend, struct packet *packet) {
struct buf *buf = &backend->buf;
void *state = backend->parser_state;
for (int i = 0; i < NUM_PARSERS; i++) {
if (parsers[i].parse(buf, packet, state)) {
fprintf(stderr, "B %s: Detected input format %s\n", backend->id, parsers[i].name);
backend->parser_wrapper = backend_parse_wrapper;
backend->parser = parsers[i].parse;
return true;
}
}
return false;
}
static struct backend *backend_create() {
struct backend *backend = malloc(sizeof(*backend));
assert(backend);
uuid_gen(backend->id);
backend->peer.fd = -1;
backend->node = NULL;
backend->service = NULL;
buf_init(&backend->buf);
memset(backend->parser_state, 0, PARSER_STATE_LEN);
backend->parser_wrapper = backend_autodetect_parse;
return backend;
}
static void backend_connect_handler(struct peer *peer) {
struct backend *backend = (struct backend *) peer;
peer_epoll_del(peer);
int error;
socklen_t len = sizeof(error);
assert(getsockopt(backend->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0);
backend_connect_result(backend, error);
}
static void backend_connect_next(struct backend *backend) {
if (backend->addr == NULL) {
freeaddrinfo(backend->addrs);
fprintf(stderr, "B %s: Can't connect to any addresses of %s/%s\n", backend->id, backend->node, backend->service);
return;
}
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "B %s: Connecting to %s/%s...\n", backend->id, hbuf, sbuf);
backend->peer.fd = socket(backend->addr->ai_family, backend->addr->ai_socktype | SOCK_NONBLOCK, backend->addr->ai_protocol);
assert(backend->peer.fd >= 0);
int result = connect(backend->peer.fd, backend->addr->ai_addr, backend->addr->ai_addrlen);
backend_connect_result(backend, result == 0 ? result : errno);
}
static void backend_read(struct peer *peer) {
struct backend *backend = (struct backend *) peer;
if (buf_fill(&backend->buf, backend->peer.fd) <= 0) {
fprintf(stderr, "B %s: Connection closed by backend\n", backend->id);
close(backend->peer.fd);
// TODO: reconnect
return;
}
struct packet packet = {
.backend = backend,
};
while (backend->parser_wrapper(backend, &packet)) {
client_write(&packet);
}
if (backend->buf.length == BUF_LEN_MAX) {
fprintf(stderr, "B %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id);
close(backend->peer.fd);
// TODO: reconnect
return;
}
}
static void backend_connect_result(struct backend *backend, int result) {
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
switch (result) {
case 0:
fprintf(stderr, "B %s: Connected to %s/%s\n", backend->id, hbuf, sbuf);
freeaddrinfo(backend->addrs);
backend->peer.event_handler = backend_read;
peer_epoll_add((struct peer *) backend, EPOLLIN);
break;
case EINPROGRESS:
backend->peer.event_handler = backend_connect_handler;
peer_epoll_add((struct peer *) backend, EPOLLOUT);
break;
default:
fprintf(stderr, "B %s: Can't connect to %s/%s: %s\n", backend->id, hbuf, sbuf, strerror(result));
close(backend->peer.fd);
backend->addr = backend->addr->ai_next;
// Tail recursion :/
backend_connect_next(backend);
break;
}
}
static void backend_connect(struct backend *backend) {
fprintf(stderr, "B %s: Resolving %s/%s...\n", backend->id, backend->node, backend->service);
struct addrinfo hints = {
.ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM,
};
int gai_err = getaddrinfo(backend->node, backend->service, &hints, &backend->addrs);
if (gai_err) {
fprintf(stderr, "B %s: Failed to resolve %s/%s: %s\n", backend->id, backend->node, backend->service, gai_strerror(gai_err));
return;
}
backend->addr = backend->addrs;
backend_connect_next(backend);
}
void backend_new(const char *node, const char *service) {
struct backend *backend = backend_create();
backend->node = node;
backend->service = service;
backend_connect(backend);
}
void backend_new_fd(int fd) {
struct backend *backend = backend_create();
backend->peer.fd = fd;
backend->peer.event_handler = backend_read;
peer_epoll_add((struct peer *) backend, EPOLLIN);
fprintf(stderr, "B %s: New backend from incoming connection\n", backend->id);
}
void backend_new_fd_wrapper(int fd, void *unused) {
backend_new_fd(fd);
}
void backend_print_usage() {
fprintf(stderr, "\nSupported input formats (autodetected):\n");
for (int i = 0; i < NUM_PARSERS; i++) {
fprintf(stderr, "\t%s\n", parsers[i].name);
}
}

View File

@@ -1,12 +0,0 @@
#pragma once
#include <stdbool.h>
#include "common.h"
#define PARSER_STATE_LEN 256
void backend_new(const char *, const char *);
void backend_new_fd(int);
void backend_new_fd_wrapper(int, void *);
void backend_print_usage();

View File

@@ -4,7 +4,7 @@
#include <stdint.h>
#include "common.h"
#include "backend.h"
#include "receive.h"
#include "beast.h"
struct __attribute__((packed)) beast_common_overlay {

145
client.c
View File

@@ -1,145 +0,0 @@
#include <stdbool.h>
#include <stdlib.h>
#include <assert.h>
#include <strings.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include "common.h"
#include "json.h"
#include "stats.h"
#include "client.h"
struct client {
struct peer peer;
char id[UUID_LEN];
struct serializer *serializer;
struct client *prev;
struct client *next;
};
typedef void (*serializer)(struct packet *, struct buf *);
struct serializer {
char *name;
serializer serialize;
struct client *client_head;
} serializers[] = {
{
.name = "json",
.serialize = json_serialize,
},
{
.name = "stats",
.serialize = stats_serialize,
},
};
#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers))
static void client_hangup(struct client *client) {
fprintf(stderr, "C %s (%s): Client disconnected\n", client->id, client->serializer->name);
if (client->prev) {
client->prev->next = client->next;
} else {
client->serializer->client_head = client->next;
}
if (client->next) {
client->next->prev = client->prev;
}
close(client->peer.fd);
free(client);
}
static void client_hangup_wrapper(struct peer *peer) {
client_hangup((struct client *) peer);
}
static bool client_hello(int fd, struct serializer *serializer) {
struct buf buf = BUF_INIT;
serializer->serialize(NULL, &buf);
if (buf.length == 0) {
return true;
}
if (write(fd, buf_at(&buf, 0), buf.length) != buf.length) {
return false;
}
return true;
}
void client_init() {
signal(SIGPIPE, SIG_IGN);
}
struct serializer *client_get_serializer(char *name) {
for (int i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) {
return &serializers[i];
}
}
return NULL;
}
void client_add(int fd, struct serializer *serializer) {
int flags = fcntl(fd, F_GETFL, 0);
assert(flags >= 0);
flags |= O_NONBLOCK;
assert(fcntl(fd, F_SETFL, flags) == 0);
if (!client_hello(fd, serializer)) {
fprintf(stderr, "C xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx: Failed to write hello to client\n");
return;
}
struct client *client = malloc(sizeof(*client));
assert(client);
client->peer.fd = fd;
client->peer.event_handler = client_hangup_wrapper;
uuid_gen(client->id);
client->serializer = serializer;
client->prev = NULL;
client->next = serializer->client_head;
serializer->client_head = client;
// Only listen for hangup
peer_epoll_add((struct peer *) client, EPOLLIN);
fprintf(stderr, "C %s (%s): New client\n", client->id, serializer->name);
}
void client_add_wrapper(int fd, void *passthrough) {
client_add(fd, (struct serializer *) passthrough);
}
void client_write(struct packet *packet) {
for (int i = 0; i < NUM_SERIALIZERS; i++) {
struct serializer *serializer = &serializers[i];
if (serializer->client_head == NULL) {
continue;
}
struct buf buf = BUF_INIT;
serializer->serialize(packet, &buf);
if (buf.length == 0) {
continue;
}
struct client *client = serializer->client_head;
while (client) {
if (write(client->peer.fd, buf_at(&buf, 0), buf.length) == buf.length) {
client = client->next;
} else {
struct client *next = client->next;
client_hangup(client);
client = next;
}
}
}
}
void client_print_usage() {
fprintf(stderr, "\nSupported output formats:\n");
for (int i = 0; i < NUM_SERIALIZERS; i++) {
fprintf(stderr, "\t%s\n", serializers[i].name);
}
}

View File

@@ -1,10 +0,0 @@
#pragma once
#include "common.h"
void client_init();
struct serializer *client_get_serializer(char *);
void client_add(int, struct serializer *);
void client_add_wrapper(int, void *);
void client_write(struct packet *);
void client_print_usage();

View File

@@ -44,7 +44,6 @@ void buf_consume(struct buf *, size_t);
//////// packet
#define DATA_LEN_MAX 14
struct backend;
struct packet {
enum {
MODE_S_SHORT,
@@ -54,7 +53,6 @@ struct packet {
uint8_t payload[DATA_LEN_MAX];
uint64_t mlat_timestamp;
uint32_t rssi;
struct backend *backend;
};
extern char *packet_type_names[];

6
json.c
View File

@@ -3,8 +3,8 @@
#include <string.h>
#include <jansson.h>
#include "backend.h"
#include "client.h"
#include "receive.h"
#include "send.h"
#include "json.h"
static void json_serialize_to_buf(json_t *obj, struct buf *buf) {
@@ -14,7 +14,7 @@ static void json_serialize_to_buf(json_t *obj, struct buf *buf) {
}
static void json_hello(struct buf *buf) {
json_t *hello = json_pack("{sssIsIsI}",
json_t *hello = json_pack("{sIsIsI}",
"mlat_timestamp_mhz", (json_int_t) MLAT_MHZ,
"mlat_timestamp_max", (json_int_t) MLAT_MAX,
"rssi_max", (json_int_t) RSSI_MAX);

105
outgoing.c Normal file
View File

@@ -0,0 +1,105 @@
#include <stdio.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include "common.h"
#include "outgoing.h"
struct outgoing {
struct peer peer;
char id[UUID_LEN];
const char *node;
const char *service;
struct addrinfo *addrs;
struct addrinfo *addr;
outgoing_connection_handler handler;
void *passthrough;
};
static void outgoing_connect_result(struct outgoing *, int);
static void outgoing_connect_next(struct outgoing *outgoing) {
if (outgoing->addr == NULL) {
freeaddrinfo(outgoing->addrs);
fprintf(stderr, "O %s: Can't connect to any addresses of %s/%s\n", outgoing->id, outgoing->node, outgoing->service);
return;
}
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "O %s: Connecting to %s/%s...\n", outgoing->id, hbuf, sbuf);
outgoing->peer.fd = socket(outgoing->addr->ai_family, outgoing->addr->ai_socktype | SOCK_NONBLOCK, outgoing->addr->ai_protocol);
assert(outgoing->peer.fd >= 0);
int result = connect(outgoing->peer.fd, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen);
outgoing_connect_result(outgoing, result == 0 ? result : errno);
}
static void outgoing_connect_handler(struct peer *peer) {
struct outgoing *outgoing = (struct outgoing *) peer;
peer_epoll_del(peer);
int error;
socklen_t len = sizeof(error);
assert(getsockopt(outgoing->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0);
outgoing_connect_result(outgoing, error);
}
static void outgoing_connect_result(struct outgoing *outgoing, int result) {
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
switch (result) {
case 0:
fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf);
freeaddrinfo(outgoing->addrs);
outgoing->handler(outgoing->peer.fd, outgoing->passthrough);
break;
case EINPROGRESS:
outgoing->peer.event_handler = outgoing_connect_handler;
peer_epoll_add((struct peer *) outgoing, EPOLLOUT);
break;
default:
fprintf(stderr, "O %s: Can't connect to %s/%s: %s\n", outgoing->id, hbuf, sbuf, strerror(result));
close(outgoing->peer.fd);
outgoing->addr = outgoing->addr->ai_next;
// Tail recursion :/
outgoing_connect_next(outgoing);
break;
}
}
static void outgoing_resolve(struct outgoing *outgoing) {
fprintf(stderr, "O %s: Resolving %s/%s...\n", outgoing->id, outgoing->node, outgoing->service);
struct addrinfo hints = {
.ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM,
};
int gai_err = getaddrinfo(outgoing->node, outgoing->service, &hints, &outgoing->addrs);
if (gai_err) {
fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(gai_err));
return;
}
outgoing->addr = outgoing->addrs;
outgoing_connect_next(outgoing);
}
void outgoing_new(const char *node, const char *service, outgoing_connection_handler handler, void *passthrough) {
struct outgoing *outgoing = malloc(sizeof(*outgoing));
uuid_gen(outgoing->id);
outgoing->node = node;
outgoing->service = service;
outgoing->handler = handler;
outgoing->passthrough = passthrough;
outgoing_resolve(outgoing);
}

4
outgoing.h Normal file
View File

@@ -0,0 +1,4 @@
#pragma once
typedef void (*outgoing_connection_handler)(int fd, void *);
void outgoing_new(const char *, const char *, outgoing_connection_handler, void *);

106
receive.c Normal file
View File

@@ -0,0 +1,106 @@
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include "airspy_adsb.h"
#include "beast.h"
#include "raw.h"
#include "send.h"
#include "receive.h"
struct receive;
typedef bool (*parser_wrapper)(struct receive *, struct packet *);
typedef bool (*parser)(struct buf *, struct packet *, void *state);
struct receive {
struct peer peer;
char id[UUID_LEN];
struct buf buf;
char parser_state[PARSER_STATE_LEN];
parser_wrapper parser_wrapper;
parser parser;
};
struct parser {
char *name;
parser parse;
} parsers[] = {
{
.name = "airspy_adsb",
.parse = airspy_adsb_parse,
},
{
.name = "beast",
.parse = beast_parse,
},
{
.name = "raw",
.parse = raw_parse,
},
};
#define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers))
static bool receive_parse_wrapper(struct receive *receive, struct packet *packet) {
return receive->parser(&receive->buf, packet, receive->parser_state);
}
static bool receive_autodetect_parse(struct receive *receive, struct packet *packet) {
struct buf *buf = &receive->buf;
void *state = receive->parser_state;
for (int i = 0; i < NUM_PARSERS; i++) {
if (parsers[i].parse(buf, packet, state)) {
fprintf(stderr, "R %s: Detected input format %s\n", receive->id, parsers[i].name);
receive->parser_wrapper = receive_parse_wrapper;
receive->parser = parsers[i].parse;
return true;
}
}
return false;
}
static void receive_read(struct peer *peer) {
struct receive *receive = (struct receive *) peer;
if (buf_fill(&receive->buf, receive->peer.fd) <= 0) {
fprintf(stderr, "R %s: Connection closed by peer\n", receive->id);
close(receive->peer.fd);
// TODO: reconnect
return;
}
struct packet packet = { 0 };
while (receive->parser_wrapper(receive, &packet)) {
send_write(&packet);
}
if (receive->buf.length == BUF_LEN_MAX) {
fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id);
close(receive->peer.fd);
// TODO: reconnect
return;
}
}
void receive_new(int fd, void *unused) {
struct receive *receive = malloc(sizeof(*receive));
assert(receive);
uuid_gen(receive->id);
receive->peer.fd = fd;
buf_init(&receive->buf);
memset(receive->parser_state, 0, PARSER_STATE_LEN);
receive->parser_wrapper = receive_autodetect_parse;
receive->peer.event_handler = receive_read;
peer_epoll_add((struct peer *) receive, EPOLLIN);
fprintf(stderr, "R %s: New connection\n", receive->id);
}
void receive_print_usage() {
fprintf(stderr, "\nSupported receive formats (autodetected):\n");
for (int i = 0; i < NUM_PARSERS; i++) {
fprintf(stderr, "\t%s\n", parsers[i].name);
}
}

10
receive.h Normal file
View File

@@ -0,0 +1,10 @@
#pragma once
#include <stdbool.h>
#include "common.h"
#define PARSER_STATE_LEN 256
void receive_new(int, void *);
void receive_print_usage();

145
send.c Normal file
View File

@@ -0,0 +1,145 @@
#include <stdbool.h>
#include <stdlib.h>
#include <assert.h>
#include <strings.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include "common.h"
#include "json.h"
#include "stats.h"
#include "send.h"
struct send {
struct peer peer;
char id[UUID_LEN];
struct serializer *serializer;
struct send *prev;
struct send *next;
};
typedef void (*serializer)(struct packet *, struct buf *);
struct serializer {
char *name;
serializer serialize;
struct send *send_head;
} serializers[] = {
{
.name = "json",
.serialize = json_serialize,
},
{
.name = "stats",
.serialize = stats_serialize,
},
};
#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers))
static void send_hangup(struct send *send) {
fprintf(stderr, "S %s (%s): Peer disconnected\n", send->id, send->serializer->name);
if (send->prev) {
send->prev->next = send->next;
} else {
send->serializer->send_head = send->next;
}
if (send->next) {
send->next->prev = send->prev;
}
close(send->peer.fd);
free(send);
}
static void send_hangup_wrapper(struct peer *peer) {
send_hangup((struct send *) peer);
}
static bool send_hello(int fd, struct serializer *serializer) {
struct buf buf = BUF_INIT;
serializer->serialize(NULL, &buf);
if (buf.length == 0) {
return true;
}
if (write(fd, buf_at(&buf, 0), buf.length) != buf.length) {
return false;
}
return true;
}
void send_init() {
signal(SIGPIPE, SIG_IGN);
}
struct serializer *send_get_serializer(char *name) {
for (int i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) {
return &serializers[i];
}
}
return NULL;
}
void send_add(int fd, struct serializer *serializer) {
int flags = fcntl(fd, F_GETFL, 0);
assert(flags >= 0);
flags |= O_NONBLOCK;
assert(fcntl(fd, F_SETFL, flags) == 0);
if (!send_hello(fd, serializer)) {
fprintf(stderr, "S xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx: Failed to write hello\n");
return;
}
struct send *send = malloc(sizeof(*send));
assert(send);
send->peer.fd = fd;
send->peer.event_handler = send_hangup_wrapper;
uuid_gen(send->id);
send->serializer = serializer;
send->prev = NULL;
send->next = serializer->send_head;
serializer->send_head = send;
// Only listen for hangup
peer_epoll_add((struct peer *) send, EPOLLIN);
fprintf(stderr, "S %s (%s): New connection\n", send->id, serializer->name);
}
void send_add_wrapper(int fd, void *passthrough) {
send_add(fd, (struct serializer *) passthrough);
}
void send_write(struct packet *packet) {
for (int i = 0; i < NUM_SERIALIZERS; i++) {
struct serializer *serializer = &serializers[i];
if (serializer->send_head == NULL) {
continue;
}
struct buf buf = BUF_INIT;
serializer->serialize(packet, &buf);
if (buf.length == 0) {
continue;
}
struct send *send = serializer->send_head;
while (send) {
if (write(send->peer.fd, buf_at(&buf, 0), buf.length) == buf.length) {
send = send->next;
} else {
struct send *next = send->next;
send_hangup(send);
send = next;
}
}
}
}
void send_print_usage() {
fprintf(stderr, "\nSupported send formats:\n");
for (int i = 0; i < NUM_SERIALIZERS; i++) {
fprintf(stderr, "\t%s\n", serializers[i].name);
}
}

10
send.h Normal file
View File

@@ -0,0 +1,10 @@
#pragma once
#include "common.h"
void send_init();
struct serializer *send_get_serializer(char *);
void send_add(int, struct serializer *);
void send_add_wrapper(int, void *);
void send_write(struct packet *);
void send_print_usage();