Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 102 additions & 16 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,27 @@ macro_rules! impl_inner_call {
},
Err(e) => {
let failed_attempts = errors.len() + 1;

if retries_exhausted(failed_attempts, $self.config.retry()) {
warn!("call '{}' failed after {} attempts", stringify!($name), failed_attempts);
return Err(Error::AllAttemptsErrored(errors));
let configured_retries = $self.config.retry();

warn!(
"call '{}' failed with {}, retry: {}/{}",
stringify!($name),
e,
failed_attempts,
configured_retries
);

if let Err(err) =
record_attempt_error(&mut errors, e, configured_retries)
{
warn!(
"call '{}' failed after {} attempts",
stringify!($name),
failed_attempts
);
return Err(err);
}

warn!("call '{}' failed with {}, retry: {}/{}", stringify!($name), e, failed_attempts, $self.config.retry());

errors.push(e);

// Only one thread will try to recreate the client getting the write lock,
// other eventual threads will get Err and will block at the beginning of
// previous loop when trying to read()
Expand All @@ -84,15 +95,24 @@ macro_rules! impl_inner_call {
},
Err(e) => {
let failed_attempts = errors.len() + 1;

if retries_exhausted(failed_attempts, $self.config.retry()) {
warn!("re-creating client failed after {} attempts", failed_attempts);
return Err(Error::AllAttemptsErrored(errors));
let configured_retries = $self.config.retry();

warn!(
"re-creating client failed with {}, retry: {}/{}",
e,
failed_attempts,
configured_retries
);

if let Err(err) =
record_attempt_error(&mut errors, e, configured_retries)
{
warn!(
"re-creating client failed after {} attempts",
failed_attempts
);
return Err(err);
}

warn!("re-creating client failed with {}, retry: {}/{}", e, failed_attempts, $self.config.retry());

errors.push(e);
}
}
}
Expand All @@ -103,6 +123,22 @@ macro_rules! impl_inner_call {
}
}

fn record_attempt_error(
errors: &mut Vec<Error>,
error: Error,
configured_retries: u8,
) -> Result<usize, Error> {
let failed_attempts = errors.len() + 1;

errors.push(error);

if retries_exhausted(failed_attempts, configured_retries) {
Err(Error::AllAttemptsErrored(std::mem::take(errors)))
} else {
Ok(failed_attempts)
}
}

fn retries_exhausted(failed_attempts: usize, configured_retries: u8) -> bool {
match u8::try_from(failed_attempts) {
Ok(failed_attempts) => failed_attempts > configured_retries,
Expand Down Expand Up @@ -464,4 +500,54 @@ mod tests {

assert!(!exhausted)
}

#[test]
fn exhausted_attempt_includes_current_error() {
let mut errors = Vec::new();

let err = record_attempt_error(
&mut errors,
Error::Message("current".to_string()),
0,
)
.unwrap_err();

let Error::AllAttemptsErrored(attempts) = err else {
panic!("expected AllAttemptsErrored");
};

assert_eq!(attempts.len(), 1);
assert!(
matches!(&attempts[0], Error::Message(message) if message == "current")
);
assert!(errors.is_empty());
}

#[test]
fn retry_errors_accumulate_until_exhausted() {
let mut errors = Vec::new();

assert_eq!(
record_attempt_error(&mut errors, Error::Message("first".to_string()), 1).unwrap(),
1
);

let err = record_attempt_error(
&mut errors,
Error::Message("second".to_string()),
1,
)
.unwrap_err();

let Error::AllAttemptsErrored(attempts) = err else {
panic!("expected AllAttemptsErrored");
};

assert_eq!(attempts.len(), 2);
assert!(matches!(&attempts[0], Error::Message(message) if message == "first"));
assert!(
matches!(&attempts[1], Error::Message(message) if message == "second")
);
assert!(errors.is_empty());
}
}