diff --git a/packages/feedsim/install_feedsim_dlrm.sh b/packages/feedsim/install_feedsim_dlrm.sh new file mode 100755 index 00000000..0bbec843 --- /dev/null +++ b/packages/feedsim/install_feedsim_dlrm.sh @@ -0,0 +1,123 @@ +#!/bin/bash +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Install FeedSim with DLRM support +# This script wraps the standard install_feedsim.sh and adds LibTorch + +set -Eeuo pipefail + +# Constants +FEEDSIM_ROOT=$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd -P) +BENCHPRESS_ROOT="$(readlink -f "$FEEDSIM_ROOT/../..")" +FEEDSIM_ROOT_SRC="${BENCHPRESS_ROOT}/benchmarks/feedsim" +FEEDSIM_THIRD_PARTY_SRC="${FEEDSIM_ROOT_SRC}/third_party" +LIBTORCH_VERSION="2.1.0" + +echo "=== FeedSim DLRM Installation Script ===" +echo "BENCHPRESS_ROOT is ${BENCHPRESS_ROOT}" + +msg() { + echo >&2 -e "${1-}" +} + +die() { + local msg=$1 + local code=${2-1} + msg "$msg" + exit "$code" +} + +# First run the standard installation +msg "Step 1: Running standard FeedSim installation..." +"${FEEDSIM_ROOT}/install_feedsim.sh" + +# Now install LibTorch +msg "" +msg "Step 2: Installing LibTorch for DLRM support..." + +cd "${FEEDSIM_THIRD_PARTY_SRC}" + +ARCH="$(uname -m)" +if [ "$ARCH" = "x86_64" ]; then + LIBTORCH_URL="https://download.pytorch.org/libtorch/cpu/libtorch-cxx11-abi-shared-with-deps-${LIBTORCH_VERSION}%2Bcpu.zip" +elif [ "$ARCH" = "aarch64" ]; then + # For ARM, we need to build from source or use a different approach + msg "WARNING: Pre-built LibTorch for ARM64 may not be available." + msg "Attempting to download CPU version..." + LIBTORCH_URL="https://download.pytorch.org/libtorch/cpu/libtorch-cxx11-abi-shared-with-deps-${LIBTORCH_VERSION}%2Bcpu.zip" +else + die "Unsupported architecture: ${ARCH}" +fi + +if ! [ -d "libtorch" ]; then + msg "Downloading LibTorch ${LIBTORCH_VERSION}..." + wget "${LIBTORCH_URL}" -O libtorch.zip + msg "Extracting LibTorch..." + unzip -q libtorch.zip + rm libtorch.zip + msg "LibTorch installed to ${FEEDSIM_THIRD_PARTY_SRC}/libtorch" +else + msg "[SKIPPED] LibTorch already installed" +fi + +# Copy the DLRM model if it exists +DLRM_MODEL_SRC="/home/wsu/feedsim_v2/models/dlrm_small.pt" +DLRM_MODEL_DST="${FEEDSIM_ROOT_SRC}/models/dlrm_small.pt" + +if [ -f "$DLRM_MODEL_SRC" ]; then + msg "Copying DLRM model..." + mkdir -p "${FEEDSIM_ROOT_SRC}/models" + cp "$DLRM_MODEL_SRC" "$DLRM_MODEL_DST" + msg "DLRM model copied to ${DLRM_MODEL_DST}" +else + msg "WARNING: DLRM model not found at ${DLRM_MODEL_SRC}" + msg "You will need to provide the model path when running FeedSim with DLRM" +fi + +# Rebuild FeedSim with DLRM support +msg "" +msg "Step 3: Rebuilding FeedSim with DLRM support..." + +cd "${FEEDSIM_ROOT_SRC}/src" + +# Remove old build to ensure clean rebuild with DLRM +if [ -d "build" ]; then + msg "Removing old build directory..." + rm -rf build +fi + +mkdir -p build && cd build/ + +# Build FeedSim with DLRM enabled +FS_CFLAGS="${BP_CFLAGS:--O3 -DNDEBUG}" +FS_CXXFLAGS="${BP_CXXFLAGS:--O3 -DNDEBUG}" +FS_LDFLAGS="${BP_LDFLAGS:-} -latomic -Wl,--export-dynamic" + +export PATH="${FEEDSIM_THIRD_PARTY_SRC}/cmake-4.0.3/staging/bin:${PATH}" + +msg "Configuring with CMake (DLRM enabled)..." 
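+# The Torch_DIR and CMAKE_PREFIX_PATH settings below point find_package(Torch)
+# at the libtorch/ tree unpacked in Step 2; without them the FEEDSIM_USE_DLRM
+# configure step cannot locate TorchConfig.cmake or the LibTorch libraries.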
+cmake -G Ninja \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_C_COMPILER="${BP_CC:-gcc}" \ + -DCMAKE_CXX_COMPILER="${BP_CXX:-g++}" \ + -DCMAKE_C_FLAGS_RELEASE="$FS_CFLAGS" \ + -DCMAKE_CXX_FLAGS_RELEASE="$FS_CXXFLAGS -DFMT_HEADER_ONLY=1" \ + -DCMAKE_EXE_LINKER_FLAGS_RELEASE="$FS_LDFLAGS" \ + -DFEEDSIM_USE_DLRM=ON \ + -DTorch_DIR="${FEEDSIM_THIRD_PARTY_SRC}/libtorch/share/cmake/Torch" \ + -DCMAKE_PREFIX_PATH="${FEEDSIM_THIRD_PARTY_SRC}/libtorch" \ + ../ + +msg "Building FeedSim with DLRM..." +ninja -v + +msg "" +msg "=== FeedSim DLRM Installation Complete ===" +msg "" +msg "To run FeedSim with DLRM workload:" +msg " cd ${FEEDSIM_ROOT_SRC}" +msg " ./run.sh -W dlrm -M ${DLRM_MODEL_DST}" +msg "" +msg "Or use the standard PageRank workload:" +msg " ./run.sh" +msg "" diff --git a/packages/feedsim/run.sh b/packages/feedsim/run.sh index 3564c9aa..08c15d34 100644 --- a/packages/feedsim/run.sh +++ b/packages/feedsim/run.sh @@ -81,6 +81,17 @@ Usage: ${0##*/} [OPTION]... -R Seed for LeafNodeRank random number generator. If not provided, current time will be used. -P Seed for PageRank random number generator. If not provided, current time will be used. -C Seed for PointerChase random number generator. If not provided, current time will be used. + --workload Workload type: 'pagerank' (default) or 'dlrm'. Requires DLRM-enabled build. + --dlrm-model Path to DLRM TorchScript model file (.pt). Required when --workload=dlrm is used. + --dlrm-batch-size DLRM batch size for inference. Default: 256 + --dlrm-inferences Number of DLRM inference calls per request. Default: 1 + --dlrm-threads Number of LibTorch threads for DLRM inference. Default: 8 + --async-io Enable async (non-blocking) I/O mode. Eliminates thread starvation on high-core CPUs. + --io-dist I/O latency distribution: 'fixed' (default), 'exponential', or 'lognormal'. + --io-mean Mean I/O latency in milliseconds. Default: 200 + --io-stddev I/O latency standard deviation in ms (for lognormal distribution). Default: 50 + --io-stages Number of I/O stages to simulate (models multi-hop data fetching). Default: 1 + --io-stage-latency Latency per I/O stage in ms (when --io-stages > 1). 
Default: 50 EOF } @@ -169,6 +180,41 @@ main() { local pointerchase_seed pointerchase_seed="" + # DLRM options + local workload_type + workload_type="pagerank" + + local dlrm_model_path + dlrm_model_path="" + + local dlrm_batch_size + dlrm_batch_size="256" + + local dlrm_inferences_per_request + dlrm_inferences_per_request="1" + + local dlrm_threads + dlrm_threads="8" + + # Phase 3: Async I/O options + local async_io + async_io="" + + local io_latency_distribution + io_latency_distribution="fixed" + + local io_latency_mean_ms + io_latency_mean_ms="200" + + local io_latency_stddev_ms + io_latency_stddev_ms="50" + + local io_stages + io_stages="1" + + local io_stage_latency_ms + io_stage_latency_ms="50" + if [ -z "$IS_AUTOSCALE_RUN" ]; then echo > $BREPS_LFILE fi @@ -178,46 +224,58 @@ main() { case $1 in -t) thrift_threads="$2" + shift ;; -c) ranking_cpu_threads="$2" + shift ;; -s) srv_io_threads="$2" + shift ;; -l) driver_threads="$2" + shift ;; -a) auto_driver_threads="1" ;; -q) fixed_qps="$2" + shift ;; -d) fixed_qps_duration="$2" + shift ;; -w) warmup_time="$2" + shift ;; -p) port="$2" + shift ;; -o) result_filename="$2" + shift ;; -i) icache_iterations="$2" + shift ;; -S) if [ "$2" != "default_do_not_store" ]; then store_graph="--store_graph=$2" fi + shift ;; -L) if [ "$2" != "default_do_not_load" ]; then load_graph="--load_graph=$2" fi + shift ;; -I) instrument_graph="--instrument_graph" @@ -226,26 +284,105 @@ main() { if [[ "$2" -gt 0 ]]; then qps_threshold="$2" fi + shift ;; -x) if [[ "$2" -gt 0 ]]; then max_warmup_iterations="$2" fi + shift ;; -N) no_retry_mode="1" ;; -D) queue_drain_time="$2" + shift ;; -R) leafnoderank_seed="--node_rank_seed=$2" + shift ;; -P) pagerank_seed="--page_rank_seed=$2" + shift ;; -C) pointerchase_seed="--pointer_chase_seed=$2" + shift + ;; + --workload) + workload_type="$2" + shift + ;; + --workload=*) + workload_type="${1#*=}" + ;; + --dlrm-model) + dlrm_model_path="$2" + shift + ;; + --dlrm-model=*) + dlrm_model_path="${1#*=}" + ;; + --dlrm-batch-size) + dlrm_batch_size="$2" + shift + ;; + --dlrm-batch-size=*) + dlrm_batch_size="${1#*=}" + ;; + --dlrm-inferences) + dlrm_inferences_per_request="$2" + shift + ;; + --dlrm-inferences=*) + dlrm_inferences_per_request="${1#*=}" + ;; + --dlrm-threads) + dlrm_threads="$2" + shift + ;; + --dlrm-threads=*) + dlrm_threads="${1#*=}" + ;; + --async-io) + async_io="1" + ;; + --io-dist) + io_latency_distribution="$2" + shift + ;; + --io-dist=*) + io_latency_distribution="${1#*=}" + ;; + --io-mean) + io_latency_mean_ms="$2" + shift + ;; + --io-mean=*) + io_latency_mean_ms="${1#*=}" + ;; + --io-stddev) + io_latency_stddev_ms="$2" + shift + ;; + --io-stddev=*) + io_latency_stddev_ms="${1#*=}" + ;; + --io-stages) + io_stages="$2" + shift + ;; + --io-stages=*) + io_stages="${1#*=}" + ;; + --io-stage-latency) + io_stage_latency_ms="$2" + shift + ;; + --io-stage-latency=*) + io_stage_latency_ms="${1#*=}" ;; -h|--help) show_help >&2 @@ -255,19 +392,16 @@ main() { echo "Unsupported arg '$1'" 1>&2 break esac - - case $1 in - -t|-c|-s|-d|-p|-q|-o|-w|-i|-l|-S|-L|-r|-x|-D|-R|-P|-C) - if [ -z "$2" ]; then - echo "Invalid option: '$1' requires an argument" 1>&2 - exit 1 - fi - shift # Additional shift for the argument - ;; - esac shift # pop the previously read argument done + # Validate long option arguments + if [ "$async_io" = "1" ]; then + if [ "$io_latency_distribution" != "fixed" ] && [ "$io_latency_distribution" != "exponential" ] && [ "$io_latency_distribution" != "lognormal" ]; then + die "Invalid --io-dist value 
'$io_latency_distribution'. Must be 'fixed', 'exponential', or 'lognormal'." + fi + fi + create_breakdown_csv "$BREAKDOWN_FOLDER" set -u # Enable unbound variables check from here onwards @@ -279,8 +413,42 @@ main() { cd "${FEEDSIM_ROOT_SRC}" + # Build DLRM options if workload type is dlrm + local dlrm_opts="" + if [ "$workload_type" = "dlrm" ]; then + if [ -z "$dlrm_model_path" ]; then + die "DLRM workload requires --dlrm-model to specify the TorchScript model" + fi + dlrm_opts="--workload_type=dlrm --dlrm_model_path=$dlrm_model_path --dlrm_batch_size=$dlrm_batch_size --dlrm_inferences_per_request=$dlrm_inferences_per_request --dlrm_threads=$dlrm_threads" + echo "Using DLRM workload with model: $dlrm_model_path" + + # Set LD_LIBRARY_PATH for LibTorch if needed + if [ -d "${FEEDSIM_ROOT}/third_party/libtorch/lib" ]; then + export LD_LIBRARY_PATH="${FEEDSIM_ROOT}/third_party/libtorch/lib:${LD_LIBRARY_PATH:-}" + fi + else + dlrm_opts="--workload_type=pagerank" + echo "Using PageRank workload" + fi + + # Build async I/O options (Phase 3) + local async_io_opts="" + if [ "$async_io" = "1" ]; then + async_io_opts="--async_io --io_latency_distribution=$io_latency_distribution --io_latency_mean_ms=$io_latency_mean_ms --io_latency_stddev_ms=$io_latency_stddev_ms --io_stages=$io_stages --io_stage_latency_ms=$io_stage_latency_ms" + echo "Using ASYNC I/O mode (non-blocking) - eliminates thread starvation" + echo " I/O latency distribution: $io_latency_distribution" + echo " I/O latency mean: ${io_latency_mean_ms}ms" + if [ "$io_latency_distribution" = "lognormal" ]; then + echo " I/O latency stddev: ${io_latency_stddev_ms}ms" + fi + if [ "$io_stages" -gt 1 ]; then + echo " I/O stages: $io_stages x ${io_stage_latency_ms}ms" + fi + fi + # Starting leaf node service monitor_port=$((port-1000)) + # shellcheck disable=SC2086 MALLOC_CONF=narenas:20,dirty_decay_ms:5000 build/workloads/ranking/LeafNodeRank \ --port="$port" \ --monitor_port="$monitor_port" \ @@ -296,12 +464,14 @@ main() { --graph_max_iters=1 \ --noaffinity \ --min_icache_iterations="$icache_iterations" \ - "$store_graph" \ - "$load_graph" \ - "$instrument_graph" \ - "$leafnoderank_seed" \ - "$pagerank_seed" \ - "$pointerchase_seed" >> $BREPS_LFILE 2>&1 & + $dlrm_opts \ + $async_io_opts \ + $store_graph \ + $load_graph \ + $instrument_graph \ + $leafnoderank_seed \ + $pagerank_seed \ + $pointerchase_seed >> $BREPS_LFILE 2>&1 & LEAF_PID=$! 
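For orientation, the new run.sh flags compose as sketched below; the model path and numeric values are illustrative only, not defaults required by this patch (run.sh changes into FEEDSIM_ROOT_SRC before launching, so the model path is resolved relative to that directory):

    # DLRM workload served with async I/O and lognormal I/O latency
    ./run.sh --workload dlrm --dlrm-model models/dlrm_small.pt \
        --dlrm-batch-size 256 --dlrm-threads 8 \
        --async-io --io-dist lognormal --io-mean 200 --io-stddev 50

    # Default PageRank workload, but with three simulated I/O hops of 50 ms each
    ./run.sh --async-io --io-stages 3 --io-stage-latency 50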
diff --git a/packages/feedsim/third_party/src/scripts/search_qps.sh b/packages/feedsim/third_party/src/scripts/search_qps.sh index e8761d08..4f4f82de 100755 --- a/packages/feedsim/third_party/src/scripts/search_qps.sh +++ b/packages/feedsim/third_party/src/scripts/search_qps.sh @@ -471,7 +471,7 @@ if [[ -n "$fixed_qps" ]]; then printf "final requested_qps = %.2f, measured_qps = %.2f, latency = %.2f\n" $fixed_qps_el $measured_qps $measured_latency echo "final requested_qps = $fixed_qps_el, measured_qps = $measured_qps, latency = $measured_latency" >> $BREPS_LFILE benchreps_tell_state "after fixed_qps_iter $fixed_qps_el" - sleep 7 # wait between iterations + sleep 15 # wait between iterations (increased from 7 to allow async requests to drain) done fi exit 0 diff --git a/packages/feedsim/third_party/src/workloads/ranking/CMakeLists.txt b/packages/feedsim/third_party/src/workloads/ranking/CMakeLists.txt index 73e43ce3..832280b0 100644 --- a/packages/feedsim/third_party/src/workloads/ranking/CMakeLists.txt +++ b/packages/feedsim/third_party/src/workloads/ranking/CMakeLists.txt @@ -98,6 +98,32 @@ target_link_libraries(rankingDwarfs ) target_compile_options(rankingDwarfs PRIVATE -fvisibility=hidden) +# DLRM support (optional, requires LibTorch) +option(FEEDSIM_USE_DLRM "Enable DLRM workload with LibTorch" OFF) + +if(FEEDSIM_USE_DLRM) + find_package(Torch REQUIRED) + message(STATUS "DLRM enabled with LibTorch: ${TORCH_LIBRARIES}") + + add_library(rankingDLRM STATIC + dwarfs/dlrm.cpp + dwarfs/dlrm.h + ) + target_compile_definitions(rankingDLRM PUBLIC FEEDSIM_USE_DLRM) + target_include_directories(rankingDLRM + PUBLIC + dwarfs/ + ${CMAKE_CURRENT_SOURCE_DIR}/../../ + ${CMAKE_CURRENT_SOURCE_DIR}/../../../ + ${TORCH_INCLUDE_DIRS} + ) + target_link_libraries(rankingDLRM + PUBLIC + ${TORCH_LIBRARIES} + ) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TORCH_CXX_FLAGS}") +endif() + # Generate getopts for LeafNodeRank add_custom_command( @@ -139,21 +165,33 @@ target_include_directories(LeafNodeRank ${FBTHRIFT_INCLUDE_DIR} ) +# Base LeafNodeRank libraries +set(LEAF_NODE_RANK_LIBS + OLDISim::OLDISim + leafNodeRankcmdline + rankingDwarfs + icachebuster + PointerChaser + ${FOLLY_LIBRARIES} + ${IBERTY_LIBRARIES} + ranking-cpp2 + rankingThriftDataGen + ${CMAKE_DL_LIBS} + glog::glog + ${DOUBLE_CONVERSION_LIBRARY} + ${FBTHRIFT_LIBRARIES} +) + +# Add DLRM library if enabled +if(FEEDSIM_USE_DLRM) + list(APPEND LEAF_NODE_RANK_LIBS rankingDLRM) + target_compile_definitions(LeafNodeRank PRIVATE FEEDSIM_USE_DLRM) + target_include_directories(LeafNodeRank PRIVATE ${TORCH_INCLUDE_DIRS}) +endif() + target_link_libraries(LeafNodeRank PRIVATE - OLDISim::OLDISim - leafNodeRankcmdline - rankingDwarfs - icachebuster - PointerChaser - ${FOLLY_LIBRARIES} - ${IBERTY_LIBRARIES} - ranking-cpp2 - rankingThriftDataGen - ${CMAKE_DL_LIBS} - glog::glog - ${DOUBLE_CONVERSION_LIBRARY} - ${FBTHRIFT_LIBRARIES} + ${LEAF_NODE_RANK_LIBS} PUBLIC Threads::Threads ZLIB::ZLIB diff --git a/packages/feedsim/third_party/src/workloads/ranking/DriverNodeRank.cc b/packages/feedsim/third_party/src/workloads/ranking/DriverNodeRank.cc index 3769f3e5..96abbb50 100644 --- a/packages/feedsim/third_party/src/workloads/ranking/DriverNodeRank.cc +++ b/packages/feedsim/third_party/src/workloads/ranking/DriverNodeRank.cc @@ -31,7 +31,7 @@ static gengetopt_args_info args; const int kMaxRequestSize = 8192; -const int kRecomputeQPSPeriod = 5; +const int kRecomputeQPSPeriod = 1; // Reduced from 5 to 1 second for faster feedback struct ThreadData { std::string 
random_string; @@ -65,12 +65,36 @@ void RecomputeDelayTimerHandler(evutil_socket_t listener, int16_t flags, this_thread->test_driver->GetConnectionStats(); // Get QPS for last stats period - double qps = static_cast( - stats.query_counts_.at(ranking::kPageRankRequestType)) / - (stats.end_time_ - stats.start_time_) * 1000000000; + double elapsed_secs = (stats.end_time_ - stats.start_time_) / 1000000000.0; + if (elapsed_secs <= 0) { + AddRecomputeDelayTimer(*this_thread); + return; + } + + double measured_qps = static_cast( + stats.query_counts_.at(ranking::kPageRankRequestType)) / elapsed_secs; + + // Compute target delay in microseconds + double target_delay_us = 1000000.0 / this_thread->qps_per_thread; + + // Adjust delay using proportional feedback control + // If measured_qps > target: increase delay to slow down + // If measured_qps < target: decrease delay to speed up + // Use a damping factor (0.5) to prevent oscillations + double qps_ratio = measured_qps / this_thread->qps_per_thread; + double damping = 0.5; + double adjustment = 1.0 + damping * (qps_ratio - 1.0); - // Adjust delay based on QPS - this_thread->request_delay = (1000000 / this_thread->qps_per_thread) * (qps / this_thread->qps_per_thread); + // Clamp adjustment to prevent extreme values + if (adjustment < 0.5) adjustment = 0.5; + if (adjustment > 2.0) adjustment = 2.0; + + this_thread->request_delay = static_cast(target_delay_us * adjustment); + + // Ensure minimum delay to prevent flooding + if (this_thread->request_delay < 100) { + this_thread->request_delay = 100; // 100us minimum = 10000 QPS max per thread + } AddRecomputeDelayTimer(*this_thread); } diff --git a/packages/feedsim/third_party/src/workloads/ranking/LeafNodeRank.cc b/packages/feedsim/third_party/src/workloads/ranking/LeafNodeRank.cc index 1bb24ee9..712eda9b 100644 --- a/packages/feedsim/third_party/src/workloads/ranking/LeafNodeRank.cc +++ b/packages/feedsim/third_party/src/workloads/ranking/LeafNodeRank.cc @@ -14,8 +14,12 @@ #include #include +#include #include +#include #include +#include +#include #include #include #include @@ -26,6 +30,7 @@ #include #include #include +#include #include #include @@ -43,6 +48,10 @@ #include "TimekeeperPool.h" #include "dwarfs/pagerank.h" +#ifdef FEEDSIM_USE_DLRM +#include "dwarfs/dlrm.h" +#endif + #include "if/gen-cpp2/ranking_types.h" #include "../search/ICacheBuster.h" @@ -61,6 +70,13 @@ const auto kNumICacheBusterMethods = 100000; const auto kPointerChaseSize = 10000000; const auto kPageRankThreshold = 1e-4; +// I/O latency distribution types for Phase 3 +enum class IOLatencyDistType { + FIXED, // Fixed latency (original behavior) + EXPONENTIAL, // Exponential distribution (memoryless, models queue delays) + LOGNORMAL // Lognormal distribution (models real-world service latencies) +}; + struct ThreadData { std::shared_ptr cpuThreadPool; std::shared_ptr srvCPUThreadPool; @@ -68,16 +84,143 @@ struct ThreadData { std::shared_ptr ioThreadPool; std::shared_ptr timekeeperPool; std::unique_ptr page_ranker; +#ifdef FEEDSIM_USE_DLRM + std::shared_ptr dlrm_ranker; +#endif std::unique_ptr pointer_chaser; std::unique_ptr icache_buster; std::default_random_engine rng; std::gamma_distribution latency_distribution; std::string random_string; + + // Phase 3: I/O latency distribution support + IOLatencyDistType io_latency_dist_type = IOLatencyDistType::FIXED; + std::exponential_distribution io_exponential_dist; + std::lognormal_distribution io_lognormal_dist; + int io_latency_mean_ms = 200; + int io_latency_min_ms = 50; // Minimum 
bound to prevent too-fast responses + int io_latency_max_ms = 1000; // Maximum bound to prevent extreme outliers (was 5000) + + // Mutex for thread-safe RNG access (RNG state is not thread-safe) + std::mutex rng_mutex; + + // Get next I/O latency based on distribution type + // IMPORTANT: This function MUST be called from the handler thread (before async) + // to avoid race conditions on the RNG state. + int getNextIOLatencyMs() { + std::lock_guard lock(rng_mutex); + switch (io_latency_dist_type) { + case IOLatencyDistType::FIXED: + return io_latency_mean_ms; + case IOLatencyDistType::EXPONENTIAL: + // Exponential distribution with specified mean, bounded + return std::max(io_latency_min_ms, + std::min(io_latency_max_ms, static_cast(io_exponential_dist(rng)))); + case IOLatencyDistType::LOGNORMAL: + // Lognormal distribution with tighter bounds to reduce tail latency variance + return std::max(io_latency_min_ms, + std::min(io_latency_max_ms, static_cast(io_lognormal_dist(rng)))); + default: + return io_latency_mean_ms; + } + } +}; + +// Enum for workload type +enum class WorkloadType { + PAGERANK, + DLRM }; +// Global workload type +static WorkloadType g_workload_type = WorkloadType::PAGERANK; + // Global graph that will be shared across threads CSRGraph g_shared_graph; +#ifdef FEEDSIM_USE_DLRM +void ThreadStartup( + oldisim::NodeThread& thread, + std::vector& thread_data, + ranking::dwarfs::PageRankParams& params, + const std::shared_ptr& cpuThreadPool, + const std::shared_ptr& srvCPUThreadPool, + const std::shared_ptr& srvIOThreadPool, + const std::shared_ptr& ioThreadPool, + const std::shared_ptr& timekeeperPool, + const std::shared_ptr& shared_dlrm_ranker) { + auto& this_thread = thread_data[thread.get_thread_num()]; + this_thread.cpuThreadPool = cpuThreadPool; + this_thread.srvCPUThreadPool = srvCPUThreadPool; + this_thread.srvIOThreadPool = srvIOThreadPool; + this_thread.ioThreadPool = ioThreadPool; + this_thread.timekeeperPool = timekeeperPool; + + // Store shared DLRM ranker + this_thread.dlrm_ranker = shared_dlrm_ranker; + + unsigned noderank_seed; + if (args.node_rank_seed_given) { + noderank_seed = static_cast(args.node_rank_seed_arg); + } else { + noderank_seed = std::chrono::system_clock::now().time_since_epoch().count(); + } + + unsigned pointer_chase_seed; + if (args.pointer_chase_seed_given) { + pointer_chase_seed = static_cast(args.pointer_chase_seed_arg); + } else { + pointer_chase_seed = + std::chrono::system_clock::now().time_since_epoch().count(); + } + + // Only initialize PageRank if we're using it + if (g_workload_type == WorkloadType::PAGERANK) { + unsigned page_rank_seed; + if (args.page_rank_seed_given) { + page_rank_seed = static_cast(args.page_rank_seed_arg); + } else { + page_rank_seed = std::chrono::system_clock::now().time_since_epoch().count(); + } + auto graph = params.makeGraphCopy(g_shared_graph); + this_thread.page_ranker = std::make_unique( + std::move(graph), args.cpu_threads_arg, page_rank_seed); + this_thread.icache_buster = + std::make_unique(kNumICacheBusterMethods); + } + + this_thread.pointer_chaser = std::make_unique( + kPointerChaseSize, pointer_chase_seed); + this_thread.rng.seed(noderank_seed); + + const double alpha = 0.7; + const double beta = 20000; + this_thread.latency_distribution = + std::gamma_distribution(alpha, beta); + + this_thread.random_string = RandomString(args.random_data_size_arg); + + // Phase 3: Initialize I/O latency distributions + this_thread.io_latency_mean_ms = args.io_latency_mean_ms_arg; + std::string io_dist_str = 
args.io_latency_distribution_arg; + if (io_dist_str == "exponential") { + this_thread.io_latency_dist_type = IOLatencyDistType::EXPONENTIAL; + double rate = 1.0 / static_cast(args.io_latency_mean_ms_arg); + this_thread.io_exponential_dist = std::exponential_distribution(rate); + } else if (io_dist_str == "lognormal") { + this_thread.io_latency_dist_type = IOLatencyDistType::LOGNORMAL; + double mean = static_cast(args.io_latency_mean_ms_arg); + double stddev = static_cast(args.io_latency_stddev_ms_arg); + double variance = stddev * stddev; + double mu = std::log(mean * mean / std::sqrt(variance + mean * mean)); + double sigma = std::sqrt(std::log(1.0 + variance / (mean * mean))); + this_thread.io_lognormal_dist = std::lognormal_distribution(mu, sigma); + } else { + this_thread.io_latency_dist_type = IOLatencyDistType::FIXED; + } +} +#endif + void ThreadStartup( oldisim::NodeThread& thread, std::vector& thread_data, @@ -88,7 +231,6 @@ void ThreadStartup( const std::shared_ptr& ioThreadPool, const std::shared_ptr& timekeeperPool) { auto& this_thread = thread_data[thread.get_thread_num()]; - // auto graph = params.buildGraph(); auto graph = params.makeGraphCopy(g_shared_graph); this_thread.cpuThreadPool = cpuThreadPool; this_thread.srvCPUThreadPool = srvCPUThreadPool; @@ -131,6 +273,27 @@ void ThreadStartup( std::gamma_distribution(alpha, beta); this_thread.random_string = RandomString(args.random_data_size_arg); + + // Phase 3: Initialize I/O latency distributions + this_thread.io_latency_mean_ms = args.io_latency_mean_ms_arg; + std::string io_dist_str = args.io_latency_distribution_arg; + if (io_dist_str == "exponential") { + this_thread.io_latency_dist_type = IOLatencyDistType::EXPONENTIAL; + // Exponential distribution with rate lambda = 1/mean + double rate = 1.0 / static_cast(args.io_latency_mean_ms_arg); + this_thread.io_exponential_dist = std::exponential_distribution(rate); + } else if (io_dist_str == "lognormal") { + this_thread.io_latency_dist_type = IOLatencyDistType::LOGNORMAL; + // Convert mean and stddev to lognormal parameters (mu, sigma) + double mean = static_cast(args.io_latency_mean_ms_arg); + double stddev = static_cast(args.io_latency_stddev_ms_arg); + double variance = stddev * stddev; + double mu = std::log(mean * mean / std::sqrt(variance + mean * mean)); + double sigma = std::sqrt(std::log(1.0 + variance / (mean * mean))); + this_thread.io_lognormal_dist = std::lognormal_distribution(mu, sigma); + } else { + this_thread.io_latency_dist_type = IOLatencyDistType::FIXED; + } } std::string compressPayload(const std::string& data, int result) { @@ -170,60 +333,281 @@ ranking::RankingResponse deserializePayload(const folly::IOBuf* buf) { return resp; } +/** + * Phase 3: Async (non-blocking) request handler using continuation-passing style. + * + * This handler implements the same logic as PageRankRequestHandler but without + * blocking .get() calls. This eliminates thread starvation on high-core CPUs + * by returning immediately and processing the response asynchronously. + * + * Key differences from the blocking handler: + * 1. No blocking .get() calls on the I/O future + * 2. Response is sent in the final continuation + * 3. All I/O stages are chained via .thenValue()/.thenVia() + * 4. Uses configurable I/O latency distributions + * + * CRITICAL: The QueryContext is moved into a shared_ptr to extend its lifetime + * beyond the handler return. The oldisim framework destroys the context after + * the handler returns, but we need it to survive until the async callback. 
+ */ +void AsyncPageRankRequestHandler( + oldisim::NodeThread& thread, + oldisim::QueryContext& context, + std::vector& thread_data) { + auto& this_thread = thread_data[thread.get_thread_num()]; + int thread_id = thread.get_thread_num(); + + // CRITICAL: Move the QueryContext into a shared_ptr to extend its lifetime. + // The oldisim framework destroys the stack-allocated context after this + // handler returns, but we need it alive until the async callback completes. + auto context_ptr = std::make_shared(std::move(context)); + + // Stage 1: ICacheBuster (synchronous, lightweight) - only for PageRank + if (g_workload_type == WorkloadType::PAGERANK) { + const int min_iterations = std::max(args.min_icache_iterations_arg, 0); + const int num_iterations = + static_cast(this_thread.latency_distribution(this_thread.rng)) + + min_iterations; + ICacheBuster& buster = *this_thread.icache_buster; + + for (int i = 0; i < num_iterations; i++) { + buster.RunNextMethod(); + } + } + + // Stage 2: Ranking workload (CPU-intensive, parallelized) + // This stage blocks briefly for CPU work, which is acceptable + int ranking_result = 0; + if (g_workload_type == WorkloadType::PAGERANK) { + auto per_thread_subset = args.graph_subset_arg / args.cpu_threads_arg; + + std::vector> futures; + for (int i = 0; i < args.cpu_threads_arg; i++) { + auto f = folly::via( + this_thread.cpuThreadPool.get(), + [i, &this_thread, per_thread_subset]() { + return this_thread.page_ranker->rank( + i, + args.graph_max_iters_arg, + kPageRankThreshold, + args.rank_trials_per_thread_arg, + per_thread_subset); + }); + futures.push_back(std::move(f)); + } + auto fs = folly::collectAll(std::move(futures)).get(); + for (auto& f : fs) { + ranking_result += f.value(); + } + } +#ifdef FEEDSIM_USE_DLRM + else if (g_workload_type == WorkloadType::DLRM) { + int num_inferences = args.dlrm_inferences_per_request_arg; + int batch_size = args.dlrm_batch_size_arg; + + std::vector> futures; + for (int i = 0; i < args.cpu_threads_arg; i++) { + auto f = folly::via( + this_thread.cpuThreadPool.get(), + [thread_id, num_inferences, batch_size, &this_thread]() { + return this_thread.dlrm_ranker->infer( + thread_id, num_inferences, batch_size); + }); + futures.push_back(std::move(f)); + } + auto fs = folly::collectAll(std::move(futures)).get(); + for (auto& f : fs) { + ranking_result += f.value(); + } + } +#endif + + // Capture data needed for async stages by value + auto random_string = this_thread.random_string; + auto srvCPUThreadPool = this_thread.srvCPUThreadPool; + auto srvIOThreadPool = this_thread.srvIOThreadPool; + auto ioThreadPool = this_thread.ioThreadPool; + auto timekeeperPool = this_thread.timekeeperPool; + search::PointerChase* pointer_chaser = this_thread.pointer_chaser.get(); + + // Get I/O latency for this request (configurable distribution) + int io_latency_ms = this_thread.getNextIOLatencyMs(); + + // For multi-stage I/O simulation + int num_io_stages = args.io_stages_arg; + int io_stage_latency_ms = args.io_stage_latency_ms_arg; + + // Capture values for lambda captures + int srv_io_threads = args.srv_io_threads_arg; + int srv_threads = args.srv_threads_arg; + int num_objects = args.num_objects_arg; + int chase_iterations = args.chase_iterations_arg; + + // Stage 3: Async I/O simulation (NON-BLOCKING) + // This is the critical change - we use continuation-passing style + auto timekeeper = timekeeperPool->getTimekeeper(); + + // Calculate total I/O latency + int total_io_latency_ms = (num_io_stages > 1) + ? 
(num_io_stages * io_stage_latency_ms) + : io_latency_ms; + + // Start async chain with I/O sleep + folly::futures::sleep( + std::chrono::milliseconds(total_io_latency_ms), timekeeper.get()) + .via(ioThreadPool.get()) + .thenValue([ranking_result, random_string, srvIOThreadPool, + srv_io_threads, num_objects](folly::Unit) { + // Stage 4: Compression and serialization + auto compressed = compressPayload(random_string, ranking_result); + auto per_thread_num_objects = num_objects / srv_io_threads; + + std::vector> compressionFutures; + for (int i = 0; i < srv_io_threads; i++) { + auto f = folly::via(srvIOThreadPool.get(), [per_thread_num_objects]() { + auto resp = ranking::generators::generateRandomRankingResponse( + per_thread_num_objects); + auto payloadiobufq = serializePayload(resp); + auto buf = payloadiobufq.move(); + const auto compress_length = buf->computeChainDataLength() / 2; + size_t total_size = 0; + for (auto range : *buf) { + if (total_size >= compress_length) break; + auto iobuf = folly::IOBuf::copyBuffer(range.data(), range.size()); + auto c = compressThrift(std::move(iobuf)); + total_size += range.size(); + } + return 1; + }); + compressionFutures.push_back(std::move(f)); + } + return folly::collectAll(std::move(compressionFutures)) + .via(srvIOThreadPool.get()) + .thenValue([ranking_result](std::vector> results) { + int total = ranking_result; + for (auto& r : results) { + if (r.hasValue()) total += r.value(); + } + return total; + }); + }) + .thenValue([pointer_chaser, srvCPUThreadPool, srv_threads, + chase_iterations](int prev_result) { + // Stage 5: Pointer chase + auto per_thread_chase_iterations = chase_iterations / srv_threads; + + std::vector> chaseFutures; + for (int i = 0; i < srv_threads; i++) { + auto f = folly::via(srvCPUThreadPool.get(), + [pointer_chaser, per_thread_chase_iterations]() { + pointer_chaser->Chase(per_thread_chase_iterations); + return 1; + }); + chaseFutures.push_back(std::move(f)); + } + return folly::collectAll(std::move(chaseFutures)) + .via(srvCPUThreadPool.get()) + .thenValue([prev_result](std::vector> results) { + int total = prev_result; + for (auto& r : results) { + if (r.hasValue()) total += r.value(); + } + return total; + }); + }) + .thenValue([context_ptr, srv_io_threads, num_objects](int final_result) { + // Stage 6: Generate and send response + auto per_thread_num_objects = num_objects / srv_io_threads; + auto r = ranking::generators::generateRandomRankingResponse( + per_thread_num_objects); + ranking::RankingResponse resp = r; + + auto payloadiobufq = serializePayload(resp); + auto buf = payloadiobufq.move(); + + context_ptr->SendResponse(buf->data(), buf->length()); + }) + .thenError(folly::tag_t{}, [context_ptr](const std::exception& e) { + // Error handling + std::cerr << "Async request handler error: " << e.what() << std::endl; + context_ptr->SendResponse(nullptr, 0); + }); + + // NO .get() here! 
Handler returns immediately, work continues asynchronously +} + void PageRankRequestHandler( oldisim::NodeThread& thread, oldisim::QueryContext& context, std::vector& thread_data) { auto& this_thread = thread_data[thread.get_thread_num()]; - const int min_iterations = std::max(args.min_icache_iterations_arg, 0); - const int num_iterations = - static_cast(this_thread.latency_distribution(this_thread.rng)) + - min_iterations; - ICacheBuster& buster = *this_thread.icache_buster; search::PointerChase& chaser = *this_thread.pointer_chaser; - for (int i = 0; i < num_iterations; i++) { - buster.RunNextMethod(); + // ICacheBuster stage (only for PageRank mode) + if (g_workload_type == WorkloadType::PAGERANK) { + const int min_iterations = std::max(args.min_icache_iterations_arg, 0); + const int num_iterations = + static_cast(this_thread.latency_distribution(this_thread.rng)) + + min_iterations; + ICacheBuster& buster = *this_thread.icache_buster; + + for (int i = 0; i < num_iterations; i++) { + buster.RunNextMethod(); + } } - // auto start = std::chrono::steady_clock::now(); - auto per_thread_subset = args.graph_subset_arg / args.cpu_threads_arg; - - std::vector> futures; - for (int i = 0; i < args.cpu_threads_arg; i++) { - auto f = folly::via( - this_thread.cpuThreadPool.get(), - [i, &this_thread, per_thread_subset]() { - return this_thread.page_ranker->rank( - i, - args.graph_max_iters_arg, - kPageRankThreshold, - args.rank_trials_per_thread_arg, - per_thread_subset); - }); - futures.push_back(std::move(f)); + int result = 0; + + // Ranking stage - either PageRank or DLRM + if (g_workload_type == WorkloadType::PAGERANK) { + // PageRank workload + auto per_thread_subset = args.graph_subset_arg / args.cpu_threads_arg; + + std::vector> futures; + for (int i = 0; i < args.cpu_threads_arg; i++) { + auto f = folly::via( + this_thread.cpuThreadPool.get(), + [i, &this_thread, per_thread_subset]() { + return this_thread.page_ranker->rank( + i, + args.graph_max_iters_arg, + kPageRankThreshold, + args.rank_trials_per_thread_arg, + per_thread_subset); + }); + futures.push_back(std::move(f)); + } + auto fs = folly::collect(futures).get(); + result = std::accumulate(fs.begin(), fs.end(), 0); + } +#ifdef FEEDSIM_USE_DLRM + else if (g_workload_type == WorkloadType::DLRM) { + // DLRM workload + int thread_id = thread.get_thread_num(); + int num_inferences = args.dlrm_inferences_per_request_arg; + int batch_size = args.dlrm_batch_size_arg; + + std::vector> futures; + for (int i = 0; i < args.cpu_threads_arg; i++) { + auto f = folly::via( + this_thread.cpuThreadPool.get(), + [thread_id, num_inferences, batch_size, &this_thread]() { + return this_thread.dlrm_ranker->infer( + thread_id, num_inferences, batch_size); + }); + futures.push_back(std::move(f)); + } + auto fs = folly::collect(futures).get(); + result = std::accumulate(fs.begin(), fs.end(), 0); } - auto fs = folly::collect(futures).get(); - int result = std::accumulate(fs.begin(), fs.end(), 0); - // auto end = std::chrono::steady_clock::now(); - // auto duration = - // std::chrono::duration_cast(end - start) - // .count(); - // std::cout << duration - // << '\n'; +#endif + + // I/O simulation stage auto timekeeper = this_thread.timekeeperPool->getTimekeeper(); auto s = folly::futures::sleep( std::chrono::milliseconds(args.io_time_ms_arg), timekeeper.get()) .via(this_thread.ioThreadPool.get()) .thenValue([&](auto&& _) { - // auto start = std::chrono::steady_clock::now(); - // chaser.Chase(args.io_chase_iterations_arg); - // auto end = 
std::chrono::steady_clock::now(); - // std::cout << - // std::chrono::duration_cast( - // end - start) - // .count() - // << '\n'; return result + 1; }); result = std::move(s).get(); @@ -256,14 +640,6 @@ void PageRankRequestHandler( auto cfs = folly::collect(compressionFutures).get(); int cResult = std::accumulate(cfs.begin(), cfs.end(), 0); - /* - auto r = folly::via(this_thread.srvCPUThreadPool.get(), [&]() { - //chaser.Chase(args.chase_iterations_arg); - return ranking::generators::generateRandomRankingResponse( - args.num_objects_arg); - }); - */ - auto per_thread_chase_iterations = args.chase_iterations_arg / args.srv_threads_arg; std::vector> chaseFutures; @@ -280,15 +656,12 @@ void PageRankRequestHandler( // Generate a response auto r = ranking::generators::generateRandomRankingResponse( per_thread_num_objects); - ranking::RankingResponse resp = r; // std::move(r).get(); + ranking::RankingResponse resp = r; // Serialize into FBThrift auto payloadiobufq = serializePayload(resp); auto buf = payloadiobufq.move(); - // folly::futures::sleep(std::chrono::milliseconds(2), - // timekeeper.get()).get(); - auto uncompressed = decompressPayload(compressed); auto resp1 = deserializePayload(buf.get()); @@ -307,6 +680,22 @@ int main(int argc, char** argv) { if (args.quiet_given != 0u) { log_level = QUIET; } + + // Determine workload type + std::string workload_type_str = args.workload_type_arg; + if (workload_type_str == "dlrm") { +#ifdef FEEDSIM_USE_DLRM + g_workload_type = WorkloadType::DLRM; + std::cout << "Using DLRM workload type" << std::endl; +#else + DIE("DLRM workload requested but FEEDSIM_USE_DLRM is not defined. " + "Rebuild with LibTorch support."); +#endif + } else { + g_workload_type = WorkloadType::PAGERANK; + std::cout << "Using PageRank workload type" << std::endl; + } + int fake_argc = 1; char* fake_argv[2] = {const_cast("./LeafNodeRank"), nullptr}; char** sargv = static_cast(fake_argv); @@ -328,59 +717,163 @@ int main(int argc, char** argv) { auto timekeeperPool = std::make_shared(args.timekeeper_threads_arg); + // Warm up all thread pools to ensure threads are spawned and ready + // This prevents cold-start latency spikes during actual request processing + std::cout << "Warming up thread pools..." 
<< std::endl; + { + const int warmup_tasks = 100; // Run multiple tasks to ensure all threads are active + + // Warm up CPU thread pool + std::vector> cpuFutures; + for (int i = 0; i < warmup_tasks; i++) { + cpuFutures.push_back(folly::via(cpuThreadPool.get(), []() { + volatile int sum = 0; + for (int j = 0; j < 1000; j++) sum += j; + return static_cast(sum); + })); + } + folly::collectAll(std::move(cpuFutures)).get(); + + // Warm up srvCPU thread pool + std::vector> srvCPUFutures; + for (int i = 0; i < warmup_tasks; i++) { + srvCPUFutures.push_back(folly::via(srvCPUThreadPool.get(), []() { + volatile int sum = 0; + for (int j = 0; j < 1000; j++) sum += j; + return static_cast(sum); + })); + } + folly::collectAll(std::move(srvCPUFutures)).get(); + + // Warm up srvIO thread pool + std::vector> srvIOFutures; + for (int i = 0; i < warmup_tasks; i++) { + srvIOFutures.push_back(folly::via(srvIOThreadPool.get(), []() { + volatile int sum = 0; + for (int j = 0; j < 1000; j++) sum += j; + return static_cast(sum); + })); + } + folly::collectAll(std::move(srvIOFutures)).get(); + + // Warm up IO thread pool (uses different API) + std::vector> ioFutures; + for (int i = 0; i < warmup_tasks; i++) { + ioFutures.push_back(folly::via(ioThreadPool.get(), []() { + return 1; + })); + } + folly::collectAll(std::move(ioFutures)).get(); + + // Warm up timekeeper by scheduling a few sleeps + auto timekeeper = timekeeperPool->getTimekeeper(); + std::vector> sleepFutures; + for (int i = 0; i < 10; i++) { + sleepFutures.push_back( + folly::futures::sleep(std::chrono::milliseconds(1), timekeeper.get())); + } + folly::collectAll(std::move(sleepFutures)).get(); + } + std::cout << "Thread pool warmup complete" << std::endl; + std::vector thread_data(args.threads_arg); ranking::dwarfs::PageRankParams params{ args.graph_scale_arg, args.graph_degree_arg}; - // create or load a graph - - if (args.load_graph_given) { - if (args.instrument_graph_given) { - auto start_load = std::chrono::steady_clock::now(); - g_shared_graph = params.loadGraphFromFile(args.load_graph_arg); - auto end_load = std::chrono::steady_clock::now(); - auto load_duration = - std::chrono::duration_cast( - end_load - start_load) - .count(); - std::cout << "Graph loading time: " << load_duration << " ms" - << std::endl; - } else { - g_shared_graph = params.loadGraphFromFile(args.load_graph_arg); +#ifdef FEEDSIM_USE_DLRM + // Initialize shared DLRM model if using DLRM workload + std::shared_ptr shared_dlrm_ranker; + if (g_workload_type == WorkloadType::DLRM) { + if (!args.dlrm_model_path_given) { + DIE("DLRM workload requires --dlrm_model_path"); } - } else { - if (args.instrument_graph_given) { - auto start_build = std::chrono::steady_clock::now(); - g_shared_graph = params.buildGraph(); - auto end_build = std::chrono::steady_clock::now(); - auto build_duration = - std::chrono::duration_cast( - end_build - start_build) - .count(); - std::cout << "Graph building time: " << build_duration << " ms" - << std::endl; - - if (args.store_graph_given) { - auto start_store = std::chrono::steady_clock::now(); - params.storeGraphToFile(g_shared_graph, args.store_graph_arg); - auto end_store = std::chrono::steady_clock::now(); - auto store_duration = + ranking::dwarfs::DLRMParams dlrm_params; + dlrm_params.model_path = args.dlrm_model_path_arg; + dlrm_params.batch_size = args.dlrm_batch_size_arg; + dlrm_params.num_threads = args.dlrm_threads_arg; + + unsigned dlrm_seed = 0; + if (args.dlrm_seed_given) { + dlrm_seed = static_cast(args.dlrm_seed_arg); + } + + // Create 
shared DLRM model (thread-safe for inference) + shared_dlrm_ranker = std::make_shared( + dlrm_params, args.threads_arg, dlrm_seed); + + // Warm up DLRM model to stabilize inference latency + // JIT compilation and memory allocation happen on first few inferences + std::cout << "Warming up DLRM model..." << std::endl; + const int warmup_iterations = 10; // Run enough iterations to JIT compile all paths + for (int i = 0; i < warmup_iterations; i++) { + shared_dlrm_ranker->infer(0, 1, args.dlrm_batch_size_arg); + } + std::cout << "DLRM warmup complete (" << warmup_iterations << " iterations)" << std::endl; + } +#endif + + // create or load a graph (only for PageRank mode) + if (g_workload_type == WorkloadType::PAGERANK) { + if (args.load_graph_given) { + if (args.instrument_graph_given) { + auto start_load = std::chrono::steady_clock::now(); + g_shared_graph = params.loadGraphFromFile(args.load_graph_arg); + auto end_load = std::chrono::steady_clock::now(); + auto load_duration = std::chrono::duration_cast( - end_store - start_store) + end_load - start_load) .count(); - std::cout << "Graph storing time: " << store_duration << " ms" + std::cout << "Graph loading time: " << load_duration << " ms" << std::endl; + } else { + g_shared_graph = params.loadGraphFromFile(args.load_graph_arg); } } else { - g_shared_graph = params.buildGraph(); - if (args.store_graph_given) { - params.storeGraphToFile(g_shared_graph, args.store_graph_arg); + if (args.instrument_graph_given) { + auto start_build = std::chrono::steady_clock::now(); + g_shared_graph = params.buildGraph(); + auto end_build = std::chrono::steady_clock::now(); + auto build_duration = + std::chrono::duration_cast( + end_build - start_build) + .count(); + std::cout << "Graph building time: " << build_duration << " ms" + << std::endl; + + if (args.store_graph_given) { + auto start_store = std::chrono::steady_clock::now(); + params.storeGraphToFile(g_shared_graph, args.store_graph_arg); + auto end_store = std::chrono::steady_clock::now(); + auto store_duration = + std::chrono::duration_cast( + end_store - start_store) + .count(); + std::cout << "Graph storing time: " << store_duration << " ms" + << std::endl; + } + } else { + g_shared_graph = params.buildGraph(); + if (args.store_graph_given) { + params.storeGraphToFile(g_shared_graph, args.store_graph_arg); + } } } } oldisim::LeafNodeServer server(args.port_arg); server.SetThreadStartupCallback([&](auto&& thread) { +#ifdef FEEDSIM_USE_DLRM + return ThreadStartup( + thread, + thread_data, + params, + cpuThreadPool, + srvCPUThreadPool, + srvIOThreadPool, + ioThreadPool, + timekeeperPool, + shared_dlrm_ranker); +#else return ThreadStartup( thread, thread_data, @@ -390,12 +883,34 @@ int main(int argc, char** argv) { srvIOThreadPool, ioThreadPool, timekeeperPool); +#endif }); - server.RegisterQueryCallback( - ranking::kPageRankRequestType, - [&thread_data](auto&& thread, auto&& context) { - return PageRankRequestHandler(thread, context, thread_data); - }); + + // Choose request handler based on async_io flag + if (args.async_io_given) { + std::cout << "Using ASYNC (non-blocking) I/O mode - eliminates thread starvation" << std::endl; + std::cout << " I/O latency distribution: " << args.io_latency_distribution_arg << std::endl; + std::cout << " I/O latency mean: " << args.io_latency_mean_ms_arg << " ms" << std::endl; + if (std::string(args.io_latency_distribution_arg) == "lognormal") { + std::cout << " I/O latency stddev: " << args.io_latency_stddev_ms_arg << " ms" << std::endl; + } + if 
(args.io_stages_arg > 1) { + std::cout << " I/O stages: " << args.io_stages_arg << " x " << args.io_stage_latency_ms_arg << " ms" << std::endl; + } + + server.RegisterQueryCallback( + ranking::kPageRankRequestType, + [&thread_data](auto&& thread, auto&& context) { + return AsyncPageRankRequestHandler(thread, context, thread_data); + }); + } else { + std::cout << "Using BLOCKING I/O mode (original behavior)" << std::endl; + server.RegisterQueryCallback( + ranking::kPageRankRequestType, + [&thread_data](auto&& thread, auto&& context) { + return PageRankRequestHandler(thread, context, thread_data); + }); + } server.SetNumThreads(args.threads_arg); server.SetThreadPinning(args.noaffinity_given == 0u); server.SetThreadLoadBalancing(args.noloadbalance_given == 0u); diff --git a/packages/feedsim/third_party/src/workloads/ranking/LeafNodeRankCmdline.ggo b/packages/feedsim/third_party/src/workloads/ranking/LeafNodeRankCmdline.ggo index b1ff92e8..b218d961 100644 --- a/packages/feedsim/third_party/src/workloads/ranking/LeafNodeRankCmdline.ggo +++ b/packages/feedsim/third_party/src/workloads/ranking/LeafNodeRankCmdline.ggo @@ -51,3 +51,19 @@ option "port" - "Port to run server on." int default="11222" option "monitor_port" - "Port to run monitoring server on." int default="8888" option "noaffinity" - "Specify to disable thread pinning" option "noloadbalance" - "Specify to disable thread load balancing" + +# DLRM (FeedSim v2) options +option "workload_type" - "Type of ranking workload: pagerank or dlrm." string values="pagerank","dlrm" default="pagerank" +option "dlrm_model_path" - "Path to TorchScript DLRM model file (.pt)." string typestr="filename" optional +option "dlrm_batch_size" - "Batch size for DLRM inference." int default="256" +option "dlrm_inferences_per_request" - "Number of DLRM inference calls per request." int default="1" +option "dlrm_threads" - "Number of LibTorch threads for DLRM inference." int default="8" +option "dlrm_seed" - "Seed for DLRM feature generation. If not provided, current time will be used." long optional + +# Phase 3: Non-blocking I/O options +option "async_io" - "Enable non-blocking async I/O pattern (eliminates thread starvation)." +option "io_latency_distribution" - "I/O latency distribution: fixed, exponential, or lognormal." string values="fixed","exponential","lognormal" default="fixed" +option "io_latency_mean_ms" - "Mean I/O latency in milliseconds. For fixed: exact value; for exponential/lognormal: distribution mean." int default="200" +option "io_latency_stddev_ms" - "I/O latency standard deviation in ms (only used for lognormal distribution)." int default="50" +option "io_stages" - "Number of I/O stages to simulate (models multi-hop data fetching like Feature Store -> Memcache -> TAO)." int default="1" +option "io_stage_latency_ms" - "Latency per I/O stage in ms (when io_stages > 1). Total latency = io_stages * io_stage_latency_ms." int default="50" diff --git a/packages/feedsim/third_party/src/workloads/ranking/dwarfs/dlrm.cpp b/packages/feedsim/third_party/src/workloads/ranking/dwarfs/dlrm.cpp new file mode 100644 index 00000000..c4883671 --- /dev/null +++ b/packages/feedsim/third_party/src/workloads/ranking/dwarfs/dlrm.cpp @@ -0,0 +1,217 @@ +/** + * dlrm.cpp - DLRM Inference Implementation for FeedSim + * + * Copyright (c) Meta Platforms, Inc. and affiliates. 
+ */ + +#ifdef FEEDSIM_USE_DLRM + +#include "dlrm.h" + +// Include LibTorch headers only in the implementation file +// to avoid conflicts with oldisim's Log.h +#include +#include + +#include +#include +#include + +namespace ranking { +namespace dwarfs { + +// Implementation class that holds LibTorch-specific data +struct DLRM::Impl { + torch::jit::script::Module model; + + // Per-thread state for synthetic feature generation + struct ThreadState { + std::mt19937 rng; + std::normal_distribution dense_dist{0.0f, 1.0f}; + std::vector dense_buffer; + std::vector sparse_buffer; + }; + std::vector> thread_states; + + void loadModel(const std::string& model_path) { + model = torch::jit::load(model_path); + model.eval(); + model = torch::jit::optimize_for_inference(model); + } + + void initializeThreadState( + int num_threads, + unsigned seed, + int batch_size, + int num_dense_features, + int num_sparse_features) { + thread_states.resize(num_threads); + + for (int i = 0; i < num_threads; ++i) { + auto state = std::make_unique(); + + // Initialize RNG + unsigned actual_seed = seed; + if (actual_seed == 0) { + actual_seed = + std::chrono::system_clock::now().time_since_epoch().count() + i; + } else { + actual_seed += i; // Unique seed per thread + } + state->rng.seed(actual_seed); + + // Pre-allocate buffers + state->dense_buffer.resize(batch_size * num_dense_features); + state->sparse_buffer.resize(batch_size * num_sparse_features); + + thread_states[i] = std::move(state); + } + } + + at::Tensor generateDenseFeatures( + int thread_id, + int batch_size, + int num_dense_features) { + auto& state = thread_states[thread_id]; + + // Resize buffer if needed + size_t required_size = batch_size * num_dense_features; + if (state->dense_buffer.size() < required_size) { + state->dense_buffer.resize(required_size); + } + + // Generate random dense features (log-normal distribution to mimic Criteo) + for (size_t i = 0; i < required_size; ++i) { + float normal_val = state->dense_dist(state->rng); + state->dense_buffer[i] = std::exp(1.5f + normal_val); + } + + return torch::from_blob( + state->dense_buffer.data(), + {static_cast(batch_size), num_dense_features}, + torch::kFloat32); + } + + at::Tensor generateSparseFeatures( + int thread_id, + int batch_size, + int num_sparse_features, + const std::vector& embedding_table_sizes) { + auto& state = thread_states[thread_id]; + + // Resize buffer if needed + size_t required_size = batch_size * num_sparse_features; + if (state->sparse_buffer.size() < required_size) { + state->sparse_buffer.resize(required_size); + } + + // Generate random sparse feature indices + for (size_t i = 0; i < required_size; ++i) { + int feature_idx = i % num_sparse_features; + int64_t max_val = embedding_table_sizes[feature_idx]; + state->sparse_buffer[i] = state->rng() % max_val; + } + + return torch::from_blob( + state->sparse_buffer.data(), + {static_cast(batch_size), num_sparse_features}, + torch::kInt64); + } +}; + +DLRM::DLRM(const DLRMParams& params, int num_thread_instances, unsigned seed) + : pimpl_(std::make_unique()), params_(params) { + // Set number of inference threads + at::set_num_threads(params_.num_threads); + + // Enable JIT optimizations + torch::jit::setGraphExecutorOptimize(true); + + // Load model if path provided + if (!params_.model_path.empty()) { + try { + std::cout << "Loading DLRM model from: " << params_.model_path + << std::endl; + pimpl_->loadModel(params_.model_path); + model_loaded_ = true; + std::cout << "DLRM model loaded successfully." 
<< std::endl; + } catch (const c10::Error& e) { + throw std::runtime_error( + "Failed to load DLRM model: " + std::string(e.what())); + } + } + + // Initialize per-thread state + pimpl_->initializeThreadState( + num_thread_instances, + seed, + params_.batch_size, + params_.num_dense_features, + params_.num_sparse_features); + + // Warmup + if (model_loaded_) { + warmup(10); + } +} + +DLRM::~DLRM() = default; + +int DLRM::infer(int thread_id, int num_inferences, int batch_size) { + if (!model_loaded_) { + throw std::runtime_error("Model not loaded"); + } + + if (thread_id < 0 || + thread_id >= static_cast(pimpl_->thread_states.size())) { + throw std::out_of_range("Invalid thread_id: " + std::to_string(thread_id)); + } + + int total_predictions = 0; + + for (int i = 0; i < num_inferences; ++i) { + // Generate features + auto dense_tensor = pimpl_->generateDenseFeatures( + thread_id, batch_size, params_.num_dense_features); + auto sparse_tensor = pimpl_->generateSparseFeatures( + thread_id, + batch_size, + params_.num_sparse_features, + params_.embedding_table_sizes); + + // Run inference + std::vector inputs; + inputs.push_back(dense_tensor); + inputs.push_back(sparse_tensor); + + torch::NoGradGuard no_grad; + auto output = pimpl_->model.forward(inputs).toTensor(); + + // Count predictions (simulating actual work with the output) + total_predictions += output.numel(); + } + + return total_predictions; +} + +void DLRM::warmup(int num_iterations) { + if (!model_loaded_) { + std::cerr << "Warning: Cannot warmup - model not loaded" << std::endl; + return; + } + + std::cout << "Warming up DLRM model (" << num_iterations << " iterations)..." + << std::endl; + + // Use thread 0 for warmup + for (int i = 0; i < num_iterations; ++i) { + infer(0, 1, params_.batch_size); + } + + std::cout << "DLRM warmup complete." << std::endl; +} + +} // namespace dwarfs +} // namespace ranking + +#endif // FEEDSIM_USE_DLRM diff --git a/packages/feedsim/third_party/src/workloads/ranking/dwarfs/dlrm.h b/packages/feedsim/third_party/src/workloads/ranking/dwarfs/dlrm.h new file mode 100644 index 00000000..ec959573 --- /dev/null +++ b/packages/feedsim/third_party/src/workloads/ranking/dwarfs/dlrm.h @@ -0,0 +1,127 @@ +/** + * dlrm.h - DLRM Inference for FeedSim + * + * This header provides the DLRM inference class for use in FeedSim's + * LeafNodeRank as a drop-in replacement for PageRank. + * + * Copyright (c) Meta Platforms, Inc. and affiliates. + */ + +#pragma once + +#ifdef FEEDSIM_USE_DLRM + +// Forward declare torch types to avoid including LibTorch headers here. +// This prevents conflicts between LibTorch's c10 library and oldisim's Log.h. +namespace torch { +namespace jit { +class Module; +} // namespace jit +} // namespace torch + +#include +#include +#include +#include + +namespace ranking { +namespace dwarfs { + +/** + * Configuration for DLRM inference. + */ +struct DLRMParams { + // Model file path (TorchScript .pt file) + std::string model_path; + + // Feature dimensions + int num_dense_features = 13; + int num_sparse_features = 26; + + // Inference settings + int batch_size = 256; + int num_threads = 8; + + // Embedding table max sizes (for feature generation bounds) + std::vector embedding_table_sizes; + + DLRMParams() { + // Default embedding table sizes from trained model + embedding_table_sizes = { + 50000, 39060, 17295, 7424, 20265, 3, 7122, 1543, 63, + 50000, 50000, 50000, 10, 2209, 11938, 155, 4, 976, + 14, 50000, 50000, 50000, 50000, 12973, 108, 36, + }; + } +}; + +/** + * DLRM Inference Engine. 
+ * + * Provides a similar interface to PageRank for easy integration into + * the existing FeedSim request handling flow. + * + * Thread Safety: + * - The model itself is thread-safe for concurrent inference + * - Each thread should use its own thread_id for per-thread state + */ +class DLRM { + public: + /** + * Construct a DLRM inference engine. + * + * @param params Configuration parameters + * @param num_thread_instances Number of thread instances to pre-allocate + * @param seed Random seed for feature generation (0 = use system clock) + */ + explicit DLRM( + const DLRMParams& params, + int num_thread_instances = 1, + unsigned seed = 0); + + ~DLRM(); + + // Disable copy + DLRM(const DLRM&) = delete; + DLRM& operator=(const DLRM&) = delete; + + /** + * Run DLRM inference with synthetic feature generation. + * + * This method signature mirrors PageRank::rank() for easy integration. + * + * @param thread_id Thread identifier for thread-local state + * @param num_inferences Number of inference calls to make + * @param batch_size Batch size per inference call + * @return Total number of predictions made + */ + int infer(int thread_id, int num_inferences, int batch_size); + + /** + * Warmup the model for optimal inference performance. + * + * @param num_iterations Number of warmup iterations + */ + void warmup(int num_iterations = 10); + + /** + * Check if model is loaded. + */ + bool isModelLoaded() const { + return model_loaded_; + } + + private: + // Use pimpl idiom to hide LibTorch implementation details + struct Impl; + std::unique_ptr pimpl_; + + // Configuration + DLRMParams params_; + bool model_loaded_ = false; +}; + +} // namespace dwarfs +} // namespace ranking + +#endif // FEEDSIM_USE_DLRM
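For orientation, a minimal usage sketch of the class declared above (assumes a build with FEEDSIM_USE_DLRM defined; the model path is a placeholder, not a file shipped by this patch):

    // Illustrative only: one shared engine, per-thread infer() calls.
    #include "dwarfs/dlrm.h"

    int runDlrmOnce(int num_server_threads) {
      ranking::dwarfs::DLRMParams params;           // defaults: 13 dense / 26 sparse features
      params.model_path = "models/dlrm_small.pt";   // placeholder TorchScript model path
      params.batch_size = 256;
      params.num_threads = 8;                       // LibTorch intra-op threads

      // The constructor loads the TorchScript model, sizes per-thread RNG and
      // feature-buffer state for num_server_threads callers, and runs its own warmup.
      ranking::dwarfs::DLRM ranker(params, num_server_threads, /*seed=*/42);

      // Each server thread passes its own thread_id, mirroring PageRank::rank().
      return ranker.infer(/*thread_id=*/0, /*num_inferences=*/1, params.batch_size);
    }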