-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapp_single_process.py
More file actions
122 lines (93 loc) · 3.19 KB
/
app_single_process.py
File metadata and controls
122 lines (93 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
from time import time
from os import getpid
from confluent_kafka import Consumer, Message
from time import sleep
from multiprocessing import Value, Lock
from threading import Thread
from structlog import get_logger
from cgn_ec_consumer.handlers.generic import BaseHandler
from cgn_ec_consumer.config import settings
from cgn_ec_consumer.logger import setup_logging
logger = get_logger()
metrics_count = Value("i", 0)
metrics_lock = Lock()
def _process_metrics(
records: list[Message],
consumer: Consumer,
handler: BaseHandler,
metrics_count: Value,
metrics_lock: Lock,
):
metrics = []
for msg in records:
message = json.loads(msg.value().decode("utf-8"))
metric = handler.parse_message(message)
if not metric:
continue
metrics.append(metric)
handler.process_outputs(metrics)
metrics_total = len(metrics)
with metrics_lock:
metrics_count.value += metrics_total
metrics.clear()
logger.debug(f"Total metrics processed by process[{getpid()}]: {metrics_total}")
consumer.commit()
def build_config():
config = {
"group.id": settings.KAFKA_GROUP_ID,
"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"fetch.max.bytes": 524288000,
"partition.assignment.strategy": "roundrobin",
}
return config
def print_metrics_count(metrics_count: Value):
prev_count = 0
prev_time = time() # Record the initial time
while True:
current_count = metrics_count.value
current_time = time()
# Calculate metrics processed in the interval
metrics_diff = current_count - prev_count
time_diff = current_time - prev_time
if time_diff > 0:
avg_metrics_per_second = metrics_diff / time_diff
else:
avg_metrics_per_second = 0
logger.info(
"Metric statistics",
total_metrics_processed=current_count,
avg_metrics_per_second=f"{avg_metrics_per_second:.2f}"
if avg_metrics_per_second
else 0,
)
# Update previous metrics and time for the next iteration
prev_count = current_count
prev_time = current_time
sleep(5)
if __name__ == "__main__":
setup_logging()
if not settings.HANDLER:
exit("A HANDLER must be configured to handle the metrics")
# metric worker
t = Thread(target=print_metrics_count, args=(metrics_count,), daemon=True)
t.start()
handler = settings.HANDLER(outputs=settings.OUTPUTS)
config = build_config()
consumer = Consumer(config)
consumer.subscribe([handler.TOPIC])
while True:
try:
records = consumer.consume(settings.KAFKA_MAX_RECORDS_POLL, timeout=1)
if not records or records is None:
sleep(0.01)
continue
_process_metrics(records, consumer, handler, metrics_count, metrics_lock)
except Exception as err:
logger.error(f"Terminating worker with pid: {getpid()} due to error: {err}")
consumer.close()
break
sleep(0.01)
consumer.close()