Changeset 988 for vendor/current/lib/tevent/testsuite.c
- Timestamp:
- Nov 24, 2016, 1:14:11 PM (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
vendor/current/lib/tevent/testsuite.c
r740 r988 5 5 6 6 Copyright (C) Stefan Metzmacher 2006-2009 7 Copyright (C) Jeremy Allison 2013 7 8 8 9 ** NOTE! The following LGPL license applies to the tevent … … 25 26 26 27 #include "includes.h" 27 #include "lib/ events/events.h"28 #include "lib/tevent/tevent.h" 28 29 #include "system/filesys.h" 30 #include "system/select.h" 31 #include "system/network.h" 29 32 #include "torture/torture.h" 33 #include "torture/local/proto.h" 34 #ifdef HAVE_PTHREAD 35 #include <pthread.h> 36 #include <assert.h> 37 #endif 30 38 31 39 static int fde_count; 32 40 33 static void fde_handler (struct tevent_context *ev_ctx, struct tevent_fd *f,41 static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f, 34 42 uint16_t flags, void *private_data) 35 43 { … … 40 48 #endif 41 49 kill(getpid(), SIGALRM); 50 42 51 read(fd[0], &c, 1); 52 fde_count++; 53 } 54 55 static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f, 56 uint16_t flags, void *private_data) 57 { 58 int *fd = (int *)private_data; 59 char c = 0; 43 60 write(fd[1], &c, 1); 61 } 62 63 64 /* This will only fire if the fd's returned from pipe() are bi-directional. */ 65 static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f, 66 uint16_t flags, void *private_data) 67 { 68 int *fd = (int *)private_data; 69 char c; 70 #ifdef SA_SIGINFO 71 kill(getpid(), SIGUSR1); 72 #endif 73 kill(getpid(), SIGALRM); 74 75 read(fd[1], &c, 1); 44 76 fde_count++; 77 } 78 79 /* This will only fire if the fd's returned from pipe() are bi-directional. */ 80 static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f, 81 uint16_t flags, void *private_data) 82 { 83 int *fd = (int *)private_data; 84 char c = 0; 85 write(fd[0], &c, 1); 45 86 } 46 87 … … 52 93 } 53 94 54 static void count_handler(struct tevent_context *ev_ctx, struct signal_event*te,95 static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te, 55 96 int signum, int count, void *info, void *private_data) 56 97 { … … 66 107 const char *backend = (const char *)test_data; 67 108 int alarm_count=0, info_count=0; 68 struct tevent_fd *fde; 109 struct tevent_fd *fde_read; 110 struct tevent_fd *fde_read_1; 111 struct tevent_fd *fde_write; 112 struct tevent_fd *fde_write_1; 69 113 #ifdef SA_RESTART 70 114 struct tevent_signal *se1 = NULL; 71 115 #endif 116 #ifdef SA_RESETHAND 72 117 struct tevent_signal *se2 = NULL; 118 #endif 73 119 #ifdef SA_SIGINFO 74 120 struct tevent_signal *se3 = NULL; … … 76 122 int finished=0; 77 123 struct timeval t; 78 char c = 0;79 80 ev_ctx = event_context_init_byname(test, backend);124 int ret; 125 126 ev_ctx = tevent_context_init_byname(test, backend); 81 127 if (ev_ctx == NULL) { 82 128 torture_comment(test, "event backend '%s' not supported\n", backend); … … 84 130 } 85 131 86 torture_comment(test, "Testing event backend '%s'\n", backend); 132 torture_comment(test, "backend '%s' - %s\n", 133 backend, __FUNCTION__); 87 134 88 135 /* reset globals */ … … 90 137 91 138 /* create a pipe */ 92 pipe(fd); 93 94 fde = event_add_fd(ev_ctx, ev_ctx, fd[0], EVENT_FD_READ, 95 fde_handler, fd); 96 tevent_fd_set_auto_close(fde); 97 98 event_add_timed(ev_ctx, ev_ctx, timeval_current_ofs(2,0), 99 finished_handler, &finished); 139 ret = pipe(fd); 140 torture_assert_int_equal(test, ret, 0, "pipe failed"); 141 142 fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ, 143 fde_handler_read, fd); 144 fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE, 145 fde_handler_write_1, fd); 146 147 fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE, 148 fde_handler_write, fd); 149 fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ, 150 fde_handler_read_1, fd); 151 152 tevent_fd_set_auto_close(fde_read); 153 tevent_fd_set_auto_close(fde_write); 154 155 tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0), 156 finished_handler, &finished); 100 157 101 158 #ifdef SA_RESTART 102 se1 = event_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count); 159 se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count); 160 torture_assert(test, se1 != NULL, "failed to setup se1"); 103 161 #endif 104 162 #ifdef SA_RESETHAND 105 se2 = event_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count); 163 se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count); 164 torture_assert(test, se2 != NULL, "failed to setup se2"); 106 165 #endif 107 166 #ifdef SA_SIGINFO 108 se3 = event_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count); 109 #endif 110 111 write(fd[1], &c, 1); 167 se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count); 168 torture_assert(test, se3 != NULL, "failed to setup se3"); 169 #endif 112 170 113 171 t = timeval_current(); 114 172 while (!finished) { 115 173 errno = 0; 116 if ( event_loop_once(ev_ctx) == -1) {174 if (tevent_loop_once(ev_ctx) == -1) { 117 175 talloc_free(ev_ctx); 118 176 torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno))); … … 120 178 } 121 179 122 talloc_free(fde); 123 close(fd[1]); 180 talloc_free(fde_read_1); 181 talloc_free(fde_write_1); 182 talloc_free(fde_read); 183 talloc_free(fde_write); 124 184 125 185 while (alarm_count < fde_count+1) { 126 if ( event_loop_once(ev_ctx) == -1) {186 if (tevent_loop_once(ev_ctx) == -1) { 127 187 break; 128 188 } … … 136 196 137 197 torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch"); 198 199 #ifdef SA_RESETHAND 200 /* 201 * we do not call talloc_free(se2) 202 * because it is already gone, 203 * after triggering the event handler. 204 */ 205 #endif 138 206 139 207 #ifdef SA_SIGINFO … … 147 215 } 148 216 217 struct test_event_fd1_state { 218 struct torture_context *tctx; 219 const char *backend; 220 struct tevent_context *ev; 221 int sock[2]; 222 struct tevent_timer *te; 223 struct tevent_fd *fde0; 224 struct tevent_fd *fde1; 225 bool got_write; 226 bool got_read; 227 bool drain; 228 bool drain_done; 229 unsigned loop_count; 230 bool finished; 231 const char *error; 232 }; 233 234 static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx, 235 struct tevent_fd *fde, 236 uint16_t flags, 237 void *private_data) 238 { 239 struct test_event_fd1_state *state = 240 (struct test_event_fd1_state *)private_data; 241 242 if (state->drain_done) { 243 state->finished = true; 244 state->error = __location__; 245 return; 246 } 247 248 if (state->drain) { 249 ssize_t ret; 250 uint8_t c = 0; 251 252 if (!(flags & TEVENT_FD_READ)) { 253 state->finished = true; 254 state->error = __location__; 255 return; 256 } 257 258 ret = read(state->sock[0], &c, 1); 259 if (ret == 1) { 260 return; 261 } 262 263 /* 264 * end of test... 265 */ 266 tevent_fd_set_flags(fde, 0); 267 state->drain_done = true; 268 return; 269 } 270 271 if (!state->got_write) { 272 uint8_t c = 0; 273 274 if (flags != TEVENT_FD_WRITE) { 275 state->finished = true; 276 state->error = __location__; 277 return; 278 } 279 state->got_write = true; 280 281 /* 282 * we write to the other socket... 283 */ 284 write(state->sock[1], &c, 1); 285 TEVENT_FD_NOT_WRITEABLE(fde); 286 TEVENT_FD_READABLE(fde); 287 return; 288 } 289 290 if (!state->got_read) { 291 if (flags != TEVENT_FD_READ) { 292 state->finished = true; 293 state->error = __location__; 294 return; 295 } 296 state->got_read = true; 297 298 TEVENT_FD_NOT_READABLE(fde); 299 return; 300 } 301 302 state->finished = true; 303 state->error = __location__; 304 return; 305 } 306 307 static void test_event_fd1_finished(struct tevent_context *ev_ctx, 308 struct tevent_timer *te, 309 struct timeval tval, 310 void *private_data) 311 { 312 struct test_event_fd1_state *state = 313 (struct test_event_fd1_state *)private_data; 314 315 if (state->drain_done) { 316 state->finished = true; 317 return; 318 } 319 320 if (!state->got_write) { 321 state->finished = true; 322 state->error = __location__; 323 return; 324 } 325 326 if (!state->got_read) { 327 state->finished = true; 328 state->error = __location__; 329 return; 330 } 331 332 state->loop_count++; 333 if (state->loop_count > 3) { 334 state->finished = true; 335 state->error = __location__; 336 return; 337 } 338 339 state->got_write = false; 340 state->got_read = false; 341 342 tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE); 343 344 if (state->loop_count > 2) { 345 state->drain = true; 346 TALLOC_FREE(state->fde1); 347 TEVENT_FD_READABLE(state->fde0); 348 } 349 350 state->te = tevent_add_timer(state->ev, state->ev, 351 timeval_current_ofs(0,2000), 352 test_event_fd1_finished, state); 353 } 354 355 static bool test_event_fd1(struct torture_context *tctx, 356 const void *test_data) 357 { 358 struct test_event_fd1_state state; 359 360 ZERO_STRUCT(state); 361 state.tctx = tctx; 362 state.backend = (const char *)test_data; 363 364 state.ev = tevent_context_init_byname(tctx, state.backend); 365 if (state.ev == NULL) { 366 torture_skip(tctx, talloc_asprintf(tctx, 367 "event backend '%s' not supported\n", 368 state.backend)); 369 return true; 370 } 371 372 tevent_set_debug_stderr(state.ev); 373 torture_comment(tctx, "backend '%s' - %s\n", 374 state.backend, __FUNCTION__); 375 376 /* 377 * This tests the following: 378 * 379 * It monitors the state of state.sock[0] 380 * with tevent_fd, but we never read/write on state.sock[0] 381 * while state.sock[1] * is only used to write a few bytes. 382 * 383 * We have a loop: 384 * - we wait only for TEVENT_FD_WRITE on state.sock[0] 385 * - we write 1 byte to state.sock[1] 386 * - we wait only for TEVENT_FD_READ on state.sock[0] 387 * - we disable events on state.sock[0] 388 * - the timer event restarts the loop 389 * Then we close state.sock[1] 390 * We have a loop: 391 * - we wait for TEVENT_FD_READ/WRITE on state.sock[0] 392 * - we try to read 1 byte 393 * - if the read gets an error of returns 0 394 * we disable the event handler 395 * - the timer finishes the test 396 */ 397 state.sock[0] = -1; 398 state.sock[1] = -1; 399 socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock); 400 401 state.te = tevent_add_timer(state.ev, state.ev, 402 timeval_current_ofs(0,1000), 403 test_event_fd1_finished, &state); 404 state.fde0 = tevent_add_fd(state.ev, state.ev, 405 state.sock[0], TEVENT_FD_WRITE, 406 test_event_fd1_fde_handler, &state); 407 /* state.fde1 is only used to auto close */ 408 state.fde1 = tevent_add_fd(state.ev, state.ev, 409 state.sock[1], 0, 410 test_event_fd1_fde_handler, &state); 411 412 tevent_fd_set_auto_close(state.fde0); 413 tevent_fd_set_auto_close(state.fde1); 414 415 while (!state.finished) { 416 errno = 0; 417 if (tevent_loop_once(state.ev) == -1) { 418 talloc_free(state.ev); 419 torture_fail(tctx, talloc_asprintf(tctx, 420 "Failed event loop %s\n", 421 strerror(errno))); 422 } 423 } 424 425 talloc_free(state.ev); 426 427 torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx, 428 "%s", state.error)); 429 430 return true; 431 } 432 433 struct test_event_fd2_state { 434 struct torture_context *tctx; 435 const char *backend; 436 struct tevent_context *ev; 437 struct tevent_timer *te; 438 struct test_event_fd2_sock { 439 struct test_event_fd2_state *state; 440 int fd; 441 struct tevent_fd *fde; 442 size_t num_written; 443 size_t num_read; 444 bool got_full; 445 } sock0, sock1; 446 bool finished; 447 const char *error; 448 }; 449 450 static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx, 451 struct tevent_fd *fde, 452 uint16_t flags, 453 void *private_data) 454 { 455 struct test_event_fd2_sock *cur_sock = 456 (struct test_event_fd2_sock *)private_data; 457 struct test_event_fd2_state *state = cur_sock->state; 458 struct test_event_fd2_sock *oth_sock = NULL; 459 uint8_t v = 0, c; 460 ssize_t ret; 461 462 if (cur_sock == &state->sock0) { 463 oth_sock = &state->sock1; 464 } else { 465 oth_sock = &state->sock0; 466 } 467 468 if (oth_sock->num_written == 1) { 469 if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) { 470 state->finished = true; 471 state->error = __location__; 472 return; 473 } 474 } 475 476 if (cur_sock->num_read == oth_sock->num_written) { 477 state->finished = true; 478 state->error = __location__; 479 return; 480 } 481 482 if (!(flags & TEVENT_FD_READ)) { 483 state->finished = true; 484 state->error = __location__; 485 return; 486 } 487 488 if (oth_sock->num_read >= PIPE_BUF) { 489 /* 490 * On Linux we become writable once we've read 491 * one byte. On Solaris we only become writable 492 * again once we've read 4096 bytes. PIPE_BUF 493 * is probably a safe bet to test against. 494 * 495 * There should be room to write a byte again 496 */ 497 if (!(flags & TEVENT_FD_WRITE)) { 498 state->finished = true; 499 state->error = __location__; 500 return; 501 } 502 } 503 504 if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) { 505 v = (uint8_t)cur_sock->num_written; 506 ret = write(cur_sock->fd, &v, 1); 507 if (ret != 1) { 508 state->finished = true; 509 state->error = __location__; 510 return; 511 } 512 cur_sock->num_written++; 513 if (cur_sock->num_written > 0x80000000) { 514 state->finished = true; 515 state->error = __location__; 516 return; 517 } 518 return; 519 } 520 521 if (!cur_sock->got_full) { 522 cur_sock->got_full = true; 523 524 if (!oth_sock->got_full) { 525 /* 526 * cur_sock is full, 527 * lets wait for oth_sock 528 * to be filled 529 */ 530 tevent_fd_set_flags(cur_sock->fde, 0); 531 return; 532 } 533 534 /* 535 * oth_sock waited for cur_sock, 536 * lets restart it 537 */ 538 tevent_fd_set_flags(oth_sock->fde, 539 TEVENT_FD_READ|TEVENT_FD_WRITE); 540 } 541 542 ret = read(cur_sock->fd, &v, 1); 543 if (ret != 1) { 544 state->finished = true; 545 state->error = __location__; 546 return; 547 } 548 c = (uint8_t)cur_sock->num_read; 549 if (c != v) { 550 state->finished = true; 551 state->error = __location__; 552 return; 553 } 554 cur_sock->num_read++; 555 556 if (cur_sock->num_read < oth_sock->num_written) { 557 /* there is more to read */ 558 return; 559 } 560 /* 561 * we read everything, we need to remove TEVENT_FD_WRITE 562 * to avoid spinning 563 */ 564 TEVENT_FD_NOT_WRITEABLE(cur_sock->fde); 565 566 if (oth_sock->num_read == cur_sock->num_written) { 567 /* 568 * both directions are finished 569 */ 570 state->finished = true; 571 } 572 573 return; 574 } 575 576 static void test_event_fd2_finished(struct tevent_context *ev_ctx, 577 struct tevent_timer *te, 578 struct timeval tval, 579 void *private_data) 580 { 581 struct test_event_fd2_state *state = 582 (struct test_event_fd2_state *)private_data; 583 584 /* 585 * this should never be triggered 586 */ 587 state->finished = true; 588 state->error = __location__; 589 } 590 591 static bool test_event_fd2(struct torture_context *tctx, 592 const void *test_data) 593 { 594 struct test_event_fd2_state state; 595 int sock[2]; 596 uint8_t c = 0; 597 598 ZERO_STRUCT(state); 599 state.tctx = tctx; 600 state.backend = (const char *)test_data; 601 602 state.ev = tevent_context_init_byname(tctx, state.backend); 603 if (state.ev == NULL) { 604 torture_skip(tctx, talloc_asprintf(tctx, 605 "event backend '%s' not supported\n", 606 state.backend)); 607 return true; 608 } 609 610 tevent_set_debug_stderr(state.ev); 611 torture_comment(tctx, "backend '%s' - %s\n", 612 state.backend, __FUNCTION__); 613 614 /* 615 * This tests the following 616 * 617 * - We write 1 byte to each socket 618 * - We wait for TEVENT_FD_READ/WRITE on both sockets 619 * - When we get TEVENT_FD_WRITE we write 1 byte 620 * until both socket buffers are full, which 621 * means both sockets only get TEVENT_FD_READ. 622 * - Then we read 1 byte until we have consumed 623 * all bytes the other end has written. 624 */ 625 sock[0] = -1; 626 sock[1] = -1; 627 socketpair(AF_UNIX, SOCK_STREAM, 0, sock); 628 629 /* 630 * the timer should never expire 631 */ 632 state.te = tevent_add_timer(state.ev, state.ev, 633 timeval_current_ofs(600, 0), 634 test_event_fd2_finished, &state); 635 state.sock0.state = &state; 636 state.sock0.fd = sock[0]; 637 state.sock0.fde = tevent_add_fd(state.ev, state.ev, 638 state.sock0.fd, 639 TEVENT_FD_READ | TEVENT_FD_WRITE, 640 test_event_fd2_sock_handler, 641 &state.sock0); 642 state.sock1.state = &state; 643 state.sock1.fd = sock[1]; 644 state.sock1.fde = tevent_add_fd(state.ev, state.ev, 645 state.sock1.fd, 646 TEVENT_FD_READ | TEVENT_FD_WRITE, 647 test_event_fd2_sock_handler, 648 &state.sock1); 649 650 tevent_fd_set_auto_close(state.sock0.fde); 651 tevent_fd_set_auto_close(state.sock1.fde); 652 653 write(state.sock0.fd, &c, 1); 654 state.sock0.num_written++; 655 write(state.sock1.fd, &c, 1); 656 state.sock1.num_written++; 657 658 while (!state.finished) { 659 errno = 0; 660 if (tevent_loop_once(state.ev) == -1) { 661 talloc_free(state.ev); 662 torture_fail(tctx, talloc_asprintf(tctx, 663 "Failed event loop %s\n", 664 strerror(errno))); 665 } 666 } 667 668 talloc_free(state.ev); 669 670 torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx, 671 "%s", state.error)); 672 673 return true; 674 } 675 676 #ifdef HAVE_PTHREAD 677 678 static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER; 679 static bool do_shutdown = false; 680 681 static void test_event_threaded_lock(void) 682 { 683 int ret; 684 ret = pthread_mutex_lock(&threaded_mutex); 685 assert(ret == 0); 686 } 687 688 static void test_event_threaded_unlock(void) 689 { 690 int ret; 691 ret = pthread_mutex_unlock(&threaded_mutex); 692 assert(ret == 0); 693 } 694 695 static void test_event_threaded_trace(enum tevent_trace_point point, 696 void *private_data) 697 { 698 switch (point) { 699 case TEVENT_TRACE_BEFORE_WAIT: 700 test_event_threaded_unlock(); 701 break; 702 case TEVENT_TRACE_AFTER_WAIT: 703 test_event_threaded_lock(); 704 break; 705 case TEVENT_TRACE_BEFORE_LOOP_ONCE: 706 case TEVENT_TRACE_AFTER_LOOP_ONCE: 707 break; 708 } 709 } 710 711 static void test_event_threaded_timer(struct tevent_context *ev, 712 struct tevent_timer *te, 713 struct timeval current_time, 714 void *private_data) 715 { 716 return; 717 } 718 719 static void *test_event_poll_thread(void *private_data) 720 { 721 struct tevent_context *ev = (struct tevent_context *)private_data; 722 723 test_event_threaded_lock(); 724 725 while (true) { 726 int ret; 727 ret = tevent_loop_once(ev); 728 assert(ret == 0); 729 if (do_shutdown) { 730 test_event_threaded_unlock(); 731 return NULL; 732 } 733 } 734 735 } 736 737 static void test_event_threaded_read_handler(struct tevent_context *ev, 738 struct tevent_fd *fde, 739 uint16_t flags, 740 void *private_data) 741 { 742 int *pfd = (int *)private_data; 743 char c; 744 ssize_t nread; 745 746 if ((flags & TEVENT_FD_READ) == 0) { 747 return; 748 } 749 750 do { 751 nread = read(*pfd, &c, 1); 752 } while ((nread == -1) && (errno == EINTR)); 753 754 assert(nread == 1); 755 } 756 757 static bool test_event_context_threaded(struct torture_context *test, 758 const void *test_data) 759 { 760 struct tevent_context *ev; 761 struct tevent_timer *te; 762 struct tevent_fd *fde; 763 pthread_t poll_thread; 764 int fds[2]; 765 int ret; 766 char c = 0; 767 768 ev = tevent_context_init_byname(test, "poll_mt"); 769 torture_assert(test, ev != NULL, "poll_mt not supported"); 770 771 tevent_set_trace_callback(ev, test_event_threaded_trace, NULL); 772 773 te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0), 774 test_event_threaded_timer, NULL); 775 torture_assert(test, te != NULL, "Could not add timer"); 776 777 ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev); 778 torture_assert(test, ret == 0, "Could not create poll thread"); 779 780 ret = pipe(fds); 781 torture_assert(test, ret == 0, "Could not create pipe"); 782 783 poll(NULL, 0, 100); 784 785 test_event_threaded_lock(); 786 787 fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ, 788 test_event_threaded_read_handler, &fds[0]); 789 torture_assert(test, fde != NULL, "Could not add fd event"); 790 791 test_event_threaded_unlock(); 792 793 poll(NULL, 0, 100); 794 795 write(fds[1], &c, 1); 796 797 poll(NULL, 0, 100); 798 799 test_event_threaded_lock(); 800 do_shutdown = true; 801 test_event_threaded_unlock(); 802 803 write(fds[1], &c, 1); 804 805 ret = pthread_join(poll_thread, NULL); 806 torture_assert(test, ret == 0, "pthread_join failed"); 807 808 return true; 809 } 810 811 #define NUM_TEVENT_THREADS 100 812 813 /* Ugly, but needed for torture_comment... */ 814 static struct torture_context *thread_test_ctx; 815 static pthread_t thread_map[NUM_TEVENT_THREADS]; 816 static unsigned thread_counter; 817 818 /* Called in master thread context */ 819 static void callback_nowait(struct tevent_context *ev, 820 struct tevent_immediate *im, 821 void *private_ptr) 822 { 823 pthread_t *thread_id_ptr = 824 talloc_get_type_abort(private_ptr, pthread_t); 825 unsigned i; 826 827 for (i = 0; i < NUM_TEVENT_THREADS; i++) { 828 if (pthread_equal(*thread_id_ptr, 829 thread_map[i])) { 830 break; 831 } 832 } 833 torture_comment(thread_test_ctx, 834 "Callback %u from thread %u\n", 835 thread_counter, 836 i); 837 thread_counter++; 838 } 839 840 /* Blast the master tevent_context with a callback, no waiting. */ 841 static void *thread_fn_nowait(void *private_ptr) 842 { 843 struct tevent_thread_proxy *master_tp = 844 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy); 845 struct tevent_immediate *im; 846 pthread_t *thread_id_ptr; 847 848 im = tevent_create_immediate(NULL); 849 if (im == NULL) { 850 return NULL; 851 } 852 thread_id_ptr = talloc(NULL, pthread_t); 853 if (thread_id_ptr == NULL) { 854 return NULL; 855 } 856 *thread_id_ptr = pthread_self(); 857 858 tevent_thread_proxy_schedule(master_tp, 859 &im, 860 callback_nowait, 861 &thread_id_ptr); 862 return NULL; 863 } 864 865 static void timeout_fn(struct tevent_context *ev, 866 struct tevent_timer *te, 867 struct timeval tv, void *p) 868 { 869 thread_counter = NUM_TEVENT_THREADS * 10; 870 } 871 872 static bool test_multi_tevent_threaded(struct torture_context *test, 873 const void *test_data) 874 { 875 unsigned i; 876 struct tevent_context *master_ev; 877 struct tevent_thread_proxy *tp; 878 879 talloc_disable_null_tracking(); 880 881 /* Ugly global stuff. */ 882 thread_test_ctx = test; 883 thread_counter = 0; 884 885 master_ev = tevent_context_init(NULL); 886 if (master_ev == NULL) { 887 return false; 888 } 889 tevent_set_debug_stderr(master_ev); 890 891 tp = tevent_thread_proxy_create(master_ev); 892 if (tp == NULL) { 893 torture_fail(test, 894 talloc_asprintf(test, 895 "tevent_thread_proxy_create failed\n")); 896 talloc_free(master_ev); 897 return false; 898 } 899 900 for (i = 0; i < NUM_TEVENT_THREADS; i++) { 901 int ret = pthread_create(&thread_map[i], 902 NULL, 903 thread_fn_nowait, 904 tp); 905 if (ret != 0) { 906 torture_fail(test, 907 talloc_asprintf(test, 908 "Failed to create thread %i, %d\n", 909 i, ret)); 910 return false; 911 } 912 } 913 914 /* Ensure we don't wait more than 10 seconds. */ 915 tevent_add_timer(master_ev, 916 master_ev, 917 timeval_current_ofs(10,0), 918 timeout_fn, 919 NULL); 920 921 while (thread_counter < NUM_TEVENT_THREADS) { 922 int ret = tevent_loop_once(master_ev); 923 torture_assert(test, ret == 0, "tevent_loop_once failed"); 924 } 925 926 torture_assert(test, thread_counter == NUM_TEVENT_THREADS, 927 "thread_counter fail\n"); 928 929 talloc_free(master_ev); 930 return true; 931 } 932 933 struct reply_state { 934 struct tevent_thread_proxy *reply_tp; 935 pthread_t thread_id; 936 int *p_finished; 937 }; 938 939 static void thread_timeout_fn(struct tevent_context *ev, 940 struct tevent_timer *te, 941 struct timeval tv, void *p) 942 { 943 int *p_finished = (int *)p; 944 945 *p_finished = 2; 946 } 947 948 /* Called in child-thread context */ 949 static void thread_callback(struct tevent_context *ev, 950 struct tevent_immediate *im, 951 void *private_ptr) 952 { 953 struct reply_state *rsp = 954 talloc_get_type_abort(private_ptr, struct reply_state); 955 956 talloc_steal(ev, rsp); 957 *rsp->p_finished = 1; 958 } 959 960 /* Called in master thread context */ 961 static void master_callback(struct tevent_context *ev, 962 struct tevent_immediate *im, 963 void *private_ptr) 964 { 965 struct reply_state *rsp = 966 talloc_get_type_abort(private_ptr, struct reply_state); 967 unsigned i; 968 969 talloc_steal(ev, rsp); 970 971 for (i = 0; i < NUM_TEVENT_THREADS; i++) { 972 if (pthread_equal(rsp->thread_id, 973 thread_map[i])) { 974 break; 975 } 976 } 977 torture_comment(thread_test_ctx, 978 "Callback %u from thread %u\n", 979 thread_counter, 980 i); 981 /* Now reply to the thread ! */ 982 tevent_thread_proxy_schedule(rsp->reply_tp, 983 &im, 984 thread_callback, 985 &rsp); 986 987 thread_counter++; 988 } 989 990 static void *thread_fn_1(void *private_ptr) 991 { 992 struct tevent_thread_proxy *master_tp = 993 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy); 994 struct tevent_thread_proxy *tp; 995 struct tevent_immediate *im; 996 struct tevent_context *ev; 997 struct reply_state *rsp; 998 int finished = 0; 999 int ret; 1000 1001 ev = tevent_context_init(NULL); 1002 if (ev == NULL) { 1003 return NULL; 1004 } 1005 1006 tp = tevent_thread_proxy_create(ev); 1007 if (tp == NULL) { 1008 talloc_free(ev); 1009 return NULL; 1010 } 1011 1012 im = tevent_create_immediate(ev); 1013 if (im == NULL) { 1014 talloc_free(ev); 1015 return NULL; 1016 } 1017 1018 rsp = talloc(ev, struct reply_state); 1019 if (rsp == NULL) { 1020 talloc_free(ev); 1021 return NULL; 1022 } 1023 1024 rsp->thread_id = pthread_self(); 1025 rsp->reply_tp = tp; 1026 rsp->p_finished = &finished; 1027 1028 /* Introduce a little randomness into the mix.. */ 1029 usleep(random() % 7000); 1030 1031 tevent_thread_proxy_schedule(master_tp, 1032 &im, 1033 master_callback, 1034 &rsp); 1035 1036 /* Ensure we don't wait more than 10 seconds. */ 1037 tevent_add_timer(ev, 1038 ev, 1039 timeval_current_ofs(10,0), 1040 thread_timeout_fn, 1041 &finished); 1042 1043 while (finished == 0) { 1044 ret = tevent_loop_once(ev); 1045 assert(ret == 0); 1046 } 1047 1048 if (finished > 1) { 1049 /* Timeout ! */ 1050 abort(); 1051 } 1052 1053 /* 1054 * NB. We should talloc_free(ev) here, but if we do 1055 * we currently get hit by helgrind Fix #323432 1056 * "When calling pthread_cond_destroy or pthread_mutex_destroy 1057 * with initializers as argument Helgrind (incorrectly) reports errors." 1058 * 1059 * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive- 1060 * with-pthread-mutex-destroy-td47757.html 1061 * 1062 * Helgrind doesn't understand that the request/reply 1063 * messages provide synchronization between the lock/unlock 1064 * in tevent_thread_proxy_schedule(), and the pthread_destroy() 1065 * when the struct tevent_thread_proxy object is talloc_free'd. 1066 * 1067 * As a work-around for now return ev for the parent thread to free. 1068 */ 1069 return ev; 1070 } 1071 1072 static bool test_multi_tevent_threaded_1(struct torture_context *test, 1073 const void *test_data) 1074 { 1075 unsigned i; 1076 struct tevent_context *master_ev; 1077 struct tevent_thread_proxy *master_tp; 1078 int ret; 1079 1080 talloc_disable_null_tracking(); 1081 1082 /* Ugly global stuff. */ 1083 thread_test_ctx = test; 1084 thread_counter = 0; 1085 1086 master_ev = tevent_context_init(NULL); 1087 if (master_ev == NULL) { 1088 return false; 1089 } 1090 tevent_set_debug_stderr(master_ev); 1091 1092 master_tp = tevent_thread_proxy_create(master_ev); 1093 if (master_tp == NULL) { 1094 torture_fail(test, 1095 talloc_asprintf(test, 1096 "tevent_thread_proxy_create failed\n")); 1097 talloc_free(master_ev); 1098 return false; 1099 } 1100 1101 for (i = 0; i < NUM_TEVENT_THREADS; i++) { 1102 ret = pthread_create(&thread_map[i], 1103 NULL, 1104 thread_fn_1, 1105 master_tp); 1106 if (ret != 0) { 1107 torture_fail(test, 1108 talloc_asprintf(test, 1109 "Failed to create thread %i, %d\n", 1110 i, ret)); 1111 return false; 1112 } 1113 } 1114 1115 while (thread_counter < NUM_TEVENT_THREADS) { 1116 ret = tevent_loop_once(master_ev); 1117 torture_assert(test, ret == 0, "tevent_loop_once failed"); 1118 } 1119 1120 /* Wait for all the threads to finish - join 'em. */ 1121 for (i = 0; i < NUM_TEVENT_THREADS; i++) { 1122 void *retval; 1123 ret = pthread_join(thread_map[i], &retval); 1124 torture_assert(test, ret == 0, "pthread_join failed"); 1125 /* Free the child thread event context. */ 1126 talloc_free(retval); 1127 } 1128 1129 talloc_free(master_ev); 1130 return true; 1131 } 1132 #endif 1133 149 1134 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx) 150 1135 { 151 1136 struct torture_suite *suite = torture_suite_create(mem_ctx, "event"); 152 const char **list = event_backend_list(suite);1137 const char **list = tevent_backend_list(suite); 153 1138 int i; 154 1139 155 1140 for (i=0;list && list[i];i++) { 156 torture_suite_add_simple_tcase_const(suite, list[i], 1141 struct torture_suite *backend_suite; 1142 1143 backend_suite = torture_suite_create(mem_ctx, list[i]); 1144 1145 torture_suite_add_simple_tcase_const(backend_suite, 1146 "context", 157 1147 test_event_context, 158 1148 (const void *)list[i]); 159 } 1149 torture_suite_add_simple_tcase_const(backend_suite, 1150 "fd1", 1151 test_event_fd1, 1152 (const void *)list[i]); 1153 torture_suite_add_simple_tcase_const(backend_suite, 1154 "fd2", 1155 test_event_fd2, 1156 (const void *)list[i]); 1157 1158 torture_suite_add_suite(suite, backend_suite); 1159 } 1160 1161 #ifdef HAVE_PTHREAD 1162 torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt", 1163 test_event_context_threaded, 1164 NULL); 1165 1166 torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded", 1167 test_multi_tevent_threaded, 1168 NULL); 1169 1170 torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1", 1171 test_multi_tevent_threaded_1, 1172 NULL); 1173 1174 #endif 160 1175 161 1176 return suite;
Note:
See TracChangeset
for help on using the changeset viewer.