From 0973b193d46275c59ed172d6c723366954e95fcb Mon Sep 17 00:00:00 2001 From: Pierre Mesure Date: Tue, 28 Apr 2026 11:25:12 +0200 Subject: [PATCH 1/2] Increase max_retries to 10 --- lib/sequin/sinks/meilisearch/client.ex | 2 +- test/sequin/meilisearch_client_test.exs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/sequin/sinks/meilisearch/client.ex b/lib/sequin/sinks/meilisearch/client.ex index 26effd614..a75bb6ef3 100644 --- a/lib/sequin/sinks/meilisearch/client.ex +++ b/lib/sequin/sinks/meilisearch/client.ex @@ -27,7 +27,7 @@ defmodule Sequin.Sinks.Meilisearch.Client do url: "/tasks/#{task_id}", # We need to disable this if we use a custom retry that emits delays retry_delay: nil, - max_retries: 5, + max_retries: 10, retry: fn request, response_or_exception -> should_retry = case response_or_exception do diff --git a/test/sequin/meilisearch_client_test.exs b/test/sequin/meilisearch_client_test.exs index d6441c692..bc3d0770f 100644 --- a/test/sequin/meilisearch_client_test.exs +++ b/test/sequin/meilisearch_client_test.exs @@ -339,12 +339,12 @@ defmodule Sequin.Sinks.Meilisearch.ClientTest do assert error.details.task_id == 456 assert error.details.last_status == "processing" - # Verify we made all 6 attempts (1 initial + 5 retries) - for i <- 0..5 do + # Verify we made all 11 attempts (1 initial + 10 retries) + for i <- 0..10 do assert_receive {:task_check, ^i}, 3_000 end - refute_receive {:task_check, 6}, 100 + refute_receive {:task_check, 11}, 100 end test "wait_for_task handles task failure" do From 9c65d3ea50828bf4f1cf5a2f367cb7c29a9117c9 Mon Sep 17 00:00:00 2001 From: Pierre Mesure Date: Tue, 28 Apr 2026 12:37:41 +0200 Subject: [PATCH 2/2] Replace max_retries with timeout_seconds --- lib/sequin/sinks/meilisearch/client.ex | 135 ++++++++++++------------ test/sequin/meilisearch_client_test.exs | 8 +- 2 files changed, 74 insertions(+), 69 deletions(-) diff --git a/lib/sequin/sinks/meilisearch/client.ex b/lib/sequin/sinks/meilisearch/client.ex index a75bb6ef3..564e96f94 100644 --- a/lib/sequin/sinks/meilisearch/client.ex +++ b/lib/sequin/sinks/meilisearch/client.ex @@ -20,78 +20,83 @@ defmodule Sequin.Sinks.Meilisearch.Client do defp decode_body(_), do: %{} defp wait_for_task(%MeilisearchSink{} = sink, task_id) do - req = - sink - |> base_request() - |> Req.merge( - url: "/tasks/#{task_id}", - # We need to disable this if we use a custom retry that emits delays - retry_delay: nil, - max_retries: 10, - retry: fn request, response_or_exception -> - should_retry = - case response_or_exception do - %Req.Response{status: 200, body: encoded_body} -> - # NOTE: Req does not automatically decode on retry functions - case encoded_body |> :zlib.gunzip() |> Jason.decode() do - {:ok, %{"status" => status}} when status in ["enqueued", "processing"] -> - true - - _ -> - false - end - - %Req.Response{status: status} when status in [408, 429, 500, 502, 503, 504] -> - true - - %Req.TransportError{reason: reason} when reason in [:timeout, :econnrefused, :closed] -> - true - - _ -> - false - end - - if should_retry do - # Req tracks retry count internally via request.private[:req_retry_count] - count = request.private[:req_retry_count] || 0 - delay = Sequin.Time.exponential_backoff(200, count, 10_000) - - Logger.debug("[Meilisearch] Task #{task_id} has not succeeded, retrying in #{delay}ms (attempt #{count + 1})") - - {:delay, delay} - else - false + deadline_ms = System.monotonic_time(:millisecond) + sink.timeout_seconds * 1000 + do_wait_for_task(sink, task_id, deadline_ms, 0, nil) + end + + defp do_wait_for_task(%MeilisearchSink{} = sink, task_id, deadline_ms, attempt, last_status) do + now_ms = System.monotonic_time(:millisecond) + remaining_ms = deadline_ms - now_ms + + if remaining_ms <= 0 do + {:error, + Error.service( + service: :meilisearch, + message: "Task verification timed out", + details: %{task_id: task_id, last_status: last_status} + )} + else + req = + sink + |> base_request() + |> Req.merge( + url: "/tasks/#{task_id}", + receive_timeout: min(1_000, remaining_ms), + retry: false, + max_retries: 0 + ) + + case Req.get(req) do + {:ok, %{status: 200, body: body}} -> + decoded_body = decode_body(body) + + case decoded_body do + %{"status" => status} when status in ["succeeded", "success"] -> + :ok + + %{"status" => "failed", "error" => error} -> + message = extract_error_message(error) + {:error, Error.service(service: :meilisearch, message: message, details: error)} + + %{"status" => status} when status in ["enqueued", "processing"] -> + retry_task_poll(sink, task_id, deadline_ms, attempt, status) + + _ -> + {:error, Error.service(service: :meilisearch, message: "Invalid response format")} end - end - ) - case Req.get(req) do - {:ok, %{body: body}} -> - decoded_body = decode_body(body) + {:error, %Req.TransportError{reason: reason}} + when reason in [:timeout, :econnrefused, :closed] -> + retry_task_poll(sink, task_id, deadline_ms, attempt, last_status) - case decoded_body do - %{"status" => status} when status in ["succeeded", "success"] -> - :ok + {:ok, %{status: status}} when status in [408, 429, 500, 502, 503, 504] -> + Logger.debug("[Meilisearch] Task #{task_id} check returned #{status}, retrying") + retry_task_poll(sink, task_id, deadline_ms, attempt, last_status) - %{"status" => "failed", "error" => error} -> - message = extract_error_message(error) - {:error, Error.service(service: :meilisearch, message: message, details: error)} + {:error, reason} -> + {:error, Error.service(service: :meilisearch, message: "Unknown error", details: reason)} + end + end + end - %{"status" => status} when status in ["enqueued", "processing"] -> - # This means we exhausted retries - {:error, - Error.service( - service: :meilisearch, - message: "Task verification timed out", - details: %{task_id: task_id, last_status: status} - )} + defp retry_task_poll(%MeilisearchSink{} = sink, task_id, deadline_ms, attempt, last_status) do + now_ms = System.monotonic_time(:millisecond) + remaining_ms = deadline_ms - now_ms - _ -> - {:error, Error.service(service: :meilisearch, message: "Invalid response format")} - end + if remaining_ms <= 0 do + {:error, + Error.service( + service: :meilisearch, + message: "Task verification timed out", + details: %{task_id: task_id, last_status: last_status} + )} + else + delay = min(Sequin.Time.exponential_backoff(200, attempt, 10_000), remaining_ms) - {:error, reason} -> - {:error, Error.service(service: :meilisearch, message: "Unknown error", details: reason)} + Logger.debug("[Meilisearch] Task #{task_id} has not succeeded, retrying in #{delay}ms (attempt #{attempt + 1})") + + Process.sleep(delay) + do_wait_for_task(sink, task_id, deadline_ms, attempt + 1, last_status) end end diff --git a/test/sequin/meilisearch_client_test.exs b/test/sequin/meilisearch_client_test.exs index bc3d0770f..b4e3dcb79 100644 --- a/test/sequin/meilisearch_client_test.exs +++ b/test/sequin/meilisearch_client_test.exs @@ -305,7 +305,7 @@ defmodule Sequin.Sinks.Meilisearch.ClientTest do assert_receive {:task_check, 3}, 2000 end - test "wait_for_task times out after exhausting retries" do + test "wait_for_task times out after exhausting the timeout budget" do test_pid = self() call_count = :counters.new(1, []) records = [SinkFactory.meilisearch_record()] @@ -339,12 +339,12 @@ defmodule Sequin.Sinks.Meilisearch.ClientTest do assert error.details.task_id == 456 assert error.details.last_status == "processing" - # Verify we made all 11 attempts (1 initial + 10 retries) - for i <- 0..10 do + # Verify we polled until the total timeout budget was exhausted. + for i <- 0..5 do assert_receive {:task_check, ^i}, 3_000 end - refute_receive {:task_check, 11}, 100 + refute_receive {:task_check, 6}, 100 end test "wait_for_task handles task failure" do