diff --git a/Cargo.lock b/Cargo.lock index aa7eb276e28..280ea82193b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,6 +1617,7 @@ dependencies = [ "prometheus", "prometheus-static-metric", "protobuf", + "proxy_ffi", "raft", "raft-proto", "raftstore", @@ -4420,6 +4421,26 @@ dependencies = [ "protobuf-codegen", ] +[[package]] +name = "proxy_ffi" +version = "0.0.1" +dependencies = [ + "engine_rocks", + "engine_traits", + "fail", + "futures 0.3.15", + "futures-util", + "keys", + "kvproto", + "lazy_static", + "protobuf", + "slog", + "slog-global", + "tikv_util", + "tokio", + "tokio-timer", +] + [[package]] name = "proxy_server" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index 6235d1d9c8a..4e407a30b25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -301,6 +301,7 @@ members = [ "fuzz/fuzzer-honggfuzz", "fuzz/fuzzer-libfuzzer", "gen-proxy-ffi", + "proxy_ffi", "proxy_tests", "raftstore-proxy", "tests", @@ -388,6 +389,7 @@ engine_store_ffi = { path = "engine_store_ffi", default-features = false } new-mock-engine-store = { path = "new-mock-engine-store", default-features = false } proxy_server = { path = "proxy_server", default-features = false } engine_tiflash = { path = "engine_tiflash", default-features = false } +proxy_ffi = { path = "proxy_ffi", default-features = false } [profile.dev.package.grpcio-sys] debug = false diff --git a/engine_store_ffi/Cargo.toml b/engine_store_ffi/Cargo.toml index f24a69afae2..291365e9b88 100644 --- a/engine_store_ffi/Cargo.toml +++ b/engine_store_ffi/Cargo.toml @@ -37,7 +37,6 @@ collections = { workspace = true } crossbeam = "0.8" derivative = "2" encryption = { workspace = true, default-features = false } - engine_rocks = { workspace = true, default-features = false } # Should be [dev-dependencies] but we need to control the features # https://github.com/rust-lang/cargo/issues/6915 @@ -67,6 +66,7 @@ portable-atomic = "0.3" prometheus = { version = "0.13", features = ["nightly"] } prometheus-static-metric = "0.5" protobuf = { version = "2.8", features = ["bytes"] } +proxy_ffi = { workspace = true, default-features = false } raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] } raft-proto = { version = "0.7.0", default-features = false } raftstore = { workspace = true, default-features = false } diff --git a/engine_store_ffi/src/ffi/mod.rs b/engine_store_ffi/src/ffi/mod.rs index 4f63dbd8e53..3b1218ea564 100644 --- a/engine_store_ffi/src/ffi/mod.rs +++ b/engine_store_ffi/src/ffi/mod.rs @@ -3,38 +3,17 @@ /// All mods end up with `_impls` impl structs defined in interface. /// Other mods which define and impl structs should not end up with name /// `_impls`. - -#[allow(dead_code)] -pub mod interfaces; -// All ffi impls that without raft domain. -pub mod basic_ffi_impls; -// All ffi impls that within raft domain, but without proxy helper context. -pub mod domain_impls; -// All ffi impls that within proxy helper context. -pub mod context_impls; -pub mod encryption_impls; -// FFI directly related with EngineStoreServerHelper. -pub mod engine_store_helper_impls; pub(crate) mod lock_cf_reader; // FFI directly related with RaftStoreProxyFFIHelper. +pub mod encryption_impls; +pub mod raftstore_proxy; pub mod raftstore_proxy_helper_impls; -pub mod sst_reader_impls; +pub mod read_index_helper; pub use engine_tiflash::EngineStoreConfig; +pub use proxy_ffi::*; pub use self::{ - basic_ffi_impls::*, domain_impls::*, encryption_impls::*, engine_store_helper_impls::*, - interfaces::root::DB as interfaces_ffi, lock_cf_reader::*, raftstore_proxy_helper_impls::*, + encryption_impls::*, lock_cf_reader::*, raftstore_proxy::*, raftstore_proxy_helper_impls::*, sst_reader_impls::*, }; - -#[allow(clippy::wrong_self_convention)] -pub trait UnwrapExternCFunc { - unsafe fn into_inner(&self) -> &T; -} - -impl UnwrapExternCFunc for std::option::Option { - unsafe fn into_inner(&self) -> &T { - std::mem::transmute::<&Self, &T>(self) - } -} diff --git a/engine_store_ffi/src/ffi/raftstore_proxy.rs b/engine_store_ffi/src/ffi/raftstore_proxy.rs new file mode 100644 index 00000000000..9f075f22c77 --- /dev/null +++ b/engine_store_ffi/src/ffi/raftstore_proxy.rs @@ -0,0 +1,92 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +use std::sync::{ + atomic::{AtomicU8, Ordering}, + Arc, +}; + +use encryption::DataKeyManager; +use engine_traits::Peekable; + +use super::{ + interfaces_ffi::{ConstRawVoidPtr, RaftProxyStatus, RaftStoreProxyPtr}, + raftstore_proxy_helper_impls::*, + read_index_helper, +}; +use crate::TiFlashEngine; + +pub struct RaftStoreProxy { + pub status: AtomicU8, + pub key_manager: Option>, + pub read_index_client: Option>, + pub kv_engine: std::sync::RwLock>, +} + +impl RaftStoreProxy { + pub fn new( + status: AtomicU8, + key_manager: Option>, + read_index_client: Option>, + kv_engine: std::sync::RwLock>, + ) -> Self { + RaftStoreProxy { + status, + key_manager, + read_index_client, + kv_engine, + } + } +} + +impl RaftStoreProxyFFI for RaftStoreProxy { + fn set_kv_engine(&mut self, kv_engine: Option) { + let mut lock = self.kv_engine.write().unwrap(); + *lock = kv_engine; + } + + fn set_status(&mut self, s: RaftProxyStatus) { + self.status.store(s as u8, Ordering::SeqCst); + } + + fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) + where + F: FnOnce(Result, String>), + { + let kv_engine_lock = self.kv_engine.read().unwrap(); + let kv_engine = kv_engine_lock.as_ref(); + if kv_engine.is_none() { + cb(Err("KV engine is not initialized".to_string())); + return; + } + let value = kv_engine.unwrap().get_value_cf(cf, key); + match value { + Ok(v) => { + if let Some(x) = v { + cb(Ok(Some(&x))); + } else { + cb(Ok(None)); + } + } + Err(e) => { + cb(Err(format!("{}", e))); + } + } + } +} + +impl RaftStoreProxyPtr { + pub unsafe fn as_ref(&self) -> &RaftStoreProxy { + &*(self.inner as *const RaftStoreProxy) + } + pub fn is_null(&self) -> bool { + self.inner.is_null() + } +} + +impl From<&RaftStoreProxy> for RaftStoreProxyPtr { + fn from(ptr: &RaftStoreProxy) -> Self { + Self { + inner: ptr as *const _ as ConstRawVoidPtr, + } + } +} diff --git a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs index 4e976b2d25f..42a5a4ff391 100644 --- a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs +++ b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs @@ -2,15 +2,10 @@ use std::{ pin::Pin, - sync::{ - atomic::{AtomicU8, Ordering}, - Arc, - }, + sync::{atomic::Ordering, Arc}, time, }; -use encryption::DataKeyManager; -use engine_traits::Peekable; use kvproto::kvrpcpb; use protobuf::Message; @@ -21,98 +16,13 @@ use super::{ engine_store_helper_impls::*, interfaces_ffi, interfaces_ffi::{ - BaseBuffView, ConstRawVoidPtr, CppStrVecView, KVGetStatus, RaftProxyStatus, - RaftStoreProxyFFIHelper, RaftStoreProxyPtr, RawCppPtr, RawCppStringPtr, RawRustPtr, - RawVoidPtr, SSTReaderInterfaces, + BaseBuffView, CppStrVecView, KVGetStatus, RaftProxyStatus, RaftStoreProxyFFIHelper, + RaftStoreProxyPtr, RawCppPtr, RawCppStringPtr, RawRustPtr, RawVoidPtr, SSTReaderInterfaces, }, + read_index_helper, sst_reader_impls::*, - UnwrapExternCFunc, + utils, UnwrapExternCFunc, }; -use crate::{read_index_helper, utils, TiFlashEngine}; - -pub trait RaftStoreProxyFFI: Sync { - fn set_status(&mut self, s: RaftProxyStatus); - fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) - where - F: FnOnce(Result, String>); - fn set_kv_engine(&mut self, kv_engine: Option); -} - -pub struct RaftStoreProxy { - pub status: AtomicU8, - pub key_manager: Option>, - pub read_index_client: Option>, - pub kv_engine: std::sync::RwLock>, -} - -impl RaftStoreProxy { - pub fn new( - status: AtomicU8, - key_manager: Option>, - read_index_client: Option>, - kv_engine: std::sync::RwLock>, - ) -> Self { - RaftStoreProxy { - status, - key_manager, - read_index_client, - kv_engine, - } - } -} - -impl RaftStoreProxyFFI for RaftStoreProxy { - fn set_kv_engine(&mut self, kv_engine: Option) { - let mut lock = self.kv_engine.write().unwrap(); - *lock = kv_engine; - } - - fn set_status(&mut self, s: RaftProxyStatus) { - self.status.store(s as u8, Ordering::SeqCst); - } - - fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) - where - F: FnOnce(Result, String>), - { - let kv_engine_lock = self.kv_engine.read().unwrap(); - let kv_engine = kv_engine_lock.as_ref(); - if kv_engine.is_none() { - cb(Err("KV engine is not initialized".to_string())); - return; - } - let value = kv_engine.unwrap().get_value_cf(cf, key); - match value { - Ok(v) => { - if let Some(x) = v { - cb(Ok(Some(&x))); - } else { - cb(Ok(None)); - } - } - Err(e) => { - cb(Err(format!("{}", e))); - } - } - } -} - -impl RaftStoreProxyPtr { - pub unsafe fn as_ref(&self) -> &RaftStoreProxy { - &*(self.inner as *const RaftStoreProxy) - } - pub fn is_null(&self) -> bool { - self.inner.is_null() - } -} - -impl From<&RaftStoreProxy> for RaftStoreProxyPtr { - fn from(ptr: &RaftStoreProxy) -> Self { - Self { - inner: ptr as *const _ as ConstRawVoidPtr, - } - } -} impl Clone for RaftStoreProxyPtr { fn clone(&self) -> RaftStoreProxyPtr { @@ -124,8 +34,16 @@ impl Clone for RaftStoreProxyPtr { impl Copy for RaftStoreProxyPtr {} +pub trait RaftStoreProxyFFI: Sync { + fn set_status(&mut self, s: RaftProxyStatus); + fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) + where + F: FnOnce(Result, String>); + fn set_kv_engine(&mut self, kv_engine: Option); +} + impl RaftStoreProxyFFIHelper { - pub fn new(proxy: &RaftStoreProxy) -> Self { + pub fn new(proxy: RaftStoreProxyPtr) -> Self { RaftStoreProxyFFIHelper { proxy_ptr: proxy.into(), fn_handle_get_proxy_status: Some(ffi_handle_get_proxy_status), @@ -353,3 +271,60 @@ pub unsafe extern "C" fn ffi_poll_timer_task(task_ptr: RawVoidPtr, waker: RawVoi 0 } } + +#[repr(u32)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +pub enum RawRustPtrType { + None = 0, + ReadIndexTask = 1, + ArcFutureWaker = 2, + TimerTask = 3, +} + +impl From for RawRustPtrType { + fn from(x: u32) -> Self { + unsafe { std::mem::transmute(x) } + } +} + +// TODO remove this warn. +#[allow(clippy::from_over_into)] +impl Into for RawRustPtrType { + fn into(self) -> u32 { + unsafe { std::mem::transmute(self) } + } +} + +pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRustPtrType) { + if data.is_null() { + return; + } + let type_: RawRustPtrType = type_.into(); + match type_ { + RawRustPtrType::ReadIndexTask => unsafe { + drop(Box::from_raw(data as *mut read_index_helper::ReadIndexTask)); + }, + RawRustPtrType::ArcFutureWaker => unsafe { + drop(Box::from_raw(data as *mut utils::ArcNotifyWaker)); + }, + RawRustPtrType::TimerTask => unsafe { + drop(Box::from_raw(data as *mut utils::TimerTask)); + }, + _ => unreachable!(), + } +} + +impl Default for RawRustPtr { + fn default() -> Self { + Self { + ptr: std::ptr::null_mut(), + type_: RawRustPtrType::None.into(), + } + } +} + +impl RawRustPtr { + pub fn is_null(&self) -> bool { + self.ptr.is_null() + } +} diff --git a/engine_store_ffi/src/read_index_helper.rs b/engine_store_ffi/src/ffi/read_index_helper.rs similarity index 100% rename from engine_store_ffi/src/read_index_helper.rs rename to engine_store_ffi/src/ffi/read_index_helper.rs diff --git a/engine_store_ffi/src/lib.rs b/engine_store_ffi/src/lib.rs index d068baacf8f..b98bf686af0 100644 --- a/engine_store_ffi/src/lib.rs +++ b/engine_store_ffi/src/lib.rs @@ -6,8 +6,6 @@ pub mod core; pub mod engine; pub mod ffi; pub mod observer; -pub mod read_index_helper; -mod utils; // Be discreet when expose inner mods by pub use. // engine_store_ffi crate includes too many mods here. diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index bf98252a34c..0d20736c11a 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -16,7 +16,7 @@ pub use engine_store_ffi::ffi::{ interfaces_ffi::{ EngineStoreServerHelper, RaftProxyStatus, RaftStoreProxyFFIHelper, RawCppPtr, }, - UnwrapExternCFunc, + RaftStoreProxy, UnwrapExternCFunc, }; pub use engine_store_ffi::TiFlashEngine; use engine_tiflash::DB; @@ -180,7 +180,7 @@ impl> Cluster { key_manager: key_mgr.clone(), read_index_client: match router { Some(r) => Some(Box::new( - engine_store_ffi::read_index_helper::ReadIndexClient::new( + engine_store_ffi::ffi::read_index_helper::ReadIndexClient::new( r.clone(), SysQuota::cpu_cores_quota() as usize * 2, ), @@ -190,7 +190,8 @@ impl> Cluster { kv_engine: std::sync::RwLock::new(Some(engines.kv.clone())), }); - let mut proxy_helper = Box::new(RaftStoreProxyFFIHelper::new(&proxy)); + let proxy_ref = proxy.as_ref(); + let mut proxy_helper = Box::new(RaftStoreProxyFFIHelper::new(proxy_ref.into())); let mut engine_store_server = Box::new(EngineStoreServer::new(id, Some(engines))); engine_store_server.proxy_compat = proxy_compat; engine_store_server.mock_cfg = mock_cfg; @@ -418,7 +419,7 @@ impl> Cluster { let mut lock = self.ffi_helper_set.lock().unwrap(); let ffi_helper_set = lock.get_mut(&node_id).unwrap(); ffi_helper_set.proxy.read_index_client = Some(Box::new( - engine_store_ffi::read_index_helper::ReadIndexClient::new( + engine_store_ffi::ffi::read_index_helper::ReadIndexClient::new( router.clone(), SysQuota::cpu_cores_quota() as usize * 2, ), diff --git a/proxy_ffi/Cargo.toml b/proxy_ffi/Cargo.toml new file mode 100644 index 00000000000..ddd87380ab8 --- /dev/null +++ b/proxy_ffi/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "proxy_ffi" +version = "0.0.1" +authors = ["The TiKV Authors"] +license = "Apache-2.0" +edition = "2018" +publish = false + +[features] +default = [] +failpoints = ["fail/failpoints"] +testexport = [] + +[dependencies] +encryption = { workspace = true, default-features = false } +engine_rocks = { workspace = true, default-features = false } +engine_traits = { workspace = true, default-features = false } +fail = "0.5" +futures = "0.3" +futures-util = { version = "0.3.1", default-features = false, features = ["io"] } +keys = { workspace = true, default-features = false } +kvproto = { git = "https://github.com/pingcap/kvproto.git" } +lazy_static = "1.3" +protobuf = { version = "2.8", features = ["bytes"] } +slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } +slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" } +tikv_util = { workspace = true, default-features = false } +tokio = { version = "1.5", features = ["sync", "rt-multi-thread"] } +tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hotfix" } diff --git a/engine_store_ffi/src/ffi/basic_ffi_impls.rs b/proxy_ffi/src/basic_ffi_impls.rs similarity index 78% rename from engine_store_ffi/src/ffi/basic_ffi_impls.rs rename to proxy_ffi/src/basic_ffi_impls.rs index 3c756e115f8..8aed6197908 100644 --- a/engine_store_ffi/src/ffi/basic_ffi_impls.rs +++ b/proxy_ffi/src/basic_ffi_impls.rs @@ -1,7 +1,18 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use std::pin::Pin; -pub use super::interfaces_ffi::BaseBuffView; +use super::interfaces_ffi::BaseBuffView; + +#[allow(clippy::wrong_self_convention)] +pub trait UnwrapExternCFunc { + unsafe fn into_inner(&self) -> &T; +} + +impl UnwrapExternCFunc for std::option::Option { + unsafe fn into_inner(&self) -> &T { + std::mem::transmute::<&Self, &T>(self) + } +} impl From<&[u8]> for BaseBuffView { fn from(s: &[u8]) -> Self { diff --git a/engine_store_ffi/src/ffi/context_impls.rs b/proxy_ffi/src/context_impls.rs similarity index 100% rename from engine_store_ffi/src/ffi/context_impls.rs rename to proxy_ffi/src/context_impls.rs diff --git a/engine_store_ffi/src/ffi/domain_impls.rs b/proxy_ffi/src/domain_impls.rs similarity index 58% rename from engine_store_ffi/src/ffi/domain_impls.rs rename to proxy_ffi/src/domain_impls.rs index ca49e8a4e0c..04e8f424059 100644 --- a/engine_store_ffi/src/ffi/domain_impls.rs +++ b/proxy_ffi/src/domain_impls.rs @@ -1,15 +1,33 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +use std::pin::Pin; + use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE}; -use super::{ - interfaces_ffi, - interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, WriteCmdType, - WriteCmdsView, - }, +use super::interfaces_ffi::{ + BaseBuffView, ColumnFamilyType, RaftCmdHeader, SSTView, SSTViewVec, WriteCmdType, WriteCmdsView, }; +pub fn into_sst_views(snaps: Vec<(&[u8], ColumnFamilyType)>) -> Vec { + let mut snaps_view = vec![]; + for (path, cf) in snaps { + snaps_view.push(SSTView { + type_: cf, + path: path.into(), + }) + } + snaps_view +} + +impl From>> for SSTViewVec { + fn from(snaps_view: Pin<&Vec>) -> Self { + Self { + views: snaps_view.as_ptr(), + len: snaps_view.len() as u64, + } + } +} + pub fn name_to_cf(cf: &str) -> ColumnFamilyType { if cf.is_empty() { return ColumnFamilyType::Default; @@ -92,62 +110,3 @@ impl RaftCmdHeader { } } } - -#[repr(u32)] -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] -pub enum RawRustPtrType { - None = 0, - ReadIndexTask = 1, - ArcFutureWaker = 2, - TimerTask = 3, -} - -impl From for RawRustPtrType { - fn from(x: u32) -> Self { - unsafe { std::mem::transmute(x) } - } -} - -// TODO remove this warn. -#[allow(clippy::from_over_into)] -impl Into for RawRustPtrType { - fn into(self) -> u32 { - unsafe { std::mem::transmute(self) } - } -} - -pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRustPtrType) { - if data.is_null() { - return; - } - let type_: RawRustPtrType = type_.into(); - match type_ { - RawRustPtrType::ReadIndexTask => unsafe { - drop(Box::from_raw( - data as *mut crate::read_index_helper::ReadIndexTask, - )); - }, - RawRustPtrType::ArcFutureWaker => unsafe { - drop(Box::from_raw(data as *mut crate::utils::ArcNotifyWaker)); - }, - RawRustPtrType::TimerTask => unsafe { - drop(Box::from_raw(data as *mut crate::utils::TimerTask)); - }, - _ => unreachable!(), - } -} - -impl Default for RawRustPtr { - fn default() -> Self { - Self { - ptr: std::ptr::null_mut(), - type_: RawRustPtrType::None.into(), - } - } -} - -impl RawRustPtr { - pub fn is_null(&self) -> bool { - self.ptr.is_null() - } -} diff --git a/engine_store_ffi/src/ffi/engine_store_helper_impls.rs b/proxy_ffi/src/engine_store_helper_impls.rs similarity index 99% rename from engine_store_ffi/src/ffi/engine_store_helper_impls.rs rename to proxy_ffi/src/engine_store_helper_impls.rs index b92eed0db02..f4aea0fdb1e 100644 --- a/engine_store_ffi/src/ffi/engine_store_helper_impls.rs +++ b/proxy_ffi/src/engine_store_helper_impls.rs @@ -5,6 +5,7 @@ use kvproto::{kvrpcpb, metapb, raft_cmdpb}; use super::{ basic_ffi_impls::*, + domain_impls::*, interfaces_ffi, interfaces_ffi::{ BaseBuffView, ColumnFamilyType, CppStrWithView, EngineStoreApplyRes, @@ -13,7 +14,6 @@ use super::{ RawCppStringPtr, RawVoidPtr, SpecialCppPtrType, StoreStats, RAFT_STORE_PROXY_MAGIC_NUMBER, RAFT_STORE_PROXY_VERSION, }, - sst_reader_impls::*, UnwrapExternCFunc, WriteCmds, }; diff --git a/engine_store_ffi/src/ffi/interfaces.rs b/proxy_ffi/src/interfaces.rs similarity index 100% rename from engine_store_ffi/src/ffi/interfaces.rs rename to proxy_ffi/src/interfaces.rs diff --git a/proxy_ffi/src/lib.rs b/proxy_ffi/src/lib.rs new file mode 100644 index 00000000000..ab15451f1c3 --- /dev/null +++ b/proxy_ffi/src/lib.rs @@ -0,0 +1,24 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +#[allow(dead_code)] +pub mod interfaces; + +/// All mods end up with `_impls` impl structs defined in interface. +/// Other mods which define and impl structs should not end up with name +/// `_impls`. +// All ffi impls that without raft domain. +pub mod basic_ffi_impls; +// All ffi impls that within raft domain, but without proxy helper context. +pub mod domain_impls; +// FFI directly related with EngineStoreServerHelper. +pub mod engine_store_helper_impls; +// All ffi impls that within engine store helper context. +pub mod context_impls; + +pub mod sst_reader_impls; +pub mod utils; + +pub use self::{ + basic_ffi_impls::*, context_impls::*, domain_impls::*, engine_store_helper_impls::*, + interfaces::root::DB as interfaces_ffi, sst_reader_impls::*, utils::*, +}; diff --git a/engine_store_ffi/src/ffi/sst_reader_impls.rs b/proxy_ffi/src/sst_reader_impls.rs similarity index 91% rename from engine_store_ffi/src/ffi/sst_reader_impls.rs rename to proxy_ffi/src/sst_reader_impls.rs index 17dd4d81a21..dd2fd9ac920 100644 --- a/engine_store_ffi/src/ffi/sst_reader_impls.rs +++ b/proxy_ffi/src/sst_reader_impls.rs @@ -1,5 +1,5 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::{cell::RefCell, pin::Pin, sync::Arc}; +use std::{cell::RefCell, sync::Arc}; use encryption::DataKeyManager; use engine_rocks::{get_env, RocksSstIterator, RocksSstReader}; @@ -8,7 +8,7 @@ use engine_traits::{IterOptions, Iterator, RefIterable, SstReader}; use super::{ interfaces_ffi::{ BaseBuffView, ColumnFamilyType, RaftStoreProxyPtr, RawVoidPtr, SSTReaderInterfaces, - SSTReaderPtr, SSTView, SSTViewVec, + SSTReaderPtr, SSTView, }, LockCFFileReader, }; @@ -124,26 +124,6 @@ pub unsafe extern "C" fn ffi_gc_sst_reader(reader: SSTReaderPtr, type_: ColumnFa } } -pub fn into_sst_views(snaps: Vec<(&[u8], ColumnFamilyType)>) -> Vec { - let mut snaps_view = vec![]; - for (path, cf) in snaps { - snaps_view.push(SSTView { - type_: cf, - path: path.into(), - }) - } - snaps_view -} - -impl From>> for SSTViewVec { - fn from(snaps_view: Pin<&Vec>) -> Self { - Self { - views: snaps_view.as_ptr(), - len: snaps_view.len() as u64, - } - } -} - pub struct SSTFileReader<'a> { iter: RefCell>>, remained: RefCell, diff --git a/engine_store_ffi/src/utils.rs b/proxy_ffi/src/utils.rs similarity index 100% rename from engine_store_ffi/src/utils.rs rename to proxy_ffi/src/utils.rs diff --git a/proxy_server/src/run.rs b/proxy_server/src/run.rs index 9c6e68d0b62..21a6190a491 100644 --- a/proxy_server/src/run.rs +++ b/proxy_server/src/run.rs @@ -35,9 +35,9 @@ use engine_store_ffi::{ EngineStoreServerHelper, EngineStoreServerStatus, RaftProxyStatus, RaftStoreProxyFFIHelper, }, + read_index_helper::ReadIndexClient, RaftStoreProxy, RaftStoreProxyFFI, }, - read_index_helper::ReadIndexClient, TiFlashEngine, }; use engine_traits::{ @@ -149,8 +149,9 @@ pub fn run_impl( std::sync::RwLock::new(None), ); + let proxy_ref = &proxy; let proxy_helper = { - let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy); + let mut proxy_helper = RaftStoreProxyFFIHelper::new(proxy_ref.into()); proxy_helper.fn_server_info = Some(ffi_server_info); proxy_helper }; @@ -254,8 +255,9 @@ fn run_impl_only_for_decryption( std::sync::RwLock::new(None), ); + let proxy_ref = &proxy; let proxy_helper = { - let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy); + let mut proxy_helper = RaftStoreProxyFFIHelper::new(proxy_ref.into()); proxy_helper.fn_server_info = Some(ffi_server_info); proxy_helper }; diff --git a/proxy_tests/proxy/replica_read.rs b/proxy_tests/proxy/replica_read.rs index 3c484240955..e182b6ec0e4 100644 --- a/proxy_tests/proxy/replica_read.rs +++ b/proxy_tests/proxy/replica_read.rs @@ -191,7 +191,6 @@ fn test_read_index() { cluster.must_put(b"k0", b"v0"); cluster.pd_client.must_none_pending_peer(p2.clone()); cluster.pd_client.must_none_pending_peer(p3.clone()); - let region = cluster.get_region(b"k0"); assert_eq!(cluster.leader_of_region(region.get_id()).unwrap(), p1);