Skip to content

Commit b599115

Browse files
committed
Fix tests fail on allocation failure
Doc fix and fix double subtraction of offset Fix shutdown op test
1 parent 974be76 commit b599115

File tree

3 files changed

+55
-34
lines changed

3 files changed

+55
-34
lines changed

tokio/src/fs/read.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ use std::{io, path::Path};
3030
///
3131
/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
3232
///
33+
/// # io_uring support
34+
///
35+
/// On Linux, you can also use io_uring for executing system calls. To enable
36+
/// io_uring, you need to specify the `--cfg tokio_unstable` flag at compile time,
37+
/// enable the io-uring cargo feature, and set the `Builder::enable_io_uring`
38+
/// runtime option.
39+
///
40+
/// Support for io_uring is currently experimental, so its behavior may change
41+
/// or it may be removed in future versions.
42+
///
3343
/// # Examples
3444
///
3545
/// ```no_run

tokio/src/fs/read_uring.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
2727
.expect("unexpected in-flight operation detected")
2828
.into();
2929

30-
// extra single capacity for the whole size to fit without any reallocation
31-
let buf = Vec::with_capacity(size_hint.unwrap_or(0));
30+
let mut buf = Vec::new();
31+
32+
if let Some(size_hint) = size_hint {
33+
buf.try_reserve(size_hint)?;
34+
}
3235

3336
read_to_end_uring(size_hint, fd, buf).await
3437
}
@@ -67,7 +70,6 @@ async fn read_to_end_uring(
6770

6871
buf = r_buf;
6972
fd = r_fd;
70-
offset += size_read as u64;
7173
}
7274

7375
// buf is full, need more capacity
@@ -79,11 +81,11 @@ async fn read_to_end_uring(
7981
let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);
8082

8183
// buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
82-
// is u32::MAX
83-
let mut read_len = buf_len as u32;
84+
// is less than u32::MAX
85+
let read_len = buf_len as u32;
8486

8587
// read into spare capacity
86-
let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, &mut read_len).await?;
88+
let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, read_len).await?;
8789

8890
if size_read == 0 {
8991
return Ok(r_buf);
@@ -99,12 +101,12 @@ async fn small_probe_read(
99101
mut buf: Vec<u8>,
100102
offset: &mut u64,
101103
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
102-
let mut read_len = PROBE_SIZE_U32;
104+
let read_len = PROBE_SIZE_U32;
103105

104106
if buf.len() < PROBE_SIZE {
105107
buf.try_reserve(PROBE_SIZE)?;
106108

107-
return op_read(fd, buf, offset, &mut read_len).await;
109+
return op_read(fd, buf, offset, read_len).await;
108110
}
109111

110112
let mut temp_arr = [0; PROBE_SIZE];
@@ -116,8 +118,9 @@ async fn small_probe_read(
116118
// than PROBE_SIZE. So we can read into the discarded length
117119
buf.truncate(back_bytes_len);
118120

119-
let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, &mut read_len).await?;
121+
let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, read_len).await?;
120122

123+
r_buf.try_reserve(PROBE_SIZE)?;
121124
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
122125

123126
Ok((size_read, r_fd, r_buf))
@@ -127,10 +130,10 @@ async fn op_read(
127130
mut fd: OwnedFd,
128131
mut buf: Vec<u8>,
129132
offset: &mut u64,
130-
read_len: &mut u32,
133+
mut read_len: u32,
131134
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
132135
loop {
133-
let (res, r_fd, r_buf) = Op::read(fd, buf, *read_len, *offset).await;
136+
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await;
134137

135138
match res {
136139
Err(e) if e.kind() == ErrorKind::Interrupted => {
@@ -140,9 +143,9 @@ async fn op_read(
140143
Err(e) => return Err(e),
141144
Ok(size_read) => {
142145
*offset += size_read as u64;
143-
*read_len -= size_read;
146+
read_len -= size_read;
144147

145-
if *read_len == 0 || size_read == 0 {
148+
if read_len == 0 || size_read == 0 {
146149
return Ok((size_read, r_fd, r_buf));
147150
}
148151

tokio/tests/fs_uring_read.rs

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
target_os = "linux"
99
))]
1010

11-
use futures::future::FutureExt;
11+
use futures::future::Future;
12+
use std::future::poll_fn;
1213
use std::io::Write;
14+
use std::path::PathBuf;
1315
use std::sync::mpsc;
1416
use std::task::Poll;
1517
use std::time::Duration;
16-
use std::{future::poll_fn, path::PathBuf};
1718
use tempfile::NamedTempFile;
18-
use tokio::{
19-
fs::read,
20-
runtime::{Builder, Runtime},
21-
};
19+
use tokio::fs::read;
20+
use tokio::runtime::{Builder, Runtime};
21+
use tokio_test::assert_pending;
2222
use tokio_util::task::TaskTracker;
2323

2424
fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
@@ -49,34 +49,38 @@ fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
4949
#[test]
5050
fn shutdown_runtime_while_performing_io_uring_ops() {
5151
fn run(rt: Runtime) {
52-
let (tx, rx) = mpsc::channel();
5352
let (done_tx, done_rx) = mpsc::channel();
54-
5553
let (_tmp, path) = create_tmp_files(1);
54+
// keep 100 permits
55+
const N: i32 = 100;
5656
rt.spawn(async move {
5757
let path = path[0].clone();
5858

5959
// spawning a bunch of uring operations.
60-
loop {
60+
let mut futs = vec![];
61+
62+
// spawning a bunch of uring operations.
63+
for _ in 0..N {
6164
let path = path.clone();
62-
tokio::spawn(async move {
63-
let bytes = read(path).await.unwrap();
65+
let mut fut = Box::pin(read(path));
6466

65-
assert_eq!(bytes, vec![20; 1023]);
66-
});
67+
poll_fn(|cx| {
68+
assert_pending!(fut.as_mut().poll(cx));
69+
Poll::<()>::Pending
70+
})
71+
.await;
6772

68-
// Avoid busy looping.
69-
tokio::task::yield_now().await;
73+
futs.push(fut);
7074
}
75+
76+
tokio::task::yield_now().await;
7177
});
7278

7379
std::thread::spawn(move || {
74-
let rt: Runtime = rx.recv().unwrap();
7580
rt.shutdown_timeout(Duration::from_millis(300));
7681
done_tx.send(()).unwrap();
7782
});
7883

79-
tx.send(rt).unwrap();
8084
done_rx.recv().unwrap();
8185
}
8286

@@ -130,15 +134,18 @@ async fn read_small_large_files() {
130134
#[tokio::test]
131135
async fn cancel_op_future() {
132136
let (_tmp_file, path): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(1);
137+
let path = path[0].clone();
133138

134139
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
140+
135141
let handle = tokio::spawn(async move {
136-
poll_fn(|cx| {
137-
let fut = read(&path[0]);
142+
let fut = read(path.clone());
143+
tokio::pin!(fut);
138144

145+
poll_fn(move |cx| {
139146
// If io_uring is enabled (and not falling back to the thread pool),
140147
// the first poll should return Pending.
141-
let _pending = Box::pin(fut).poll_unpin(cx);
148+
let _pending = fut.as_mut().poll(cx);
142149

143150
tx.send(()).unwrap();
144151

@@ -148,7 +155,8 @@ async fn cancel_op_future() {
148155
});
149156

150157
// Wait for the first poll
151-
rx.recv().await.unwrap();
158+
159+
let _ = rx.try_recv();
152160

153161
handle.abort();
154162

0 commit comments

Comments
 (0)