Clean up serialize interface.

Switch to jansson for JSON handling.
Add server ID.
Expose IDs in JSON.
This commit is contained in:
Ian Gulliver
2016-02-17 00:21:28 +00:00
parent b993764a14
commit 01d548dbb5
9 changed files with 93 additions and 55 deletions

View File

@@ -1,7 +1,7 @@
CC ?= clang
CFLAGS ?= -Wall -Werror -O4 -g --std=gnu11 --pedantic-errors
LDFLAGS ?= -Wall -O4
LIBS ?= -luuid
LIBS ?= -luuid -ljansson
all: adsbus

View File

@@ -67,6 +67,7 @@ static int loop(int epoll_fd) {
}
int main(int argc, char *argv[]) {
server_init();
hex_init();
airspy_adsb_init();
beast_init();

View File

@@ -45,7 +45,7 @@ bool backend_connect(char *node, char *service, struct backend *backend, int epo
int gai_err = getaddrinfo(node, service, &hints, &addrs);
if (gai_err) {
fprintf(stderr, "%s: getaddrinfo(%s %s): %s\n", backend->id, node, service, gai_strerror(gai_err));
fprintf(stderr, "B %s: getaddrinfo(%s %s): %s\n", backend->id, node, service, gai_strerror(gai_err));
return false;
}
}
@@ -68,13 +68,13 @@ bool backend_connect(char *node, char *service, struct backend *backend, int epo
if (addr == NULL) {
freeaddrinfo(addrs);
fprintf(stderr, "%s: Can't connect to %s %s\n", backend->id, node, service);
fprintf(stderr, "B %s: Can't connect to %s %s\n", backend->id, node, service);
return false;
}
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
if (getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
fprintf(stderr, "%s: Connected to %s %s\n", backend->id, hbuf, sbuf);
fprintf(stderr, "B %s: Connected to %s %s\n", backend->id, hbuf, sbuf);
}
}
@@ -98,17 +98,19 @@ bool backend_connect(char *node, char *service, struct backend *backend, int epo
bool backend_read(struct backend *backend) {
if (buf_fill(&backend->buf, backend->fd) < 0) {
fprintf(stderr, "%s: Connection closed by backend\n", backend->id);
fprintf(stderr, "B %s: Connection closed by backend\n", backend->id);
return false;
}
struct packet packet;
struct packet packet = {
.backend = backend,
};
while (backend->parser(backend, &packet)) {
client_write(&packet);
}
if (backend->buf.length == BUF_LEN_MAX) {
fprintf(stderr, "%s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id);
fprintf(stderr, "B %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id);
return false;
}
return true;

View File

@@ -15,7 +15,7 @@ struct client {
struct client *next;
};
typedef size_t (*serializer)(struct packet *, char *);
typedef void (*serializer)(struct packet *, struct buf *);
struct serializer {
char *name;
serializer serialize;
@@ -40,13 +40,12 @@ struct serializer *client_get_serializer(char *name) {
}
static bool client_hello(int fd, struct serializer *serializer) {
char buf[SERIALIZE_LEN];
size_t len = serializer->serialize(NULL, buf);
if (len == 0) {
struct buf buf = BUF_INIT;
serializer->serialize(NULL, &buf);
if (buf.length == 0) {
return true;
}
if (write(fd, buf, len) != len) {
fprintf(stderr, "Failed to write hello to client\n");
if (write(fd, buf_at(&buf, 0), buf.length) != buf.length) {
return false;
}
return true;
@@ -59,6 +58,7 @@ void client_add(int fd, struct serializer *serializer) {
assert(fcntl(fd, F_SETFL, flags) == 0);
if (!client_hello(fd, serializer)) {
fprintf(stderr, "C xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx: Failed to write hello to client\n");
return;
}
@@ -70,7 +70,7 @@ void client_add(int fd, struct serializer *serializer) {
client->next = serializer->client_head;
serializer->client_head = client;
fprintf(stderr, "%s (%s): New client\n", client->id, serializer->name);
fprintf(stderr, "C %s (%s): New client\n", client->id, serializer->name);
}
void client_write(struct packet *packet) {
@@ -79,18 +79,18 @@ void client_write(struct packet *packet) {
if (serializer->client_head == NULL) {
continue;
}
char buf[SERIALIZE_LEN];
size_t len = serializer->serialize(packet, buf);
if (len == 0) {
struct buf buf = BUF_INIT;
serializer->serialize(packet, &buf);
if (buf.length == 0) {
continue;
}
struct client *client = serializer->client_head, *prev_client = NULL;
while (client) {
if (write(client->fd, buf, len) == len) {
if (write(client->fd, buf_at(&buf, 0), buf.length) == buf.length) {
prev_client = client;
client = client->next;
} else {
fprintf(stderr, "%s (%s): Client disconnected\n", client->id, serializer->name);
fprintf(stderr, "C %s (%s): Client disconnected\n", client->id, serializer->name);
if (prev_client) {
prev_client->next = client->next;
} else {

View File

@@ -2,8 +2,6 @@
#include "common.h"
#define SERIALIZE_LEN 256
struct serializer *client_get_serializer(char *);
void client_add(int, struct serializer *);

View File

@@ -111,3 +111,11 @@ void uuid_gen(char *out) {
uuid_generate(uuid);
uuid_unparse(uuid, out);
}
char server_id[UUID_LEN];
void server_init() {
uuid_gen(server_id);
fprintf(stderr, "S %s: Server start\n", server_id);
}

View File

@@ -39,6 +39,7 @@ void buf_consume(struct buf *, size_t);
//////// packet
#define DATA_LEN_MAX 14
struct backend;
struct packet {
enum {
MODE_AC,
@@ -48,13 +49,15 @@ struct packet {
uint8_t payload[DATA_LEN_MAX];
uint64_t mlat_timestamp;
uint32_t rssi;
struct backend *backend;
};
//////// mlat
#define MLAT_MHZ 120
#define MLAT_MAX UINT64_MAX
// Use the signed max to avoid problems with some consumers; it's large enough to not matter.
#define MLAT_MAX INT64_MAX
#define RSSI_MAX UINT32_MAX
struct mlat_state {
@@ -82,3 +85,9 @@ void hex_from_bin(char *, uint8_t *, size_t);
#define UUID_LEN 37
void uuid_gen(char *);
///////// server
extern char server_id[];
void server_init();

84
json.c
View File

@@ -1,6 +1,9 @@
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <jansson.h>
#include "backend.h"
#include "client.h"
#include "json.h"
@@ -8,57 +11,74 @@
// Hobo JSON to avoid overhead. Assumes that we can't get quotes in the data.
void json_init() {
assert(JSON_INTEGER_IS_LONG_LONG);
}
static size_t json_hello(char *buf) {
int len = snprintf(buf, SERIALIZE_LEN,
"{\"mlat_timestamp_mhz\":%ju, \"mlat_timestamp_max\":%ju, \"rssi_max\":%ju}\n",
(uintmax_t) MLAT_MHZ,
(uintmax_t) MLAT_MAX,
(uintmax_t) RSSI_MAX);
assert(len < SERIALIZE_LEN);
return len;
static int json_buf_append_callback(const char *buffer, size_t size, void *data) {
struct buf *buf = data;
if (size > BUF_LEN_MAX - buf->length - 1) {
return -1;
}
memcpy(buf_at(buf, buf->length), buffer, size);
buf->length += size;
return 0;
}
static size_t json_serialize_mode_s_short(struct packet *packet, char *buf) {
static void json_hello(struct buf *buf) {
json_t *hello = json_pack("{sssIsIsI}",
"server_id", server_id,
"mlat_timestamp_mhz", (json_int_t) MLAT_MHZ,
"mlat_timestamp_max", (json_int_t) MLAT_MAX,
"rssi_max", (json_int_t) RSSI_MAX);
assert(json_dump_callback(hello, json_buf_append_callback, buf, 0) == 0);
json_decref(hello);
buf_chr(buf, buf->length++) = '\n';
}
static void json_serialize_mode_s_short(struct packet *packet, struct buf *buf) {
assert(packet->mlat_timestamp < MLAT_MAX);
char hexbuf[14];
hex_from_bin(hexbuf, packet->payload, 7);
int len = snprintf(buf, SERIALIZE_LEN,
"{\"payload\":\"%.14s\", \"mlat_timestamp\":%ju, \"rssi\":%ju}\n",
hexbuf,
(uintmax_t) packet->mlat_timestamp,
(uintmax_t) packet->rssi);
assert(len < SERIALIZE_LEN);
return len;
json_t *out = json_pack("{ssss#sIsI}",
"backend_id", packet->backend->id,
"payload", hexbuf, 14,
"mlat_timestamp", (json_int_t) packet->mlat_timestamp,
"rssi", (json_int_t) packet->rssi);
assert(json_dump_callback(out, json_buf_append_callback, buf, 0) == 0);
json_decref(out);
buf_chr(buf, buf->length++) = '\n';
}
static size_t json_serialize_mode_s_long(struct packet *packet, char *buf) {
char hexbuf[28];
static void json_serialize_mode_s_long(struct packet *packet, struct buf *buf) {
assert(packet->mlat_timestamp < MLAT_MAX);
char hexbuf[14];
hex_from_bin(hexbuf, packet->payload, 14);
int len = snprintf(buf, SERIALIZE_LEN,
"{\"payload\":\"%.28s\", \"mlat_timestamp\":%ju, \"rssi\":%ju}\n",
hexbuf,
(uintmax_t) packet->mlat_timestamp,
(uintmax_t) packet->rssi);
assert(len < SERIALIZE_LEN);
return len;
json_t *out = json_pack("{ssss#sIsI}",
"backend_id", packet->backend->id,
"payload", hexbuf, 28,
"mlat_timestamp", (json_int_t) packet->mlat_timestamp,
"rssi", (json_int_t) packet->rssi);
assert(json_dump_callback(out, json_buf_append_callback, buf, 0) == 0);
json_decref(out);
buf_chr(buf, buf->length++) = '\n';
}
size_t json_serialize(struct packet *packet, char *buf) {
void json_serialize(struct packet *packet, struct buf *buf) {
if (!packet) {
return json_hello(buf);
json_hello(buf);
return;
}
switch (packet->type) {
case MODE_AC:
return 0;
break;
case MODE_S_SHORT:
return json_serialize_mode_s_short(packet, buf);
json_serialize_mode_s_short(packet, buf);
break;
case MODE_S_LONG:
return json_serialize_mode_s_long(packet, buf);
json_serialize_mode_s_long(packet, buf);
break;
}
return 0;
}

2
json.h
View File

@@ -4,4 +4,4 @@
void json_init();
size_t json_serialize(struct packet *, char *);
void json_serialize(struct packet *, struct buf *);