
Fix/sw calculation demand #426

Open
priyanshu-egov wants to merge 4 commits into egovernments:master from priyanshu-egov:fix/sw-calculation-demand

Conversation


priyanshu-egov (Collaborator) commented Jan 21, 2026

Summary by CodeRabbit

  • New Features

    • Added a configurable Kafka demand batch size for demand processing.
  • Bug Fixes

    • Improved null-safety and validation in boundary and owner enrichment; incomplete or missing data is skipped with warnings instead of failing.
    • Per-chunk error handling with dead-letter routing to prevent whole-request failures.
  • Performance

    • Demand processing now uses chunked batching for improved memory use and throughput.



coderabbitai (Bot) commented Jan 21, 2026

Walkthrough

Added configurable Kafka batch-size to sewerage calculation and refactored demand publishing to send calculation requests in chunked batches with per-chunk error handling and dead-letter routing. Trade license services received null-safety guards and non-fatal logging for owner and boundary enrichment.

Changes

Cohort / File(s) — Summary

  • Sewerage Configuration — sewerage-connection/sw-calculator/src/main/java/org/egov/swcalculation/config/SWCalculationConfiguration.java: Added a private Integer kafkaDemandBatchSize field injected via @Value("${kafka.demand.batch.size:100}").
  • Sewerage Demand Batching — sewerage-connection/sw-calculator/src/main/java/org/egov/swcalculation/service/DemandService.java: Replaced the single bulk publish with chunked processing using the configured batch size; builds a per-chunk MigrationCount and CalculationReq, publishes each chunk, waits for acks, logs per-chunk results, routes failures to the dead-letter topic, and clears the list after processing.
  • Trade License Boundary Enrichment — tradelicense/tl-services/src/main/java/org/egov/tl/service/BoundaryService.java: Added null/empty checks when collecting locality codes, with an early return if none; guards the JSON-path lookup and boundary object access to prevent NPEs; validates the location service response.
  • Trade License Enrichment Logging — tradelicense/tl-services/src/main/java/org/egov/tl/service/EnrichmentService.java: Added @Slf4j; enrichOwner now guards against null license/details/owners, skips owners with null UUIDs with warnings, and logs missing user details instead of throwing.
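The chunked batching described for DemandService can be sketched in isolation. The standalone class and method names below (ChunkSketch, chunkBounds) are illustrative, not the actual service code; chunkOffset and chunkEnd mirror the variable names referenced later in this review.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the chunking described above: split totalRecords
// into batches of kafkaBatchSize, yielding the [offset, end) bounds of each
// chunk that would back a per-chunk MigrationCount/CalculationReq.
public class ChunkSketch {
    public static List<int[]> chunkBounds(int totalRecords, int kafkaBatchSize) {
        List<int[]> bounds = new ArrayList<>();
        for (int chunkOffset = 0; chunkOffset < totalRecords; chunkOffset += kafkaBatchSize) {
            // last chunk may be smaller than the configured batch size
            int chunkEnd = Math.min(chunkOffset + kafkaBatchSize, totalRecords);
            bounds.add(new int[]{chunkOffset, chunkEnd});
        }
        return bounds;
    }
}
```

With the default batch size of 100, 250 records produce three chunks of 100, 100, and 50 records.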

Sequence Diagram(s)

sequenceDiagram
    participant DS as DemandService
    participant K as Kafka
    participant DL as DeadLetter
    participant L as Logger

    rect rgba(173, 216, 230, 0.5)
    Note over DS,L: Chunked publish loop
    end

    loop for each chunk
        DS->>DS: Build MigrationCount (offset, limit, recordCount)
        DS->>DS: Build CalculationReq with chunk
        DS->>K: Publish CalculationReq
        alt Publish success
            K-->>DS: Ack
            DS->>L: Log chunk success
        else Publish failure
            K-->>DS: Error
            DS->>DL: Publish failed chunk + context
            DS->>L: Log failure details
            alt Dead-letter publish fails
                DL-->>DS: Error
                DS->>L: Log critical DL failure
            end
        end
    end

    DS->>DS: Clear calculationCriteriaList

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐇 A little batch, a careful hop,
Kafka parcels never stop.
Null guards stitch each fragile seam,
Chunks and logs—a steady team.
Hooray for safer flow and dream!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❓ Inconclusive checks (1)
  • Title check — Inconclusive. The title 'Fix/sw calculation demand' is vague and generic, using a prefix pattern without conveying what specific issue is fixed or what the main change accomplishes. Resolution: provide a more descriptive title, such as 'Implement batched Kafka publishing for demand calculations' or 'Fix NPE issues in sewerage and trade license services'.

✅ Passed checks (2)
  • Description check — Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Docstring coverage — Passed. Docstring coverage is 100.00%, above the required threshold of 80.00%.



coderabbitai (Bot) left a comment


Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@sewerage-connection/sw-calculator/src/main/java/org/egov/swcalculation/service/DemandService.java`:
- Around lines 807-813: The chunk loop over calculationCriteriaList uses kafkaBatchSize = configs.getKafkaDemandBatchSize() without validating it, which can cause a divide-by-zero or an infinite loop when the value is zero or negative. Add a guard in DemandService before the for-loop: either throw a clear IllegalArgumentException when kafkaBatchSize <= 0, or clamp it to a safe minimum (e.g., 1), so the logic that advances chunkOffset and computes chunkEnd behaves correctly.
- Around lines 836-852: The kafkaTemplate.send calls (to configs.getCreateDemand and configs.getDeadLetterTopicBatch) are asynchronous, so broker/ack errors never surface in the surrounding try/catch. Handle the returned CompletableFuture: either block with .get(timeout, TimeUnit.SECONDS) and catch failures, or attach a completion callback that on failure logs the full error and then sends migrationCount to the dead-letter topic (awaiting/handling the DLQ send the same way). Also move migrationCount.setMessage and setAuditTime so they run only when a send actually fails, and handle the main send and the DLQ send consistently.
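The guard suggested in the first point can be sketched as a small pure function; the class and method names (BatchSizeGuard, resolveBatchSize) are illustrative and not part of the actual DemandService.

```java
// Hypothetical guard for the configured batch size: treat missing or
// non-positive values as invalid and clamp them to a safe minimum of 1,
// so the chunk loop can neither divide by zero nor spin forever.
// Throwing IllegalArgumentException instead is the stricter alternative.
public class BatchSizeGuard {
    public static int resolveBatchSize(Integer configured) {
        if (configured == null || configured <= 0) {
            return 1; // clamp to the safe minimum
        }
        return configured;
    }
}
```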
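The second point — surfacing async send failures and routing them to a dead letter handler — can be modeled without Spring Kafka. This sketch stands in for kafkaTemplate.send(...): the send parameter is any CompletableFuture and deadLetter is the DLQ publish; names (AwaitedSendSketch, sendAndAwait) are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch of the suggested pattern: block on the send's CompletableFuture so
// broker/ack errors surface in the try/catch, restore the interrupt flag on
// InterruptedException, and route other failures to a dead-letter handler.
public class AwaitedSendSketch {
    public static boolean sendAndAwait(CompletableFuture<?> send, Consumer<String> deadLetter) {
        try {
            send.get(30, TimeUnit.SECONDS); // broker errors are thrown here
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        } catch (Exception ex) {
            // only now is it a real send failure worth dead-lettering
            deadLetter.accept("Failed to push: " + ex.getMessage());
            return false;
        }
    }
}
```

In the real service the DLQ publish is itself an async send and should be awaited the same way.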

In `@tradelicense/tl-services/src/main/java/org/egov/tl/service/BoundaryService.java`:
- Around lines 65-68: The early return makes the later null/empty guard redundant. Remove the redundant if (!CollectionUtils.isEmpty(localities)) check (and its closing brace) around the location enrichment logic, and un-nest its contents so the code path following the initial guard executes directly, leaving no duplicate CollectionUtils.isEmpty check.
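The early-return shape this comment asks for can be shown in miniature; the class and method (EarlyReturnSketch, enrich) are illustrative stand-ins, with plain java.util checks in place of Spring's CollectionUtils.

```java
import java.util.List;

// Sketch of the un-nested shape: once the guard at the top returns on a
// null/empty list, a second isEmpty check deeper in the method is dead
// code, and the enrichment body can run at the top indentation level.
public class EarlyReturnSketch {
    public static int enrich(List<String> localities) {
        if (localities == null || localities.isEmpty()) {
            return 0; // early return: nothing to enrich
        }
        // enrichment proceeds directly; no second isEmpty check needed
        return localities.size();
    }
}
```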

In `@tradelicense/tl-services/src/main/java/org/egov/tl/service/EnrichmentService.java`:
- Around lines 321-330: Refactor the owners loop in EnrichmentService to avoid a double map lookup and to handle null owner UUIDs. Inside license.getTradeLicenseDetail().getOwners().forEach(owner -> { ... }), first capture owner.getUuid() into a variable (e.g., ownerUuid) and log a clear warning when it is null; then retrieve the user from userIdToOwnerMap once into a variable (e.g., userDetail). If userDetail is null, log the existing "not found in user search" warning with the owner id; otherwise call owner.addUserDetail(userDetail).


coderabbitai (Bot) left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@sewerage-connection/sw-calculator/src/main/java/org/egov/swcalculation/service/DemandService.java`:
- Around lines 840-865: Around the kafkaTemplate.send(...).get(...) calls (the main send to configs.getCreateDemand() and the DLQ send to configs.getDeadLetterTopicBatch()), add an explicit catch for InterruptedException before the existing generic catch, call Thread.currentThread().interrupt() to restore the interrupt flag, and abort the send flow (e.g., return or rethrow) so the method stops further processing. Keep the existing handling for other exceptions, still set migrationCount.message/auditTime, and attempt the DLQ only when appropriate.

Comment on lines +840 to +865
try {
kafkaTemplate.send(configs.getCreateDemand(), calculationReq)
.get(30, java.util.concurrent.TimeUnit.SECONDS);
log.info("Bulk bill Gen chunk info : " + migrationCount + " (chunk " + (chunkOffset/kafkaBatchSize + 1) + " of " + ((totalRecords + kafkaBatchSize - 1) / kafkaBatchSize) + ")");
} catch (Exception ex) {
// Log the failure with full context
log.error("Failed to push chunk to Kafka - offset: " + migrationCount.getOffset()
+ ", recordCount: " + migrationCount.getRecordCount()
+ ", chunk: " + (chunkOffset/kafkaBatchSize + 1)
+ ", error: " + ex.getMessage(), ex);

// Send to audit topic for tracking and manual retry
migrationCount.setMessage("Failed to push to Kafka: " + ex.getMessage());
migrationCount.setAuditTime(System.currentTimeMillis());
try {
kafkaTemplate.send(configs.getDeadLetterTopicBatch(), migrationCount)
.get(30, java.util.concurrent.TimeUnit.SECONDS);
log.info("Failed chunk sent to dead letter topic: offset=" + migrationCount.getOffset());
} catch (Exception auditEx) {
// Critical: Both Kafka push and audit failed
log.error("CRITICAL: Failed to send chunk to both main topic AND dead letter topic! "
+ "offset: " + migrationCount.getOffset()
+ ", recordCount: " + migrationCount.getRecordCount()
+ ", connections may be lost. Manual intervention required.", auditEx);
}
}


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cd sewerage-connection/sw-calculator && \
find . -name "DemandService.java" -type f

Repository: egovernments/URBAN

Length of output: 127


🏁 Script executed:

cd sewerage-connection/sw-calculator && \
wc -l src/main/java/org/egov/swcalculation/service/DemandService.java

Repository: egovernments/URBAN

Length of output: 130


🏁 Script executed:

cd sewerage-connection/sw-calculator && \
sed -n '830,870p' src/main/java/org/egov/swcalculation/service/DemandService.java

Repository: egovernments/URBAN

Length of output: 2938


🏁 Script executed:

cd sewerage-connection/sw-calculator && \
grep -n "kafkaTemplate.send" src/main/java/org/egov/swcalculation/service/DemandService.java | head -20

Repository: egovernments/URBAN

Length of output: 269


🌐 Web query:

CompletableFuture.get() throws InterruptedException behavior Java

💡 Result:

Short answer

  • CompletableFuture.get() will throw InterruptedException if the calling thread is interrupted while waiting for the future to complete. [1][2]
  • When InterruptedException is thrown the thread’s interrupted status is cleared by convention. [3]
  • Note: CompletableFuture.cancel(boolean) does NOT interrupt the running task — it completes the future exceptionally with a CancellationException but does not necessarily stop or interrupt the underlying worker. To actually interrupt a thread you must call Thread.interrupt() (or use a Future implementation that interrupts its worker). [2][4]

Sources
[1] CompletableFuture.get javadoc (Java SE).
[2] CompletableFuture javadoc / cancel behavior (Java SE).
[3] Oracle Java tutorial: interrupt status and InterruptedException.
[4] Discussion / blog showing CompletableFuture.cancel doesn't interrupt underlying thread.


🌐 Web query:

catching InterruptedException as Exception clears interrupt flag Java thread

💡 Result:

Short answer: Yes. When a blocking method throws InterruptedException it clears the thread's interrupt status; catching that exception (even via a catch(Exception e)) does not restore the flag — you must re-set it yourself with Thread.currentThread().interrupt() if you need to preserve the interrupt. [1][2]

Example:
try {
Thread.sleep(10_000);
} catch (Exception e) { // catches InterruptedException too
Thread.currentThread().interrupt(); // restore interrupt status
// handle or rethrow
}

References:
[1] Oracle Java tutorial — The Interrupt Status (InterruptedException clears the flag).
[2] java.lang.InterruptedException Javadoc (shows Thread.interrupted() clears status and docs on InterruptedException).


Catch and re-interrupt InterruptedException explicitly in Kafka send operations.

CompletableFuture.get(...) throws InterruptedException when the calling thread is interrupted; when this is caught by the generic catch (Exception ex), the interrupt status is cleared and never restored, preventing graceful shutdown. Catch InterruptedException explicitly before the generic catch, call Thread.currentThread().interrupt() to restore the flag, and abort or rethrow.

This applies to both the main Kafka send (line 841) and the DLQ send (line 855).

🔧 Proposed fix
                            try {
                                kafkaTemplate.send(configs.getCreateDemand(), calculationReq)
                                        .get(30, java.util.concurrent.TimeUnit.SECONDS);
                                log.info("Bulk bill Gen chunk info : " + migrationCount + " (chunk " + (chunkOffset/kafkaBatchSize + 1) + " of " + ((totalRecords + kafkaBatchSize - 1) / kafkaBatchSize) + ")");
+                            } catch (InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                throw new CustomException("EG_SW_KAFKA_INTERRUPTED",
+                                        "Kafka send interrupted for chunk offset " + migrationCount.getOffset());
                            } catch (Exception ex) {
                                // Log the failure with full context
                                log.error("Failed to push chunk to Kafka - offset: " + migrationCount.getOffset()
                                        + ", recordCount: " + migrationCount.getRecordCount()
                                        + ", chunk: " + (chunkOffset/kafkaBatchSize + 1)
                                        + ", error: " + ex.getMessage(), ex);

                                // Send to audit topic for tracking and manual retry
                                migrationCount.setMessage("Failed to push to Kafka: " + ex.getMessage());
                                migrationCount.setAuditTime(System.currentTimeMillis());
                                try {
                                    kafkaTemplate.send(configs.getDeadLetterTopicBatch(), migrationCount)
                                            .get(30, java.util.concurrent.TimeUnit.SECONDS);
                                    log.info("Failed chunk sent to dead letter topic: offset=" + migrationCount.getOffset());
+                                } catch (InterruptedException ie) {
+                                    Thread.currentThread().interrupt();
+                                    log.error("Interrupted while sending chunk to dead letter topic. Aborting.", ie);
+                                    throw new CustomException("EG_SW_KAFKA_DLQ_INTERRUPTED",
+                                            "DLQ send interrupted for chunk offset " + migrationCount.getOffset());
                                } catch (Exception auditEx) {
                                    // Critical: Both Kafka push and audit failed
                                    log.error("CRITICAL: Failed to send chunk to both main topic AND dead letter topic! "
                                            + "offset: " + migrationCount.getOffset()
                                            + ", recordCount: " + migrationCount.getRecordCount()
                                            + ", connections may be lost. Manual intervention required.", auditEx);
                                }
                            }
