diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 373f3363..13c0e32d 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -1,9 +1,13 @@ package csapi_tests import ( + "encoding/json" "fmt" "net/http" "net/url" + "slices" + "strconv" + "strings" "testing" "github.com/tidwall/gjson" @@ -15,6 +19,7 @@ import ( "github.com/matrix-org/complement/match" "github.com/matrix-org/complement/must" "github.com/matrix-org/complement/runtime" + "github.com/matrix-org/gomatrixserverlib/spec" ) // sytest: POST /rooms/:room_id/send/:event_type sends a message @@ -220,3 +225,413 @@ func TestRoomMessagesLazyLoadingLocalUser(t *testing.T) { }, }) } + +// MessageDraft is a message that has not been sent yet: the text body paired with +// the client that will send it. +type MessageDraft struct { + Sender *client.CSAPI + Message string +} + +// EventInfo records the draft a sent event was created from together with the +// event ID that was returned when it was sent. +type EventInfo struct { + MessageDraft MessageDraft + EventID string +} + +// MessagesTestCase parameterizes one `/messages` history scenario: the sub-test +// name, how many filler messages are sent into the room, and the `limit` used for +// each `/messages` request. +type MessagesTestCase struct { + name string + numberOfMessagesToSend int + messagesRequestLimit int +} + +// TestMessagesOverFederation registers alice on hs1 and bob on hs2 and checks that +// bob can paginate the room's full message history with `/messages`, both after +// joining the room for the first time and after leaving and re-joining it. Both +// sub-tests are skipped on Dendrite. +func TestMessagesOverFederation(t *testing.T) { + deployment := complement.Deploy(t, 2) + defer deployment.Destroy(t) + + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{ + LocalpartSuffix: "alice", + }) + bob := deployment.Register(t, "hs2", helpers.RegistrationOpts{ + LocalpartSuffix: "bob", + }) + + // Test to make sure all of the messages sent in the room are visible to someone else + // who joins the room later on. + t.Run("Visible shared history after joining new room (backfill)", func(t *testing.T) { + // FIXME: Dendrite doesn't handle backfill here for whatever reason + runtime.SkipIf(t, runtime.Dendrite) + + // Some homeservers have different hard-limits for `/messages?limit=xxx` requests + // (Synapse's `MAX_LIMIT` is 1000) so we test a few different variations. + for _, testCase := range []MessagesTestCase{ + // Test where the `/messages?limit=xxx` is less than or equal to the number of messages the + // homeserver tries to backfill before responding to the `/messages` request. 
+ // Because the Matrix spec default `limit` is 10, we can assume that this is lower + // than the number of messages that *any* homeserver will try to backfill before + // responding. + { + name: "`messagesRequestLimit` is lower than the number of messages backfilled (assumed)", + // We send more messages than fit in one request + numberOfMessagesToSend: 20, + // This is the default limit in the Matrix spec so it's bound to be lower than + // the number of messages that are backfilled. + messagesRequestLimit: 10, + }, + // Test where the `/messages?limit=xxx` is greater than the number of messages + // Synapse tries to backfill (100) before responding to the `/messages` request. + { + name: "`messagesRequestLimit` is greater than the number of messages backfilled (in Synapse, 100)", + // We send more messages than fit in one request + numberOfMessagesToSend: 300, + // We request more messages than Synapse tries to backfill at once (which is 100) + messagesRequestLimit: 200, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + // Alice creates the room + roomID := alice.MustCreateRoom(t, map[string]interface{}{ + // The `public_chat` preset includes `history_visibility: "shared"` ("Previous + // events are always accessible to newly joined members. All events in the + // room are accessible, even those sent when the member was not a part of the + // room."), which is what we want to test. + "preset": "public_chat", + }) + + // Send messages and make sure we can see them in `/messages` + _sendAndTestMessageHistory( + t, + roomID, + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + alice, + bob, + testCase, + ) + }) + } + }) + + // Test to make sure all of the messages sent in the room are visible to someone else + // who *re-joins* the room. 
+ t.Run("Visible shared history after re-joining room (backfill)", func(t *testing.T) { + // FIXME: Dendrite doesn't handle backfill well on re-join yet + runtime.SkipIf(t, runtime.Dendrite) + + // Some homeservers have different hard-limits for `/messages?limit=xxx` requests + // (Synapse's `MAX_LIMIT` is 1000) so we test a few different variations. + for _, testCase := range []MessagesTestCase{ + // Test where the `/messages?limit=xxx` is less than or equal to the number of messages the + // homeserver tries to backfill before responding to the `/messages` request. + // Because the Matrix spec default `limit` is 10, we can assume that this is lower + // than the number of messages that *any* homeserver will try to backfill before + // responding. + { + name: "`messagesRequestLimit` is lower than the number of messages backfilled (assumed)", + // We send more messages than fit in one request + numberOfMessagesToSend: 20, + // This is the default limit in the Matrix spec so it's bound to be lower than + // the number of messages that are backfilled. + messagesRequestLimit: 10, + }, + // Test where the `/messages?limit=xxx` is greater than the number of messages + // Synapse tries to backfill (100) before responding to the `/messages` request. + // + // FIXME: This test currently doesn't work because the homeserver will backfill + // the `limit=100` and return those 100 new events + all of the old history + // leaving an invisible gap in between. So the events in the response include the + // 100 new events, [gap], the old history from when you were previously joined. + // This is the type of scenario that MSC3871 (Gappy timelines) is trying to + // address. This is a hole in the spec as there is no way for a homeserver to + // indicate gaps to the client so they can paginate the gap and cause the + // homeserver to backfill more. 
+ // + // { + // name: "`messagesRequestLimit` is greater than the number of messages backfilled (in Synapse, 100)", + // // We send more messages than fit in one request + // numberOfMessagesToSend: 300, + // // We request more messages than Synapse tries to backfill at once (which is 100) + // messagesRequestLimit: 200, + // }, + } { + t.Run(testCase.name, func(t *testing.T) { + // Start a sync loop + _, aliceSince := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"}) + + // Alice creates the room + roomID := alice.MustCreateRoom(t, map[string]interface{}{ + // The `public_chat` preset includes `history_visibility: "shared"` ("Previous + // events are always accessible to newly joined members. All events in the + // room are accessible, even those sent when the member was not a part of the + // room."), which is what we want to test. + "preset": "public_chat", + }) + + // Bob joins the room + bob.MustJoinRoom(t, roomID, []spec.ServerName{ + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + }) + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(bob.UserID, roomID)) + + // Bob leaves the room + bob.MustLeaveRoom(t, roomID) + // Make sure the leave has federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncLeftFrom(bob.UserID, roomID)) + + // Send messages and make sure we can see them in `/messages` (Bob joins + // again inside the helper, after the messages have been sent) + _sendAndTestMessageHistory( + t, + roomID, + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + alice, + bob, + testCase, + ) + }) + } + }) +} + +// 1. Alice sends a bunch of messages into the room +// 2. Bob joins the room +// 3. Bob paginates backwards through the room history until he reaches the start of the room +// 4. 
Assert that Bob sees all of the messages that Alice sent in the correct order +func _sendAndTestMessageHistory( + t *testing.T, + roomID string, + serverToJoinVia spec.ServerName, + alice, bob *client.CSAPI, + testCase MessagesTestCase, +) { + // Keep track of the order + eventIDs := make([]string, 0) + // Map from event_id to event info + eventMap := make(map[string]EventInfo) + + messageDrafts := make([]MessageDraft, 0, testCase.numberOfMessagesToSend) + for i := 0; i < testCase.numberOfMessagesToSend; i++ { + messageDrafts = append(messageDrafts, MessageDraft{alice, fmt.Sprintf("Filler message %d to increase history size.", i+1)}) + } + sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) + + // Bob joins the room + bob.MustJoinRoom(t, roomID, []spec.ServerName{ + serverToJoinVia, + }) + + // Make it easy to cross-reference the events being talked about in the logs + for eventIndex, eventID := range eventIDs { + t.Logf("Message %d -> event_id=%s", eventIndex, eventID) + } + + // Keep paginating backwards until we reach the start of the room + reverseChronologicalActualEventIDs := make( + []string, + 0, + // This is a minimum capacity (there will be more events) + testCase.numberOfMessagesToSend, + ) + fromToken := "" + for { + messageQueryParams := url.Values{ + "dir": []string{"b"}, + "limit": []string{strconv.Itoa(testCase.messagesRequestLimit)}, + } + if fromToken != "" { + messageQueryParams.Set("from", fromToken) + } + + messagesRes := bob.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, + client.WithContentType("application/json"), + client.WithQueries(messageQueryParams), + ) + messagesResBody := client.ParseJSON(t, messagesRes) + actualEventIDsFromRequest := extractEventIDsFromMessagesResponse(t, messagesResBody) + reverseChronologicalActualEventIDs = append(reverseChronologicalActualEventIDs, actualEventIDsFromRequest...) 
+ + // Make it easy to understand what each `/messages` request returned + relevantActualEventIDsFromRequest := filterEventIDs(t, actualEventIDsFromRequest, eventIDs) + firstEventIndex := -1 + lastEventIndex := -1 + if len(relevantActualEventIDsFromRequest) > 0 { + firstEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[0]) + lastEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[len(relevantActualEventIDsFromRequest)-1]) + } + t.Logf("Fetched %d events from the `/messages` endpoint that included events %d to %d", + len(actualEventIDsFromRequest), + firstEventIndex, lastEventIndex, + ) + + endTokenRes := gjson.GetBytes(messagesResBody, "end") + // "`end`: If no further events are available (either because we have reached the + // start of the timeline, or because the user does not have permission to see + // any more events), this property is omitted from the response." (Matrix spec) + if !endTokenRes.Exists() { + break + } + fromToken = endTokenRes.Str + + // Or if we don't see any more events, we will assume that we reached the + // start of the room. No more to paginate. 
+ if len(actualEventIDsFromRequest) == 0 { + break + } + } + + // Put them in chronological order to match the expected list + chronologicalActualEventIds := slices.Clone(reverseChronologicalActualEventIDs) + slices.Reverse(chronologicalActualEventIds) + + // Assert timeline order + assertEventsInOrder(t, chronologicalActualEventIds, eventIDs) +} + +func sendMessageDrafts( + t *testing.T, + roomID string, + messageDrafts []MessageDraft, +) []string { + t.Helper() + + eventIDs := make([]string, len(messageDrafts)) + for messageDraftIndex, messageDraft := range messageDrafts { + eventID := messageDraft.Sender.SendEventSynced(t, roomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": messageDraft.Message, + }, + }) + eventIDs[messageDraftIndex] = eventID + } + + return eventIDs +} + +// sendAndTrackMessages sends the given message drafts to the room, keeping track of the +// new events in the list of `eventIDs` and `eventMap`. Returns the list of new event +// IDs that were sent. +func sendAndTrackMessages( + t *testing.T, + roomID string, + messageDrafts []MessageDraft, + eventIDs *[]string, + eventMap *map[string]EventInfo, +) []string { + t.Helper() + + newEventIDs := sendMessageDrafts(t, roomID, messageDrafts) + + *eventIDs = append(*eventIDs, newEventIDs...) + for i, eventID := range newEventIDs { + (*eventMap)[eventID] = EventInfo{ + MessageDraft: messageDrafts[i], + EventID: eventID, + } + } + + return newEventIDs +} + +// extractEventIDsFromMessagesResponse extracts the event IDs from the given +// `/messages` response body. 
+func extractEventIDsFromMessagesResponse( + t *testing.T, + messagesResBody json.RawMessage, +) []string { + t.Helper() + + wantKey := "chunk" + keyRes := gjson.GetBytes(messagesResBody, wantKey) + if !keyRes.Exists() { + t.Fatalf("extractEventIDsFromMessagesResponse: missing key '%s'", wantKey) + } + if !keyRes.IsArray() { + t.Fatalf("extractEventIDsFromMessagesResponse: key '%s' is not an array (was %s)", wantKey, keyRes.Type) + } + + var eventIDs []string + actualEvents := keyRes.Array() + for _, event := range actualEvents { + eventIDs = append(eventIDs, event.Get("event_id").Str) + } + + return eventIDs +} + +func filterEventIDs(t *testing.T, actualEventIDs []string, expectedEventIDs []string) []string { + t.Helper() + + relevantActualEventIDs := make([]string, 0, len(expectedEventIDs)) + for _, eventID := range actualEventIDs { + if slices.Contains(expectedEventIDs, eventID) { + relevantActualEventIDs = append(relevantActualEventIDs, eventID) + } + } + + return relevantActualEventIDs +} + +// assertEventsInOrder asserts all `actualEventIDs` are present and in order according +// to `expectedEventIDs`. Other unrelated events can be in between. 
+func assertEventsInOrder(t *testing.T, actualEventIDs []string, expectedEventIDs []string) { + t.Helper() + + relevantActualEventIDs := filterEventIDs(t, actualEventIDs, expectedEventIDs) + + if len(relevantActualEventIDs) != len(expectedEventIDs) { + t.Fatalf("expected %d events in timeline (got %d relevant events filtered down from %d events)\n%s", + len(expectedEventIDs), len(relevantActualEventIDs), len(actualEventIDs), + generateEventOrderDiffString(relevantActualEventIDs, expectedEventIDs), + ) + } + + for i, eventID := range relevantActualEventIDs { + if eventID != expectedEventIDs[i] { + t.Fatalf("expected event ID %s (got %s) at index %d\n%s", + expectedEventIDs[i], eventID, i, generateEventOrderDiffString(relevantActualEventIDs, expectedEventIDs), + ) + } + } +} + +func generateEventOrderDiffString(actualEventIDs []string, expectedEventIDs []string) string { + expectedLines := make([]string, len(expectedEventIDs)) + for i, expectedEventID := range expectedEventIDs { + isExpectedInActual := slices.Contains(actualEventIDs, expectedEventID) + isMissingIndicatorString := " " + if !isExpectedInActual { + isMissingIndicatorString = "?" 
+ } + + expectedLines[i] = fmt.Sprintf("%2d: %s %s", i, isMissingIndicatorString, expectedEventID) + } + expectedDiffString := strings.Join(expectedLines, "\n") + + actualLines := make([]string, len(actualEventIDs)) + for actualEventIndex, actualEventID := range actualEventIDs { + isActualInExpected := slices.Contains(expectedEventIDs, actualEventID) + isActualInExpectedIndicatorString := " " + if isActualInExpected { + isActualInExpectedIndicatorString = "+" + } + + expectedIndex := slices.Index(expectedEventIDs, actualEventID) + expectedIndexString := "" + if actualEventIndex != expectedIndex { + expectedDirectionString := "⬆️" + if expectedIndex > actualEventIndex { + expectedDirectionString = "⬇️" + } + + expectedIndexString = fmt.Sprintf(" (expected index %d %s)", expectedIndex, expectedDirectionString) + } + + actualLines[actualEventIndex] = fmt.Sprintf("%2d: %s %s%s", + actualEventIndex, isActualInExpectedIndicatorString, actualEventID, expectedIndexString, + ) + } + actualDiffString := strings.Join(actualLines, "\n") + + return fmt.Sprintf( + "Actual events ('+' = found expected items):\n%s\nExpected events ('?' = missing expected items):\n%s", + actualDiffString, + expectedDiffString, + ) +}