diff --git a/Cargo.lock b/Cargo.lock index aa0e5db..c1d0d6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2011,6 +2011,7 @@ name = "fmcd" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "async-trait", "async-utility", "axum", @@ -2049,6 +2050,7 @@ dependencies = [ "tempfile", "time", "tokio", + "tokio-stream", "toml", "tower 0.4.13", "tower-http 0.5.2", diff --git a/Cargo.toml b/Cargo.toml index 28dd66b..a331485 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,8 @@ sha2 = "0.10" toml = "0.8" futures = "0.3" +async-stream = "0.3" +tokio-stream = "0.1" metrics = { version = "0.23", default-features = false } metrics-exporter-prometheus = { version = "0.15.3", default-features = false } console = "0.15" diff --git a/README.md b/README.md index 7f7cd0f..4b048e2 100644 --- a/README.md +++ b/README.md @@ -56,9 +56,9 @@ curl http://localhost:7070/v2/admin/info \ - `/v2/admin/info`: Display wallet info (holdings, tiers). - `/v2/admin/backup`: Upload the (encrypted) snapshot of mint notes to federation. -- `/v2/admin/discover-version`: Discover the common api version to use to communicate with the federation. +- `/v2/admin/version`: Discover the common api version to use to communicate with the federation. - `/v2/admin/restore`: Restore the previously created backup of mint notes (with `backup` command). -- `/v2/admin/list-operations`: List operations. +- `/v2/admin/operations`: List operations. - `/v2/admin/module`: Call a module subcommand. - `/v2/admin/config`: Returns the client config. @@ -73,11 +73,8 @@ curl http://localhost:7070/v2/admin/info \ ### Lightning network related commands: - `/v2/ln/invoice`: Create a lightning invoice to receive payment via gateway. -- `/v2/ln/await-invoice`: Wait for incoming invoice to be paid. - `/v2/ln/pay`: Pay a lightning invoice or lnurl via a gateway. -- `/v2/ln/await-pay`: Wait for a lightning payment to complete. -- `/v2/ln/list-gateways`: List registered gateways. -- `/v2/ln/switch-gateway`: Switch active gateway. +- `/v2/ln/gateways`: List registered gateways. ### Onchain related commands: @@ -197,11 +194,8 @@ docker logs fmcd | grep "Generating" Build your own OCI image using Nix: ```bash -# Build the OCI container -nix build .#oci - -# Load into Docker -docker load < ./result +# Build the OCI container and Load into Docker +nix build .#oci && docker load < ./result # Verify the image docker image ls | grep fmcd diff --git a/curl.md b/curl.md index 0880950..1d1b63d 100644 --- a/curl.md +++ b/curl.md @@ -1,34 +1,399 @@ -# Fedimint Clientd Curl Examples +# Fedimint Clientd (fmcd) Curl Examples -## Admin +## Authorization Setup +All requests require Basic Authentication with username `fmcd` and your configured password. -Info: +```bash +# Set your password (replace with your actual password) +FMCD_PASS="bdb056904c8971cedf717265176f99e25f0c43e9f8294c69967b184c3dca768e" +# Create the Basic Auth header (using -w0 to prevent line wrapping) +FMCD_AUTH_TOKEN=$(echo -n "fmcd:$FMCD_PASS" | base64 -w0) + +# Use in requests +curl -H "Authorization: Basic $FMCD_AUTH_TOKEN" ... +``` + +## Common Variables +```bash +# Set these for easier testing +FMCD_URL="http://127.0.0.1:7070" +FEDERATION_ID="15db8cb4f1ec8e484d73b889372bec94812580f929e8148b7437d359af422cd3" +GATEWAY_ID="035f2f7912e0f570841d5c0d8976a40af0dcca5609198436f596e78d2c851ee58a" +``` + +## Admin Endpoints + +### Get Federation Info +```bash +# Get info for all connected federations +curl -X GET "$FMCD_URL/v2/admin/info" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" | jq . + +# List federations with balances +curl -X GET "$FMCD_URL/v2/admin/info" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" | jq 'to_entries | map({id: .key, name: .value.meta.federation_name, totalAmountMsat: .value.totalAmountMsat})' +``` + +### List Operations +```bash +# Get recent operations for a federation +curl -X POST "$FMCD_URL/v2/admin/operations" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"federationId\": \"$FEDERATION_ID\", + \"limit\": 50 + }" | jq +``` + +### Join Federation +```bash +# Join a new federation with an invite code +curl -X POST "$FMCD_URL/v2/admin/join" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "inviteCode": "fed11qgqrgvnhwden5te0v9k8q6rp9ekh2arfdeukuet595cr2ttpd3jhq6rzve6zuer9wchxvetyd938gcewvdhk6tcqqysptkuvknc7erjgf4em3zfh90kffqf9srujn6q53d6r056qd5apxw6jxgcqqqqqq" + }' | jq +``` + +### List Federations +```bash +# Get list of all connected federations +curl -X GET "$FMCD_URL/v2/admin/federations" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" | jq +``` + +### Backup Federation +```bash +# Create a backup of a federation +curl -X POST "$FMCD_URL/v2/admin/backup" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"federationId\": \"$FEDERATION_ID\" + }" | jq +``` + +### Restore Federation +```bash +# Restore a federation from backup +curl -X POST "$FMCD_URL/v2/admin/restore" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "backup": "YOUR_BACKUP_STRING_HERE" + }' | jq +``` + +### Get Version +```bash +# Get fmcd version information +curl -X GET "$FMCD_URL/v2/admin/version" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" | jq +``` + +### Get Module Info +```bash +# Get information about federation modules +curl -X POST "$FMCD_URL/v2/admin/module" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"federationId\": \"$FEDERATION_ID\", + \"module\": \"ln\" + }" | jq +``` + +### Get Config +```bash +# Get current configuration +curl -X GET "$FMCD_URL/v2/admin/config" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" | jq +``` + +## Lightning (LN) Endpoints + +### List Gateways +```bash +# Get available gateways for a federation +curl -X POST "$FMCD_URL/v2/ln/gateways" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"federationId\": \"$FEDERATION_ID\" + }" | jq +``` + +### Create Invoice +```bash +# Generate a Lightning invoice +curl -X POST "$FMCD_URL/v2/ln/invoice" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"amountMsat\": 1000000, + \"description\": \"Test invoice\", + \"expiryTime\": 3600, + \"gatewayId\": \"$GATEWAY_ID\", + \"federationId\": \"$FEDERATION_ID\" + }" | jq +``` + +### Pay Invoice +```bash +# Pay a Lightning invoice +curl -X POST "$FMCD_URL/v2/ln/pay" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"paymentInfo\": \"lnbc100n1p3ehk5...\", + \"gatewayId\": \"$GATEWAY_ID\", + \"federationId\": \"$FEDERATION_ID\" + }" | jq ``` -curl http://localhost:3333/v2/admin/info -H "Authorization: Bearer password" | jq + +### Check Payment Status +```bash +# Get status of a Lightning payment +curl -X POST "$FMCD_URL/v2/ln/status" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"operationId\": \"OPERATION_ID_HERE\", + \"federationId\": \"$FEDERATION_ID\" + }" | jq ``` -List Connected Fedimints and their balances: +## On-chain Endpoints +### Get Deposit Address +```bash +# Generate a Bitcoin deposit address +curl -X POST "$FMCD_URL/v2/onchain/deposit-address" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"federationId\": \"$FEDERATION_ID\" + }" | jq ``` -curl http://localhost:3333/v2/admin/info -H "Authorization: Bearer password" | jq 'to_entries | map({id: .key, name: .value.meta.federation_name, totalAmountMsat: .value.totalAmountMsat})' + +### Await Deposit +```bash +# Wait for a deposit to be confirmed (with timeout) +curl -X POST "$FMCD_URL/v2/onchain/await-deposit" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"operationId\": \"OPERATION_ID_HERE\", + \"federationId\": \"$FEDERATION_ID\", + \"timeout\": 600 + }" | jq ``` -## Mint +### Withdraw to Address +```bash +# Withdraw Bitcoin to an on-chain address +curl -X POST "$FMCD_URL/v2/onchain/withdraw" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"address\": \"tb1qexampleaddress...\", + \"amountSats\": 50000, + \"feeRateSatsPerVbyte\": 5, + \"federationId\": \"$FEDERATION_ID\" + }" | jq +``` -## Lightning +## Mint Endpoints -Get a gateway ID for a federation: +### Encode Notes +```bash +# Encode ecash notes +curl -X POST "$FMCD_URL/v2/mint/encode-notes" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"notes\": \"NOTES_DATA_HERE\", + \"federationId\": \"$FEDERATION_ID\" + }" | jq +``` +### Decode Notes +```bash +# Decode ecash notes +curl -X POST "$FMCD_URL/v2/mint/decode-notes" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"notes\": \"ENCODED_NOTES_HERE\", + \"federationId\": \"$FEDERATION_ID\" + }" | jq ``` -curl -v -X POST http://localhost:3333/v2/ln/list-gateways -H "Authorization: Bearer password" -H "Content-type: application/json" -d '{"federationId" : -"15db8cb4f1ec8e484d73b889372bec94812580f929e8148b7437d359af422cd3"}' + +### Split Notes +```bash +# Split ecash notes into smaller denominations +curl -X POST "$FMCD_URL/v2/mint/split" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"notes\": \"NOTES_TO_SPLIT\", + \"amountMsat\": 500000, + \"federationId\": \"$FEDERATION_ID\" + }" | jq ``` -Create an invoice for a federation (using the gateway ID above): +### Combine Notes +```bash +# Combine multiple ecash notes +curl -X POST "$FMCD_URL/v2/mint/combine" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"notes\": [\"NOTE1\", \"NOTE2\"], + \"federationId\": \"$FEDERATION_ID\" + }" | jq +``` +### Spend Notes +```bash +# Spend ecash notes +curl -X POST "$FMCD_URL/v2/mint/spend" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"amountMsat\": 100000, + \"allowOverpay\": true, + \"timeout\": 60, + \"includeInvite\": false, + \"federationId\": \"$FEDERATION_ID\" + }" | jq ``` -curl -v -X POST http://localhost:3333/v2/ln/invoice -H "Authorization: Bearer password" -H "Content-Type: application/json" -d '{"amountMsat": 1000000, "description": "test", "gatewayId": "035f2f7912e0f570841d5c0d8976a40af0dcca5609198436f596e78d2c851ee58a", "federationId": "15db8cb4f1ec8e484d73b889372bec94812580f929e8148b7437d359af422cd3"}' + +### Validate Notes +```bash +# Validate ecash notes +curl -X POST "$FMCD_URL/v2/mint/validate" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"notes\": \"NOTES_TO_VALIDATE\", + \"federationId\": \"$FEDERATION_ID\" + }" | jq +``` + +### Reissue Notes +```bash +# Reissue ecash notes +curl -X POST "$FMCD_URL/v2/mint/reissue" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"notes\": \"NOTES_TO_REISSUE\", + \"federationId\": \"$FEDERATION_ID\" + }" | jq +``` + +## WebSocket Examples + +### Connect to WebSocket +```bash +# Using wscat (install with: npm install -g wscat) +wscat -c "ws://localhost:7070/ws" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" +``` + +### WebSocket Request Format +Once connected, send JSON requests in this format: +```json +{ + "method": "admin.info", + "params": {}, + "id": 1 +} ``` -## Onchain +### WebSocket Methods +- `admin.info` - Get federation info +- `admin.operations` - List operations +- `admin.join` - Join federation +- `ln.invoice` - Create invoice +- `ln.pay` - Pay invoice +- `ln.gateways` - List gateways +- `onchain.deposit-address` - Get deposit address +- `onchain.withdraw` - Withdraw to address +- `mint.spend` - Spend ecash + +## Testing Examples + +### Complete Invoice Flow +```bash +# 1. Get a gateway +GATEWAY_RESPONSE=$(curl -s -X POST "$FMCD_URL/v2/ln/gateways" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{\"federationId\": \"$FEDERATION_ID\"}") +GATEWAY_ID=$(echo $GATEWAY_RESPONSE | jq -r '.gateways[0].gatewayId') + +# 2. Create an invoice +INVOICE_RESPONSE=$(curl -s -X POST "$FMCD_URL/v2/ln/invoice" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"amountMsat\": 1000000, + \"description\": \"Test payment\", + \"gatewayId\": \"$GATEWAY_ID\", + \"federationId\": \"$FEDERATION_ID\" + }") +INVOICE=$(echo $INVOICE_RESPONSE | jq -r '.invoice') +OPERATION_ID=$(echo $INVOICE_RESPONSE | jq -r '.operationId') + +echo "Invoice: $INVOICE" +echo "Operation ID: $OPERATION_ID" + +# 3. Check payment status +curl -X POST "$FMCD_URL/v2/ln/status" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"operationId\": \"$OPERATION_ID\", + \"federationId\": \"$FEDERATION_ID\" + }" | jq +``` + +### Check Balance +```bash +# Get total balance across all federations +curl -s -X GET "$FMCD_URL/v2/admin/info" \ + -H "Authorization: Basic $FMCD_AUTH_TOKEN" | \ + jq '[.[] | .totalAmountMsat] | add | . / 1000 | "Total: \(.) sats"' +``` + +## Error Handling + +Most endpoints will return errors in this format: +```json +{ + "error": "Error message", + "code": "ERROR_CODE", + "details": {} +} +``` + +Common HTTP status codes: +- `200` - Success +- `400` - Bad Request (invalid parameters) +- `401` - Unauthorized (check authentication) +- `404` - Not Found +- `500` - Internal Server Error + +## Tips + +1. Always use the correct `federationId` for your requests +2. Gateway IDs are required for Lightning operations +3. Amounts are typically in millisatoshis (msat) for Lightning, satoshis for on-chain +4. Use `jq` for pretty-printing and parsing JSON responses +5. Set shell variables for frequently used values to avoid repetition +6. Check operation status for async operations like payments and deposits diff --git a/src/auth/basic.rs b/src/auth/basic.rs index 7c36d8a..50df0ec 100644 --- a/src/auth/basic.rs +++ b/src/auth/basic.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use axum::body::Body; -use axum::extract::{Extension, Request}; +use axum::extract::Request; use axum::http::StatusCode; use axum::middleware::Next; use axum::response::Response; @@ -58,7 +58,7 @@ impl BasicAuth { pub async fn basic_auth_middleware_with_events( auth: Arc, event_bus: Arc, - mut request: Request, + request: Request, next: Next, ) -> Result { let method = request.method().to_string(); diff --git a/src/config.rs b/src/config.rs index 2ab5c2b..1985118 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,3 @@ -use std::io::Write; use std::path::{Path, PathBuf}; use anyhow::Result; diff --git a/src/database/instrumented.rs b/src/database/instrumented.rs index 72e989f..7488a17 100644 --- a/src/database/instrumented.rs +++ b/src/database/instrumented.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; diff --git a/src/error/categories.rs b/src/error/categories.rs index bd76c38..429fe36 100644 --- a/src/error/categories.rs +++ b/src/error/categories.rs @@ -2,7 +2,6 @@ use std::fmt; use axum::http::StatusCode; use serde::{Deserialize, Serialize}; -use tracing::{error, warn}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] diff --git a/src/events/handlers/logging.rs b/src/events/handlers/logging.rs index 2bc84dd..946d869 100644 --- a/src/events/handlers/logging.rs +++ b/src/events/handlers/logging.rs @@ -2,9 +2,7 @@ use async_trait::async_trait; use tracing::{debug, error, info, warn}; use crate::events::{EventHandler, FmcdEvent}; -use crate::observability::sanitization::{ - sanitize_invoice, sanitize_payment_hash, sanitize_preimage, -}; +use crate::observability::sanitization::{sanitize_invoice, sanitize_preimage}; /// Event handler that logs all events with appropriate levels and sanitization pub struct LoggingEventHandler { @@ -43,20 +41,18 @@ impl EventHandler for LoggingEventHandler { ); } FmcdEvent::PaymentSucceeded { - payment_id, + operation_id, federation_id, + amount_msat, preimage, - fee_msat, - correlation_id, timestamp, } => { info!( event_type = "payment_succeeded", - payment_id = %payment_id, + operation_id = %operation_id, federation_id = %federation_id, + amount_msat = amount_msat, preimage = %sanitize_preimage(&preimage), - fee_msat = fee_msat, - correlation_id = ?correlation_id, timestamp = %timestamp, "Payment succeeded" ); @@ -98,17 +94,17 @@ impl EventHandler for LoggingEventHandler { ); } FmcdEvent::InvoicePaid { - invoice_id, + operation_id, federation_id, - amount_received_msat, + amount_msat, correlation_id, timestamp, } => { info!( event_type = "invoice_paid", - invoice_id = %invoice_id, + operation_id = %operation_id, federation_id = %federation_id, - amount_received_msat = amount_received_msat, + amount_msat = amount_msat, correlation_id = ?correlation_id, timestamp = %timestamp, "Invoice paid" @@ -343,20 +339,21 @@ impl EventHandler for LoggingEventHandler { "Withdrawal initiated" ); } - FmcdEvent::WithdrawalCompleted { + FmcdEvent::WithdrawalSucceeded { operation_id, federation_id, + amount_sat, txid, - correlation_id, timestamp, } => { info!( + event_type = "withdrawal_succeeded", operation_id = %operation_id, federation_id = %federation_id, + amount_sat = amount_sat, txid = %txid, - correlation_id = ?correlation_id, timestamp = %timestamp, - "Withdrawal completed" + "Withdrawal succeeded" ); } FmcdEvent::WithdrawalFailed { @@ -375,6 +372,40 @@ impl EventHandler for LoggingEventHandler { "Withdrawal failed" ); } + FmcdEvent::PaymentRefunded { + operation_id, + federation_id, + reason, + timestamp, + } => { + info!( + event_type = "payment_refunded", + operation_id = %operation_id, + federation_id = %federation_id, + reason = %reason, + timestamp = %timestamp, + "Payment refunded" + ); + } + FmcdEvent::DepositClaimed { + operation_id, + federation_id, + amount_sat, + txid, + correlation_id, + timestamp, + } => { + info!( + event_type = "deposit_claimed", + operation_id = %operation_id, + federation_id = %federation_id, + amount_sat = amount_sat, + txid = %txid, + correlation_id = ?correlation_id, + timestamp = %timestamp, + "Deposit claimed" + ); + } } Ok(()) diff --git a/src/events/handlers/metrics.rs b/src/events/handlers/metrics.rs index 0316eb4..1fc326b 100644 --- a/src/events/handlers/metrics.rs +++ b/src/events/handlers/metrics.rs @@ -1,9 +1,6 @@ -use std::sync::Arc; -use std::time::Duration; - use async_trait::async_trait; -use metrics::{counter, gauge, histogram}; -use tracing::{debug, error}; +use metrics::{counter, histogram}; +use tracing::debug; use crate::events::{EventHandler, FmcdEvent}; use crate::metrics::{ @@ -127,10 +124,10 @@ impl EventHandler for MetricsEventHandler { } FmcdEvent::PaymentSucceeded { federation_id, - fee_msat, + amount_msat, .. } => { - self.record_payment_metrics(&federation_id, "succeeded", None, Some(fee_msat)); + self.record_payment_metrics(&federation_id, "succeeded", Some(amount_msat), None); } FmcdEvent::PaymentFailed { federation_id, .. } => { self.record_payment_metrics(&federation_id, "failed", None, None); @@ -144,10 +141,10 @@ impl EventHandler for MetricsEventHandler { } FmcdEvent::InvoicePaid { federation_id, - amount_received_msat, + amount_msat, .. } => { - self.record_invoice_metrics(&federation_id, "paid", Some(amount_received_msat)); + self.record_invoice_metrics(&federation_id, "paid", Some(amount_msat)); } FmcdEvent::InvoiceExpired { federation_id, .. } => { self.record_invoice_metrics(&federation_id, "expired", None); @@ -209,7 +206,7 @@ impl EventHandler for MetricsEventHandler { histogram!(PAYMENT_AMOUNT_MSAT, "federation_id" => federation_id) .record((amount_sat * 1000) as f64); } - FmcdEvent::WithdrawalCompleted { federation_id, .. } => { + FmcdEvent::WithdrawalSucceeded { federation_id, .. } => { counter!(PAYMENTS_TOTAL, "federation_id" => federation_id, "type" => "withdrawal", "status" => "completed").increment(1); } FmcdEvent::WithdrawalFailed { federation_id, .. } => { @@ -228,6 +225,19 @@ impl EventHandler for MetricsEventHandler { histogram!(PAYMENT_AMOUNT_MSAT, "federation_id" => federation_id) .record((amount_sat * 1000) as f64); } + FmcdEvent::PaymentRefunded { federation_id, .. } => { + counter!(PAYMENTS_TOTAL, "federation_id" => federation_id, "status" => "refunded") + .increment(1); + } + FmcdEvent::DepositClaimed { + federation_id, + amount_sat, + .. + } => { + counter!(PAYMENTS_TOTAL, "federation_id" => federation_id.clone(), "type" => "deposit", "status" => "claimed").increment(1); + histogram!(PAYMENT_AMOUNT_MSAT, "federation_id" => federation_id) + .record((amount_sat * 1000) as f64); + } } // Record general event bus metrics diff --git a/src/events/mod.rs b/src/events/mod.rs index 6748bc8..3e7027c 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -22,11 +22,16 @@ pub enum FmcdEvent { timestamp: DateTime, }, PaymentSucceeded { - payment_id: String, + operation_id: String, federation_id: String, + amount_msat: u64, preimage: String, - fee_msat: u64, - correlation_id: Option, + timestamp: DateTime, + }, + PaymentRefunded { + operation_id: String, + federation_id: String, + reason: String, timestamp: DateTime, }, PaymentFailed { @@ -47,9 +52,9 @@ pub enum FmcdEvent { timestamp: DateTime, }, InvoicePaid { - invoice_id: String, + operation_id: String, federation_id: String, - amount_received_msat: u64, + amount_msat: u64, correlation_id: Option, timestamp: DateTime, }, @@ -96,6 +101,14 @@ pub enum FmcdEvent { correlation_id: Option, timestamp: DateTime, }, + DepositClaimed { + operation_id: String, + federation_id: String, + amount_sat: u64, + txid: String, + correlation_id: Option, + timestamp: DateTime, + }, WithdrawalInitiated { operation_id: String, federation_id: String, @@ -105,11 +118,11 @@ pub enum FmcdEvent { correlation_id: Option, timestamp: DateTime, }, - WithdrawalCompleted { + WithdrawalSucceeded { operation_id: String, federation_id: String, + amount_sat: u64, txid: String, - correlation_id: Option, timestamp: DateTime, }, WithdrawalFailed { @@ -170,6 +183,7 @@ impl FmcdEvent { match self { FmcdEvent::PaymentInitiated { timestamp, .. } => *timestamp, FmcdEvent::PaymentSucceeded { timestamp, .. } => *timestamp, + FmcdEvent::PaymentRefunded { timestamp, .. } => *timestamp, FmcdEvent::PaymentFailed { timestamp, .. } => *timestamp, FmcdEvent::InvoiceCreated { timestamp, .. } => *timestamp, FmcdEvent::InvoicePaid { timestamp, .. } => *timestamp, @@ -179,8 +193,9 @@ impl FmcdEvent { FmcdEvent::FederationBalanceUpdated { timestamp, .. } => *timestamp, FmcdEvent::DepositAddressGenerated { timestamp, .. } => *timestamp, FmcdEvent::DepositDetected { timestamp, .. } => *timestamp, + FmcdEvent::DepositClaimed { timestamp, .. } => *timestamp, FmcdEvent::WithdrawalInitiated { timestamp, .. } => *timestamp, - FmcdEvent::WithdrawalCompleted { timestamp, .. } => *timestamp, + FmcdEvent::WithdrawalSucceeded { timestamp, .. } => *timestamp, FmcdEvent::WithdrawalFailed { timestamp, .. } => *timestamp, FmcdEvent::GatewaySelected { timestamp, .. } => *timestamp, FmcdEvent::GatewayUnavailable { timestamp, .. } => *timestamp, @@ -193,7 +208,8 @@ impl FmcdEvent { pub fn correlation_id(&self) -> Option<&String> { match self { FmcdEvent::PaymentInitiated { correlation_id, .. } => correlation_id.as_ref(), - FmcdEvent::PaymentSucceeded { correlation_id, .. } => correlation_id.as_ref(), + FmcdEvent::PaymentSucceeded { .. } => None, + FmcdEvent::PaymentRefunded { .. } => None, FmcdEvent::PaymentFailed { correlation_id, .. } => correlation_id.as_ref(), FmcdEvent::InvoiceCreated { correlation_id, .. } => correlation_id.as_ref(), FmcdEvent::InvoicePaid { correlation_id, .. } => correlation_id.as_ref(), @@ -203,8 +219,9 @@ impl FmcdEvent { FmcdEvent::FederationBalanceUpdated { correlation_id, .. } => correlation_id.as_ref(), FmcdEvent::DepositAddressGenerated { correlation_id, .. } => correlation_id.as_ref(), FmcdEvent::DepositDetected { correlation_id, .. } => correlation_id.as_ref(), + FmcdEvent::DepositClaimed { correlation_id, .. } => correlation_id.as_ref(), FmcdEvent::WithdrawalInitiated { correlation_id, .. } => correlation_id.as_ref(), - FmcdEvent::WithdrawalCompleted { correlation_id, .. } => correlation_id.as_ref(), + FmcdEvent::WithdrawalSucceeded { .. } => None, FmcdEvent::WithdrawalFailed { correlation_id, .. } => correlation_id.as_ref(), FmcdEvent::GatewaySelected { correlation_id, .. } => correlation_id.as_ref(), FmcdEvent::GatewayUnavailable { correlation_id, .. } => correlation_id.as_ref(), @@ -218,6 +235,7 @@ impl FmcdEvent { match self { FmcdEvent::PaymentInitiated { .. } => "payment_initiated", FmcdEvent::PaymentSucceeded { .. } => "payment_succeeded", + FmcdEvent::PaymentRefunded { .. } => "payment_refunded", FmcdEvent::PaymentFailed { .. } => "payment_failed", FmcdEvent::InvoiceCreated { .. } => "invoice_created", FmcdEvent::InvoicePaid { .. } => "invoice_paid", @@ -227,8 +245,9 @@ impl FmcdEvent { FmcdEvent::FederationBalanceUpdated { .. } => "federation_balance_updated", FmcdEvent::DepositAddressGenerated { .. } => "deposit_address_generated", FmcdEvent::DepositDetected { .. } => "deposit_detected", + FmcdEvent::DepositClaimed { .. } => "deposit_claimed", FmcdEvent::WithdrawalInitiated { .. } => "withdrawal_initiated", - FmcdEvent::WithdrawalCompleted { .. } => "withdrawal_completed", + FmcdEvent::WithdrawalSucceeded { .. } => "withdrawal_succeeded", FmcdEvent::WithdrawalFailed { .. } => "withdrawal_failed", FmcdEvent::GatewaySelected { .. } => "gateway_selected", FmcdEvent::GatewayUnavailable { .. } => "gateway_unavailable", diff --git a/src/health/checks.rs b/src/health/checks.rs index 8f46297..714275d 100644 --- a/src/health/checks.rs +++ b/src/health/checks.rs @@ -6,7 +6,7 @@ use axum::extract::Extension; use axum::http::StatusCode; use axum::response::Json; use chrono::{DateTime, Utc}; -use fedimint_client::{Client, ClientHandleArc}; +use fedimint_client::ClientHandleArc; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, warn}; diff --git a/src/main.rs b/src/main.rs index 2fe75b7..d2001b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -282,13 +282,13 @@ async fn track_metrics(req: Request, next: Next) -> impl IntoResponse { /// Implements Fedimint V0.2 API Route matching against CLI commands: /// - `/v2/admin/backup`: Upload the (encrypted) snapshot of mint notes to /// federation. -/// - `/v2/admin/discover-version`: Discover the common api version to use to -/// communicate with the federation. +/// - `/v2/admin/version`: Discover the common api version to use to communicate +/// with the federation. /// - `/v2/admin/info`: Display wallet info (holdings, tiers). /// - `/v2/admin/join`: Join a federation with an invite code. /// - `/v2/admin/restore`: Restore the previously created backup of mint notes /// (with `backup` command). -/// - `/v2/admin/list-operations`: List operations. +/// - `/v2/admin/operations`: List operations. /// - `/v2/admin/module`: Call a module subcommand. /// - `/v2/admin/config`: Returns the client config. /// @@ -305,14 +305,8 @@ async fn track_metrics(req: Request, next: Next) -> impl IntoResponse { /// Lightning network related commands: /// - `/v2/ln/invoice`: Create a lightning invoice to receive payment via /// gateway. -/// - `/v2/ln/invoice-external-pubkey-tweaked`: Create a lightning invoice to -/// receive payment via gateway with external pubkey. -/// - `/v2/ln/await-invoice`: Wait for incoming invoice to be paid. -/// - `/v2/ln/claim-external-receive-tweaked`: Claim an external receive. /// - `/v2/ln/pay`: Pay a lightning invoice or lnurl via a gateway. -/// - `/v2/ln/await-pay`: Wait for a lightning payment to complete. -/// - `/v2/ln/list-gateways`: List registered gateways. -/// - `/v2/ln/switch-gateway`: Switch active gateway. +/// - `/v2/ln/gateways`: List registered gateways. /// /// Onchain related commands: /// - `/v2/onchain/deposit-address`: Generate a new deposit address, funds sent @@ -331,18 +325,24 @@ fn fedimint_v2_rest() -> Router { .route("/combine", post(mint::combine::handle_rest)); let ln_router = Router::new() + // Modern API endpoints - aligns with fedimint client 0.8 behavior .route("/invoice", post(ln::invoice::handle_rest)) + .route("/invoice/status/bulk", post(ln::status::handle_bulk_status)) .route( - "/invoice-external-pubkey-tweaked", - post(ln::invoice_external_pubkey_tweaked::handle_rest), + "/operation/:operation_id/status", + get(ln::status::handle_rest_by_operation_id), ) - .route("/await-invoice", post(ln::await_invoice::handle_rest)) .route( - "/claim-external-receive-tweaked", - post(ln::claim_external_receive_tweaked::handle_rest), + "/operation/:operation_id/stream", + get(ln::stream::handle_operation_stream), ) + .route( + "/events/stream", + get(ln::stream::handle_global_event_stream), + ) + // Other LN endpoints .route("/pay", post(ln::pay::handle_rest)) - .route("/list-gateways", post(ln::list_gateways::handle_rest)); + .route("/gateways", post(ln::gateways::handle_rest)); let onchain_router = Router::new() .route( @@ -354,20 +354,14 @@ fn fedimint_v2_rest() -> Router { let admin_router = Router::new() .route("/backup", post(admin::backup::handle_rest)) - .route( - "/discover-version", - post(admin::discover_version::handle_rest), - ) - .route("/federation-ids", get(admin::federation_ids::handle_rest)) + .route("/version", post(admin::version::handle_rest)) + .route("/federations", get(admin::federations::handle_rest)) .route("/info", get(admin::info::handle_rest)) .route("/join", post(admin::join::handle_rest)) .route("/restore", post(admin::restore::handle_rest)) // .route("/printsecret", get(handle_printsecret)) TODO: should I expose this // under admin? - .route( - "/list-operations", - post(admin::list_operations::handle_rest), - ) + .route("/operations", post(admin::operations::handle_rest)) .route("/module", post(admin::module::handle_rest)) .route("/config", get(admin::config::handle_rest)); diff --git a/src/metrics/collectors.rs b/src/metrics/collectors.rs index 2aed2ff..1e020d4 100644 --- a/src/metrics/collectors.rs +++ b/src/metrics/collectors.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use lazy_static::lazy_static; use metrics::{counter, gauge, histogram}; use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info}; use crate::events::{EventHandler, FmcdEvent}; @@ -98,18 +98,12 @@ impl MetricsCollector { "Recorded payment initiation metrics" ); } - FmcdEvent::PaymentSucceeded { - federation_id, - fee_msat, - .. - } => { + FmcdEvent::PaymentSucceeded { federation_id, .. } => { counter!(PAYMENTS_TOTAL, "federation_id" => federation_id.clone(), "status" => "succeeded").increment(1); - histogram!(PAYMENT_FEES_MSAT, "federation_id" => federation_id.clone()) - .record(*fee_msat as f64); + // Note: fee_msat is not available in PaymentSucceeded event debug!( federation_id = %federation_id, - fee_msat = fee_msat, "Recorded payment success metrics" ); } @@ -150,16 +144,16 @@ impl MetricsCollector { } FmcdEvent::InvoicePaid { federation_id, - amount_received_msat, + amount_msat, .. } => { counter!(INVOICES_TOTAL, "federation_id" => federation_id.clone(), "status" => "paid").increment(1); histogram!(INVOICE_AMOUNT_MSAT, "federation_id" => federation_id.clone()) - .record(*amount_received_msat as f64); + .record(*amount_msat as f64); debug!( federation_id = %federation_id, - amount_received_msat = amount_received_msat, + amount_msat = amount_msat, "Recorded invoice payment metrics" ); } @@ -483,11 +477,10 @@ mod tests { let collector = MetricsCollector::new(); let event = FmcdEvent::PaymentSucceeded { - payment_id: "test-payment".to_string(), + operation_id: "test-payment".to_string(), federation_id: "test-fed".to_string(), + amount_msat: 50000, preimage: "test-preimage".to_string(), - fee_msat: 1000, - correlation_id: Some("test-correlation".to_string()), timestamp: Utc::now(), }; diff --git a/src/operations/payment.rs b/src/operations/payment.rs index 6505ace..927ee39 100644 --- a/src/operations/payment.rs +++ b/src/operations/payment.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use fedimint_core::config::FederationId; use sha2::{Digest, Sha256}; -use tracing::{info_span, instrument, Instrument, Span}; +use tracing::{info_span, instrument, Span}; use crate::events::{EventBus, FmcdEvent}; use crate::observability::correlation::RequestContext; @@ -184,7 +184,7 @@ impl PaymentTracker { /// Mark payment as succeeded and publish event #[instrument(skip(self), fields(payment_id = %self.payment_id))] - pub async fn succeed(&mut self, preimage: String, fee_msat: u64) { + pub async fn succeed(&mut self, preimage: String, amount_msat: u64, fee_msat: u64) { self.state = PaymentState::Succeeded; self.span.record("state", self.state.as_str()); self.span.record("fee_msat", fee_msat); @@ -194,11 +194,10 @@ impl PaymentTracker { self.span.record("duration_ms", duration.num_milliseconds()); let event = FmcdEvent::PaymentSucceeded { - payment_id: self.payment_id.clone(), + operation_id: self.payment_id.clone(), federation_id: self.federation_id.clone(), + amount_msat, preimage, - fee_msat, - correlation_id: self.correlation_id.clone(), timestamp: Utc::now(), }; @@ -296,6 +295,16 @@ impl InvoiceTracker { &self.invoice_id } + /// Get the federation ID + pub fn federation_id(&self) -> &str { + &self.federation_id + } + + /// Get the correlation ID + pub fn correlation_id(&self) -> Option<&String> { + self.correlation_id.as_ref() + } + /// Record invoice creation #[instrument(skip(self), fields(invoice_id = %self.invoice_id))] pub async fn created(&self, amount_msat: u64, invoice: String) { @@ -321,9 +330,9 @@ impl InvoiceTracker { #[instrument(skip(self), fields(invoice_id = %self.invoice_id))] pub async fn paid(&self, amount_received_msat: u64) { let event = FmcdEvent::InvoicePaid { - invoice_id: self.invoice_id.clone(), + operation_id: self.invoice_id.clone(), // Using invoice_id as operation_id federation_id: self.federation_id.clone(), - amount_received_msat, + amount_msat: amount_received_msat, correlation_id: self.correlation_id.clone(), timestamp: Utc::now(), }; @@ -356,6 +365,3 @@ impl InvoiceTracker { } } } - -#[cfg(test)] -mod tests; diff --git a/src/router/handlers/admin/federation_ids.rs b/src/router/handlers/admin/federations.rs similarity index 100% rename from src/router/handlers/admin/federation_ids.rs rename to src/router/handlers/admin/federations.rs diff --git a/src/router/handlers/admin/mod.rs b/src/router/handlers/admin/mod.rs index cfb6a0c..079ceb6 100644 --- a/src/router/handlers/admin/mod.rs +++ b/src/router/handlers/admin/mod.rs @@ -1,12 +1,12 @@ pub mod backup; pub mod config; -pub mod discover_version; -pub mod federation_ids; +pub mod federations; pub mod info; pub mod join; -pub mod list_operations; pub mod module; +pub mod operations; pub mod restore; +pub mod version; use fedimint_client::ClientHandleArc; use fedimint_mint_client::MintClientModule; diff --git a/src/router/handlers/admin/list_operations.rs b/src/router/handlers/admin/operations.rs similarity index 95% rename from src/router/handlers/admin/list_operations.rs rename to src/router/handlers/admin/operations.rs index eb3b534..5d7228b 100644 --- a/src/router/handlers/admin/list_operations.rs +++ b/src/router/handlers/admin/operations.rs @@ -32,7 +32,7 @@ pub struct OperationOutput { pub outcome: Option, } -async fn _list_operations( +async fn _operations( client: ClientHandleArc, req: ListOperationsRequest, ) -> Result { @@ -85,7 +85,7 @@ pub async fn handle_ws(state: AppState, v: Value) -> Result { ) })?; let client = state.get_client(v.federation_id).await?; - let operations = _list_operations(client, v).await?; + let operations = _operations(client, v).await?; let operations_json = json!(operations); Ok(operations_json) } @@ -96,6 +96,6 @@ pub async fn handle_rest( Json(req): Json, ) -> Result, AppError> { let client = state.get_client(req.federation_id).await?; - let operations = _list_operations(client, req).await?; + let operations = _operations(client, req).await?; Ok(Json(operations)) } diff --git a/src/router/handlers/admin/discover_version.rs b/src/router/handlers/admin/version.rs similarity index 79% rename from src/router/handlers/admin/discover_version.rs rename to src/router/handlers/admin/version.rs index 3c682b5..08e84ed 100644 --- a/src/router/handlers/admin/discover_version.rs +++ b/src/router/handlers/admin/version.rs @@ -8,7 +8,7 @@ use crate::error::AppError; use crate::multimint::MultiMint; use crate::state::AppState; -async fn _discover_version(multimint: MultiMint) -> Result { +async fn _version(multimint: MultiMint) -> Result { let mut api_versions = HashMap::new(); for (id, client) in multimint.clients.lock().await.iter() { api_versions.insert( @@ -20,13 +20,13 @@ async fn _discover_version(multimint: MultiMint) -> Result { } pub async fn handle_ws(state: AppState) -> Result { - let version = _discover_version(state.multimint).await?; + let version = _version(state.multimint).await?; let version_json = json!(version); Ok(version_json) } #[axum_macros::debug_handler] pub async fn handle_rest(State(state): State) -> Result, AppError> { - let version = _discover_version(state.multimint).await?; + let version = _version(state.multimint).await?; Ok(Json(version)) } diff --git a/src/router/handlers/ln/await_invoice.rs b/src/router/handlers/ln/await_invoice.rs deleted file mode 100644 index ecbbde6..0000000 --- a/src/router/handlers/ln/await_invoice.rs +++ /dev/null @@ -1,79 +0,0 @@ -use anyhow::anyhow; -use axum::extract::State; -use axum::http::StatusCode; -use axum::Json; -use fedimint_client::ClientHandleArc; -use fedimint_core::config::FederationId; -use fedimint_core::core::OperationId; -use fedimint_ln_client::{LightningClientModule, LnReceiveState}; -use futures_util::StreamExt; -use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; -use tracing::{debug, error, info}; - -use crate::error::AppError; -use crate::state::AppState; - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct AwaitInvoiceRequest { - pub operation_id: OperationId, - pub federation_id: FederationId, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AwaitInvoiceResponse { - pub status: LnReceiveState, -} - -async fn _await_invoice( - client: ClientHandleArc, - req: AwaitInvoiceRequest, -) -> Result { - let lightning_module = &client.get_first_module::()?; - let mut updates = lightning_module - .subscribe_ln_receive(req.operation_id) - .await? - .into_stream(); - info!( - "Created await invoice stream for operation id: {:?}", - req.operation_id - ); - while let Some(update) = updates.next().await { - debug!("Invoice state update received"); - match &update { - LnReceiveState::Claimed => { - return Ok(AwaitInvoiceResponse { status: update }); - } - LnReceiveState::Canceled { reason } => { - error!("Invoice canceled: {}", reason); - return Ok(AwaitInvoiceResponse { status: update }); - } - _ => {} - } - } - - Err(AppError::new( - StatusCode::INTERNAL_SERVER_ERROR, - anyhow!("Unexpected end of stream"), - )) -} - -pub async fn handle_ws(state: AppState, v: Value) -> Result { - let v = serde_json::from_value::(v) - .map_err(|e| AppError::new(StatusCode::BAD_REQUEST, anyhow!("Invalid request: {}", e)))?; - let client = state.get_client(v.federation_id).await?; - let invoice_response = _await_invoice(client, v).await?; - Ok(json!(invoice_response)) -} - -#[axum_macros::debug_handler] -pub async fn handle_rest( - State(state): State, - Json(req): Json, -) -> Result, AppError> { - let client = state.get_client(req.federation_id).await?; - let invoice_response = _await_invoice(client, req).await?; - Ok(Json(invoice_response)) -} diff --git a/src/router/handlers/ln/claim_external_receive_tweaked.rs b/src/router/handlers/ln/claim_external_receive_tweaked.rs deleted file mode 100644 index 700cfc5..0000000 --- a/src/router/handlers/ln/claim_external_receive_tweaked.rs +++ /dev/null @@ -1,90 +0,0 @@ -use anyhow::anyhow; -use axum::extract::State; -use axum::http::StatusCode; -use axum::Json; -use fedimint_client::ClientHandleArc; -use fedimint_core::config::FederationId; -use fedimint_core::secp256k1::{Keypair, Secp256k1, SecretKey}; -use fedimint_ln_client::{LightningClientModule, LnReceiveState}; -use futures_util::StreamExt; -use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; -use tracing::{debug, error, info}; - -use crate::error::AppError; -use crate::state::AppState; - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ClaimExternalReceiveTweakedRequest { - pub tweaks: Vec, - pub private_key: SecretKey, - pub federation_id: FederationId, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct ClaimExternalReceiveTweakedResponse { - pub status: LnReceiveState, -} - -#[allow(deprecated)] // TODO: Migrate to recurring payment functionality in future update -async fn _await_claim_external_receive_tweaked( - client: ClientHandleArc, - req: ClaimExternalReceiveTweakedRequest, -) -> Result { - let secp = Secp256k1::new(); - let key_pair = Keypair::from_secret_key(&secp, &req.private_key); - let lightning_module = &client.get_first_module::()?; - let operation_id = lightning_module - .scan_receive_for_user_tweaked(key_pair, req.tweaks, ()) - .await; - - for operation_id in operation_id { - let mut updates = lightning_module - .subscribe_ln_claim(operation_id) - .await? - .into_stream(); - info!( - "Created claim external receive tweaked stream for operation id: {:?}", - operation_id - ); - while let Some(update) = updates.next().await { - debug!("External receive state update received"); - match &update { - LnReceiveState::Claimed => { - return Ok(ClaimExternalReceiveTweakedResponse { status: update }); - } - LnReceiveState::Canceled { reason } => { - error!("Claim canceled: {}", reason); - return Ok(ClaimExternalReceiveTweakedResponse { status: update }); - } - _ => {} - } - } - } - - Err(AppError::new( - StatusCode::INTERNAL_SERVER_ERROR, - anyhow!("Unexpected end of stream"), - )) -} - -pub async fn handle_ws(state: AppState, v: Value) -> Result { - let v = serde_json::from_value::(v) - .map_err(|e| AppError::new(StatusCode::BAD_REQUEST, anyhow!("Invalid request: {}", e)))?; - let client = state.get_client(v.federation_id).await?; - let invoice = _await_claim_external_receive_tweaked(client, v).await?; - let invoice_json = json!(invoice); - Ok(invoice_json) -} - -#[axum_macros::debug_handler] -pub async fn handle_rest( - State(state): State, - Json(req): Json, -) -> Result, AppError> { - let client = state.get_client(req.federation_id).await?; - let invoice_response = _await_claim_external_receive_tweaked(client, req).await?; - Ok(Json(invoice_response)) -} diff --git a/src/router/handlers/ln/list_gateways.rs b/src/router/handlers/ln/gateways.rs similarity index 91% rename from src/router/handlers/ln/list_gateways.rs rename to src/router/handlers/ln/gateways.rs index 21cf5a8..e6fc1fc 100644 --- a/src/router/handlers/ln/list_gateways.rs +++ b/src/router/handlers/ln/gateways.rs @@ -17,7 +17,7 @@ pub struct ListGatewaysRequest { pub federation_id: FederationId, } -async fn _list_gateways(client: ClientHandleArc) -> Result { +async fn _gateways(client: ClientHandleArc) -> Result { let lightning_module = client.get_first_module::()?; let gateways = lightning_module.list_gateways().await; if gateways.is_empty() { @@ -43,7 +43,7 @@ pub async fn handle_ws(state: AppState, v: Value) -> Result { let v = serde_json::from_value::(v) .map_err(|e| AppError::new(StatusCode::BAD_REQUEST, anyhow!("Invalid request: {}", e)))?; let client = state.get_client(v.federation_id).await?; - let gateways = _list_gateways(client).await?; + let gateways = _gateways(client).await?; let gateways_json = json!(gateways); Ok(gateways_json) } @@ -54,6 +54,6 @@ pub async fn handle_rest( Json(req): Json, ) -> Result, AppError> { let client = state.get_client(req.federation_id).await?; - let gateways = _list_gateways(client).await?; + let gateways = _gateways(client).await?; Ok(Json(gateways)) } diff --git a/src/router/handlers/ln/invoice.rs b/src/router/handlers/ln/invoice.rs index 520013c..cdee42a 100644 --- a/src/router/handlers/ln/invoice.rs +++ b/src/router/handlers/ln/invoice.rs @@ -1,23 +1,29 @@ +use std::time::Duration; + use anyhow::anyhow; use axum::extract::{Extension, State}; use axum::http::StatusCode; use axum::Json; +use chrono::{DateTime, Utc}; use fedimint_client::ClientHandleArc; use fedimint_core::config::FederationId; use fedimint_core::core::OperationId; use fedimint_core::secp256k1::PublicKey; use fedimint_core::Amount; -use fedimint_ln_client::LightningClientModule; +use fedimint_ln_client::{LightningClientModule, LnReceiveState}; use fedimint_ln_common::lightning_invoice::{Bolt11InvoiceDescription, Description}; +use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use tracing::{error, info, instrument}; +use tracing::{error, info, instrument, warn}; +use uuid::Uuid; use crate::error::AppError; use crate::observability::correlation::RequestContext; use crate::operations::payment::InvoiceTracker; use crate::state::AppState; +/// Invoice creation request with essential fields #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct LnInvoiceRequest { @@ -26,14 +32,72 @@ pub struct LnInvoiceRequest { pub expiry_time: Option, pub gateway_id: PublicKey, pub federation_id: FederationId, - pub extra_meta: Option, + /// Optional metadata to store with the invoice (e.g., order ID, customer + /// info) + pub metadata: Option, } -#[derive(Debug, Serialize)] +/// Invoice response with essential information +#[derive(Debug, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct LnInvoiceResponse { + /// Unique invoice identifier for tracking + pub invoice_id: String, + /// Fedimint operation ID pub operation_id: OperationId, + /// BOLT11 invoice string pub invoice: String, + /// Current invoice status + pub status: InvoiceStatus, + /// Settlement information (if available) + pub settlement: Option, + /// Invoice creation timestamp + pub created_at: DateTime, + /// Invoice expiry timestamp + pub expires_at: Option>, + /// Optional metadata associated with the invoice + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, +} + +/// Unified invoice status enum +#[derive(Debug, Serialize, Clone)] +#[serde(rename_all = "lowercase")] +pub enum InvoiceStatus { + Created, + Pending, + Claimed { + amount_received_msat: u64, + settled_at: DateTime, + }, + Expired { + expired_at: DateTime, + }, + Canceled { + reason: String, + canceled_at: DateTime, + }, +} + +/// Settlement information structure +#[derive(Debug, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct SettlementInfo { + pub amount_received_msat: u64, + pub settled_at: DateTime, + pub preimage: Option, + pub gateway_fee_msat: Option, +} + +/// Invoice status update for streaming +#[derive(Debug, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct InvoiceStatusUpdate { + pub invoice_id: String, + pub operation_id: OperationId, + pub status: InvoiceStatus, + pub settlement: Option, + pub updated_at: DateTime, } #[instrument( @@ -46,7 +110,7 @@ pub struct LnInvoiceResponse { invoice_id = tracing::field::Empty, ) )] -async fn _invoice( +async fn _create_invoice( state: &AppState, client: ClientHandleArc, req: LnInvoiceRequest, @@ -54,7 +118,20 @@ async fn _invoice( ) -> Result { let span = tracing::Span::current(); - let lightning_module = client.get_first_module::()?; + let lightning_module = client + .get_first_module::() + .map_err(|e| { + error!( + federation_id = %req.federation_id, + error = ?e, + "Failed to get Lightning module from fedimint client" + ); + AppError::new( + StatusCode::INTERNAL_SERVER_ERROR, + anyhow!("Failed to get Lightning module: {}", e), + ) + })?; + let gateway = lightning_module .select_gateway(&req.gateway_id) .await @@ -62,11 +139,11 @@ async fn _invoice( error!( gateway_id = %req.gateway_id, federation_id = %req.federation_id, - "Failed to select gateway" + "Failed to select gateway - gateway may be offline or not registered" ); AppError::new( - StatusCode::INTERNAL_SERVER_ERROR, - anyhow!("Failed to select gateway"), + StatusCode::BAD_REQUEST, + anyhow!("Failed to select gateway with ID {}. Gateway may be offline or not registered with this federation.", req.gateway_id), ) })?; @@ -74,57 +151,361 @@ async fn _invoice( gateway_id = %gateway.gateway_id, federation_id = %req.federation_id, amount_msat = %req.amount_msat.msats, - "Creating invoice with selected gateway" + "Creating invoice with automatic monitoring" ); + let created_at = Utc::now(); + let expires_at = req + .expiry_time + .map(|expiry| created_at + chrono::Duration::seconds(expiry as i64)); + + // Use provided metadata or default to null + let metadata = req.metadata.clone().unwrap_or(serde_json::Value::Null); + + // Create fedimint invoice using native client let (operation_id, invoice, _) = lightning_module .create_bolt11_invoice( req.amount_msat, - Bolt11InvoiceDescription::Direct(Description::new(req.description)?), + Bolt11InvoiceDescription::Direct(Description::new(req.description.clone()).map_err( + |e| { + error!( + federation_id = %req.federation_id, + description = %req.description, + error = ?e, + "Invalid invoice description" + ); + AppError::new( + StatusCode::BAD_REQUEST, + anyhow!("Invalid invoice description: {}", e), + ) + }, + )?), req.expiry_time, - req.extra_meta.unwrap_or_default(), + metadata, Some(gateway), ) - .await?; + .await + .map_err(|e| { + error!( + federation_id = %req.federation_id, + amount_msat = %req.amount_msat.msats, + error = ?e, + "Failed to create fedimint invoice" + ); + AppError::new( + StatusCode::INTERNAL_SERVER_ERROR, + anyhow!("Failed to create invoice: {}", e), + ) + })?; - // Create invoice tracker using operation_id as invoice_id + // Generate unique invoice ID for tracking (no longer stored) + let invoice_id = format!("inv_{}", Uuid::new_v4().simple()); + + // Create invoice tracker for observability let invoice_tracker = InvoiceTracker::new( - format!("{:?}", operation_id), + invoice_id.clone(), req.federation_id, state.event_bus.clone(), - Some(context), + Some(context.clone()), ); - // Record the operation and invoice IDs in the span + // Record telemetry span.record("operation_id", &format!("{:?}", operation_id)); - span.record("invoice_id", invoice_tracker.invoice_id()); + span.record("invoice_id", &invoice_id); // Track invoice creation invoice_tracker .created(req.amount_msat.msats, invoice.to_string()) .await; + let response = LnInvoiceResponse { + invoice_id: invoice_id.clone(), + operation_id, + invoice: invoice.to_string(), + status: InvoiceStatus::Created, + settlement: None, + created_at, + expires_at, + metadata: req.metadata.clone(), + }; + + // Register with payment lifecycle manager for comprehensive tracking and ecash + // claiming + if let Some(ref payment_lifecycle_manager) = state.payment_lifecycle_manager { + if let Err(e) = payment_lifecycle_manager + .track_lightning_receive( + operation_id, + req.federation_id, + req.amount_msat, + req.metadata.clone(), + Some(context.correlation_id.clone()), + ) + .await + { + error!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + error = ?e, + "Failed to register invoice with payment lifecycle manager" + ); + } else { + info!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + "Invoice registered with payment lifecycle manager for automatic ecash claiming" + ); + } + } + + // Start automatic monitoring for all invoices (legacy compatibility - will be + // removed) + let client_clone = client.clone(); + let timeout = Duration::from_secs(24 * 60 * 60); // 24 hours max timeout + let invoice_tracker_clone = invoice_tracker; + let invoice_id_clone = invoice_id.clone(); + let amount_msat = req.amount_msat.msats; + + tokio::spawn(async move { + if let Err(e) = monitor_invoice_settlement_automatic( + client_clone, + operation_id, + invoice_id_clone.clone(), + amount_msat, + timeout, + invoice_tracker_clone, + ) + .await + { + error!( + operation_id = ?operation_id, + invoice_id = %invoice_id_clone, + error = ?e, + "Failed to automatically monitor invoice settlement" + ); + } + }); + info!( operation_id = ?operation_id, - invoice_id = %invoice_tracker.invoice_id(), + invoice_id = %invoice_id, federation_id = %req.federation_id, amount_msat = %req.amount_msat.msats, - "Invoice created successfully" + "Invoice created successfully with automatic monitoring" ); - Ok(LnInvoiceResponse { - operation_id, - invoice: invoice.to_string(), - }) + Ok(response) +} + +/// Automatic settlement monitoring using fedimint's subscribe_ln_receive +/// Monitors all invoices automatically until settled, expired, or timeout +async fn monitor_invoice_settlement_automatic( + client: ClientHandleArc, + operation_id: OperationId, + invoice_id: String, + amount_msat: u64, + timeout: Duration, + invoice_tracker: InvoiceTracker, +) -> anyhow::Result<()> { + let lightning_module = client.get_first_module::()?; + + // Use fedimint's native subscribe_ln_receive for automatic monitoring + let mut updates = lightning_module + .subscribe_ln_receive(operation_id) + .await? + .into_stream(); + + info!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + timeout_secs = timeout.as_secs(), + "Started automatic invoice settlement monitoring" + ); + + let timeout_future = tokio::time::sleep(timeout); + tokio::pin!(timeout_future); + + loop { + tokio::select! { + update = updates.next() => { + match update { + Some(LnReceiveState::Claimed) => { + info!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + "Invoice settled - publishing event to event bus" + ); + + if let Err(e) = handle_settlement_success( + &invoice_tracker, + operation_id, + &invoice_id, + amount_msat, + ).await { + error!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + error = ?e, + "Failed to handle settlement success" + ); + } + break; + } + Some(LnReceiveState::Canceled { reason }) => { + warn!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + reason = %reason, + "Invoice canceled - publishing event to event bus" + ); + + if let Err(e) = handle_settlement_cancellation( + &invoice_tracker, + operation_id, + &invoice_id, + reason.to_string(), + ).await { + error!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + error = ?e, + "Failed to handle settlement cancellation" + ); + } + break; + } + Some(state) => { + info!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + state = ?state, + "Invoice status update - continuing automatic monitoring" + ); + continue; + } + None => { + warn!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + "Automatic monitoring stream ended unexpectedly" + ); + break; + } + } + } + _ = &mut timeout_future => { + warn!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + timeout_secs = timeout.as_secs(), + "Invoice settlement monitoring timed out" + ); + + if let Err(e) = handle_settlement_timeout( + operation_id, + &invoice_id, + ).await { + error!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + error = ?e, + "Failed to handle settlement timeout" + ); + } + break; + } + } + } + + info!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + "Automatic invoice settlement monitoring completed" + ); + + Ok(()) +} + +async fn handle_settlement_success( + invoice_tracker: &InvoiceTracker, + operation_id: OperationId, + invoice_id: &str, + amount_msat: u64, +) -> anyhow::Result<()> { + // NOTE: Fedimint's current API doesn't expose the actual settlement amount + // from the LnReceiveState::Claimed state. The actual amount received might + // differ from the invoice amount due to routing fees or other factors. + // For now, we use the original invoice amount as a reasonable approximation. + // This ensures webhooks and events receive meaningful data rather than 0. + let amount_received_msat = amount_msat; + + // Publish invoice paid event to event bus + invoice_tracker.paid(amount_received_msat).await; + + info!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + amount_received_msat = amount_received_msat, + "Invoice settlement event published to event bus" + ); + + Ok(()) +} + +async fn handle_settlement_cancellation( + invoice_tracker: &InvoiceTracker, + operation_id: OperationId, + invoice_id: &str, + reason: String, +) -> anyhow::Result<()> { + // Publish invoice expiration/cancellation event to event bus + invoice_tracker.expired().await; + + info!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + reason = %reason, + "Invoice cancellation event published to event bus" + ); + + Ok(()) +} + +async fn handle_settlement_timeout( + operation_id: OperationId, + invoice_id: &str, +) -> anyhow::Result<()> { + info!( + operation_id = ?operation_id, + invoice_id = %invoice_id, + "Invoice monitoring timeout - invoice may still be active in fedimint" + ); + + // Note: We don't publish an event for timeout as the invoice may still be valid + // The timeout is just for our monitoring, not the actual invoice expiry + + Ok(()) } pub async fn handle_ws(state: AppState, v: Value) -> Result { let v = serde_json::from_value::(v) .map_err(|e| AppError::new(StatusCode::BAD_REQUEST, anyhow!("Invalid request: {}", e)))?; let client = state.get_client(v.federation_id).await?; - // TODO: WebSocket requests should get RequestContext from middleware + // Create a new context for backward compatibility (when called without context) let context = RequestContext::new(None); - let invoice = _invoice(&state, client, v, context).await?; + let invoice = _create_invoice(&state, client, v, context).await?; + let invoice_json = json!(invoice); + Ok(invoice_json) +} + +pub async fn handle_ws_with_context( + state: AppState, + v: Value, + context: RequestContext, +) -> Result { + let v = serde_json::from_value::(v) + .map_err(|e| AppError::new(StatusCode::BAD_REQUEST, anyhow!("Invalid request: {}", e)))?; + let client = state.get_client(v.federation_id).await?; + let invoice = _create_invoice(&state, client, v, context).await?; let invoice_json = json!(invoice); Ok(invoice_json) } @@ -136,6 +517,6 @@ pub async fn handle_rest( Json(req): Json, ) -> Result, AppError> { let client = state.get_client(req.federation_id).await?; - let invoice = _invoice(&state, client, req, context).await?; + let invoice = _create_invoice(&state, client, req, context).await?; Ok(Json(invoice)) } diff --git a/src/router/handlers/ln/invoice_external_pubkey_tweaked.rs b/src/router/handlers/ln/invoice_external_pubkey_tweaked.rs deleted file mode 100644 index 3acbd20..0000000 --- a/src/router/handlers/ln/invoice_external_pubkey_tweaked.rs +++ /dev/null @@ -1,89 +0,0 @@ -use anyhow::anyhow; -use axum::extract::State; -use axum::http::StatusCode; -use axum::Json; -use fedimint_client::ClientHandleArc; -use fedimint_core::config::FederationId; -use fedimint_core::core::OperationId; -use fedimint_core::secp256k1::PublicKey; -use fedimint_core::Amount; -use fedimint_ln_client::LightningClientModule; -use fedimint_ln_common::lightning_invoice::{Bolt11InvoiceDescription, Description}; -use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; -use tracing::error; - -use crate::error::AppError; -use crate::state::AppState; - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct LnInvoiceExternalPubkeyTweakedRequest { - pub amount_msat: Amount, - pub description: String, - pub expiry_time: Option, - pub external_pubkey: PublicKey, - pub tweak: u64, - pub gateway_id: PublicKey, - pub federation_id: FederationId, - pub extra_meta: Option, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct LnInvoiceExternalPubkeyTweakedResponse { - pub operation_id: OperationId, - pub invoice: String, -} - -async fn _invoice( - client: ClientHandleArc, - req: LnInvoiceExternalPubkeyTweakedRequest, -) -> Result { - let lightning_module = client.get_first_module::()?; - let gateway = lightning_module - .select_gateway(&req.gateway_id) - .await - .ok_or_else(|| { - error!("Failed to select gateway: {}", req.gateway_id); - AppError::new( - StatusCode::INTERNAL_SERVER_ERROR, - anyhow!("Failed to select gateway"), - ) - })?; - - let (operation_id, invoice, _) = lightning_module - .create_bolt11_invoice_for_user_tweaked( - req.amount_msat, - Bolt11InvoiceDescription::Direct(Description::new(req.description)?), - req.expiry_time, - req.external_pubkey, - req.tweak, - req.extra_meta.unwrap_or_default(), - Some(gateway), - ) - .await?; - Ok(LnInvoiceExternalPubkeyTweakedResponse { - operation_id, - invoice: invoice.to_string(), - }) -} - -pub async fn handle_ws(state: AppState, v: Value) -> Result { - let v = serde_json::from_value::(v) - .map_err(|e| AppError::new(StatusCode::BAD_REQUEST, anyhow!("Invalid request: {}", e)))?; - let client = state.get_client(v.federation_id).await?; - let invoice = _invoice(client, v).await?; - let invoice_json = json!(invoice); - Ok(invoice_json) -} - -#[axum_macros::debug_handler] -pub async fn handle_rest( - State(state): State, - Json(req): Json, -) -> Result, AppError> { - let client = state.get_client(req.federation_id).await?; - let invoice = _invoice(client, req).await?; - Ok(Json(invoice)) -} diff --git a/src/router/handlers/ln/mod.rs b/src/router/handlers/ln/mod.rs index d10532b..9b085dc 100644 --- a/src/router/handlers/ln/mod.rs +++ b/src/router/handlers/ln/mod.rs @@ -11,12 +11,11 @@ use tracing::{debug, info}; use self::pay::{LnPayRequest, LnPayResponse}; use crate::observability::sanitize_invoice; -pub mod await_invoice; -pub mod claim_external_receive_tweaked; +pub mod gateways; pub mod invoice; -pub mod invoice_external_pubkey_tweaked; -pub mod list_gateways; pub mod pay; +pub mod status; +pub mod stream; pub async fn get_invoice(req: &LnPayRequest) -> anyhow::Result { let info = req.payment_info.trim(); diff --git a/src/router/handlers/ln/pay.rs b/src/router/handlers/ln/pay.rs index cbefa8a..4c49f74 100644 --- a/src/router/handlers/ln/pay.rs +++ b/src/router/handlers/ln/pay.rs @@ -1,6 +1,4 @@ -use anyhow::anyhow; use axum::extract::{Extension, State}; -use axum::http::StatusCode; use axum::Json; use fedimint_client::ClientHandleArc; use fedimint_core::config::FederationId; @@ -10,7 +8,7 @@ use fedimint_core::Amount; use fedimint_ln_client::{LightningClientModule, OutgoingLightningPayment, PayType}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use tracing::{error, info, info_span, instrument, Instrument}; +use tracing::{error, info, instrument}; use crate::error::{AppError, ErrorCategory}; use crate::observability::correlation::RequestContext; @@ -240,6 +238,30 @@ async fn _pay( "Payment initiated successfully" ); + // Register with payment lifecycle manager for comprehensive tracking + if let Some(ref payment_lifecycle_manager) = state.payment_lifecycle_manager { + if let Err(e) = payment_lifecycle_manager + .track_lightning_pay( + operation_id, + req.federation_id, + req.amount_msat.unwrap_or(Amount::ZERO), + None, + ) + .await + { + error!( + operation_id = ?operation_id, + error = ?e, + "Failed to register payment with lifecycle manager" + ); + } else { + info!( + operation_id = ?operation_id, + "Payment registered with lifecycle manager for monitoring" + ); + } + } + // Wait for payment completion let result = wait_for_ln_payment(&client, payment_type, contract_id.to_string(), false) .await? @@ -277,9 +299,10 @@ async fn _pay( AppError::gateway_error("Payment failed or timed out").with_context(context.clone()) })?; - // Track payment success + // Track payment success - use actual payment amount + let payment_amount = req.amount_msat.unwrap_or(fee).msats; payment_tracker - .succeed(result.preimage.clone(), fee.msats) + .succeed(result.preimage.clone(), payment_amount, fee.msats) .await; span.record("payment_status", "completed"); diff --git a/src/router/handlers/ln/status.rs b/src/router/handlers/ln/status.rs new file mode 100644 index 0000000..f84f77f --- /dev/null +++ b/src/router/handlers/ln/status.rs @@ -0,0 +1,282 @@ +use anyhow::anyhow; +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::Json; +use chrono::Utc; +use fedimint_client::ClientHandleArc; +use fedimint_core::config::FederationId; +use fedimint_core::core::OperationId; +use fedimint_ln_client::{LightningClientModule, LnReceiveState}; +use futures_util::StreamExt; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tracing::{error, info, instrument, warn}; + +use crate::error::AppError; +use crate::router::handlers::ln::invoice::{InvoiceStatus, SettlementInfo}; +use crate::state::AppState; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StatusQuery { + pub federation_id: FederationId, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct StatusResponse { + pub invoice_id: Option, + pub operation_id: OperationId, + pub status: InvoiceStatus, + pub settlement: Option, + pub last_updated: chrono::DateTime, +} + +/// Unified status endpoint that supports both invoice_id and operation_id +/// lookup +#[instrument( + skip(client), + fields( + operation_id = ?operation_id, + federation_id = %query.federation_id, + status = tracing::field::Empty, + ) +)] +async fn _get_status( + client: ClientHandleArc, + operation_id: OperationId, + query: StatusQuery, +) -> Result { + let span = tracing::Span::current(); + let lightning_module = client.get_first_module::()?; + + // Try to get the invoice amount from operation metadata + let invoice_amount_msat = client + .operation_log() + .get_operation(operation_id) + .await + .and_then(|op| { + // Extract amount from operation metadata if available + op.meta::() + .get("amount") + .and_then(|v| v.as_u64()) + }) + .unwrap_or(0); // Default to 0 if not found + + // Use fedimint's native subscribe_ln_receive to get current state + let current_state = match lightning_module.subscribe_ln_receive(operation_id).await { + Ok(stream) => { + // Get the current state from the stream - this is fedimint's native approach + let mut stream = stream.into_stream(); + match stream.next().await { + Some(state) => state, + None => { + // If no state is available, try to determine if operation exists + return Err(AppError::new( + StatusCode::NOT_FOUND, + anyhow!("Invoice operation not found or monitoring stream unavailable"), + )); + } + } + } + Err(e) => { + error!( + operation_id = ?operation_id, + federation_id = %query.federation_id, + error = ?e, + "Failed to get invoice status from fedimint native client" + ); + return Err(AppError::new( + StatusCode::INTERNAL_SERVER_ERROR, + anyhow!("Failed to get invoice status: {}", e), + )); + } + }; + + let last_updated = Utc::now(); + let (status, settlement) = match current_state { + LnReceiveState::Created => (InvoiceStatus::Created, None), + LnReceiveState::WaitingForPayment { .. } => (InvoiceStatus::Pending, None), + LnReceiveState::Claimed => { + // NOTE: Fedimint's LnReceiveState::Claimed doesn't include settlement details + // The actual amount received might differ from invoice amount due to fees. + // Using the invoice amount from operation metadata as a reasonable + // approximation. This ensures API consumers receive meaningful data + // rather than 0. + let settlement_info = SettlementInfo { + amount_received_msat: if invoice_amount_msat > 0 { + invoice_amount_msat + } else { + // Log warning if amount is not available + warn!( + operation_id = ?operation_id, + "Invoice amount not found in operation metadata, using 0" + ); + 0 + }, + settled_at: last_updated, // Using current time as approximation + preimage: None, // Not exposed in current API + gateway_fee_msat: None, // Not exposed in current API + }; + ( + InvoiceStatus::Claimed { + amount_received_msat: settlement_info.amount_received_msat, + settled_at: settlement_info.settled_at, + }, + Some(settlement_info), + ) + } + LnReceiveState::Canceled { reason } => ( + InvoiceStatus::Canceled { + reason: reason.to_string(), + canceled_at: last_updated, + }, + None, + ), + LnReceiveState::Funded => (InvoiceStatus::Pending, None), + LnReceiveState::AwaitingFunds => (InvoiceStatus::Pending, None), + // Note: All LnReceiveState variants are now explicitly handled + // If new variants are added to fedimint, compilation will fail here + }; + + span.record("status", &format!("{:?}", status)); + + info!( + operation_id = ?operation_id, + federation_id = %query.federation_id, + status = ?status, + "Retrieved invoice status using fedimint native behavior" + ); + + Ok(StatusResponse { + invoice_id: None, // Invoice ID is provided separately when looked up by invoice_id + operation_id, + status, + settlement, + last_updated, + }) +} + +pub async fn handle_ws(state: AppState, v: Value) -> Result { + #[derive(Deserialize)] + struct WSRequest { + operation_id: OperationId, + federation_id: FederationId, + } + + let req = serde_json::from_value::(v) + .map_err(|e| AppError::new(StatusCode::BAD_REQUEST, anyhow!("Invalid request: {}", e)))?; + + let client = state.get_client(req.federation_id).await?; + let query = StatusQuery { + federation_id: req.federation_id, + }; + let status = _get_status(client, req.operation_id, query).await?; + Ok(json!(status)) +} + +/// REST endpoint for status query by operation ID +#[axum_macros::debug_handler] +pub async fn handle_rest_by_operation_id( + State(state): State, + Path(operation_id_str): Path, + Query(query): Query, +) -> Result, AppError> { + let operation_id = operation_id_str.parse::().map_err(|e| { + AppError::new( + StatusCode::BAD_REQUEST, + anyhow!("Invalid operation ID: {}", e), + ) + })?; + + let client = state.get_client(query.federation_id).await?; + let status = _get_status(client, operation_id, query).await?; + Ok(Json(status)) +} + +/// Bulk status query for multiple invoices +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BulkStatusRequest { + pub federation_id: FederationId, + pub operation_ids: Vec, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BulkStatusResponse { + pub statuses: Vec, + pub errors: Vec, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BulkStatusItem { + pub operation_id: OperationId, + pub status: InvoiceStatus, + pub settlement: Option, + pub last_updated: chrono::DateTime, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BulkStatusError { + pub operation_id: OperationId, + pub error: String, +} + +/// Bulk status endpoint for querying multiple invoices efficiently +#[axum_macros::debug_handler] +pub async fn handle_bulk_status( + State(state): State, + Json(req): Json, +) -> Result, AppError> { + let client = state.get_client(req.federation_id).await?; + let mut statuses = Vec::new(); + let mut errors = Vec::new(); + + info!( + federation_id = %req.federation_id, + operation_count = req.operation_ids.len(), + "Processing bulk status request using fedimint native behavior" + ); + + // Process each operation ID + for operation_id in req.operation_ids { + let query = StatusQuery { + federation_id: req.federation_id, + }; + + match _get_status(client.clone(), operation_id, query).await { + Ok(status_response) => { + statuses.push(BulkStatusItem { + operation_id, + status: status_response.status, + settlement: status_response.settlement, + last_updated: status_response.last_updated, + }); + } + Err(e) => { + error!( + operation_id = ?operation_id, + federation_id = %req.federation_id, + error = ?e, + "Failed to get status for operation in bulk request" + ); + errors.push(BulkStatusError { + operation_id, + error: e.to_string(), + }); + } + } + } + + info!( + federation_id = %req.federation_id, + successful_count = statuses.len(), + error_count = errors.len(), + "Completed bulk status request" + ); + + Ok(Json(BulkStatusResponse { statuses, errors })) +} diff --git a/src/router/handlers/ln/stream.rs b/src/router/handlers/ln/stream.rs new file mode 100644 index 0000000..5cf28f7 --- /dev/null +++ b/src/router/handlers/ln/stream.rs @@ -0,0 +1,355 @@ +use std::convert::Infallible; +use std::time::Duration; + +use anyhow::anyhow; +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::response::sse::{Event, KeepAlive, Sse}; +use axum::response::{IntoResponse, Response}; +use chrono::Utc; +use fedimint_client::ClientHandleArc; +use fedimint_core::config::FederationId; +use fedimint_core::core::OperationId; +use fedimint_ln_client::{LightningClientModule, LnReceiveState}; +use futures_util::stream::Stream; +use serde::Deserialize; +use tokio_stream::wrappers::IntervalStream; +use tracing::{error, info, warn}; + +use crate::error::AppError; +use crate::router::handlers::ln::invoice::{InvoiceStatus, InvoiceStatusUpdate, SettlementInfo}; +use crate::state::AppState; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StreamQuery { + pub federation_id: FederationId, + /// Heartbeat interval in seconds (default: 30) + pub heartbeat_interval: Option, + /// Stream timeout in seconds (default: 600) + pub timeout_seconds: Option, +} + +/// Create a unified invoice status stream using fedimint's native +/// subscribe_ln_receive +async fn create_unified_invoice_stream( + client: ClientHandleArc, + operation_id: OperationId, + heartbeat_interval: Duration, + timeout: Duration, +) -> impl Stream> { + let lightning_module = match client.get_first_module::() { + Ok(module) => module, + Err(e) => { + error!( + operation_id = ?operation_id, + error = ?e, + "Failed to get lightning module for unified streaming" + ); + return tokio_stream::empty().boxed(); + } + }; + + // Try to get the invoice amount from operation metadata + let invoice_amount_msat = client + .operation_log() + .get_operation(operation_id) + .await + .and_then(|op| { + // Extract amount from operation metadata if available + op.meta::() + .get("amount") + .and_then(|v| v.as_u64()) + }) + .unwrap_or(0); // Default to 0 if not found + + // Use fedimint's native subscribe_ln_receive for real-time monitoring + let updates_stream = match lightning_module.subscribe_ln_receive(operation_id).await { + Ok(stream) => stream.into_stream(), + Err(e) => { + error!( + operation_id = ?operation_id, + error = ?e, + "Failed to subscribe to fedimint native invoice updates" + ); + return tokio_stream::empty().boxed(); + } + }; + + info!( + operation_id = ?operation_id, + timeout_secs = timeout.as_secs(), + heartbeat_interval_secs = heartbeat_interval.as_secs(), + "Started unified invoice stream using fedimint native behavior" + ); + + // Create heartbeat stream for connection keepalive + use futures_util::stream::{self, StreamExt}; + let heartbeat_stream = IntervalStream::new(tokio::time::interval(heartbeat_interval)) + .map(|_| Ok::<_, Infallible>(Event::default().event("heartbeat").data("ping"))); + + // Convert fedimint LnReceiveState updates to unified SSE events + let invoice_updates_stream = updates_stream.map(move |ln_state| { + let updated_at = Utc::now(); + let (status, settlement_info) = + fedimint_state_to_unified_status(ln_state, updated_at, invoice_amount_msat); + + let update = InvoiceStatusUpdate { + invoice_id: format!("inv_{:?}", operation_id), // Generate consistent invoice_id + operation_id, + status, + settlement: settlement_info, + updated_at, + }; + + match serde_json::to_string(&update) { + Ok(json_data) => { + info!( + operation_id = ?operation_id, + status = ?update.status, + "Sending unified invoice status update via native fedimint stream" + ); + Ok::<_, Infallible>(Event::default().event("invoice_update").data(json_data)) + } + Err(e) => { + error!( + operation_id = ?operation_id, + error = ?e, + "Failed to serialize unified invoice update" + ); + Ok::<_, Infallible>( + Event::default() + .event("error") + .data(format!("Serialization error: {}", e)), + ) + } + } + }); + + // Create timeout stream using futures_util for consistency + let timeout_stream = stream::once(async move { + tokio::time::sleep(timeout).await; + warn!( + operation_id = ?operation_id, + timeout_secs = timeout.as_secs(), + "Unified invoice stream timed out" + ); + Ok::<_, Infallible>(Event::default().event("timeout").data(format!( + "{{\"message\":\"Stream timed out after {} seconds\",\"timeout_seconds\":{}}}", + timeout.as_secs(), + timeout.as_secs() + ))) + }); + + // Select from all streams concurrently + let combined_stream = stream::select_all(vec![ + heartbeat_stream.boxed(), + invoice_updates_stream.boxed(), + timeout_stream.boxed(), + ]); + + Box::pin(combined_stream) +} + +/// Convert fedimint LnReceiveState to unified status representation +fn fedimint_state_to_unified_status( + ln_state: LnReceiveState, + updated_at: chrono::DateTime, + invoice_amount_msat: u64, +) -> (InvoiceStatus, Option) { + match ln_state { + LnReceiveState::Created => (InvoiceStatus::Created, None), + LnReceiveState::WaitingForPayment { .. } => (InvoiceStatus::Pending, None), + LnReceiveState::Claimed => { + // NOTE: Fedimint's LnReceiveState::Claimed doesn't include settlement details + // The actual amount received might differ from invoice amount due to fees. + // Using the invoice amount from operation metadata as a reasonable + // approximation. This ensures real-time monitoring receives + // meaningful data rather than 0. + let settlement_info = SettlementInfo { + amount_received_msat: if invoice_amount_msat > 0 { + invoice_amount_msat + } else { + // Log warning if amount is not available + warn!("Invoice amount not found in operation metadata for stream, using 0"); + 0 + }, + settled_at: updated_at, // Using update time as approximation + preimage: None, // Not exposed in current API + gateway_fee_msat: None, // Not exposed in current API + }; + ( + InvoiceStatus::Claimed { + amount_received_msat: settlement_info.amount_received_msat, + settled_at: settlement_info.settled_at, + }, + Some(settlement_info), + ) + } + LnReceiveState::Canceled { reason } => ( + InvoiceStatus::Canceled { + reason: reason.to_string(), + canceled_at: updated_at, + }, + None, + ), + LnReceiveState::Funded => (InvoiceStatus::Pending, None), + LnReceiveState::AwaitingFunds => (InvoiceStatus::Pending, None), + // Note: All LnReceiveState variants are now explicitly handled + // If new variants are added to fedimint, compilation will fail here + } +} + +/// Unified invoice stream endpoint - supports both operation_id and invoice_id +#[axum_macros::debug_handler] +pub async fn handle_operation_stream( + State(state): State, + Path(operation_id_str): Path, + Query(query): Query, +) -> Result { + let operation_id = operation_id_str.parse::().map_err(|e| { + AppError::new( + StatusCode::BAD_REQUEST, + anyhow!("Invalid operation ID: {}", e), + ) + })?; + + let client = state.get_client(query.federation_id).await?; + let heartbeat_interval = Duration::from_secs(query.heartbeat_interval.unwrap_or(30)); + let timeout = Duration::from_secs(query.timeout_seconds.unwrap_or(600)); + + info!( + operation_id = ?operation_id, + federation_id = %query.federation_id, + heartbeat_interval_secs = heartbeat_interval.as_secs(), + timeout_secs = timeout.as_secs(), + "Starting unified invoice stream for operation" + ); + + let stream = + create_unified_invoice_stream(client, operation_id, heartbeat_interval, timeout).await; + + let sse = Sse::new(stream).keep_alive( + KeepAlive::new() + .interval(heartbeat_interval) + .text("keep-alive"), + ); + + Ok(sse.into_response()) +} + +/// Multi-invoice event stream from the event bus +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct EventStreamQuery { + pub federation_id: FederationId, + /// Filter by specific invoice/operation IDs (optional) + pub filter_ids: Option>, + /// Heartbeat interval in seconds (default: 30) + pub heartbeat_interval: Option, +} + +/// Global event stream for all invoices in a federation +#[axum_macros::debug_handler] +pub async fn handle_global_event_stream( + State(state): State, + Query(query): Query, +) -> Result { + let mut event_receiver = state.event_bus.subscribe(); + let federation_id = query.federation_id.to_string(); + let heartbeat_interval = Duration::from_secs(query.heartbeat_interval.unwrap_or(30)); + let filter_ids = query.filter_ids.unwrap_or_default(); + + info!( + federation_id = %federation_id, + filter_count = filter_ids.len(), + "Starting unified global invoice event stream" + ); + + let stream = async_stream::stream! { + let mut heartbeat_interval = tokio::time::interval(heartbeat_interval); + + loop { + tokio::select! { + _ = heartbeat_interval.tick() => { + yield Ok::<_, Infallible>(Event::default() + .event("heartbeat") + .data("ping")); + } + event_result = event_receiver.recv() => { + match event_result { + Ok(event) => { + // Filter events for the requested federation + let should_send = match &event { + crate::events::FmcdEvent::InvoiceCreated { federation_id: fid, .. } => { + fid == &federation_id && (filter_ids.is_empty() || + filter_ids.iter().any(|id| event.contains_id(id))) + }, + crate::events::FmcdEvent::InvoicePaid { federation_id: fid, .. } => { + fid == &federation_id && (filter_ids.is_empty() || + filter_ids.iter().any(|id| event.contains_id(id))) + }, + crate::events::FmcdEvent::InvoiceExpired { federation_id: fid, .. } => { + fid == &federation_id && (filter_ids.is_empty() || + filter_ids.iter().any(|id| event.contains_id(id))) + }, + _ => false, + }; + + if should_send { + match serde_json::to_string(&event) { + Ok(json_data) => { + yield Ok::<_, Infallible>(Event::default() + .event("invoice_event") + .data(json_data)); + } + Err(e) => { + warn!(error = ?e, "Failed to serialize unified event"); + yield Ok::<_, Infallible>(Event::default() + .event("error") + .data(format!("Serialization error: {}", e))); + } + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + warn!(skipped = skipped, "Unified event stream lagged"); + yield Ok::<_, Infallible>(Event::default() + .event("warning") + .data(format!("Stream lagged, {} events skipped", skipped))); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + info!("Unified event bus closed, ending stream"); + break; + } + } + } + } + } + }; + + let sse = Sse::new(stream).keep_alive( + KeepAlive::new() + .interval(heartbeat_interval) + .text("keep-alive"), + ); + + Ok(sse.into_response()) +} + +// TODO: Add helper method to FmcdEvent for ID filtering +trait EventIdFilter { + fn contains_id(&self, id: &str) -> bool; +} + +impl EventIdFilter for crate::events::FmcdEvent { + fn contains_id(&self, id: &str) -> bool { + match self { + // Match invoice_id where available, operation_id for InvoicePaid + crate::events::FmcdEvent::InvoiceCreated { invoice_id, .. } => invoice_id == id, + crate::events::FmcdEvent::InvoicePaid { operation_id, .. } => operation_id == id, + crate::events::FmcdEvent::InvoiceExpired { invoice_id, .. } => invoice_id == id, + _ => false, + } + } +} diff --git a/src/router/handlers/onchain/deposit_address.rs b/src/router/handlers/onchain/deposit_address.rs index fdc8504..3eaaab0 100644 --- a/src/router/handlers/onchain/deposit_address.rs +++ b/src/router/handlers/onchain/deposit_address.rs @@ -6,7 +6,6 @@ use chrono::Utc; use fedimint_client::ClientHandleArc; use fedimint_core::config::FederationId; use fedimint_core::core::OperationId; -use fedimint_ln_common::bitcoin::Address; use fedimint_wallet_client::client_db::TweakIdx; use fedimint_wallet_client::WalletClientModule; use serde::{Deserialize, Serialize}; @@ -105,6 +104,31 @@ async fn _deposit_address( } } + // Register with payment lifecycle manager for automatic ecash claiming + if let Some(ref payment_lifecycle_manager) = state.payment_lifecycle_manager { + if let Err(e) = payment_lifecycle_manager + .track_onchain_deposit( + operation_id, + req.federation_id, + context.as_ref().map(|c| c.correlation_id.clone()), + ) + .await + { + tracing::warn!( + operation_id = ?operation_id, + federation_id = %req.federation_id, + error = ?e, + "Failed to register deposit with payment lifecycle manager" + ); + } else { + tracing::info!( + operation_id = ?operation_id, + federation_id = %req.federation_id, + "Deposit registered with payment lifecycle manager for automatic ecash claiming" + ); + } + } + info!( federation_id = %req.federation_id, operation_id = ?operation_id, diff --git a/src/router/handlers/onchain/withdraw.rs b/src/router/handlers/onchain/withdraw.rs index d4e3aa3..2cf800c 100644 --- a/src/router/handlers/onchain/withdraw.rs +++ b/src/router/handlers/onchain/withdraw.rs @@ -4,7 +4,6 @@ use anyhow::anyhow; use axum::extract::{Extension, State}; use axum::http::StatusCode; use axum::Json; -use bitcoin::address::NetworkUnchecked; use bitcoin::{Address, Amount, Txid}; use chrono::Utc; use fedimint_client::ClientHandleArc; @@ -107,6 +106,25 @@ async fn _withdraw( "Withdrawal initiated" ); + // Register with payment lifecycle manager for comprehensive monitoring + if let Some(ref payment_lifecycle_manager) = state.payment_lifecycle_manager { + if let Err(e) = payment_lifecycle_manager + .track_onchain_withdraw(operation_id, req.federation_id, amount.to_sat()) + .await + { + error!( + operation_id = ?operation_id, + error = ?e, + "Failed to register withdrawal with payment lifecycle manager" + ); + } else { + info!( + operation_id = ?operation_id, + "Withdrawal registered with payment lifecycle manager for monitoring" + ); + } + } + let mut updates = wallet_module .subscribe_withdraw_updates(operation_id) .await? @@ -117,15 +135,15 @@ async fn _withdraw( match update { WithdrawState::Succeeded(txid) => { - // Emit withdrawal completed event - let withdrawal_completed_event = FmcdEvent::WithdrawalCompleted { + // Emit withdrawal succeeded event + let withdrawal_succeeded_event = FmcdEvent::WithdrawalSucceeded { operation_id: format!("{:?}", operation_id), federation_id: req.federation_id.to_string(), + amount_sat: amount.to_sat(), txid: txid.to_string(), - correlation_id: Some(context.correlation_id.clone()), timestamp: Utc::now(), }; - if let Err(e) = state.event_bus.publish(withdrawal_completed_event).await { + if let Err(e) = state.event_bus.publish(withdrawal_succeeded_event).await { error!( operation_id = ?operation_id, correlation_id = %context.correlation_id, diff --git a/src/router/ws.rs b/src/router/ws.rs index 0d6a65d..d80ceea 100644 --- a/src/router/ws.rs +++ b/src/router/ws.rs @@ -7,10 +7,12 @@ use futures_util::stream::StreamExt; use serde::{Deserialize, Serialize}; use serde_json::Value; use tracing::{info, warn}; +use uuid::Uuid; use super::handlers; use crate::auth::{AuthenticatedMessage, WebSocketAuth}; use crate::error::AppError; +use crate::observability::correlation::RequestContext; use crate::state::AppState; const JSONRPC_VERSION: &str = "2.0"; @@ -70,10 +72,10 @@ pub enum JsonRpcMethod { MintValidate, MintSplit, MintCombine, + // Lightning API methods LnInvoice, - LnInvoiceExternalPubkeyTweaked, - LnAwaitInvoice, - LnClaimExternalReceiveTweaked, + LnStatus, + LnStatusBulk, LnPay, LnListGateways, WalletDepositAddress, @@ -86,9 +88,18 @@ async fn handle_socket( state: AppState, auth: Arc, ) -> Result<(), anyhow::Error> { + // Create a session-level correlation ID for this WebSocket connection + let session_correlation_id = Uuid::new_v4().to_string(); + while let Some(Ok(msg)) = socket.next().await { if let Message::Text(text) = msg { - info!("Received WebSocket message"); + // Create a new RequestContext for each message, using session correlation ID + let message_context = RequestContext::new(Some(session_correlation_id.clone())); + info!( + correlation_id = %message_context.correlation_id, + request_id = %message_context.request_id, + "Received WebSocket message" + ); // If authentication is enabled, expect authenticated messages if auth.is_enabled() { @@ -117,7 +128,7 @@ async fn handle_socket( } }; - let res = match_method(req.clone(), state.clone()).await; + let res = match_method(req.clone(), state.clone(), message_context.clone()).await; let res_msg = create_json_rpc_response(res, req.id); // Send response as authenticated message @@ -137,7 +148,7 @@ async fn handle_socket( } }; - let res = match_method(req.clone(), state.clone()).await; + let res = match_method(req.clone(), state.clone(), message_context.clone()).await; let res_msg = create_json_rpc_response(res, req.id); let response_text = serde_json::to_string(&res_msg)?; socket.send(Message::Text(response_text)).await?; @@ -214,17 +225,21 @@ async fn send_auth_error(socket: &mut WebSocket, message: &str) -> Result<(), an Ok(()) } -async fn match_method(req: JsonRpcRequest, state: AppState) -> Result { +async fn match_method( + req: JsonRpcRequest, + state: AppState, + context: RequestContext, +) -> Result { match req.method { JsonRpcMethod::AdminBackup => { handlers::admin::backup::handle_ws(state.clone(), req.params).await } JsonRpcMethod::AdminConfig => handlers::admin::config::handle_ws(state.clone()).await, JsonRpcMethod::AdminDiscoverVersion => { - handlers::admin::discover_version::handle_ws(state.clone()).await + handlers::admin::version::handle_ws(state.clone()).await } JsonRpcMethod::AdminFederationIds => { - handlers::admin::federation_ids::handle_ws(state.clone(), req.params).await + handlers::admin::federations::handle_ws(state.clone(), req.params).await } JsonRpcMethod::AdminInfo => { handlers::admin::info::handle_ws(state.clone(), req.params).await @@ -239,7 +254,7 @@ async fn match_method(req: JsonRpcRequest, state: AppState) -> Result { - handlers::admin::list_operations::handle_ws(state.clone(), req.params).await + handlers::admin::operations::handle_ws(state.clone(), req.params).await } JsonRpcMethod::MintDecodeNotes => handlers::mint::decode_notes::handle_ws(req.params).await, JsonRpcMethod::MintEncodeNotes => handlers::mint::encode_notes::handle_ws(req.params).await, @@ -254,22 +269,25 @@ async fn match_method(req: JsonRpcRequest, state: AppState) -> Result handlers::mint::split::handle_ws(req.params).await, JsonRpcMethod::MintCombine => handlers::mint::combine::handle_ws(req.params).await, + + // Lightning API methods - aligns with fedimint client 0.8 behavior JsonRpcMethod::LnInvoice => { - handlers::ln::invoice::handle_ws(state.clone(), req.params).await - } - JsonRpcMethod::LnInvoiceExternalPubkeyTweaked => { - handlers::ln::invoice_external_pubkey_tweaked::handle_ws(state.clone(), req.params) - .await - } - JsonRpcMethod::LnAwaitInvoice => { - handlers::ln::await_invoice::handle_ws(state.clone(), req.params).await + handlers::ln::invoice::handle_ws_with_context(state.clone(), req.params, context).await } - JsonRpcMethod::LnClaimExternalReceiveTweaked => { - handlers::ln::claim_external_receive_tweaked::handle_ws(state.clone(), req.params).await + JsonRpcMethod::LnStatus => handlers::ln::status::handle_ws(state.clone(), req.params).await, + JsonRpcMethod::LnStatusBulk => { + // For bulk operations, parse as bulk request + let bulk_req = serde_json::from_value(req.params)?; + let response = handlers::ln::status::handle_bulk_status( + axum::extract::State(state), + axum::Json(bulk_req), + ) + .await?; + Ok(serde_json::to_value(response.0)?) } JsonRpcMethod::LnPay => handlers::ln::pay::handle_ws(state.clone(), req.params).await, JsonRpcMethod::LnListGateways => { - handlers::ln::list_gateways::handle_ws(state.clone(), req.params).await + handlers::ln::gateways::handle_ws(state.clone(), req.params).await } JsonRpcMethod::WalletDepositAddress => { handlers::onchain::deposit_address::handle_ws(state.clone(), req.params).await diff --git a/src/services/deposit_monitor.rs b/src/services/deposit_monitor.rs index 6c405cd..f1045f2 100644 --- a/src/services/deposit_monitor.rs +++ b/src/services/deposit_monitor.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -7,12 +7,12 @@ use chrono::Utc; use fedimint_client::ClientHandleArc; use fedimint_core::config::FederationId; use fedimint_core::core::OperationId; -use fedimint_ln_common::bitcoin::{Address, OutPoint}; +use fedimint_ln_common::bitcoin::OutPoint; use fedimint_wallet_client::{DepositStateV2, WalletClientModule}; use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, Mutex, RwLock}; -use tokio::time::{interval, sleep}; +use tokio::time::interval; use tracing::{debug, error, info, instrument, warn}; use crate::events::{EventBus, FmcdEvent}; diff --git a/src/services/mod.rs b/src/services/mod.rs index 297cc59..f24de9b 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,5 +1,7 @@ pub mod balance_monitor; pub mod deposit_monitor; +pub mod payment_lifecycle; pub use balance_monitor::{BalanceMonitor, BalanceMonitorConfig}; pub use deposit_monitor::{DepositMonitor, DepositMonitorConfig}; +pub use payment_lifecycle::{PaymentLifecycleConfig, PaymentLifecycleManager}; diff --git a/src/services/payment_lifecycle.rs b/src/services/payment_lifecycle.rs new file mode 100644 index 0000000..b221438 --- /dev/null +++ b/src/services/payment_lifecycle.rs @@ -0,0 +1,1063 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::{anyhow, Result}; +use chrono::Utc; +use fedimint_client::ClientHandleArc; +use fedimint_core::config::FederationId; +use fedimint_core::core::OperationId; +use fedimint_core::Amount; +use fedimint_ln_client::{LightningClientModule, LnPayState, LnReceiveState}; +use fedimint_wallet_client::{DepositStateV2, WalletClientModule, WithdrawState}; +use futures_util::StreamExt; +use serde::{Deserialize, Serialize}; +use tokio::sync::{broadcast, Mutex, RwLock}; +use tokio::time::interval; +use tracing::{debug, error, info, instrument, warn}; + +use crate::events::{EventBus, FmcdEvent}; +use crate::multimint::MultiMint; + +/// Type of payment operation +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum PaymentType { + LightningReceive, + LightningPay, + OnchainDeposit, + OnchainWithdraw, +} + +/// Information about a payment operation being tracked +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PaymentOperation { + pub operation_id: OperationId, + pub federation_id: FederationId, + pub payment_type: PaymentType, + pub amount_msat: Option, + pub created_at: chrono::DateTime, + pub metadata: Option, + pub correlation_id: Option, + /// Track if we've already attempted to claim the ecash + pub claim_attempted: bool, + /// Track if ecash was successfully claimed + pub ecash_claimed: bool, +} + +/// Configuration for the payment lifecycle manager +#[derive(Debug, Clone)] +pub struct PaymentLifecycleConfig { + /// How often to poll for operation updates (default: 5 seconds) + pub poll_interval: Duration, + /// How long to monitor an operation before giving up (default: 24 hours) + pub operation_timeout: Duration, + /// Maximum number of operations to monitor simultaneously per federation + pub max_operations_per_federation: usize, + /// How long to wait for ecash claiming after payment is received (default: + /// 30 seconds) + pub claim_timeout: Duration, +} + +impl Default for PaymentLifecycleConfig { + fn default() -> Self { + Self { + poll_interval: Duration::from_secs(5), + operation_timeout: Duration::from_secs(24 * 60 * 60), // 24 hours + max_operations_per_federation: 1000, + claim_timeout: Duration::from_secs(30), + } + } +} + +/// Service that manages the complete lifecycle of payment operations +#[derive(Debug)] +pub struct PaymentLifecycleManager { + event_bus: Arc, + multimint: Arc, + config: PaymentLifecycleConfig, + active_operations: Arc>>, + shutdown_tx: Arc>>>, +} + +impl PaymentLifecycleManager { + /// Create a new payment lifecycle manager + pub fn new( + event_bus: Arc, + multimint: Arc, + config: PaymentLifecycleConfig, + ) -> Self { + Self { + event_bus, + multimint, + config, + active_operations: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(Mutex::new(None)), + } + } + + /// Start the payment lifecycle management service + #[instrument(skip(self))] + pub async fn start(&self) -> Result<()> { + let (shutdown_tx, _) = broadcast::channel(1); + { + let mut tx_guard = self.shutdown_tx.lock().await; + *tx_guard = Some(shutdown_tx.clone()); + } + + info!( + poll_interval_secs = self.config.poll_interval.as_secs(), + operation_timeout_hours = self.config.operation_timeout.as_secs() / 3600, + claim_timeout_secs = self.config.claim_timeout.as_secs(), + "Starting payment lifecycle manager service" + ); + + // First, recover any pending operations from all federations + if let Err(e) = self.recover_pending_operations().await { + error!(error = ?e, "Failed to recover pending operations on startup"); + } + + // Clone necessary data for the monitoring task + let event_bus = self.event_bus.clone(); + let multimint = self.multimint.clone(); + let active_operations = self.active_operations.clone(); + let poll_interval = self.config.poll_interval; + let operation_timeout = self.config.operation_timeout; + let claim_timeout = self.config.claim_timeout; + + // Spawn the monitoring task + tokio::spawn(async move { + let mut shutdown_rx = shutdown_tx.subscribe(); + let mut poll_timer = interval(poll_interval); + + loop { + tokio::select! { + _ = poll_timer.tick() => { + if let Err(e) = Self::process_all_operations( + &event_bus, + &multimint, + &active_operations, + operation_timeout, + claim_timeout, + ).await { + error!(error = ?e, "Error during payment lifecycle processing"); + } + } + _ = shutdown_rx.recv() => { + info!("Payment lifecycle manager received shutdown signal"); + break; + } + } + } + + info!("Payment lifecycle manager service stopped"); + }); + + Ok(()) + } + + /// Stop the payment lifecycle management service + pub async fn stop(&self) -> Result<()> { + let tx_guard = self.shutdown_tx.lock().await; + if let Some(shutdown_tx) = tx_guard.as_ref() { + let _ = shutdown_tx.send(()); + } + Ok(()) + } + + /// Add a new Lightning receive operation to track + #[instrument(skip(self))] + pub async fn track_lightning_receive( + &self, + operation_id: OperationId, + federation_id: FederationId, + amount_msat: Amount, + metadata: Option, + correlation_id: Option, + ) -> Result<()> { + let operation = PaymentOperation { + operation_id, + federation_id, + payment_type: PaymentType::LightningReceive, + amount_msat: Some(amount_msat), + created_at: Utc::now(), + metadata, + correlation_id, + claim_attempted: false, + ecash_claimed: false, + }; + + self.add_operation(operation).await + } + + /// Add a new Lightning pay operation to track + #[instrument(skip(self))] + pub async fn track_lightning_pay( + &self, + operation_id: OperationId, + federation_id: FederationId, + amount_msat: Amount, + metadata: Option, + ) -> Result<()> { + let operation = PaymentOperation { + operation_id, + federation_id, + payment_type: PaymentType::LightningPay, + amount_msat: Some(amount_msat), + created_at: Utc::now(), + metadata, + correlation_id: None, + claim_attempted: false, + ecash_claimed: false, + }; + + self.add_operation(operation).await + } + + /// Add a new onchain deposit operation to track + #[instrument(skip(self))] + pub async fn track_onchain_deposit( + &self, + operation_id: OperationId, + federation_id: FederationId, + correlation_id: Option, + ) -> Result<()> { + let operation = PaymentOperation { + operation_id, + federation_id, + payment_type: PaymentType::OnchainDeposit, + amount_msat: None, // Will be determined when deposit is confirmed + created_at: Utc::now(), + metadata: None, + correlation_id, + claim_attempted: false, + ecash_claimed: false, + }; + + self.add_operation(operation).await + } + + /// Add a new onchain withdrawal operation to track + #[instrument(skip(self))] + pub async fn track_onchain_withdraw( + &self, + operation_id: OperationId, + federation_id: FederationId, + amount_sat: u64, + ) -> Result<()> { + let operation = PaymentOperation { + operation_id, + federation_id, + payment_type: PaymentType::OnchainWithdraw, + amount_msat: Some(Amount::from_sats(amount_sat)), + created_at: Utc::now(), + metadata: None, + correlation_id: None, + claim_attempted: false, + ecash_claimed: false, + }; + + self.add_operation(operation).await + } + + /// Add an operation to track + async fn add_operation(&self, operation: PaymentOperation) -> Result<()> { + let operation_id = operation.operation_id; + let federation_id = operation.federation_id; + let payment_type = operation.payment_type.clone(); + + // Check federation limit + { + let operations = self.active_operations.read().await; + let federation_count = operations + .values() + .filter(|op| op.federation_id == federation_id) + .count(); + + if federation_count >= self.config.max_operations_per_federation { + return Err(anyhow!( + "Federation {} has reached maximum operations limit ({})", + federation_id, + self.config.max_operations_per_federation + )); + } + } + + // Add to active operations + { + let mut operations = self.active_operations.write().await; + operations.insert(operation_id, operation); + } + + info!( + operation_id = ?operation_id, + federation_id = %federation_id, + payment_type = ?payment_type, + "Added payment operation to lifecycle tracking" + ); + + Ok(()) + } + + /// Recover pending operations from all federations on startup + #[instrument(skip(self))] + async fn recover_pending_operations(&self) -> Result<()> { + info!("Recovering pending operations from all federations"); + + let clients = self.multimint.clients.lock().await.clone(); + let mut total_recovered = 0; + + for (federation_id, client) in clients.iter() { + match self + .recover_federation_operations(client, *federation_id) + .await + { + Ok(count) => { + if count > 0 { + info!( + federation_id = %federation_id, + recovered_operations = count, + "Recovered pending operations from federation" + ); + total_recovered += count; + } + } + Err(e) => { + error!( + federation_id = %federation_id, + error = ?e, + "Failed to recover operations from federation" + ); + } + } + } + + info!( + total_recovered_operations = total_recovered, + "Completed recovery of pending operations" + ); + + Ok(()) + } + + /// Recover pending operations from a specific federation + async fn recover_federation_operations( + &self, + client: &ClientHandleArc, + federation_id: FederationId, + ) -> Result { + let operations = client + .operation_log() + .paginate_operations_rev(100, None) // Get last 100 operations + .await; + + let mut recovered_count = 0; + + for (key, value) in operations { + let operation_id = key.operation_id; + let operation_kind = value.operation_module_kind(); + let created_at = chrono::DateTime::::from(key.creation_time); + + // Check if operation is still within timeout window + let age = Utc::now().signed_duration_since(created_at); + if age.to_std().unwrap_or_default() > self.config.operation_timeout { + debug!( + operation_id = ?operation_id, + age_hours = age.num_hours(), + "Skipping expired operation during recovery" + ); + continue; + } + + // Determine payment type based on operation kind and metadata + // Also check the operation type string in metadata + let meta = value.meta::(); + let variant = meta.get("variant").and_then(|v| v.as_str()); + let op_type = meta.get("type").and_then(|v| v.as_str()); + + let payment_type = match operation_kind.as_ref() { + "ln" => { + // Check multiple metadata fields to determine operation type + if variant == Some("receive") + || op_type == Some("ln_receive") + || meta.get("invoice").is_some() + { + Some(PaymentType::LightningReceive) + } else if variant == Some("pay") + || op_type == Some("ln_pay") + || meta.get("payment_hash").is_some() + { + Some(PaymentType::LightningPay) + } else { + // Log unrecognized Lightning operation for debugging + debug!( + operation_id = ?operation_id, + metadata = ?meta, + "Unrecognized Lightning operation type during recovery" + ); + None + } + } + "wallet" => { + // Check multiple metadata fields to determine operation type + if variant == Some("deposit") + || op_type == Some("deposit") + || meta.get("address").is_some() + { + Some(PaymentType::OnchainDeposit) + } else if variant == Some("withdraw") + || op_type == Some("withdraw") + || meta.get("recipient").is_some() + { + Some(PaymentType::OnchainWithdraw) + } else { + // Log unrecognized wallet operation for debugging + debug!( + operation_id = ?operation_id, + metadata = ?meta, + "Unrecognized wallet operation type during recovery" + ); + None + } + } + _ => None, + }; + + // Skip operations that have completed outcomes ONLY if they're truly complete + // For receive operations, we still want to track them if ecash hasn't been + // claimed + if let Some(_outcome) = value.outcome::() { + // Check if this is a successful outcome that means ecash was already claimed + let is_truly_complete = match &payment_type { + Some(PaymentType::LightningReceive) | Some(PaymentType::OnchainDeposit) => { + // For receive operations, check if the outcome indicates successful + // claiming - any non-null outcome means the operation is complete + true + } + _ => true, // For other operations, having an outcome means they're complete + }; + + if is_truly_complete { + debug!( + operation_id = ?operation_id, + payment_type = ?payment_type, + "Skipping completed operation during recovery" + ); + continue; + } + } + + if let Some(payment_type) = payment_type { + let operation = PaymentOperation { + operation_id, + federation_id, + payment_type: payment_type.clone(), + amount_msat: value + .meta::() + .get("amount_msat") + .and_then(|v| v.as_u64()) + .map(Amount::from_msats), + created_at, + metadata: Some(value.meta()), + correlation_id: None, + claim_attempted: false, + ecash_claimed: false, + }; + + // Add to active operations + let mut operations = self.active_operations.write().await; + operations.insert(operation_id, operation); + recovered_count += 1; + + debug!( + operation_id = ?operation_id, + payment_type = ?payment_type, + "Recovered pending operation" + ); + } + } + + Ok(recovered_count) + } + + /// Process all active operations + async fn process_all_operations( + event_bus: &Arc, + multimint: &Arc, + active_operations: &Arc>>, + operation_timeout: Duration, + claim_timeout: Duration, + ) -> Result<()> { + let operations_to_process = { + let operations = active_operations.read().await; + operations.clone() + }; + + if operations_to_process.is_empty() { + return Ok(()); + } + + debug!( + active_operations = operations_to_process.len(), + "Processing active payment operations" + ); + + let now = Utc::now(); + let mut completed_operations = Vec::new(); + let mut timed_out_operations = Vec::new(); + + // Group operations by federation for efficient processing + let mut operations_by_federation: HashMap> = + HashMap::new(); + + for (operation_id, operation) in operations_to_process { + // Check timeout + if now + .signed_duration_since(operation.created_at) + .to_std() + .unwrap_or_default() + > operation_timeout + { + timed_out_operations.push(operation_id); + continue; + } + + operations_by_federation + .entry(operation.federation_id) + .or_default() + .push(operation); + } + + // Process operations by federation + for (federation_id, federation_operations) in operations_by_federation { + let client = match multimint.get(&federation_id).await { + Some(client) => client, + None => { + warn!(federation_id = %federation_id, "Federation client not available"); + continue; + } + }; + + for operation in federation_operations { + match operation.payment_type { + PaymentType::LightningReceive => { + let mut operation_mut = operation.clone(); + if let Err(e) = Self::process_lightning_receive( + &client, + &mut operation_mut, + event_bus, + claim_timeout, + ) + .await + { + error!( + operation_id = ?operation_mut.operation_id, + error = ?e, + "Failed to process Lightning receive" + ); + } + + // Update the operation in active_operations with the new state + let mut operations = active_operations.write().await; + if let Some(active_op) = operations.get_mut(&operation_mut.operation_id) { + active_op.claim_attempted = operation_mut.claim_attempted; + active_op.ecash_claimed = operation_mut.ecash_claimed; + } + drop(operations); + + if operation_mut.ecash_claimed { + completed_operations.push(operation_mut.operation_id); + } + } + PaymentType::LightningPay => { + if let Err(e) = + Self::process_lightning_pay(&client, &operation, event_bus).await + { + error!( + operation_id = ?operation.operation_id, + error = ?e, + "Failed to process Lightning pay" + ); + } + } + PaymentType::OnchainDeposit => { + let mut operation_mut = operation.clone(); + if let Err(e) = Self::process_onchain_deposit( + &client, + &mut operation_mut, + event_bus, + claim_timeout, + ) + .await + { + error!( + operation_id = ?operation_mut.operation_id, + error = ?e, + "Failed to process onchain deposit" + ); + } + + // Update the operation in active_operations with the new state + let mut operations = active_operations.write().await; + if let Some(active_op) = operations.get_mut(&operation_mut.operation_id) { + active_op.claim_attempted = operation_mut.claim_attempted; + active_op.ecash_claimed = operation_mut.ecash_claimed; + active_op.amount_msat = operation_mut.amount_msat; + } + drop(operations); + + if operation_mut.ecash_claimed { + completed_operations.push(operation_mut.operation_id); + } + } + PaymentType::OnchainWithdraw => { + if let Err(e) = + Self::process_onchain_withdraw(&client, &operation, event_bus).await + { + error!( + operation_id = ?operation.operation_id, + error = ?e, + "Failed to process onchain withdrawal" + ); + } + } + } + } + } + + // Remove completed and timed out operations + if !completed_operations.is_empty() || !timed_out_operations.is_empty() { + let mut operations = active_operations.write().await; + + for operation_id in &completed_operations { + operations.remove(operation_id); + info!(operation_id = ?operation_id, "Payment operation completed successfully"); + } + + for operation_id in &timed_out_operations { + operations.remove(operation_id); + warn!(operation_id = ?operation_id, "Payment operation timed out"); + } + } + + Ok(()) + } + + /// Process a Lightning receive operation - most importantly, claim the + /// ecash! + async fn process_lightning_receive( + client: &ClientHandleArc, + operation: &mut PaymentOperation, + event_bus: &Arc, + _claim_timeout: Duration, + ) -> Result<()> { + let lightning_module = client.get_first_module::()?; + + // Subscribe to updates for this operation + let mut updates = lightning_module + .subscribe_ln_receive(operation.operation_id) + .await? + .into_stream(); + + // Process all available states to get to the current state + let mut last_state = None; + let timeout_future = tokio::time::sleep(Duration::from_millis(100)); + tokio::pin!(timeout_future); + + loop { + tokio::select! { + update = updates.next() => { + match update { + Some(state) => { + last_state = Some(state); + continue; + } + None => break, + } + } + _ = &mut timeout_future => { + break; + } + } + } + + let current_state = match last_state { + Some(state) => state, + None => { + debug!( + operation_id = ?operation.operation_id, + "No state updates available for Lightning receive" + ); + return Ok(()); + } + }; + + match current_state { + LnReceiveState::Claimed => { + // CRITICAL INSIGHT: When LnReceiveState::Claimed is reached, + // the ecash notes have ALREADY been issued to the wallet! + // The "Claimed" state means the full payment flow is complete: + // 1. Gateway received the Lightning payment + // 2. Federation issued ecash notes + // 3. Notes are now in the client's wallet + + if !operation.ecash_claimed { + info!( + operation_id = ?operation.operation_id, + amount_msat = ?operation.amount_msat, + "Lightning payment successfully claimed - ecash notes received in wallet!" + ); + + // Mark as successfully claimed + operation.ecash_claimed = true; + operation.claim_attempted = true; + + // Publish success event + if let Some(amount) = operation.amount_msat { + let event = FmcdEvent::InvoicePaid { + operation_id: format!("{:?}", operation.operation_id), + federation_id: operation.federation_id.to_string(), + amount_msat: amount.msats, + correlation_id: operation.correlation_id.clone(), + timestamp: Utc::now(), + }; + let _ = event_bus.publish(event).await; + } + + // Verify balance was actually updated + let balance = client.get_balance().await; + info!( + operation_id = ?operation.operation_id, + balance_msat = balance.msats, + "Wallet balance after Lightning receive" + ); + } + } + LnReceiveState::Canceled { reason } => { + warn!( + operation_id = ?operation.operation_id, + reason = %reason, + "Lightning receive canceled" + ); + operation.claim_attempted = true; + } + LnReceiveState::WaitingForPayment { .. } => { + debug!( + operation_id = ?operation.operation_id, + "Lightning invoice waiting for payment" + ); + } + _ => { + debug!( + operation_id = ?operation.operation_id, + state = ?current_state, + "Lightning receive in intermediate state" + ); + } + } + + Ok(()) + } + + /// Process a Lightning pay operation + async fn process_lightning_pay( + client: &ClientHandleArc, + operation: &PaymentOperation, + event_bus: &Arc, + ) -> Result<()> { + let lightning_module = client.get_first_module::()?; + + // Check current state + let mut updates = lightning_module + .subscribe_ln_pay(operation.operation_id) + .await? + .into_stream(); + + let current_state = match updates.next().await { + Some(state) => state, + None => return Ok(()), + }; + + match current_state { + LnPayState::Success { preimage } => { + info!( + operation_id = ?operation.operation_id, + "Lightning payment succeeded" + ); + + // Publish success event + if let Some(amount) = operation.amount_msat { + let event = FmcdEvent::PaymentSucceeded { + operation_id: format!("{:?}", operation.operation_id), + federation_id: operation.federation_id.to_string(), + amount_msat: amount.msats, + preimage, + timestamp: Utc::now(), + }; + let _ = event_bus.publish(event).await; + } + } + LnPayState::Refunded { gateway_error } => { + warn!( + operation_id = ?operation.operation_id, + error = %gateway_error, + "Lightning payment refunded" + ); + + // Publish refund event + let event = FmcdEvent::PaymentRefunded { + operation_id: format!("{:?}", operation.operation_id), + federation_id: operation.federation_id.to_string(), + reason: gateway_error.to_string(), + timestamp: Utc::now(), + }; + let _ = event_bus.publish(event).await; + } + _ => { + // Still in progress + debug!( + operation_id = ?operation.operation_id, + state = ?current_state, + "Lightning payment still in progress" + ); + } + } + + Ok(()) + } + + /// Process an onchain deposit operation - claim ecash after confirmation! + async fn process_onchain_deposit( + client: &ClientHandleArc, + operation: &mut PaymentOperation, + event_bus: &Arc, + _claim_timeout: Duration, + ) -> Result<()> { + let wallet_module = client.get_first_module::()?; + + // Subscribe to updates for this operation + let mut updates = wallet_module + .subscribe_deposit(operation.operation_id) + .await? + .into_stream(); + + // Process all available states to get to the current state + let mut last_state = None; + let timeout_future = tokio::time::sleep(Duration::from_millis(100)); + tokio::pin!(timeout_future); + + loop { + tokio::select! { + update = updates.next() => { + match update { + Some(state) => { + last_state = Some(state); + continue; + } + None => break, + } + } + _ = &mut timeout_future => { + break; + } + } + } + + let current_state = match last_state { + Some(state) => state, + None => { + debug!( + operation_id = ?operation.operation_id, + "No state updates available for onchain deposit" + ); + return Ok(()); + } + }; + + match current_state { + DepositStateV2::Claimed { + btc_deposited, + btc_out_point, + } => { + // CRITICAL INSIGHT: When DepositStateV2::Claimed is reached, + // the ecash notes have ALREADY been issued to the wallet! + // The "Claimed" state means the full deposit flow is complete: + // 1. Bitcoin transaction confirmed on-chain + // 2. Federation verified the deposit + // 3. Ecash notes issued and now in the client's wallet + + if !operation.ecash_claimed { + info!( + operation_id = ?operation.operation_id, + amount_sat = btc_deposited.to_sat(), + txid = %btc_out_point.txid, + "Onchain deposit successfully claimed - ecash notes received in wallet!" + ); + + // Mark as successfully claimed + operation.ecash_claimed = true; + operation.claim_attempted = true; + + // Update the operation amount now that we know it + operation.amount_msat = Some(Amount::from_sats(btc_deposited.to_sat())); + + // Publish success event + let event = FmcdEvent::DepositClaimed { + operation_id: format!("{:?}", operation.operation_id), + federation_id: operation.federation_id.to_string(), + amount_sat: btc_deposited.to_sat(), + txid: btc_out_point.txid.to_string(), + correlation_id: operation.correlation_id.clone(), + timestamp: Utc::now(), + }; + let _ = event_bus.publish(event).await; + + // Verify balance was actually updated + let balance = client.get_balance().await; + info!( + operation_id = ?operation.operation_id, + balance_msat = balance.msats, + "Wallet balance after onchain deposit" + ); + } + } + DepositStateV2::Confirmed { + btc_deposited, + btc_out_point, + } => { + // Deposit is confirmed but not yet claimed + // The federation is still processing it + info!( + operation_id = ?operation.operation_id, + amount_sat = btc_deposited.to_sat(), + txid = %btc_out_point.txid, + "Onchain deposit confirmed, waiting for federation to issue ecash" + ); + + // Update amount now that we know it + operation.amount_msat = Some(Amount::from_sats(btc_deposited.to_sat())); + } + DepositStateV2::Failed(reason) => { + error!( + operation_id = ?operation.operation_id, + reason = %reason, + "Onchain deposit failed" + ); + operation.claim_attempted = true; + } + DepositStateV2::WaitingForTransaction => { + debug!( + operation_id = ?operation.operation_id, + "Waiting for onchain transaction" + ); + } + DepositStateV2::WaitingForConfirmation { + btc_deposited, + btc_out_point, + } => { + debug!( + operation_id = ?operation.operation_id, + amount_sat = btc_deposited.to_sat(), + txid = %btc_out_point.txid, + "Waiting for onchain confirmations" + ); + } + } + + Ok(()) + } + + /// Process an onchain withdrawal operation + async fn process_onchain_withdraw( + client: &ClientHandleArc, + operation: &PaymentOperation, + event_bus: &Arc, + ) -> Result<()> { + let wallet_module = client.get_first_module::()?; + + // Check current state + let mut updates = wallet_module + .subscribe_withdraw_updates(operation.operation_id) + .await? + .into_stream(); + + let current_state = match updates.next().await { + Some(state) => state, + None => return Ok(()), + }; + + match current_state { + WithdrawState::Succeeded(txid) => { + info!( + operation_id = ?operation.operation_id, + txid = %txid, + "Onchain withdrawal succeeded" + ); + + // Publish success event + if let Some(amount) = operation.amount_msat { + let event = FmcdEvent::WithdrawalSucceeded { + operation_id: format!("{:?}", operation.operation_id), + federation_id: operation.federation_id.to_string(), + amount_sat: amount.sats_round_down(), + txid: txid.to_string(), + timestamp: Utc::now(), + }; + let _ = event_bus.publish(event).await; + } + } + WithdrawState::Failed(reason) => { + error!( + operation_id = ?operation.operation_id, + reason = %reason, + "Onchain withdrawal failed" + ); + + // Publish failure event + let event = FmcdEvent::WithdrawalFailed { + operation_id: format!("{:?}", operation.operation_id), + federation_id: operation.federation_id.to_string(), + reason, + correlation_id: None, + timestamp: Utc::now(), + }; + let _ = event_bus.publish(event).await; + } + _ => { + // Still in progress + debug!( + operation_id = ?operation.operation_id, + state = ?current_state, + "Onchain withdrawal still in progress" + ); + } + } + + Ok(()) + } + + /// Get statistics about active operations + pub async fn get_stats(&self) -> PaymentLifecycleStats { + let operations = self.active_operations.read().await; + let mut by_type: HashMap = HashMap::new(); + let mut by_federation: HashMap = HashMap::new(); + + for operation in operations.values() { + *by_type.entry(operation.payment_type.clone()).or_insert(0) += 1; + *by_federation.entry(operation.federation_id).or_insert(0) += 1; + } + + PaymentLifecycleStats { + total_active_operations: operations.len(), + operations_by_type: by_type, + operations_by_federation: by_federation, + } + } +} + +/// Statistics about the payment lifecycle manager +#[derive(Debug, Clone, Serialize)] +pub struct PaymentLifecycleStats { + pub total_active_operations: usize, + pub operations_by_type: HashMap, + pub operations_by_federation: HashMap, +} diff --git a/src/state.rs b/src/state.rs index 5a9032d..20e9b83 100644 --- a/src/state.rs +++ b/src/state.rs @@ -2,8 +2,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; -use anyhow::{anyhow, Result}; -use axum::http::StatusCode; +use anyhow::Result; use fedimint_client::ClientHandleArc; use fedimint_core::config::{FederationId, FederationIdPrefix}; use tracing::{info, warn}; @@ -12,20 +11,23 @@ use crate::error::{AppError, ErrorCategory}; use crate::events::handlers::{LoggingEventHandler, MetricsEventHandler}; use crate::events::EventBus; use crate::multimint::MultiMint; -use crate::observability::correlation::RequestContext; -use crate::services::{BalanceMonitor, BalanceMonitorConfig, DepositMonitor, DepositMonitorConfig}; +use crate::services::{ + BalanceMonitor, BalanceMonitorConfig, DepositMonitor, DepositMonitorConfig, + PaymentLifecycleConfig, PaymentLifecycleManager, +}; use crate::webhooks::{WebhookConfig, WebhookNotifier}; #[cfg(test)] #[path = "state_tests.rs"] mod tests; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct AppState { pub multimint: MultiMint, pub start_time: Instant, pub event_bus: Arc, pub deposit_monitor: Option>, pub balance_monitor: Option>, + pub payment_lifecycle_manager: Option>, } impl AppState { @@ -79,12 +81,19 @@ impl AppState { BalanceMonitorConfig::default(), )); + let payment_lifecycle_manager = Arc::new(PaymentLifecycleManager::new( + event_bus.clone(), + Arc::new(clients.clone()), + PaymentLifecycleConfig::default(), + )); + Ok(Self { multimint: clients, start_time: Instant::now(), event_bus, deposit_monitor: Some(deposit_monitor), balance_monitor: Some(balance_monitor), + payment_lifecycle_manager: Some(payment_lifecycle_manager), }) } @@ -142,12 +151,19 @@ impl AppState { BalanceMonitorConfig::default(), )); + let payment_lifecycle_manager = Arc::new(PaymentLifecycleManager::new( + event_bus.clone(), + Arc::new(clients.clone()), + PaymentLifecycleConfig::default(), + )); + Ok(Self { multimint: clients, start_time: Instant::now(), event_bus, deposit_monitor: Some(deposit_monitor), balance_monitor: Some(balance_monitor), + payment_lifecycle_manager: Some(payment_lifecycle_manager), }) } @@ -221,7 +237,8 @@ impl AppState { self.start_time.elapsed() } - /// Start the monitoring services (deposit and balance monitors) + /// Start the monitoring services (deposit, balance, and payment lifecycle + /// monitors) pub async fn start_monitoring_services(&self) -> Result<()> { if let Some(ref deposit_monitor) = self.deposit_monitor { deposit_monitor.start().await?; @@ -233,6 +250,11 @@ impl AppState { info!("Balance monitor started successfully"); } + if let Some(ref payment_lifecycle_manager) = self.payment_lifecycle_manager { + payment_lifecycle_manager.start().await?; + info!("Payment lifecycle manager started successfully"); + } + Ok(()) } diff --git a/src/webhooks/invoice.rs b/src/webhooks/invoice.rs new file mode 100644 index 0000000..e28a2b0 --- /dev/null +++ b/src/webhooks/invoice.rs @@ -0,0 +1,118 @@ +use std::time::Duration; + +use chrono::{DateTime, Utc}; +use serde::Serialize; +use tracing::{error, info, warn}; + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct InvoiceWebhookEvent { + pub event_type: String, + pub operation_id: String, + pub invoice_id: String, + pub federation_id: String, + pub timestamp: DateTime, + pub data: serde_json::Value, +} + +/// Send webhook notification for invoice events +/// Uses exponential backoff with retries for reliability +pub async fn send_invoice_webhook( + webhook_url: &str, + event: &InvoiceWebhookEvent, +) -> anyhow::Result<()> { + const MAX_RETRIES: u32 = 3; + const BASE_DELAY: Duration = Duration::from_millis(100); + const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + + let client = reqwest::Client::builder() + .timeout(REQUEST_TIMEOUT) + .build()?; + + for attempt in 0..MAX_RETRIES { + // Use bit shifting for efficient exponential backoff: 2^n = 1 << n + let delay = BASE_DELAY * (1 << attempt); + + if attempt > 0 { + info!( + webhook_url = %webhook_url, + attempt = attempt + 1, + delay_ms = delay.as_millis(), + "Retrying webhook request after delay" + ); + tokio::time::sleep(delay).await; + } + + match send_webhook_request(&client, webhook_url, event).await { + Ok(()) => { + info!( + webhook_url = %webhook_url, + event_type = %event.event_type, + operation_id = %event.operation_id, + attempt = attempt + 1, + "Webhook sent successfully" + ); + return Ok(()); + } + Err(e) => { + warn!( + webhook_url = %webhook_url, + event_type = %event.event_type, + operation_id = %event.operation_id, + attempt = attempt + 1, + error = ?e, + "Webhook request failed" + ); + + if attempt == MAX_RETRIES - 1 { + error!( + webhook_url = %webhook_url, + event_type = %event.event_type, + operation_id = %event.operation_id, + max_retries = MAX_RETRIES, + "Webhook delivery failed after all retries" + ); + return Err(anyhow::anyhow!( + "Webhook delivery failed after {} retries: {}", + MAX_RETRIES, + e + )); + } + } + } + } + + Ok(()) +} + +async fn send_webhook_request( + client: &reqwest::Client, + webhook_url: &str, + event: &InvoiceWebhookEvent, +) -> anyhow::Result<()> { + let response = client + .post(webhook_url) + .header("Content-Type", "application/json") + .header("User-Agent", "fmcd-webhook/1.0") + .header("X-Fmcd-Event-Type", &event.event_type) + .header("X-Fmcd-Operation-Id", &event.operation_id) + .header("X-Fmcd-Timestamp", event.timestamp.to_rfc3339()) + .json(event) + .send() + .await?; + + let status = response.status(); + if status.is_success() { + Ok(()) + } else { + let error_body = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + anyhow::bail!( + "Webhook request failed with status {}: {}", + status, + error_body + ); + } +} diff --git a/src/webhooks/mod.rs b/src/webhooks/mod.rs index 5eea468..e7cc4cf 100644 --- a/src/webhooks/mod.rs +++ b/src/webhooks/mod.rs @@ -1,3 +1,4 @@ +pub mod invoice; pub mod notifier; #[cfg(test)] diff --git a/src/webhooks/notifier.rs b/src/webhooks/notifier.rs index 0bd6505..d2d5bd5 100644 --- a/src/webhooks/notifier.rs +++ b/src/webhooks/notifier.rs @@ -1,17 +1,15 @@ use std::collections::HashSet; use std::fmt; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::IpAddr; use std::str::FromStr; -use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use chrono::{DateTime, Utc}; use hex; use hmac::{Hmac, Mac}; use reqwest::Client; use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value}; +use serde_json::Value; use sha2::Sha256; use tokio::time::sleep; use tracing::{debug, error, info, warn}; @@ -333,7 +331,7 @@ impl WebhookNotifier { let event_type = event.event_type(); let event_id = event.event_id(); - let timestamp = event.timestamp(); + let _timestamp = event.timestamp(); debug!( event_id = %event_id, diff --git a/src/webhooks/tests/invoice_tests.rs b/src/webhooks/tests/invoice_tests.rs new file mode 100644 index 0000000..7006f79 --- /dev/null +++ b/src/webhooks/tests/invoice_tests.rs @@ -0,0 +1,26 @@ +#![allow(clippy::unwrap_used)] + +use chrono::Utc; +use serde_json::json; + +use crate::webhooks::invoice::InvoiceWebhookEvent; + +#[tokio::test] +async fn test_webhook_event_serialization() { + let event = InvoiceWebhookEvent { + event_type: "invoice_paid".to_string(), + operation_id: "test-op-123".to_string(), + invoice_id: "test-invoice-456".to_string(), + federation_id: "test-fed-789".to_string(), + timestamp: Utc::now(), + data: json!({ + "amount_received_msat": 1000000, + "settled_at": "2024-01-01T12:00:00Z" + }), + }; + + let serialized = serde_json::to_string(&event).unwrap(); + assert!(serialized.contains("invoice_paid")); + assert!(serialized.contains("test-op-123")); + assert!(serialized.contains("1000000")); +} diff --git a/src/webhooks/tests/mod.rs b/src/webhooks/tests/mod.rs index 2dcb7fe..7b67144 100644 --- a/src/webhooks/tests/mod.rs +++ b/src/webhooks/tests/mod.rs @@ -1,2 +1,4 @@ #[cfg(test)] +mod invoice_tests; +#[cfg(test)] mod notifier_tests;