Skip to content

Commit dcaf7c1

Browse files
authored
Merge pull request #2474 from scpwiki/WJ-1225-prune-expired
[WJ-1225] Add job to prune expired uploads
2 parents eab5bbe + df20534 commit dcaf7c1

File tree

12 files changed

+109
-26
lines changed

12 files changed

+109
-26
lines changed

deepwell/config.example.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,19 @@ max-delay-poll-secs = 360 # 6 minutes
170170
# It is merely for clearing the database of already-expired tokens.
171171
prune-session-secs = 3600 # 1 hour
172172

173+
# The period, in seconds, to prune all expired file uploads.
174+
#
175+
# Uploading files is a two step process:
176+
# 1. Upload the data, which creates a blob_pending row in the database.
177+
# 2. Reference that ID when adding a file revision.
178+
#
179+
# After this point, or if the upload is aborted, the blob_pending row
180+
# is no longer needed and can be deleted. This is mediated through the
181+
# expires_at column in that table, which is the point when we delete
182+
# the row. This also serves as the limit for how long a pending blob can
183+
# be (re)used to upload a file.
184+
prune-upload-secs = 7200 # 2 hours
185+
173186
# The period, in seconds, to prune all unused text rows.
174187
#
175188
# The text table deduplicates identical text objects, and

deepwell/src/config/file.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ struct Job {
123123
min_delay_poll_secs: u64,
124124
max_delay_poll_secs: u64,
125125
prune_session_secs: u64,
126+
prune_upload_secs: u64,
126127
prune_text_secs: u64,
127128
name_change_refill_secs: u64,
128129
lift_expired_punishments_secs: u64,
@@ -284,6 +285,7 @@ impl ConfigFile {
284285
min_delay_poll_secs: job_min_poll_delay_secs,
285286
max_delay_poll_secs: job_max_poll_delay_secs,
286287
prune_session_secs: job_prune_session_secs,
288+
prune_upload_secs: job_prune_upload_secs,
287289
prune_text_secs: job_prune_text_secs,
288290
name_change_refill_secs: job_name_change_refill_secs,
289291
lift_expired_punishments_secs: job_lift_expired_punishments_secs,
@@ -340,6 +342,10 @@ impl ConfigFile {
340342
job_prune_session_secs < RSMQ_DELAY_LIMIT,
341343
"Session prune job period time too long",
342344
);
345+
assert!(
346+
job_prune_upload_secs < RSMQ_DELAY_LIMIT,
347+
"Pending upload prune job period time too long",
348+
);
343349
assert!(
344350
job_prune_text_secs < RSMQ_DELAY_LIMIT,
345351
"Text prune job period time too long",
@@ -404,6 +410,7 @@ impl ConfigFile {
404410
job_min_poll_delay: StdDuration::from_secs(job_min_poll_delay_secs),
405411
job_max_poll_delay: StdDuration::from_secs(job_max_poll_delay_secs),
406412
job_prune_session: StdDuration::from_secs(job_prune_session_secs),
413+
job_prune_uploads: StdDuration::from_secs(job_prune_upload_secs),
407414
job_prune_text: StdDuration::from_secs(job_prune_text_secs),
408415
job_name_change_refill: StdDuration::from_secs(job_name_change_refill_secs),
409416
job_lift_expired_punishments: StdDuration::from_secs(

deepwell/src/config/object.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ pub struct Config {
134134
/// How often to run the "prune expired sessions" recurring job.
135135
pub job_prune_session: StdDuration,
136136

137+
/// How often to run the "prune pending uploads" recurring job.
138+
pub job_prune_uploads: StdDuration,
139+
137140
/// How often to run the "prune unused text" recurring job.
138141
pub job_prune_text: StdDuration,
139142

deepwell/src/redis.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
*/
2020

2121
use crate::services::job::{
22-
JOB_QUEUE_DELAY, JOB_QUEUE_MAXIMUM_SIZE, JOB_QUEUE_NAME, JOB_QUEUE_PROCESS_TIME,
22+
Job, JobService, JOB_QUEUE_DELAY, JOB_QUEUE_MAXIMUM_SIZE, JOB_QUEUE_NAME,
23+
JOB_QUEUE_PROCESS_TIME,
2324
};
2425
use anyhow::Result;
2526
use redis::aio::MultiplexedConnection;
@@ -51,6 +52,17 @@ pub async fn connect(redis_uri: &str) -> Result<(MultiplexedConnection, Rsmq)> {
5152
JOB_QUEUE_MAXIMUM_SIZE,
5253
)
5354
.await?;
55+
56+
// Then add initial repeating jobs
57+
macro_rules! queue_job {
58+
($job_case:ident) => {
59+
JobService::queue_job_inner(&mut rsmq, &Job::$job_case, None).await?
60+
};
61+
}
62+
63+
queue_job!(PruneSessions);
64+
queue_job!(PrunePendingUploads);
65+
queue_job!(PruneText);
5466
}
5567

5668
Ok((connection, rsmq))

deepwell/src/services/blob/service.rs

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,44 @@ impl BlobService {
451451
Ok(output)
452452
}
453453

454+
// Prune operations
455+
456+
/// Deletes all expired pending blobs from the database and S3.
457+
pub async fn prune(ctx: &ServiceContext<'_>) -> Result<()> {
458+
let txn = ctx.transaction();
459+
let bucket = ctx.s3_files_bucket();
460+
info!("Pruning expired pending blobs from database and S3");
461+
462+
// Fetch all expired pending blobs
463+
let pending_blobs = BlobPending::find()
464+
.select_only()
465+
.column(blob_pending::Column::ExternalId)
466+
.column(blob_pending::Column::S3Path)
467+
.filter(blob_pending::Column::ExpiresAt.lte(now()))
468+
.into_tuple::<(String, String)>()
469+
.all(txn)
470+
.await?;
471+
472+
// Delete from the S3 bucket
473+
for (_, s3_path) in &pending_blobs {
474+
// Only try to delete if the object exists,
475+
// ignore missing objects.
476+
if Self::exists(ctx, s3_path).await? {
477+
bucket.delete_object(&s3_path).await?;
478+
}
479+
}
480+
481+
// Delete from the database
482+
let blob_ids = pending_blobs.into_iter().map(|(id, _)| id);
483+
484+
BlobPending::delete_many()
485+
.filter(blob_pending::Column::ExternalId.is_in(blob_ids))
486+
.exec(txn)
487+
.await?;
488+
489+
Ok(())
490+
}
491+
454492
// Hard-deletion operations
455493

456494
/// Does a dry run on a blob hard deletion, showing what would have been changed.
@@ -872,20 +910,6 @@ impl BlobService {
872910
find_or_error!(Self::get_metadata_optional(ctx, hash), Blob)
873911
}
874912

875-
#[allow(dead_code)] // TEMP
876-
pub async fn exists(ctx: &ServiceContext<'_>, hash: &[u8]) -> Result<bool> {
877-
// Special handling for the empty blob
878-
if hash == EMPTY_BLOB_HASH {
879-
debug!("Checking existence of the empty blob");
880-
return Ok(true);
881-
}
882-
883-
// Fetch existence from S3
884-
let hex_hash = blob_hash_to_hex(hash);
885-
let result = Self::head(ctx, &hex_hash).await?;
886-
Ok(result.is_some())
887-
}
888-
889913
/// Possibly retrieve blob contents, if a flag is set.
890914
///
891915
/// This utility conditionally retrieves the
@@ -922,6 +946,11 @@ impl BlobService {
922946
}
923947
}
924948

949+
async fn exists(ctx: &ServiceContext<'_>, path: &str) -> Result<bool> {
950+
let head = Self::head(ctx, path).await?;
951+
Ok(head.is_some())
952+
}
953+
925954
pub async fn hard_delete(ctx: &ServiceContext<'_>, hash: &[u8]) -> Result<()> {
926955
// Special handling for empty blobs
927956
//

deepwell/src/services/caddy/test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ fn build_config(main_domain: &str, files_domain: &str) -> Config {
7171
job_min_poll_delay: StdDuration::from_secs(0),
7272
job_max_poll_delay: StdDuration::from_secs(0),
7373
job_prune_session: StdDuration::from_secs(0),
74+
job_prune_uploads: StdDuration::from_secs(0),
7475
job_prune_text: StdDuration::from_secs(0),
7576
job_name_change_refill: StdDuration::from_secs(0),
7677
job_lift_expired_punishments: StdDuration::from_secs(0),

deepwell/src/services/job/service.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
*/
2020

2121
use super::prelude::*;
22-
use rsmq_async::RsmqConnection;
22+
use rsmq_async::{Rsmq, RsmqConnection};
2323
use std::time::Duration;
2424

2525
pub const JOB_QUEUE_NAME: &str = "job";
@@ -57,13 +57,19 @@ impl JobService {
5757
ctx: &ServiceContext<'_>,
5858
job: &Job,
5959
delay: Option<Duration>,
60+
) -> Result<()> {
61+
let mut rsmq = ctx.rsmq();
62+
Self::queue_job_inner(&mut rsmq, job, delay).await
63+
}
64+
65+
pub async fn queue_job_inner(
66+
rsmq: &mut Rsmq,
67+
job: &Job,
68+
delay: Option<Duration>,
6069
) -> Result<()> {
6170
info!("Queuing job {job:?} (delay {delay:?})");
6271
let payload = serde_json::to_vec(job)?;
63-
ctx.rsmq()
64-
.send_message(JOB_QUEUE_NAME, payload, delay)
65-
.await?;
66-
72+
rsmq.send_message(JOB_QUEUE_NAME, payload, delay).await?;
6773
Ok(())
6874
}
6975

deepwell/src/services/job/structs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ pub enum Job {
2727
depth: u32,
2828
},
2929
PruneSessions,
30+
PrunePendingUploads,
3031
PruneText,
31-
// TODO add job for pruning incomplete uploads (pending_blob table and corresponding columns for foreign keys)
3232
NameChangeRefill,
3333
LiftExpiredPunishments,
3434
}

deepwell/src/services/job/worker.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
2323
use super::prelude::*;
2424
use crate::api::ServerState;
25-
use crate::services::{PageRevisionService, SessionService, TextService, UserService};
25+
use crate::services::{
26+
BlobService, PageRevisionService, SessionService, TextService, UserService,
27+
};
2628
use crate::utils::debug_pointer;
2729
use rsmq_async::{Rsmq, RsmqConnection, RsmqMessage};
2830
use sea_orm::TransactionTrait;
@@ -194,6 +196,14 @@ impl JobWorker {
194196
delay: Some(self.state.config.job_prune_session),
195197
}
196198
}
199+
Job::PrunePendingUploads => {
200+
debug!("Pruning all expired pending uploads from database and S3");
201+
BlobService::prune(ctx).await?;
202+
NextJob::Next {
203+
job: Job::PrunePendingUploads,
204+
delay: Some(self.state.config.job_prune_uploads),
205+
}
206+
}
197207
Job::PruneText => {
198208
debug!("Pruning all unused text items from database");
199209
TextService::prune(ctx).await?;
@@ -241,7 +251,6 @@ impl JobWorker {
241251
debug!("Job execution finished, follow-up job has been produced");
242252
trace!("* Job: {job:?}");
243253
trace!("* Delay: {delay:?}");
244-
245254
JobService::queue_job(ctx, &job, delay).await?;
246255
}
247256
}

install/dev/deepwell/deepwell.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ max-attempts = 3
3535
delay-ms = 5
3636
min-delay-poll-secs = 10 # 10 seconds
3737
max-delay-poll-secs = 360 # 6 minutes
38-
prune-session-secs = 600 # 5 minutes
38+
prune-session-secs = 300 # 5 minutes
39+
prune-upload-secs = 300 # 5 minutes
3940
prune-text-secs = 86400 # 1 day
4041
name-change-refill-secs = 86400 # 1 day
4142
lift-expired-punishments-secs = 86400 # 1 day

0 commit comments

Comments
 (0)