Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import uk.ac.ucl.rits.inform.datasinks.emapstar.exceptions.MessageIgnoredException;
Expand Down Expand Up @@ -57,8 +58,15 @@ public void processWaveform(
List<Double> numericValues = interchangeValue.get();
Instant observationTime = msg.getObservationTime();
// Try to find the visit. We don't have enough information to create the visit if it doesn't already exist.
Optional<LocationVisit> inferredLocationVisit =
locationVisitRepository.findLocationVisitByLocationAndTime(observationTime, msg.getMappedLocationString());
Optional<LocationVisit> inferredLocationVisit;
try {
inferredLocationVisit = locationVisitRepository.findLocationVisitByLocationAndTime(
observationTime, msg.getMappedLocationString());
} catch (IncorrectResultSizeDataAccessException e) {
logger.error("Multiple location visits found for {} at {}; waveform stored without location visit link. Message: {}",
msg.getMappedLocationString(), observationTime, e.getMessage());
inferredLocationVisit = Optional.empty();
}
// XXX: will have to do some sanity checks here to be sure that the HL7 feed hasn't gone down.
// See issue #36, and here for discussion:
// https://github.com/SAFEHR-data/emap/blob/develop/docs/dev/features/waveform_hf_data.md#core-processor-logic-orphan-data-problem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
public record EmapRabbitMqRoute(EmapDataSourceQueue queueName, EmapDataSourceExchange exchangeName) {
public enum EmapDataSourceQueue {
/**
* The message queue from the HL7 (IDS) feed.
* The message queue derived from the HL7 ADT (IDS) feed.
* Bit of a misnomer now that there are other HL7 inputs to
* Emap (Waveform HL7s), and of course this queue never
* contained HL7 messages anyway.
*/
HL7_QUEUE("hl7Queue"),
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class LocationMapping {
5, List.of(33, 34, 35, 36));
private final Map<Integer, Integer> bayFromBed = new HashMap<>();

LocationMapping() {
public LocationMapping() {
for (var bayToBeds: bayToBeds.entrySet()) {
Integer bay = bayToBeds.getKey();
List<Integer> beds = bayToBeds.getValue();
Expand All @@ -40,7 +40,7 @@ public class LocationMapping {
}
}

String hl7AdtLocationFromCapsuleLocation(String capsuleLocation) {
public String hl7AdtLocationFromCapsuleLocation(String capsuleLocation) {
final Pattern sideroomPattern = Pattern.compile("UCHT03ICURM(\\d+)");
Matcher sideroomMatcher = sideroomPattern.matcher(capsuleLocation);
if (sideroomMatcher.find()) {
Expand Down
18 changes: 18 additions & 0 deletions waveform-generator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<checkstyle.version>10.3.1</checkstyle.version>
<checkstyle.plugin.version>3.3.0</checkstyle.plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<emap-interchange.version>2.7</emap-interchange.version>
<com.google.code.findbugs.annotations.version>3.0.1u2</com.google.code.findbugs.annotations.version>
<spring.boot.mainclass>uk.ac.ucl.rits.inform.datasources.waveform_generator.Application</spring.boot.mainclass>
<validation-api.version>2.0.1.Final</validation-api.version>
Expand All @@ -42,6 +43,17 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>uk.ac.ucl.rits.inform</groupId>
<artifactId>emap-interchange</artifactId>
<version>${emap-interchange.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand All @@ -60,6 +72,12 @@
<version>1.12.0</version>
</dependency>

<dependency>
<groupId>uk.ac.ucl.rits.inform</groupId>
<artifactId>emap-utils</artifactId>
<version>2.7</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
@SpringBootApplication(scanBasePackages = {
"uk.ac.ucl.rits.inform.datasources.waveform_generator",
"uk.ac.ucl.rits.inform.interchange"
})
@EnableScheduling
public class Application {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package uk.ac.ucl.rits.inform.datasources.waveform_generator;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ucl.rits.inform.interchange.springconfig.EmapRabbitMqRoute;

@Configuration
public class Config {
/**
* Publish synthetic ADT messages to the standard ADT queue.
* @return config bean
*/
@Bean
public EmapRabbitMqRoute getHl7DataSource() {
return new EmapRabbitMqRoute(
EmapRabbitMqRoute.EmapDataSourceQueue.HL7_QUEUE,
EmapRabbitMqRoute.EmapDataSourceExchange.DEFAULT_EXCHANGE
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import uk.ac.ucl.rits.inform.datasources.waveform.LocationMapping;
import uk.ac.ucl.rits.inform.datasources.waveform_generator.patient_model.PatientLocationModel;
import uk.ac.ucl.rits.inform.interchange.EmapOperationMessage;
import uk.ac.ucl.rits.inform.interchange.adt.AdmitPatient;
import uk.ac.ucl.rits.inform.interchange.adt.AdtMessage;
import uk.ac.ucl.rits.inform.interchange.messaging.Publisher;

import javax.annotation.PostConstruct;
import java.io.IOException;
Expand All @@ -30,6 +36,8 @@
public class Hl7Generator {
private final Logger logger = LoggerFactory.getLogger(Hl7Generator.class);

private final Publisher publisher;

@Value("${waveform.synthetic.num_patients:30}")
private int numPatients;

Expand All @@ -54,6 +62,9 @@ public class Hl7Generator {
* Where we are up to in generating data (observation time).
*/
private Instant progressDatetime;

private boolean haveInitialised = false;

/**
* @return Where we want to be up to in generating data (observation time).
* This value is used to generate data at the correct rate.
Expand All @@ -66,6 +77,8 @@ private Instant getExpectedProgressDatetime() {
ChronoUnit.NANOS);
}

private final LocationMapping locationMapping = new LocationMapping();

// system time (not observation time) when we started running
private Long monotonicStartTimeNanos = null;
@Value("${waveform.synthetic.end_datetime:#{null}}")
Expand All @@ -90,15 +103,19 @@ public void setComputedDefaults() {
} else {
progressDatetime = startDatetime;
}
// need to initialise with the fact time so ADT correlates with waveform times
patientLocationModel = new PatientLocationModel(possibleLocations, progressDatetime);
}

private final Hl7TcpClientFactory hl7TcpClientFactory;

/**
* @param hl7TcpClientFactory for sending generated messages
* @param publisher for sending synthetic ADT messages
*/
public Hl7Generator(Hl7TcpClientFactory hl7TcpClientFactory) {
public Hl7Generator(Hl7TcpClientFactory hl7TcpClientFactory, Publisher publisher) {
this.hl7TcpClientFactory = hl7TcpClientFactory;
this.publisher = publisher;
}


Expand All @@ -109,6 +126,13 @@ public Hl7Generator(Hl7TcpClientFactory hl7TcpClientFactory) {
*/
@Scheduled(fixedDelay = 1000)
public void generateMessages() throws IOException {
if (!haveInitialised) {
haveInitialised = true;
List<AdmitPatient> initialAdmits = patientLocationModel.getInitialLocations();
logger.info("First scheduled run, perform initial admits: {} messages", initialAdmits.size());
submitBatch(initialAdmits);
}

var start = Instant.now();
// The warp factor is the main mechanism used to control how much data to put in now.
// Although the scheduling interval and chunk size/count will affect what warp factor is achievable.
Expand Down Expand Up @@ -196,7 +220,6 @@ private String applyHl7Template(long samplingRate, String locationId, Instant ob
parameters.put("obsDatetime", obsDatetime);
parameters.put("messageDatetime", messageDatetimeStr);
parameters.put("messageId", messageId);

StringSubstitutor stringSubstitutor = new StringSubstitutor(parameters);
StringBuilder obrMsg = new StringBuilder(stringSubstitutor.replace(templateStr));
for (int obxI = 0; obxI < valuesByStreamId.size(); obxI++) {
Expand Down Expand Up @@ -278,11 +301,13 @@ private List<String> makeSyntheticWaveformMsgs(final String locationId,
"UCHT03ICURM01", "UCHT03ICURM02", "UCHT03ICURM03", "UCHT03ICURM04", "UCHT03ICURM05", "UCHT03ICURM06",
"UCHT03ICURM07", "UCHT03ICURM08", "UCHT03ICURM09", "UCHT03ICURM10", "UCHT03ICURM32",
"UCHT03ICUBED11", "UCHT03ICUBED12", "UCHT03ICUBED14", "UCHT03ICUBED15", "UCHT03ICUBED16", "UCHT03ICUBED17",
"UCHT03ICUBED18", "UCHT03ICUBED19", "UCHT03ICUBED20", "UCHT03ICUBED21", "UCHT03ICUBED23", "UCHT03ICUBED24",
"UCHT03ICUBED25", "UCHT03ICUBED26", "UCHT03ICUBED27", "UCHT03ICUBED28", "UCHT03ICUBED29", "UCHT03ICUBED30",
"UCHT03ICUBED31", "UCHT03ICUBED32", "UCHT03ICUBED33", "UCHT03ICUBED34", "UCHT03ICUBED35", "UCHT03ICUBED36"
"UCHT03ICUBED18", "UCHT03ICUBED19", "UCHT03ICUBED20", "UCHT03ICUBED21", "UCHT03ICUBED22", "UCHT03ICUBED23",
"UCHT03ICUBED24", "UCHT03ICUBED25", "UCHT03ICUBED26", "UCHT03ICUBED27", "UCHT03ICUBED28", "UCHT03ICUBED29",
"UCHT03ICUBED30", "UCHT03ICUBED31", "UCHT03ICUBED33", "UCHT03ICUBED34", "UCHT03ICUBED35", "UCHT03ICUBED36"
);

private PatientLocationModel patientLocationModel = null;

/**
* Generate synthetic waveform data for numPatients patients to cover a period of
* numMillis milliseconds.
Expand All @@ -299,8 +324,16 @@ public List<String> makeSyntheticWaveformMsgsAllPatients(
new SyntheticStream("52912", 50, 0.3, 5), // airway volume
new SyntheticStream("27", 300, 1.2, 10) // ECG
);
List<AdtMessage> locationChangeMessages = patientLocationModel.makeModifications(startTime);
submitBatch(locationChangeMessages);

List<String> empties = new ArrayList<>();
for (int p = 0; p < numPatients; p++) {
var location = possibleLocations.get(p);
String location = possibleLocations.get(p);
if (patientLocationModel.getPatientForLocation(location) == null) {
empties.add(location);
continue;
}
int sizeBefore = waveformMsgs.size();
// each bed has a slightly different frequency
double frequencyFactor = 0.95 + 0.1 * p / possibleLocations.size();
Expand All @@ -317,12 +350,34 @@ public List<String> makeSyntheticWaveformMsgsAllPatients(
stream.baselineSignalFrequency * frequencyFactor, numMillis, startTime, stream.maxSamplesPerMessage));
}
int sizeAfter = waveformMsgs.size();
logger.debug("Patient {} (location {}), generated {} messages", p, location, sizeAfter - sizeBefore);
logger.debug("Patient {} (location {}), generated {} messages (incl ADT)", p, location, sizeAfter - sizeBefore);
}
logger.info("Not generating data for empty locations: {}", empties);

return waveformMsgs;
}

private void submitBatch(List<? extends AdtMessage> adtMsgs) {
List<ImmutablePair<EmapOperationMessage, String>> batch = new ArrayList<>();
int i = 0;
for (var adt: adtMsgs) {
i++;
batch.add(new ImmutablePair<>(adt, String.format("%s_message_%d",
Instant.now().toEpochMilli(), i)));
}
if (batch.isEmpty()) {
return;
}
try {
String batchId = batch.get(0).getRight();
publisher.submit(batch, batchId, () -> {
logger.info("Successfully submitted batch {} (size {})", batchId, batch.size());
});
} catch (InterruptedException e) {
logger.error("submit interrupted", e);
}
}

record SyntheticStream(String streamId, int samplingRate, double baselineSignalFrequency, int maxSamplesPerMessage) {
}

Expand Down
Loading
Loading