source: python/trunk/Lib/test/test_kqueue.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 6.5 KB
Line 
1"""
2Tests for kqueue wrapper.
3"""
4import socket
5import errno
6import time
7import select
8import sys
9import unittest
10
11from test import test_support
# The whole module is skipped (rather than failing test by test) when the
# select module on this platform does not provide kqueue.
_kqueue_available = hasattr(select, "kqueue")
if not _kqueue_available:
    raise unittest.SkipTest("test works only on BSD")
14
class TestKQueue(unittest.TestCase):
    """Tests for the ``select.kqueue`` and ``select.kevent`` wrappers.

    This module targets Python 2 (it relies on the ``cmp`` builtin), but
    otherwise sticks to syntax that is also accepted by later versions.
    """

    def _check_kevent(self, ev, other, ident, filter_, flags,
                      fflags=0, data=0, udata=0):
        """Assert every field of *ev*, and that it equals itself only.

        ev      -- the kevent under test
        other   -- a different kevent that *ev* must not compare equal to
        ident, filter_, flags, fflags, data, udata
                -- the expected value of the matching attribute of *ev*
        """
        self.assertEqual(ev.ident, ident)
        self.assertEqual(ev.filter, filter_)
        self.assertEqual(ev.flags, flags)
        self.assertEqual(ev.fflags, fflags)
        self.assertEqual(ev.data, data)
        self.assertEqual(ev.udata, udata)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

    @staticmethod
    def _ident_filter_pairs(events):
        """Return the set of ``(ident, filter)`` pairs found in *events*."""
        return set((e.ident, e.filter) for e in events)

    def test_create_queue(self):
        """A new kqueue is open with a positive fd; close() marks it closed."""
        kq = select.kqueue()
        # Release the descriptor even if an assertion below fails; the
        # explicit close() further down is still the behaviour under test.
        self.addCleanup(kq.close)
        self.assertGreater(kq.fileno(), 0)
        self.assertFalse(kq.closed)
        kq.close()
        self.assertTrue(kq.closed)
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
        """kevent() stores its arguments, applies defaults and compares."""
        fd = sys.stderr.fileno()
        other = select.kevent(1000)

        # Only the ident given: the remaining fields take their defaults.
        ev = select.kevent(fd)
        self._check_kevent(ev, other, fd,
                           select.KQ_FILTER_READ, select.KQ_EV_ADD)

        # Comparison between kevents, and rejection of foreign types.
        self.assertEqual(cmp(ev, other), -1)
        self.assertLess(ev, other)
        self.assertGreaterEqual(other, ev)
        for not_an_event in (None, 1, "ev"):
            self.assertRaises(TypeError, cmp, ev, not_an_event)

        # Explicit filter, default flags.
        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self._check_kevent(ev, other, fd,
                           select.KQ_FILTER_WRITE, select.KQ_EV_ADD)

        # Explicit filter and flags.
        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self._check_kevent(ev, other, fd,
                           select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)

        # All six fields given positionally.
        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self._check_kevent(ev, other, 1, 2, 3, 4, 5, 6)

        # Larger values in ident, data and udata.
        bignum = 0x7fff
        ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
        self._check_kevent(ev, other, bignum, 1, 2, 3, bignum - 1, bignum)

    def test_queue_event(self):
        """Register, poll and delete read/write events for a TCP pair."""
        serverSocket = socket.socket()
        self.addCleanup(serverSocket.close)
        serverSocket.bind(('127.0.0.1', 0))
        serverSocket.listen(1)

        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        # A connect() that does not raise is accepted as well: FreeBSD
        # doesn't raise an exception here.

        server, addr = serverSocket.accept()
        self.addCleanup(server.close)

        kq = select.kqueue()
        # kq2 is a second wrapper around kq's descriptor number; the
        # assertions below expect events registered through kq2 to be
        # reported when polling kq.
        # NOTE(review): neither queue is closed explicitly (unchanged from
        # before). Both objects refer to the same descriptor number, so
        # closing each one would presumably close it twice -- confirm
        # before adding cleanup here.
        kq2 = select.kqueue.fromfd(kq.fileno())

        add_enabled = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        for queue, sock in ((kq, server), (kq2, client)):
            for filt in (select.KQ_FILTER_WRITE, select.KQ_FILTER_READ):
                ev = select.kevent(sock.fileno(), filt, add_enabled)
                queue.control([ev], 0)

        # Nothing has been sent yet, so only the write filters are expected.
        events = kq.control(None, 4, 1)
        self.assertEqual(self._ident_filter_pairs(events), set([
            (client.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_WRITE)]))

        client.send(b"Hello!")
        server.send(b"world!!!")

        # We may need to call it several times
        for i in range(10):
            events = kq.control(None, 4, 1)
            if len(events) == 4:
                break
            time.sleep(1.0)
        else:
            self.fail('timeout waiting for event notifications')

        self.assertEqual(self._ident_filter_pairs(events), set([
            (client.fileno(), select.KQ_FILTER_WRITE),
            (client.fileno(), select.KQ_FILTER_READ),
            (server.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_READ)]))

        # Remove completely client, and server read part
        for filt in (select.KQ_FILTER_WRITE, select.KQ_FILTER_READ):
            ev = select.kevent(client.fileno(), filt, select.KQ_EV_DELETE)
            kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0, 0)

        # Only the server's write filter is left registered.
        events = kq.control([], 4, 0.99)
        self.assertEqual(self._ident_filter_pairs(events), set([
            (server.fileno(), select.KQ_FILTER_WRITE)]))

    def testPair(self):
        """Data sent on one end of a socketpair is reported for the other."""
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        a.send(b'foo')
        add_enabled = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        # The socket objects themselves are passed as the ident here,
        # not their file descriptor numbers.
        event1 = select.kevent(a, select.KQ_FILTER_READ, add_enabled)
        event2 = select.kevent(b, select.KQ_FILTER_READ, add_enabled)
        r = kq.control([event1, event2], 1, 1)
        self.assertTrue(r)
        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
        # The returned event's data field is used as the receive size.
        self.assertEqual(b.recv(r[0].data), b'foo')
182
def test_main():
    """Run every test in TestKQueue via test_support.run_unittest."""
    test_support.run_unittest(TestKQueue)


# Allow the file to be executed directly as a script.
if __name__ == "__main__":
    test_main()
Note: See TracBrowser for help on using the repository browser.