From 74656cb95256a8e6c770929b9c4f857769d8a1ee Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Mon, 18 May 2026 18:34:13 +0800 Subject: [PATCH 1/5] fix(http-proxy): drain large m3u rewrite bodies --- e2e/test_http_proxy_m3u_rewrite.py | 28 ++++++++++++++++++++++++++++ src/http_proxy.c | 4 +++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/e2e/test_http_proxy_m3u_rewrite.py b/e2e/test_http_proxy_m3u_rewrite.py index 0d258ad..1703697 100644 --- a/e2e/test_http_proxy_m3u_rewrite.py +++ b/e2e/test_http_proxy_m3u_rewrite.py @@ -13,6 +13,7 @@ R2HProcess, find_free_port, http_get, + stream_get, ) pytestmark = pytest.mark.http_proxy @@ -571,6 +572,33 @@ def test_mixed_absolute_and_relative(self, shared_r2h): finally: upstream.stop() + def test_large_playlist_body_is_fully_buffered(self, shared_r2h): + """A large M3U body should be fully read before rewriting.""" + segment_count = 4096 + segments = "".join("#EXTINF:10,\nsegment-%04d.ts?token=abcdef0123456789\n" % i for i in range(segment_count)) + m3u = "#EXTM3U\n#EXT-X-TARGETDURATION:10\n" + segments + "#EXT-X-ENDLIST\n" + upstream = _make_m3u_upstream("/lookback/long.m3u8", m3u) + try: + status, hdrs, body = stream_get( + "127.0.0.1", + shared_r2h.port, + f"/http/127.0.0.1:{upstream.port}/lookback/long.m3u8", + read_bytes=512 * 1024, + timeout=_TIMEOUT, + ) + text = body.decode("utf-8", errors="replace") + assert status == 200 + assert f"/http/127.0.0.1:{upstream.port}/lookback/segment-0000.ts?token=abcdef0123456789" in text + assert ( + f"/http/127.0.0.1:{upstream.port}/lookback/segment-{segment_count - 1:04d}.ts?token=abcdef0123456789" + in text + ) + cl = hdrs.get("content-length") + assert cl is not None + assert int(cl) == len(body) + finally: + upstream.stop() + # --------------------------------------------------------------------------- # Edge cases diff --git a/src/http_proxy.c b/src/http_proxy.c index 52f85bc..ee27780 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -805,7 +805,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { return http_proxy_finalize_rewrite(session); } - return 0; /* Keep buffering */ + return (int)received; /* Progress: keep draining edge-triggered sockets */ } /* Phase 2: Zero-copy streaming - recv directly to buffer pool */ @@ -930,6 +930,8 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { logger(LOG_DEBUG, "HTTP Proxy: All M3U content received with headers (%zd bytes)", session->bytes_received); return http_proxy_finalize_rewrite(session); } + + bytes_forwarded = (int)initial_size; } else { /* Normal mode: forward immediately */ if (connection_queue_output(session->conn, session->response_buffer, session->response_buffer_pos) < 0) { From 2ab468794b746d9f4b3dbfa9d03f113d85c32443 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Mon, 18 May 2026 18:52:36 +0800 Subject: [PATCH 2/5] fix(polling): drain edge-triggered socket paths --- e2e/test_error.py | 45 ++++++++++ src/connection.c | 5 ++ src/fcc.c | 6 +- src/http_proxy.c | 2 +- src/multicast.c | 6 +- src/rtsp.c | 217 +++++++++++++++++++++++++++------------------- src/utils.c | 23 +++++ src/utils.h | 8 ++ 8 files changed, 217 insertions(+), 95 deletions(-) diff --git a/e2e/test_error.py b/e2e/test_error.py index 1b513d1..a95ea5c 100644 --- a/e2e/test_error.py +++ b/e2e/test_error.py @@ -186,6 +186,51 @@ def test_very_long_url(self, basic_r2h): status, _, _ = http_get("127.0.0.1", basic_r2h.port, "/status") assert status == 200 + def _send_raw_request(self, port: int, request: bytes, timeout: float = 2.0) -> tuple[bytes, bool]: + sock = socket.create_connection(("127.0.0.1", port), timeout=timeout) + sock.settimeout(timeout) + try: + sock.sendall(request) + data = b"" + while True: + try: + chunk = sock.recv(4096) + except socket.timeout: + return data, True + if not chunk: + return data, False + data += chunk + if b"\r\n\r\n" in data: + return data, False + finally: + sock.close() + + def test_oversized_incomplete_request_line_fails_deterministically(self, r2h_binary): + """An oversized request line without CRLF should not spin forever.""" + port = find_free_port() + r2h = R2HProcess(r2h_binary, port, extra_args=["-v", "4", "-m", "100"]) + try: + r2h.start() + request = b"GET /" + (b"a" * 9000) + data, timed_out = self._send_raw_request(port, request) + assert not timed_out + assert data == b"" or b"400 Bad Request" in data + finally: + r2h.stop() + + def test_oversized_incomplete_header_fails_deterministically(self, r2h_binary): + """Oversized headers without the terminating CRLF should not spin forever.""" + port = find_free_port() + r2h = R2HProcess(r2h_binary, port, extra_args=["-v", "4", "-m", "100"]) + try: + r2h.start() + request = b"GET /status HTTP/1.1\r\nHost: 127.0.0.1\r\nX-Fill: " + (b"a" * 9000) + data, timed_out = self._send_raw_request(port, request) + assert not timed_out + assert data == b"" or b"400 Bad Request" in data + finally: + r2h.stop() + # --------------------------------------------------------------------------- # Concurrent connections diff --git a/src/connection.c b/src/connection.c index 90ac037..8a02fae 100644 --- a/src/connection.c +++ b/src/connection.c @@ -711,6 +711,11 @@ void connection_handle_read(connection_t *c) { c->state = CONN_CLOSING; return; } + if (c->in_len == INBUF_SIZE) { + logger(LOG_WARN, "HTTP request exceeded input buffer before headers completed"); + http_send_400(c); + return; + } /* else parse_result == 0: need more data, keep reading */ } else { return; /* Not in a request-reading state */ diff --git a/src/fcc.c b/src/fcc.c index 1bb4366..a1a17c6 100644 --- a/src/fcc.c +++ b/src/fcc.c @@ -421,9 +421,9 @@ int fcc_handle_socket_event(stream_context_t *ctx, int fd, int64_t now) { /* Buffer pool exhausted - drop this packet */ logger(LOG_DEBUG, "FCC: Buffer pool exhausted, dropping packet"); fcc->last_data_time = now; - /* Drain the socket to prevent event loop spinning */ - uint8_t dummy[BUFFER_POOL_BUFFER_SIZE]; - recvfrom(recv_sock, dummy, sizeof(dummy), 0, NULL, NULL); + if (drain_socket_until_eagain(recv_sock) < 0) { + logger(LOG_ERROR, "FCC: Drain failed: %s", strerror(errno)); + } return 0; } diff --git a/src/http_proxy.c b/src/http_proxy.c index ee27780..c9c333c 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -899,7 +899,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { return -1; } if (result == 0) { - return 0; /* Need more data for headers */ + return (int)received; /* Progress: keep draining edge-triggered sockets */ } /* result > 0 means headers complete, state is now STREAMING */ } diff --git a/src/multicast.c b/src/multicast.c index ffe291f..196bf2d 100644 --- a/src/multicast.c +++ b/src/multicast.c @@ -512,9 +512,9 @@ int mcast_session_handle_event(mcast_session_t *session, stream_context_t *ctx, if (!recv_buf) { logger(LOG_DEBUG, "Multicast: Buffer pool exhausted, dropping packet"); session->last_data_time = now; - /* Drain socket to prevent event loop spinning */ - uint8_t dummy[BUFFER_POOL_BUFFER_SIZE]; - recv(session->sock, dummy, sizeof(dummy), 0); + if (drain_socket_until_eagain(session->sock) < 0) { + logger(LOG_DEBUG, "Multicast: Drain failed: %s", strerror(errno)); + } return 0; } diff --git a/src/rtsp.c b/src/rtsp.c index 604fed0..c72d392 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -35,6 +35,7 @@ static const char rtsp_default_user_agent[] = "rtp2httpd/" VERSION; #define RTSP_RESPONSE_ADVANCE 1 #define RTSP_RESPONSE_KEEPALIVE 2 #define RTSP_RESPONSE_DURATION 3 +#define RTSP_RESPONSE_PROGRESS 4 #define RTSP_RESPONSE_ERROR -1 /* Helper function prototypes */ static int rtsp_prepare_request(rtsp_session_t *session, const char *method, const char *url, @@ -51,6 +52,7 @@ static void rtsp_send_udp_nat_probe(rtsp_session_t *session); static int rtsp_process_interleaved_buffer(rtsp_session_t *session, connection_t *conn); static int rtsp_handle_redirect(rtsp_session_t *session, const char *location); static void rtsp_parse_describe_sdp(rtsp_session_t *session, const char *header_start, const char *sdp_body); +static int rtsp_handle_terminal_socket_event(rtsp_session_t *session, int is_error); static int rtsp_initiate_teardown(rtsp_session_t *session); static int rtsp_reconnect_for_teardown(rtsp_session_t *session); static void rtsp_force_cleanup(rtsp_session_t *session); @@ -816,46 +818,47 @@ int rtsp_connect(rtsp_session_t *session) { return 0; } -int rtsp_handle_socket_event(rtsp_session_t *session, uint32_t events) { - int result; +static int rtsp_handle_terminal_socket_event(rtsp_session_t *session, int is_error) { + if (!is_error) { + logger(LOG_INFO, "RTSP: Server closed connection"); + } - /* Check for connection errors or hangup */ - if (events & (POLLER_HUP | POLLER_ERR | POLLER_RDHUP)) { - if (events & POLLER_ERR) { - int sock_error = 0; - socklen_t error_len = sizeof(sock_error); - if (getsockopt(session->socket, SOL_SOCKET, SO_ERROR, &sock_error, &error_len) == 0 && sock_error != 0) { - logger(LOG_ERROR, "RTSP: Socket error: %s", strerror(sock_error)); - } else { - logger(LOG_ERROR, "RTSP: Socket error event received"); - } - } else if (events & (POLLER_HUP | POLLER_RDHUP)) { - logger(LOG_INFO, "RTSP: Server closed connection"); - } + /* Some servers close instead of sending a TEARDOWN response. */ + if (session->state == RTSP_STATE_SENDING_TEARDOWN || session->state == RTSP_STATE_AWAITING_TEARDOWN) { + logger(LOG_DEBUG, "RTSP: Server closed connection during TEARDOWN (acceptable)"); + rtsp_force_cleanup(session); + return -1; + } - /* If we're in TEARDOWN states, server closing connection is acceptable - * (some servers don't send TEARDOWN response before closing) */ - if (session->state == RTSP_STATE_SENDING_TEARDOWN || session->state == RTSP_STATE_AWAITING_TEARDOWN) { - logger(LOG_DEBUG, "RTSP: Server closed connection during TEARDOWN (acceptable)"); - rtsp_force_cleanup(session); - return -1; - } + if (session->state == RTSP_STATE_PLAYING) { + logger(LOG_INFO, "RTSP: Upstream closed during PLAYING, draining client"); + rtsp_force_cleanup(session); + connection_begin_drain_close(session->conn); + return 0; + } - /* During PLAYING: upstream is done — drain pending client output - * before disconnecting regardless of error/hangup distinction. */ - if (session->state == RTSP_STATE_PLAYING) { - logger(LOG_INFO, "RTSP: Upstream closed during PLAYING, draining client"); - rtsp_force_cleanup(session); - if (session->conn && session->conn->state != CONN_CLOSING) { - session->conn->state = CONN_CLOSING; - connection_epoll_update_events(session->conn->epfd, session->conn->fd, - POLLER_IN | POLLER_OUT | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); - } - return 0; + rtsp_session_set_state(session, RTSP_STATE_ERROR); + return -1; +} + +int rtsp_handle_socket_event(rtsp_session_t *session, uint32_t events) { + int result; + + /* Keep real socket errors strict; plain hangup is deferred below when + * readable data arrived in the same edge-triggered event. */ + if (events & POLLER_ERR) { + int sock_error = 0; + socklen_t error_len = sizeof(sock_error); + if (getsockopt(session->socket, SOL_SOCKET, SO_ERROR, &sock_error, &error_len) == 0 && sock_error != 0) { + logger(LOG_ERROR, "RTSP: Socket error: %s", strerror(sock_error)); + } else { + logger(LOG_ERROR, "RTSP: Socket error event received"); } + return rtsp_handle_terminal_socket_event(session, 1); + } - rtsp_session_set_state(session, RTSP_STATE_ERROR); - return -1; /* Connection closed or error */ + if ((events & (POLLER_HUP | POLLER_RDHUP)) && !(events & POLLER_IN)) { + return rtsp_handle_terminal_socket_event(session, 0); } /* Handle connection completion (both initial and reconnect for TEARDOWN) */ @@ -944,65 +947,81 @@ int rtsp_handle_socket_event(rtsp_session_t *session, uint32_t events) { /* Handle readable socket - try to receive response */ if (events & POLLER_IN) { if (session->awaiting_response) { - int response_result = rtsp_try_receive_response(session); - if (response_result < 0) { - logger(LOG_ERROR, "RTSP: Failed to receive response"); - rtsp_session_set_state(session, RTSP_STATE_ERROR); - return -1; - } - - if (response_result == RTSP_RESPONSE_DURATION) { - return -2; - } - - /* Re-enable POLLER_OUT for next request */ - if (response_result == RTSP_RESPONSE_ADVANCE && session->epoll_fd >= 0) { - if (poller_mod(session->epoll_fd, session->socket, - POLLER_IN | POLLER_OUT | POLLER_HUP | POLLER_ERR | POLLER_RDHUP) < 0) { - logger(LOG_ERROR, "RTSP: Failed to modify poller events: %s", strerror(errno)); + for (;;) { + int response_result = rtsp_try_receive_response(session); + if (response_result < 0) { + logger(LOG_ERROR, "RTSP: Failed to receive response"); rtsp_session_set_state(session, RTSP_STATE_ERROR); return -1; } - } - if (response_result == RTSP_RESPONSE_KEEPALIVE) { - /* For TCP mode, process any preserved interleaved data in buffer - * (without recv - just drain what's already buffered) */ - if (session->transport_mode == RTSP_TRANSPORT_TCP && session->response_buffer_pos > 0 && session->conn) { - result = rtsp_process_interleaved_buffer(session, session->conn); - if (result < 0) { + if (response_result == RTSP_RESPONSE_PROGRESS) { + continue; + } + + if (response_result == RTSP_RESPONSE_DURATION) { + return -2; + } + + /* Re-enable POLLER_OUT for next request */ + if (response_result == RTSP_RESPONSE_ADVANCE && session->epoll_fd >= 0) { + if (poller_mod(session->epoll_fd, session->socket, + POLLER_IN | POLLER_OUT | POLLER_HUP | POLLER_ERR | POLLER_RDHUP) < 0) { + logger(LOG_ERROR, "RTSP: Failed to modify poller events: %s", strerror(errno)); rtsp_session_set_state(session, RTSP_STATE_ERROR); return -1; } - return result; } - return 0; /* Keepalive handled */ - } - /* If response is still incomplete (return 0 with awaiting_response - * still set), just return - don't advance state machine. This happens - * in TCP mode when interleaved data was drained but the RTSP response - * hasn't arrived yet. */ - if (session->awaiting_response) { - return 0; - } + if (response_result == RTSP_RESPONSE_KEEPALIVE) { + /* For TCP mode, process any preserved interleaved data in buffer + * (without recv - just drain what's already buffered) */ + if (session->transport_mode == RTSP_TRANSPORT_TCP && session->response_buffer_pos > 0 && session->conn) { + result = rtsp_process_interleaved_buffer(session, session->conn); + if (result < 0) { + rtsp_session_set_state(session, RTSP_STATE_ERROR); + return -1; + } + if (events & (POLLER_HUP | POLLER_RDHUP)) { + return rtsp_handle_terminal_socket_event(session, 0); + } + return result; + } + if (events & (POLLER_HUP | POLLER_RDHUP)) { + return rtsp_handle_terminal_socket_event(session, 0); + } + return 0; /* Keepalive handled */ + } - /* Advance state machine to prepare next request (or enter PLAYING state) - */ - result = rtsp_state_machine_advance(session); - if (result < 0) { - if (result == -2) + /* If response is still incomplete after draining to EAGAIN, wait for + * the next edge. */ + if (session->awaiting_response) { + if (events & (POLLER_HUP | POLLER_RDHUP)) { + return rtsp_handle_terminal_socket_event(session, 0); + } + return 0; + } + + /* Advance state machine to prepare next request (or enter PLAYING + * state). */ + result = rtsp_state_machine_advance(session); + if (result < 0) { + if (result == -2) + return -1; + rtsp_session_set_state(session, RTSP_STATE_ERROR); return -1; - rtsp_session_set_state(session, RTSP_STATE_ERROR); - return -1; - } + } - /* For edge-triggered pollers: after transitioning to PLAYING, fall - * through to process any preserved RTP data in the response buffer - * and drain remaining socket data. Without this, the preserved data - * would sit unprocessed until a new edge arrives. */ - if (session->state != RTSP_STATE_PLAYING) { - return result; + /* For edge-triggered pollers: after transitioning to PLAYING, fall + * through to process any preserved RTP data in the response buffer + * and drain remaining socket data. */ + if (session->state != RTSP_STATE_PLAYING) { + if (events & (POLLER_HUP | POLLER_RDHUP)) { + return rtsp_handle_terminal_socket_event(session, 0); + } + return result; + } + break; } } @@ -1020,10 +1039,17 @@ int rtsp_handle_socket_event(rtsp_session_t *session, uint32_t events) { } return 0; } + if (events & (POLLER_HUP | POLLER_RDHUP)) { + return rtsp_handle_terminal_socket_event(session, 0); + } return result; /* Return number of bytes forwarded to client */ } } + if (events & (POLLER_HUP | POLLER_RDHUP)) { + return rtsp_handle_terminal_socket_event(session, 0); + } + /* Only advance state machine on initial connection or after response received */ /* For SENDING_* states, just wait for POLLER_OUT to complete the send */ @@ -1222,10 +1248,13 @@ static int rtsp_try_send_pending(rtsp_session_t *session) { * RTSP_RESPONSE_ADVANCE: Response complete, re-enable POLLER_OUT * RTSP_RESPONSE_KEEPALIVE: Keepalive response handled * RTSP_RESPONSE_OK: Waiting for more data, or response processed internally + * RTSP_RESPONSE_PROGRESS: Bytes/buffered data consumed, response incomplete * RTSP_RESPONSE_ERROR: recv failure, connection closed, or parse error * RTSP_RESPONSE_DURATION: Duration query completed */ static int rtsp_try_receive_response(rtsp_session_t *session) { + int made_progress = 0; + if (!session->awaiting_response) { return RTSP_RESPONSE_OK; } @@ -1252,6 +1281,7 @@ static int rtsp_try_receive_response(rtsp_session_t *session) { } session->response_buffer_pos += (size_t)received; + made_progress = 1; } /* NUL-terminate for strstr-based parsing. Clamp to buffer bounds since @@ -1283,6 +1313,9 @@ static int rtsp_try_receive_response(rtsp_session_t *session) { session->awaiting_keepalive_response = 0; return RTSP_RESPONSE_ERROR; } + if (drain_result > 0) { + made_progress = 1; + } /* Buffer contents changed - skip stale response_offset logic */ } else if (response_offset > 0 && response_offset != session->response_buffer_pos) { /* Move RTSP response start to buffer beginning to make room */ @@ -1291,6 +1324,7 @@ static int rtsp_try_receive_response(rtsp_session_t *session) { session->response_buffer_pos = remaining; session->response_buffer[session->response_buffer_pos] = '\0'; logger(LOG_DEBUG, "RTSP: Moved incomplete RTSP header to buffer start"); + made_progress = 1; } else if (session->response_buffer_pos >= sizeof(session->response_buffer) - 1) { /* Buffer full but no valid RTSP header - try to resync to next '$' * marker or RTSP response to minimize data loss */ @@ -1300,16 +1334,18 @@ static int rtsp_try_receive_response(rtsp_session_t *session) { memmove(session->response_buffer, next_dollar, session->response_buffer_pos - skip); session->response_buffer_pos -= skip; logger(LOG_DEBUG, "RTSP: Buffer full, resynced to '$' marker (skipped %zu bytes)", skip); + made_progress = 1; } else { logger(LOG_DEBUG, "RTSP: Buffer full with no RTSP header or '$' marker, " "discarding %zu bytes", session->response_buffer_pos); session->response_buffer_pos = 0; + made_progress = 1; } } /* Wait for more data */ - return RTSP_RESPONSE_OK; + return made_progress ? RTSP_RESPONSE_PROGRESS : RTSP_RESPONSE_OK; } /* Complete response received */ @@ -1870,7 +1906,8 @@ int rtsp_handle_udp_rtp_data(rtsp_session_t *session, connection_t *conn) { } } } - return 0; /* STUN packet consumed, not RTP data */ + /* STUN consumed socket data. Continue below so edge-triggered polling + * also drains any queued RTP datagrams behind it. */ } } @@ -1882,10 +1919,14 @@ int rtsp_handle_udp_rtp_data(rtsp_session_t *session, connection_t *conn) { if (!rtp_buf) { /* Buffer pool exhausted - drop this packet */ logger(LOG_DEBUG, "RTSP UDP: Buffer pool exhausted, dropping packet"); - session->packets_dropped++; - /* Drain the socket to prevent event loop spinning */ - uint8_t dummy[BUFFER_POOL_BUFFER_SIZE]; - recv(session->rtp_socket, dummy, sizeof(dummy), 0); + int drained = drain_socket_until_eagain(session->rtp_socket); + if (drained > 0) { + session->packets_dropped += (uint64_t)drained; + } else { + session->packets_dropped++; + if (drained < 0) + logger(LOG_ERROR, "RTSP: RTP drain failed: %s", strerror(errno)); + } return total_bytes_written; } diff --git a/src/utils.c b/src/utils.c index 3adbaa1..af4d613 100644 --- a/src/utils.c +++ b/src/utils.c @@ -148,6 +148,29 @@ void bind_to_upstream_interface(int sock, const char *ifname) { } } +int drain_socket_until_eagain(int fd) { + uint8_t dummy[2048]; + int reads = 0; + + for (;;) { + ssize_t nread = recv(fd, dummy, sizeof(dummy), 0); + if (nread > 0) { + reads++; + continue; + } + if (nread == 0) { + return reads; + } + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return reads; + } + return -1; + } +} + const char *get_upstream_interface_for_fcc(const char *override, const char *override_fcc) { /* Priority: override_fcc > override > upstream_interface_fcc > * upstream_interface */ diff --git a/src/utils.h b/src/utils.h index cab5565..917f9be 100644 --- a/src/utils.h +++ b/src/utils.h @@ -66,6 +66,14 @@ int set_socket_rcvbuf(int fd, int size); */ void bind_to_upstream_interface(int sock, const char *ifname); +/** + * Drain readable data from a non-blocking socket until EAGAIN/EWOULDBLOCK. + * + * @param fd Socket file descriptor + * @return Number of successful reads, or -1 on non-retryable recv error + */ +int drain_socket_until_eagain(int fd); + /** * Select the appropriate upstream interface for FCC with priority logic * Priority: override_fcc > override > upstream_interface_fcc > From f5b918d16f5d2ee671f8987f0633db965b8115fe Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Mon, 18 May 2026 18:54:03 +0800 Subject: [PATCH 3/5] fix(http-proxy): keep draining after header reads --- e2e/test_http_proxy_m3u_rewrite.py | 89 +++++++++++++++++++++++++++++- src/http_proxy.c | 5 +- 2 files changed, 89 insertions(+), 5 deletions(-) diff --git a/e2e/test_http_proxy_m3u_rewrite.py b/e2e/test_http_proxy_m3u_rewrite.py index 1703697..79b1a4b 100644 --- a/e2e/test_http_proxy_m3u_rewrite.py +++ b/e2e/test_http_proxy_m3u_rewrite.py @@ -6,6 +6,9 @@ proxy. These tests verify the rewriting logic end-to-end. """ +import socket +import threading + import pytest from helpers import ( @@ -19,6 +22,7 @@ pytestmark = pytest.mark.http_proxy _TIMEOUT = 5.0 +_HEADER_PARSE_READ_SIZE = 8191 # HTTP_PROXY_RESPONSE_BUFFER_SIZE - 1 # --------------------------------------------------------------------------- @@ -67,6 +71,81 @@ def _make_m3u_upstream(path, body, content_type="application/vnd.apple.mpegurl") return upstream +class _RawHTTPResponseUpstream: + """Serve a prebuilt raw HTTP response and keep the connection open.""" + + def __init__(self, response): + self.port = find_free_port() + self.response = response + self._server_sock = None + self._thread = None + self._stop = threading.Event() + self._client_threads = [] + + def start(self): + self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._server_sock.bind(("127.0.0.1", self.port)) + self._server_sock.listen(5) + self._server_sock.settimeout(0.5) + self._thread = threading.Thread(target=self._accept, daemon=True) + self._thread.start() + + def stop(self): + self._stop.set() + if self._server_sock: + self._server_sock.close() + for thread in self._client_threads: + thread.join(timeout=1) + if self._thread: + self._thread.join(timeout=3) + + def _accept(self): + assert self._server_sock is not None + while not self._stop.is_set(): + try: + conn, _ = self._server_sock.accept() + except socket.timeout: + continue + except OSError: + break + thread = threading.Thread(target=self._handle, args=(conn,), daemon=True) + self._client_threads.append(thread) + thread.start() + + def _handle(self, conn): + try: + conn.settimeout(1.0) + request = b"" + while b"\r\n\r\n" not in request: + chunk = conn.recv(1024) + if not chunk: + return + request += chunk + conn.sendall(self.response) + self._stop.wait(_TIMEOUT * 2) + except OSError: + pass + finally: + conn.close() + + +def _make_padded_header_m3u_upstream(body, content_type="application/vnd.apple.mpegurl"): + """Create an upstream whose first proxy header read contains no body bytes.""" + if isinstance(body, str): + body = body.encode() + prefix = (f"HTTP/1.1 200 OK\r\nContent-Type: {content_type}\r\nContent-Length: {len(body)}\r\nX-Pad: ").encode() + suffix = b"\r\n\r\n" + pad_len = _HEADER_PARSE_READ_SIZE - len(prefix) - len(suffix) + assert pad_len > 0 + headers = prefix + (b"a" * pad_len) + suffix + assert len(headers) == _HEADER_PARSE_READ_SIZE + response = headers + body + upstream = _RawHTTPResponseUpstream(response) + upstream.start() + return upstream + + # --------------------------------------------------------------------------- # Basic absolute http:// URL rewriting # --------------------------------------------------------------------------- @@ -572,12 +651,16 @@ def test_mixed_absolute_and_relative(self, shared_r2h): finally: upstream.stop() - def test_large_playlist_body_is_fully_buffered(self, shared_r2h): - """A large M3U body should be fully read before rewriting.""" + @pytest.mark.parametrize("upstream_mode", ["normal_headers", "padded_header_only"], ids=["normal", "header-only"]) + def test_large_playlist_body_is_fully_buffered(self, shared_r2h, upstream_mode): + """A large M3U body should be fully read after header parsing.""" segment_count = 4096 segments = "".join("#EXTINF:10,\nsegment-%04d.ts?token=abcdef0123456789\n" % i for i in range(segment_count)) m3u = "#EXTM3U\n#EXT-X-TARGETDURATION:10\n" + segments + "#EXT-X-ENDLIST\n" - upstream = _make_m3u_upstream("/lookback/long.m3u8", m3u) + if upstream_mode == "normal_headers": + upstream = _make_m3u_upstream("/lookback/long.m3u8", m3u) + else: + upstream = _make_padded_header_m3u_upstream(m3u) try: status, hdrs, body = stream_get( "127.0.0.1", diff --git a/src/http_proxy.c b/src/http_proxy.c index ee27780..7a61e69 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -891,6 +891,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { } session->response_buffer_pos += received; + int socket_progress = (int)received; /* Try to parse headers */ if (!session->headers_received) { @@ -899,7 +900,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { return -1; } if (result == 0) { - return 0; /* Need more data for headers */ + return socket_progress; /* Progress: keep draining edge-triggered sockets */ } /* result > 0 means headers complete, state is now STREAMING */ } @@ -951,7 +952,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { } } - return bytes_forwarded; + return bytes_forwarded > 0 ? bytes_forwarded : socket_progress; } /* Check if status code is a redirect that may have Location header */ From 639499f69687ab0e25147cefcdb54a6b83f0e89c Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Mon, 18 May 2026 19:20:16 +0800 Subject: [PATCH 4/5] fix(polling): clarify datagram drain semantics --- src/fcc.c | 2 +- src/multicast.c | 2 +- src/rtsp.c | 9 +++++---- src/utils.c | 7 ++----- src/utils.h | 12 ++++++++---- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/fcc.c b/src/fcc.c index a1a17c6..2e70497 100644 --- a/src/fcc.c +++ b/src/fcc.c @@ -421,7 +421,7 @@ int fcc_handle_socket_event(stream_context_t *ctx, int fd, int64_t now) { /* Buffer pool exhausted - drop this packet */ logger(LOG_DEBUG, "FCC: Buffer pool exhausted, dropping packet"); fcc->last_data_time = now; - if (drain_socket_until_eagain(recv_sock) < 0) { + if (drain_datagram_socket_until_eagain(recv_sock) < 0) { logger(LOG_ERROR, "FCC: Drain failed: %s", strerror(errno)); } return 0; diff --git a/src/multicast.c b/src/multicast.c index 196bf2d..87b69ea 100644 --- a/src/multicast.c +++ b/src/multicast.c @@ -512,7 +512,7 @@ int mcast_session_handle_event(mcast_session_t *session, stream_context_t *ctx, if (!recv_buf) { logger(LOG_DEBUG, "Multicast: Buffer pool exhausted, dropping packet"); session->last_data_time = now; - if (drain_socket_until_eagain(session->sock) < 0) { + if (drain_datagram_socket_until_eagain(session->sock) < 0) { logger(LOG_DEBUG, "Multicast: Drain failed: %s", strerror(errno)); } return 0; diff --git a/src/rtsp.c b/src/rtsp.c index c72d392..1f0022d 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -1919,13 +1919,14 @@ int rtsp_handle_udp_rtp_data(rtsp_session_t *session, connection_t *conn) { if (!rtp_buf) { /* Buffer pool exhausted - drop this packet */ logger(LOG_DEBUG, "RTSP UDP: Buffer pool exhausted, dropping packet"); - int drained = drain_socket_until_eagain(session->rtp_socket); - if (drained > 0) { + int drained = drain_datagram_socket_until_eagain(session->rtp_socket); + if (drained >= 0) { + /* The drain count includes the first queued datagram that could not + * get a buffer plus any additional datagrams dropped behind it. */ session->packets_dropped += (uint64_t)drained; } else { session->packets_dropped++; - if (drained < 0) - logger(LOG_ERROR, "RTSP: RTP drain failed: %s", strerror(errno)); + logger(LOG_ERROR, "RTSP: RTP drain failed: %s", strerror(errno)); } return total_bytes_written; } diff --git a/src/utils.c b/src/utils.c index af4d613..e5493cc 100644 --- a/src/utils.c +++ b/src/utils.c @@ -148,19 +148,16 @@ void bind_to_upstream_interface(int sock, const char *ifname) { } } -int drain_socket_until_eagain(int fd) { +int drain_datagram_socket_until_eagain(int fd) { uint8_t dummy[2048]; int reads = 0; for (;;) { ssize_t nread = recv(fd, dummy, sizeof(dummy), 0); - if (nread > 0) { + if (nread >= 0) { reads++; continue; } - if (nread == 0) { - return reads; - } if (errno == EINTR) { continue; } diff --git a/src/utils.h b/src/utils.h index 917f9be..e0c8a1a 100644 --- a/src/utils.h +++ b/src/utils.h @@ -67,12 +67,16 @@ int set_socket_rcvbuf(int fd, int size); void bind_to_upstream_interface(int sock, const char *ifname); /** - * Drain readable data from a non-blocking socket until EAGAIN/EWOULDBLOCK. + * Drain readable datagrams from a non-blocking socket until + * EAGAIN/EWOULDBLOCK. * - * @param fd Socket file descriptor - * @return Number of successful reads, or -1 on non-retryable recv error + * Intended for UDP/datagram sockets only. A zero-byte recv is a valid + * zero-length datagram and is counted as one drained datagram. + * + * @param fd Datagram socket file descriptor + * @return Number of drained datagrams, or -1 on non-retryable recv error */ -int drain_socket_until_eagain(int fd); +int drain_datagram_socket_until_eagain(int fd); /** * Select the appropriate upstream interface for FCC with priority logic From 120fdff84ff8193aaf62422a5b8d58639964f906 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Mon, 18 May 2026 19:28:44 +0800 Subject: [PATCH 5/5] test(http): require oversized request 400 --- e2e/test_error.py | 4 ++-- src/connection.c | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/e2e/test_error.py b/e2e/test_error.py index a95ea5c..b4a6676 100644 --- a/e2e/test_error.py +++ b/e2e/test_error.py @@ -214,7 +214,7 @@ def test_oversized_incomplete_request_line_fails_deterministically(self, r2h_bin request = b"GET /" + (b"a" * 9000) data, timed_out = self._send_raw_request(port, request) assert not timed_out - assert data == b"" or b"400 Bad Request" in data + assert b"400 Bad Request" in data finally: r2h.stop() @@ -227,7 +227,7 @@ def test_oversized_incomplete_header_fails_deterministically(self, r2h_binary): request = b"GET /status HTTP/1.1\r\nHost: 127.0.0.1\r\nX-Fill: " + (b"a" * 9000) data, timed_out = self._send_raw_request(port, request) assert not timed_out - assert data == b"" or b"400 Bad Request" in data + assert b"400 Bad Request" in data finally: r2h.stop() diff --git a/src/connection.c b/src/connection.c index 8a02fae..7698689 100644 --- a/src/connection.c +++ b/src/connection.c @@ -714,6 +714,7 @@ void connection_handle_read(connection_t *c) { if (c->in_len == INBUF_SIZE) { logger(LOG_WARN, "HTTP request exceeded input buffer before headers completed"); http_send_400(c); + c->state = CONN_CLOSING; return; } /* else parse_result == 0: need more data, keep reading */