Files

216 lines
4.8 KiB
C
Raw Permalink Normal View History

#include <assert.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <sys/socket.h>
2016-03-03 18:28:34 -08:00
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
2016-02-21 16:52:36 -08:00
#include "airspy_adsb.h"
2016-02-17 18:01:39 -08:00
#include "beast.h"
2016-02-22 16:41:34 -08:00
#include "buf.h"
#include "flow.h"
#include "json.h"
2016-03-05 22:54:26 -08:00
#include "log.h"
2016-03-08 12:22:46 -08:00
#include "opts.h"
#include "packet.h"
2016-02-22 16:49:43 -08:00
#include "peer.h"
2016-02-23 16:13:11 -08:00
#include "proto.h"
2016-02-17 20:14:44 -08:00
#include "raw.h"
2016-02-28 22:21:07 -08:00
#include "socket.h"
#include "stats.h"
#include "uuid.h"
#include "send.h"
struct send {
struct peer peer;
2016-03-03 18:28:34 -08:00
struct stat stat;
struct peer *on_close;
uint8_t id[UUID_LEN];
struct serializer *serializer;
2016-02-26 10:30:18 -08:00
struct list_head send_list;
};
// Forward declaration: send_new() is defined further down but is needed by
// the flow table below.
static void send_new(int fd, void *passthrough, struct peer *on_close);

// Flow descriptor for outbound connections. send_new() is invoked with the
// serializer as its passthrough pointer; see send_add().
static struct flow _send_flow = {
    .name = "send",
    .socket_ready = socket_ready_send,
    .socket_connected = socket_connected_send,
    .new = send_new,
    .get_hello = send_get_hello,
    .ref_count = &peer_count_out,
};

// Exported handle to the send flow descriptor.
struct flow *send_flow = &_send_flow;
2016-03-07 17:02:24 -08:00
// Single-character tag identifying this module.
// NOTE(review): presumably picked up by the LOG() macro from log.h -- confirm.
static char log_module = 'S';
2016-02-25 21:24:10 -08:00
typedef void (*serialize)(struct packet *, struct buf *);
2016-02-27 16:23:26 -08:00
typedef void (*hello)(struct buf **);
static struct serializer {
char *name;
2016-02-25 21:24:10 -08:00
serialize serialize;
2016-02-27 16:23:26 -08:00
hello hello;
2016-02-26 10:30:18 -08:00
struct list_head send_head;
} serializers[] = {
2016-02-21 16:52:36 -08:00
{
.name = "airspy_adsb",
.serialize = airspy_adsb_serialize,
2016-02-27 16:23:26 -08:00
.hello = NULL,
2016-02-21 16:52:36 -08:00
},
2016-02-17 18:01:39 -08:00
{
.name = "beast",
.serialize = beast_serialize,
2016-02-27 16:23:26 -08:00
.hello = NULL,
2016-02-17 18:01:39 -08:00
},
{
.name = "json",
.serialize = json_serialize,
2016-02-27 16:23:26 -08:00
.hello = json_hello,
},
2016-02-23 16:13:11 -08:00
{
.name = "proto",
.serialize = proto_serialize,
2016-02-27 16:23:26 -08:00
.hello = proto_hello,
2016-02-23 16:13:11 -08:00
},
2016-02-17 20:14:44 -08:00
{
.name = "raw",
.serialize = raw_serialize,
2016-02-27 16:23:26 -08:00
.hello = NULL,
2016-02-17 20:14:44 -08:00
},
{
.name = "stats",
.serialize = stats_serialize,
2016-02-27 16:23:26 -08:00
.hello = NULL,
},
};
#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers))
static void send_del(struct send *send) {
2016-03-07 17:02:24 -08:00
LOG(send->id, "Connection closed");
2016-02-25 16:17:25 -08:00
peer_count_out--;
2016-03-08 20:41:00 -08:00
peer_close(&send->peer);
2016-02-26 10:30:18 -08:00
list_del(&send->send_list);
peer_call(send->on_close);
free(send);
}
// Peer event handler (installed by send_new()): recovers the struct send that
// embeds the given peer and destroys it.
static void send_del_wrapper(struct peer *peer) {
    struct send *owner = container_of(peer, struct send, peer);
    send_del(owner);
}
// Creates the state for a new outbound connection on fd.
//   fd          - connected descriptor to write serialized packets to
//   passthrough - the struct serializer selected for this connection
//   on_close    - peer passed to peer_call() when the connection is destroyed
// Allocation or fstat() failure is treated as fatal via assert().
static void send_new(int fd, void *passthrough, struct peer *on_close) {
    struct serializer *serializer = (struct serializer *) passthrough;

    peer_count_out++;

    struct send *send = malloc(sizeof(*send));
    assert(send);

    send->peer.fd = fd;
    send->peer.event_handler = send_del_wrapper;
    send->on_close = on_close;
    uuid_gen(send->id);
    send->serializer = serializer;

    // The fstat() call must not live inside assert(): with NDEBUG defined the
    // whole expression would be compiled out and send->stat would be left
    // uninitialized, breaking the same-socket check in send_write().
    int stat_ret = fstat(fd, &send->stat);
    assert(stat_ret == 0);
    (void) stat_ret;

    list_add(&send->send_list, &serializer->send_head);
    peer_epoll_add(&send->peer, 0);

    LOG(send->id, "New send connection: %s", serializer->name);
}
2016-03-08 12:22:46 -08:00
// Splits the format name off *arg (using '=' as the delimiter passed to
// opts_split()) and looks it up in the serializer table.
// Returns the matching serializer, or NULL when opts_split() yields nothing
// or the name is not a known format. The string returned by opts_split() is
// freed here.
static struct serializer *send_parse_format(const char **arg) {
    char *name = opts_split(arg, '=');
    if (name == NULL) {
        return NULL;
    }

    struct serializer *found = send_get_serializer(name);
    free(name);
    return found;
}
void send_init() {
assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR);
2016-02-26 10:30:18 -08:00
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
list_head_init(&serializers[i].send_head);
}
}
void send_cleanup() {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
2016-02-26 10:30:18 -08:00
struct send *iter, *next;
list_for_each_entry_safe(iter, next, &serializers[i].send_head, send_list) {
2016-02-26 10:30:18 -08:00
send_del(iter);
}
}
}
2016-03-07 11:26:25 -08:00
void *send_get_serializer(const char *name) {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) {
return &serializers[i];
}
}
return NULL;
}
// Invokes the greeting hook of the serializer passed as passthrough, handing
// it buf_pp. Does nothing for formats whose hello hook is NULL.
void send_get_hello(struct buf **buf_pp, void *passthrough) {
    struct serializer *fmt = passthrough;

    if (fmt->hello == NULL) {
        return;
    }
    fmt->hello(buf_pp);
}
void send_write(struct packet *packet) {
packet_sanity_check(packet);
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
struct serializer *serializer = &serializers[i];
2016-02-26 10:30:18 -08:00
if (list_is_empty(&serializer->send_head)) {
continue;
}
struct buf buf = BUF_INIT;
serializer->serialize(packet, &buf);
if (buf.length == 0) {
continue;
}
2016-02-26 10:30:18 -08:00
struct send *iter, *next;
list_for_each_entry_safe(iter, next, &serializer->send_head, send_list) {
2016-03-03 18:28:34 -08:00
if (iter->stat.st_dev == packet->input_stat->st_dev &&
iter->stat.st_ino == packet->input_stat->st_ino) {
// Same socket that this packet came from
continue;
}
2016-02-26 10:30:18 -08:00
if (write(iter->peer.fd, buf_at(&buf, 0), buf.length) != (ssize_t) buf.length) {
// peer_loop() will see this shutdown and call send_del
// Ignore error
shutdown(iter->peer.fd, SHUT_WR);
}
}
}
}
void send_print_usage() {
fprintf(stderr, "\nSupported send formats:\n");
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
fprintf(stderr, "\t%s\n", serializers[i].name);
}
}
2016-03-08 12:22:46 -08:00
// Parses the send format from the front of arg and, if it names a known
// serializer, forwards the remaining argument, the flow and the serializer
// to next().
// Returns false when the format cannot be parsed or is unknown; otherwise
// returns whatever next() returns.
bool send_add(bool (*next)(const char *, struct flow *, void *), struct flow *flow, const char *arg) {
    struct serializer *fmt = send_parse_format(&arg);

    return fmt != NULL && next(arg, flow, fmt);
}