Skip to content

Commit ea2f64e

Browse files
committed
groupWithin?
1 parent 8e85ce8 commit ea2f64e

File tree

5 files changed

+66
-7
lines changed

5 files changed

+66
-7
lines changed

modules/service/src/main/scala/lucuma/odb/graphql/mapping/SubscriptionMapping.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package lucuma.odb.graphql
66
package mapping
77

88
import cats.data.Nested
9+
import cats.effect.kernel.Temporal
910
import cats.syntax.all.*
1011
import fs2.Stream
1112
import grackle.Env
@@ -33,10 +34,11 @@ import lucuma.odb.graphql.mapping.ResultMapping.mapSomeFields
3334
import lucuma.odb.graphql.predicate.Predicates
3435
import lucuma.odb.instances.given
3536
import org.tpolecat.typename.TypeName
37+
import scala.concurrent.duration.*
3638

3739
import scala.reflect.ClassTag
3840

39-
trait SubscriptionMapping[F[_]] extends Predicates[F] {
41+
trait SubscriptionMapping[F[_]: Temporal] extends Predicates[F] {
4042

4143
def topics: Topics[F]
4244
def user: User
@@ -90,6 +92,8 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
9092
topics
9193
.executionEvent
9294
.subscribe(1024)
95+
.groupWithin(10, 100.millis)
96+
.flatMap(Stream.chunk(_).changes)
9397
.filter: e =>
9498
e.canRead(user) &&
9599
input.flatMap(_.programId).forall(_ === e.programId) &&
@@ -118,6 +122,8 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
118122
topics
119123
.observation
120124
.subscribe(1024)
125+
.groupWithin(10, 100.millis)
126+
.flatMap(Stream.chunk(_).changes)
121127
.filter { e =>
122128
e.canRead(user) && ((
123129
input.flatMap(_.programId).forall(_ === e.programId) &&

modules/service/src/main/scala/lucuma/odb/graphql/topic/ExecutionEventAddedTopic.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package lucuma.odb.graphql.topic
55

6+
import cats.derived.*
67
import cats.effect.Concurrent
78
import cats.effect.std.Supervisor
89
import cats.syntax.apply.*
@@ -27,7 +28,7 @@ object ExecutionEventAddedTopic:
2728
visitId: Visit.Id,
2829
eventType: ExecutionEventType,
2930
users: List[User.Id]
30-
) extends TopicElement
31+
) extends TopicElement derives cats.Eq
3132

3233
private val topic =
3334
OdbTopic.define[(ExecutionEvent.Id, Program.Id, Observation.Id, Visit.Id, ExecutionEventType), Element](

modules/service/src/main/scala/lucuma/odb/graphql/topic/ObservationTopic.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
package lucuma.odb.graphql.topic
55

6+
import cats.Eq
7+
import cats.derived.*
68
import cats.effect.*
79
import cats.effect.std.Supervisor
810
import cats.implicits.*
@@ -29,7 +31,7 @@ object ObservationTopic:
2931
programId: Program.Id,
3032
editType: EditType,
3133
users: List[User.Id]
32-
) extends TopicElement
34+
) extends TopicElement derives Eq
3335

3436
private val topic =
3537
OdbTopic.define[(Observation.Id, Program.Id, EditType), Element](

modules/service/src/main/scala/lucuma/odb/graphql/topic/OdbTopic.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ object OdbTopic:
3737
): F[List[User.Id]] =
3838
s.execute(
3939
sql"""
40-
select c_user_id from t_program_user where c_program_id = '#${pid.toString}' and c_user_id notnull
40+
select c_user_id from t_program_user where c_program_id = '#${pid.toString}' and c_user_id notnull order by c_user_id
4141
""".query(user_id)
4242
)
4343

modules/service/src/test/scala/lucuma/odb/graphql/subscription/observationEditSn.scala

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import lucuma.odb.graphql.query.ExecutionTestSupport
1414
import lucuma.odb.graphql.query.ObservingModeSetupOperations
1515

1616
class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOperations with SubscriptionUtils:
17+
val AcquisitionTotal: Long = 219362500l
18+
val ScienceTotal: Long = 784200000l
1719

1820
def subscriptionQuery(pid: Program.Id) =
1921
s"""
@@ -23,7 +25,26 @@ class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOper
2325
editType
2426
value {
2527
id
28+
itc {
29+
science {
30+
selected {
31+
exposureCount
32+
}
33+
}
34+
}
2635
execution {
36+
digest {
37+
acquisition {
38+
timeEstimate {
39+
total { microseconds }
40+
}
41+
}
42+
science {
43+
timeEstimate {
44+
total { microseconds }
45+
}
46+
}
47+
}
2748
config {
2849
gmosNorth {
2950
science {
@@ -38,6 +59,8 @@ class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOper
3859
}
3960
}
4061
"""
62+
/*
63+
*/
4164

4265
def subscriptionResponse(oid: Observation.Id): Json =
4366
Json.obj(
@@ -46,7 +69,30 @@ class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOper
4669
"editType" -> Json.fromString(EditType.Updated.tag.toUpperCase),
4770
"value" -> Json.obj(
4871
"id" -> oid.asJson,
72+
"itc" -> Json.obj(
73+
"science" -> Json.obj(
74+
"selected" -> Json.obj(
75+
"exposureCount" -> 6.asJson
76+
)
77+
)
78+
),
4979
"execution" -> Json.obj(
80+
"digest" -> Json.obj(
81+
"acquisition" -> Json.obj(
82+
"timeEstimate" -> Json.obj(
83+
"total" -> Json.obj(
84+
"microseconds" -> AcquisitionTotal.asJson
85+
)
86+
)
87+
),
88+
"science" -> Json.obj(
89+
"timeEstimate" -> Json.obj(
90+
"total" -> Json.obj(
91+
"microseconds" -> ScienceTotal.asJson
92+
)
93+
)
94+
)
95+
),
5096
"config" -> Json.obj(
5197
"gmosNorth" -> Json.obj(
5298
"science" -> Json.obj(
@@ -72,12 +118,18 @@ class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOper
72118
SET: {
73119
scienceRequirements: {
74120
spectroscopy: {
121+
wavelength: { micrometers: 0.700000 }
122+
resolution: 1000
75123
exposureTimeMode: {
76124
signalToNoise: {
77125
value: 99
78126
at: { nanometers: 500 }
79127
}
80128
}
129+
wavelengthCoverage: { micrometers: 0.400000 }
130+
focalPlane: null
131+
focalPlaneAngle: null
132+
capability: null
81133
}
82134
}
83135
},
@@ -107,13 +159,11 @@ class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOper
107159
tid <- createTargetWithProfileAs(pi, pid)
108160
oid <- createGmosNorthLongSlitObservationAs(pi, pid, List(tid))
109161
_ <- generateOrFail(pid, oid)
110-
// expect two responses, one from editing the S/N, one because our query
111-
// requests the sequence which requires a cache update
112162
_ <- subscriptionExpect(
113163
user = pi,
114164
query = subscriptionQuery(pid),
115165
mutations = Right(sleep >> updateSn(oid)),
116-
expected = List(subscriptionResponse(oid), subscriptionResponse(oid))
166+
expected = List(subscriptionResponse(oid))
117167
)
118168
yield ()
119169

0 commit comments

Comments
 (0)