Make epoll_fd global as a concession to clarity. Handle client disconnection even when silent.

This commit is contained in:
Ian Gulliver
2016-02-17 15:41:27 -08:00
parent 908a364aef
commit e14b444466
9 changed files with 107 additions and 87 deletions

View File

@@ -2,7 +2,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <getopt.h> #include <getopt.h>
#include <sys/epoll.h>
#include <string.h> #include <string.h>
#include <signal.h> #include <signal.h>
@@ -40,7 +39,7 @@ static void print_usage(char *argv[]) {
, argv[0]); , argv[0]);
} }
static bool parse_opts(int argc, char *argv[], int epoll_fd) { static bool parse_opts(int argc, char *argv[]) {
static struct option long_options[] = { static struct option long_options[] = {
{"backend", required_argument, 0, 'b'}, {"backend", required_argument, 0, 'b'},
{"dump", required_argument, 0, 'd'}, {"dump", required_argument, 0, 'd'},
@@ -63,7 +62,7 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) {
*delim1 = '\0'; *delim1 = '\0';
delim1++; delim1++;
backend_new(optarg, delim1, epoll_fd); backend_new(optarg, delim1);
break; break;
case 'd': case 'd':
@@ -79,11 +78,11 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) {
case 'i': case 'i':
delim1 = strrchr(optarg, '/'); delim1 = strrchr(optarg, '/');
if (delim1 == NULL) { if (delim1 == NULL) {
incoming_new(NULL, optarg, epoll_fd, backend_new_fd, NULL); incoming_new(NULL, optarg, backend_new_fd, NULL);
} else { } else {
*delim1 = '\0'; *delim1 = '\0';
delim1++; delim1++;
incoming_new(optarg, delim1, epoll_fd, backend_new_fd, NULL); incoming_new(optarg, delim1, backend_new_fd, NULL);
} }
break; break;
@@ -103,11 +102,11 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) {
delim2 = strrchr(delim1, '/'); delim2 = strrchr(delim1, '/');
if (delim2 == NULL) { if (delim2 == NULL) {
incoming_new(NULL, delim1, epoll_fd, client_new_fd, serializer); incoming_new(NULL, delim1, client_add_wrapper, serializer);
} else { } else {
*delim2 = '\0'; *delim2 = '\0';
delim2++; delim2++;
incoming_new(delim1, delim2, epoll_fd, client_new_fd, serializer); incoming_new(delim1, delim2, client_add_wrapper, serializer);
} }
break; break;
@@ -126,44 +125,21 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) {
return true; return true;
} }
static int loop(int epoll_fd) {
while (1) {
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
return -1;
}
for (int n = 0; n < nfds; n++) {
struct peer *peer = events[n].data.ptr;
peer->event_handler(peer, epoll_fd);
}
}
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
server_init(); server_init();
peer_init();
hex_init(); hex_init();
airspy_adsb_init(); airspy_adsb_init();
beast_init(); beast_init();
json_init(); json_init();
stats_init(); stats_init();
int epoll_fd = epoll_create1(0); if (!parse_opts(argc, argv)) {
if (epoll_fd == -1) {
perror("epoll_create1");
return EXIT_FAILURE; return EXIT_FAILURE;
} }
if (!parse_opts(argc, argv, epoll_fd)) { peer_loop();
return EXIT_FAILURE;
}
loop(epoll_fd);
close(epoll_fd);
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }

View File

@@ -3,7 +3,6 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netdb.h> #include <netdb.h>
#include <sys/epoll.h>
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
@@ -16,11 +15,11 @@
static bool backend_autodetect_parse(struct backend *, struct packet *); static bool backend_autodetect_parse(struct backend *, struct packet *);
static void backend_connect(struct backend *, int); static void backend_connect(struct backend *);
static void backend_connect_next(struct backend *, int); static void backend_connect_next(struct backend *);
static void backend_connect_handler(struct peer *, int); static void backend_connect_handler(struct peer *);
static void backend_read(struct peer *, int); static void backend_read(struct peer *);
struct parser { struct parser {
@@ -56,23 +55,23 @@ static struct backend *backend_create() {
return backend; return backend;
} }
void backend_new(char *node, char *service, int epoll_fd) { void backend_new(char *node, char *service) {
struct backend *backend = backend_create(); struct backend *backend = backend_create();
backend->node = node; backend->node = node;
backend->service = service; backend->service = service;
backend_connect(backend, epoll_fd); backend_connect(backend);
} }
void backend_new_fd(int fd, int epoll_fd, void *unused) { void backend_new_fd(int fd, void *unused) {
struct backend *backend = backend_create(); struct backend *backend = backend_create();
backend->peer.fd = fd; backend->peer.fd = fd;
backend->peer.event_handler = backend_read; backend->peer.event_handler = backend_read;
peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLIN); peer_epoll_add((struct peer *) backend, EPOLLIN);
fprintf(stderr, "B %s: New backend from incoming connection\n", backend->id); fprintf(stderr, "B %s: New backend from incoming connection\n", backend->id);
} }
static void backend_connect(struct backend *backend, int epoll_fd) { static void backend_connect(struct backend *backend) {
fprintf(stderr, "B %s: Resolving %s/%s...\n", backend->id, backend->node, backend->service); fprintf(stderr, "B %s: Resolving %s/%s...\n", backend->id, backend->node, backend->service);
struct addrinfo hints = { struct addrinfo hints = {
@@ -86,10 +85,10 @@ static void backend_connect(struct backend *backend, int epoll_fd) {
return; return;
} }
backend->addr = backend->addrs; backend->addr = backend->addrs;
backend_connect_next(backend, epoll_fd); backend_connect_next(backend);
} }
static void backend_connect_result(struct backend *backend, int epoll_fd, int result) { static void backend_connect_result(struct backend *backend, int result) {
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
switch (result) { switch (result) {
@@ -97,12 +96,12 @@ static void backend_connect_result(struct backend *backend, int epoll_fd, int re
fprintf(stderr, "B %s: Connected to %s/%s\n", backend->id, hbuf, sbuf); fprintf(stderr, "B %s: Connected to %s/%s\n", backend->id, hbuf, sbuf);
freeaddrinfo(backend->addrs); freeaddrinfo(backend->addrs);
backend->peer.event_handler = backend_read; backend->peer.event_handler = backend_read;
peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLIN); peer_epoll_add((struct peer *) backend, EPOLLIN);
break; break;
case EINPROGRESS: case EINPROGRESS:
backend->peer.event_handler = backend_connect_handler; backend->peer.event_handler = backend_connect_handler;
peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLOUT); peer_epoll_add((struct peer *) backend, EPOLLOUT);
break; break;
default: default:
@@ -110,12 +109,12 @@ static void backend_connect_result(struct backend *backend, int epoll_fd, int re
close(backend->peer.fd); close(backend->peer.fd);
backend->addr = backend->addr->ai_next; backend->addr = backend->addr->ai_next;
// Tail recursion :/ // Tail recursion :/
backend_connect_next(backend, epoll_fd); backend_connect_next(backend);
break; break;
} }
} }
static void backend_connect_next(struct backend *backend, int epoll_fd) { static void backend_connect_next(struct backend *backend) {
if (backend->addr == NULL) { if (backend->addr == NULL) {
freeaddrinfo(backend->addrs); freeaddrinfo(backend->addrs);
fprintf(stderr, "B %s: Can't connect to any addresses of %s/%s\n", backend->id, backend->node, backend->service); fprintf(stderr, "B %s: Can't connect to any addresses of %s/%s\n", backend->id, backend->node, backend->service);
@@ -130,21 +129,21 @@ static void backend_connect_next(struct backend *backend, int epoll_fd) {
assert(backend->peer.fd >= 0); assert(backend->peer.fd >= 0);
int result = connect(backend->peer.fd, backend->addr->ai_addr, backend->addr->ai_addrlen); int result = connect(backend->peer.fd, backend->addr->ai_addr, backend->addr->ai_addrlen);
backend_connect_result(backend, epoll_fd, result == 0 ? result : errno); backend_connect_result(backend, result == 0 ? result : errno);
} }
static void backend_connect_handler(struct peer *peer, int epoll_fd) { static void backend_connect_handler(struct peer *peer) {
struct backend *backend = (struct backend *) peer; struct backend *backend = (struct backend *) peer;
peer_epoll_del(peer, epoll_fd); peer_epoll_del(peer);
int error; int error;
socklen_t len = sizeof(error); socklen_t len = sizeof(error);
assert(getsockopt(backend->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0); assert(getsockopt(backend->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0);
backend_connect_result(backend, epoll_fd, error); backend_connect_result(backend, error);
} }
static void backend_read(struct peer *peer, int epoll_fd) { static void backend_read(struct peer *peer) {
struct backend *backend = (struct backend *) peer; struct backend *backend = (struct backend *) peer;
if (buf_fill(&backend->buf, backend->peer.fd) <= 0) { if (buf_fill(&backend->buf, backend->peer.fd) <= 0) {

View File

@@ -21,5 +21,5 @@ struct backend {
parser parser; parser parser;
}; };
void backend_new(char *, char *, int); void backend_new(char *, char *);
void backend_new_fd(int, int, void *); void backend_new_fd(int, void *);

View File

@@ -6,13 +6,17 @@
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include "common.h"
#include "json.h" #include "json.h"
#include "stats.h" #include "stats.h"
#include "client.h" #include "client.h"
struct client { struct client {
struct peer peer;
char id[UUID_LEN]; char id[UUID_LEN];
int fd; struct serializer *serializer;
struct client *prev;
struct client *next; struct client *next;
}; };
@@ -34,6 +38,24 @@ struct serializer {
#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers)) #define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers))
static void client_hangup(struct client *client) {
fprintf(stderr, "C %s (%s): Client disconnected\n", client->id, client->serializer->name);
if (client->prev) {
client->prev->next = client->next;
} else {
client->serializer->client_head = client->next;
}
if (client->next) {
client->next->prev = client->prev;
}
close(client->peer.fd);
free(client);
}
static void client_hangup_wrapper(struct peer *peer) {
client_hangup((struct client *) peer);
}
struct serializer *client_get_serializer(char *name) { struct serializer *client_get_serializer(char *name) {
for (int i = 0; i < NUM_SERIALIZERS; i++) { for (int i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) { if (strcasecmp(serializers[i].name, name) == 0) {
@@ -69,17 +91,23 @@ void client_add(int fd, struct serializer *serializer) {
struct client *client = malloc(sizeof(*client)); struct client *client = malloc(sizeof(*client));
assert(client); assert(client);
client->peer.fd = fd;
client->peer.event_handler = client_hangup_wrapper;
uuid_gen(client->id); uuid_gen(client->id);
client->fd = fd; client->serializer = serializer;
client->prev = NULL;
client->next = serializer->client_head; client->next = serializer->client_head;
serializer->client_head = client; serializer->client_head = client;
// Only listen for hangup
peer_epoll_add((struct peer *) client, EPOLLIN);
fprintf(stderr, "C %s (%s): New client\n", client->id, serializer->name); fprintf(stderr, "C %s (%s): New client\n", client->id, serializer->name);
} }
void client_new_fd(int fd, int epoll_fd, void *passthrough) { void client_add_wrapper(int fd, void *passthrough) {
struct serializer *serializer = (struct serializer *) passthrough; client_add(fd, (struct serializer *) passthrough);
client_add(fd, serializer);
} }
void client_write(struct packet *packet) { void client_write(struct packet *packet) {
@@ -93,21 +121,14 @@ void client_write(struct packet *packet) {
if (buf.length == 0) { if (buf.length == 0) {
continue; continue;
} }
struct client *client = serializer->client_head, *prev_client = NULL; struct client *client = serializer->client_head;
while (client) { while (client) {
if (write(client->fd, buf_at(&buf, 0), buf.length) == buf.length) { if (write(client->peer.fd, buf_at(&buf, 0), buf.length) == buf.length) {
prev_client = client;
client = client->next; client = client->next;
} else { } else {
fprintf(stderr, "C %s (%s): Client disconnected\n", client->id, serializer->name); struct client *next = client->next;
if (prev_client) { client_hangup(client);
prev_client->next = client->next; client = next;
} else {
serializer->client_head = client->next;
}
struct client *del = client;
client = client->next;
free(del);
} }
} }
} }

View File

@@ -5,5 +5,5 @@
struct serializer *client_get_serializer(char *); struct serializer *client_get_serializer(char *);
void client_add(int, struct serializer *); void client_add(int, struct serializer *);
void client_new_fd(int, int, void *); void client_add_wrapper(int, void *);
void client_write(struct packet *); void client_write(struct packet *);

View File

@@ -3,13 +3,20 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <sys/epoll.h>
#include <uuid/uuid.h> #include <uuid/uuid.h>
#include "common.h" #include "common.h"
void peer_epoll_add(struct peer *peer, int epoll_fd, uint32_t events) { int epoll_fd;
void peer_init() {
epoll_fd = epoll_create1(0);
assert(epoll_fd >= 0);
}
void peer_epoll_add(struct peer *peer, uint32_t events) {
struct epoll_event ev = { struct epoll_event ev = {
.events = events, .events = events,
.data = { .data = {
@@ -19,10 +26,24 @@ void peer_epoll_add(struct peer *peer, int epoll_fd, uint32_t events) {
assert(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev) == 0); assert(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev) == 0);
} }
void peer_epoll_del(struct peer *peer, int epoll_fd) { void peer_epoll_del(struct peer *peer) {
assert(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL) == 0); assert(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL) == 0);
} }
void peer_loop() {
while (1) {
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
assert(nfds >= 0);
for (int n = 0; n < nfds; n++) {
struct peer *peer = events[n].data.ptr;
peer->event_handler(peer);
}
}
}
void buf_init(struct buf *buf) { void buf_init(struct buf *buf) {
buf->start = 0; buf->start = 0;

View File

@@ -2,19 +2,23 @@
#include <stdint.h> #include <stdint.h>
#include <unistd.h> #include <unistd.h>
#include <sys/epoll.h>
//////// peer //////// peer
// All specific peer structs must be castable to this. // All specific peer structs must be castable to this.
struct peer; struct peer;
typedef void (*peer_event_handler)(struct peer *, int epoll_fd); typedef void (*peer_event_handler)(struct peer *);
struct peer { struct peer {
int fd; int fd;
peer_event_handler event_handler; peer_event_handler event_handler;
}; };
void peer_epoll_add(struct peer *, int, uint32_t); extern int epoll_fd;
void peer_epoll_del(struct peer *, int); void peer_init();
void peer_epoll_add(struct peer *, uint32_t);
void peer_epoll_del(struct peer *);
void peer_loop();
//////// buf //////// buf

View File

@@ -6,7 +6,6 @@
#include <netdb.h> #include <netdb.h>
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <sys/epoll.h>
#include "common.h" #include "common.h"
#include "incoming.h" #include "incoming.h"
@@ -22,7 +21,7 @@ struct incoming {
}; };
static void incoming_handler(struct peer *peer, int epoll_fd) { static void incoming_handler(struct peer *peer) {
struct incoming *incoming = (struct incoming *) peer; struct incoming *incoming = (struct incoming *) peer;
struct sockaddr peer_addr, local_addr; struct sockaddr peer_addr, local_addr;
@@ -45,10 +44,10 @@ static void incoming_handler(struct peer *peer, int epoll_fd) {
local_hbuf, local_sbuf, local_hbuf, local_sbuf,
peer_hbuf, peer_sbuf); peer_hbuf, peer_sbuf);
incoming->handler(fd, epoll_fd, incoming->passthrough); incoming->handler(fd, incoming->passthrough);
} }
void incoming_new(char *node, char *service, int epoll_fd, incoming_connection_handler handler, void *passthrough) { void incoming_new(char *node, char *service, incoming_connection_handler handler, void *passthrough) {
struct incoming *incoming = malloc(sizeof(*incoming)); struct incoming *incoming = malloc(sizeof(*incoming));
incoming->peer.event_handler = incoming_handler; incoming->peer.event_handler = incoming_handler;
uuid_gen(incoming->id); uuid_gen(incoming->id);
@@ -103,5 +102,5 @@ void incoming_new(char *node, char *service, int epoll_fd, incoming_connection_h
return; return;
} }
peer_epoll_add((struct peer *) incoming, epoll_fd, EPOLLIN); peer_epoll_add((struct peer *) incoming, EPOLLIN);
} }

View File

@@ -1,4 +1,4 @@
#pragma once #pragma once
typedef void (*incoming_connection_handler)(int fd, int epoll_fd, void *); typedef void (*incoming_connection_handler)(int fd, void *);
void incoming_new(char *, char *, int, incoming_connection_handler, void *); void incoming_new(char *, char *, incoming_connection_handler, void *);