Skip to content

Commit e710ee1

Browse files
LParduegregor-cf
andcommitted
h3: add support for streaming HEADERS sends
Calls to send_request(), send_response(), send_response_with_priority(), and send_additional_headers() can fail with a StreamBlocked error indicating lack of underlying transport capacity. When this occurs, applications are expected to retry the operation when the stream is later reported as writable. However, certain conditions could mean that sufficient capacity might never be made available, effectively permenantly blocking header sends. The root cause of this problem was the choice to enforce that a HEADERS frame is always sent (buffered into a quiche stream) in whole. This change adds new functions related to stream management and header sending to support a streaming send design. These will produce a HEADERS frame that can be sent in whole or in part, depending on available capacity. When a frame is only partly sent, applications are notified and can resume sending using the new continue_partial_headers() method, once the stream is writable. The new functions unify how clients and servers send headers. For a client, the expected sequence is now something like: 0. Decide to initate a request 1. reserve_request_stream() - reserves a stream if limits allow 2. stream_headers() - inititates streaming of a HEADERS frame 3. continue_partial_headers() - if stream_headers() returned Error::PartialHeader For a server, the expected sequence is now something like: 0. Receive a request via the poll() function and decide to respond 1. stream_priority() - set the stream's sending priority per RFC 9218 3. stream_headers() - inititates streaming of a HEADERS frame 4. continue_partial_headers() - if stream_headers return Error::PartialHeader While headers are being streamed, other operations that would cause an HTTP/3 frame to be sent on the stream are prevented. HEADERS frames must be sent completely before other operations are successful. Applications do not need to manage the partial HEADERS buffer, this is dealt with inside quiche. Co-authored-by: Gregor Maier <[email protected]>
1 parent 56fde82 commit e710ee1

File tree

6 files changed

+1106
-119
lines changed

6 files changed

+1106
-119
lines changed

apps/src/common.rs

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ pub trait HttpConn {
334334
&mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
335335
);
336336

337+
fn retry_partial_headers(&mut self, conn: &mut quiche::Connection);
338+
337339
fn handle_responses(
338340
&mut self, conn: &mut quiche::Connection, buf: &mut [u8],
339341
req_start: &std::time::Instant,
@@ -369,12 +371,19 @@ pub struct Http09Request {
369371
response_writer: Option<std::io::BufWriter<std::fs::File>>,
370372
}
371373

374+
enum HeadersSendStatus {
375+
NotStarted,
376+
InProgess,
377+
Completed,
378+
}
379+
372380
/// Represents an HTTP/3 formatted request.
373381
struct Http3Request {
374382
url: url::Url,
375383
cardinal: u64,
376384
stream_id: Option<u64>,
377385
hdrs: Vec<quiche::h3::Header>,
386+
hdrs_send_status: HeadersSendStatus,
378387
priority: Option<Priority>,
379388
response_hdrs: Vec<quiche::h3::Header>,
380389
response_body: Vec<u8>,
@@ -477,6 +486,8 @@ impl HttpConn for Http09Conn {
477486
self.reqs_sent += reqs_done;
478487
}
479488

489+
fn retry_partial_headers(&mut self, _conn: &mut quiche::Connection) {}
490+
480491
fn handle_responses(
481492
&mut self, conn: &mut quiche::Connection, buf: &mut [u8],
482493
req_start: &std::time::Instant,
@@ -829,6 +840,7 @@ impl Http3Conn {
829840
url: url.clone(),
830841
cardinal: i,
831842
hdrs,
843+
hdrs_send_status: HeadersSendStatus::NotStarted,
832844
priority,
833845
response_hdrs: Vec::new(),
834846
response_body: Vec::new(),
@@ -1120,19 +1132,78 @@ impl Http3Conn {
11201132
}
11211133

11221134
impl HttpConn for Http3Conn {
1135+
fn retry_partial_headers(&mut self, conn: &mut quiche::Connection) {
1136+
for req in self.reqs.iter_mut().filter(|r| {
1137+
matches!(r.hdrs_send_status, HeadersSendStatus::InProgess)
1138+
}) {
1139+
let stream_id = req
1140+
.stream_id
1141+
.expect("partial header resend but no stream ID");
1142+
1143+
debug!("retrying stream id={}", stream_id);
1144+
match self.h3_conn.continue_partial_headers(conn, stream_id) {
1145+
Ok(_) => {
1146+
req.hdrs_send_status = HeadersSendStatus::Completed;
1147+
debug!("Completed HTTP request {:?}", &req.hdrs);
1148+
self.reqs_hdrs_sent += 1;
1149+
},
1150+
1151+
Err(e) => {
1152+
error!("retry failed to send request {:?}", e);
1153+
break;
1154+
},
1155+
}
1156+
}
1157+
}
1158+
11231159
fn send_requests(
11241160
&mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
11251161
) {
1126-
let mut reqs_done = 0;
1162+
// First retry partial headers
1163+
self.retry_partial_headers(conn);
1164+
1165+
// Then send new headers.
1166+
for req in self.reqs.iter_mut().filter(|r| {
1167+
matches!(r.hdrs_send_status, HeadersSendStatus::NotStarted)
1168+
}) {
1169+
let stream_id = match self.h3_conn.reserve_request_stream(conn) {
1170+
Ok(v) => v,
11271171

1128-
// First send headers.
1129-
for req in self.reqs.iter_mut().skip(self.reqs_hdrs_sent) {
1130-
let s = match self.h3_conn.send_request(
1172+
Err(quiche::h3::Error::TransportError(
1173+
quiche::Error::StreamLimit,
1174+
)) => {
1175+
debug!("not enough stream credits, retry later...");
1176+
break;
1177+
},
1178+
1179+
Err(quiche::h3::Error::StreamBlocked) => {
1180+
debug!("stream is blocked, retry later...");
1181+
break;
1182+
},
1183+
1184+
Err(e) => {
1185+
error!("failed to reserve stream {e:?}");
1186+
break;
1187+
},
1188+
};
1189+
1190+
match self.h3_conn.stream_headers(
11311191
conn,
1192+
stream_id,
11321193
&req.hdrs,
1194+
false,
11331195
self.body.is_none(),
11341196
) {
1135-
Ok(v) => v,
1197+
Ok(()) => {
1198+
req.hdrs_send_status = HeadersSendStatus::Completed;
1199+
debug!("Completed HTTP request {:?}", &req.hdrs);
1200+
self.reqs_hdrs_sent += 1;
1201+
},
1202+
1203+
Err(quiche::h3::Error::PartialHeader) => {
1204+
debug!("sent partial headers, marking for later continuation");
1205+
req.hdrs_send_status = HeadersSendStatus::InProgess;
1206+
},
11361207

11371208
Err(quiche::h3::Error::TransportError(
11381209
quiche::Error::StreamLimit,
@@ -1152,23 +1223,18 @@ impl HttpConn for Http3Conn {
11521223
},
11531224
};
11541225

1155-
debug!("Sent HTTP request {:?}", &req.hdrs);
1156-
11571226
if let Some(priority) = &req.priority {
11581227
// If sending the priority fails, don't try again.
11591228
self.h3_conn
1160-
.send_priority_update_for_request(conn, s, priority)
1229+
.send_priority_update_for_request(conn, stream_id, priority)
11611230
.ok();
11621231
}
11631232

1164-
req.stream_id = Some(s);
1233+
req.stream_id = Some(stream_id);
11651234
req.response_writer =
11661235
make_resource_writer(&req.url, target_path, req.cardinal);
1167-
self.sent_body_bytes.insert(s, 0);
1168-
1169-
reqs_done += 1;
1236+
self.sent_body_bytes.insert(stream_id, 0);
11701237
}
1171-
self.reqs_hdrs_sent += reqs_done;
11721238

11731239
// Then send any remaining body.
11741240
if let Some(body) = &self.body {

quiche/include/quiche.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,9 @@ int64_t quiche_h3_send_request(quiche_h3_conn *conn, quiche_conn *quic_conn,
11171117
const quiche_h3_header *headers, size_t headers_len,
11181118
bool fin);
11191119

1120+
// Reserves an HTTP/3 request stream.
1121+
int64_t reserve_request_stream(quiche_h3_conn *conn, quiche_conn *quic_conn);
1122+
11201123
// Sends an HTTP/3 response on the specified stream with default priority.
11211124
int quiche_h3_send_response(quiche_h3_conn *conn, quiche_conn *quic_conn,
11221125
uint64_t stream_id, const quiche_h3_header *headers,
@@ -1134,6 +1137,16 @@ int quiche_h3_send_additional_headers(quiche_h3_conn *conn,
11341137
quiche_h3_header *headers, size_t headers_len,
11351138
bool is_trailer_section, bool fin);
11361139

1140+
// Initiates streaming of a new HTTP/3 HEADERS frame.
1141+
int quiche_h3_stream_headers(quiche_h3_conn *conn,
1142+
quiche_conn *quic_conn, uint64_t stream_id,
1143+
quiche_h3_header *headers, size_t headers_len,
1144+
bool is_trailer_section, bool fin);
1145+
1146+
// Continues sending headers on the given stream.
1147+
int quiche_h3_continue_partial_headers(quiche_h3_conn *conn,
1148+
quiche_conn *quic_conn, uint64_t stream_id);
1149+
11371150
// Sends an HTTP/3 body chunk on the given stream.
11381151
ssize_t quiche_h3_send_body(quiche_h3_conn *conn, quiche_conn *quic_conn,
11391152
uint64_t stream_id, const uint8_t *body, size_t body_len,
@@ -1147,6 +1160,10 @@ ssize_t quiche_h3_recv_body(quiche_h3_conn *conn, quiche_conn *quic_conn,
11471160
int quiche_h3_send_goaway(quiche_h3_conn *conn, quiche_conn *quic_conn,
11481161
uint64_t id);
11491162

1163+
// Sets the HTTP/3 priority for a stream.
1164+
int quiche_h3_stream_priority(quiche_conn *conn, uint64_t stream_id,
1165+
quiche_h3_priority *priority);
1166+
11501167
// Try to parse an Extensible Priority field value.
11511168
int quiche_h3_parse_extensible_priority(uint8_t *priority,
11521169
size_t priority_len,

quiche/src/h3/ffi.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,37 @@ pub extern "C" fn quiche_h3_send_additional_headers(
288288
}
289289
}
290290

291+
#[no_mangle]
292+
pub extern "C" fn quiche_h3_stream_headers(
293+
conn: &mut h3::Connection, quic_conn: &mut Connection, stream_id: u64,
294+
headers: *const Header, headers_len: size_t, is_trailer_section: bool,
295+
fin: bool,
296+
) -> c_int {
297+
let headers = headers_from_ptr(headers, headers_len);
298+
299+
match conn.stream_headers(
300+
quic_conn,
301+
stream_id,
302+
&headers,
303+
is_trailer_section,
304+
fin,
305+
) {
306+
Ok(_) => 0,
307+
308+
Err(e) => e.to_c() as c_int,
309+
}
310+
}
311+
312+
#[no_mangle]
313+
pub extern "C" fn quiche_h3_continue_partial_headers(
314+
conn: &mut h3::Connection, quic_conn: &mut Connection, stream_id: u64,
315+
) -> c_int {
316+
match conn.continue_partial_headers(quic_conn, stream_id) {
317+
Ok(_) => 0,
318+
Err(e) => e.to_c() as c_int,
319+
}
320+
}
321+
291322
#[no_mangle]
292323
pub extern "C" fn quiche_h3_send_body(
293324
conn: &mut h3::Connection, quic_conn: &mut Connection, stream_id: u64,
@@ -335,6 +366,17 @@ pub extern "C" fn quiche_h3_send_goaway(
335366
}
336367
}
337368

369+
#[no_mangle]
370+
pub extern "C" fn quiche_h3_stream_priority(
371+
quic_conn: &mut Connection, id: u64, priority: &Priority,
372+
) -> c_int {
373+
match h3::Connection::stream_priority(quic_conn, id, priority) {
374+
Ok(()) => 0,
375+
376+
Err(e) => e.to_c() as c_int,
377+
}
378+
}
379+
338380
#[no_mangle]
339381
#[cfg(feature = "sfv")]
340382
pub extern "C" fn quiche_h3_parse_extensible_priority(

0 commit comments

Comments
 (0)