Warning, /include/Geant4/tools/wroot/mpi_ntuple_column_wise is written in an unsupported language. File is not indexed.
0001 // Copyright (C) 2010, Guy Barrand. All rights reserved.
0002 // See the file tools.license for terms.
0003
0004 #ifndef tools_wroot_mpi_ntuple_column_wise
0005 #define tools_wroot_mpi_ntuple_column_wise
0006
0007 // MPI pntuple. It uses the tools/impi interface.
0008
0009 #include "base_pntuple_column_wise"
0010 #include "mpi_basket_add"
0011 #include "impi_ntuple"
0012
0013 #include "../S_STRING"
0014 #include "../forit"
0015
0016 namespace tools {
0017 namespace wroot {
0018
0019 class mpi_ntuple_column_wise : public base_pntuple_column_wise, public virtual impi_ntuple {
0020 typedef base_pntuple_column_wise parent;
0021 protected:
0022 class basket_add : public mpi_basket_add {
0023 typedef mpi_basket_add parent;
0024 public:
0025 virtual bool add_basket(basket* a_basket) { // we get ownership of a_basket.
0026 if(m_row_mode) {
0027 m_parallel_branch.m_parallel_baskets.push_back(a_basket);
0028 if(ready_to_flush_baskets(m_cols)) {return flush_baskets(m_mpi,m_dest,m_tag,m_id,m_cols);}
0029 return true;
0030 }
0031 bool status = mpi_send_basket(m_mpi,m_dest,m_tag,m_id,m_icol,*a_basket);
0032 delete a_basket;
0033 return status;
0034 }
0035 public:
0036 basket_add(impi& a_mpi,int a_dest,int a_tag,uint32 a_id,uint32 a_icol,
0037 branch& a_parallel_branch,
0038 std::vector<icol*>& a_cols,
0039 bool a_row_mode)
0040 :parent(a_mpi,a_dest,a_tag,a_id,a_icol)
0041 ,m_parallel_branch(a_parallel_branch)
0042 ,m_cols(a_cols)
0043 ,m_row_mode(a_row_mode)
0044 {}
0045 protected:
0046 basket_add(const basket_add& a_from)
0047 :branch::iadd_basket(a_from)
0048 ,parent(a_from)
0049 ,m_parallel_branch(a_from.m_parallel_branch)
0050 ,m_cols(a_from.m_cols)
0051 ,m_row_mode(a_from.m_row_mode)
0052 {}
0053 basket_add& operator=(const basket_add& a_from){
0054 parent::operator=(a_from);
0055 m_row_mode = a_from.m_row_mode;
0056 return *this;
0057 }
0058 protected:
0059 branch& m_parallel_branch;
0060 std::vector<icol*>& m_cols;
0061 bool m_row_mode;
0062 };
0063
0064 public:
0065 virtual bool add_row(impi& a_mpi,int a_dest,int a_tag) {
0066 if(m_cols.empty()) return false;
0067
0068 tools_vforit(icol*,m_cols,it) (*it)->add();
0069
0070 uint32 _icol = 0;
0071 tools_vforit(icol*,m_cols,it) {
0072 basket_add _badd(a_mpi,a_dest,a_tag,m_id,_icol,(*it)->get_branch(),m_cols,m_row_mode);_icol++;
0073 if(!(*it)->get_branch().pfill(_badd,m_nev)) return false;
0074 }
0075
0076 tools_vforit(icol*,m_cols,it) (*it)->set_def();
0077 return true;
0078 }
0079
0080 virtual bool end_fill(impi& a_mpi,int a_dest,int a_tag) {
0081 uint32 _icol = 0;
0082 tools_vforit(icol*,m_cols,it) {
0083 basket_add _badd(a_mpi,a_dest,a_tag,m_id,_icol,(*it)->get_branch(),m_cols,m_row_mode);_icol++;
0084 if(!(*it)->get_branch().end_pfill(_badd)) return false;
0085 }
0086
0087 if(m_row_mode) {
0088 size_t number;
0089 bool status = flush_remaining_baskets(number,a_mpi,a_dest,a_tag,m_id,m_cols);
0090 if(number) {
0091 m_out << "tools::wroot::mpi_ntuple_column_wise::end_fill : it remained " << number << " baskets not written on file." << std::endl;
0092 status = false;
0093 }
0094 if(!status) return false;
0095 }
0096
0097 a_mpi.pack_reset();
0098 if(!a_mpi.pack(mpi_protocol_end_fill())) return false;
0099 if(!a_mpi.pack(m_id)) return false;
0100 if(!end_leaves(a_mpi)) return false;
0101 if(!a_mpi.send_buffer(a_dest,a_tag)) return false;
0102
0103 return true;
0104 }
0105
0106 public:
0107 mpi_ntuple_column_wise(uint32 a_id,std::ostream& a_out,
0108 bool a_byte_swap,uint32 a_compression,seek a_seek_directory,
0109 const std::string& a_name,const std::string& a_title,
0110 bool a_row_mode,uint32 a_nev,
0111 bool a_verbose)
0112 :parent(a_out,a_byte_swap,a_compression,a_seek_directory,a_name,a_title,a_verbose)
0113 ,m_id(a_id)
0114 ,m_row_mode(a_row_mode)
0115 ,m_nev(a_nev)
0116 {
0117 if(m_row_mode) {
0118 if(!m_nev) m_nev = 4000; //4000*sizeof(double) = 32000 = default basket size.
0119 } else {
0120 m_nev = 0;
0121 }
0122 }
0123
0124 mpi_ntuple_column_wise(uint32 a_id,std::ostream& a_out,
0125 bool a_byte_swap,uint32 a_compression,seek a_seek_directory,
0126 const std::vector<uint32>& a_basket_sizes,const ntuple_booking& a_bkg,
0127 bool a_row_mode,uint32 a_nev,
0128 bool a_verbose)
0129 :parent(a_out,a_byte_swap,a_compression,a_seek_directory,a_basket_sizes,a_bkg,a_verbose)
0130 ,m_id(a_id)
0131 ,m_row_mode(a_row_mode)
0132 ,m_nev(a_nev)
0133 {
0134 if(m_row_mode) {
0135 if(!m_nev) m_nev = 4000; //4000*sizeof(double) = 32000 = default basket size.
0136 } else {
0137 m_nev = 0;
0138 }
0139 }
0140 virtual ~mpi_ntuple_column_wise() {}
0141 protected:
0142 mpi_ntuple_column_wise(const mpi_ntuple_column_wise& a_from)
0143 :impi_ntuple(a_from)
0144 ,parent(a_from)
0145 ,m_row_mode(a_from.m_row_mode)
0146 ,m_nev(a_from.m_nev)
0147 {}
0148 mpi_ntuple_column_wise& operator=(const mpi_ntuple_column_wise& a_from){parent::operator=(a_from);return *this;}
0149 protected:
0150 static bool ready_to_flush_baskets(std::vector<icol*>& a_cols) {
0151 //return true if all parallel branches have at least one basket in their m_parallel_baskets.
0152 if(a_cols.empty()) return false;
0153 tools_vforit(icol*,a_cols,it) {
0154 branch& _branch = (*it)->get_branch();
0155 if(_branch.m_parallel_baskets.empty()) return false;
0156 }
0157 return true;
0158 }
0159 static bool flush_baskets(impi& a_mpi,int a_dest,int a_tag,uint32 a_id,std::vector<icol*>& a_cols) {
0160 a_mpi.pack_reset();
0161 if(!a_mpi.pack(mpi_protocol_baskets())) return false;
0162 if(!a_mpi.pack(a_id)) return false;
0163
0164 bool status = true;
0165 uint32 _icol = 0;
0166 tools_vforit(icol*,a_cols,it) {
0167 branch& _branch = (*it)->get_branch();
0168 basket* _front_basket = _branch.m_parallel_baskets.front();
0169 if(status) {
0170 if(!mpi_pack_basket(a_mpi,_icol,*_front_basket)) status = false;
0171 }
0172 _branch.m_parallel_baskets.erase(_branch.m_parallel_baskets.begin());
0173 delete _front_basket;
0174 _icol++;
0175 }
0176 if(!status) return false;
0177
0178 return a_mpi.send_buffer(a_dest,a_tag);
0179 }
0180
0181 static bool flush_remaining_baskets(size_t& a_number,impi& a_mpi,int a_dest,int a_tag,uint32 a_id,std::vector<icol*>& a_cols) {
0182 a_number = 0;
0183 while(ready_to_flush_baskets(a_cols)) {
0184 if(!flush_baskets(a_mpi,a_dest,a_tag,a_id,a_cols)) return false;
0185 }
0186 // look for pending baskets.
0187 {tools_vforit(icol*,a_cols,it) {
0188 branch& _branch = (*it)->get_branch();
0189 a_number += _branch.m_parallel_baskets.size();
0190 }}
0191 {tools_vforit(icol*,a_cols,it) {
0192 branch& _branch = (*it)->get_branch();
0193 safe_clear(_branch.m_parallel_baskets);
0194 }}
0195 return true;
0196 }
0197 protected:
0198 bool end_leaves(impi& a_mpi) const {
0199 #include "MPI_SET_MAX.icc"
0200 tools_vforcit(icol*,m_cols,pit) {
0201 base_leaf* _pleaf = (*pit)->get_leaf();
0202
0203 bool set_done = false;
0204
0205 TOOLS_WROOT_MPI_NTUPLE_LEAF_STRING_SET_LENGTH_MAX
0206
0207 if(!set_done) {
0208 if(!a_mpi.pack((uint32)0)) return false;
0209 if(!a_mpi.pack((int)0)) return false;
0210 }
0211 }
0212 #undef TOOLS_WROOT_MPI_NTUPLE_SET_MAX
0213 #undef TOOLS_WROOT_MPI_NTUPLE_STRING_SET_MAX
0214 return true;
0215 }
0216 protected:
0217 uint32 m_id;
0218 bool m_row_mode;
0219 uint32 m_nev;
0220 };
0221
0222 }}
0223
0224 #endif