@@ -1275,20 +1275,6 @@ class OperationDefinitionVisitor
12751275 std::future<ResolverResult> _result;
12761276};
12771277
1278- SubscriptionData::SubscriptionData (std::shared_ptr<OperationData>&& data, SubscriptionName&& field,
1279- response::Value&& arguments, response::Value&& fieldDirectives, peg::ast&& query,
1280- std::string&& operationName, SubscriptionCallback&& callback, const peg::ast_node& selection)
1281- : data(std::move(data))
1282- , field(std::move(field))
1283- , arguments(std::move(arguments))
1284- , fieldDirectives(std::move(fieldDirectives))
1285- , query(std::move(query))
1286- , operationName(std::move(operationName))
1287- , callback(std::move(callback))
1288- , selection(selection)
1289- {
1290- }
1291-
12921278OperationDefinitionVisitor::OperationDefinitionVisitor (ResolverContext resolverContext,
12931279 std::launch launch, std::shared_ptr<RequestState> state, const TypeMap& operations,
12941280 response::Value&& variables, FragmentMap&& fragments)
@@ -1388,6 +1374,20 @@ void OperationDefinitionVisitor::visit(
13881374 std::cref (*operationDefinition.children .back ()));
13891375}
13901376
1377+ SubscriptionData::SubscriptionData (std::shared_ptr<OperationData>&& data, SubscriptionName&& field,
1378+ response::Value&& arguments, response::Value&& fieldDirectives, peg::ast&& query,
1379+ std::string&& operationName, SubscriptionCallback&& callback, const peg::ast_node& selection)
1380+ : data(std::move(data))
1381+ , field(std::move(field))
1382+ , arguments(std::move(arguments))
1383+ , fieldDirectives(std::move(fieldDirectives))
1384+ , query(std::move(query))
1385+ , operationName(std::move(operationName))
1386+ , callback(std::move(callback))
1387+ , selection(selection)
1388+ {
1389+ }
1390+
13911391// SubscriptionDefinitionVisitor visits the AST collects the fields referenced in the subscription
13921392// at the point where we create a subscription.
13931393class SubscriptionDefinitionVisitor
@@ -1628,6 +1628,8 @@ std::list<schema_error> Request::validate(peg::ast& query) const
16281628
16291629 if (!query.validated )
16301630 {
1631+ const std::lock_guard lock { _validationMutex };
1632+
16311633 _validation->visit (*query.root );
16321634 errors = _validation->getStructuredErrors ();
16331635 query.validated = errors.empty ();
@@ -1929,7 +1931,8 @@ std::future<response::Value> Request::resolveUnvalidated(std::launch launch,
19291931 }
19301932}
19311933
1932- SubscriptionKey Request::subscribe (SubscriptionParams&& params, SubscriptionCallback&& callback)
1934+ SubscriptionKey Request::addSubscription (
1935+ SubscriptionParams&& params, SubscriptionCallback&& callback)
19331936{
19341937 auto errors = validate (params.query );
19351938
@@ -1998,20 +2001,29 @@ SubscriptionKey Request::subscribe(SubscriptionParams&& params, SubscriptionCall
19982001 return key;
19992002}
20002003
2004+ SubscriptionKey Request::subscribe (SubscriptionParams&& params, SubscriptionCallback&& callback)
2005+ {
2006+ std::lock_guard lock { _subscriptionMutex };
2007+
2008+ return addSubscription (std::move (params), std::move (callback));
2009+ }
2010+
20012011std::future<SubscriptionKey> Request::subscribe (
20022012 std::launch launch, SubscriptionParams&& params, SubscriptionCallback&& callback)
20032013{
20042014 return std::async (
20052015 launch,
20062016 [spThis = shared_from_this (), launch](SubscriptionParams&& paramsFuture,
20072017 SubscriptionCallback&& callbackFuture) {
2008- const auto key = spThis->subscribe (std::move (paramsFuture), std::move (callbackFuture));
2018+ std::unique_lock lock { spThis->_subscriptionMutex };
2019+ const auto key =
2020+ spThis->addSubscription (std::move (paramsFuture), std::move (callbackFuture));
20092021 const auto itrOperation = spThis->_operations .find (strSubscription);
20102022
20112023 if (itrOperation != spThis->_operations .end ())
20122024 {
2013- const auto & operation = itrOperation->second ;
2014- const auto & registration = spThis->_subscriptions .at (key);
2025+ const auto operation = itrOperation->second ;
2026+ const auto registration = spThis->_subscriptions .at (key);
20152027 response::Value emptyFragmentDirectives (response::Type::Map);
20162028 const SelectionSetParams selectionSetParams {
20172029 ResolverContext::NotifySubscribe,
@@ -2024,6 +2036,8 @@ std::future<SubscriptionKey> Request::subscribe(
20242036 launch,
20252037 };
20262038
2039+ lock.unlock ();
2040+
20272041 try
20282042 {
20292043 operation
@@ -2035,8 +2049,10 @@ std::future<SubscriptionKey> Request::subscribe(
20352049 }
20362050 catch (const std::exception& ex)
20372051 {
2052+ lock.lock ();
2053+
20382054 // Rethrow the exception, but don't leave it subscribed if the resolver failed.
2039- spThis->unsubscribe (key);
2055+ spThis->removeSubscription (key);
20402056 throw ex;
20412057 }
20422058 }
@@ -2047,7 +2063,7 @@ std::future<SubscriptionKey> Request::subscribe(
20472063 std::move (callback));
20482064}
20492065
2050- void Request::unsubscribe (SubscriptionKey key)
2066+ void Request::removeSubscription (SubscriptionKey key)
20512067{
20522068 auto itrSubscription = _subscriptions.find (key);
20532069
@@ -2077,15 +2093,23 @@ void Request::unsubscribe(SubscriptionKey key)
20772093 }
20782094}
20792095
2096+ void Request::unsubscribe (SubscriptionKey key)
2097+ {
2098+ std::lock_guard lock { _subscriptionMutex };
2099+
2100+ removeSubscription (key);
2101+ }
2102+
20802103std::future<void > Request::unsubscribe (std::launch launch, SubscriptionKey key)
20812104{
20822105 return std::async (launch, [spThis = shared_from_this (), launch, key]() {
2106+ std::unique_lock lock { spThis->_subscriptionMutex };
20832107 const auto itrOperation = spThis->_operations .find (strSubscription);
20842108
20852109 if (itrOperation != spThis->_operations .end ())
20862110 {
2087- const auto & operation = itrOperation->second ;
2088- const auto & registration = spThis->_subscriptions .at (key);
2111+ const auto operation = itrOperation->second ;
2112+ const auto registration = spThis->_subscriptions .at (key);
20892113 response::Value emptyFragmentDirectives (response::Type::Map);
20902114 const SelectionSetParams selectionSetParams {
20912115 ResolverContext::NotifyUnsubscribe,
@@ -2098,15 +2122,19 @@ std::future<void> Request::unsubscribe(std::launch launch, SubscriptionKey key)
20982122 launch,
20992123 };
21002124
2125+ lock.unlock ();
2126+
21012127 operation
21022128 ->resolve (selectionSetParams,
21032129 registration->selection ,
21042130 registration->data ->fragments ,
21052131 registration->data ->variables )
21062132 .get ();
2133+
2134+ lock.lock ();
21072135 }
21082136
2109- spThis->unsubscribe (key);
2137+ spThis->removeSubscription (key);
21102138 });
21112139}
21122140
@@ -2215,6 +2243,7 @@ void Request::deliver(std::launch launch, const SubscriptionName& name,
22152243 throw std::invalid_argument (" Missing subscriptionObject" );
22162244 }
22172245
2246+ std::unique_lock lock { _subscriptionMutex };
22182247 auto itrListeners = _listeners.find (name);
22192248
22202249 if (itrListeners == _listeners.end ())
@@ -2323,6 +2352,8 @@ void Request::deliver(std::launch launch, const SubscriptionName& name,
23232352 std::move (result)));
23242353 }
23252354
2355+ lock.unlock ();
2356+
23262357 for (auto & callback : callbacks)
23272358 {
23282359 callback.get ();
0 commit comments