1111use std:: collections:: HashMap ;
1212use std:: path:: { Path , PathBuf } ;
1313use std:: sync:: Arc ;
14+ use std:: time:: { Duration , SystemTime } ;
1415
1516use anyhow:: { Context , anyhow, bail} ;
1617use bytes:: BytesMut ;
@@ -152,6 +153,54 @@ impl LatestSnapshot {
152153 }
153154}
154155
156+ #[ derive( PartialEq , Eq , PartialOrd , Ord , Clone , Copy ) ]
157+ pub enum ArchivedLsn {
158+ None ,
159+ Snapshot {
160+ // Ordering is intentional: LSN takes priority over elapsed wall clock time for comparisons
161+ min_applied_lsn : Lsn ,
162+ created_at : SystemTime ,
163+ } ,
164+ }
165+
166+ impl ArchivedLsn {
167+ pub fn get_min_applied_lsn ( & self ) -> Lsn {
168+ match self {
169+ ArchivedLsn :: None => Lsn :: INVALID ,
170+ ArchivedLsn :: Snapshot {
171+ min_applied_lsn, ..
172+ } => * min_applied_lsn,
173+ }
174+ }
175+
176+ pub fn get_age ( & self ) -> Duration {
177+ match self {
178+ ArchivedLsn :: None => Duration :: MAX ,
179+ ArchivedLsn :: Snapshot { created_at, .. } => SystemTime :: now ( )
180+ . duration_since ( * created_at)
181+ . unwrap_or_default ( ) , // zero if created-at is earlier than current system time
182+ }
183+ }
184+ }
185+
186+ impl From < & LatestSnapshot > for ArchivedLsn {
187+ fn from ( latest : & LatestSnapshot ) -> Self {
188+ ArchivedLsn :: Snapshot {
189+ min_applied_lsn : latest. min_applied_lsn ,
190+ created_at : latest. created_at . into ( ) ,
191+ }
192+ }
193+ }
194+
195+ impl From < & PartitionSnapshotMetadata > for ArchivedLsn {
196+ fn from ( metadata : & PartitionSnapshotMetadata ) -> Self {
197+ ArchivedLsn :: Snapshot {
198+ min_applied_lsn : metadata. min_applied_lsn ,
199+ created_at : metadata. created_at . into ( ) ,
200+ }
201+ }
202+ }
203+
155204struct UniqueSnapshotKey {
156205 lsn : Lsn ,
157206 snapshot_id : SnapshotId ,
@@ -542,15 +591,17 @@ impl SnapshotRepository {
542591 }
543592
544593 /// Retrieve the latest known LSN to be archived to the snapshot repository.
545- /// Response of `Ok(Lsn::INVALID)` indicates no existing snapshot for the partition.
546- pub async fn get_latest_archived_lsn ( & self , partition_id : PartitionId ) -> anyhow:: Result < Lsn > {
594+ pub async fn get_latest_archived_lsn (
595+ & self ,
596+ partition_id : PartitionId ,
597+ ) -> anyhow:: Result < ArchivedLsn > {
547598 let latest_path = self . get_latest_snapshot_pointer ( partition_id) ;
548599
549600 let latest = match self . object_store . get ( & latest_path) . await {
550601 Ok ( result) => result,
551602 Err ( object_store:: Error :: NotFound { .. } ) => {
552603 debug ! ( "Latest snapshot data not found in repository" ) ;
553- return Ok ( Lsn :: INVALID ) ;
604+ return Ok ( ArchivedLsn :: None ) ;
554605 }
555606 Err ( e) => {
556607 return Err ( anyhow:: Error :: new ( e) . context ( format ! (
@@ -562,7 +613,7 @@ impl SnapshotRepository {
562613 let latest: LatestSnapshot = serde_json:: from_slice ( & latest. bytes ( ) . await ?) ?;
563614 debug ! ( partition_id = %partition_id, snapshot_id = %latest. snapshot_id, "Latest snapshot metadata: {:?}" , latest) ;
564615
565- Ok ( latest. min_applied_lsn )
616+ Ok ( ArchivedLsn :: from ( & latest) )
566617 }
567618
568619 async fn get_latest_snapshot_metadata_for_update (
0 commit comments