Add send-receive channels

This commit is contained in:
Ian Gulliver
2016-03-03 18:28:34 -08:00
parent 77b9decbcd
commit 939b3b1851
9 changed files with 196 additions and 59 deletions

View File

@@ -11,7 +11,7 @@ VALGRIND_FLAGS ?= --error-exitcode=1 --trace-children=yes --track-fds=yes --show
ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats
OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o
OBJ_FLOW = flow.o receive.o send.o
OBJ_FLOW = flow.o receive.o send.o send_receive.o
OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o
OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o
OBJ_PROTO = adsb.pb-c.o

View File

@@ -19,6 +19,7 @@
#include "receive.h"
#include "resolve.h"
#include "send.h"
#include "send_receive.h"
#include "server.h"
#include "stats.h"
#include "wakeup.h"
@@ -32,13 +33,18 @@ static void print_usage(const char *name) {
"\t--help\n"
"\t--connect-receive=HOST/PORT\n"
"\t--connect-send=FORMAT=HOST/PORT\n"
"\t--connect-send-receive=FORMAT=HOST/PORT\n"
"\t--listen-receive=[HOST/]PORT\n"
"\t--listen-send=FORMAT=[HOST/]PORT\n"
"\t--listen-send-receive=FORMAT=[HOST/]PORT\n"
"\t--file-read=PATH\n"
"\t--file-write=FORMAT=PATH\n"
"\t--file-write-read=FORMAT=PATH\n"
"\t--file-append=FORMAT=PATH\n"
"\t--file-append-read=FORMAT=PATH\n"
"\t--exec-receive=COMMAND\n"
"\t--exec-send=FORMAT=COMMAND\n"
"\t--exec-send-receive=FORMAT=COMMAND\n"
"\t--stdin\n"
"\t--stdout=FORMAT\n"
, name);
@@ -48,19 +54,24 @@ static void print_usage(const char *name) {
static bool parse_opts(int argc, char *argv[]) {
static struct option long_options[] = {
{"connect-receive", required_argument, 0, 'c'},
{"connect-send", required_argument, 0, 's'},
{"listen-receive", required_argument, 0, 'l'},
{"listen-send", required_argument, 0, 'm'},
{"file-read", required_argument, 0, 'r'},
{"file-write", required_argument, 0, 'w'},
{"file-append", required_argument, 0, 'a'},
{"exec-receive", required_argument, 0, 'e'},
{"exec-send", required_argument, 0, 'f'},
{"stdin", no_argument, 0, 'i'},
{"stdout", required_argument, 0, 'o'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0 },
{"connect-receive", required_argument, 0, 'c'},
{"connect-send", required_argument, 0, 's'},
{"connect-send-receive", required_argument, 0, 't'},
{"listen-receive", required_argument, 0, 'l'},
{"listen-send", required_argument, 0, 'm'},
{"listen-send-receive", required_argument, 0, 'n'},
{"file-read", required_argument, 0, 'r'},
{"file-write", required_argument, 0, 'w'},
{"file-write-read", required_argument, 0, 'x'},
{"file-append", required_argument, 0, 'a'},
{"file-append-read", required_argument, 0, 'b'},
{"exec-receive", required_argument, 0, 'e'},
{"exec-send", required_argument, 0, 'f'},
{"exec-send-receive", required_argument, 0, 'g'},
{"stdin", no_argument, 0, 'i'},
{"stdout", required_argument, 0, 'o'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0 },
};
int opt;
@@ -75,6 +86,10 @@ static bool parse_opts(int argc, char *argv[]) {
handler = opts_add_connect_send;
break;
case 't':
handler = opts_add_connect_send_receive;
break;
case 'l':
handler = opts_add_listen_receive;
break;
@@ -83,6 +98,10 @@ static bool parse_opts(int argc, char *argv[]) {
handler = opts_add_listen_send;
break;
case 'n':
handler = opts_add_listen_send_receive;
break;
case 'r':
handler = opts_add_file_read;
break;
@@ -91,10 +110,18 @@ static bool parse_opts(int argc, char *argv[]) {
handler = opts_add_file_write;
break;
case 'x':
handler = opts_add_file_write_read;
break;
case 'a':
handler = opts_add_file_append;
break;
case 'b':
handler = opts_add_file_append_read;
break;
case 'e':
handler = opts_add_exec_receive;
break;
@@ -103,6 +130,10 @@ static bool parse_opts(int argc, char *argv[]) {
handler = opts_add_exec_send;
break;
case 'g':
handler = opts_add_exec_send_receive;
break;
case 'i':
handler = opts_add_stdin;
break;
@@ -163,6 +194,7 @@ int main(int argc, char *argv[]) {
receive_cleanup();
send_cleanup();
send_receive_cleanup();
incoming_cleanup();
outgoing_cleanup();
exec_cleanup();

View File

@@ -13,6 +13,7 @@
#include "outgoing.h"
#include "receive.h"
#include "send.h"
#include "send_receive.h"
#include "opts.h"
@@ -26,7 +27,7 @@ static char *opts_split(char **arg, char delim) {
return ret;
}
static void opts_add_listen(char *host_port, struct flow *flow, void *passthrough) {
static bool opts_add_listen(char *host_port, struct flow *flow, void *passthrough) {
char *host = opts_split(&host_port, '/');
if (host) {
incoming_new(host, host_port, flow, passthrough);
@@ -34,6 +35,33 @@ static void opts_add_listen(char *host_port, struct flow *flow, void *passthroug
} else {
incoming_new(NULL, host_port, flow, passthrough);
}
return true;
}
static bool opts_add_connect(char *host_port, struct flow *flow, void *passthrough) {
char *host = opts_split(&host_port, '/');
if (!host) {
return false;
}
outgoing_new(host, host_port, flow, passthrough);
free(host);
return true;
}
static bool opts_add_file_write_int(char *path, struct flow *flow, void *passthrough) {
file_write_new(path, flow, passthrough);
return true;
}
static bool opts_add_file_append_int(char *path, struct flow *flow, void *passthrough) {
file_append_new(path, flow, passthrough);
return true;
}
static bool opts_add_exec(char *cmd, struct flow *flow, void *passthrough) {
exec_new(cmd, flow, passthrough);
return true;
}
static struct serializer *opts_get_serializer(char **arg) {
@@ -51,46 +79,36 @@ static struct serializer *opts_get_serializer(char **arg) {
return serializer;
}
bool opts_add_connect_receive(char *arg) {
char *host = opts_split(&arg, '/');
if (!host) {
static bool opts_add_send(bool (*next)(char *, struct flow *, void *), struct flow *flow, char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
return next(arg, flow, serializer);
}
outgoing_new(host, arg, receive_flow, NULL);
free(host);
return true;
bool opts_add_connect_receive(char *arg) {
return opts_add_connect(arg, receive_flow, NULL);
}
bool opts_add_connect_send(char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
return opts_add_send(opts_add_connect, send_flow, arg);
}
char *host = opts_split(&arg, '/');
if (!host) {
return false;
}
outgoing_new(host, arg, send_flow, serializer);
free(host);
return true;
bool opts_add_connect_send_receive(char *arg) {
return opts_add_send(opts_add_connect, send_receive_flow, arg);
}
bool opts_add_listen_receive(char *arg) {
opts_add_listen(arg, receive_flow, NULL);
return true;
return opts_add_listen(arg, receive_flow, NULL);
}
bool opts_add_listen_send(char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
return opts_add_send(opts_add_listen, send_flow, arg);
}
opts_add_listen(arg, send_flow, serializer);
return true;
bool opts_add_listen_send_receive(char *arg) {
return opts_add_send(opts_add_listen, send_receive_flow, arg);
}
bool opts_add_file_read(char *arg) {
@@ -99,23 +117,19 @@ bool opts_add_file_read(char *arg) {
}
bool opts_add_file_write(char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
return opts_add_send(opts_add_file_write_int, send_flow, arg);
}
file_write_new(arg, send_flow, serializer);
return true;
bool opts_add_file_write_read(char *arg) {
return opts_add_send(opts_add_file_write_int, send_receive_flow, arg);
}
bool opts_add_file_append(char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
return opts_add_send(opts_add_file_append_int, send_flow, arg);
}
file_append_new(arg, send_flow, serializer);
return true;
bool opts_add_file_append_read(char *arg) {
return opts_add_send(opts_add_file_append_int, send_receive_flow, arg);
}
bool opts_add_exec_receive(char *arg) {
@@ -124,13 +138,11 @@ bool opts_add_exec_receive(char *arg) {
}
bool opts_add_exec_send(char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
return opts_add_send(opts_add_exec, send_flow, arg);
}
exec_new(arg, send_flow, serializer);
return true;
bool opts_add_exec_send_receive(char *arg) {
return opts_add_send(opts_add_exec, send_receive_flow, arg);
}
bool opts_add_stdin(char __attribute__((unused)) *arg) {

View File

@@ -4,12 +4,17 @@
bool opts_add_connect_receive(char *);
bool opts_add_connect_send(char *);
bool opts_add_connect_send_receive(char *);
bool opts_add_listen_receive(char *);
bool opts_add_listen_send(char *);
bool opts_add_listen_send_receive(char *);
bool opts_add_file_read(char *);
bool opts_add_file_write(char *);
bool opts_add_file_write_read(char *);
bool opts_add_file_append(char *);
bool opts_add_file_append_read(char *);
bool opts_add_exec_receive(char *);
bool opts_add_exec_send(char *);
bool opts_add_exec_send_receive(char *);
bool opts_add_stdout(char *);
bool opts_add_stdin(char *);

View File

@@ -3,9 +3,12 @@
#include <stdbool.h>
#include <stdint.h>
struct stat;
#define PACKET_DATA_LEN_MAX 14
struct packet {
const uint8_t *source_id;
struct stat *input_stat;
enum packet_type {
PACKET_TYPE_NONE,
PACKET_TYPE_MODE_AC,

View File

@@ -4,6 +4,7 @@
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>
#include "airspy_adsb.h"
@@ -27,6 +28,7 @@ typedef bool (*parser_wrapper)(struct receive *, struct packet *);
typedef bool (*parser)(struct buf *, struct packet *, void *state);
struct receive {
struct peer peer;
struct stat stat;
struct peer *on_close;
uint8_t id[UUID_LEN];
struct buf buf;
@@ -114,6 +116,7 @@ static void receive_read(struct peer *peer) {
while (receive->buf.length) {
struct packet packet = {
.source_id = receive->id,
.input_stat = &receive->stat,
};
if (!receive->parser_wrapper(receive, &packet)) {
break;
@@ -144,6 +147,7 @@ static void receive_new(int fd, void __attribute__((unused)) *passthrough, struc
buf_init(&receive->buf);
memset(receive->parser_state, 0, PARSER_STATE_LEN);
receive->parser_wrapper = receive_autodetect_parse;
assert(!fstat(fd, &receive->stat));
list_add(&receive->receive_list, &receive_head);

View File

@@ -6,6 +6,7 @@
#include <stdlib.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
@@ -27,6 +28,7 @@
struct send {
struct peer peer;
struct stat stat;
struct peer *on_close;
uint8_t id[UUID_LEN];
struct serializer *serializer;
@@ -113,6 +115,8 @@ static void send_new(int fd, void *passthrough, struct peer *on_close) {
send->on_close = on_close;
uuid_gen(send->id);
send->serializer = serializer;
assert(!fstat(fd, &send->stat));
list_add(&send->send_list, &serializer->send_head);
peer_epoll_add((struct peer *) send, 0);
@@ -166,6 +170,11 @@ void send_write(struct packet *packet) {
}
struct send *iter, *next;
list_for_each_entry_safe(iter, next, &serializer->send_head, send_list) {
if (iter->stat.st_dev == packet->input_stat->st_dev &&
iter->stat.st_ino == packet->input_stat->st_ino) {
// Same socket that this packet came from
continue;
}
if (write(iter->peer.fd, buf_at(&buf, 0), buf.length) != (ssize_t) buf.length) {
// peer_loop() will see this shutdown and call send_del
int res = shutdown(iter->peer.fd, SHUT_WR);

68
adsbus/send_receive.c Normal file
View File

@@ -0,0 +1,68 @@
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include "flow.h"
#include "list.h"
#include "peer.h"
#include "receive.h"
#include "send.h"
#include "send_receive.h"
struct send_receive {
struct peer peer;
struct peer *on_close;
uint8_t ref_count;
struct list_head send_receive_list;
};
static struct list_head send_receive_head = LIST_HEAD_INIT(send_receive_head);
static void send_receive_new(int, void *, struct peer *);
static struct flow _send_receive_flow = {
.name = "send_receive",
.new = send_receive_new,
.get_hello = send_get_hello,
.ref_count = &peer_count_out_in,
};
struct flow *send_receive_flow = &_send_receive_flow;
static void send_receive_del(struct send_receive *send_receive) {
list_del(&send_receive->send_receive_list);
peer_call(send_receive->on_close);
free(send_receive);
}
static void send_receive_on_close(struct peer *peer) {
struct send_receive *send_receive = (struct send_receive *) peer;
if (!--(send_receive->ref_count)) {
send_receive_del(send_receive);
}
}
static void send_receive_new(int fd, void *passthrough, struct peer *on_close) {
struct send_receive *send_receive = malloc(sizeof(*send_receive));
assert(send_receive);
send_receive->peer.fd = -1;
send_receive->peer.event_handler = send_receive_on_close;
send_receive->on_close = on_close;
send_receive->ref_count = 2;
list_add(&send_receive->send_receive_list, &send_receive_head);
flow_new(fd, send_flow, passthrough, on_close);
int fd2 = fcntl(fd, F_DUPFD_CLOEXEC, 0);
assert(fd2 >= 0);
flow_new(fd2, receive_flow, NULL, on_close);
}
void send_receive_cleanup() {
struct send_receive *iter, *next;
list_for_each_entry_safe(iter, next, &send_receive_head, send_receive_list) {
send_receive_del(iter);
}
}

4
adsbus/send_receive.h Normal file
View File

@@ -0,0 +1,4 @@
#pragma once
void send_receive_cleanup(void);
extern struct flow *send_receive_flow;