Skip to content

Commit 90e00ad

Browse files
app: integration
1 parent 754ebad commit 90e00ad

File tree

4 files changed

+83
-75
lines changed

4 files changed

+83
-75
lines changed

dmq-node/app/Main.hs

Lines changed: 59 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ module Main where
1010

1111
import Control.Exception (throwIO)
1212
import Control.Monad (void, when)
13-
import Control.Monad.Class.MonadAsync
1413
import Control.Tracer (Tracer (..), nullTracer, traceWith)
1514

1615
import Data.Act
@@ -28,13 +27,15 @@ import System.Random (newStdGen, split)
2827
import Cardano.Git.Rev (gitRev)
2928
import Cardano.KESAgent.KES.Evolution qualified as KES
3029
import Cardano.KESAgent.Protocols.StandardCrypto (StandardCrypto)
30+
import Cardano.Ledger.Keys (VKey (..))
31+
import Cardano.Ledger.Hashes (hashKey)
3132

3233
import DMQ.Configuration
3334
import DMQ.Configuration.CLIOptions (parseCLIOptions)
3435
import DMQ.Configuration.Topology (readTopologyFileOrError)
3536
import DMQ.Diffusion.Applications (diffusionApplications)
3637
import DMQ.Diffusion.Arguments
37-
import DMQ.Diffusion.NodeKernel (mempool, withNodeKernel)
38+
import DMQ.Diffusion.NodeKernel
3839
import DMQ.Handlers.TopLevel (toplevelExceptionHandler)
3940
import DMQ.NodeToClient qualified as NtC
4041
import DMQ.NodeToNode (NodeToNodeVersion, dmqCodecs, dmqLimitsAndTimeouts,
@@ -45,9 +46,11 @@ import DMQ.Tracer
4546

4647
import DMQ.Diffusion.PeerSelection (policy)
4748
import DMQ.NodeToClient.LocalStateQueryClient
49+
import DMQ.Protocol.SigSubmission.Validate
4850
import Ouroboros.Network.Diffusion qualified as Diffusion
4951
import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress,
5052
encodeRemoteAddress)
53+
import Ouroboros.Network.SizeInBytes
5154
import Ouroboros.Network.Snocket
5255
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
5356

@@ -118,56 +121,66 @@ runDMQ commandLineConfig = do
118121
let (psRng, policyRng) = split stdGen
119122

120123
Diffusion.withIOManager \iocp -> do
121-
let localSnocket' = localSnocket iocp
124+
let localSnocket' = localSnocket iocp
125+
mkStakePoolMonitor = connectToCardanoNode tracer localSnocket' snocketPath
122126

123127
withNodeKernel @StandardCrypto
124128
tracer
125129
dmqConfig
126130
evolutionConfig
127-
psRng $ \nodeKernel -> do
131+
psRng
132+
mkStakePoolMonitor $ \nodeKernel -> do
128133
dmqDiffusionConfiguration <- mkDiffusionConfiguration dmqConfig nt
129134

130-
let stakePoolMonitor = connectToCardanoNode tracer localSnocket' snocketPath nodeKernel
131-
132-
withAsync stakePoolMonitor \aid -> do
133-
link aid
134-
let dmqNtNApps =
135-
ntnApps tracer
136-
dmqConfig
137-
nodeKernel
138-
(dmqCodecs
135+
let sigSize :: Sig StandardCrypto -> SizeInBytes
136+
sigSize _ = 0 -- TODO
137+
mempoolReader = Mempool.getReader sigId sigSize (mempool nodeKernel)
138+
dmqNtNApps =
139+
let ntnMempoolWriter = Mempool.writerAdapter $
140+
Mempool.getWriter sigId
141+
(poolValidationCtx $ stakePools nodeKernel)
142+
(validateSig FailDefault (hashKey . VKey))
143+
SigDuplicate
144+
(mempool nodeKernel)
145+
in ntnApps tracer
146+
dmqConfig
147+
mempoolReader
148+
ntnMempoolWriter
149+
sigSize
150+
nodeKernel
151+
(dmqCodecs
139152
(encodeRemoteAddress (maxBound @NodeToNodeVersion))
140153
(decodeRemoteAddress (maxBound @NodeToNodeVersion)))
141-
dmqLimitsAndTimeouts
142-
defaultSigDecisionPolicy
143-
dmqNtCApps =
144-
let sigSize _ = 0 -- TODO
145-
maxMsgs = 1000 -- TODO: make this negotiated in the handshake?
146-
mempoolReader = Mempool.getReader sigId sigSize (mempool nodeKernel)
147-
mempoolWriter = Mempool.getWriter sigId (pure ())
148-
(\_ _ -> Right () :: Either Void ())
149-
(\_ _ -> pure True)
150-
(mempool nodeKernel)
151-
in NtC.ntcApps tracer dmqConfig
152-
mempoolReader mempoolWriter maxMsgs
153-
(NtC.dmqCodecs encodeReject decodeReject)
154-
dmqDiffusionArguments =
155-
diffusionArguments (if handshakeTracer
156-
then WithEventType "Handshake" >$< tracer
157-
else nullTracer)
158-
(if localHandshakeTracer
159-
then WithEventType "Handshake" >$< tracer
160-
else nullTracer)
161-
dmqDiffusionApplications =
162-
diffusionApplications nodeKernel
163-
dmqConfig
164-
dmqDiffusionConfiguration
165-
dmqLimitsAndTimeouts
166-
dmqNtNApps
167-
dmqNtCApps
168-
(policy policyRng)
169-
170-
Diffusion.run dmqDiffusionArguments
171-
(dmqDiffusionTracers dmqConfig tracer)
172-
dmqDiffusionConfiguration
173-
dmqDiffusionApplications
154+
dmqLimitsAndTimeouts
155+
defaultSigDecisionPolicy
156+
dmqNtCApps =
157+
let maxMsgs = 1000 -- TODO: make this negotiated in the handshake?
158+
ntcMempoolWriter =
159+
Mempool.getWriter sigId
160+
(poolValidationCtx $ stakePools nodeKernel)
161+
(validateSig FailSoft (hashKey . VKey))
162+
SigDuplicate
163+
(mempool nodeKernel)
164+
in NtC.ntcApps tracer dmqConfig
165+
mempoolReader ntcMempoolWriter maxMsgs
166+
(NtC.dmqCodecs encodeReject decodeReject)
167+
dmqDiffusionArguments =
168+
diffusionArguments (if handshakeTracer
169+
then WithEventType "Handshake" >$< tracer
170+
else nullTracer)
171+
(if localHandshakeTracer
172+
then WithEventType "Handshake" >$< tracer
173+
else nullTracer)
174+
dmqDiffusionApplications =
175+
diffusionApplications nodeKernel
176+
dmqConfig
177+
dmqDiffusionConfiguration
178+
dmqLimitsAndTimeouts
179+
dmqNtNApps
180+
dmqNtCApps
181+
(policy policyRng)
182+
183+
Diffusion.run dmqDiffusionArguments
184+
(dmqDiffusionTracers dmqConfig tracer)
185+
dmqDiffusionConfiguration
186+
dmqDiffusionApplications

dmq-node/dmq-node.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ executable dmq-node
145145
aeson,
146146
base,
147147
cardano-git-rev,
148+
cardano-ledger-core,
148149
contra-tracer >=0.1 && <0.3,
149150
dmq-node,
150-
io-classes,
151151
kes-agent-crypto,
152152
optparse-applicative,
153153
ouroboros-network,

dmq-node/src/DMQ/NodeToClient.hs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ module DMQ.NodeToClient
1616
import Data.Aeson qualified as Aeson
1717
import Data.ByteString.Lazy (ByteString)
1818
import Data.Functor.Contravariant ((>$<))
19-
import Data.Typeable (Typeable)
2019
import Data.Void
2120
import Data.Word
2221

@@ -47,6 +46,7 @@ import DMQ.Protocol.LocalMsgSubmission.Codec
4746
import DMQ.Protocol.LocalMsgSubmission.Server
4847
import DMQ.Protocol.LocalMsgSubmission.Type
4948
import DMQ.Protocol.SigSubmission.Type (Sig, SigId, sigId)
49+
import DMQ.Protocol.SigSubmission.Validate
5050
import DMQ.Tracer
5151

5252
import Ouroboros.Network.Context
@@ -58,9 +58,9 @@ import Ouroboros.Network.OrphanInstances ()
5858
import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..))
5959
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
6060
codecHandshake, noTimeLimitsHandshake)
61-
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
62-
(TxSubmissionMempoolWriter)
6361
import Ouroboros.Network.TxSubmission.Mempool.Reader
62+
import Ouroboros.Network.TxSubmission.Mempool.Simple
63+
import Ouroboros.Network.Util.ShowProxy
6464

6565

6666
type HandshakeTr ntcAddr = Mx.WithBearer (ConnectionId ntcAddr) (TraceSendRecv (Handshake NodeToClientVersion CBOR.Term))
@@ -100,8 +100,8 @@ data Codecs crypto m =
100100
dmqCodecs :: ( MonadST m
101101
, Crypto crypto
102102
)
103-
=> (SigMempoolFail -> CBOR.Encoding)
104-
-> (forall s. CBOR.Decoder s SigMempoolFail)
103+
=> (MempoolAddFail (Sig crypto) -> CBOR.Encoding)
104+
-> (forall s. CBOR.Decoder s (MempoolAddFail (Sig crypto)))
105105
-> Codecs crypto m
106106
dmqCodecs encodeReject' decodeReject' =
107107
Codecs {
@@ -132,18 +132,20 @@ data Apps ntcAddr m a =
132132
-- | Construct applications for the node-to-client protocols
133133
--
134134
ntcApps
135-
:: forall crypto idx ntcAddr m.
135+
:: forall crypto idx ntcAddr failure m.
136136
( MonadThrow m
137137
, MonadThread m
138138
, MonadSTM m
139139
, Crypto crypto
140-
, Typeable crypto
141140
, Aeson.ToJSON ntcAddr
141+
, Aeson.ToJSON (MempoolAddFail (Sig crypto))
142+
, ShowProxy (MempoolAddFail (Sig crypto))
143+
, ShowProxy (Sig crypto)
142144
)
143145
=> (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev))
144146
-> Configuration
145147
-> TxSubmissionMempoolReader SigId (Sig crypto) idx m
146-
-> TxSubmissionMempoolWriter SigId (Sig crypto) idx m
148+
-> MempoolWriter SigId (Sig crypto) failure idx m
147149
-> Word16
148150
-> Codecs crypto m
149151
-> Apps ntcAddr m ()

dmq-node/src/DMQ/NodeToNode.hs

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ import Ouroboros.Network.PeerSharing (bracketPeerSharingClient,
9090
peerSharingClient, peerSharingServer)
9191
import Ouroboros.Network.Snocket (RemoteAddress)
9292
import Ouroboros.Network.TxSubmission.Inbound.V2 as SigSubmission
93-
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
93+
import Ouroboros.Network.TxSubmission.Mempool.Reader
9494
import Ouroboros.Network.TxSubmission.Outbound
9595

9696
import Ouroboros.Network.OrphanInstances ()
@@ -150,12 +150,12 @@ data Apps addr m a b =
150150
}
151151

152152
ntnApps
153-
:: forall crypto m addr .
153+
:: forall crypto m addr idx.
154154
( Crypto crypto
155-
, DSIGN.ContextDSIGN (DSIGN crypto) ~ ()
156-
, DSIGN.Signable (DSIGN crypto) (OCertSignable crypto)
157-
, KES.ContextKES (KES crypto) ~ ()
158-
, KES.Signable (KES crypto) BS.ByteString
155+
-- , DSIGN.ContextDSIGN (DSIGN crypto) ~ ()
156+
-- , DSIGN.Signable (DSIGN crypto) (OCertSignable crypto)
157+
-- , KES.ContextKES (KES crypto) ~ ()
158+
-- , KES.Signable (KES crypto) BS.ByteString
159159
, Typeable crypto
160160
, Alternative (STM m)
161161
, MonadAsync m
@@ -166,12 +166,16 @@ ntnApps
166166
, MonadThrow (STM m)
167167
, MonadTimer m
168168
, Ord addr
169+
, Ord idx
169170
, Show addr
170171
, Hashable addr
171172
, Aeson.ToJSON addr
172173
)
173174
=> (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev))
174175
-> Configuration
176+
-> TxSubmissionMempoolReader SigId (Sig crypto) idx m
177+
-> TxSubmissionMempoolWriter SigId (Sig crypto) idx m
178+
-> (Sig crypto -> SizeInBytes)
175179
-> NodeKernel crypto addr m
176180
-> Codecs crypto addr m
177181
-> LimitsAndTimeouts crypto addr
@@ -191,6 +195,9 @@ ntnApps
191195
, dmqcSigSubmissionInboundTracer = I sigSubmissionInboundTracer
192196
, dmqcSigSubmissionLogicTracer = I sigSubmissionLogicTracer
193197
}
198+
mempoolReader
199+
mempoolWriter
200+
sigSize
194201
NodeKernel {
195202
fetchClientRegistry
196203
, peerSharingRegistry
@@ -225,20 +232,6 @@ ntnApps
225232
, aPeerSharingServer
226233
}
227234
where
228-
sigSize :: Sig crypto -> SizeInBytes
229-
sigSize _ = 0 -- TODO
230-
231-
mempoolReader = Mempool.getReader sigId sigSize mempool
232-
-- TODO: invalid signatures are just omitted from the mempool. For DMQ
233-
-- we need to validate signatures when we received them, and shutdown
234-
-- connection if we receive one, rather than validate them in the
235-
-- mempool.
236-
mempoolWriter = Mempool.getWriter sigId
237-
(pure ()) -- TODO not needed
238-
(\_ -> validateSig evolutionConfig)
239-
(\_ -> True)
240-
mempool
241-
242235
aSigSubmissionClient
243236
:: NodeToNodeVersion
244237
-> ExpandedInitiatorContext addr m

0 commit comments

Comments
 (0)