Skip to content

Commit e2b74a1

Browse files
mempool: adapt for generalized validation
1 parent 7767396 commit e2b74a1

File tree

4 files changed

+110
-82
lines changed

4 files changed

+110
-82
lines changed

dmq-node/src/DMQ/Protocol/LocalMsgSubmission/Client.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ module DMQ.Protocol.LocalMsgSubmission.Client
1717
import DMQ.Protocol.LocalMsgSubmission.Type
1818
import Network.TypedProtocol.Peer.Client
1919
import Ouroboros.Network.Protocol.LocalTxSubmission.Client
20+
import Ouroboros.Network.TxSubmission.Mempool.Simple
2021

2122
-- | Type aliases for the high level client API
2223
--
23-
type LocalMsgSubmissionClient sig = LocalTxSubmissionClient sig SigMempoolFail
24+
type LocalMsgSubmissionClient sig = LocalTxSubmissionClient sig (MempoolAddFail sig)
2425
type LocalMsgClientStIdle = LocalTxClientStIdle
2526

2627

dmq-node/src/DMQ/Protocol/LocalMsgSubmission/Server.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ module DMQ.Protocol.LocalMsgSubmission.Server
1818
import DMQ.Protocol.LocalMsgSubmission.Type
1919
import Network.TypedProtocol.Peer.Server
2020
import Ouroboros.Network.Protocol.LocalTxSubmission.Server as LocalTxSubmission
21+
import Ouroboros.Network.TxSubmission.Mempool.Simple
2122

2223
-- | Type aliases for the high level client API
2324
--
24-
type LocalMsgSubmissionServer sig = LocalTxSubmissionServer sig SigMempoolFail
25+
type LocalMsgSubmissionServer sig = LocalTxSubmissionServer sig (MempoolAddFail sig)
2526

2627

2728
-- | A non-pipelined 'Peer' representing the 'LocalMsgSubmissionServer'.

dmq-node/src/DMQ/Protocol/LocalMsgSubmission/Type.hs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,10 @@ module DMQ.Protocol.LocalMsgSubmission.Type
1212
, module Ouroboros
1313
) where
1414

15-
import Data.Text (Text)
1615
import Network.TypedProtocol.Core as Core
1716
import Ouroboros.Network.Protocol.LocalTxSubmission.Type as Ouroboros
18-
import Ouroboros.Network.Util.ShowProxy
17+
import Ouroboros.Network.TxSubmission.Mempool.Simple
1918

2019
-- | The LocalMsgSubmission protocol is an alias for the LocalTxSubmission
2120
--
22-
type LocalMsgSubmission sig = Ouroboros.LocalTxSubmission sig SigMempoolFail
23-
24-
-- | The type of failures when adding to the mempool
25-
--
26-
data SigMempoolFail =
27-
SigInvalid Text
28-
| SigDuplicate
29-
| SigExpired
30-
| SigResultOther Text
31-
deriving (Eq, Show)
32-
33-
instance ShowProxy SigMempoolFail where
21+
type LocalMsgSubmission sig = Ouroboros.LocalTxSubmission sig (MempoolAddFail sig)
Lines changed: 104 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,49 @@
1-
{-# LANGUAGE DerivingStrategies #-}
2-
{-# LANGUAGE GADTs #-}
3-
{-# LANGUAGE NamedFieldPuns #-}
4-
{-# LANGUAGE ScopedTypeVariables #-}
5-
{-# LANGUAGE StandaloneDeriving #-}
1+
{-# LANGUAGE BangPatterns #-}
2+
{-# LANGUAGE DerivingStrategies #-}
3+
{-# LANGUAGE DisambiguateRecordFields #-}
4+
{-# LANGUAGE FlexibleContexts #-}
5+
{-# LANGUAGE GADTs #-}
6+
{-# LANGUAGE NamedFieldPuns #-}
7+
{-# LANGUAGE ScopedTypeVariables #-}
8+
{-# LANGUAGE StandaloneDeriving #-}
9+
{-# LANGUAGE TupleSections #-}
10+
{-# LANGUAGE TypeFamilies #-}
611

712
-- | The module should be imported qualified.
813
--
914
module Ouroboros.Network.TxSubmission.Mempool.Simple
10-
( Mempool (..)
15+
( InvalidTxsError
16+
, MempoolAddFail
17+
, Mempool (..)
1118
, MempoolSeq (..)
19+
, MempoolWriter (..)
1220
, empty
1321
, new
1422
, read
1523
, getReader
1624
, getWriter
25+
, writerAdapter
1726
) where
1827

1928
import Prelude hiding (read, seq)
2029

2130
import Control.Concurrent.Class.MonadSTM.Strict
22-
import Control.Monad (when)
31+
import Control.DeepSeq
32+
import Control.Exception (assert)
2333
import Control.Monad.Class.MonadThrow
24-
34+
import Control.Monad.Trans.Except
2535
import Data.Bifunctor (bimap)
26-
import Data.Either (partitionEithers)
36+
import Data.Either
2737
import Data.Foldable (toList)
2838
import Data.Foldable qualified as Foldable
29-
import Data.Function (on)
30-
import Data.List (find, nubBy)
39+
import Data.List (find)
3140
import Data.Maybe (isJust)
3241
import Data.Sequence (Seq)
3342
import Data.Sequence qualified as Seq
3443
import Data.Set (Set)
3544
import Data.Set qualified as Set
36-
import Data.Typeable (Typeable)
3745

46+
import Ouroboros.Network.Protocol.LocalTxSubmission.Type (SubmitResult (..))
3847
import Ouroboros.Network.SizeInBytes
3948
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
4049
import Ouroboros.Network.TxSubmission.Mempool.Reader
@@ -105,69 +114,98 @@ getReader getTxId getTxSize (Mempool mempool) =
105114
f :: Int -> tx -> (txid, Int, SizeInBytes)
106115
f idx tx = (getTxId tx, idx, getTxSize tx)
107116

117+
-- | type of mempool validation errors which are thrown as exceptions
118+
--
119+
data family InvalidTxsError failure
108120

109-
data InvalidTxsError where
110-
InvalidTxsError :: forall txid failure.
111-
( Typeable txid
112-
, Typeable failure
113-
, Show txid
114-
, Show failure
115-
)
116-
=> [(txid, failure)]
117-
-> InvalidTxsError
118-
119-
deriving instance Show InvalidTxsError
120-
instance Exception InvalidTxsError
121-
121+
-- | type of mempool validation errors which are non-fatal
122+
--
123+
data family MempoolAddFail tx
122124

123-
-- | A simple mempool writer.
125+
-- | A mempool writer which generalizes the tx submission mempool writer
126+
-- TODO: We could replace TxSubmissionMempoolWriter with this at some point
127+
--
128+
data MempoolWriter txid tx failure idx m =
129+
MempoolWriter {
130+
131+
-- | Compute the transaction id from a transaction.
132+
--
133+
-- This is used in the protocol handler to verify a full transaction
134+
-- matches a previously given transaction id.
135+
--
136+
txId :: tx -> txid,
137+
138+
-- | Supply a batch of transactions to the mempool. They are either
139+
-- accepted or rejected individually, but in the order supplied.
140+
--
141+
-- The 'txid's of all transactions that were added successfully are
142+
-- returned.
143+
mempoolAddTxs :: [tx] -> m [(txid, SubmitResult (MempoolAddFail tx))]
144+
}
145+
146+
147+
-- | A mempool writer with validation harness
148+
-- PRECONDITION: no duplicates given to mempoolAddTxs
124149
--
125150
getWriter :: forall tx txid ctx failure m.
126151
( MonadSTM m
152+
, Exception (InvalidTxsError failure)
127153
, MonadThrow m
154+
-- TODO:
155+
-- , NFData txid
156+
-- , NFData tx
157+
-- , NFData (MempoolAddFail tx)
128158
, Ord txid
129-
, Typeable txid
130-
, Typeable failure
131-
, Show txid
132-
, Show failure
133159
)
134160
=> (tx -> txid)
135161
-- ^ get txid of a tx
136162
-> m ctx
137-
-- ^ monadic validation ctx
138-
-> (ctx -> tx -> Either failure ())
139-
-- ^ validate a tx, any failing `tx` throws an exception.
140-
-> (failure -> Bool)
141-
-- ^ return `True` when a failure should throw an exception
163+
-- ^ acquire validation context
164+
-> ([tx] -> ctx -> Except (InvalidTxsError failure) [(Either (MempoolAddFail tx) ())])
165+
-- ^ validation function which should evaluate its result to normal form
166+
-- esp. if it is 'expensive'
167+
-> MempoolAddFail tx
168+
-- ^ replace duplicates
142169
-> Mempool m txid tx
143-
-> TxSubmissionMempoolWriter txid tx Int m
144-
getWriter getTxId getValidationCtx validateTx failureFilterFn (Mempool mempool) =
145-
TxSubmissionMempoolWriter {
146-
txId = getTxId,
147-
148-
mempoolAddTxs = \txs -> do
149-
ctx <- getValidationCtx
150-
(invalidTxIds, validTxs) <- atomically $ do
151-
MempoolSeq { mempoolSet, mempoolSeq } <- readTVar mempool
152-
let (invalidTxIds, validTxs) =
153-
bimap (filter (failureFilterFn . snd))
154-
(nubBy (on (==) getTxId))
155-
. partitionEithers
156-
. map (\tx -> case validateTx ctx tx of
157-
Left e -> Left (getTxId tx, e)
158-
Right _ -> Right tx
159-
)
160-
. filter (\tx -> getTxId tx `Set.notMember` mempoolSet)
161-
$ txs
162-
mempoolTxs' = MempoolSeq {
163-
mempoolSet = Foldable.foldl' (\s tx -> getTxId tx `Set.insert` s)
164-
mempoolSet
165-
validTxs,
166-
mempoolSeq = Foldable.foldl' (Seq.|>) mempoolSeq validTxs
167-
}
168-
writeTVar mempool mempoolTxs'
169-
return (invalidTxIds, map getTxId validTxs)
170-
when (not (null invalidTxIds)) $
171-
throwIO (InvalidTxsError invalidTxIds)
172-
return validTxs
173-
}
170+
-> MempoolWriter txid tx failure Int m
171+
getWriter getTxId acquireCtx validateTxs duplicateFail (Mempool mempool) =
172+
MempoolWriter {
173+
txId = getTxId,
174+
175+
mempoolAddTxs = \txs -> assert (not . null $ txs) $ do
176+
ctx <- acquireCtx
177+
!vTxs <- case runExcept (validateTxs txs ctx) of
178+
Left e -> throwIO e
179+
Right r -> pure {-. force-} $ zipWith3 ((,,) . getTxId) txs txs r
180+
181+
atomically $ do
182+
MempoolSeq { mempoolSet, mempoolSeq } <- readTVar mempool
183+
let result =
184+
[if duplicate then
185+
Left . (txid,) $ SubmitFail duplicateFail
186+
else
187+
bimap ((txid,) . SubmitFail) (const (txid, tx)) eErrTx
188+
| (txid, tx, eErrTx) <- vTxs
189+
, let duplicate = txid `Set.member` mempoolSet
190+
]
191+
(validIds, validTxs) = unzip . rights $ result
192+
mempoolTxs' = MempoolSeq {
193+
mempoolSet = Set.union mempoolSet (Set.fromList validIds),
194+
mempoolSeq = Foldable.foldl' (Seq.|>) mempoolSeq validTxs
195+
}
196+
writeTVar mempool mempoolTxs'
197+
return $ either id ((,SubmitSuccess) . fst) <$> result
198+
}
199+
200+
201+
-- | Takes the general mempool writer defined here
202+
-- and adapts it to the API of the tx submission mempool writer
203+
-- to avoid more breaking changes for now.
204+
--
205+
writerAdapter :: (Functor m)
206+
=> MempoolWriter txid tx failure idx m
207+
-> TxSubmissionMempoolWriter txid tx idx m
208+
writerAdapter MempoolWriter { txId, mempoolAddTxs } =
209+
TxSubmissionMempoolWriter { txId, mempoolAddTxs = adapter }
210+
where
211+
adapter = fmap (fmap fst) . mempoolAddTxs

0 commit comments

Comments
 (0)