Fix/sw calculation demand #426
Walkthrough

Added a configurable Kafka batch size to sewerage calculation and refactored demand publishing to send calculation requests in chunked batches with per-chunk error handling and dead-letter routing. Trade license services received null-safety guards and non-fatal logging for owner and boundary enrichment.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant DS as DemandService
    participant K as Kafka
    participant DL as DeadLetter
    participant L as Logger
    rect rgba(173, 216, 230, 0.5)
        Note over DS,L: Chunked publish loop
    end
    loop for each chunk
        DS->>DS: Build MigrationCount (offset, limit, recordCount)
        DS->>DS: Build CalculationReq with chunk
        DS->>K: Publish CalculationReq
        alt Publish success
            K-->>DS: Ack
            DS->>L: Log chunk success
        else Publish failure
            K-->>DS: Error
            DS->>DL: Publish failed chunk + context
            DS->>L: Log failure details
            alt Dead-letter publish fails
                DL-->>DS: Error
                DS->>L: Log critical DL failure
            end
        end
    end
    DS->>DS: Clear calculationCriteriaList
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 inconclusive)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In
`@sewerage-connection/sw-calculator/src/main/java/org/egov/swcalculation/service/DemandService.java`:
- Around line 807-813: The loop that chunks calculationCriteriaList uses
kafkaBatchSize = configs.getKafkaDemandBatchSize() without validating it, which
can cause divide-by-zero or infinite loop when kafkaBatchSize is zero or
negative; add a guard in DemandService before the for-loop (validate
kafkaBatchSize from configs.getKafkaDemandBatchSize()), and either throw a clear
IllegalArgumentException when kafkaBatchSize <= 0 or clamp it to a safe minimum
(e.g., 1), ensuring subsequent logic that iterates chunkOffset and computes
chunkEnd behaves correctly.
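A stand-alone sketch of the guard described above (helper names here are hypothetical, not taken from `DemandService`; the clamp-to-1 branch is one of the two options the comment offers, the other being an `IllegalArgumentException`):

```java
// Illustrative sketch of the suggested batch-size guard; names are hypothetical.
public class BatchSizeGuard {

    // Clamp a configured batch size to a safe minimum so the chunk loop
    // always advances. Alternative: throw IllegalArgumentException on <= 0.
    static int safeBatchSize(int configured) {
        return configured <= 0 ? 1 : configured;
    }

    // Ceiling division used for "chunk X of Y" arithmetic; only safe once
    // batchSize is guaranteed positive (a zero divisor would throw here).
    static int chunkCount(int totalRecords, int batchSize) {
        return (totalRecords + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        int batchSize = safeBatchSize(0); // misconfigured value clamped to 1
        System.out.println(batchSize);               // 1
        System.out.println(chunkCount(2500, 1000));  // 3
    }
}
```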
- Around line 836-852: In DemandService, the kafkaTemplate.send calls (for
configs.getCreateDemand and configs.getDeadLetterTopicBatch) are async and won’t
surface broker/ack errors to the surrounding try/catch; change the sends to
handle the returned CompletableFuture (from kafkaTemplate.send) either by
calling .get(timeout, TimeUnit.SECONDS) to block and catch failures or by
attaching a completion callback that on failure logs the full error and then
prepares and sends migrationCount to the dead-letter topic (ensuring that DLQ
send itself is also awaited/handled the same way); update the error handling
around kafkaTemplate.send, migrationCount.setMessage and setAuditTime so they
only run when a send actually fails and ensure both main send and DLQ send are
handled consistently.
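Both remediation options (blocking with a timeout, or attaching a completion callback) can be illustrated with a plain `CompletableFuture`; here `send()` is a hypothetical stand-in for `kafkaTemplate.send`, so no Kafka classes are involved:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SendHandling {

    // Stand-in for kafkaTemplate.send(...): returns a future that may fail.
    static CompletableFuture<String> send(boolean fail) {
        return fail
                ? CompletableFuture.failedFuture(new RuntimeException("broker unavailable"))
                : CompletableFuture.completedFuture("ack");
    }

    // Option 1: block with a timeout so broker/ack errors surface
    // in the caller's try/catch instead of being silently dropped.
    static String blockingSend(boolean fail) {
        try {
            return send(fail).get(30, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // restore the flag before aborting
            return "interrupted";
        } catch (Exception ex) {
            // ExecutionException wraps the broker error; this is where the
            // dead-letter publish would be triggered.
            return "failed: " + ex.getCause().getMessage();
        }
    }

    // Option 2: attach a callback; the failure branch is where migrationCount
    // would be prepared and routed to the dead-letter topic.
    static void callbackSend(boolean fail) {
        send(fail).whenComplete((ack, ex) -> {
            if (ex != null) {
                System.out.println("send failed, routing to dead-letter: " + ex.getMessage());
            } else {
                System.out.println("acked: " + ack);
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(blockingSend(false)); // ack
        System.out.println(blockingSend(true));  // failed: broker unavailable
        callbackSend(true);
    }
}
```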
In
`@tradelicense/tl-services/src/main/java/org/egov/tl/service/BoundaryService.java`:
- Around line 65-68: The early-return makes the later null/empty guard
redundant; in BoundaryService remove the redundant
if(!CollectionUtils.isEmpty(localities)) check (and its corresponding
else/closing) around the location enrichment logic so the code path that follows
the initial guard executes directly; locate the block that references localities
and calls the location service (the method in BoundaryService handling locality
enrichment) and un-nest its contents so there is no duplicate
CollectionUtils.isEmpty check.
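The un-nested shape the comment asks for amounts to an early return that makes any second emptiness check redundant; a minimal stand-alone sketch (method and return values are hypothetical, not the real `BoundaryService` API):

```java
import java.util.List;

public class LocalityEnrichment {

    // After the initial guard returns early, the enrichment body can run
    // directly with no duplicate CollectionUtils.isEmpty-style check.
    static String enrich(List<String> localities) {
        if (localities == null || localities.isEmpty()) {
            return "skipped"; // early return replaces the nested if/else
        }
        // Location-service enrichment would run here, un-nested.
        return "enriched " + localities.size();
    }

    public static void main(String[] args) {
        System.out.println(enrich(List.of("LOC1", "LOC2"))); // enriched 2
        System.out.println(enrich(List.of()));               // skipped
    }
}
```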
In
`@tradelicense/tl-services/src/main/java/org/egov/tl/service/EnrichmentService.java`:
- Around line 321-330: Refactor the owners loop in EnrichmentService to avoid
double map lookup and handle null owner UUIDs: inside the
license.getTradeLicenseDetail().getOwners().forEach(owner -> { ... }) block
first capture owner.getUuid() into a variable (e.g., ownerUuid), check if
ownerUuid is null and log a clear warning for missing UUID, then retrieve
userDetail once from userIdToOwnerMap using that ownerUuid into a variable
(e.g., userDetail); if userDetail is null log the existing "not found in user
search" warning with the owner id, otherwise call
owner.addUserDetail(userDetail). Ensure you reference userIdToOwnerMap,
owner.getUuid(), owner.addUserDetail(...) and log.warn(...) in your changes.
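The single-lookup flow described above can be sketched with a plain `Map` (the `Owner` type and return strings here are simplified stand-ins for the real `owner.addUserDetail(...)` and `log.warn(...)` calls):

```java
import java.util.HashMap;
import java.util.Map;

public class OwnerEnrichment {

    // One map lookup, reused; null UUID and missing user each get their own
    // warning path instead of a silent NPE or a second map.get call.
    static String resolve(String ownerUuid, Map<String, String> userIdToOwnerMap) {
        if (ownerUuid == null) {
            return "warn: missing UUID";             // log.warn in the real service
        }
        String userDetail = userIdToOwnerMap.get(ownerUuid); // single lookup
        if (userDetail == null) {
            return "warn: not found in user search"; // existing warning, kept
        }
        return "attached " + userDetail;             // owner.addUserDetail(userDetail)
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("u1", "Alice");
        System.out.println(resolve("u1", users)); // attached Alice
        System.out.println(resolve(null, users)); // warn: missing UUID
        System.out.println(resolve("u2", users)); // warn: not found in user search
    }
}
```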
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@sewerage-connection/sw-calculator/src/main/java/org/egov/swcalculation/service/DemandService.java`:
- Around line 840-865: In DemandService.java around the
kafkaTemplate.send(...).get(...) calls (the main send to
configs.getCreateDemand() and the DLQ send to
configs.getDeadLetterTopicBatch()), add an explicit catch for
InterruptedException before the existing generic catch, call
Thread.currentThread().interrupt() to restore the interrupt flag, and then abort
the send flow (e.g., return or rethrow) so the method stops further processing;
keep the existing handling for other Exceptions and still set
migrationCount.message/auditTime and attempt the DLQ only when appropriate.
```java
try {
    kafkaTemplate.send(configs.getCreateDemand(), calculationReq)
            .get(30, java.util.concurrent.TimeUnit.SECONDS);
    log.info("Bulk bill Gen chunk info : " + migrationCount + " (chunk " + (chunkOffset/kafkaBatchSize + 1) + " of " + ((totalRecords + kafkaBatchSize - 1) / kafkaBatchSize) + ")");
} catch (Exception ex) {
    // Log the failure with full context
    log.error("Failed to push chunk to Kafka - offset: " + migrationCount.getOffset()
            + ", recordCount: " + migrationCount.getRecordCount()
            + ", chunk: " + (chunkOffset/kafkaBatchSize + 1)
            + ", error: " + ex.getMessage(), ex);

    // Send to audit topic for tracking and manual retry
    migrationCount.setMessage("Failed to push to Kafka: " + ex.getMessage());
    migrationCount.setAuditTime(System.currentTimeMillis());
    try {
        kafkaTemplate.send(configs.getDeadLetterTopicBatch(), migrationCount)
                .get(30, java.util.concurrent.TimeUnit.SECONDS);
        log.info("Failed chunk sent to dead letter topic: offset=" + migrationCount.getOffset());
    } catch (Exception auditEx) {
        // Critical: Both Kafka push and audit failed
        log.error("CRITICAL: Failed to send chunk to both main topic AND dead letter topic! "
                + "offset: " + migrationCount.getOffset()
                + ", recordCount: " + migrationCount.getRecordCount()
                + ", connections may be lost. Manual intervention required.", auditEx);
    }
}
```
🧩 Analysis chain
🏁 Script executed:

```shell
cd sewerage-connection/sw-calculator && \
find . -name "DemandService.java" -type f
```

Repository: egovernments/URBAN
Length of output: 127

🏁 Script executed:

```shell
cd sewerage-connection/sw-calculator && \
wc -l src/main/java/org/egov/swcalculation/service/DemandService.java
```

Length of output: 130

🏁 Script executed:

```shell
cd sewerage-connection/sw-calculator && \
sed -n '830,870p' src/main/java/org/egov/swcalculation/service/DemandService.java
```

Length of output: 2938

🏁 Script executed:

```shell
cd sewerage-connection/sw-calculator && \
grep -n "kafkaTemplate.send" src/main/java/org/egov/swcalculation/service/DemandService.java | head -20
```

Length of output: 269
🌐 Web query:
CompletableFuture.get() throws InterruptedException behavior Java
💡 Result:
Short answer
- CompletableFuture.get() will throw InterruptedException if the calling thread is interrupted while waiting for the future to complete. [1][2]
- When InterruptedException is thrown the thread’s interrupted status is cleared by convention. [3]
- Note: CompletableFuture.cancel(boolean) does NOT interrupt the running task — it completes the future exceptionally with a CancellationException but does not necessarily stop or interrupt the underlying worker. To actually interrupt a thread you must call Thread.interrupt() (or use a Future implementation that interrupts its worker). [2][4]
Sources
[1] CompletableFuture.get javadoc (Java SE).
[2] CompletableFuture javadoc / cancel behavior (Java SE).
[3] Oracle Java tutorial: interrupt status and InterruptedException.
[4] Discussion / blog showing CompletableFuture.cancel doesn't interrupt underlying thread.
🌐 Web query:
catching InterruptedException as Exception clears interrupt flag Java thread
💡 Result:
Short answer: Yes. When a blocking method throws InterruptedException it clears the thread's interrupt status; catching that exception (even via a catch(Exception e)) does not restore the flag — you must re-set it yourself with Thread.currentThread().interrupt() if you need to preserve the interrupt. [1][2]
Example:
```java
try {
    Thread.sleep(10_000);
} catch (Exception e) { // catches InterruptedException too
    Thread.currentThread().interrupt(); // restore interrupt status
    // handle or rethrow
}
```
References:
[1] Oracle Java tutorial — The Interrupt Status (InterruptedException clears the flag).
[2] java.lang.InterruptedException Javadoc (shows Thread.interrupted() clears status and docs on InterruptedException).
Catch and re-interrupt InterruptedException explicitly in Kafka send operations.
CompletableFuture.get(...) throws InterruptedException when the calling thread is interrupted; when this is caught by the generic catch (Exception ex), the interrupt status is cleared and never restored, preventing graceful shutdown. Catch InterruptedException explicitly before the generic catch, call Thread.currentThread().interrupt() to restore the flag, and abort or rethrow.
This applies to both the main Kafka send (line 841) and the DLQ send (line 855).
🔧 Proposed fix

```diff
 try {
     kafkaTemplate.send(configs.getCreateDemand(), calculationReq)
             .get(30, java.util.concurrent.TimeUnit.SECONDS);
     log.info("Bulk bill Gen chunk info : " + migrationCount + " (chunk " + (chunkOffset/kafkaBatchSize + 1) + " of " + ((totalRecords + kafkaBatchSize - 1) / kafkaBatchSize) + ")");
+} catch (InterruptedException ie) {
+    Thread.currentThread().interrupt();
+    throw new CustomException("EG_SW_KAFKA_INTERRUPTED",
+            "Kafka send interrupted for chunk offset " + migrationCount.getOffset());
 } catch (Exception ex) {
     // Log the failure with full context
     log.error("Failed to push chunk to Kafka - offset: " + migrationCount.getOffset()
             + ", recordCount: " + migrationCount.getRecordCount()
             + ", chunk: " + (chunkOffset/kafkaBatchSize + 1)
             + ", error: " + ex.getMessage(), ex);
     // Send to audit topic for tracking and manual retry
     migrationCount.setMessage("Failed to push to Kafka: " + ex.getMessage());
     migrationCount.setAuditTime(System.currentTimeMillis());
     try {
         kafkaTemplate.send(configs.getDeadLetterTopicBatch(), migrationCount)
                 .get(30, java.util.concurrent.TimeUnit.SECONDS);
         log.info("Failed chunk sent to dead letter topic: offset=" + migrationCount.getOffset());
+    } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        log.error("Interrupted while sending chunk to dead letter topic. Aborting.", ie);
+        throw new CustomException("EG_SW_KAFKA_DLQ_INTERRUPTED",
+                "DLQ send interrupted for chunk offset " + migrationCount.getOffset());
     } catch (Exception auditEx) {
         // Critical: Both Kafka push and audit failed
         log.error("CRITICAL: Failed to send chunk to both main topic AND dead letter topic! "
                 + "offset: " + migrationCount.getOffset()
                 + ", recordCount: " + migrationCount.getRecordCount()
                 + ", connections may be lost. Manual intervention required.", auditEx);
     }
 }
```

🤖 Prompt for AI Agents
In
`@sewerage-connection/sw-calculator/src/main/java/org/egov/swcalculation/service/DemandService.java`
around lines 840 - 865, In DemandService.java around the
kafkaTemplate.send(...).get(...) calls (the main send to
configs.getCreateDemand() and the DLQ send to
configs.getDeadLetterTopicBatch()), add an explicit catch for
InterruptedException before the existing generic catch, call
Thread.currentThread().interrupt() to restore the interrupt flag, and then abort
the send flow (e.g., return or rethrow) so the method stops further processing;
keep the existing handling for other Exceptions and still set
migrationCount.message/auditTime and attempt the DLQ only when appropriate.
Summary by CodeRabbit
New Features
Bug Fixes
Performance