In/out reference counting.

This commit is contained in:
Ian Gulliver
2016-02-25 16:17:25 -08:00
parent aa940137c1
commit 5e604f0596
9 changed files with 35 additions and 12 deletions

View File

@@ -27,6 +27,7 @@ struct incoming {
uint32_t attempt;
incoming_connection_handler handler;
void *passthrough;
uint32_t *count;
struct incoming *next;
};
@@ -68,6 +69,7 @@ static void incoming_handler(struct peer *peer) {
}
static void incoming_del(struct incoming *incoming) {
(*incoming->count)--;
if (incoming->peer.fd >= 0) {
assert(!close(incoming->peer.fd));
}
@@ -142,7 +144,9 @@ void incoming_cleanup() {
}
}
void incoming_new(char *node, char *service, incoming_connection_handler handler, void *passthrough) {
void incoming_new(char *node, char *service, incoming_connection_handler handler, void *passthrough, uint32_t *count) {
(*count)++;
struct incoming *incoming = malloc(sizeof(*incoming));
incoming->peer.event_handler = incoming_handler;
uuid_gen(incoming->id);
@@ -151,6 +155,7 @@ void incoming_new(char *node, char *service, incoming_connection_handler handler
incoming->attempt = 0;
incoming->handler = handler;
incoming->passthrough = passthrough;
incoming->count = count;
incoming->next = incoming_head;
incoming_head = incoming;

View File

@@ -1,7 +1,9 @@
#pragma once
#include <stdint.h>
struct peer;
void incoming_cleanup();
typedef void (*incoming_connection_handler)(int fd, void *, struct peer *);
void incoming_new(char *, char *, incoming_connection_handler, void *);
void incoming_new(char *, char *, incoming_connection_handler, void *, uint32_t *);

View File

@@ -4,7 +4,7 @@
#include "incoming.h"
#include "outgoing.h"
#include "peer.h"
#include "receive.h"
#include "send.h"
@@ -20,13 +20,13 @@ static char *opts_split(char **arg, char delim) {
return ret;
}
static void opts_add_listen(char *host_port, incoming_connection_handler handler, void *passthrough) {
static void opts_add_listen(char *host_port, incoming_connection_handler handler, void *passthrough, uint32_t *count) {
char *host = opts_split(&host_port, '/');
if (host) {
incoming_new(host, host_port, handler, passthrough);
incoming_new(host, host_port, handler, passthrough, count);
free(host);
} else {
incoming_new(NULL, host_port, handler, passthrough);
incoming_new(NULL, host_port, handler, passthrough, count);
}
}
@@ -51,7 +51,7 @@ bool opts_add_connect_receive(char *arg) {
return false;
}
outgoing_new(host, arg, receive_new, NULL);
outgoing_new(host, arg, receive_new, NULL, &peer_count_in);
free(host);
return true;
}
@@ -67,13 +67,13 @@ bool opts_add_connect_send(char *arg) {
return false;
}
outgoing_new(host, arg, send_new_wrapper, serializer);
outgoing_new(host, arg, send_new_wrapper, serializer, &peer_count_out);
free(host);
return true;
}
bool opts_add_listen_receive(char *arg) {
opts_add_listen(arg, receive_new, NULL);
opts_add_listen(arg, receive_new, NULL, &peer_count_in);
return true;
}
@@ -83,7 +83,7 @@ bool opts_add_listen_send(char *arg) {
return false;
}
opts_add_listen(arg, send_new_wrapper, serializer);
opts_add_listen(arg, send_new_wrapper, serializer, &peer_count_out);
return true;
}

View File

@@ -26,6 +26,7 @@ struct outgoing {
uint32_t attempt;
outgoing_connection_handler handler;
void *passthrough;
uint32_t *count;
struct outgoing *next;
};
@@ -134,6 +135,7 @@ static void outgoing_resolve_wrapper(struct peer *peer) {
}
static void outgoing_del(struct outgoing *outgoing) {
(*outgoing->count)--;
if (outgoing->peer.fd >= 0) {
assert(!close(outgoing->peer.fd));
}
@@ -151,7 +153,9 @@ void outgoing_cleanup() {
}
}
void outgoing_new(char *node, char *service, outgoing_connection_handler handler, void *passthrough) {
void outgoing_new(char *node, char *service, outgoing_connection_handler handler, void *passthrough, uint32_t *count) {
(*count)++;
struct outgoing *outgoing = malloc(sizeof(*outgoing));
uuid_gen(outgoing->id);
outgoing->node = strdup(node);
@@ -159,6 +163,7 @@ void outgoing_new(char *node, char *service, outgoing_connection_handler handler
outgoing->attempt = 0;
outgoing->handler = handler;
outgoing->passthrough = passthrough;
outgoing->count = count;
outgoing->next = outgoing_head;
outgoing_head = outgoing;

View File

@@ -4,4 +4,4 @@ struct peer;
void outgoing_cleanup();
typedef void (*outgoing_connection_handler)(int fd, void *, struct peer *);
void outgoing_new(char *, char *, outgoing_connection_handler, void *);
void outgoing_new(char *, char *, outgoing_connection_handler, void *, uint32_t *);

View File

@@ -16,6 +16,8 @@
#include "peer.h"
uint32_t peer_count_in = 0, peer_count_out = 0;
static int peer_epoll_fd;
static int peer_shutdown_fd;
static bool peer_shutdown_flag = false;

View File

@@ -9,6 +9,9 @@ struct peer {
int fd;
peer_event_handler event_handler;
};
extern uint32_t peer_count_in, peer_count_out;
void peer_init();
void peer_cleanup();
void peer_shutdown();

View File

@@ -83,6 +83,7 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac
static void receive_del(struct receive *receive) {
fprintf(stderr, "R %s: Connection closed\n", receive->id);
peer_count_out--;
peer_epoll_del((struct peer *) receive);
assert(!close(receive->peer.fd));
if (receive->prev) {
@@ -129,6 +130,8 @@ void receive_cleanup() {
}
void receive_new(int fd, void *unused, struct peer *on_close) {
peer_count_in++;
int res = shutdown(fd, SHUT_WR);
assert(res == 0 || (res == -1 && errno == ENOTSOCK));

View File

@@ -64,6 +64,7 @@ struct serializer {
static void send_del(struct send *send) {
fprintf(stderr, "S %s (%s): Connection closed\n", send->id, send->serializer->name);
peer_count_out--;
peer_epoll_del((struct peer *) send);
assert(!close(send->peer.fd));
if (send->prev) {
@@ -116,6 +117,8 @@ struct serializer *send_get_serializer(char *name) {
}
void send_new(int fd, struct serializer *serializer, struct peer *on_close) {
peer_count_out++;
struct send *send = malloc(sizeof(*send));
assert(send);