Ignore:
Timestamp:
Nov 24, 2016, 1:14:11 PM (9 years ago)
Author:
Silvan Scherrer
Message:

Samba Server: update vendor to version 4.4.3

File:
1 edited

Legend:

Unmodified
Added
Removed
  • vendor/current/lib/tevent/bindings.py

    r740 r988  
    2323
    2424import signal
     25from unittest import TestCase, TestProgram
     26import gc
     27
    2528import _tevent
    26 from unittest import TestCase
    2729
    2830class BackendListTests(TestCase):
     
    6163        sig = self.ctx.add_signal(signal.SIGINT, 0, lambda callback: None)
    6264        self.assertTrue(isinstance(sig, _tevent.Signal))
     65
     66    def test_timer(self):
     67        """Test a timer is can be scheduled"""
     68        collecting_list = []
     69        # time "0" has already passed, callback will be scheduled immediately
     70        timer = self.ctx.add_timer(0, lambda t: collecting_list.append(True))
     71        self.assertTrue(timer.active)
     72        self.assertEqual(collecting_list, [])
     73        self.ctx.loop_once()
     74        self.assertFalse(timer.active)
     75        self.assertEqual(collecting_list, [True])
     76
     77    def test_timer_deallocate_timer(self):
     78        """Test timer is scheduled even if reference to it isn't held"""
     79        collecting_list = []
     80        def callback(t):
     81            collecting_list.append(True)
     82        timer = self.ctx.add_timer(0, lambda t: collecting_list.append(True))
     83        gc.collect()
     84        self.assertEqual(collecting_list, [])
     85        self.ctx.loop_once()
     86        self.assertEqual(collecting_list, [True])
     87
     88    def test_timer_deallocate_context(self):
     89        """Test timer is unscheduled when context is freed"""
     90        collecting_list = []
     91        def callback(t):
     92            collecting_list.append(True)
     93        timer = self.ctx.add_timer(0, lambda t: collecting_list.append(True))
     94        self.assertTrue(timer.active)
     95        del self.ctx
     96        gc.collect()
     97        self.assertEqual(collecting_list, [])
     98        self.assertFalse(timer.active)
     99
     100    def test_timer_offset(self):
     101        """Test scheduling timer with an offset"""
     102        collecting_list = []
     103        self.ctx.add_timer_offset(0.2, lambda t: collecting_list.append(2))
     104        self.ctx.add_timer_offset(0.1, lambda t: collecting_list.append(1))
     105        self.assertEqual(collecting_list, [])
     106        self.ctx.loop_once()
     107        self.assertEqual(collecting_list, [1])
     108        self.ctx.loop_once()
     109        self.assertEqual(collecting_list, [1, 2])
     110
     111if __name__ == '__main__':
     112    TestProgram()
Note: See TracChangeset for help on using the changeset viewer.