Skip to content

Commit b7ccee7

Browse files
committed
io-uring: Create op_read helper function
Fix tests fail on allocation failure Doc fix and fix double subtraction of offset Fix typos and use assert the stopped task's first poll
1 parent a5dbf4e commit b7ccee7

File tree

4 files changed

+100
-114
lines changed

4 files changed

+100
-114
lines changed

tokio/src/fs/read.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ use std::{io, path::Path};
3030
///
3131
/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
3232
///
33+
/// # io_uring support
34+
///
35+
/// On Linux, you can also use io_uring for executing system calls. To enable
36+
/// io_uring, you need to specify the `--cfg tokio_unstable` flag at compile time,
37+
/// enable the io-uring cargo feature, and set the `Builder::enable_io_uring`
38+
/// runtime option.
39+
///
40+
/// Support for io_uring is currently experimental, so its behavior may change
41+
/// or it may be removed in future versions.
42+
///
3343
/// # Examples
3444
///
3545
/// ```no_run

tokio/src/fs/read_uring.rs

Lines changed: 57 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;
1313

1414
// Max bytes we can read using io uring submission at a time
1515
// SAFETY: cannot be higher than u32::MAX for safe cast
16-
// Set to read max 64 blocks at time
16+
// Set to read max 64 MiB at time
1717
const MAX_READ_SIZE: usize = 64 * 1024 * 1024;
1818

1919
pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
@@ -27,141 +27,109 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
2727
.expect("unexpected in-flight operation detected")
2828
.into();
2929

30-
// extra single capacity for the whole size to fit without any reallocation
31-
let buf = Vec::with_capacity(size_hint.unwrap_or(0));
30+
let mut buf = Vec::new();
3231

33-
read_to_end_uring(size_hint, fd, buf).await
32+
if let Some(size_hint) = size_hint {
33+
buf.try_reserve(size_hint)?;
34+
}
35+
36+
read_to_end_uring(fd, buf).await
3437
}
3538

36-
async fn read_to_end_uring(
37-
size_hint: Option<usize>,
38-
mut fd: OwnedFd,
39-
mut buf: Vec<u8>,
40-
) -> io::Result<Vec<u8>> {
39+
async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
4140
let mut offset = 0;
42-
4341
let start_cap = buf.capacity();
4442

45-
// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
46-
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
47-
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
48-
49-
if size_read == 0 {
50-
return Ok(r_buf);
51-
}
52-
53-
buf = r_buf;
54-
fd = r_fd;
55-
offset += size_read as u64;
56-
}
57-
5843
loop {
59-
if buf.len() == buf.capacity() && buf.capacity() == start_cap {
44+
if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() > PROBE_SIZE {
6045
// The buffer might be an exact fit. Let's read into a probe buffer
6146
// and see if it returns `Ok(0)`. If so, we've avoided an
6247
// unnecessary increasing of the capacity. But if not, append the
6348
// probe buffer to the primary buffer and let its capacity grow.
64-
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
49+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?;
6550

6651
if size_read == 0 {
6752
return Ok(r_buf);
6853
}
6954

7055
buf = r_buf;
7156
fd = r_fd;
72-
offset += size_read as u64;
7357
}
7458

7559
// buf is full, need more capacity
7660
if buf.len() == buf.capacity() {
7761
buf.try_reserve(PROBE_SIZE)?;
7862
}
7963

80-
// doesn't matter if we have a valid size_hint or not, if we do more
81-
// than 2 consecutive_short_reads, gradually increase the buffer
82-
// capacity to read more data at a time
83-
8464
// prepare the spare capacity to be read into
8565
let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);
8666

87-
// SAFETY: buf_len cannot be greater than u32::MAX because max_read_size
88-
// is u32::MAX
89-
let mut read_len = buf_len as u32;
90-
91-
loop {
92-
// read into spare capacity
93-
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, offset).await;
94-
95-
match res {
96-
Ok(0) => return Ok(r_buf),
97-
Ok(size_read) => {
98-
fd = r_fd;
99-
buf = r_buf;
100-
offset += size_read as u64;
101-
read_len -= size_read;
102-
103-
// keep reading if there's something left to be read
104-
if read_len > 0 {
105-
continue;
106-
} else {
107-
break;
108-
}
109-
}
110-
Err(e) if e.kind() == ErrorKind::Interrupted => {
111-
buf = r_buf;
112-
fd = r_fd;
67+
// buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
68+
// is less than u32::MAX
69+
let read_len = buf_len as u32;
11370

114-
continue;
115-
}
116-
Err(e) => return Err(e),
117-
}
71+
// read into spare capacity
72+
let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, read_len).await?;
73+
74+
if size_read == 0 {
75+
return Ok(r_buf);
11876
}
77+
78+
fd = r_fd;
79+
buf = r_buf;
11980
}
12081
}
12182

12283
async fn small_probe_read(
123-
mut fd: OwnedFd,
84+
fd: OwnedFd,
12485
mut buf: Vec<u8>,
125-
offset: u64,
86+
offset: &mut u64,
12687
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
88+
let read_len = PROBE_SIZE_U32;
89+
12790
let mut temp_arr = [0; PROBE_SIZE];
128-
let has_enough = buf.len() > PROBE_SIZE;
129-
130-
if has_enough {
131-
// if we have more than PROBE_SIZE bytes in the buffer already then
132-
// don't call reserve as we might potentially read 0 bytes
133-
let back_bytes_len = buf.len() - PROBE_SIZE;
134-
temp_arr.copy_from_slice(&buf[back_bytes_len..]);
135-
// We're decreasing the length of the buffer and len is greater
136-
// than PROBE_SIZE. So we can read into the discarded length
137-
buf.truncate(back_bytes_len);
138-
} else {
139-
// we don't even have PROBE_SIZE length in the buffer, we need this
140-
// reservation
141-
buf.reserve_exact(PROBE_SIZE);
142-
}
91+
let back_bytes_len = buf.len() - PROBE_SIZE;
92+
93+
temp_arr.copy_from_slice(&buf[back_bytes_len..]);
94+
95+
// We're decreasing the length of the buffer and len is greater
96+
// than PROBE_SIZE. So we can read into the discarded length
97+
buf.truncate(back_bytes_len);
14398

99+
let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, read_len).await?;
100+
101+
r_buf.try_reserve(PROBE_SIZE)?;
102+
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
103+
104+
Ok((size_read, r_fd, r_buf))
105+
}
106+
107+
async fn op_read(
108+
mut fd: OwnedFd,
109+
mut buf: Vec<u8>,
110+
offset: &mut u64,
111+
mut read_len: u32,
112+
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
144113
loop {
145-
let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await;
114+
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await;
146115

147116
match res {
148-
// return early if we inserted into reserved PROBE_SIZE
149-
// bytes
150-
Ok(size_read) if !has_enough => return Ok((size_read, r_fd, r_buf)),
117+
Err(e) if e.kind() == ErrorKind::Interrupted => {
118+
buf = r_buf;
119+
fd = r_fd;
120+
}
121+
Err(e) => return Err(e),
151122
Ok(size_read) => {
152-
let old_len = r_buf.len() - (size_read as usize);
123+
*offset += size_read as u64;
124+
read_len -= size_read;
153125

154-
r_buf.splice(old_len..old_len, temp_arr);
126+
if read_len == 0 || size_read == 0 {
127+
return Ok((size_read, r_fd, r_buf));
128+
}
155129

156-
return Ok((size_read, r_fd, r_buf));
157-
}
158-
Err(e) if e.kind() == ErrorKind::Interrupted => {
159130
buf = r_buf;
160131
fd = r_fd;
161-
162-
continue;
163132
}
164-
Err(e) => return Err(e),
165133
}
166134
}
167135
}

tokio/src/io/uring/read.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ impl Cancellable for Read {
3838

3939
impl Op<Read> {
4040
// Submit a request to read a FD at given length and offset into a
41-
// dynamic buffer with uinitialized memory. The read happens on unitialized
42-
// buffer and no overwiting happens.
41+
// dynamic buffer with uninitialized memory. The read happens on unitialized
42+
// buffer and no overwriting happens.
4343

4444
// SAFETY: The `len` of the amount to be read and the buffer that is passed
4545
// should have capacity > len.

tokio/tests/fs_uring_read.rs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
target_os = "linux"
99
))]
1010

11-
use futures::future::FutureExt;
11+
use futures::future::Future;
12+
use std::future::poll_fn;
1213
use std::io::Write;
14+
use std::path::PathBuf;
1315
use std::sync::mpsc;
1416
use std::task::Poll;
1517
use std::time::Duration;
16-
use std::{future::poll_fn, path::PathBuf};
1718
use tempfile::NamedTempFile;
18-
use tokio::{
19-
fs::read,
20-
runtime::{Builder, Runtime},
21-
};
19+
use tokio::fs::read;
20+
use tokio::runtime::{Builder, Runtime};
21+
use tokio_test::assert_pending;
2222
use tokio_util::task::TaskTracker;
2323

2424
fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
@@ -49,34 +49,38 @@ fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
4949
#[test]
5050
fn shutdown_runtime_while_performing_io_uring_ops() {
5151
fn run(rt: Runtime) {
52-
let (tx, rx) = mpsc::channel();
5352
let (done_tx, done_rx) = mpsc::channel();
54-
5553
let (_tmp, path) = create_tmp_files(1);
54+
// keep 100 permits
55+
const N: i32 = 100;
5656
rt.spawn(async move {
5757
let path = path[0].clone();
5858

5959
// spawning a bunch of uring operations.
60-
loop {
60+
let mut futs = vec![];
61+
62+
// spawning a bunch of uring operations.
63+
for _ in 0..N {
6164
let path = path.clone();
62-
tokio::spawn(async move {
63-
let bytes = read(path).await.unwrap();
65+
let mut fut = Box::pin(read(path));
6466

65-
assert_eq!(bytes, vec![20; 1023]);
66-
});
67+
poll_fn(|cx| {
68+
assert_pending!(fut.as_mut().poll(cx));
69+
Poll::<()>::Pending
70+
})
71+
.await;
6772

68-
// Avoid busy looping.
69-
tokio::task::yield_now().await;
73+
futs.push(fut);
7074
}
75+
76+
tokio::task::yield_now().await;
7177
});
7278

7379
std::thread::spawn(move || {
74-
let rt: Runtime = rx.recv().unwrap();
7580
rt.shutdown_timeout(Duration::from_millis(300));
7681
done_tx.send(()).unwrap();
7782
});
7883

79-
tx.send(rt).unwrap();
8084
done_rx.recv().unwrap();
8185
}
8286

@@ -130,25 +134,29 @@ async fn read_small_large_files() {
130134
#[tokio::test]
131135
async fn cancel_op_future() {
132136
let (_tmp_file, path): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(1);
137+
let path = path[0].clone();
133138

134139
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
140+
135141
let handle = tokio::spawn(async move {
136-
poll_fn(|cx| {
137-
let fut = read(&path[0]);
142+
let fut = read(path.clone());
143+
tokio::pin!(fut);
138144

145+
poll_fn(move |cx| {
139146
// If io_uring is enabled (and not falling back to the thread pool),
140147
// the first poll should return Pending.
141-
let _pending = Box::pin(fut).poll_unpin(cx);
142-
143-
tx.send(()).unwrap();
148+
assert_pending!(fut.as_mut().poll(cx));
149+
tx.send(true).unwrap();
144150

145151
Poll::<()>::Pending
146152
})
147153
.await;
148154
});
149155

150156
// Wait for the first poll
151-
rx.recv().await.unwrap();
157+
158+
let val = rx.recv().await;
159+
assert!(val.unwrap());
152160

153161
handle.abort();
154162

0 commit comments

Comments
 (0)