@@ -20,16 +20,46 @@ export interface GraphQLPubsub {
2020  /** 
2121   * Publishes the given payload to all GraphQL subscriptions. 
2222   */ 
23-   publish : ( payload : {  data ?: Record < string ,  unknown >  } )  =>  void 
23+   publish : ( 
24+     payload : {  data ?: Record < string ,  unknown >  } , 
25+     predicate ?: ( args : { 
26+       subscription : GraphQLWebSocketSubscriptionWithId 
27+     } )  =>  boolean , 
28+   )  =>  void 
29+ } 
30+ 
31+ type  GraphQLWebSocketOutgoingMessage  = 
32+   |  { 
33+       type : 'connection_init' 
34+     } 
35+   |  { 
36+       type : 'subscribe' 
37+       id : string 
38+       payload : GraphQLWebSocketSubscription 
39+     } 
40+   |  { 
41+       type : 'complete' 
42+       id : string 
43+     } 
44+ 
45+ interface  GraphQLWebSocketSubscription  { 
46+   query : string 
47+   variables : Record < string ,  unknown > 
48+   extensions : Array < any > 
49+ } 
50+ 
51+ interface  GraphQLWebSocketSubscriptionWithId 
52+   extends  GraphQLWebSocketSubscription  { 
53+   id : string 
2454} 
2555
2656export  class  GraphQLInternalPubsub  { 
2757  public  pubsub : GraphQLPubsub 
2858  public  webSocketLink : WebSocketLink 
29-   private  subscriptions : Set < string > 
59+   private  subscriptions : Map < string ,   GraphQLWebSocketSubscriptionWithId > 
3060
3161  constructor ( public  readonly  url : Path )  { 
32-     this . subscriptions  =  new  Set ( ) 
62+     this . subscriptions  =  new  Map ( ) 
3363
3464    /** 
3565     * @fixme  This isn't nice. 
@@ -52,7 +82,7 @@ export class GraphQLInternalPubsub {
5282            return 
5383          } 
5484
55-           const  message  =  jsonParse ( event . data ) 
85+           const  message  =  jsonParse < GraphQLWebSocketOutgoingMessage > ( event . data ) 
5686
5787          if  ( ! message )  { 
5888            return 
@@ -65,7 +95,10 @@ export class GraphQLInternalPubsub {
6595            } 
6696
6797            case  'subscribe' : { 
68-               this . subscriptions . add ( message . id ) 
98+               this . subscriptions . set ( message . id ,  { 
99+                 ...message . payload , 
100+                 id : message . id , 
101+               } ) 
69102              break 
70103            } 
71104
@@ -80,14 +113,16 @@ export class GraphQLInternalPubsub {
80113
81114    this . pubsub  =  { 
82115      handler : webSocketHandler , 
83-       publish : ( payload )  =>  { 
84-         for  ( const  subscriptionId  of  this . subscriptions )  { 
85-           this . webSocketLink . broadcast ( 
86-             this . createSubscriptionMessage ( { 
87-               id : subscriptionId , 
88-               payload, 
89-             } ) , 
90-           ) 
116+       publish : ( payload ,  predicate  =  ( )  =>  true )  =>  { 
117+         for  ( const  [ ,  subscription ]  of  this . subscriptions )  { 
118+           if  ( predicate ( {  subscription } ) )  { 
119+             this . webSocketLink . broadcast ( 
120+               this . createSubscriptionMessage ( { 
121+                 id : subscription . id , 
122+                 payload, 
123+               } ) , 
124+             ) 
125+           } 
91126        } 
92127      } , 
93128    } 
@@ -110,30 +145,32 @@ export type GraphQLSubscriptionHandler = <
110145    |  GraphQLHandlerNameSelector 
111146    |  DocumentNode 
112147    |  TypedDocumentNode < Query ,  Variables > , 
113-   resolver : ( info : GraphQLSubscriptionHandlerInfo < Variables > )  =>  void , 
148+   resolver : ( info : GraphQLSubscriptionHandlerInfo < Query ,   Variables > )  =>  void , 
114149)  =>  WebSocketHandler 
115150
116151export  interface  GraphQLSubscriptionHandlerInfo < 
152+   Query  extends  GraphQLQuery , 
117153  Variables  extends  GraphQLVariables , 
118154>  { 
119155  operationName : string 
120156  query : string 
121157  variables : Variables 
158+   pubsub : GraphQLSubscriptionHandlerPubsub < Query > 
122159} 
123160
124161export  function  createGraphQLSubscriptionHandler ( 
125-   webSocketLink :  WebSocketLink , 
162+   internalPubsub :  GraphQLInternalPubsub , 
126163) : GraphQLSubscriptionHandler  { 
127164  return  ( operationName ,  resolver )  =>  { 
128-     const  webSocketHandler  =  webSocketLink . addEventListener ( 
165+     const  webSocketHandler  =  internalPubsub . webSocketLink . addEventListener ( 
129166      'connection' , 
130167      ( {  client } )  =>  { 
131168        client . addEventListener ( 'message' ,  async  ( event )  =>  { 
132169          if  ( typeof  event . data  !==  'string' )  { 
133170            return 
134171          } 
135172
136-           const  message  =  jsonParse ( event . data ) 
173+           const  message  =  jsonParse < GraphQLWebSocketOutgoingMessage > ( event . data ) 
137174
138175          if  ( 
139176            message  !=  null  && 
@@ -148,13 +185,19 @@ export function createGraphQLSubscriptionHandler(
148185              node . operationType  ===  OperationTypeNode . SUBSCRIPTION  && 
149186              node . operationName  ===  operationName 
150187            )  { 
188+               const  pubsub  =  new  GraphQLSubscriptionHandlerPubsub ( { 
189+                 internalPubsub, 
190+                 subscriptionId : message . id , 
191+               } ) 
192+ 
151193              /** 
152194               * @todo  Add the path parameters from the pubsub URL. 
153195               */ 
154196              resolver ( { 
155197                operationName : node . operationName , 
156198                query : message . payload . query , 
157-                 variables : message . payload . variables , 
199+                 variables : message . payload . variables  as  any , 
200+                 pubsub, 
158201              } ) 
159202            } 
160203          } 
@@ -165,3 +208,18 @@ export function createGraphQLSubscriptionHandler(
165208    return  webSocketHandler 
166209  } 
167210} 
211+ 
212+ class  GraphQLSubscriptionHandlerPubsub < Query  extends  GraphQLQuery >  { 
213+   constructor ( 
214+     private  readonly  args : { 
215+       internalPubsub : GraphQLInternalPubsub 
216+       subscriptionId : string 
217+     } , 
218+   )  { } 
219+ 
220+   public  publish ( payload : {  data ?: Query  } ) : void   { 
221+     this . args . internalPubsub . pubsub . publish ( payload ,  ( {  subscription } )  =>  { 
222+       return  subscription . id  ===  this . args . subscriptionId 
223+     } ) 
224+   } 
225+ } 
0 commit comments