Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 70 additions & 65 deletions lib/sequin/sinks/meilisearch/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,78 +20,83 @@ defmodule Sequin.Sinks.Meilisearch.Client do
defp decode_body(_), do: %{}

defp wait_for_task(%MeilisearchSink{} = sink, task_id) do
req =
sink
|> base_request()
|> Req.merge(
url: "/tasks/#{task_id}",
# We need to disable this if we use a custom retry that emits delays
retry_delay: nil,
max_retries: 5,
retry: fn request, response_or_exception ->
should_retry =
case response_or_exception do
%Req.Response{status: 200, body: encoded_body} ->
# NOTE: Req does not automatically decode on retry functions
case encoded_body |> :zlib.gunzip() |> Jason.decode() do
{:ok, %{"status" => status}} when status in ["enqueued", "processing"] ->
true

_ ->
false
end

%Req.Response{status: status} when status in [408, 429, 500, 502, 503, 504] ->
true

%Req.TransportError{reason: reason} when reason in [:timeout, :econnrefused, :closed] ->
true

_ ->
false
end

if should_retry do
# Req tracks retry count internally via request.private[:req_retry_count]
count = request.private[:req_retry_count] || 0
delay = Sequin.Time.exponential_backoff(200, count, 10_000)

Logger.debug("[Meilisearch] Task #{task_id} has not succeeded, retrying in #{delay}ms (attempt #{count + 1})")

{:delay, delay}
else
false
deadline_ms = System.monotonic_time(:millisecond) + sink.timeout_seconds * 1000
do_wait_for_task(sink, task_id, deadline_ms, 0, nil)
end

defp do_wait_for_task(%MeilisearchSink{} = sink, task_id, deadline_ms, attempt, last_status) do
now_ms = System.monotonic_time(:millisecond)
remaining_ms = deadline_ms - now_ms

if remaining_ms <= 0 do
{:error,
Error.service(
service: :meilisearch,
message: "Task verification timed out",
details: %{task_id: task_id, last_status: last_status}
)}
else
req =
sink
|> base_request()
|> Req.merge(
url: "/tasks/#{task_id}",
receive_timeout: min(1_000, remaining_ms),
retry: false,
max_retries: 0
)

case Req.get(req) do
{:ok, %{status: 200, body: body}} ->
decoded_body = decode_body(body)

case decoded_body do
%{"status" => status} when status in ["succeeded", "success"] ->
:ok

%{"status" => "failed", "error" => error} ->
message = extract_error_message(error)
{:error, Error.service(service: :meilisearch, message: message, details: error)}

%{"status" => status} when status in ["enqueued", "processing"] ->
retry_task_poll(sink, task_id, deadline_ms, attempt, status)

_ ->
{:error, Error.service(service: :meilisearch, message: "Invalid response format")}
end
end
)

case Req.get(req) do
{:ok, %{body: body}} ->
decoded_body = decode_body(body)
{:error, %Req.TransportError{reason: reason}}
when reason in [:timeout, :econnrefused, :closed] ->
retry_task_poll(sink, task_id, deadline_ms, attempt, last_status)

case decoded_body do
%{"status" => status} when status in ["succeeded", "success"] ->
:ok
{:ok, %{status: status}} when status in [408, 429, 500, 502, 503, 504] ->
Logger.debug("[Meilisearch] Task #{task_id} check returned #{status}, retrying")
retry_task_poll(sink, task_id, deadline_ms, attempt, last_status)

%{"status" => "failed", "error" => error} ->
message = extract_error_message(error)
{:error, Error.service(service: :meilisearch, message: message, details: error)}
{:error, reason} ->
{:error, Error.service(service: :meilisearch, message: "Unknown error", details: reason)}
end
end
end

%{"status" => status} when status in ["enqueued", "processing"] ->
# This means we exhausted retries
{:error,
Error.service(
service: :meilisearch,
message: "Task verification timed out",
details: %{task_id: task_id, last_status: status}
)}
defp retry_task_poll(%MeilisearchSink{} = sink, task_id, deadline_ms, attempt, last_status) do
now_ms = System.monotonic_time(:millisecond)
remaining_ms = deadline_ms - now_ms

_ ->
{:error, Error.service(service: :meilisearch, message: "Invalid response format")}
end
if remaining_ms <= 0 do
{:error,
Error.service(
service: :meilisearch,
message: "Task verification timed out",
details: %{task_id: task_id, last_status: last_status}
)}
else
delay = min(Sequin.Time.exponential_backoff(200, attempt, 10_000), remaining_ms)

{:error, reason} ->
{:error, Error.service(service: :meilisearch, message: "Unknown error", details: reason)}
Logger.debug("[Meilisearch] Task #{task_id} has not succeeded, retrying in #{delay}ms (attempt #{attempt + 1})")

Process.sleep(delay)
do_wait_for_task(sink, task_id, deadline_ms, attempt + 1, last_status)
end
end

Expand Down
4 changes: 2 additions & 2 deletions test/sequin/meilisearch_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ defmodule Sequin.Sinks.Meilisearch.ClientTest do
assert_receive {:task_check, 3}, 2000
end

test "wait_for_task times out after exhausting retries" do
test "wait_for_task times out after exhausting the timeout budget" do
test_pid = self()
call_count = :counters.new(1, [])
records = [SinkFactory.meilisearch_record()]
Expand Down Expand Up @@ -339,7 +339,7 @@ defmodule Sequin.Sinks.Meilisearch.ClientTest do
assert error.details.task_id == 456
assert error.details.last_status == "processing"

# Verify we made all 6 attempts (1 initial + 5 retries)
# Verify we polled until the total timeout budget was exhausted.
for i <- 0..5 do
assert_receive {:task_check, ^i}, 3_000
end
Expand Down
Loading