Skip to content

Commit ba6c0c0

Browse files
committed
final touches
1 parent 64bb45b commit ba6c0c0

6 files changed

Lines changed: 43 additions & 100 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 59 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

openmetrics_udpserver/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@ clap = "4.4.*"
1010
bytes = "1.5.*"
1111
regex = "1.10.*"
1212
byteorder = "1.5.*"
13-
async-channel = "2.0.*"
1413
prometheus-client = "0.21.*"
1514
hyper = { version = "0.14.*", features = ["http2", "server"] }
16-
tokio = { version = "1.33.*", features = ["macros", "rt-multi-thread", "signal"] }
15+
tokio = { version = "1.33.*", features = ["macros", "rt-multi-thread", "signal", "sync"] }
1716
axum = { version = "0.6.*", features = ["macros", "http1", "tokio"], default-features = false }
1817
openmetrics_udpserver_lib = { path = "../openmetrics_udpserver_lib" }
1918

openmetrics_udpserver/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ use crate::processor::{InboundMetric, Processor};
1010
use crate::serverdensity::aggregator::{ServerDensityAggregator, ServerDensityConfig};
1111
use crate::udp_server::UdpServer;
1212
use anyhow::{anyhow, Context};
13-
use async_channel::unbounded;
1413
use clap::{Arg, ArgAction, Command};
1514
use prometheus_client::registry::Registry;
1615
use std::process::exit;
1716
use std::sync::Arc;
17+
use tokio::sync::broadcast::channel;
1818
use tokio::sync::RwLock;
1919

2020
#[tokio::main]
@@ -86,10 +86,10 @@ async fn main() -> anyhow::Result<(), anyhow::Error> {
8686
println!("http host: {}", &config.http_bind);
8787

8888
let metric_registry = Arc::new(RwLock::new(Registry::default()));
89-
let (sender, receiver) = unbounded::<InboundMetric>();
89+
let (sender, receiver) = channel::<InboundMetric>(100_000);
9090

9191
let processor_config = config.clone();
92-
let processor_receiver = receiver.clone();
92+
let processor_receiver = sender.subscribe();
9393
let processor_registry = metric_registry.clone();
9494
let processor_handle = tokio::spawn(async move {
9595
let mut processor = Processor::new(processor_config, processor_registry);

openmetrics_udpserver/src/processor.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ use crate::metrics::resetting_counter::ResettingCounterMetric;
33
use crate::metrics::resetting_value_metric::ResettingSingleValMetric;
44
use crate::metrics::ModifyMetric;
55
use anyhow::anyhow;
6-
use async_channel::Receiver;
76
use openmetrics_udpserver_lib::MetricType;
87
use prometheus_client::registry::{Metric, Registry};
98
use regex::Regex;
109
use std::collections::HashMap;
1110
use std::sync::Arc;
1211
use std::time::Duration;
12+
use tokio::sync::broadcast::error::TryRecvError;
13+
use tokio::sync::broadcast::Receiver;
1314
use tokio::sync::RwLock;
15+
use tokio::task::yield_now;
1416

1517
#[derive(Debug, Clone)]
1618
pub struct InboundMetric {
@@ -34,11 +36,11 @@ impl Processor {
3436
}
3537
}
3638

37-
pub async fn run(&mut self, receiver: Receiver<InboundMetric>) {
39+
pub async fn run(&mut self, mut receiver: Receiver<InboundMetric>) {
3840
let regex_allowed_chars = Regex::new(r"^[^a-zA-Z_:]|[^a-zA-Z0-9_:]")
3941
.expect("Unable to compile metrics naming regex, should not happen");
4042
loop {
41-
match receiver.recv().await {
43+
match receiver.try_recv() {
4244
Ok(inbound_metric) => {
4345
let metric_name = regex_allowed_chars
4446
.replace_all(&inbound_metric.name.replace('.', "_"), "")
@@ -73,7 +75,12 @@ impl Processor {
7375
}
7476
}
7577
}
76-
Err(_) => panic!("All metric senders were dropped, should not happen"),
78+
Err(TryRecvError::Empty | TryRecvError::Lagged(_)) => {
79+
yield_now().await;
80+
}
81+
Err(TryRecvError::Closed) => {
82+
panic!("All metric senders were dropped, should not happen")
83+
}
7784
}
7885
}
7986
}

openmetrics_udpserver/src/serverdensity/aggregator.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::processor::InboundMetric;
22
use crate::serverdensity::{AverageHandler, MinHandler, PeakHandler, SumHandler};
3-
use async_channel::{Receiver, TryRecvError};
43
use clap::ArgMatches;
54
use openmetrics_udpserver_lib::MetricType;
65
use regex::Regex;
@@ -9,6 +8,8 @@ use std::collections::HashMap;
98
use std::fs::File;
109
use std::io::{BufReader, Read};
1110
use std::time::{Duration, SystemTime};
11+
use tokio::sync::broadcast::error::TryRecvError;
12+
use tokio::sync::broadcast::Receiver;
1213

1314
#[derive(Clone)]
1415
pub struct ServerDensityConfig {
@@ -114,7 +115,7 @@ impl ServerDensityAggregator {
114115
}
115116
}
116117

117-
pub async fn run(&self, receiver: Receiver<InboundMetric>) {
118+
pub async fn run(&self, mut receiver: Receiver<InboundMetric>) {
118119
let regex = Regex::new(r"[^0-9a-zA-ZäöüÄÖÜß\-()._]*").expect("failed to compile regex");
119120

120121
let mut metricmap = HashMap::new();
@@ -163,12 +164,12 @@ impl ServerDensityAggregator {
163164
);
164165
}
165166
}
166-
Err(TryRecvError::Empty) => {
167-
break;
168-
}
169167
Err(TryRecvError::Closed) => {
170168
panic!("channel disconnected, should never happen.");
171169
}
170+
Err(TryRecvError::Empty | TryRecvError::Lagged(_)) => {
171+
break;
172+
}
172173
};
173174
}
174175

openmetrics_udpserver/src/udp_server.rs

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::config::Config;
22
use crate::processor::InboundMetric;
3-
use async_channel::Sender;
43
use byteorder::BigEndian;
54
use byteorder::ByteOrder;
65
use openmetrics_udpserver_lib::MetricType;
7-
use std::net::UdpSocket;
6+
use tokio::net::UdpSocket;
7+
use tokio::sync::broadcast::Sender;
88

99
pub struct UdpServer {
1010
config: Config,
@@ -20,41 +20,36 @@ impl UdpServer {
2020
}
2121

2222
pub async fn run(&self) {
23-
let mut udp_socket =
24-
UdpSocket::bind(&self.config.udp_bind).expect("Unable to bind UDP Server");
23+
let udp_socket = UdpSocket::bind(&self.config.udp_bind)
24+
.await
25+
.expect("Unable to bind UDP Server");
2526
loop {
26-
match self.read(&mut udp_socket) {
27-
Ok(metric) => {
28-
if let Err(err) = self.metric_sender.send(metric).await {
29-
eprintln!("Unable to process inbound metric: {}", err);
27+
if udp_socket.readable().await.is_ok() {
28+
let mut buf = [0; 300];
29+
if let Ok(read_bytes) = udp_socket.try_recv(&mut buf) {
30+
match self.decode_buffer(&buf, read_bytes) {
31+
Ok(inbound_metric) => {
32+
if let Err(err) = self.metric_sender.send(inbound_metric) {
33+
eprintln!("Unable to process inbound metric: {}", err);
34+
}
35+
}
36+
Err(err) => {
37+
eprintln!("could not decode message from socket: {}", err);
38+
}
3039
}
3140
}
32-
Err(err) => {
33-
eprintln!("could not read message from socket: {}", err);
34-
}
3541
}
3642
}
3743
}
3844

39-
fn read(&self, socket: &mut UdpSocket) -> Result<InboundMetric, String> {
40-
let mut buf = [0; 300];
41-
let (amt, _) = socket
42-
.recv_from(&mut buf)
43-
.map_err(|_| "Couldn't recv from socket".to_string())?;
44-
45-
if amt <= 6 {
46-
return Err("UDP Package size is too small".to_string());
47-
}
48-
49-
let metric_type = match MetricType::from_u16(BigEndian::read_u16(&buf[0..2])) {
45+
fn decode_buffer(&self, data: &[u8], read_bytes: usize) -> Result<InboundMetric, String> {
46+
let metric_type = match MetricType::from_u16(BigEndian::read_u16(&data[0..2])) {
5047
Some(m) => m,
51-
None => {
52-
return Err("Got unsupported metric type".to_string());
53-
}
48+
None => return Err("Got unsupported metric type".to_string()),
5449
};
5550

56-
let count = BigEndian::read_i32(&buf[2..6]);
57-
let name = String::from_utf8_lossy(&buf[6..amt])
51+
let count = BigEndian::read_i32(&data[2..6]);
52+
let name = String::from_utf8_lossy(&data[6..read_bytes])
5853
.to_string()
5954
.replace('"', "");
6055

0 commit comments

Comments
 (0)