Changes from all commits (20 commits)
80c7f99
[CITE-132] Addressing review comments-Implementing future cancel changes
Charishma0249 May 17, 2022
3a42c7b
[CITE-132]Addressing review comments-implementing java future cancel job
Charishma0249 May 18, 2022
65d89c3
[CITE-132] Addressed review comments - code cleanup
Charishma0249 May 18, 2022
53fefbb
Merge branch 'develop' into story/CITE-132-1
sathish2379 Apr 22, 2024
2f0a2fc
[CITE-132] update cancel method return and updated cancel check in Sy…
sathish2379 Apr 23, 2024
3d493f8
[CITE-132] Using concurrent Hashmap for storing future
sathish2379 Apr 26, 2024
2e5c969
[CITE 132] Removed Sysout statements
sathish2379 Apr 26, 2024
10d9352
[CITE 132] removed extra spaces, tabs
sathish2379 May 3, 2024
03e5ec3
[CITE 132] spaces fixes
sathish2379 May 3, 2024
d36fdad
[CITE 132] spaces fixes
sathish2379 May 3, 2024
b80eb69
[CITE-132] refactor CheckIfInterrupted method
sathish2379 May 14, 2024
1b033b9
[CITE-132] cancelling of job is not required in getGroupItems
sathish2379 May 14, 2024
55723b7
[CITE-132] cancelling job within sync
sathish2379 May 22, 2024
706db11
[CITE-132] correcting indentations
sathish2379 May 22, 2024
c421f37
[CITE-132] Handle race condition for sync
rajvi-patel-22 Jul 25, 2025
1ff5c11
[CITE-132] Comment out code to check race condition
rajvi-patel-22 Aug 1, 2025
a64b7d3
Merge branch 'develop' into story/CITE-132-1
rajvi-patel-22 Sep 26, 2025
b780979
[CITE-132] added status to db, added logs, proper error handling
Girik1105 Jan 8, 2026
ee23997
Merge branch 'develop' into story/CITE-132-1
Girik1105 Jan 9, 2026
b76e3c6
[CITE-132] better comments
Girik1105 Jan 9, 2026
File: GroupSyncJobRepository.java
@@ -1,18 +1,22 @@
package edu.asu.diging.citesphere.core.repository.jobs;

import java.time.OffsetDateTime;
import java.util.List;
import java.util.Optional;

import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.PagingAndSortingRepository;

import edu.asu.diging.citesphere.core.model.jobs.JobStatus;
import edu.asu.diging.citesphere.core.model.jobs.impl.GroupSyncJob;

public interface GroupSyncJobRepository extends PagingAndSortingRepository<GroupSyncJob, String> {

public Optional<GroupSyncJob> findFirstByGroupIdOrderByCreatedOnDesc(String groupId);

public List<GroupSyncJob> findByGroupIdIn(List<String> groupIds, Pageable page);

public long countByGroupIdIn(List<String> groupIds);

public List<GroupSyncJob> findByStatusInAndCreatedOnBefore(List<JobStatus> statuses, OffsetDateTime threshold);
}
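The new derived query findByStatusInAndCreatedOnBefore(statuses, threshold) selects jobs whose status is in the given list and whose createdOn timestamp is older than the threshold. Its caller is not part of this diff; a minimal sketch of one possible use, a scheduled sweep that cancels stale jobs, is shown below. The schedule, the three-hour cutoff, the activeStatuses list, and the surrounding bean are assumptions, not code from this pull request.

// Hypothetical sketch only, not part of this pull request.
// Assumes: a Spring bean with GroupSyncJobRepository injected as jobRepo, scheduling enabled,
// and an activeStatuses list holding whichever JobStatus values count as "still running".
@Scheduled(fixedDelay = 3600000)                                      // assumed: sweep once per hour
public void cancelStaleJobs() {
    OffsetDateTime threshold = OffsetDateTime.now().minusHours(3);    // assumed three-hour cutoff
    for (GroupSyncJob job : jobRepo.findByStatusInAndCreatedOnBefore(activeStatuses, threshold)) {
        job.setStatus(JobStatus.CANCELED);
        job.setFinishedOn(OffsetDateTime.now());
        jobRepo.save(job);
    }
}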
File: IAsyncCitationProcessor.java
@@ -1,10 +1,12 @@
package edu.asu.diging.citesphere.core.service;

import java.util.concurrent.Future;

import edu.asu.diging.citesphere.core.exceptions.ZoteroHttpStatusException;
import edu.asu.diging.citesphere.user.IUser;

public interface IAsyncCitationProcessor {

void sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException;
Future<String> sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException;

}
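sync now returns a Future so callers can interrupt an in-flight synchronization. The Future completes with the id of the GroupSyncJob that was created, or with null when a previous job for the group was still active and no new job was started. A minimal caller-side sketch (illustrative only; CitationManager further down in this diff does essentially this):

// Illustrative caller sketch; CitationManager below is the actual caller in this pull request.
Future<String> handle = asyncCitationProcessor.sync(user, groupId, contentVersion, collectionId);
futureMap.put(groupId, handle);        // e.g. a ConcurrentHashMap<String, Future<String>>

// Later, to abort the in-flight sync:
Future<String> running = futureMap.remove(groupId);
if (running != null) {
    running.cancel(true);              // interrupts the worker; its checkpoints mark the job CANCELED
}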
File: ICitationManager.java
@@ -29,6 +29,8 @@

public interface ICitationManager {

boolean cancel(String groupId);

List<ICitationGroup> getGroups(IUser user);

CitationResults getGroupItems(IUser user, String groupId, String collectionId, int page, String sortBy, List<String> conceptIds)
File: AsyncCitationProcessor.java
@@ -7,6 +7,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -17,6 +18,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import edu.asu.diging.citesphere.core.exceptions.ZoteroHttpStatusException;
@@ -26,6 +28,7 @@
import edu.asu.diging.citesphere.core.service.IAsyncCitationProcessor;
import edu.asu.diging.citesphere.core.service.ICitationStore;
import edu.asu.diging.citesphere.core.service.jobs.ISyncJobManager;
import edu.asu.diging.citesphere.core.service.jobs.impl.JobStatusChecker;
import edu.asu.diging.citesphere.core.zotero.DeletedZoteroElements;
import edu.asu.diging.citesphere.core.zotero.IZoteroManager;
import edu.asu.diging.citesphere.core.zotero.ZoteroCollectionsResponse;
@@ -62,7 +65,10 @@ public class AsyncCitationProcessor implements IAsyncCitationProcessor {

@Autowired
private ISyncJobManager jobManager;


@Autowired
private JobStatusChecker jobStatusChecker;

private List<JobStatus> inactiveJobStatuses;

@PostConstruct
@@ -83,14 +89,14 @@ public void init() {
*/
@Override
@Async
public void sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException {
public Future<String> sync(IUser user, String groupId, long contentVersion, String collectionId) throws ZoteroHttpStatusException {
GroupSyncJob prevJob = jobManager.getMostRecentJob(groupId + "");
// it's un-intuitive to test for not inactive statuses here, but it's more likely we'll add
// more activate job statuses than inactive ones, so it's less error prone to use the list that
// more active job statuses than inactive ones, so it's less error prone to use the list that
// is less likely to change.
if (prevJob != null && !inactiveJobStatuses.contains(prevJob.getStatus())) {
// there is already a job running, let's not start another one
return;
return new AsyncResult<String>(null);
}

logger.info("Starting sync for " + groupId);
@@ -101,6 +107,10 @@ public void sync(IUser user, String groupId, long contentVersion, String collect
jobRepo.save(job);
jobManager.addJob(job);

if(checkIfJobShouldBeCanceled(job, groupId)) {
return new AsyncResult<String>(job.getId());
}

// we'll retrieve the latest group version first in case there are more changes
// in between
// this way the group version can be out-dated and trigger another sync next
@@ -119,30 +129,78 @@ public void sync(IUser user, String groupId, long contentVersion, String collect
jobRepo.save(job);

AtomicInteger counter = new AtomicInteger();

if(checkIfJobShouldBeCanceled(job, groupId)) {
return new AsyncResult<String>(job.getId());
}

syncCitations(user, groupId, job, versions, counter);

if(checkIfJobShouldBeCanceled(job, groupId)) {
return new AsyncResult<String>(job.getId());
}

syncCollections(user, groupId, job, collectionVersions, groupVersion, counter);

if(checkIfJobShouldBeCanceled(job, groupId)) {
return new AsyncResult<String>(job.getId());
}

removeDeletedItems(deletedElements, job);


if(checkIfJobShouldBeCanceled(job, groupId)) {
return new AsyncResult<String>(job.getId());
}

// while this thread has been running, the group might have been updated by another thread
// so, we have to make sure there is no group with the same group id but other object id
// or we'll end up with two groups with the same group id.
Optional<ICitationGroup> group = groupRepo.findFirstByGroupId(new Long(groupId));

if (group.isPresent()) {
group.get().setContentVersion(groupVersion);
groupRepo.save((CitationGroup) group.get());
}

if(checkIfJobShouldBeCanceled(job, groupId)) {
return new AsyncResult<String>(job.getId());
}

job.setStatus(JobStatus.DONE);
job.setFinishedOn(OffsetDateTime.now());
jobRepo.save(job);

return new AsyncResult<String>(job.getId());
}

private boolean checkIfJobShouldBeCanceled(GroupSyncJob job, String groupId) {
// Check both thread interruption AND database status
boolean threadInterrupted = Thread.currentThread().isInterrupted();
boolean jobCancelled = jobStatusChecker.isJobCancelled(job.getId());

if (threadInterrupted || jobCancelled) {
setJobToCanceledState(job, groupId);
return true;
}
return false;
}

private void setJobToCanceledState(GroupSyncJob job, String groupId) {
logger.info("Aborting sync for " + groupId);
job.setStatus(JobStatus.CANCELED);
job.setFinishedOn(OffsetDateTime.now());
jobRepo.save(job);
}

private void syncCitations(IUser user, String groupId, GroupSyncJob job, Map<String, Long> versions,
AtomicInteger counter) throws ZoteroHttpStatusException {
List<String> keysToRetrieve = new ArrayList<>();
for (String key : versions.keySet()) {

if (checkIfJobShouldBeCanceled(job, groupId)) {
return;
}

Optional<ICitation> citation = citationStore.findById(key);

if (citation.isPresent()) {
@@ -174,6 +232,11 @@ private void syncCollections(IUser user, String groupId, GroupSyncJob job, Map<S
keys.addAll(versions.keySet());

for (String key : keys) {

if (checkIfJobShouldBeCanceled(job, groupId)) {
return; // Ensure we stop processing if the job is cancelled
}

ICitationCollection collection = collectionRepo.findByKeyAndGroupId(key, groupId);
if (collection == null || (versions.containsKey(key) && collection.getVersion() != versions.get(key))
|| collection.getContentVersion() != groupVersion) {
@@ -211,7 +274,8 @@ private long retrieveCitations(IUser user, String groupId, List<String> keysToRe
// wait 1 second to not send too many requests to Zotero
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
logger.error("Could not wait.", e);
Thread.currentThread().interrupt();
logger.warn("Interrupted during citation retrieval for group {}", groupId);
}
logger.debug("Retrieving: " + keysToRetrieve);
ZoteroGroupItemsResponse retrievedCitations = zoteroManager.getGroupItemsByKey(user, groupId,
@@ -226,7 +290,8 @@ private long retrieveCollections(IUser user, String groupId, List<String> keysTo
// wait 1 second to not send too many requests to Zotero
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
logger.error("Could not wait.", e);
Thread.currentThread().interrupt();
logger.warn("Interrupted during collection retrieval for group {}", groupId);
}
ZoteroCollectionsResponse response = zoteroManager.getCollectionsByKey(user, groupId,
keysToRetrieve);
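Two details in this file are easy to miss. First, the sync path is sprinkled with checkIfJobShouldBeCanceled checkpoints because Future.cancel(true) only interrupts the worker thread; the work actually stops when the next checkpoint notices either the interrupt or the CANCELED status in the database. Second, catching InterruptedException clears the thread's interrupt flag, which is why the catch blocks in retrieveCitations and retrieveCollections now call Thread.currentThread().interrupt(); without that, a cancellation arriving during the one-second rate-limit sleep would be lost. A small stand-alone illustration of that flag behavior (plain Java, not project code):

import java.util.concurrent.TimeUnit;

public class InterruptFlagDemo {
    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);               // stands in for the Zotero rate-limit sleep
            } catch (InterruptedException e) {
                // the interrupt flag is cleared at this point
                Thread.currentThread().interrupt();      // restore it, as the pull request now does
            }
            // a later checkpoint can still see the cancellation; prints "interrupted? true"
            System.out.println("interrupted? " + Thread.currentThread().isInterrupted());
        });
        worker.start();
        worker.interrupt();                              // plays the role of Future.cancel(true)
        worker.join();
    }
}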
File: CitationManager.java
@@ -7,8 +7,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

@@ -93,6 +95,8 @@

private Map<String, BiFunction<ICitation, ICitation, Integer>> sortFunctions;

Map<String, Future<String>> futureMap = new ConcurrentHashMap<>();

@PostConstruct
public void init() {
sortFunctions = new HashMap<>();
@@ -380,78 +384,89 @@
throw new GroupDoesNotExistException("Group " + groupId + " does not exist.");
}

@Override
public CitationResults getGroupItems(IUser user, String groupId, String collectionId, int page, String sortBy, List<String> conceptIds)
throws GroupDoesNotExistException, ZoteroHttpStatusException {

ICitationGroup group = null;
Optional<ICitationGroup> groupOptional = groupRepository.findFirstByGroupId(new Long(groupId));
if (!groupOptional.isPresent() || !groupOptional.get().getUsers().contains(user.getUsername())) {
group = zoteroManager.getGroup(user, groupId, false);
if (group != null) {
if (groupOptional.isPresent()){
group.setId(groupOptional.get().getId());
}
group.getUsers().add(user.getUsername());
groupRepository.save((CitationGroup) group);
}
} else {
group = groupOptional.get();
}

if (group == null) {
throw new GroupDoesNotExistException("There is no group with id " + groupId);
}

boolean isModified = zoteroManager.isGroupModified(user, groupId, group.getContentVersion());
CitationResults results = new CitationResults();
if (isModified) {
long previousVersion = group.getContentVersion();
// first update the group info
// if we are using a previously stored group, delete it
ICitationGroup zoteroGroup = null;
if (group.getId() != null) {
zoteroGroup = zoteroManager.getGroup(user, groupId + "", true);
zoteroGroup.setId(group.getId());
}
zoteroGroup.setUpdatedOn(OffsetDateTime.now().toString());
addUserToGroup(zoteroGroup, user);
group = groupRepository.save((CitationGroup) zoteroGroup);

// then update content
results.setNotModified(false);
asyncCitationProcessor.sync(user, group.getGroupId() + "", previousVersion, collectionId);
Future<String> future = asyncCitationProcessor.sync(user, group.getGroupId() + "", previousVersion, collectionId);
futureMap.put(groupId, future);
} else {
results.setNotModified(true);
}

List<ICitation> citations = null;
long total = 0;
if (collectionId != null && !collectionId.trim().isEmpty()) {
citations = (List<ICitation>) citationDao.findCitationsInCollection(groupId, collectionId, (page - 1) * zoteroPageSize, zoteroPageSize, conceptIds);
ICitationCollection collection = collectionManager.getCollection(user, groupId, collectionId);
if (collection != null) {
total = collection.getNumberOfItems();
} else {
total = citations.size();
}
} else {
citations = (List<ICitation>) citationDao.findCitations(groupId, (page - 1) * zoteroPageSize,
zoteroPageSize, false, conceptIds);
if (groupOptional.isPresent()) {
updateCitationGroup(user, groupId);

total = groupRepository.findFirstByGroupId(new Long(groupId)).get().getNumItems();
} else {
total = citations.size();
}
}
results.setCitations(citations != null ? citations : new ArrayList<>());
results.setTotalResults(total);
return results;

}

CodeFactor check notice (citesphere/src/main/java/edu/asu/diging/citesphere/core/service/impl/CitationManager.java#L387-L459): Complex Method

@Override
public boolean cancel(String groupId) {
Future<String> future = futureMap.get(groupId);
if (future != null) {
futureMap.remove(groupId);
return future.cancel(true);
}
return false;
}

@Override
public void forceGroupItemsRefresh(IUser user, String groupId, String collectionId, int page, String sortBy) {
Optional<ICitationGroup> groupOptional = groupRepository.findFirstByGroupId(new Long(groupId));
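cancel(groupId) only interrupts the worker whose Future this instance happens to hold; if the handle is missing (for example after a restart) or the interrupt arrives between checkpoints, cancellation is still detected through the job's status in the database via JobStatusChecker. The code that flips that status is not part of this diff; a hypothetical cancel flow tying the two paths together might look like the sketch below (the surrounding bean and the explicit database update are assumptions):

// Hypothetical sketch, not part of this pull request. Assumes a Spring bean with
// citationManager, jobManager, jobRepo, and jobStatusChecker injected.
public void cancelSync(String groupId) {
    // Path 1: interrupt the @Async worker, if this instance still holds its Future.
    citationManager.cancel(groupId);

    // Path 2: flag the most recent job as CANCELED in the database so the
    // JobStatusChecker-backed checkpoints catch the cancellation even if the
    // interrupt is missed (handle already removed, different node, etc.).
    GroupSyncJob job = jobManager.getMostRecentJob(groupId);
    if (job != null && job.getStatus() != JobStatus.DONE && job.getStatus() != JobStatus.CANCELED) {
        job.setStatus(JobStatus.CANCELED);
        job.setFinishedOn(OffsetDateTime.now());
        jobRepo.save(job);
        jobStatusChecker.invalidateCache(job.getId());   // worker sees it at its next checkpoint
    }
}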
File: JobStatusChecker.java (new file)
@@ -0,0 +1,87 @@
package edu.asu.diging.citesphere.core.service.jobs.impl;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import edu.asu.diging.citesphere.core.model.jobs.JobStatus;
import edu.asu.diging.citesphere.core.model.jobs.impl.GroupSyncJob;
import edu.asu.diging.citesphere.core.repository.jobs.GroupSyncJobRepository;

/**
* Service for checking job cancellation status from database with caching.
* This reduces database load while still providing timely cancellation detection.
*/
@Service
public class JobStatusChecker {

@Autowired
private GroupSyncJobRepository jobRepo;

@Value("${_job_status_cache_ttl:5000}")
private long cacheTtl;

private Map<String, CachedJobStatus> statusCache = new ConcurrentHashMap<>();

private static class CachedJobStatus {
JobStatus status;
long timestamp;
}

/**
* Check if a job has been cancelled.
* Uses a cache with TTL to reduce database queries.
*
* @param jobId the job ID to check
* @return true if the job is cancelled, false otherwise
*/
public boolean isJobCancelled(String jobId) {
// Check cache first
CachedJobStatus cached = statusCache.get(jobId);
long now = System.currentTimeMillis();

if (cached != null && (now - cached.timestamp) < cacheTtl) {
return cached.status == JobStatus.CANCELED;
}

// Query database if cache miss or expired
Optional<GroupSyncJob> job = jobRepo.findById(jobId);
if (job.isPresent()) {
CachedJobStatus newCache = new CachedJobStatus();
newCache.status = job.get().getStatus();
newCache.timestamp = now;
statusCache.put(jobId, newCache);
return newCache.status == JobStatus.CANCELED;
}

return false;
}

/**
* Invalidate the cache entry for a job.
* This should be called when a job is explicitly cancelled to ensure
* the async thread picks up the change immediately.
*
* @param jobId the job ID to invalidate
*/
public void invalidateCache(String jobId) {
statusCache.remove(jobId);
}

/**
* Periodically clean up old cache entries to prevent memory leaks.
* Runs every 5 minutes and removes entries older than 10x the cache TTL.
*/
@Scheduled(fixedDelay = 300000) // Clean up every 5 minutes
public void cleanupOldCache() {
long threshold = System.currentTimeMillis() - (cacheTtl * 10);
statusCache.entrySet().removeIf(entry ->
entry.getValue().timestamp < threshold
);
}
}
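The TTL defaults to 5000 ms and can be overridden through the _job_status_cache_ttl property; the @Scheduled cleanup additionally assumes that scheduling is enabled somewhere in the application (e.g. via @EnableScheduling), which is not shown in this diff. Because of the cache, a status change becomes visible at a checkpoint only after the TTL expires or after invalidateCache is called, as this illustrative call sequence shows:

// Illustrative call sequence (not project code); jobId is whatever GroupSyncJob.getId() returned.
jobStatusChecker.isJobCancelled(jobId);   // cache miss: queries the database and caches the status
jobStatusChecker.isJobCancelled(jobId);   // within 5 s: served from the cache, no database query

// ... the job is marked CANCELED in the database elsewhere ...

jobStatusChecker.invalidateCache(jobId);  // drop the stale cache entry
jobStatusChecker.isJobCancelled(jobId);   // re-queries the database and now returns true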