diff --git a/controller/subscription_controller.go b/controller/subscription_controller.go index 05a71f02..93e3579a 100644 --- a/controller/subscription_controller.go +++ b/controller/subscription_controller.go @@ -618,6 +618,11 @@ func stripeHandleInvoicePaid( AddRefreshTransferBalance(clientSession.Ctx, *networkId) + id, ok := clientSession.Ctx.Value(stripeWebhookIDKey).(server.Id) + if ok { + model.MarkStripeWebhookProcessed(clientSession.Ctx, id, *networkId) + } + return &StripeWebhookResult{}, nil } @@ -1483,12 +1488,19 @@ func PlaySubscriptionRenewalPost( return nil } +const stripeWebhookIDKey = "stripeWebhookID" + func VerifyStripeBody(req *http.Request) (io.Reader, error) { bodyBytes, err := io.ReadAll(req.Body) if err != nil { return nil, err } + id, _ := model.StoreStripeWebhookBody(req.Context(), bodyBytes) + // Attach the ID to the context + ctx := context.WithValue(req.Context(), stripeWebhookIDKey, id) + req = req.WithContext(ctx) + _, err = stripewebhook.ConstructEventWithOptions( bodyBytes, req.Header.Get("Stripe-Signature"), diff --git a/db_migrations.go b/db_migrations.go index 14b84566..a52ff48f 100644 --- a/db_migrations.go +++ b/db_migrations.go @@ -2558,7 +2558,7 @@ var migrations = []any{ min_block_number bigint NOT NULL DEFAULT 0, max_block_number bigint NOT NULL DEFAULT 0, country_location_id uuid NOT NULL DEFAULT gen_random_uuid(), - + PRIMARY KEY (network_id, country_location_id) ) `), @@ -2638,4 +2638,15 @@ var migrations = []any{ ALTER TABLE network_client_speed ADD COLUMN sample_count bigint NOT NULL DEFAULT 1 `), + + newSqlMigration(` + CREATE TABLE subscription_webhook_stripe ( + id uuid NOT NULL, + webhook_body text NOT NULL, + network_id uuid, + created_at timestamp NOT NULL DEFAULT now(), + + PRIMARY KEY (id) + ) + `), } diff --git a/model/subscription_model.go b/model/subscription_model.go index d2d7f1d9..e07e2633 100644 --- a/model/subscription_model.go +++ b/model/subscription_model.go @@ -3020,3 +3020,52 @@ func GetOpenTransferByteCount( return openTransferByteCount } + +func StoreStripeWebhookBody(ctx context.Context, webhookBody []byte) (id server.Id, returnErr error) { + + server.Tx(ctx, func(tx server.PgTx) { + + id = server.NewId() + + server.RaisePgResult(tx.Exec( + ctx, + ` + INSERT INTO subscription_webhook_stripe ( + id, + webhook_body + ) + VALUES ($1, $2) + `, + id, + string(webhookBody), + )) + }) + + return +} + +func MarkStripeWebhookProcessed( + ctx context.Context, + id server.Id, + networkId server.Id, +) (returnErr error) { + + server.Tx(ctx, func(tx server.PgTx) { + + server.RaisePgResult(tx.Exec( + ctx, + ` + UPDATE subscription_webhook_stripe + SET + network_id = $2 + WHERE + id = $1 + `, + id, + networkId, + )) + }) + + return + +} diff --git a/model/subscription_model_test.go b/model/subscription_model_test.go index 6867d3ca..2dd6b988 100644 --- a/model/subscription_model_test.go +++ b/model/subscription_model_test.go @@ -1315,6 +1315,28 @@ func TestGetOpenTransferByteCount(t *testing.T) { }) } +func TestStoreStripeWebhook(t *testing.T) { + server.DefaultTestEnv().Run(func() { + + ctx := context.Background() + + webhookBody := `{ + "id": "evt_1JQY2Y2eZvKYlo2C0q7Z1g5H", + "object": "event", + }` + + id, err := StoreStripeWebhookBody(ctx, []byte(webhookBody)) + assert.Equal(t, err, nil) + assert.NotEqual(t, id, "") + + networkId := server.NewId() + + err = MarkStripeWebhookProcessed(ctx, id, networkId) + assert.Equal(t, err, nil) + + }) +} + // FIXME a subsidy test where N clients pay each other // FIXME each client uses a different amount of data, but sends to peer clients following the same offset distribution as the others // FIXME the end result is that everyone should be paid the same, even though they get different amounts of data