diff --git a/config/src/config.rs b/config/src/config.rs index 1cb378096..cde4dde8e 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -318,6 +318,8 @@ pub struct Storage { pub master_key_import_path: Option, /// Keep a map of "address" to "list of UTXOs" in memory, to speed up getBalance and getUtxoInfo methods pub utxos_in_memory: bool, + /// Index RAD hashes in memory + pub rad_hashes_index: bool, /// RocksDB option max_open_files. -1 means unlimited. pub max_open_files: i32, } @@ -779,6 +781,10 @@ impl Storage { .utxos_in_memory .to_owned() .unwrap_or_else(|| defaults.storage_utxos_in_memory()), + rad_hashes_index: config + .rad_hashes_index + .to_owned() + .unwrap_or_else(|| defaults.storage_rad_hashes_index()), max_open_files: config .max_open_files .unwrap_or_else(|| defaults.storage_max_open_files()), @@ -791,6 +797,7 @@ impl Storage { db_path: Some(self.db_path.clone()), master_key_import_path: self.master_key_import_path.clone(), utxos_in_memory: Some(self.utxos_in_memory), + rad_hashes_index: Some(self.rad_hashes_index), max_open_files: Some(self.max_open_files), } } @@ -1360,6 +1367,7 @@ mod tests { db_path: Some(PathBuf::from("other")), master_key_import_path: None, utxos_in_memory: None, + rad_hashes_index: None, max_open_files: None, }; let config = Storage::from_partial(&partial_config, &Testnet); diff --git a/config/src/defaults.rs b/config/src/defaults.rs index dee900207..2c7887392 100644 --- a/config/src/defaults.rs +++ b/config/src/defaults.rs @@ -54,11 +54,16 @@ pub trait Defaults { PathBuf::from(".witnet") } - /// Do not keep utxos in memory by default + /// Keep utxos in memory by default fn storage_utxos_in_memory(&self) -> bool { true } + /// Do not index RAD hashes in memory by default + fn storage_rad_hashes_index(&self) -> bool { + false + } + /// Unlimited number of open files by default fn storage_max_open_files(&self) -> i32 { -1 diff --git a/data_structures/src/chain/mod.rs b/data_structures/src/chain/mod.rs index 4d5bd9203..adfede9e4 100644 --- a/data_structures/src/chain/mod.rs +++ b/data_structures/src/chain/mod.rs @@ -4042,6 +4042,9 @@ pub struct ChainState { /// Unspent Outputs Pool #[serde(skip)] pub unspent_outputs_pool: UnspentOutputsPool, + /// Arrays of (Epoch, DrTxHash) grouped by RadHash + #[serde(skip)] + pub rad_hashes_index: HashMap>, } impl ChainState { @@ -4074,6 +4077,20 @@ impl ChainState { .consensus_constants .clone() } + + pub fn get_data_requests_by_rad_hash(&self, rad_hash: &Hash) -> HashMap> { + let mut found: HashMap> = HashMap::new(); + if let Some(entry) = self.rad_hashes_index.get(rad_hash) { + entry.iter().for_each(|(epoch, dr_tx_hash)| { + if let Some(vec) = found.get_mut(epoch) { + vec.push(*dr_tx_hash) + } else { + found.insert(*epoch, vec![*dr_tx_hash]); + } + }); + } + found + } } /// A boxed and pinned future that resolves to a vector of stuff. diff --git a/data_structures/src/proto/versioning.rs b/data_structures/src/proto/versioning.rs index 21685f1ca..7240d700a 100644 --- a/data_structures/src/proto/versioning.rs +++ b/data_structures/src/proto/versioning.rs @@ -431,10 +431,12 @@ impl Versioned for crate::transaction::StakeTransactionBody { impl Versioned for crate::transaction::UnstakeTransactionBody { type LegacyType = ::ProtoStruct; } - impl Versioned for crate::chain::DataRequestOutput { type LegacyType = ::ProtoStruct; } +impl Versioned for crate::chain::RADRequest { + type LegacyType = ::ProtoStruct; +} pub trait AutoVersioned: ProtobufConvert {} @@ -447,6 +449,7 @@ impl AutoVersioned for crate::transaction::MintTransaction {} impl AutoVersioned for crate::transaction::StakeTransactionBody {} impl AutoVersioned for crate::transaction::UnstakeTransactionBody {} impl AutoVersioned for crate::chain::DataRequestOutput {} +impl AutoVersioned for crate::chain::RADRequest {} pub trait VersionedHashable { fn versioned_hash(&self, version: ProtocolVersion) -> Hash; diff --git a/node/src/actors/chain_manager/actor.rs b/node/src/actors/chain_manager/actor.rs index 8c669a43e..047eb61fe 100644 --- a/node/src/actors/chain_manager/actor.rs +++ b/node/src/actors/chain_manager/actor.rs @@ -172,6 +172,9 @@ impl ChainManager { log::debug!("Initial WIT supply: {}", act.initial_supply); } + // Set whether an in-memory index of data request transactions is required + act.rad_hashes_index = config.storage.rad_hashes_index; + storage_mngr::get_chain_state(storage_keys::chain_state_key(magic)) .into_actor(act) .then(|chain_state_from_storage, _, _| { diff --git a/node/src/actors/chain_manager/handlers.rs b/node/src/actors/chain_manager/handlers.rs index a43051712..b5e50ba48 100644 --- a/node/src/actors/chain_manager/handlers.rs +++ b/node/src/actors/chain_manager/handlers.rs @@ -50,8 +50,8 @@ use crate::{ GetReputation, GetReputationResult, GetSignalingInfo, GetState, GetSuperBlockVotes, GetSupplyInfo, GetSupplyInfo2, GetUtxoInfo, IsConfirmedBlock, PeersBeacons, QueryStakes, QueryStakesOrderByOptions, QueryStakingPowers, ReputationStats, Rewind, - SendLastBeacon, SessionUnitResult, SetLastBeacon, SetPeersLimits, SignalingInfo, - SnapshotExport, SnapshotImport, TryMineBlock, try_do_magic_into_pkh, + SearchDataRequests, SendLastBeacon, SessionUnitResult, SetLastBeacon, SetPeersLimits, + SignalingInfo, SnapshotExport, SnapshotImport, TryMineBlock, try_do_magic_into_pkh, }, sessions_manager::SessionsManager, }, @@ -1528,6 +1528,48 @@ impl Handler for ChainManager { } } +impl Handler for ChainManager { + type Result = ::Result; + + fn handle(&mut self, msg: SearchDataRequests, _ctx: &mut Self::Context) -> Self::Result { + let result = self.chain_state.rad_hashes_index.get(&msg.rad_hash); + if let Some(result) = result { + let limit = msg.limit.unwrap_or(u16::MAX) as usize; + let offset = msg.offset.unwrap_or_default(); + let reverse = msg.reverse.unwrap_or_default(); + let mut since_epoch: i64 = msg.since.unwrap_or_default(); + if let Some(current) = self.current_epoch { + since_epoch = i64::from(current).saturating_add(since_epoch); + } + let since_epoch: u32 = since_epoch.try_into().inspect_err(|&e| { + log::warn!("Invalid 'since' limit on SearchDataRequests: {e}"); + })?; + + Ok(result + .iter() + .filter_map(|tuple| { + if tuple.0 >= since_epoch { + Some(*tuple) + } else { + None + } + }) + .sorted_by(|(epoch_a, _), (epoch_b, _)| { + if reverse { + epoch_b.cmp(epoch_a) + } else { + epoch_a.cmp(epoch_b) + } + }) + .skip(offset) + .take(limit) + .collect()) + } else { + Ok(vec![]) + } + } +} + impl Handler for ChainManager { type Result = ::Result; diff --git a/node/src/actors/chain_manager/mod.rs b/node/src/actors/chain_manager/mod.rs index f469af53d..5600e46ce 100644 --- a/node/src/actors/chain_manager/mod.rs +++ b/node/src/actors/chain_manager/mod.rs @@ -260,6 +260,8 @@ pub struct ChainManager { consensus_constants_wit2: ConsensusConstantsWit2, /// Initial WIT supply initial_supply: u64, + /// Populate RAD hashes index + rad_hashes_index: bool, } impl ChainManager { @@ -938,7 +940,8 @@ impl ChainManager { .. } => { let block_epoch = block.block_header.beacon.checkpoint; - let block_hash = block.versioned_hash(get_protocol_version(Some(block_epoch))); + let protocol_version = get_protocol_version(Some(block_epoch)); + let block_hash = block.versioned_hash(protocol_version); let block_signals = block.block_header.signals; let validator_count = stakes.validator_count(); @@ -990,7 +993,7 @@ impl ChainManager { // Check total amount staked to make sure we can activate wit/2 let superblock_period = chain_info.consensus_constants.superblock_period; - if get_protocol_version(Some(block_epoch)) == ProtocolVersion::V1_8 + if protocol_version == ProtocolVersion::V1_8 && get_protocol_version_activation_epoch(ProtocolVersion::V2_0) == Epoch::MAX && block_epoch % u32::from(superblock_period) == 0 { @@ -1111,6 +1114,22 @@ impl ChainManager { &HashSet::default(), ); + // Update chain state's RAD hash index, only if set up in config: + if self.rad_hashes_index { + block.txns.data_request_txns + .iter() + .for_each(|dr_tx| { + let dr_tx_hash = dr_tx.versioned_hash(protocol_version); + let rad_hash = dr_tx.body.dr_output.data_request.versioned_hash(protocol_version); + if let Some(vec) = self.chain_state.rad_hashes_index.get_mut(&rad_hash) { + vec.push((block_epoch, dr_tx_hash)); + } else { + self.chain_state.rad_hashes_index.insert(rad_hash, vec![(block_epoch, dr_tx_hash)]); + } + log::warn!("Indexing data request {dr_tx_hash:?} with rad_hash {rad_hash:?} on epoch {block_epoch}"); + }); + } + match self.sm_state { StateMachine::WaitingConsensus => { // Persist finished data requests into storage diff --git a/node/src/actors/json_rpc/api.rs b/node/src/actors/json_rpc/api.rs index c613f11d1..0ba89e414 100644 --- a/node/src/actors/json_rpc/api.rs +++ b/node/src/actors/json_rpc/api.rs @@ -36,8 +36,8 @@ use crate::{ GetItemSuperblock, GetItemTransaction, GetKnownPeers, GetMemoryTransaction, GetMempool, GetNodeStats, GetProtocolInfo, GetReputation, GetSignalingInfo, GetState, GetSupplyInfo, GetSupplyInfo2, GetUtxoInfo, InitializePeers, IsConfirmedBlock, - MagicEither, QueryStakes, QueryStakingPowers, Rewind, SnapshotExport, SnapshotImport, - StakeAuthorization, + MagicEither, QueryStakes, QueryStakingPowers, Rewind, SearchDataRequests, + SnapshotExport, SnapshotImport, StakeAuthorization, }, peers_manager::PeersManager, sessions_manager::SessionsManager, @@ -167,6 +167,9 @@ pub fn attach_regular_methods( server.add_actix_method(system, "getUtxoInfo", move |params: Params| { Box::pin(get_utxo_info(params.parse())) }); + server.add_actix_method(system, "searchDataRequests", |params: Params| { + Box::pin(search_data_requests(params.parse())) + }); } /// Attach the sensitive JSON-RPC methods to a multi-transport server. @@ -2350,6 +2353,30 @@ pub async fn query_stakes(params: Result, Error>) -> JsonRpc .await } +/// Query data requests transaction hashes by providing a RAD hash value +pub async fn search_data_requests( + params: Result, Error>, +) -> JsonRpcResult { + // short-circuit if parameters are wrong + let params = params?; + // parse params or defaults + let msg = params.ok_or(Error::invalid_params("A 'radHash' must be specified"))?; + ChainManager::from_registry() + .send(msg) + .map(|res| match res { + Ok(Ok(result)) => serde_json::to_value(result).map_err(internal_error), + Ok(Err(e)) => { + let err = internal_error_s(e); + Err(err) + } + Err(e) => { + let err = internal_error_s(e); + Err(err) + } + }) + .await +} + /// Format of the output of query_powers #[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)] pub struct QueryStakingPowersRecord { diff --git a/node/src/actors/messages.rs b/node/src/actors/messages.rs index 5a8b3417b..a908e9bb0 100644 --- a/node/src/actors/messages.rs +++ b/node/src/actors/messages.rs @@ -484,6 +484,27 @@ where } } +/// Message for querying data requests transactions by some specified RAD hash value +#[derive(Clone, Debug, Eq, Hash, PartialEq, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SearchDataRequests { + /// Limits max number of entries to return (default: 0 == u16::MAX) + pub limit: Option, + /// Skips first found entries (default: 0) + pub offset: Option, + /// The RAD hash of the data request transactions being searched for: + pub rad_hash: Hash, + /// List fresher data requests first + pub reverse: Option, + /// Select data requests that have been included on or after the specified + /// absolute epoch, or relative epoch if negative (default: -30240, or one week ago) + pub since: Option, +} + +impl Message for SearchDataRequests { + type Result = Result, anyhow::Error>; +} + /// Builds a `DataRequestTransaction` from a `DataRequestOutput` #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Serialize, Deserialize)] pub struct BuildDrt { diff --git a/witnet.toml b/witnet.toml index fb31da670..ac8699dab 100644 --- a/witnet.toml +++ b/witnet.toml @@ -60,6 +60,8 @@ reject_sybil_inbounds = true [storage] # Path of the folder where RocksDB storage files will be written to. db_path = ".witnet/storage" +# Enable in-memory index of data requests grouped by RAD hash +rad_hashes_index = false [jsonrpc] # Enables or disables the JSON-RPC server altogether. This is needed for using the CLI methods of the node.