Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:40

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #include <unistd.h>
0006 #include <sys/statvfs.h>
0007 #include <sys/stat.h>
0008 #include <sys/time.h>
0009 #include <sys/resource.h>
0010 #include <pthread.h>
0011 #include <sys/time.h>
0012 #include <sys/resource.h>
0013 
#ifdef __APPLE__
#include <sys/types.h>
#include <sys/sysctl.h>
#include <mach/vm_statistics.h>
#include <mach/kern_return.h>
#include <mach/host_info.h>
#include <mach/mach_init.h>
#include <mach/mach_host.h>
#include <mach/vm_map.h>
#endif // __APPLE__
0023 
0024 #include <map>
0025 #include <string>
0026 #include <sstream>
0027 #include <fstream>
0028 #include <iterator>
0029 #include <set>
0030 #include <mutex>
0031 #include <cmath>
0032 using namespace std;
0033 
0034 #include <JANA/JLogger.h>
0035 
0036 #include "JControlZMQ.h"
0037 #include "JControlEventProcessor.h"
0038 #include "janaJSON.h"
0039 
0040 
0041 // The following are convenience macros that make
0042 // the code below a little more succinct and easier
0043 // to read. It changes this:
0044 //
0045 //  JSONADD(key) << val;
0046 //
0047 //   to
0048 //
0049 //  ss<<",\n\""<<key<<"\":" << val;
0050 //
0051 // The JSONADS version takes a second argument and
0052 // encapsulates it in double quotes in the JSON output.
0053 //#define JSONADD(K) ss<<",\n\""<<K<<"\":"
0054 //#define JSONADS(K,V) ss<<",\n\""<<K<<"\":\""<<V<<"\""
0055 
0056 
/// Escape a string so that it can be placed between double quotes in a JSON
/// document. Double quotes, backslashes and control characters (< 0x20) are
/// replaced by their JSON escape sequences; every other byte is copied as-is.
static std::string JControlZMQ_JSONEscape(const std::string &raw){
    static const char hex_digits[] = "0123456789abcdef";
    std::string escaped;
    escaped.reserve(raw.size());
    for( const char c : raw ){
        const unsigned char uc = static_cast<unsigned char>(c);
        switch( c ){
            case '"' : escaped += "\\\""; break;
            case '\\': escaped += "\\\\"; break;
            case '\n': escaped += "\\n";  break;
            case '\r': escaped += "\\r";  break;
            case '\t': escaped += "\\t";  break;
            default:
                if( uc < 0x20 ){
                    // Remaining control characters use the generic \u00XX form
                    escaped += "\\u00";
                    escaped += hex_digits[uc >> 4];
                    escaped += hex_digits[uc & 0x0F];
                }else{
                    escaped += c;
                }
                break;
        }
    }
    return escaped;
}

/// Append a `"K":"V"` member to the JSON text being built in ss.
///
/// @param ss            stream holding the JSON text built so far
/// @param K             member name
/// @param V             member value; it is formatted with operator<< and is
///                      always written as a quoted JSON string (numbers too)
/// @param indent_level  number of two-space indents written before the key
/// @param first_element when false (the default) a ",\n" separator is written
///                      first so the member can follow a previous one
///
/// Both the key and the formatted value are escaped so that a double quote,
/// backslash or control character in either cannot break the JSON syntax.
template<class T>
void JSONADD(std::stringstream &ss, std::string K, T V, int indent_level=0, bool first_element=false){
    // Format the value separately so it can be escaped. copyfmt() makes the
    // scratch stream use the same flags/precision that ss would have used.
    std::ostringstream value_text;
    value_text.copyfmt(ss);
    value_text << V;

    if( ! first_element ) ss << ",\n";
    ss << std::string(indent_level*2, ' ');
    ss << "\"" << JControlZMQ_JSONEscape(K) << "\":";
    ss << "\"" << JControlZMQ_JSONEscape(value_text.str()) << "\"";
}
/// Append the start of a named JSON array to ss: the optional ",\n" separator,
/// the indentation (two spaces per indent_level), then `"K":[` and a newline.
/// The separator is omitted when first_element is true.
void JSONOPENARRAY(std::stringstream &ss, std::string K, int indent_level=0, bool first_element=false){
    const char *separator = first_element ? "" : ",\n";
    ss << separator;
    ss << std::string(indent_level*2, ' ') << '"' << K << '"' << ":[\n";
}
/// Append the end of a JSON array to ss: a newline, the indentation
/// (two spaces per indent_level) and the closing bracket.
void JSONCLOSEARRAY(std::stringstream &ss, int indent_level=0){
    ss << '\n';
    ss << std::string(indent_level*2, ' ') << ']';
}
/// Append the start of an anonymous JSON object to ss: the optional ",\n"
/// separator, the indentation (two spaces per indent_level), then an opening
/// brace and a newline. The separator is omitted when first_element is true.
void JSONOPENSECTION(std::stringstream &ss, int indent_level=0, bool first_element=false){
    const char *separator = first_element ? "" : ",\n";
    ss << separator;
    ss << std::string(indent_level*2, ' ') << '{' << '\n';
}
/// Append the end of a JSON object to ss: a newline, the indentation
/// (two spaces per indent_level) and the closing brace.
void JSONCLOSESECTION(std::stringstream &ss, int indent_level=0){
    ss << '\n';
    ss << std::string(indent_level*2, ' ') << '}';
}
0081 
0082 //-------------------------------------------------------------
0083 // JControlZMQ
0084 //-------------------------------------------------------------
0085 JControlZMQ::JControlZMQ(JApplication *app, int port):_japp(app), _port(port)
0086 {
0087     // This is called from jcontrol plugin's InitPlugin() routine
0088 
0089     LOG << "Launching JControlZMQ thread ..." << LOG_END;
0090 
0091     // Create new zmq context, get the current host name, and launch server thread
0092     _zmq_context = zmq_ctx_new();
0093     gethostname( _host, 256 );
0094     _pid = getpid();
0095     _thr = new std::thread( &JControlZMQ::ServerLoop, this );
0096 
0097     // Create new JControlEventProcessor and add to application.
0098     // This is used to access event info via JEvent. It is also
0099     // used when in debug_mode to step through events one at a
0100     // time.
0101     _jproc = new JControlEventProcessor(_japp);
0102     _japp->Add(_jproc);
0103 }
0104 
0105 //-------------------------------------------------------------
0106 // ~JControlZMQ
0107 //-------------------------------------------------------------
0108 JControlZMQ::~JControlZMQ()
0109 {
0110     // Tell server thread to quit and wait for it to finish
0111     _done =true;
0112     if(_thr != nullptr) {
0113         LOG << "Joining JControlZMQ thread ..." << LOG_END;
0114         _thr->join();
0115         delete _thr;
0116         LOG << "JControlZMQ thread joined." << LOG_END;
0117     }
0118 
0119     // Delete the zmq context
0120     if( _zmq_context != nullptr ) zmq_ctx_destroy( _zmq_context );
0121 
0122 }
0123 
0124 //-------------------------------------------------------------
0125 // ServerLoop
0126 //-------------------------------------------------------------
0127 void JControlZMQ::ServerLoop()
0128 {
0129     /// This implements the zmq server for handling REQ-REP requests.
0130     /// It is run in a separate thread and should generally take very
0131     /// little CPU unless it gets pounded by requests. It will loop
0132     /// indefinitely until the internal _done member is set to true.
0133     /// Currently, that is done only by calling the destructor.
0134 
0135     LOG << "JControlZMQ server starting ..." << LOG_END;
0136 
0137     // This just makes it easier to identify this thread while debugging.
0138     // Linux and Mac OS X use different calling signatures for pthread_setname_np
0139 #if __linux__
0140     pthread_setname_np( pthread_self(), "JControlZMQ::ServerLoop" );
0141 #elif defined(__APPLE__)
0142     pthread_setname_np( "JControlZMQ::ServerLoop" );
0143 #endif
0144 
0145     // Bind to port number specified in constructor. Most likely this came from JANA_ZMQ_PORT config. parameter
0146     char bind_str[256];
0147     snprintf( bind_str, 256, "tcp://*:%d", _port );
0148     void *responder = zmq_socket( _zmq_context, ZMQ_REP );
0149     auto ret = zmq_bind( responder, bind_str);
0150     if( ret != 0 ){
0151         LOG << "JControlZMQ: Unable to bind zeroMQ control socket " << _port << "!" << LOG_END;
0152         perror("zmq_bind");
0153         return;
0154     }
0155 
0156     // Loop until told to quit
0157     while( !_done ){
0158 
0159         // Listen for request (non-blocking)
0160         char buff[2048];
0161         auto rc = zmq_recv( responder, buff, 2048, ZMQ_DONTWAIT);
0162         if( rc< 0 ){
0163             if( (errno==EAGAIN) || (errno==EINTR) ){
0164                 std::this_thread::sleep_for(std::chrono::milliseconds(250));
0165                 continue;
0166             }else{
0167                 LOG << "JControlZMQ: ERROR listening on control socket: errno=" << errno << LOG_END;
0168                 _done = true;
0169                 continue;
0170             }
0171         }
0172 
0173         // Split request into tokens
0174         std::vector<string> vals;
0175         istringstream iss( std::string(buff, rc) );
0176         copy( istream_iterator<string>(iss), istream_iterator<string>(), back_inserter(vals) );
0177 
0178         // Response is JSON that gets built up in ss
0179         stringstream ss;
0180 
0181         // Add content based on command (i.e. vals[0])
0182         if( vals.empty()){
0183             //------------------ Empty Command
0184             ss << "{message:\"Empty Command\"}";
0185         }else if( vals[0] == "quit" ){
0186             //------------------ quit
0187             _done = true;
0188             if( vals.size()>1 )  _japp->SetExitCode( strtol(vals[1].c_str(), nullptr, 10) ); // allow remote user to optionally set exit code.
0189             _japp->Quit();
0190             ss << "{message:\"OK\"}";
0191         }else if( vals[0] == "get_status" ){
0192             //------------------ get_status
0193             ss << GetStatusJSON();
0194         }else if( vals[0]=="set_nthreads" ){
0195             //------------------ set_nthreads
0196             if( vals.size()==2 ){
0197                 int Nthreads = strtol( vals[1].c_str(), nullptr, 10 );
0198                 _japp->Scale( Nthreads );
0199                 ss << "OK";
0200             }else{
0201                 ss << "wrong number of args to set_nthreads. 1 expected, " << vals.size() << " received";
0202             }
0203         }else if( vals[0]=="decrement_nthreads" ){
0204             //------------------ decrement_nthreads
0205             auto Nthreads = _japp->GetNThreads();
0206             Nthreads--;
0207             if(Nthreads>0) {
0208                 _japp->Scale(Nthreads);
0209                 ss << "OK";
0210             }else{
0211                 ss << "WARNING: Will not decrement to <1 thread";
0212             }
0213         }else if( vals[0]=="increment_nthreads" ){
0214             //------------------ increment_nthreads
0215             auto Nthreads = _japp->GetNThreads();
0216             Nthreads++;
0217             _japp->Scale( Nthreads );
0218             ss << "OK";
0219         }else if( vals[0] == "get_file_size" ){ // mulitple files may be specified
0220             //------------------ get_file_size
0221             if( vals.size()<2){
0222                 ss << "{message:\"ERROR: No file given!\"}";
0223             }else{
0224                 ss << "{";
0225                 for( uint32_t i=1; i<vals.size(); i++){
0226 
0227                     auto &fname = vals[i];
0228                     struct stat st ={};
0229                     int64_t fsize = -1;
0230                     if( stat(fname.c_str(), &st) == 0) fsize = (int64_t)st.st_size;
0231                     if( i>1 ) ss << ",";
0232                     ss << "\"" << fname << "\"" << ":" << fsize << "\n";
0233                 }
0234                 ss << "}";
0235             }
0236         }else if( vals[0] == "get_disk_space" ){ // mulitple directories may be specified
0237             //------------------ get_disk_space
0238             if( vals.size()<2){
0239                 ss << "{message:\"ERROR: No directory given!\"}";
0240             }else{
0241                 ss << "{";
0242                 for( uint32_t i=1; i<vals.size(); i++){
0243 
0244                     auto &dname = vals[i];
0245 
0246                     std::map<std::string,float> myvals;
0247                     GetDiskSpace( dname, myvals);
0248                     for( const auto &p : myvals ) ss << "\"" << p.first << "\":" << p.second << "\n";
0249                 }
0250                 ss << "}";
0251             }
0252         }else if( vals[0] == "get_factory_list" ){
0253             //------------------ get_factory_list
0254             ss << GetJANAFactoryListJSON();
0255         }else if( vals[0]=="stop" ){
0256             //------------------ stop
0257             _japp->Stop();
0258             ss << "OK";
0259         }else if( vals[0]=="resume" ){
0260             //------------------ resume
0261             _japp->Run(false);
0262             ss << "OK";
0263         }else if( vals[0]=="debug_mode" ){
0264             //------------------ debug_mode
0265             if( vals.size()==2 ){
0266                 bool debug_mode = strtol( vals[1].c_str(), nullptr, 10 ) != 0;
0267                 _jproc->SetDebugMode(debug_mode);
0268                 ss << "OK";
0269             }else{
0270                 ss << "wrong number of args to debug_mode. 1 expected, " << vals.size() << " received";
0271             }
0272         }else if( vals[0]=="next_event" ){
0273             //------------------ next_event
0274             _jproc->NextEvent();
0275             ss << "OK";
0276         }else if( vals[0] == "get_object_count" ){
0277             //------------------ get_object_count
0278             ss << GetJANAObjectListJSON();
0279         }else if( vals[0] == "get_objects" ){  // get objects from single factory iff debug_mode is currently true
0280             //------------------ get_objects
0281             std::string factory_tag = vals.size()>=4 ? vals[3]:"";
0282             ss << GetJANAObjectsJSON( vals[1], vals[2], factory_tag );
0283         }else if( vals[0] == "fetch_objects" ){ // get objects from multiple factories regardless of debug_mode state
0284             //------------------ fetch_objects
0285             ss << FetchJANAObjectsJSON( vals );
0286         }else{
0287             //------------------ Unknown Command
0288             ss << "{message:\"Bad Command: " << vals[0] << "\"}";
0289         }
0290 
0291         // Send JSON string back to requester
0292         zmq_send( responder, ss.str().data(), ss.str().length(), 0);
0293     }
0294 }
0295 //-------------------------------------------------------------
0296 // GetStatusJSON
0297 //-------------------------------------------------------------
0298 string JControlZMQ::GetStatusJSON()
0299 {
0300     // Create JSON string
0301     stringstream ss;
0302     ss << "{\n";
0303     ss << R"("program":"JANAcp")";  // (n.b. c++11 string literal)
0304 
0305     // Static info
0306     JSONADD( ss,"host" , _host );
0307     JSONADD( ss,"PID" , _pid );
0308 
0309     // Add numeric values to the vals map which will be converted into JSON below
0310     map<string,float> vals;
0311     
0312     // Get JANA status info
0313     JANAStatusPROC(vals);
0314 
0315     // Get current system info from /proc
0316     HostStatusPROC(vals);
0317 
0318     // Write all items in "vals" into the JSON formatted return string
0319     for( const auto &p : vals ) JSONADD(ss, p.first, p.second);
0320 
0321     // Close JSON string and return
0322     ss << "\n}";
0323     return ss.str(); // TODO: return this with move semantics
0324 }
0325 
0326 //---------------------------------
0327 // JANAStatusPROC
0328 //---------------------------------
0329 void JControlZMQ::JANAStatusPROC(std::map<std::string,float> &vals)
0330 {
0331     vals["NEventsProcessed"   ] = _japp->GetNEventsProcessed();
0332     vals["NThreads"           ] = _japp->GetNThreads();
0333     vals["rate_avg"           ] = _japp->GetIntegratedRate();
0334     vals["rate_instantaneous" ] = _japp->GetInstantaneousRate();
0335 }
0336 
0337 //---------------------------------
0338 // HostStatusPROC
0339 //---------------------------------
0340 void JControlZMQ::HostStatusPROC(std::map<std::string,float> &vals)
0341 {
0342     /// Get CPU and Memory stats for system and current process and add
0343     /// them to the vals dictionary. This method just defers to the
0344     /// system specific method.
0345 #ifdef __linux__
0346     HostStatusPROCLinux( vals );
0347 #elif defined(__APPLE__)
0348     HostStatusPROCMacOSX( vals );
0349 #else
0350     vals["unknown_system_type"] = 1;
0351 #endif
0352 }
0353 
0354 //---------------------------------
0355 // HostStatusPROCLinux
0356 //---------------------------------
0357 void JControlZMQ::HostStatusPROCLinux(std::map<std::string,float> & vals )
0358 {
0359 #ifdef __linux__
0360     /// Get host info using the /proc mechanism on Linux machines.
0361     /// This returns the CPU usage/idle time. In order to work,
0362     /// it needs to take two measurements separated in time and
0363     /// calculate the difference. So that we don't linger here
0364     /// too long, we maintain static members to keep track of the
0365     /// previous reading and take the delta with that.
0366 
0367     //------------------ CPU Usage ----------------------
0368     static time_t last_time = 0;
0369     static double last_user = 0.0;
0370     static double last_nice = 0.0;
0371     static double last_sys  = 0.0;
0372     static double last_idle = 0.0;
0373     static double delta_user = 0.0;
0374     static double delta_nice = 0.0;
0375     static double delta_sys  = 0.0;
0376     static double delta_idle = 1.0;
0377 
0378     time_t now = time(nullptr);
0379     if(now > last_time){
0380         ifstream ifs("/proc/stat");
0381         if( ifs.is_open() ){
0382             string cpu;
0383             double user, nice, sys, idle;
0384 
0385             ifs >> cpu >> user >> nice >> sys >> idle;
0386             ifs.close();
0387 
0388             delta_user = user - last_user;
0389             delta_nice = nice - last_nice;
0390             delta_sys  = sys  - last_sys;
0391             delta_idle = idle - last_idle;
0392             last_user = user;
0393             last_nice = nice;
0394             last_sys  = sys;
0395             last_idle = idle;
0396 
0397             last_time = now;
0398         }
0399     }
0400 
0401     double norm = delta_user + delta_nice + delta_sys + delta_idle;
0402     double user_percent = 100.0*delta_user/norm;
0403     double nice_percent = 100.0*delta_nice/norm;
0404     double sys_percent  = 100.0*delta_sys /norm;
0405     double idle_percent = 100.0*delta_idle/norm;
0406     double cpu_usage    = 100.0 - idle_percent;
0407 
0408     vals["cpu_user" ] = user_percent;
0409     vals["cpu_nice" ] = nice_percent;
0410     vals["cpu_sys"  ] = sys_percent;
0411     vals["cpu_idle" ] = idle_percent;
0412     vals["cpu_total"] = cpu_usage;
0413 
0414     //------------------ Memory Usage ----------------------
0415 
0416     // Read memory from /proc/meminfo
0417     ifstream ifs("/proc/meminfo");
0418     int mem_tot_kB = 0;
0419     int mem_free_kB = 0;
0420     int mem_avail_kB = 0;
0421     if(ifs.is_open()){
0422         char buff[4096];
0423         bzero(buff, 4096);
0424         ifs.read(buff, 4095);
0425         ifs.close();
0426 
0427         string sbuff(buff);
0428 
0429         size_t pos = sbuff.find("MemTotal:");
0430         if(pos != string::npos) mem_tot_kB = strtol(&buff[pos+10+1], nullptr, 10);
0431 
0432         pos = sbuff.find("MemFree:");
0433         if(pos != string::npos) mem_free_kB = strtol(&buff[pos+9+1], nullptr, 10);
0434 
0435         pos = sbuff.find("MemAvailable:");
0436         if(pos != string::npos) mem_avail_kB = strtol(&buff[pos+14+1], nullptr, 10);
0437     }
0438 
0439     // RAM
0440     // reported RAM from /proc/memInfo apparently excludes some amount
0441     // claimed by the kernel. To get the correct amount in GB, I did a
0442     // linear fit to the values I "knew" were correct and the reported
0443     // values in kB for several machines.
0444     int mem_tot_GB = (int)round(0.531161471 + (double)mem_tot_kB*9.65808E-7);
0445     vals["ram_tot_GB"] = mem_tot_GB;
0446     vals["ram_free_GB"] = mem_free_kB*1.0E-6;
0447     vals["ram_avail_GB"] = mem_avail_kB*1.0E-6;
0448 
0449     // Get system resource usage
0450     struct rusage usage = {};
0451     getrusage(RUSAGE_SELF, &usage);
0452     double mem_usage = (double)(usage.ru_maxrss)/1024.0; // convert to MB
0453     vals["ram_used_this_proc_GB"] = (double)mem_usage*1.0E-3;
0454 #else
0455     _DBG_<<"Calling HostStatusPROCLinux on non-Linux machine. " << vals.size() << std::endl; // vals.size() is just to prevent compiler warning
0456 #endif // __linux__
0457 }
0458 
0459 //---------------------------------
0460 // HostStatusPROCMacOSX
0461 //---------------------------------
0462 void JControlZMQ::HostStatusPROCMacOSX(std::map<std::string,float> &vals)
0463 {
0464 #ifdef __APPLE__
0465 
0466     //------------------ Memory Usage ----------------------
0467     mach_msg_type_number_t count = HOST_VM_INFO_COUNT;
0468     vm_statistics_data_t vmstat;
0469     if(KERN_SUCCESS == host_statistics(mach_host_self(), HOST_VM_INFO, (host_info_t)&vmstat, &count)) {
0470         double page_size = (double)getpagesize();
0471         double inactive = page_size*vmstat.inactive_count;
0472         double ramfree = page_size*vmstat.free_count;
0473         vals["ram_free_GB"] = ramfree/pow(1024.0, 3);
0474         vals["ram_avail_GB"] = (inactive+ramfree)/pow(1024.0, 3);
0475     }
0476 
0477     // Get total system memory (this is more stable than adding everything returned by host_statistics)
0478     int mib [] = { CTL_HW, HW_MEMSIZE };
0479     int64_t value = 0;
0480     size_t length = sizeof(value);
0481     if(sysctl(mib, 2, &value, &length, NULL, 0) != -1) vals["ram_tot_GB"] = ((double)value)/pow(1024.0, 3);
0482 
0483     // Get memory and CPU usage for this process
0484     struct rusage usage;
0485     getrusage(RUSAGE_SELF, &usage);
0486     double mem_usage = (double)(usage.ru_maxrss); // empirically, this seems to b in bytes though google claims kB (?)
0487     vals["ram_used_this_proc_GB"] = (double)mem_usage/pow(1024.0, 3); // convert to GB
0488 
0489     //------------------ CPU Usage ----------------------
0490     static time_t last_time = 0;
0491     static double last_user = 0.0;
0492     static double last_nice = 0.0;
0493     static double last_sys  = 0.0;
0494     static double last_idle = 0.0;
0495     static double delta_user = 0.0;
0496     static double delta_nice = 0.0;
0497     static double delta_sys  = 0.0;
0498     static double delta_idle = 1.0;
0499 
0500     // The following was copied from a Stack Overflow example
0501     processor_cpu_load_info_t cpuLoad;
0502     mach_msg_type_number_t processorMsgCount;
0503     natural_t processorCount;
0504 
0505     uint64_t totalSystemTime = 0, totalUserTime = 0, totalIdleTime = 0;
0506     kern_return_t err = host_processor_info(mach_host_self(), PROCESSOR_CPU_LOAD_INFO, &processorCount, (processor_info_array_t *)&cpuLoad, &processorMsgCount);
0507     if( err == KERN_SUCCESS ) {
0508         for (natural_t i = 0; i < processorCount; i++) {
0509             // Calc load types and totals, with guards against 32-bit overflow
0510             // (values are natural_t)
0511             uint64_t system = 0, user = 0, idle = 0;
0512 
0513             system = cpuLoad[i].cpu_ticks[CPU_STATE_SYSTEM];
0514             user = cpuLoad[i].cpu_ticks[CPU_STATE_USER] + cpuLoad[i].cpu_ticks[CPU_STATE_NICE];
0515             idle = cpuLoad[i].cpu_ticks[CPU_STATE_IDLE];
0516 
0517             totalSystemTime += system;
0518             totalUserTime += user;
0519             totalIdleTime += idle;
0520         }
0521     }
0522 
0523     // Similar to Linux version, we must use two measurements to get a usage rate.
0524     time_t now = time(nullptr);
0525     if(now > last_time){
0526         double user, nice=0, sys, idle;
0527 
0528         user = totalUserTime;
0529         sys = totalSystemTime;
0530         idle = totalIdleTime;
0531 
0532         delta_user = user - last_user;
0533         delta_nice = nice - last_nice;
0534         delta_sys  = sys  - last_sys;
0535         delta_idle = idle - last_idle;
0536         last_user = user;
0537         last_nice = nice;
0538         last_sys  = sys;
0539         last_idle = idle;
0540 
0541         last_time = now;
0542     }
0543 
0544     double norm = delta_user + delta_nice + delta_sys + delta_idle;
0545     double user_percent = 100.0*delta_user/norm;
0546     double nice_percent = 100.0*delta_nice/norm;
0547     double sys_percent  = 100.0*delta_sys /norm;
0548     double idle_percent = 100.0*delta_idle/norm;
0549     double cpu_usage    = 100.0 - idle_percent;
0550 
0551     vals["cpu_user" ] = user_percent;
0552     vals["cpu_nice" ] = nice_percent;
0553     vals["cpu_sys"  ] = sys_percent;
0554     vals["cpu_idle" ] = idle_percent;
0555     vals["cpu_total"] = cpu_usage;
0556 
0557 #else
0558     _DBG_<<"Calling HostStatusPROCMacOSX on non-APPLE machine. " << vals.size() << std::endl; // vals.size() is just to prevent compiler warning
0559 #endif // __APPLE__
0560 }
0561 
0562 //---------------------------------
0563 // GetDiskSpace
0564 //---------------------------------
0565 void JControlZMQ::GetDiskSpace(const std::string &dirname, std::map<std::string,float> &vals)
0566 {
0567     // Attempt to get stats on the disk specified by dirname.
0568     // If found, they are added to vals. If no directory by
0569     // that name is found then nothing is added to vals and
0570     // this returns quietly.
0571 
0572     struct statvfs vfs = {};
0573     int err = statvfs(dirname.c_str(), &vfs);
0574     if( err != 0 ) return;
0575 
0576     double total_GB = vfs.f_bsize * vfs.f_blocks * 1.0E-9;
0577     double avail_GB = vfs.f_bsize * vfs.f_bavail * 1.0E-9;
0578     double used_GB  = total_GB-avail_GB;
0579     double used_percent = 100.0*used_GB/total_GB;
0580 
0581     vals[dirname+"_tot"] = total_GB;
0582     vals[dirname+"_avail"] = avail_GB;
0583     vals[dirname+"_used"] = used_GB;
0584     vals[dirname+"_percent_used"] = used_percent;
0585 }
0586 
0587 //---------------------------------
0588 // GetJANAFactoryListJSON
0589 //---------------------------------
0590 std::string JControlZMQ::GetJANAFactoryListJSON()
0591 {
0592     // Create JSON string
0593     stringstream ss;
0594     ss << "{\n";
0595     ss << R"("program":"JANAcp")";  // (n.b. c++11 string literal)
0596 
0597     // Static info
0598     JSONADD( ss,"host" , _host );
0599     JSONADD( ss,"PID" , _pid );
0600     ss << ",\n" << R"("factories":[)";
0601 
0602     const auto& component_summary = _japp->GetComponentSummary();
0603     bool is_first = true;
0604     for( const auto& fac_summary : component_summary.factories ){
0605 
0606         int indent_level = 2;
0607         if( !is_first ) ss << ",";
0608         is_first = false;
0609         ss << "\n" + string(indent_level*2, ' ') + "{\n";
0610         JSONADD( ss, "plugin_name" , fac_summary.plugin_name, indent_level, true );
0611         JSONADD( ss, "factory_name" , fac_summary.factory_name, indent_level );
0612         JSONADD( ss, "factory_tag" , fac_summary.factory_tag, indent_level );
0613         JSONADD( ss,"object_name" , fac_summary.object_name, indent_level );
0614         ss << "\n" + string(indent_level*2, ' ') + "}";
0615     }
0616     ss << "\n  ]\n";
0617 
0618     // Close JSON string and return
0619     ss << "}\n";
0620 //    cout << ss.str() << endl;
0621     return ss.str(); // TODO: return this with move semantics
0622 }
0623 
0624 //---------------------------------
0625 // GetJANAObjectListJSON
0626 //---------------------------------
0627 std::string JControlZMQ::GetJANAObjectListJSON(){
0628     /// Get a list of all objects in JSON format. This only reports the
0629     /// the factory name, tag, object type, plugin and number of objects
0630     /// already produced for this event.
0631 
0632     // Get list of factories and number of objects they've created this event already
0633     std::map<JFactorySummary, std::size_t> factory_object_counts;
0634     _jproc->GetObjectStatus( factory_object_counts );
0635 
0636     // Create JSON string
0637     stringstream ss;
0638     ss << "{\n";
0639     ss << R"("program":"JANAcp")";  // (n.b. c++11 string literal)
0640 
0641     // Static info
0642     JSONADD( ss,"host" , _host );
0643     JSONADD( ss,"PID" , _pid );
0644     JSONADD( ss,"run_number" , _jproc->GetRunNumber() );
0645     JSONADD( ss,"event_number" , _jproc->GetEventNumber() );
0646     ss << ",\n" << R"("factories":[)";
0647 
0648     bool is_first = true;
0649     for( auto pfac_summary : factory_object_counts ){
0650 
0651         auto &fac_summary = pfac_summary.first;
0652         auto &Nobjects    = pfac_summary.second;
0653 
0654         int indent_level = 2;
0655         if( !is_first ) ss << ",";
0656         is_first = false;
0657         ss << "\n" + string(indent_level*2, ' ') + "{\n";
0658         JSONADD( ss, "plugin_name" , fac_summary.plugin_name, indent_level, true );
0659         JSONADD( ss, "factory_name" , fac_summary.factory_name, indent_level );
0660         JSONADD( ss, "factory_tag" , fac_summary.factory_tag, indent_level );
0661         JSONADD( ss,"object_name" , fac_summary.object_name, indent_level );
0662         JSONADD( ss,"nobjects" , Nobjects, indent_level );
0663         ss << "\n" + string(indent_level*2, ' ') + "}";
0664     }
0665     ss << "\n  ]\n";
0666 
0667     // Close JSON string and return
0668     ss << "}\n";
0669 //    cout << ss.str() << endl;
0670     return ss.str(); // TODO: return this with move semantics
0671 }
0672 
0673 //---------------------------------
0674 // GetJANAObjectsJSON
0675 //---------------------------------
0676 std::string JControlZMQ::GetJANAObjectsJSON(const std::string &object_name, const std::string &factory_name, const std::string &factory_tag){
0677     /// Get the object contents (if possible) for the specified factory.
0678     /// If this is called while not in debug_mode then it will return
0679     /// no objects.
0680 
0681     // Get map of objects where key is address as hex string
0682     std::map<std::string, JObjectSummary> objects;
0683     _jproc->GetObjects( factory_name, factory_tag, object_name, objects );
0684 
0685     // Build map of values to create JSON record of. Add some redundant
0686     // info so there is the option of verifying the exact origin of this
0687     // by the consumer.
0688     std::unordered_map<std::string, std::string> mvals;
0689     mvals["program"] = "JANAcp";
0690     mvals["host"] = _host;
0691     mvals["PID"] = ToString(_pid);
0692     mvals["run_number"] = ToString(_jproc->GetRunNumber());
0693     mvals["event_number"] = ToString(_jproc->GetEventNumber());
0694     mvals["object_name"] = object_name;
0695     mvals["factory_name"] = factory_name;
0696     mvals["factory_tag"] = factory_tag;
0697     mvals["objects"] = JJSON_Create(objects, 2); // Create JSON of objects
0698 
0699     // Create JSON string
0700     std::string json = JJSON_Create(mvals);
0701 //    cout << json << std::endl;
0702     return json;
0703 }
0704 
0705 //---------------------------------
0706 // FetchJANAObjectsJSON
0707 //---------------------------------
0708 std::string JControlZMQ::FetchJANAObjectsJSON(std::vector<std::string> &vals){
0709     /// Fetch multiple objects from the next event to be processed or
0710     /// from the current event if debug_mode is currently true.
0711     /// This is intended to be used in a situation where the event processing
0712     /// should be allowed to continue basically uninhibited and one simply
0713     /// wants to spectate occasional events. E.g. a remote event monitor.
0714     ///
0715     /// Upon entry, vals will contain the full command which should look like
0716     ///
0717     ///   "fetch_objects" "factory:tag1" "factory:tag2" ...
0718     ///
0719     /// where "factory:tag" is the combined factory + tag string.
0720 
0721     std::set<std::string> factorytags;
0722     for( size_t i=1; i<vals.size(); i++ ) factorytags.insert( vals[i] );
0723     _jproc->SetFetchFactories( factorytags ); // this automatically sets the fetch flag
0724 
0725     // If we are in debug_mode then fetch the objects immediately for the 
0726     // current event. Note that FetchObjectsNow will clear the fetch_flag
0727     // so the wait loop below will exit immediately on the first iteration.
0728     if( _jproc->GetDebugMode() ) _jproc->FetchObjectsNow();
0729     
0730     // Build map of values to create JSON record of. Add some redundant
0731     // info so there is the option of verifying the exact origin of this
0732     // by the consumer.
0733     std::unordered_map<std::string, std::string> mvals;
0734     mvals["program"] = "JANAcp";
0735     mvals["host"] = _host;
0736     mvals["PID"] = ToString(_pid);
0737     
0738     // Wait up to 3 seconds for the fetch to finish.
0739     for(int i=0; i<1000; i++){
0740         if( !_jproc->GetFetchFlag() ) break;
0741         std::this_thread::sleep_for(std::chrono::milliseconds(3));
0742     }
0743 
0744     // run_number and event_number are recorded when the data is fetched
0745     // from the event into the metadata. Append all FetchMetadata to
0746     // record.
0747     auto metadata = _jproc->GetLastFetchMetadata();
0748     mvals.insert(metadata.begin(), metadata.end() );
0749     
0750      // Get the results of the fetch operation
0751     auto results = _jproc->GetLastFetchResult();
0752 
0753     // Convert all object summaries into JSON strings
0754     std::unordered_map<std::string, std::string> mobjvals;
0755     for( auto& [factorytag, objects] : results ){
0756         mobjvals[factorytag] = JJSON_Create(objects, 3); // Create JSON of objects
0757     }
0758     mvals["objects"] = JJSON_Create(mobjvals);
0759 
0760     // Create JSON string
0761     std::string json = JJSON_Create(mvals);
0762     return json;
0763 }
0764 
0765