Skip to content

Commit 7d4cceb

Browse files
committed
io-uring: Create op_read helper function
Fix tests that fail on allocation failure. Doc fix, and fix double subtraction of offset. Fix typos and assert the stopped task's first poll. Add comments and panic on failed u32 conversion.
1 parent a5a0c7d commit 7d4cceb

File tree

4 files changed

+102
-114
lines changed

4 files changed

+102
-114
lines changed

tokio/src/fs/read.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ use std::{io, path::Path};
3030
///
3131
/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
3232
///
33+
/// # io_uring support
34+
///
35+
/// On Linux, you can also use io_uring for executing system calls. To enable
36+
/// io_uring, you need to specify the `--cfg tokio_unstable` flag at compile time,
37+
/// enable the io-uring cargo feature, and set the `Builder::enable_io_uring`
38+
/// runtime option.
39+
///
40+
/// Support for io_uring is currently experimental, so its behavior may change
41+
/// or it may be removed in future versions.
42+
///
3343
/// # Examples
3444
///
3545
/// ```no_run

tokio/src/fs/read_uring.rs

Lines changed: 59 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;
1313

1414
// Max bytes we can read using io uring submission at a time
1515
// SAFETY: cannot be higher than u32::MAX for safe cast
16-
// Set to read max 64 blocks at time
16+
// Set to read max 64 MiB at time
1717
const MAX_READ_SIZE: usize = 64 * 1024 * 1024;
1818

1919
pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
@@ -27,141 +27,111 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
2727
.expect("unexpected in-flight operation detected")
2828
.into();
2929

30-
// extra single capacity for the whole size to fit without any reallocation
31-
let buf = Vec::with_capacity(size_hint.unwrap_or(0));
30+
let mut buf = Vec::new();
3231

33-
read_to_end_uring(size_hint, fd, buf).await
32+
if let Some(size_hint) = size_hint {
33+
buf.try_reserve(size_hint)?;
34+
}
35+
36+
read_to_end_uring(fd, buf).await
3437
}
3538

36-
async fn read_to_end_uring(
37-
size_hint: Option<usize>,
38-
mut fd: OwnedFd,
39-
mut buf: Vec<u8>,
40-
) -> io::Result<Vec<u8>> {
39+
async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
4140
let mut offset = 0;
42-
4341
let start_cap = buf.capacity();
4442

45-
// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
46-
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
47-
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
48-
49-
if size_read == 0 {
50-
return Ok(r_buf);
51-
}
52-
53-
buf = r_buf;
54-
fd = r_fd;
55-
offset += size_read as u64;
56-
}
57-
5843
loop {
59-
if buf.len() == buf.capacity() && buf.capacity() == start_cap {
44+
if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() > PROBE_SIZE {
6045
// The buffer might be an exact fit. Let's read into a probe buffer
6146
// and see if it returns `Ok(0)`. If so, we've avoided an
6247
// unnecessary increasing of the capacity. But if not, append the
6348
// probe buffer to the primary buffer and let its capacity grow.
64-
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
49+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?;
6550

6651
if size_read == 0 {
6752
return Ok(r_buf);
6853
}
6954

7055
buf = r_buf;
7156
fd = r_fd;
72-
offset += size_read as u64;
7357
}
7458

7559
// buf is full, need more capacity
7660
if buf.len() == buf.capacity() {
7761
buf.try_reserve(PROBE_SIZE)?;
7862
}
7963

80-
// doesn't matter if we have a valid size_hint or not, if we do more
81-
// than 2 consecutive_short_reads, gradually increase the buffer
82-
// capacity to read more data at a time
83-
8464
// prepare the spare capacity to be read into
8565
let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);
8666

87-
// SAFETY: buf_len cannot be greater than u32::MAX because max_read_size
88-
// is u32::MAX
89-
let mut read_len = buf_len as u32;
90-
91-
loop {
92-
// read into spare capacity
93-
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, offset).await;
94-
95-
match res {
96-
Ok(0) => return Ok(r_buf),
97-
Ok(size_read) => {
98-
fd = r_fd;
99-
buf = r_buf;
100-
offset += size_read as u64;
101-
read_len -= size_read;
102-
103-
// keep reading if there's something left to be read
104-
if read_len > 0 {
105-
continue;
106-
} else {
107-
break;
108-
}
109-
}
110-
Err(e) if e.kind() == ErrorKind::Interrupted => {
111-
buf = r_buf;
112-
fd = r_fd;
67+
// buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
68+
// is less than u32::MAX
69+
let read_len = u32::try_from(buf_len).expect("buf_len must always fit in u32");
11370

114-
continue;
115-
}
116-
Err(e) => return Err(e),
117-
}
71+
// read into spare capacity
72+
let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, read_len).await?;
73+
74+
if size_read == 0 {
75+
return Ok(r_buf);
11876
}
77+
78+
fd = r_fd;
79+
buf = r_buf;
11980
}
12081
}
12182

12283
async fn small_probe_read(
123-
mut fd: OwnedFd,
84+
fd: OwnedFd,
12485
mut buf: Vec<u8>,
125-
offset: u64,
86+
offset: &mut u64,
12687
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
88+
let read_len = PROBE_SIZE_U32;
89+
12790
let mut temp_arr = [0; PROBE_SIZE];
128-
let has_enough = buf.len() > PROBE_SIZE;
129-
130-
if has_enough {
131-
// if we have more than PROBE_SIZE bytes in the buffer already then
132-
// don't call reserve as we might potentially read 0 bytes
133-
let back_bytes_len = buf.len() - PROBE_SIZE;
134-
temp_arr.copy_from_slice(&buf[back_bytes_len..]);
135-
// We're decreasing the length of the buffer and len is greater
136-
// than PROBE_SIZE. So we can read into the discarded length
137-
buf.truncate(back_bytes_len);
138-
} else {
139-
// we don't even have PROBE_SIZE length in the buffer, we need this
140-
// reservation
141-
buf.reserve_exact(PROBE_SIZE);
142-
}
91+
// we don't call this function if buffer's length < PROBE_SIZE
92+
let back_bytes_len = buf.len() - PROBE_SIZE;
93+
94+
temp_arr.copy_from_slice(&buf[back_bytes_len..]);
95+
96+
// We're decreasing the length of the buffer and len is greater
97+
// than PROBE_SIZE. So we can read into the discarded length
98+
buf.truncate(back_bytes_len);
14399

100+
let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, read_len).await?;
101+
// If `size_read` returns zero due to reasons such as buffer's exact fit,
102+
// then this `try_reserve` does not perform allocation.
103+
r_buf.try_reserve(PROBE_SIZE)?;
104+
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
105+
106+
Ok((size_read, r_fd, r_buf))
107+
}
108+
109+
async fn op_read(
110+
mut fd: OwnedFd,
111+
mut buf: Vec<u8>,
112+
offset: &mut u64,
113+
mut read_len: u32,
114+
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
144115
loop {
145-
let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await;
116+
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await;
146117

147118
match res {
148-
// return early if we inserted into reserved PROBE_SIZE
149-
// bytes
150-
Ok(size_read) if !has_enough => return Ok((size_read, r_fd, r_buf)),
119+
Err(e) if e.kind() == ErrorKind::Interrupted => {
120+
buf = r_buf;
121+
fd = r_fd;
122+
}
123+
Err(e) => return Err(e),
151124
Ok(size_read) => {
152-
let old_len = r_buf.len() - (size_read as usize);
125+
*offset += size_read as u64;
126+
read_len -= size_read;
153127

154-
r_buf.splice(old_len..old_len, temp_arr);
128+
if read_len == 0 || size_read == 0 {
129+
return Ok((size_read, r_fd, r_buf));
130+
}
155131

156-
return Ok((size_read, r_fd, r_buf));
157-
}
158-
Err(e) if e.kind() == ErrorKind::Interrupted => {
159132
buf = r_buf;
160133
fd = r_fd;
161-
162-
continue;
163134
}
164-
Err(e) => return Err(e),
165135
}
166136
}
167137
}

tokio/src/io/uring/read.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ impl Cancellable for Read {
3838

3939
impl Op<Read> {
4040
// Submit a request to read a FD at given length and offset into a
41-
// dynamic buffer with uinitialized memory. The read happens on unitialized
42-
// buffer and no overwiting happens.
41+
// dynamic buffer with uninitialized memory. The read happens on unitialized
42+
// buffer and no overwriting happens.
4343

4444
// SAFETY: The `len` of the amount to be read and the buffer that is passed
4545
// should have capacity > len.

tokio/tests/fs_uring_read.rs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
target_os = "linux"
99
))]
1010

11-
use futures::future::FutureExt;
11+
use futures::future::Future;
12+
use std::future::poll_fn;
1213
use std::io::Write;
14+
use std::path::PathBuf;
1315
use std::sync::mpsc;
1416
use std::task::Poll;
1517
use std::time::Duration;
16-
use std::{future::poll_fn, path::PathBuf};
1718
use tempfile::NamedTempFile;
18-
use tokio::{
19-
fs::read,
20-
runtime::{Builder, Runtime},
21-
};
19+
use tokio::fs::read;
20+
use tokio::runtime::{Builder, Runtime};
21+
use tokio_test::assert_pending;
2222
use tokio_util::task::TaskTracker;
2323

2424
fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
@@ -49,34 +49,38 @@ fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
4949
#[test]
5050
fn shutdown_runtime_while_performing_io_uring_ops() {
5151
fn run(rt: Runtime) {
52-
let (tx, rx) = mpsc::channel();
5352
let (done_tx, done_rx) = mpsc::channel();
54-
5553
let (_tmp, path) = create_tmp_files(1);
54+
// keep 100 permits
55+
const N: i32 = 100;
5656
rt.spawn(async move {
5757
let path = path[0].clone();
5858

5959
// spawning a bunch of uring operations.
60-
loop {
60+
let mut futs = vec![];
61+
62+
// spawning a bunch of uring operations.
63+
for _ in 0..N {
6164
let path = path.clone();
62-
tokio::spawn(async move {
63-
let bytes = read(path).await.unwrap();
65+
let mut fut = Box::pin(read(path));
6466

65-
assert_eq!(bytes, vec![20; 1023]);
66-
});
67+
poll_fn(|cx| {
68+
assert_pending!(fut.as_mut().poll(cx));
69+
Poll::<()>::Pending
70+
})
71+
.await;
6772

68-
// Avoid busy looping.
69-
tokio::task::yield_now().await;
73+
futs.push(fut);
7074
}
75+
76+
tokio::task::yield_now().await;
7177
});
7278

7379
std::thread::spawn(move || {
74-
let rt: Runtime = rx.recv().unwrap();
7580
rt.shutdown_timeout(Duration::from_millis(300));
7681
done_tx.send(()).unwrap();
7782
});
7883

79-
tx.send(rt).unwrap();
8084
done_rx.recv().unwrap();
8185
}
8286

@@ -130,25 +134,29 @@ async fn read_small_large_files() {
130134
#[tokio::test]
131135
async fn cancel_op_future() {
132136
let (_tmp_file, path): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(1);
137+
let path = path[0].clone();
133138

134139
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
140+
135141
let handle = tokio::spawn(async move {
136-
poll_fn(|cx| {
137-
let fut = read(&path[0]);
142+
let fut = read(path.clone());
143+
tokio::pin!(fut);
138144

145+
poll_fn(move |cx| {
139146
// If io_uring is enabled (and not falling back to the thread pool),
140147
// the first poll should return Pending.
141-
let _pending = Box::pin(fut).poll_unpin(cx);
142-
143-
tx.send(()).unwrap();
148+
assert_pending!(fut.as_mut().poll(cx));
149+
tx.send(true).unwrap();
144150

145151
Poll::<()>::Pending
146152
})
147153
.await;
148154
});
149155

150156
// Wait for the first poll
151-
rx.recv().await.unwrap();
157+
158+
let val = rx.recv().await;
159+
assert!(val.unwrap());
152160

153161
handle.abort();
154162

0 commit comments

Comments
 (0)