Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2024-05-18 08:29:46

0001 /*-
0002  * Copyright (c) 2009, 2020 Oracle and/or its affiliates.  All rights reserved.
0003  *
0004  * See the file LICENSE for license information.
0005  *
0006  * $Id$
0007  */
0008 
0009 #ifndef _DB_STL_DBC_H
0010 #define _DB_STL_DBC_H
0011 
0012 #include <errno.h>
0013 
0014 #include <set>
0015 
0016 #include "dbstl_common.h"
0017 #include "dbstl_dbt.h"
0018 #include "dbstl_exception.h"
0019 #include "dbstl_container.h"
0020 #include "dbstl_resource_manager.h"
0021 
0022 START_NS(dbstl)
0023 
0024 // Forward declarations.
0025 class db_container;
0026 class DbCursorBase;
0027 template<Typename data_dt>
0028 class RandDbCursor;
0029 class DbstlMultipleKeyDataIterator;
0030 class DbstlMultipleRecnoDataIterator;
0031 using std::set;
0032 
0033 /////////////////////////////////////////////////////////////////////
0034 /////////////////////////////////////////////////////////////////////
0035 //
0036 // LazyDupCursor class template definition.
0037 //
0038 // This class allows us to make a shallow copy on construction. When the
0039 // cursor pointer is first dereferenced a deep copy is made.
0040 //
0041 // The allowed type for BaseType is DbCursor<> and RandDbCursor<>
0042 // The expected usage of this class is:
0043 // 1. Create an iterator in container::begin(), the iterator::pcsr.csr_ptr_
0044 // points to an object, thus no need to duplicate.
0045 // 2. The iterator is created with default arguments, thus the
0046 // iterator::pcsr.csr_ptr_ and dup_src_ are both NULL, and this iterator
0047 // may be copied with the copy constructor many times, but until the
0048 // cursor is really used, no cursor is duplicated.
0049 //
0050 // There is an informing mechanism between an instance of this class and
0051 // its dup_src_ cursor: when that cursor is about to change state, it will
0052 // inform all registered LazyDupCursor "listeners" of the change, so that
0053 // they will duplicate from the cursor before the change, because that
0054 // is the expected cursor state for the listeners.
0055 
// Holder for a cursor pointer that postpones duplicating the cursor until
// the pointer is actually needed.
//
// Invariant kept by every member below: at most one of csr_ptr_ and
// dup_src_ is non-NULL at any time.
template <typename BaseType>
class LazyDupCursor
{
    // csr_ptr_ is the cursor used to perform cursor operations; it is
    // owned by this object and deleted in the destructor.
    // dup_src_ is a non-owning pointer to the cursor this object will
    // duplicate from on first use. It is set in the copy constructor
    // from the other LazyDupCursor's csr_ptr_ (or its dup_src_), and
    // this object is registered with it as a listener via add_dupper().
    //
    BaseType *csr_ptr_, *dup_src_;
    typedef LazyDupCursor<BaseType> self;

    // Unregister from dup_src_ (if any) and forget it.
    inline void detach_from_source()
    {
        if (dup_src_ != NULL) {
            dup_src_->erase_dupper(this);
            dup_src_ = NULL;
        }
    }

public:
    ////////////////////////////////////////////////////////////////////
    //
    // Begin public constructors and destructor.
    //
    // Empty holder: no cursor and nothing to duplicate from.
    inline LazyDupCursor() : csr_ptr_(NULL), dup_src_(NULL)
    {
    }

    // Used in all iterator types' constructors, dbcptr is created
    // solely for this object, so ownership is taken directly and no
    // duplication source is needed.
    explicit inline LazyDupCursor(BaseType *dbcptr)
        : csr_ptr_(dbcptr), dup_src_(NULL)
    {
    }

    // Shallow copy: nothing is duplicated here. Remember where to
    // duplicate from (dp2's live cursor, or failing that dp2's own
    // source) and register as a listener of that cursor.
    LazyDupCursor(const self& dp2)
        : csr_ptr_(NULL),
          dup_src_(dp2.csr_ptr_ ? dp2.csr_ptr_ : dp2.dup_src_)
    {
        if (dup_src_)
            dup_src_->add_dupper(this);
    }

    ~LazyDupCursor()
    {
        // Not duplicated yet, remove from dup_src_.
        if (csr_ptr_ == NULL && dup_src_ != NULL)
            dup_src_->erase_dupper(this);
        delete csr_ptr_; // Deleting NULL is a no-op.
    }

    ////////////////////////////////////////////////////////////////////

    // Deep copy from dp2's cursor (or from dp2's duplication source if
    // dp2 has not duplicated yet). Self-assignment is a no-op; without
    // the check the owned cursor would be deleted and then copied from.
    // Returns dp2, as the original interface does.
    inline const self& operator=(const self &dp2)
    {
        if (this != &dp2)
            this->operator=(dp2.csr_ptr_ ? dp2.csr_ptr_ : dp2.dup_src_);

        return dp2;
    }

    // Deep copy of *dcb; a NULL dcb leaves this holder empty.
    // The copy is made BEFORE the old cursor is deleted so that passing
    // this object's own cursor pointer is safe. Returns dcb exactly as
    // passed in; if dcb was this object's previous cursor it has been
    // deleted by the time this returns and must not be dereferenced.
    inline BaseType *operator=(BaseType *dcb)
    {
        BaseType *fresh = dcb ? new BaseType(*dcb) : NULL;

        // Only dup_src_ will inform this, not csr_ptr_.
        delete csr_ptr_;
        csr_ptr_ = fresh;
        detach_from_source();

        return dcb;
    }

    // Take ownership of dbc (must not be NULL), dropping any previously
    // owned cursor and any pending duplication source. Passing the
    // cursor already owned is tolerated and keeps it alive.
    void set_cursor(BaseType *dbc)
    {
        assert(dbc != NULL);
        if (csr_ptr_ != dbc)
            delete csr_ptr_;

        csr_ptr_ = dbc;
        detach_from_source();
    }

    // Perform the postponed duplication from dup_src_ (which must be
    // non-NULL) and return the new cursor.
    // If dup_src_ is informing this object, pass false parameter so
    // that this object does not erase itself from dup_src_'s listener
    // set.
    inline BaseType* duplicate(bool erase_dupper = true)
    {
        assert(dup_src_ != NULL);
        if (csr_ptr_) {
            // Only dup_src_ will inform this, not csr_ptr_.
            delete csr_ptr_;
            csr_ptr_ = NULL;
        }
        csr_ptr_ = new BaseType(*dup_src_);
        if (erase_dupper)
            dup_src_->erase_dupper(this);
        dup_src_ = NULL;
        return csr_ptr_;
    }

    // Access the cursor, duplicating it first if that has not happened
    // yet. Requires a cursor or a duplication source to be present.
    inline BaseType* operator->()
    {
        if (csr_ptr_)
            return csr_ptr_;

        return duplicate();
    }

    // True only when a cursor is already held; a pending duplication
    // source does not count.
    inline operator bool()
    {
        return csr_ptr_ != NULL;
    }

    inline bool operator!()
    {
        return !csr_ptr_;
    }

    // Compares the held cursor pointer (not the duplication source)
    // with p.
    inline bool operator==(void *p)
    {
        return csr_ptr_ == p;
    }

    // Same as operator->(): returns the cursor, duplicating on demand.
    inline BaseType* base_ptr(){
        if (csr_ptr_)
            return csr_ptr_;
        return duplicate();
    }
};
0203 
0204 
0205 /////////////////////////////////////////////////////////////////////////
0206 /////////////////////////////////////////////////////////////////////////
0207 //
0208 // DbCursorBase class definition.
0209 //
0210 // DbCursorBase is the base class for DbCursor<> class template, this class
0211 // wraps the Berkeley DB cursor, in order for the ResourceManager to close
0212 // the Berkeley DB cursor and set the pointer to null. 
0213 // If we don't set the cursor to NULL, the handle could become valid again,
0214 // since Berkeley DB recycles handles. DB STL would then try to use the same
0215 // handle across different instances, which is not supported.
0216 //
0217 // In ResourceManager, whenever a cursor is opened, it stores the
0218 // DbCursorBase* pointer, so that when it needs to close the cursor, it
0219 // calls the DbCursorBase::close() function.
0220 //
0221 class DbCursorBase
0222 {
0223 protected:
0224     Dbc *csr_;
0225     DbTxn *owner_txn_;
0226     Db *owner_db_;
0227     int csr_status_;
0228 
0229 public:
0230     enum DbcGetSkipOptions{SKIP_KEY, SKIP_DATA, SKIP_NONE};
0231     inline DbTxn *get_owner_txn() const { return owner_txn_;}
0232     inline void set_owner_txn(DbTxn *otxn) { owner_txn_ = otxn;}
0233 
0234     inline Db *get_owner_db() const { return owner_db_;}
0235     inline void set_owner_db(Db *odb) { owner_db_ = odb;}
0236 
0237     inline Dbc *get_cursor()  const { return csr_;}
0238     inline Dbc *&get_cursor_reference() { return csr_;}
0239     inline void set_cursor(Dbc*csr1)
0240     {
0241         if (csr_)
0242             ResourceManager::instance()->remove_cursor(this);
0243         csr_ = csr1;
0244     }
0245 
0246     inline int close()
0247     {
0248         int ret = 0;
0249 
0250         if (csr_ != NULL && (((DBC *)csr_)->flags & DBC_ACTIVE) != 0) {
0251             ret = csr_->close();
0252             csr_ = NULL;
0253         }
0254         return ret;
0255     }
0256 
0257     DbCursorBase(){
0258         owner_txn_ = NULL;
0259         owner_db_ = NULL;
0260         csr_ = NULL;
0261         csr_status_ = 0;
0262     }
0263 
0264     DbCursorBase(const DbCursorBase &csrbase)
0265     {
0266         this->operator=(csrbase);
0267     }
0268 
0269     const DbCursorBase &operator=(const DbCursorBase &csrbase)
0270     {
0271         owner_txn_ = csrbase.owner_txn_;
0272         owner_db_ = csrbase.owner_db_;
0273         csr_ = NULL; // Need to call DbCursor<>::dup to duplicate.
0274         csr_status_ = 0;
0275         return csrbase;
0276     }
0277 
0278     virtual ~DbCursorBase()
0279     {
0280         close();
0281     }
0282 }; // DbCursorBase
0283 
0284 ////////////////////////////////////////////////////////////////////////
0285 ///////////////////////////////////////////////////////////////////////
0286 //
0287 // DbCursor class template definition
0288 //
0289 // DbCursor is the connection between Berkeley DB and dbstl container classes
0290 // it is the wrapper class for Dbc* cursor of Berkeley Db, to be used for
0291 // iterator classes of Berkeley DB backed STL container classes.
0292 // Requirement:
0293 // 1. Deep copy using Dbc->dup.
0294 // 2. Dbc*cursor management via ResourceManager class.
0295 // 3. Provide methods to do increment, decrement and advance operations,
0296 //    advance is only available for random access iterator from DB_RECNO
0297 //    containers.
0298 //
0299 
0300 template<typename key_dt, typename data_dt>
0301 class DbCursor : public DbCursorBase{
0302 protected:
0303     // Lazy duplication support: store the LazyDupCursor objects which
0304     // will duplicate from this cursor.
0305     typedef LazyDupCursor<DbCursor<key_dt, data_dt> > dupper_t;
0306     typedef LazyDupCursor<RandDbCursor<data_dt> > dupperr_t;
0307     typedef set<LazyDupCursor<DbCursor<key_dt, data_dt> >* > dupset_t;
0308     typedef set<LazyDupCursor<RandDbCursor<data_dt> >* > dupsetr_t;
0309 
0310     set<LazyDupCursor<DbCursor<key_dt, data_dt> >* > sduppers1_;
0311     set<LazyDupCursor<RandDbCursor<data_dt> >* > sduppers2_;
0312 
0313     // We must use DB_DBT_USERMEM for Dbc::get and Db::get if they are
0314     // used in multi-threaded application, so we use key_buf_ and
0315     // data_buf_ data members for get operations, and initialize them
0316     // to use user memory.
0317     Dbt key_buf_, data_buf_;
0318 
0319     // Similar to Berkeley DB C++ API's classes, used to iterate through
0320     // bulk retrieved key/data pairs.
0321     DbstlMultipleKeyDataIterator *multi_itr_;
0322     DbstlMultipleRecnoDataIterator *recno_itr_;
0323 
0324     // Whether to use bulk retrieval. If non-zero, do bulk retrieval,
0325     // bulk buffer size is this member, otherwise not bulk read.
0326     // By default this member is 0.
0327     u_int32_t bulk_retrieval_;
0328     // Whether to use DB_RMW flag in Dbc::get, by default false.
0329     bool rmw_get_;
0330 
0331     // Whether to poll data from cursor's current position on every
0332     // get_current_key/data call.
0333     // Note that curr_key_/curr_data_ members are always maintained
0334     // to contain current k/d value of the pair pointed to by csr_.
0335     // If doing bulk retrieval, this flag is ignored, we will always
0336     // read data from bulk buffer.
0337     bool directdb_get_;
0338 
0339     // Inform LazyDupCursor objects registered in this object to do
0340     // duplication because this cursor is to be changed.
0341     // This function should be called in any function of
0342     // DbCursor and RandDbCursor whenever the cursor is about to change
0343     // state(move/close, etc).
0344     inline void inform_duppers()
0345     {
0346         typename dupset_t::iterator i1;
0347         typename dupsetr_t::iterator i2;
0348         for (i1 = sduppers1_.begin(); i1 != sduppers1_.end(); i1++)
0349             (*i1)->duplicate(false);
0350         for (i2 = sduppers2_.begin(); i2 != sduppers2_.end(); i2++)
0351             (*i2)->duplicate(false);
0352         sduppers1_.clear();
0353         sduppers2_.clear();
0354     }
0355 
0356 public:
0357     friend class DataItem;
0358 
0359     // Current key/data pair pointed by "csr_" Dbc*cursor. They are both
0360     // maintained on cursor movement. If directdb_get_ is true,
0361     // they are both refreshed on every get_current{[_key][_data]} call and 
0362     // the retrieved key/data pair is returned to user.
0363     DataItem curr_key_;
0364     DataItem curr_data_;
0365 
0366     typedef DbCursor<key_dt, data_dt> self;
0367 
0368     // This function is used by all iterators to do equals comparison.
0369     // Random iterators will also use it to do less than/greater than
0370     // comparisons.
0371     // Internally, the page number difference or index difference is
0372     // returned, so for btree and hash databases, if two cursors point to
0373     // the same key/data pair, we will get 0 returned, meaning they are
0374     // equal; if the return value is not 0, it means no more than that
0375     // they are not equal. We can't assume any order information between
0376     // the two cursors. For recno databases, we use the recno to do less
0377     // than and greater than comparison. So we can get a reliable knowledge
0378     // of the relative position of two iterators from the return value.
    // Compare this cursor with csr2 using the underlying DBC's cmp
    // method and return the value it stores in res.
    // csr2 must not be NULL; neither csr2 nor the two underlying handles
    // are checked here.
    // NOTE(review): error handling is delegated to the BDBOP macro, whose
    // definition is not visible in this file - presumably it raises on a
    // non-zero return code; confirm.
    int compare(const self *csr2) const{
        int res, ret;

        BDBOP(((DBC *)csr_)->cmp((DBC *)csr_, (DBC *)csr2->csr_,
            &res, 0), ret);
        return res;
    }
0386 
0387     ////////////////////////////////////////////////////////////////////
0388     //
0389     // Add and remove cursor change event listeners.
0390     //
0391     inline void add_dupper(dupper_t *dupper)
0392     {
0393         sduppers1_.insert(dupper);
0394     }
0395 
0396     inline void add_dupper(dupperr_t *dupper)
0397     {
0398         sduppers2_.insert(dupper);
0399     }
0400 
0401     inline void erase_dupper(dupper_t *dup1)
0402     {
0403         sduppers1_.erase(dup1);
0404     }
0405 
0406     inline void erase_dupper(dupperr_t *dup1)
0407     {
0408         sduppers2_.erase(dup1);
0409     }
0410 
0411     ////////////////////////////////////////////////////////////////////
0412 
0413 public:
0414 
0415     inline bool get_rmw()
0416     {
0417         return rmw_get_;
0418     }
0419 
0420     bool set_rmw(bool rmw, DB_ENV *env = NULL )
0421     {
0422         u_int32_t flag = 0;
0423         DB_ENV *dbenv = NULL;
0424         int ret;
0425 
0426         if (env)
0427             dbenv = env;
0428         else
0429             dbenv = ((DBC*)csr_)->dbenv;
0430         BDBOP(dbenv->get_open_flags(dbenv, &flag), ret);
0431 
0432         // DB_RMW flag requires locking subsystem started.
0433         if (rmw && ((flag & DB_INIT_LOCK) || (flag & DB_INIT_CDB) ||
0434             (flag & DB_INIT_TXN)))
0435             rmw_get_ = true;
0436         else
0437             rmw_get_ = false;
0438         return rmw_get_;
0439     }
0440 
0441     // Modify bulk buffer size. Bulk read is enabled when creating an
0442     // iterator, so users later can only modify the bulk buffer size
0443     // to another value, but can't enable/disable bulk read while an
0444     // iterator is already alive. 
0445     // Returns true if succeeded, false otherwise.
0446     inline bool set_bulk_buffer(u_int32_t sz)
0447     {
0448         if (bulk_retrieval_ && sz) {
0449             normalize_bulk_bufsize(sz);
0450             bulk_retrieval_ = sz;
0451             return true;
0452         }
0453 
0454         return false;
0455 
0456     }
0457 
0458     inline u_int32_t get_bulk_bufsize()
0459     {
0460         return bulk_retrieval_;
0461     }
0462 
0463     inline void enlarge_dbt(Dbt &d, u_int32_t sz)
0464     {
0465         void *p;
0466 
0467         p = DbstlReAlloc(d.get_data(), sz);
0468         dbstl_assert(p != NULL);
0469         d.set_ulen(sz);
0470         d.set_data(p);
0471         d.set_size(sz);
0472     }
0473     // Move forward or backward, often by 1 key/data pair, we can use
0474     // different flags for Dbc::get function. Then update the key/data
0475     // pair and csr_status_ members.
0476     //
0477     int increment(int flag)
0478     {
0479         int ret = 0;
0480         Dbt &k = key_buf_, &d = data_buf_;
0481         u_int32_t sz, getflags = 0, bulk_bufsz;
0482 
0483         if (csr_ == NULL)
0484             return INVALID_ITERATOR_CURSOR;
0485         curr_key_.reset();
0486         curr_data_.reset();
0487         inform_duppers();
0488 
0489         // Berkeley DB cursor flags are not bitwise set, so we can't
0490         // use bit operations here.
0491         //
0492         if (this->bulk_retrieval_ != 0)
0493             switch (flag) {
0494             case DB_PREV:
0495             case DB_PREV_DUP:
0496             case DB_PREV_NODUP:
0497             case DB_LAST:
0498             case DB_JOIN_ITEM:
0499             case DB_GET_RECNO:
0500             case DB_SET_RECNO: 
0501                 break;
0502             default:
0503                 getflags |= DB_MULTIPLE_KEY;
0504                 if (data_buf_.get_ulen() != bulk_retrieval_)
0505                     enlarge_dbt(data_buf_, bulk_retrieval_);
0506                 break;
0507             }
0508 
0509         if (this->rmw_get_)
0510             getflags |= DB_RMW;
0511 
0512         // Do not use BDBOP or BDBOP2 here because it is likely
0513         // that an iteration will step onto end() position.
0514 retry:      ret = csr_->get(&k, &d, flag | getflags);
0515         if (ret == 0) {
0516             if (bulk_retrieval_ && (getflags & DB_MULTIPLE_KEY)) {
0517                 // A new retrieval, so both multi_itr_ and
0518                 // recno_itr_ must be NULL.
0519                 if (((DBC*)csr_)->dbtype == DB_RECNO) {
0520                     if (recno_itr_) {
0521                         delete recno_itr_;
0522                         recno_itr_ = NULL;
0523                     }
0524                     recno_itr_ =
0525                     new DbstlMultipleRecnoDataIterator(d);
0526                 } else {
0527                     if (multi_itr_) {
0528                         delete multi_itr_;
0529                         multi_itr_ = NULL;
0530                     }
0531                     multi_itr_ = new
0532                         DbstlMultipleKeyDataIterator(d);
0533                 }
0534             } else {
0535                 // Non bulk retrieval succeeded.
0536                 curr_key_.set_dbt(k, false);
0537                 curr_data_.set_dbt(d, false);
0538                 limit_buf_size_after_use();
0539             }
0540         } else if (ret == DB_BUFFER_SMALL) {
0541             // Either the key or data DBTs might trigger a
0542             // DB_KEYSMALL return. Only enlarge the DBT if it 
0543             // is actually too small. 
0544             if (((sz = d.get_size()) > 0) && (sz > d.get_ulen()))
0545                 enlarge_dbt(d, sz);
0546 
0547             if (((sz = k.get_size()) > 0) && (sz > k.get_ulen()))
0548                 enlarge_dbt(k, sz);
0549 
0550             goto retry;
0551         } else {
0552             if (ret == DB_NOTFOUND) {
0553                 ret = INVALID_ITERATOR_POSITION;
0554                 this->curr_key_.reset();
0555                 this->curr_data_.reset();
0556             } else if (bulk_retrieval_ &&
0557                 (getflags & DB_MULTIPLE_KEY)){
0558                 BDBOP(((DBC*)csr_)->dbp->
0559                     get_pagesize(((DBC*)csr_)->
0560                     dbp, &bulk_bufsz), ret);
0561                 if (bulk_bufsz > d.get_ulen()) {// buf size error
0562                     normalize_bulk_bufsize(bulk_bufsz);
0563                     bulk_retrieval_ = bulk_bufsz;
0564                     enlarge_dbt(d, bulk_bufsz);
0565                     goto retry;
0566                 } else
0567                     throw_bdb_exception(
0568                         "DbCursor<>::increment", ret);
0569             } else
0570                 throw_bdb_exception(
0571                     "DbCursor<>::increment", ret);
0572         }
0573 
0574         csr_status_ = ret;
0575         return ret;
0576     }
0577 
0578     // After each use of key_buf_ and data_buf_, limit their buffer size to
0579     // a reasonable size so that they don't waste a big memory space.
0580     inline void limit_buf_size_after_use() 
0581     {
0582         if (bulk_retrieval_)
0583             // Bulk buffer has to be huge, so don't check it.
0584             return;
0585 
0586         if (key_buf_.get_ulen() > DBSTL_MAX_KEY_BUF_LEN) {
0587             key_buf_.set_data(DbstlReAlloc(key_buf_.get_data(),
0588                 DBSTL_MAX_KEY_BUF_LEN));
0589             key_buf_.set_ulen(DBSTL_MAX_KEY_BUF_LEN);
0590         }
0591         if (data_buf_.get_ulen() > DBSTL_MAX_DATA_BUF_LEN) {
0592             data_buf_.set_data(DbstlReAlloc(data_buf_.get_data(),
0593                 DBSTL_MAX_DATA_BUF_LEN));
0594             data_buf_.set_ulen(DBSTL_MAX_DATA_BUF_LEN);
0595         }
0596     }
0597 
0598     // Duplicate this object's cursor and set it to dbc1.
0599     //
0600     inline int dup(DbCursor<key_dt, data_dt>& dbc1) const
0601     {
0602         Dbc* pcsr = 0;
0603         int ret;
0604 
0605         if (csr_ != 0 && csr_->dup(&pcsr, DB_POSITION) == 0) {
0606             dbc1.set_cursor(pcsr);
0607             dbc1.set_owner_db(this->get_owner_db());
0608             dbc1.set_owner_txn(this->get_owner_txn());
0609             ResourceManager::instance()->add_cursor(
0610                 this->get_owner_db(), &dbc1);
0611             ret = 0;
0612         } else
0613             ret = ITERATOR_DUP_ERROR;
0614 
0615         return ret;
0616     }
0617 
0618 public:
0619     // Open a cursor, do not move it, it is at an invalid position.
0620     // All cursors should be opened using this method.
0621     //
0622     inline int open(db_container *pdbc, int flags)
0623     {
0624         int ret;
0625 
0626         Db *pdb = pdbc->get_db_handle();
0627         if (pdb == NULL)
0628             return 0;
0629         if (csr_) // Close before open.
0630             return 0;
0631         ret = ResourceManager::instance()->
0632             open_cursor(this, pdb, flags);
0633         set_rmw(rmw_get_);
0634         this->csr_status_ = ret;
0635         return ret;
0636     }
0637 
0638     // Move Berkeley DB cursor to specified key k, by default use DB_SET,
0639     // but DB_SET_RANGE can and may also be used.
0640     //
0641     int move_to(const key_dt&k, u_int32_t flag = DB_SET)
0642     {
0643         Dbt &d1 = data_buf_;
0644         int ret;
0645         u_int32_t sz;
0646         DataItem k1(k, true);
0647 
0648         if (csr_ == NULL)
0649             return INVALID_ITERATOR_CURSOR;
0650 
0651         curr_key_.reset();
0652         curr_data_.reset();
0653         inform_duppers();
0654 
0655         // It is likely that k is not in db, causing get(DB_SET) to
0656         // fail, we should not throw an exception because of this.
0657         //
0658         if (rmw_get_)
0659             flag |= DB_RMW;
0660 retry:      ret = csr_->get(&k1.get_dbt(), &d1, flag);
0661         if (ret == 0) {
0662             curr_key_ = k1;
0663             curr_data_.set_dbt(d1, false);
0664             limit_buf_size_after_use();
0665         } else if (ret == DB_BUFFER_SMALL) {
0666             sz = d1.get_size();
0667             assert(sz > 0);
0668             enlarge_dbt(d1, sz);
0669             goto retry;
0670         } else {
0671             if (ret == DB_NOTFOUND) {
0672                 ret = INVALID_ITERATOR_POSITION;
0673                 // Invalidate current values because it is
0674                 // at an invalid position.
0675                 this->curr_key_.reset();
0676                 this->curr_data_.reset();
0677             } else
0678                 throw_bdb_exception("DbCursor<>::move_to", ret);
0679         }
0680 
0681         csr_status_ = ret;
0682         return ret;
0683     }
0684 
0685     // Returns the number of keys equal to the current one.
    // Returns the number of keys equal to the current one, as reported
    // by Dbc::count with flags 0. The cursor handle is not checked for
    // NULL here.
    // NOTE(review): failure handling is delegated to the BDBOP2 macro,
    // defined outside this file; it is passed close() as its third
    // argument - presumably a cleanup action run on error; confirm.
    inline size_t count()
    {
        int ret;
        db_recno_t cnt;

        BDBOP2(csr_->count(&cnt, 0), ret, close());
        return (size_t)cnt;
    }
0694 
0695     int insert(const key_dt&k, const data_dt& d, int pos = DB_BEFORE)
0696     {
0697         // !!!XXX:
0698         //         We do a deep copy of the input data into a local
0699         //         variable. Apparently not doing so causes issues
0700         //         when using gcc. Even though the put completes prior
0701         //         to returning from this function call.
0702         //         It would be best to avoid this additional copy.
0703         int ret;
0704         // (k, d) pair may be a temporary pair, so we must copy them.
0705         DataItem k1(k, false), d1(d, false);
0706 
0707         inform_duppers();
0708         if (pos == DB_AFTER) {
0709             ret = this->csr_->put(&k1.get_dbt(), &d1.get_dbt(),
0710                 pos);
0711             // May be using this flag for an empty database,
0712             // because begin() an iterator of an empty db_vector
0713             // equals its end() iterator, so use DB_KEYLAST to
0714             // retry.
0715             //
0716             if (ret == EINVAL || ret == 0)
0717                 return ret;
0718             else if (ret)
0719                 throw_bdb_exception("DbCursor<>::insert", ret);
0720         }
0721         if (pos == DB_NODUPDATA)
0722             BDBOP3(this->csr_->put(&k1.get_dbt(), &d1.get_dbt(),
0723                 pos), ret, DB_KEYEXIST, close());
0724         else
0725             BDBOP2(this->csr_->put(&k1.get_dbt(), &d1.get_dbt(),
0726                 pos), ret, close());
0727         this->csr_status_ = ret;
0728         if (ret == 0) {
0729             curr_key_ = k1;
0730             curr_data_ = d1;
0731         }
0732         // This cursor points to the new key/data pair now.
0733         return ret;
0734     }
0735 
0736     // Replace current cursor-pointed data item with d.
    // Replace current cursor-pointed data item with d, using Dbc::put
    // with DB_CURRENT. The key argument passed to put is an empty,
    // default-constructed Dbt. Afterwards curr_data_ holds the new data
    // and csr_status_ the result of the put, which is also returned.
    // Registered LazyDupCursor listeners are not informed by this
    // function.
    inline int replace(const data_dt& d)
    {
        Dbt k1;
        int ret;
        // !!!XXX:
        //         We do a deep copy of the input data into a local
        //         variable. Apparently not doing so causes issues
        //         when using gcc. Even though the put completes prior
        //         to returning from this function call.
        //         It would be best to avoid this additional copy.
        // d may be a temporary object, so we must copy it.
        DataItem d1(d, false);

        
        // NOTE(review): failure handling is delegated to the BDBOP2
        // macro (defined outside this file), with close() passed as
        // its third argument - presumably run on error; confirm.
        BDBOP2(this->csr_->put(&k1, &d1.get_dbt(), DB_CURRENT),
            ret, close());
        curr_data_ = d1; // Update current data.
        
        this->csr_status_ = ret;
        return ret;
    }
0758 
0759     // Remove old key and insert new key-pseudo_data. First insert then
0760     // move to old key and remove it so that the cursor remains at the
0761     // old key's position, according to DB documentation.
0762     // But from practice I can see that
0763     // the cursor after delete seems not to be at the old position, because
0764     // a for loop iteration exits prematurely; not all elements are visited.
0765     //
0766     inline int replace_key(const key_dt&k)
0767     {
0768         data_dt d;
0769         key_dt k0;
0770         int ret;
0771 
0772         this->get_current_key_data(k0, d);
0773         if (k0 == k)
0774             return 0;
0775 
0776         DbCursor<key_dt, data_dt> csr2;
0777         this->dup(csr2);
0778         // Delete current, then insert new key/data pair.
0779         ret = csr2.del(); 
0780         ret = csr2.insert(k, d, DB_KEYLAST);
0781         this->csr_status_ = ret;
0782         
0783         // Now this->csr_ is sitting on an invalid position, its 
0784         // iterator is invalidated. Must first move it to the next
0785         // position before using it.
0786         return ret;
0787     }
0788 
    // Delete the key/data pair the cursor points to (Dbc::del with
    // flags 0). Registered LazyDupCursor listeners are informed first so
    // that they duplicate the cursor before the change. The result is
    // stored in csr_status_ and returned. The cursor handle is not
    // checked for NULL here.
    // NOTE(review): failure handling is delegated to the BDBOP2 macro
    // (defined outside this file), with close() passed as its third
    // argument - presumably run on error; confirm.
    inline int del()
    {
        int ret;

        inform_duppers();
        BDBOP2(csr_->del(0), ret, close());

        // By default pos.csr_ will stay at where it was after delete,
        // which now is an invalid position. So we need to move to
        // next to conform to stl specifications, but we don't move it
        // here, iterator::erase should move the iterator itself 
        // forward.
        //
        this->csr_status_ = ret;
        return ret;
    }
0805 
0806     // Make sure the bulk buffer is large enough, and a multiple of 1KB. 
0807     // This function may be called prior to cursor initialization, it is 
0808     // not possible to verify that the buffer size is a multiple of the 
0809     // page size here.
0810     u_int32_t normalize_bulk_bufsize(u_int32_t &bulksz)
0811     {
0812         if (bulksz == 0)
0813             return 0;
0814 
0815         while (bulksz < 16 * sizeof(data_dt))
0816             bulksz *= 2;
0817 
0818         bulksz = bulksz + 1024 - bulksz % 1024;
0819 
0820         return bulksz;
0821     }
0822 
0823     ////////////////////////////////////////////////////////////////////
0824     //
0825     // Begin public constructors and destructor.
0826     //
0827     explicit DbCursor(u_int32_t b_bulk_retrieval = 0, bool brmw1 = false,
0828         bool directdbget = true) : DbCursorBase(),
0829         curr_key_(sizeof(key_dt)), curr_data_(sizeof(data_dt))
0830     {
0831         u_int32_t bulksz = sizeof(data_dt); // non-bulk
0832         rmw_get_ = brmw1;
0833         this->bulk_retrieval_ = 
0834             normalize_bulk_bufsize(b_bulk_retrieval);
0835         recno_itr_ = NULL;
0836         multi_itr_ = NULL;
0837 
0838         if (bulk_retrieval_) {
0839             if (bulksz <= bulk_retrieval_)
0840                 bulksz = bulk_retrieval_;
0841             else {
0842                 normalize_bulk_bufsize(bulksz);
0843                 bulk_retrieval_ = bulksz;
0844             }
0845         }
0846         key_buf_.set_data(DbstlMalloc(sizeof(key_dt)));
0847         key_buf_.set_ulen(sizeof(key_dt));
0848         key_buf_.set_flags(DB_DBT_USERMEM);
0849         data_buf_.set_data(DbstlMalloc(bulksz));
0850         data_buf_.set_ulen(bulksz);
0851         data_buf_.set_flags(DB_DBT_USERMEM);
0852         directdb_get_ = directdbget;
0853     }
0854 
0855     // Copy constructor, duplicate cursor here.
    // Copy constructor, duplicate cursor here.
    //
    // The base class copy only takes over the owner database and
    // transaction; the Berkeley DB cursor itself is duplicated with
    // dbc.dup(*this). The return value of dup is not checked, so this
    // object is left without a cursor when dbc has none or the
    // duplication fails.
    // The current key/data, status, rmw and bulk settings, the key and
    // data buffers and the position of any bulk iterator are copied too.
    // Registered LazyDupCursor listeners are not copied.
    DbCursor(const DbCursor<key_dt, data_dt>& dbc) :
        DbCursorBase(dbc),
        curr_key_(dbc.curr_key_), curr_data_(dbc.curr_data_)
    {
        void *pk, *pd;

        dbc.dup(*this);
        csr_status_ = dbc.csr_status_;
        // With a cursor available, re-validate the rmw setting against
        // the environment of dbc's cursor; otherwise copy it as is.
        if (csr_ || dbc.csr_)
            this->rmw_get_ = set_rmw(dbc.rmw_get_,
                ((DBC*)dbc.csr_)->dbenv);
        else
            rmw_get_ = dbc.rmw_get_;

        bulk_retrieval_ = dbc.bulk_retrieval_;

        // Now we have to copy key_buf_ and data_buf_ to support
        // multiple retrieval.
        // Each buffer is allocated with the source's ulen and the full
        // ulen bytes are copied.
        key_buf_.set_data(pk = DbstlMalloc(dbc.key_buf_.get_ulen()));
        key_buf_.set_ulen(dbc.key_buf_.get_ulen());
        key_buf_.set_size(dbc.key_buf_.get_size());
        key_buf_.set_flags(DB_DBT_USERMEM);
        memcpy(pk, dbc.key_buf_.get_data(), key_buf_.get_ulen());

        data_buf_.set_data(pd = DbstlMalloc(dbc.data_buf_.get_ulen()));
        data_buf_.set_ulen(dbc.data_buf_.get_ulen());
        data_buf_.set_size(dbc.data_buf_.get_size());
        data_buf_.set_flags(DB_DBT_USERMEM);
        memcpy(pd, dbc.data_buf_.get_data(), data_buf_.get_ulen());
        // Bulk iterators are rebuilt over this object's own data buffer
        // and then given the pointer value obtained from dbc's iterator.
        // NOTE(review): whether that pointer value is an offset or an
        // address into dbc's buffer is not visible here - confirm it is
        // valid for the copied buffer.
        if (dbc.recno_itr_) {
            recno_itr_ = new DbstlMultipleRecnoDataIterator(
                data_buf_);
            recno_itr_->set_pointer(dbc.recno_itr_->get_pointer());
        } else
            recno_itr_ = NULL;
        if (dbc.multi_itr_) {
            multi_itr_ = new DbstlMultipleKeyDataIterator(
                data_buf_);
            multi_itr_->set_pointer(dbc.multi_itr_->get_pointer());

        } else
            multi_itr_ = NULL;

        directdb_get_ = dbc.directdb_get_;

        // Do not copy sduppers, they are private to each DbCursor<>
        // object.
    }
0904 
0905     virtual ~DbCursor()
0906     {
0907         close(); // Call close() ahead of freeing following buffers.
0908         free(key_buf_.get_data());
0909         free(data_buf_.get_data());
0910         if (multi_itr_)
0911             delete multi_itr_;
0912         if (recno_itr_)
0913             delete recno_itr_;
0914     }
0915 
0916     ////////////////////////////////////////////////////////////////////
0917 
0918     const DbCursor<key_dt, data_dt>& operator=
0919         (const DbCursor<key_dt, data_dt>& dbc)
0920     {
0921         void *pk;
0922         u_int32_t ulen;
0923 
0924         DbCursorBase::operator =(dbc);
0925         dbc.dup(*this);
0926         curr_key_ = dbc.curr_key_;
0927         curr_data_ = dbc.curr_data_;
0928         rmw_get_ = dbc.rmw_get_;
0929         this->bulk_retrieval_ = dbc.bulk_retrieval_;
0930         this->directdb_get_ = dbc.directdb_get_;
0931         // Now we have to copy key_buf_ and data_buf_ to support
0932         // bulk retrieval.
0933         key_buf_.set_data(pk = DbstlReAlloc(key_buf_.get_data(),
0934             ulen = dbc.key_buf_.get_ulen()));
0935         key_buf_.set_ulen(ulen);
0936         key_buf_.set_size(dbc.key_buf_.get_size());
0937         key_buf_.set_flags(DB_DBT_USERMEM);
0938         memcpy(pk, dbc.key_buf_.get_data(), ulen);
0939 
0940         data_buf_.set_data(pk = DbstlReAlloc(key_buf_.get_data(),
0941             ulen = dbc.key_buf_.get_ulen()));
0942         data_buf_.set_ulen(ulen);
0943         data_buf_.set_size(dbc.data_buf_.get_size());
0944         data_buf_.set_flags(DB_DBT_USERMEM);
0945         memcpy(pk, dbc.key_buf_.get_data(), ulen);
0946 
0947         if (dbc.recno_itr_) {
0948             if (recno_itr_) {
0949                 delete recno_itr_;
0950                 recno_itr_ = NULL;
0951             }
0952             recno_itr_ = new DbstlMultipleRecnoDataIterator(
0953                 data_buf_);
0954             recno_itr_->set_pointer(dbc.recno_itr_->get_pointer());
0955         } else if (recno_itr_) {
0956             delete recno_itr_;
0957             recno_itr_ = NULL;
0958         }
0959 
0960         if (dbc.multi_itr_) {
0961             if (multi_itr_) {
0962                 delete multi_itr_;
0963                 multi_itr_ = NULL;
0964             }
0965             multi_itr_ = new DbstlMultipleKeyDataIterator(
0966                 data_buf_);
0967             multi_itr_->set_pointer(dbc.multi_itr_->get_pointer());
0968 
0969         } else if (multi_itr_) {
0970             delete multi_itr_;
0971             multi_itr_ = NULL;
0972         }
0973 
0974         return dbc;
0975         // Do not copy sduppers, they are private to each DbCursor<>
0976         // object.
0977 
0978     }
0979 
0980     // Move Dbc*cursor to next position. If doing bulk read, read from
0981     // the bulk buffer. If bulk buffer exhausted, do another bulk read
0982     // from database, and then read from the bulk buffer. Quit if no
0983     // more data in database.
0984     //
0985     int next(int flag = DB_NEXT)
0986     {
0987         Dbt k, d;
0988         db_recno_t recno;
0989         int ret;
0990 
0991 retry:      if (bulk_retrieval_) {
0992             if (multi_itr_) {
0993                 if (multi_itr_->next(k, d)) {
0994                     curr_key_.set_dbt(k, false);
0995                     curr_data_.set_dbt(d, false);
0996                     return 0;
0997                 } else {
0998                     delete multi_itr_;
0999                     multi_itr_ = NULL;
1000                 }
1001             }
1002             if (recno_itr_) {
1003                 if (recno_itr_->next(recno, d)) {
1004                     curr_key_.set_dbt(k, false);
1005                     curr_data_.set_dbt(d, false);
1006                     return 0;
1007                 } else {
1008                     delete recno_itr_;
1009                     recno_itr_ = NULL;
1010                 }
1011             }
1012         }
1013         ret = increment(flag);
1014         if (bulk_retrieval_ && ret == 0)
1015             goto retry;
1016         return ret;
1017     }
1018 
1019     inline int prev(int flag = DB_PREV)
1020     {
1021         return increment(flag);
1022     }
1023 
1024     // Move Dbc*cursor to first element. If doing bulk read, read data
1025     // from bulk buffer.
1026     int first()
1027     {
1028         Dbt k, d;
1029         db_recno_t recno;
1030         int ret;
1031 
1032         ret = increment(DB_FIRST);
1033         if (bulk_retrieval_) {
1034             if (multi_itr_) {
1035                 if (multi_itr_->next(k, d)) {
1036                     curr_key_.set_dbt(k, false);
1037                     curr_data_.set_dbt(d, false);
1038                     return 0;
1039                 } else {
1040                     delete multi_itr_;
1041                     multi_itr_ = NULL;
1042                 }
1043             }
1044             if (recno_itr_) {
1045                 if (recno_itr_->next(recno, d)) {
1046                     curr_key_.set_dbt(k, false);
1047                     curr_data_.set_dbt(d, false);
1048                     return 0;
1049                 } else {
1050                     delete recno_itr_;
1051                     recno_itr_ = NULL;
1052                 }
1053             }
1054         }
1055 
1056         return ret;
1057     }
1058 
1059     inline int last()
1060     {
1061         return increment(DB_LAST);
1062     }
1063 
1064     // Get current key/data pair, shallow copy. Return 0 on success,
1065     // INVALID_KEY_DATA if no data.
1066     inline int get_current_key_data(key_dt&k, data_dt&d)
1067     {
1068         if (directdb_get_)
1069             update_current_key_data_from_db(
1070                 DbCursorBase::SKIP_NONE);
1071         if (curr_key_.get_data(k) == 0 && curr_data_.get_data(d) == 0)
1072             return 0;
1073         else 
1074             return INVALID_KEY_DATA;
1075     }
1076 
1077     // Get current data, shallow copy. Return 0, or INVALID_KEY_DATA if no data.
1078     inline int get_current_data(data_dt&d)
1079     {
1080         if (directdb_get_)
1081             update_current_key_data_from_db(DbCursorBase::SKIP_KEY);
1082         if (curr_data_.get_data(d) == 0)
1083             return 0;
1084         else 
1085             return INVALID_KEY_DATA;
1086     }
1087 
1088     // Get current key, shallow copy. Return 0, or INVALID_KEY_DATA if no data.
1089     inline int get_current_key(key_dt&k)
1090     {
1091         if (directdb_get_)
1092             update_current_key_data_from_db(
1093                 DbCursorBase::SKIP_DATA);
1094         if (curr_key_.get_data(k) == 0)
1095             return 0;
1096         else 
1097             return INVALID_KEY_DATA;
1098     }
1099 
1100     inline void close()
1101     {
1102         if (csr_) {
1103             inform_duppers();
1104             ResourceManager::instance()->remove_cursor(this);
1105         }
1106         csr_ = NULL;
1107     }
1108 
1109     // Parameter skipkd specifies skip retrieving key or data:
1110     // If 0, don't skip, retrieve both;
1111     // If 1, skip retrieving key;
1112     // If 2, skip retrieving data.
1113     // Do not poll from db again if doing bulk retrieval.
1114     void update_current_key_data_from_db(DbcGetSkipOptions skipkd) {
1115         int ret;
1116         u_int32_t sz, sz1, kflags = DB_DBT_USERMEM,
1117             dflags = DB_DBT_USERMEM;
1118         // Do not poll from db again if doing bulk retrieval.
1119         if (this->bulk_retrieval_)
1120             return;
1121         if (this->csr_status_ != 0) {
1122             curr_key_.reset();
1123             curr_data_.reset();
1124             return;
1125         }
1126         
1127         // We will modify flags if skip key or data, so cache old
1128         // value and set it after get calls.
1129         if (skipkd != DbCursorBase::SKIP_NONE) {
1130             kflags = key_buf_.get_flags();
1131             dflags = data_buf_.get_flags();
1132         }
1133         if (skipkd == DbCursorBase::SKIP_KEY) {
1134             key_buf_.set_dlen(0);
1135             key_buf_.set_flags(DB_DBT_PARTIAL | DB_DBT_USERMEM);
1136         }
1137 
1138         if (skipkd == DbCursorBase::SKIP_DATA) {
1139             data_buf_.set_dlen(0);
1140             data_buf_.set_flags(DB_DBT_PARTIAL | DB_DBT_USERMEM);
1141         }
1142 retry:      ret = csr_->get(&key_buf_, &data_buf_, DB_CURRENT);
1143         if (ret == 0) {
1144             if (skipkd != DbCursorBase::SKIP_KEY)
1145                 curr_key_ = key_buf_;
1146             if (skipkd != DbCursorBase::SKIP_DATA)
1147                 curr_data_ = data_buf_;
1148             limit_buf_size_after_use();
1149         } else if (ret == DB_BUFFER_SMALL) {
1150             if ((sz = key_buf_.get_size()) > 0)
1151                 enlarge_dbt(key_buf_, sz);
1152             if ((sz1 = data_buf_.get_size()) > 0) 
1153                 enlarge_dbt(data_buf_, sz1);
1154             if (sz == 0 && sz1 == 0)
1155                 THROW0(InvalidDbtException);
1156             goto retry;
1157         } else {
1158             if (skipkd != DbCursorBase::SKIP_NONE) {
1159                 key_buf_.set_flags(kflags);
1160                 data_buf_.set_flags(dflags);
1161             }
1162             throw_bdb_exception(
1163             "DbCursor<>::update_current_key_data_from_db", ret);
1164         }
1165 
1166         if (skipkd != DbCursorBase::SKIP_NONE) {
1167             key_buf_.set_flags(kflags);
1168             data_buf_.set_flags(dflags);
1169         }
1170     }
1171 }; // DbCursor<>
1172 
1173 ////////////////////////////////////////////////////////////////////////
1174 ////////////////////////////////////////////////////////////////////////
1175 //
1176 // RandDbCursor class template definition
1177 //
1178 // RandDbCursor is a random-access cursor wrapper used by
1179 // db_vector_iterator; it derives from the DbCursor<> class template. Its
1180 // key type is fixed to index_type.
1181 //
1182 typedef db_recno_t index_type;
1183 template<Typename data_dt>
1184 class RandDbCursor : public DbCursor<index_type, data_dt>
1185 {
1186 protected:
1187     friend class DataItem;
1188     typedef ssize_t difference_type;
1189 public:
1190     typedef RandDbCursor<data_dt> self;
1191     typedef DbCursor<index_type, data_dt> base;
1192 
1193     // Return current csr_ pointed element's index in recno database
1194     // (i.e. the index starting from 1). csr_ must be open and
1195     // point to an existing key/data pair.
1196     //
1197     inline index_type get_current_index() const
1198     {
1199         index_type ndx;
1200 
1201         if (this->directdb_get_)
1202             ((self *)this)->update_current_key_data_from_db(
1203                 DbCursorBase::SKIP_DATA);
1204         this->curr_key_.get_data(ndx);
1205         return ndx;
1206     }
1207 
1208     inline int compare(const self *csr2) const{
1209         index_type i1, i2;
1210 
1211         i1 = this->get_current_index();
1212         i2 = csr2->get_current_index();
1213         return i1 - i2;
1214     }
1215 
1216     // Insert data d before/after current position.
1217     int insert(const data_dt& d, int pos = DB_BEFORE){
1218         int k = 1, ret;
1219         //data_dt dta;
1220 
1221         // Inserting into empty db, must set key to 1.
1222         if (pos == DB_KEYLAST)
1223             k = 1;
1224 
1225         ret = base::insert(k, d, pos);
1226 
1227         // Inserting into a empty db using begin() itr, so flag is
1228         // DB_AFTER and surely failed, so change to use DB_KEYLAST
1229         // and try again.
1230         if (ret == EINVAL) {
1231             k = 1;
1232             pos = DB_KEYLAST;
1233             ret = base::insert(k, d, pos);
1234         }
1235         this->csr_status_ = ret;
1236         return ret;
1237     }
1238 
1239     /*
1240      * Move the cursor n positions, if reaches the beginning or end,
1241      * returns DB_NOTFOUND.
1242      */
1243     int advance(difference_type n)
1244     {
1245         int ret = 0;
1246         index_type indx;
1247         u_int32_t sz, flags = 0;
1248 
1249         indx = this->get_current_index();
1250         if (n == 0)
1251             return 0;
1252 
1253         index_type i = (index_type)n;
1254         indx += i;
1255 
1256         if (n < 0 && indx < 1) { // Index in recno db starts from 1.
1257 
1258             ret = INVALID_ITERATOR_POSITION;
1259             return ret;
1260         }
1261         this->inform_duppers();
1262 
1263         // Do a search to determine whether new position is valid.
1264         Dbt k, &d = this->data_buf_;
1265 
1266         
1267         k.set_data(&indx);
1268         k.set_size(sizeof(indx));
1269         if (this->rmw_get_)
1270             flags |= DB_RMW;
1271 
1272 retry:      if (this->csr_ && 
1273             ((ret = this->csr_->get(&k, &d, DB_SET)) == DB_NOTFOUND)) {
1274             this->csr_status_ = ret = INVALID_ITERATOR_POSITION;
1275             this->curr_key_.reset();
1276             this->curr_data_.reset();
1277         } else if (ret == DB_BUFFER_SMALL) {
1278             sz = d.get_size();
1279             assert(sz > 0);
1280             this->enlarge_dbt(d, sz);
1281             goto retry;
1282         } else if (ret == 0) {
1283             this->curr_key_.set_dbt(k, false);
1284             this->curr_data_.set_dbt(d, false);
1285             this->limit_buf_size_after_use();
1286         } else
1287             throw_bdb_exception("RandDbCursor<>::advance", ret);
1288         this->csr_status_ = ret;
1289         return ret;
1290     }
1291 
1292     // Return the last index of recno db (index starting from 1),
1293     // it will also move the underlying cursor to last key/data pair.
1294     //
1295     inline index_type last_index()
1296     {
1297         int ret;
1298 
1299         ret = this->last();
1300         if (ret)
1301             return 0;// Invalid position.
1302         else
1303             return get_current_index();
1304     }
1305 
1306     explicit RandDbCursor(u_int32_t b_bulk_retrieval = 0,
1307         bool b_rmw1 = false, bool directdbget = true)
1308         : base(b_bulk_retrieval, b_rmw1, directdbget)
1309     {
1310     }
1311 
1312     RandDbCursor(const RandDbCursor<data_dt>& rdbc) : base(rdbc)
1313     {
1314     }
1315 
1316     explicit RandDbCursor(Dbc* csr1, int posidx = 0) : base(csr1)
1317     {
1318     }
1319 
1320     virtual ~RandDbCursor()
1321     {
1322     }
1323 
1324 }; // RandDbCursor<>
1325 
1326 END_NS //ns dbstl
1327 
1328 #endif // !_DB_STL_DBC_H