diff --git a/lscpp/CMakeLists.txt b/lscpp/CMakeLists.txt index 03ddc77..39679b2 100644 --- a/lscpp/CMakeLists.txt +++ b/lscpp/CMakeLists.txt @@ -7,7 +7,7 @@ if(CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT) endif() include(CTest) -set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD 20) add_subdirectory(external/loguru) diff --git a/lscpp/include/lscpp/experimental/adl_lsp_server.h b/lscpp/include/lscpp/experimental/adl_lsp_server.h index 970a751..3605d94 100644 --- a/lscpp/include/lscpp/experimental/adl_lsp_server.h +++ b/lscpp/include/lscpp/experimental/adl_lsp_server.h @@ -29,6 +29,8 @@ #include "lscpp/protocol/TextDocumentPositionParams.h" #include "lscpp/transporter.h" #include "messages.h" +#include "threadsafe_queue.h" +#include namespace lscpp::experimental { @@ -326,31 +328,53 @@ class server_with_default_handler { } }; -template -void launch(Server &&server, transporter &&transporter_) { - - // Allows to attach a debugger, - // before the language server starts to communicate with the client. - // std::this_thread::sleep_for(std::chrono::seconds(config.startup_delay)); +inline void read_message(transporter &transporter_, + threadsafe_queue &q) { + while (true) { + auto header = parse_header(transporter_); - auto rcv = std::async(std::launch::async, [&]() { - while (true) { - auto header = parse_header(transporter_); - - if (header.content_length < - 0) { // TODO parse header should not use -1 to report an error - continue; - } - auto msg = transporter_.read_message(header.content_length); + if (header.content_length < + 0) { // TODO parse header should not use -1 to report an error + continue; + } + auto msg = transporter_.read_message(header.content_length); + q.push(msg); + } +} - auto result = lscpp_handle_message(server, msg); - if (result) { - write_lsp_message(transporter_, *result); - } +template +void work(std::stop_token t, threadsafe_queue &in_queue, + threadsafe_queue &out_queue, Server &server) { + while (!t.stop_requested()) { + auto msg = in_queue.wait_and_pop(t); + if (!msg) + return; + auto result = lscpp_handle_message(server, *msg); + if (result) { + out_queue.push(*result); } - }); + } +} + +inline void write_message(std::stop_token t, transporter &transporter_, + threadsafe_queue &q) { + while (!t.stop_requested()) { + auto msg = q.wait_and_pop(t); + if (msg) + write_lsp_message(transporter_, *msg); + } +} - rcv.wait(); +template +void launch(Server &&server, transporter &&transporter_) { + threadsafe_queue in_queue; + threadsafe_queue out_queue; + + std::jthread writer(write_message, std::ref(transporter_), + std::ref(out_queue)); + std::jthread worker(work, std::ref(in_queue), std::ref(out_queue), + std::ref(server)); + read_message(transporter_, in_queue); } } // namespace lscpp::experimental diff --git a/lscpp/include/lscpp/experimental/threadsafe_queue.h b/lscpp/include/lscpp/experimental/threadsafe_queue.h new file mode 100644 index 0000000..5981993 --- /dev/null +++ b/lscpp/include/lscpp/experimental/threadsafe_queue.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include +#include +#include +#include + +template class threadsafe_queue { + mutable std::mutex m; + std::queue data; + std::condition_variable_any cond; + +public: + void push(T const &obj) { + std::lock_guard lock(m); + data.push(std::move(obj)); + cond.notify_one(); + } + std::optional wait_and_pop(std::stop_token s) { + std::unique_lock lock(m); + cond.wait(lock, s, [this]() { return !data.empty(); }); + if (s.stop_requested()) + return {}; + auto res = data.front(); + data.pop(); + return res; + } +}; \ No newline at end of file