Skip to content
This repository was archived by the owner on Apr 9, 2026. It is now read-only.

Commit 2304adc

Browse files
committed
fix(admin): fixes
1 parent 5ebc6fc commit 2304adc

1 file changed

Lines changed: 99 additions & 18 deletions

File tree

admin/failover_reprocessor.go

Lines changed: 99 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -112,31 +112,40 @@ func NewReprocessingJobManager(config *Config, dbpool *pgxpool.Pool, k8sClient *
112112

113113
// StartJob starts a new reprocessing job
114114
func (m *ReprocessingJobManager) StartJob(config ReprocessingJobConfig) (*ReprocessingJob, error) {
115+
m.Infof("[StartJob] Starting new reprocessing job")
116+
115117
// Validate that either S3 or local path is provided
116118
if config.S3Path == "" && config.LocalPath == "" {
119+
m.Errorf("[StartJob] Validation failed: no path provided")
117120
return nil, fmt.Errorf("either s3_path or local_path must be provided")
118121
}
119122
if config.S3Path != "" && config.LocalPath != "" {
123+
m.Errorf("[StartJob] Validation failed: both paths provided")
120124
return nil, fmt.Errorf("only one of s3_path or local_path can be provided")
121125
}
122126

123127
// Set defaults
124128
if config.BatchSize <= 0 {
125129
config.BatchSize = 100
126130
}
131+
m.Infof("[StartJob] Config validated, batch_size=%d", config.BatchSize)
127132

128133
// Create Kubernetes Job for reprocessing
129134
ctx := context.Background()
130135

131136
// List and prepare files with sizes
137+
m.Infof("[StartJob] Preparing file list from %s%s", config.S3Path, config.LocalPath)
132138
fileItems, err := m.prepareFileList(ctx, config)
133139
if err != nil {
140+
m.Errorf("[StartJob] Failed to prepare file list: %v", err)
134141
return nil, fmt.Errorf("failed to prepare file list: %w", err)
135142
}
136143

137144
if len(fileItems) == 0 {
145+
m.Errorf("[StartJob] No files found to process")
138146
return nil, fmt.Errorf("no files found to process")
139147
}
148+
m.Infof("[StartJob] Found %d files to process", len(fileItems))
140149

141150
// Determine number of workers (default: 1 worker per 10 files, max 50 workers)
142151
workerCount := (len(fileItems) + 9) / 10
@@ -146,6 +155,7 @@ func (m *ReprocessingJobManager) StartJob(config ReprocessingJobConfig) (*Reproc
146155
if workerCount < 1 {
147156
workerCount = 1
148157
}
158+
m.Infof("[StartJob] Determined worker count: %d", workerCount)
149159

150160
// Create job record
151161
job := &ReprocessingJob{
@@ -156,10 +166,13 @@ func (m *ReprocessingJobManager) StartJob(config ReprocessingJobConfig) (*Reproc
156166
TotalFiles: len(fileItems),
157167
TotalWorkers: workerCount,
158168
}
169+
m.Infof("[StartJob] Created job record with ID %s", job.ID)
159170

160171
// Insert job into database
172+
m.Infof("[StartJob] Inserting job %s into database", job.ID)
161173
err = InsertReprocessingJob(m.dbpool, job)
162174
if err != nil {
175+
m.Errorf("[StartJob] Failed to insert job %s into database: %v", job.ID, err)
163176
return nil, fmt.Errorf("failed to insert job into database: %w", err)
164177
}
165178

@@ -169,37 +182,45 @@ func (m *ReprocessingJobManager) StartJob(config ReprocessingJobConfig) (*Reproc
169182
workerIndex := i % workerCount
170183
filesPerWorker[workerIndex]++
171184
}
185+
m.Infof("[StartJob] Distributed %d files across %d workers", len(fileItems), workerCount)
172186

173187
// Initialize worker records
188+
m.Infof("[StartJob] Initializing %d worker records for job %s", workerCount, job.ID)
174189
err = InitializeWorkers(m.dbpool, job.ID, workerCount, filesPerWorker)
175190
if err != nil {
191+
m.Errorf("[StartJob] Failed to initialize workers for job %s: %v", job.ID, err)
176192
return nil, fmt.Errorf("failed to initialize workers: %w", err)
177193
}
178194

179195
// Create K8s Indexed Job
196+
m.Infof("[StartJob] Creating K8s job for job %s", job.ID)
180197
k8sJobName, err := m.k8sClient.CreateReprocessingJob(ctx, job.ID, fileItems, config, workerCount)
181198
if err != nil {
199+
m.Errorf("[StartJob] Failed to create K8s job for job %s: %v", job.ID, err)
182200
_ = UpdateReprocessingJobStatus(m.dbpool, job.ID, JobStatusFailed, nil, nil, err.Error())
183201
return nil, fmt.Errorf("failed to create k8s job: %w", err)
184202
}
203+
m.Infof("[StartJob] Created K8s job %s for job %s", k8sJobName, job.ID)
185204

186205
job.K8sJobName = k8sJobName
187206
job.Status = JobStatusRunning
188207
now := time.Now()
189208
job.StartedAt = &now
190209

191210
// Update job with K8s job name and status
211+
m.Infof("[StartJob] Updating job %s with K8s job name %s", job.ID, k8sJobName)
192212
err = UpdateReprocessingJobK8sName(m.dbpool, job.ID, k8sJobName)
193213
if err != nil {
194-
m.Warnf("Failed to update k8s job name: %v", err)
214+
m.Warnf("[StartJob] Failed to update k8s job name for job %s: %v", job.ID, err)
195215
}
196216

217+
m.Infof("[StartJob] Updating job %s status to running", job.ID)
197218
err = UpdateReprocessingJobStatus(m.dbpool, job.ID, JobStatusRunning, &now, nil, "")
198219
if err != nil {
199-
m.Warnf("Failed to update job status: %v", err)
220+
m.Warnf("[StartJob] Failed to update job %s status: %v", job.ID, err)
200221
}
201222

202-
m.Infof("Created K8s reprocessing job %s with %d workers processing %d files", job.ID, workerCount, len(fileItems))
223+
m.Infof("[StartJob] Successfully created K8s reprocessing job %s with %d workers processing %d files", job.ID, workerCount, len(fileItems))
203224

204225
return job, nil
205226
}
@@ -240,48 +261,82 @@ func (m *ReprocessingJobManager) prepareFileList(ctx context.Context, config Rep
240261

241262
// GetJob returns a job by ID with enriched K8s status
242263
func (m *ReprocessingJobManager) GetJob(id string) (*ReprocessingJob, error) {
264+
m.Infof("[GetJob] Starting for job %s", id)
265+
243266
job, err := GetReprocessingJob(m.dbpool, id)
244267
if err != nil {
268+
m.Errorf("[GetJob] Failed to get job %s from database: %v", id, err)
245269
return nil, err
246270
}
271+
m.Infof("[GetJob] Retrieved job %s from database: status=%s, k8s_job_name=%s", id, job.Status, job.K8sJobName)
247272

248273
// Enrich with K8s job status if available
249274
if job.Status == JobStatusRunning && job.K8sJobName != "" && m.k8sClient != nil {
275+
m.Infof("[GetJob] Job %s is running, enriching with K8s status", id)
250276
if err := m.enrichJobWithK8sStatus(job); err != nil {
251277
// Log warning but don't fail the request
252-
m.Warnf("Failed to get K8s status for job %s: %v", id, err)
278+
m.Warnf("[GetJob] Failed to get K8s status for job %s: %v", id, err)
279+
} else {
280+
m.Infof("[GetJob] Successfully enriched job %s with K8s status, new status=%s", id, job.Status)
253281
}
282+
} else {
283+
m.Infof("[GetJob] Skipping K8s enrichment for job %s (status=%s, has_k8s_name=%v, has_k8s_client=%v)",
284+
id, job.Status, job.K8sJobName != "", m.k8sClient != nil)
254285
}
255286

287+
m.Infof("[GetJob] Completed for job %s", id)
256288
return job, nil
257289
}
258290

259291
// enrichJobWithK8sStatus adds Kubernetes job status information to the job
260292
func (m *ReprocessingJobManager) enrichJobWithK8sStatus(job *ReprocessingJob) error {
261-
ctx := context.Background()
293+
m.Infof("[enrichJobWithK8sStatus] Starting for job %s (k8s_job=%s)", job.ID, job.K8sJobName)
294+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
295+
defer cancel()
296+
297+
err := m.enrichJobWithK8sStatusWithContext(ctx, job)
298+
if err != nil {
299+
m.Warnf("[enrichJobWithK8sStatus] Failed for job %s: %v", job.ID, err)
300+
} else {
301+
m.Infof("[enrichJobWithK8sStatus] Completed for job %s", job.ID)
302+
}
303+
return err
304+
}
305+
306+
// enrichJobWithK8sStatusWithContext adds Kubernetes job status information to the job with a custom context
307+
func (m *ReprocessingJobManager) enrichJobWithK8sStatusWithContext(ctx context.Context, job *ReprocessingJob) error {
308+
m.Infof("[enrichJobWithK8sStatusWithContext] Calling K8s API for job %s", job.K8sJobName)
309+
262310
k8sStatus, err := m.k8sClient.GetJobStatus(ctx, job.K8sJobName)
263311
if err != nil {
312+
m.Warnf("[enrichJobWithK8sStatusWithContext] K8s API call failed for job %s: %v", job.K8sJobName, err)
264313
// If K8s Job not found, check worker statuses to determine completion
265314
if strings.Contains(err.Error(), "job not found") {
266-
m.Infof("K8s Job %s not found (likely cleaned up), checking worker statuses", job.K8sJobName)
315+
m.Infof("[enrichJobWithK8sStatusWithContext] K8s Job %s not found (likely cleaned up), checking worker statuses", job.K8sJobName)
267316
return m.checkCompletionFromWorkers(job)
268317
}
269318
return err
270319
}
271320

321+
m.Infof("[enrichJobWithK8sStatusWithContext] K8s API returned for job %s: Active=%d, Succeeded=%d, Failed=%d, CompletionTime=%v",
322+
job.K8sJobName, k8sStatus.Active, k8sStatus.Succeeded, k8sStatus.Failed, k8sStatus.CompletionTime != nil)
323+
272324
// K8s Job has CompletionTime when all pods are done (succeeded or failed)
273325
k8sJobCompleted := k8sStatus.CompletionTime != nil || (k8sStatus.Failed == int32(job.TotalWorkers) && k8sStatus.Active == 0)
274326

275327
if !k8sJobCompleted {
328+
m.Infof("[enrichJobWithK8sStatusWithContext] K8s Job %s not yet completed, skipping status update", job.K8sJobName)
276329
return nil
277330
}
331+
m.Infof("[enrichJobWithK8sStatusWithContext] K8s Job %s is completed, updating status", job.K8sJobName)
278332
// Update job status based on K8s job status if our status is stale
279333
// Only update if job is in running state and K8s reports completion
280334
hasFailures := k8sStatus.Failed > 0 && k8sStatus.Active == 0
281335
allSucceeded := k8sStatus.Succeeded == int32(job.TotalWorkers) && k8sStatus.Active == 0
282336

283337
if hasFailures {
284338
// Job has failed pods and no active pods - mark as failed
339+
m.Infof("[enrichJobWithK8sStatusWithContext] Marking job %s as failed (failed=%d, active=%d)", job.ID, k8sStatus.Failed, k8sStatus.Active)
285340
now := time.Now()
286341
if k8sStatus.CompletionTime != nil {
287342
now = k8sStatus.CompletionTime.Time
@@ -290,12 +345,13 @@ func (m *ReprocessingJobManager) enrichJobWithK8sStatus(job *ReprocessingJob) er
290345
job.Status = JobStatusFailed
291346
err := UpdateReprocessingJobStatus(m.dbpool, job.ID, JobStatusFailed, nil, &now, "K8s job has failed pods")
292347
if err != nil {
293-
m.Warnf("Failed to update job status to failed: %v", err)
348+
m.Errorf("[enrichJobWithK8sStatusWithContext] Failed to update job %s status to failed: %v", job.ID, err)
294349
} else {
295-
m.Infof("Job %s marked as failed based on K8s status (failed=%d, active=%d)", job.ID, k8sStatus.Failed, k8sStatus.Active)
350+
m.Infof("[enrichJobWithK8sStatusWithContext] Job %s marked as failed based on K8s status (failed=%d, active=%d)", job.ID, k8sStatus.Failed, k8sStatus.Active)
296351
}
297352
} else if allSucceeded || k8sJobCompleted {
298353
// All workers succeeded OR K8s job is complete - mark as completed
354+
m.Infof("[enrichJobWithK8sStatusWithContext] Marking job %s as completed (succeeded=%d, active=%d)", job.ID, k8sStatus.Succeeded, k8sStatus.Active)
299355
now := time.Now()
300356
if k8sStatus.CompletionTime != nil {
301357
now = k8sStatus.CompletionTime.Time
@@ -304,9 +360,9 @@ func (m *ReprocessingJobManager) enrichJobWithK8sStatus(job *ReprocessingJob) er
304360
job.Status = JobStatusCompleted
305361
err := UpdateReprocessingJobStatus(m.dbpool, job.ID, JobStatusCompleted, nil, &now, "")
306362
if err != nil {
307-
m.Warnf("Failed to update job status to completed: %v", err)
363+
m.Errorf("[enrichJobWithK8sStatusWithContext] Failed to update job %s status to completed: %v", job.ID, err)
308364
} else {
309-
m.Infof("Job %s marked as completed based on K8s status (succeeded=%d, active=%d, completion_time=%v)",
365+
m.Infof("[enrichJobWithK8sStatusWithContext] Job %s marked as completed based on K8s status (succeeded=%d, active=%d, completion_time=%v)",
310366
job.ID, k8sStatus.Succeeded, k8sStatus.Active, k8sStatus.CompletionTime != nil)
311367
}
312368
}
@@ -316,13 +372,18 @@ func (m *ReprocessingJobManager) enrichJobWithK8sStatus(job *ReprocessingJob) er
316372

317373
// checkCompletionFromWorkers checks worker statuses to determine if job is complete
318374
func (m *ReprocessingJobManager) checkCompletionFromWorkers(job *ReprocessingJob) error {
375+
m.Infof("[checkCompletionFromWorkers] Starting for job %s", job.ID)
376+
319377
workers, err := GetAllWorkerStatuses(m.dbpool, job.ID)
320378
if err != nil {
379+
m.Errorf("[checkCompletionFromWorkers] Failed to get worker statuses for job %s: %v", job.ID, err)
321380
return fmt.Errorf("failed to get worker statuses: %w", err)
322381
}
382+
m.Infof("[checkCompletionFromWorkers] Retrieved %d worker statuses for job %s", len(workers), job.ID)
323383

324384
if len(workers) == 0 {
325385
// No workers found - cannot determine completion
386+
m.Warnf("[checkCompletionFromWorkers] No workers found for job %s, cannot determine completion", job.ID)
326387
return nil
327388
}
328389

@@ -341,30 +402,36 @@ func (m *ReprocessingJobManager) checkCompletionFromWorkers(job *ReprocessingJob
341402
}
342403
}
343404

344-
m.Infof("Worker status for job %s: completed=%d, failed=%d, total=%d", job.ID, completedCount, failedCount, len(workers))
405+
m.Infof("[checkCompletionFromWorkers] Worker status for job %s: completed=%d, failed=%d, total=%d", job.ID, completedCount, failedCount, len(workers))
345406

346407
// If all workers are completed or failed, mark the job accordingly
347408
if completedCount+failedCount == job.TotalWorkers {
409+
m.Infof("[checkCompletionFromWorkers] All workers done for job %s, updating job status", job.ID)
348410
now := time.Now()
349411
job.CompletedAt = &now
350412

351413
if failedCount > 0 {
414+
m.Infof("[checkCompletionFromWorkers] Marking job %s as failed (failed=%d)", job.ID, failedCount)
352415
job.Status = JobStatusFailed
353416
err := UpdateReprocessingJobStatus(m.dbpool, job.ID, JobStatusFailed, nil, &now, fmt.Sprintf("%d workers failed", failedCount))
354417
if err != nil {
355-
m.Warnf("Failed to update job status to failed: %v", err)
418+
m.Errorf("[checkCompletionFromWorkers] Failed to update job %s status to failed: %v", job.ID, err)
356419
} else {
357-
m.Infof("Job %s marked as failed based on worker statuses (failed=%d)", job.ID, failedCount)
420+
m.Infof("[checkCompletionFromWorkers] Job %s marked as failed based on worker statuses (failed=%d)", job.ID, failedCount)
358421
}
359422
} else {
423+
m.Infof("[checkCompletionFromWorkers] Marking job %s as completed (completed=%d)", job.ID, completedCount)
360424
job.Status = JobStatusCompleted
361425
err := UpdateReprocessingJobStatus(m.dbpool, job.ID, JobStatusCompleted, nil, &now, "")
362426
if err != nil {
363-
m.Warnf("Failed to update job status to completed: %v", err)
427+
m.Errorf("[checkCompletionFromWorkers] Failed to update job %s status to completed: %v", job.ID, err)
364428
} else {
365-
m.Infof("Job %s marked as completed based on worker statuses (completed=%d)", job.ID, completedCount)
429+
m.Infof("[checkCompletionFromWorkers] Job %s marked as completed based on worker statuses (completed=%d)", job.ID, completedCount)
366430
}
367431
}
432+
} else {
433+
m.Infof("[checkCompletionFromWorkers] Not all workers done for job %s yet (completed+failed=%d, total=%d)",
434+
job.ID, completedCount+failedCount, job.TotalWorkers)
368435
}
369436

370437
return nil
@@ -377,11 +444,14 @@ func (m *ReprocessingJobManager) GetJobWorkers(id string) ([]map[string]interfac
377444

378445
// ListJobs returns all jobs with enriched K8s status
379446
func (m *ReprocessingJobManager) ListJobs() []*ReprocessingJob {
447+
m.Infof("[ListJobs] Starting")
448+
380449
jobs, err := ListReprocessingJobs(m.dbpool)
381450
if err != nil {
382-
m.Errorf("Failed to list jobs from database: %v", err)
451+
m.Errorf("[ListJobs] Failed to list jobs from database: %v", err)
383452
return []*ReprocessingJob{}
384453
}
454+
m.Infof("[ListJobs] Retrieved %d jobs from database", len(jobs))
385455

386456
// Enrich running jobs with K8s status to detect completion
387457
if m.k8sClient != nil {
@@ -395,38 +465,49 @@ func (m *ReprocessingJobManager) ListJobs() []*ReprocessingJob {
395465
}
396466
}
397467

468+
m.Infof("[ListJobs] Completed, returning %d jobs", len(jobs))
398469
return jobs
399470
}
400471

401472
// CancelJob cancels a job
402473
func (m *ReprocessingJobManager) CancelJob(id string) error {
474+
m.Infof("[CancelJob] Starting for job %s", id)
475+
403476
job, err := m.GetJob(id)
404477
if err != nil {
478+
m.Errorf("[CancelJob] Failed to get job %s: %v", id, err)
405479
return err
406480
}
407481

408482
if job.Status == JobStatusCompleted || job.Status == JobStatusCancelled {
483+
m.Warnf("[CancelJob] Job %s is already finished (status=%s)", id, job.Status)
409484
return fmt.Errorf("job %s is already finished", id)
410485
}
411486

412487
// Delete the K8s job
413488
ctx := context.Background()
414489
if job.K8sJobName == "" {
415-
m.Warnf("Job %s has no K8s job name, cannot delete K8s resources", id)
490+
m.Warnf("[CancelJob] Job %s has no K8s job name, cannot delete K8s resources", id)
416491
} else {
492+
m.Infof("[CancelJob] Deleting K8s job %s for job %s", job.K8sJobName, id)
417493
err = m.k8sClient.DeleteJob(ctx, job.K8sJobName, job.ID)
418494
if err != nil {
419-
m.Warnf("Failed to delete K8s job: %v", err)
495+
m.Warnf("[CancelJob] Failed to delete K8s job %s: %v", job.K8sJobName, err)
496+
} else {
497+
m.Infof("[CancelJob] Successfully deleted K8s job %s", job.K8sJobName)
420498
}
421499
}
422500

423501
// Update status in database
502+
m.Infof("[CancelJob] Updating job %s status to cancelled", id)
424503
now := time.Now()
425504
err = UpdateReprocessingJobStatus(m.dbpool, id, JobStatusCancelled, nil, &now, "")
426505
if err != nil {
506+
m.Errorf("[CancelJob] Failed to update job %s status: %v", id, err)
427507
return fmt.Errorf("failed to update job status: %w", err)
428508
}
429509

510+
m.Infof("[CancelJob] Successfully cancelled job %s", id)
430511
return nil
431512
}
432513

0 commit comments

Comments
 (0)