Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-06 08:57:17

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #include "JBenchmarker.h"
0006 
0007 #include <JANA/Utils/JCpuInfo.h>
0008 
0009 #include <fstream>
0010 #include <cmath>
0011 #include <iomanip>
0012 #include <ios>
0013 #include <sys/stat.h>
0014 #include <iostream>
0015 #include <vector>
0016 
0017 JBenchmarker::JBenchmarker(JApplication* app) : m_app(app) {
0018 
0019     m_max_threads = JCpuInfo::GetNumCpus();
0020 
0021     auto params = app->GetJParameterManager();
0022 
0023     m_logger = params->GetLogger("benchmark");
0024 
0025     params->SetParameter("jana:nevents", 0);
0026     // Prevent users' choice of nevents from interfering with everything
0027 
0028     params->SetDefaultParameter(
0029             "benchmark:nsamples",
0030             m_nsamples,
0031             "Number of samples for each benchmark test");
0032 
0033     params->SetDefaultParameter(
0034             "benchmark:minthreads",
0035             m_min_threads,
0036             "Minimum number of threads for benchmark test");
0037 
0038     params->SetDefaultParameter(
0039             "benchmark:maxthreads",
0040             m_max_threads,
0041             "Maximum number of threads for benchmark test");
0042 
0043     params->SetDefaultParameter(
0044             "benchmark:use_log_scale",
0045             m_use_log_scale,
0046             "Use log scale (instead of linear)");
0047 
0048     if (m_use_log_scale) {
0049         // A thread step of 1 won't work for log scale, so in this case we default to 2
0050         m_thread_step = 2;
0051     }
0052 
0053     params->SetDefaultParameter(
0054             "benchmark:threadstep",
0055             m_thread_step,
0056             "Delta number of threads between each benchmark test");
0057 
0058     params->SetDefaultParameter(
0059             "benchmark:resultsdir",
0060             m_output_dir,
0061             "Output directory name for benchmark test results");
0062 
0063     params->SetDefaultParameter(
0064             "benchmark:copyscript",
0065             m_copy_script,
0066             "Copy plotting script to results dir");
0067 
0068 
0069     params->SetParameter("nthreads", m_max_threads);
0070     // Otherwise JApplication::Scale() doesn't scale up. This is an interesting bug. TODO: Remove me when fixed.
0071 }
0072 
0073 
0074 JBenchmarker::~JBenchmarker() {}
0075 
0076 
0077 void JBenchmarker::RunUntilFinished() {
0078 
0079     LOG_INFO(m_logger) << "Running benchmarker with the following settings:" << std::endl
0080                        << "    benchmark:minthreads = " << m_min_threads << std::endl
0081                        << "    benchmark:maxthreads = " << m_max_threads << std::endl
0082                        << "    benchmark:threadstep = " << m_thread_step << std::endl
0083                        << "    benchmark:use_log_scale = " << m_use_log_scale << std::endl
0084                        << "    benchmark:nsamples = " << m_nsamples << std::endl
0085                        << "    benchmark:resultsdir = " << m_output_dir << std::endl;
0086 
0087 
0088     mkdir(m_output_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0089 
0090     std::ofstream samples_file(m_output_dir + "/samples.dat");
0091     samples_file << "# nthreads     rate" << std::endl;
0092 
0093     std::ofstream rates_file(m_output_dir + "/rates.dat");
0094     rates_file << "# nthreads  avg_rate       rms" << std::endl;
0095 
0096 
0097     std::vector<size_t> nthreads_space;
0098     if (m_use_log_scale) {
0099         for (size_t i=m_min_threads; i<m_max_threads; i *= m_thread_step) {
0100             nthreads_space.push_back(i);
0101         }
0102     }
0103     else {
0104         // Use linear scale
0105         for (size_t i=m_min_threads; i<m_max_threads; i += m_thread_step) {
0106             nthreads_space.push_back(i);
0107         }
0108     }
0109     if (nthreads_space.back() != m_max_threads) {
0110         nthreads_space.push_back(m_max_threads);
0111     }
0112 
0113     m_app->SetTicker(false);
0114     m_app->Run(false);
0115 
0116     // Wait for events to start flowing indicating the source is primed
0117     for (int i = 0; i < 5; i++) {
0118         LOG_INFO(m_logger) << "Waiting for event source to start producing ... rate: " << m_app->GetInstantaneousRate() << LOG_END;
0119         std::this_thread::sleep_for(std::chrono::milliseconds(1000));
0120         auto rate = m_app->GetInstantaneousRate();
0121         if (rate > 10.0) {
0122             LOG_INFO(m_logger) << "Rate: " << rate << "Hz   -  ready to begin test" << LOG_END;
0123             break;
0124         }
0125     }
0126 
0127     for (size_t nthreads: nthreads_space) {
0128         if (m_app->IsQuitting()) {
0129             break;
0130         }
0131 
0132         m_app->Scale(nthreads);
0133 
0134         // Loop for at most 60 seconds waiting for the number of threads to update
0135         for (int i = 0; i < 60; i++) {
0136             std::this_thread::sleep_for(std::chrono::milliseconds(1000));
0137             if (m_app->GetNThreads() == nthreads) break;
0138         }
0139 
0140         // Accumulate avg and rms rates for all samples for each nthreads
0141         double avg = 0;
0142         double rms = 0;
0143         double sum = 0;
0144         double sum2 = 0;
0145 
0146         for (uint32_t isample = 0; isample < m_nsamples && !m_app->IsQuitting(); isample++) {
0147             // Acquire mNsamples instantaneous rate measurements. The
0148             // GetInstantaneousRate method will only update every 0.5
0149             // seconds so we just wait for 1 second between samples to
0150             // ensure independent measurements.
0151             std::this_thread::sleep_for(std::chrono::milliseconds(1000));
0152             auto rate = m_app->GetInstantaneousRate();
0153 
0154             sum += rate;
0155             sum2 += rate * rate;
0156             double N = (double) (isample + 1);
0157             avg = sum / N; // Overwrite with updated value after each sample
0158             rms = sqrt((sum2 + N * avg * avg - 2.0 * avg * sum) / N);
0159 
0160             LOG_INFO(m_logger)
0161                 << std::setprecision(2) << std::fixed
0162                 << "nthreads=" << m_app->GetNThreads()
0163                 << "  rate=" << rate << "Hz"
0164                 << "  (avg = " << avg << " +/- " << rms / sqrt(N) << " Hz)" << LOG_END;
0165 
0166             // Write line in sample file
0167             samples_file << std::setw(7) << nthreads << " "
0168                          << std::setw(12) << std::setprecision(2) << std::fixed << rate << std::endl;
0169             samples_file.flush();
0170         }
0171 
0172         // Write line in rates file
0173         rates_file << std::setw(7) << nthreads << " "
0174                    << std::setw(12) << std::setprecision(2) << std::fixed << avg << " "
0175                    << std::setw(10) << std::setprecision(2) << std::fixed << rms << std::endl;
0176         rates_file.flush();
0177     }
0178 
0179     // Close files
0180     // Hopefully, because we called flush(), the files will be partially filled even if we are SIGKILLed
0181     // before we reach this point.
0182 
0183     samples_file.close();
0184     rates_file.close();
0185 
0186     if (m_copy_script) {
0187         copy_to_output_dir("${JANA_HOME}/bin/jana-plot-scaletest.py");
0188         LOG_INFO(m_logger)
0189             << "Testing finished. To view a plot of test results:\n"
0190             << "    cd " << m_output_dir
0191             << "\n    ./jana-plot-scaletest.py\n" << LOG_END;
0192     }
0193     else {
0194         LOG_INFO(m_logger) 
0195             << "Testing finished. To view a plot of test results:\n"
0196             << "    cd " << m_output_dir << "\n"
0197             << "    $JANA_HOME/bin/jana-plot-scaletest.py\n" << LOG_END;
0198     }
0199     m_app->Stop(true);
0200 }
0201 
0202 
0203 void JBenchmarker::copy_to_output_dir(std::string filename) {
0204 
0205     // Substitute environment variables in given filename
0206     std::string new_fname = filename;
0207     while (auto pos_start = new_fname.find("${") != new_fname.npos) {
0208         auto pos_end = new_fname.find("}", pos_start + 3);
0209         if (pos_end != new_fname.npos) {
0210 
0211             std::string envar_name = new_fname.substr(pos_start + 1, pos_end - pos_start - 1);
0212             LOG_DEBUG(m_logger) << "Looking for env var '" << envar_name
0213                                 << "'" << LOG_END;
0214 
0215             auto envar = getenv(envar_name.c_str());
0216             if (envar) {
0217                 new_fname.replace(pos_start - 1, pos_end + 2 - pos_start, envar);
0218             } else {
0219                 LOG_ERROR(m_logger) << "Environment variable '"
0220                                     << envar_name
0221                                     << "' not set. Cannot copy "
0222                                     << filename << LOG_END;
0223                 return;
0224             }
0225         } else {
0226             LOG_ERROR(m_logger) << "Error in string format: "
0227                                 << filename << LOG_END;
0228         }
0229     }
0230 
0231     // Extract filename without path
0232     std::string base_fname = new_fname;
0233     if (auto pos = base_fname.rfind("/")) base_fname.erase(0, pos);
0234     auto out_name = m_output_dir + "/" + base_fname;
0235 
0236     // Copy file
0237     LOG_INFO(m_logger) << "Copying " << new_fname << " -> " << m_output_dir << LOG_END;
0238     std::ifstream src(new_fname, std::ios::binary);
0239     std::ofstream dst(out_name, std::ios::binary);
0240     dst << src.rdbuf();
0241 
0242     // Change permissions to match source
0243     struct stat st;
0244     stat(new_fname.c_str(), &st);
0245     chmod(out_name.c_str(), st.st_mode);
0246 }
0247 
0248 
0249 
0250 
0251 
0252 
0253 
0254 
0255 
0256 
0257 
0258 
0259