#!/usr/bin/env python3
"""
Minimal metric server with custom binary protocol (cannibalised from Claude).
Demonstrates low-overhead sockets vs HTTP/REST.

Wire format: every message is a 3-byte header (1-byte message type followed
by a 2-byte big-endian payload length) and then exactly that many payload
bytes.
"""
import socket
import struct
import sys
import time
import os
import random
import threading

# Protocol constants (shared with client)
MSG_PING = 0x01
MSG_PONG = 0x02
MSG_METRIC_REQ = 0x03
MSG_METRIC_RESP = 0x04
MSG_ERROR = 0xFF

HEADER_FMT = "!BH"  # 1 byte type + 2 bytes length (big-endian)
HEADER_SIZE = struct.calcsize(HEADER_FMT)
MAX_PAYLOAD = 0xFFFF  # largest length the 2-byte header field can carry


def _load_average() -> float:
    """Return the 1-minute load average, or a random stand-in if unavailable."""
    getloadavg = getattr(os, "getloadavg", None)
    if getloadavg is not None:
        try:
            return getloadavg()[0]
        except OSError:
            # The platform has the call but could not obtain a value;
            # fall through to the same stand-in used when it is missing.
            pass
    return random.uniform(0.5, 4.0)


# Metric name -> zero-argument callable producing the current value.
# Built once at import time rather than on every request.
_METRICS = {
    "cpu": lambda: random.uniform(5.0, 95.0),
    "memory": lambda: random.uniform(30.0, 80.0),
    "disk": lambda: random.uniform(40.0, 90.0),
    "loadavg": _load_average,
    "uptime": lambda: float(int(time.time()) % 100000),
}


def get_metric(name: str) -> float:
    """Return the current value of the metric called *name*.

    Raises:
        KeyError: if *name* is not one of the known metrics.
    """
    fn = _METRICS.get(name)
    if fn is None:
        raise KeyError(f"Unknown metric: {name}")
    return fn()


def send_message(sock: socket.socket, msg_type: int, payload: bytes = b"") -> None:
    """Frame *payload* with a type/length header and send it on *sock*.

    The payload must not exceed MAX_PAYLOAD bytes; ``struct.pack`` raises
    ``struct.error`` if the length does not fit the header field.
    """
    header = struct.pack(HEADER_FMT, msg_type, len(payload))
    # Single sendall so header and payload leave together.
    sock.sendall(header + payload)


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly *n* bytes from *sock*.

    Returns ``b""`` if the peer closes the connection before *n* bytes have
    arrived (any partial data is discarded), and also when *n* is 0.
    """
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            return b""
        buf.extend(chunk)
    return bytes(buf)


def recv_message(sock: socket.socket) -> tuple[int, bytes]:
    """Read one framed message and return ``(msg_type, payload)``.

    Raises:
        ConnectionError: if the peer disconnects before a complete header
            or a complete payload has been received.
    """
    header = recv_exact(sock, HEADER_SIZE)
    if not header:
        raise ConnectionError("Client disconnected")
    msg_type, length = struct.unpack(HEADER_FMT, header)
    if length == 0:
        return msg_type, b""
    payload = recv_exact(sock, length)
    if len(payload) != length:
        # recv_exact signals EOF with b""; without this check a truncated
        # message would be mistaken for one with an empty payload.
        raise ConnectionError("Client disconnected mid-message")
    return msg_type, payload


def _send_error(sock: socket.socket, text: str) -> None:
    """Send *text* as a MSG_ERROR, truncated to fit the length field."""
    data = text.encode("utf-8")
    if len(data) > MAX_PAYLOAD:
        # Cut at the byte limit, then drop any split trailing character so
        # the client can still decode the message.
        data = data[:MAX_PAYLOAD].decode("utf-8", "ignore").encode("utf-8")
    send_message(sock, MSG_ERROR, data)


def handle_client(conn: socket.socket, addr: tuple) -> None:
    """Serve one client connection until it disconnects.

    Answers PING with PONG, METRIC_REQ with METRIC_RESP (an 8-byte
    big-endian double) or ERROR, and any other message type with ERROR.
    The connection is always closed on exit.
    """
    print(f"[+] Connected: {addr[0]}:{addr[1]}")
    try:
        while True:
            msg_type, payload = recv_message(conn)
            if msg_type == MSG_PING:
                print(f"  <- PING from {addr[0]}")
                send_message(conn, MSG_PONG)
            elif msg_type == MSG_METRIC_REQ:
                try:
                    name = payload.decode("utf-8")
                except UnicodeDecodeError:
                    # Malformed input is the client's fault; report it
                    # instead of letting the handler thread crash.
                    _send_error(conn, "Invalid UTF-8 in metric name")
                    continue
                print(f"  <- METRIC_REQ: {name}")
                try:
                    value = get_metric(name)
                except KeyError as e:
                    _send_error(conn, str(e))
                else:
                    send_message(conn, MSG_METRIC_RESP, struct.pack("!d", value))
            else:
                _send_error(conn, f"Unknown type 0x{msg_type:02X}")
    except ConnectionError:
        # Normal termination: the peer went away.
        pass
    finally:
        conn.close()
        print(f"[-] Disconnected: {addr[0]}:{addr[1]}")


def main():
    """Run the server; usage: ``script [host [port]]`` (default 127.0.0.1:9999)."""
    host = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 9999

    # The context manager closes the listening socket even if bind/listen fail.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((host, port))
        srv.listen(5)
        print(f"Metric server listening on {host}:{port}")
        print("Available metrics: cpu, memory, disk, loadavg, uptime\n")

        try:
            while True:
                conn, addr = srv.accept()
                # Daemon threads so Ctrl-C is not blocked by open connections.
                threading.Thread(
                    target=handle_client, args=(conn, addr), daemon=True
                ).start()
        except KeyboardInterrupt:
            print("\nShutting down.")


if __name__ == "__main__":
    main()