diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 6bd031379..0157a2453 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,6 +13,7 @@ dictionary Config { u64 probing_liquidity_limit_multiplier; AnchorChannelsConfig? anchor_channels_config; RouteParametersConfig? route_parameters; + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode; }; dictionary AnchorChannelsConfig { @@ -189,6 +190,12 @@ interface Node { void remove_payment([ByRef]PaymentId payment_id); BalanceDetails list_balances(); sequence list_payments(); + ForwardedPaymentDetails? forwarded_payment([ByRef]ForwardedPaymentId forwarded_payment_id); + sequence list_forwarded_payments(); + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode(); + ChannelForwardingStats? channel_forwarding_stats([ByRef]ChannelId channel_id); + sequence list_channel_forwarding_stats(); + sequence list_channel_pair_forwarding_stats(); sequence list_peers(); sequence list_channels(); NetworkGraph network_graph(); @@ -486,6 +493,12 @@ enum PaymentStatus { "Failed", }; +[Enum] +interface ForwardedPaymentTrackingMode { + Detailed(u64 retention_minutes); + Stats(); +}; + dictionary LSPFeeLimits { u64? max_total_opening_fee_msat; u64? max_proportional_opening_fee_ppm_msat; @@ -507,6 +520,55 @@ dictionary PaymentDetails { u64 latest_update_timestamp; }; +dictionary ForwardedPaymentDetails { + ForwardedPaymentId id; + ChannelId prev_channel_id; + ChannelId next_channel_id; + UserChannelId? prev_user_channel_id; + UserChannelId? next_user_channel_id; + PublicKey? prev_node_id; + PublicKey? next_node_id; + u64? total_fee_earned_msat; + u64? skimmed_fee_msat; + boolean claim_from_onchain_tx; + u64? outbound_amount_forwarded_msat; + u64 forwarded_at_timestamp; +}; + +dictionary ChannelForwardingStats { + ChannelId channel_id; + PublicKey? counterparty_node_id; + u64 inbound_payments_forwarded; + u64 outbound_payments_forwarded; + u64 total_inbound_amount_msat; + u64 total_outbound_amount_msat; + u64 total_fee_earned_msat; + u64 total_skimmed_fee_msat; + u64 onchain_claims_count; + u64 first_forwarded_at_timestamp; + u64 last_forwarded_at_timestamp; +}; + +dictionary ChannelPairForwardingStats { + ChannelPairStatsId id; + ChannelId prev_channel_id; + ChannelId next_channel_id; + u64 bucket_start_timestamp; + PublicKey? prev_node_id; + PublicKey? next_node_id; + u64 payment_count; + u64 total_inbound_amount_msat; + u64 total_outbound_amount_msat; + u64 total_fee_earned_msat; + u64 total_skimmed_fee_msat; + u64 onchain_claims_count; + u64 avg_fee_msat; + u64 avg_inbound_amount_msat; + u64 first_forwarded_at_timestamp; + u64 last_forwarded_at_timestamp; + u64 aggregated_at_timestamp; +}; + dictionary RouteParametersConfig { u64? max_total_routing_fee_msat; u32 max_total_cltv_expiry_delta; @@ -894,6 +956,9 @@ typedef string OfferId; [Custom] typedef string PaymentId; +[Custom] +typedef string ForwardedPaymentId; + [Custom] typedef string PaymentHash; @@ -906,6 +971,9 @@ typedef string PaymentSecret; [Custom] typedef string ChannelId; +[Custom] +typedef string ChannelPairStatsId; + [Custom] typedef string UserChannelId; diff --git a/src/builder.rs b/src/builder.rs index 5d8a5a7a9..dc9fd51b3 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -55,13 +55,20 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ - read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, + read_channel_forwarding_stats, read_channel_pair_forwarding_stats, read_event_queue, + read_external_pathfinding_scores_from_cache, read_forwarded_payments, read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, read_scorer, write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -75,9 +82,10 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, - KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, - Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelForwardingStatsStore, ChannelManager, + ChannelPairForwardingStatsStore, DynStore, DynStoreWrapper, ForwardedPaymentStore, GossipSync, + Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, + PendingPaymentStore, Persister, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1060,14 +1068,23 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = - runtime.block_on(async move { - tokio::join!( - read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), - read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) - ) - }); + let ( + payment_store_res, + forwarded_payment_store_res, + channel_forwarding_stats_res, + channel_pair_forwarding_stats_res, + node_metris_res, + pending_payment_store_res, + ) = runtime.block_on(async move { + tokio::join!( + read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_forwarded_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_channel_forwarding_stats(&*kv_store_ref, Arc::clone(&logger_ref)), + read_channel_pair_forwarding_stats(&*kv_store_ref, Arc::clone(&logger_ref)), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) + ) + }); // Initialize the status fields. let node_metrics = match node_metris_res { @@ -1096,6 +1113,48 @@ fn build_with_store_internal( }, }; + let forwarded_payment_store = match forwarded_payment_store_res { + Ok(forwarded_payments) => Arc::new(ForwardedPaymentStore::new( + forwarded_payments, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read forwarded payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let channel_forwarding_stats_store = match channel_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelForwardingStatsStore::new( + stats, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let channel_pair_forwarding_stats_store = match channel_pair_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelPairForwardingStatsStore::new( + stats, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel pair forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -1782,6 +1841,9 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, + channel_pair_forwarding_stats_store, is_running, node_metrics, om_mailbox, diff --git a/src/config.rs b/src/config.rs index 103b74657..7a0fbb990 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,6 +21,30 @@ use lightning::util::config::{ use crate::logger::LogLevel; +/// The mode used for tracking forwarded payments. +/// +/// This determines how much detail is stored about payment forwarding activity. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum ForwardedPaymentTrackingMode { + /// Store individual forwarded payments for the specified retention period (in minutes), + /// then aggregate into channel-pair statistics. + /// + /// Individual payments older than `retention_minutes` are aggregated by channel pair + /// and removed. Set to 0 for unlimited retention. + Detailed { + /// The retention period for individual forwarded payment records, in minutes. + /// Individual payments older than this period are aggregated into channel-pair statistics and removed. + /// Set to 0 for unlimited retention. + retention_minutes: u64, + }, + /// Track only per-channel aggregate statistics without storing individual payment records. + /// + /// This is the default mode. Use this to reduce storage requirements when you only need + /// aggregate metrics like total fees earned per channel. + #[default] + Stats, +} + // Config defaults const DEFAULT_NETWORK: Network = Network::Bitcoin; const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80; @@ -127,9 +151,10 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5; /// | `probing_liquidity_limit_multiplier` | 3 | /// | `log_level` | Debug | /// | `anchor_channels_config` | Some(..) | -/// | `route_parameters` | None | +/// | `route_parameters` | None | +/// | `forwarded_payment_tracking_mode` | Detailed | /// -/// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their +/// See [`AnchorChannelsConfig`], [`RouteParametersConfig`], and [`ForwardedPaymentTrackingMode`] for more information regarding their /// respective default values. /// /// [`Node`]: crate::Node @@ -192,6 +217,10 @@ pub struct Config { /// **Note:** If unset, default parameters will be used, and you will be able to override the /// parameters on a per-payment basis in the corresponding method calls. pub route_parameters: Option, + /// The mode used for tracking forwarded payments. + /// + /// See [`ForwardedPaymentTrackingMode`] for more information on the available modes. + pub forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode, } impl Default for Config { @@ -206,6 +235,7 @@ impl Default for Config { anchor_channels_config: Some(AnchorChannelsConfig::default()), route_parameters: None, node_alias: None, + forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode::default(), } } } diff --git a/src/event.rs b/src/event.rs index 6f0ed8e09..28d1dc825 100644 --- a/src/event.rs +++ b/src/event.rs @@ -10,6 +10,7 @@ use core::task::{Poll, Waker}; use std::collections::VecDeque; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -32,7 +33,7 @@ use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use rand::{rng, Rng}; -use crate::config::{may_announce_channel, Config}; +use crate::config::{may_announce_channel, Config, ForwardedPaymentTrackingMode}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; use crate::fee_estimator::ConfirmationTarget; @@ -45,10 +46,14 @@ use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ - PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ForwardedPaymentDetails, ForwardedPaymentId, PaymentDetails, + PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; use crate::runtime::Runtime; -use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet}; +use crate::types::{ + ChannelForwardingStatsStore, CustomTlvRecord, DynStore, ForwardedPaymentStore, OnionMessenger, + PaymentStore, Sweeper, Wallet, +}; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, UserChannelId, @@ -487,6 +492,8 @@ where network_graph: Arc, liquidity_source: Option>>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, peer_store: Arc>, runtime: Arc, logger: L, @@ -506,10 +513,11 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Option>>>, - payment_store: Arc, peer_store: Arc>, - static_invoice_store: Option, onion_messenger: Arc, - om_mailbox: Option>, runtime: Arc, logger: L, - config: Arc, + payment_store: Arc, forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, + peer_store: Arc>, static_invoice_store: Option, + onion_messenger: Arc, om_mailbox: Option>, + runtime: Arc, logger: L, config: Arc, ) -> Self { Self { event_queue, @@ -521,6 +529,8 @@ where network_graph, liquidity_source, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, peer_store, logger, runtime, @@ -1364,9 +1374,106 @@ where .await; } + let prev_channel_id_value = prev_channel_id + .expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."); + let next_channel_id_value = next_channel_id + .expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."); + + let forwarded_at_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("SystemTime::now() should come after SystemTime::UNIX_EPOCH") + .as_secs(); + + // Calculate inbound amount (outbound + fee) + let inbound_amount_msat = outbound_amount_forwarded_msat + .unwrap_or(0) + .saturating_add(total_fee_earned_msat.unwrap_or(0)); + + // Update per-channel forwarding stats for the inbound channel (prev_channel) + // For new entries, this becomes the initial value; for existing entries, + // these values are used as increments via the to_update() -> update() pattern. + let inbound_stats = ChannelForwardingStats { + channel_id: prev_channel_id_value, + counterparty_node_id: prev_node_id, + inbound_payments_forwarded: 1, + outbound_payments_forwarded: 0, + total_inbound_amount_msat: inbound_amount_msat, + total_outbound_amount_msat: 0, + total_fee_earned_msat: total_fee_earned_msat.unwrap_or(0), + total_skimmed_fee_msat: skimmed_fee_msat.unwrap_or(0), + onchain_claims_count: if claim_from_onchain_tx { 1 } else { 0 }, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }; + self.channel_forwarding_stats_store.insert_or_update(inbound_stats).map_err( + |e| { + log_error!( + self.logger, + "Failed to update inbound channel forwarding stats: {e}" + ); + ReplayEvent() + }, + )?; + + // Update per-channel forwarding stats for the outbound channel (next_channel) + let outbound_stats = ChannelForwardingStats { + channel_id: next_channel_id_value, + counterparty_node_id: next_node_id, + inbound_payments_forwarded: 0, + outbound_payments_forwarded: 1, + total_inbound_amount_msat: 0, + total_outbound_amount_msat: outbound_amount_forwarded_msat.unwrap_or(0), + total_fee_earned_msat: 0, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }; + self.channel_forwarding_stats_store.insert_or_update(outbound_stats).map_err( + |e| { + log_error!( + self.logger, + "Failed to update outbound channel forwarding stats: {e}" + ); + ReplayEvent() + }, + )?; + + // Only store individual forwarded payment details in Detailed mode + match self.config.forwarded_payment_tracking_mode { + ForwardedPaymentTrackingMode::Detailed { .. } => { + // PaymentForwarded does not have a unique id, so we generate a random one here. + let mut id_bytes = [0u8; 32]; + rng().fill(&mut id_bytes); + + let forwarded_payment = ForwardedPaymentDetails { + id: ForwardedPaymentId(id_bytes), + prev_channel_id: prev_channel_id_value, + next_channel_id: next_channel_id_value, + prev_user_channel_id: prev_user_channel_id.map(UserChannelId), + next_user_channel_id: next_user_channel_id.map(UserChannelId), + prev_node_id, + next_node_id, + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + forwarded_at_timestamp, + }; + + self.forwarded_payment_store.insert(forwarded_payment).map_err(|e| { + log_error!(self.logger, "Failed to store forwarded payment: {e}"); + ReplayEvent() + })?; + }, + ForwardedPaymentTrackingMode::Stats => { + // Do not store individual payment details + }, + } + let event = Event::PaymentForwarded { - prev_channel_id: prev_channel_id.expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."), - next_channel_id: next_channel_id.expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."), + prev_channel_id: prev_channel_id_value, + next_channel_id: next_channel_id_value, prev_user_channel_id: prev_user_channel_id.map(UserChannelId), next_user_channel_id: next_user_channel_id.map(UserChannelId), prev_node_id, diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 2a349a967..1e4af49d4 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -54,7 +54,8 @@ pub use crate::graph::{ChannelInfo, ChannelUpdateInfo, NodeAnnouncementInfo, Nod pub use crate::liquidity::{LSPS1OrderStatus, LSPS2ServiceConfig}; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; pub use crate::payment::store::{ - ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus, + ConfirmationStatus, ForwardedPaymentId, LSPFeeLimits, PaymentDirection, PaymentKind, + PaymentStatus, }; pub use crate::payment::UnifiedPaymentResult; use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; @@ -722,6 +723,24 @@ impl UniffiCustomTypeConverter for PaymentId { } } +impl UniffiCustomTypeConverter for ForwardedPaymentId { + type Builtin = String; + + fn into_custom(val: Self::Builtin) -> uniffi::Result { + if let Some(bytes_vec) = hex_utils::to_vec(&val) { + let bytes_res = bytes_vec.try_into(); + if let Ok(bytes) = bytes_res { + return Ok(ForwardedPaymentId(bytes)); + } + } + Err(Error::InvalidPaymentId.into()) + } + + fn from_custom(obj: Self) -> Self::Builtin { + hex_utils::to_string(&obj.0) + } +} + impl UniffiCustomTypeConverter for PaymentHash { type Builtin = String; @@ -793,6 +812,27 @@ impl UniffiCustomTypeConverter for ChannelId { } } +use crate::payment::store::ChannelPairStatsId; + +impl UniffiCustomTypeConverter for ChannelPairStatsId { + type Builtin = String; + + fn into_custom(val: Self::Builtin) -> uniffi::Result { + if let Some(hex_vec) = hex_utils::to_vec(&val) { + if hex_vec.len() == 72 { + let mut id = [0u8; 72]; + id.copy_from_slice(&hex_vec[..]); + return Ok(Self(id)); + } + } + Err(Error::InvalidChannelId.into()) // Reuse this error for now + } + + fn from_custom(obj: Self) -> Self::Builtin { + hex_utils::to_string(&obj.0) + } +} + impl UniffiCustomTypeConverter for UserChannelId { type Builtin = String; diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..8bc270d1f 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -27,6 +27,20 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The forwarded payment information will be persisted under this prefix. +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "forwarded_payments"; +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The channel forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_forwarding_stats"; +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The channel pair forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_pair_forwarding_stats"; +pub(crate) const CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index d2f70377b..40bbf6f80 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -42,11 +42,20 @@ use super::*; use crate::chain::ChainSource; use crate::config::WALLET_KEYS_SEED_LEN; use crate::fee_estimator::OnchainFeeEstimator; +use crate::io::{ + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, +}; use crate::io::{ NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger, Logger}; -use crate::payment::PendingPaymentDetails; +use crate::payment::{ + ChannelForwardingStats, ChannelPairForwardingStats, ForwardedPaymentDetails, + PendingPaymentDetails, +}; use crate::peer_store::PeerStore; use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; @@ -223,21 +232,17 @@ where }) } -/// Read previously persisted payments information from the store. -pub(crate) async fn read_payments( - kv_store: &DynStore, logger: L, -) -> Result, std::io::Error> +/// Generic helper to read persisted items from a KV store namespace. +async fn read_objects_from_store( + kv_store: &DynStore, logger: L, primary_namespace: &str, secondary_namespace: &str, +) -> Result, std::io::Error> where + T: Readable, L::Target: LdkLogger, { - let mut res = Vec::new(); + let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?; - let mut stored_keys = KVStore::list( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - ) - .await?; + let mut res = Vec::with_capacity(stored_keys.len()); const BATCH_SIZE: usize = 50; @@ -246,52 +251,44 @@ where // Fill JoinSet with tasks if possible while set.len() < BATCH_SIZE && !stored_keys.is_empty() { if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } } + let type_name = std::any::type_name::(); + while let Some(read_res) = set.join_next().await { // Exit early if we get an IO error. let reader = read_res .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {type_name}: {e}"); set.abort_all(); e })? .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {type_name}: {e}"); set.abort_all(); e })?; // Refill set for every finished future, if we still have something to do. if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } // Handle result. - let payment = PaymentDetails::read(&mut &*reader).map_err(|e| { - log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); + let item = T::read(&mut &*reader).map_err(|e| { + log_error!(logger, "Failed to deserialize {type_name}: {e}"); std::io::Error::new( std::io::ErrorKind::InvalidData, - "Failed to deserialize PaymentDetails", + format!("Failed to deserialize {type_name}"), ) })?; - res.push(payment); + res.push(item); } debug_assert!(set.is_empty()); @@ -300,6 +297,70 @@ where Ok(res) } +/// Read previously persisted payments information from the store. +pub(crate) async fn read_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + +/// Read previously persisted forwarded payments information from the store. +pub(crate) async fn read_forwarded_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + +/// Read previously persisted channel forwarding stats from the store. +pub(crate) async fn read_channel_forwarding_stats( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + +/// Read previously persisted channel pair forwarding stats from the store. +pub(crate) async fn read_channel_pair_forwarding_stats( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + /// Read `OutputSweeper` state from the store. pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, diff --git a/src/lib.rs b/src/lib.rs index d2222d949..f3aba4bd8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -124,7 +124,8 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, - NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + ForwardedPaymentTrackingMode, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, + RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; pub use error::Error as NodeError; @@ -145,6 +146,7 @@ use lightning::ln::channel_state::{ChannelDetails as LdkChannelDetails, ChannelS use lightning::ln::channelmanager::PaymentId; use lightning::ln::funding::SpliceContribution; use lightning::ln::msgs::SocketAddress; +use lightning::ln::types::ChannelId; use lightning::routing::gossip::NodeAlias; use lightning::util::persist::KVStoreSync; use lightning_background_processor::process_events_async; @@ -152,15 +154,18 @@ use liquidity::{LSPS1Liquidity, LiquiditySource}; use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; +use payment::store::{aggregate_expired_forwarded_payments, ChannelPairStatsId}; use payment::{ - Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment, - UnifiedPayment, + Bolt11Payment, Bolt12Payment, ChannelForwardingStats, ChannelPairForwardingStats, + ForwardedPaymentDetails, ForwardedPaymentId, OnchainPayment, PaymentDetails, + SpontaneousPayment, UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; use rand::Rng; use runtime::Runtime; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelForwardingStatsStore, + ChannelManager, ChannelPairForwardingStatsStore, DynStore, ForwardedPaymentStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; @@ -222,6 +227,9 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, + channel_pair_forwarding_stats_store: Arc, is_running: Arc>, node_metrics: Arc>, om_mailbox: Option>, @@ -264,6 +272,33 @@ impl Node { let chain_source = Arc::clone(&self.chain_source); self.runtime.block_on(async move { chain_source.update_fee_rate_estimates().await })?; + // Check for expired forwarded payments and aggregate them on startup + if let ForwardedPaymentTrackingMode::Detailed { retention_minutes } = + self.config.forwarded_payment_tracking_mode + { + if retention_minutes > 0 { + log_info!(self.logger, "Checking for expired forwarded payments..."); + match aggregate_expired_forwarded_payments( + &self.forwarded_payment_store, + &self.channel_pair_forwarding_stats_store, + retention_minutes, + &self.logger, + ) { + Ok((pair_count, payment_count)) => { + if pair_count > 0 { + log_info!( + self.logger, + "Aggregated {payment_count} payments into {pair_count} channel pairs" + ); + } else { + log_info!(self.logger, "No expired forwarded payments to aggregate"); + } + }, + Err(e) => log_error!(self.logger, "Startup aggregation failed: {e}"), + } + } + } + // Spawn background task continuously syncing onchain, lightning, and fee rate cache. let stop_sync_receiver = self.stop_sender.subscribe(); let chain_source = Arc::clone(&self.chain_source); @@ -549,6 +584,54 @@ impl Node { chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await }); + // Spawn background task for periodic forwarded payment aggregation + if let ForwardedPaymentTrackingMode::Detailed { retention_minutes } = + self.config.forwarded_payment_tracking_mode + { + if retention_minutes > 0 { + let stop_aggregation = self.stop_sender.subscribe(); + let forwarded_payment_store = Arc::clone(&self.forwarded_payment_store); + let channel_pair_stats_store = + Arc::clone(&self.channel_pair_forwarding_stats_store); + let logger = Arc::clone(&self.logger); + + self.runtime.spawn_cancellable_background_task(async move { + let mut interval_ticker = + tokio::time::interval(Duration::from_secs(retention_minutes * 60)); + interval_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut stop_aggregation = stop_aggregation; + loop { + tokio::select! { + _ = stop_aggregation.changed() => { + log_trace!(logger, "Stopping forwarded payment aggregation task"); + break; + } + _ = interval_ticker.tick() => { + log_trace!(logger, "Running periodic forwarded payment aggregation"); + match aggregate_expired_forwarded_payments( + &forwarded_payment_store, + &channel_pair_stats_store, + retention_minutes, + &logger, + ) { + Ok((pair_count, payment_count)) if pair_count > 0 => { + log_debug!( + logger, + "Aggregated {} payments into {} channel pairs", + payment_count, + pair_count + ); + }, + Err(e) => log_error!(logger, "Periodic aggregation failed: {}", e), + _ => {}, + } + } + } + } + }); + } + } + let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( Arc::clone(&self.tx_broadcaster), Arc::new(LdkWallet::new(Arc::clone(&self.wallet), Arc::clone(&self.logger))), @@ -573,6 +656,8 @@ impl Node { Arc::clone(&self.network_graph), self.liquidity_source.clone(), Arc::clone(&self.payment_store), + Arc::clone(&self.forwarded_payment_store), + Arc::clone(&self.channel_forwarding_stats_store), Arc::clone(&self.peer_store), static_invoice_store, Arc::clone(&self.onion_messenger), @@ -1214,7 +1299,7 @@ impl Node { /// /// Returns a [`UserChannelId`] allowing to locally keep track of the channel. /// - /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: crate::config::AnchorChannelsConfig::per_channel_reserve_sats + /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: config::AnchorChannelsConfig::per_channel_reserve_sats pub fn open_channel( &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64, push_to_counterparty_msat: Option, channel_config: Option, @@ -1249,7 +1334,7 @@ impl Node { /// /// Returns a [`UserChannelId`] allowing to locally keep track of the channel. /// - /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: crate::config::AnchorChannelsConfig::per_channel_reserve_sats + /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: config::AnchorChannelsConfig::per_channel_reserve_sats pub fn open_announced_channel( &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64, push_to_counterparty_msat: Option, channel_config: Option, @@ -1470,7 +1555,7 @@ impl Node { /// However, if background syncing is disabled (i.e., `background_sync_config` is set to `None`), /// this method must be called manually to keep wallets in sync with the chain state. /// - /// [`EsploraSyncConfig::background_sync_config`]: crate::config::EsploraSyncConfig::background_sync_config + /// [`EsploraSyncConfig::background_sync_config`]: config::EsploraSyncConfig::background_sync_config pub fn sync_wallets(&self) -> Result<(), Error> { if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); @@ -1526,7 +1611,7 @@ impl Node { /// counterparty to broadcast for us (see [`AnchorChannelsConfig::trusted_peers_no_reserve`] /// for more information). /// - /// [`AnchorChannelsConfig::trusted_peers_no_reserve`]: crate::config::AnchorChannelsConfig::trusted_peers_no_reserve + /// [`AnchorChannelsConfig::trusted_peers_no_reserve`]: config::AnchorChannelsConfig::trusted_peers_no_reserve pub fn force_close_channel( &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, reason: Option, @@ -1692,6 +1777,171 @@ impl Node { self.payment_store.list_filter(|_| true) } + /// Retrieve the details of a specific forwarded payment with the given id. + /// + /// **Note:** the identifier is a randomly generated id and not the payment hash or any other + /// identifier tied to the payment itself. + /// + /// **Note:** Individual forwarded payment records are only stored in + /// [`ForwardedPaymentTrackingMode::Detailed`] mode. In [`ForwardedPaymentTrackingMode::Stats`] + /// mode, this will return an empty vector since individual payment records are not stored. + /// In `Detailed` mode, payments are only stored until they are aggregated into statistics + /// based on the configured retention period. + /// + /// Returns `Some` if the forwarded payment was known and `None` otherwise. + pub fn forwarded_payment( + &self, forwarded_payment_id: &ForwardedPaymentId, + ) -> Option { + self.forwarded_payment_store.get(forwarded_payment_id) + } + + /// Retrieves all forwarded payments that match the given predicate. + /// + /// **Note:** Individual forwarded payment records are only stored in + /// [`ForwardedPaymentTrackingMode::Detailed`] mode. In [`ForwardedPaymentTrackingMode::Stats`] + /// mode, this will return an empty vector. In `Detailed` mode, payments are only stored until + /// they are aggregated into statistics based on the configured retention period. + /// + /// For example, to list all forwarded payments that earned at least 1000 msat in fees: + /// ```ignore + /// node.list_forwarded_payments_with_filter(|p| { + /// p.total_fee_earned_msat.unwrap_or(0) >= 1000 + /// }); + /// ``` + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + /// [`ForwardedPaymentTrackingMode::Stats`]: ForwardedPaymentTrackingMode::Stats + pub fn list_forwarded_payments_with_filter bool>( + &self, f: F, + ) -> Vec { + self.forwarded_payment_store.list_filter(f) + } + + /// Retrieves all forwarded payments. + /// + /// **Note:** Individual forwarded payment records are only stored in + /// [`ForwardedPaymentTrackingMode::Detailed`] mode. In [`ForwardedPaymentTrackingMode::Stats`] + /// mode, this will return an empty vector since individual payment records are not stored. + /// In `Detailed` mode, payments are only stored until they are aggregated into statistics + /// based on the configured retention period. + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + /// [`ForwardedPaymentTrackingMode::Stats`]: ForwardedPaymentTrackingMode::Stats + pub fn list_forwarded_payments(&self) -> Vec { + self.forwarded_payment_store.list_filter(|_| true) + } + + /// Returns the configured forwarded payment tracking mode. + pub fn forwarded_payment_tracking_mode(&self) -> ForwardedPaymentTrackingMode { + self.config.forwarded_payment_tracking_mode + } + + /// Retrieve the forwarding statistics for a specific channel. + /// + /// Returns `Some` if statistics exist for the given channel and `None` otherwise. + pub fn channel_forwarding_stats( + &self, channel_id: &ChannelId, + ) -> Option { + self.channel_forwarding_stats_store.get(channel_id) + } + + /// Retrieves all channel forwarding statistics. + pub fn list_channel_forwarding_stats(&self) -> Vec { + self.channel_forwarding_stats_store.list_filter(|_| true) + } + + /// Retrieves all channel forwarding statistics that match the given predicate. + /// + /// For example, to list stats for all channels that have earned at least 10000 msat in fees: + /// ```ignore + /// node.list_channel_forwarding_stats_with_filter(|s| { + /// s.total_fee_earned_msat >= 10000 + /// }); + /// ``` + pub fn list_channel_forwarding_stats_with_filter bool>( + &self, f: F, + ) -> Vec { + self.channel_forwarding_stats_store.list_filter(f) + } + + /// Retrieves all aggregated channel pair forwarding statistics. + /// + /// Returns statistics for forwarded payments that have been aggregated by channel pair. + /// These are created when individual forwarded payments expire based on the retention + /// period configured in [`ForwardedPaymentTrackingMode::Detailed`]. + /// + /// **Note:** Channel pair statistics are only created in `Detailed` mode. In + /// [`ForwardedPaymentTrackingMode::Stats`] mode, this will return an empty vector since + /// individual payments are not tracked or aggregated by channel pair in that mode. + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + /// [`ForwardedPaymentTrackingMode::Stats`]: ForwardedPaymentTrackingMode::Stats + pub fn list_channel_pair_forwarding_stats(&self) -> Vec { + self.channel_pair_forwarding_stats_store.list_filter(|_| true) + } + + /// Retrieves all channel pair forwarding statistics that match the given predicate. + /// + /// **Note:** Channel pair statistics are only created in [`ForwardedPaymentTrackingMode::Detailed`] + /// mode when individual payments are aggregated based on the retention period. In + /// [`ForwardedPaymentTrackingMode::Stats`] mode, this will return an empty vector since + /// individual payments are not tracked or aggregated by channel pair in that mode. + /// + /// For example, to list stats for all channel pairs that have earned at least 50000 msat in fees: + /// ```ignore + /// node.list_channel_pair_forwarding_stats_with_filter(|s| { + /// s.total_fee_earned_msat >= 50000 + /// }); + /// ``` + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + /// [`ForwardedPaymentTrackingMode::Stats`]: ForwardedPaymentTrackingMode::Stats + pub fn list_channel_pair_forwarding_stats_with_filter< + F: FnMut(&&ChannelPairForwardingStats) -> bool, + >( + &self, f: F, + ) -> Vec { + self.channel_pair_forwarding_stats_store.list_filter(f) + } + + /// Retrieves channel pair forwarding statistics within a specific time range. + /// + /// Returns all bucket entries where `bucket_start_timestamp` falls within `[start_timestamp, end_timestamp)`. + /// + /// Will only be available if [`Node::forwarded_payment_tracking_mode`] returns + /// [`ForwardedPaymentTrackingMode::Detailed`], otherwise an empty [`Vec`] will be returned. + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + pub fn list_channel_pair_forwarding_stats_in_range( + &self, start_timestamp: u64, end_timestamp: u64, + ) -> Vec { + self.channel_pair_forwarding_stats_store.list_filter(|stats| { + stats.bucket_start_timestamp >= start_timestamp + && stats.bucket_start_timestamp < end_timestamp + }) + } + + /// Retrieves all forwarding statistics buckets for a specific channel pair. + /// + /// Returns all time buckets for the given inbound→outbound channel pair, + /// ordered by bucket timestamp. + /// + /// Will only be available if [`Node::forwarded_payment_tracking_mode`] returns + /// [`ForwardedPaymentTrackingMode::Detailed`], otherwise an empty [`Vec`] will be returned. + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + pub fn list_channel_pair_forwarding_stats_for_pair( + &self, prev_channel_id: ChannelId, next_channel_id: ChannelId, + ) -> Vec { + let mut results = self.channel_pair_forwarding_stats_store.list_filter(|stats| { + stats.prev_channel_id == prev_channel_id && stats.next_channel_id == next_channel_id + }); + + // Sort by bucket timestamp for chronological ordering + results.sort_by_key(|stats| stats.bucket_start_timestamp); + results + } + /// Retrieves a list of known peers. pub fn list_peers(&self) -> Vec { let mut peers = Vec::new(); @@ -1878,3 +2128,271 @@ pub(crate) fn total_anchor_channels_reserve_sats( * anchor_channels_config.per_channel_reserve_sats }) } + +/// Aggregates multiple channel pair statistics buckets into cumulative totals. +/// +/// This is useful for calculating cumulative statistics across multiple time buckets. +/// All buckets must be for the same channel pair (prev_channel_id, next_channel_id). +/// +/// # Arguments +/// * `buckets` - Slice of statistics buckets to aggregate (must all be for same channel pair) +/// +/// # Returns +/// A single [`ChannelPairForwardingStats`] with: +/// - Summed payment counts, amounts, and fees +/// - Earliest `first_forwarded_at_timestamp` and latest `last_forwarded_at_timestamp` +/// - Recalculated averages based on total payment count +/// - `bucket_start_timestamp` set to the earliest bucket's timestamp +/// - `aggregated_at_timestamp` set to current time +/// +/// # Panics +/// Panics if buckets is empty or if buckets contain different channel pairs. +pub fn aggregate_channel_pair_stats( + buckets: &[ChannelPairForwardingStats], +) -> ChannelPairForwardingStats { + assert!(!buckets.is_empty(), "Cannot aggregate empty bucket list"); + + // Verify all buckets are for the same channel pair + let first = &buckets[0]; + for bucket in &buckets[1..] { + assert_eq!( + bucket.prev_channel_id, first.prev_channel_id, + "All buckets must have the same prev_channel_id" + ); + assert_eq!( + bucket.next_channel_id, first.next_channel_id, + "All buckets must have the same next_channel_id" + ); + } + + // Aggregate values + let mut payment_count = 0u64; + let mut total_inbound_amount_msat = 0u64; + let mut total_outbound_amount_msat = 0u64; + let mut total_fee_earned_msat = 0u64; + let mut total_skimmed_fee_msat = 0u64; + let mut onchain_claims_count = 0u64; + let mut first_forwarded_at_timestamp = u64::MAX; + let mut last_forwarded_at_timestamp = 0u64; + let mut earliest_bucket_start = u64::MAX; + + for bucket in buckets { + payment_count += bucket.payment_count; + total_inbound_amount_msat += bucket.total_inbound_amount_msat; + total_outbound_amount_msat += bucket.total_outbound_amount_msat; + total_fee_earned_msat += bucket.total_fee_earned_msat; + total_skimmed_fee_msat += bucket.total_skimmed_fee_msat; + onchain_claims_count += bucket.onchain_claims_count; + first_forwarded_at_timestamp = + first_forwarded_at_timestamp.min(bucket.first_forwarded_at_timestamp); + last_forwarded_at_timestamp = + last_forwarded_at_timestamp.max(bucket.last_forwarded_at_timestamp); + earliest_bucket_start = earliest_bucket_start.min(bucket.bucket_start_timestamp); + } + + // Calculate averages + let avg_fee_msat = if payment_count > 0 { total_fee_earned_msat / payment_count } else { 0 }; + let avg_inbound_amount_msat = + if payment_count > 0 { total_inbound_amount_msat / payment_count } else { 0 }; + + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + + // Create aggregated ID using earliest bucket timestamp + let aggregated_id = ChannelPairStatsId::from_channel_pair_and_bucket( + &first.prev_channel_id, + &first.next_channel_id, + earliest_bucket_start, + ); + + ChannelPairForwardingStats { + id: aggregated_id, + prev_channel_id: first.prev_channel_id, + next_channel_id: first.next_channel_id, + bucket_start_timestamp: earliest_bucket_start, + prev_node_id: first.prev_node_id, + next_node_id: first.next_node_id, + payment_count, + total_inbound_amount_msat, + total_outbound_amount_msat, + total_fee_earned_msat, + total_skimmed_fee_msat, + onchain_claims_count, + avg_fee_msat, + avg_inbound_amount_msat, + first_forwarded_at_timestamp, + last_forwarded_at_timestamp, + aggregated_at_timestamp: now, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use lightning::ln::types::ChannelId; + + #[test] + fn test_aggregate_channel_pair_stats() { + let prev_channel = ChannelId([1u8; 32]); + let next_channel = ChannelId([2u8; 32]); + + // Create 3 buckets with different statistics + let bucket1 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + 1738800000, + ), + prev_channel_id: prev_channel, + next_channel_id: next_channel, + bucket_start_timestamp: 1738800000, + prev_node_id: None, + next_node_id: None, + payment_count: 10, + total_inbound_amount_msat: 110000, + total_outbound_amount_msat: 100000, + total_fee_earned_msat: 10000, + total_skimmed_fee_msat: 1000, + onchain_claims_count: 2, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738800000, + last_forwarded_at_timestamp: 1738801000, + aggregated_at_timestamp: 1738802000, + }; + + let bucket2 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + 1738803600, + ), + prev_channel_id: prev_channel, + next_channel_id: next_channel, + bucket_start_timestamp: 1738803600, + prev_node_id: None, + next_node_id: None, + payment_count: 5, + total_inbound_amount_msat: 55000, + total_outbound_amount_msat: 50000, + total_fee_earned_msat: 5000, + total_skimmed_fee_msat: 500, + onchain_claims_count: 1, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738803600, + last_forwarded_at_timestamp: 1738804000, + aggregated_at_timestamp: 1738805000, + }; + + let bucket3 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + 1738807200, + ), + prev_channel_id: prev_channel, + next_channel_id: next_channel, + bucket_start_timestamp: 1738807200, + prev_node_id: None, + next_node_id: None, + payment_count: 15, + total_inbound_amount_msat: 165000, + total_outbound_amount_msat: 150000, + total_fee_earned_msat: 15000, + total_skimmed_fee_msat: 1500, + onchain_claims_count: 3, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738807200, + last_forwarded_at_timestamp: 1738808000, + aggregated_at_timestamp: 1738809000, + }; + + // Aggregate the buckets + let buckets = vec![bucket1, bucket2, bucket3]; + let aggregated = aggregate_channel_pair_stats(&buckets); + + // Verify aggregated results + assert_eq!(aggregated.payment_count, 30); // 10 + 5 + 15 + assert_eq!(aggregated.total_inbound_amount_msat, 330000); // 110000 + 55000 + 165000 + assert_eq!(aggregated.total_outbound_amount_msat, 300000); // 100000 + 50000 + 150000 + assert_eq!(aggregated.total_fee_earned_msat, 30000); // 10000 + 5000 + 15000 + assert_eq!(aggregated.total_skimmed_fee_msat, 3000); // 1000 + 500 + 1500 + assert_eq!(aggregated.onchain_claims_count, 6); // 2 + 1 + 3 + + // Verify averages are recalculated + assert_eq!(aggregated.avg_fee_msat, 1000); // 30000 / 30 + assert_eq!(aggregated.avg_inbound_amount_msat, 11000); // 330000 / 30 + + // Verify timestamps + assert_eq!(aggregated.first_forwarded_at_timestamp, 1738800000); // Earliest + assert_eq!(aggregated.last_forwarded_at_timestamp, 1738808000); // Latest + assert_eq!(aggregated.bucket_start_timestamp, 1738800000); // Earliest bucket + + // Verify channel pair is preserved + assert_eq!(aggregated.prev_channel_id, prev_channel); + assert_eq!(aggregated.next_channel_id, next_channel); + } + + #[test] + #[should_panic(expected = "Cannot aggregate empty bucket list")] + fn test_aggregate_channel_pair_stats_empty() { + let buckets: Vec = vec![]; + let _ = aggregate_channel_pair_stats(&buckets); + } + + #[test] + #[should_panic(expected = "All buckets must have the same prev_channel_id")] + fn test_aggregate_channel_pair_stats_different_channels() { + let bucket1 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &ChannelId([1u8; 32]), + &ChannelId([2u8; 32]), + 1738800000, + ), + prev_channel_id: ChannelId([1u8; 32]), + next_channel_id: ChannelId([2u8; 32]), + bucket_start_timestamp: 1738800000, + prev_node_id: None, + next_node_id: None, + payment_count: 10, + total_inbound_amount_msat: 110000, + total_outbound_amount_msat: 100000, + total_fee_earned_msat: 10000, + total_skimmed_fee_msat: 1000, + onchain_claims_count: 2, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738800000, + last_forwarded_at_timestamp: 1738801000, + aggregated_at_timestamp: 1738802000, + }; + + let bucket2 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &ChannelId([99u8; 32]), // Different channel! + &ChannelId([2u8; 32]), + 1738803600, + ), + prev_channel_id: ChannelId([99u8; 32]), // Different channel! + next_channel_id: ChannelId([2u8; 32]), + bucket_start_timestamp: 1738803600, + prev_node_id: None, + next_node_id: None, + payment_count: 5, + total_inbound_amount_msat: 55000, + total_outbound_amount_msat: 50000, + total_fee_earned_msat: 5000, + total_skimmed_fee_msat: 500, + onchain_claims_count: 1, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738803600, + last_forwarded_at_timestamp: 1738804000, + aggregated_at_timestamp: 1738805000, + }; + + let buckets = vec![bucket1, bucket2]; + let _ = aggregate_channel_pair_stats(&buckets); + } +} diff --git a/src/payment/mod.rs b/src/payment/mod.rs index 42b5aff3b..7f9a8a1be 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -22,6 +22,8 @@ pub use onchain::OnchainPayment; pub use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ - ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ChannelPairForwardingStats, ConfirmationStatus, + ForwardedPaymentDetails, ForwardedPaymentId, LSPFeeLimits, PaymentDetails, PaymentDirection, + PaymentKind, PaymentStatus, }; pub use unified::{UnifiedPayment, UnifiedPaymentResult}; diff --git a/src/payment/store.rs b/src/payment/store.rs index 15e94190c..0d5a82597 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -5,11 +5,16 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; +use std::fmt::{Debug, Display}; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Txid}; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::DecodeError; +use lightning::ln::types::ChannelId; use lightning::offers::offer::OfferId; use lightning::util::ser::{Readable, Writeable}; use lightning::{ @@ -19,8 +24,11 @@ use lightning::{ use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning_types::string::UntrustedString; -use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::data_store::{DataStore, StorableObject, StorableObjectId, StorableObjectUpdate}; use crate::hex_utils; +use crate::logger::{log_debug, log_error, LdkLogger, Logger}; +use crate::types::UserChannelId; +use crate::Error; /// Represents a payment. #[derive(Clone, Debug, PartialEq, Eq)] @@ -761,4 +769,914 @@ mod tests { } } } + + #[test] + fn test_bucket_calculation() { + // Test that bucket timestamps are calculated correctly + let retention_minutes = 60; + let bucket_size_secs = retention_minutes * 60; // 3600 seconds + + // Payment at exactly bucket boundary + let timestamp1 = 1738800000; + let bucket1 = (timestamp1 / bucket_size_secs) * bucket_size_secs; + assert_eq!(bucket1, 1738800000); // Should be unchanged + + // Payment 1 second into bucket + let timestamp2 = 1738800001; + let bucket2 = (timestamp2 / bucket_size_secs) * bucket_size_secs; + assert_eq!(bucket2, 1738800000); // Should round down + + // Payment at end of bucket (3599 seconds in) + let timestamp3 = 1738803599; + let bucket3 = (timestamp3 / bucket_size_secs) * bucket_size_secs; + assert_eq!(bucket3, 1738800000); // Should still be in same bucket + + // Payment at start of next bucket + let timestamp4 = 1738803600; + let bucket4 = (timestamp4 / bucket_size_secs) * bucket_size_secs; + assert_eq!(bucket4, 1738803600); // Should be in next bucket + } + + #[test] + fn test_channel_pair_stats_id_with_bucket() { + use lightning::ln::types::ChannelId; + + let prev_channel = ChannelId([1u8; 32]); + let next_channel = ChannelId([2u8; 32]); + let bucket_timestamp = 1738800000u64; + + let id = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + bucket_timestamp, + ); + + // Verify the ID contains all three components + assert_eq!(&id.0[0..32], &prev_channel.0); + assert_eq!(&id.0[32..64], &next_channel.0); + assert_eq!(&id.0[64..72], &bucket_timestamp.to_be_bytes()); + + // Verify different buckets create different IDs + let id2 = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + bucket_timestamp + 3600, + ); + assert_ne!(id, id2); + } + + #[test] + fn test_payments_grouped_into_correct_buckets() { + use lightning::ln::types::ChannelId; + use std::collections::HashMap; + + let retention_minutes = 60; + let bucket_size_secs = retention_minutes * 60; + let base_time = 3600 * 100; // 360000 + + let prev_channel = ChannelId([1u8; 32]); + let next_channel = ChannelId([2u8; 32]); + + // Create test payments across different time buckets + let mut payments = Vec::new(); + + // 3 payments in bucket 1 + for i in 0..3 { + payments.push(( + base_time + i * 1000, + ForwardedPaymentDetails { + id: ForwardedPaymentId([i as u8; 32]), + prev_channel_id: prev_channel, + next_channel_id: next_channel, + prev_user_channel_id: None, + next_user_channel_id: None, + prev_node_id: None, + next_node_id: None, + total_fee_earned_msat: Some(1000), + skimmed_fee_msat: Some(100), + claim_from_onchain_tx: false, + outbound_amount_forwarded_msat: Some(10000), + forwarded_at_timestamp: base_time + i * 1000, + }, + )); + } + + // 2 payments in bucket 2 + for i in 3..5 { + payments.push(( + base_time + bucket_size_secs + (i - 3) * 1000, + ForwardedPaymentDetails { + id: ForwardedPaymentId([i as u8; 32]), + prev_channel_id: prev_channel, + next_channel_id: next_channel, + prev_user_channel_id: None, + next_user_channel_id: None, + prev_node_id: None, + next_node_id: None, + total_fee_earned_msat: Some(2000), + skimmed_fee_msat: Some(200), + claim_from_onchain_tx: false, + outbound_amount_forwarded_msat: Some(20000), + forwarded_at_timestamp: base_time + bucket_size_secs + (i - 3) * 1000, + }, + )); + } + + // Group payments by bucket (simulating aggregation logic) + let mut bucket_groups: HashMap> = HashMap::new(); + for (_, payment) in &payments { + let bucket_start = + (payment.forwarded_at_timestamp / bucket_size_secs) * bucket_size_secs; + bucket_groups.entry(bucket_start).or_insert_with(Vec::new).push(payment); + } + + // Verify we have 2 distinct buckets + assert_eq!(bucket_groups.len(), 2, "Should have 2 distinct buckets"); + + // Verify bucket 1 has 3 payments + let bucket1_start = (base_time / bucket_size_secs) * bucket_size_secs; + assert_eq!( + bucket_groups.get(&bucket1_start).unwrap().len(), + 3, + "Bucket 1 should have 3 payments" + ); + + // Verify bucket 2 has 2 payments + let bucket2_start = ((base_time + bucket_size_secs) / bucket_size_secs) * bucket_size_secs; + assert_eq!( + bucket_groups.get(&bucket2_start).unwrap().len(), + 2, + "Bucket 2 should have 2 payments" + ); + } + + #[test] + fn test_bucket_statistics_calculation() { + use lightning::ln::types::ChannelId; + + let prev_channel = ChannelId([1u8; 32]); + let next_channel = ChannelId([2u8; 32]); + let bucket_timestamp = 1738800000u64; + + // Simulate aggregating 3 payments + let mut total_fee = 0u64; + let mut total_inbound = 0u64; + let mut total_outbound = 0u64; + + for i in 1..=3 { + let fee = 1000 * i; + let outbound = 10000 * i; + let inbound = outbound + fee; + + total_fee += fee; + total_outbound += outbound; + total_inbound += inbound; + } + + let payment_count = 3; + let avg_fee = total_fee / payment_count; + let avg_inbound = total_inbound / payment_count; + + // Verify calculations + assert_eq!(total_fee, 6000); // 1000 + 2000 + 3000 + assert_eq!(total_outbound, 60000); // 10000 + 20000 + 30000 + assert_eq!(total_inbound, 66000); // 11000 + 22000 + 33000 + assert_eq!(avg_fee, 2000); + assert_eq!(avg_inbound, 22000); + + // Create stats entry + let id = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + bucket_timestamp, + ); + + let stats = ChannelPairForwardingStats { + id, + prev_channel_id: prev_channel, + next_channel_id: next_channel, + bucket_start_timestamp: bucket_timestamp, + prev_node_id: None, + next_node_id: None, + payment_count, + total_inbound_amount_msat: total_inbound, + total_outbound_amount_msat: total_outbound, + total_fee_earned_msat: total_fee, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + avg_fee_msat: avg_fee, + avg_inbound_amount_msat: avg_inbound, + first_forwarded_at_timestamp: bucket_timestamp, + last_forwarded_at_timestamp: bucket_timestamp + 1000, + aggregated_at_timestamp: bucket_timestamp + 2000, + }; + + assert_eq!(stats.payment_count, 3); + assert_eq!(stats.avg_fee_msat, 2000); + } + + #[test] + fn test_channel_pair_stats_serialization() { + use lightning::ln::types::ChannelId; + use lightning::util::ser::{Readable, Writeable}; + + let prev_channel = ChannelId([5u8; 32]); + let next_channel = ChannelId([6u8; 32]); + let bucket_timestamp = 1738800000u64; + + let id = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + bucket_timestamp, + ); + + let stats = ChannelPairForwardingStats { + id, + prev_channel_id: prev_channel, + next_channel_id: next_channel, + bucket_start_timestamp: bucket_timestamp, + prev_node_id: None, + next_node_id: None, + payment_count: 10, + total_inbound_amount_msat: 100000, + total_outbound_amount_msat: 90000, + total_fee_earned_msat: 10000, + total_skimmed_fee_msat: 1000, + onchain_claims_count: 2, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 10000, + first_forwarded_at_timestamp: bucket_timestamp, + last_forwarded_at_timestamp: bucket_timestamp + 1000, + aggregated_at_timestamp: bucket_timestamp + 2000, + }; + + // Test serialization/deserialization + let encoded = stats.encode(); + let decoded = ChannelPairForwardingStats::read(&mut &encoded[..]).unwrap(); + + assert_eq!(stats, decoded); + assert_eq!(decoded.bucket_start_timestamp, bucket_timestamp); + assert_eq!(decoded.id.0[64..72], bucket_timestamp.to_be_bytes()); + } +} + +/// A unique identifier for a forwarded payment. +/// +/// This will be a randomly generated 32-byte identifier. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ForwardedPaymentId(pub [u8; 32]); + +impl StorableObjectId for ForwardedPaymentId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl Writeable for ForwardedPaymentId { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + self.0.write(writer) + } +} + +impl Readable for ForwardedPaymentId { + fn read(reader: &mut R) -> Result { + Ok(Self(Readable::read(reader)?)) + } +} + +/// Details of a payment that has been forwarded through this node. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ForwardedPaymentDetails { + /// A unique identifier for this forwarded payment. + pub id: ForwardedPaymentId, + /// The channel id of the incoming channel between the previous node and us. + pub prev_channel_id: ChannelId, + /// The channel id of the outgoing channel between the next node and us. + pub next_channel_id: ChannelId, + /// The `user_channel_id` of the incoming channel between the previous node and us. + /// + /// This is only None for events generated or serialized by versions prior to 0.3.0. + pub prev_user_channel_id: Option, + /// The `user_channel_id` of the outgoing channel between the next node and us. + /// + /// This will be `None` if the payment was settled via an on-chain transaction or if the + /// event was generated or serialized by versions prior to 0.3.0. + pub next_user_channel_id: Option, + /// The node id of the previous node. + pub prev_node_id: Option, + /// The node id of the next node. + pub next_node_id: Option, + /// The total fee, in milli-satoshis, which was earned as a result of the payment. + /// + /// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC + /// was pending, the amount the next hop claimed will have been rounded down to the nearest + /// whole satoshi. Thus, the fee calculated here may be higher than expected as we still + /// claimed the full value in millisatoshis from the source. + /// + /// If the channel which sent us the payment has been force-closed, we will claim the funds + /// via an on-chain transaction. In that case we do not yet know the on-chain transaction + /// fees which we will spend and will instead set this to `None`. It is possible duplicate + /// `PaymentForwarded` events are generated for the same payment iff `total_fee_earned_msat` + /// is `None`. + pub total_fee_earned_msat: Option, + /// The share of the total fee, in milli-satoshis, which was withheld in addition to the + /// forwarding fee. + /// + /// This will be `None` if no fee was skimmed from the forwarded HTLC. + pub skimmed_fee_msat: Option, + /// If this is `true`, the forwarded HTLC was claimed by our counterparty via an on-chain + /// transaction. + pub claim_from_onchain_tx: bool, + /// The final amount forwarded, in milli-satoshis, after the fee is deducted. + /// + /// The caveat described above the total_fee_earned_msat field applies here as well. + pub outbound_amount_forwarded_msat: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when the payment was forwarded. + pub forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ForwardedPaymentDetails, { + (0, id, required), + (2, prev_channel_id, required), + (4, next_channel_id, required), + (6, prev_user_channel_id, option), + (8, next_user_channel_id, option), + (10, prev_node_id, option), + (12, next_node_id, option), + (14, total_fee_earned_msat, option), + (16, skimmed_fee_msat, option), + (18, claim_from_onchain_tx, required), + (20, outbound_amount_forwarded_msat, option), + (22, forwarded_at_timestamp, required), +}); + +/// A no-op update type for [`ForwardedPaymentDetails`]. +/// +/// Forwarded payments are immutable once stored, so updates are not supported. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct ForwardedPaymentDetailsUpdate { + id: ForwardedPaymentId, +} + +impl StorableObjectUpdate for ForwardedPaymentDetailsUpdate { + fn id(&self) -> ForwardedPaymentId { + self.id + } +} + +impl StorableObject for ForwardedPaymentDetails { + type Id = ForwardedPaymentId; + type Update = ForwardedPaymentDetailsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, _update: &Self::Update) -> bool { + // Forwarded payments are immutable, so updates are no-ops. + false + } + + fn to_update(&self) -> Self::Update { + ForwardedPaymentDetailsUpdate { id: self.id } + } +} + +/// Aggregate statistics for forwarded payments through a single channel. +/// +/// Each channel has one stats entry tracking all forwards where it was either +/// the inbound or outbound channel. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ChannelForwardingStats { + /// The channel this stats entry tracks. + pub channel_id: ChannelId, + /// The counterparty node id for this channel. + pub counterparty_node_id: Option, + /// Number of payments forwarded where this was the inbound channel. + pub inbound_payments_forwarded: u64, + /// Number of payments forwarded where this was the outbound channel. + pub outbound_payments_forwarded: u64, + /// Total amount received on this channel for forwarding (msat). + pub total_inbound_amount_msat: u64, + /// Total amount sent on this channel for forwarding (msat). + pub total_outbound_amount_msat: u64, + /// Total fees earned from forwards where this was the inbound channel (msat). + pub total_fee_earned_msat: u64, + /// Total skimmed fees (msat). + pub total_skimmed_fee_msat: u64, + /// Number of forwards claimed via onchain tx. + pub onchain_claims_count: u64, + /// Timestamp of first forward through this channel. + pub first_forwarded_at_timestamp: u64, + /// Timestamp of most recent forward through this channel. + pub last_forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ChannelForwardingStats, { + (0, channel_id, required), + (2, counterparty_node_id, option), + (4, inbound_payments_forwarded, required), + (6, outbound_payments_forwarded, required), + (8, total_inbound_amount_msat, required), + (10, total_outbound_amount_msat, required), + (12, total_fee_earned_msat, required), + (14, total_skimmed_fee_msat, required), + (16, onchain_claims_count, required), + (18, first_forwarded_at_timestamp, required), + (20, last_forwarded_at_timestamp, required), +}); + +/// Channel pair identifier (prev_channel -> next_channel -> time bucket). +/// Formed by concatenating prev_channel_id, next_channel_id, and bucket_start_timestamp. +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct ChannelPairStatsId(pub [u8; 72]); + +impl ChannelPairStatsId { + /// Create ID by concatenating prev and next channel IDs with bucket timestamp. + pub fn from_channel_pair_and_bucket( + prev: &ChannelId, next: &ChannelId, bucket_start_timestamp: u64, + ) -> Self { + let mut result = [0u8; 72]; + result[0..32].copy_from_slice(&prev.0); + result[32..64].copy_from_slice(&next.0); + result[64..72].copy_from_slice(&bucket_start_timestamp.to_be_bytes()); + Self(result) + } +} + +impl Writeable for ChannelPairStatsId { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + writer.write_all(&self.0) + } +} + +impl Readable for ChannelPairStatsId { + fn read(reader: &mut R) -> Result { + let mut bytes = [0u8; 72]; + reader.read_exact(&mut bytes)?; + Ok(Self(bytes)) + } +} + +impl Display for ChannelPairStatsId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex_utils::to_string(&self.0)) + } +} + +impl Debug for ChannelPairStatsId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ChannelPairStatsId({})", hex_utils::to_string(&self.0)) + } +} + +impl StorableObjectId for ChannelPairStatsId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +/// Aggregated statistics for a specific channel pair. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ChannelPairForwardingStats { + /// The unique identifier for this channel pair (derived from channel IDs and bucket timestamp). + pub id: ChannelPairStatsId, + /// The previous (inbound) channel ID. + pub prev_channel_id: ChannelId, + /// The next (outbound) channel ID. + pub next_channel_id: ChannelId, + /// Start of the time bucket (seconds since UNIX epoch). + pub bucket_start_timestamp: u64, + /// The previous (inbound) counterparty node id. + pub prev_node_id: Option, + /// The next (outbound) counterparty node id. + pub next_node_id: Option, + /// Number of payments forwarded through this channel pair. + pub payment_count: u64, + /// Total amount received on the inbound channel (msat). + pub total_inbound_amount_msat: u64, + /// Total amount sent on the outbound channel (msat). + pub total_outbound_amount_msat: u64, + /// Total fees earned from this channel pair (msat). + pub total_fee_earned_msat: u64, + /// Total skimmed fees (msat). + pub total_skimmed_fee_msat: u64, + /// Number of forwards claimed via onchain tx. + pub onchain_claims_count: u64, + /// Average fee per payment (msat). + pub avg_fee_msat: u64, + /// Average inbound amount per payment (msat). + pub avg_inbound_amount_msat: u64, + /// Timestamp of first forward through this channel pair. + pub first_forwarded_at_timestamp: u64, + /// Timestamp of most recent forward through this channel pair. + pub last_forwarded_at_timestamp: u64, + /// Timestamp when this entry was aggregated from individual payments. + pub aggregated_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ChannelPairForwardingStats, { + (0, id, required), + (2, prev_channel_id, required), + (4, next_channel_id, required), + (6, prev_node_id, option), + (8, next_node_id, option), + (10, payment_count, required), + (12, total_inbound_amount_msat, required), + (14, total_outbound_amount_msat, required), + (16, total_fee_earned_msat, required), + (18, total_skimmed_fee_msat, required), + (20, onchain_claims_count, required), + (22, avg_fee_msat, required), + (24, avg_inbound_amount_msat, required), + (26, first_forwarded_at_timestamp, required), + (28, last_forwarded_at_timestamp, required), + (30, aggregated_at_timestamp, required), + (32, bucket_start_timestamp, required), +}); + +/// Update type for [`ChannelForwardingStats`] that supports incrementing counters. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelForwardingStatsUpdate { + /// The channel ID being updated. + pub channel_id: ChannelId, + /// The counterparty node id (used when creating new entry). + pub counterparty_node_id: Option, + /// Increment for inbound payment count. + pub inbound_payments_increment: u64, + /// Increment for outbound payment count. + pub outbound_payments_increment: u64, + /// Increment for total inbound amount. + pub inbound_amount_increment_msat: u64, + /// Increment for total outbound amount. + pub outbound_amount_increment_msat: u64, + /// Increment for total fee earned. + pub fee_earned_increment_msat: u64, + /// Increment for skimmed fee. + pub skimmed_fee_increment_msat: u64, + /// Increment for onchain claims count. + pub onchain_claims_increment: u64, + /// Current timestamp for updating first/last timestamps. + pub timestamp: u64, +} + +impl StorableObjectUpdate for ChannelForwardingStatsUpdate { + fn id(&self) -> ChannelId { + self.channel_id + } +} + +impl StorableObjectId for ChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl StorableObject for ChannelForwardingStats { + type Id = ChannelId; + type Update = ChannelForwardingStatsUpdate; + + fn id(&self) -> Self::Id { + self.channel_id + } + + fn update(&mut self, update: &Self::Update) -> bool { + debug_assert_eq!( + self.channel_id, update.channel_id, + "We should only ever update stats for the same channel id" + ); + + let mut updated = false; + + // Update counterparty if not already set + if self.counterparty_node_id.is_none() && update.counterparty_node_id.is_some() { + self.counterparty_node_id = update.counterparty_node_id; + updated = true; + } + + // Increment counters + if update.inbound_payments_increment > 0 { + self.inbound_payments_forwarded += update.inbound_payments_increment; + updated = true; + } + if update.outbound_payments_increment > 0 { + self.outbound_payments_forwarded += update.outbound_payments_increment; + updated = true; + } + if update.inbound_amount_increment_msat > 0 { + self.total_inbound_amount_msat += update.inbound_amount_increment_msat; + updated = true; + } + if update.outbound_amount_increment_msat > 0 { + self.total_outbound_amount_msat += update.outbound_amount_increment_msat; + updated = true; + } + if update.fee_earned_increment_msat > 0 { + self.total_fee_earned_msat += update.fee_earned_increment_msat; + updated = true; + } + if update.skimmed_fee_increment_msat > 0 { + self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat; + updated = true; + } + if update.onchain_claims_increment > 0 { + self.onchain_claims_count += update.onchain_claims_increment; + updated = true; + } + + // Update timestamps + if updated { + self.first_forwarded_at_timestamp = + self.first_forwarded_at_timestamp.min(update.timestamp); + self.last_forwarded_at_timestamp = + self.last_forwarded_at_timestamp.max(update.timestamp); + } + + updated + } + + fn to_update(&self) -> Self::Update { + // This creates an update representing the current state as increments. + // This is primarily used for insert_or_update behavior. + ChannelForwardingStatsUpdate { + channel_id: self.channel_id, + counterparty_node_id: self.counterparty_node_id, + inbound_payments_increment: self.inbound_payments_forwarded, + outbound_payments_increment: self.outbound_payments_forwarded, + inbound_amount_increment_msat: self.total_inbound_amount_msat, + outbound_amount_increment_msat: self.total_outbound_amount_msat, + fee_earned_increment_msat: self.total_fee_earned_msat, + skimmed_fee_increment_msat: self.total_skimmed_fee_msat, + onchain_claims_increment: self.onchain_claims_count, + timestamp: self.last_forwarded_at_timestamp, + } + } +} + +/// Update type for [`ChannelPairForwardingStats`] that supports incrementing counters. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelPairForwardingStatsUpdate { + /// The channel pair ID being updated. + pub id: ChannelPairStatsId, + /// The previous channel ID. + pub prev_channel_id: ChannelId, + /// The next channel ID. + pub next_channel_id: ChannelId, + /// Start of the time bucket (seconds since UNIX epoch). + pub bucket_start_timestamp: u64, + /// The previous node id (used when creating new entry). + pub prev_node_id: Option, + /// The next node id (used when creating new entry). + pub next_node_id: Option, + /// Increment for payment count. + pub payment_count_increment: u64, + /// Increment for total inbound amount. + pub inbound_amount_increment_msat: u64, + /// Increment for total outbound amount. + pub outbound_amount_increment_msat: u64, + /// Increment for total fee earned. + pub fee_earned_increment_msat: u64, + /// Increment for skimmed fee. + pub skimmed_fee_increment_msat: u64, + /// Increment for onchain claims count. + pub onchain_claims_increment: u64, + /// Current timestamp for updating first/last timestamps. + pub timestamp: u64, +} + +impl StorableObjectUpdate for ChannelPairForwardingStatsUpdate { + fn id(&self) -> ChannelPairStatsId { + self.id + } +} + +impl StorableObject for ChannelPairForwardingStats { + type Id = ChannelPairStatsId; + type Update = ChannelPairForwardingStatsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, update: &Self::Update) -> bool { + debug_assert_eq!( + self.id, update.id, + "We should only ever update stats for the same channel pair id" + ); + + let mut updated = false; + + // Update node ids if not already set + if self.prev_node_id.is_none() && update.prev_node_id.is_some() { + self.prev_node_id = update.prev_node_id; + updated = true; + } + if self.next_node_id.is_none() && update.next_node_id.is_some() { + self.next_node_id = update.next_node_id; + updated = true; + } + + // Increment counters + if update.payment_count_increment > 0 { + self.payment_count += update.payment_count_increment; + updated = true; + } + if update.inbound_amount_increment_msat > 0 { + self.total_inbound_amount_msat += update.inbound_amount_increment_msat; + updated = true; + } + if update.outbound_amount_increment_msat > 0 { + self.total_outbound_amount_msat += update.outbound_amount_increment_msat; + updated = true; + } + if update.fee_earned_increment_msat > 0 { + self.total_fee_earned_msat += update.fee_earned_increment_msat; + updated = true; + } + if update.skimmed_fee_increment_msat > 0 { + self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat; + updated = true; + } + if update.onchain_claims_increment > 0 { + self.onchain_claims_count += update.onchain_claims_increment; + updated = true; + } + + // Update timestamps + if updated { + if self.first_forwarded_at_timestamp == 0 { + self.first_forwarded_at_timestamp = update.timestamp; + } + self.last_forwarded_at_timestamp = update.timestamp; + + // Recalculate averages + if self.payment_count > 0 { + self.avg_fee_msat = self.total_fee_earned_msat / self.payment_count; + self.avg_inbound_amount_msat = self.total_inbound_amount_msat / self.payment_count; + } + } + + updated + } + + fn to_update(&self) -> Self::Update { + // This creates an update representing the current state as increments. + // This is primarily used for insert_or_update behavior. + ChannelPairForwardingStatsUpdate { + id: self.id, + prev_channel_id: self.prev_channel_id, + next_channel_id: self.next_channel_id, + bucket_start_timestamp: self.bucket_start_timestamp, + prev_node_id: self.prev_node_id, + next_node_id: self.next_node_id, + payment_count_increment: self.payment_count, + inbound_amount_increment_msat: self.total_inbound_amount_msat, + outbound_amount_increment_msat: self.total_outbound_amount_msat, + fee_earned_increment_msat: self.total_fee_earned_msat, + skimmed_fee_increment_msat: self.total_skimmed_fee_msat, + onchain_claims_increment: self.onchain_claims_count, + timestamp: self.last_forwarded_at_timestamp, + } + } +} + +/// Aggregate expired forwarded payments into time-bucketed channel pair statistics. +/// +/// Returns (number of buckets created, number of payments aggregated). +pub(crate) fn aggregate_expired_forwarded_payments( + forwarded_payment_store: &DataStore>, + channel_pair_stats_store: &DataStore>, + retention_minutes: u64, logger: &Arc, +) -> Result<(u64, u64), Error> { + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + let cutoff = now.saturating_sub(retention_minutes * 60); + let bucket_size_secs = retention_minutes * 60; + + log_debug!( + logger, + "Aggregating forwarded payments older than {retention_minutes} mins (cutoff timestamp: {cutoff})" + ); + + // Get all expired payments + let expired = forwarded_payment_store.list_filter(|p| p.forwarded_at_timestamp < cutoff); + + if expired.is_empty() { + log_debug!(logger, "No expired forwarded payments found"); + return Ok((0, 0)); + } + + log_debug!(logger, "Found {} expired forwarded payments to aggregate", expired.len()); + + // Group by (channel_pair, bucket_start_timestamp) + let mut bucket_groups: HashMap<(ChannelId, ChannelId, u64), Vec<&ForwardedPaymentDetails>> = + HashMap::new(); + + for payment in &expired { + // Calculate which bucket this payment belongs to + let bucket_start = (payment.forwarded_at_timestamp / bucket_size_secs) * bucket_size_secs; + let key = (payment.prev_channel_id, payment.next_channel_id, bucket_start); + bucket_groups.entry(key).or_insert_with(Vec::new).push(payment); + } + + log_debug!(logger, "Grouped into {} time buckets", bucket_groups.len()); + + // Aggregate each bucket + let mut aggregated_bucket_count = 0u64; + let mut removed_payment_count = 0u64; + + for ((prev_channel_id, next_channel_id, bucket_start), payments) in bucket_groups { + debug_assert!(!payments.is_empty(), "Each bucket group should have at least one payment"); + + // Calculate aggregated values + let mut total_inbound_amount_msat = 0u64; + let mut total_outbound_amount_msat = 0u64; + let mut total_fee_earned_msat = 0u64; + let mut total_skimmed_fee_msat = 0u64; + let mut onchain_claims_count = 0u64; + let mut first_timestamp = u64::MAX; + let mut last_timestamp = 0u64; + + // Use first payment for node IDs (they should all be the same for a channel pair) + let first_payment = payments[0]; + let prev_node_id = first_payment.prev_node_id; + let next_node_id = first_payment.next_node_id; + + for payment in &payments { + let outbound = payment.outbound_amount_forwarded_msat.unwrap_or(0); + let fee = payment.total_fee_earned_msat.unwrap_or(0); + let skimmed = payment.skimmed_fee_msat.unwrap_or(0); + + total_inbound_amount_msat = + total_inbound_amount_msat.saturating_add(outbound.saturating_add(fee)); + total_outbound_amount_msat = total_outbound_amount_msat.saturating_add(outbound); + total_fee_earned_msat = total_fee_earned_msat.saturating_add(fee); + total_skimmed_fee_msat = total_skimmed_fee_msat.saturating_add(skimmed); + if payment.claim_from_onchain_tx { + onchain_claims_count += 1; + } + first_timestamp = first_timestamp.min(payment.forwarded_at_timestamp); + last_timestamp = last_timestamp.max(payment.forwarded_at_timestamp); + } + + let payment_count = payments.len() as u64; + let avg_fee_msat = total_fee_earned_msat / payment_count; + let avg_inbound_amount_msat = total_inbound_amount_msat / payment_count; + + let pair_id = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel_id, + &next_channel_id, + bucket_start, + ); + + // Create the bucket stats entry + let stats = ChannelPairForwardingStats { + id: pair_id, + prev_channel_id, + next_channel_id, + bucket_start_timestamp: bucket_start, + prev_node_id, + next_node_id, + payment_count, + total_inbound_amount_msat, + total_outbound_amount_msat, + total_fee_earned_msat, + total_skimmed_fee_msat, + onchain_claims_count, + avg_fee_msat, + avg_inbound_amount_msat, + first_forwarded_at_timestamp: first_timestamp, + last_forwarded_at_timestamp: last_timestamp, + aggregated_at_timestamp: now, + }; + + // Insert the bucket (should be unique - no update needed) + channel_pair_stats_store.insert(stats).map_err(|e| { + log_error!(logger, "Failed to insert channel pair stats bucket for {pair_id:?}: {e}"); + e + })?; + + aggregated_bucket_count += 1; + + // Remove aggregated payments + for payment in payments { + forwarded_payment_store.remove(&payment.id).map_err(|e| { + log_error!(logger, "Failed to remove forwarded payment {:?}: {}", payment.id, e); + e + })?; + removed_payment_count += 1; + } + } + + log_debug!( + logger, + "Successfully aggregated {} payments into {} time buckets", + removed_payment_count, + aggregated_bucket_count + ); + + Ok((aggregated_bucket_count, removed_payment_count)) } diff --git a/src/types.rs b/src/types.rs index b5b1ffed7..3b7d84169 100644 --- a/src/types.rs +++ b/src/types.rs @@ -39,6 +39,9 @@ use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; +use crate::payment::store::{ + ChannelForwardingStats, ChannelPairForwardingStats, ForwardedPaymentDetails, +}; use crate::payment::{PaymentDetails, PendingPaymentDetails}; use crate::runtime::RuntimeSpawner; @@ -320,6 +323,10 @@ pub(crate) type BumpTransactionEventHandler = >; pub(crate) type PaymentStore = DataStore>; +pub(crate) type ForwardedPaymentStore = DataStore>; +pub(crate) type ChannelForwardingStatsStore = DataStore>; +pub(crate) type ChannelPairForwardingStatsStore = + DataStore>; /// A local, potentially user-provided, identifier of a channel. ///