diff --git a/src/custom.rs b/src/custom.rs index a6bf4da..2995394 100644 --- a/src/custom.rs +++ b/src/custom.rs @@ -55,10 +55,16 @@ pub const RADIUS_ACCT_STATUS: &str = "RADIUS_ACCT_STATUS"; #[derive(Debug)] pub struct Customs { + columns: Columns, + output: Vec, + protocol: u32, +} + +#[derive(Debug)] +pub struct Columns { app_proto: Option, fragments: Option, - fields: HashMap, - vec: Vec, + columns: HashMap, } impl Customs { @@ -124,41 +130,52 @@ impl Customs { fields.insert(TLS_SERVER_NAME.to_owned(), id); } - Customs{ + let columns = Columns { app_proto: fields.get(APP_PROTOCOL).cloned(), fragments: fields.get(FRAGMENTS).cloned(), - fields: fields, - vec: Vec::with_capacity(cs.len()), + columns: fields, + }; + + Customs { + columns: columns, + output: Vec::with_capacity(cs.len()), + protocol: 0, } } pub fn append(&mut self, ctr: &Counter) { - if ctr.fragments > 0 { - self.fragments.map(|id| self.add_u32(id, ctr.fragments as u32)); + self.protocol = match ctr.decoder { + Decoder::DNS => 1, + Decoder::HTTP => 2, + Decoder::TLS => 3, + Decoder::DHCP => 4, + Decoder::Radius => 9, + _ => 0, + }; + + if let Some(id) = self.columns.fragments { + if ctr.fragments > 0 { + self.add_u32(id, ctr.fragments as u32); + } } - self.app_proto.map(|id| { - match ctr.decoder { - Decoder::DNS => self.add_u32(id, 1), - Decoder::HTTP => self.add_u32(id, 2), - Decoder::TLS => self.add_u32(id, 3), - Decoder::DHCP => self.add_u32(id, 4), - Decoder::Radius => self.add_u32(id, 9), - _ => (), + if let Some(id) = self.columns.app_proto { + if self.protocol > 0 { + self.add_u32(id, self.protocol); } - }); + } } pub fn add_str(&mut self, id: u64, val: &CStr) { - self.vec.push(kflowCustom::str(id, val)) + self.output.push(kflowCustom::str(id, val)) } pub fn add_u32(&mut self, id: u64, val: u32) { - self.vec.push(kflowCustom::u32(id, val)) + self.output.push(kflowCustom::u32(id, val)) } pub fn add_addr(&mut self, id: u64, val: IpAddr) { - self.vec.push(kflowCustom::addr(id, val)) + self.output.push(kflowCustom::addr(id, val)) } pub fn add_latency(&mut self, id: u64, d: Duration) { @@ -171,21 +188,33 @@ impl Customs { }.whole_milliseconds() as u32); } + pub fn app_protocol(&self) -> u32 { + self.protocol + } + pub fn clear(&mut self) { - self.vec.clear(); + self.protocol = 0; + self.output.clear(); } pub fn get(&self, key: &str) -> Result { - match self.fields.get(key) { + match self.columns.get(key) { Some(id) => Ok(*id), None => Err(()), } } } +impl Columns { + pub fn get(&self, key: &str) -> Option<&u64> { + self.columns.get(key) + } +} + impl Deref for Customs { type Target = [kflowCustom]; + fn deref(&self) -> &[kflowCustom] { - &self.vec[..] + &self.output[..] } } diff --git a/src/libkflow.rs b/src/libkflow.rs index 4badbff..ce9a7c5 100644 --- a/src/libkflow.rs +++ b/src/libkflow.rs @@ -227,7 +227,7 @@ pub fn configure(cfg: &Config) -> Result { } } -pub fn send(key: &Key, ctr: &Counter, sr: u32, cs: Option<&[kflowCustom]>) -> Result<(), Error> { +pub fn flow(key: &Key, ctr: &Counter, sample_rate: u32, app_protocol: u32) -> kflow { let mut kflow: kflow = Default::default(); let mut v6src: [u8; 16]; let mut v6dst: [u8; 16]; @@ -260,13 +260,14 @@ pub fn send(key: &Key, ctr: &Counter, sr: u32, cs: Option<&[kflowCustom]>) -> Re }, } - kflow.srcEthMac = pack_mac(&ctr.ethernet.src); - kflow.dstEthMac = pack_mac(&ctr.ethernet.dst); - kflow.tos = ctr.tos as u32; - kflow.l4SrcPort = key.1.port as u32; - kflow.l4DstPort = key.2.port as u32; - kflow.tcpFlags = ctr.tcp_flags as u32; - kflow.sampleRate = sr; + kflow.srcEthMac = pack_mac(&ctr.ethernet.src); + kflow.dstEthMac = pack_mac(&ctr.ethernet.dst); + kflow.tos = ctr.tos as u32; + kflow.l4SrcPort = key.1.port as u32; + kflow.l4DstPort = key.2.port as u32; + kflow.tcpFlags = ctr.tcp_flags as u32; + kflow.sampleRate = sample_rate; + kflow.appProtocol = app_protocol; match ctr.direction { Direction::In => { @@ -283,6 +284,10 @@ pub fn send(key: &Key, ctr: &Counter, sr: u32, cs: Option<&[kflowCustom]>) -> Re }, } + kflow +} + +pub fn send(mut kflow: kflow, cs: Option<&[kflowCustom]>) -> Result<(), Error> { if let Some(cs) = cs { kflow.customs = cs.as_ptr(); kflow.numCustoms = cs.len() as u32; @@ -526,6 +531,7 @@ pub struct kflow { pub ipv6SrcRoutePrefix: *const u8, pub ipv6DstRoutePrefix: *const u8, pub isMetric: u8, + pub appProtocol: u32, pub customs: *const kflowCustom, pub numCustoms: u32, diff --git a/src/queue.rs b/src/queue.rs index ae90371..e00d537 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -131,10 +131,14 @@ impl FlowQueue { fn send(customs: &mut Customs, tracker: &mut Tracker, key: &Key, ctr: &mut Counter, sr: u32) { customs.append(ctr); tracker.append(key, customs); - libkflow::send(key, ctr, sr, match &customs { + + let flow = libkflow::flow(key, ctr, sr, customs.app_protocol()); + + libkflow::send(flow, match &customs { cs if !cs.is_empty() => Some(cs), _ => None, }).expect("failed to send flow"); + ctr.clear(); } }