Skip to content

Commit 33689c3

Browse files
committed
Support bounded parallel chunk transfers
1 parent a19391c commit 33689c3

6 files changed

Lines changed: 946 additions & 53 deletions

File tree

src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloader.java

Lines changed: 127 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package com.google.devtools.build.lib.remote;
1616

17+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
1718
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
1819

1920
import build.bazel.remote.execution.v2.Digest;
@@ -23,10 +24,19 @@
2324
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
2425
import java.io.IOException;
2526
import java.io.OutputStream;
27+
import java.util.ArrayList;
28+
import java.util.HashMap;
2629
import java.util.List;
30+
import java.util.Map;
31+
import java.util.concurrent.LinkedBlockingQueue;
2732

28-
/** Downloads blobs by sequentially fetching chunks via the SplitBlob API. */
33+
/** Downloads blobs by fetching chunks through a per-blob sliding window via the SplitBlob API. */
2934
public class ChunkedBlobDownloader {
35+
// Guard against pathological fanout from a single large chunked blob. This is only a per-blob
36+
// cap; chunk requests still flow through CombinedCache and the shared remote cache transport
37+
// stack below it, which is what bounds active remote RPC concurrency across blobs.
38+
private static final int MAX_IN_FLIGHT_CHUNK_DOWNLOADS = 32;
39+
3040
private final GrpcCacheClient grpcCacheClient;
3141
private final CombinedCache combinedCache;
3242

@@ -37,8 +47,8 @@ public ChunkedBlobDownloader(GrpcCacheClient grpcCacheClient, CombinedCache comb
3747

3848
/**
3949
* Downloads a blob using chunked download via the SplitBlob API. This should be called with
40-
* virtual threads, as it blocks on futures via {@link
41-
* com.google.devtools.build.lib.remote.util.Utils#getFromFuture}.
50+
* virtual threads, as it may block while waiting for chunk metadata and completed chunk
51+
* downloads.
4252
*/
4353
public void downloadChunked(
4454
RemoteActionExecutionContext context, Digest blobDigest, OutputStream out)
@@ -64,11 +74,123 @@ private List<Digest> getChunkDigests(RemoteActionExecutionContext context, Diges
6474
return chunkDigests;
6575
}
6676

77+
private static final class PendingDownload {
78+
private final Digest digest;
79+
private final ListenableFuture<byte[]> future;
80+
private final List<Integer> chunkIndices = new ArrayList<>(1);
81+
82+
PendingDownload(Digest digest, ListenableFuture<byte[]> future, int firstChunkIndex) {
83+
this.digest = digest;
84+
this.future = future;
85+
chunkIndices.add(firstChunkIndex);
86+
}
87+
88+
void addChunkIndex(int chunkIndex) {
89+
chunkIndices.add(chunkIndex);
90+
}
91+
92+
Digest digest() {
93+
return digest;
94+
}
95+
96+
ListenableFuture<byte[]> future() {
97+
return future;
98+
}
99+
100+
List<Integer> chunkIndices() {
101+
return chunkIndices;
102+
}
103+
}
104+
67105
private void downloadAndReassembleChunks(
68106
RemoteActionExecutionContext context, List<Digest> chunkDigests, OutputStream out)
69107
throws IOException, InterruptedException {
70-
for (Digest chunkDigest : chunkDigests) {
71-
getFromFuture(combinedCache.downloadBlob(context, chunkDigest, out));
108+
new DownloadSession(context, chunkDigests, out).run();
109+
}
110+
111+
private final class DownloadSession {
112+
private final LinkedBlockingQueue<PendingDownload> completedDownloads =
113+
new LinkedBlockingQueue<>();
114+
private final Map<Digest, PendingDownload> activeDownloads =
115+
new HashMap<>(MAX_IN_FLIGHT_CHUNK_DOWNLOADS);
116+
private final Map<Integer, byte[]> readyChunks =
117+
new HashMap<>(MAX_IN_FLIGHT_CHUNK_DOWNLOADS);
118+
private final RemoteActionExecutionContext context;
119+
private final List<Digest> chunkDigests;
120+
private final OutputStream out;
121+
private int nextToStart = 0;
122+
private int nextToWrite = 0;
123+
124+
DownloadSession(
125+
RemoteActionExecutionContext context, List<Digest> chunkDigests, OutputStream out) {
126+
this.context = context;
127+
this.chunkDigests = chunkDigests;
128+
this.out = out;
129+
}
130+
131+
void run() throws IOException, InterruptedException {
132+
try {
133+
fillWindow();
134+
while (nextToWrite < chunkDigests.size()) {
135+
awaitCompletedDownload();
136+
fillWindow();
137+
drainReadyChunks();
138+
}
139+
} catch (IOException | InterruptedException | RuntimeException | Error e) {
140+
cancelAllDownloads();
141+
throw e;
142+
}
143+
}
144+
145+
private void fillWindow() {
146+
while (nextToStart < chunkDigests.size()) {
147+
Digest chunkDigest = chunkDigests.get(nextToStart);
148+
PendingDownload existing = activeDownloads.get(chunkDigest);
149+
if (existing != null) {
150+
existing.addChunkIndex(nextToStart);
151+
nextToStart++;
152+
continue;
153+
}
154+
if (activeDownloads.size() >= MAX_IN_FLIGHT_CHUNK_DOWNLOADS) {
155+
return;
156+
}
157+
startDownload(chunkDigest, nextToStart);
158+
nextToStart++;
159+
}
160+
}
161+
162+
private void startDownload(Digest chunkDigest, int chunkIndex) {
163+
PendingDownload download =
164+
new PendingDownload(
165+
chunkDigest, combinedCache.downloadBlob(context, chunkDigest), chunkIndex);
166+
activeDownloads.put(chunkDigest, download);
167+
download.future().addListener(() -> completedDownloads.add(download), directExecutor());
168+
}
169+
170+
private void awaitCompletedDownload() throws IOException, InterruptedException {
171+
PendingDownload download = completedDownloads.take();
172+
activeDownloads.remove(download.digest());
173+
byte[] chunkData = getFromFuture(download.future());
174+
for (int chunkIndex : download.chunkIndices()) {
175+
readyChunks.put(chunkIndex, chunkData);
176+
}
177+
}
178+
179+
private void drainReadyChunks() throws IOException {
180+
while (true) {
181+
byte[] chunk = readyChunks.remove(nextToWrite);
182+
if (chunk == null) {
183+
return;
184+
}
185+
out.write(chunk);
186+
nextToWrite++;
187+
}
188+
}
189+
190+
private void cancelAllDownloads() {
191+
for (PendingDownload download : activeDownloads.values()) {
192+
download.future().cancel(/* mayInterruptIfRunning= */ true);
193+
}
72194
}
73195
}
74196
}

src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobUploader.java

Lines changed: 93 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414

1515
package com.google.devtools.build.lib.remote;
1616

17+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
1718
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
1819

1920
import build.bazel.remote.execution.v2.Digest;
2021
import com.google.common.collect.ImmutableSet;
2122
import com.google.common.io.ByteStreams;
23+
import com.google.common.util.concurrent.ListenableFuture;
2224
import com.google.devtools.build.lib.remote.chunking.ChunkingConfig;
2325
import com.google.devtools.build.lib.remote.chunking.FastCdcChunker;
2426
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
@@ -30,6 +32,7 @@
3032
import java.util.HashSet;
3133
import java.util.List;
3234
import java.util.Set;
35+
import java.util.concurrent.LinkedBlockingQueue;
3336

3437
/**
3538
* Uploads blobs in chunks using Content-Defined Chunking with FastCDC 2020.
@@ -44,6 +47,10 @@
4447
* </ol>
4548
*/
4649
public class ChunkedBlobUploader {
50+
// Guard against pathological fanout from a single large chunked blob. This is only a per-blob
51+
// cap; chunk uploads still flow through CombinedCache and the shared remote cache transport
52+
// stack below it, which is what bounds active remote RPC concurrency across blobs.
53+
private static final int MAX_IN_FLIGHT_CHUNK_UPLOADS = 32;
4754

4855
private final GrpcCacheClient grpcCacheClient;
4956
private final CombinedCache combinedCache;
@@ -104,18 +111,95 @@ private void uploadMissingChunks(
104111
if (missingDigests.isEmpty()) {
105112
return;
106113
}
114+
new UploadSession(context, missingDigests, chunkDigests).run(file);
115+
}
107116

108-
Set<Digest> uploaded = new HashSet<>();
109-
try (InputStream input = file.getInputStream()) {
110-
for (Digest chunkDigest : chunkDigests) {
111-
if (missingDigests.contains(chunkDigest) && uploaded.add(chunkDigest)) {
112-
ByteString.Output out = ByteString.newOutput((int) chunkDigest.getSizeBytes());
113-
ByteStreams.limit(input, chunkDigest.getSizeBytes()).transferTo(out);
114-
getFromFuture(combinedCache.uploadBlob(context, chunkDigest, out.toByteString()));
115-
} else {
116-
input.skipNBytes(chunkDigest.getSizeBytes());
117+
private final class UploadSession {
118+
private final LinkedBlockingQueue<ListenableFuture<Void>> completedUploads =
119+
new LinkedBlockingQueue<>();
120+
private final Set<ListenableFuture<Void>> inFlightUploads =
121+
new HashSet<>(MAX_IN_FLIGHT_CHUNK_UPLOADS);
122+
private final Set<Digest> scheduledDigests = new HashSet<>();
123+
private final RemoteActionExecutionContext context;
124+
private final ImmutableSet<Digest> missingDigests;
125+
private final List<Digest> chunkDigests;
126+
127+
UploadSession(
128+
RemoteActionExecutionContext context,
129+
ImmutableSet<Digest> missingDigests,
130+
List<Digest> chunkDigests) {
131+
this.context = context;
132+
this.missingDigests = missingDigests;
133+
this.chunkDigests = chunkDigests;
134+
}
135+
136+
void run(Path file) throws IOException, InterruptedException {
137+
try (InputStream input = file.getInputStream()) {
138+
for (Digest chunkDigest : chunkDigests) {
139+
drainCompletedUploads();
140+
if (!shouldScheduleUpload(chunkDigest)) {
141+
skipChunk(input, chunkDigest);
142+
continue;
143+
}
144+
if (inFlightUploads.size() >= MAX_IN_FLIGHT_CHUNK_UPLOADS) {
145+
awaitCompletedUpload();
146+
}
147+
startUpload(chunkDigest, readChunk(input, chunkDigest));
148+
}
149+
while (!inFlightUploads.isEmpty()) {
150+
awaitCompletedUpload();
117151
}
152+
} catch (IOException | InterruptedException | RuntimeException | Error e) {
153+
cancelAllUploads();
154+
throw e;
118155
}
119156
}
157+
158+
private boolean shouldScheduleUpload(Digest chunkDigest) {
159+
return missingDigests.contains(chunkDigest) && scheduledDigests.add(chunkDigest);
160+
}
161+
162+
private void startUpload(Digest chunkDigest, ByteString chunkData) {
163+
ListenableFuture<Void> upload = combinedCache.uploadBlob(context, chunkDigest, chunkData);
164+
inFlightUploads.add(upload);
165+
upload.addListener(() -> completedUploads.add(upload), directExecutor());
166+
}
167+
168+
private void drainCompletedUploads() throws IOException, InterruptedException {
169+
while (true) {
170+
ListenableFuture<Void> upload = completedUploads.poll();
171+
if (upload == null) {
172+
return;
173+
}
174+
finishUpload(upload);
175+
}
176+
}
177+
178+
private void awaitCompletedUpload() throws IOException, InterruptedException {
179+
finishUpload(completedUploads.take());
180+
drainCompletedUploads();
181+
}
182+
183+
private void finishUpload(ListenableFuture<Void> upload)
184+
throws IOException, InterruptedException {
185+
inFlightUploads.remove(upload);
186+
getFromFuture(upload);
187+
}
188+
189+
private void cancelAllUploads() {
190+
for (ListenableFuture<Void> upload : inFlightUploads) {
191+
upload.cancel(/* mayInterruptIfRunning= */ true);
192+
}
193+
}
194+
}
195+
196+
private static void skipChunk(InputStream input, Digest chunkDigest) throws IOException {
197+
input.skipNBytes(chunkDigest.getSizeBytes());
198+
}
199+
200+
private static ByteString readChunk(InputStream input, Digest chunkDigest) throws IOException {
201+
ByteString.Output out = ByteString.newOutput((int) chunkDigest.getSizeBytes());
202+
ByteStreams.limit(input, chunkDigest.getSizeBytes()).transferTo(out);
203+
return out.toByteString();
120204
}
121205
}

src/test/java/com/google/devtools/build/lib/remote/BUILD

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
load("@rules_java//java:defs.bzl", "java_library", "java_test")
2+
load("//src:java_opt_binary.bzl", "java_opt_binary")
23

34
package(
45
default_applicable_licenses = ["//:license"],
@@ -65,6 +66,7 @@ java_library(
6566
"RemoteActionFileSystemTestBase.java",
6667
"BuildWithoutTheBytesIntegrationTest.java",
6768
"BuildWithoutTheBytesIntegrationTestBase.java",
69+
"ChunkedTransferBenchmark.java",
6870
"ChunkedCacheIntegrationTest.java",
6971
"ChunkedDiskCacheIntegrationTest.java",
7072
"DiskCacheIntegrationTest.java",
@@ -251,6 +253,27 @@ java_test(
251253
],
252254
)
253255

256+
# Benchmark binary for ChunkedTransferBenchmark.java; the entry point is the JMH main class.
java_opt_binary(
    name = "ChunkedTransferBenchmark",
    srcs = ["ChunkedTransferBenchmark.java"],
    main_class = "org.openjdk.jmh.Main",
    # Kept in buildifier order: "//" labels first, external "@" labels last.
    deps = [
        "//src/main/java/com/google/devtools/build/lib/clock",
        "//src/main/java/com/google/devtools/build/lib/remote:combined_cache",
        "//src/main/java/com/google/devtools/build/lib/remote:grpc_cache_client",
        "//src/main/java/com/google/devtools/build/lib/remote/chunking",
        "//src/main/java/com/google/devtools/build/lib/remote/common",
        "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
        "//src/main/java/com/google/devtools/build/lib/vfs",
        "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs",
        "//third_party:guava",
        "//third_party:jmh",
        "//third_party:mockito",
        "@com_google_protobuf//java/core:lite_runtime_only",
        "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
    ],
)
276+
254277
java_library(
255278
name = "build_without_the_bytes_integration_test_base",
256279
srcs = [

0 commit comments

Comments
 (0)