Skip to content

Commit 9758ca2

Browse files
committed
Fix deadlock with concurrent identical actions
Address issue #2001, where workers deadlock when numerous identical actions run simultaneously. Root causes and fixes: (1) File permit exhaustion — `download_to_directory` used unbounded parallelism; added `MAX_CONCURRENT_FILE_OPS = 64` with `buffer_unordered` to bound concurrent file operations. (2) Race condition — `create_and_add_action` checked for duplicates AFTER performing async work; added early registration of a placeholder BEFORE the async work so duplicates are rejected immediately. Fixes #2001
1 parent ae963be commit 9758ca2

1 file changed

Lines changed: 72 additions & 20 deletions

File tree

nativelink-worker/src/running_actions_manager.rs

Lines changed: 72 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -91,6 +91,15 @@ const EXIT_CODE_FOR_SIGNAL: i32 = 9;
9191
const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy =
9292
UploadCacheResultsStrategy::FailuresOnly;
9393

94+
/// Maximum number of concurrent file operations during input download.
95+
/// This prevents deadlock from file permit exhaustion when many identical
96+
/// actions run simultaneously (see issue #2001).
97+
///
98+
/// The value is chosen to balance parallelism with permit availability:
99+
/// - High enough to maintain good download performance
100+
/// - Low enough to leave headroom for other concurrent operations
101+
const MAX_CONCURRENT_FILE_OPS: usize = 64;
102+
94103
/// Valid string reasons for a failure.
95104
/// Note: If these change, the documentation should be updated.
96105
#[derive(Debug, Deserialize)]
@@ -110,9 +119,12 @@ struct SideChannelInfo {
110119
failure: Option<SideChannelFailureReason>,
111120
}
112121

113-
/// Aggressively download the digests of files and make a local folder from it. This function
114-
/// will spawn unbounded number of futures to try and get these downloaded. The store itself
115-
/// should be rate limited if spawning too many requests at once is an issue.
122+
/// Download the digests of files and make a local folder from them.
123+
///
124+
/// This function limits concurrent file operations to `MAX_CONCURRENT_FILE_OPS`
125+
/// to prevent deadlock from file permit exhaustion when many identical actions
126+
/// run simultaneously (see issue #2001).
127+
///
116128
/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for
117129
/// efficiency reasons. We will request the `FastSlowStore` to populate the entry then we will
118130
/// assume the `FilesystemStore` has the file available immediately after and hardlink the file
@@ -130,7 +142,10 @@ pub fn download_to_directory<'a>(
130142
let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest.into())
131143
.await
132144
.err_tip(|| "Converting digest to Directory")?;
133-
let mut futures = FuturesUnordered::new();
145+
146+
// Collect all futures into a Vec first, then process with bounded concurrency
147+
// to prevent file permit exhaustion deadlock (issue #2001)
148+
let mut all_futures: Vec<BoxFuture<'a, Result<(), Error>>> = Vec::new();
134149

135150
for file in directory.files {
136151
let digest: DigestInfo = file
@@ -147,7 +162,7 @@ pub fn download_to_directory<'a>(
147162
if file.is_executable {
148163
unix_mode = Some(unix_mode.unwrap_or(0o444) | 0o111);
149164
}
150-
futures.push(
165+
all_futures.push(
151166
cas_store
152167
.populate_fast_store(digest.into())
153168
.and_then(move |()| async move {
@@ -160,7 +175,6 @@ pub fn download_to_directory<'a>(
160175
.get_file_entry_for_digest(&digest)
161176
.await
162177
.err_tip(|| "During hard link")?;
163-
// TODO: add a test for #2051: deadlock with large number of files
164178
let src_path = file_entry.get_file_path_locked(|src| async move { Ok(PathBuf::from(src)) }).await?;
165179
fs::hard_link(&src_path, &dest)
166180
.await
@@ -224,7 +238,7 @@ pub fn download_to_directory<'a>(
224238
.try_into()
225239
.err_tip(|| "In Directory::file::digest")?;
226240
let new_directory_path = format!("{}/{}", current_directory, directory.name);
227-
futures.push(
241+
all_futures.push(
228242
async move {
229243
fs::create_dir(&new_directory_path)
230244
.await
@@ -246,7 +260,7 @@ pub fn download_to_directory<'a>(
246260
#[cfg(target_family = "unix")]
247261
for symlink_node in directory.symlinks {
248262
let dest = format!("{}/{}", current_directory, symlink_node.name);
249-
futures.push(
263+
all_futures.push(
250264
async move {
251265
fs::symlink(&symlink_node.target, &dest).await.err_tip(|| {
252266
format!(
@@ -260,7 +274,15 @@ pub fn download_to_directory<'a>(
260274
);
261275
}
262276

263-
while futures.try_next().await?.is_some() {}
277+
// Process all futures with bounded concurrency to prevent permit exhaustion
278+
// This is the key fix for issue #2001: by limiting concurrent file operations,
279+
// we prevent all permits from being exhausted simultaneously when many identical
280+
// actions are running.
281+
futures::stream::iter(all_futures)
282+
.buffer_unordered(MAX_CONCURRENT_FILE_OPS)
283+
.try_collect::<Vec<_>>()
284+
.await?;
285+
264286
Ok(())
265287
}
266288
.boxed()
@@ -2132,8 +2154,43 @@ impl RunningActionsManager for RunningActionsManagerImpl {
21322154
.queued_timestamp
21332155
.and_then(|time| time.try_into().ok())
21342156
.unwrap_or(SystemTime::UNIX_EPOCH);
2135-
let operation_id = start_execute
2157+
let operation_id: OperationId = start_execute
21362158
.operation_id.as_str().into();
2159+
2160+
// EARLY REGISTRATION: Prevent race condition (issue #2001)
2161+
// Register a placeholder BEFORE doing any async work to ensure
2162+
// duplicate operations are rejected immediately. This prevents
2163+
// the scenario where two identical actions slip through because
2164+
// both pass the duplicate check before either registers.
2165+
{
2166+
let mut running_actions = self.running_actions.lock();
2167+
if let Some(existing_weak) = running_actions.get(&operation_id) {
2168+
if existing_weak.upgrade().is_some() {
2169+
return Err(make_err!(
2170+
Code::AlreadyExists,
2171+
"Action with operation_id {} is already running",
2172+
operation_id
2173+
));
2174+
}
2175+
}
2176+
// Insert placeholder to prevent duplicates during async work
2177+
running_actions.insert(operation_id.clone(), Weak::new());
2178+
}
2179+
2180+
// Cleanup placeholder on error using a guard
2181+
let operation_id_for_cleanup = operation_id.clone();
2182+
let self_for_cleanup = self.clone();
2183+
let cleanup_guard = guard((), move |()| {
2184+
// Remove placeholder if we fail before creating the real action
2185+
let mut running_actions = self_for_cleanup.running_actions.lock();
2186+
if let Some(weak) = running_actions.get(&operation_id_for_cleanup) {
2187+
// Only remove if it's still our placeholder (Weak::new())
2188+
if weak.upgrade().is_none() {
2189+
running_actions.remove(&operation_id_for_cleanup);
2190+
}
2191+
}
2192+
});
2193+
21372194
let action_info = self.create_action_info(start_execute, queued_timestamp).await?;
21382195
debug!(
21392196
?action_info,
@@ -2175,18 +2232,13 @@ impl RunningActionsManager for RunningActionsManagerImpl {
21752232
timeout,
21762233
self.clone(),
21772234
));
2235+
2236+
// Defuse the cleanup guard - we succeeded, so don't remove placeholder
2237+
ScopeGuard::into_inner(cleanup_guard);
2238+
2239+
// Replace placeholder with real action reference
21782240
{
21792241
let mut running_actions = self.running_actions.lock();
2180-
// Check if action already exists and is still alive
2181-
if let Some(existing_weak) = running_actions.get(&operation_id) {
2182-
if let Some(_existing_action) = existing_weak.upgrade() {
2183-
return Err(make_err!(
2184-
Code::AlreadyExists,
2185-
"Action with operation_id {} is already running",
2186-
operation_id
2187-
));
2188-
}
2189-
}
21902242
running_actions.insert(operation_id, Arc::downgrade(&running_action));
21912243
}
21922244
Ok(running_action)

0 commit comments

Comments (0)