Skip to content

Commit 2acee44

Browse files
committed
io-uring: Create op_read helper function
1 parent 72f84b7 commit 2acee44

File tree

1 file changed

+46
-45
lines changed

1 file changed

+46
-45
lines changed

tokio/src/fs/read_uring.rs

Lines changed: 46 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,18 @@ async fn read_to_end_uring(
3939
mut buf: Vec<u8>,
4040
) -> io::Result<Vec<u8>> {
4141
let mut offset = 0;
42-
4342
let start_cap = buf.capacity();
4443

4544
// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
4645
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
47-
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
46+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?;
4847

4948
if size_read == 0 {
5049
return Ok(r_buf);
5150
}
5251

53-
buf = r_buf;
5452
fd = r_fd;
55-
offset += size_read as u64;
53+
buf = r_buf;
5654
}
5755

5856
loop {
@@ -61,7 +59,7 @@ async fn read_to_end_uring(
6159
// and see if it returns `Ok(0)`. If so, we've avoided an
6260
// unnecessary increasing of the capacity. But if not, append the
6361
// probe buffer to the primary buffer and let its capacity grow.
64-
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
62+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?;
6563

6664
if size_read == 0 {
6765
return Ok(r_buf);
@@ -88,43 +86,26 @@ async fn read_to_end_uring(
8886
// is u32::MAX
8987
let mut read_len = buf_len as u32;
9088

91-
loop {
92-
// read into spare capacity
93-
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, offset).await;
94-
95-
match res {
96-
Ok(0) => return Ok(r_buf),
97-
Ok(size_read) => {
98-
fd = r_fd;
99-
buf = r_buf;
100-
offset += size_read as u64;
101-
read_len -= size_read;
102-
103-
// keep reading if there's something left to be read
104-
if read_len > 0 {
105-
continue;
106-
} else {
107-
break;
108-
}
109-
}
110-
Err(e) if e.kind() == ErrorKind::Interrupted => {
111-
buf = r_buf;
112-
fd = r_fd;
89+
// read into spare capacity
90+
let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, &mut read_len).await?;
11391

114-
continue;
115-
}
116-
Err(e) => return Err(e),
117-
}
92+
if size_read == 0 {
93+
return Ok(r_buf);
11894
}
95+
96+
fd = r_fd;
97+
buf = r_buf;
11998
}
12099
}
121100

122101
async fn small_probe_read(
123-
mut fd: OwnedFd,
102+
fd: OwnedFd,
124103
mut buf: Vec<u8>,
125-
offset: u64,
104+
offset: &mut u64,
126105
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
127106
let mut temp_arr = [0; PROBE_SIZE];
107+
let mut read_len = PROBE_SIZE_U32;
108+
128109
let has_enough = buf.len() > PROBE_SIZE;
129110

130111
if has_enough {
@@ -141,27 +122,47 @@ async fn small_probe_read(
141122
buf.reserve_exact(PROBE_SIZE);
142123
}
143124

125+
let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, &mut read_len).await?;
126+
127+
// return early if we inserted into reserved PROBE_SIZE
128+
// bytes
129+
if !has_enough {
130+
Ok((size_read, r_fd, r_buf))
131+
} else {
132+
let old_len = r_buf.len() - (size_read as usize);
133+
134+
r_buf.splice(old_len..old_len, temp_arr);
135+
136+
Ok((size_read, r_fd, r_buf))
137+
}
138+
}
139+
140+
async fn op_read(
141+
mut fd: OwnedFd,
142+
mut buf: Vec<u8>,
143+
offset: &mut u64,
144+
read_len: &mut u32,
145+
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
144146
loop {
145-
let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await;
147+
let (res, r_fd, r_buf) = Op::read(fd, buf, *read_len, *offset).await;
146148

147149
match res {
148-
// return early if we inserted into reserved PROBE_SIZE
149-
// bytes
150-
Ok(size_read) if !has_enough => return Ok((size_read, r_fd, r_buf)),
150+
Err(e) if e.kind() == ErrorKind::Interrupted => {
151+
buf = r_buf;
152+
fd = r_fd;
153+
}
154+
Err(e) => return Err(e),
151155
Ok(size_read) => {
152-
let old_len = r_buf.len() - (size_read as usize);
156+
*offset += size_read as u64;
157+
*read_len -= size_read;
153158

154-
r_buf.splice(old_len..old_len, temp_arr);
159+
if *read_len == 0 || size_read == 0 {
160+
return Ok((size_read, r_fd, r_buf));
161+
}
155162

156-
return Ok((size_read, r_fd, r_buf));
157-
}
158-
Err(e) if e.kind() == ErrorKind::Interrupted => {
159163
buf = r_buf;
160164
fd = r_fd;
161-
162-
continue;
163165
}
164-
Err(e) => return Err(e),
165166
}
166167
}
167168
}

0 commit comments

Comments
 (0)