Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 256 additions & 0 deletions cli/cmd/payments/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package payments

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
logging "github.com/ipfs/go-log/v2"
"github.com/spf13/cobra"
"github.com/storacha/forgectl/cli/config"
"github.com/storacha/forgectl/pkg/database"
"github.com/storacha/forgectl/pkg/services/inspector"
)

var (
monitorPayer string
monitorInterval time.Duration
monitorDBURL string
)

var log = logging.Logger("payments/monitor")

var monitorCmd = &cobra.Command{
Use: "monitor",
Short: "Monitor payment account, operator, and rails info, storing snapshots in PostgreSQL",
Long: `Monitor continuously polls PaymentAccountInfo, PaymentOperatorInfo, and PaymentRails
for a specified payer address and stores diff-based snapshots in a PostgreSQL database.

The database uses triggers to efficiently track changes:
- New rows are inserted only when values change
- Unchanged values update only the checked_at timestamp
- This creates a compact history showing when values changed and when they were verified

The command runs indefinitely until interrupted (Ctrl+C).`,
RunE: runMonitor,
}

func init() {
monitorCmd.Flags().StringVar(&monitorPayer, "payer", "", "Payer address to monitor (required)")
cobra.CheckErr(monitorCmd.MarkFlagRequired("payer"))

monitorCmd.Flags().DurationVar(&monitorInterval, "interval", 30*time.Second, "Polling interval (e.g., 30s, 1m, 5m)")
monitorCmd.Flags().StringVar(&monitorDBURL, "db-url", "", "Database connection URL (overrides config file)")
}

func runMonitor(cmd *cobra.Command, args []string) error {
// Validate payer address
if !common.IsHexAddress(monitorPayer) {
return fmt.Errorf("invalid payer address: %s", monitorPayer)
}
payerAddr := common.HexToAddress(monitorPayer)

ctx := cmd.Context()

// Load configuration
cfg, err := config.Load()
if err != nil {
return err
}

// Determine database URL
dbURL := monitorDBURL
if dbURL == "" {
dbURL = cfg.DatabaseURL
}
if dbURL == "" {
return fmt.Errorf("database URL is required: use --db-url flag or set database_url in config file")
}

// Connect to database
log.Infof("Connecting to database...")
db, err := database.Connect(dbURL)
if err != nil {
return fmt.Errorf("database connection failed: %w", err)
}
defer db.Close()

// Apply triggers (idempotent - safe to run multiple times)
log.Infof("Applying database triggers...")
if err := db.ApplyTriggers(); err != nil {
return fmt.Errorf("failed to apply triggers: %w", err)
}

// Create inspector service
log.Infof("Initializing inspector service...")
inspctr, err := inspector.New(inspector.Config{
ClientEndpoint: cfg.RPCUrl,
PaymentsContractAddress: cfg.PaymentsAddr(),
ServiceContractAddress: cfg.ServiceAddr(),
ProviderRegistryAddress: cfg.ServiceRegistryAddr(),
})
if err != nil {
return err
}

tokenAddr := cfg.TokenAddr()

// Get or create payer-token pair ID (once at startup)
log.Infof("Resolving payer-token pair...")
payerTokenPairID, err := db.GetOrCreatePayerTokenPair(payerAddr.Hex(), tokenAddr.Hex())
if err != nil {
return fmt.Errorf("failed to get/create payer-token pair: %w", err)
}
log.Infof("Monitoring payer-token pair ID: %d", payerTokenPairID)

log.Infof("Starting monitor for payer %s with %s interval", payerAddr.Hex(), monitorInterval)
log.Info("Press Ctrl+C to stop")

// Create ticker for polling
ticker := time.NewTicker(monitorInterval)
defer ticker.Stop()

// Perform initial poll immediately
if err := pollAndStore(ctx, inspctr, db, payerTokenPairID, payerAddr, tokenAddr); err != nil {
log.Errorf("Initial poll failed: %v", err)
}

// Poll loop
for {
select {
case <-ctx.Done():
log.Info("Monitor stopped by user")
return nil
case <-ticker.C:
if err := pollAndStore(ctx, inspctr, db, payerTokenPairID, payerAddr, tokenAddr); err != nil {
log.Errorf("Poll failed: %v", err)
// Continue polling even if one poll fails
}
}
}
}

// pollAndStore fetches payment data and stores it in the database
func pollAndStore(ctx context.Context, inspctr *inspector.Service, db *database.DB, payerTokenPairID uint, payer, token common.Address) error {
log.Debugf("Polling payment data for payer %s...", payer.Hex())

// Fetch account info
accountInfo, err := inspctr.PaymentAccountInfo(ctx, token, payer)
if err != nil {
return fmt.Errorf("failed to fetch account info: %w", err)
}

// Fetch operator info
operatorInfo, err := inspctr.PaymentOperatorInfo(ctx, token, payer)
if err != nil {
return fmt.Errorf("failed to fetch operator info: %w", err)
}

// Fetch rails info
railsInfo, err := inspctr.PaymentsRailsForPayer(ctx, token, payer, 0, 100)
if err != nil {
return fmt.Errorf("failed to fetch rails info: %w", err)
}

// Create and save account snapshot (trigger handles deduplication)
accountSnapshot := database.NewPaymentAccountSnapshot(
payerTokenPairID,
accountInfo.Funds,
accountInfo.LockupCurrent,
accountInfo.LockupRate,
accountInfo.LockupLastSettledAt,
)
if err := db.SaveAccountSnapshot(accountSnapshot); err != nil {
return err
}

// Create and save operator snapshot (trigger handles deduplication)
operatorSnapshot := database.NewPaymentOperatorSnapshot(
payerTokenPairID,
operatorInfo.IsApproved,
operatorInfo.RateAllowance,
operatorInfo.LockupAllowance,
operatorInfo.RateUsage,
operatorInfo.LockupUsage,
operatorInfo.MaxLockupPeriod,
)
if err := db.SaveOperatorSnapshot(operatorSnapshot); err != nil {
return err
}

// Create and save rails summary (trigger handles deduplication)
railsSummary := database.NewPaymentRailsSummary(
payerTokenPairID,
railsInfo.Total,
railsInfo.NextOffset,
)
if err := db.SaveRailsSummary(railsSummary); err != nil {
return err
}

// Fetch and save each rail's detailed info
for _, rail := range railsInfo.Rails {
railDetail, err := inspctr.PaymentsRailInfo(ctx, rail.RailId)
if err != nil {
log.Errorf("Failed to fetch rail %s: %v", rail.RailId.String(), err)
continue // Don't fail entire poll if one rail fails
}

// Resolve all FK IDs for this rail
providerID, err := db.GetOrCreateProvider(
railDetail.To.Hex(),
nil, // provider_id from chain - we don't have it yet
"", // name - will be filled when we implement ListProviders tracking
"", // description
"", // payee
)
if err != nil {
log.Errorf("Failed to resolve provider for rail %s: %v", rail.RailId.String(), err)
continue
}

operatorContractID, err := db.GetOrCreateContract(railDetail.Operator.Hex())
if err != nil {
log.Errorf("Failed to resolve operator contract for rail %s: %v", rail.RailId.String(), err)
continue
}

validatorContractID, err := db.GetOrCreateContract(railDetail.Validator.Hex())
if err != nil {
log.Errorf("Failed to resolve validator contract for rail %s: %v", rail.RailId.String(), err)
continue
}

serviceFeeRecipientContractID, err := db.GetOrCreateContract(railDetail.ServiceFeeRecipient.Hex())
if err != nil {
log.Errorf("Failed to resolve service fee recipient contract for rail %s: %v", rail.RailId.String(), err)
continue
}

// Create rail info with all FK IDs
railInfo := database.NewPaymentRailInfo(
rail.RailId,
payerTokenPairID,
providerID,
operatorContractID,
validatorContractID,
serviceFeeRecipientContractID,
railDetail.PaymentRate,
railDetail.LockupPeriod,
railDetail.LockupFixed,
railDetail.SettledUpTo,
railDetail.EndEpoch,
railDetail.CommissionRateBps,
rail.IsTerminated,
)
if err := db.SaveRailInfo(railInfo); err != nil {
return err
}
}

log.Infof("Poll complete at %s - processed account, operator, and %d rails",
accountSnapshot.CheckedAt.Format(time.RFC3339), len(railsInfo.Rails))

return nil
}
1 change: 1 addition & 0 deletions cli/cmd/payments/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ var Cmd = &cobra.Command{
func init() {
Cmd.AddCommand(statusCmd)
Cmd.AddCommand(calculateCmd)
Cmd.AddCommand(monitorCmd)
}
6 changes: 6 additions & 0 deletions cli/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Config struct {

KeystorePath string `mapstructure:"keystore_path"`
KeystorePassword string `mapstructure:"keystore_password"`

// Database configuration
DatabaseURL string `mapstructure:"database_url"`
}

// Validate checks that all required configuration fields are set and valid
Expand Down Expand Up @@ -95,6 +98,9 @@ func (c *Config) Validate() error {
return fmt.Errorf("keystore_password is required")
}

// Database URL is optional - only required when using monitor command
// Validation happens in the monitor command itself

return nil
}

Expand Down
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ require (
github.com/gorilla/websocket v1.4.2 // indirect
github.com/holiman/uint256 v1.3.2 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
Expand All @@ -51,4 +57,6 @@ require (
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/text v0.28.0 // indirect
gorm.io/driver/postgres v1.6.0 // indirect
gorm.io/gorm v1.31.0 // indirect
)
21 changes: 21 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ github.com/crate-crypto/go-eth-kzg v1.4.0 h1:WzDGjHk4gFg6YzV0rJOAsTK4z3Qkz5jd4RE
github.com/crate-crypto/go-eth-kzg v1.4.0/go.mod h1:J9/u5sWfznSObptgfa92Jq8rTswn6ahQWEuiLHOjCUI=
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a h1:W8mUrRp6NOVl3J+MYp5kPMoUZPp7aOYHtaua31lwRHg=
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a/go.mod h1:sTwzHBvIzm2RfVCGNEBZgRyjwK40bVoun3ZnGOCafNM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
Expand Down Expand Up @@ -116,8 +117,20 @@ github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/ipfs/go-log/v2 v2.8.2 h1:nVG4nNHUwwI/sTs9Bi5iE8sXFQwXs3AjkkuWhg7+Y2I=
github.com/ipfs/go-log/v2 v2.8.2/go.mod h1:UhIYAwMV7Nb4ZmihUxfIRM2Istw/y9cAk3xaK+4Zs2c=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
Expand Down Expand Up @@ -202,6 +215,9 @@ github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU=
github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY=
github.com/storacha/filecoin-services/go v0.0.1 h1:o5y+s9fVzVJy8pO8zmo1zDyObwaEU8APsY+Wu/Ozf98=
github.com/storacha/filecoin-services/go v0.0.1/go.mod h1:defpuuds8wN2Ag2rJH/InaARxfY49LXsP7V+RjCYhUY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
Expand Down Expand Up @@ -254,5 +270,10 @@ gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4=
gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo=
gorm.io/gorm v1.31.0 h1:0VlycGreVhK7RF/Bwt51Fk8v0xLiiiFdbGDPIZQ7mJY=
gorm.io/gorm v1.31.0/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs=
Loading