Changeset 391 for python/trunk/Lib/sqlite3/test
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/sqlite3/test/dbapi.py
r2 r391 2 2 # pysqlite2/test/dbapi.py: tests for DB-API compliance 3 3 # 4 # Copyright (C) 2004-20 07Gerhard Häring <gh@ghaering.de>4 # Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de> 5 5 # 6 6 # This file is part of pysqlite. … … 24 24 import unittest 25 25 import sys 26 import threading27 26 import sqlite3 as sqlite 27 try: 28 import threading 29 except ImportError: 30 threading = None 28 31 29 32 class ModuleTests(unittest.TestCase): … … 42 45 43 46 def CheckWarning(self): 44 self.assert _(issubclass(sqlite.Warning, StandardError),45 "Warning is not a subclass of StandardError")47 self.assertTrue(issubclass(sqlite.Warning, StandardError), 48 "Warning is not a subclass of StandardError") 46 49 47 50 def CheckError(self): 48 self. failUnless(issubclass(sqlite.Error, StandardError),51 self.assertTrue(issubclass(sqlite.Error, StandardError), 49 52 "Error is not a subclass of StandardError") 50 53 51 54 def CheckInterfaceError(self): 52 self. failUnless(issubclass(sqlite.InterfaceError, sqlite.Error),55 self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), 53 56 "InterfaceError is not a subclass of Error") 54 57 55 58 def CheckDatabaseError(self): 56 self. failUnless(issubclass(sqlite.DatabaseError, sqlite.Error),59 self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), 57 60 "DatabaseError is not a subclass of Error") 58 61 59 62 def CheckDataError(self): 60 self. failUnless(issubclass(sqlite.DataError, sqlite.DatabaseError),63 self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), 61 64 "DataError is not a subclass of DatabaseError") 62 65 63 66 def CheckOperationalError(self): 64 self. failUnless(issubclass(sqlite.OperationalError, sqlite.DatabaseError),67 self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), 65 68 "OperationalError is not a subclass of DatabaseError") 66 69 67 70 def CheckIntegrityError(self): 68 self. failUnless(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),71 self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), 69 72 "IntegrityError is not a subclass of DatabaseError") 70 73 71 74 def CheckInternalError(self): 72 self. failUnless(issubclass(sqlite.InternalError, sqlite.DatabaseError),75 self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), 73 76 "InternalError is not a subclass of DatabaseError") 74 77 75 78 def CheckProgrammingError(self): 76 self. failUnless(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),79 self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), 77 80 "ProgrammingError is not a subclass of DatabaseError") 78 81 79 82 def CheckNotSupportedError(self): 80 self. failUnless(issubclass(sqlite.NotSupportedError,83 self.assertTrue(issubclass(sqlite.NotSupportedError, 81 84 sqlite.DatabaseError), 82 85 "NotSupportedError is not a subclass of DatabaseError") … … 128 131 def CheckExceptions(self): 129 132 # Optional DB-API extension. 130 self. failUnlessEqual(self.cx.Warning, sqlite.Warning)131 self. failUnlessEqual(self.cx.Error, sqlite.Error)132 self. failUnlessEqual(self.cx.InterfaceError, sqlite.InterfaceError)133 self. failUnlessEqual(self.cx.DatabaseError, sqlite.DatabaseError)134 self. failUnlessEqual(self.cx.DataError, sqlite.DataError)135 self. failUnlessEqual(self.cx.OperationalError, sqlite.OperationalError)136 self. failUnlessEqual(self.cx.IntegrityError, sqlite.IntegrityError)137 self. failUnlessEqual(self.cx.InternalError, sqlite.InternalError)138 self. failUnlessEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)139 self. failUnlessEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)133 self.assertEqual(self.cx.Warning, sqlite.Warning) 134 self.assertEqual(self.cx.Error, sqlite.Error) 135 self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError) 136 self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError) 137 self.assertEqual(self.cx.DataError, sqlite.DataError) 138 self.assertEqual(self.cx.OperationalError, sqlite.OperationalError) 139 self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError) 140 self.assertEqual(self.cx.InternalError, sqlite.InternalError) 141 self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) 142 self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) 140 143 141 144 class CursorTests(unittest.TestCase): … … 201 204 self.cu.execute("insert into test(name) values (?)", ("Hugo",)) 202 205 206 def CheckExecuteArgStringWithZeroByte(self): 207 self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) 208 209 self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) 210 row = self.cu.fetchone() 211 self.assertEqual(row[0], "Hu\x00go") 212 203 213 def CheckExecuteWrongNoOfArgs1(self): 204 214 # too many parameters … … 229 239 self.cu.execute("select name from test where name=?", ["foo"]) 230 240 row = self.cu.fetchone() 231 self. failUnlessEqual(row[0], "foo")241 self.assertEqual(row[0], "foo") 232 242 233 243 def CheckExecuteParamSequence(self): … … 242 252 self.cu.execute("select name from test where name=?", L()) 243 253 row = self.cu.fetchone() 244 self. failUnlessEqual(row[0], "foo")254 self.assertEqual(row[0], "foo") 245 255 246 256 def CheckExecuteDictMapping(self): … … 248 258 self.cu.execute("select name from test where name=:name", {"name": "foo"}) 249 259 row = self.cu.fetchone() 250 self. failUnlessEqual(row[0], "foo")260 self.assertEqual(row[0], "foo") 251 261 252 262 def CheckExecuteDictMapping_Mapping(self): … … 262 272 self.cu.execute("select name from test where name=:name", D()) 263 273 row = self.cu.fetchone() 264 self. failUnlessEqual(row[0], "foo")274 self.assertEqual(row[0], "foo") 265 275 266 276 def CheckExecuteDictMappingTooLittleArgs(self): … … 296 306 self.cu.execute("insert into test(name) values ('foo')") 297 307 self.cu.execute("update test set name='bar'") 298 self. failUnlessEqual(self.cu.rowcount, 2)308 self.assertEqual(self.cu.rowcount, 2) 299 309 300 310 def CheckRowcountSelect(self): … … 305 315 """ 306 316 self.cu.execute("select 5 union select 6") 307 self. failUnlessEqual(self.cu.rowcount, -1)317 self.assertEqual(self.cu.rowcount, -1) 308 318 309 319 def CheckRowcountExecutemany(self): 310 320 self.cu.execute("delete from test") 311 321 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) 312 self. failUnlessEqual(self.cu.rowcount, 3)322 self.assertEqual(self.cu.rowcount, 3) 313 323 314 324 def CheckTotalChanges(self): … … 383 393 for row in self.cu: 384 394 lst.append(row[0]) 385 self. failUnlessEqual(lst[0], 5)386 self. failUnlessEqual(lst[1], 6)395 self.assertEqual(lst[0], 5) 396 self.assertEqual(lst[1], 6) 387 397 388 398 def CheckFetchone(self): 389 399 self.cu.execute("select name from test") 390 400 row = self.cu.fetchone() 391 self. failUnlessEqual(row[0], "foo")401 self.assertEqual(row[0], "foo") 392 402 row = self.cu.fetchone() 393 self. failUnlessEqual(row, None)403 self.assertEqual(row, None) 394 404 395 405 def CheckFetchoneNoStatement(self): 396 406 cur = self.cx.cursor() 397 407 row = cur.fetchone() 398 self. failUnlessEqual(row, None)408 self.assertEqual(row, None) 399 409 400 410 def CheckArraySize(self): 401 411 # must default ot 1 402 self. failUnlessEqual(self.cu.arraysize, 1)412 self.assertEqual(self.cu.arraysize, 1) 403 413 404 414 # now set to 2 … … 413 423 res = self.cu.fetchmany() 414 424 415 self. failUnlessEqual(len(res), 2)425 self.assertEqual(len(res), 2) 416 426 417 427 def CheckFetchmany(self): 418 428 self.cu.execute("select name from test") 419 429 res = self.cu.fetchmany(100) 420 self. failUnlessEqual(len(res), 1)430 self.assertEqual(len(res), 1) 421 431 res = self.cu.fetchmany(100) 422 self. failUnlessEqual(res, [])432 self.assertEqual(res, []) 423 433 424 434 def CheckFetchmanyKwArg(self): … … 426 436 self.cu.execute("select name from test") 427 437 res = self.cu.fetchmany(size=100) 428 self. failUnlessEqual(len(res), 1)438 self.assertEqual(len(res), 1) 429 439 430 440 def CheckFetchall(self): 431 441 self.cu.execute("select name from test") 432 442 res = self.cu.fetchall() 433 self. failUnlessEqual(len(res), 1)443 self.assertEqual(len(res), 1) 434 444 res = self.cu.fetchall() 435 self. failUnlessEqual(res, [])445 self.assertEqual(res, []) 436 446 437 447 def CheckSetinputsizes(self): … … 446 456 def CheckCursorConnection(self): 447 457 # Optional DB-API extension. 448 self. failUnlessEqual(self.cu.connection, self.cx)458 self.assertEqual(self.cu.connection, self.cx) 449 459 450 460 def CheckWrongCursorCallable(self): … … 466 476 pass 467 477 478 @unittest.skipUnless(threading, 'This test requires threading.') 468 479 class ThreadTests(unittest.TestCase): 469 480 def setUp(self): … … 657 668 cur.execute("select i from a") 658 669 res = cur.fetchone()[0] 659 self. failUnlessEqual(res, 5)670 self.assertEqual(res, 5) 660 671 661 672 def CheckScriptStringUnicode(self): … … 671 682 cur.execute("select i from a") 672 683 res = cur.fetchone()[0] 673 self. failUnlessEqual(res, 6)674 675 def CheckScript ErrorIncomplete(self):684 self.assertEqual(res, 6) 685 686 def CheckScriptSyntaxError(self): 676 687 con = sqlite.connect(":memory:") 677 688 cur = con.cursor() 678 689 raised = False 679 690 try: 680 cur.executescript("create table test( sadfsadfdsa")681 except sqlite. ProgrammingError:691 cur.executescript("create table test(x); asdf; create table test2(x)") 692 except sqlite.OperationalError: 682 693 raised = True 683 self. failUnlessEqual(raised, True, "should have raised an exception")694 self.assertEqual(raised, True, "should have raised an exception") 684 695 685 696 def CheckScriptErrorNormal(self): … … 691 702 except sqlite.OperationalError: 692 703 raised = True 693 self. failUnlessEqual(raised, True, "should have raised an exception")704 self.assertEqual(raised, True, "should have raised an exception") 694 705 695 706 def CheckConnectionExecute(self): 696 707 con = sqlite.connect(":memory:") 697 708 result = con.execute("select 5").fetchone()[0] 698 self. failUnlessEqual(result, 5, "Basic test of Connection.execute")709 self.assertEqual(result, 5, "Basic test of Connection.execute") 699 710 700 711 def CheckConnectionExecutemany(self): … … 703 714 con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) 704 715 result = con.execute("select foo from test order by foo").fetchall() 705 self. failUnlessEqual(result[0][0], 3, "Basic test of Connection.executemany")706 self. failUnlessEqual(result[1][0], 4, "Basic test of Connection.executemany")716 self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") 717 self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") 707 718 708 719 def CheckConnectionExecutescript(self): … … 710 721 con.executescript("create table test(foo); insert into test(foo) values (5);") 711 722 result = con.execute("select foo from test").fetchone()[0] 712 self. failUnlessEqual(result, 5, "Basic test of Connection.executescript")713 714 class Closed Tests(unittest.TestCase):723 self.assertEqual(result, 5, "Basic test of Connection.executescript") 724 725 class ClosedConTests(unittest.TestCase): 715 726 def setUp(self): 716 727 pass … … 763 774 except: 764 775 self.fail("Should have raised a ProgrammingError") 765 766 776 767 777 def CheckClosedCreateFunction(self): … … 831 841 self.fail("Should have raised a ProgrammingError") 832 842 843 class ClosedCurTests(unittest.TestCase): 844 def setUp(self): 845 pass 846 847 def tearDown(self): 848 pass 849 850 def CheckClosed(self): 851 con = sqlite.connect(":memory:") 852 cur = con.cursor() 853 cur.close() 854 855 for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"): 856 if method_name in ("execute", "executescript"): 857 params = ("select 4 union select 5",) 858 elif method_name == "executemany": 859 params = ("insert into foo(bar) values (?)", [(3,), (4,)]) 860 else: 861 params = [] 862 863 try: 864 method = getattr(cur, method_name) 865 866 method(*params) 867 self.fail("Should have raised a ProgrammingError: method " + method_name) 868 except sqlite.ProgrammingError: 869 pass 870 except: 871 self.fail("Should have raised a ProgrammingError: " + method_name) 872 833 873 def suite(): 834 874 module_suite = unittest.makeSuite(ModuleTests, "Check") … … 838 878 constructor_suite = unittest.makeSuite(ConstructorTests, "Check") 839 879 ext_suite = unittest.makeSuite(ExtensionTests, "Check") 840 closed_suite = unittest.makeSuite(ClosedTests, "Check") 841 return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_suite)) 880 closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") 881 closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") 882 return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite)) 842 883 843 884 def test(): -
python/trunk/Lib/sqlite3/test/dump.py
r2 r391 14 14 def CheckTableDump(self): 15 15 expected_sqls = [ 16 """CREATE TABLE "index"("index" blob);""" 17 , 18 """INSERT INTO "index" VALUES(X'01');""" 19 , 20 """CREATE TABLE "quoted""table"("quoted""field" text);""" 21 , 22 """INSERT INTO "quoted""table" VALUES('quoted''value');""" 23 , 16 24 "CREATE TABLE t1(id integer primary key, s1 text, " \ 17 25 "t1_i1 integer not null, i2 integer, unique (s1), " \ … … 21 29 , 22 30 "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);" 31 , 32 u"INSERT INTO \"t1\" VALUES(3,'f\xc3\xb6',40,10);" 23 33 , 24 34 "CREATE TABLE t2(id integer, t2_i1 integer, " \ … … 42 52 for i in xrange(len(expected_sqls))] 43 53 54 def CheckUnorderableRow(self): 55 # iterdump() should be able to cope with unorderable row types (issue #15545) 56 class UnorderableRow: 57 def __init__(self, cursor, row): 58 self.row = row 59 def __getitem__(self, index): 60 return self.row[index] 61 self.cx.row_factory = UnorderableRow 62 CREATE_ALPHA = """CREATE TABLE "alpha" ("one");""" 63 CREATE_BETA = """CREATE TABLE "beta" ("two");""" 64 expected = [ 65 "BEGIN TRANSACTION;", 66 CREATE_ALPHA, 67 CREATE_BETA, 68 "COMMIT;" 69 ] 70 self.cu.execute(CREATE_BETA) 71 self.cu.execute(CREATE_ALPHA) 72 got = list(self.cx.iterdump()) 73 self.assertEqual(expected, got) 74 44 75 def suite(): 45 76 return unittest.TestSuite(unittest.makeSuite(DumpTests, "Check")) -
python/trunk/Lib/sqlite3/test/factory.py
r2 r391 48 48 49 49 def CheckIsInstance(self): 50 self. failUnless(isinstance(self.con,50 self.assertTrue(isinstance(self.con, 51 51 MyConnection), 52 52 "connection is not instance of MyConnection") … … 61 61 def CheckIsInstance(self): 62 62 cur = self.con.cursor(factory=MyCursor) 63 self. failUnless(isinstance(cur,63 self.assertTrue(isinstance(cur, 64 64 MyCursor), 65 65 "cursor is not instance of MyCursor") … … 73 73 cur.execute("select 4+5 as foo") 74 74 row = cur.fetchone() 75 self. failUnless(isinstance(row,75 self.assertTrue(isinstance(row, 76 76 dict), 77 77 "row is not instance of dict") … … 88 88 self.con.row_factory = lambda cur, row: list(row) 89 89 row = self.con.execute("select 1, 2").fetchone() 90 self. failUnless(isinstance(row,90 self.assertTrue(isinstance(row, 91 91 list), 92 92 "row is not instance of list") … … 95 95 self.con.row_factory = sqlite.Row 96 96 row = self.con.execute("select 1 as a, 2 as b").fetchone() 97 self. failUnless(isinstance(row,97 self.assertTrue(isinstance(row, 98 98 sqlite.Row), 99 99 "row is not instance of sqlite.Row") 100 100 101 101 col1, col2 = row["a"], row["b"] 102 self. failUnless(col1 == 1, "by name: wrong result for column 'a'")103 self. failUnless(col2 == 2, "by name: wrong result for column 'a'")102 self.assertTrue(col1 == 1, "by name: wrong result for column 'a'") 103 self.assertTrue(col2 == 2, "by name: wrong result for column 'a'") 104 104 105 105 col1, col2 = row["A"], row["B"] 106 self. failUnless(col1 == 1, "by name: wrong result for column 'A'")107 self. failUnless(col2 == 2, "by name: wrong result for column 'B'")106 self.assertTrue(col1 == 1, "by name: wrong result for column 'A'") 107 self.assertTrue(col2 == 2, "by name: wrong result for column 'B'") 108 108 109 109 col1, col2 = row[0], row[1] 110 self. failUnless(col1 == 1, "by index: wrong result for column 0")111 self. failUnless(col2 == 2, "by index: wrong result for column 1")110 self.assertTrue(col1 == 1, "by index: wrong result for column 0") 111 self.assertTrue(col2 == 2, "by index: wrong result for column 1") 112 112 113 113 def CheckSqliteRowIter(self): … … 129 129 row = self.con.execute("select 1 as a, 2 as b").fetchone() 130 130 d = dict(row) 131 self. failUnlessEqual(d["a"], row["a"])132 self. failUnlessEqual(d["b"], row["b"])131 self.assertEqual(d["a"], row["a"]) 132 self.assertEqual(d["b"], row["b"]) 133 133 134 134 def CheckSqliteRowHashCmp(self): … … 139 139 row_3 = self.con.execute("select 1 as a, 3 as b").fetchone() 140 140 141 self. failUnless(row_1 == row_1)142 self. failUnless(row_1 == row_2)143 self. failUnless(row_2 != row_3)144 145 self. failIf(row_1 != row_1)146 self. failIf(row_1 != row_2)147 self. failIf(row_2 == row_3)148 149 self. failUnlessEqual(row_1, row_2)150 self. failUnlessEqual(hash(row_1), hash(row_2))151 self. failIfEqual(row_1, row_3)152 self. failIfEqual(hash(row_1), hash(row_3))141 self.assertTrue(row_1 == row_1) 142 self.assertTrue(row_1 == row_2) 143 self.assertTrue(row_2 != row_3) 144 145 self.assertFalse(row_1 != row_1) 146 self.assertFalse(row_1 != row_2) 147 self.assertFalse(row_2 == row_3) 148 149 self.assertEqual(row_1, row_2) 150 self.assertEqual(hash(row_1), hash(row_2)) 151 self.assertNotEqual(row_1, row_3) 152 self.assertNotEqual(hash(row_1), hash(row_3)) 153 153 154 154 def tearDown(self): … … 162 162 austria = unicode("Österreich", "latin1") 163 163 row = self.con.execute("select ?", (austria,)).fetchone() 164 self. failUnless(type(row[0]) == unicode, "type of row[0] must be unicode")164 self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode") 165 165 166 166 def CheckString(self): … … 168 168 austria = unicode("Österreich", "latin1") 169 169 row = self.con.execute("select ?", (austria,)).fetchone() 170 self. failUnless(type(row[0]) == str, "type of row[0] must be str")171 self. failUnless(row[0] == austria.encode("utf-8"), "column must equal original data in UTF-8")170 self.assertTrue(type(row[0]) == str, "type of row[0] must be str") 171 self.assertTrue(row[0] == austria.encode("utf-8"), "column must equal original data in UTF-8") 172 172 173 173 def CheckCustom(self): … … 175 175 austria = unicode("Österreich", "latin1") 176 176 row = self.con.execute("select ?", (austria.encode("latin1"),)).fetchone() 177 self. failUnless(type(row[0]) == unicode, "type of row[0] must be unicode")178 self. failUnless(row[0].endswith(u"reich"), "column must contain original data")177 self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode") 178 self.assertTrue(row[0].endswith(u"reich"), "column must contain original data") 179 179 180 180 def CheckOptimizedUnicode(self): … … 184 184 a_row = self.con.execute("select ?", (austria,)).fetchone() 185 185 d_row = self.con.execute("select ?", (germany,)).fetchone() 186 self.failUnless(type(a_row[0]) == unicode, "type of non-ASCII row must be unicode") 187 self.failUnless(type(d_row[0]) == str, "type of ASCII-only row must be str") 186 self.assertTrue(type(a_row[0]) == unicode, "type of non-ASCII row must be unicode") 187 self.assertTrue(type(d_row[0]) == str, "type of ASCII-only row must be str") 188 189 def tearDown(self): 190 self.con.close() 191 192 class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase): 193 def setUp(self): 194 self.con = sqlite.connect(":memory:") 195 self.con.execute("create table test (value text)") 196 self.con.execute("insert into test (value) values (?)", ("a\x00b",)) 197 198 def CheckString(self): 199 # text_factory defaults to unicode 200 row = self.con.execute("select value from test").fetchone() 201 self.assertIs(type(row[0]), unicode) 202 self.assertEqual(row[0], "a\x00b") 203 204 def CheckCustom(self): 205 # A custom factory should receive an str argument 206 self.con.text_factory = lambda x: x 207 row = self.con.execute("select value from test").fetchone() 208 self.assertIs(type(row[0]), str) 209 self.assertEqual(row[0], "a\x00b") 210 211 def CheckOptimizedUnicodeAsString(self): 212 # ASCII -> str argument 213 self.con.text_factory = sqlite.OptimizedUnicode 214 row = self.con.execute("select value from test").fetchone() 215 self.assertIs(type(row[0]), str) 216 self.assertEqual(row[0], "a\x00b") 217 218 def CheckOptimizedUnicodeAsUnicode(self): 219 # Non-ASCII -> unicode argument 220 self.con.text_factory = sqlite.OptimizedUnicode 221 self.con.execute("delete from test") 222 self.con.execute("insert into test (value) values (?)", (u'ä\0ö',)) 223 row = self.con.execute("select value from test").fetchone() 224 self.assertIs(type(row[0]), unicode) 225 self.assertEqual(row[0], u"ä\x00ö") 188 226 189 227 def tearDown(self): … … 196 234 row_suite = unittest.makeSuite(RowFactoryTests, "Check") 197 235 text_suite = unittest.makeSuite(TextFactoryTests, "Check") 198 return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite)) 236 text_zero_bytes_suite = unittest.makeSuite(TextFactoryTestsWithEmbeddedZeroBytes, "Check") 237 return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite, text_zero_bytes_suite)) 199 238 200 239 def test(): -
python/trunk/Lib/sqlite3/test/hooks.py
r2 r391 38 38 self.fail("should have raised a TypeError") 39 39 except TypeError, e: 40 self. failUnlessEqual(e.args[0], "parameter must be callable")40 self.assertEqual(e.args[0], "parameter must be callable") 41 41 42 42 def CheckCreateCollationNotAscii(self): … … 75 75 self.fail("should have raised an OperationalError") 76 76 except sqlite.OperationalError, e: 77 self.failUnlessEqual(e.args[0].lower(), "no such collation sequence: mycoll") 77 self.assertEqual(e.args[0].lower(), "no such collation sequence: mycoll") 78 79 def CheckCollationReturnsLargeInteger(self): 80 def mycoll(x, y): 81 # reverse order 82 return -((x > y) - (x < y)) * 2**32 83 con = sqlite.connect(":memory:") 84 con.create_collation("mycoll", mycoll) 85 sql = """ 86 select x from ( 87 select 'a' as x 88 union 89 select 'b' as x 90 union 91 select 'c' as x 92 ) order by x collate mycoll 93 """ 94 result = con.execute(sql).fetchall() 95 self.assertEqual(result, [('c',), ('b',), ('a',)], 96 msg="the expected order was not returned") 78 97 79 98 def CheckCollationRegisterTwice(self): … … 120 139 create table foo(a, b) 121 140 """) 122 self. failUnless(progress_calls)141 self.assertTrue(progress_calls) 123 142 124 143 … … 144 163 """) 145 164 second_count = len(progress_calls) 146 self. failUnless(first_count > second_count)165 self.assertTrue(first_count > second_count) 147 166 148 167 def CheckCancelOperation(self): … … 167 186 """ 168 187 con = sqlite.connect(":memory:") 169 action = 0170 def progress(): 171 action = 1188 action = [] 189 def progress(): 190 action.append(1) 172 191 return 0 173 192 con.set_progress_handler(progress, 1) 174 193 con.set_progress_handler(None, 1) 175 194 con.execute("select 1 union select 2 union select 3").fetchall() 176 self. failUnlessEqual(action, 0, "progress handler was not cleared")195 self.assertEqual(len(action), 0, "progress handler was not cleared") 177 196 178 197 def suite(): -
python/trunk/Lib/sqlite3/test/py25tests.py
r2 r391 55 55 self.con.rollback() 56 56 count = self.con.execute("select count(*) from test").fetchone()[0] 57 self. failUnlessEqual(count, 1)57 self.assertEqual(count, 1) 58 58 59 59 def CheckContextManagerRollback(self): 60 60 """Is a rollback called in the context manager?""" 61 61 global did_rollback 62 self. failUnlessEqual(did_rollback, False)62 self.assertEqual(did_rollback, False) 63 63 try: 64 64 with self.con: … … 67 67 except sqlite.IntegrityError: 68 68 pass 69 self. failUnlessEqual(did_rollback, True)69 self.assertEqual(did_rollback, True) 70 70 71 71 def suite(): -
python/trunk/Lib/sqlite3/test/regression.py
r2 r391 1 #-*- coding: ISO-8859-1 -*-1 #-*- coding: iso-8859-1 -*- 2 2 # pysqlite2/test/regression.py: pysqlite regression tests 3 3 # … … 66 66 cur = self.con.cursor() 67 67 cur.execute('select 1 as "foo bar [datetime]"') 68 self. failUnlessEqual(cur.description[0][0], "foo bar")68 self.assertEqual(cur.description[0][0], "foo bar") 69 69 70 70 cur.execute('select 1 as "foo baz"') 71 self.failUnlessEqual(cur.description[0][0], "foo baz") 72 73 def CheckStatementAvailable(self): 74 # pysqlite up to 2.3.2 crashed on this, because the active statement handle was not checked 75 # before trying to fetch data from it. close() destroys the active statement ... 76 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 77 cur = con.cursor() 78 cur.execute("select 4 union select 5") 79 cur.close() 80 cur.fetchone() 81 cur.fetchone() 71 self.assertEqual(cur.description[0][0], "foo baz") 82 72 83 73 def CheckStatementFinalizationOnCloseDb(self): … … 168 158 "isolation_level", u"\xe9") 169 159 160 def CheckCursorConstructorCallCheck(self): 161 """ 162 Verifies that cursor methods check whether base class __init__ was 163 called. 164 """ 165 class Cursor(sqlite.Cursor): 166 def __init__(self, con): 167 pass 168 169 con = sqlite.connect(":memory:") 170 cur = Cursor(con) 171 try: 172 cur.execute("select 4+5").fetchall() 173 self.fail("should have raised ProgrammingError") 174 except sqlite.ProgrammingError: 175 pass 176 except: 177 self.fail("should have raised ProgrammingError") 178 179 def CheckConnectionConstructorCallCheck(self): 180 """ 181 Verifies that connection methods check whether base class __init__ was 182 called. 183 """ 184 class Connection(sqlite.Connection): 185 def __init__(self, name): 186 pass 187 188 con = Connection(":memory:") 189 try: 190 cur = con.cursor() 191 self.fail("should have raised ProgrammingError") 192 except sqlite.ProgrammingError: 193 pass 194 except: 195 self.fail("should have raised ProgrammingError") 196 197 def CheckCursorRegistration(self): 198 """ 199 Verifies that subclassed cursor classes are correctly registered with 200 the connection object, too. (fetch-across-rollback problem) 201 """ 202 class Connection(sqlite.Connection): 203 def cursor(self): 204 return Cursor(self) 205 206 class Cursor(sqlite.Cursor): 207 def __init__(self, con): 208 sqlite.Cursor.__init__(self, con) 209 210 con = Connection(":memory:") 211 cur = con.cursor() 212 cur.execute("create table foo(x)") 213 cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)]) 214 cur.execute("select x from foo") 215 con.rollback() 216 try: 217 cur.fetchall() 218 self.fail("should have raised InterfaceError") 219 except sqlite.InterfaceError: 220 pass 221 except: 222 self.fail("should have raised InterfaceError") 223 224 def CheckAutoCommit(self): 225 """ 226 Verifies that creating a connection in autocommit mode works. 227 2.5.3 introduced a regression so that these could no longer 228 be created. 229 """ 230 con = sqlite.connect(":memory:", isolation_level=None) 231 232 def CheckPragmaAutocommit(self): 233 """ 234 Verifies that running a PRAGMA statement that does an autocommit does 235 work. This did not work in 2.5.3/2.5.4. 236 """ 237 cur = self.con.cursor() 238 cur.execute("create table foo(bar)") 239 cur.execute("insert into foo(bar) values (5)") 240 241 cur.execute("pragma page_size") 242 row = cur.fetchone() 243 244 def CheckSetDict(self): 245 """ 246 See http://bugs.python.org/issue7478 247 248 It was possible to successfully register callbacks that could not be 249 hashed. Return codes of PyDict_SetItem were not checked properly. 250 """ 251 class NotHashable: 252 def __call__(self, *args, **kw): 253 pass 254 def __hash__(self): 255 raise TypeError() 256 var = NotHashable() 257 self.assertRaises(TypeError, self.con.create_function, var) 258 self.assertRaises(TypeError, self.con.create_aggregate, var) 259 self.assertRaises(TypeError, self.con.set_authorizer, var) 260 self.assertRaises(TypeError, self.con.set_progress_handler, var) 261 262 def CheckConnectionCall(self): 263 """ 264 Call a connection with a non-string SQL request: check error handling 265 of the statement constructor. 266 """ 267 self.assertRaises(sqlite.Warning, self.con, 1) 268 269 def CheckRecursiveCursorUse(self): 270 """ 271 http://bugs.python.org/issue10811 272 273 Recursively using a cursor, such as when reusing it from a generator led to segfaults. 274 Now we catch recursive cursor usage and raise a ProgrammingError. 275 """ 276 con = sqlite.connect(":memory:") 277 278 cur = con.cursor() 279 cur.execute("create table a (bar)") 280 cur.execute("create table b (baz)") 281 282 def foo(): 283 cur.execute("insert into a (bar) values (?)", (1,)) 284 yield 1 285 286 with self.assertRaises(sqlite.ProgrammingError): 287 cur.executemany("insert into b (baz) values (?)", 288 ((i,) for i in foo())) 289 290 def CheckConvertTimestampMicrosecondPadding(self): 291 """ 292 http://bugs.python.org/issue14720 293 294 The microsecond parsing of convert_timestamp() should pad with zeros, 295 since the microsecond string "456" actually represents "456000". 296 """ 297 298 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 299 cur = con.cursor() 300 cur.execute("CREATE TABLE t (x TIMESTAMP)") 301 302 # Microseconds should be 456000 303 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')") 304 305 # Microseconds should be truncated to 123456 306 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')") 307 308 cur.execute("SELECT * FROM t") 309 values = [x[0] for x in cur.fetchall()] 310 311 self.assertEqual(values, [ 312 datetime.datetime(2012, 4, 4, 15, 6, 0, 456000), 313 datetime.datetime(2012, 4, 4, 15, 6, 0, 123456), 314 ]) 315 170 316 171 317 def suite(): -
python/trunk/Lib/sqlite3/test/transactions.py
r2 r391 60 60 self.cur2.execute("select i from test") 61 61 res = self.cur2.fetchall() 62 self. failUnlessEqual(len(res), 1)62 self.assertEqual(len(res), 1) 63 63 64 64 def CheckInsertStartsTransaction(self): … … 67 67 self.cur2.execute("select i from test") 68 68 res = self.cur2.fetchall() 69 self. failUnlessEqual(len(res), 0)69 self.assertEqual(len(res), 0) 70 70 71 71 def CheckUpdateStartsTransaction(self): … … 76 76 self.cur2.execute("select i from test") 77 77 res = self.cur2.fetchone()[0] 78 self. failUnlessEqual(res, 5)78 self.assertEqual(res, 5) 79 79 80 80 def CheckDeleteStartsTransaction(self): … … 85 85 self.cur2.execute("select i from test") 86 86 res = self.cur2.fetchall() 87 self. failUnlessEqual(len(res), 1)87 self.assertEqual(len(res), 1) 88 88 89 89 def CheckReplaceStartsTransaction(self): … … 94 94 self.cur2.execute("select i from test") 95 95 res = self.cur2.fetchall() 96 self. failUnlessEqual(len(res), 1)97 self. failUnlessEqual(res[0][0], 5)96 self.assertEqual(len(res), 1) 97 self.assertEqual(res[0][0], 5) 98 98 99 99 def CheckToggleAutoCommit(self): … … 101 101 self.cur1.execute("insert into test(i) values (5)") 102 102 self.con1.isolation_level = None 103 self. failUnlessEqual(self.con1.isolation_level, None)104 self.cur2.execute("select i from test") 105 res = self.cur2.fetchall() 106 self. failUnlessEqual(len(res), 1)103 self.assertEqual(self.con1.isolation_level, None) 104 self.cur2.execute("select i from test") 105 res = self.cur2.fetchall() 106 self.assertEqual(len(res), 1) 107 107 108 108 self.con1.isolation_level = "DEFERRED" 109 self. failUnlessEqual(self.con1.isolation_level , "DEFERRED")110 self.cur1.execute("insert into test(i) values (5)") 111 self.cur2.execute("select i from test") 112 res = self.cur2.fetchall() 113 self. failUnlessEqual(len(res), 1)109 self.assertEqual(self.con1.isolation_level , "DEFERRED") 110 self.cur1.execute("insert into test(i) values (5)") 111 self.cur2.execute("select i from test") 112 res = self.cur2.fetchall() 113 self.assertEqual(len(res), 1) 114 114 115 115 def CheckRaiseTimeout(self): … … 149 149 self.con1.commit() 150 150 151 def CheckRollbackCursorConsistency(self): 152 """ 153 Checks if cursors on the connection are set into a "reset" state 154 when a rollback is done on the connection. 155 """ 156 con = sqlite.connect(":memory:") 157 cur = con.cursor() 158 cur.execute("create table test(x)") 159 cur.execute("insert into test(x) values (5)") 160 cur.execute("select 1 union select 2 union select 3") 161 162 con.rollback() 163 try: 164 cur.fetchall() 165 self.fail("InterfaceError should have been raised") 166 except sqlite.InterfaceError, e: 167 pass 168 except: 169 self.fail("InterfaceError should have been raised") 170 151 171 class SpecialCommandTests(unittest.TestCase): 152 172 def setUp(self): -
python/trunk/Lib/sqlite3/test/types.py
r2 r391 22 22 # 3. This notice may not be removed or altered from any source distribution. 23 23 24 import zlib,datetime24 import datetime 25 25 import unittest 26 26 import sqlite3 as sqlite 27 try: 28 import zlib 29 except ImportError: 30 zlib = None 31 27 32 28 33 class SqliteTypeTests(unittest.TestCase): … … 40 45 self.cur.execute("select s from test") 41 46 row = self.cur.fetchone() 42 self. failUnlessEqual(row[0], u"Österreich")47 self.assertEqual(row[0], u"Österreich") 43 48 44 49 def CheckSmallInt(self): … … 46 51 self.cur.execute("select i from test") 47 52 row = self.cur.fetchone() 48 self. failUnlessEqual(row[0], 42)53 self.assertEqual(row[0], 42) 49 54 50 55 def CheckLargeInt(self): … … 53 58 self.cur.execute("select i from test") 54 59 row = self.cur.fetchone() 55 self. failUnlessEqual(row[0], num)60 self.assertEqual(row[0], num) 56 61 57 62 def CheckFloat(self): … … 60 65 self.cur.execute("select f from test") 61 66 row = self.cur.fetchone() 62 self. failUnlessEqual(row[0], val)67 self.assertEqual(row[0], val) 63 68 64 69 def CheckBlob(self): … … 67 72 self.cur.execute("select b from test") 68 73 row = self.cur.fetchone() 69 self. failUnlessEqual(row[0], val)74 self.assertEqual(row[0], val) 70 75 71 76 def CheckUnicodeExecute(self): 72 77 self.cur.execute(u"select 'Österreich'") 73 78 row = self.cur.fetchone() 74 self.failUnlessEqual(row[0], u"Österreich") 79 self.assertEqual(row[0], u"Österreich") 80 81 def CheckNonUtf8_Default(self): 82 try: 83 self.cur.execute("select ?", (chr(150),)) 84 self.fail("should have raised a ProgrammingError") 85 except sqlite.ProgrammingError: 86 pass 87 88 def CheckNonUtf8_TextFactoryString(self): 89 orig_text_factory = self.con.text_factory 90 try: 91 self.con.text_factory = str 92 self.cur.execute("select ?", (chr(150),)) 93 finally: 94 self.con.text_factory = orig_text_factory 95 96 def CheckNonUtf8_TextFactoryOptimizedUnicode(self): 97 orig_text_factory = self.con.text_factory 98 try: 99 try: 100 self.con.text_factory = sqlite.OptimizedUnicode 101 self.cur.execute("select ?", (chr(150),)) 102 self.fail("should have raised a ProgrammingError") 103 except sqlite.ProgrammingError: 104 pass 105 finally: 106 self.con.text_factory = orig_text_factory 75 107 76 108 class DeclTypesTests(unittest.TestCase): … … 123 155 self.cur.execute('select s as "s [WRONG]" from test') 124 156 row = self.cur.fetchone() 125 self. failUnlessEqual(row[0], "foo")157 self.assertEqual(row[0], "foo") 126 158 127 159 def CheckSmallInt(self): … … 130 162 self.cur.execute("select i from test") 131 163 row = self.cur.fetchone() 132 self. failUnlessEqual(row[0], 42)164 self.assertEqual(row[0], 42) 133 165 134 166 def CheckLargeInt(self): … … 138 170 self.cur.execute("select i from test") 139 171 row = self.cur.fetchone() 140 self. failUnlessEqual(row[0], num)172 self.assertEqual(row[0], num) 141 173 142 174 def CheckFloat(self): … … 146 178 self.cur.execute("select f from test") 147 179 row = self.cur.fetchone() 148 self. failUnlessEqual(row[0], 47.2)180 self.assertEqual(row[0], 47.2) 149 181 150 182 def CheckBool(self): … … 153 185 self.cur.execute("select b from test") 154 186 row = self.cur.fetchone() 155 self. failUnlessEqual(row[0], False)187 self.assertEqual(row[0], False) 156 188 157 189 self.cur.execute("delete from test") … … 159 191 self.cur.execute("select b from test") 160 192 row = self.cur.fetchone() 161 self. failUnlessEqual(row[0], True)193 self.assertEqual(row[0], True) 162 194 163 195 def CheckUnicode(self): … … 167 199 self.cur.execute("select u from test") 168 200 row = self.cur.fetchone() 169 self. failUnlessEqual(row[0], val)201 self.assertEqual(row[0], val) 170 202 171 203 def CheckFoo(self): … … 174 206 self.cur.execute("select foo from test") 175 207 row = self.cur.fetchone() 176 self. failUnlessEqual(row[0], val)208 self.assertEqual(row[0], val) 177 209 178 210 def CheckUnsupportedSeq(self): … … 204 236 self.cur.execute("select bin from test") 205 237 row = self.cur.fetchone() 206 self. failUnlessEqual(row[0], val)238 self.assertEqual(row[0], val) 207 239 208 240 def CheckNumber1(self): … … 210 242 value = self.cur.execute("select n1 from test").fetchone()[0] 211 243 # if the converter is not used, it's an int instead of a float 212 self. failUnlessEqual(type(value), float)244 self.assertEqual(type(value), float) 213 245 214 246 def CheckNumber2(self): 215 """Checks w ether converter names are cut off at '(' characters"""247 """Checks whether converter names are cut off at '(' characters""" 216 248 self.cur.execute("insert into test(n2) values (5)") 217 249 value = self.cur.execute("select n2 from test").fetchone()[0] 218 250 # if the converter is not used, it's an int instead of a float 219 self. failUnlessEqual(type(value), float)251 self.assertEqual(type(value), float) 220 252 221 253 class ColNamesTests(unittest.TestCase): … … 227 259 sqlite.converters["FOO"] = lambda x: "[%s]" % x 228 260 sqlite.converters["BAR"] = lambda x: "<%s>" % x 229 sqlite.converters["EXC"] = lambda x: 5 /0261 sqlite.converters["EXC"] = lambda x: 5 // 0 230 262 sqlite.converters["B1B1"] = lambda x: "MARKER" 231 263 … … 246 278 self.cur.execute("select x from test") 247 279 val = self.cur.fetchone()[0] 248 self. failUnlessEqual(val, "xxx")280 self.assertEqual(val, "xxx") 249 281 250 282 def CheckNone(self): … … 252 284 self.cur.execute("select x from test") 253 285 val = self.cur.fetchone()[0] 254 self. failUnlessEqual(val, None)286 self.assertEqual(val, None) 255 287 256 288 def CheckColName(self): … … 258 290 self.cur.execute('select x as "x [bar]" from test') 259 291 val = self.cur.fetchone()[0] 260 self. failUnlessEqual(val, "<xxx>")292 self.assertEqual(val, "<xxx>") 261 293 262 294 # Check if the stripping of colnames works. Everything after the first 263 295 # whitespace should be stripped. 264 self. failUnlessEqual(self.cur.description[0][0], "x")296 self.assertEqual(self.cur.description[0][0], "x") 265 297 266 298 def CheckCaseInConverterName(self): 267 299 self.cur.execute("""select 'other' as "x [b1b1]\"""") 268 300 val = self.cur.fetchone()[0] 269 self. failUnlessEqual(val, "MARKER")301 self.assertEqual(val, "MARKER") 270 302 271 303 def CheckCursorDescriptionNoRow(self): … … 275 307 """ 276 308 self.cur.execute("select * from test where 0 = 1") 277 self.assert _(self.cur.description[0][0] =="x")309 self.assertEqual(self.cur.description[0][0], "x") 278 310 279 311 class ObjectAdaptationTests(unittest.TestCase): … … 299 331 self.cur.execute("select ?", (4,)) 300 332 val = self.cur.fetchone()[0] 301 self.failUnlessEqual(type(val), float) 302 333 self.assertEqual(type(val), float) 334 335 @unittest.skipUnless(zlib, "requires zlib") 303 336 class BinaryConverterTests(unittest.TestCase): 304 337 def convert(s): … … 316 349 testdata = "abcdefg" * 10 317 350 result = self.con.execute('select ? as "x [bin]"', (buffer(zlib.compress(testdata)),)).fetchone()[0] 318 self. failUnlessEqual(testdata, result)351 self.assertEqual(testdata, result) 319 352 320 353 class DateTimeTests(unittest.TestCase): … … 333 366 self.cur.execute("select d from test") 334 367 d2 = self.cur.fetchone()[0] 335 self. failUnlessEqual(d, d2)368 self.assertEqual(d, d2) 336 369 337 370 def CheckSqliteTimestamp(self): … … 340 373 self.cur.execute("select ts from test") 341 374 ts2 = self.cur.fetchone()[0] 342 self. failUnlessEqual(ts, ts2)375 self.assertEqual(ts, ts2) 343 376 344 377 def CheckSqlTimestamp(self): … … 352 385 self.cur.execute("select ts from test") 353 386 ts = self.cur.fetchone()[0] 354 self. failUnlessEqual(type(ts), datetime.datetime)355 self. failUnlessEqual(ts.year, now.year)387 self.assertEqual(type(ts), datetime.datetime) 388 self.assertEqual(ts.year, now.year) 356 389 357 390 def CheckDateTimeSubSeconds(self): … … 360 393 self.cur.execute("select ts from test") 361 394 ts2 = self.cur.fetchone()[0] 362 self. failUnlessEqual(ts, ts2)395 self.assertEqual(ts, ts2) 363 396 364 397 def CheckDateTimeSubSecondsFloatingPoint(self): … … 367 400 self.cur.execute("select ts from test") 368 401 ts2 = self.cur.fetchone()[0] 369 self. failUnlessEqual(ts, ts2)402 self.assertEqual(ts, ts2) 370 403 371 404 def suite(): -
python/trunk/Lib/sqlite3/test/userfunctions.py
r2 r391 38 38 def func_returnblob(): 39 39 return buffer("blob") 40 def func_returnlonglong(): 41 return 1<<31 40 42 def func_raiseexception(): 41 5 /043 5 // 0 42 44 43 45 def func_isstring(v): … … 51 53 def func_isblob(v): 52 54 return type(v) is buffer 55 def func_islonglong(v): 56 return isinstance(v, (int, long)) and v >= 1<<31 53 57 54 58 class AggrNoStep: … … 68 72 class AggrExceptionInInit: 69 73 def __init__(self): 70 5 /074 5 // 0 71 75 72 76 def step(self, x): … … 81 85 82 86 def step(self, x): 83 5 /087 5 // 0 84 88 85 89 def finalize(self): … … 94 98 95 99 def finalize(self): 96 5 /0100 5 // 0 97 101 98 102 class AggrCheckType: … … 127 131 self.con.create_function("returnnull", 0, func_returnnull) 128 132 self.con.create_function("returnblob", 0, func_returnblob) 133 self.con.create_function("returnlonglong", 0, func_returnlonglong) 129 134 self.con.create_function("raiseexception", 0, func_raiseexception) 130 135 … … 134 139 self.con.create_function("isnone", 1, func_isnone) 135 140 self.con.create_function("isblob", 1, func_isblob) 141 self.con.create_function("islonglong", 1, func_islonglong) 136 142 137 143 def tearDown(self): … … 161 167 cur.execute("select returntext()") 162 168 val = cur.fetchone()[0] 163 self. failUnlessEqual(type(val), unicode)164 self. failUnlessEqual(val, "foo")169 self.assertEqual(type(val), unicode) 170 self.assertEqual(val, "foo") 165 171 166 172 def CheckFuncReturnUnicode(self): … … 168 174 cur.execute("select returnunicode()") 169 175 val = cur.fetchone()[0] 170 self. failUnlessEqual(type(val), unicode)171 self. failUnlessEqual(val, u"bar")176 self.assertEqual(type(val), unicode) 177 self.assertEqual(val, u"bar") 172 178 173 179 def CheckFuncReturnInt(self): … … 175 181 cur.execute("select returnint()") 176 182 val = cur.fetchone()[0] 177 self. failUnlessEqual(type(val), int)178 self. failUnlessEqual(val, 42)183 self.assertEqual(type(val), int) 184 self.assertEqual(val, 42) 179 185 180 186 def CheckFuncReturnFloat(self): … … 182 188 cur.execute("select returnfloat()") 183 189 val = cur.fetchone()[0] 184 self. failUnlessEqual(type(val), float)190 self.assertEqual(type(val), float) 185 191 if val < 3.139 or val > 3.141: 186 192 self.fail("wrong value") … … 190 196 cur.execute("select returnnull()") 191 197 val = cur.fetchone()[0] 192 self. failUnlessEqual(type(val), type(None))193 self. failUnlessEqual(val, None)198 self.assertEqual(type(val), type(None)) 199 self.assertEqual(val, None) 194 200 195 201 def CheckFuncReturnBlob(self): … … 197 203 cur.execute("select returnblob()") 198 204 val = cur.fetchone()[0] 199 self.failUnlessEqual(type(val), buffer) 200 self.failUnlessEqual(val, buffer("blob")) 205 self.assertEqual(type(val), buffer) 206 self.assertEqual(val, buffer("blob")) 207 208 def CheckFuncReturnLongLong(self): 209 cur = self.con.cursor() 210 cur.execute("select returnlonglong()") 211 val = cur.fetchone()[0] 212 self.assertEqual(val, 1<<31) 201 213 202 214 def CheckFuncException(self): … … 207 219 self.fail("should have raised OperationalError") 208 220 except sqlite.OperationalError, e: 209 self. failUnlessEqual(e.args[0], 'user-defined function raised exception')221 self.assertEqual(e.args[0], 'user-defined function raised exception') 210 222 211 223 def CheckParamString(self): … … 213 225 cur.execute("select isstring(?)", ("foo",)) 214 226 val = cur.fetchone()[0] 215 self. failUnlessEqual(val, 1)227 self.assertEqual(val, 1) 216 228 217 229 def CheckParamInt(self): … … 219 231 cur.execute("select isint(?)", (42,)) 220 232 val = cur.fetchone()[0] 221 self. failUnlessEqual(val, 1)233 self.assertEqual(val, 1) 222 234 223 235 def CheckParamFloat(self): … … 225 237 cur.execute("select isfloat(?)", (3.14,)) 226 238 val = cur.fetchone()[0] 227 self. failUnlessEqual(val, 1)239 self.assertEqual(val, 1) 228 240 229 241 def CheckParamNone(self): … … 231 243 cur.execute("select isnone(?)", (None,)) 232 244 val = cur.fetchone()[0] 233 self. failUnlessEqual(val, 1)245 self.assertEqual(val, 1) 234 246 235 247 def CheckParamBlob(self): … … 237 249 cur.execute("select isblob(?)", (buffer("blob"),)) 238 250 val = cur.fetchone()[0] 239 self.failUnlessEqual(val, 1) 251 self.assertEqual(val, 1) 252 253 def CheckParamLongLong(self): 254 cur = self.con.cursor() 255 cur.execute("select islonglong(?)", (1<<42,)) 256 val = cur.fetchone()[0] 257 self.assertEqual(val, 1) 240 258 241 259 class AggregateTests(unittest.TestCase): … … 281 299 self.fail("should have raised an AttributeError") 282 300 except AttributeError, e: 283 self. failUnlessEqual(e.args[0], "AggrNoStep instance has no attribute 'step'")301 self.assertEqual(e.args[0], "AggrNoStep instance has no attribute 'step'") 284 302 285 303 def CheckAggrNoFinalize(self): … … 290 308 self.fail("should have raised an OperationalError") 291 309 except sqlite.OperationalError, e: 292 self. failUnlessEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")310 self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") 293 311 294 312 def CheckAggrExceptionInInit(self): … … 299 317 self.fail("should have raised an OperationalError") 300 318 except sqlite.OperationalError, e: 301 self. failUnlessEqual(e.args[0], "user-defined aggregate's '__init__' method raised error")319 self.assertEqual(e.args[0], "user-defined aggregate's '__init__' method raised error") 302 320 303 321 def CheckAggrExceptionInStep(self): … … 308 326 self.fail("should have raised an OperationalError") 309 327 except sqlite.OperationalError, e: 310 self. failUnlessEqual(e.args[0], "user-defined aggregate's 'step' method raised error")328 self.assertEqual(e.args[0], "user-defined aggregate's 'step' method raised error") 311 329 312 330 def CheckAggrExceptionInFinalize(self): … … 317 335 self.fail("should have raised an OperationalError") 318 336 except sqlite.OperationalError, e: 319 self. failUnlessEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")337 self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") 320 338 321 339 def CheckAggrCheckParamStr(self): … … 323 341 cur.execute("select checkType('str', ?)", ("foo",)) 324 342 val = cur.fetchone()[0] 325 self. failUnlessEqual(val, 1)343 self.assertEqual(val, 1) 326 344 327 345 def CheckAggrCheckParamInt(self): … … 329 347 cur.execute("select checkType('int', ?)", (42,)) 330 348 val = cur.fetchone()[0] 331 self. failUnlessEqual(val, 1)349 self.assertEqual(val, 1) 332 350 333 351 def CheckAggrCheckParamFloat(self): … … 335 353 cur.execute("select checkType('float', ?)", (3.14,)) 336 354 val = cur.fetchone()[0] 337 self. failUnlessEqual(val, 1)355 self.assertEqual(val, 1) 338 356 339 357 def CheckAggrCheckParamNone(self): … … 341 359 cur.execute("select checkType('None', ?)", (None,)) 342 360 val = cur.fetchone()[0] 343 self. failUnlessEqual(val, 1)361 self.assertEqual(val, 1) 344 362 345 363 def CheckAggrCheckParamBlob(self): … … 347 365 cur.execute("select checkType('blob', ?)", (buffer("blob"),)) 348 366 val = cur.fetchone()[0] 349 self. failUnlessEqual(val, 1)367 self.assertEqual(val, 1) 350 368 351 369 def CheckAggrCheckAggrSum(self): … … 355 373 cur.execute("select mysum(i) from test") 356 374 val = cur.fetchone()[0] 357 self.failUnlessEqual(val, 60) 358 359 def authorizer_cb(action, arg1, arg2, dbname, source): 360 if action != sqlite.SQLITE_SELECT: 361 return sqlite.SQLITE_DENY 362 if arg2 == 'c2' or arg1 == 't2': 363 return sqlite.SQLITE_DENY 364 return sqlite.SQLITE_OK 375 self.assertEqual(val, 60) 365 376 366 377 class AuthorizerTests(unittest.TestCase): 378 @staticmethod 379 def authorizer_cb(action, arg1, arg2, dbname, source): 380 if action != sqlite.SQLITE_SELECT: 381 return sqlite.SQLITE_DENY 382 if arg2 == 'c2' or arg1 == 't2': 383 return sqlite.SQLITE_DENY 384 return sqlite.SQLITE_OK 385 367 386 def setUp(self): 368 387 self.con = sqlite.connect(":memory:") … … 377 396 self.con.execute("select c2 from t2") 378 397 379 self.con.set_authorizer( authorizer_cb)398 self.con.set_authorizer(self.authorizer_cb) 380 399 381 400 def tearDown(self): 382 401 pass 383 402 384 def CheckTableAccess(self):403 def test_table_access(self): 385 404 try: 386 405 self.con.execute("select * from t2") … … 391 410 self.fail("should have raised an exception due to missing privileges") 392 411 393 def CheckColumnAccess(self):412 def test_column_access(self): 394 413 try: 395 414 self.con.execute("select c2 from t1") … … 400 419 self.fail("should have raised an exception due to missing privileges") 401 420 421 class AuthorizerRaiseExceptionTests(AuthorizerTests): 422 @staticmethod 423 def authorizer_cb(action, arg1, arg2, dbname, source): 424 if action != sqlite.SQLITE_SELECT: 425 raise ValueError 426 if arg2 == 'c2' or arg1 == 't2': 427 raise ValueError 428 return sqlite.SQLITE_OK 429 430 class AuthorizerIllegalTypeTests(AuthorizerTests): 431 @staticmethod 432 def authorizer_cb(action, arg1, arg2, dbname, source): 433 if action != sqlite.SQLITE_SELECT: 434 return 0.0 435 if arg2 == 'c2' or arg1 == 't2': 436 return 0.0 437 return sqlite.SQLITE_OK 438 439 class AuthorizerLargeIntegerTests(AuthorizerTests): 440 @staticmethod 441 def authorizer_cb(action, arg1, arg2, dbname, source): 442 if action != sqlite.SQLITE_SELECT: 443 return 2**32 444 if arg2 == 'c2' or arg1 == 't2': 445 return 2**32 446 return sqlite.SQLITE_OK 447 448 402 449 def suite(): 403 450 function_suite = unittest.makeSuite(FunctionTests, "Check") 404 451 aggregate_suite = unittest.makeSuite(AggregateTests, "Check") 405 authorizer_suite = unittest.makeSuite(AuthorizerTests, "Check") 406 return unittest.TestSuite((function_suite, aggregate_suite, authorizer_suite)) 452 authorizer_suite = unittest.makeSuite(AuthorizerTests) 453 return unittest.TestSuite(( 454 function_suite, 455 aggregate_suite, 456 authorizer_suite, 457 unittest.makeSuite(AuthorizerRaiseExceptionTests), 458 unittest.makeSuite(AuthorizerIllegalTypeTests), 459 unittest.makeSuite(AuthorizerLargeIntegerTests), 460 )) 407 461 408 462 def test():
Note:
See TracChangeset
for help on using the changeset viewer.