diff --git a/include/cpp-statsd-client/UDPSender.hpp b/include/cpp-statsd-client/UDPSender.hpp index cdabc75..050c74e 100644 --- a/include/cpp-statsd-client/UDPSender.hpp +++ b/include/cpp-statsd-client/UDPSender.hpp @@ -246,8 +246,8 @@ inline void UDPSender::queueMessage(const std::string& message) noexcept { // We aquire a lock but only if we actually need to (i.e. there is a thread also accessing the queue) auto batchingLock = m_batchingThread.joinable() ? std::unique_lock(m_batchingMutex) : std::unique_lock(); - // Either we don't have a place to batch our message or we exceeded the batch size, so make a new batch - if (m_batchingMessageQueue.empty() || m_batchingMessageQueue.back().length() > m_batchsize) { + // Either we don't have a place to batch our message or we are about to exceed the batch size, so make a new batch + if (m_batchingMessageQueue.empty() || m_batchingMessageQueue.back().size() + message.size() > m_batchsize) { m_batchingMessageQueue.emplace_back(); m_batchingMessageQueue.back().reserve(m_batchsize + 256); } // When there is already a batch open we need a separator when its not empty diff --git a/tests/testStatsdClient.cpp b/tests/testStatsdClient.cpp index 22e0d6e..aa34101 100644 --- a/tests/testStatsdClient.cpp +++ b/tests/testStatsdClient.cpp @@ -9,17 +9,22 @@ using namespace Statsd; // Note that we could just synchronously recv metrics and not use a thread but doing the test async has the // advantage that we can test the threaded batching mode in a straightforward way. The server thread basically // just keeps storing metrics in an vector until it hears a special one signaling the test is over and bails -void mock(StatsdServer& server, std::vector& messages) { +void mock(StatsdServer& server, std::vector& messages, bool split) { do { // Grab the messages that are waiting auto recvd = server.receive(); + // If we don't want splitting, give the raw message though we still go + // through splitting to parse the quit message + if (!split) messages.emplace_back(recvd); + // Split the messages on '\n' auto start = std::string::npos; do { - // Keep this message + // Keep this message (unless we don't want splitting, then we + // do nothing because we already stored the whole batch) auto end = recvd.find('\n', ++start); - messages.emplace_back(recvd.substr(start, end)); + if (split) messages.emplace_back(recvd.substr(start, end)); start = end; // Bail if we found the special quit message @@ -53,6 +58,72 @@ void testErrorConditions() { throwOnError(client, false, "Should not be able to resolve a ridiculous ip"); } +void testOverflow() { + StatsdServer mock_server; + std::vector messages, expected; + std::thread server(mock, std::ref(mock_server), std::ref(messages), /*split=*/false); + + // Set a new config that has the client send messages to a proper address that can be resolved + StatsdClient client("localhost", 8125, /*prefix=*/"", /*batchsize=*/48, /*sendInterval=*/0, /*gaugePrecision=*/3); + throwOnError(client); + + // Send message that exhausts most of the batch limit (40/48) + client.increment("stat.timesWeUsed40CharsOfBufferSpace"); + throwOnError(client); + expected.emplace_back("stat.timesWeUsed40CharsOfBufferSpace:1|c"); + + // Send message that would go over the batch limit (50/48) + // The second message shouldn't fit the batch and must be part of the next batch + client.increment("stat.i"); + throwOnError(client); + expected.emplace_back("stat.i:1|c"); + + // Flush messages + client.flush(); + + // Send messages that would exactly fit in the batch, newlines included (48/48) + // Everything should be part of the same batch + expected.emplace_back(); + for (int idx = 0; idx < 5; idx++) { + client.increment("silly"); + throwOnError(client); + expected.back().append("silly:1|c\n"); + } + expected.back().pop_back(); + + // Flush messages + client.flush(); + + // With all messages flushed, send one giant message that would exceed the batch (50/48) + // The message should arrive intact in one batch + client.increment("stat.veryImportantIfIncrementedComputerExplode"); + throwOnError(client); + expected.emplace_back("stat.veryImportantIfIncrementedComputerExplode:1|c"); + + // Flush messages + client.flush(); + + // Signal the mock server we are done (and flush so that the signal is sent) + client.timing("DONE", 0); + client.flush(); + + // Wait for the server to stop + server.join(); + + // Make sure we get the exactly correct output + if (messages != expected) { + std::cerr << "Unexpected stats received by server, got:" << std::endl; + for (const auto& message : messages) { + std::cerr << "- " << message << std::endl; + } + std::cerr << std::endl << "But we expected:" << std::endl; + for (const auto& message : expected) { + std::cerr << "- " << message << std::endl; + } + throw std::runtime_error("Unexpected stats"); + } +} + void testReconfigure() { StatsdServer server; throwOnError(server); @@ -79,7 +150,7 @@ void testReconfigure() { void testSendRecv(uint64_t batchSize, uint64_t sendInterval) { StatsdServer mock_server; std::vector messages, expected; - std::thread server(mock, std::ref(mock_server), std::ref(messages)); + std::thread server(mock, std::ref(mock_server), std::ref(messages), /*split=*/true); // Set a new config that has the client send messages to a proper address that can be resolved StatsdClient client("localhost", 8125, "sendRecv.", batchSize, sendInterval, 3); @@ -176,6 +247,8 @@ int main() { // general things that should be errors testErrorConditions(); + // ensure batch size is respected + testOverflow(); // reconfiguring how you are sending testReconfigure(); // no batching