From 74656cb95256a8e6c770929b9c4f857769d8a1ee Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Mon, 18 May 2026 18:34:13 +0800 Subject: [PATCH 1/3] fix(http-proxy): drain large m3u rewrite bodies --- e2e/test_http_proxy_m3u_rewrite.py | 28 ++++++++++++++++++++++++++++ src/http_proxy.c | 4 +++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/e2e/test_http_proxy_m3u_rewrite.py b/e2e/test_http_proxy_m3u_rewrite.py index 0d258ad..1703697 100644 --- a/e2e/test_http_proxy_m3u_rewrite.py +++ b/e2e/test_http_proxy_m3u_rewrite.py @@ -13,6 +13,7 @@ R2HProcess, find_free_port, http_get, + stream_get, ) pytestmark = pytest.mark.http_proxy @@ -571,6 +572,33 @@ def test_mixed_absolute_and_relative(self, shared_r2h): finally: upstream.stop() + def test_large_playlist_body_is_fully_buffered(self, shared_r2h): + """A large M3U body should be fully read before rewriting.""" + segment_count = 4096 + segments = "".join("#EXTINF:10,\nsegment-%04d.ts?token=abcdef0123456789\n" % i for i in range(segment_count)) + m3u = "#EXTM3U\n#EXT-X-TARGETDURATION:10\n" + segments + "#EXT-X-ENDLIST\n" + upstream = _make_m3u_upstream("/lookback/long.m3u8", m3u) + try: + status, hdrs, body = stream_get( + "127.0.0.1", + shared_r2h.port, + f"/http/127.0.0.1:{upstream.port}/lookback/long.m3u8", + read_bytes=512 * 1024, + timeout=_TIMEOUT, + ) + text = body.decode("utf-8", errors="replace") + assert status == 200 + assert f"/http/127.0.0.1:{upstream.port}/lookback/segment-0000.ts?token=abcdef0123456789" in text + assert ( + f"/http/127.0.0.1:{upstream.port}/lookback/segment-{segment_count - 1:04d}.ts?token=abcdef0123456789" + in text + ) + cl = hdrs.get("content-length") + assert cl is not None + assert int(cl) == len(body) + finally: + upstream.stop() + # --------------------------------------------------------------------------- # Edge cases diff --git a/src/http_proxy.c b/src/http_proxy.c index 52f85bc..ee27780 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -805,7 +805,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { return http_proxy_finalize_rewrite(session); } - return 0; /* Keep buffering */ + return (int)received; /* Progress: keep draining edge-triggered sockets */ } /* Phase 2: Zero-copy streaming - recv directly to buffer pool */ @@ -930,6 +930,8 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { logger(LOG_DEBUG, "HTTP Proxy: All M3U content received with headers (%zd bytes)", session->bytes_received); return http_proxy_finalize_rewrite(session); } + + bytes_forwarded = (int)initial_size; } else { /* Normal mode: forward immediately */ if (connection_queue_output(session->conn, session->response_buffer, session->response_buffer_pos) < 0) { From f5b918d16f5d2ee671f8987f0633db965b8115fe Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Mon, 18 May 2026 18:54:03 +0800 Subject: [PATCH 2/3] fix(http-proxy): keep draining after header reads --- e2e/test_http_proxy_m3u_rewrite.py | 89 +++++++++++++++++++++++++++++- src/http_proxy.c | 5 +- 2 files changed, 89 insertions(+), 5 deletions(-) diff --git a/e2e/test_http_proxy_m3u_rewrite.py b/e2e/test_http_proxy_m3u_rewrite.py index 1703697..79b1a4b 100644 --- a/e2e/test_http_proxy_m3u_rewrite.py +++ b/e2e/test_http_proxy_m3u_rewrite.py @@ -6,6 +6,9 @@ proxy. These tests verify the rewriting logic end-to-end. """ +import socket +import threading + import pytest from helpers import ( @@ -19,6 +22,7 @@ pytestmark = pytest.mark.http_proxy _TIMEOUT = 5.0 +_HEADER_PARSE_READ_SIZE = 8191 # HTTP_PROXY_RESPONSE_BUFFER_SIZE - 1 # --------------------------------------------------------------------------- @@ -67,6 +71,81 @@ def _make_m3u_upstream(path, body, content_type="application/vnd.apple.mpegurl") return upstream +class _RawHTTPResponseUpstream: + """Serve a prebuilt raw HTTP response and keep the connection open.""" + + def __init__(self, response): + self.port = find_free_port() + self.response = response + self._server_sock = None + self._thread = None + self._stop = threading.Event() + self._client_threads = [] + + def start(self): + self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._server_sock.bind(("127.0.0.1", self.port)) + self._server_sock.listen(5) + self._server_sock.settimeout(0.5) + self._thread = threading.Thread(target=self._accept, daemon=True) + self._thread.start() + + def stop(self): + self._stop.set() + if self._server_sock: + self._server_sock.close() + for thread in self._client_threads: + thread.join(timeout=1) + if self._thread: + self._thread.join(timeout=3) + + def _accept(self): + assert self._server_sock is not None + while not self._stop.is_set(): + try: + conn, _ = self._server_sock.accept() + except socket.timeout: + continue + except OSError: + break + thread = threading.Thread(target=self._handle, args=(conn,), daemon=True) + self._client_threads.append(thread) + thread.start() + + def _handle(self, conn): + try: + conn.settimeout(1.0) + request = b"" + while b"\r\n\r\n" not in request: + chunk = conn.recv(1024) + if not chunk: + return + request += chunk + conn.sendall(self.response) + self._stop.wait(_TIMEOUT * 2) + except OSError: + pass + finally: + conn.close() + + +def _make_padded_header_m3u_upstream(body, content_type="application/vnd.apple.mpegurl"): + """Create an upstream whose first proxy header read contains no body bytes.""" + if isinstance(body, str): + body = body.encode() + prefix = (f"HTTP/1.1 200 OK\r\nContent-Type: {content_type}\r\nContent-Length: {len(body)}\r\nX-Pad: ").encode() + suffix = b"\r\n\r\n" + pad_len = _HEADER_PARSE_READ_SIZE - len(prefix) - len(suffix) + assert pad_len > 0 + headers = prefix + (b"a" * pad_len) + suffix + assert len(headers) == _HEADER_PARSE_READ_SIZE + response = headers + body + upstream = _RawHTTPResponseUpstream(response) + upstream.start() + return upstream + + # --------------------------------------------------------------------------- # Basic absolute http:// URL rewriting # --------------------------------------------------------------------------- @@ -572,12 +651,16 @@ def test_mixed_absolute_and_relative(self, shared_r2h): finally: upstream.stop() - def test_large_playlist_body_is_fully_buffered(self, shared_r2h): - """A large M3U body should be fully read before rewriting.""" + @pytest.mark.parametrize("upstream_mode", ["normal_headers", "padded_header_only"], ids=["normal", "header-only"]) + def test_large_playlist_body_is_fully_buffered(self, shared_r2h, upstream_mode): + """A large M3U body should be fully read after header parsing.""" segment_count = 4096 segments = "".join("#EXTINF:10,\nsegment-%04d.ts?token=abcdef0123456789\n" % i for i in range(segment_count)) m3u = "#EXTM3U\n#EXT-X-TARGETDURATION:10\n" + segments + "#EXT-X-ENDLIST\n" - upstream = _make_m3u_upstream("/lookback/long.m3u8", m3u) + if upstream_mode == "normal_headers": + upstream = _make_m3u_upstream("/lookback/long.m3u8", m3u) + else: + upstream = _make_padded_header_m3u_upstream(m3u) try: status, hdrs, body = stream_get( "127.0.0.1", diff --git a/src/http_proxy.c b/src/http_proxy.c index ee27780..7a61e69 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -891,6 +891,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { } session->response_buffer_pos += received; + int socket_progress = (int)received; /* Try to parse headers */ if (!session->headers_received) { @@ -899,7 +900,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { return -1; } if (result == 0) { - return 0; /* Need more data for headers */ + return socket_progress; /* Progress: keep draining edge-triggered sockets */ } /* result > 0 means headers complete, state is now STREAMING */ } @@ -951,7 +952,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { } } - return bytes_forwarded; + return bytes_forwarded > 0 ? bytes_forwarded : socket_progress; } /* Check if status code is a redirect that may have Location header */ From 786abe3cd238ec53d1c62423279761c552da6859 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Mon, 18 May 2026 19:09:03 +0800 Subject: [PATCH 3/3] refactor(http-proxy): update return-value contract for socket event handler The M3U rewrite drain fix changed the return semantics from "bytes forwarded to client" to "progress made" (including buffered-but-not-yet- forwarded bytes). Update the doc comment and rename the accumulator variable to reflect this. --- src/http_proxy.c | 6 +++--- src/http_proxy.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/http_proxy.c b/src/http_proxy.c index 7a61e69..81136e1 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -1308,7 +1308,7 @@ int http_proxy_handle_socket_event(http_proxy_session_t *session, uint32_t event * edge-triggered pollers (epoll EPOLLET / kqueue EV_CLEAR) where the read event fires * only once per data arrival and won't re-trigger while unread data * remains in the socket buffer. */ - int bytes_forwarded = 0; + int progress = 0; if (events & POLLER_IN) { while (session->state == HTTP_PROXY_STATE_AWAITING_HEADERS || session->state == HTTP_PROXY_STATE_STREAMING) { result = http_proxy_try_receive_response(session); @@ -1319,7 +1319,7 @@ int http_proxy_handle_socket_event(http_proxy_session_t *session, uint32_t event } if (result == 0) break; /* EAGAIN or need more data - wait for next event */ - bytes_forwarded += result; + progress += result; } } @@ -1359,7 +1359,7 @@ int http_proxy_handle_socket_event(http_proxy_session_t *session, uint32_t event } } - return bytes_forwarded; + return progress; } int http_proxy_session_cleanup(http_proxy_session_t *session) { diff --git a/src/http_proxy.h b/src/http_proxy.h index b4b2de1..44681cb 100644 --- a/src/http_proxy.h +++ b/src/http_proxy.h @@ -178,8 +178,8 @@ int http_proxy_connect(http_proxy_session_t *session); * Called when socket has EPOLLIN or EPOLLOUT events * @param session HTTP proxy session * @param events Epoll events (EPOLLIN, EPOLLOUT, etc.) - * @return Number of bytes forwarded to client (>0), 0 if no data forwarded, -1 - * on error + * @return Positive value if progress was made (data received/forwarded), + * 0 if no progress (EAGAIN), -1 on error */ int http_proxy_handle_socket_event(http_proxy_session_t *session, uint32_t events);