diff --git a/citesphere/src/main/java/edu/asu/diging/citesphere/core/repository/jobs/GroupSyncJobRepository.java b/citesphere/src/main/java/edu/asu/diging/citesphere/core/repository/jobs/GroupSyncJobRepository.java index 77f266f21..8d7bdf1f5 100644 --- a/citesphere/src/main/java/edu/asu/diging/citesphere/core/repository/jobs/GroupSyncJobRepository.java +++ b/citesphere/src/main/java/edu/asu/diging/citesphere/core/repository/jobs/GroupSyncJobRepository.java @@ -1,18 +1,22 @@ package edu.asu.diging.citesphere.core.repository.jobs; +import java.time.OffsetDateTime; import java.util.List; import java.util.Optional; import org.springframework.data.domain.Pageable; import org.springframework.data.repository.PagingAndSortingRepository; +import edu.asu.diging.citesphere.core.model.jobs.JobStatus; import edu.asu.diging.citesphere.core.model.jobs.impl.GroupSyncJob; public interface GroupSyncJobRepository extends PagingAndSortingRepository { public Optional findFirstByGroupIdOrderByCreatedOnDesc(String groupId); - + public List findByGroupIdIn(List groupIds, Pageable page); - + public long countByGroupIdIn(List groupIds); + + public List findByStatusInAndCreatedOnBefore(List statuses, OffsetDateTime threshold); } diff --git a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/IAsyncCitationProcessor.java b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/IAsyncCitationProcessor.java index d1782688c..130aaa3f3 100644 --- a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/IAsyncCitationProcessor.java +++ b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/IAsyncCitationProcessor.java @@ -1,10 +1,12 @@ package edu.asu.diging.citesphere.core.service; +import java.util.concurrent.Future; + import edu.asu.diging.citesphere.core.exceptions.ZoteroHttpStatusException; import edu.asu.diging.citesphere.user.IUser; public interface IAsyncCitationProcessor { - void sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException; + Future sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException; } \ No newline at end of file diff --git a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/ICitationManager.java b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/ICitationManager.java index e434356ad..2022f8f9d 100644 --- a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/ICitationManager.java +++ b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/ICitationManager.java @@ -29,6 +29,8 @@ public interface ICitationManager { + boolean cancel(String groupId); + List getGroups(IUser user); CitationResults getGroupItems(IUser user, String groupId, String collectionId, int page, String sortBy, List conceptIds) diff --git a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/impl/AsyncCitationProcessor.java b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/impl/AsyncCitationProcessor.java index 998820267..f331edba1 100644 --- a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/impl/AsyncCitationProcessor.java +++ b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/impl/AsyncCitationProcessor.java @@ -7,6 +7,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -17,6 +18,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import edu.asu.diging.citesphere.core.exceptions.ZoteroHttpStatusException; @@ -26,6 +28,7 @@ import edu.asu.diging.citesphere.core.service.IAsyncCitationProcessor; import edu.asu.diging.citesphere.core.service.ICitationStore; import edu.asu.diging.citesphere.core.service.jobs.ISyncJobManager; +import edu.asu.diging.citesphere.core.service.jobs.impl.JobStatusChecker; import edu.asu.diging.citesphere.core.zotero.DeletedZoteroElements; import edu.asu.diging.citesphere.core.zotero.IZoteroManager; import edu.asu.diging.citesphere.core.zotero.ZoteroCollectionsResponse; @@ -62,7 +65,10 @@ public class AsyncCitationProcessor implements IAsyncCitationProcessor { @Autowired private ISyncJobManager jobManager; - + + @Autowired + private JobStatusChecker jobStatusChecker; + private List inactiveJobStatuses; @PostConstruct @@ -83,14 +89,14 @@ public void init() { */ @Override @Async - public void sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException { + public Future sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException { GroupSyncJob prevJob = jobManager.getMostRecentJob(groupId + ""); // it's un-intuitive to test for not inactive statuses here, but it's more likely we'll add - // more activate job statuses than inactive ones, so it's less error prone to use the list that + // more active job statuses than inactive ones, so it's less error prone to use the list that // is less likely to change. if (prevJob != null && !inactiveJobStatuses.contains(prevJob.getStatus())) { // there is already a job running, let's not start another one - return; + return new AsyncResult(null); } logger.info("Starting sync for " + groupId); @@ -101,6 +107,10 @@ public void sync(IUser user, String groupId, long contentVersion, String collect jobRepo.save(job); jobManager.addJob(job); + if(checkIfJobShouldBeCanceled(job, groupId)) { + return new AsyncResult(job.getId()); + } + // we'll retrieve the latest group version first in case there are more changes // in between // this way the group version can be out-dated and trigger another sync next @@ -119,30 +129,78 @@ public void sync(IUser user, String groupId, long contentVersion, String collect jobRepo.save(job); AtomicInteger counter = new AtomicInteger(); + + if(checkIfJobShouldBeCanceled(job, groupId)) { + return new AsyncResult(job.getId()); + } + syncCitations(user, groupId, job, versions, counter); + + if(checkIfJobShouldBeCanceled(job, groupId)) { + return new AsyncResult(job.getId()); + } + syncCollections(user, groupId, job, collectionVersions, groupVersion, counter); + if(checkIfJobShouldBeCanceled(job, groupId)) { + return new AsyncResult(job.getId()); + } + removeDeletedItems(deletedElements, job); - + if(checkIfJobShouldBeCanceled(job, groupId)) { + return new AsyncResult(job.getId()); + } + // while this thread has been running, the group might have been updated by another thread // so, we have to make sure there is no group with the same group id but other object id // or we'll end up with two groups with the same group id. Optional group = groupRepo.findFirstByGroupId(new Long(groupId)); + if (group.isPresent()) { group.get().setContentVersion(groupVersion); groupRepo.save((CitationGroup) group.get()); } + if(checkIfJobShouldBeCanceled(job, groupId)) { + return new AsyncResult(job.getId()); + } + job.setStatus(JobStatus.DONE); job.setFinishedOn(OffsetDateTime.now()); jobRepo.save(job); + + return new AsyncResult(job.getId()); + } + + private boolean checkIfJobShouldBeCanceled(GroupSyncJob job, String groupId) { + // Check both thread interruption AND database status + boolean threadInterrupted = Thread.currentThread().isInterrupted(); + boolean jobCancelled = jobStatusChecker.isJobCancelled(job.getId()); + + if (threadInterrupted || jobCancelled) { + setJobToCanceledState(job, groupId); + return true; + } + return false; + } + + private void setJobToCanceledState(GroupSyncJob job, String groupId) { + logger.info("Aborting sync for " + groupId); + job.setStatus(JobStatus.CANCELED); + job.setFinishedOn(OffsetDateTime.now()); + jobRepo.save(job); } private void syncCitations(IUser user, String groupId, GroupSyncJob job, Map versions, AtomicInteger counter) throws ZoteroHttpStatusException { List keysToRetrieve = new ArrayList<>(); for (String key : versions.keySet()) { + + if (checkIfJobShouldBeCanceled(job, groupId)) { + return; + } + Optional citation = citationStore.findById(key); if (citation.isPresent()) { @@ -174,6 +232,11 @@ private void syncCollections(IUser user, String groupId, GroupSyncJob job, Map keysToRe // wait 1 second to not send too many requests to Zotero TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { - logger.error("Could not wait.", e); + Thread.currentThread().interrupt(); + logger.warn("Interrupted during citation retrieval for group {}", groupId); } logger.debug("Retrieving: " + keysToRetrieve); ZoteroGroupItemsResponse retrievedCitations = zoteroManager.getGroupItemsByKey(user, groupId, @@ -226,7 +290,8 @@ private long retrieveCollections(IUser user, String groupId, List keysTo // wait 1 second to not send too many requests to Zotero TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { - logger.error("Could not wait.", e); + Thread.currentThread().interrupt(); + logger.warn("Interrupted during collection retrieval for group {}", groupId); } ZoteroCollectionsResponse response = zoteroManager.getCollectionsByKey(user, groupId, keysToRetrieve); diff --git a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/impl/CitationManager.java b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/impl/CitationManager.java index b934f6e77..b31daad7b 100644 --- a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/impl/CitationManager.java +++ b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/impl/CitationManager.java @@ -7,8 +7,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -93,6 +95,8 @@ public class CitationManager implements ICitationManager { private Map> sortFunctions; + Map> futureMap = new ConcurrentHashMap<>(); + @PostConstruct public void init() { sortFunctions = new HashMap<>(); @@ -420,7 +424,8 @@ public CitationResults getGroupItems(IUser user, String groupId, String collecti // then update content results.setNotModified(false); - asyncCitationProcessor.sync(user, group.getGroupId() + "", previousVersion, collectionId); + Future future = asyncCitationProcessor.sync(user, group.getGroupId() + "", previousVersion, collectionId); + futureMap.put(groupId, future); } else { results.setNotModified(true); } @@ -452,6 +457,16 @@ public CitationResults getGroupItems(IUser user, String groupId, String collecti } + @Override + public boolean cancel(String groupId) { + Future future = futureMap.get(groupId); + if (future != null) { + futureMap.remove(groupId); + return future.cancel(true); + } + return false; + } + @Override public void forceGroupItemsRefresh(IUser user, String groupId, String collectionId, int page, String sortBy) { Optional groupOptional = groupRepository.findFirstByGroupId(new Long(groupId)); diff --git a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/jobs/impl/JobStatusChecker.java b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/jobs/impl/JobStatusChecker.java new file mode 100644 index 000000000..244e39175 --- /dev/null +++ b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/jobs/impl/JobStatusChecker.java @@ -0,0 +1,87 @@ +package edu.asu.diging.citesphere.core.service.jobs.impl; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import edu.asu.diging.citesphere.core.model.jobs.JobStatus; +import edu.asu.diging.citesphere.core.model.jobs.impl.GroupSyncJob; +import edu.asu.diging.citesphere.core.repository.jobs.GroupSyncJobRepository; + +/** + * Service for checking job cancellation status from database with caching. + * This reduces database load while still providing timely cancellation detection. + */ +@Service +public class JobStatusChecker { + + @Autowired + private GroupSyncJobRepository jobRepo; + + @Value("${_job_status_cache_ttl:5000}") + private long cacheTtl; + + private Map statusCache = new ConcurrentHashMap<>(); + + private static class CachedJobStatus { + JobStatus status; + long timestamp; + } + + /** + * Check if a job has been cancelled. + * Uses a cache with TTL to reduce database queries. + * + * @param jobId the job ID to check + * @return true if the job is cancelled, false otherwise + */ + public boolean isJobCancelled(String jobId) { + // Check cache first + CachedJobStatus cached = statusCache.get(jobId); + long now = System.currentTimeMillis(); + + if (cached != null && (now - cached.timestamp) < cacheTtl) { + return cached.status == JobStatus.CANCELED; + } + + // Query database if cache miss or expired + Optional job = jobRepo.findById(jobId); + if (job.isPresent()) { + CachedJobStatus newCache = new CachedJobStatus(); + newCache.status = job.get().getStatus(); + newCache.timestamp = now; + statusCache.put(jobId, newCache); + return newCache.status == JobStatus.CANCELED; + } + + return false; + } + + /** + * Invalidate the cache entry for a job. + * This should be called when a job is explicitly cancelled to ensure + * the async thread picks up the change immediately. + * + * @param jobId the job ID to invalidate + */ + public void invalidateCache(String jobId) { + statusCache.remove(jobId); + } + + /** + * Periodically clean up old cache entries to prevent memory leaks. + * Runs every 5 minutes and removes entries older than 10x the cache TTL. + */ + @Scheduled(fixedDelay = 300000) // Clean up every 5 minutes + public void cleanupOldCache() { + long threshold = System.currentTimeMillis() - (cacheTtl * 10); + statusCache.entrySet().removeIf(entry -> + entry.getValue().timestamp < threshold + ); + } +} diff --git a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/jobs/impl/SyncJobManager.java b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/jobs/impl/SyncJobManager.java index fa4fb77df..ac9119406 100644 --- a/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/jobs/impl/SyncJobManager.java +++ b/citesphere/src/main/java/edu/asu/diging/citesphere/core/service/jobs/impl/SyncJobManager.java @@ -1,6 +1,7 @@ package edu.asu.diging.citesphere.core.service.jobs.impl; import java.time.OffsetDateTime; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -9,11 +10,12 @@ import javax.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; -import edu.asu.diging.citesphere.core.model.jobs.IJob; import edu.asu.diging.citesphere.core.model.jobs.JobStatus; import edu.asu.diging.citesphere.core.model.jobs.impl.GroupSyncJob; import edu.asu.diging.citesphere.core.repository.jobs.GroupSyncJobRepository; @@ -25,30 +27,53 @@ @Service public class SyncJobManager implements ISyncJobManager { + private final Logger logger = LoggerFactory.getLogger(getClass()); + private Map currentJobs; - + @Autowired private GroupSyncJobRepository jobRepo; - + @Autowired private ICitationManager citationManager; + + @Autowired + private JobStatusChecker jobStatusChecker; @PostConstruct public void init() { currentJobs = new ConcurrentHashMap<>(); + cleanupStalledJobs(); } - - /* (non-Javadoc) - * @see edu.asu.diging.citesphere.core.service.jobs.impl.ISyncJobManager#addJobId(edu.asu.diging.citesphere.core.model.jobs.impl.GroupSyncJob) + + /** + * Marks stalled jobs (older than 1 hour) as FAILURE on application startup. */ + private void cleanupStalledJobs() { + OffsetDateTime threshold = OffsetDateTime.now().minusHours(1); + List stalledJobs = jobRepo.findByStatusInAndCreatedOnBefore( + Arrays.asList(JobStatus.STARTED, JobStatus.PREPARED, JobStatus.SYNCING), + threshold + ); + + for (GroupSyncJob job : stalledJobs) { + logger.warn("Found stalled job {} from {}, marking as FAILURE", + job.getId(), job.getCreatedOn()); + job.setStatus(JobStatus.FAILURE); + job.setFinishedOn(OffsetDateTime.now()); + jobRepo.save(job); + } + + if (!stalledJobs.isEmpty()) { + logger.info("Cleaned up {} stalled jobs", stalledJobs.size()); + } + } + @Override public void addJob(GroupSyncJob job) { currentJobs.put(job.getGroupId(), job); } - /* (non-Javadoc) - * @see edu.asu.diging.citesphere.core.service.jobs.impl.ISyncJobManager#getMostRecentJob(java.lang.String) - */ @Override public GroupSyncJob getMostRecentJob(String groupId) { GroupSyncJob job = currentJobs.get(groupId); @@ -79,14 +104,35 @@ public long getJobsCount(IUser user) { @Override public void cancelJob(String jobId) { Optional jobOptional = jobRepo.findById(jobId); - if (jobOptional.isPresent()) { - GroupSyncJob job = currentJobs.get(jobOptional.get().getGroupId()); - if (job == null) { - job = jobOptional.get(); - } - job.setStatus(JobStatus.CANCELED); - job.setFinishedOn(OffsetDateTime.now()); - jobRepo.save(job); + if (!jobOptional.isPresent()) { + logger.warn("Cannot cancel job {}: job not found", jobId); + return; } + + GroupSyncJob job = jobOptional.get(); + + if (job.getStatus() == JobStatus.DONE || + job.getStatus() == JobStatus.CANCELED || + job.getStatus() == JobStatus.FAILURE) { + logger.info("Job {} already in terminal state: {}", jobId, job.getStatus()); + return; + } + + // Update database first, then invalidate cache, then interrupt thread + job.setStatus(JobStatus.CANCELED); + job.setFinishedOn(OffsetDateTime.now()); + jobRepo.save(job); + logger.info("Marked job {} as CANCELED in database", jobId); + + jobStatusChecker.invalidateCache(jobId); + + boolean interrupted = citationManager.cancel(job.getGroupId()); + if (interrupted) { + logger.info("Successfully interrupted thread for job {}", jobId); + } else { + logger.warn("Could not interrupt thread for job {} - will be detected on next check", jobId); + } + + currentJobs.remove(job.getGroupId()); } } diff --git a/citesphere/src/main/java/edu/asu/diging/citesphere/web/user/JobStatusController.java b/citesphere/src/main/java/edu/asu/diging/citesphere/web/user/JobStatusController.java new file mode 100644 index 000000000..c1937b0b7 --- /dev/null +++ b/citesphere/src/main/java/edu/asu/diging/citesphere/web/user/JobStatusController.java @@ -0,0 +1,53 @@ +package edu.asu.diging.citesphere.web.user; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; + +import edu.asu.diging.citesphere.core.model.jobs.impl.GroupSyncJob; +import edu.asu.diging.citesphere.core.repository.jobs.GroupSyncJobRepository; + +/** + * REST controller for checking job status. + * Provides a polling endpoint for the UI to track job progress and cancellation. + */ +@Controller +public class JobStatusController { + + @Autowired + private GroupSyncJobRepository jobRepo; + + /** + * Get the current status of a job. + * Returns job status, progress, and completion time. + * + * @param jobId the job ID to check + * @return JSON response with job details, or 404. + */ + @RequestMapping(value = "/api/jobs/{jobId}/status", method = RequestMethod.GET) + @ResponseBody + public ResponseEntity> getJobStatus(@PathVariable String jobId) { + Optional job = jobRepo.findById(jobId); + if (!job.isPresent()) { + return ResponseEntity.notFound().build(); + } + + Map response = new HashMap<>(); + response.put("status", job.get().getStatus().toString()); + response.put("current", job.get().getCurrent()); + response.put("total", job.get().getTotal()); + if (job.get().getFinishedOn() != null) { + response.put("finishedOn", job.get().getFinishedOn().toString()); + } + + return ResponseEntity.ok(response); + } +} diff --git a/citesphere/src/main/resources/config.properties b/citesphere/src/main/resources/config.properties index dfd3eb1e8..4d74b6fa6 100644 --- a/citesphere/src/main/resources/config.properties +++ b/citesphere/src/main/resources/config.properties @@ -33,6 +33,9 @@ email.debug=${email.debug} # when should citesphere recheck for updates (in min) _sync_frequency=60 +# how long to cache job status (in milliseconds) - 5 seconds +_job_status_cache_ttl=5000 + # how often should an export retry when a group is syncing (tries are every 5 seconds) _max_export_tries=700