add forking versions and subscribe feature

This commit is contained in:
Gregory Gauthier 2026-03-26 13:44:51 +00:00
parent 27ccd5e5b0
commit b6df19eac3
7 changed files with 440 additions and 43 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
.idea/ .idea/
build/

BIN
metric_client_sub Executable file

Binary file not shown.

BIN
metric_server_fork_sub Executable file

Binary file not shown.

View File

@ -1,4 +1,4 @@
/* metric_client.c - C89/C90 custom binary metric client */ /* metric_client.c - STRICT C89/C90 custom binary metric client */
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -10,7 +10,7 @@
#define PORT 9999 #define PORT 9999
#define BUF_SIZE 1024 #define BUF_SIZE 1024
/* same constants */ /* Protocol constants (must match server) */
#define MSG_PING 0x01 #define MSG_PING 0x01
#define MSG_PONG 0x02 #define MSG_PONG 0x02
#define MSG_METRIC_REQ 0x03 #define MSG_METRIC_REQ 0x03
@ -19,89 +19,119 @@
#define HEADER_SIZE 3 #define HEADER_SIZE 3
static void send_message(int sock, unsigned char type, const void *payload, unsigned short len) { static void send_message(int sock, unsigned char type,
const void *payload, unsigned short len)
{
unsigned char header[HEADER_SIZE]; unsigned char header[HEADER_SIZE];
header[0] = type; header[0] = type;
header[1] = (len >> 8) & 0xFF; header[1] = (len >> 8) & 0xFF; /* big-endian */
header[2] = len & 0xFF; header[2] = len & 0xFF;
write(sock, header, HEADER_SIZE); write(sock, header, HEADER_SIZE);
if (len > 0) write(sock, payload, len); if (len > 0)
write(sock, payload, len);
} }
static int recv_exact(int sock, void *buf, int n) { static int recv_exact(int sock, void *buf, int n)
{
int total = 0; int total = 0;
while (total < n) { while (total < n) {
int r = read(sock, (char*)buf + total, n - total); int r = read(sock, (char *)buf + total, n - total);
if (r <= 0) return -1; if (r <= 0) return -1;
total += r; total += r;
} }
return total; return total;
} }
static void print_wire_comparison(void) { static void print_wire_comparison(void)
{
printf("\n--- Wire overhead comparison ---\n"); printf("\n--- Wire overhead comparison ---\n");
printf("Our protocol (METRIC_REQ 'cpu'): %d bytes\n", HEADER_SIZE + 3); printf("Our protocol (METRIC_REQ 'cpu'): %d bytes\n", HEADER_SIZE + 3);
printf("Equivalent HTTP GET: 77 bytes\n"); printf("Equivalent HTTP GET: 77 bytes\n");
printf("HTTP is ~13x larger\n\n"); printf("HTTP is ~13x larger\n\n");
} }
int main(void) { int main(void)
{
int sock; int sock;
struct sockaddr_in addr; struct sockaddr_in addr;
unsigned char header[HEADER_SIZE]; unsigned char header[HEADER_SIZE];
unsigned char payload[BUF_SIZE]; unsigned char payload[BUF_SIZE];
double val; double val;
char line[256]; char line[256];
unsigned char type;
unsigned short len;
const char *metric;
sock = socket(AF_INET, SOCK_STREAM, 0); sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
perror("socket");
exit(1);
}
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_port = htons(PORT); addr.sin_port = htons(PORT);
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
perror("connect"); perror("connect");
close(sock);
exit(1); exit(1);
} }
printf("Connected to 127.0.0.1:%d\n", PORT); printf("Connected to 127.0.0.1:%d (STRICT C89 client)\n", PORT);
print_wire_comparison(); print_wire_comparison();
printf("Commands: ping | get <metric> | quit\nMetrics: cpu, memory, disk, loadavg, uptime\n\n"); printf("Commands: ping | get <metric> | quit\n");
printf("Metrics: cpu, memory, disk, loadavg, uptime\n\n");
while (1) { while (1) {
printf("> "); printf("> ");
if (fgets(line, sizeof(line), stdin) == NULL) break; if (fgets(line, sizeof(line), stdin) == NULL)
break;
line[strcspn(line, "\n")] = '\0'; line[strcspn(line, "\n")] = '\0';
if (strcmp(line, "quit") == 0) break; if (strcmp(line, "quit") == 0) {
if (strcmp(line, "ping") == 0) { break;
}
else if (strcmp(line, "ping") == 0) {
send_message(sock, MSG_PING, NULL, 0); send_message(sock, MSG_PING, NULL, 0);
} else if (strncmp(line, "get ", 4) == 0) { }
const char *metric = line + 4; else if (strncmp(line, "get ", 4) == 0) {
metric = line + 4;
send_message(sock, MSG_METRIC_REQ, metric, strlen(metric)); send_message(sock, MSG_METRIC_REQ, metric, strlen(metric));
} else { }
else {
printf(" Unknown command.\n"); printf(" Unknown command.\n");
continue; continue;
} }
/* blocking receive */ /* blocking receive */
if (recv_exact(sock, header, HEADER_SIZE) != HEADER_SIZE) break; if (recv_exact(sock, header, HEADER_SIZE) != HEADER_SIZE)
unsigned char type = header[0]; break;
unsigned short len = (header[1] << 8) | header[2];
type = header[0];
len = (header[1] << 8) | header[2];
if (len > 0) { if (len > 0) {
if (recv_exact(sock, payload, len) != len) break; if (recv_exact(sock, payload, len) != len)
break;
} }
if (type == MSG_PONG) { if (type == MSG_PONG) {
printf(" [PONG] server alive\n"); printf(" [PONG] server alive\n");
} else if (type == MSG_METRIC_RESP) { }
else if (type == MSG_METRIC_RESP) {
memcpy(&val, payload, sizeof(double)); memcpy(&val, payload, sizeof(double));
printf(" [METRIC] %.2f\n", val); printf(" [METRIC] %.2f\n", val);
} else if (type == MSG_ERROR) { }
else if (type == MSG_ERROR) {
payload[len] = '\0'; payload[len] = '\0';
printf(" [ERROR] %s\n", payload); printf(" [ERROR] %s\n", payload);
} }
else {
printf(" [???] Unknown message type 0x%02X\n", type);
}
} }
close(sock); close(sock);

173
src/c90/metric_client_sub.c Normal file
View File

@ -0,0 +1,173 @@
/* metric_client.c - STRICT C89/C90 client with live SUBSCRIBE/PUSH using select() */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>
#define PORT 9999
#define BUF_SIZE 1024
#define MSG_PING 0x01
#define MSG_PONG 0x02
#define MSG_METRIC_REQ 0x03
#define MSG_METRIC_RESP 0x04
#define MSG_SUBSCRIBE 0x05
#define MSG_PUSH 0x06
#define MSG_ERROR 0xFF
#define HEADER_SIZE 3
static void send_message(int sock, unsigned char type,
const void *payload, unsigned short len)
{
unsigned char header[HEADER_SIZE];
header[0] = type;
header[1] = (len >> 8) & 0xFF;
header[2] = len & 0xFF;
write(sock, header, HEADER_SIZE);
if (len > 0)
write(sock, payload, len);
}
static int recv_exact(int sock, void *buf, int n)
{
int total = 0;
while (total < n) {
int r = read(sock, (char *)buf + total, n - total);
if (r <= 0) return -1;
total += r;
}
return total;
}
static void print_wire_comparison(void)
{
printf("\n--- Wire overhead comparison ---\n");
printf("Our protocol (METRIC_REQ 'cpu'): %d bytes\n", HEADER_SIZE + 3);
printf("Equivalent HTTP GET: 77 bytes\n");
printf("HTTP is ~13x larger\n\n");
}
int main(void)
{
int sock, maxfd;
struct sockaddr_in addr;
unsigned char header[HEADER_SIZE];
unsigned char payload[BUF_SIZE];
double val;
char line[256];
unsigned char type;
unsigned short len;
const char *metric;
unsigned int interval_ms;
unsigned char name_len;
fd_set readfds;
sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) { perror("socket"); exit(1); }
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
perror("connect"); close(sock); exit(1);
}
printf("Connected to 127.0.0.1:%d (STRICT C89 live client)\n", PORT);
print_wire_comparison();
printf("Commands: ping | get <metric> | sub <metric> <ms> | quit\n");
printf("Metrics: cpu, memory, disk, loadavg, uptime\n\n");
maxfd = sock > 0 ? sock : 0;
while (1) {
FD_ZERO(&readfds);
FD_SET(0, &readfds); /* stdin */
FD_SET(sock, &readfds); /* socket */
if (select(maxfd + 1, &readfds, NULL, NULL, NULL) < 0) {
if (errno == EINTR) continue;
perror("select");
break;
}
/* 1. Data ready on the socket → handle PUSH / response immediately */
if (FD_ISSET(sock, &readfds)) {
if (recv_exact(sock, header, HEADER_SIZE) != HEADER_SIZE) break;
type = header[0];
len = (header[1] << 8) | header[2];
if (len > 0) {
if (recv_exact(sock, payload, len) != len) break;
}
if (type == MSG_PONG) {
printf("\n [PONG] server alive\n");
}
else if (type == MSG_METRIC_RESP) {
memcpy(&val, payload, sizeof(double));
printf("\n [METRIC] %.2f\n", val);
}
else if (type == MSG_PUSH) {
name_len = payload[0];
payload[1 + name_len] = '\0';
memcpy(&val, payload + 1 + name_len, sizeof(double));
printf("\n [PUSH] %s: %.2f\n", (char *)payload + 1, val);
}
else if (type == MSG_ERROR) {
payload[len] = '\0';
printf("\n [ERROR] %s\n", payload);
}
else {
printf("\n [???] Unknown type 0x%02X\n", type);
}
printf("> "); fflush(stdout); /* re-print prompt after async message */
}
/* 2. User typed something on stdin */
if (FD_ISSET(0, &readfds)) {
if (fgets(line, sizeof(line), stdin) == NULL) break;
line[strcspn(line, "\n")] = '\0';
if (strcmp(line, "quit") == 0) break;
else if (strcmp(line, "ping") == 0) {
send_message(sock, MSG_PING, NULL, 0);
}
else if (strncmp(line, "get ", 4) == 0) {
metric = line + 4;
send_message(sock, MSG_METRIC_REQ, metric, strlen(metric));
}
else if (strncmp(line, "sub ", 4) == 0) {
metric = strtok(line + 4, " ");
if (metric) {
interval_ms = atoi(strtok(NULL, " "));
name_len = strlen(metric);
payload[0] = name_len;
memcpy(payload + 1, metric, name_len);
payload[1 + name_len] = (interval_ms >> 24) & 0xFF;
payload[1 + name_len + 1] = (interval_ms >> 16) & 0xFF;
payload[1 + name_len + 2] = (interval_ms >> 8) & 0xFF;
payload[1 + name_len + 3] = interval_ms & 0xFF;
send_message(sock, MSG_SUBSCRIBE, payload, 1 + name_len + 4);
printf(" Subscribed to '%s' every %u ms\n", metric, interval_ms);
}
}
else {
printf(" Unknown command.\n");
}
printf("> "); fflush(stdout);
}
}
close(sock);
printf("\nDisconnected.\n");
return 0;
}

View File

@ -1,4 +1,4 @@
/* metric_server_fork.c - C89/C90 multi-client metric server using fork() */ /* metric_server_fork.c - STRICT C89/C90 multi-client metric server using fork() */
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -58,9 +58,12 @@ static int recv_exact(int sock, void *buf, int n)
static void handle_client(int client_sock, struct sockaddr_in *addr) static void handle_client(int client_sock, struct sockaddr_in *addr)
{ {
unsigned char header[HEADER_SIZE]; unsigned char header[HEADER_SIZE]; /* all declarations first */
unsigned char payload[BUF_SIZE]; unsigned char payload[BUF_SIZE];
char client_ip[INET_ADDRSTRLEN]; char client_ip[INET_ADDRSTRLEN];
unsigned char type;
unsigned short len;
double val;
inet_ntop(AF_INET, &addr->sin_addr, client_ip, sizeof(client_ip)); inet_ntop(AF_INET, &addr->sin_addr, client_ip, sizeof(client_ip));
printf("[+] Connected from %s:%d\n", client_ip, ntohs(addr->sin_port)); printf("[+] Connected from %s:%d\n", client_ip, ntohs(addr->sin_port));
@ -69,8 +72,8 @@ static void handle_client(int client_sock, struct sockaddr_in *addr)
if (recv_exact(client_sock, header, HEADER_SIZE) != HEADER_SIZE) if (recv_exact(client_sock, header, HEADER_SIZE) != HEADER_SIZE)
break; break;
unsigned char type = header[0]; type = header[0];
unsigned short len = (header[1] << 8) | header[2]; len = (header[1] << 8) | header[2];
if (len > 0) { if (len > 0) {
if (recv_exact(client_sock, payload, len) != len) if (recv_exact(client_sock, payload, len) != len)
@ -83,7 +86,7 @@ static void handle_client(int client_sock, struct sockaddr_in *addr)
} }
else if (type == MSG_METRIC_REQ) { else if (type == MSG_METRIC_REQ) {
payload[len] = '\0'; payload[len] = '\0';
double val = get_metric((char *)payload); val = get_metric((char *)payload);
printf(" <- METRIC_REQ '%s' from %s\n", (char *)payload, client_ip); printf(" <- METRIC_REQ '%s' from %s\n", (char *)payload, client_ip);
if (val < 0.0) { if (val < 0.0) {
const char *err = "Unknown metric"; const char *err = "Unknown metric";
@ -111,7 +114,6 @@ int main(void)
srand(time(NULL)); srand(time(NULL));
/* Classic Unix trick to reap children automatically (no zombies) */
signal(SIGCHLD, SIG_IGN); signal(SIGCHLD, SIG_IGN);
srv_sock = socket(AF_INET, SOCK_STREAM, 0); srv_sock = socket(AF_INET, SOCK_STREAM, 0);
@ -147,17 +149,17 @@ int main(void)
} }
pid = fork(); pid = fork();
if (pid < 0) { /* fork failed */ if (pid < 0) {
perror("fork"); perror("fork");
close(client_sock); close(client_sock);
} }
else if (pid == 0) { /* CHILD process */ else if (pid == 0) { /* CHILD */
close(srv_sock); /* child does not need the listening socket */ close(srv_sock);
handle_client(client_sock, &client_addr); handle_client(client_sock, &client_addr);
exit(0); exit(0);
} }
else { /* PARENT process */ else { /* PARENT */
close(client_sock); /* parent does not need the client socket */ close(client_sock);
} }
} }

View File

@ -0,0 +1,191 @@
/* metric_server_fork_sub.c - STRICT C89/C90 multi-client metric server with SUBSCRIBE/PUSH */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#include <signal.h>
#include <errno.h>
#define PORT 9999
#define BACKLOG 10
#define BUF_SIZE 1024
#define MSG_PING 0x01
#define MSG_PONG 0x02
#define MSG_METRIC_REQ 0x03
#define MSG_METRIC_RESP 0x04
#define MSG_SUBSCRIBE 0x05
#define MSG_PUSH 0x06
#define MSG_ERROR 0xFF
#define HEADER_SIZE 3
static double get_metric(const char *name)
{
if (strcmp(name, "cpu") == 0) return 5.0 + (rand() % 9000)/100.0;
if (strcmp(name, "memory") == 0) return 30.0 + (rand() % 5000)/100.0;
if (strcmp(name, "disk") == 0) return 40.0 + (rand() % 5000)/100.0;
if (strcmp(name, "loadavg") == 0) return 0.5 + (rand() % 350)/100.0;
if (strcmp(name, "uptime") == 0) return (double)(time(NULL) % 100000);
return -1.0;
}
static void send_message(int sock, unsigned char type,
const void *payload, unsigned short len)
{
unsigned char header[HEADER_SIZE];
header[0] = type;
header[1] = (len >> 8) & 0xFF;
header[2] = len & 0xFF;
write(sock, header, HEADER_SIZE);
if (len > 0)
write(sock, payload, len);
}
static int recv_exact(int sock, void *buf, int n)
{
int total = 0;
while (total < n) {
int r = read(sock, (char *)buf + total, n - total);
if (r <= 0) return -1;
total += r;
}
return total;
}
static void handle_client(int client_sock, struct sockaddr_in *addr)
{
unsigned char header[HEADER_SIZE];
unsigned char payload[BUF_SIZE];
char client_ip[INET_ADDRSTRLEN];
unsigned char type;
unsigned short len;
double val;
char *metric_name;
unsigned int interval_ms;
unsigned char name_len;
inet_ntop(AF_INET, &addr->sin_addr, client_ip, sizeof(client_ip));
printf("[+] Connected from %s:%d\n", client_ip, ntohs(addr->sin_port));
while (1) {
if (recv_exact(client_sock, header, HEADER_SIZE) != HEADER_SIZE)
break;
type = header[0];
len = (header[1] << 8) | header[2];
if (len > 0) {
if (recv_exact(client_sock, payload, len) != len)
break;
}
if (type == MSG_PING) {
printf(" <- PING from %s\n", client_ip);
send_message(client_sock, MSG_PONG, NULL, 0);
}
else if (type == MSG_METRIC_REQ) {
payload[len] = '\0';
val = get_metric((char *)payload);
printf(" <- METRIC_REQ '%s' from %s\n", (char *)payload, client_ip);
if (val < 0.0) {
const char *err = "Unknown metric";
send_message(client_sock, MSG_ERROR, err, strlen(err));
} else {
send_message(client_sock, MSG_METRIC_RESP, &val, sizeof(double));
}
}
else if (type == MSG_SUBSCRIBE) {
name_len = payload[0];
payload[name_len + 1] = '\0';
metric_name = (char *)payload + 1;
interval_ms = (payload[name_len + 1] << 24) |
(payload[name_len + 2] << 16) |
(payload[name_len + 3] << 8) |
payload[name_len + 4];
printf(" -> SUBSCRIBE '%s' every %u ms from %s\n", metric_name, interval_ms, client_ip);
/* Simple push loop inside this child process */
while (1) {
val = get_metric(metric_name);
if (val < 0.0) break;
name_len = strlen(metric_name);
payload[0] = name_len;
memcpy(payload + 1, metric_name, name_len);
memcpy(payload + 1 + name_len, &val, sizeof(double));
send_message(client_sock, MSG_PUSH,
payload, 1 + name_len + sizeof(double));
usleep(interval_ms * 1000);
}
}
else {
const char *err = "Unknown message type";
send_message(client_sock, MSG_ERROR, err, strlen(err));
}
}
close(client_sock);
printf("[-] Disconnected from %s\n", client_ip);
}
int main(void)
{
int srv_sock, client_sock;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
pid_t pid;
srand(time(NULL));
signal(SIGCHLD, SIG_IGN);
srv_sock = socket(AF_INET, SOCK_STREAM, 0);
if (srv_sock < 0) { perror("socket"); exit(1); }
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);
if (bind(srv_sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("bind"); exit(1);
}
if (listen(srv_sock, BACKLOG) < 0) {
perror("listen"); exit(1);
}
printf("Metric server (fork() + SUBSCRIBE/PUSH) listening on port %d\n", PORT);
printf("Available metrics: cpu, memory, disk, loadavg, uptime\n");
printf("Commands: ping, get <metric>, sub <metric> <ms>\n\n");
while (1) {
client_sock = accept(srv_sock, (struct sockaddr *)&client_addr, &client_len);
if (client_sock < 0) {
if (errno == EINTR) continue;
perror("accept");
continue;
}
pid = fork();
if (pid < 0) {
perror("fork");
close(client_sock);
}
else if (pid == 0) {
close(srv_sock);
handle_client(client_sock, &client_addr);
exit(0);
}
else {
close(client_sock);
}
}
close(srv_sock);
return 0;
}