Miscellaneous Tools for Rust Enterprise Applications
Building enterprise applications requires more than just frameworks and libraries for core functionality. In this final post of our series, we’ll explore essential miscellaneous tools for Rust enterprise applications, focusing on observability, logging, and machine learning integration.
Observability in Rust Applications
Observability is crucial for understanding the behavior and performance of enterprise applications in production. It encompasses metrics, logging, and distributed tracing.
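Metrics Collection with Prometheus
Prometheus is one of the most widely used metrics systems, particularly in Kubernetes environments. It periodically scrapes an HTTP endpoint that exposes metrics in a plain-text format, and the prometheus crate provides the counters, gauges, and histograms to populate that endpoint: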
use axum::{routing::get, Router};
use prometheus::{Encoder, IntCounter, IntGauge, Registry, TextEncoder};
use std::sync::Arc;
use std::time::Instant;
// Define metrics
struct Metrics {
registry: Registry,
requests_total: IntCounter,
requests_in_progress: IntGauge,
request_duration_seconds: prometheus::Histogram,
}
impl Metrics {
fn new() -> Self {
let registry = Registry::new();
let requests_total = IntCounter::new(
"app_requests_total",
"Total number of requests received",
).unwrap();
let requests_in_progress = IntGauge::new(
"app_requests_in_progress",
"Number of requests currently being processed",
).unwrap();
let request_duration_seconds = prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"app_request_duration_seconds",
"Request duration in seconds",
)
.buckets(vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]),
).unwrap();
registry.register(Box::new(requests_total.clone())).unwrap();
registry.register(Box::new(requests_in_progress.clone())).unwrap();
registry.register(Box::new(request_duration_seconds.clone())).unwrap();
Self {
registry,
requests_total,
requests_in_progress,
request_duration_seconds,
}
}
}
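// Middleware for tracking metrics. With from_fn_with_state, the state must be
// received through the State extractor, placed before the request and Next arguments
async fn track_metrics<B>(
axum::extract::State(metrics): axum::extract::State<Arc<Metrics>>,
req: axum::http::Request<B>,
next: axum::middleware::Next<B>,
) -> axum::response::Response {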
// Increment requests counter
metrics.requests_total.inc();
// Track in-progress requests
metrics.requests_in_progress.inc();
// Record start time
let start = Instant::now();
// Process the request
let response = next.run(req).await;
// Record duration
let duration = start.elapsed().as_secs_f64();
metrics.request_duration_seconds.observe(duration);
// Decrement in-progress counter
metrics.requests_in_progress.dec();
response
}
// Metrics endpoint handler
async fn metrics_handler(metrics: axum::extract::State<Arc<Metrics>>) -> String {
let encoder = TextEncoder::new();
let metric_families = metrics.registry.gather();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
// Example application with metrics
#[tokio::main]
async fn main() {
// Initialize metrics
let metrics = Arc::new(Metrics::new());
// Build our application
let app = Router::new()
.route("/", get(|| async { "Hello, World!" }))
.route("/metrics", get(metrics_handler))
.layer(axum::middleware::from_fn_with_state(
metrics.clone(),
track_metrics,
))
.with_state(metrics);
// Run our server
let addr = "0.0.0.0:3000";
println!("Server running on http://{}", addr);
axum::Server::bind(&addr.parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
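One weakness of calling `dec()` after `next.run(req).await` is that the decrement never runs if the request future is dropped before it completes (for example, when the client disconnects mid-request), so the in-progress gauge drifts upward. A common remedy is an RAII guard that decrements in `Drop`. The sketch below models the gauge with a std `AtomicI64` so it runs without dependencies; `Gauge` and `InFlightGuard` are hypothetical names, not part of the prometheus crate.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Minimal stand-in for a Prometheus gauge (hypothetical; the real middleware
/// would hold a clone of the `IntGauge` instead).
#[derive(Default)]
struct Gauge {
    value: AtomicI64,
}

impl Gauge {
    fn get(&self) -> i64 {
        self.value.load(Ordering::Relaxed)
    }
}

/// Increments the gauge when created and decrements it when dropped, so the count
/// stays correct even if the owning future is cancelled before it completes.
struct InFlightGuard {
    gauge: Arc<Gauge>,
}

impl InFlightGuard {
    fn new(gauge: Arc<Gauge>) -> Self {
        gauge.value.fetch_add(1, Ordering::Relaxed);
        InFlightGuard { gauge }
    }
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        self.gauge.value.fetch_sub(1, Ordering::Relaxed);
    }
}

fn main() {
    let gauge = Arc::new(Gauge::default());
    {
        let _request_a = InFlightGuard::new(Arc::clone(&gauge));
        let request_b = InFlightGuard::new(Arc::clone(&gauge));
        println!("in flight: {}", gauge.get()); // 2
        // Simulates a request future that is dropped early (e.g. client disconnect)
        drop(request_b);
        println!("in flight: {}", gauge.get()); // 1
    }
    // Both guards are gone, so the gauge is back to zero
    println!("in flight: {}", gauge.get()); // 0
}
```

In `track_metrics`, the explicit `inc()`/`dec()` pair would then become a guard created before `next.run(req).await` and held until the function returns; since the registration code already clones the metrics, the guard can own a clone of the `IntGauge`.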
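Distributed Tracing with OpenTelemetry
Distributed tracing records how a single request flows across services as a tree of timed spans, which is essential for locating latency and failures in microservice architectures. In Rust, the tracing-opentelemetry layer forwards spans from the tracing ecosystem to an OpenTelemetry exporter, here OTLP over gRPC: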
use axum::{routing::get, Router};
use opentelemetry::global;
use opentelemetry::sdk::trace::{self, Sampler};
use opentelemetry::sdk::Resource;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_semantic_conventions::resource;
use std::time::Duration;
use tower_http::trace::TraceLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
// Initialize OpenTelemetry
fn init_tracer() -> opentelemetry::sdk::trace::Tracer {
// Configure OTLP exporter
let otlp_exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("http://localhost:4317")
.with_timeout(Duration::from_secs(3));
// Create a trace pipeline
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(otlp_exporter)
.with_trace_config(
trace::config()
.with_sampler(Sampler::AlwaysOn)
.with_resource(Resource::new(vec![
resource::SERVICE_NAME.string("my-rust-service"),
resource::SERVICE_VERSION.string("0.1.0"),
])),
)
.install_simple()
.unwrap()
}
#[tokio::main]
async fn main() {
// Initialize tracing
let tracer = init_tracer();
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
tracing_subscriber::registry()
.with(telemetry)
.with(tracing_subscriber::fmt::layer())
.init();
// Build our application with tracing
let app = Router::new()
.route("/", get(|| async { "Hello, World!" }))
.route("/users/:id", get(get_user))
.layer(TraceLayer::new_for_http());
// Run our server
let addr = "0.0.0.0:3000";
println!("Server running on http://{}", addr);
axum::Server::bind(&addr.parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
// Flush and export any remaining spans; this only runs once the server future
// completes, for example after a graceful shutdown
global::shutdown_tracer_provider();
}
// Example handler with manual tracing
async fn get_user(axum::extract::Path(id): axum::extract::Path<String>) -> String {
// Instrument attaches a span to a future so it stays entered across .await points
use tracing::Instrument;
// Create a span for the database operation
let db_span = tracing::info_span!("database.query", db.system = "postgresql");
// Simulate the query inside the span; tokio's async sleep avoids blocking a
// runtime worker thread the way std::thread::sleep would
let user = async {
tracing::info!(user.id = %id, "Fetching user from database");
tokio::time::sleep(Duration::from_millis(50)).await;
format!("User {}", id)
}
.instrument(db_span)
.await;
// Create a span for the external API call
let api_span = tracing::info_span!("http.client", http.url = "https://api.example.com");
// Simulate the API call inside its span
async {
tracing::info!(user.id = %id, "Fetching user details from external API");
tokio::time::sleep(Duration::from_millis(100)).await;
}
.instrument(api_span)
.await;
user
}
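To connect spans across services, the caller's trace context has to travel with the request. In the W3C Trace Context format (implemented in the OpenTelemetry SDK as `TraceContextPropagator`), it travels in a `traceparent` header of the form `version-traceid-parentid-flags`, with 2, 32, 16, and 2 lowercase hex characters. The dependency-free sketch below parses that header to show what actually crosses the wire; `TraceParent` and `parse_traceparent` are illustrative names, not OpenTelemetry APIs, and it only accepts version `00`.

```rust
/// Parsed W3C `traceparent` header (illustrative type, not an OpenTelemetry API).
#[derive(Debug, PartialEq)]
struct TraceParent {
    trace_id: String,  // 32 hex chars, shared by every span in the request flow
    parent_id: String, // 16 hex chars, the caller's span
    sampled: bool,     // lowest bit of the trace-flags byte
}

/// True if `s` is exactly `len` lowercase hex characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| b.is_ascii_digit() || (b'a'..=b'f').contains(&b))
}

/// Parses a version-00 value such as
/// `00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01`.
fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let parts: Vec<&str> = header.trim().split('-').collect();
    if parts.len() != 4 || parts[0] != "00" {
        return None;
    }
    let (trace_id, parent_id, flags) = (parts[1], parts[2], parts[3]);
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(parent_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    // All-zero IDs are invalid
    if trace_id.bytes().all(|b| b == b'0') || parent_id.bytes().all(|b| b == b'0') {
        return None;
    }
    let flags = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent {
        trace_id: trace_id.to_string(),
        parent_id: parent_id.to_string(),
        sampled: flags & 0x01 == 1,
    })
}

fn main() {
    let header = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
    println!("{:?}", parse_traceparent(header));
    println!("{:?}", parse_traceparent("garbage")); // None
}
```

A downstream service that extracts this context makes its spans children of the caller's span, which is what lets a tracing backend stitch one request's spans from several services into a single trace.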
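Health Checks and Readiness Probes
Container orchestrators such as Kubernetes use health endpoints to manage an application's lifecycle: a failing liveness probe causes the container to be restarted, while a failing readiness probe only removes the instance from load balancing until it recovers. That distinction determines what each endpoint should check: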
use axum::{
routing::get,
Router,
http::StatusCode,
response::IntoResponse,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
// Application state with health status
struct AppState {
is_ready: AtomicBool,
db_pool: Option<sqlx::PgPool>,
}
// Health check handler
async fn health_check() -> impl IntoResponse {
// Simple health check that always returns OK
StatusCode::OK
}
// Readiness check handler: should this instance receive traffic right now?
async fn readiness_check(
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
) -> impl IntoResponse {
// Not ready until start-up work has finished
if !state.is_ready.load(Ordering::Relaxed) {
return StatusCode::SERVICE_UNAVAILABLE;
}
// Dependency checks belong in readiness: a failure takes the instance out of
// load balancing until the database is reachable again, without restarting it
if let Some(pool) = &state.db_pool {
if sqlx::query("SELECT 1").execute(pool).await.is_err() {
return StatusCode::SERVICE_UNAVAILABLE;
}
}
StatusCode::OK
}
// Liveness check handler: is the process itself still able to respond?
async fn liveness_check() -> impl IntoResponse {
// Deliberately no dependency checks here: a failed liveness probe restarts the
// container, which would not fix an unreachable database
StatusCode::OK
}
#[tokio::main]
async fn main() {
// Initialize application state
let state = Arc::new(AppState {
is_ready: AtomicBool::new(false),
db_pool: None, // Initialize your database pool here
});
// Build our application
let app = Router::new()
.route("/", get(|| async { "Hello, World!" }))
.route("/health", get(health_check))
.route("/ready", get(readiness_check))
.route("/live", get(liveness_check))
.with_state(state.clone());
// Run our server
let addr = "0.0.0.0:3000";
println!("Server running on http://{}", addr);
// Mark the application as ready after a short delay
let state_clone = state.clone();
tokio::spawn(async move {
// Simulate initialization time
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
state_clone.is_ready.store(true, Ordering::Relaxed);
println!("Application is now ready to serve traffic");
});
axum::Server::bind(&addr.parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
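When readiness depends on several things (start-up finished, database reachable, cache reachable), it helps operators if the endpoint reports which check failed instead of a bare 503. Kubernetes treats any status from 200 up to 399 as a successful probe and does not inspect the body, so the details are for humans and dashboards. A dependency-free sketch of that aggregation; `CheckResult` and `aggregate_readiness` are hypothetical names, and in the axum handler the results would come from real probes such as the `SELECT 1` query.

```rust
/// Outcome of one dependency check (hypothetical shape for this sketch).
struct CheckResult {
    name: &'static str,
    healthy: bool,
}

/// Combines the start-up flag and dependency checks into an HTTP status code plus
/// the names of failing checks: 200 when everything passes, 503 otherwise.
fn aggregate_readiness(started: bool, checks: &[CheckResult]) -> (u16, Vec<&'static str>) {
    let mut failing = Vec::new();
    if !started {
        failing.push("startup");
    }
    failing.extend(checks.iter().filter(|c| !c.healthy).map(|c| c.name));
    let status = if failing.is_empty() { 200 } else { 503 };
    (status, failing)
}

fn main() {
    let checks = [
        CheckResult { name: "database", healthy: true },
        CheckResult { name: "cache", healthy: false },
    ];
    let (status, failing) = aggregate_readiness(true, &checks);
    println!("{status} {failing:?}"); // 503 ["cache"]
}
```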
Logging in Rust Applications
Proper logging is essential for debugging and monitoring enterprise applications. Rust offers several approaches to structured logging.
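Structured Logging with tracing
The tracing crate provides a framework for structured, contextual logging: events carry typed key-value fields, and spans attach context (such as a request ID) to every event recorded inside them: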
use tracing::{info, instrument, Level};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
// Define a struct for our application
struct User {
id: u64,
name: String,
}
// Instrument a function to create a span
#[instrument(skip(password))]
fn create_user(name: &str, password: &str) -> User {
// The name field will be logged, but password will be skipped
info!(user.action = "create", "Creating new user");
// Simulate user creation
let user = User {
id: 42,
name: name.to_string(),
};
// Log with additional fields
info!(
user.id = user.id,
user.name = user.name,
"User successfully created"
);
user
}
// Instrument an async function
#[instrument(fields(request_id = %uuid::Uuid::new_v4()))]
async fn handle_request(user_id: u64) {
info!(user.id = user_id, "Processing request");
// Simulate some work
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
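// Create a new span for a sub-operation. Holding an enter() guard in async code
// is only safe here because no .await follows while the guard is alive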
let op_span = tracing::info_span!("database.query", db.table = "users");
let _guard = op_span.enter();
info!(db.query.duration_ms = 35, "Database query completed");
}
#[tokio::main]
async fn main() {
// Initialize the tracing subscriber with JSON formatting
tracing_subscriber::registry()
.with(fmt::layer().json())
.with(EnvFilter::from_default_env()
.add_directive(Level::INFO.into())
.add_directive("sqlx=warn".parse().unwrap()))
.init();
info!(version = env!("CARGO_PKG_VERSION"), "Application starting");
let user = create_user("alice", "secret_password");
handle_request(user.id).await;
info!("Application shutting down");
}
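To make "structured" concrete: with the JSON formatter, every event becomes one JSON object per line, so a log pipeline can filter on a field such as `user.id` instead of parsing free text. The dependency-free sketch below hand-renders such a line, mainly to show the string escaping that keeps each event on one valid line. It is not how tracing-subscriber is implemented, and its flat layout is a simplification: the real JSON formatter also emits keys such as `timestamp` and `target`, and by default nests event fields under a `fields` object. `json_string` and `render_event` are hypothetical helpers.

```rust
/// Quotes and escapes a string for JSON: quotes, backslashes, and control characters.
fn json_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \u00XX form
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Renders one event as a single-line JSON object with a simplified, flat layout.
/// All values are rendered as strings to keep the sketch short.
fn render_event(level: &str, message: &str, fields: &[(&str, &str)]) -> String {
    let mut pairs = vec![
        format!("{}:{}", json_string("level"), json_string(level)),
        format!("{}:{}", json_string("message"), json_string(message)),
    ];
    for (key, value) in fields {
        pairs.push(format!("{}:{}", json_string(key), json_string(value)));
    }
    format!("{{{}}}", pairs.join(","))
}

fn main() {
    let line = render_event("INFO", "User \"alice\" created", &[("user.id", "42")]);
    // {"level":"INFO","message":"User \"alice\" created","user.id":"42"}
    println!("{line}");
}
```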
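Log Aggregation with Vector
In enterprise deployments, logs from many instances need to end up in one searchable place. Vector, a high-performance observability data pipeline written in Rust, collects, transforms, and routes logs and metrics. A common pattern is for the application to write JSON logs to stdout and let Vector collect them from the container runtime: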
use tracing::{info, instrument};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
// Configure logging to output JSON that can be collected by Vector
fn configure_logging() {
tracing_subscriber::registry()
.with(fmt::layer()
.json()
.with_current_span(true)
.with_span_list(true))
.with(EnvFilter::from_default_env())
.init();
}
#[tokio::main]
async fn main() {
configure_logging();
// Log application metadata
info!(
app.name = "my-enterprise-app",
app.version = env!("CARGO_PKG_VERSION"),
app.environment = std::env::var("APP_ENV").unwrap_or_else(|_| "development".to_string()),
"Application starting"
);
// Run the application
run_app().await;
}
#[instrument]
async fn run_app() {
// Application logic here
info!(event = "app_started", "Application is running");
// Log structured data
info!(
event = "business_transaction",
transaction.id = uuid::Uuid::new_v4().to_string(),
transaction.type = "payment",
transaction.amount = 99.95,
transaction.currency = "USD",
"Payment processed successfully"
);
}
Vector configuration (vector.toml):
[sources.docker_logs]
type = "docker_logs"
include_containers = ["my-enterprise-app"]
[transforms.parse_json]
type = "remap"
inputs = ["docker_logs"]
source = '''
. = parse_json!(.message)
'''
[transforms.add_metadata]
type = "remap"
inputs = ["parse_json"]
source = '''
.hostname = get_env_var!("HOSTNAME")
.region = get_env_var("REGION") ?? "unknown"
'''
[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["add_metadata"]
endpoints = ["http://elasticsearch:9200"]
bulk.index = "logs-%Y-%m-%d"
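The daily index template `logs-%Y-%m-%d` uses strftime-style specifiers, so each UTC day's events land in a separate index, which typically makes retention a matter of deleting whole old indices. Vector renders the template from each event's timestamp; the dependency-free sketch below shows the date arithmetic behind such a name, converting a Unix timestamp with Howard Hinnant's `civil_from_days` algorithm. `daily_index` is a hypothetical helper, not a Vector API.

```rust
/// Converts days since 1970-01-01 into a proleptic Gregorian (year, month, day),
/// following Howard Hinnant's `civil_from_days` algorithm.
fn civil_from_days(days: i64) -> (i64, i64, i64) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z.div_euclid(146_097); // 400-year eras
    let doe = z - era * 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era, [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year (March-based), [0, 365]
    let mp = (5 * doy + 2) / 153; // month index with March = 0, [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February belong to the next civil year in the March-based count
    let year = yoe + era * 400 + i64::from(month <= 2);
    (year, month, day)
}

/// Renders a daily index name such as `logs-2024-05-01` for a UTC Unix timestamp.
fn daily_index(prefix: &str, unix_secs: i64) -> String {
    let (year, month, day) = civil_from_days(unix_secs.div_euclid(86_400));
    format!("{prefix}-{year:04}-{month:02}-{day:02}")
}

fn main() {
    println!("{}", daily_index("logs", 0)); // logs-1970-01-01
    println!("{}", daily_index("logs", 1_714_521_600)); // logs-2024-05-01
}
```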
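Log Rotation and Retention
For applications that write logs to files, rotation keeps individual files manageable and a retention policy limits how much disk space old logs consume. The tracing-appender crate provides time-based rotation and a non-blocking writer: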
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
fn configure_file_logging() -> tracing_appender::non_blocking::WorkerGuard {
// Create a rolling file appender that starts a new file every day
let file_appender = RollingFileAppender::new(
Rotation::DAILY,
"/var/log/my-app",
"application.log",
);
// Create a non-blocking writer; a background worker thread performs the file I/O
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
// Initialize the subscriber (ANSI colors disabled for plain-text files)
tracing_subscriber::registry()
.with(fmt::layer()
.with_writer(non_blocking)
.with_ansi(false))
.with(EnvFilter::from_default_env())
.init();
// Return the guard instead of leaking it: dropping it flushes buffered log lines,
// so the caller should hold it for the life of the program, for example
// `let _guard = configure_file_logging();` at the top of main
// (not `let _ = ...`, which would drop it immediately)
guard
}
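Daily rotation as configured above starts new files but does not delete old ones, so retention needs a pruning step, either in external tooling or in the application. A std-only sketch of the selection logic, assuming daily files named `<prefix>.YYYY-MM-DD` (our understanding of how tracing-appender names daily files; check the names your version produces). `files_to_delete` is a hypothetical helper that only decides which names fall outside the window and leaves the actual removal to the caller.

```rust
/// Returns the rotated log files that fall outside the retention window, oldest first.
/// Assumes daily files named `<prefix>.YYYY-MM-DD`; ISO-8601 dates sort
/// lexicographically in chronological order, so a plain string sort suffices.
fn files_to_delete(prefix: &str, names: &[&str], keep: usize) -> Vec<String> {
    let dotted = format!("{prefix}.");
    // Only consider files that carry this appender's prefix and a date suffix
    let mut dated: Vec<&str> = names
        .iter()
        .copied()
        .filter(|name| {
            name.strip_prefix(dotted.as_str())
                .map_or(false, |date| date.len() == 10)
        })
        .collect();
    dated.sort_unstable();
    // Everything except the newest `keep` files is eligible for deletion
    let excess = dated.len().saturating_sub(keep);
    dated[..excess].iter().map(|name| name.to_string()).collect()
}

fn main() {
    let names = [
        "application.log.2024-05-03",
        "application.log.2024-05-01",
        "other.txt",
        "application.log.2024-05-02",
    ];
    // Keep the two most recent days; each returned name could then be passed
    // to std::fs::remove_file (joined with the log directory)
    println!("{:?}", files_to_delete("application.log", &names, 2));
    // ["application.log.2024-05-01"]
}
```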
Machine Learning Integration
Integrating machine learning into enterprise applications can provide valuable insights and automation. Rust offers several approaches to ML integration.
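Using Rust ML Libraries Directly
For simpler ML tasks, you can train and run models entirely in Rust with linfa, a toolkit that aims to be comparable to Python's scikit-learn. The following example trains a small decision tree classifier with linfa-trees: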
use linfa::prelude::*;
use linfa_trees::{DecisionTree, SplitQuality};
use ndarray::{array, Array2};
// Define a simple decision tree model
fn train_decision_tree() -> DecisionTree<f64, bool> {
// Sample data: [feature1, feature2] -> target
let features = Array2::from_shape_vec(
(6, 2),
vec![
1.0, 2.0, // Sample 1
2.0, 3.0, // Sample 2
3.0, 3.0, // Sample 3
2.0, 1.0, // Sample 4
1.0, 1.0, // Sample 5
3.0, 2.0, // Sample 6
],
)
.unwrap();
// Target values (true/false)
let targets = array![true, true, true, false, false, false];
// Create a dataset
let dataset = Dataset::new(features, targets);
// Train a decision tree using Gini impurity as the split criterion
DecisionTree::params()
.split_quality(SplitQuality::Gini)
.max_depth(Some(3))
.fit(&dataset)
.unwrap()
}
// Use the model for prediction
fn predict_with_model(model: &DecisionTree<f64, bool>, feature1: f64, feature2: f64) -> bool {
let features = array![[feature1, feature2]];
// predict returns the labels directly (one per input row), not a Result
let prediction = model.predict(&features);
prediction[0]
}
fn main() {
// Train the model
let model = train_decision_tree();
// Make predictions
let prediction = predict_with_model(&model, 1.5, 2.5);
println!("Prediction: {}", prediction);
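// Save the model (DecisionTree implements the serde traits only when
// linfa-trees is built with its serde feature)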
let model_bytes = bincode::serialize(&model).unwrap();
std::fs::write("decision_tree.model", model_bytes).unwrap();
// Load the model
let loaded_bytes = std::fs::read("decision_tree.model").unwrap();
let loaded_model: DecisionTree<f64, bool> = bincode::deserialize(&loaded_bytes).unwrap();
// Use the loaded model
let prediction2 = predict_with_model(&loaded_model, 1.5, 2.5);
println!("Prediction with loaded model: {}", prediction2);
}
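The Gini split criterion configured above scores a node by its impurity, 1 - sum of p_k^2, where p_k is the share of samples in class k: 0 for a pure node and 0.5 for an even two-class mix. The tree prefers the split whose children have the lowest sample-weighted impurity. The short std-only sketch below computes this for the training targets above and for one candidate split; `gini` and `split_impurity` are illustrative helpers, not linfa APIs, and linfa's internals may differ in details such as sample weighting.

```rust
/// Gini impurity of a set of binary labels: 1 - (p_true^2 + p_false^2).
fn gini(labels: &[bool]) -> f64 {
    if labels.is_empty() {
        return 0.0;
    }
    let n = labels.len() as f64;
    let p_true = labels.iter().filter(|&&l| l).count() as f64 / n;
    let p_false = 1.0 - p_true;
    1.0 - (p_true * p_true + p_false * p_false)
}

/// Sample-weighted impurity of a candidate split; lower is better.
fn split_impurity(left: &[bool], right: &[bool]) -> f64 {
    let n = (left.len() + right.len()) as f64;
    (left.len() as f64 / n) * gini(left) + (right.len() as f64 / n) * gini(right)
}

fn main() {
    // Targets from the training set above
    let targets = [true, true, true, false, false, false];
    println!("root impurity: {}", gini(&targets)); // 0.5
    // Candidate split "feature2 >= 2.5": samples 2 and 3 go right, the rest go left
    let right = [true, true];
    let left = [true, false, false, false];
    println!("split impurity: {:.3}", split_impurity(&left, &right)); // 0.250
}
```

Dropping from 0.5 at the root to 0.25 after the split is the impurity decrease the tree uses to rank this split against the alternatives.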
Integrating with Python ML Ecosystem
For more complex ML tasks, you can use Python's ML ecosystem (NumPy, scikit-learn, and so on) from Rust by embedding a Python interpreter with PyO3:
// Written against the pyo3 0.20 API (GIL-bound &PyAny references); later pyo3
// releases renamed some of these calls. Embedding Python also requires pyo3's
// "auto-initialize" feature and a Python environment with numpy and scikit-learn.
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyDict, PyList};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct InputData {
feature1: f64,
feature2: f64,
feature3: f64,
}
#[derive(Serialize, Deserialize)]
struct PredictionResult {
prediction: f64,
// Only models with predict_proba (classifiers) can report a confidence
confidence: Option<f64>,
}
// Function to make predictions using a pickled scikit-learn model
fn predict_with_sklearn(data: &InputData) -> PyResult<PredictionResult> {
Python::with_gil(|py| {
// Import Python modules
let pickle = py.import("pickle")?;
let numpy = py.import("numpy")?;
// Load the model from a file; pickle.loads expects a bytes object,
// not the list of ints a Vec<u8> would convert to
let model_file = std::fs::read("model.pkl")?;
let model = pickle.call_method1("loads", (PyBytes::new(py, &model_file),))?;
// Convert input data to a 1 x 3 numpy array
let features = numpy.call_method1(
"array",
([data.feature1, data.feature2, data.feature3],),
)?;
let features = features.call_method1("reshape", (1, -1))?;
// Make a prediction
let prediction = model.call_method1("predict", (features,))?;
let prediction_value = prediction.call_method1("item", (0,))?.extract::<f64>()?;
// Get prediction confidence where supported; regressors such as
// RandomForestRegressor have no predict_proba method
let confidence = if model.hasattr("predict_proba")? {
Some(
model
.call_method1("predict_proba", (features,))?
.call_method0("max")?
.extract::<f64>()?,
)
} else {
None
};
Ok(PredictionResult {
prediction: prediction_value,
confidence,
})
})
}
// Function to train a model with scikit-learn
fn train_sklearn_model(training_data: Vec<InputData>, targets: Vec<f64>) -> PyResult<()> {
Python::with_gil(|py| {
// Import Python modules
let sklearn = py.import("sklearn.ensemble")?;
let numpy = py.import("numpy")?;
let pickle = py.import("pickle")?;
// Convert training data to numpy arrays
let features_list = PyList::new(
py,
training_data.iter().map(|data| {
vec![data.feature1, data.feature2, data.feature3]
}),
);
let features = numpy.call_method1("array", (features_list,))?;
let targets_list = PyList::new(py, targets);
let targets = numpy.call_method1("array", (targets_list,))?;
// Create and train a random forest model; keyword arguments are passed as a PyDict
// (a fixed random_state makes training reproducible)
let kwargs = PyDict::new(py);
kwargs.set_item("random_state", 42)?;
let model = sklearn.call_method("RandomForestRegressor", (), Some(kwargs))?;
model.call_method1("fit", (features, targets))?;
// Save the model to a file (pickle.dumps returns a bytes object)
let model_bytes = pickle.call_method1("dumps", (model,))?;
std::fs::write("model.pkl", model_bytes.extract::<&[u8]>()?)?;
Ok(())
})
}
fn main() -> PyResult<()> {
// Example training data
let training_data = vec![
InputData { feature1: 1.0, feature2: 2.0, feature3: 3.0 },
InputData { feature1: 2.0, feature2: 3.0, feature3: 4.0 },
InputData { feature1: 3.0, feature2: 4.0, feature3: 5.0 },
InputData { feature1: 4.0, feature2: 5.0, feature3: 6.0 },
];
let targets = vec![10.0, 20.0, 30.0, 40.0];
// Train the model
train_sklearn_model(training_data, targets)?;
// Make a prediction
let input = InputData {
feature1: 2.5,
feature2: 3.5,
feature3: 4.5,
};
let result = predict_with_sklearn(&input)?;
// confidence is None here because the model is a regressor
println!("Prediction: {}, Confidence: {:?}", result.prediction, result.confidence);
Ok(())
}
Serving ML Models with Rust
For production deployment, you can serve ML models with a Rust web server:
use axum::{
extract::State,
routing::{get, post},
Json, Router,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
// Model input and output types
#[derive(Deserialize)]
struct PredictionRequest {
features: Vec<f64>,
}
#[derive(Serialize)]
struct PredictionResponse {
prediction: f64,
confidence: Option<f64>,
}
// Simple model interface
trait Model: Send + Sync {
fn predict(&self, features: &[f64]) -> (f64, Option<f64>);
}
// Example implementation using a pre-trained model
struct RandomForestModel {
// In a real application, this would hold the model data
}
impl RandomForestModel {
fn new() -> Self {
// Load model from file
Self {}
}
}
impl Model for RandomForestModel {
fn predict(&self, features: &[f64]) -> (f64, Option<f64>) {
// In a real application, this would use the model to make a prediction
// For this example, we'll just return a dummy value
let prediction = features.iter().sum::<f64>() / features.len() as f64;
let confidence = Some(0.95);
(prediction, confidence)
}
}
// Application state
struct AppState {
model: Box<dyn Model>,
}
// Prediction endpoint
async fn predict(
State(state): State<Arc<AppState>>,
Json(request): Json<PredictionRequest>,
) -> Json<PredictionResponse> {
let (prediction, confidence) = state.model.predict(&request.features);
Json(PredictionResponse {
prediction,
confidence,
})
}
// Health check endpoint
async fn health_check() -> &'static str {
"OK"
}
#[tokio::main]
async fn main() {
// Initialize the model
let model: Box<dyn Model> = Box::new(RandomForestModel::new());
// Create application state
let state = Arc::new(AppState { model });
// Build our application
let app = Router::new()
.route("/predict", post(predict))
.route("/health", get(health_check))
.with_state(state);
// Run our server
let addr = "0.0.0.0:3000";
println!("Model serving on http://{}", addr);
axum::Server::bind(&addr.parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
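Because handlers only see `Box<dyn Model>`, replacing the placeholder is a matter of implementing the trait. Note that the placeholder averages the features, which yields NaN for an empty list, so validating the input shape before predicting is worthwhile. A self-contained sketch with a hypothetical linear model (its weights and bias would come from training elsewhere) and a hypothetical `validated_predict` helper; the trait is repeated so the block compiles on its own.

```rust
/// Same interface as the `Model` trait in the serving example above.
trait Model: Send + Sync {
    fn predict(&self, features: &[f64]) -> (f64, Option<f64>);
}

/// Hypothetical linear regression model; in practice the weights and bias
/// would be loaded from a trained artifact at startup.
struct LinearModel {
    weights: Vec<f64>,
    bias: f64,
}

impl Model for LinearModel {
    fn predict(&self, features: &[f64]) -> (f64, Option<f64>) {
        // Dot product of weights and features, plus the bias term
        let y = self
            .weights
            .iter()
            .zip(features)
            .map(|(w, x)| w * x)
            .sum::<f64>()
            + self.bias;
        // A plain regressor has no confidence score
        (y, None)
    }
}

/// Checks the feature count before predicting, so the handler can answer a
/// malformed request with 400 Bad Request instead of returning a wrong value.
fn validated_predict(
    model: &dyn Model,
    expected_features: usize,
    features: &[f64],
) -> Result<(f64, Option<f64>), String> {
    if features.len() != expected_features {
        return Err(format!(
            "expected {expected_features} features, got {}",
            features.len()
        ));
    }
    Ok(model.predict(features))
}

fn main() {
    let model = LinearModel { weights: vec![0.5, 2.0], bias: 1.0 };
    println!("{:?}", validated_predict(&model, 2, &[2.0, 3.0])); // Ok((8.0, None))
    println!("{:?}", validated_predict(&model, 2, &[1.0])); // Err("expected 2 features, got 1")
}
```

In the axum `predict` handler, the `Err` branch would map naturally to `StatusCode::BAD_REQUEST`, keeping model code free of HTTP concerns.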
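Feature Engineering and Data Processing
Preprocessing must apply exactly the same transformations at training time and at serving time, or the model receives inputs unlike those it was trained on. The following processor learns category mappings and standardization statistics from training data and reuses them to transform new samples: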
use std::collections::HashMap;
// Feature engineering functions
struct FeatureProcessor {
categorical_mappings: HashMap<String, HashMap<String, usize>>,
numerical_means: Vec<f64>,
numerical_stds: Vec<f64>,
}
impl FeatureProcessor {
// Create a new processor from training data
fn new(
categorical_columns: &[Vec<String>],
numerical_columns: &[Vec<f64>],
) -> Self {
// Process categorical features
let mut categorical_mappings = HashMap::new();
for (i, column) in categorical_columns.iter().enumerate() {
let mut mapping = HashMap::new();
let mut unique_values = column.clone();
unique_values.sort();
unique_values.dedup();
for (j, value) in unique_values.iter().enumerate() {
mapping.insert(value.clone(), j);
}
categorical_mappings.insert(format!("cat_{}", i), mapping);
}
// Process numerical features
let mut numerical_means = Vec::new();
let mut numerical_stds = Vec::new();
for column in numerical_columns {
let mean = column.iter().sum::<f64>() / column.len() as f64;
let variance = column.iter()
.map(|&x| (x - mean).powi(2))
.sum::<f64>() / column.len() as f64;
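// Guard against constant columns, whose standard deviation is 0
let std_dev = if variance > 0.0 { variance.sqrt() } else { 1.0 };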
numerical_means.push(mean);
numerical_stds.push(std_dev);
}
Self {
categorical_mappings,
numerical_means,
numerical_stds,
}
}
// Transform new data using the learned mappings
fn transform(
&self,
categorical_features: &[String],
numerical_features: &[f64],
) -> Vec<f64> {
let mut transformed = Vec::new();
// Transform categorical features to one-hot encoding
for (i, feature) in categorical_features.iter().enumerate() {
let mapping = &self.categorical_mappings[&format!("cat_{}", i)];
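// Unseen categories get an all-zero vector instead of silently
// reusing the encoding of the first known category
let index = mapping.get(feature).copied();
// One-hot encoding
for j in 0..mapping.len() {
transformed.push(if Some(j) == index { 1.0 } else { 0.0 });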
}
}
// Transform numerical features with standardization
for (i, &feature) in numerical_features.iter().enumerate() {
let standardized = (feature - self.numerical_means[i]) / self.numerical_stds[i];
transformed.push(standardized);
}
transformed
}
}
// Example usage
fn main() {
// Training data
let categorical_columns = vec![
vec!["red".to_string(), "blue".to_string(), "green".to_string(), "red".to_string()],
vec!["small".to_string(), "medium".to_string(), "large".to_string(), "medium".to_string()],
];
let numerical_columns = vec![
vec![1.0, 2.0, 3.0, 4.0],
vec![10.0, 20.0, 30.0, 40.0],
];
// Create the processor
let processor = FeatureProcessor::new(&categorical_columns, &numerical_columns);
// Transform a new sample
let new_categorical = vec!["blue".to_string(), "large".to_string()];
let new_numerical = vec![2.5, 25.0];
let transformed = processor.transform(&new_categorical, &new_numerical);
println!("Transformed features: {:?}", transformed);
}
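The numerical part of transform is a z-score that uses the population standard deviation computed in new (dividing by n, not n - 1). Worked through for the sample training columns, this shows why both numerical outputs for the new sample come out as 0:

```latex
z = \frac{x - \mu}{\sigma}, \qquad
\mu = \frac{1}{n}\sum_{i=1}^{n} x_i, \qquad
\sigma = \sqrt{\frac{1}{n}\sum_{i=1}^{n} (x_i - \mu)^2}

\text{column 1: } \mu = \tfrac{1+2+3+4}{4} = 2.5, \quad
\sigma = \sqrt{\tfrac{2.25 + 0.25 + 0.25 + 2.25}{4}} = \sqrt{1.25} \approx 1.118, \quad
z(2.5) = 0, \quad z(4.0) = \tfrac{1.5}{1.118} \approx 1.342

\text{column 2: } \mu = 25, \quad
\sigma = \sqrt{\tfrac{225 + 25 + 25 + 225}{4}} = \sqrt{125} \approx 11.18, \quad
z(25.0) = 0
```

Combined with the one-hot encodings (unique values are sorted before indexing, so "blue" and "large" both map to index 0), the example prints `Transformed features: [1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0]`.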
Conclusion
In this final post of our series on Enterprise Applications with Rust, we’ve explored essential miscellaneous tools that complement the core components of enterprise systems:
- Observability Tools: Metrics collection with Prometheus, distributed tracing with OpenTelemetry, and health checks for container orchestration systems
- Logging Solutions: Structured logging with tracing, log aggregation with Vector, and proper log rotation and retention policies
- Machine Learning Integration: Using Rust ML libraries directly, integrating with Python’s ML ecosystem, serving ML models with Rust web servers, and efficient feature engineering
These tools are crucial for building robust, maintainable, and insightful enterprise applications. When implementing these components:
- Start with observability from day one: It’s much easier to build observability into your application from the beginning than to add it later
- Use structured logging: Structured logs are easier to search, filter, and analyze
- Consider ML integration carefully: Choose the right approach based on your team’s expertise and the complexity of your ML needs
Throughout this series, we’ve explored how Rust can be used to build enterprise applications, from the core components to the supporting tools. Rust’s emphasis on performance, reliability, and safety makes it an excellent choice for enterprise development, and its growing ecosystem provides the tools needed to build comprehensive solutions.
As you embark on your journey of building enterprise applications with Rust, remember that the best architecture is one that meets your specific requirements while leveraging the strengths of the language and its ecosystem. We hope this series has provided valuable insights and practical guidance for your enterprise Rust development.
Thank you for following along with this series on Enterprise Applications with Rust!