add udp version to the c port
This commit is contained in:
parent
37c20f8096
commit
7951ce61bc
170
src/c90/metric_client_udp.c
Normal file
170
src/c90/metric_client_udp.c
Normal file
@ -0,0 +1,170 @@
|
||||
/* metric_client_udp.c - STRICT C89/C90 UDP client with clean UI */
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <sys/select.h>
|
||||
#include <errno.h>
|
||||
|
||||
#define PORT 9999
|
||||
#define BUF_SIZE 1024
|
||||
|
||||
#define MSG_PING 0x01
|
||||
#define MSG_PONG 0x02
|
||||
#define MSG_METRIC_REQ 0x03
|
||||
#define MSG_METRIC_RESP 0x04
|
||||
#define MSG_SUBSCRIBE 0x05
|
||||
#define MSG_PUSH 0x06
|
||||
#define MSG_UNSUB 0x07
|
||||
#define MSG_ERROR 0xFF
|
||||
|
||||
#define HEADER_SIZE 3
|
||||
|
||||
static void send_to(int sock, struct sockaddr_in *to,
|
||||
unsigned char type, const void *payload, unsigned short len)
|
||||
{
|
||||
unsigned char header[HEADER_SIZE];
|
||||
header[0] = type;
|
||||
header[1] = (len >> 8) & 0xFF;
|
||||
header[2] = len & 0xFF;
|
||||
sendto(sock, header, HEADER_SIZE, 0, (struct sockaddr *)to, sizeof(*to));
|
||||
if (len > 0)
|
||||
sendto(sock, payload, len, 0, (struct sockaddr *)to, sizeof(*to));
|
||||
}
|
||||
|
||||
static int recv_from_exact(int sock, void *buf, int n)
|
||||
{
|
||||
int total = 0;
|
||||
while (total < n) {
|
||||
int r = recv(sock, (char *)buf + total, n - total, 0);
|
||||
if (r <= 0) return -1;
|
||||
total += r;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
static void print_wire_comparison(void)
|
||||
{
|
||||
printf("\n--- Wire overhead comparison ---\n");
|
||||
printf("Our protocol (METRIC_REQ 'cpu'): %d bytes\n", HEADER_SIZE + 3);
|
||||
printf("Equivalent HTTP GET: 77 bytes\n");
|
||||
printf("HTTP is ~13x larger\n\n");
|
||||
}
|
||||
|
||||
int main(void)
|
||||
{
|
||||
int sock, maxfd;
|
||||
struct sockaddr_in server_addr;
|
||||
unsigned char header[HEADER_SIZE];
|
||||
unsigned char payload[BUF_SIZE];
|
||||
double val;
|
||||
char line[256];
|
||||
unsigned char type;
|
||||
unsigned short len;
|
||||
fd_set readfds;
|
||||
|
||||
sock = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
if (sock < 0) { perror("socket"); exit(1); }
|
||||
|
||||
server_addr.sin_family = AF_INET;
|
||||
server_addr.sin_port = htons(PORT);
|
||||
inet_pton(AF_INET, "127.0.0.1", &server_addr.sin_addr);
|
||||
|
||||
printf("Connected to UDP 127.0.0.1:%d (FINAL clean C89 demo client)\n", PORT);
|
||||
print_wire_comparison();
|
||||
|
||||
printf("Commands: ping | get <metric> | sub <metric> <ms> | unsub | quit\n");
|
||||
printf("Metrics: cpu, memory, disk, loadavg, uptime\n\n");
|
||||
|
||||
printf("> "); fflush(stdout);
|
||||
|
||||
maxfd = sock + 1;
|
||||
|
||||
while (1) {
|
||||
FD_ZERO(&readfds);
|
||||
FD_SET(0, &readfds);
|
||||
FD_SET(sock, &readfds);
|
||||
|
||||
if (select(maxfd, &readfds, NULL, NULL, NULL) < 0) {
|
||||
if (errno == EINTR) continue;
|
||||
perror("select");
|
||||
break;
|
||||
}
|
||||
|
||||
if (FD_ISSET(sock, &readfds)) {
|
||||
if (recv_from_exact(sock, header, HEADER_SIZE) != HEADER_SIZE) break;
|
||||
|
||||
type = header[0];
|
||||
len = (header[1] << 8) | header[2];
|
||||
|
||||
if (len > 0) {
|
||||
if (recv_from_exact(sock, payload, len) != len) break;
|
||||
}
|
||||
|
||||
printf("\r\x1b[2K");
|
||||
|
||||
if (type == MSG_PONG) {
|
||||
printf(" [PONG] server alive\n");
|
||||
} else if (type == MSG_METRIC_RESP) {
|
||||
memcpy(&val, payload, sizeof(double));
|
||||
printf(" [METRIC] %.2f\n", val);
|
||||
} else if (type == MSG_PUSH) {
|
||||
unsigned char name_len = payload[0];
|
||||
payload[1 + name_len] = '\0';
|
||||
memcpy(&val, payload + 1 + name_len, sizeof(double));
|
||||
printf(" [PUSH] %s: %.2f\n", (char *)payload + 1, val);
|
||||
} else if (type == MSG_ERROR) {
|
||||
payload[len] = '\0';
|
||||
printf(" [ERROR] %s\n", payload);
|
||||
}
|
||||
|
||||
printf("> "); fflush(stdout);
|
||||
}
|
||||
|
||||
if (FD_ISSET(0, &readfds)) {
|
||||
if (fgets(line, sizeof(line), stdin) == NULL) break;
|
||||
line[strcspn(line, "\n")] = '\0';
|
||||
|
||||
if (strcmp(line, "quit") == 0) break;
|
||||
else if (strcmp(line, "ping") == 0) {
|
||||
send_to(sock, &server_addr, MSG_PING, NULL, 0);
|
||||
}
|
||||
else if (strncmp(line, "get ", 4) == 0) {
|
||||
const char *metric = line + 4;
|
||||
send_to(sock, &server_addr, MSG_METRIC_REQ, metric, strlen(metric));
|
||||
}
|
||||
else if (strncmp(line, "sub ", 4) == 0) {
|
||||
char *metric = strtok(line + 4, " ");
|
||||
if (metric) {
|
||||
unsigned int ms = atoi(strtok(NULL, " "));
|
||||
unsigned char name_len = strlen(metric);
|
||||
unsigned char p[BUF_SIZE];
|
||||
p[0] = name_len;
|
||||
memcpy(p + 1, metric, name_len);
|
||||
p[1 + name_len] = (ms >> 24) & 0xFF;
|
||||
p[1 + name_len + 1] = (ms >> 16) & 0xFF;
|
||||
p[1 + name_len + 2] = (ms >> 8) & 0xFF;
|
||||
p[1 + name_len + 3] = ms & 0xFF;
|
||||
send_to(sock, &server_addr, MSG_SUBSCRIBE, p, 1 + name_len + 4);
|
||||
printf(" Subscribed to '%s' every %u ms\n", metric, ms);
|
||||
}
|
||||
}
|
||||
else if (strcmp(line, "unsub") == 0) {
|
||||
send_to(sock, &server_addr, MSG_UNSUB, NULL, 0);
|
||||
printf(" Unsubscribed (push stopped)\n");
|
||||
}
|
||||
else if (line[0] != '\0') {
|
||||
printf(" Unknown command: '%s'\n", line);
|
||||
}
|
||||
|
||||
printf("> "); fflush(stdout);
|
||||
}
|
||||
}
|
||||
|
||||
close(sock);
|
||||
printf("\nDisconnected.\n");
|
||||
return 0;
|
||||
}
|
||||
209
src/c90/metric_server_udp.c
Normal file
209
src/c90/metric_server_udp.c
Normal file
@ -0,0 +1,209 @@
|
||||
/* metric_server_udp.c - STRICT C89/C90 UDP metric server with SUBSCRIBE/PUSH */
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <time.h>
|
||||
#include <sys/select.h>
|
||||
#include <errno.h>
|
||||
|
||||
#define PORT 9999
|
||||
#define MAX_SUBS 32
|
||||
#define BUF_SIZE 1024
|
||||
|
||||
#define MSG_PING 0x01
|
||||
#define MSG_PONG 0x02
|
||||
#define MSG_METRIC_REQ 0x03
|
||||
#define MSG_METRIC_RESP 0x04
|
||||
#define MSG_SUBSCRIBE 0x05
|
||||
#define MSG_PUSH 0x06
|
||||
#define MSG_UNSUB 0x07
|
||||
#define MSG_ERROR 0xFF
|
||||
|
||||
#define HEADER_SIZE 3
|
||||
|
||||
struct Subscriber {
|
||||
struct sockaddr_in addr;
|
||||
char metric[64];
|
||||
unsigned int interval_ms;
|
||||
time_t last_push;
|
||||
int active;
|
||||
};
|
||||
|
||||
static double get_metric(const char *name)
|
||||
{
|
||||
if (strcmp(name, "cpu") == 0) return 5.0 + (rand() % 9000)/100.0;
|
||||
if (strcmp(name, "memory") == 0) return 30.0 + (rand() % 5000)/100.0;
|
||||
if (strcmp(name, "disk") == 0) return 40.0 + (rand() % 5000)/100.0;
|
||||
if (strcmp(name, "loadavg") == 0) return 0.5 + (rand() % 350)/100.0;
|
||||
if (strcmp(name, "uptime") == 0) return (double)(time(NULL) % 100000);
|
||||
return -1.0;
|
||||
}
|
||||
|
||||
static void send_to(int sock, const struct sockaddr_in *to,
|
||||
unsigned char type, const void *payload, unsigned short len)
|
||||
{
|
||||
unsigned char header[HEADER_SIZE];
|
||||
header[0] = type;
|
||||
header[1] = (len >> 8) & 0xFF;
|
||||
header[2] = len & 0xFF;
|
||||
sendto(sock, header, HEADER_SIZE, 0, (struct sockaddr *)to, sizeof(*to));
|
||||
if (len > 0)
|
||||
sendto(sock, payload, len, 0, (struct sockaddr *)to, sizeof(*to));
|
||||
}
|
||||
|
||||
static int recv_from_exact(int sock, void *buf, int n, struct sockaddr_in *from)
|
||||
{
|
||||
socklen_t len = sizeof(*from);
|
||||
int total = 0;
|
||||
while (total < n) {
|
||||
int r = recvfrom(sock, (char *)buf + total, n - total, 0,
|
||||
(struct sockaddr *)from, &len);
|
||||
if (r <= 0) return -1;
|
||||
total += r;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
static void handle_datagram(int sock, struct Subscriber subs[])
|
||||
{
|
||||
unsigned char header[HEADER_SIZE];
|
||||
unsigned char payload[BUF_SIZE];
|
||||
struct sockaddr_in from;
|
||||
unsigned char type;
|
||||
unsigned short len;
|
||||
double val;
|
||||
int i;
|
||||
|
||||
if (recv_from_exact(sock, header, HEADER_SIZE, &from) != HEADER_SIZE)
|
||||
return;
|
||||
|
||||
type = header[0];
|
||||
len = (header[1] << 8) | header[2];
|
||||
|
||||
if (len > 0) {
|
||||
if (recv_from_exact(sock, payload, len, &from) != len)
|
||||
return;
|
||||
}
|
||||
|
||||
if (type == MSG_PING) {
|
||||
send_to(sock, &from, MSG_PONG, NULL, 0);
|
||||
}
|
||||
else if (type == MSG_METRIC_REQ) {
|
||||
payload[len] = '\0';
|
||||
val = get_metric((char *)payload);
|
||||
if (val < 0.0) {
|
||||
const char *err = "Unknown metric";
|
||||
send_to(sock, &from, MSG_ERROR, err, strlen(err));
|
||||
} else {
|
||||
send_to(sock, &from, MSG_METRIC_RESP, &val, sizeof(double));
|
||||
}
|
||||
}
|
||||
else if (type == MSG_SUBSCRIBE) {
|
||||
unsigned char name_len = payload[0];
|
||||
unsigned int interval_ms = (payload[1 + name_len] << 24) |
|
||||
(payload[1 + name_len + 1] << 16) |
|
||||
(payload[1 + name_len + 2] << 8) |
|
||||
payload[1 + name_len + 3];
|
||||
|
||||
if (name_len < 63) {
|
||||
/* find free slot or overwrite existing for same client */
|
||||
for (i = 0; i < MAX_SUBS; ++i) {
|
||||
if (!subs[i].active || memcmp(&subs[i].addr, &from, sizeof(from)) == 0) {
|
||||
memcpy(&subs[i].addr, &from, sizeof(from));
|
||||
memcpy(subs[i].metric, payload + 1, name_len);
|
||||
subs[i].metric[name_len] = '\0';
|
||||
subs[i].interval_ms = interval_ms;
|
||||
subs[i].last_push = time(NULL);
|
||||
subs[i].active = 1;
|
||||
printf(" -> SUBSCRIBE '%s' every %u ms from %s:%d\n",
|
||||
subs[i].metric, interval_ms,
|
||||
inet_ntoa(from.sin_addr), ntohs(from.sin_port));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (type == MSG_UNSUB) {
|
||||
for (i = 0; i < MAX_SUBS; ++i) {
|
||||
if (subs[i].active && memcmp(&subs[i].addr, &from, sizeof(from)) == 0) {
|
||||
subs[i].active = 0;
|
||||
printf(" <- UNSUB from %s:%d\n",
|
||||
inet_ntoa(from.sin_addr), ntohs(from.sin_port));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int main(void)
|
||||
{
|
||||
int sock, i;
|
||||
struct sockaddr_in server_addr;
|
||||
fd_set readfds;
|
||||
struct timeval tv;
|
||||
struct Subscriber subs[MAX_SUBS];
|
||||
time_t now;
|
||||
|
||||
srand(time(NULL));
|
||||
memset(subs, 0, sizeof(subs));
|
||||
|
||||
sock = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
if (sock < 0) { perror("socket"); exit(1); }
|
||||
|
||||
server_addr.sin_family = AF_INET;
|
||||
server_addr.sin_addr.s_addr = INADDR_ANY;
|
||||
server_addr.sin_port = htons(PORT);
|
||||
|
||||
if (bind(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
|
||||
perror("bind"); exit(1);
|
||||
}
|
||||
|
||||
printf("UDP Metric server listening on port %d (fire-and-forget mode)\n", PORT);
|
||||
printf("Available metrics: cpu, memory, disk, loadavg, uptime\n");
|
||||
printf("Commands: ping, get <metric>, sub <metric> <ms>, unsub\n\n");
|
||||
|
||||
while (1) {
|
||||
FD_ZERO(&readfds);
|
||||
FD_SET(sock, &readfds);
|
||||
|
||||
tv.tv_sec = 0;
|
||||
tv.tv_usec = 100000; /* 100 ms tick for pushes */
|
||||
|
||||
if (select(sock + 1, &readfds, NULL, NULL, &tv) < 0) {
|
||||
if (errno == EINTR) continue;
|
||||
perror("select");
|
||||
break;
|
||||
}
|
||||
|
||||
if (FD_ISSET(sock, &readfds)) {
|
||||
handle_datagram(sock, subs);
|
||||
}
|
||||
|
||||
/* Periodic PUSHes */
|
||||
now = time(NULL);
|
||||
for (i = 0; i < MAX_SUBS; ++i) {
|
||||
if (subs[i].active) {
|
||||
if (now - subs[i].last_push >= subs[i].interval_ms / 1000) {
|
||||
double val = get_metric(subs[i].metric);
|
||||
if (val >= 0.0) {
|
||||
unsigned char name_len = strlen(subs[i].metric);
|
||||
unsigned char p[BUF_SIZE];
|
||||
p[0] = name_len;
|
||||
memcpy(p + 1, subs[i].metric, name_len);
|
||||
memcpy(p + 1 + name_len, &val, sizeof(double));
|
||||
send_to(sock, &subs[i].addr, MSG_PUSH,
|
||||
p, 1 + name_len + sizeof(double));
|
||||
}
|
||||
subs[i].last_push = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
close(sock);
|
||||
return 0;
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user