Ignore:
Timestamp:
Nov 27, 2012, 4:43:17 PM (13 years ago)
Author:
Silvan Scherrer
Message:

Samba Server: updated trunk to 3.6.0

Location:
trunk/server
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/server

  • trunk/server/lib/tdb/common/lock.c

    r647 r745  
    77   Copyright (C) Paul `Rusty' Russell              2000
    88   Copyright (C) Jeremy Allison                    2000-2003
    9    
     9
    1010     ** NOTE! The following LGPL license applies to the tdb
    1111     ** library. This does NOT imply that all of Samba is released
    1212     ** under the LGPL
    13    
     13
    1414   This library is free software; you can redistribute it and/or
    1515   modify it under the terms of the GNU Lesser General Public
     
    2727
    2828#include "tdb_private.h"
    29 
    30 #define TDB_MARK_LOCK 0x80000000
    31 
    3229#ifdef __OS2__
    3330
    34 static char* lock_type( int lck)
    35 {
    36         static char buffer[16];
    37         switch(lck) {
    38         case F_GETLK: return "F_GETLK";
    39         case F_SETLK: return "F_SETLK";
    40         case F_SETLKW: return "F_SETLKW";
    41         default:
    42                 sprintf( buffer, "unknown %d", lck);
    43         }
    44         return buffer;
     31static char* lock_type( bool waitflag)
     32{
     33        if (waitflag)
     34                return "F_SETLKW";
     35        else
     36                return "F_SETLK";
    4537}
    4638
     
    5951
    6052static int _mutex_brlock(struct tdb_context *tdb, tdb_off_t offset,
    61                int rw_type, int lck_type, int probe, size_t len)
     53               int rw_type, bool waitflag, size_t len)
    6254{
    6355        HMTX    hSem;
     
    6658       
    6759        switch( offset) {
    68         case GLOBAL_LOCK:
     60        case OPEN_LOCK:
    6961                hSem = tdb->hGlobalLock;
    7062                break;
     
    8678        TDB_LOG((tdb, TDB_DEBUG_TRACE,"_mutex_brlock handle %d, offset %d\n", hSem, offset));
    8779
    88         if (lck_type == F_SETLKW)
     80        if (waitflag)
    8981                ulTimeout = SEM_INDEFINITE_WAIT;
    9082        else
     
    112104        errno = EINVAL;
    113105        TDB_LOG(( tdb, TDB_DEBUG_ERROR, "_mutex_brlock pid %X, failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d, rc=%d\n",
    114                  getpid(), tdb->fd, offset, rw_type, lck_type, (int)len, rc));
     106                 getpid(), tdb->fd, offset, rw_type, waitflag, (int)len, rc));
    115107        tdb->ecode = TDB_ERR_LOCK;
    116108        return -1;
     
    118110#endif
    119111
    120 void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)
     112_PUBLIC_ void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)
    121113{
    122114        tdb->interrupt_sig_ptr = ptr;
    123115}
    124116
    125 /* a byte range locking function - return 0 on success
    126    this functions locks/unlocks 1 byte at the specified offset.
    127 
    128    On error, errno is also set so that errors are passed back properly
    129    through tdb_open().
    130 
    131    note that a len of zero means lock to end of file
    132 */
    133 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
    134                int rw_type, int lck_type, int probe, size_t len)
    135 {
    136 
     117static int fcntl_lock(struct tdb_context *tdb,
     118                      int rw, tdb_off_t off, size_t len, bool waitflag)
     119{
    137120#ifdef __OS2__
    138121        APIRET      rc;
    139         ULONG       fAccess = 0;
    140         int         fLock = 0;
     122        ULONG       fAccess = 0; // default exclusiv
     123        int         fLock = 1;   // default lock
    141124        off_t       cbFile;
    142125        off_t       offStart;
    143126        off_t       cbRange;
    144 
    145         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_brlock pid %X, fd %d, lck_type %s, rw_type %s, offset %d, len %d\n",
    146                 getpid(), tdb->fd, lock_type(lck_type), read_type(rw_type), offset, len));
    147 
    148         if (tdb->flags & TDB_NOLOCK) {
    149                 return 0;
    150         }
    151 
    152         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
    153                 tdb->ecode = TDB_ERR_RDONLY;
    154                 return -1;
    155         }
     127        FILELOCK    lockArea = {0}, unlockArea = {0};
     128
     129        TDB_LOG((tdb, TDB_DEBUG_TRACE, "fcntl_lock in pid %X, fd %d, lck_type %s, rw_type %s, offset %d, len %d\n",
     130                getpid(), tdb->fd, lock_type(waitflag), read_type(rw), off, len));
    156131       
    157         switch( offset) {
    158         case GLOBAL_LOCK:
     132        switch(off) {
     133        case OPEN_LOCK:
    159134        case ACTIVE_LOCK:
    160135        case TRANSACTION_LOCK:
    161                 return _mutex_brlock( tdb, offset, rw_type,  lck_type,  probe, len);
     136                return _mutex_brlock(tdb, off, rw, waitflag, len);
    162137        }
    163138
    164139        /* flags and order */
    165         fAccess = 0; /* exclusive */
    166         switch (rw_type)
     140        switch (rw)
    167141        {
    168142                case F_UNLCK:
    169143                        fLock = 0;
     144                        unlockArea.lOffset = off;
     145                        unlockArea.lRange  = len ? len : tdb->header.hash_size *4; //was LONG_MAX
    170146                        break;
    171147                case F_RDLCK:
     148                        lockArea.lOffset = off;
     149                        lockArea.lRange  = len ? len : LONG_MAX;
    172150                        fAccess = 1; /* read-only */
     151                        break;
    173152                case F_WRLCK:
    174                         fLock = 1;
     153                        lockArea.lOffset = off;
     154                        lockArea.lRange  = len ? len : LONG_MAX;
    175155                        break;
    176156                default:
     
    178158        }
    179159               
    180         FILELOCK   aflock[2];
    181         bzero(&aflock[(fLock + 1) & 1], sizeof(aflock[0]));
    182         aflock[fLock].lOffset = offset;
    183         aflock[fLock].lRange  = len ? len : LONG_MAX;
    184         rc = DosSetFileLocks(tdb->fd, &aflock[0], &aflock[1], SEM_IMMEDIATE_RETURN, fAccess);
    185 
    186         if (rc != NO_ERROR && lck_type == F_SETLKW) {
     160        rc = DosSetFileLocks(tdb->fd, &unlockArea, &lockArea, SEM_IMMEDIATE_RETURN, fAccess);
     161
     162        if (rc != NO_ERROR && waitflag) {
    187163                int     count = 20;
    188164                do {
    189                         rc = DosSetFileLocks(tdb->fd, &aflock[0], &aflock[1], 100, fAccess);
     165                        rc = DosSetFileLocks(tdb->fd, &unlockArea, &lockArea, 100, fAccess);
    190166                        count--;
    191167                } while( count>0 && rc !=NO_ERROR);
    192168
    193169        }
     170
     171        TDB_LOG(( tdb, TDB_DEBUG_TRACE, "fcntl_lock out pid %X, fd %d, lck_type %s, rw_type %s, offset %d, len %d, rc=%d\n",
     172                getpid(), tdb->fd, lock_type(waitflag), read_type(rw), off, len, rc));
     173
    194174        if (rc != NO_ERROR) {
    195175                errno  = EINVAL;
    196                 /* Generic lock error. errno set by fcntl.
    197                  * EAGAIN is an expected return from non-blocking
    198                  * locks. */
    199                 if (!probe && lck_type != F_SETLK) {
    200                         /* Ensure error code is set for log fun to examine. */
    201                         tdb->ecode = TDB_ERR_LOCK;
    202                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
    203                                  tdb->fd, offset, rw_type, lck_type, (int)len));
     176                return -1;
     177        }
     178
     179#else
     180        struct flock fl;
     181
     182        fl.l_type = rw;
     183        fl.l_whence = SEEK_SET;
     184        fl.l_start = off;
     185        fl.l_len = len;
     186        fl.l_pid = 0;
     187
     188        if (waitflag)
     189                return fcntl(tdb->fd, F_SETLKW, &fl);
     190        else
     191                return fcntl(tdb->fd, F_SETLK, &fl);
     192#endif
     193}
     194
     195static int fcntl_unlock(struct tdb_context *tdb, int rw, tdb_off_t off, size_t len)
     196{
     197        struct flock fl;
     198#if 0 /* Check they matched up locks and unlocks correctly. */
     199        char line[80];
     200        FILE *locks;
     201        bool found = false;
     202
     203        locks = fopen("/proc/locks", "r");
     204
     205        while (fgets(line, 80, locks)) {
     206                char *p;
     207                int type, start, l;
     208
     209                /* eg. 1: FLOCK  ADVISORY  WRITE 2440 08:01:2180826 0 EOF */
     210                p = strchr(line, ':') + 1;
     211                if (strncmp(p, " POSIX  ADVISORY  ", strlen(" POSIX  ADVISORY  ")))
     212                        continue;
     213                p += strlen(" FLOCK  ADVISORY  ");
     214                if (strncmp(p, "READ  ", strlen("READ  ")) == 0)
     215                        type = F_RDLCK;
     216                else if (strncmp(p, "WRITE ", strlen("WRITE ")) == 0)
     217                        type = F_WRLCK;
     218                else
     219                        abort();
     220                p += 6;
     221                if (atoi(p) != getpid())
     222                        continue;
     223                p = strchr(strchr(p, ' ') + 1, ' ') + 1;
     224                start = atoi(p);
     225                p = strchr(p, ' ') + 1;
     226                if (strncmp(p, "EOF", 3) == 0)
     227                        l = 0;
     228                else
     229                        l = atoi(p) - start + 1;
     230
     231                if (off == start) {
     232                        if (len != l) {
     233                                fprintf(stderr, "Len %u should be %u: %s",
     234                                        (int)len, l, line);
     235                                abort();
     236                        }
     237                        if (type != rw) {
     238                                fprintf(stderr, "Type %s wrong: %s",
     239                                        rw == F_RDLCK ? "READ" : "WRITE", line);
     240                                abort();
     241                        }
     242                        found = true;
     243                        break;
    204244                }
    205 
    206                 TDB_LOG(( tdb, TDB_DEBUG_TRACE, "tdb_brlock pid %X, failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
    207                          getpid(), tdb->fd, offset, rw_type, lck_type, (int)len));
    208                 tdb->ecode = TDB_ERR_LOCK;
    209                 return -1;
    210         }
    211 
    212         TDB_LOG(( tdb, TDB_DEBUG_TRACE, "tdb_brlock pid %X, fd %d, lck_type %s, rw_type %s, offset %d, len %d DONE\n",
    213                 getpid(), tdb->fd, lock_type(lck_type), read_type(rw_type), offset, len));
    214 
     245        }
     246
     247        if (!found) {
     248                fprintf(stderr, "Unlock on %u@%u not found!\n",
     249                        (int)off, (int)len);
     250                abort();
     251        }
     252
     253        fclose(locks);
     254#endif
     255
     256        fl.l_type = F_UNLCK;
     257        fl.l_whence = SEEK_SET;
     258        fl.l_start = off;
     259        fl.l_len = len;
     260        fl.l_pid = 0;
     261#ifdef __OS2__
     262        return fcntl_lock(tdb, F_UNLCK, off, len, 1);
    215263#else
    216 
    217         struct flock fl;
     264        return fcntl(tdb->fd, F_SETLKW, &fl);
     265#endif
     266}
     267
     268/* list -1 is the alloc list, otherwise a hash chain. */
     269static tdb_off_t lock_offset(int list)
     270{
     271        return FREELIST_TOP + 4*list;
     272}
     273
     274/* a byte range locking function - return 0 on success
     275   this functions locks/unlocks 1 byte at the specified offset.
     276
     277   On error, errno is also set so that errors are passed back properly
     278   through tdb_open().
     279
     280   note that a len of zero means lock to end of file
     281*/
     282int tdb_brlock(struct tdb_context *tdb,
     283               int rw_type, tdb_off_t offset, size_t len,
     284               enum tdb_lock_flags flags)
     285{
    218286        int ret;
    219287
    220288        if (tdb->flags & TDB_NOLOCK) {
     289                return 0;
     290        }
     291
     292        if (flags & TDB_LOCK_MARK_ONLY) {
    221293                return 0;
    222294        }
     
    227299        }
    228300
    229         fl.l_type = rw_type;
    230         fl.l_whence = SEEK_SET;
    231         fl.l_start = offset;
    232         fl.l_len = len;
    233         fl.l_pid = 0;
    234 
    235301        do {
    236                 ret = fcntl(tdb->fd,lck_type,&fl);
    237 
     302                ret = fcntl_lock(tdb, rw_type, offset, len,
     303                                 flags & TDB_LOCK_WAIT);
    238304                /* Check for a sigalarm break. */
    239305                if (ret == -1 && errno == EINTR &&
     
    249315                 * EAGAIN is an expected return from non-blocking
    250316                 * locks. */
    251                 if (!probe && lck_type != F_SETLK) {
    252                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
    253                                  tdb->fd, offset, rw_type, lck_type, (int)len));
     317                if (!(flags & TDB_LOCK_PROBE) && errno != EAGAIN) {
     318                        TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d flags=%d len=%d\n",
     319                                 tdb->fd, offset, rw_type, flags, (int)len));
    254320                }
    255321                return -1;
    256322        }
    257323        return 0;
    258 #endif
    259 }
    260 
     324}
     325
     326int tdb_brunlock(struct tdb_context *tdb,
     327                 int rw_type, tdb_off_t offset, size_t len)
     328{
     329        int ret;
     330
     331        if (tdb->flags & TDB_NOLOCK) {
     332                return 0;
     333        }
     334
     335        do {
     336                ret = fcntl_unlock(tdb, rw_type, offset, len);
     337        } while (ret == -1 && errno == EINTR);
     338
     339        if (ret == -1) {
     340                TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brunlock failed (fd=%d) at offset %d rw_type=%d len=%d\n",
     341                         tdb->fd, offset, rw_type, (int)len));
     342        }
     343        return ret;
     344}
    261345
    262346/*
     
    266350  made. For those OSes we may loop for a while. 
    267351*/
    268 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
     352int tdb_allrecord_upgrade(struct tdb_context *tdb)
    269353{
    270354        int count = 1000;
     355
     356        if (tdb->allrecord_lock.count != 1) {
     357                TDB_LOG((tdb, TDB_DEBUG_ERROR,
     358                         "tdb_allrecord_upgrade failed: count %u too high\n",
     359                         tdb->allrecord_lock.count));
     360                return -1;
     361        }
     362
     363        if (tdb->allrecord_lock.off != 1) {
     364                TDB_LOG((tdb, TDB_DEBUG_ERROR,
     365                         "tdb_allrecord_upgrade failed: already upgraded?\n"));
     366                return -1;
     367        }
     368
    271369        while (count--) {
    272370                struct timeval tv;
    273 
    274371#ifdef __OS2__
    275372                // YD we cannot upgrade without an unlock first...
    276                 tdb_brlock(tdb, offset, F_UNLCK, F_SETLKW, 1, len);
     373                tdb_brlock(tdb, F_UNLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT|TDB_LOCK_PROBE);
    277374#endif
    278 
    279                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
     375                if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0,
     376                               TDB_LOCK_WAIT|TDB_LOCK_PROBE) == 0) {
     377                        tdb->allrecord_lock.ltype = F_WRLCK;
     378                        tdb->allrecord_lock.off = 0;
    280379                        return 0;
    281380                }
     
    288387                select(0, NULL, NULL, NULL, &tv);
    289388        }
    290         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
     389        TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_allrecord_upgrade failed\n"));
    291390        return -1;
    292391}
    293392
    294 
    295 /* lock a list in the database. list -1 is the alloc list */
    296 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
     393static struct tdb_lock_type *find_nestlock(struct tdb_context *tdb,
     394                                           tdb_off_t offset)
     395{
     396        unsigned int i;
     397
     398        for (i=0; i<tdb->num_lockrecs; i++) {
     399                if (tdb->lockrecs[i].off == offset) {
     400                        return &tdb->lockrecs[i];
     401                }
     402        }
     403        return NULL;
     404}
     405
     406/* lock an offset in the database. */
     407int tdb_nest_lock(struct tdb_context *tdb, uint32_t offset, int ltype,
     408                  enum tdb_lock_flags flags)
    297409{
    298410        struct tdb_lock_type *new_lck;
    299         int i;
    300         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    301 
    302         ltype &= ~TDB_MARK_LOCK;
    303 
    304         /* a global lock allows us to avoid per chain locks */
    305         if (tdb->global_lock.count &&
    306             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
    307                 return 0;
    308         }
    309 
    310         if (tdb->global_lock.count) {
     411
     412        if (offset >= lock_offset(tdb->header.hash_size)) {
    311413                tdb->ecode = TDB_ERR_LOCK;
    312                 return -1;
    313         }
    314 
    315         if (list < -1 || list >= (int)tdb->header.hash_size) {
    316                 tdb->ecode = TDB_ERR_LOCK;
    317                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
    318                            list, ltype));
     414                TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid offset %u for ltype=%d\n",
     415                         offset, ltype));
    319416                return -1;
    320417        }
     
    322419                return 0;
    323420
    324         for (i=0; i<tdb->num_lockrecs; i++) {
    325                 if (tdb->lockrecs[i].list == list) {
    326                         if (tdb->lockrecs[i].count == 0) {
    327                                 /*
    328                                  * Can't happen, see tdb_unlock(). It should
    329                                  * be an assert.
    330                                  */
    331                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
    332                                          "lck->count == 0 for list %d", list));
    333                         }
    334                         /*
    335                          * Just increment the in-memory struct, posix locks
    336                          * don't stack.
    337                          */
    338                         tdb->lockrecs[i].count++;
    339                         return 0;
    340                 }
     421        new_lck = find_nestlock(tdb, offset);
     422        if (new_lck) {
     423                /*
     424                 * Just increment the in-memory struct, posix locks
     425                 * don't stack.
     426                 */
     427                new_lck->count++;
     428                return 0;
    341429        }
    342430
     
    352440        /* Since fcntl locks don't nest, we do a lock for the first one,
    353441           and simply bump the count for future ones */
    354         if (!mark_lock &&
    355             tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
    356                                      0, 1)) {
    357                 return -1;
    358         }
    359 
    360         tdb->num_locks++;
    361 
    362         tdb->lockrecs[tdb->num_lockrecs].list = list;
     442        if (tdb_brlock(tdb, ltype, offset, 1, flags)) {
     443                return -1;
     444        }
     445
     446        tdb->lockrecs[tdb->num_lockrecs].off = offset;
    363447        tdb->lockrecs[tdb->num_lockrecs].count = 1;
    364448        tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
    365         tdb->num_lockrecs += 1;
     449        tdb->num_lockrecs++;
    366450
    367451        return 0;
     452}
     453
     454static int tdb_lock_and_recover(struct tdb_context *tdb)
     455{
     456        int ret;
     457
     458        /* We need to match locking order in transaction commit. */
     459        if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT)) {
     460                return -1;
     461        }
     462
     463        if (tdb_brlock(tdb, F_WRLCK, OPEN_LOCK, 1, TDB_LOCK_WAIT)) {
     464                tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
     465                return -1;
     466        }
     467
     468        ret = tdb_transaction_recover(tdb);
     469
     470        tdb_brunlock(tdb, F_WRLCK, OPEN_LOCK, 1);
     471        tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
     472
     473        return ret;
     474}
     475
     476static bool have_data_locks(const struct tdb_context *tdb)
     477{
     478        unsigned int i;
     479
     480        for (i = 0; i < tdb->num_lockrecs; i++) {
     481                if (tdb->lockrecs[i].off >= lock_offset(-1))
     482                        return true;
     483        }
     484        return false;
     485}
     486
     487static int tdb_lock_list(struct tdb_context *tdb, int list, int ltype,
     488                         enum tdb_lock_flags waitflag)
     489{
     490        int ret;
     491        bool check = false;
     492
     493        /* a allrecord lock allows us to avoid per chain locks */
     494        if (tdb->allrecord_lock.count &&
     495            (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
     496                return 0;
     497        }
     498
     499        if (tdb->allrecord_lock.count) {
     500                tdb->ecode = TDB_ERR_LOCK;
     501                ret = -1;
     502        } else {
     503                /* Only check when we grab first data lock. */
     504                check = !have_data_locks(tdb);
     505                ret = tdb_nest_lock(tdb, lock_offset(list), ltype, waitflag);
     506
     507                if (ret == 0 && check && tdb_needs_recovery(tdb)) {
     508                        tdb_nest_unlock(tdb, lock_offset(list), ltype, false);
     509
     510                        if (tdb_lock_and_recover(tdb) == -1) {
     511                                return -1;
     512                        }
     513                        return tdb_lock_list(tdb, list, ltype, waitflag);
     514                }
     515        }
     516        return ret;
    368517}
    369518
     
    372521{
    373522        int ret;
    374         ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
     523
     524        ret = tdb_lock_list(tdb, list, ltype, TDB_LOCK_WAIT);
    375525        if (ret) {
    376526                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
     
    383533int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
    384534{
    385         return _tdb_lock(tdb, list, ltype, F_SETLK);
    386 }
    387 
    388 
    389 /* unlock the database: returns void because it's too late for errors. */
    390         /* changed to return int it may be interesting to know there
    391            has been an error  --simo */
    392 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
     535        return tdb_lock_list(tdb, list, ltype, TDB_LOCK_NOWAIT);
     536}
     537
     538
     539int tdb_nest_unlock(struct tdb_context *tdb, uint32_t offset, int ltype,
     540                    bool mark_lock)
    393541{
    394542        int ret = -1;
    395         int i;
    396         struct tdb_lock_type *lck = NULL;
    397         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    398 
    399         ltype &= ~TDB_MARK_LOCK;
    400 
    401         /* a global lock allows us to avoid per chain locks */
    402         if (tdb->global_lock.count &&
    403             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
    404                 return 0;
    405         }
    406 
    407         if (tdb->global_lock.count) {
    408                 tdb->ecode = TDB_ERR_LOCK;
    409                 return -1;
    410         }
     543        struct tdb_lock_type *lck;
    411544
    412545        if (tdb->flags & TDB_NOLOCK)
     
    414547
    415548        /* Sanity checks */
    416         if (list < -1 || list >= (int)tdb->header.hash_size) {
    417                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
     549        if (offset >= lock_offset(tdb->header.hash_size)) {
     550                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: offset %u invalid (%d)\n", offset, tdb->header.hash_size));
    418551                return ret;
    419552        }
    420553
    421         for (i=0; i<tdb->num_lockrecs; i++) {
    422                 if (tdb->lockrecs[i].list == list) {
    423                         lck = &tdb->lockrecs[i];
    424                         break;
    425                 }
    426         }
    427 
     554        lck = find_nestlock(tdb, offset);
    428555        if ((lck == NULL) || (lck->count == 0)) {
    429556                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
     
    446573                ret = 0;
    447574        } else {
    448                 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
    449                                                F_SETLKW, 0, 1);
    450         }
    451         tdb->num_locks--;
     575                ret = tdb_brunlock(tdb, ltype, offset, 1);
     576        }
    452577
    453578        /*
     
    455580         * last array element.
    456581         */
    457 
    458         if (tdb->num_lockrecs > 1) {
    459                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
    460         }
    461         tdb->num_lockrecs -= 1;
     582        *lck = tdb->lockrecs[--tdb->num_lockrecs];
    462583
    463584        /*
     
    475596}
    476597
     598int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
     599{
     600        /* a global lock allows us to avoid per chain locks */
     601        if (tdb->allrecord_lock.count &&
     602            (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
     603                return 0;
     604        }
     605
     606        if (tdb->allrecord_lock.count) {
     607                tdb->ecode = TDB_ERR_LOCK;
     608                return -1;
     609        }
     610
     611        return tdb_nest_unlock(tdb, lock_offset(list), ltype, false);
     612}
     613
    477614/*
    478615  get the transaction lock
    479616 */
    480 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
    481 {
    482         if (tdb->global_lock.count) {
    483                 return 0;
    484         }
    485         if (tdb->transaction_lock_count > 0) {
    486                 tdb->transaction_lock_count++;
    487                 return 0;
    488         }
    489 
    490         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
    491                                      F_SETLKW, 0, 1) == -1) {
    492                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
    493                 tdb->ecode = TDB_ERR_LOCK;
    494                 return -1;
    495         }
    496         tdb->transaction_lock_count++;
    497         return 0;
     617int tdb_transaction_lock(struct tdb_context *tdb, int ltype,
     618                         enum tdb_lock_flags lockflags)
     619{
     620        return tdb_nest_lock(tdb, TRANSACTION_LOCK, ltype, lockflags);
    498621}
    499622
     
    501624  release the transaction lock
    502625 */
    503 int tdb_transaction_unlock(struct tdb_context *tdb)
    504 {
    505         int ret;
    506         if (tdb->global_lock.count) {
    507                 return 0;
    508         }
    509         if (tdb->transaction_lock_count > 1) {
    510                 tdb->transaction_lock_count--;
    511                 return 0;
    512         }
    513         ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
    514         if (ret == 0) {
    515                 tdb->transaction_lock_count = 0;
    516         }
    517         return ret;
    518 }
    519 
    520 
    521 
    522 
    523 /* lock/unlock entire database */
    524 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
    525 {
    526         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    527 
    528         ltype &= ~TDB_MARK_LOCK;
    529 
     626int tdb_transaction_unlock(struct tdb_context *tdb, int ltype)
     627{
     628        return tdb_nest_unlock(tdb, TRANSACTION_LOCK, ltype, false);
     629}
     630
     631/* Returns 0 if all done, -1 if error, 1 if ok. */
     632static int tdb_allrecord_check(struct tdb_context *tdb, int ltype,
     633                               enum tdb_lock_flags flags, bool upgradable)
     634{
    530635        /* There are no locks on read-only dbs */
    531636        if (tdb->read_only || tdb->traverse_read) {
     
    534639        }
    535640
    536         if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
    537                 tdb->global_lock.count++;
    538                 return 0;
    539         }
    540 
    541         if (tdb->global_lock.count) {
     641        if (tdb->allrecord_lock.count && tdb->allrecord_lock.ltype == ltype) {
     642                tdb->allrecord_lock.count++;
     643                return 0;
     644        }
     645
     646        if (tdb->allrecord_lock.count) {
    542647                /* a global lock of a different type exists */
    543648                tdb->ecode = TDB_ERR_LOCK;
    544649                return -1;
    545650        }
    546        
    547         if (tdb->num_locks != 0) {
     651
     652        if (tdb_have_extra_locks(tdb)) {
    548653                /* can't combine global and chain locks */
    549654                tdb->ecode = TDB_ERR_LOCK;
     
    551656        }
    552657
    553         if (!mark_lock &&
    554             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
    555                                      0, 4*tdb->header.hash_size)) {
    556                 if (op == F_SETLKW) {
    557                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
     658        if (upgradable && ltype != F_RDLCK) {
     659                /* tdb error: you can't upgrade a write lock! */
     660                tdb->ecode = TDB_ERR_LOCK;
     661                return -1;
     662        }
     663        return 1;
     664}
     665
     666/* We only need to lock individual bytes, but Linux merges consecutive locks
     667 * so we lock in contiguous ranges. */
     668static int tdb_chainlock_gradual(struct tdb_context *tdb,
     669                                 int ltype, enum tdb_lock_flags flags,
     670                                 size_t off, size_t len)
     671{
     672        int ret;
     673        enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
     674
     675        if (len <= 4) {
     676                /* Single record.  Just do blocking lock. */
     677                return tdb_brlock(tdb, ltype, off, len, flags);
     678        }
     679
     680        /* First we try non-blocking. */
     681        ret = tdb_brlock(tdb, ltype, off, len, nb_flags);
     682        if (ret == 0) {
     683                return 0;
     684        }
     685
     686        /* Try locking first half, then second. */
     687        ret = tdb_chainlock_gradual(tdb, ltype, flags, off, len / 2);
     688        if (ret == -1)
     689                return -1;
     690
     691        ret = tdb_chainlock_gradual(tdb, ltype, flags,
     692                                    off + len / 2, len - len / 2);
     693        if (ret == -1) {
     694                tdb_brunlock(tdb, ltype, off, len / 2);
     695                return -1;
     696        }
     697        return 0;
     698}
     699
     700/* lock/unlock entire database.  It can only be upgradable if you have some
     701 * other way of guaranteeing exclusivity (ie. transaction write lock).
     702 * We do the locking gradually to avoid being starved by smaller locks. */
     703int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
     704                       enum tdb_lock_flags flags, bool upgradable)
     705{
     706        switch (tdb_allrecord_check(tdb, ltype, flags, upgradable)) {
     707        case -1:
     708                return -1;
     709        case 0:
     710                return 0;
     711        }
     712
     713        /* We cover two kinds of locks:
     714         * 1) Normal chain locks.  Taken for almost all operations.
     715         * 3) Individual records locks.  Taken after normal or free
     716         *    chain locks.
     717         *
     718         * It is (1) which cause the starvation problem, so we're only
     719         * gradual for that. */
     720        if (tdb_chainlock_gradual(tdb, ltype, flags, FREELIST_TOP,
     721                                  tdb->header.hash_size * 4) == -1) {
     722                return -1;
     723        }
     724
     725        /* Grab individual record locks. */
     726        if (tdb_brlock(tdb, ltype, lock_offset(tdb->header.hash_size), 0,
     727                       flags) == -1) {
     728                tdb_brunlock(tdb, ltype, FREELIST_TOP,
     729                             tdb->header.hash_size * 4);
     730                return -1;
     731        }
     732
     733        tdb->allrecord_lock.count = 1;
     734        /* If it's upgradable, it's actually exclusive so we can treat
     735         * it as a write lock. */
     736        tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
     737        tdb->allrecord_lock.off = upgradable;
     738
     739        if (tdb_needs_recovery(tdb)) {
     740                bool mark = flags & TDB_LOCK_MARK_ONLY;
     741                tdb_allrecord_unlock(tdb, ltype, mark);
     742                if (mark) {
     743                        tdb->ecode = TDB_ERR_LOCK;
     744                        TDB_LOG((tdb, TDB_DEBUG_ERROR,
     745                                 "tdb_lockall_mark cannot do recovery\n"));
     746                        return -1;
    558747                }
    559                 return -1;
    560         }
    561 
    562         tdb->global_lock.count = 1;
    563         tdb->global_lock.ltype = ltype;
     748                if (tdb_lock_and_recover(tdb) == -1) {
     749                        return -1;
     750                }
     751                return tdb_allrecord_lock(tdb, ltype, flags, upgradable);
     752        }
    564753
    565754        return 0;
     
    569758
    570759/* unlock entire db */
    571 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
    572 {
    573         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
    574 
    575         ltype &= ~TDB_MARK_LOCK;
    576 
     760int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype, bool mark_lock)
     761{
    577762        /* There are no locks on read-only dbs */
    578763        if (tdb->read_only || tdb->traverse_read) {
     
    581766        }
    582767
    583         if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
     768        if (tdb->allrecord_lock.count == 0) {
    584769                tdb->ecode = TDB_ERR_LOCK;
    585770                return -1;
    586771        }
    587772
    588         if (tdb->global_lock.count > 1) {
    589                 tdb->global_lock.count--;
    590                 return 0;
    591         }
    592 
    593         if (!mark_lock &&
    594             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
    595                                      0, 4*tdb->header.hash_size)) {
     773        /* Upgradable locks are marked as write locks. */
     774        if (tdb->allrecord_lock.ltype != ltype
     775            && (!tdb->allrecord_lock.off || ltype != F_RDLCK)) {
     776                tdb->ecode = TDB_ERR_LOCK;
     777                return -1;
     778        }
     779
     780        if (tdb->allrecord_lock.count > 1) {
     781                tdb->allrecord_lock.count--;
     782                return 0;
     783        }
     784
     785        if (!mark_lock && tdb_brunlock(tdb, ltype, FREELIST_TOP, 0)) {
    596786                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
    597787                return -1;
    598788        }
    599789
    600         tdb->global_lock.count = 0;
    601         tdb->global_lock.ltype = 0;
     790        tdb->allrecord_lock.count = 0;
     791        tdb->allrecord_lock.ltype = 0;
    602792
    603793        return 0;
     
    605795
    606796/* lock entire database with write lock */
    607 int tdb_lockall(struct tdb_context *tdb)
     797_PUBLIC_ int tdb_lockall(struct tdb_context *tdb)
    608798{
    609799        tdb_trace(tdb, "tdb_lockall");
    610         return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
     800        return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false);
    611801}
    612802
    613803/* lock entire database with write lock - mark only */
    614 int tdb_lockall_mark(struct tdb_context *tdb)
     804_PUBLIC_ int tdb_lockall_mark(struct tdb_context *tdb)
    615805{
    616806        tdb_trace(tdb, "tdb_lockall_mark");
    617         return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
     807        return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_MARK_ONLY, false);
    618808}
    619809
    620810/* unlock entire database with write lock - unmark only */
    621 int tdb_lockall_unmark(struct tdb_context *tdb)
     811_PUBLIC_ int tdb_lockall_unmark(struct tdb_context *tdb)
    622812{
    623813        tdb_trace(tdb, "tdb_lockall_unmark");
    624         return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
     814        return tdb_allrecord_unlock(tdb, F_WRLCK, true);
    625815}
    626816
    627817/* lock entire database with write lock - nonblocking varient */
    628 int tdb_lockall_nonblock(struct tdb_context *tdb)
    629 {
    630         int ret = _tdb_lockall(tdb, F_WRLCK, F_SETLK);
     818_PUBLIC_ int tdb_lockall_nonblock(struct tdb_context *tdb)
     819{
     820        int ret = tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_NOWAIT, false);
    631821        tdb_trace_ret(tdb, "tdb_lockall_nonblock", ret);
    632822        return ret;
     
    634824
    635825/* unlock entire database with write lock */
    636 int tdb_unlockall(struct tdb_context *tdb)
     826_PUBLIC_ int tdb_unlockall(struct tdb_context *tdb)
    637827{
    638828        tdb_trace(tdb, "tdb_unlockall");
    639         return _tdb_unlockall(tdb, F_WRLCK);
     829        return tdb_allrecord_unlock(tdb, F_WRLCK, false);
    640830}
    641831
    642832/* lock entire database with read lock */
    643 int tdb_lockall_read(struct tdb_context *tdb)
     833_PUBLIC_ int tdb_lockall_read(struct tdb_context *tdb)
    644834{
    645835        tdb_trace(tdb, "tdb_lockall_read");
    646         return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
     836        return tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false);
    647837}
    648838
    649839/* lock entire database with read lock - nonblock varient */
    650 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
    651 {
    652         int ret = _tdb_lockall(tdb, F_RDLCK, F_SETLK);
     840_PUBLIC_ int tdb_lockall_read_nonblock(struct tdb_context *tdb)
     841{
     842        int ret = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_NOWAIT, false);
    653843        tdb_trace_ret(tdb, "tdb_lockall_read_nonblock", ret);
    654844        return ret;
     
    656846
    657847/* unlock entire database with read lock */
    658 int tdb_unlockall_read(struct tdb_context *tdb)
     848_PUBLIC_ int tdb_unlockall_read(struct tdb_context *tdb)
    659849{
    660850        tdb_trace(tdb, "tdb_unlockall_read");
    661         return _tdb_unlockall(tdb, F_RDLCK);
     851        return tdb_allrecord_unlock(tdb, F_RDLCK, false);
    662852}
    663853
    664854/* lock/unlock one hash chain. This is meant to be used to reduce
    665855   contention - it cannot guarantee how many records will be locked */
    666 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
     856_PUBLIC_ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
    667857{
    668858        int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
     
    674864   to reduce contention - it cannot guarantee how many records will be
    675865   locked */
    676 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
     866_PUBLIC_ int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
    677867{
    678868        int ret = tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
     
    682872
    683873/* mark a chain as locked without actually locking it. Warning! use with great caution! */
    684 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
    685 {
    686         int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
     874_PUBLIC_ int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
     875{
     876        int ret = tdb_nest_lock(tdb, lock_offset(BUCKET(tdb->hash_fn(&key))),
     877                                F_WRLCK, TDB_LOCK_MARK_ONLY);
    687878        tdb_trace_1rec(tdb, "tdb_chainlock_mark", key);
    688879        return ret;
     
    690881
    691882/* unmark a chain as locked without actually locking it. Warning! use with great caution! */
    692 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
     883_PUBLIC_ int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
    693884{
    694885        tdb_trace_1rec(tdb, "tdb_chainlock_unmark", key);
    695         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
    696 }
    697 
    698 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
     886        return tdb_nest_unlock(tdb, lock_offset(BUCKET(tdb->hash_fn(&key))),
     887                               F_WRLCK, true);
     888}
     889
     890_PUBLIC_ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
    699891{
    700892        tdb_trace_1rec(tdb, "tdb_chainunlock", key);
     
    702894}
    703895
    704 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
     896_PUBLIC_ int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
    705897{
    706898        int ret;
     
    710902}
    711903
    712 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
     904_PUBLIC_ int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
    713905{
    714906        tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
     
    716908}
    717909
    718 
    719 
    720910/* record lock stops delete underneath */
    721911int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
    722912{
    723         if (tdb->global_lock.count) {
    724                 return 0;
    725         }
    726         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
     913        if (tdb->allrecord_lock.count) {
     914                return 0;
     915        }
     916        return off ? tdb_brlock(tdb, F_RDLCK, off, 1, TDB_LOCK_WAIT) : 0;
    727917}
    728918
     
    738928                if (i->off == off)
    739929                        return -1;
    740         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
    741 }
    742 
    743 /*
    744   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
    745   an error to fail to get the lock here.
    746 */
     930        if (tdb->allrecord_lock.count) {
     931                if (tdb->allrecord_lock.ltype == F_WRLCK) {
     932                        return 0;
     933                }
     934                return -1;
     935        }
     936        return tdb_brlock(tdb, F_WRLCK, off, 1, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
     937}
     938
    747939int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
    748940{
    749         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
     941        if (tdb->allrecord_lock.count) {
     942                return 0;
     943        }
     944        return tdb_brunlock(tdb, F_WRLCK, off, 1);
    750945}
    751946
     
    756951        uint32_t count = 0;
    757952
    758         if (tdb->global_lock.count) {
     953        if (tdb->allrecord_lock.count) {
    759954                return 0;
    760955        }
     
    765960                if (i->off == off)
    766961                        count++;
    767         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
    768 }
     962        return (count == 1 ? tdb_brunlock(tdb, F_RDLCK, off, 1) : 0);
     963}
     964
     965bool tdb_have_extra_locks(struct tdb_context *tdb)
     966{
     967        unsigned int extra = tdb->num_lockrecs;
     968
     969        /* A transaction holds the lock for all records. */
     970        if (!tdb->transaction && tdb->allrecord_lock.count) {
     971                return true;
     972        }
     973
     974        /* We always hold the active lock if CLEAR_IF_FIRST. */
     975        if (find_nestlock(tdb, ACTIVE_LOCK)) {
     976                extra--;
     977        }
     978
     979        /* In a transaction, we expect to hold the transaction lock */
     980        if (tdb->transaction && find_nestlock(tdb, TRANSACTION_LOCK)) {
     981                extra--;
     982        }
     983
     984        return extra;
     985}
     986
     987/* The transaction code uses this to remove all locks. */
     988void tdb_release_transaction_locks(struct tdb_context *tdb)
     989{
     990        unsigned int i, active = 0;
     991
     992        if (tdb->allrecord_lock.count != 0) {
     993                tdb_brunlock(tdb, tdb->allrecord_lock.ltype, FREELIST_TOP, 0);
     994                tdb->allrecord_lock.count = 0;
     995        }
     996
     997        for (i=0;i<tdb->num_lockrecs;i++) {
     998                struct tdb_lock_type *lck = &tdb->lockrecs[i];
     999
     1000                /* Don't release the active lock!  Copy it to first entry. */
     1001                if (lck->off == ACTIVE_LOCK) {
     1002                        tdb->lockrecs[active++] = *lck;
     1003                } else {
     1004                        tdb_brunlock(tdb, lck->ltype, lck->off, 1);
     1005                }
     1006        }
     1007        tdb->num_lockrecs = active;
     1008        if (tdb->num_lockrecs == 0) {
     1009                SAFE_FREE(tdb->lockrecs);
     1010        }
     1011}
Note: See TracChangeset for help on using the changeset viewer.