Changeset 745 for trunk/server/lib/tdb/common/lock.c
- Timestamp:
- Nov 27, 2012, 4:43:17 PM (13 years ago)
- Location:
- trunk/server
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/server
- Property svn:mergeinfo changed
/vendor/current merged: 581,587,591,594,597,600,615,618,740
- Property svn:mergeinfo changed
-
trunk/server/lib/tdb/common/lock.c
r647 r745 7 7 Copyright (C) Paul `Rusty' Russell 2000 8 8 Copyright (C) Jeremy Allison 2000-2003 9 9 10 10 ** NOTE! The following LGPL license applies to the tdb 11 11 ** library. This does NOT imply that all of Samba is released 12 12 ** under the LGPL 13 13 14 14 This library is free software; you can redistribute it and/or 15 15 modify it under the terms of the GNU Lesser General Public … … 27 27 28 28 #include "tdb_private.h" 29 30 #define TDB_MARK_LOCK 0x8000000031 32 29 #ifdef __OS2__ 33 30 34 static char* lock_type( int lck) 35 { 36 static char buffer[16]; 37 switch(lck) { 38 case F_GETLK: return "F_GETLK"; 39 case F_SETLK: return "F_SETLK"; 40 case F_SETLKW: return "F_SETLKW"; 41 default: 42 sprintf( buffer, "unknown %d", lck); 43 } 44 return buffer; 31 static char* lock_type( bool waitflag) 32 { 33 if (waitflag) 34 return "F_SETLKW"; 35 else 36 return "F_SETLK"; 45 37 } 46 38 … … 59 51 60 52 static int _mutex_brlock(struct tdb_context *tdb, tdb_off_t offset, 61 int rw_type, int lck_type, int probe, size_t len)53 int rw_type, bool waitflag, size_t len) 62 54 { 63 55 HMTX hSem; … … 66 58 67 59 switch( offset) { 68 case GLOBAL_LOCK:60 case OPEN_LOCK: 69 61 hSem = tdb->hGlobalLock; 70 62 break; … … 86 78 TDB_LOG((tdb, TDB_DEBUG_TRACE,"_mutex_brlock handle %d, offset %d\n", hSem, offset)); 87 79 88 if ( lck_type == F_SETLKW)80 if (waitflag) 89 81 ulTimeout = SEM_INDEFINITE_WAIT; 90 82 else … … 112 104 errno = EINVAL; 113 105 TDB_LOG(( tdb, TDB_DEBUG_ERROR, "_mutex_brlock pid %X, failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d, rc=%d\n", 114 getpid(), tdb->fd, offset, rw_type, lck_type, (int)len, rc));106 getpid(), tdb->fd, offset, rw_type, waitflag, (int)len, rc)); 115 107 tdb->ecode = TDB_ERR_LOCK; 116 108 return -1; … … 118 110 #endif 119 111 120 void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)112 _PUBLIC_ void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr) 121 113 { 122 114 tdb->interrupt_sig_ptr = ptr; 123 115 } 124 116 125 /* a byte range locking function - return 0 on success 126 this functions locks/unlocks 1 byte at the specified offset. 127 128 On error, errno is also set so that errors are passed back properly 129 through tdb_open(). 130 131 note that a len of zero means lock to end of file 132 */ 133 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, 134 int rw_type, int lck_type, int probe, size_t len) 135 { 136 117 static int fcntl_lock(struct tdb_context *tdb, 118 int rw, tdb_off_t off, size_t len, bool waitflag) 119 { 137 120 #ifdef __OS2__ 138 121 APIRET rc; 139 ULONG fAccess = 0; 140 int fLock = 0;122 ULONG fAccess = 0; // default exclusiv 123 int fLock = 1; // default lock 141 124 off_t cbFile; 142 125 off_t offStart; 143 126 off_t cbRange; 144 145 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_brlock pid %X, fd %d, lck_type %s, rw_type %s, offset %d, len %d\n", 146 getpid(), tdb->fd, lock_type(lck_type), read_type(rw_type), offset, len)); 147 148 if (tdb->flags & TDB_NOLOCK) { 149 return 0; 150 } 151 152 if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) { 153 tdb->ecode = TDB_ERR_RDONLY; 154 return -1; 155 } 127 FILELOCK lockArea = {0}, unlockArea = {0}; 128 129 TDB_LOG((tdb, TDB_DEBUG_TRACE, "fcntl_lock in pid %X, fd %d, lck_type %s, rw_type %s, offset %d, len %d\n", 130 getpid(), tdb->fd, lock_type(waitflag), read_type(rw), off, len)); 156 131 157 switch( offset) {158 case GLOBAL_LOCK:132 switch(off) { 133 case OPEN_LOCK: 159 134 case ACTIVE_LOCK: 160 135 case TRANSACTION_LOCK: 161 return _mutex_brlock( tdb, offset, rw_type, lck_type, probe, len);136 return _mutex_brlock(tdb, off, rw, waitflag, len); 162 137 } 163 138 164 139 /* flags and order */ 165 fAccess = 0; /* exclusive */ 166 switch (rw_type) 140 switch (rw) 167 141 { 168 142 case F_UNLCK: 169 143 fLock = 0; 144 unlockArea.lOffset = off; 145 unlockArea.lRange = len ? len : tdb->header.hash_size *4; //was LONG_MAX 170 146 break; 171 147 case F_RDLCK: 148 lockArea.lOffset = off; 149 lockArea.lRange = len ? len : LONG_MAX; 172 150 fAccess = 1; /* read-only */ 151 break; 173 152 case F_WRLCK: 174 fLock = 1; 153 lockArea.lOffset = off; 154 lockArea.lRange = len ? len : LONG_MAX; 175 155 break; 176 156 default: … … 178 158 } 179 159 180 FILELOCK aflock[2]; 181 bzero(&aflock[(fLock + 1) & 1], sizeof(aflock[0])); 182 aflock[fLock].lOffset = offset; 183 aflock[fLock].lRange = len ? len : LONG_MAX; 184 rc = DosSetFileLocks(tdb->fd, &aflock[0], &aflock[1], SEM_IMMEDIATE_RETURN, fAccess); 185 186 if (rc != NO_ERROR && lck_type == F_SETLKW) { 160 rc = DosSetFileLocks(tdb->fd, &unlockArea, &lockArea, SEM_IMMEDIATE_RETURN, fAccess); 161 162 if (rc != NO_ERROR && waitflag) { 187 163 int count = 20; 188 164 do { 189 rc = DosSetFileLocks(tdb->fd, & aflock[0], &aflock[1], 100, fAccess);165 rc = DosSetFileLocks(tdb->fd, &unlockArea, &lockArea, 100, fAccess); 190 166 count--; 191 167 } while( count>0 && rc !=NO_ERROR); 192 168 193 169 } 170 171 TDB_LOG(( tdb, TDB_DEBUG_TRACE, "fcntl_lock out pid %X, fd %d, lck_type %s, rw_type %s, offset %d, len %d, rc=%d\n", 172 getpid(), tdb->fd, lock_type(waitflag), read_type(rw), off, len, rc)); 173 194 174 if (rc != NO_ERROR) { 195 175 errno = EINVAL; 196 /* Generic lock error. errno set by fcntl. 197 * EAGAIN is an expected return from non-blocking 198 * locks. */ 199 if (!probe && lck_type != F_SETLK) { 200 /* Ensure error code is set for log fun to examine. */ 201 tdb->ecode = TDB_ERR_LOCK; 202 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n", 203 tdb->fd, offset, rw_type, lck_type, (int)len)); 176 return -1; 177 } 178 179 #else 180 struct flock fl; 181 182 fl.l_type = rw; 183 fl.l_whence = SEEK_SET; 184 fl.l_start = off; 185 fl.l_len = len; 186 fl.l_pid = 0; 187 188 if (waitflag) 189 return fcntl(tdb->fd, F_SETLKW, &fl); 190 else 191 return fcntl(tdb->fd, F_SETLK, &fl); 192 #endif 193 } 194 195 static int fcntl_unlock(struct tdb_context *tdb, int rw, tdb_off_t off, size_t len) 196 { 197 struct flock fl; 198 #if 0 /* Check they matched up locks and unlocks correctly. */ 199 char line[80]; 200 FILE *locks; 201 bool found = false; 202 203 locks = fopen("/proc/locks", "r"); 204 205 while (fgets(line, 80, locks)) { 206 char *p; 207 int type, start, l; 208 209 /* eg. 1: FLOCK ADVISORY WRITE 2440 08:01:2180826 0 EOF */ 210 p = strchr(line, ':') + 1; 211 if (strncmp(p, " POSIX ADVISORY ", strlen(" POSIX ADVISORY "))) 212 continue; 213 p += strlen(" FLOCK ADVISORY "); 214 if (strncmp(p, "READ ", strlen("READ ")) == 0) 215 type = F_RDLCK; 216 else if (strncmp(p, "WRITE ", strlen("WRITE ")) == 0) 217 type = F_WRLCK; 218 else 219 abort(); 220 p += 6; 221 if (atoi(p) != getpid()) 222 continue; 223 p = strchr(strchr(p, ' ') + 1, ' ') + 1; 224 start = atoi(p); 225 p = strchr(p, ' ') + 1; 226 if (strncmp(p, "EOF", 3) == 0) 227 l = 0; 228 else 229 l = atoi(p) - start + 1; 230 231 if (off == start) { 232 if (len != l) { 233 fprintf(stderr, "Len %u should be %u: %s", 234 (int)len, l, line); 235 abort(); 236 } 237 if (type != rw) { 238 fprintf(stderr, "Type %s wrong: %s", 239 rw == F_RDLCK ? "READ" : "WRITE", line); 240 abort(); 241 } 242 found = true; 243 break; 204 244 } 205 206 TDB_LOG(( tdb, TDB_DEBUG_TRACE, "tdb_brlock pid %X, failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n", 207 getpid(), tdb->fd, offset, rw_type, lck_type, (int)len)); 208 tdb->ecode = TDB_ERR_LOCK; 209 return -1; 210 } 211 212 TDB_LOG(( tdb, TDB_DEBUG_TRACE, "tdb_brlock pid %X, fd %d, lck_type %s, rw_type %s, offset %d, len %d DONE\n", 213 getpid(), tdb->fd, lock_type(lck_type), read_type(rw_type), offset, len)); 214 245 } 246 247 if (!found) { 248 fprintf(stderr, "Unlock on %u@%u not found!\n", 249 (int)off, (int)len); 250 abort(); 251 } 252 253 fclose(locks); 254 #endif 255 256 fl.l_type = F_UNLCK; 257 fl.l_whence = SEEK_SET; 258 fl.l_start = off; 259 fl.l_len = len; 260 fl.l_pid = 0; 261 #ifdef __OS2__ 262 return fcntl_lock(tdb, F_UNLCK, off, len, 1); 215 263 #else 216 217 struct flock fl; 264 return fcntl(tdb->fd, F_SETLKW, &fl); 265 #endif 266 } 267 268 /* list -1 is the alloc list, otherwise a hash chain. */ 269 static tdb_off_t lock_offset(int list) 270 { 271 return FREELIST_TOP + 4*list; 272 } 273 274 /* a byte range locking function - return 0 on success 275 this functions locks/unlocks 1 byte at the specified offset. 276 277 On error, errno is also set so that errors are passed back properly 278 through tdb_open(). 279 280 note that a len of zero means lock to end of file 281 */ 282 int tdb_brlock(struct tdb_context *tdb, 283 int rw_type, tdb_off_t offset, size_t len, 284 enum tdb_lock_flags flags) 285 { 218 286 int ret; 219 287 220 288 if (tdb->flags & TDB_NOLOCK) { 289 return 0; 290 } 291 292 if (flags & TDB_LOCK_MARK_ONLY) { 221 293 return 0; 222 294 } … … 227 299 } 228 300 229 fl.l_type = rw_type;230 fl.l_whence = SEEK_SET;231 fl.l_start = offset;232 fl.l_len = len;233 fl.l_pid = 0;234 235 301 do { 236 ret = fcntl (tdb->fd,lck_type,&fl);237 302 ret = fcntl_lock(tdb, rw_type, offset, len, 303 flags & TDB_LOCK_WAIT); 238 304 /* Check for a sigalarm break. */ 239 305 if (ret == -1 && errno == EINTR && … … 249 315 * EAGAIN is an expected return from non-blocking 250 316 * locks. */ 251 if (! probe && lck_type != F_SETLK) {252 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",253 tdb->fd, offset, rw_type, lck_type, (int)len));317 if (!(flags & TDB_LOCK_PROBE) && errno != EAGAIN) { 318 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d flags=%d len=%d\n", 319 tdb->fd, offset, rw_type, flags, (int)len)); 254 320 } 255 321 return -1; 256 322 } 257 323 return 0; 258 #endif 259 } 260 324 } 325 326 int tdb_brunlock(struct tdb_context *tdb, 327 int rw_type, tdb_off_t offset, size_t len) 328 { 329 int ret; 330 331 if (tdb->flags & TDB_NOLOCK) { 332 return 0; 333 } 334 335 do { 336 ret = fcntl_unlock(tdb, rw_type, offset, len); 337 } while (ret == -1 && errno == EINTR); 338 339 if (ret == -1) { 340 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brunlock failed (fd=%d) at offset %d rw_type=%d len=%d\n", 341 tdb->fd, offset, rw_type, (int)len)); 342 } 343 return ret; 344 } 261 345 262 346 /* … … 266 350 made. For those OSes we may loop for a while. 267 351 */ 268 int tdb_ brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)352 int tdb_allrecord_upgrade(struct tdb_context *tdb) 269 353 { 270 354 int count = 1000; 355 356 if (tdb->allrecord_lock.count != 1) { 357 TDB_LOG((tdb, TDB_DEBUG_ERROR, 358 "tdb_allrecord_upgrade failed: count %u too high\n", 359 tdb->allrecord_lock.count)); 360 return -1; 361 } 362 363 if (tdb->allrecord_lock.off != 1) { 364 TDB_LOG((tdb, TDB_DEBUG_ERROR, 365 "tdb_allrecord_upgrade failed: already upgraded?\n")); 366 return -1; 367 } 368 271 369 while (count--) { 272 370 struct timeval tv; 273 274 371 #ifdef __OS2__ 275 372 // YD we cannot upgrade without an unlock first... 276 tdb_brlock(tdb, offset, F_UNLCK, F_SETLKW, 1, len);373 tdb_brlock(tdb, F_UNLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT|TDB_LOCK_PROBE); 277 374 #endif 278 279 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) { 375 if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0, 376 TDB_LOCK_WAIT|TDB_LOCK_PROBE) == 0) { 377 tdb->allrecord_lock.ltype = F_WRLCK; 378 tdb->allrecord_lock.off = 0; 280 379 return 0; 281 380 } … … 288 387 select(0, NULL, NULL, NULL, &tv); 289 388 } 290 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_ brlock_upgrade failed at offset %d\n", offset));389 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_allrecord_upgrade failed\n")); 291 390 return -1; 292 391 } 293 392 294 295 /* lock a list in the database. list -1 is the alloc list */ 296 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op) 393 static struct tdb_lock_type *find_nestlock(struct tdb_context *tdb, 394 tdb_off_t offset) 395 { 396 unsigned int i; 397 398 for (i=0; i<tdb->num_lockrecs; i++) { 399 if (tdb->lockrecs[i].off == offset) { 400 return &tdb->lockrecs[i]; 401 } 402 } 403 return NULL; 404 } 405 406 /* lock an offset in the database. */ 407 int tdb_nest_lock(struct tdb_context *tdb, uint32_t offset, int ltype, 408 enum tdb_lock_flags flags) 297 409 { 298 410 struct tdb_lock_type *new_lck; 299 int i; 300 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK); 301 302 ltype &= ~TDB_MARK_LOCK; 303 304 /* a global lock allows us to avoid per chain locks */ 305 if (tdb->global_lock.count && 306 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) { 307 return 0; 308 } 309 310 if (tdb->global_lock.count) { 411 412 if (offset >= lock_offset(tdb->header.hash_size)) { 311 413 tdb->ecode = TDB_ERR_LOCK; 312 return -1; 313 } 314 315 if (list < -1 || list >= (int)tdb->header.hash_size) { 316 tdb->ecode = TDB_ERR_LOCK; 317 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n", 318 list, ltype)); 414 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid offset %u for ltype=%d\n", 415 offset, ltype)); 319 416 return -1; 320 417 } … … 322 419 return 0; 323 420 324 for (i=0; i<tdb->num_lockrecs; i++) { 325 if (tdb->lockrecs[i].list == list) { 326 if (tdb->lockrecs[i].count == 0) { 327 /* 328 * Can't happen, see tdb_unlock(). It should 329 * be an assert. 330 */ 331 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: " 332 "lck->count == 0 for list %d", list)); 333 } 334 /* 335 * Just increment the in-memory struct, posix locks 336 * don't stack. 337 */ 338 tdb->lockrecs[i].count++; 339 return 0; 340 } 421 new_lck = find_nestlock(tdb, offset); 422 if (new_lck) { 423 /* 424 * Just increment the in-memory struct, posix locks 425 * don't stack. 426 */ 427 new_lck->count++; 428 return 0; 341 429 } 342 430 … … 352 440 /* Since fcntl locks don't nest, we do a lock for the first one, 353 441 and simply bump the count for future ones */ 354 if (!mark_lock && 355 tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op, 356 0, 1)) { 357 return -1; 358 } 359 360 tdb->num_locks++; 361 362 tdb->lockrecs[tdb->num_lockrecs].list = list; 442 if (tdb_brlock(tdb, ltype, offset, 1, flags)) { 443 return -1; 444 } 445 446 tdb->lockrecs[tdb->num_lockrecs].off = offset; 363 447 tdb->lockrecs[tdb->num_lockrecs].count = 1; 364 448 tdb->lockrecs[tdb->num_lockrecs].ltype = ltype; 365 tdb->num_lockrecs += 1;449 tdb->num_lockrecs++; 366 450 367 451 return 0; 452 } 453 454 static int tdb_lock_and_recover(struct tdb_context *tdb) 455 { 456 int ret; 457 458 /* We need to match locking order in transaction commit. */ 459 if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT)) { 460 return -1; 461 } 462 463 if (tdb_brlock(tdb, F_WRLCK, OPEN_LOCK, 1, TDB_LOCK_WAIT)) { 464 tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0); 465 return -1; 466 } 467 468 ret = tdb_transaction_recover(tdb); 469 470 tdb_brunlock(tdb, F_WRLCK, OPEN_LOCK, 1); 471 tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0); 472 473 return ret; 474 } 475 476 static bool have_data_locks(const struct tdb_context *tdb) 477 { 478 unsigned int i; 479 480 for (i = 0; i < tdb->num_lockrecs; i++) { 481 if (tdb->lockrecs[i].off >= lock_offset(-1)) 482 return true; 483 } 484 return false; 485 } 486 487 static int tdb_lock_list(struct tdb_context *tdb, int list, int ltype, 488 enum tdb_lock_flags waitflag) 489 { 490 int ret; 491 bool check = false; 492 493 /* a allrecord lock allows us to avoid per chain locks */ 494 if (tdb->allrecord_lock.count && 495 (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) { 496 return 0; 497 } 498 499 if (tdb->allrecord_lock.count) { 500 tdb->ecode = TDB_ERR_LOCK; 501 ret = -1; 502 } else { 503 /* Only check when we grab first data lock. */ 504 check = !have_data_locks(tdb); 505 ret = tdb_nest_lock(tdb, lock_offset(list), ltype, waitflag); 506 507 if (ret == 0 && check && tdb_needs_recovery(tdb)) { 508 tdb_nest_unlock(tdb, lock_offset(list), ltype, false); 509 510 if (tdb_lock_and_recover(tdb) == -1) { 511 return -1; 512 } 513 return tdb_lock_list(tdb, list, ltype, waitflag); 514 } 515 } 516 return ret; 368 517 } 369 518 … … 372 521 { 373 522 int ret; 374 ret = _tdb_lock(tdb, list, ltype, F_SETLKW); 523 524 ret = tdb_lock_list(tdb, list, ltype, TDB_LOCK_WAIT); 375 525 if (ret) { 376 526 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d " … … 383 533 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype) 384 534 { 385 return _tdb_lock(tdb, list, ltype, F_SETLK); 386 } 387 388 389 /* unlock the database: returns void because it's too late for errors. */ 390 /* changed to return int it may be interesting to know there 391 has been an error --simo */ 392 int tdb_unlock(struct tdb_context *tdb, int list, int ltype) 535 return tdb_lock_list(tdb, list, ltype, TDB_LOCK_NOWAIT); 536 } 537 538 539 int tdb_nest_unlock(struct tdb_context *tdb, uint32_t offset, int ltype, 540 bool mark_lock) 393 541 { 394 542 int ret = -1; 395 int i; 396 struct tdb_lock_type *lck = NULL; 397 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK); 398 399 ltype &= ~TDB_MARK_LOCK; 400 401 /* a global lock allows us to avoid per chain locks */ 402 if (tdb->global_lock.count && 403 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) { 404 return 0; 405 } 406 407 if (tdb->global_lock.count) { 408 tdb->ecode = TDB_ERR_LOCK; 409 return -1; 410 } 543 struct tdb_lock_type *lck; 411 544 412 545 if (tdb->flags & TDB_NOLOCK) … … 414 547 415 548 /* Sanity checks */ 416 if ( list < -1 || list >= (int)tdb->header.hash_size) {417 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));549 if (offset >= lock_offset(tdb->header.hash_size)) { 550 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: offset %u invalid (%d)\n", offset, tdb->header.hash_size)); 418 551 return ret; 419 552 } 420 553 421 for (i=0; i<tdb->num_lockrecs; i++) { 422 if (tdb->lockrecs[i].list == list) { 423 lck = &tdb->lockrecs[i]; 424 break; 425 } 426 } 427 554 lck = find_nestlock(tdb, offset); 428 555 if ((lck == NULL) || (lck->count == 0)) { 429 556 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n")); … … 446 573 ret = 0; 447 574 } else { 448 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, 449 F_SETLKW, 0, 1); 450 } 451 tdb->num_locks--; 575 ret = tdb_brunlock(tdb, ltype, offset, 1); 576 } 452 577 453 578 /* … … 455 580 * last array element. 456 581 */ 457 458 if (tdb->num_lockrecs > 1) { 459 *lck = tdb->lockrecs[tdb->num_lockrecs-1]; 460 } 461 tdb->num_lockrecs -= 1; 582 *lck = tdb->lockrecs[--tdb->num_lockrecs]; 462 583 463 584 /* … … 475 596 } 476 597 598 int tdb_unlock(struct tdb_context *tdb, int list, int ltype) 599 { 600 /* a global lock allows us to avoid per chain locks */ 601 if (tdb->allrecord_lock.count && 602 (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) { 603 return 0; 604 } 605 606 if (tdb->allrecord_lock.count) { 607 tdb->ecode = TDB_ERR_LOCK; 608 return -1; 609 } 610 611 return tdb_nest_unlock(tdb, lock_offset(list), ltype, false); 612 } 613 477 614 /* 478 615 get the transaction lock 479 616 */ 480 int tdb_transaction_lock(struct tdb_context *tdb, int ltype) 481 { 482 if (tdb->global_lock.count) { 483 return 0; 484 } 485 if (tdb->transaction_lock_count > 0) { 486 tdb->transaction_lock_count++; 487 return 0; 488 } 489 490 if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype, 491 F_SETLKW, 0, 1) == -1) { 492 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n")); 493 tdb->ecode = TDB_ERR_LOCK; 494 return -1; 495 } 496 tdb->transaction_lock_count++; 497 return 0; 617 int tdb_transaction_lock(struct tdb_context *tdb, int ltype, 618 enum tdb_lock_flags lockflags) 619 { 620 return tdb_nest_lock(tdb, TRANSACTION_LOCK, ltype, lockflags); 498 621 } 499 622 … … 501 624 release the transaction lock 502 625 */ 503 int tdb_transaction_unlock(struct tdb_context *tdb) 504 { 505 int ret; 506 if (tdb->global_lock.count) { 507 return 0; 508 } 509 if (tdb->transaction_lock_count > 1) { 510 tdb->transaction_lock_count--; 511 return 0; 512 } 513 ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1); 514 if (ret == 0) { 515 tdb->transaction_lock_count = 0; 516 } 517 return ret; 518 } 519 520 521 522 523 /* lock/unlock entire database */ 524 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op) 525 { 526 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK); 527 528 ltype &= ~TDB_MARK_LOCK; 529 626 int tdb_transaction_unlock(struct tdb_context *tdb, int ltype) 627 { 628 return tdb_nest_unlock(tdb, TRANSACTION_LOCK, ltype, false); 629 } 630 631 /* Returns 0 if all done, -1 if error, 1 if ok. */ 632 static int tdb_allrecord_check(struct tdb_context *tdb, int ltype, 633 enum tdb_lock_flags flags, bool upgradable) 634 { 530 635 /* There are no locks on read-only dbs */ 531 636 if (tdb->read_only || tdb->traverse_read) { … … 534 639 } 535 640 536 if (tdb-> global_lock.count && tdb->global_lock.ltype == ltype) {537 tdb-> global_lock.count++;538 return 0; 539 } 540 541 if (tdb-> global_lock.count) {641 if (tdb->allrecord_lock.count && tdb->allrecord_lock.ltype == ltype) { 642 tdb->allrecord_lock.count++; 643 return 0; 644 } 645 646 if (tdb->allrecord_lock.count) { 542 647 /* a global lock of a different type exists */ 543 648 tdb->ecode = TDB_ERR_LOCK; 544 649 return -1; 545 650 } 546 547 if (tdb ->num_locks != 0) {651 652 if (tdb_have_extra_locks(tdb)) { 548 653 /* can't combine global and chain locks */ 549 654 tdb->ecode = TDB_ERR_LOCK; … … 551 656 } 552 657 553 if (!mark_lock && 554 tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op, 555 0, 4*tdb->header.hash_size)) { 556 if (op == F_SETLKW) { 557 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno))); 658 if (upgradable && ltype != F_RDLCK) { 659 /* tdb error: you can't upgrade a write lock! */ 660 tdb->ecode = TDB_ERR_LOCK; 661 return -1; 662 } 663 return 1; 664 } 665 666 /* We only need to lock individual bytes, but Linux merges consecutive locks 667 * so we lock in contiguous ranges. */ 668 static int tdb_chainlock_gradual(struct tdb_context *tdb, 669 int ltype, enum tdb_lock_flags flags, 670 size_t off, size_t len) 671 { 672 int ret; 673 enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT); 674 675 if (len <= 4) { 676 /* Single record. Just do blocking lock. */ 677 return tdb_brlock(tdb, ltype, off, len, flags); 678 } 679 680 /* First we try non-blocking. */ 681 ret = tdb_brlock(tdb, ltype, off, len, nb_flags); 682 if (ret == 0) { 683 return 0; 684 } 685 686 /* Try locking first half, then second. */ 687 ret = tdb_chainlock_gradual(tdb, ltype, flags, off, len / 2); 688 if (ret == -1) 689 return -1; 690 691 ret = tdb_chainlock_gradual(tdb, ltype, flags, 692 off + len / 2, len - len / 2); 693 if (ret == -1) { 694 tdb_brunlock(tdb, ltype, off, len / 2); 695 return -1; 696 } 697 return 0; 698 } 699 700 /* lock/unlock entire database. It can only be upgradable if you have some 701 * other way of guaranteeing exclusivity (ie. transaction write lock). 702 * We do the locking gradually to avoid being starved by smaller locks. */ 703 int tdb_allrecord_lock(struct tdb_context *tdb, int ltype, 704 enum tdb_lock_flags flags, bool upgradable) 705 { 706 switch (tdb_allrecord_check(tdb, ltype, flags, upgradable)) { 707 case -1: 708 return -1; 709 case 0: 710 return 0; 711 } 712 713 /* We cover two kinds of locks: 714 * 1) Normal chain locks. Taken for almost all operations. 715 * 3) Individual records locks. Taken after normal or free 716 * chain locks. 717 * 718 * It is (1) which cause the starvation problem, so we're only 719 * gradual for that. */ 720 if (tdb_chainlock_gradual(tdb, ltype, flags, FREELIST_TOP, 721 tdb->header.hash_size * 4) == -1) { 722 return -1; 723 } 724 725 /* Grab individual record locks. */ 726 if (tdb_brlock(tdb, ltype, lock_offset(tdb->header.hash_size), 0, 727 flags) == -1) { 728 tdb_brunlock(tdb, ltype, FREELIST_TOP, 729 tdb->header.hash_size * 4); 730 return -1; 731 } 732 733 tdb->allrecord_lock.count = 1; 734 /* If it's upgradable, it's actually exclusive so we can treat 735 * it as a write lock. */ 736 tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype; 737 tdb->allrecord_lock.off = upgradable; 738 739 if (tdb_needs_recovery(tdb)) { 740 bool mark = flags & TDB_LOCK_MARK_ONLY; 741 tdb_allrecord_unlock(tdb, ltype, mark); 742 if (mark) { 743 tdb->ecode = TDB_ERR_LOCK; 744 TDB_LOG((tdb, TDB_DEBUG_ERROR, 745 "tdb_lockall_mark cannot do recovery\n")); 746 return -1; 558 747 } 559 return -1;560 }561 562 tdb->global_lock.count = 1;563 tdb->global_lock.ltype = ltype;748 if (tdb_lock_and_recover(tdb) == -1) { 749 return -1; 750 } 751 return tdb_allrecord_lock(tdb, ltype, flags, upgradable); 752 } 564 753 565 754 return 0; … … 569 758 570 759 /* unlock entire db */ 571 static int _tdb_unlockall(struct tdb_context *tdb, int ltype) 572 { 573 bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK); 574 575 ltype &= ~TDB_MARK_LOCK; 576 760 int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype, bool mark_lock) 761 { 577 762 /* There are no locks on read-only dbs */ 578 763 if (tdb->read_only || tdb->traverse_read) { … … 581 766 } 582 767 583 if (tdb-> global_lock.ltype != ltype || tdb->global_lock.count == 0) {768 if (tdb->allrecord_lock.count == 0) { 584 769 tdb->ecode = TDB_ERR_LOCK; 585 770 return -1; 586 771 } 587 772 588 if (tdb->global_lock.count > 1) { 589 tdb->global_lock.count--; 590 return 0; 591 } 592 593 if (!mark_lock && 594 tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 595 0, 4*tdb->header.hash_size)) { 773 /* Upgradable locks are marked as write locks. */ 774 if (tdb->allrecord_lock.ltype != ltype 775 && (!tdb->allrecord_lock.off || ltype != F_RDLCK)) { 776 tdb->ecode = TDB_ERR_LOCK; 777 return -1; 778 } 779 780 if (tdb->allrecord_lock.count > 1) { 781 tdb->allrecord_lock.count--; 782 return 0; 783 } 784 785 if (!mark_lock && tdb_brunlock(tdb, ltype, FREELIST_TOP, 0)) { 596 786 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno))); 597 787 return -1; 598 788 } 599 789 600 tdb-> global_lock.count = 0;601 tdb-> global_lock.ltype = 0;790 tdb->allrecord_lock.count = 0; 791 tdb->allrecord_lock.ltype = 0; 602 792 603 793 return 0; … … 605 795 606 796 /* lock entire database with write lock */ 607 int tdb_lockall(struct tdb_context *tdb)797 _PUBLIC_ int tdb_lockall(struct tdb_context *tdb) 608 798 { 609 799 tdb_trace(tdb, "tdb_lockall"); 610 return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);800 return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false); 611 801 } 612 802 613 803 /* lock entire database with write lock - mark only */ 614 int tdb_lockall_mark(struct tdb_context *tdb)804 _PUBLIC_ int tdb_lockall_mark(struct tdb_context *tdb) 615 805 { 616 806 tdb_trace(tdb, "tdb_lockall_mark"); 617 return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);807 return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_MARK_ONLY, false); 618 808 } 619 809 620 810 /* unlock entire database with write lock - unmark only */ 621 int tdb_lockall_unmark(struct tdb_context *tdb)811 _PUBLIC_ int tdb_lockall_unmark(struct tdb_context *tdb) 622 812 { 623 813 tdb_trace(tdb, "tdb_lockall_unmark"); 624 return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);814 return tdb_allrecord_unlock(tdb, F_WRLCK, true); 625 815 } 626 816 627 817 /* lock entire database with write lock - nonblocking varient */ 628 int tdb_lockall_nonblock(struct tdb_context *tdb)629 { 630 int ret = _tdb_lockall(tdb, F_WRLCK, F_SETLK);818 _PUBLIC_ int tdb_lockall_nonblock(struct tdb_context *tdb) 819 { 820 int ret = tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_NOWAIT, false); 631 821 tdb_trace_ret(tdb, "tdb_lockall_nonblock", ret); 632 822 return ret; … … 634 824 635 825 /* unlock entire database with write lock */ 636 int tdb_unlockall(struct tdb_context *tdb)826 _PUBLIC_ int tdb_unlockall(struct tdb_context *tdb) 637 827 { 638 828 tdb_trace(tdb, "tdb_unlockall"); 639 return _tdb_unlockall(tdb, F_WRLCK);829 return tdb_allrecord_unlock(tdb, F_WRLCK, false); 640 830 } 641 831 642 832 /* lock entire database with read lock */ 643 int tdb_lockall_read(struct tdb_context *tdb)833 _PUBLIC_ int tdb_lockall_read(struct tdb_context *tdb) 644 834 { 645 835 tdb_trace(tdb, "tdb_lockall_read"); 646 return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);836 return tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false); 647 837 } 648 838 649 839 /* lock entire database with read lock - nonblock varient */ 650 int tdb_lockall_read_nonblock(struct tdb_context *tdb)651 { 652 int ret = _tdb_lockall(tdb, F_RDLCK, F_SETLK);840 _PUBLIC_ int tdb_lockall_read_nonblock(struct tdb_context *tdb) 841 { 842 int ret = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_NOWAIT, false); 653 843 tdb_trace_ret(tdb, "tdb_lockall_read_nonblock", ret); 654 844 return ret; … … 656 846 657 847 /* unlock entire database with read lock */ 658 int tdb_unlockall_read(struct tdb_context *tdb)848 _PUBLIC_ int tdb_unlockall_read(struct tdb_context *tdb) 659 849 { 660 850 tdb_trace(tdb, "tdb_unlockall_read"); 661 return _tdb_unlockall(tdb, F_RDLCK);851 return tdb_allrecord_unlock(tdb, F_RDLCK, false); 662 852 } 663 853 664 854 /* lock/unlock one hash chain. This is meant to be used to reduce 665 855 contention - it cannot guarantee how many records will be locked */ 666 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)856 _PUBLIC_ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key) 667 857 { 668 858 int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK); … … 674 864 to reduce contention - it cannot guarantee how many records will be 675 865 locked */ 676 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)866 _PUBLIC_ int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key) 677 867 { 678 868 int ret = tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK); … … 682 872 683 873 /* mark a chain as locked without actually locking it. Warning! use with great caution! */ 684 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key) 685 { 686 int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK); 874 _PUBLIC_ int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key) 875 { 876 int ret = tdb_nest_lock(tdb, lock_offset(BUCKET(tdb->hash_fn(&key))), 877 F_WRLCK, TDB_LOCK_MARK_ONLY); 687 878 tdb_trace_1rec(tdb, "tdb_chainlock_mark", key); 688 879 return ret; … … 690 881 691 882 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */ 692 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)883 _PUBLIC_ int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key) 693 884 { 694 885 tdb_trace_1rec(tdb, "tdb_chainlock_unmark", key); 695 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK); 696 } 697 698 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key) 886 return tdb_nest_unlock(tdb, lock_offset(BUCKET(tdb->hash_fn(&key))), 887 F_WRLCK, true); 888 } 889 890 _PUBLIC_ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key) 699 891 { 700 892 tdb_trace_1rec(tdb, "tdb_chainunlock", key); … … 702 894 } 703 895 704 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)896 _PUBLIC_ int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key) 705 897 { 706 898 int ret; … … 710 902 } 711 903 712 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)904 _PUBLIC_ int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key) 713 905 { 714 906 tdb_trace_1rec(tdb, "tdb_chainunlock_read", key); … … 716 908 } 717 909 718 719 720 910 /* record lock stops delete underneath */ 721 911 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off) 722 912 { 723 if (tdb-> global_lock.count) {724 return 0; 725 } 726 return off ? tdb ->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;913 if (tdb->allrecord_lock.count) { 914 return 0; 915 } 916 return off ? tdb_brlock(tdb, F_RDLCK, off, 1, TDB_LOCK_WAIT) : 0; 727 917 } 728 918 … … 738 928 if (i->off == off) 739 929 return -1; 740 return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1); 741 } 742 743 /* 744 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not 745 an error to fail to get the lock here. 746 */ 930 if (tdb->allrecord_lock.count) { 931 if (tdb->allrecord_lock.ltype == F_WRLCK) { 932 return 0; 933 } 934 return -1; 935 } 936 return tdb_brlock(tdb, F_WRLCK, off, 1, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE); 937 } 938 747 939 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off) 748 940 { 749 return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1); 941 if (tdb->allrecord_lock.count) { 942 return 0; 943 } 944 return tdb_brunlock(tdb, F_WRLCK, off, 1); 750 945 } 751 946 … … 756 951 uint32_t count = 0; 757 952 758 if (tdb-> global_lock.count) {953 if (tdb->allrecord_lock.count) { 759 954 return 0; 760 955 } … … 765 960 if (i->off == off) 766 961 count++; 767 return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0); 768 } 962 return (count == 1 ? tdb_brunlock(tdb, F_RDLCK, off, 1) : 0); 963 } 964 965 bool tdb_have_extra_locks(struct tdb_context *tdb) 966 { 967 unsigned int extra = tdb->num_lockrecs; 968 969 /* A transaction holds the lock for all records. */ 970 if (!tdb->transaction && tdb->allrecord_lock.count) { 971 return true; 972 } 973 974 /* We always hold the active lock if CLEAR_IF_FIRST. */ 975 if (find_nestlock(tdb, ACTIVE_LOCK)) { 976 extra--; 977 } 978 979 /* In a transaction, we expect to hold the transaction lock */ 980 if (tdb->transaction && find_nestlock(tdb, TRANSACTION_LOCK)) { 981 extra--; 982 } 983 984 return extra; 985 } 986 987 /* The transaction code uses this to remove all locks. */ 988 void tdb_release_transaction_locks(struct tdb_context *tdb) 989 { 990 unsigned int i, active = 0; 991 992 if (tdb->allrecord_lock.count != 0) { 993 tdb_brunlock(tdb, tdb->allrecord_lock.ltype, FREELIST_TOP, 0); 994 tdb->allrecord_lock.count = 0; 995 } 996 997 for (i=0;i<tdb->num_lockrecs;i++) { 998 struct tdb_lock_type *lck = &tdb->lockrecs[i]; 999 1000 /* Don't release the active lock! Copy it to first entry. */ 1001 if (lck->off == ACTIVE_LOCK) { 1002 tdb->lockrecs[active++] = *lck; 1003 } else { 1004 tdb_brunlock(tdb, lck->ltype, lck->off, 1); 1005 } 1006 } 1007 tdb->num_lockrecs = active; 1008 if (tdb->num_lockrecs == 0) { 1009 SAFE_FREE(tdb->lockrecs); 1010 } 1011 }
Note:
See TracChangeset
for help on using the changeset viewer.