Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/api/fossa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ pub async fn upload_scan(
opts: &Config,
project: &ProjectMetadata,
cli: &CliMetadata,
source_units: SourceUnits,
source_units: &SourceUnits,
) -> Result<Locator, Error> {
let url = opts.endpoint().join("api/builds/custom")?;

Expand Down Expand Up @@ -360,7 +360,7 @@ pub async fn upload_scan(

run_request::<UploadResponse>(req)
.await
.change_context_lazy(|| Error::upload_scan(&locator, &source_units))?
.change_context_lazy(|| Error::upload_scan(&locator, source_units))?
.into()
}

Expand Down
2 changes: 1 addition & 1 deletion src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Config {
tracing_subscriber::fmt::layer()
.compact()
.with_file(false)
.with_level(false)
.with_level(true)
.with_line_number(false)
.with_target(false)
.with_writer(std::io::stderr)
Expand Down
123 changes: 80 additions & 43 deletions src/subcommand/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
use std::sync::Arc;
use std::time::Duration;

use error_stack::{Result, ResultExt};
use futures::TryStreamExt;
use error_stack::{Context, Result, ResultExt};
use futures::{future::try_join_all, try_join, StreamExt};
use futures::{Future, TryStreamExt};
use indoc::indoc;
use serde::{Deserialize, Serialize};
use tap::TapFallible;
Expand Down Expand Up @@ -203,33 +203,21 @@ async fn poll_integration<D: Database>(
let remote = integration.remote().to_owned();
let poll_interval = integration.poll_interval().as_duration();

// [`Retry`] needs a function that runs without any arguments to perform the retry, so turn the method into a closure.
let get_references = || async {
match integration.references().await {
Ok(success) => Ok(success),
Err(err) => {
warn!("attempt to poll integration at {remote} failed: {err:#}");
Err(err)
}
}
};

loop {
info!("Polling '{integration}'");

// Given that this operation is not latency sensitive, and temporary network issues can interfere,
// retry several times before permanently failing since a permanent failure means Broker shuts down
// entirely.
let strategy = ExponentialBackoff::from_millis(1000).map(jitter).take(10);
let references = Retry::spawn(strategy, get_references)
.await
.change_context(Error::PollIntegration)
.describe_lazy(|| format!("poll for changes at {remote} in integration: {integration}"))
.help(indoc! {"
Issues with this process are usually related to network errors, but may be due to misconfiguration.
Each time this polling operation was attempted, it logged a warning; please review those
warnings in the logs for more details.
"})?;
let references = retry_default(format!("poll '{integration}'"), || async {
match integration.references().await {
Ok(success) => Ok(success),
Err(err) => {
warn!("attempt to poll integration at {remote} failed: {err:#}");
Err(err)
}
}
})
.await
.change_context(Error::PollIntegration)
.describe_lazy(|| format!("poll for changes at {remote} in integration: {integration}"))?;

// Filter to the list of references that are new since we last saw them.
let references = futures::stream::iter(references.into_iter())
Expand Down Expand Up @@ -310,11 +298,14 @@ async fn scan_git_references<D: Database>(
mut receiver: Receiver<ScanGitVCSReference>,
mut uploader: Sender<UploadSourceUnits>,
) -> Result<(), Error> {
let cli = fossa_cli::find_or_download(
&ctx.app,
ctx.config.debug().location(),
DesiredVersion::Latest,
)
let cli = retry_default("download_fossa_cli", || async {
fossa_cli::find_or_download(
&ctx.app,
ctx.config.debug().location(),
DesiredVersion::Latest,
)
.await
})
.await
.change_context(Error::DownloadFossaCli)
.describe("Broker relies on fossa-cli to perform analysis of your projects")?;
Expand Down Expand Up @@ -346,21 +337,24 @@ async fn scan_git_reference<D: Database>(
span_record!(scan_id, &job.scan_id);

// Clone the reference into a temporary directory.
let cloned_location = job
.integration
.clone_reference(&job.reference)
.await
.change_context_lazy(|| Error::CloneReference(job.reference.clone()))?;
let cloned_location = retry_default(
format!("Clone '{}' at '{}'", job.integration, job.reference),
|| async { job.integration.clone_reference(&job.reference).await },
)
.await
.change_context_lazy(|| Error::CloneReference(job.reference.clone()))?;

// Record the CLI version for debugging purposes.
let cli_version = cli.version().await.change_context(Error::RunFossaCli)?;
span_record!(cli_version, display cli_version);

// Run the scan.
let source_units = cli
.analyze(&job.scan_id, cloned_location.path())
.await
.change_context(Error::RunFossaCli)?;
let source_units = retry_default(
format!("Analyze '{}' at '{}'", job.integration, job.reference),
|| async { cli.analyze(&job.scan_id, cloned_location.path()).await },
)
.await
.change_context(Error::RunFossaCli)?;

info!("Scanned '{}' at '{}'", job.integration, job.reference);
Ok(UploadSourceUnits {
Expand All @@ -384,13 +378,56 @@ async fn upload_scans<D: Database>(
let meta = ProjectMetadata::new(&job.integration, &job.reference);
info!("Uploading scan for project: '{meta}'");

let locator = fossa::upload_scan(ctx.config.fossa_api(), &meta, &job.cli, job.source_units)
.await
.change_context(Error::TaskHandle)?;
let locator = retry_default(format!("Upload results for '{meta}'"), || async {
fossa::upload_scan(ctx.config.fossa_api(), &meta, &job.cli, &job.source_units).await
})
.await
.change_context(Error::TaskHandle)?;

debug!(scan_id = %job.scan_id, locator = %locator, "Uploaded scan");
info!("Uploaded scan for project '{meta}' as locator: '{locator}'");

guard.commit().change_context(Error::TaskComplete)?;
}
}

/// Retries the provided action with the provided label, using the default retry strategy.
///
/// The default retry strategy:
/// - Retries using an exponential backoff delay, with a jitter to prevent thundering herds.
/// - The delay starts at 1 second and doubles after every failed attempt
///   (1s, 2s, 4s, ... 512s, before jitter is applied).
/// - Retries a total of ten times after the initial attempt.
///
/// Each time the action fails, a warning is output using the label,
/// in the form `{label}: attempt failed, will retry. error: {error}`.
/// The warning is emitted for every failed attempt, including the last one
/// (after which no further retry is made).
///
/// If the overall process fails, the returned error has help text attached instructing users
/// to review the warnings in the logs for troubleshooting.
///
/// If the process eventually succeeds, no error is returned, but warnings are still emitted.
///
/// # Errors
///
/// Returns the error of the last attempt (with help text attached) when every attempt fails.
async fn retry_default<S, A, F, T, E>(label: S, action: A) -> Result<T, E>
where
    S: AsRef<str>,
    A: Fn() -> F,
    F: Future<Output = Result<T, E>>,
    E: Context,
{
    // [`Retry`] needs a function that runs without any arguments to perform the retry,
    // so wrap the action in a closure that also logs each failure before passing it on.
    let wrapped_action = || async {
        match action().await {
            Ok(result) => Ok(result),
            Err(err) => {
                warn!(
                    "{}: attempt failed, will retry. error: {err:#}",
                    label.as_ref()
                );
                Err(err)
            }
        }
    };

    // `ExponentialBackoff::from_millis(base)` computes the n-th delay as `base^n` milliseconds,
    // so a base of 1000 yields 1s, ~16.7 minutes, ~11.6 days, ... between attempts.
    // A base of 2 scaled by a factor of 500 yields the intended 1s, 2s, 4s, ... sequence.
    let strategy = ExponentialBackoff::from_millis(2)
        .factor(500)
        .map(jitter)
        .take(10);

    Retry::spawn(strategy, wrapped_action).await.help(indoc! {"
        Each time this operation was attempted, it logged a warning; please review those
        warnings in the logs for more details.
    "})
}