diff --git a/.gitignore b/.gitignore index cc52196b..0d22eaa1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ **/bin **/build **/.DS_Store +**/test.out +**/profile/cpu +**/profile/memory # binaries client/build diff --git a/bringyour/trace.go b/bringyour/trace.go index e5b58526..f7b480f2 100644 --- a/bringyour/trace.go +++ b/bringyour/trace.go @@ -21,7 +21,7 @@ import ( func IsDoneError(r any) bool { isDoneMessage := func(message string) bool { switch message { - case "Done": + case "Done", "Done.": return true // pgx case "context canceled", "timeout: context already done: context canceled": diff --git a/client/client.go b/client/client.go index 20c9b415..4d483f65 100644 --- a/client/client.go +++ b/client/client.go @@ -154,29 +154,35 @@ func encodeUuid(src [16]byte) string { return fmt.Sprintf("%x-%x-%x-%x-%x", src[0:4], src[4:6], src[6:8], src[8:10], src[10:16]) } -type Path struct { - ClientId *Id - StreamId *Id +type TransferPath struct { + SourceId *Id + DestinationId *Id + StreamId *Id } -func NewPath(clientId *Id, streamId *Id) *Path { - return &Path{ - ClientId: clientId, - StreamId: streamId, +func NewTransferPath(sourceId *Id, destinationId *Id, streamId *Id) *TransferPath { + return &TransferPath{ + SourceId: sourceId, + DestinationId: destinationId, + StreamId: streamId, } } -func fromConnectPath(path connect.Path) *Path { - return &Path{ - ClientId: newId(path.ClientId), - StreamId: newId(path.StreamId), +func fromConnect(path connect.TransferPath) *TransferPath { + return &TransferPath{ + SourceId: newId(path.SourceId), + DestinationId: newId(path.DestinationId), + StreamId: newId(path.StreamId), } } -func (self *Path) toConnectPath() connect.Path { - path := connect.Path{} - if self.ClientId != nil { - path.ClientId = connect.Id(self.ClientId.id) +func (self *TransferPath) toConnect() connect.TransferPath { + path := connect.TransferPath{} + if self.SourceId != nil { + path.SourceId = connect.Id(self.SourceId.id) + } + if self.DestinationId != nil { + path.DestinationId = connect.Id(self.DestinationId.id) } if self.StreamId != nil { path.StreamId = connect.Id(self.StreamId.id) diff --git a/client/device.go b/client/device.go index 7de90ffc..18227b9c 100644 --- a/client/device.go +++ b/client/device.go @@ -251,7 +251,7 @@ func (self *BringYourDevice) connectChanged(connectEnabled bool) { } // `ReceivePacketFunction` -func (self *BringYourDevice) receive(source connect.Path, ipProtocol connect.IpProtocol, packet []byte) { +func (self *BringYourDevice) receive(source connect.TransferPath, ipProtocol connect.IpProtocol, packet []byte) { // deviceLog("GOT A PACKET %d", len(packet)) for _, receiveCallback := range self.receiveCallbacks.Get() { receiveCallback(source, ipProtocol, packet) @@ -357,20 +357,20 @@ func (self *BringYourDevice) SetDestination(specs *ProviderSpecList, provideMode connectSpecs = append(connectSpecs, specs.Get(i).toConnectProviderSpec()) } - paths := []connect.Path{} + destinations := []connect.MultiHopId{} for _, connectSpec := range connectSpecs { if connectSpec.ClientId != nil { - paths = append(paths, connect.Path{ClientId: *connectSpec.ClientId}) + destinations = append(destinations, connect.RequireMultiHopId(*connectSpec.ClientId)) } } // connect to a single client // no need to optimize this case, use the simplest user nat client - if DebugUseSingleClientConnect && len(connectSpecs) == len(paths) && len(paths) == 1 { + if DebugUseSingleClientConnect && len(connectSpecs) == len(destinations) && len(destinations) == 1 { self.remoteUserNatClient, returnErr = connect.NewRemoteUserNatClient( self.client, self.receive, - paths, + destinations, protocol.ProvideMode_Network, ) if returnErr != nil { @@ -419,7 +419,7 @@ func (self *BringYourDevice) Shuffle() { func (self *BringYourDevice) SendPacket(packet []byte, n int32) bool { packetCopy := make([]byte, n) copy(packetCopy, packet[0:n]) - source := connect.Path{ClientId: self.clientId} + source := connect.SourceId(self.clientId) self.stateLock.Lock() remoteUserNatClient := self.remoteUserNatClient @@ -445,7 +445,7 @@ func (self *BringYourDevice) SendPacket(packet []byte, n int32) bool { } func (self *BringYourDevice) AddReceivePacket(receivePacket ReceivePacket) Sub { - receive := func(destination connect.Path, ipProtocol connect.IpProtocol, packet []byte) { + receive := func(destination connect.TransferPath, ipProtocol connect.IpProtocol, packet []byte) { receivePacket.ReceivePacket(packet) } callbackId := self.receiveCallbacks.Add(receive) diff --git a/client/gomobile.go b/client/gomobile.go index 0b9b7fd8..bf0c0d63 100644 --- a/client/gomobile.go +++ b/client/gomobile.go @@ -83,13 +83,13 @@ func NewIntList() *IntList { } } -type PathList struct { - exportedList[*Path] +type TransferPathList struct { + exportedList[*TransferPath] } -func NewPathList() *PathList { - return &PathList{ - exportedList: *newExportedList[*Path](), +func NewTransferPathList() *TransferPathList { + return &TransferPathList{ + exportedList: *newExportedList[*TransferPath](), } } diff --git a/connect/connect_test.go b/connect/connect_test.go index e2e1ae26..ab7a4f9b 100644 --- a/connect/connect_test.go +++ b/connect/connect_test.go @@ -29,48 +29,56 @@ import ( func TestConnect(t *testing.T) { bringyour.DefaultTestEnv().Run(func() { + fmt.Printf("[progress]start TestConnect\n") testConnect(t, contractTestNone, false, true) }) } func TestConnectWithSymmetricContracts(t *testing.T) { bringyour.DefaultTestEnv().Run(func() { + fmt.Printf("[progress]start TestConnectWithSymmetricContracts\n") testConnect(t, contractTestSymmetric, false, true) }) } func TestConnectWithAsymmetricContracts(t *testing.T) { bringyour.DefaultTestEnv().Run(func() { + fmt.Printf("[progress]start TestConnectWithAsymmetricContracts\n") testConnect(t, contractTestAsymmetric, false, true) }) } func TestConnectWithChaos(t *testing.T) { bringyour.DefaultTestEnv().Run(func() { + fmt.Printf("[progress]start TestConnectWithChaos\n") testConnect(t, contractTestNone, true, true) }) } func TestConnectWithSymmetricContractsWithChaos(t *testing.T) { bringyour.DefaultTestEnv().Run(func() { + fmt.Printf("[progress]start TestConnectWithSymmetricContractsWithChaos\n") testConnect(t, contractTestSymmetric, true, true) }) } func TestConnectWithAsymmetricContractsWithChaos(t *testing.T) { bringyour.DefaultTestEnv().Run(func() { + fmt.Printf("[progress]start TestConnectWithAsymmetricContractsWithChaos\n") testConnect(t, contractTestAsymmetric, true, true) }) } func TestConnectNoTransportReform(t *testing.T) { bringyour.DefaultTestEnv().Run(func() { + fmt.Printf("[progress]start TestConnectNoTransportReform\n") testConnect(t, contractTestNone, false, false) }) } func TestConnectWithChaosNoTransportReform(t *testing.T) { bringyour.DefaultTestEnv().Run(func() { + fmt.Printf("[progress]start TestConnectWithChaosNoTransportReform\n") testConnect(t, contractTestNone, true, false) }) } @@ -98,7 +106,7 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo os.Setenv("WARP_SERVICE", "test") os.Setenv("WARP_BLOCK", "test") - receiveTimeout := 60 * time.Second + receiveTimeout := 90 * time.Second // larger values test the send queue and receive queue sizes messageContentSizes := []ByteCount{ @@ -119,9 +127,9 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo // if the receive sequence times out, we may get duplicate receives // the timeout should be greater than the write/read and reconnect timeouts in the exchange, // due to chaos - sequenceIdleTimeout = 10 * time.Second + sequenceIdleTimeout = 15 * time.Second } else { - sequenceIdleTimeout = 2 * idleTimeout + sequenceIdleTimeout = 5 * idleTimeout } ctx, cancel := context.WithCancel(context.Background()) @@ -333,25 +341,25 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo // } // } - clientA.AddReceiveCallback(func(sourceId connect.Id, frames []*protocol.Frame, provideMode protocol.ProvideMode) { + clientA.AddReceiveCallback(func(source connect.TransferPath, frames []*protocol.Frame, provideMode protocol.ProvideMode) { // printReceive("a", frames) receiveA <- &Message{ - sourceId: sourceId, + sourceId: source.SourceId, frames: frames, provideMode: provideMode, } }) - clientB.AddReceiveCallback(func(sourceId connect.Id, frames []*protocol.Frame, provideMode protocol.ProvideMode) { + clientB.AddReceiveCallback(func(source connect.TransferPath, frames []*protocol.Frame, provideMode protocol.ProvideMode) { // printReceive("b", frames) receiveB <- &Message{ - sourceId: sourceId, + sourceId: source.SourceId, frames: frames, provideMode: provideMode, } }) - initialTransferBalance := ByteCount(1024 * 1024 * 1024 * 1024) + initialTransferBalance := ByteCount(1024) * ByteCount(1024) * ByteCount(1024) * ByteCount(1024) balanceCodeA, err := model.CreateBalanceCode( ctx, @@ -486,10 +494,10 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo ackA := make(chan error, 1024) ackB := make(chan error, 1024) - for burstSize := 1; burstSize < burstM; burstSize += 1 { + for burstSize := 1; burstSize <= burstM; burstSize += 1 { for b := 0; b < 2; b += 1 { fmt.Printf( - "[%s] burstSize=%d b=%d\n", + "[progress][%s] burstSize=%d b=%d\n", model.ByteCountHumanReadable(messageContentSize), burstSize, b, @@ -539,34 +547,35 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo } } for j := 0; j < nackM; j += 1 { - success := clientA.SendWithTimeout( + _, err := clientA.SendWithTimeoutDetailed( connect.RequireToFrame(&protocol.SimpleMessage{ MessageIndex: uint32(i*nackM + j), MessageCount: uint32(0), Content: messageContent, }), - connect.Id(clientIdB), + connect.DestinationId(connect.Id(clientIdB)), nil, -1, connect.NoAck(), ) - if !success { - panic(errors.New("Could not send.")) + if err != nil && !bringyour.IsDoneError(err) { + panic(fmt.Errorf("Could not send = %v", err)) } } - success := clientA.Send( + _, err := clientA.SendWithTimeoutDetailed( connect.RequireToFrame(&protocol.SimpleMessage{ MessageIndex: uint32(i), MessageCount: uint32(burstSize), Content: messageContent, }), - connect.Id(clientIdB), + connect.DestinationId(connect.Id(clientIdB)), func(err error) { ackA <- err }, + -1, ) - if !success { - panic(errors.New("Could not send.")) + if err != nil && !bringyour.IsDoneError(err) { + panic(fmt.Errorf("Could not send = %v", err)) } } }() @@ -699,40 +708,40 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo opts = append(opts, connect.CompanionContract()) } - success := clientB.SendWithTimeout( + _, err := clientB.SendWithTimeoutDetailed( connect.RequireToFrame(&protocol.SimpleMessage{ MessageIndex: uint32(i*nackM + j), MessageCount: uint32(0), Content: messageContent, }), - connect.Id(clientIdA), + connect.DestinationId(connect.Id(clientIdA)), nil, -1, opts..., ) - if !success { - panic(errors.New("Could not send.")) + if err != nil && !bringyour.IsDoneError(err) { + panic(fmt.Errorf("Could not send = %v", err)) } } opts := []any{} if contractTest == contractTestAsymmetric { opts = append(opts, connect.CompanionContract()) } - success := clientB.SendWithTimeout( + _, err := clientB.SendWithTimeoutDetailed( connect.RequireToFrame(&protocol.SimpleMessage{ MessageIndex: uint32(i), MessageCount: uint32(burstSize), Content: messageContent, }), - connect.Id(clientIdA), + connect.DestinationId(connect.Id(clientIdA)), func(err error) { ackB <- err }, -1, opts..., ) - if !success { - panic(errors.New("Could not send.")) + if err != nil && !bringyour.IsDoneError(err) { + panic(fmt.Errorf("Could not send = %v", err)) } } }() @@ -816,19 +825,19 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo // } // } - resendItemCountA, resendItemByteCountA, sequenceIdA := clientA.ResendQueueSize(connect.Id(clientIdB), false) + resendItemCountA, resendItemByteCountA, sequenceIdA := clientA.ResendQueueSize(connect.DestinationId(connect.Id(clientIdB)), connect.MultiHopId{}, false, false) assert.Equal(t, resendItemCountA, 0) assert.Equal(t, resendItemByteCountA, ByteCount(0)) - resendItemCountB, resentItemByteCountB, sequenceIdB := clientB.ResendQueueSize(connect.Id(clientIdA), false) + resendItemCountB, resentItemByteCountB, sequenceIdB := clientB.ResendQueueSize(connect.DestinationId(connect.Id(clientIdA)), connect.MultiHopId{}, false, false) assert.Equal(t, resendItemCountB, 0) assert.Equal(t, resentItemByteCountB, ByteCount(0)) - receiveItemCountA, receiveItemByteCountA := clientA.ReceiveQueueSize(connect.Id(clientIdB), sequenceIdB) + receiveItemCountA, receiveItemByteCountA := clientA.ReceiveQueueSize(connect.DestinationId(connect.Id(clientIdB)), sequenceIdB) assert.Equal(t, receiveItemCountA, 0) assert.Equal(t, receiveItemByteCountA, ByteCount(0)) - receiveItemCountB, receiveItemByteCountB := clientB.ReceiveQueueSize(connect.Id(clientIdA), sequenceIdA) + receiveItemCountB, receiveItemByteCountB := clientB.ReceiveQueueSize(connect.DestinationId(connect.Id(clientIdA)), sequenceIdA) assert.Equal(t, receiveItemCountB, 0) assert.Equal(t, receiveItemByteCountB, ByteCount(0)) } @@ -836,23 +845,7 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo } select { - case <-time.After(10 * time.Second): - } - - flushedContractIdsA := []bringyour.Id{} - for _, contractId := range clientA.ContractManager().Flush(false) { - flushedContractIdsA = append(flushedContractIdsA, bringyour.Id(contractId)) - } - flushedContractIdsB := []bringyour.Id{} - for _, contractId := range clientB.ContractManager().Flush(false) { - flushedContractIdsB = append(flushedContractIdsB, bringyour.Id(contractId)) - } - - clientA.Flush() - clientB.Flush() - - select { - case <-time.After(4 * time.Second): + case <-time.After(1 * time.Second): } for _, transportA := range transportAs { @@ -879,6 +872,22 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo case <-time.After(1 * time.Second): } + flushedContractIdsA := []bringyour.Id{} + for _, contractId := range clientA.ContractManager().Flush(false) { + flushedContractIdsA = append(flushedContractIdsA, bringyour.Id(contractId)) + } + flushedContractIdsB := []bringyour.Id{} + for _, contractId := range clientB.ContractManager().Flush(false) { + flushedContractIdsB = append(flushedContractIdsB, bringyour.Id(contractId)) + } + + clientA.Flush() + clientB.Flush() + + select { + case <-time.After(1 * time.Second): + } + clientA.Close() clientB.Close() @@ -902,19 +911,23 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo contractIdPartialClosePartiesAToB := model.GetOpenContractIdsWithPartialClose(ctx, clientIdA, clientIdB) contractIdPartialClosePartiesBToA := model.GetOpenContractIdsWithPartialClose(ctx, clientIdB, clientIdA) + // note ContractPartySource is for contracts that were queued up and never used for contractId, party := range contractIdPartialClosePartiesAToB { - if party == model.ContractPartyCheckpoint { + if party == model.ContractPartyCheckpoint || party == model.ContractPartySource { model.CloseContract(ctx, contractId, clientIdB, 0, false) delete(contractIdPartialClosePartiesAToB, contractId) } } for contractId, party := range contractIdPartialClosePartiesBToA { - if party == model.ContractPartyCheckpoint { + if party == model.ContractPartyCheckpoint || party == model.ContractPartySource { model.CloseContract(ctx, contractId, clientIdA, 0, false) delete(contractIdPartialClosePartiesBToA, contractId) } } + assert.Equal(t, len(contractIdPartialClosePartiesAToB), 0) + assert.Equal(t, len(contractIdPartialClosePartiesBToA), 0) + // FIXME what are these other contracts? // for _, party := range contractIdPartialClosePartiesAToB { // assert.Equal(t, model.ContractPartySource, party) @@ -923,22 +936,26 @@ func testConnect(t *testing.T, contractTest int, enableChaos bool, enableTranspo // assert.Equal(t, model.ContractPartySource, party) // } - if e := len(contractIdPartialClosePartiesAToB) - len(flushedContractIdsA); 1 < e { - assert.Equal(t, len(flushedContractIdsA), len(contractIdPartialClosePartiesAToB)) - } - for _, contractId := range flushedContractIdsA { - party, ok := contractIdPartialClosePartiesAToB[contractId] - assert.Equal(t, true, ok) - assert.Equal(t, model.ContractPartySource, party) - } - if e := len(contractIdPartialClosePartiesBToA) - len(flushedContractIdsB); 1 < e { - assert.Equal(t, len(flushedContractIdsB), len(contractIdPartialClosePartiesBToA)) - } - for _, contractId := range flushedContractIdsB { - party, ok := contractIdPartialClosePartiesBToA[contractId] - assert.Equal(t, true, ok) - assert.Equal(t, model.ContractPartySource, party) - } + // SendSequence now flushes pending contracts when closed + assert.Equal(t, len(flushedContractIdsA), 0) + assert.Equal(t, len(flushedContractIdsB), 0) + + // if e := len(contractIdPartialClosePartiesAToB) - len(flushedContractIdsA); 1 < e { + // assert.Equal(t, len(flushedContractIdsA), len(contractIdPartialClosePartiesAToB)) + // } + // for _, contractId := range flushedContractIdsA { + // party, ok := contractIdPartialClosePartiesAToB[contractId] + // assert.Equal(t, true, ok) + // assert.Equal(t, model.ContractPartySource, party) + // } + // if e := len(contractIdPartialClosePartiesBToA) - len(flushedContractIdsB); 1 < e { + // assert.Equal(t, len(flushedContractIdsB), len(contractIdPartialClosePartiesBToA)) + // } + // for _, contractId := range flushedContractIdsB { + // party, ok := contractIdPartialClosePartiesBToA[contractId] + // assert.Equal(t, true, ok) + // assert.Equal(t, model.ContractPartySource, party) + // } localStatsA := clientA.ContractManager().LocalStats() localStatsB := clientB.ContractManager().LocalStats() diff --git a/connect/profile/cpu b/connect/profile/cpu deleted file mode 100644 index cd0faaf5..00000000 Binary files a/connect/profile/cpu and /dev/null differ diff --git a/connect/profile/memory b/connect/profile/memory deleted file mode 100644 index 1c0e0e04..00000000 Binary files a/connect/profile/memory and /dev/null differ diff --git a/connect/profile/race.out.54770 b/connect/profile/race.out.54770 new file mode 100644 index 00000000..34d9d614 --- /dev/null +++ b/connect/profile/race.out.54770 @@ -0,0 +1,56 @@ +================== +WARNING: DATA RACE +Write at 0x00c00105c6d0 by goroutine 23329: + runtime.recvDirect() + /usr/local/go/src/runtime/chan.go:348 +0x7c + bringyour.com/connect.(*SendSequence).Close() + /Users/brien/bringyour/connect/connect/transfer.go:1991 +0x88 + bringyour.com/connect.(*SendBuffer).Pack.func1.1() + /Users/brien/bringyour/connect/connect/transfer.go:1025 +0xd4 + +Previous read at 0x00c00105c6d0 by goroutine 124: + runtime.chansend1() + /usr/local/go/src/runtime/chan.go:146 +0x2c + bringyour.com/connect.(*SendSequence).Ack() + /Users/brien/bringyour/connect/connect/transfer.go:1311 +0x50c + bringyour.com/connect.(*SendBuffer).Ack() + /Users/brien/bringyour/connect/connect/transfer.go:1079 +0xb0 + bringyour.com/connect.(*Client).run.func6() + /Users/brien/bringyour/connect/connect/transfer.go:754 +0xfe4 + bringyour.com/connect.(*Client).run() + /Users/brien/bringyour/connect/connect/transfer.go:766 +0xee8 + bringyour.com/connect.NewClientWithTag.gowrap1() + /Users/brien/bringyour/connect/connect/transfer.go:324 +0x34 + +Goroutine 23329 (running) created at: + bringyour.com/connect.(*SendBuffer).Pack.func1() + /Users/brien/bringyour/connect/connect/transfer.go:1020 +0x3d8 + bringyour.com/connect.(*SendBuffer).Pack() + /Users/brien/bringyour/connect/connect/transfer.go:1055 +0x1e8 + bringyour.com/connect.(*Client).sendWithTimeoutDetailed() + /Users/brien/bringyour/connect/connect/transfer.go:561 +0xe28 + bringyour.com/connect.(*Client).SendWithTimeoutDetailed() + /Users/brien/bringyour/connect/connect/transfer.go:426 +0xd4 + bringyour.com/connect.(*Client).SendWithTimeout() + /Users/brien/bringyour/connect/connect/transfer.go:415 +0x50 + bringyour.com/service/connect.testConnect.func10() + /Users/brien/bringyour/bringyour/connect/connect_test.go:702 +0x600 + +Goroutine 124 (running) created at: + bringyour.com/connect.NewClientWithTag() + /Users/brien/bringyour/connect/connect/transfer.go:324 +0xf04 + bringyour.com/connect.NewClient() + /Users/brien/bringyour/connect/connect/transfer.go:292 +0x6c + bringyour.com/service/connect.testConnect() + /Users/brien/bringyour/bringyour/connect/connect_test.go:242 +0xda8 + bringyour.com/service/connect.TestConnect.func1() + /Users/brien/bringyour/bringyour/connect/connect_test.go:32 +0x38 + bringyour.com/bringyour.(*TestEnv).Run() + /Users/brien/bringyour/bringyour/bringyour/test_util.go:38 +0x4c + bringyour.com/service/connect.TestConnect() + /Users/brien/bringyour/bringyour/connect/connect_test.go:31 +0x68 + testing.tRunner() + /usr/local/go/src/testing/testing.go:1689 +0x180 + testing.(*T).Run.gowrap1() + /usr/local/go/src/testing/testing.go:1742 +0x40 +================== diff --git a/connect/resident.go b/connect/resident.go index 887dbe88..d5455463 100644 --- a/connect/resident.go +++ b/connect/resident.go @@ -559,7 +559,7 @@ func (self *Exchange) handleExchangeConnection(conn net.Conn) { glog.Warning("[ecrr] %s/%s done\n", resident.clientId, resident.residentId) } - multiRouteReader := resident.client.RouteManager().OpenMultiRouteReader(resident.client.ClientId()) + multiRouteReader := resident.client.RouteManager().OpenMultiRouteReader(connect.DestinationId(resident.client.ClientId())) if !slices.Contains(multiRouteReader.GetActiveRoutes(), receive) { glog.Warning("[ecrr] %s/%s missing receive route\n", resident.clientId, resident.residentId) } @@ -1520,9 +1520,9 @@ func (self *Resident) clientForward() { */ // `connect.ForwardFunction` -func (self *Resident) handleClientForward(sourceId_ connect.Id, destinationId_ connect.Id, transferFrameBytes []byte) { - sourceId := bringyour.Id(sourceId_) - destinationId := bringyour.Id(destinationId_) +func (self *Resident) handleClientForward(path connect.TransferPath, transferFrameBytes []byte) { + sourceId := bringyour.Id(path.SourceId) + destinationId := bringyour.Id(path.DestinationId) self.UpdateActivity() @@ -1640,8 +1640,8 @@ func (self *Resident) handleClientForward(sourceId_ connect.Id, destinationId_ c } // `connect.ReceiveFunction` -func (self *Resident) handleClientReceive(sourceId_ connect.Id, frames []*protocol.Frame, provideMode protocol.ProvideMode) { - sourceId := bringyour.Id(sourceId_) +func (self *Resident) handleClientReceive(source connect.TransferPath, frames []*protocol.Frame, provideMode protocol.ProvideMode) { + sourceId := bringyour.Id(source.SourceId) if sourceId != self.clientId { glog.Infof("[rr]abuse not from client (%s<>%s)\n", sourceId, self.clientId) @@ -1675,7 +1675,7 @@ func (self *Resident) AddTransport() ( // in `connect` the transport is bidirectional // in the resident, each transport is a single direction transport := &clientTransport{ - sendTransport: connect.NewSendClientTransport(connect.Id(self.clientId)), + sendTransport: connect.NewSendClientTransport(connect.DestinationId(connect.Id(self.clientId))), receiveTransport: connect.NewReceiveGatewayTransport(), }