Hide more interfaces behind the flow code.

This commit is contained in:
Ian Gulliver
2016-02-29 20:16:57 -08:00
parent 3fb3c02cf5
commit 90de5b80bc
5 changed files with 55 additions and 56 deletions

View File

@@ -33,10 +33,13 @@ struct send {
struct list_head send_list;
};
static void send_new(int, void *, struct peer *);
static void send_get_hello(struct buf **, void *);
static struct flow _send_flow = {
.name = "send",
.new = send_new_wrapper,
.get_hello = send_hello,
.new = send_new,
.get_hello = send_get_hello,
.ref_count = &peer_count_out,
};
struct flow *send_flow = &_send_flow;
@@ -96,32 +99,9 @@ static void send_del_wrapper(struct peer *peer) {
send_del((struct send *) peer);
}
void send_init() {
assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR);
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
list_head_init(&serializers[i].send_head);
}
}
static void send_new(int fd, void *passthrough, struct peer *on_close) {
struct serializer *serializer = (struct serializer *) passthrough;
void send_cleanup() {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
struct send *iter, *next;
list_for_each_entry_safe(iter, next, &serializers[i].send_head, send_list) {
send_del(iter);
}
}
}
struct serializer *send_get_serializer(char *name) {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) {
return &serializers[i];
}
}
return NULL;
}
void send_new(int fd, struct serializer *serializer, struct peer *on_close) {
peer_count_out++;
socket_send_init(fd);
@@ -141,30 +121,51 @@ void send_new(int fd, struct serializer *serializer, struct peer *on_close) {
fprintf(stderr, "S %s (%s): New send connection\n", send->id, serializer->name);
}
void send_new_wrapper(int fd, void *passthrough, struct peer *on_close) {
send_new(fd, (struct serializer *) passthrough, on_close);
static void send_get_hello(struct buf **buf_pp, void *passthrough) {
struct serializer *serializer = (struct serializer *) passthrough;
if (serializer->hello) {
serializer->hello(buf_pp);
}
}
bool send_new_hello(int fd, struct serializer *serializer, struct peer *on_close) {
void send_init() {
assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR);
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
list_head_init(&serializers[i].send_head);
}
}
void send_cleanup() {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
struct send *iter, *next;
list_for_each_entry_safe(iter, next, &serializers[i].send_head, send_list) {
send_del(iter);
}
}
}
void *send_get_serializer(char *name) {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) {
return &serializers[i];
}
}
return NULL;
}
bool send_new_hello(int fd, void *passthrough, struct peer *on_close) {
struct buf buf = BUF_INIT, *buf_ptr = &buf;
send_hello(&buf_ptr, serializer);
send_get_hello(&buf_ptr, passthrough);
if (buf_ptr->length) {
if (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) != (ssize_t) buf_ptr->length) {
assert(!close(fd));
return false;
}
}
send_new(fd, serializer, on_close);
send_new(fd, passthrough, on_close);
return true;
}
void send_hello(struct buf **buf_pp, void *passthrough) {
struct serializer *serializer = (struct serializer *) passthrough;
if (serializer->hello) {
serializer->hello(buf_pp);
}
}
void send_write(struct packet *packet) {
packet_sanity_check(packet);
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {