Skip to content

Commit 6811d06

Browse files
authored
Transient error handling and concurrency limit for postgres based indexed storage (#3184)
* Transient error handling and concurrency limit for postgres based indexed storage
* Format
1 parent 236b027 commit 6811d06

20 files changed

Lines changed: 1093 additions & 460 deletions

File tree

golem-debugging-service/config/debug-worker-executor.sample.env

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,11 @@ GOLEM__OPLOG__OPLOG_RATE_LIMIT_ENABLED=false
6767
GOLEM__OPLOG__PLUGIN_MAX_COMMIT_COUNT=3
6868
GOLEM__OPLOG__PLUGIN_MAX_ELAPSED_TIME="5s"
6969
GOLEM__OPLOG__DEFAULT_SNAPSHOTTING__TYPE="Disabled"
70+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_ATTEMPTS=3
71+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_DELAY="1s"
72+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_JITTER_FACTOR=0.15
73+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MIN_DELAY="100ms"
74+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MULTIPLIER=3.0
7075
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__TYPE="EveryNInvocation"
7176
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__CONFIG__COUNT=10
7277
GOLEM__PUBLIC_WORKER_API__CONNECT_TIMEOUT="10s"
@@ -219,6 +224,11 @@ GOLEM__OPLOG__OPLOG_RATE_LIMIT_ENABLED=false
219224
GOLEM__OPLOG__PLUGIN_MAX_COMMIT_COUNT=3
220225
GOLEM__OPLOG__PLUGIN_MAX_ELAPSED_TIME="5s"
221226
GOLEM__OPLOG__DEFAULT_SNAPSHOTTING__TYPE="Disabled"
227+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_ATTEMPTS=3
228+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_DELAY="1s"
229+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_JITTER_FACTOR=0.15
230+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MIN_DELAY="100ms"
231+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MULTIPLIER=3.0
222232
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__TYPE="EveryNInvocation"
223233
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__CONFIG__COUNT=10
224234
GOLEM__PUBLIC_WORKER_API__CONNECT_TIMEOUT="10s"

golem-debugging-service/config/debug-worker-executor.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,13 @@ plugin_max_elapsed_time = "5s"
107107
[oplog.default_snapshotting]
108108
type = "Disabled"
109109

110+
[oplog.indexed_storage_retry]
111+
max_attempts = 3
112+
max_delay = "1s"
113+
max_jitter_factor = 0.15
114+
min_delay = "100ms"
115+
multiplier = 3.0
116+
110117
[oplog.oplog_processor_snapshotting]
111118
type = "EveryNInvocation"
112119

@@ -342,6 +349,13 @@ without_time = false
342349
# [oplog.default_snapshotting]
343350
# type = "Disabled"
344351
#
352+
# [oplog.indexed_storage_retry]
353+
# max_attempts = 3
354+
# max_delay = "1s"
355+
# max_jitter_factor = 0.15
356+
# min_delay = "100ms"
357+
# multiplier = 3.0
358+
#
345359
# [oplog.oplog_processor_snapshotting]
346360
# type = "EveryNInvocation"
347361
#

golem-service-base/src/repo/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,19 @@ impl RepoError {
3737
pub fn is_unique_violation(&self) -> bool {
3838
matches!(self, RepoError::UniqueViolation(_))
3939
}
40+
41+
pub fn is_transient(&self) -> bool {
42+
match self {
43+
RepoError::InternalError(err) => {
44+
if let Some(sqlx_err) = err.downcast_ref::<sqlx::Error>() {
45+
matches!(sqlx_err, sqlx::Error::PoolTimedOut | sqlx::Error::Io(_))
46+
} else {
47+
false
48+
}
49+
}
50+
RepoError::UniqueViolation(_) => false,
51+
}
52+
}
4053
}
4154

4255
error_forwarding!(RepoError);

golem-worker-executor/config/worker-executor.sample.env

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ GOLEM__OPLOG__OPLOG_RATE_LIMIT_ENABLED=false
8383
GOLEM__OPLOG__PLUGIN_MAX_COMMIT_COUNT=3
8484
GOLEM__OPLOG__PLUGIN_MAX_ELAPSED_TIME="5s"
8585
GOLEM__OPLOG__DEFAULT_SNAPSHOTTING__TYPE="Disabled"
86+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_ATTEMPTS=3
87+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_DELAY="1s"
88+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_JITTER_FACTOR=0.15
89+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MIN_DELAY="100ms"
90+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MULTIPLIER=3.0
8691
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__TYPE="EveryNInvocation"
8792
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__CONFIG__COUNT=10
8893
GOLEM__PUBLIC_WORKER_API__CONNECT_TIMEOUT="10s"
@@ -283,6 +288,11 @@ GOLEM__OPLOG__OPLOG_RATE_LIMIT_ENABLED=false
283288
GOLEM__OPLOG__PLUGIN_MAX_COMMIT_COUNT=3
284289
GOLEM__OPLOG__PLUGIN_MAX_ELAPSED_TIME="5s"
285290
GOLEM__OPLOG__DEFAULT_SNAPSHOTTING__TYPE="Disabled"
291+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_ATTEMPTS=3
292+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_DELAY="1s"
293+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_JITTER_FACTOR=0.15
294+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MIN_DELAY="100ms"
295+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MULTIPLIER=3.0
286296
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__TYPE="EveryNInvocation"
287297
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__CONFIG__COUNT=10
288298
GOLEM__PUBLIC_WORKER_API__CONNECT_TIMEOUT="10s"
@@ -453,6 +463,11 @@ GOLEM__OPLOG__OPLOG_RATE_LIMIT_ENABLED=false
453463
GOLEM__OPLOG__PLUGIN_MAX_COMMIT_COUNT=3
454464
GOLEM__OPLOG__PLUGIN_MAX_ELAPSED_TIME="5s"
455465
GOLEM__OPLOG__DEFAULT_SNAPSHOTTING__TYPE="Disabled"
466+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_ATTEMPTS=3
467+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_DELAY="1s"
468+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MAX_JITTER_FACTOR=0.15
469+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MIN_DELAY="100ms"
470+
GOLEM__OPLOG__INDEXED_STORAGE_RETRY__MULTIPLIER=3.0
456471
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__TYPE="EveryNInvocation"
457472
GOLEM__OPLOG__OPLOG_PROCESSOR_SNAPSHOTTING__CONFIG__COUNT=10
458473
GOLEM__PUBLIC_WORKER_API__CONNECT_TIMEOUT="10s"

golem-worker-executor/config/worker-executor.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,13 @@ plugin_max_elapsed_time = "5s"
135135
[oplog.default_snapshotting]
136136
type = "Disabled"
137137

138+
[oplog.indexed_storage_retry]
139+
max_attempts = 3
140+
max_delay = "1s"
141+
max_jitter_factor = 0.15
142+
min_delay = "100ms"
143+
multiplier = 3.0
144+
138145
[oplog.oplog_processor_snapshotting]
139146
type = "EveryNInvocation"
140147

@@ -441,6 +448,13 @@ without_time = false
441448
# [oplog.default_snapshotting]
442449
# type = "Disabled"
443450
#
451+
# [oplog.indexed_storage_retry]
452+
# max_attempts = 3
453+
# max_delay = "1s"
454+
# max_jitter_factor = 0.15
455+
# min_delay = "100ms"
456+
# multiplier = 3.0
457+
#
444458
# [oplog.oplog_processor_snapshotting]
445459
# type = "EveryNInvocation"
446460
#
@@ -717,6 +731,13 @@ without_time = false
717731
# [oplog.default_snapshotting]
718732
# type = "Disabled"
719733
#
734+
# [oplog.indexed_storage_retry]
735+
# max_attempts = 3
736+
# max_delay = "1s"
737+
# max_jitter_factor = 0.15
738+
# min_delay = "100ms"
739+
# multiplier = 3.0
740+
#
720741
# [oplog.oplog_processor_snapshotting]
721742
# type = "EveryNInvocation"
722743
#

golem-worker-executor/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,7 @@ pub async fn create_worker_executor_impl<
699699
let svc: Arc<dyn OplogArchiveService> = Arc::new(CompressedOplogArchiveService::new(
700700
indexed_storage.clone(),
701701
idx,
702+
golem_config.oplog.indexed_storage_retry.clone(),
702703
));
703704
oplog_archives.push(svc);
704705
}
@@ -717,6 +718,7 @@ pub async fn create_worker_executor_impl<
717718
golem_config.oplog.max_operations_before_commit,
718719
golem_config.oplog.max_operations_before_commit_ephemeral,
719720
golem_config.oplog.max_payload_size,
721+
golem_config.oplog.indexed_storage_retry.clone(),
720722
)
721723
.await,
722724
),
@@ -728,6 +730,7 @@ pub async fn create_worker_executor_impl<
728730
golem_config.oplog.max_operations_before_commit,
729731
golem_config.oplog.max_operations_before_commit_ephemeral,
730732
golem_config.oplog.max_payload_size,
733+
golem_config.oplog.indexed_storage_retry.clone(),
731734
)
732735
.await,
733736
);

golem-worker-executor/src/metrics.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,12 @@ pub mod oplog {
303303
&["account_id", "environment_id"]
304304
)
305305
.unwrap();
306+
static ref OPLOG_STORAGE_RETRY_TOTAL: CounterVec = register_counter_vec!(
307+
"oplog_storage_retry_total",
308+
"Number of oplog storage operation retries due to transient errors",
309+
&["op"]
310+
)
311+
.unwrap();
306312
}
307313

308314
pub fn record_oplog_call(api_name: &'static str) {
@@ -315,6 +321,12 @@ pub mod oplog {
315321
.inc();
316322
}
317323

324+
pub fn record_oplog_storage_retry(op_name: &str) {
325+
OPLOG_STORAGE_RETRY_TOTAL
326+
.with_label_values(&[op_name])
327+
.inc();
328+
}
329+
318330
pub fn record_scheduled_archive(duration: std::time::Duration, has_more: bool) {
319331
SCHEDULED_ARCHIVE_TIME
320332
.with_label_values(if has_more {

golem-worker-executor/src/services/golem_config.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,10 @@ pub struct OplogConfig {
502502
/// (`oplog_writes_per_second`). Defaults to false (disabled).
503503
#[serde(default)]
504504
pub oplog_rate_limit_enabled: bool,
505+
/// Retry configuration for transient indexed-storage errors (pool exhaustion,
506+
/// connection resets). Defaults to 3 attempts, 100 ms–1 s exponential backoff.
507+
#[serde(default = "default_oplog_indexed_storage_retry")]
508+
pub indexed_storage_retry: RetryConfig,
505509
}
506510

507511
impl SafeDisplay for OplogConfig {
@@ -557,10 +561,19 @@ impl SafeDisplay for OplogConfig {
557561
"oplog rate limit enabled: {}",
558562
self.oplog_rate_limit_enabled
559563
);
564+
let _ = writeln!(
565+
&mut result,
566+
"indexed storage retry: {:?}",
567+
self.indexed_storage_retry
568+
);
560569
result
561570
}
562571
}
563572

573+
fn default_oplog_indexed_storage_retry() -> RetryConfig {
574+
RetryConfig::max_attempts_3()
575+
}
576+
564577
#[derive(Clone, Debug, Serialize, Deserialize)]
565578
#[serde(tag = "type", content = "config")]
566579
pub enum KeyValueStorageConfig {
@@ -812,6 +825,8 @@ pub struct IndexedStoragePostgresConfig {
812825
pub postgres: DbPostgresConfig,
813826
#[serde(default = "default_indexed_storage_postgres_drop_prefix_delete_batch_size")]
814827
pub drop_prefix_delete_batch_size: u64,
828+
#[serde(default)]
829+
pub max_concurrent_ops: Option<u32>,
815830
}
816831

817832
impl SafeDisplay for IndexedStoragePostgresConfig {
@@ -823,6 +838,11 @@ impl SafeDisplay for IndexedStoragePostgresConfig {
823838
"drop prefix delete batch size: {}",
824839
self.drop_prefix_delete_batch_size
825840
);
841+
let _ = writeln!(
842+
&mut result,
843+
"max concurrent ops: {:?}",
844+
self.max_concurrent_ops
845+
);
826846
result
827847
}
828848
}
@@ -1330,6 +1350,7 @@ impl Default for OplogConfig {
13301350
plugin_max_commit_count: 3,
13311351
plugin_max_elapsed_time: Duration::from_secs(5),
13321352
oplog_rate_limit_enabled: false,
1353+
indexed_storage_retry: default_oplog_indexed_storage_retry(),
13331354
}
13341355
}
13351356
}

0 commit comments

Comments
 (0)