Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions e2e/test_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,51 @@ def test_very_long_url(self, basic_r2h):
status, _, _ = http_get("127.0.0.1", basic_r2h.port, "/status")
assert status == 200

def _send_raw_request(self, port: int, request: bytes, timeout: float = 2.0) -> tuple[bytes, bool]:
sock = socket.create_connection(("127.0.0.1", port), timeout=timeout)
sock.settimeout(timeout)
try:
sock.sendall(request)
data = b""
while True:
try:
chunk = sock.recv(4096)
except socket.timeout:
return data, True
if not chunk:
return data, False
data += chunk
if b"\r\n\r\n" in data:
return data, False
finally:
sock.close()

def test_oversized_incomplete_request_line_fails_deterministically(self, r2h_binary):
"""An oversized request line without CRLF should not spin forever."""
port = find_free_port()
r2h = R2HProcess(r2h_binary, port, extra_args=["-v", "4", "-m", "100"])
try:
r2h.start()
request = b"GET /" + (b"a" * 9000)
data, timed_out = self._send_raw_request(port, request)
assert not timed_out
assert b"400 Bad Request" in data
finally:
r2h.stop()

def test_oversized_incomplete_header_fails_deterministically(self, r2h_binary):
"""Oversized headers without the terminating CRLF should not spin forever."""
port = find_free_port()
r2h = R2HProcess(r2h_binary, port, extra_args=["-v", "4", "-m", "100"])
try:
r2h.start()
request = b"GET /status HTTP/1.1\r\nHost: 127.0.0.1\r\nX-Fill: " + (b"a" * 9000)
data, timed_out = self._send_raw_request(port, request)
assert not timed_out
assert b"400 Bad Request" in data
finally:
r2h.stop()


# ---------------------------------------------------------------------------
# Concurrent connections
Expand Down
111 changes: 111 additions & 0 deletions e2e/test_http_proxy_m3u_rewrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@
proxy. These tests verify the rewriting logic end-to-end.
"""

import socket
import threading

import pytest

from helpers import (
MockHTTPUpstream,
R2HProcess,
find_free_port,
http_get,
stream_get,
)

pytestmark = pytest.mark.http_proxy

_TIMEOUT = 5.0
_HEADER_PARSE_READ_SIZE = 8191 # HTTP_PROXY_RESPONSE_BUFFER_SIZE - 1


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -66,6 +71,81 @@ def _make_m3u_upstream(path, body, content_type="application/vnd.apple.mpegurl")
return upstream


class _RawHTTPResponseUpstream:
"""Serve a prebuilt raw HTTP response and keep the connection open."""

def __init__(self, response):
self.port = find_free_port()
self.response = response
self._server_sock = None
self._thread = None
self._stop = threading.Event()
self._client_threads = []

def start(self):
self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._server_sock.bind(("127.0.0.1", self.port))
self._server_sock.listen(5)
self._server_sock.settimeout(0.5)
self._thread = threading.Thread(target=self._accept, daemon=True)
self._thread.start()

def stop(self):
self._stop.set()
if self._server_sock:
self._server_sock.close()
for thread in self._client_threads:
thread.join(timeout=1)
if self._thread:
self._thread.join(timeout=3)

def _accept(self):
assert self._server_sock is not None
while not self._stop.is_set():
try:
conn, _ = self._server_sock.accept()
except socket.timeout:
continue
except OSError:
break
thread = threading.Thread(target=self._handle, args=(conn,), daemon=True)
self._client_threads.append(thread)
thread.start()

def _handle(self, conn):
try:
conn.settimeout(1.0)
request = b""
while b"\r\n\r\n" not in request:
chunk = conn.recv(1024)
if not chunk:
return
request += chunk
conn.sendall(self.response)
self._stop.wait(_TIMEOUT * 2)
except OSError:
pass
finally:
conn.close()


def _make_padded_header_m3u_upstream(body, content_type="application/vnd.apple.mpegurl"):
"""Create an upstream whose first proxy header read contains no body bytes."""
if isinstance(body, str):
body = body.encode()
prefix = (f"HTTP/1.1 200 OK\r\nContent-Type: {content_type}\r\nContent-Length: {len(body)}\r\nX-Pad: ").encode()
suffix = b"\r\n\r\n"
pad_len = _HEADER_PARSE_READ_SIZE - len(prefix) - len(suffix)
assert pad_len > 0
headers = prefix + (b"a" * pad_len) + suffix
assert len(headers) == _HEADER_PARSE_READ_SIZE
response = headers + body
upstream = _RawHTTPResponseUpstream(response)
upstream.start()
return upstream


# ---------------------------------------------------------------------------
# Basic absolute http:// URL rewriting
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -571,6 +651,37 @@ def test_mixed_absolute_and_relative(self, shared_r2h):
finally:
upstream.stop()

@pytest.mark.parametrize("upstream_mode", ["normal_headers", "padded_header_only"], ids=["normal", "header-only"])
def test_large_playlist_body_is_fully_buffered(self, shared_r2h, upstream_mode):
"""A large M3U body should be fully read after header parsing."""
segment_count = 4096
segments = "".join("#EXTINF:10,\nsegment-%04d.ts?token=abcdef0123456789\n" % i for i in range(segment_count))
m3u = "#EXTM3U\n#EXT-X-TARGETDURATION:10\n" + segments + "#EXT-X-ENDLIST\n"
if upstream_mode == "normal_headers":
upstream = _make_m3u_upstream("/lookback/long.m3u8", m3u)
else:
upstream = _make_padded_header_m3u_upstream(m3u)
try:
status, hdrs, body = stream_get(
"127.0.0.1",
shared_r2h.port,
f"/http/127.0.0.1:{upstream.port}/lookback/long.m3u8",
read_bytes=512 * 1024,
timeout=_TIMEOUT,
)
text = body.decode("utf-8", errors="replace")
assert status == 200
assert f"/http/127.0.0.1:{upstream.port}/lookback/segment-0000.ts?token=abcdef0123456789" in text
assert (
f"/http/127.0.0.1:{upstream.port}/lookback/segment-{segment_count - 1:04d}.ts?token=abcdef0123456789"
in text
)
cl = hdrs.get("content-length")
assert cl is not None
assert int(cl) == len(body)
finally:
upstream.stop()


# ---------------------------------------------------------------------------
# Edge cases
Expand Down
6 changes: 6 additions & 0 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,12 @@ void connection_handle_read(connection_t *c) {
c->state = CONN_CLOSING;
return;
}
if (c->in_len == INBUF_SIZE) {
logger(LOG_WARN, "HTTP request exceeded input buffer before headers completed");
http_send_400(c);
Comment thread
stackia marked this conversation as resolved.
Comment on lines +715 to +716
c->state = CONN_CLOSING;
return;
}
/* else parse_result == 0: need more data, keep reading */
} else {
return; /* Not in a request-reading state */
Expand Down
6 changes: 3 additions & 3 deletions src/fcc.c
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,9 @@ int fcc_handle_socket_event(stream_context_t *ctx, int fd, int64_t now) {
/* Buffer pool exhausted - drop this packet */
logger(LOG_DEBUG, "FCC: Buffer pool exhausted, dropping packet");
fcc->last_data_time = now;
/* Drain the socket to prevent event loop spinning */
uint8_t dummy[BUFFER_POOL_BUFFER_SIZE];
recvfrom(recv_sock, dummy, sizeof(dummy), 0, NULL, NULL);
if (drain_datagram_socket_until_eagain(recv_sock) < 0) {
logger(LOG_ERROR, "FCC: Drain failed: %s", strerror(errno));
}
return 0;
}

Expand Down
9 changes: 6 additions & 3 deletions src/http_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
return http_proxy_finalize_rewrite(session);
}

return 0; /* Keep buffering */
return (int)received; /* Progress: keep draining edge-triggered sockets */
}

/* Phase 2: Zero-copy streaming - recv directly to buffer pool */
Expand Down Expand Up @@ -891,6 +891,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
}

session->response_buffer_pos += received;
int socket_progress = (int)received;

/* Try to parse headers */
if (!session->headers_received) {
Expand All @@ -899,7 +900,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
return -1;
}
if (result == 0) {
return 0; /* Need more data for headers */
return socket_progress; /* Progress: keep draining edge-triggered sockets */
}
/* result > 0 means headers complete, state is now STREAMING */
}
Expand Down Expand Up @@ -930,6 +931,8 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
logger(LOG_DEBUG, "HTTP Proxy: All M3U content received with headers (%zd bytes)", session->bytes_received);
return http_proxy_finalize_rewrite(session);
}

bytes_forwarded = (int)initial_size;
} else {
/* Normal mode: forward immediately */
if (connection_queue_output(session->conn, session->response_buffer, session->response_buffer_pos) < 0) {
Expand All @@ -949,7 +952,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
}
}

return bytes_forwarded;
return bytes_forwarded > 0 ? bytes_forwarded : socket_progress;
Comment on lines 808 to +955
}

/* Check if status code is a redirect that may have Location header */
Expand Down
6 changes: 3 additions & 3 deletions src/multicast.c
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,9 @@ int mcast_session_handle_event(mcast_session_t *session, stream_context_t *ctx,
if (!recv_buf) {
logger(LOG_DEBUG, "Multicast: Buffer pool exhausted, dropping packet");
session->last_data_time = now;
/* Drain socket to prevent event loop spinning */
uint8_t dummy[BUFFER_POOL_BUFFER_SIZE];
recv(session->sock, dummy, sizeof(dummy), 0);
if (drain_datagram_socket_until_eagain(session->sock) < 0) {
logger(LOG_DEBUG, "Multicast: Drain failed: %s", strerror(errno));
}
return 0;
}

Expand Down
Loading
Loading