Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package lucuma.odb.graphql

package mapping

import cats.Eq
import cats.data.Nested
import cats.effect.kernel.Temporal
import cats.syntax.all.*
import fs2.concurrent.Topic
import fs2.Stream
import grackle.Env
import grackle.Query
Expand Down Expand Up @@ -34,9 +37,13 @@ import lucuma.odb.graphql.predicate.Predicates
import lucuma.odb.instances.given
import org.tpolecat.typename.TypeName

import scala.concurrent.duration.*
import scala.reflect.ClassTag

trait SubscriptionMapping[F[_]] extends Predicates[F] {
import SubscriptionMapping.groupingSubscribe

trait SubscriptionMapping[F[_]: Temporal] extends Predicates[F] {


def topics: Topics[F]
def user: User
Expand Down Expand Up @@ -89,7 +96,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
SubscriptionField("executionEventAdded", ExecutionEventAddedInput.Binding.Option): (input, child) =>
topics
.executionEvent
.subscribe(1024)
.groupingSubscribe()
.filter: e =>
e.canRead(user) &&
input.flatMap(_.programId).forall(_ === e.programId) &&
Expand All @@ -103,7 +110,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
SubscriptionField("programEdit", ProgramEditInput.Binding.Option) { (input, child) =>
topics
.program
.subscribe(1024)
.groupingSubscribe()
.filter(e => e.canRead(user) && input.flatMap(_.programId).forall(_ === e.programId))
.map(e => Result(
Environment(
Expand All @@ -117,7 +124,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
SubscriptionField("observationEdit", ObservationEditInput.Binding.Option) { (input, child) =>
topics
.observation
.subscribe(1024)
.groupingSubscribe()
.filter { e =>
e.canRead(user) && ((
input.flatMap(_.programId).forall(_ === e.programId) &&
Expand Down Expand Up @@ -154,7 +161,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
SubscriptionField("configurationRequestEdit", ConfigurationRequestEditInput.Binding.Option) { (input, child) =>
topics
.configurationRequest
.subscribe(1024)
.groupingSubscribe()
.filter { e =>
e.canRead(user) && ((
input.flatMap(_.programId).forall(_ === e.programId)
Expand Down Expand Up @@ -190,7 +197,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
SubscriptionField("targetEdit", TargetEditInput.Binding.Option) { (input, child) =>
topics
.target
.subscribe(1024)
.groupingSubscribe()
.filter { e =>
e.canRead(user) &&
input.flatMap(_.programId).forall(_ === e.programId) &&
Expand Down Expand Up @@ -226,7 +233,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
SubscriptionField("groupEdit", GroupEditInput.Binding.Option) { (input, child) =>
topics
.group
.subscribe(1024)
.groupingSubscribe()
.filter { e =>
e.canRead(user) &&
input.flatMap(_.programId).forall(_ === e.programId) &&
Expand Down Expand Up @@ -269,3 +276,18 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {

}

object SubscriptionMapping:
val MaxQueued: Int = 1024
val GroupingChunkSize: Int = 10
val GroupingTimeout: FiniteDuration = 100.millis

extension [F[_]: Temporal, E: Eq](topic: Topic[F, E])
def groupingSubscribe(
maxQueued: Int = MaxQueued,
chunkSize: Int = GroupingChunkSize,
timeout: FiniteDuration = GroupingTimeout
): Stream[F, E] =
topic
.subscribe(maxQueued)
.groupWithin(chunkSize, timeout)
.flatMap(Stream.chunk(_).changes)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not be a great idea after all. By the time the event is fired, the expected subscription query can produce different results than it would have if it had been generated immediately. For example, when a new group is created without specifying a parent index, it defaults to index 0. It may be subsequently moved. Then the group creation event fires after the GroupingTimeout expires. At that point, the index is no longer 0. So you can see a CREATED event with parent index 1 followed by an UPDATED event also at 1 when you expected CREATED at 0 followed up UPDATED showing 1.

{
  "groupEdit" : {
    "editType" : "CREATED",
    "value" : {
      "id" : "g-10c",
      "parentId" : "g-10b",
      "parentIndex" : 1
    }
  }
},
{
  "groupEdit" : {
    "editType" : "UPDATED",
    "value" : {
      "id" : "g-10c",
      "parentId" : "g-10b",
      "parentIndex" : 1
    }
  }
}

I don't know whether this is a problem for production, but it definitely makes tests unpredictable.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package lucuma.odb.graphql.topic

import cats.Eq
import cats.derived.*
import cats.effect.*
import cats.effect.std.Supervisor
import cats.implicits.*
Expand All @@ -23,7 +25,7 @@ object ConfigurationRequestTopic:
programId: Program.Id,
editType: EditType,
users: List[User.Id]
) extends TopicElement
) extends TopicElement derives Eq

private val topic =
OdbTopic.define[(ConfigurationRequest.Id, Program.Id, EditType), Element](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package lucuma.odb.graphql.topic

import cats.derived.*
import cats.effect.Concurrent
import cats.effect.std.Supervisor
import cats.syntax.apply.*
Expand All @@ -27,7 +28,7 @@ object ExecutionEventAddedTopic:
visitId: Visit.Id,
eventType: ExecutionEventType,
users: List[User.Id]
) extends TopicElement
) extends TopicElement derives cats.Eq

private val topic =
OdbTopic.define[(ExecutionEvent.Id, Program.Id, Observation.Id, Visit.Id, ExecutionEventType), Element](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package lucuma.odb.graphql.topic

import cats.Eq
import cats.derived.*
import cats.effect.*
import cats.effect.std.Supervisor
import cats.implicits.*
Expand Down Expand Up @@ -30,7 +32,7 @@ object GroupTopic:
programId: Program.Id,
editType: EditType,
users: List[User.Id]
) extends TopicElement
) extends TopicElement derives Eq

private val topic =
OdbTopic.define[(Option[Group.Id], Program.Id, EditType), Element](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package lucuma.odb.graphql.topic

import cats.Eq
import cats.derived.*
import cats.effect.*
import cats.effect.std.Supervisor
import cats.implicits.*
Expand All @@ -29,7 +31,7 @@ object ObservationTopic:
programId: Program.Id,
editType: EditType,
users: List[User.Id]
) extends TopicElement
) extends TopicElement derives Eq

private val topic =
OdbTopic.define[(Observation.Id, Program.Id, EditType), Element](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object OdbTopic:
): F[List[User.Id]] =
s.execute(
sql"""
select c_user_id from t_program_user where c_program_id = '#${pid.toString}' and c_user_id notnull
select c_user_id from t_program_user where c_program_id = '#${pid.toString}' and c_user_id notnull order by c_user_id
""".query(user_id)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package lucuma.odb.graphql.topic

import cats.Eq
import cats.derived.*
import cats.effect.*
import cats.effect.std.Supervisor
import cats.implicits.*
Expand All @@ -24,8 +26,8 @@ object ProgramTopic:
case class Element(
programId: Program.Id,
editType: EditType,
users: List[User.Id],
) extends TopicElement
users: List[User.Id]
) extends TopicElement derives Eq

private val topic =
OdbTopic.define[(Program.Id, EditType), Element](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package lucuma.odb.graphql.topic

import cats.Eq
import cats.derived.*
import cats.effect.*
import cats.effect.std.Supervisor
import cats.implicits.*
Expand Down Expand Up @@ -30,7 +32,7 @@ object TargetTopic:
programId: Program.Id,
editType: EditType,
users: List[User.Id]
) extends TopicElement
) extends TopicElement derives Eq

private val topic =
OdbTopic.define[(Target.Id, Program.Id, EditType), Element](
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA)
// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause

package lucuma.odb.graphql.subscription

import cats.effect.IO
import cats.syntax.show.*
import io.circe.Json
import io.circe.syntax.*
import lucuma.core.model.Observation
import lucuma.core.model.Program
import lucuma.odb.data.EditType
import lucuma.odb.graphql.query.ExecutionTestSupport
import lucuma.odb.graphql.query.ObservingModeSetupOperations

class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOperations with SubscriptionUtils:
val AcquisitionTotal: Long = 219362500l
val ScienceTotal: Long = 784200000l

def subscriptionQuery(pid: Program.Id) =
s"""
subscription {
observationEdit(input: { programId: "${pid.show}" }) {
observationId
editType
value {
id
itc {
science {
selected {
exposureCount
}
}
}
execution {
digest {
acquisition {
timeEstimate {
total { microseconds }
}
}
science {
timeEstimate {
total { microseconds }
}
}
}
config {
gmosNorth {
science {
nextAtom {
observeClass
}
}
}
}
}
}
}
}
"""

def subscriptionResponse(oid: Observation.Id): Json =
Json.obj(
"observationEdit" -> Json.obj(
"observationId" -> Json.fromString(oid.show),
"editType" -> Json.fromString(EditType.Updated.tag.toUpperCase),
"value" -> Json.obj(
"id" -> oid.asJson,
"itc" -> Json.obj(
"science" -> Json.obj(
"selected" -> Json.obj(
"exposureCount" -> 6.asJson
)
)
),
"execution" -> Json.obj(
"digest" -> Json.obj(
"acquisition" -> Json.obj(
"timeEstimate" -> Json.obj(
"total" -> Json.obj(
"microseconds" -> AcquisitionTotal.asJson
)
)
),
"science" -> Json.obj(
"timeEstimate" -> Json.obj(
"total" -> Json.obj(
"microseconds" -> ScienceTotal.asJson
)
)
)
),
"config" -> Json.obj(
"gmosNorth" -> Json.obj(
"science" -> Json.obj(
"nextAtom" -> Json.obj(
"observeClass" -> "SCIENCE".asJson
)
)
)
)
)
)
)
)

def updateSn(
oid: Observation.Id
): IO[Unit] =
query(
user = pi,
query = s"""
mutation {
updateObservations(input: {
SET: {
scienceRequirements: {
spectroscopy: {
wavelength: { micrometers: 0.700000 }
resolution: 1000
exposureTimeMode: {
signalToNoise: {
value: 99
at: { nanometers: 500 }
}
}
wavelengthCoverage: { micrometers: 0.400000 }
focalPlane: null
focalPlaneAngle: null
capability: null
}
}
},
WHERE: {
id: { EQ: "$oid" }
}
}) {
observations {
scienceRequirements {
spectroscopy {
exposureTimeMode {
signalToNoise {
value
}
}
}
}
}
}
}
"""
).void

test("triggers for editing s/n"):
for
pid <- createProgram(pi, "foo")
tid <- createTargetWithProfileAs(pi, pid)
oid <- createGmosNorthLongSlitObservationAs(pi, pid, List(tid))
_ <- generateOrFail(pid, oid)
_ <- subscriptionExpect(
user = pi,
query = subscriptionQuery(pid),
mutations = Right(sleep >> updateSn(oid)),
expected = List(subscriptionResponse(oid), subscriptionResponse(oid))
)
yield ()
Loading