11'use strict' ;
2+ const Denque = require ( 'denque' ) ;
23const EventEmitter = require ( 'events' ) ;
34const ServerDescription = require ( './server_description' ) . ServerDescription ;
45const ServerType = require ( './common' ) . ServerType ;
@@ -18,6 +19,7 @@ const isNodeShuttingDownError = require('../error').isNodeShuttingDownError;
1819const maxWireVersion = require ( '../utils' ) . maxWireVersion ;
1920const ClientSession = require ( '../sessions' ) . ClientSession ;
2021const MongoError = require ( '../error' ) . MongoError ;
22+ const MongoServerSelectionError = require ( '../error' ) . MongoServerSelectionError ;
2123const resolveClusterTime = require ( '../topologies/shared' ) . resolveClusterTime ;
2224const SrvPoller = require ( './srv_polling' ) . SrvPoller ;
2325const getMMAPError = require ( '../topologies/shared' ) . getMMAPError ;
@@ -35,7 +37,7 @@ const clearAndRemoveTimerFrom = common.clearAndRemoveTimerFrom;
3537const serverSelection = require ( './server_selection' ) ;
3638const readPreferenceServerSelector = serverSelection . readPreferenceServerSelector ;
3739const writableServerSelector = serverSelection . writableServerSelector ;
38- const selectServers = serverSelection . selectServers ;
40+ // const selectServers = serverSelection.selectServers;
3941
4042// Global state
4143let globalTopologyCounter = 0 ;
@@ -74,6 +76,9 @@ const DEPRECATED_OPTIONS = new Set([
7476 'bufferMaxEntries'
7577] ) ;
7678
79+ const kCancelled = Symbol ( 'cancelled' ) ;
80+ const kWaitQueue = Symbol ( 'waitQueue' ) ;
81+
7782/**
7883 * A container of server instances representing a connection to a MongoDB topology.
7984 *
@@ -140,6 +145,7 @@ class Topology extends EventEmitter {
140145 return result ;
141146 } , new Map ( ) ) ;
142147
148+ this [ kWaitQueue ] = new Denque ( ) ;
143149 this . s = {
144150 // the id of this topology
145151 id : topologyId ,
@@ -195,7 +201,6 @@ class Topology extends EventEmitter {
195201 clusterTime : null ,
196202
197203 // timer management
198- iterationTimers : new Set ( ) ,
199204 connectionTimers : new Set ( )
200205 } ;
201206
@@ -335,8 +340,7 @@ class Topology extends EventEmitter {
335340 return ;
336341 }
337342
338- // clear all existing monitor timers
339- drainTimerQueue ( this . s . iterationTimers ) ;
343+ drainWaitQueue ( this [ kWaitQueue ] , new MongoError ( 'Topology closed' ) ) ;
340344 drainTimerQueue ( this . s . connectionTimers ) ;
341345
342346 if ( this . s . srvPoller ) {
@@ -416,26 +420,43 @@ class Topology extends EventEmitter {
416420 const transaction = session && session . transaction ;
417421
418422 if ( isSharded && transaction && transaction . server ) {
419- callback ( null , transaction . server ) ;
423+ callback ( undefined , transaction . server ) ;
420424 return ;
421425 }
422426
423- selectServers (
424- this ,
425- selector ,
426- options . serverSelectionTimeoutMS ,
427- process . hrtime ( ) ,
428- ( err , servers ) => {
429- if ( err ) return callback ( err ) ;
430-
431- const selectedServer = randomSelection ( servers ) ;
432- if ( isSharded && transaction && transaction . isActive ) {
433- transaction . pinServer ( selectedServer ) ;
434- }
427+ // support server selection by options with readPreference
428+ let serverSelector = selector ;
429+ if ( typeof selector === 'object' ) {
430+ const readPreference = selector . readPreference
431+ ? selector . readPreference
432+ : ReadPreference . primary ;
435433
436- callback ( null , selectedServer ) ;
437- }
438- ) ;
434+ serverSelector = readPreferenceServerSelector ( readPreference ) ;
435+ }
436+
437+ const waitQueueMember = {
438+ serverSelector,
439+ transaction,
440+ callback
441+ } ;
442+
443+ const serverSelectionTimeoutMS = options . serverSelectionTimeoutMS ;
444+ if ( serverSelectionTimeoutMS ) {
445+ waitQueueMember . timer = setTimeout ( ( ) => {
446+ waitQueueMember [ kCancelled ] = true ;
447+ waitQueueMember . timer = undefined ;
448+ const timeoutError = new MongoServerSelectionError (
449+ `Server selection timed out after ${ serverSelectionTimeoutMS } ms` ,
450+ this . description
451+ ) ;
452+
453+ waitQueueMember . callback ( timeoutError ) ;
454+ } , serverSelectionTimeoutMS ) ;
455+ }
456+
457+ // place the member at the front of the wait queue
458+ this [ kWaitQueue ] . unshift ( waitQueueMember ) ;
459+ processWaitQueue ( this ) ;
439460 }
440461
441462 // Sessions related methods
@@ -545,6 +566,11 @@ class Topology extends EventEmitter {
545566 // update server list from updated descriptions
546567 updateServers ( this , serverDescription ) ;
547568
569+ // attempt to resolve any outstanding server selection attempts
570+ if ( this [ kWaitQueue ] . length > 0 ) {
571+ processWaitQueue ( this ) ;
572+ }
573+
548574 this . emit (
549575 'topologyDescriptionChanged' ,
550576 new events . TopologyDescriptionChangedEvent (
@@ -1012,6 +1038,64 @@ function srvPollingHandler(topology) {
10121038 } ;
10131039}
10141040
1041+ function drainWaitQueue ( queue , err ) {
1042+ while ( queue . length ) {
1043+ const waitQueueMember = queue . pop ( ) ;
1044+ clearTimeout ( waitQueueMember . timer ) ;
1045+ if ( ! waitQueueMember [ kCancelled ] ) {
1046+ waitQueueMember . callback ( err ) ;
1047+ }
1048+ }
1049+ }
1050+
1051+ function processWaitQueue ( topology ) {
1052+ if ( topology . s . state === STATE_CLOSED ) {
1053+ drainWaitQueue ( topology [ kWaitQueue ] , new MongoError ( 'Topology is closed, please connect' ) ) ;
1054+ return ;
1055+ }
1056+
1057+ const isSharded = topology . description . type === TopologyType . Sharded ;
1058+ const serverDescriptions = Array . from ( topology . description . servers . values ( ) ) ;
1059+ for ( let i = 0 ; i < topology [ kWaitQueue ] . length ; ++ i ) {
1060+ const waitQueueMember = topology [ kWaitQueue ] . shift ( ) ;
1061+ if ( waitQueueMember [ kCancelled ] ) {
1062+ continue ;
1063+ }
1064+
1065+ let selectedDescriptions ;
1066+ try {
1067+ const serverSelector = waitQueueMember . serverSelector ;
1068+ selectedDescriptions = serverSelector
1069+ ? serverSelector ( topology . description , serverDescriptions )
1070+ : serverDescriptions ;
1071+ } catch ( e ) {
1072+ clearTimeout ( waitQueueMember . timer ) ;
1073+ waitQueueMember . callback ( e ) ;
1074+ break ;
1075+ }
1076+
1077+ if ( selectedDescriptions . length === 0 ) {
1078+ topology [ kWaitQueue ] . push ( waitQueueMember ) ;
1079+ break ;
1080+ }
1081+
1082+ const selectedServerDescription = randomSelection ( selectedDescriptions ) ;
1083+ const selectedServer = topology . s . servers . get ( selectedServerDescription . address ) ;
1084+ const transaction = waitQueueMember . transaction ;
1085+ if ( isSharded && transaction && transaction . isActive ) {
1086+ transaction . pinServer ( selectedServer ) ;
1087+ }
1088+
1089+ clearTimeout ( waitQueueMember . timer ) ;
1090+ waitQueueMember . callback ( undefined , selectedServer ) ;
1091+ }
1092+
1093+ if ( topology [ kWaitQueue ] . length > 0 ) {
1094+ // ensure all server monitors attempt monitoring soon
1095+ topology . s . servers . forEach ( server => process . nextTick ( ( ) => server . requestCheck ( ) ) ) ;
1096+ }
1097+ }
1098+
10151099/**
10161100 * A server opening SDAM monitoring event
10171101 *
0 commit comments