4 changes: 2 additions & 2 deletions include/cpp-statsd-client/UDPSender.hpp
@@ -246,8 +246,8 @@ inline void UDPSender::queueMessage(const std::string& message) noexcept {
     // We aquire a lock but only if we actually need to (i.e. there is a thread also accessing the queue)
     auto batchingLock =
         m_batchingThread.joinable() ? std::unique_lock<std::mutex>(m_batchingMutex) : std::unique_lock<std::mutex>();
-    // Either we don't have a place to batch our message or we exceeded the batch size, so make a new batch
-    if (m_batchingMessageQueue.empty() || m_batchingMessageQueue.back().length() > m_batchsize) {
+    // Either we don't have a place to batch our message or we are about to exceed the batch size, so make a new batch
+    if (m_batchingMessageQueue.empty() || m_batchingMessageQueue.back().size() + message.size() > m_batchsize) {
         m_batchingMessageQueue.emplace_back();
         m_batchingMessageQueue.back().reserve(m_batchsize + 256);
A collaborator commented on the reserve call:
if we really do want a hard limit we should add some fuzz to the batch size for overages:

Suggested change
-        m_batchingMessageQueue.back().reserve(m_batchsize + 256);
+        m_batchingMessageQueue.back().reserve(m_batchsize);

     } // When there is already a batch open we need a separator when its not empty
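For review context, here is a minimal standalone sketch of the batching rule after this change. The member names mirror UDPSender.hpp, but the queue type, locking, and socket handling are simplified assumptions rather than the library's actual implementation:

#include <deque>
#include <string>

// Simplified model of UDPSender::queueMessage after this PR: the size check runs
// before appending, so a batch is closed as soon as the next message would not fit.
struct BatchSketch {
    std::deque<std::string> m_batchingMessageQueue;
    std::size_t m_batchsize = 48;

    void queueMessage(const std::string& message) {
        // Either there is no open batch or the incoming message would push it past the limit
        if (m_batchingMessageQueue.empty() ||
            m_batchingMessageQueue.back().size() + message.size() > m_batchsize) {
            m_batchingMessageQueue.emplace_back();
            m_batchingMessageQueue.back().reserve(m_batchsize + 256);
        }
        // Messages within a batch are separated by a newline
        if (!m_batchingMessageQueue.back().empty()) {
            m_batchingMessageQueue.back().push_back('\n');
        }
        m_batchingMessageQueue.back().append(message);
    }
};

Under the old condition (back().length() > m_batchsize) a batch was only closed after it had already overflowed, so a single datagram could exceed the configured batch size by up to one extra message; the new condition keeps each batch within m_batchsize plus the separator bytes, which the check does not count.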
81 changes: 77 additions & 4 deletions tests/testStatsdClient.cpp
@@ -9,17 +9,22 @@ using namespace Statsd;
 // Note that we could just synchronously recv metrics and not use a thread but doing the test async has the
 // advantage that we can test the threaded batching mode in a straightforward way. The server thread basically
 // just keeps storing metrics in an vector until it hears a special one signaling the test is over and bails
-void mock(StatsdServer& server, std::vector<std::string>& messages) {
+void mock(StatsdServer& server, std::vector<std::string>& messages, bool split) {
     do {
         // Grab the messages that are waiting
         auto recvd = server.receive();
 
+        // If we don't want splitting, give the raw message though we still go
+        // through splitting to parse the quit message
+        if (!split) messages.emplace_back(recvd);
+
         // Split the messages on '\n'
         auto start = std::string::npos;
         do {
-            // Keep this message
+            // Keep this message (unless we don't want splitting, then we
+            // do nothing because we already stored the whole batch)
             auto end = recvd.find('\n', ++start);
-            messages.emplace_back(recvd.substr(start, end));
+            if (split) messages.emplace_back(recvd.substr(start, end));
             start = end;
 
             // Bail if we found the special quit message
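For readers of the new test below, a quick illustration of what the split flag controls: with split=true the mock stores each metric line separately, while with split=false it stores each received datagram as a single string, which lets testOverflow compare whole batches. The datagram below is made up for the example, and the sketch uses std::getline instead of the mock's own parsing loop:

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Store one hypothetical datagram either as separate metric lines (split == true)
// or as a single whole-batch string (split == false).
int main() {
    const std::string recvd = "stat.a:1|c\nstat.b:1|c\nstat.c:1|c";
    for (bool split : {true, false}) {
        std::vector<std::string> messages;
        if (split) {
            std::istringstream stream(recvd);
            for (std::string line; std::getline(stream, line);) {
                messages.emplace_back(line);
            }
        } else {
            messages.emplace_back(recvd);
        }
        std::cout << (split ? "split=true:  " : "split=false: ") << messages.size()
                  << " stored message(s)" << std::endl;
    }
}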
@@ -53,6 +58,72 @@ void testErrorConditions() {
     throwOnError(client, false, "Should not be able to resolve a ridiculous ip");
 }
 
+void testOverflow() {
+    StatsdServer mock_server;
+    std::vector<std::string> messages, expected;
+    std::thread server(mock, std::ref(mock_server), std::ref(messages), /*split=*/false);
+
+    // Set a new config that has the client send messages to a proper address that can be resolved
+    StatsdClient client("localhost", 8125, /*prefix=*/"", /*batchsize=*/48, /*sendInterval=*/0, /*gaugePrecision=*/3);
+    throwOnError(client);
+
+    // Send message that exhausts most of the batch limit (40/48)
+    client.increment("stat.timesWeUsed40CharsOfBufferSpace");
+    throwOnError(client);
+    expected.emplace_back("stat.timesWeUsed40CharsOfBufferSpace:1|c");
+
+    // Send message that would go over the batch limit (50/48)
+    // The second message shouldn't fit the batch and must be part of the next batch
+    client.increment("stat.i");
+    throwOnError(client);
+    expected.emplace_back("stat.i:1|c");
+
+    // Flush messages
+    client.flush();
+
+    // Send messages that would exactly fit in the batch, newlines included (48/48)
+    // Everything should be part of the same batch
+    expected.emplace_back();
+    for (int idx = 0; idx < 5; idx++) {
+        client.increment("silly");
+        throwOnError(client);
+        expected.back().append("silly:1|c\n");
+    }
+    expected.back().pop_back();
+
+    // Flush messages
+    client.flush();
+
+    // With all messages flushed, send one giant message that would exceed the batch (50/48)
+    // The message should arrive intact in one batch
+    client.increment("stat.veryImportantIfIncrementedComputerExplode");
+    throwOnError(client);
+    expected.emplace_back("stat.veryImportantIfIncrementedComputerExplode:1|c");
+
+    // Flush messages
+    client.flush();
+
+    // Signal the mock server we are done (and flush so that the signal is sent)
+    client.timing("DONE", 0);
+    client.flush();
+
+    // Wait for the server to stop
+    server.join();
+
+    // Make sure we get the exactly correct output
+    if (messages != expected) {
+        std::cerr << "Unexpected stats received by server, got:" << std::endl;
+        for (const auto& message : messages) {
+            std::cerr << "- " << message << std::endl;
+        }
+        std::cerr << std::endl << "But we expected:" << std::endl;
+        for (const auto& message : expected) {
+            std::cerr << "- " << message << std::endl;
+        }
+        throw std::runtime_error("Unexpected stats");
+    }
+}

 void testReconfigure() {
     StatsdServer server;
     throwOnError(server);
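As a sanity check on the batch arithmetic in testOverflow's "silly" loop above, here is a small replay of the new size check from UDPSender.hpp; it is reviewer arithmetic only, not part of the PR:

#include <cstddef>
#include <iostream>
#include <string>

// Replay the batch bookkeeping for five "silly:1|c" increments with batchsize = 48,
// mirroring the check back().size() + message.size() > m_batchsize.
int main() {
    const std::string message = "silly:1|c";  // 9 bytes
    const std::size_t batchsize = 48;
    std::string batch;
    for (int i = 1; i <= 5; ++i) {
        if (!batch.empty() && batch.size() + message.size() > batchsize) {
            std::cout << "message " << i << " would start a new batch" << std::endl;
            batch.clear();
        }
        if (!batch.empty()) batch.push_back('\n');
        batch.append(message);
        std::cout << "after message " << i << ": " << batch.size() << " bytes" << std::endl;
    }
    // Prints 9, 19, 29, 39, 49 and never starts a new batch: the fifth check is
    // 39 + 9 <= 48, so all five increments land in one datagram, matching the single
    // expected batch built in the test.
}

Note that the '\n' separators are not counted by the check, so the final datagram here is 49 bytes even though the check arithmetic stays at 48 of 48.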
@@ -79,7 +150,7 @@ void testReconfigure() {
 void testSendRecv(uint64_t batchSize, uint64_t sendInterval) {
     StatsdServer mock_server;
     std::vector<std::string> messages, expected;
-    std::thread server(mock, std::ref(mock_server), std::ref(messages));
+    std::thread server(mock, std::ref(mock_server), std::ref(messages), /*split=*/true);
 
     // Set a new config that has the client send messages to a proper address that can be resolved
     StatsdClient client("localhost", 8125, "sendRecv.", batchSize, sendInterval, 3);
@@ -176,6 +247,8 @@ int main() {
 
     // general things that should be errors
     testErrorConditions();
+    // ensure batch size is respected
+    testOverflow();
     // reconfiguring how you are sending
     testReconfigure();
     // no batching