
Conversation

Contributor

@Daksh14 Daksh14 commented Oct 19, 2025

Motivation

We are gradually adopting io_uring everywhere. Here I made the simple change of supporting fs::read, though it might not be as simple as it looks. Let me know whether the unsafe code I used is correct.

We currently use the blocking std::fs::Metadata call to obtain the file size for the buffer capacity, and extend the length of the vector according to the bytes read in the CQE. This implementation sounds good on paper to me.

Later we should implement an internal statx helper, in this PR or a separate one, to make our uring implementation less painful to use, since this PR failed #7616.

Let's put the statx helper in a different PR to avoid merging an inefficient read implementation, given that io_uring is all about being more efficient at file I/O.

Solution

Continue adopting io uring

strace output of a tokio::fs::read call after this change:

io_uring_setup(256, {flags=0, sq_thread_cpu=0, sq_thread_idle=0, sq_entries=256, cq_entries=512, features=IORING_FEAT_SINGLE_MMAP|IORING_FEAT_NODROP|IORING_FEAT_SUBMIT_STABLE|IORING_FEAT_RW_CUR_POS|IORING_FEAT_CUR_PERSONALITY|IORING_FEAT_FAST_POLL|IORING_FEAT_POLL_32BITS|IORING_FEAT_SQPOLL_NONFIXED|IORING_FEAT_EXT_ARG|IORING_FEAT_NATIVE_WORKERS|IORING_FEAT_RSRC_TAGS|IORING_FEAT_CQE_SKIP|IORING_FEAT_LINKED_FILE|IORING_FEAT_REG_REG_RING|IORING_FEAT_RECVSEND_BUNDLE|IORING_FEAT_MIN_TIMEOUT|IORING_FEAT_RW_ATTR, sq_off={head=0, tail=4, ring_mask=16, ring_entries=24, flags=36, dropped=32, array=8256, user_addr=0}, cq_off={head=8, tail=12, ring_mask=20, ring_entries=28, overflow=44, cqes=64, flags=40, user_addr=0}}) = 9
mmap(NULL, 16384, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE, 9, 0x10000000) = 0xfaf0bf1e2000
mmap(NULL, 9280, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE, 9, 0) = 0xfaf0be71d000
epoll_ctl(5, EPOLL_CTL_ADD, 9, {events=EPOLLIN|EPOLLRDHUP|EPOLLET, data=0}) = 0
io_uring_enter(9, 1, 0, 0, NULL, 128)   = 1
futex(0xfaf0bf2557f0, FUTEX_WAIT_PRIVATE, 1, NULL) = 0
mmap(NULL, 2162688, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0xfaf0be50d000
mprotect(0xfaf0be51d000, 2097152, PROT_READ|PROT_WRITE) = 0
rt_sigprocmask(SIG_BLOCK, ~[], [], 8)   = 0
clone3({flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, child_tid=0xfaf0be71c150, parent_tid=0xfaf0be71c150, exit_signal=0, stack=0xfaf0be50d000, stack_size=0x20e960, tls=0xfaf0be71c7a0} => {parent_tid=[746758]}, 88) = 746758
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
futex(0xfaf0be71c850, FUTEX_WAKE_PRIVATE, 1) = 1
io_uring_enter(9, 1, 0, 0, NULL, 128)   = 1
futex(0xfaf0bf2557f0, FUTEX_WAIT_PRIVATE, 1, NULL) = -1 EAGAIN (Resource temporarily unavailable)
close(10)                               = 0

@Darksonn Darksonn added A-tokio Area: The main tokio crate M-fs Module: tokio/fs labels Oct 19, 2025
@Darksonn Darksonn changed the title Fs read io uring fs: support io_uring with tokio::fs::read Oct 20, 2025
    let mut offset = 0;

    while size_read < size {
        let left_to_read = (size - size_read) as u32;


Suggested change

    - let left_to_read = (size - size_read) as u32;
    + let left_to_read = u32::try_from(size - size_read).unwrap_or(u32::MAX);

To properly support files bigger than 4GB.

Contributor Author


The max read size at a time is u32::MAX; we read the rest in subsequent iterations.

Contributor Author


In the future, if we know we're reading more than u32::MAX bytes, we can batch two read requests to avoid extra syscalls.

@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 7 times, most recently from 6237a4c to 636cfb8 Compare October 27, 2025 13:36
Member

@ADD-SP ADD-SP left a comment


This looks complicated, I will review it incrementally.

Member

@mox692 mox692 left a comment


I haven't checked all of the details in read_uring.rs, but I've left some comments on what I've noticed so far.

@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 2 times, most recently from e6c6ce7 to b9c3885 Compare November 2, 2025 17:12
@Daksh14 Daksh14 requested review from ADD-SP, martin-g and mox692 November 3, 2025 06:52
@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 2 times, most recently from 6261e0e to 974be76 Compare November 12, 2025 11:56
Comment on lines 73 to 79
    std::thread::spawn(move || {
        let rt: Runtime = rx.recv().unwrap();
        rt.shutdown_timeout(Duration::from_millis(300));
        done_tx.send(()).unwrap();
    });

    tx.send(rt).unwrap();
Contributor


This is equivalent to just moving the runtime.

Suggested change

    - std::thread::spawn(move || {
    -     let rt: Runtime = rx.recv().unwrap();
    -     rt.shutdown_timeout(Duration::from_millis(300));
    -     done_tx.send(()).unwrap();
    - });
    - tx.send(rt).unwrap();
    + std::thread::spawn(move || {
    +     rt.shutdown_timeout(Duration::from_millis(300));
    +     done_tx.send(()).unwrap();
    + });

If you wanted an actual sleep for the loop to make some progress, then what you have now does not achieve that.

Contributor Author


@Darksonn Should I sleep in the std::thread so the loop can make progress? Is that allowed, or should I not do it?

Member

@ADD-SP ADD-SP Nov 13, 2025


  1. Poll the read(path).await manually to submit it to the kernel.
  2. Then increase the semaphore.
  3. Keep the spawned task pending forever.

So that we can wait on the semaphore and then shutdown the runtime.

Member

@ADD-SP ADD-SP Nov 13, 2025


Does this work for the multi-thread runtime? For the current-thread runtime this won't work, but I believe we can apply a similar pattern to it.

        rt.spawn(async move {
            let path = path[0].clone();
            let mut futs = vec![];

            // spawning a bunch of uring operations.
            for _ in 0..N {
                let path = path.clone();
                let cl = Arc::clone(&cl);
                let fut = Box::pin(read(path));
                assert_pending!(fut.poll(cx));
                futs.push(fut);
            }

            pending_forever().await;
        });

        std::thread::spawn(move || {
            rt.shutdown_timeout(Duration::from_millis(300));
            done_tx.send(()).unwrap();
        });

        done_rx.recv().unwrap();

@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 3 times, most recently from b599115 to 44f8f98 Compare November 14, 2025 19:40
Member

@mox692 mox692 left a comment


Overall looks good to me.

Comment on lines 124 to 140
    Ok(size_read) => {
        *offset += size_read as u64;
        read_len -= size_read;

        if read_len == 0 || size_read == 0 {
            return Ok((size_read, r_fd, r_buf));
        }

        buf = r_buf;
        fd = r_fd;
    }
Contributor


I do not think this makes sense.

Suggested change

    - Ok(size_read) => {
    -     *offset += size_read as u64;
    -     read_len -= size_read;
    -     if read_len == 0 || size_read == 0 {
    -         return Ok((size_read, r_fd, r_buf));
    -     }
    -     buf = r_buf;
    -     fd = r_fd;
    - }
    + Ok(size_read) => return Ok((size_read, r_fd, r_buf)),

The current logic means that unless the buffer fits the file exactly, we perform the final read of length zero twice:

  1. We call op_read, which performs a partial read and then a length-zero EOF read. It returns a non-zero length to the caller.
  2. Since the caller got a non-zero length, it now needs to call op_read again to check for EOF.

Checking for EOF twice does not make sense.

@Daksh14 Daksh14 force-pushed the fs_read_io_uring branch 2 times, most recently from d219de6 to a90b5c7 Compare November 29, 2025 10:20
Comment on lines 146 to 148
    // If io_uring is enabled (and not falling back to the thread pool),
    // the first poll should return Pending.
    assert_pending!(fut.as_mut().poll(cx));
Member


Since fut.as_mut().poll() registers the waker, this future may be polled a second time. Consider introducing a noop waker here.

Contributor Author


Replaced the context of poll_fn with a noop waker @ADD-SP

    Reading 64 MB chunks at a time and keeping the kernel busy surpasses
    the std::fs::read time, with the unoptimized io_uring one being 1.12% faster
    Fix tests failing on allocation failure

    Doc fix and fix double subtraction of offset

    Fix typos and assert the stopped task's first poll

    Add comments and panic on failed u32 conversion
    Check the EOF internally in the `op_read` function
    Fix test with noop waker
