Skip to content

Commit

Permalink
populate appProtocol kflow column
Browse files Browse the repository at this point in the history
  • Loading branch information
Will committed Mar 21, 2024
1 parent 76f9437 commit fe4b7aa
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 31 deletions.
73 changes: 51 additions & 22 deletions src/custom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,16 @@ pub const RADIUS_ACCT_STATUS: &str = "RADIUS_ACCT_STATUS";

#[derive(Debug)]
pub struct Customs {
columns: Columns,
output: Vec<kflowCustom>,
protocol: u32,
}

#[derive(Debug)]
pub struct Columns {
app_proto: Option<u64>,
fragments: Option<u64>,
fields: HashMap<String, u64>,
vec: Vec<kflowCustom>,
columns: HashMap<String, u64>,
}

impl Customs {
Expand Down Expand Up @@ -124,41 +130,52 @@ impl Customs {
fields.insert(TLS_SERVER_NAME.to_owned(), id);
}

Customs{
let columns = Columns {
app_proto: fields.get(APP_PROTOCOL).cloned(),
fragments: fields.get(FRAGMENTS).cloned(),
fields: fields,
vec: Vec::with_capacity(cs.len()),
columns: fields,
};

Customs {
columns: columns,
output: Vec::with_capacity(cs.len()),
protocol: 0,
}
}

pub fn append(&mut self, ctr: &Counter) {
if ctr.fragments > 0 {
self.fragments.map(|id| self.add_u32(id, ctr.fragments as u32));
self.protocol = match ctr.decoder {
Decoder::DNS => 1,
Decoder::HTTP => 2,
Decoder::TLS => 3,
Decoder::DHCP => 4,
Decoder::Radius => 9,
_ => 0,
};

if let Some(id) = self.columns.fragments {
if ctr.fragments > 0 {
self.add_u32(id, ctr.fragments as u32);
}
}

self.app_proto.map(|id| {
match ctr.decoder {
Decoder::DNS => self.add_u32(id, 1),
Decoder::HTTP => self.add_u32(id, 2),
Decoder::TLS => self.add_u32(id, 3),
Decoder::DHCP => self.add_u32(id, 4),
Decoder::Radius => self.add_u32(id, 9),
_ => (),
if let Some(id) = self.columns.app_proto {
if self.protocol > 0 {
self.add_u32(id, self.protocol);
}
});
}
}

pub fn add_str(&mut self, id: u64, val: &CStr) {
self.vec.push(kflowCustom::str(id, val))
self.output.push(kflowCustom::str(id, val))
}

pub fn add_u32(&mut self, id: u64, val: u32) {
self.vec.push(kflowCustom::u32(id, val))
self.output.push(kflowCustom::u32(id, val))
}

pub fn add_addr(&mut self, id: u64, val: IpAddr) {
self.vec.push(kflowCustom::addr(id, val))
self.output.push(kflowCustom::addr(id, val))
}

pub fn add_latency(&mut self, id: u64, d: Duration) {
Expand All @@ -171,21 +188,33 @@ impl Customs {
}.whole_milliseconds() as u32);
}

pub fn app_protocol(&self) -> u32 {
self.protocol
}

pub fn clear(&mut self) {
self.vec.clear();
self.protocol = 0;
self.output.clear();
}

pub fn get(&self, key: &str) -> Result<u64, ()> {
match self.fields.get(key) {
match self.columns.get(key) {
Some(id) => Ok(*id),
None => Err(()),
}
}
}

impl Columns {
pub fn get(&self, key: &str) -> Option<&u64> {
self.columns.get(key)
}
}

impl Deref for Customs {
type Target = [kflowCustom];

fn deref(&self) -> &[kflowCustom] {
&self.vec[..]
&self.output[..]
}
}
22 changes: 14 additions & 8 deletions src/libkflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ pub fn configure(cfg: &Config) -> Result<Device, Error> {
}
}

pub fn send(key: &Key, ctr: &Counter, sr: u32, cs: Option<&[kflowCustom]>) -> Result<(), Error> {
pub fn flow(key: &Key, ctr: &Counter, sample_rate: u32, app_protocol: u32) -> kflow {
let mut kflow: kflow = Default::default();
let mut v6src: [u8; 16];
let mut v6dst: [u8; 16];
Expand Down Expand Up @@ -260,13 +260,14 @@ pub fn send(key: &Key, ctr: &Counter, sr: u32, cs: Option<&[kflowCustom]>) -> Re
},
}

kflow.srcEthMac = pack_mac(&ctr.ethernet.src);
kflow.dstEthMac = pack_mac(&ctr.ethernet.dst);
kflow.tos = ctr.tos as u32;
kflow.l4SrcPort = key.1.port as u32;
kflow.l4DstPort = key.2.port as u32;
kflow.tcpFlags = ctr.tcp_flags as u32;
kflow.sampleRate = sr;
kflow.srcEthMac = pack_mac(&ctr.ethernet.src);
kflow.dstEthMac = pack_mac(&ctr.ethernet.dst);
kflow.tos = ctr.tos as u32;
kflow.l4SrcPort = key.1.port as u32;
kflow.l4DstPort = key.2.port as u32;
kflow.tcpFlags = ctr.tcp_flags as u32;
kflow.sampleRate = sample_rate;
kflow.appProtocol = app_protocol;

match ctr.direction {
Direction::In => {
Expand All @@ -283,6 +284,10 @@ pub fn send(key: &Key, ctr: &Counter, sr: u32, cs: Option<&[kflowCustom]>) -> Re
},
}

kflow
}

pub fn send(mut kflow: kflow, cs: Option<&[kflowCustom]>) -> Result<(), Error> {
if let Some(cs) = cs {
kflow.customs = cs.as_ptr();
kflow.numCustoms = cs.len() as u32;
Expand Down Expand Up @@ -526,6 +531,7 @@ pub struct kflow {
pub ipv6SrcRoutePrefix: *const u8,
pub ipv6DstRoutePrefix: *const u8,
pub isMetric: u8,
pub appProtocol: u32,

pub customs: *const kflowCustom,
pub numCustoms: u32,
Expand Down
6 changes: 5 additions & 1 deletion src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,14 @@ impl FlowQueue {
fn send(customs: &mut Customs, tracker: &mut Tracker, key: &Key, ctr: &mut Counter, sr: u32) {
customs.append(ctr);
tracker.append(key, customs);
libkflow::send(key, ctr, sr, match &customs {

let flow = libkflow::flow(key, ctr, sr, customs.app_protocol());

libkflow::send(flow, match &customs {
cs if !cs.is_empty() => Some(cs),
_ => None,
}).expect("failed to send flow");

ctr.clear();
}
}
Expand Down

0 comments on commit fe4b7aa

Please sign in to comment.