Skip to content

Commit 556e449

Browse files
feat: Add OpenTelemetry metrics instrumentation with Prometheus scrape endpoint
* setup open telemetry metrics for transactions and balances * instrument metrics on transactions, balance, queue and worker * setup test for verifying metrics export * setup deployment config for prometheus * add auth config for metrics * enable backward compatibility for OTLP endpoint for tracing * add documentation for supported metrics * fix double call on transaction queue name for tracking queue_enqueued_total * setup metrics using init function and update contributing.md to use act for testing ci --------- Co-authored-by: jerry enebeli <jerryenebeli@gmail.com>
1 parent a4a14eb commit 556e449

26 files changed

Lines changed: 1061 additions & 87 deletions

CONTRIBUTING.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,22 @@ When your change affects schema or persistence behavior:
8787
- Follow the existing migration naming pattern.
8888
- Include tests that validate the new behavior.
8989

90+
## Running CI Locally with `act`
91+
92+
You can run the GitHub Actions CI pipeline locally using [`act`](https://github.com/nektos/act) before pushing:
93+
94+
1. Install `act`:
95+
96+
```bash
97+
brew install act # macOS
98+
```
99+
100+
2. Make sure Docker is running, then execute the test workflow:
101+
102+
```bash
103+
act -W .github/workflows/go.yml
104+
```
105+
90106
## Pull Request Checklist
91107

92108
Before opening a PR, ensure:

api/api.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,13 @@ func NewAPI(b *blnk.Blnk) *Api {
160160
auth := middleware.NewAuthMiddleware(b)
161161
r.Use(middleware.RateLimitMiddleware(conf))
162162
r.Use(middleware.SecurityHeaders())
163-
r.Use(otelgin.Middleware("BLNK"))
163+
r.Use(otelgin.Middleware("BLNK",
164+
otelgin.WithFilter(func(r *http.Request) bool {
165+
// Exclude high-frequency operational endpoints from tracing
166+
// to avoid polluting the trace feed with noise.
167+
return r.URL.Path != "/metrics"
168+
}),
169+
))
164170

165171
r.GET("/", func(c *gin.Context) {
166172
c.JSON(200, "server running...")

api/middleware/auth.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,12 @@ func (m *AuthMiddleware) Authenticate() gin.HandlerFunc {
192192
return
193193
}
194194

195+
// Skip X-Blnk-Key auth for metrics endpoint, it uses its own bearer token auth
196+
if c.Request != nil && c.Request.URL != nil && c.Request.URL.Path == "/metrics" {
197+
c.Next()
198+
return
199+
}
200+
195201
// Check if secure mode is enabled
196202
conf, err := config.Fetch()
197203
if err == nil && conf != nil && !conf.Server.Secure {

api/middleware/middleware.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package middleware
1818

1919
import (
20+
"crypto/subtle"
21+
"net/http"
2022
"strings"
2123
"time"
2224

@@ -64,6 +66,93 @@ func RateLimitMiddleware(conf *config.Configuration) gin.HandlerFunc {
6466
}
6567
}
6668

69+
// MetricsAuth returns a middleware that controls access to the /metrics endpoint.
70+
//
71+
// Behavior based on secure mode and token configuration:
72+
// - Secure mode OFF, no token: open access (no auth required)
73+
// - Secure mode OFF, token set: require bearer token
74+
// - Secure mode ON, token set: require bearer token
75+
// - Secure mode ON, no token: block all access (misconfiguration)
76+
//
77+
// When authentication is required, requests must include "Authorization: Bearer <token>".
78+
// This uses the standard Authorization header that Prometheus natively supports via
79+
// its scrape_configs authorization block.
80+
func MetricsAuth(secure bool, token string) gin.HandlerFunc {
81+
return func(c *gin.Context) {
82+
if secure && token == "" {
83+
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
84+
"error": "Metrics endpoint unavailable: metrics_bearer_token must be configured when secure mode is enabled",
85+
})
86+
return
87+
}
88+
89+
if token == "" {
90+
c.Next()
91+
return
92+
}
93+
94+
auth := c.GetHeader("Authorization")
95+
if auth == "" {
96+
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Authorization required for metrics endpoint"})
97+
return
98+
}
99+
100+
const prefix = "Bearer "
101+
if !strings.HasPrefix(auth, prefix) {
102+
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Authorization header must use Bearer scheme"})
103+
return
104+
}
105+
106+
provided := auth[len(prefix):]
107+
if subtle.ConstantTimeCompare([]byte(provided), []byte(token)) != 1 {
108+
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Invalid bearer token"})
109+
return
110+
}
111+
112+
c.Next()
113+
}
114+
}
115+
116+
// MetricsAuthHandler wraps an http.Handler with bearer token authentication.
// It is the net/http counterpart of the Gin metrics middleware, for servers
// built on a standard http.ServeMux.
//
// The decision depends on secure mode and on whether a token is configured:
//   - token empty, secure=false: next is returned unwrapped (open access).
//   - token empty, secure=true:  every request is rejected with 403, because
//     running secure mode without a metrics token is a misconfiguration.
//   - token set: requests must carry "Authorization: Bearer <token>";
//     anything else is rejected with 401.
//
// Every rejection is sent as a JSON body with Content-Type application/json,
// and 401 responses additionally carry a "WWW-Authenticate: Bearer" challenge.
func MetricsAuthHandler(secure bool, token string, next http.Handler) http.Handler {
	if secure && token == "" {
		return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
			writeMetricsAuthError(w, http.StatusForbidden,
				`{"error":"Metrics endpoint unavailable: metrics_bearer_token must be configured when secure mode is enabled"}`)
		})
	}

	if token == "" {
		return next
	}

	const prefix = "Bearer "

	// Converted once; the configured token does not change per request.
	expected := []byte(token)

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		auth := r.Header.Get("Authorization")
		if auth == "" {
			writeMetricsAuthError(w, http.StatusUnauthorized,
				`{"error":"Authorization required for metrics endpoint"}`)
			return
		}

		if !strings.HasPrefix(auth, prefix) {
			writeMetricsAuthError(w, http.StatusUnauthorized,
				`{"error":"Authorization header must use Bearer scheme"}`)
			return
		}

		// Constant-time comparison so response timing does not reveal how
		// much of the presented token matched.
		provided := auth[len(prefix):]
		if subtle.ConstantTimeCompare([]byte(provided), expected) != 1 {
			writeMetricsAuthError(w, http.StatusUnauthorized,
				`{"error":"Invalid bearer token"}`)
			return
		}

		next.ServeHTTP(w, r)
	})
}

// writeMetricsAuthError writes body, which must already be a valid JSON
// document, with the given status code. http.Error is deliberately not used:
// it forces a text/plain Content-Type, which mislabels a JSON payload.
func writeMetricsAuthError(w http.ResponseWriter, status int, body string) {
	h := w.Header()
	h.Set("Content-Type", "application/json")
	// Same sniffing protection http.Error would have applied.
	h.Set("X-Content-Type-Options", "nosniff")
	if status == http.StatusUnauthorized {
		// A 401 must tell the client which scheme to authenticate with.
		h.Set("WWW-Authenticate", "Bearer")
	}
	w.WriteHeader(status)
	_, _ = w.Write([]byte(body)) // best effort: nothing useful to do if the client went away
}
155+
67156
// SecurityHeaders sets security headers to the response.
68157
// It sets the following headers:
69158
// - X-Content-Type-Options: nosniff

balance.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/blnkfinance/blnk/config"
2626
"github.com/blnkfinance/blnk/internal/filter"
27+
"github.com/blnkfinance/blnk/internal/metrics"
2728
"github.com/blnkfinance/blnk/internal/notification"
2829
"github.com/blnkfinance/blnk/model"
2930
"github.com/sirupsen/logrus"
@@ -236,6 +237,7 @@ func (l *Blnk) CreateBalance(ctx context.Context, balance model.Balance) (model.
236237
return model.Balance{}, err
237238
}
238239
l.postBalanceActions(ctx, &balance)
240+
metrics.BalanceCreatedTotal.Add(ctx, 1)
239241
span.AddEvent("Balance created", trace.WithAttributes(attribute.String("balance.id", balance.BalanceID)))
240242
return balance, nil
241243
}

cmd/server.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/blnkfinance/blnk"
3030
"github.com/blnkfinance/blnk/api"
31+
"github.com/blnkfinance/blnk/api/middleware"
3132
"github.com/blnkfinance/blnk/config"
3233
"github.com/blnkfinance/blnk/database"
3334
"github.com/blnkfinance/blnk/internal/search"
@@ -180,7 +181,17 @@ func healthCheckHandler(c *gin.Context) {
180181

181182
func initializeRouter(b *blnkInstance) *gin.Engine {
182183
router := api.NewAPI(b.blnk).Router()
183-
router.GET("/health", healthCheckHandler) // Add health check route
184+
router.GET("/health", healthCheckHandler)
185+
if h := trace.MetricsHandler(); h != nil {
186+
cfg, _ := config.Fetch()
187+
var secure bool
188+
var token string
189+
if cfg != nil {
190+
secure = cfg.Server.Secure
191+
token = cfg.Server.MetricsBearerToken
192+
}
193+
router.GET("/metrics", middleware.MetricsAuth(secure, token), gin.WrapH(h))
194+
}
184195
return router
185196
}
186197

@@ -300,16 +311,14 @@ func serverCommands(b *blnkInstance) *cobra.Command {
300311
Run: func(cmd *cobra.Command, args []string) {
301312
ctx := context.Background()
302313

303-
// Initialize router
304-
router := initializeRouter(b)
305-
306314
// Load configuration
307315
cfg, err := config.Fetch()
308316
if err != nil {
309317
logrus.Error(err)
310318
}
311319

312-
// Initialize telemetry and observability
320+
// Initialize telemetry and observability before the router,
321+
// so MetricsHandler() is available when routes are registered.
313322
phClient, shutdown, err := initializeTelemetryAndObservability(ctx, cfg)
314323
if err != nil {
315324
logrus.Fatal(err)
@@ -325,6 +334,9 @@ func serverCommands(b *blnkInstance) *cobra.Command {
325334
defer phClient.Close()
326335
}
327336

337+
// Initialize router (after OTel so /metrics handler is available)
338+
router := initializeRouter(b)
339+
328340
// Initialize TypeSense
329341
tsClient, err := initializeTypeSense(ctx, cfg)
330342
if err != nil {

cmd/workers.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,17 @@ import (
3131
"go.opentelemetry.io/otel"
3232

3333
"github.com/blnkfinance/blnk"
34+
"github.com/blnkfinance/blnk/api/middleware"
3435
"github.com/blnkfinance/blnk/config"
3536
"github.com/blnkfinance/blnk/internal/hotpairs"
37+
"github.com/blnkfinance/blnk/internal/metrics"
3638
"github.com/blnkfinance/blnk/internal/notification"
3739
redis_db "github.com/blnkfinance/blnk/internal/redis-db"
3840
"github.com/blnkfinance/blnk/internal/search"
41+
trace "github.com/blnkfinance/blnk/internal/traces"
3942
"github.com/blnkfinance/blnk/model"
43+
"go.opentelemetry.io/otel/attribute"
44+
otelmetric "go.opentelemetry.io/otel/metric"
4045

4146
"github.com/hibiken/asynq"
4247
"github.com/hibiken/asynqmon"
@@ -56,6 +61,8 @@ func (b *blnkInstance) processTransaction(ctx context.Context, t *asynq.Task) er
5661
ctx, span := otel.Tracer("blnk.transactions.worker").Start(ctx, "Process Transaction From Redis Queue")
5762
defer span.End()
5863

64+
startTime := time.Now()
65+
5966
var txn model.Transaction
6067
if err := json.Unmarshal(t.Payload(), &txn); err != nil {
6168
logrus.Error(err)
@@ -69,6 +76,19 @@ func (b *blnkInstance) processTransaction(ctx context.Context, t *asynq.Task) er
6976
return nil
7077
}
7178

79+
handled, err := b.blnk.TryRecordQueuedTransactionBatch(ctx, &txn)
80+
if b.cnf.Queue.EnableHotLane && t.Type() == b.cnf.Queue.HotQueueName {
81+
handled, err = b.blnk.TryRecordQueuedTransactionBatchForHotLane(ctx, &txn)
82+
}
83+
if err != nil {
84+
logrus.WithError(err).Warnf("coalesced processing attempt failed for transaction %s", txn.TransactionID)
85+
}
86+
if handled {
87+
metrics.QueueProcessingDuration.Record(ctx, time.Since(startTime).Seconds(),
88+
otelmetric.WithAttributes(attribute.String("result", "success")),
89+
)
90+
return nil
91+
}
7292
_, err = b.blnk.ProcessQueuedTransaction(ctx, &txn, b.cnf.Queue.EnableHotLane && t.Type() == b.cnf.Queue.HotQueueName)
7393
if err != nil {
7494
// Handle reference already used error
@@ -94,6 +114,9 @@ func (b *blnkInstance) processTransaction(ctx context.Context, t *asynq.Task) er
94114

95115
logrus.Infof("Insufficient funds for transaction %s, retry attempt %d/%d",
96116
txn.TransactionID, retryCount, b.cnf.Queue.MaxRetryAttempts)
117+
metrics.WorkerRetriesTotal.Add(ctx, 1,
118+
otelmetric.WithAttributes(attribute.String("reason", "insufficient_funds")),
119+
)
97120
return err // This will trigger a retry
98121
}
99122

@@ -121,9 +144,16 @@ func (b *blnkInstance) processTransaction(ctx context.Context, t *asynq.Task) er
121144
}
122145

123146
logrus.Infof("Transaction %s pushed back for retry due to error: %v", txn.TransactionID, err)
147+
metrics.WorkerRetriesTotal.Add(ctx, 1,
148+
otelmetric.WithAttributes(attribute.String("reason", "other")),
149+
)
124150
return err
125151
}
126152

153+
logrus.Infof("Transaction %s processed successfully", txn.TransactionID)
154+
metrics.QueueProcessingDuration.Record(ctx, time.Since(startTime).Seconds(),
155+
otelmetric.WithAttributes(attribute.String("result", "success")),
156+
)
127157
return nil
128158
}
129159

@@ -527,6 +557,9 @@ func startMonitoringServer(conf *config.Configuration) *http.Server {
527557
})
528558

529559
monitoringMux.Handle("/monitoring/", asynqmonHandler)
560+
if h := trace.MetricsHandler(); h != nil {
561+
monitoringMux.Handle("/metrics", middleware.MetricsAuthHandler(conf.Server.Secure, conf.Server.MetricsBearerToken, h))
562+
}
530563

531564
monitoringAddr := fmt.Sprintf(":%s", conf.Queue.MonitoringPort)
532565
srv := &http.Server{

config/config.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,14 @@ var (
9090
var ConfigStore atomic.Value
9191

9292
type ServerConfig struct {
93-
SSL bool `json:"ssl" envconfig:"BLNK_SERVER_SSL"`
94-
CertStoragePath string `json:"cert_storage_path" envconfig:"BLNK_CERT_STORAGE_PATH"`
95-
Secure bool `json:"secure" envconfig:"BLNK_SERVER_SECURE"`
96-
SecretKey string `json:"secret_key" envconfig:"BLNK_SERVER_SECRET_KEY"`
97-
Domain string `json:"domain" envconfig:"BLNK_SERVER_SSL_DOMAIN"`
98-
Email string `json:"ssl_email" envconfig:"BLNK_SERVER_SSL_EMAIL"`
99-
Port string `json:"port" envconfig:"BLNK_SERVER_PORT"`
93+
SSL bool `json:"ssl" envconfig:"BLNK_SERVER_SSL"`
94+
CertStoragePath string `json:"cert_storage_path" envconfig:"BLNK_CERT_STORAGE_PATH"`
95+
Secure bool `json:"secure" envconfig:"BLNK_SERVER_SECURE"`
96+
SecretKey string `json:"secret_key" envconfig:"BLNK_SERVER_SECRET_KEY"`
97+
Domain string `json:"domain" envconfig:"BLNK_SERVER_SSL_DOMAIN"`
98+
Email string `json:"ssl_email" envconfig:"BLNK_SERVER_SSL_EMAIL"`
99+
Port string `json:"port" envconfig:"BLNK_SERVER_PORT"`
100+
MetricsBearerToken string `json:"metrics_bearer_token" envconfig:"BLNK_METRICS_BEARER_TOKEN"`
100101
}
101102

102103
type DataSourceConfig struct {

docker-compose.dev.yaml

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ services:
2020
restart: on-failure
2121
environment:
2222
TZ: ${TZ:-Etc/UTC}
23-
OTEL_EXPORTER_OTLP_ENDPOINT: jaeger:4318
23+
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: http://jaeger:4318/v1/traces
2424
ports:
2525
- "5001:5001"
2626
- "80:80"
@@ -38,7 +38,9 @@ services:
3838
restart: on-failure
3939
entrypoint: [ "blnk", "workers"]
4040
environment:
41-
OTEL_EXPORTER_OTLP_ENDPOINT: jaeger:4318
41+
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: http://jaeger:4318/v1/traces
42+
ports:
43+
- "5004:5004"
4244
depends_on:
4345
- redis
4446
- postgres
@@ -107,6 +109,19 @@ services:
107109
timeout: 5s
108110
retries: 3
109111

112+
prometheus:
113+
image: prom/prometheus:latest
114+
container_name: prometheus
115+
profiles:
116+
- monitoring
117+
ports:
118+
- "9090:9090"
119+
volumes:
120+
- ./prometheus.yml:/etc/prometheus/prometheus.yml
121+
depends_on:
122+
- server
123+
- worker
124+
110125
volumes:
111126
pg_data:
112127
typesense_data:

0 commit comments

Comments
 (0)