Support incoming connection backends (push to adsbus).
This commit is contained in:
4
Makefile
4
Makefile
@@ -11,5 +11,5 @@ clean:
|
||||
%.o: %.c *.h
|
||||
$(CC) -c $(CFLAGS) $< -o $@
|
||||
|
||||
adsbus: adsbus.o backend.o client.o airspy_adsb.o beast.o json.o raw.o stats.o common.o
|
||||
$(CC) $(LDFLAGS) -o adsbus adsbus.o backend.o client.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS)
|
||||
adsbus: adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o
|
||||
$(CC) $(LDFLAGS) -o adsbus adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS)
|
||||
|
||||
21
adsbus.c
21
adsbus.c
@@ -8,6 +8,8 @@
|
||||
#include "common.h"
|
||||
#include "backend.h"
|
||||
#include "client.h"
|
||||
#include "incoming.h"
|
||||
|
||||
#include "airspy_adsb.h"
|
||||
#include "beast.h"
|
||||
#include "json.h"
|
||||
@@ -32,14 +34,16 @@ static void print_usage(char *argv[]) {
|
||||
"\t--help\n"
|
||||
"\t--backend=HOST/PORT\n"
|
||||
"\t--dump=FORMAT\n"
|
||||
"\t--incoming=[HOST/]PORT\n"
|
||||
, argv[0]);
|
||||
}
|
||||
|
||||
static bool parse_opts(int argc, char *argv[], int epoll_fd) {
|
||||
static struct option long_options[] = {
|
||||
{"backend", required_argument, 0, 'b'},
|
||||
{"dump", required_argument, 0, 'd'},
|
||||
{"help", no_argument, 0, 'h'},
|
||||
{"backend", required_argument, 0, 'b'},
|
||||
{"dump", required_argument, 0, 'd'},
|
||||
{"incoming", required_argument, 0, 'i'},
|
||||
{"help", no_argument, 0, 'h'},
|
||||
};
|
||||
|
||||
int opt;
|
||||
@@ -69,6 +73,17 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) {
|
||||
print_usage(argv);
|
||||
return false;
|
||||
|
||||
case 'i':
|
||||
delim = strrchr(optarg, '/');
|
||||
if (delim == NULL) {
|
||||
incoming_new(NULL, optarg, epoll_fd, backend_new_fd);
|
||||
} else {
|
||||
*delim = '\0';
|
||||
delim++;
|
||||
incoming_new(optarg, delim, epoll_fd, backend_new_fd);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
print_usage(argv);
|
||||
return false;
|
||||
|
||||
30
backend.c
30
backend.c
@@ -43,19 +43,35 @@ struct parser {
|
||||
#define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers))
|
||||
|
||||
|
||||
void backend_new(char *node, char *service, int epoll_fd) {
|
||||
static struct backend *backend_create() {
|
||||
struct backend *backend = malloc(sizeof(*backend));
|
||||
assert(backend);
|
||||
backend->peer.fd = -1;
|
||||
uuid_gen(backend->id);
|
||||
backend->node = node;
|
||||
backend->service = service;
|
||||
backend->peer.fd = -1;
|
||||
backend->node = NULL;
|
||||
backend->service = NULL;
|
||||
buf_init(&backend->buf);
|
||||
memset(backend->parser_state, 0, PARSER_STATE_LEN);
|
||||
backend->parser = backend_autodetect_parse;
|
||||
return backend;
|
||||
}
|
||||
|
||||
void backend_new(char *node, char *service, int epoll_fd) {
|
||||
struct backend *backend = backend_create();
|
||||
backend->node = node;
|
||||
backend->service = service;
|
||||
backend_connect(backend, epoll_fd);
|
||||
}
|
||||
|
||||
void backend_new_fd(int fd, int epoll_fd) {
|
||||
struct backend *backend = backend_create();
|
||||
backend->peer.fd = fd;
|
||||
backend->peer.event_handler = backend_read;
|
||||
peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLIN);
|
||||
|
||||
fprintf(stderr, "B %s: New backend from incoming connection\n", backend->id);
|
||||
}
|
||||
|
||||
static void backend_connect(struct backend *backend, int epoll_fd) {
|
||||
fprintf(stderr, "B %s: Resolving %s/%s...\n", backend->id, backend->node, backend->service);
|
||||
|
||||
@@ -66,7 +82,7 @@ static void backend_connect(struct backend *backend, int epoll_fd) {
|
||||
|
||||
int gai_err = getaddrinfo(backend->node, backend->service, &hints, &backend->addrs);
|
||||
if (gai_err) {
|
||||
fprintf(stderr, "B %s: getaddrinfo(%s/%s): %s\n", backend->id, backend->node, backend->service, gai_strerror(gai_err));
|
||||
fprintf(stderr, "B %s: Failed to resolve %s/%s: %s\n", backend->id, backend->node, backend->service, gai_strerror(gai_err));
|
||||
return;
|
||||
}
|
||||
backend->addr = backend->addrs;
|
||||
@@ -131,10 +147,9 @@ static void backend_connect_handler(struct peer *peer, int epoll_fd) {
|
||||
static void backend_read(struct peer *peer, int epoll_fd) {
|
||||
struct backend *backend = (struct backend *) peer;
|
||||
|
||||
if (buf_fill(&backend->buf, backend->peer.fd) < 0) {
|
||||
if (buf_fill(&backend->buf, backend->peer.fd) <= 0) {
|
||||
fprintf(stderr, "B %s: Connection closed by backend\n", backend->id);
|
||||
close(backend->peer.fd);
|
||||
peer_epoll_del(peer, epoll_fd);
|
||||
// TODO: reconnect
|
||||
return;
|
||||
}
|
||||
@@ -149,7 +164,6 @@ static void backend_read(struct peer *peer, int epoll_fd) {
|
||||
if (backend->buf.length == BUF_LEN_MAX) {
|
||||
fprintf(stderr, "B %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id);
|
||||
close(backend->peer.fd);
|
||||
peer_epoll_del(peer, epoll_fd);
|
||||
// TODO: reconnect
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -21,4 +21,5 @@ struct backend {
|
||||
parser parser;
|
||||
};
|
||||
|
||||
void backend_new(char *node, char *service, int epoll_fd);
|
||||
void backend_new(char *, char *, int);
|
||||
void backend_new_fd(int, int);
|
||||
|
||||
2
common.c
2
common.c
@@ -38,7 +38,7 @@ ssize_t buf_fill(struct buf *buf, int fd) {
|
||||
|
||||
size_t space = BUF_LEN_MAX - buf->length - buf->start;
|
||||
ssize_t in = read(fd, buf_at(buf, buf->length), space);
|
||||
if (in < 0) {
|
||||
if (in <= 0) {
|
||||
return in;
|
||||
}
|
||||
buf->length += in;
|
||||
|
||||
105
incoming.c
Normal file
105
incoming.c
Normal file
@@ -0,0 +1,105 @@
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netdb.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
#include <sys/epoll.h>
|
||||
|
||||
#include "common.h"
|
||||
#include "incoming.h"
|
||||
|
||||
|
||||
struct incoming {
|
||||
struct peer peer;
|
||||
char id[UUID_LEN];
|
||||
char *node;
|
||||
char *service;
|
||||
incoming_connection_handler handler;
|
||||
};
|
||||
|
||||
|
||||
static void incoming_handler(struct peer *peer, int epoll_fd) {
|
||||
struct incoming *incoming = (struct incoming *) peer;
|
||||
|
||||
struct sockaddr peer_addr, local_addr;
|
||||
socklen_t peer_addrlen = sizeof(peer_addr), local_addrlen = sizeof(local_addr);
|
||||
|
||||
int fd = accept(incoming->peer.fd, &peer_addr, &peer_addrlen);
|
||||
if (fd == -1) {
|
||||
fprintf(stderr, "I %s: Failed to accept new connection on %s/%s: %s\n", incoming->id, incoming->node, incoming->service, strerror(errno));
|
||||
return;
|
||||
}
|
||||
|
||||
char peer_hbuf[NI_MAXHOST], local_hbuf[NI_MAXHOST], peer_sbuf[NI_MAXSERV], local_sbuf[NI_MAXSERV];
|
||||
assert(getsockname(fd, &local_addr, &local_addrlen) == 0);
|
||||
assert(getnameinfo(&peer_addr, peer_addrlen, peer_hbuf, sizeof(peer_hbuf), peer_sbuf, sizeof(peer_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
|
||||
assert(getnameinfo(&local_addr, local_addrlen, local_hbuf, sizeof(local_hbuf), local_sbuf, sizeof(local_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
|
||||
|
||||
fprintf(stderr, "I %s: New connection on %s/%s (%s/%s) from %s/%s\n",
|
||||
incoming->id,
|
||||
incoming->node, incoming->service,
|
||||
local_hbuf, local_sbuf,
|
||||
peer_hbuf, peer_sbuf);
|
||||
|
||||
incoming->handler(fd, epoll_fd);
|
||||
}
|
||||
|
||||
void incoming_new(char *node, char *service, int epoll_fd, incoming_connection_handler handler) {
|
||||
struct incoming *incoming = malloc(sizeof(*incoming));
|
||||
incoming->peer.event_handler = incoming_handler;
|
||||
uuid_gen(incoming->id);
|
||||
incoming->node = node;
|
||||
incoming->service = service;
|
||||
incoming->handler = handler;
|
||||
|
||||
fprintf(stderr, "I %s: Resolving %s/%s...\n", incoming->id, incoming->node, incoming->service);
|
||||
|
||||
struct addrinfo hints = {
|
||||
.ai_family = AF_UNSPEC,
|
||||
.ai_socktype = SOCK_STREAM,
|
||||
.ai_flags = AI_PASSIVE | AI_V4MAPPED | AI_ADDRCONFIG,
|
||||
};
|
||||
|
||||
struct addrinfo *addrs;
|
||||
int gai_err = getaddrinfo(incoming->node, incoming->service, &hints, &addrs);
|
||||
if (gai_err) {
|
||||
fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(gai_err));
|
||||
free(incoming);
|
||||
return;
|
||||
}
|
||||
|
||||
struct addrinfo *addr;
|
||||
for (addr = addrs; addr; addr = addr->ai_next) {
|
||||
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
|
||||
assert(getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
|
||||
fprintf(stderr, "I %s: Listening on %s/%s...\n", incoming->id, hbuf, sbuf);
|
||||
|
||||
incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
|
||||
assert(incoming->peer.fd >= 0);
|
||||
|
||||
int optval = 1;
|
||||
setsockopt(incoming->peer.fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
|
||||
|
||||
if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) {
|
||||
fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno));
|
||||
close(incoming->peer.fd);
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(listen(incoming->peer.fd, 255) == 0);
|
||||
break;
|
||||
}
|
||||
|
||||
freeaddrinfo(addrs);
|
||||
|
||||
if (addr == NULL) {
|
||||
fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service);
|
||||
free(incoming);
|
||||
return;
|
||||
}
|
||||
|
||||
peer_epoll_add((struct peer *) incoming, epoll_fd, EPOLLIN);
|
||||
}
|
||||
4
incoming.h
Normal file
4
incoming.h
Normal file
@@ -0,0 +1,4 @@
|
||||
#pragma once
|
||||
|
||||
typedef void (*incoming_connection_handler)(int fd, int epoll_fd);
|
||||
void incoming_new(char *, char *, int, incoming_connection_handler);
|
||||
Reference in New Issue
Block a user