Commit 61db8ed8 authored by Daniel Willmann's avatar Daniel Willmann
Browse files

Refactor: Move actual trace code into module and generalize

parent 2891bf5b
......@@ -22,4 +22,5 @@ opentelemetry-semantic-conventions = "0.3.0"
pcap-parser = { version = "0.10.1", features = ["data"] }
clap = "3.0.0-beta.2"
etherparse = "0.9.0"
once_cell = "1.5.2"
#![allow(dead_code)]
pub mod trace;
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Type {
......
......@@ -171,7 +171,7 @@ mod parser {
}
}
use super::gsmtap::OsmocoreLog;
use super::OsmocoreLog;
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq)]
......
pub mod gsmtap;
pub mod trace;
mod options;
mod gsmtap;
mod pcap;
mod telemetry;
use clap::Clap;
use eyre::*;
use std::collections::HashMap;
use std::{
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
......@@ -18,101 +18,10 @@ use std::{
use tracing::{debug, error, info, trace, warn, Level};
use tracing_subscriber::EnvFilter;
use async_std::net::{SocketAddr, UdpSocket};
use async_std::net::UdpSocket;
use opentelemetry::trace::{Span as _, Tracer as _, TracerProvider as _};
use opentelemetry::{
sdk::trace::Span,
sdk::trace::{Tracer, TracerProvider},
trace::*,
KeyValue,
};
use opentelemetry_semantic_conventions::resource::*;
use gsmtap_telemetry::{gsmtap::*, trace};
#[derive(Debug)]
struct Frame {
name: String,
}
#[derive(Debug)]
struct Fsm {
tracer: Tracer,
}
#[derive(Debug)]
struct PeerContext {
addr: SocketAddr,
callstack: Vec<(Frame, Span)>,
fsms: HashMap<String, Fsm>,
tracer: Tracer,
}
fn string_to_static_str(s: String) -> &'static str {
Box::leak(s.into_boxed_str())
}
/// WARNING, this function leaks memory! Right now we never free a trace_provider anyway
fn tracer_from_attrs(
name: String,
attrs: Vec<KeyValue>,
providers: &mut Vec<Rc<TracerProvider>>,
) -> Result<Tracer> {
let resource = opentelemetry::sdk::Resource::new(attrs);
let config = opentelemetry::sdk::trace::config().with_resource(resource);
let tracer_provider = opentelemetry_jaeger::new_pipeline()
.with_service_name(string_to_static_str(name))
.with_trace_config(config)
.build()?;
let tracer_provider = Rc::new(tracer_provider);
providers.push(tracer_provider.clone());
Ok(tracer_provider.get_tracer("", None))
}
fn tracer_from_peer(
peer: SocketAddr,
olog: &OsmocoreLog,
providers: &mut Vec<Rc<TracerProvider>>,
) -> Result<Tracer> {
let attrs = vec![
KeyValue::new(HOST_NAME, peer.ip().to_string()),
KeyValue::new(HOST_ID, peer.port() as i64),
KeyValue::new(PROCESS_PID, olog.pid as i64),
KeyValue::new(PROCESS_EXECUTABLE_NAME, olog.proc_name.clone()),
KeyValue::new(SERVICE_NAME, olog.proc_name.clone()),
KeyValue::new(SERVICE_NAMESPACE, olog.proc_name.clone()),
];
tracer_from_attrs(olog.proc_name.clone(), attrs, providers)
}
fn tracer_from_osmo_trace(
peer: &PeerContext,
traceattrs: &trace::TraceAttrs,
providers: &mut Vec<Rc<TracerProvider>>,
) -> Result<Tracer> {
let peer = peer.addr;
let fsminst = traceattrs.attrs.get("fsminst").cloned().unwrap_or_default();
let fsmname = traceattrs.attrs.get("name").cloned().unwrap_or_default();
let fsmid = traceattrs.attrs.get("id").cloned().unwrap_or_default();
let attrs = vec![
KeyValue::new(HOST_NAME, peer.to_string()),
KeyValue::new(PROCESS_PID, traceattrs.common.pid as i64),
KeyValue::new(PROCESS_EXECUTABLE_NAME, traceattrs.common.proc_name.clone()),
KeyValue::new(SERVICE_NAME, fsmid),
KeyValue::new(SERVICE_NAMESPACE, fsmname.clone()),
KeyValue::new(SERVICE_INSTANCE_ID, fsminst),
];
tracer_from_attrs(
format!("{}-{}", traceattrs.common.proc_name.clone(), fsmname),
attrs,
providers,
)
}
use gsmtap::{parser::parse_gsmtap, Packet};
use telemetry::Telemetry;
#[async_std::main]
async fn main() -> Result<()> {
......@@ -157,15 +66,14 @@ async fn listen_socket(listen: &str) -> Result<()> {
let socket = UdpSocket::bind(listen).await?;
info!("Listening on {:?}", socket.local_addr());
let mut peers = HashMap::new();
let mut providers = Vec::new();
let mut telem = Telemetry::new();
while !stop.load(Ordering::SeqCst) {
let mut buf = [0; 16 + 84 + 4096];
let (amt, src) = socket.recv_from(&mut buf).await?;
let buf = &mut buf[..amt];
let olog = match parser::parse_gsmtap(buf) {
let olog = match parse_gsmtap(buf) {
Ok(Packet::OsmocoreLog(olog)) => olog,
Err(err) => {
warn!("Ignoring packet from {}: {}", src, err);
......@@ -173,117 +81,7 @@ async fn listen_socket(listen: &str) -> Result<()> {
}
};
if olog.subsystem != SS_TRACE {
debug!("Ignoring regular log message {:?}", olog);
continue;
}
debug!("Recv GSMTAP Trace from {}", src);
debug!("Parsed OsmocoreLog: {:?}", &olog);
let otrace = trace::parse_osmotrace(&olog).map_err(|e| {
error!("{:?}", &olog);
e
})?;
debug!("Parsed OsmoTrace: {:?}", &otrace);
let peer = peers.entry(src).or_insert_with(|| {
info!("New peer: {}", src);
let tracer = tracer_from_peer(src, &olog, &mut providers).unwrap();
PeerContext {
addr: src,
callstack: Vec::new(),
fsms: HashMap::new(),
tracer,
}
});
match otrace {
trace::OsmoTrace::EnterFunc(attrs) => {
let mut tracer = &peer.tracer;
let frame = Frame {
name: attrs.common.name.clone(),
};
let mut span = tracer
.span_builder(&frame.name)
.with_start_time(attrs.common.timestamp);
let fsm_inst = attrs.attrs.get("fsminst").cloned().unwrap_or_default();
if attrs.attrs.contains_key("fsm_alloc") {
let tracer = tracer_from_osmo_trace(peer, &attrs, &mut providers)?;
peer.fsms.insert(fsm_inst.clone(), Fsm { tracer });
}
if attrs.attrs.contains_key("fsm_free") {
let _ = peer.fsms.remove(&fsm_inst);
}
if attrs.attrs.contains_key("fsminst") {
if let Some(fsm) = peer.fsms.get(&fsm_inst) {
tracer = &fsm.tracer;
}
}
if let Some((_frame, parent)) = &peer.callstack.last() {
let ctx = opentelemetry::Context::new().with_span(parent.clone());
span = span.with_parent_context(ctx);
}
let attrs = attrs.span_attrs();
span = span.with_attributes(attrs);
let span = span.start(tracer);
info!("Entering function {}", &frame.name);
peer.callstack.push((frame, span));
info!(
"Current callstack:\n{}",
peer.callstack
.iter()
.map(|(frame, _)| frame.name.clone())
.collect::<Vec<_>>()
.join("->")
);
info!(
"Current FSMs:\n{}",
peer.fsms.keys().cloned().collect::<Vec<_>>().join(",")
);
}
trace::OsmoTrace::ExitFunc(exit) => {
info!("Exit: {}", exit.common.name);
if let Some((frame, span)) = peer.callstack.pop() {
// FIXME: Verify that we actually exit the current function
assert_eq!(frame.name, exit.common.name);
let code = match &exit.status {
x if x == "ok" => StatusCode::Ok,
x if x == "error" => StatusCode::Error,
_ => StatusCode::Unset,
};
let msg = exit.msg.unwrap_or_default();
span.set_status(code, msg);
span.end_with_timestamp(exit.common.timestamp);
} else {
warn!("Already at top! Function: {}", exit.common.name);
}
}
trace::OsmoTrace::SetAttr(attrs) => {
let idx = peer.callstack.len() - 1;
if let Some((_frame, span)) = peer.callstack.get_mut(idx) {
for (key, value) in attrs.attrs.iter() {
span.set_attribute(KeyValue::new(key.clone(), value.clone()));
}
}
}
trace::OsmoTrace::Event(event) => {
if let Some((_frame, span)) = &peer.callstack.last() {
let attrs = event.span_attrs();
span.add_event_with_timestamp(event.msg, event.common.timestamp, attrs);
} else {
warn!("Got event without span: {:?}", &event);
}
}
}
telem.handle_packet(src, &olog).await?;
}
info!("Shutting down");
......
use eyre::*;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn, Level};
use async_std::net::SocketAddr;
use opentelemetry::trace::{Span as _, Tracer as _, TracerProvider as _};
use opentelemetry::{
sdk::trace::Span,
sdk::trace::{Tracer, TracerProvider},
trace::*,
KeyValue,
};
use opentelemetry_semantic_conventions::resource::*;
use crate::gsmtap::*;
#[derive(Debug)]
struct Frame {
name: String,
}
#[derive(Debug)]
struct Fsm {
tracer: Tracer,
}
#[derive(Debug)]
struct PeerContext {
addr: SocketAddr,
callstack: Vec<(Frame, Span)>,
fsms: HashMap<String, Fsm>,
tracer: Tracer,
}
fn string_to_static_str(s: String) -> &'static str {
Box::leak(s.into_boxed_str())
}
static PROVIDERS: Lazy<Mutex<Vec<Arc<TracerProvider>>>> = Lazy::new(|| Mutex::new(Vec::new()));
/// WARNING, this function leaks memory! Right now we never free a trace_provider anyway
fn tracer_from_attrs(name: String, attrs: Vec<KeyValue>) -> Result<Tracer> {
let resource = opentelemetry::sdk::Resource::new(attrs);
let config = opentelemetry::sdk::trace::config().with_resource(resource);
let tracer_provider = opentelemetry_jaeger::new_pipeline()
.with_service_name(string_to_static_str(name))
.with_trace_config(config)
.build()?;
let tracer_provider = Arc::new(tracer_provider);
PROVIDERS.lock().unwrap().push(tracer_provider.clone());
Ok(tracer_provider.get_tracer("", None))
}
fn tracer_from_peer(peer: SocketAddr, olog: &OsmocoreLog) -> Result<Tracer> {
let attrs = vec![
KeyValue::new(HOST_NAME, peer.ip().to_string()),
KeyValue::new(HOST_ID, peer.port() as i64),
KeyValue::new(PROCESS_PID, olog.pid as i64),
KeyValue::new(PROCESS_EXECUTABLE_NAME, olog.proc_name.clone()),
KeyValue::new(SERVICE_NAME, olog.proc_name.clone()),
KeyValue::new(SERVICE_NAMESPACE, olog.proc_name.clone()),
];
tracer_from_attrs(olog.proc_name.clone(), attrs)
}
fn tracer_from_osmo_trace(peer: &PeerContext, traceattrs: &trace::TraceAttrs) -> Result<Tracer> {
let peer = peer.addr;
let fsminst = traceattrs.attrs.get("fsminst").cloned().unwrap_or_default();
let fsmname = traceattrs.attrs.get("name").cloned().unwrap_or_default();
let fsmid = traceattrs.attrs.get("id").cloned().unwrap_or_default();
let attrs = vec![
KeyValue::new(HOST_NAME, peer.to_string()),
KeyValue::new(PROCESS_PID, traceattrs.common.pid as i64),
KeyValue::new(PROCESS_EXECUTABLE_NAME, traceattrs.common.proc_name.clone()),
KeyValue::new(SERVICE_NAME, fsmid),
KeyValue::new(SERVICE_NAMESPACE, fsmname.clone()),
KeyValue::new(SERVICE_INSTANCE_ID, fsminst),
];
tracer_from_attrs(
format!("{}-{}", traceattrs.common.proc_name.clone(), fsmname),
attrs,
)
}
pub struct Telemetry {
peers: HashMap<SocketAddr, PeerContext>,
}
impl Telemetry {
pub fn new() -> Telemetry {
Telemetry {
peers: HashMap::new(),
}
}
pub async fn handle_packet(&mut self, src: SocketAddr, olog: &OsmocoreLog) -> Result<()> {
if olog.subsystem != SS_TRACE {
debug!("Ignoring regular log message {:?}", olog);
return Ok(());
}
debug!("Recv GSMTAP Trace from {}", src);
debug!("Parsed OsmocoreLog: {:?}", &olog);
let otrace = trace::parse_osmotrace(&olog).map_err(|e| {
error!("{:?}", &olog);
e
})?;
debug!("Parsed OsmoTrace: {:?}", &otrace);
let peer = self.peers.entry(src).or_insert_with(|| {
info!("New peer: {}", src);
let tracer = tracer_from_peer(src, &olog).unwrap();
PeerContext {
addr: src,
callstack: Vec::new(),
fsms: HashMap::new(),
tracer,
}
});
match otrace {
trace::OsmoTrace::EnterFunc(attrs) => {
let mut tracer = &peer.tracer;
let frame = Frame {
name: attrs.common.name.clone(),
};
let mut span = tracer
.span_builder(&frame.name)
.with_start_time(attrs.common.timestamp);
let fsm_inst = attrs.attrs.get("fsminst").cloned().unwrap_or_default();
if attrs.attrs.contains_key("fsm_alloc") {
let tracer = tracer_from_osmo_trace(peer, &attrs)?;
peer.fsms.insert(fsm_inst.clone(), Fsm { tracer });
}
if attrs.attrs.contains_key("fsm_free") {
let _ = peer.fsms.remove(&fsm_inst);
}
if attrs.attrs.contains_key("fsminst") {
if let Some(fsm) = peer.fsms.get(&fsm_inst) {
tracer = &fsm.tracer;
}
}
if let Some((_frame, parent)) = &peer.callstack.last() {
let ctx = opentelemetry::Context::new().with_span(parent.clone());
span = span.with_parent_context(ctx);
}
let attrs = attrs.span_attrs();
span = span.with_attributes(attrs);
let span = span.start(tracer);
info!("Entering function {}", &frame.name);
peer.callstack.push((frame, span));
info!(
"Current callstack:\n{}",
peer.callstack
.iter()
.map(|(frame, _)| frame.name.clone())
.collect::<Vec<_>>()
.join("->")
);
info!(
"Current FSMs:\n{}",
peer.fsms.keys().cloned().collect::<Vec<_>>().join(",")
);
}
trace::OsmoTrace::ExitFunc(exit) => {
info!("Exit: {}", exit.common.name);
if let Some((frame, span)) = peer.callstack.pop() {
// FIXME: Verify that we actually exit the current function
assert_eq!(frame.name, exit.common.name);
let code = match &exit.status {
x if x == "ok" => StatusCode::Ok,
x if x == "error" => StatusCode::Error,
_ => StatusCode::Unset,
};
let msg = exit.msg.unwrap_or_default();
span.set_status(code, msg);
span.end_with_timestamp(exit.common.timestamp);
} else {
warn!("Already at top! Function: {}", exit.common.name);
}
}
trace::OsmoTrace::SetAttr(attrs) => {
let idx = peer.callstack.len() - 1;
if let Some((_frame, span)) = peer.callstack.get_mut(idx) {
for (key, value) in attrs.attrs.iter() {
span.set_attribute(KeyValue::new(key.clone(), value.clone()));
}
}
}
trace::OsmoTrace::Event(event) => {
if let Some((_frame, span)) = &peer.callstack.last() {
let attrs = event.span_attrs();
span.add_event_with_timestamp(event.msg, event.common.timestamp, attrs);
} else {
warn!("Got event without span: {:?}", &event);
}
}
}
Ok(())
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment