diff --git a/Cargo.toml b/Cargo.toml index 8644145..08b532f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] members = [ - "clickhouse-driver","cityhash","lz4a","derive","cityhash-rs" + "clickhouse-driver","cityhash","derive","cityhash-rs" ] [profile.release] diff --git a/README.md b/README.md index 7e56aae..30e3cb0 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ [![Build Status](https://travis-ci.org/ddulesov/clickhouse_driver.svg?branch=master)](https://travis-ci.org/ddulesov/clickhouse_driver) ![Rust](https://github.com/ddulesov/clickhouse_driver/workflows/Rust/badge.svg?branch=master) -Asynchronous pure rust tokio-based Clickhouse client library - -development status: **alpha** +Asynchronous pure rust tokio-based Clickhouse client library + +development status: **alpha** Tested on Linux x86-64 (ubuntu 20.04 LTS), Windows 10. @@ -32,8 +32,8 @@ Tested on Linux x86-64 (ubuntu 20.04 LTS), Windows 10. ### Use cases ### -* Make query using SQL syntax supported by Clickhouse Server -* Execute arbitrary DDL commands +* Make query using SQL syntax supported by Clickhouse Server +* Execute arbitrary DDL commands * Query Server status * Insert into Clickhouse Server big (possibly continues ) data stream * Load-balancing using round-robin method @@ -50,21 +50,20 @@ git module update --init --recursive Building requires rust 1.41 stable or nightly, tokio-0.2.x. -- Add next lines into the `dependencies` section of your `Cargo.toml`: +- Add next lines into the `dependencies` section of your `Cargo.toml`: -```toml +```toml clickhouse-driver = { version="0.1.0-alpha.3", path="../path_to_package/clickhouse-driver"} - clickhouse-driver-lz4 = { version="0.1.0", path="../path_to_package/lz4a"} clickhouse-driver-cthrs = { version="0.1.0", path="../path_to_package/cityhash-rs"} ``` - Add usage in main.rs ```rust - extern crate clickhouse_driver; + extern crate clickhouse_driver; use clickhouse_driver::prelude::*; ``` - -to connect to server provide connection url + +to connect to server provide connection url ``` tcp://username:password@localhost/database?paramname=paramvalue&... ``` @@ -77,65 +76,65 @@ tcp://user:default@localhost/log?ping_timout=200ms&execute_timeout=5s&query_time lz4 - fast and efficient compression method. It can significantly reduce transmitted data size and time if used for big data chunks. For small data it's better to choose none compression; - + * `connection_timeout` - timeout for establishing connection. Default is 500ms; - + * `execute_timeout` - timeout for waiting result of **execute** method call - If the execute used for alter huge table it can take + If the execute used for alter huge table it can take long time to complete. In this case set this parameter to appropriate value. In other cases leave the default value (180 sec); - + * `query_timout` - timeout for waiting response from the server with next block of data in **query** call. - Note. Large data query may take long time. This timeout requires that only + Note. Large data query may take long time. This timeout requires that only one chunk of data will receive until the end of timeout. Default value is 180sec; - + * `insert_timeout` - timeout for waiting result of `insert` call insert method call returns error if the server does not receive message until the end of insert_timeout. As insert data processing is asynchronous it doesn't include server block processing time. Default value is 180 sec; - + * `ping_timout` - wait before ping response. - The host will be considered unavailable if the server - does not return pong response until the end of ping_timeout; - -* `retry_timeout` - the number of seconds to wait before send next ping - if the server does not return; - -* `ping_before_query` - 1 (default) or 0. This option if set - requires the driver to check Clickhouse server responsibility + The host will be considered unavailable if the server + does not return pong response until the end of ping_timeout; + +* `retry_timeout` - the number of seconds to wait before send next ping + if the server does not return; + +* `ping_before_query` - 1 (default) or 0. This option if set + requires the driver to check Clickhouse server responsibility after returning connection from pool; - -* `pool_min` - minimal connection pool size. + +* `pool_min` - minimal connection pool size. the number of idle connections that can be kept in the pool; Default value is 2; - -* `pool_max` - maximum number of established connections that the pool + +* `pool_max` - maximum number of established connections that the pool can issued. If the task require new connection while the pool reaches the maximum and there is not idle connection then this task will be put in waiting queue. Default value is 10; - -* `readonly` - 0 (default) |1|2. - 0 - all commands allowed. - 2- select queries and change settings, + +* `readonly` - 0 (default) |1|2. + 0 - all commands allowed. + 2- select queries and change settings, 1 - only select queries ; - + * `keepalive` - keepalive TCP option; * `host` - alternative host(s) All timeout parameters accept integer number - the number of seconds. To specify timeout in milliseconds add `ms` at the end. -Examples: +Examples: - `200ms` ( 200 mseconds ) - `20` ( 20 seconds ) - `10s` ( 10 seconds ) - + ### Example -```rust +```rust struct Blob { id: u64, @@ -209,7 +208,7 @@ async fn main() -> Result<(), io::Error> { * Doesn't support multidimensional Array, * Array data types readonly * LowCardinality - readonly and just String base type -* Insert method support only limited data types +* Insert method support only limited data types `insert` requires that inserted data exactly matches table column type - Int8(16|32|64) - i8(16|32|64) - UInt8(16|32|64) - u8(16|32|64) @@ -222,16 +221,16 @@ async fn main() -> Result<(), io::Error> { - IPv6 - AddrIpv6 - String - &str,String, or &[u8] - Enum8|16 - &str or String. Also, i16 value of enum index can be retrieved. - + ### Roadmap -* `Array` column data type - read/write -* `Tuple` - no plans to support +* `Array` column data type - read/write +* `Tuple` - no plans to support * `AggregateFunction` - no plans to support -* `LowCardinality` - add write support, extend it to `Date`, `DateTime` types +* `LowCardinality` - add write support, extend it to `Date`, `DateTime` types * `Serde` - Row serializer/deserializer interface in addition to ad-hoc one * `TLS` * C-API ? * `async_std` runtime - - + + diff --git a/cityhash-rs/src/lib.rs b/cityhash-rs/src/lib.rs index 3a506e9..170c21b 100644 --- a/cityhash-rs/src/lib.rs +++ b/cityhash-rs/src/lib.rs @@ -298,7 +298,6 @@ pub fn city_hash_128(src: &[u8]) -> Pair { mod tests { use super::*; use clickhouse_driver_cth::{_CityHash128, _CityMurmur, c_char, Hash128}; - use std::vec::Vec; fn city_hash_ref(source: &[u8]) -> Pair { let h = unsafe { _CityHash128(source.as_ptr() as *const c_char, source.len()) }; @@ -392,14 +391,11 @@ mod tests { #[test] fn test_hash_128() { - const MAX_SIZE: u32 = 1024 * 10; + const MAX_SIZE: u32 = 1024; const ITER_COUNT: u8 = 5; use rand::Rng; for s in 8..MAX_SIZE { - let mut b: Vec = Vec::with_capacity(s as usize); - unsafe { - b.set_len(s as usize); - } + let mut b = std::vec![0u8; s as usize]; for _ in 0..ITER_COUNT { rand::thread_rng().fill(&mut b[..]); assert_eq!(city_hash_ref(b.as_ref()), city_hash_128(b.as_ref())); diff --git a/clickhouse-driver/Cargo.toml b/clickhouse-driver/Cargo.toml index 3b3765c..af9f2ef 100644 --- a/clickhouse-driver/Cargo.toml +++ b/clickhouse-driver/Cargo.toml @@ -13,12 +13,13 @@ repository = "https://github.com/ddulesov/clickhouse_driver" [dependencies] clickhouse-driver-cth = { version="0.1.0", path="../cityhash", optional=true} clickhouse-driver-cthrs = { version="0.1.1", path="../cityhash-rs", optional=true} -clickhouse-driver-lz4 = { version="0.1.0", path="../lz4a" } +lz4 = "1.23.1" thiserror = { version="1.0" } log = { version="0.4.8" } url = { version="^2" } -tokio = { version = "0.2", features = ["rt-core", "sync", "tcp", "time", "dns", "stream", "test-util", "io-util","macros"] } +tokio = { version = "1.15.0", features = ["rt", "sync", "net", "time", "rt-multi-thread", "test-util", "io-util","macros"] } + tokio-native-tls = { version = "0.1.0", optional= true } pin-project-lite = { version="^0.1" } futures = { version="0.3" } diff --git a/clickhouse-driver/examples/array.rs b/clickhouse-driver/examples/array.rs index 0569e50..f926f5f 100644 --- a/clickhouse-driver/examples/array.rs +++ b/clickhouse-driver/examples/array.rs @@ -9,10 +9,10 @@ use std::{env, io, time}; macro_rules! get { ($row: ident, $i: expr, $err: ident) => { - $row.value($i)?.ok_or_else($err)?; + $row.value($i)?.ok_or_else($err)? }; ($row: ident, $i: expr, $err: ident, opt) => { - $row.value($i)?; + $row.value($i)? }; } // CREATE table mainx( @@ -64,6 +64,7 @@ async fn main() -> Result<(), io::Error> { } } eprintln!("fetch {} rows in {} msec", c, start.elapsed().as_millis()); + eprintln!("progress {:?}", result.progress); } Ok(()) diff --git a/clickhouse-driver/examples/bulk-insert.rs b/clickhouse-driver/examples/bulk-insert.rs index 4b41cd5..0a914bb 100644 --- a/clickhouse-driver/examples/bulk-insert.rs +++ b/clickhouse-driver/examples/bulk-insert.rs @@ -9,7 +9,7 @@ static NAMES: [&str; 5] = ["one", "two", "three", "four", "five"]; /// Block size const BSIZE: u64 = 10000; /// The number of blocks -const CIRCLE: u64 = 1000; +const CIRCLE: u64 = 10; fn next_block(i: u64) -> Block<'static> { let now = chrono::offset::Utc::now(); diff --git a/clickhouse-driver/examples/insert-select.rs b/clickhouse-driver/examples/insert-select.rs index 4038c79..c7ffffa 100644 --- a/clickhouse-driver/examples/insert-select.rs +++ b/clickhouse-driver/examples/insert-select.rs @@ -44,7 +44,7 @@ impl Deserialize for Blob { }) } } -const C: u64 = 10000; +const C: u64 = 100; #[tokio::main] async fn main() -> Result<(), io::Error> { diff --git a/clickhouse-driver/examples/select.rs b/clickhouse-driver/examples/select.rs index c153b61..cc0b72f 100644 --- a/clickhouse-driver/examples/select.rs +++ b/clickhouse-driver/examples/select.rs @@ -17,7 +17,7 @@ struct Perf { macro_rules! get { ($row: ident, $i: expr, $err: ident) => { - $row.value($i)?.ok_or_else($err)?; + $row.value($i)?.ok_or_else($err)? }; } diff --git a/clickhouse-driver/src/client.rs b/clickhouse-driver/src/client.rs index 2ecd6be..5ad8f7c 100644 --- a/clickhouse-driver/src/client.rs +++ b/clickhouse-driver/src/client.rs @@ -1,11 +1,12 @@ use std::fmt; use std::io; -use std::net::Shutdown; use std::time::Duration; use crate::protocol::block::{Block, ServerBlock}; +use crate::protocol::code::SERVER_PROGRESS; use crate::protocol::command::{CommandSink, ResponseStream}; use crate::protocol::insert::InsertSink; +use crate::protocol::packet::Progress; use crate::protocol::packet::Response; use crate::protocol::packet::{Execute, Hello, Ping}; use crate::protocol::query::Query; @@ -70,6 +71,7 @@ pub(super) type InnerConection = Inner; pub struct QueryResult<'a, R: AsyncRead> { pub(crate) inner: ResponseStream<'a, R>, + pub progress: Progress, } impl<'a, R: AsyncWrite + AsyncRead + Unpin + Send> QueryResult<'a, R> { @@ -83,8 +85,8 @@ impl<'a, R: AsyncRead + Unpin + Send> QueryResult<'a, R> { while let Some(packet) = self.inner.next().await? { if let Response::Data(block) = packet { return Ok(Some(block)); - } else { - //println!("packet {:?}", packet); + } else if let Response::Progress(progress) = packet { + self.progress.update(progress); } } Ok(None) @@ -136,11 +138,13 @@ impl Inner { } /// Split self into Stream and ServerInfo + #[allow(clippy::manual_map)] #[inline] pub(super) fn split(&mut self) -> Option<(&mut (dyn AsyncReadWrite + '_), &mut ServerInfo)> { let info = &mut self.info as *mut ServerInfo; // SAFETY: This can be risky if caller use returned values inside Connection // or InnerConnection methods. Never do it. + match self.socket { None => None, Some(ref mut socket) => unsafe { Some((socket, &mut *info)) }, @@ -150,7 +154,7 @@ impl Inner { /// Establish a connection to a Clickhouse server pub(super) async fn init(options: &Options, addr: &str) -> Result> { let socket = TcpStream::connect(addr).await?; - Inner::setup_stream(&socket, &options)?; + Inner::setup_stream(&socket, options)?; info!( "connection established to: {}", socket.peer_addr().unwrap() @@ -182,7 +186,7 @@ impl Inner { let (revision, timezone) = match stream.next().await? { Some(Response::Hello(_name, _major, _minor, revision, tz)) => (revision as u32, tz), _ => { - socket.shutdown(Shutdown::Both)?; + socket.shutdown().await?; return Err(DriverError::ConnectionTimeout.into()); } }; @@ -198,8 +202,9 @@ impl Inner { } #[inline] - fn setup_stream(socket: &TcpStream, options: &Options) -> io::Result<()> { - socket.set_keepalive(options.keepalive)?; + fn setup_stream(socket: &TcpStream, _options: &Options) -> io::Result<()> { + // TODO set keepalive + // socket.set_keepalive(options.keepalive)?; socket.set_nodelay(true) } @@ -290,10 +295,10 @@ impl Connection { } /// Disconnects this connection from server. - pub(super) fn disconnect(mut self) -> Result<()> { - if let Some(socket) = self.inner.socket.take() { + pub(super) async fn disconnect(mut self) -> Result<()> { + if let Some(mut socket) = self.inner.socket.take() { debug!("disconnect method. shutdown connection"); - socket.shutdown(Shutdown::Both)?; + socket.shutdown().await?; } Ok(()) } @@ -315,7 +320,7 @@ impl Connection { /// associated with the connection pub async fn close(mut self) -> Result<()> { self.inner.cleanup().await?; - self.disconnect() + self.disconnect().await } /// Ping-pong connection verification @@ -353,9 +358,11 @@ impl Connection { let mut stream = self.write_command(self.options().execute_timeout).await?; - if let Some(packet) = stream.next().await? { - warn!("execute method returns packet {}", packet.code()); - return Err(DriverError::PacketOutOfOrder(packet.code()).into()); + while let Some(packet) = stream.next().await? { + if packet.code() != SERVER_PROGRESS { + warn!("execute method returns packet {}", packet.code()); + return Err(DriverError::PacketOutOfOrder(packet.code()).into()); + } } Ok(()) @@ -369,7 +376,7 @@ impl Connection { ) -> Result> { check_pending!(self); - let query = Query::from_block(&data); + let query = Query::from_block(data); self.out.clear(); Execute { query }.write(self.inner.info(), &mut self.out)?; @@ -379,16 +386,25 @@ impl Connection { // Before call insert we will check input data against server table structure stream.skip_empty = false; //stream.set_pending(); - let mut stream = if let Some(Response::Data(block)) = stream.next().await? { - InsertSink::new(stream, block) - } else { - stream.set_deteriorated(); - warn!("insert method. unknown packet received"); - return Err(DriverError::PacketOutOfOrder(0).into()); - }; - stream.next(&data).await?; - Ok(stream) + while let Some(packet) = stream.next().await? { + match packet { + Response::Progress(_) => { + continue; + } + Response::Data(block) => { + let mut st = InsertSink::new(stream, block); + st.next(data).await?; + return Ok(st); + } + _ => { + stream.set_deteriorated(); + warn!("insert method. unknown packet received"); + return Err(DriverError::PacketOutOfOrder(0).into()); + } + } + } + unreachable!() } /// Execute SELECT statement returning sequence of ServerBlocks. /// @@ -411,7 +427,10 @@ impl Connection { let stream = self.write_command(self.options().query_timeout).await?; - Ok(QueryResult { inner: stream }) + Ok(QueryResult { + inner: stream, + progress: Progress::default(), + }) } /// Take inner connection. Drain itself diff --git a/clickhouse-driver/src/compression/mod.rs b/clickhouse-driver/src/compression/mod.rs index 68609d1..fea878a 100644 --- a/clickhouse-driver/src/compression/mod.rs +++ b/clickhouse-driver/src/compression/mod.rs @@ -1,18 +1,19 @@ use std::io; +use std::os::raw::{c_char, c_int}; use std::pin::Pin; use std::task::{Context, Poll}; use byteorder::WriteBytesExt; use byteorder::{LittleEndian, ReadBytesExt}; -use tokio::io::{AsyncBufRead, AsyncRead}; +use lz4::liblz4::LZ4_decompress_safe; +use lz4::liblz4::{LZ4_compressBound, LZ4_compress_default}; + +use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; #[cfg(not(feature = "cityhash_rs"))] use clickhouse_driver_cth::city_hash_128; #[cfg(feature = "cityhash_rs")] use clickhouse_driver_cthrs::city_hash_128; -pub use clickhouse_driver_lz4::{ - LZ4_Compress, LZ4_CompressBounds, LZ4_Decompress, LZ4_compress_default, -}; use crate::errors; use crate::errors::DriverError; @@ -37,12 +38,19 @@ where W: io::Write + ?Sized, { fn flush(&mut self) -> std::result::Result<(), io::Error> { - let bufsize = LZ4_CompressBounds(self.buf.len()); - let mut compressed: Vec = Vec::with_capacity(9 + bufsize); - unsafe { - compressed.set_len(9 + bufsize); - } - let bufsize = LZ4_Compress(&self.buf[..], &mut compressed[9..]); + let bufsize = unsafe { LZ4_compressBound(self.buf.len() as i32) as usize }; + + let mut compressed = vec![0u8; 9 + bufsize]; + + let bufsize = unsafe { + LZ4_compress_default( + self.buf[..].as_ptr() as *const c_char, + compressed[9..].as_mut_ptr() as *mut c_char, + self.buf.len() as i32, + bufsize as i32, + ) + }; + if bufsize < 0 { return Err(io::Error::new( io::ErrorKind::Interrupted, @@ -86,6 +94,7 @@ where } } +#[derive(Debug)] enum CompressionState { /// Read first 16 byte containing hash sum of the block +9 bytes of header Hash, @@ -143,10 +152,18 @@ fn decompress(buf: &[u8], raw_size: usize) -> io::Result> { }; // TODO: decompression in-place - let mut orig: Vec = Vec::with_capacity(raw_size); + let orig = vec![0u8; raw_size]; + unsafe { - orig.set_len(raw_size); - let res = LZ4_Decompress(&buf[16 + 9..], &mut orig[..]); + let res = { + LZ4_decompress_safe( + (buf.as_ptr() as *const c_char).add(16 + 9), + orig.as_ptr() as *mut c_char, + (buf.len() - 16 - 9) as c_int, + raw_size as i32, + ) + }; + if res < 0 { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -172,10 +189,7 @@ pub(crate) struct LZ4ReadAdapter { impl LZ4ReadAdapter { pub(crate) fn new(reader: R) -> LZ4ReadAdapter { - let mut data = Vec::with_capacity(16 + 9); - unsafe { - data.set_len(16 + 9); - } + let data = vec![0; 16 + 9]; LZ4ReadAdapter { data, state: CompressionState::Hash, @@ -233,9 +247,9 @@ impl LZ4ReadAdapter { CompressionState::Compressed => { let raw_size = self.raw_size; // Read from underlying reader. Bypass buffering - let n = - ready!(Pin::new(&mut self.inner).poll_read(cx, &mut self.data[self.p..])?); - self.p += n; + let mut buf = ReadBuf::new(self.data[self.p..].as_mut()); + ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf)?); + self.p += buf.filled().len(); // Got to the end. Decompress and return raw buffer if self.p >= self.data.len() { debug_assert_eq!(self.p, self.data.len()); @@ -269,10 +283,7 @@ impl LZ4ReadAdapter { return Poll::Ready(Ok(self.data.as_slice())); } else { // Read block by chunks. First read buffered data - self.data.reserve((comp_size - 9) as usize); - unsafe { - self.data.set_len(16 + comp_size as usize); - } + self.data.resize(16 + comp_size as usize, 0); debug_assert!(self.data.capacity() >= (comp_size + 16)); debug_assert!(self.data.len() == (comp_size + 16)); @@ -302,10 +313,7 @@ impl LZ4ReadAdapter { self.raw_size = raw_size as usize; let comp_size = comp_size as usize; - self.data.reserve((comp_size - 9) as usize); - unsafe { - self.data.set_len(16 + comp_size as usize); - } + self.data.resize((16 + comp_size) as usize, 0); //self.p = 9 + 16; // Read the rest of LZ4 block without double buffering right from TCP socket self.state = CompressionState::Compressed; @@ -322,8 +330,8 @@ impl AsyncRead for LZ4ReadAdapter { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { let me = self.get_mut(); // println!("req read {} bytes from {}", @@ -336,16 +344,17 @@ impl AsyncRead for LZ4ReadAdapter { let data = ready!(me.fill(cx)?); let ready_to_read = data.len(); - let toread = std::cmp::min(buf.len(), ready_to_read); + + let toread = std::cmp::min(buf.remaining(), ready_to_read); //let cz = io::copy(inner, buf)?; if toread == 0 { - return Poll::Ready(Ok(0)); + return Poll::Ready(Ok(())); }; - buf[0..toread].copy_from_slice(&data[0..toread]); + buf.put_slice(&data[0..toread]); me.inner_consume(toread); - Poll::Ready(Ok(toread)) + Poll::Ready(Ok(())) } } diff --git a/clickhouse-driver/src/lib.rs b/clickhouse-driver/src/lib.rs index 691e2a8..7657032 100644 --- a/clickhouse-driver/src/lib.rs +++ b/clickhouse-driver/src/lib.rs @@ -5,7 +5,6 @@ //! add next lines in dependencies section of `Cargo.toml` //! ```toml //! clickhouse-driver = { version="0.1.0-alpha.1", path="../path_to_package/clickhouse-driver"} -//! clickhouse-driver-lz4 = { version="0.1.0", path="../path_to_package/lz4a"} //! clickhouse-driver-cthrs = { version="0.1.0", path="../path_to_package/cityhash-rs"} //! ``` //! ## Supported Clickhouse data types @@ -34,7 +33,7 @@ //! - default database: "default" //! #![recursion_limit = "128"] -#![allow(clippy::unknown_clippy_lints)] +#![allow(unknown_lints)] extern crate byteorder; extern crate chrono; extern crate chrono_tz; diff --git a/clickhouse-driver/src/pool/mod.rs b/clickhouse-driver/src/pool/mod.rs index 0588254..7049a0a 100644 --- a/clickhouse-driver/src/pool/mod.rs +++ b/clickhouse-driver/src/pool/mod.rs @@ -13,7 +13,7 @@ use crossbeam::queue; pub use options::CompressionMethod; pub use options::Options; use parking_lot::Mutex; -use tokio::time::delay_for; +use tokio::time::sleep; use util::*; use crate::{ @@ -249,7 +249,7 @@ impl Pool { break; } - delay_for(inner.options.retry_timeout).await; + sleep(inner.options.retry_timeout).await; c -= 1; } diff --git a/clickhouse-driver/src/protocol/block.rs b/clickhouse-driver/src/protocol/block.rs index 727b186..bd3daf4 100644 --- a/clickhouse-driver/src/protocol/block.rs +++ b/clickhouse-driver/src/protocol/block.rs @@ -222,6 +222,7 @@ impl std::fmt::Debug for BlockColumn { } } +#[allow(dead_code)] #[derive(Debug)] pub struct ServerBlock { pub(crate) columns: Vec, diff --git a/clickhouse-driver/src/protocol/column.rs b/clickhouse-driver/src/protocol/column.rs index f587288..5cf4496 100644 --- a/clickhouse-driver/src/protocol/column.rs +++ b/clickhouse-driver/src/protocol/column.rs @@ -507,7 +507,7 @@ pub(crate) struct EnumColumn { } /// T can be 'u8'(Enum8) or 'u16'(Enum16) -impl EnumColumn { +impl EnumColumn { /// Read server stream as a sequence of u8(u16) bytes /// and store them in internal buffer pub(crate) async fn load_column( @@ -517,9 +517,8 @@ impl EnumColumn { ) -> Result> { debug_assert!(field.get_meta().is_some()); - let mut data: Vec = Vec::with_capacity(rows as usize); + let mut data: Vec = vec![T::default(); rows as usize]; unsafe { - data.set_len(rows as usize); reader.read_exact(as_bytes_bufer_mut(&mut data)).await?; } Ok(EnumColumn { data }) @@ -635,10 +634,7 @@ impl FixedColumn { let mut data: Vec = Vec::with_capacity(rows as usize); for _ in 0..rows { - let mut s: Vec = Vec::with_capacity(width as usize); - unsafe { - s.set_len(width as usize); - } + let mut s = vec![0; width as usize]; reader.read_exact(s.as_mut_slice()).await?; data.push(s.into_boxed_slice()); } @@ -658,16 +654,15 @@ impl FixedColumn { // } // } -impl FixedColumn { +impl FixedColumn { /// Load Column of integer data types from the socket buffer pub(crate) async fn load_column( mut reader: R, rows: u64, ) -> Result> { - let mut data: Vec = Vec::with_capacity(rows as usize); + let mut data: Vec = vec![T::default(); rows as usize]; unsafe { - data.set_len(rows as usize); // Big-endian? Never heard reader.read_exact(as_bytes_bufer_mut(&mut data)).await?; } @@ -705,7 +700,7 @@ where /// Wrap the Column in FixedNullColumn adapter if nulls array is provided. /// Return the unchanged Column if nulls is not provided. #[inline] - pub(crate) fn set_nulls(self: Self, nulls: Option>) -> Box { + pub(crate) fn set_nulls(self, nulls: Option>) -> Box { if let Some(nulls) = nulls { Box::new(FixedNullColumn { inner: self, nulls }) } else { @@ -804,7 +799,7 @@ pub(crate) struct FixedArrayColumn { index: Vec, } -impl FixedArrayColumn { +impl FixedArrayColumn { pub(crate) async fn load_column(mut reader: R, rows: u64) -> Result> where R: AsyncBufRead + Unpin, @@ -879,7 +874,7 @@ pub(crate) struct LowCardinalityColumn { // the number or keys sending in one block. impl LowCardinalityColumn where - T: Sized + Ord + Copy + Send + Into + 'static, + T: Default + Clone + Sized + Ord + Copy + Send + Into + 'static, { pub(crate) async fn load_column( reader: R, diff --git a/clickhouse-driver/src/protocol/command.rs b/clickhouse-driver/src/protocol/command.rs index 04546c4..d3be18a 100644 --- a/clickhouse-driver/src/protocol/command.rs +++ b/clickhouse-driver/src/protocol/command.rs @@ -9,7 +9,7 @@ use tokio::time; use super::block::{BlockColumn, BlockColumnHeader, BlockInfo, ServerBlock}; use super::code::*; use super::column::{AsInColumn, EnumColumn, FixedColumn, StringColumn}; -use super::packet::{ProfileInfo, Response}; +use super::packet::{ProfileInfo, Progress, Response}; use super::value::{ValueDate, ValueDateTime, ValueDateTime64, ValueIp4, ValueIp6, ValueUuid}; use super::ServerInfo; use crate::compression::LZ4ReadAdapter; @@ -198,8 +198,8 @@ impl<'a, R: AsyncRead + Unpin + Send> ResponseStream<'a, R> { } SERVER_PROGRESS => { let revision = self.info.revision; - let (_rows, _bytes, _total) = - read_progress(self.reader.inner_ref(), revision).await?; + let progress = read_progress(self.reader.inner_ref(), revision).await?; + return Ok(Some(Response::Progress(progress))); } code => { self.set_fuse(); @@ -260,10 +260,7 @@ pub(crate) async fn load_nulls( mut reader: R, rows: u64, ) -> Result> { - let mut nulls: Vec = Vec::with_capacity(rows as usize); - unsafe { - nulls.set_len(rows as usize); - }; + let mut nulls = vec![0u8; rows as usize]; reader.read_exact(nulls.as_mut_slice()).await?; Ok(nulls) @@ -595,7 +592,7 @@ where Ok(Response::Hello(name, major, minor, revision, timezone)) } -async fn read_progress(reader: R, revision: u32) -> Result<(u64, u64, u64)> +async fn read_progress(reader: R, revision: u32) -> Result where R: AsyncBufRead + Unpin, { @@ -609,7 +606,7 @@ where } else { 0 }; - Ok((rows, bytes, total)) + Ok(Progress::new(rows, bytes, total)) } async fn read_profile(reader: R) -> Result @@ -646,6 +643,7 @@ mod test { use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::AsyncRead; + use tokio::io::ReadBuf; struct AsyncChunk<'a> { buf: &'a [u8], @@ -669,11 +667,12 @@ mod test { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let size = cmp::min(self.cs as usize, buf.len()); + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let size = cmp::min(self.cs as usize, buf.remaining()); + if size == 0 { - return Poll::Ready(Ok(0)); + return Poll::Ready(Ok(())); }; let me = self.get_mut(); @@ -686,8 +685,10 @@ mod test { return Poll::Pending; } } - let size = io::Read::read(&mut me.buf, &mut buf[0..size])?; - Ok(size).into() + let _ = io::Read::read(&mut me.buf, buf.initialize_unfilled_to(size))?; + buf.advance(size); + + Poll::Ready(Ok(())) } } diff --git a/clickhouse-driver/src/protocol/decoder.rs b/clickhouse-driver/src/protocol/decoder.rs index e2c95b6..d442113 100644 --- a/clickhouse-driver/src/protocol/decoder.rs +++ b/clickhouse-driver/src/protocol/decoder.rs @@ -3,7 +3,7 @@ use core::marker::PhantomData; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf}; /// Read string data encoded as VarInt(length) + bytearray pub(crate) struct ReadVString<'a, T: FromBytes, R> { @@ -34,11 +34,8 @@ impl FromBytes for Vec { impl<'a, T: FromBytes, R: AsyncRead> ReadVString<'a, T, R> { pub(crate) fn new(reader: &'a mut R, length: usize) -> ReadVString<'a, T, R> { - let data = unsafe { - let mut v = Vec::with_capacity(length); - v.set_len(length); - v - }; + let data = vec![0u8; length]; + let inner = unsafe { Pin::new_unchecked(reader) }; ReadVString { length_: 0, @@ -53,10 +50,9 @@ impl<'a, T: FromBytes, R: AsyncRead> ReadVString<'a, T, R> { if self.length_ == self.data.len() { return FromBytes::from_bytes(&mut self.data).into(); } else { - self.length_ += ready!(self - .inner - .as_mut() - .poll_read(cx, &mut self.data[self.length_..])?); + let mut buffer = ReadBuf::new(self.data[self.length_..].as_mut()); + ready!(self.inner.as_mut().poll_read(cx, &mut buffer)?); + self.length_ += buffer.filled().len(); } } } @@ -91,7 +87,10 @@ impl<'a, R: AsyncRead> ReadVInt<'a, R> { let mut b = [0u8; 1]; loop { //let inner: Pin<&mut R> = unsafe{ Pin::new_unchecked(self.inner) }; - if 0 == ready!(self.inner.as_mut().poll_read(cx, &mut b)?) { + let mut buffer = ReadBuf::new(&mut b); + ready!(self.inner.as_mut().poll_read(cx, &mut buffer)?); + + if buffer.filled().is_empty() { return Poll::Ready(Err(DriverError::BrokenData.into())); } let b = b[0]; diff --git a/clickhouse-driver/src/protocol/packet.rs b/clickhouse-driver/src/protocol/packet.rs index 5743c3d..ac3d6a5 100644 --- a/clickhouse-driver/src/protocol/packet.rs +++ b/clickhouse-driver/src/protocol/packet.rs @@ -98,15 +98,34 @@ impl std::fmt::Debug for ProfileInfo { } } -pub struct Statistics { - rows: u64, - bytes: u64, - total: u64, +#[derive(Copy, Clone, Default, PartialEq)] +pub struct Progress { + pub rows: u64, + pub bytes: u64, + pub total: u64, +} + +impl Progress { + pub fn new(rows: u64, bytes: u64, total: u64) -> Self { + Progress { rows, bytes, total } + } + + pub fn update(&mut self, progress: Progress) { + self.rows += progress.rows; + self.bytes += progress.bytes; + self.total += progress.total; + } + + pub fn reset(&mut self) { + self.rows = 0; + self.bytes = 0; + self.total = 0; + } } -impl std::fmt::Debug for Statistics { +impl std::fmt::Debug for Progress { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Statistics") + f.debug_struct("Progress") .field("rows", &self.rows) .field("bytes", &self.bytes) .field("total", &self.total) @@ -119,10 +138,11 @@ pub(crate) enum Response { Pong, Data(ServerBlock), Hello(String, u64, u64, u64, Tz), + Progress(Progress), // Eos, // Profile(u64, u64, u64, u8, u8), // Totals, - // Extremes, + // Extremes } impl Response { @@ -131,6 +151,7 @@ impl Response { Response::Pong => SERVER_PONG, Response::Data(_) => SERVER_DATA, Response::Hello(..) => SERVER_HELLO, + Response::Progress(..) => SERVER_PROGRESS, // Response::Eos => SERVER_END_OF_STREAM, // Response::Extremes => SERVER_EXTREMES, // Response::Totals => SERVER_TOTALS, diff --git a/clickhouse-driver/src/protocol/query.rs b/clickhouse-driver/src/protocol/query.rs index 11f29ad..3847265 100644 --- a/clickhouse-driver/src/protocol/query.rs +++ b/clickhouse-driver/src/protocol/query.rs @@ -95,6 +95,7 @@ impl ServerWriter for Query { "readonly".encode(writer)?; ro.encode(writer)? } + // Empty string end up settings block ().encode(writer)?; diff --git a/clickhouse-driver/src/protocol/value.rs b/clickhouse-driver/src/protocol/value.rs index d3f0f95..77b637b 100644 --- a/clickhouse-driver/src/protocol/value.rs +++ b/clickhouse-driver/src/protocol/value.rs @@ -235,7 +235,10 @@ where } fn is_compatible(&self, field: &Field) -> bool { - matches!(field.sql_type, SqlType::String | SqlType::FixedString(_) | SqlType::Enum8 | SqlType::Enum16) + matches!( + field.sql_type, + SqlType::String | SqlType::FixedString(_) | SqlType::Enum8 | SqlType::Enum16 + ) } } /// IPv4 output column @@ -513,6 +516,7 @@ impl_intocolumn_simple!(Option, |f| { #[derive(Copy, Clone, Debug)] pub struct ValueIp4([u8; 4]); +#[allow(clippy::from_over_into)] impl Into for ValueIp4 { fn into(mut self) -> Ipv4Addr { self.0.reverse(); @@ -523,6 +527,7 @@ impl Into for ValueIp4 { #[derive(Copy, Clone, Debug)] pub struct ValueIp6([u8; 16]); +#[allow(clippy::from_over_into)] impl Into for ValueIp6 { fn into(mut self) -> Ipv6Addr { self.0.reverse(); @@ -544,7 +549,7 @@ impl Into for ValueUuid { #[derive(Copy, Clone, Debug)] pub struct ValueDate(pub [u8; 2]); -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Default)] pub struct ValueDateTime(pub [u8; 4]); #[derive(Copy, Clone, Debug)] @@ -561,6 +566,7 @@ pub struct ValueDecimal64(pub i64); pub struct ValueDecimal128(pub i128); impl ValueDate { + #[allow(clippy::wrong_self_convention)] pub(super) fn to_date(&self) -> chrono::Date { ValueDate::date_inner(i16::from_le_bytes(self.0)) } @@ -573,6 +579,7 @@ impl ValueDate { } impl ValueDateTime { + #[allow(clippy::wrong_self_convention)] pub(super) fn to_datetime(&self) -> DateTime { ValueDateTime::datetime_inner(i32::from_le_bytes(self.0)) } diff --git a/clickhouse-driver/src/types/decimal.rs b/clickhouse-driver/src/types/decimal.rs index 09e38c4..7910e6c 100644 --- a/clickhouse-driver/src/types/decimal.rs +++ b/clickhouse-driver/src/types/decimal.rs @@ -19,7 +19,7 @@ macro_rules! bits { } #[allow(unused_comparisons)] fn fit(precision: u8) -> bool { - precision <= $max && precision >= $min + ($min..=$max).contains(&precision) } } }; @@ -177,8 +177,8 @@ mod test { #[test] fn test_decimal_range() { assert_eq!(i32::fit(8), true); - assert_ne!(i32::fit(10), true); - assert_ne!(i64::fit(6), true); + assert_eq!(i32::fit(10), false); + assert_eq!(i64::fit(6), false); assert_eq!(i64::fit(14), true); } diff --git a/clickhouse-driver/src/types/parser.rs b/clickhouse-driver/src/types/parser.rs index aa84e63..b2c0107 100644 --- a/clickhouse-driver/src/types/parser.rs +++ b/clickhouse-driver/src/types/parser.rs @@ -284,7 +284,7 @@ pub fn parse_type_field(t: &str) -> Result { meta: None, }) }; - }; + } if t.eq("String") { return field!(SqlType::String); diff --git a/clickhouse-driver/tests/insert.rs b/clickhouse-driver/tests/insert.rs index 22f3720..b2b8081 100644 --- a/clickhouse-driver/tests/insert.rs +++ b/clickhouse-driver/tests/insert.rs @@ -471,8 +471,8 @@ async fn test_insert_ip() -> errors::Result<()> { let ip4: Ipv4Addr = "127.0.0.1".parse().unwrap(); let ip6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0x1); - assert_eq!(ip6.is_loopback(), true); - assert_eq!(ip4.is_loopback(), true); + assert!(ip6.is_loopback()); + assert!(ip4.is_loopback()); let data_0 = vec![0u64, 1, 2, 3, 4, 5]; let data_1 = vec![ip4; 6]; let data_2 = vec![ip6; 6]; diff --git a/clickhouse-driver/tests/pool.rs b/clickhouse-driver/tests/pool.rs index 20122e7..791b4e4 100644 --- a/clickhouse-driver/tests/pool.rs +++ b/clickhouse-driver/tests/pool.rs @@ -2,7 +2,7 @@ use clickhouse_driver::prelude::errors; use clickhouse_driver::prelude::*; use std::io; use std::time::Duration; -use tokio::{self, time::delay_for}; +use tokio::{self, time::sleep}; mod common; use common::{get_config, get_pool, get_pool_extend}; @@ -30,7 +30,7 @@ async fn test_connection_pool() -> io::Result<()> { tokio::spawn(async move { let mut conn = pool.connection().await.unwrap(); conn.ping().await.expect("ping ok"); - delay_for(Duration::new(2, 0)).await; + sleep(Duration::new(2, 0)).await; }) }) .collect(); @@ -55,7 +55,7 @@ async fn test_conn_race() -> io::Result<()> { tokio::spawn(async move { let _ = pool.connection().await.unwrap(); - delay_for(Duration::new(0, MSEC_100)).await; + sleep(Duration::new(0, MSEC_100)).await; }) }) .collect(); @@ -85,7 +85,6 @@ async fn test_ping() -> errors::Result<()> { // assert!(err_timeout.unwrap_err().is_timeout()); - let config = get_config().set_timeout(Duration::from_nanos(1)); let pool = Pool::create(config).unwrap(); diff --git a/clickhouse-driver/tests/query.rs b/clickhouse-driver/tests/query.rs index bc87269..4152ec4 100644 --- a/clickhouse-driver/tests/query.rs +++ b/clickhouse-driver/tests/query.rs @@ -38,7 +38,7 @@ async fn test_query_compress() -> errors::Result<()> { let mut qr = conn.query("SELECT lcs FROM main LIMIT 1000").await?; while let Some(_block) = qr.next().await? {} - assert_eq!(qr.is_pending(), false); + assert!(!qr.is_pending()); } drop(pool); @@ -54,7 +54,7 @@ async fn test_query_compress() -> errors::Result<()> { //println!("{}",lcs); } } - assert_eq!(qr.is_pending(), false); + assert!(!qr.is_pending()); drop(pool); let pool = get_pool(); @@ -68,7 +68,7 @@ async fn test_query_compress() -> errors::Result<()> { //println!("{}", lcs); } } - assert_eq!(qr.is_pending(), false); + assert!(!qr.is_pending()); } Ok(()) } @@ -84,11 +84,11 @@ async fn test_query_pending() -> errors::Result<()> { while let Some(_block) = query_result.next().await? { i += 1; if i == 1 { - assert_eq!(query_result.is_pending(), true); + assert!(query_result.is_pending()); } } - assert_eq!(query_result.is_pending(), false); + assert!(!query_result.is_pending()); drop(query_result); Ok(()) } diff --git a/derive/Cargo.toml b/derive/Cargo.toml index cfab200..652027e 100644 --- a/derive/Cargo.toml +++ b/derive/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" [dependencies] syn = "1.0.31" quote = "1.0.7" +proc-macro2 = "1.0" [lib] proc-macro = true diff --git a/derive/src/lib.rs b/derive/src/lib.rs index d83a5bf..3eab53c 100644 --- a/derive/src/lib.rs +++ b/derive/src/lib.rs @@ -4,8 +4,6 @@ extern crate quote; extern crate syn; use proc_macro::TokenStream; - -use syn::export::TokenStream2; use syn::{parse_macro_input, DeriveInput}; #[proc_macro_derive(IsCommand)] @@ -15,7 +13,7 @@ pub fn command_derive(input: TokenStream) -> TokenStream { gen.into() } -fn command_impl(ast: &syn::DeriveInput) -> TokenStream2 { +fn command_impl(ast: &syn::DeriveInput) -> proc_macro2::TokenStream { let gen = &ast.generics; let name = &ast.ident; quote! { diff --git a/lz4a/Cargo.toml b/lz4a/Cargo.toml deleted file mode 100644 index 78cd218..0000000 --- a/lz4a/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "clickhouse-driver-lz4" -description = "LZ4 for ClickHouse Asynchronous Driver." -version = "0.1.0" -authors = ["Dmitry Dulesov "] -edition = "2018" -build = "build.rs" -links = "clickhouse-driver" -license = "MIT" -repository = "https://github.com/ddulesov/clickhouse_driver" -include = ["liblz4/lib/**","benches/**","src/**","build.rs"] - -[dependencies] -libc = "0.2" - -[build-dependencies] -cc = "1.0.54" - -[dev-dependencies] -rand = "0.7.3" diff --git a/lz4a/benches/bench.rs b/lz4a/benches/bench.rs deleted file mode 100644 index 26b638c..0000000 --- a/lz4a/benches/bench.rs +++ /dev/null @@ -1,141 +0,0 @@ -#![cfg(rustc_nightly)] -#![feature(test)] -#![feature(core_intrinsics)] - -extern crate libc; -extern crate test; - -use test::Bencher; - -use libc::c_char; -use rand::prelude::*; - -//use core::intrinsics::size_of; -use lz4a::*; - -fn data(len: usize) -> Vec { - let mut b: Vec = Vec::with_capacity(len as usize); - unsafe { - b.set_len(len as usize); - } - - b -} - -#[bench] -fn bench_compress_static_1m(b: &mut Bencher) { - let mut input = data(1024 * 1024); - rand::thread_rng().fill(&mut input[..]); - - let cz: usize = LZ4_CompressInplaceBufferSize(1024 * 1024); - let mut output: Vec = Vec::with_capacity(cz); - unsafe { - output.set_len(cz as usize); - } - let mut code: i32 = 0; - - b.iter(|| { - code = unsafe { - LZ4_compress_default( - input.as_ptr() as *const c_char, - output.as_mut_ptr() as *mut c_char, - (input.len()) as i32, - output.len() as i32, - ) - }; - }); -} - -#[bench] -fn bench_decompress_static_1m(b: &mut Bencher) { - let mut input = data(1024 * 1024); - rand::thread_rng().fill(&mut input[..]); - - let cz: usize = LZ4_CompressInplaceBufferSize(1024 * 1024); - let mut output: Vec = Vec::with_capacity(cz); - - unsafe { - output.set_len(cz as usize); - } - - let mut code = unsafe { - LZ4_compress_default( - input.as_ptr() as *const c_char, - output.as_mut_ptr() as *mut c_char, - (input.len()) as i32, - output.len() as i32, - ) - }; - - output.truncate(code as usize); - - let compressed_len = code; - //drop( input ); - - b.iter(|| unsafe { - code = LZ4_decompress_safe( - output.as_ptr() as *const c_char, - input.as_mut_ptr() as *mut c_char, - compressed_len as i32, - input.len() as i32, - ); - }); -} - -#[bench] -fn bench_compress_highratio_1m(b: &mut Bencher) { - let input = data(1024 * 1024); - - let cz: usize = LZ4_CompressInplaceBufferSize(1024 * 1024); - let mut output: Vec = Vec::with_capacity(cz); - unsafe { - output.set_len(cz as usize); - } - let mut code: i32 = 0; - - b.iter(|| { - code = unsafe { - LZ4_compress_default( - input.as_ptr() as *const c_char, - output.as_mut_ptr() as *mut c_char, - (input.len()) as i32, - output.len() as i32, - ) - }; - }); -} - -#[bench] -fn bench_decompress_highratio_1m(b: &mut Bencher) { - let mut input = data(1024 * 1024); - - let cz: usize = LZ4_CompressInplaceBufferSize(1024 * 1024); - let mut output: Vec = Vec::with_capacity(cz); - - unsafe { - output.set_len(cz as usize); - } - - let mut code = unsafe { - LZ4_compress_default( - input.as_ptr() as *const c_char, - output.as_mut_ptr() as *mut c_char, - (input.len()) as i32, - output.len() as i32, - ) - }; - - output.truncate(code as usize); - - let compressed_len = code; - //drop( input ); - - b.iter(|| unsafe { - code = LZ4_decompress_safe( - output.as_ptr() as *const c_char, - input.as_mut_ptr() as *mut c_char, - compressed_len as i32, - input.len() as i32, - ); - }); -} diff --git a/lz4a/build.rs b/lz4a/build.rs deleted file mode 100644 index 2e2fbbc..0000000 --- a/lz4a/build.rs +++ /dev/null @@ -1,59 +0,0 @@ -extern crate cc; - -use std::error::Error; -use std::path::PathBuf; -use std::{env, fs, process}; - -fn main() { - match run() { - Ok(()) => (), - Err(err) => { - eprintln!("{}", err); - process::exit(1); - } - } -} -#[allow(clippy::single_match)] -fn run() -> Result<(), Box> { - let mut compiler = cc::Build::new(); - compiler - .file("liblz4/lib/lz4.c") - .file("liblz4/lib/lz4hc.c") - //.file("liblz4/lib/lz4frame.c") - //.file("liblz4/lib/xxhash.c") - // We always compile the C with optimization, because otherwise it is 20x slower. - .opt_level(3); - match env::var("TARGET") - .map_err(|err| format!("reading TARGET environment variable: {}", err))? - .as_str() - { - "i686-pc-windows-gnu" => { - compiler.flag("-fno-tree-vectorize"); - } - _ => {} - } - compiler.compile("liblz4.a"); - - let src = env::current_dir()?.join("liblz4").join("lib"); - let dst = PathBuf::from(env::var_os("OUT_DIR").ok_or("missing OUT_DIR environment variable")?); - let include = dst.join("include"); - fs::create_dir_all(&include) - .map_err(|err| format!("creating directory {}: {}", include.display(), err))?; - for e in fs::read_dir(&src)? { - let e = e?; - let utf8_file_name = e - .file_name() - .into_string() - .map_err(|_| format!("unable to convert file name {:?} to UTF-8", e.file_name()))?; - if utf8_file_name.ends_with(".h") { - let from = e.path(); - let to = include.join(e.file_name()); - fs::copy(&from, &to).map_err(|err| { - format!("copying {} to {}: {}", from.display(), to.display(), err) - })?; - } - } - println!("cargo:root={}", dst.display()); - - Ok(()) -} diff --git a/lz4a/src/lib.rs b/lz4a/src/lib.rs deleted file mode 100644 index 1ca43d3..0000000 --- a/lz4a/src/lib.rs +++ /dev/null @@ -1,247 +0,0 @@ -#![no_std] -extern crate libc; -#[cfg(test)] -#[macro_use] -extern crate std; - -use libc::{c_char, c_int, c_uint, size_t}; - -pub type LZ4FErrorCode = size_t; -pub const LZ4F_VERSION: c_uint = 100; - -extern "C" { - // int LZ4_compress_default(const char* source, char* dest, int sourceSize, int maxDestSize); - #[allow(non_snake_case)] - pub fn LZ4_compress_default( - source: *const c_char, - dest: *mut c_char, - sourceSize: c_int, - maxDestSize: c_int, - ) -> c_int; - - // int LZ4_compress_fast (const char* source, char* dest, int sourceSize, int maxDestSize, int acceleration); - #[allow(non_snake_case)] - pub fn LZ4_compress_fast( - source: *const c_char, - dest: *mut c_char, - sourceSize: c_int, - maxDestSize: c_int, - acceleration: c_int, - ) -> c_int; - - // int LZ4_compress_HC (const char* src, char* dst, int srcSize, int dstCapacity, int compressionLevel); - #[allow(non_snake_case)] - pub fn LZ4_compress_HC( - src: *const c_char, - dst: *mut c_char, - srcSize: c_int, - dstCapacity: c_int, - compressionLevel: c_int, - ) -> c_int; - - // int LZ4_decompress_safe (const char* source, char* dest, int compressedSize, int maxDecompressedSize); - #[allow(non_snake_case)] - pub fn LZ4_decompress_safe( - source: *const c_char, - dest: *mut c_char, - compressedSize: c_int, - maxDecompressedSize: c_int, - ) -> c_int; - - #[allow(non_snake_case)] - pub fn LZ4_decompress_fast( - source: *const c_char, - dest: *mut c_char, - originaldSize: c_int, - ) -> c_int; - - // const char* LZ4F_getErrorName(LZ4F_errorCode_t code); - pub fn LZ4F_getErrorName(code: size_t) -> *const c_char; - - // int LZ4_versionNumber(void) - pub fn LZ4_versionNumber() -> c_int; - - // int LZ4_compressBound(int isize) - fn LZ4_compressBound(size: c_int) -> c_int; - -} - -const LZ4_DISTANCE_MAX: usize = 65535; - -#[allow(non_snake_case)] -#[inline] -pub const fn LZ4_CompressInplaceBufferSize(decompressed: usize) -> usize { - decompressed + LZ4_DISTANCE_MAX + 32 -} - -#[allow(non_snake_case)] -#[inline] -pub const fn LZ4_DecompressInplaceBufferSize(compressed: usize) -> usize { - compressed + (compressed >> 8) + 32 -} - -#[allow(non_snake_case)] -#[inline] -pub fn LZ4_Decompress(src: &[u8], dst: &mut [u8]) -> i32 { - unsafe { - LZ4_decompress_safe( - src.as_ptr() as *const c_char, - dst.as_mut_ptr() as *mut c_char, - src.len() as c_int, - dst.len() as c_int, - ) - } -} - -#[allow(non_snake_case)] -#[inline] -pub fn LZ4_Compress(src: &[u8], dst: &mut [u8]) -> i32 { - unsafe { - LZ4_compress_default( - src.as_ptr() as *const c_char, - dst.as_mut_ptr() as *mut c_char, - src.len() as c_int, - dst.len() as c_int, - ) - } -} - -#[allow(non_snake_case)] -#[inline] -pub fn LZ4_CompressBounds(src: usize) -> usize { - unsafe { LZ4_compressBound(src as c_int) as usize } -} - -#[cfg(test)] -mod test { - extern crate rand; - use self::rand::RngCore; - use crate::*; - use libc::c_int; - - #[test] - fn test_version_number() { - let version = unsafe { LZ4_versionNumber() }; - assert_eq!(version, 10902 as c_int); - - // 640 kb original size - assert_eq!(unsafe { LZ4_compressBound(640 * 1024) }, 657946); - - // 1Mb destination bufer - assert_eq!(LZ4_CompressInplaceBufferSize(983009), 1024 * 1024); - assert_eq!(LZ4_DecompressInplaceBufferSize(1044464), 1024 * 1024 - 1); - } - - #[test] - fn test_compression() { - use std::vec::Vec; - let mut rng = rand::thread_rng(); - - for sz in [600_usize, 1024, 6000, 65000, 650000].iter() { - let cz: usize = LZ4_CompressInplaceBufferSize(*sz); - - let mut orig: Vec = Vec::with_capacity(cz); - unsafe { - orig.set_len(cz); - rng.fill_bytes(&mut orig[..]); - - let margin = cz - *sz; - //compress inplace - //maximum compressed size - let bz = LZ4_compressBound(*sz as c_int); - //destination compression bufer - let mut comp: Vec = Vec::with_capacity(bz as usize); - - comp.set_len(bz as usize); - - //normal compression - let code = LZ4_compress_default( - orig.as_ptr().add(margin) as *const c_char, - comp.as_mut_ptr() as *mut c_char, - (orig.len() - margin) as i32, - comp.len() as i32, - ); - - assert!(code >= 0); - assert_eq!(orig.len() - margin, *sz); - let compressed_sz = code as usize; - - //compression inplace - let code = LZ4_compress_default( - orig.as_ptr().add(margin) as *const c_char, - orig.as_mut_ptr() as *mut c_char, - (orig.len() - margin) as i32, - orig.len() as i32, - ); - - assert!(code >= 0); - - assert_eq!(&comp[0..compressed_sz], &orig[0..compressed_sz]); - } - } - - assert_eq!(1, 1); - } - - #[test] - fn test_decompression() { - use std::vec::Vec; - //let mut rng = rand::thread_rng(); - - for sz in [600_usize, 1024, 6000, 65000, 650000].iter() { - let mut orig: Vec = Vec::with_capacity(*sz); - unsafe { - orig.set_len(*sz); - - { - //it's sort of randomized data - orig[0] = 1; - orig[*sz / 4] = 4; - orig[*sz / 2] = 7; - orig[*sz * 2 / 3] = 10; - orig[*sz - 1] = 1; - } - - let bz = LZ4_compressBound(*sz as c_int) as usize; - - let mut comp: Vec = Vec::with_capacity(bz); - comp.set_len(bz); - - let code = LZ4_compress_default( - orig.as_ptr() as *const c_char, - comp.as_mut_ptr() as *mut c_char, - (orig.len()) as i32, - (bz) as i32, - ); - - assert!(code > 0); - //size of compressed data - println!( - "orig {}; compressed {}; in buf len {}", - *sz, - code as usize, - comp.len() - ); - //compressed size - let cz = code as usize; - - let mut buf: Vec = Vec::with_capacity(*sz); - buf.set_len(*sz); - - let code = LZ4_decompress_safe( - comp.as_ptr() as *const c_char, - buf.as_mut_ptr() as *mut c_char, - cz as i32, - *sz as i32, - ); - - assert!(code > 0); - - let cz = code as usize; - - assert_eq!(cz, *sz); - assert_eq!(&orig[0..*sz], &buf[0..cz]); - } - } - } -}