Changeset 988 for vendor/current/source4/dsdb/tests
- Timestamp:
- Nov 24, 2016, 1:14:11 PM (9 years ago)
- Location:
- vendor/current/source4/dsdb/tests/python
- Files:
-
- 5 added
- 11 edited
Legend:
- Unmodified
- Added
- Removed
-
vendor/current/source4/dsdb/tests/python/acl.py
r740 r988 9 9 sys.path.insert(0, "bin/python") 10 10 import samba 11 samba.ensure_external_module("testtools", "testtools") 12 samba.ensure_external_module("subunit", "subunit/python") 11 12 from samba.tests.subunitrun import SubunitOptions, TestProgram 13 13 14 14 import samba.getopt as options … … 21 21 from ldb import ERR_OPERATIONS_ERROR 22 22 from ldb import Message, MessageElement, Dn 23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD 23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE 24 24 from samba.dcerpc import security, drsuapi, misc 25 25 … … 27 27 from samba import gensec, sd_utils 28 28 from samba.samdb import SamDB 29 from samba.credentials import Credentials 29 from samba.credentials import Credentials, DONT_USE_KERBEROS 30 30 import samba.tests 31 31 from samba.tests import delete_force 32 from subunit.run import SubunitTestRunner 33 import unittest 32 import samba.dsdb 34 33 35 34 parser = optparse.OptionParser("acl.py [options] <host>") … … 41 40 credopts = options.CredentialsOptions(parser) 42 41 parser.add_option_group(credopts) 42 subunitopts = SubunitOptions(parser) 43 parser.add_option_group(subunitopts) 44 43 45 opts, args = parser.parse_args() 44 46 … … 67 69 def setUp(self): 68 70 super(AclTests, self).setUp() 69 self.ldb_admin = ldb70 self.base_dn = ldb.domain_dn()71 self.domain_sid = security.dom_sid( ldb.get_domain_sid())71 self.ldb_admin = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp) 72 self.base_dn = self.ldb_admin.domain_dn() 73 self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid()) 72 74 self.user_pass = "samba123@" 73 75 self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized() 74 self.sd_utils = sd_utils.SDUtils( ldb)76 self.sd_utils = sd_utils.SDUtils(self.ldb_admin) 75 77 #used for anonymous login 76 78 self.creds_tmp = Credentials() … … 94 96 creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() 95 97 | gensec.FEATURE_SEAL) 98 creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop 96 99 ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp) 97 100 return ldb_target … … 127 130 128 131 # add admins to the Domain Admins group 129 self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_owner,132 self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_owner], 130 133 add_members_operation=True) 131 self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_not_owner,134 self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_not_owner], 132 135 add_members_operation=True) 133 136 … … 168 171 self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2) 169 172 self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1", 170 grouptype= 4)173 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 171 174 # Make sure we HAVE created the two objects -- user and group 172 175 # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE … … 187 190 self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2) 188 191 self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1", 189 grouptype= 4)192 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 190 193 except LdbError, (num, _): 191 194 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) … … 211 214 try: 212 215 self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1", 213 grouptype= 4)216 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 214 217 except LdbError, (num, _): 215 218 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) … … 235 238 self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2) 236 239 self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1", 237 grouptype= 4)240 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 238 241 # Make sure we have successfully created the two objects -- user and group 239 242 res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn)) … … 268 271 self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass) 269 272 self.user_sid = self.sd_utils.get_object_sid( self.get_user_dn(self.user_with_wp)) 270 self.ldb_admin.newgroup("test_modify_group2", grouptype= 4)271 self.ldb_admin.newgroup("test_modify_group3", grouptype= 4)273 self.ldb_admin.newgroup("test_modify_group2", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 274 self.ldb_admin.newgroup("test_modify_group3", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 272 275 self.ldb_admin.newuser("test_modify_user2", self.user_pass) 273 276 … … 303 306 # Second test object -- Group 304 307 print "Testing modify on Group object" 305 self.ldb_admin.newgroup("test_modify_group1", grouptype=4) 308 self.ldb_admin.newgroup("test_modify_group1", 309 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 306 310 self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod) 307 311 ldif = """ … … 361 365 # Second test object -- Group 362 366 print "Testing modify on Group object" 363 self.ldb_admin.newgroup("test_modify_group1", grouptype=4) 367 self.ldb_admin.newgroup("test_modify_group1", 368 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 364 369 self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod) 365 370 ldif = """ … … 379 384 replace: url 380 385 url: www.samba.org""" 386 try: 387 self.ldb_user.modify_ldif(ldif) 388 except LdbError, (num, _): 389 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 390 else: 391 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS 392 self.fail() 393 # Modify on attribute you do not have rights for granted while also modifying something you do have rights for 394 ldif = """ 395 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """ 396 changetype: modify 397 replace: url 398 url: www.samba.org 399 replace: displayName 400 displayName: test_changed""" 381 401 try: 382 402 self.ldb_user.modify_ldif(ldif) … … 435 455 # Second test object -- Group 436 456 print "Testing modify on Group object" 437 self.ldb_admin.newgroup("test_modify_group1", grouptype=4) 457 self.ldb_admin.newgroup("test_modify_group1", 458 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 438 459 # Modify on attribute you do not have rights for granted 439 460 ldif = """ … … 601 622 def setUp(self): 602 623 super(AclSearchTests, self).setUp() 624 # Get the old "dSHeuristics" if it was set 625 dsheuristics = self.ldb_admin.get_dsheuristics() 626 # Reset the "dSHeuristics" as they were before 627 self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics) 628 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour 629 self.ldb_admin.set_dsheuristics("000000001") 630 # Get the old "minPwdAge" 631 minPwdAge = self.ldb_admin.get_minPwdAge() 632 # Reset the "minPwdAge" as it was before 633 self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge) 634 # Set it temporarely to "0" 635 self.ldb_admin.set_minPwdAge("0") 636 603 637 self.u1 = "search_u1" 604 638 self.u2 = "search_u2" … … 608 642 self.ldb_admin.newuser(self.u2, self.user_pass) 609 643 self.ldb_admin.newuser(self.u3, self.user_pass) 610 self.ldb_admin.newgroup(self.group1, grouptype= -2147483646)611 self.ldb_admin.add_remove_group_members(self.group1, self.u2,644 self.ldb_admin.newgroup(self.group1, grouptype=samba.dsdb.GTYPE_SECURITY_GLOBAL_GROUP) 645 self.ldb_admin.add_remove_group_members(self.group1, [self.u2], 612 646 add_members_operation=True) 613 647 self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass) … … 669 703 self.assertEquals(len(res), 1) 670 704 #verify some of the attributes 671 #don t care about values705 #don't care about values 672 706 self.assertTrue("ldapServiceName" in res[0]) 673 707 self.assertTrue("namingContexts" in res[0]) … … 695 729 self.fail() 696 730 try: 697 res = anonymous.search( "CN=Configuration," + self.base_dn, expression="(objectClass=*)",731 res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)", 698 732 scope=SCOPE_SUBTREE) 699 733 except LdbError, (num, _): … … 716 750 self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, 717 751 "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)) 718 res = anonymous.search( "CN=Configuration," + self.base_dn, expression="(objectClass=*)",752 res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)", 719 753 scope=SCOPE_SUBTREE) 720 754 self.assertEquals(len(res), 1) … … 1231 1265 self.assertNotEqual(len(res), 0) 1232 1266 1267 def test_rename_u9(self): 1268 """Rename 'User object' cross OU, with explicit deny on sd and dc""" 1269 ou1_dn = "OU=test_rename_ou1," + self.base_dn 1270 ou2_dn = "OU=test_rename_ou2," + self.base_dn 1271 user_dn = "CN=test_rename_user2," + ou1_dn 1272 rename_user_dn = "CN=test_rename_user5," + ou2_dn 1273 # Create OU structure 1274 self.ldb_admin.create_ou(ou1_dn) 1275 self.ldb_admin.create_ou(ou2_dn) 1276 self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1) 1277 mod = "(D;;SD;;;DA)" 1278 self.sd_utils.dacl_add_ace(user_dn, mod) 1279 mod = "(D;;DC;;;DA)" 1280 self.sd_utils.dacl_add_ace(ou1_dn, mod) 1281 # Rename 'User object' having SD and CC to AU 1282 try: 1283 self.ldb_admin.rename(user_dn, rename_user_dn) 1284 except LdbError, (num, _): 1285 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1286 else: 1287 self.fail() 1288 #add an allow ace so we can delete this ou 1289 mod = "(A;;DC;;;DA)" 1290 self.sd_utils.dacl_add_ace(ou1_dn, mod) 1291 1292 1233 1293 #tests on Control Access Rights 1234 1294 class AclCARTests(AclTests): … … 1236 1296 def setUp(self): 1237 1297 super(AclCARTests, self).setUp() 1298 1299 # Get the old "dSHeuristics" if it was set 1300 dsheuristics = self.ldb_admin.get_dsheuristics() 1301 # Reset the "dSHeuristics" as they were before 1302 self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics) 1303 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour 1304 self.ldb_admin.set_dsheuristics("000000001") 1305 # Get the old "minPwdAge" 1306 minPwdAge = self.ldb_admin.get_minPwdAge() 1307 # Reset the "minPwdAge" as it was before 1308 self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge) 1309 # Set it temporarely to "0" 1310 self.ldb_admin.set_minPwdAge("0") 1311 1238 1312 self.user_with_wp = "acl_car_user1" 1239 1313 self.user_with_pc = "acl_car_user2" … … 1517 1591 self.ldb_admin.newuser(self.u2, self.user_pass) 1518 1592 self.ldb_admin.newuser(self.u3, self.user_pass) 1519 self.ldb_admin.add_remove_group_members("Domain Admins", self.u3,1593 self.ldb_admin.add_remove_group_members("Domain Admins", [self.u3], 1520 1594 add_members_operation=True) 1521 1595 self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass) … … 1542 1616 self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod) 1543 1617 #create a group under that, grant RP to u2 1544 self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1", grouptype=4) 1618 self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1", 1619 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP) 1545 1620 mod = "(A;;RP;;;%s)" % str(self.user_sid2) 1546 1621 self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod) … … 1563 1638 self.assertTrue("nTSecurityDescriptor" in res[0].keys()) 1564 1639 1640 class AclUndeleteTests(AclTests): 1641 1642 def setUp(self): 1643 super(AclUndeleteTests, self).setUp() 1644 self.regular_user = "undeleter1" 1645 self.ou1 = "OU=undeleted_ou," 1646 self.testuser1 = "to_be_undeleted1" 1647 self.testuser2 = "to_be_undeleted2" 1648 self.testuser3 = "to_be_undeleted3" 1649 self.testuser4 = "to_be_undeleted4" 1650 self.testuser5 = "to_be_undeleted5" 1651 self.testuser6 = "to_be_undeleted6" 1652 1653 self.new_dn_ou = "CN="+ self.testuser4 + "," + self.ou1 + self.base_dn 1654 1655 # Create regular user 1656 self.testuser1_dn = self.get_user_dn(self.testuser1) 1657 self.testuser2_dn = self.get_user_dn(self.testuser2) 1658 self.testuser3_dn = self.get_user_dn(self.testuser3) 1659 self.testuser4_dn = self.get_user_dn(self.testuser4) 1660 self.testuser5_dn = self.get_user_dn(self.testuser5) 1661 self.deleted_dn1 = self.create_delete_user(self.testuser1) 1662 self.deleted_dn2 = self.create_delete_user(self.testuser2) 1663 self.deleted_dn3 = self.create_delete_user(self.testuser3) 1664 self.deleted_dn4 = self.create_delete_user(self.testuser4) 1665 self.deleted_dn5 = self.create_delete_user(self.testuser5) 1666 1667 self.ldb_admin.create_ou(self.ou1 + self.base_dn) 1668 1669 self.ldb_admin.newuser(self.regular_user, self.user_pass) 1670 self.ldb_admin.add_remove_group_members("Domain Admins", [self.regular_user], 1671 add_members_operation=True) 1672 self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass) 1673 self.sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user)) 1674 1675 def tearDown(self): 1676 super(AclUndeleteTests, self).tearDown() 1677 delete_force(self.ldb_admin, self.get_user_dn(self.regular_user)) 1678 delete_force(self.ldb_admin, self.get_user_dn(self.testuser1)) 1679 delete_force(self.ldb_admin, self.get_user_dn(self.testuser2)) 1680 delete_force(self.ldb_admin, self.get_user_dn(self.testuser3)) 1681 delete_force(self.ldb_admin, self.get_user_dn(self.testuser4)) 1682 delete_force(self.ldb_admin, self.get_user_dn(self.testuser5)) 1683 delete_force(self.ldb_admin, self.new_dn_ou) 1684 delete_force(self.ldb_admin, self.ou1 + self.base_dn) 1685 1686 def GUID_string(self, guid): 1687 return ldb.schema_format_value("objectGUID", guid) 1688 1689 def create_delete_user(self, new_user): 1690 self.ldb_admin.newuser(new_user, self.user_pass) 1691 1692 res = self.ldb_admin.search(expression="(objectClass=*)", 1693 base=self.get_user_dn(new_user), 1694 scope=SCOPE_BASE, 1695 controls=["show_deleted:1"]) 1696 guid = res[0]["objectGUID"][0] 1697 self.ldb_admin.delete(self.get_user_dn(new_user)) 1698 res = self.ldb_admin.search(base="<GUID=%s>" % self.GUID_string(guid), 1699 scope=SCOPE_BASE, controls=["show_deleted:1"]) 1700 self.assertEquals(len(res), 1) 1701 return str(res[0].dn) 1702 1703 def undelete_deleted(self, olddn, newdn): 1704 msg = Message() 1705 msg.dn = Dn(self.ldb_user, olddn) 1706 msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted") 1707 msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName") 1708 res = self.ldb_user.modify(msg, ["show_recycled:1"]) 1709 1710 def undelete_deleted_with_mod(self, olddn, newdn): 1711 msg = Message() 1712 msg.dn = Dn(ldb, olddn) 1713 msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted") 1714 msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName") 1715 msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url") 1716 res = self.ldb_user.modify(msg, ["show_deleted:1"]) 1717 1718 def test_undelete(self): 1719 # it appears the user has to have LC on the old parent to be able to move the object 1720 # otherwise we get no such object. Since only System can modify the SD on deleted object 1721 # we cannot grant this permission via LDAP, and this leaves us with "negative" tests at the moment 1722 1723 # deny write property on rdn, should fail 1724 mod = "(OD;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid) 1725 self.sd_utils.dacl_add_ace(self.deleted_dn1, mod) 1726 try: 1727 self.undelete_deleted(self.deleted_dn1, self.testuser1_dn) 1728 self.fail() 1729 except LdbError, (num, _): 1730 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1731 1732 # seems that permissions on isDeleted and distinguishedName are irrelevant 1733 mod = "(OD;;WP;bf96798f-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid) 1734 self.sd_utils.dacl_add_ace(self.deleted_dn2, mod) 1735 mod = "(OD;;WP;bf9679e4-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid) 1736 self.sd_utils.dacl_add_ace(self.deleted_dn2, mod) 1737 self.undelete_deleted(self.deleted_dn2, self.testuser2_dn) 1738 1739 # attempt undelete with simultanious addition of url, WP to which is denied 1740 mod = "(OD;;WP;9a9a0221-4a5b-11d1-a9c3-0000f80367c1;;%s)" % str(self.sid) 1741 self.sd_utils.dacl_add_ace(self.deleted_dn3, mod) 1742 try: 1743 self.undelete_deleted_with_mod(self.deleted_dn3, self.testuser3_dn) 1744 self.fail() 1745 except LdbError, (num, _): 1746 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1747 1748 # undelete in an ou, in which we have no right to create children 1749 mod = "(D;;CC;;;%s)" % str(self.sid) 1750 self.sd_utils.dacl_add_ace(self.ou1 + self.base_dn, mod) 1751 try: 1752 self.undelete_deleted(self.deleted_dn4, self.new_dn_ou) 1753 self.fail() 1754 except LdbError, (num, _): 1755 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1756 1757 # delete is not required 1758 mod = "(D;;SD;;;%s)" % str(self.sid) 1759 self.sd_utils.dacl_add_ace(self.deleted_dn5, mod) 1760 self.undelete_deleted(self.deleted_dn5, self.testuser5_dn) 1761 1762 # deny Reanimate-Tombstone, should fail 1763 mod = "(OD;;CR;45ec5156-db7e-47bb-b53f-dbeb2d03c40f;;%s)" % str(self.sid) 1764 self.sd_utils.dacl_add_ace(self.base_dn, mod) 1765 try: 1766 self.undelete_deleted(self.deleted_dn4, self.testuser4_dn) 1767 self.fail() 1768 except LdbError, (num, _): 1769 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1565 1770 1566 1771 class AclSPNTests(AclTests): … … 1622 1827 # same as for join_RODC, but do not set any SPNs 1623 1828 def create_rodc(self, ctx): 1829 ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ] 1830 ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ] 1624 1831 ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn) 1625 1832 … … 1651 1858 1652 1859 def create_dc(self, ctx): 1860 ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ] 1861 ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ] 1653 1862 ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION 1654 1863 ctx.secure_channel_type = misc.SEC_CHAN_BDC … … 1678 1887 (ctx.myname, ctx.dnsdomain, ctx.dnsdomain)) 1679 1888 self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" % 1680 (ctx.myname, ctx.dnsdomain, ctx.dns domain))1889 (ctx.myname, ctx.dnsdomain, ctx.dnsforest)) 1681 1890 self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain)) 1682 1891 self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" % … … 1737 1946 (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain)) 1738 1947 self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" % 1739 (self.computername, self.dcctx.dnsdomain, self.dcctx.dns domain))1948 (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest)) 1740 1949 self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain)) 1741 1950 self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" % … … 1801 2010 try: 1802 2011 self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" % 1803 (self.computername, self.dcctx.dnsdomain, self.dcctx.dns domain))2012 (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest)) 1804 2013 except LdbError, (num, _): 1805 2014 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) … … 1825 2034 ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp) 1826 2035 1827 runner = SubunitTestRunner() 1828 rc = 0 1829 if not runner.run(unittest.makeSuite(AclAddTests)).wasSuccessful(): 1830 rc = 1 1831 if not runner.run(unittest.makeSuite(AclModifyTests)).wasSuccessful(): 1832 rc = 1 1833 if not runner.run(unittest.makeSuite(AclDeleteTests)).wasSuccessful(): 1834 rc = 1 1835 if not runner.run(unittest.makeSuite(AclRenameTests)).wasSuccessful(): 1836 rc = 1 1837 1838 # Get the old "dSHeuristics" if it was set 1839 dsheuristics = ldb.get_dsheuristics() 1840 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour 1841 ldb.set_dsheuristics("000000001") 1842 # Get the old "minPwdAge" 1843 minPwdAge = ldb.get_minPwdAge() 1844 # Set it temporarely to "0" 1845 ldb.set_minPwdAge("0") 1846 if not runner.run(unittest.makeSuite(AclCARTests)).wasSuccessful(): 1847 rc = 1 1848 if not runner.run(unittest.makeSuite(AclSearchTests)).wasSuccessful(): 1849 rc = 1 1850 # Reset the "dSHeuristics" as they were before 1851 ldb.set_dsheuristics(dsheuristics) 1852 # Reset the "minPwdAge" as it was before 1853 ldb.set_minPwdAge(minPwdAge) 1854 1855 if not runner.run(unittest.makeSuite(AclExtendedTests)).wasSuccessful(): 1856 rc = 1 1857 if not runner.run(unittest.makeSuite(AclSPNTests)).wasSuccessful(): 1858 rc = 1 1859 sys.exit(rc) 2036 TestProgram(module=__name__, opts=subunitopts) -
vendor/current/source4/dsdb/tests/python/deletetest.py
r740 r988 8 8 sys.path.insert(0, "bin/python") 9 9 import samba 10 samba.ensure_external_module("testtools", "testtools") 11 samba.ensure_external_module("subunit", "subunit/python") 10 11 from samba.tests.subunitrun import SubunitOptions, TestProgram 12 12 13 13 import samba.getopt as options 14 14 15 15 from samba.auth import system_session 16 from ldb import SCOPE_BASE, LdbError 17 from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF 18 from ldb import ERR_UNWILLING_TO_PERFORM 16 from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE 17 from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS 18 from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR 19 19 from samba.samdb import SamDB 20 20 from samba.tests import delete_force 21 22 from subunit.run import SubunitTestRunner23 import unittest24 21 25 22 parser = optparse.OptionParser("deletetest.py [options] <host|file>") … … 30 27 credopts = options.CredentialsOptions(parser) 31 28 parser.add_option_group(credopts) 29 subunitopts = SubunitOptions(parser) 30 parser.add_option_group(subunitopts) 32 31 opts, args = parser.parse_args() 33 32 … … 41 40 creds = credopts.get_credentials(lp) 42 41 43 class BasicDeleteTests(unittest.TestCase): 44 42 class BaseDeleteTests(samba.tests.TestCase): 45 43 46 44 def GUID_string(self, guid): … … 48 46 49 47 def setUp(self): 50 self.ldb = ldb 51 self.base_dn = ldb.domain_dn() 52 self.configuration_dn = ldb.get_config_basedn().get_linearized() 48 super(BaseDeleteTests, self).setUp() 49 self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp) 50 51 self.base_dn = self.ldb.domain_dn() 52 self.configuration_dn = self.ldb.get_config_basedn().get_linearized() 53 53 54 54 def search_guid(self, guid): 55 55 print "SEARCH by GUID %s" % self.GUID_string(guid) 56 56 57 res = ldb.search(base="<GUID=%s>" % self.GUID_string(guid),57 res = self.ldb.search(base="<GUID=%s>" % self.GUID_string(guid), 58 58 scope=SCOPE_BASE, controls=["show_deleted:1"]) 59 59 self.assertEquals(len(res), 1) … … 63 63 print "SEARCH by DN %s" % dn 64 64 65 res = ldb.search(expression="(objectClass=*)",65 res = self.ldb.search(expression="(objectClass=*)", 66 66 base=dn, 67 67 scope=SCOPE_BASE, … … 69 69 self.assertEquals(len(res), 1) 70 70 return res[0] 71 72 73 class BasicDeleteTests(BaseDeleteTests): 74 75 def setUp(self): 76 super(BasicDeleteTests, self).setUp() 71 77 72 78 def del_attr_values(self, delObj): … … 119 125 delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn) 120 126 121 ldb.add({127 self.ldb.add({ 122 128 "dn": "cn=ldaptestcontainer," + self.base_dn, 123 129 "objectclass": "container"}) 124 ldb.add({130 self.ldb.add({ 125 131 "dn": "cn=entry1,cn=ldaptestcontainer," + self.base_dn, 126 132 "objectclass": "container"}) 127 ldb.add({133 self.ldb.add({ 128 134 "dn": "cn=entry2,cn=ldaptestcontainer," + self.base_dn, 129 135 "objectclass": "container"}) 130 136 131 137 try: 132 ldb.delete("cn=ldaptestcontainer," + self.base_dn)133 self.fail() 134 except LdbError, (num, _): 135 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) 136 137 ldb.delete("cn=ldaptestcontainer," + self.base_dn, ["tree_delete:1"])138 139 try: 140 res = ldb.search("cn=ldaptestcontainer," + self.base_dn,138 self.ldb.delete("cn=ldaptestcontainer," + self.base_dn) 139 self.fail() 140 except LdbError, (num, _): 141 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) 142 143 self.ldb.delete("cn=ldaptestcontainer," + self.base_dn, ["tree_delete:1"]) 144 145 try: 146 res = self.ldb.search("cn=ldaptestcontainer," + self.base_dn, 141 147 scope=SCOPE_BASE, attrs=[]) 142 148 self.fail() … … 144 150 self.assertEquals(num, ERR_NO_SUCH_OBJECT) 145 151 try: 146 res = ldb.search("cn=entry1,cn=ldaptestcontainer," + self.base_dn,152 res = self.ldb.search("cn=entry1,cn=ldaptestcontainer," + self.base_dn, 147 153 scope=SCOPE_BASE, attrs=[]) 148 154 self.fail() … … 150 156 self.assertEquals(num, ERR_NO_SUCH_OBJECT) 151 157 try: 152 res = ldb.search("cn=entry2,cn=ldaptestcontainer," + self.base_dn,158 res = self.ldb.search("cn=entry2,cn=ldaptestcontainer," + self.base_dn, 153 159 scope=SCOPE_BASE, attrs=[]) 154 160 self.fail() … … 162 168 # Performs some protected object delete testing 163 169 164 res = ldb.search(base="", expression="", scope=SCOPE_BASE,170 res = self.ldb.search(base="", expression="", scope=SCOPE_BASE, 165 171 attrs=["dsServiceName", "dNSHostName"]) 166 172 self.assertEquals(len(res), 1) … … 168 174 # Delete failing since DC's nTDSDSA object is protected 169 175 try: 170 ldb.delete(res[0]["dsServiceName"][0])171 self.fail() 172 except LdbError, (num, _): 173 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 174 175 res = ldb.search(self.base_dn, attrs=["rIDSetReferences"],176 self.ldb.delete(res[0]["dsServiceName"][0]) 177 self.fail() 178 except LdbError, (num, _): 179 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 180 181 res = self.ldb.search(self.base_dn, attrs=["rIDSetReferences"], 176 182 expression="(&(objectClass=computer)(dNSHostName=" + res[0]["dNSHostName"][0] + "))") 177 183 self.assertEquals(len(res), 1) … … 179 185 # Deletes failing since DC's rIDSet object is protected 180 186 try: 181 ldb.delete(res[0]["rIDSetReferences"][0])182 self.fail() 183 except LdbError, (num, _): 184 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 185 try: 186 ldb.delete(res[0]["rIDSetReferences"][0], ["tree_delete:1"])187 self.ldb.delete(res[0]["rIDSetReferences"][0]) 188 self.fail() 189 except LdbError, (num, _): 190 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 191 try: 192 self.ldb.delete(res[0]["rIDSetReferences"][0], ["tree_delete:1"]) 187 193 self.fail() 188 194 except LdbError, (num, _): … … 192 198 193 199 try: 194 ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn)195 self.fail() 196 except LdbError, (num, _): 197 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 198 try: 199 ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])200 self.fail() 201 except LdbError, (num, _): 202 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 203 204 try: 205 ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn)206 self.fail() 207 except LdbError, (num, _): 208 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) 209 try: 210 ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])211 self.fail() 212 except LdbError, (num, _): 213 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) 214 215 res = ldb.search("cn=Partitions," + self.configuration_dn, attrs=[],200 self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn) 201 self.fail() 202 except LdbError, (num, _): 203 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 204 try: 205 self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn, ["tree_delete:1"]) 206 self.fail() 207 except LdbError, (num, _): 208 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 209 210 try: 211 self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn) 212 self.fail() 213 except LdbError, (num, _): 214 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) 215 try: 216 self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn, ["tree_delete:1"]) 217 self.fail() 218 except LdbError, (num, _): 219 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) 220 221 res = self.ldb.search("cn=Partitions," + self.configuration_dn, attrs=[], 216 222 expression="(nCName=%s)" % self.base_dn) 217 223 self.assertEquals(len(res), 1) 218 224 219 225 try: 220 ldb.delete(res[0].dn)221 self.fail() 222 except LdbError, (num, _): 223 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) 224 try: 225 ldb.delete(res[0].dn, ["tree_delete:1"])226 self.ldb.delete(res[0].dn) 227 self.fail() 228 except LdbError, (num, _): 229 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) 230 try: 231 self.ldb.delete(res[0].dn, ["tree_delete:1"]) 226 232 self.fail() 227 233 except LdbError, (num, _): … … 230 236 # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE" 231 237 try: 232 ldb.delete("CN=Users," + self.base_dn)238 self.ldb.delete("CN=Users," + self.base_dn) 233 239 self.fail() 234 240 except LdbError, (num, _): … … 237 243 # Tree-delete failing since "isCriticalSystemObject" 238 244 try: 239 ldb.delete("CN=Computers," + self.base_dn, ["tree_delete:1"])245 self.ldb.delete("CN=Computers," + self.base_dn, ["tree_delete:1"]) 240 246 self.fail() 241 247 except LdbError, (num, _): … … 247 253 print self.base_dn 248 254 249 usr1="cn=testuser,cn=users," + self.base_dn 250 usr2="cn=testuser2,cn=users," + self.base_dn 251 grp1="cn=testdelgroup1,cn=users," + self.base_dn 252 sit1="cn=testsite1,cn=sites," + self.configuration_dn 253 ss1="cn=NTDS Site Settings,cn=testsite1,cn=sites," + self.configuration_dn 254 srv1="cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn 255 srv2="cn=TESTSRV,cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn 255 # user current time in ms to make unique objects 256 import time 257 marker = str(int(round(time.time()*1000))) 258 usr1_name = "u_" + marker 259 usr2_name = "u2_" + marker 260 grp_name = "g1_" + marker 261 site_name = "s1_" + marker 262 263 usr1 = "cn=%s,cn=users,%s" % (usr1_name, self.base_dn) 264 usr2 = "cn=%s,cn=users,%s" % (usr2_name, self.base_dn) 265 grp1 = "cn=%s,cn=users,%s" % (grp_name, self.base_dn) 266 sit1 = "cn=%s,cn=sites,%s" % (site_name, self.configuration_dn) 267 ss1 = "cn=NTDS Site Settings,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn) 268 srv1 = "cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn) 269 srv2 = "cn=TESTSRV,cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn) 256 270 257 271 delete_force(self.ldb, usr1) … … 263 277 delete_force(self.ldb, sit1) 264 278 265 ldb.add({279 self.ldb.add({ 266 280 "dn": usr1, 267 281 "objectclass": "user", 268 282 "description": "test user description", 269 "samaccountname": "testuser"})270 271 ldb.add({283 "samaccountname": usr1_name}) 284 285 self.ldb.add({ 272 286 "dn": usr2, 273 287 "objectclass": "user", 274 288 "description": "test user 2 description", 275 "samaccountname": "testuser2"})276 277 ldb.add({289 "samaccountname": usr2_name}) 290 291 self.ldb.add({ 278 292 "dn": grp1, 279 293 "objectclass": "group", 280 294 "description": "test group", 281 "samaccountname": "testdelgroup1",295 "samaccountname": grp_name, 282 296 "member": [ usr1, usr2 ], 283 297 "isDeleted": "FALSE" }) 284 298 285 ldb.add({299 self.ldb.add({ 286 300 "dn": sit1, 287 301 "objectclass": "site" }) 288 302 289 ldb.add({303 self.ldb.add({ 290 304 "dn": ss1, 291 305 "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"] }) 292 306 293 ldb.add({307 self.ldb.add({ 294 308 "dn": srv1, 295 309 "objectclass": "serversContainer" }) 296 310 297 ldb.add({311 self.ldb.add({ 298 312 "dn": srv2, 299 313 "objectClass": "server" }) … … 320 334 guid7=objLive7["objectGUID"][0] 321 335 322 ldb.delete(usr1)323 ldb.delete(usr2)324 ldb.delete(grp1)325 ldb.delete(srv1, ["tree_delete:1"])326 ldb.delete(sit1, ["tree_delete:1"])336 self.ldb.delete(usr1) 337 self.ldb.delete(usr2) 338 self.ldb.delete(grp1) 339 self.ldb.delete(srv1, ["tree_delete:1"]) 340 self.ldb.delete(sit1, ["tree_delete:1"]) 327 341 328 342 objDeleted1 = self.search_guid(guid1) … … 358 372 self.check_rdn(objLive7, objDeleted7, "cn") 359 373 360 self.delete_deleted( ldb, usr1)361 self.delete_deleted( ldb, usr2)362 self.delete_deleted( ldb, grp1)363 self.delete_deleted( ldb, sit1)364 self.delete_deleted( ldb, ss1)365 self.delete_deleted( ldb, srv1)366 self.delete_deleted( ldb, srv2)374 self.delete_deleted(self.ldb, usr1) 375 self.delete_deleted(self.ldb, usr2) 376 self.delete_deleted(self.ldb, grp1) 377 self.delete_deleted(self.ldb, sit1) 378 self.delete_deleted(self.ldb, ss1) 379 self.delete_deleted(self.ldb, srv1) 380 self.delete_deleted(self.ldb, srv2) 367 381 368 382 self.assertTrue("CN=Deleted Objects" in str(objDeleted1.dn)) … … 374 388 self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn)) 375 389 390 376 391 if not "://" in host: 377 392 if os.path.isfile(host): … … 380 395 host = "ldap://%s" % host 381 396 382 ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp) 383 384 runner = SubunitTestRunner() 385 rc = 0 386 if not runner.run(unittest.makeSuite(BasicDeleteTests)).wasSuccessful(): 387 rc = 1 388 389 sys.exit(rc) 397 TestProgram(module=__name__, opts=subunitopts) -
vendor/current/source4/dsdb/tests/python/dsdb_schema_info.py
r740 r988 31 31 32 32 sys.path.insert(0, "bin/python") 33 import samba 34 samba.ensure_external_module("testtools", "testtools") 33 import samba.tests 35 34 36 35 from ldb import SCOPE_BASE, LdbError -
vendor/current/source4/dsdb/tests/python/ldap.py
r740 r988 2 2 # -*- coding: utf-8 -*- 3 3 # This is a port of the original in testprogs/ejs/ldap.js 4 5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008-2011 6 # 7 # This program is free software; you can redistribute it and/or modify 8 # it under the terms of the GNU General Public License as published by 9 # the Free Software Foundation; either version 3 of the License, or 10 # (at your option) any later version. 11 # 12 # This program is distributed in the hope that it will be useful, 13 # but WITHOUT ANY WARRANTY; without even the implied warranty of 14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 # GNU General Public License for more details. 16 # 17 # You should have received a copy of the GNU General Public License 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 4 19 5 20 import optparse … … 11 26 sys.path.insert(0, "bin/python") 12 27 import samba 13 samba.ensure_external_module("testtools", "testtools") 14 samba.ensure_external_module("subunit", "subunit/python") 15 28 from samba.tests.subunitrun import SubunitOptions, TestProgram 16 29 import samba.getopt as options 17 30 … … 36 49 SYSTEM_FLAG_CONFIG_ALLOW_LIMITED_MOVE) 37 50 38 from subunit.run import SubunitTestRunner39 import unittest40 41 51 from samba.ndr import ndr_pack, ndr_unpack 42 52 from samba.dcerpc import security, lsa … … 50 60 credopts = options.CredentialsOptions(parser) 51 61 parser.add_option_group(credopts) 62 subunitopts = SubunitOptions(parser) 63 parser.add_option_group(subunitopts) 52 64 opts, args = parser.parse_args() 53 65 … … 61 73 creds = credopts.get_credentials(lp) 62 74 63 class BasicTests( unittest.TestCase):75 class BasicTests(samba.tests.TestCase): 64 76 65 77 def setUp(self): … … 71 83 self.schema_dn = ldb.get_schema_basedn().get_linearized() 72 84 self.domain_sid = security.dom_sid(ldb.get_domain_sid()) 73 74 print "baseDN: %s\n" % self.base_dn75 85 76 86 delete_force(self.ldb, "cn=posixuser,cn=users," + self.base_dn) … … 97 107 delete_force(self.ldb, "ou=testou,cn=users," + self.base_dn) 98 108 delete_force(self.ldb, "cn=Test Secret,cn=system," + self.base_dn) 109 delete_force(self.ldb, "cn=testtimevaluesuser1,cn=users," + self.base_dn) 99 110 100 111 def test_objectclasses(self): 101 112 """Test objectClass behaviour""" 102 print "Test objectClass behaviour"""103 104 # We cannot create LSA-specific objects (oc "secret" or "trustedDomain")105 try:106 self.ldb.add({107 "dn": "cn=Test Secret,cn=system," + self.base_dn,108 "objectClass": "secret" })109 self.fail()110 except LdbError, (num, _):111 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)112 113 113 # Invalid objectclass specified 114 114 try: … … 149 149 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 150 150 151 # We cannot instanciate from an abstract objectclass 151 # We cannot instanciate from an abstract object class ("connectionPoint" 152 # or "leaf"). In the first case we use "connectionPoint" (subclass of 153 # "leaf") to prevent a naming violation - this returns us a 154 # "ERR_UNWILLING_TO_PERFORM" since it is not structural. In the second 155 # case however we get "ERR_OBJECT_CLASS_VIOLATION" since an abstract 156 # class is also not allowed to be auxiliary. 152 157 try: 153 158 self.ldb.add({ … … 157 162 except LdbError, (num, _): 158 163 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 164 try: 165 self.ldb.add({ 166 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 167 "objectClass": ["person", "leaf"] }) 168 self.fail() 169 except LdbError, (num, _): 170 self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) 171 172 # Objects instanciated using "satisfied" abstract classes (concrete 173 # subclasses) are allowed 174 self.ldb.add({ 175 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 176 "objectClass": ["top", "leaf", "connectionPoint", "serviceConnectionPoint"] }) 177 178 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 179 180 # Two disjoint top-most structural object classes aren't allowed 181 try: 182 self.ldb.add({ 183 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 184 "objectClass": ["person", "container"] }) 185 self.fail() 186 except LdbError, (num, _): 187 self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) 159 188 160 189 # Test allowed system flags … … 222 251 self.assertEquals(num, ERR_NO_SUCH_ATTRIBUTE) 223 252 224 # The top-most structural class cannot be changed by adding another 225 # structural one 253 # We cannot add a the new top-most structural class "user" here since 254 # we are missing at least one new mandatory attribute (in this case 255 # "sAMAccountName") 226 256 m = Message() 227 257 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) … … 252 282 ldb.modify(m) 253 283 254 # It's only possible to replace with the same objectclass combination. 255 # So the replace action on "objectClass" attributes is really useless. 284 # This does not work since object class "leaf" is not auxiliary nor it 285 # stands in direct relation to "person" (and it is abstract too!) 286 m = Message() 287 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 288 m["objectClass"] = MessageElement("leaf", FLAG_MOD_ADD, 289 "objectClass") 290 try: 291 ldb.modify(m) 292 self.fail() 293 except LdbError, (num, _): 294 self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) 295 296 # Objectclass replace operations can be performed as well 256 297 m = Message() 257 298 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) … … 266 307 ldb.modify(m) 267 308 309 # This does not work since object class "leaf" is not auxiliary nor it 310 # stands in direct relation to "person" (and it is abstract too!) 268 311 m = Message() 269 312 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 270 313 m["objectClass"] = MessageElement(["top", "person", "bootableDevice", 271 " connectionPoint"], FLAG_MOD_REPLACE, "objectClass")314 "leaf"], FLAG_MOD_REPLACE, "objectClass") 272 315 try: 273 316 ldb.modify(m) … … 356 399 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 357 400 401 self.ldb.add({ 402 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 403 "objectClass": "user" }) 404 405 # Add a new top-most structural class "container". This does not work 406 # since it stands in no direct relation to the current one. 407 m = Message() 408 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 409 m["objectClass"] = MessageElement("container", FLAG_MOD_ADD, 410 "objectClass") 411 try: 412 ldb.modify(m) 413 self.fail() 414 except LdbError, (num, _): 415 self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) 416 417 # Add a new top-most structural class "inetOrgPerson" and remove it 418 # afterwards 419 m = Message() 420 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 421 m["objectClass"] = MessageElement("inetOrgPerson", FLAG_MOD_ADD, 422 "objectClass") 423 ldb.modify(m) 424 425 m = Message() 426 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 427 m["objectClass"] = MessageElement("inetOrgPerson", FLAG_MOD_DELETE, 428 "objectClass") 429 ldb.modify(m) 430 431 # Replace top-most structural class to "inetOrgPerson" and reset it 432 # back to "user" 433 m = Message() 434 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 435 m["objectClass"] = MessageElement("inetOrgPerson", FLAG_MOD_REPLACE, 436 "objectClass") 437 ldb.modify(m) 438 439 m = Message() 440 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 441 m["objectClass"] = MessageElement("user", FLAG_MOD_REPLACE, 442 "objectClass") 443 ldb.modify(m) 444 445 # Add a new auxiliary object class "posixAccount" to "ldaptestuser" 446 m = Message() 447 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 448 m["objectClass"] = MessageElement("posixAccount", FLAG_MOD_ADD, 449 "objectClass") 450 ldb.modify(m) 451 452 # Be sure that "top" is the first and the (most) structural object class 453 # the last value of the "objectClass" attribute - MS-ADTS 3.1.1.1.4 454 res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, 455 scope=SCOPE_BASE, attrs=["objectClass"]) 456 self.assertTrue(len(res) == 1) 457 self.assertEquals(res[0]["objectClass"][0], "top") 458 self.assertEquals(res[0]["objectClass"][len(res[0]["objectClass"])-1], "user") 459 460 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 461 358 462 def test_system_only(self): 359 463 """Test systemOnly objects""" 360 print "Test systemOnly objects"""361 362 464 try: 363 465 self.ldb.add({ … … 445 547 def test_invalid_parent(self): 446 548 """Test adding an object with invalid parent""" 447 print "Test adding an object with invalid parent"""448 449 549 try: 450 550 self.ldb.add({ … … 471 571 def test_invalid_attribute(self): 472 572 """Test invalid attributes on schema/objectclasses""" 473 print "Test invalid attributes on schema/objectclasses"""474 475 573 # attributes not in schema test 476 574 … … 573 671 def test_single_valued_attributes(self): 574 672 """Test single-valued attributes""" 575 print "Test single-valued attributes"""576 577 673 try: 578 674 self.ldb.add({ … … 618 714 def test_attribute_ranges(self): 619 715 """Test attribute ranges""" 620 print "Test attribute ranges"""621 622 716 # Too short (min. 1) 623 717 try: … … 673 767 def test_empty_messages(self): 674 768 """Test empty messages""" 675 print "Test empty messages"""676 677 769 m = Message() 678 770 m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) … … 694 786 def test_empty_attributes(self): 695 787 """Test empty attributes""" 696 print "Test empty attributes"""697 698 788 m = Message() 699 789 m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) … … 739 829 def test_instanceType(self): 740 830 """Tests the 'instanceType' attribute""" 741 print "Tests the 'instanceType' attribute"""742 743 831 # The instance type is single-valued 744 832 try: … … 806 894 delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) 807 895 896 #only write is allowed with NC_HEAD for originating updates 897 try: 898 self.ldb.add({ 899 "dn": "cn=ldaptestuser2,cn=users," + self.base_dn, 900 "objectclass": "user", 901 "instanceType": "3" }) 902 self.fail() 903 except LdbError, (num, _): 904 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 905 delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) 906 808 907 def test_distinguished_name(self): 809 908 """Tests the 'distinguishedName' attribute""" 810 print "Tests the 'distinguishedName' attribute"""811 812 909 # The "dn" shortcut isn't supported 813 910 m = Message() … … 858 955 self.fail() 859 956 except LdbError, (num, _): 860 self.assertEquals(num, ERR_ CONSTRAINT_VIOLATION)957 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 861 958 862 959 m = Message() … … 882 979 self.fail() 883 980 except LdbError, (num, _): 884 self.assertEquals(num, ERR_ CONSTRAINT_VIOLATION)981 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 885 982 886 983 delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) … … 888 985 def test_rdn_name(self): 889 986 """Tests the RDN""" 890 print "Tests the RDN"""891 892 987 # Search 893 988 … … 1059 1154 def DISABLED_test_largeRDN(self): 1060 1155 """Testing large rDN (limit 64 characters)""" 1061 rdn = "CN=a012345678901234567890123456789012345678901234567890123456789012" ;1156 rdn = "CN=a012345678901234567890123456789012345678901234567890123456789012" 1062 1157 delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn)) 1063 1158 ldif = """ … … 1068 1163 delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn)) 1069 1164 1070 rdn = "CN=a0123456789012345678901234567890123456789012345678901234567890120" ;1165 rdn = "CN=a0123456789012345678901234567890123456789012345678901234567890120" 1071 1166 delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn)) 1072 1167 try: … … 1083 1178 def test_rename(self): 1084 1179 """Tests the rename operation""" 1085 print "Tests the rename operations"""1086 1087 1180 try: 1088 1181 # cannot rename to be a child of itself … … 1194 1287 def test_rename_twice(self): 1195 1288 """Tests the rename operation twice - this corresponds to a past bug""" 1196 print "Tests the rename twice operation"""1197 1198 1289 self.ldb.add({ 1199 1290 "dn": "cn=ldaptestuser5,cn=users," + self.base_dn, … … 1207 1298 ldb.rename("cn=ldaptestuser5,cn=Users," + self.base_dn, "cn=ldaptestUSER5,cn=users," + self.base_dn) 1208 1299 res = ldb.search(expression="cn=ldaptestuser5") 1209 print "Found %u records" % len(res)1210 1300 self.assertEquals(len(res), 1, "Wrong number of hits for cn=ldaptestuser5") 1211 1301 res = ldb.search(expression="(&(cn=ldaptestuser5)(objectclass=user))") 1212 print "Found %u records" % len(res)1213 1302 self.assertEquals(len(res), 1, "Wrong number of hits for (&(cn=ldaptestuser5)(objectclass=user))") 1214 1303 delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn) … … 1216 1305 def test_objectGUID(self): 1217 1306 """Test objectGUID behaviour""" 1218 print "Testing objectGUID behaviour\n"1219 1220 1307 # The objectGUID cannot directly be set 1221 1308 try: … … 1232 1319 "dn": "cn=ldaptestcontainer," + self.base_dn, 1233 1320 "objectClass": "container" }) 1234 1235 res = ldb.search("cn=ldaptestcontainer," + self.base_dn,1236 scope=SCOPE_BASE,1237 attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged"])1238 self.assertTrue(len(res) == 1)1239 self.assertTrue("objectGUID" in res[0])1240 self.assertTrue("uSNCreated" in res[0])1241 self.assertTrue("uSNChanged" in res[0])1242 self.assertTrue("whenCreated" in res[0])1243 self.assertTrue("whenChanged" in res[0])1244 1245 delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)1246 1247 # All the following attributes are specificable on add operations1248 self.ldb.add({1249 "dn": "cn=ldaptestcontainer," + self.base_dn,1250 "objectClass": "container",1251 "uSNCreated" : "1",1252 "uSNChanged" : "1",1253 "whenCreated": timestring(long(time.time())),1254 "whenChanged": timestring(long(time.time())) })1255 1256 res = ldb.search("cn=ldaptestcontainer," + self.base_dn,1257 scope=SCOPE_BASE,1258 attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged"])1259 self.assertTrue(len(res) == 1)1260 self.assertTrue("objectGUID" in res[0])1261 self.assertTrue("uSNCreated" in res[0])1262 self.assertFalse(res[0]["uSNCreated"][0] == "1") # these are corrected1263 self.assertTrue("uSNChanged" in res[0])1264 self.assertFalse(res[0]["uSNChanged"][0] == "1") # these are corrected1265 1266 delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)1267 1268 # All this attributes are specificable on add operations1269 self.ldb.add({1270 "dn": "cn=ldaptestcontainer," + self.base_dn,1271 "objectclass": "container",1272 "uSNCreated" : "1",1273 "uSNChanged" : "1",1274 "whenCreated": timestring(long(time.time())),1275 "whenChanged": timestring(long(time.time())) })1276 1277 res = ldb.search("cn=ldaptestcontainer," + self.base_dn,1278 scope=SCOPE_BASE,1279 attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged"])1280 self.assertTrue(len(res) == 1)1281 self.assertTrue("objectGUID" in res[0])1282 self.assertTrue("uSNCreated" in res[0])1283 self.assertFalse(res[0]["uSNCreated"][0] == "1") # these are corrected1284 self.assertTrue("uSNChanged" in res[0])1285 self.assertFalse(res[0]["uSNChanged"][0] == "1") # these are corrected1286 self.assertTrue("whenCreated" in res[0])1287 self.assertTrue("whenChanged" in res[0])1288 1321 1289 1322 # The objectGUID cannot directly be changed … … 1303 1336 def test_parentGUID(self): 1304 1337 """Test parentGUID behaviour""" 1305 print "Testing parentGUID behaviour\n"1306 1307 1338 self.ldb.add({ 1308 1339 "dn": "cn=parentguidtest,cn=users," + self.base_dn, 1309 1340 "objectclass":"user", 1310 "samaccountname":"parentguidtest"}) ;1341 "samaccountname":"parentguidtest"}) 1311 1342 res1 = ldb.search(base="cn=parentguidtest,cn=users," + self.base_dn, scope=SCOPE_BASE, 1312 attrs=["parentGUID", "samaccountname"]) ;1343 attrs=["parentGUID", "samaccountname"]) 1313 1344 res2 = ldb.search(base="cn=users," + self.base_dn,scope=SCOPE_BASE, 1314 attrs=["objectGUID"]) ;1345 attrs=["objectGUID"]) 1315 1346 res3 = ldb.search(base=self.base_dn, scope=SCOPE_BASE, 1316 attrs=["parentGUID"]) ;1347 attrs=["parentGUID"]) 1317 1348 res4 = ldb.search(base=self.configuration_dn, scope=SCOPE_BASE, 1318 attrs=["parentGUID"]) ;1349 attrs=["parentGUID"]) 1319 1350 res5 = ldb.search(base=self.schema_dn, scope=SCOPE_BASE, 1320 attrs=["parentGUID"]) ;1351 attrs=["parentGUID"]) 1321 1352 1322 1353 """Check if the parentGUID is valid """ 1323 self.assertEquals(res1[0]["parentGUID"], res2[0]["objectGUID"]) ;1354 self.assertEquals(res1[0]["parentGUID"], res2[0]["objectGUID"]) 1324 1355 1325 1356 """Check if it returns nothing when there is no parent object - default NC""" … … 1329 1360 has_parentGUID = True 1330 1361 break 1331 self.assertFalse(has_parentGUID) ;1362 self.assertFalse(has_parentGUID) 1332 1363 1333 1364 """Check if it returns nothing when there is no parent object - configuration NC""" … … 1337 1368 has_parentGUID = True 1338 1369 break 1339 self.assertFalse(has_parentGUID) ;1370 self.assertFalse(has_parentGUID) 1340 1371 1341 1372 """Check if it returns nothing when there is no parent object - schema NC""" … … 1345 1376 has_parentGUID = True 1346 1377 break 1347 self.assertFalse(has_parentGUID) ;1378 self.assertFalse(has_parentGUID) 1348 1379 1349 1380 """Ensures that if you look for another object attribute after the constructed … … 1356 1387 self.assertTrue(has_another_attribute) 1357 1388 self.assertTrue(len(res1[0]["samaccountname"]) == 1) 1358 self.assertEquals(res1[0]["samaccountname"][0], "parentguidtest") ;1359 1360 print "Testing parentGUID behaviour on rename\n"1389 self.assertEquals(res1[0]["samaccountname"][0], "parentguidtest") 1390 1391 # Testing parentGUID behaviour on rename\ 1361 1392 1362 1393 self.ldb.add({ 1363 1394 "dn": "cn=testotherusers," + self.base_dn, 1364 "objectclass":"container"}) ;1395 "objectclass":"container"}) 1365 1396 res1 = ldb.search(base="cn=testotherusers," + self.base_dn,scope=SCOPE_BASE, 1366 attrs=["objectGUID"]) ;1397 attrs=["objectGUID"]) 1367 1398 ldb.rename("cn=parentguidtest,cn=users," + self.base_dn, 1368 "cn=parentguidtest,cn=testotherusers," + self.base_dn) ;1399 "cn=parentguidtest,cn=testotherusers," + self.base_dn) 1369 1400 res2 = ldb.search(base="cn=parentguidtest,cn=testotherusers," + self.base_dn, 1370 1401 scope=SCOPE_BASE, 1371 attrs=["parentGUID"]) ;1372 self.assertEquals(res1[0]["objectGUID"], res2[0]["parentGUID"]) ;1402 attrs=["parentGUID"]) 1403 self.assertEquals(res1[0]["objectGUID"], res2[0]["parentGUID"]) 1373 1404 1374 1405 delete_force(self.ldb, "cn=parentguidtest,cn=testotherusers," + self.base_dn) 1375 1406 delete_force(self.ldb, "cn=testotherusers," + self.base_dn) 1376 1407 1408 def test_usnChanged(self): 1409 """Test usnChanged behaviour""" 1410 1411 self.ldb.add({ 1412 "dn": "cn=ldaptestcontainer," + self.base_dn, 1413 "objectClass": "container" }) 1414 1415 res = ldb.search("cn=ldaptestcontainer," + self.base_dn, 1416 scope=SCOPE_BASE, 1417 attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged", "description"]) 1418 self.assertTrue(len(res) == 1) 1419 self.assertFalse("description" in res[0]) 1420 self.assertTrue("objectGUID" in res[0]) 1421 self.assertTrue("uSNCreated" in res[0]) 1422 self.assertTrue("uSNChanged" in res[0]) 1423 self.assertTrue("whenCreated" in res[0]) 1424 self.assertTrue("whenChanged" in res[0]) 1425 1426 delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn) 1427 1428 # All this attributes are specificable on add operations 1429 self.ldb.add({ 1430 "dn": "cn=ldaptestcontainer," + self.base_dn, 1431 "objectclass": "container", 1432 "uSNCreated" : "1", 1433 "uSNChanged" : "1", 1434 "whenCreated": timestring(long(time.time())), 1435 "whenChanged": timestring(long(time.time())) }) 1436 1437 res = ldb.search("cn=ldaptestcontainer," + self.base_dn, 1438 scope=SCOPE_BASE, 1439 attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged", "description"]) 1440 self.assertTrue(len(res) == 1) 1441 self.assertFalse("description" in res[0]) 1442 self.assertTrue("objectGUID" in res[0]) 1443 self.assertTrue("uSNCreated" in res[0]) 1444 self.assertFalse(res[0]["uSNCreated"][0] == "1") # these are corrected 1445 self.assertTrue("uSNChanged" in res[0]) 1446 self.assertFalse(res[0]["uSNChanged"][0] == "1") # these are corrected 1447 self.assertTrue("whenCreated" in res[0]) 1448 self.assertTrue("whenChanged" in res[0]) 1449 1450 ldb.modify_ldif(""" 1451 dn: cn=ldaptestcontainer,""" + self.base_dn + """ 1452 changetype: modify 1453 replace: description 1454 """) 1455 1456 res2 = ldb.search("cn=ldaptestcontainer," + self.base_dn, 1457 scope=SCOPE_BASE, 1458 attrs=["uSNCreated", "uSNChanged", "description"]) 1459 self.assertTrue(len(res) == 1) 1460 self.assertFalse("description" in res2[0]) 1461 self.assertEqual(res[0]["usnCreated"], res2[0]["usnCreated"]) 1462 self.assertEqual(res[0]["usnCreated"], res2[0]["usnChanged"]) 1463 self.assertEqual(res[0]["usnChanged"], res2[0]["usnChanged"]) 1464 1465 ldb.modify_ldif(""" 1466 dn: cn=ldaptestcontainer,""" + self.base_dn + """ 1467 changetype: modify 1468 replace: description 1469 description: test 1470 """) 1471 1472 res3 = ldb.search("cn=ldaptestcontainer," + self.base_dn, 1473 scope=SCOPE_BASE, 1474 attrs=["uSNCreated", "uSNChanged", "description"]) 1475 self.assertTrue(len(res) == 1) 1476 self.assertTrue("description" in res3[0]) 1477 self.assertEqual("test", str(res3[0]["description"][0])) 1478 self.assertEqual(res[0]["usnCreated"], res3[0]["usnCreated"]) 1479 self.assertNotEqual(res[0]["usnCreated"], res3[0]["usnChanged"]) 1480 self.assertNotEqual(res[0]["usnChanged"], res3[0]["usnChanged"]) 1481 1482 ldb.modify_ldif(""" 1483 dn: cn=ldaptestcontainer,""" + self.base_dn + """ 1484 changetype: modify 1485 replace: description 1486 description: test 1487 """) 1488 1489 res4 = ldb.search("cn=ldaptestcontainer," + self.base_dn, 1490 scope=SCOPE_BASE, 1491 attrs=["uSNCreated", "uSNChanged", "description"]) 1492 self.assertTrue(len(res) == 1) 1493 self.assertTrue("description" in res4[0]) 1494 self.assertEqual("test", str(res4[0]["description"][0])) 1495 self.assertEqual(res[0]["usnCreated"], res4[0]["usnCreated"]) 1496 self.assertNotEqual(res3[0]["usnCreated"], res4[0]["usnChanged"]) 1497 self.assertEqual(res3[0]["usnChanged"], res4[0]["usnChanged"]) 1498 1499 ldb.modify_ldif(""" 1500 dn: cn=ldaptestcontainer,""" + self.base_dn + """ 1501 changetype: modify 1502 replace: description 1503 description: test2 1504 """) 1505 1506 res5 = ldb.search("cn=ldaptestcontainer," + self.base_dn, 1507 scope=SCOPE_BASE, 1508 attrs=["uSNCreated", "uSNChanged", "description"]) 1509 self.assertTrue(len(res) == 1) 1510 self.assertTrue("description" in res5[0]) 1511 self.assertEqual("test2", str(res5[0]["description"][0])) 1512 self.assertEqual(res[0]["usnCreated"], res5[0]["usnCreated"]) 1513 self.assertNotEqual(res3[0]["usnChanged"], res5[0]["usnChanged"]) 1514 1515 ldb.modify_ldif(""" 1516 dn: cn=ldaptestcontainer,""" + self.base_dn + """ 1517 changetype: modify 1518 delete: description 1519 description: test2 1520 """) 1521 1522 res6 = ldb.search("cn=ldaptestcontainer," + self.base_dn, 1523 scope=SCOPE_BASE, 1524 attrs=["uSNCreated", "uSNChanged", "description"]) 1525 self.assertTrue(len(res) == 1) 1526 self.assertFalse("description" in res6[0]) 1527 self.assertEqual(res[0]["usnCreated"], res6[0]["usnCreated"]) 1528 self.assertNotEqual(res5[0]["usnChanged"], res6[0]["usnChanged"]) 1529 1530 ldb.modify_ldif(""" 1531 dn: cn=ldaptestcontainer,""" + self.base_dn + """ 1532 changetype: modify 1533 add: description 1534 description: test3 1535 """) 1536 1537 res7 = ldb.search("cn=ldaptestcontainer," + self.base_dn, 1538 scope=SCOPE_BASE, 1539 attrs=["uSNCreated", "uSNChanged", "description"]) 1540 self.assertTrue(len(res) == 1) 1541 self.assertTrue("description" in res7[0]) 1542 self.assertEqual("test3", str(res7[0]["description"][0])) 1543 self.assertEqual(res[0]["usnCreated"], res7[0]["usnCreated"]) 1544 self.assertNotEqual(res6[0]["usnChanged"], res7[0]["usnChanged"]) 1545 1546 ldb.modify_ldif(""" 1547 dn: cn=ldaptestcontainer,""" + self.base_dn + """ 1548 changetype: modify 1549 delete: description 1550 """) 1551 1552 res8 = ldb.search("cn=ldaptestcontainer," + self.base_dn, 1553 scope=SCOPE_BASE, 1554 attrs=["uSNCreated", "uSNChanged", "description"]) 1555 self.assertTrue(len(res) == 1) 1556 self.assertFalse("description" in res8[0]) 1557 self.assertEqual(res[0]["usnCreated"], res8[0]["usnCreated"]) 1558 self.assertNotEqual(res7[0]["usnChanged"], res8[0]["usnChanged"]) 1559 1560 delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn) 1561 1377 1562 def test_groupType_int32(self): 1378 1563 """Test groupType (int32) behaviour (should appear to be casted to a 32 bit signed integer before comparsion)""" 1379 print "Testing groupType (int32) behaviour\n"1380 1564 1381 1565 res1 = ldb.search(base=self.base_dn, scope=SCOPE_SUBTREE, 1382 attrs=["groupType"], expression="groupType=2147483653") ;1566 attrs=["groupType"], expression="groupType=2147483653") 1383 1567 1384 1568 res2 = ldb.search(base=self.base_dn, scope=SCOPE_SUBTREE, 1385 attrs=["groupType"], expression="groupType=-2147483643") ;1569 attrs=["groupType"], expression="groupType=-2147483643") 1386 1570 1387 1571 self.assertEquals(len(res1), len(res2)) … … 1393 1577 def test_linked_attributes(self): 1394 1578 """This tests the linked attribute behaviour""" 1395 print "Testing linked attribute behaviour\n"1396 1579 1397 1580 ldb.add({ … … 1482 1665 def test_wkguid(self): 1483 1666 """Test Well known GUID behaviours (including DN+Binary)""" 1484 print "Test Well known GUID behaviours (including DN+Binary)"""1485 1667 1486 1668 res = self.ldb.search(base=("<WKGUID=ab1d30f3768811d1aded00c04fd8d5cd,%s>" % self.base_dn), scope=SCOPE_BASE, attrs=[]) … … 1499 1681 def test_subschemasubentry(self): 1500 1682 """Test subSchemaSubEntry appears when requested, but not when not requested""" 1501 print "Test subSchemaSubEntry"""1502 1683 1503 1684 res = self.ldb.search(base=self.base_dn, scope=SCOPE_BASE, attrs=["subSchemaSubEntry"]) … … 1512 1693 """Basic tests""" 1513 1694 1514 print "Testing user add"1695 # Testing user add 1515 1696 1516 1697 ldb.add({ … … 1561 1742 }) 1562 1743 1563 print "Testing ldb.search for (&(cn=ldaptestcomputer3)(objectClass=user))";1564 res = ldb.search(self.base_dn, expression="(&(cn=ldaptestcomputer3)(objectClass=user))") ;1744 # Testing ldb.search for (&(cn=ldaptestcomputer3)(objectClass=user)) 1745 res = ldb.search(self.base_dn, expression="(&(cn=ldaptestcomputer3)(objectClass=user))") 1565 1746 self.assertEquals(len(res), 1, "Found only %d for (&(cn=ldaptestcomputer3)(objectClass=user))" % len(res)) 1566 1747 1567 self.assertEquals(str(res[0].dn), ("CN=ldaptestcomputer3,CN=Computers," + self.base_dn)) ;1568 self.assertEquals(res[0]["cn"][0], "ldaptestcomputer3") ;1569 self.assertEquals(res[0]["name"][0], "ldaptestcomputer3") ;1570 self.assertEquals(res[0]["objectClass"][0], "top") ;1571 self.assertEquals(res[0]["objectClass"][1], "person") ;1572 self.assertEquals(res[0]["objectClass"][2], "organizationalPerson") ;1573 self.assertEquals(res[0]["objectClass"][3], "user") ;1574 self.assertEquals(res[0]["objectClass"][4], "computer") ;1748 self.assertEquals(str(res[0].dn), ("CN=ldaptestcomputer3,CN=Computers," + self.base_dn)) 1749 self.assertEquals(res[0]["cn"][0], "ldaptestcomputer3") 1750 self.assertEquals(res[0]["name"][0], "ldaptestcomputer3") 1751 self.assertEquals(res[0]["objectClass"][0], "top") 1752 self.assertEquals(res[0]["objectClass"][1], "person") 1753 self.assertEquals(res[0]["objectClass"][2], "organizationalPerson") 1754 self.assertEquals(res[0]["objectClass"][3], "user") 1755 self.assertEquals(res[0]["objectClass"][4], "computer") 1575 1756 self.assertTrue("objectGUID" in res[0]) 1576 1757 self.assertTrue("whenCreated" in res[0]) 1577 self.assertEquals(res[0]["objectCategory"][0], ("CN=Computer, CN=Schema,CN=Configuration," + self.base_dn));1578 self.assertEquals(int(res[0]["primaryGroupID"][0]), 513) ;1579 self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) ;1580 self.assertEquals(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE) ;1758 self.assertEquals(res[0]["objectCategory"][0], ("CN=Computer,%s" % ldb.get_schema_basedn())) 1759 self.assertEquals(int(res[0]["primaryGroupID"][0]), 513) 1760 self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) 1761 self.assertEquals(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE) 1581 1762 1582 1763 delete_force(self.ldb, "cn=ldaptestcomputer3,cn=computers," + self.base_dn) 1583 1764 1584 print "Testing attribute or value exists behaviour"1765 # Testing attribute or value exists behaviour 1585 1766 try: 1586 1767 ldb.modify_ldif(""" … … 1614 1795 self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) 1615 1796 1616 print "Testing ranged results"1797 # Testing ranged results 1617 1798 ldb.modify_ldif(""" 1618 1799 dn: cn=ldaptest2computer,cn=computers,""" + self.base_dn + """ … … 1660 1841 attrs=["servicePrincipalName;range=0-*"]) 1661 1842 self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)") 1662 #print len(res[0]["servicePrincipalName;range=0-*"])1663 1843 self.assertEquals(len(res[0]["servicePrincipalName;range=0-*"]), 30) 1664 1844 1665 1845 res = ldb.search(self.base_dn, expression="(cn=ldaptest2computer))", scope=SCOPE_SUBTREE, attrs=["servicePrincipalName;range=0-19"]) 1666 1846 self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)") 1667 # print res[0]["servicePrincipalName;range=0-19"].length1668 1847 self.assertEquals(len(res[0]["servicePrincipalName;range=0-19"]), 20) 1669 1848 … … 1690 1869 self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)") 1691 1870 self.assertEquals(len(res[0]["servicePrincipalName;range=11-*"]), 19) 1692 # print res[0]["servicePrincipalName;range=11-*"][18]1693 # print pos_111694 1871 # self.assertEquals((res[0]["servicePrincipalName;range=11-*"][18]), pos_11) 1695 1872 … … 1701 1878 res = ldb.search(self.base_dn, expression="(cn=ldaptest2computer))", scope=SCOPE_SUBTREE, attrs=["servicePrincipalName"]) 1702 1879 self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)") 1703 # print res[0]["servicePrincipalName"][18]1704 # print pos_111705 1880 self.assertEquals(len(res[0]["servicePrincipalName"]), 30) 1706 1881 # self.assertEquals(res[0]["servicePrincipalName"][18], pos_11) … … 1714 1889 "sn": "ldap user2"}) 1715 1890 1716 print "Testing Ambigious Name Resolution"1891 # Testing Ambigious Name Resolution 1717 1892 # Testing ldb.search for (&(anr=ldap testy)(objectClass=user)) 1718 1893 res = ldb.search(expression="(&(anr=ldap testy)(objectClass=user))") … … 1796 1971 # self.assertEquals(len(res), 0, "Found (&(anr==\"testy ldap\")(objectClass=user))") 1797 1972 1798 print "Testing Renames"1973 # Testing Renames 1799 1974 1800 1975 attrs = ["objectGUID", "objectSid"] 1801 print "Testing ldb.search for (&(cn=ldaptestUSer2)(objectClass=user))"1976 # Testing ldb.search for (&(cn=ldaptestUSer2)(objectClass=user)) 1802 1977 res_user = ldb.search(self.base_dn, expression="(&(cn=ldaptestUSer2)(objectClass=user))", scope=SCOPE_SUBTREE, attrs=attrs) 1803 1978 self.assertEquals(len(res_user), 1, "Could not find (&(cn=ldaptestUSer2)(objectClass=user))") … … 1806 1981 ldb.rename("<SID=" + ldb.schema_format_value("objectSID", res_user[0]["objectSID"][0]) + ">" , "cn=ldaptestUSER3,cn=users," + self.base_dn) 1807 1982 1808 print "Testing ldb.search for (&(cn=ldaptestuser3)(objectClass=user))"1983 # Testing ldb.search for (&(cn=ldaptestuser3)(objectClass=user)) 1809 1984 res = ldb.search(expression="(&(cn=ldaptestuser3)(objectClass=user))") 1810 1985 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser3)(objectClass=user))") … … 1834 2009 self.assertEquals(len(res), 0, "(&(&(cn=ldaptestuser3)(userAccountControl=547))(objectClass=user))") 1835 2010 1836 # This is a Samba special, and does not exist in real AD 1837 # print "Testing ldb.search for (dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")" 1838 # res = ldb.search("(dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")") 1839 # if (res.error != 0 || len(res) != 1) { 1840 # print "Could not find (dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")" 1841 # self.assertEquals(len(res), 1) 1842 # } 1843 # self.assertEquals(res[0].dn, ("CN=ldaptestUSER3,CN=Users," + self.base_dn)) 1844 # self.assertEquals(res[0].cn, "ldaptestUSER3") 1845 # self.assertEquals(res[0].name, "ldaptestUSER3") 1846 1847 print "Testing ldb.search for (distinguishedName=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")" 2011 # Testing ldb.search for (dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ") - should not work 2012 res = ldb.search(expression="(dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")") 2013 self.assertEquals(len(res), 0, "Could find (dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")") 2014 2015 # Testing ldb.search for (distinguishedName=CN=ldaptestUSER3,CN=Users," + self.base_dn + ") 1848 2016 res = ldb.search(expression="(distinguishedName=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")") 1849 self.assertEquals(len(res), 1, "Could not find (d n=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")")2017 self.assertEquals(len(res), 1, "Could not find (distinguishedName=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")") 1850 2018 self.assertEquals(str(res[0].dn), ("CN=ldaptestUSER3,CN=Users," + self.base_dn)) 1851 2019 self.assertEquals(str(res[0]["cn"]), "ldaptestUSER3") … … 1884 2052 self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS) 1885 2053 try: 1886 ldb.rename("cn=ldaptestuser3,cn=users, " + self.base_dn, "cn=ldaptestuser3,cn=configuration," + self.base_dn)2054 ldb.rename("cn=ldaptestuser3,cn=users,%s" % self.base_dn, "cn=ldaptestuser3,%s" % ldb.get_config_basedn()) 1887 2055 self.fail() 1888 2056 except LdbError, (num, _): … … 1897 2065 ldb.rename("cn=ldaptestgroup,cn=users," + self.base_dn, "cn=ldaptestgroup2,cn=users," + self.base_dn) 1898 2066 1899 print "Testing subtree renames"2067 # Testing subtree renames 1900 2068 1901 2069 ldb.add({"dn": "cn=ldaptestcontainer," + self.base_dn, … … 1924 2092 """) 1925 2093 1926 print "Testing ldb.rename of cn=ldaptestcontainer," + self.base_dn + " to cn=ldaptestcontainer2," + self.base_dn2094 # Testing ldb.rename of cn=ldaptestcontainer," + self.base_dn + " to cn=ldaptestcontainer2," + self.base_dn 1927 2095 ldb.rename("CN=ldaptestcontainer," + self.base_dn, "CN=ldaptestcontainer2," + self.base_dn) 1928 2096 1929 print "Testing ldb.search for (&(cn=ldaptestuser4)(objectClass=user))"2097 # Testing ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) 1930 2098 res = ldb.search(expression="(&(cn=ldaptestuser4)(objectClass=user))") 1931 2099 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser4)(objectClass=user))") 1932 2100 1933 print "Testing subtree ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in (just renamed from) cn=ldaptestcontainer," + self.base_dn2101 # Testing subtree ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in (just renamed from) cn=ldaptestcontainer," + self.base_dn 1934 2102 try: 1935 2103 res = ldb.search("cn=ldaptestcontainer," + self.base_dn, … … 1940 2108 self.assertEquals(num, ERR_NO_SUCH_OBJECT) 1941 2109 1942 print "Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in (just renamed from) cn=ldaptestcontainer," + self.base_dn2110 # Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in (just renamed from) cn=ldaptestcontainer," + self.base_dn 1943 2111 try: 1944 2112 res = ldb.search("cn=ldaptestcontainer," + self.base_dn, … … 1948 2116 self.assertEquals(num, ERR_NO_SUCH_OBJECT) 1949 2117 1950 print "Testing ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in renamed container"2118 # Testing ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in renamed container" 1951 2119 res = ldb.search("cn=ldaptestcontainer2," + self.base_dn, expression="(&(cn=ldaptestuser4)(objectClass=user))", scope=SCOPE_SUBTREE) 1952 2120 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser4)(objectClass=user)) under cn=ldaptestcontainer2," + self.base_dn) … … 1957 2125 time.sleep(4) 1958 2126 1959 print "Testing ldb.search for (&(member=CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn + ")(objectclass=group)) to check subtree renames and linked attributes"2127 # Testing ldb.search for (&(member=CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn + ")(objectclass=group)) to check subtree renames and linked attributes" 1960 2128 res = ldb.search(self.base_dn, expression="(&(member=CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn + ")(objectclass=group))", scope=SCOPE_SUBTREE) 1961 2129 self.assertEquals(len(res), 1, "Could not find (&(member=CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn + ")(objectclass=group)), perhaps linked attributes are not consistant with subtree renames?") 1962 2130 1963 print "Testing ldb.rename (into itself) of cn=ldaptestcontainer2," + self.base_dn + " to cn=ldaptestcontainer,cn=ldaptestcontainer2," + self.base_dn2131 # Testing ldb.rename (into itself) of cn=ldaptestcontainer2," + self.base_dn + " to cn=ldaptestcontainer,cn=ldaptestcontainer2," + self.base_dn 1964 2132 try: 1965 2133 ldb.rename("cn=ldaptestcontainer2," + self.base_dn, "cn=ldaptestcontainer,cn=ldaptestcontainer2," + self.base_dn) … … 1968 2136 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1969 2137 1970 print "Testing ldb.rename (into non-existent container) of cn=ldaptestcontainer2," + self.base_dn + " to cn=ldaptestcontainer,cn=ldaptestcontainer3," + self.base_dn2138 # Testing ldb.rename (into non-existent container) of cn=ldaptestcontainer2," + self.base_dn + " to cn=ldaptestcontainer,cn=ldaptestcontainer3," + self.base_dn 1971 2139 try: 1972 2140 ldb.rename("cn=ldaptestcontainer2," + self.base_dn, "cn=ldaptestcontainer,cn=ldaptestcontainer3," + self.base_dn) … … 1975 2143 self.assertTrue(num in (ERR_UNWILLING_TO_PERFORM, ERR_OTHER)) 1976 2144 1977 print "Testing delete (should fail, not a leaf node) of renamed cn=ldaptestcontainer2," + self.base_dn2145 # Testing delete (should fail, not a leaf node) of renamed cn=ldaptestcontainer2," + self.base_dn 1978 2146 try: 1979 2147 ldb.delete("cn=ldaptestcontainer2," + self.base_dn) … … 1982 2150 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) 1983 2151 1984 print "Testing base ldb.search for CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn2152 # Testing base ldb.search for CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn 1985 2153 res = ldb.search(expression="(objectclass=*)", base=("CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn), scope=SCOPE_BASE) 1986 2154 self.assertEquals(len(res), 1) … … 1988 2156 self.assertEquals(len(res), 0) 1989 2157 1990 print "Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in cn=ldaptestcontainer2," + self.base_dn2158 # Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in cn=ldaptestcontainer2," + self.base_dn 1991 2159 res = ldb.search(expression="(&(cn=ldaptestuser4)(objectClass=user))", base=("cn=ldaptestcontainer2," + self.base_dn), scope=SCOPE_ONELEVEL) 1992 # FIXME: self.assertEquals(len(res), 0)1993 1994 print "Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in cn=ldaptestcontainer2," + self.base_dn2160 self.assertEquals(len(res), 1) 2161 2162 # Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in cn=ldaptestcontainer2," + self.base_dn 1995 2163 res = ldb.search(expression="(&(cn=ldaptestuser4)(objectClass=user))", base=("cn=ldaptestcontainer2," + self.base_dn), scope=SCOPE_SUBTREE) 1996 # FIXME: self.assertEquals(len(res), 0)1997 1998 print "Testing delete of subtree renamed "+("CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn)2164 self.assertEquals(len(res), 1) 2165 2166 # Testing delete of subtree renamed "+("CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn) 1999 2167 ldb.delete(("CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn)) 2000 print "Testing delete of renamed cn=ldaptestcontainer2," + self.base_dn2168 # Testing delete of renamed cn=ldaptestcontainer2," + self.base_dn 2001 2169 ldb.delete("cn=ldaptestcontainer2," + self.base_dn) 2002 2170 … … 2005 2173 ldb.add({"dn": "cn=ldaptestutf8user2 Úùéìòà ,cn=users," + self.base_dn, "objectClass": "user"}) 2006 2174 2007 print "Testing ldb.search for (&(cn=ldaptestuser)(objectClass=user))"2175 # Testing ldb.search for (&(cn=ldaptestuser)(objectClass=user))" 2008 2176 res = ldb.search(expression="(&(cn=ldaptestuser)(objectClass=user))") 2009 2177 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser)(objectClass=user))") … … 2015 2183 self.assertTrue("objectGUID" in res[0]) 2016 2184 self.assertTrue("whenCreated" in res[0]) 2017 self.assertEquals(str(res[0]["objectCategory"]), ("CN=Person, CN=Schema,CN=Configuration," + self.base_dn))2185 self.assertEquals(str(res[0]["objectCategory"]), ("CN=Person,%s" % ldb.get_schema_basedn())) 2018 2186 self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) 2019 2187 self.assertEquals(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE) … … 2021 2189 self.assertEquals(len(res[0]["memberOf"]), 1) 2022 2190 2023 print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=cn=person,cn=schema,cn=configuration," + self.base_dn + "))"2024 res2 = ldb.search(expression="(&(cn=ldaptestuser)(objectCategory=cn=person, cn=schema,cn=configuration," + self.base_dn + "))")2025 self.assertEquals(len(res2), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=cn=person, cn=schema,cn=configuration," + self.base_dn + "))")2191 # Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=cn=person,%s))" % ldb.get_schema_basedn() 2192 res2 = ldb.search(expression="(&(cn=ldaptestuser)(objectCategory=cn=person,%s))" % ldb.get_schema_basedn()) 2193 self.assertEquals(len(res2), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=cn=person,%s))" % ldb.get_schema_basedn()) 2026 2194 2027 2195 self.assertEquals(res[0].dn, res2[0].dn) 2028 2196 2029 print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon))"2197 # Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon))" 2030 2198 res3 = ldb.search(expression="(&(cn=ldaptestuser)(objectCategory=PerSon))") 2031 2199 self.assertEquals(len(res3), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=PerSon)): matched %d" % len(res3)) … … 2034 2202 2035 2203 if gc_ldb is not None: 2036 print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in Global Catalog"2204 # Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in Global Catalog" 2037 2205 res3gc = gc_ldb.search(expression="(&(cn=ldaptestuser)(objectCategory=PerSon))") 2038 2206 self.assertEquals(len(res3gc), 1) … … 2040 2208 self.assertEquals(res[0].dn, res3gc[0].dn) 2041 2209 2042 print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in with 'phantom root' control"2210 # Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in with 'phantom root' control" 2043 2211 2044 2212 if gc_ldb is not None: … … 2050 2218 ldb.delete(res[0].dn) 2051 2219 2052 print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectClass=user))"2220 # Testing ldb.search for (&(cn=ldaptestcomputer)(objectClass=user))" 2053 2221 res = ldb.search(expression="(&(cn=ldaptestcomputer)(objectClass=user))") 2054 2222 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser)(objectClass=user))") … … 2060 2228 self.assertTrue("objectGUID" in res[0]) 2061 2229 self.assertTrue("whenCreated" in res[0]) 2062 self.assertEquals(str(res[0]["objectCategory"]), ("CN=Computer, CN=Schema,CN=Configuration," + self.base_dn))2230 self.assertEquals(str(res[0]["objectCategory"]), ("CN=Computer,%s" % ldb.get_schema_basedn())) 2063 2231 self.assertEquals(int(res[0]["primaryGroupID"][0]), 513) 2064 2232 self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) … … 2067 2235 self.assertEquals(len(res[0]["memberOf"]), 1) 2068 2236 2069 print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=cn=computer,cn=schema,cn=configuration," + self.base_dn + "))"2070 res2 = ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=cn=computer, cn=schema,cn=configuration," + self.base_dn + "))")2071 self.assertEquals(len(res2), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=cn=computer, cn=schema,cn=configuration," + self.base_dn + "))")2237 # Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s))" % ldb.get_schema_basedn() 2238 res2 = ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s))" % ldb.get_schema_basedn()) 2239 self.assertEquals(len(res2), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s))" % ldb.get_schema_basedn()) 2072 2240 2073 2241 self.assertEquals(res[0].dn, res2[0].dn) 2074 2242 2075 2243 if gc_ldb is not None: 2076 print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=cn=computer,cn=schema,cn=configuration," + self.base_dn + ")) in Global Catlog"2077 res2gc = gc_ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=cn=computer, cn=schema,cn=configuration," + self.base_dn + "))")2078 self.assertEquals(len(res2gc), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=cn=computer, cn=schema,cn=configuration," + self.base_dn + ")) in Global Catlog")2244 # Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s)) in Global Catalog" % gc_ldb.get_schema_basedn() 2245 res2gc = gc_ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s))" % gc_ldb.get_schema_basedn()) 2246 self.assertEquals(len(res2gc), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s)) In Global Catalog" % gc_ldb.get_schema_basedn()) 2079 2247 2080 2248 self.assertEquals(res[0].dn, res2gc[0].dn) 2081 2249 2082 print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=compuTER))"2250 # Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=compuTER))" 2083 2251 res3 = ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=compuTER))") 2084 2252 self.assertEquals(len(res3), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=compuTER))") … … 2087 2255 2088 2256 if gc_ldb is not None: 2089 print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=compuTER)) in Global Catalog"2257 # Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=compuTER)) in Global Catalog" 2090 2258 res3gc = gc_ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=compuTER))") 2091 2259 self.assertEquals(len(res3gc), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=compuTER)) in Global Catalog") … … 2093 2261 self.assertEquals(res[0].dn, res3gc[0].dn) 2094 2262 2095 print "Testing ldb.search for (&(cn=ldaptestcomp*r)(objectCategory=compuTER))"2263 # Testing ldb.search for (&(cn=ldaptestcomp*r)(objectCategory=compuTER))" 2096 2264 res4 = ldb.search(expression="(&(cn=ldaptestcomp*r)(objectCategory=compuTER))") 2097 2265 self.assertEquals(len(res4), 1, "Could not find (&(cn=ldaptestcomp*r)(objectCategory=compuTER))") … … 2099 2267 self.assertEquals(res[0].dn, res4[0].dn) 2100 2268 2101 print "Testing ldb.search for (&(cn=ldaptestcomput*)(objectCategory=compuTER))"2269 # Testing ldb.search for (&(cn=ldaptestcomput*)(objectCategory=compuTER))" 2102 2270 res5 = ldb.search(expression="(&(cn=ldaptestcomput*)(objectCategory=compuTER))") 2103 2271 self.assertEquals(len(res5), 1, "Could not find (&(cn=ldaptestcomput*)(objectCategory=compuTER))") … … 2105 2273 self.assertEquals(res[0].dn, res5[0].dn) 2106 2274 2107 print "Testing ldb.search for (&(cn=*daptestcomputer)(objectCategory=compuTER))"2275 # Testing ldb.search for (&(cn=*daptestcomputer)(objectCategory=compuTER))" 2108 2276 res6 = ldb.search(expression="(&(cn=*daptestcomputer)(objectCategory=compuTER))") 2109 2277 self.assertEquals(len(res6), 1, "Could not find (&(cn=*daptestcomputer)(objectCategory=compuTER))") … … 2113 2281 ldb.delete("<GUID=" + ldb.schema_format_value("objectGUID", res[0]["objectGUID"][0]) + ">") 2114 2282 2115 print "Testing ldb.search for (&(cn=ldaptest2computer)(objectClass=user))"2283 # Testing ldb.search for (&(cn=ldaptest2computer)(objectClass=user))" 2116 2284 res = ldb.search(expression="(&(cn=ldaptest2computer)(objectClass=user))") 2117 2285 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptest2computer)(objectClass=user))") … … 2123 2291 self.assertTrue("objectGUID" in res[0]) 2124 2292 self.assertTrue("whenCreated" in res[0]) 2125 self.assertEquals(res[0]["objectCategory"][0], "CN=Computer, CN=Schema,CN=Configuration," + self.base_dn)2293 self.assertEquals(res[0]["objectCategory"][0], "CN=Computer,%s" % ldb.get_schema_basedn()) 2126 2294 self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_WORKSTATION_TRUST) 2127 2295 self.assertEquals(int(res[0]["userAccountControl"][0]), UF_WORKSTATION_TRUST_ACCOUNT) … … 2130 2298 2131 2299 attrs = ["cn", "name", "objectClass", "objectGUID", "objectSID", "whenCreated", "nTSecurityDescriptor", "memberOf", "allowedAttributes", "allowedAttributesEffective"] 2132 print "Testing ldb.search for (&(cn=ldaptestUSer2)(objectClass=user))"2300 # Testing ldb.search for (&(cn=ldaptestUSer2)(objectClass=user))" 2133 2301 res_user = ldb.search(self.base_dn, expression="(&(cn=ldaptestUSer2)(objectClass=user))", scope=SCOPE_SUBTREE, attrs=attrs) 2134 2302 self.assertEquals(len(res_user), 1, "Could not find (&(cn=ldaptestUSer2)(objectClass=user))") … … 2150 2318 2151 2319 attrs = ["cn", "name", "objectClass", "objectGUID", "objectSID", "whenCreated", "nTSecurityDescriptor", "member", "allowedAttributes", "allowedAttributesEffective"] 2152 print "Testing ldb.search for (&(cn=ldaptestgroup2)(objectClass=group))"2320 # Testing ldb.search for (&(cn=ldaptestgroup2)(objectClass=group))" 2153 2321 res = ldb.search(self.base_dn, expression="(&(cn=ldaptestgroup2)(objectClass=group))", scope=SCOPE_SUBTREE, attrs=attrs) 2154 2322 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestgroup2)(objectClass=group))") … … 2180 2348 self.assertTrue(("<GUID=" + ldb.schema_format_value("objectGUID", ldaptestuser2_guid) + ">;<SID=" + ldb.schema_format_value("objectSid", ldaptestuser2_sid) + ">;CN=ldaptestuser2,CN=Users," + self.base_dn).upper() in memberUP) 2181 2349 2182 print "Quicktest for linked attributes"2350 # Quicktest for linked attributes" 2183 2351 ldb.modify_ldif(""" 2184 2352 dn: cn=ldaptestgroup2,cn=users,""" + self.base_dn + """ … … 2243 2411 2244 2412 attrs = ["cn", "name", "objectClass", "objectGUID", "whenCreated", "nTSecurityDescriptor", "member"] 2245 print "Testing ldb.search for (&(cn=ldaptestgroup2)(objectClass=group)) to check linked delete"2413 # Testing ldb.search for (&(cn=ldaptestgroup2)(objectClass=group)) to check linked delete" 2246 2414 res = ldb.search(self.base_dn, expression="(&(cn=ldaptestgroup2)(objectClass=group))", scope=SCOPE_SUBTREE, attrs=attrs) 2247 2415 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestgroup2)(objectClass=group)) to check linked delete") … … 2250 2418 self.assertTrue("member" not in res[0]) 2251 2419 2252 print "Testing ldb.search for (&(cn=ldaptestutf8user ÃÃÃÃÃÃ)(objectClass=user))"2253 # TODO UTF8 users don't seem to work fully anymore 2254 # res = ldb.search(expression="(&(cn=ldaptestutf8user ÃÃÃÃÃÃ)(objectClass=user))")2420 # Testing ldb.search for (&(cn=ldaptestutf8user ÃÃÃÃÃÃ)(objectClass=user))" 2421 res = ldb.search(expression="(&(cn=ldaptestutf8user ÃÃÃÃÃÃ)(objectClass=user))") 2422 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user ÃÃÃÃÃÃ)(objectClass=user))") 2255 2423 res = ldb.search(expression="(&(cn=ldaptestutf8user Úùéìòà )(objectclass=user))") 2256 2424 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user ÃÃÃÃÃÃ)(objectClass=user))") … … 2263 2431 self.assertTrue("whenCreated" in res[0]) 2264 2432 2433 # delete "ldaptestutf8user" 2265 2434 ldb.delete(res[0].dn) 2266 2435 2267 print "Testing ldb.search for (&(cn=ldaptestutf8user2*)(objectClass=user))"2436 # Testing ldb.search for (&(cn=ldaptestutf8user2*)(objectClass=user))" 2268 2437 res = ldb.search(expression="(&(cn=ldaptestutf8user2*)(objectClass=user))") 2269 2438 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user2*)(objectClass=user))") 2270 2439 2440 # Testing ldb.search for (&(cn=ldaptestutf8user2 ÃÃÃÃÃÃ)(objectClass=user))" 2441 res = ldb.search(expression="(&(cn=ldaptestutf8user2 ÃÃÃÃÃÃ)(objectClass=user))") 2442 self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user2 ÃÃÃÃÃÃ)(objectClass=user))") 2443 2444 # delete "ldaptestutf8user2 " 2271 2445 ldb.delete(res[0].dn) 2272 2446 2273 2447 ldb.delete(("CN=ldaptestgroup2,CN=Users," + self.base_dn)) 2274 2448 2275 print "Testing ldb.search for (&(cn=ldaptestutf8user2 ÃÃÃÃÃÃ)(objectClass=user))" 2276 # TODO UTF8 users don't seem to work fully anymore 2277 # res = ldb.search(expression="(&(cn=ldaptestutf8user ÃÃÃÃÃÃ)(objectClass=user))") 2278 # self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user ÃÃÃÃÃÃ)(objectClass=user))") 2279 2280 print "Testing that we can't get at the configuration DN from the main search base" 2449 # Testing that we can't get at the configuration DN from the main search base" 2281 2450 res = ldb.search(self.base_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"]) 2282 2451 self.assertEquals(len(res), 0) 2283 2452 2284 print "Testing that we can get at the configuration DN from the main search base on the LDAP port with the 'phantom root' search_options control"2453 # Testing that we can get at the configuration DN from the main search base on the LDAP port with the 'phantom root' search_options control" 2285 2454 res = ldb.search(self.base_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["search_options:1:2"]) 2286 2455 self.assertTrue(len(res) > 0) 2287 2456 2288 2457 if gc_ldb is not None: 2289 print "Testing that we can get at the configuration DN from the main search base on the GC port with the search_options control == 0"2458 # Testing that we can get at the configuration DN from the main search base on the GC port with the search_options control == 0" 2290 2459 2291 2460 res = gc_ldb.search(self.base_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["search_options:1:0"]) 2292 2461 self.assertTrue(len(res) > 0) 2293 2462 2294 print "Testing that we do find configuration elements in the global catlog"2463 # Testing that we do find configuration elements in the global catlog" 2295 2464 res = gc_ldb.search(self.base_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"]) 2296 2465 self.assertTrue(len(res) > 0) 2297 2466 2298 print "Testing that we do find configuration elements and user elements at the same time"2467 # Testing that we do find configuration elements and user elements at the same time" 2299 2468 res = gc_ldb.search(self.base_dn, expression="(|(objectClass=crossRef)(objectClass=person))", scope=SCOPE_SUBTREE, attrs=["cn"]) 2300 2469 self.assertTrue(len(res) > 0) 2301 2470 2302 print "Testing that we do find configuration elements in the global catlog, with the configuration basedn"2471 # Testing that we do find configuration elements in the global catlog, with the configuration basedn" 2303 2472 res = gc_ldb.search(self.configuration_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"]) 2304 2473 self.assertTrue(len(res) > 0) 2305 2474 2306 print "Testing that we can get at the configuration DN on the main LDAP port"2475 # Testing that we can get at the configuration DN on the main LDAP port" 2307 2476 res = ldb.search(self.configuration_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"]) 2308 2477 self.assertTrue(len(res) > 0) 2309 2478 2310 print "Testing objectCategory canonacolisation"2479 # Testing objectCategory canonacolisation" 2311 2480 res = ldb.search(self.configuration_dn, expression="objectCategory=ntDsDSA", scope=SCOPE_SUBTREE, attrs=["cn"]) 2312 2481 self.assertTrue(len(res) > 0, "Didn't find any records with objectCategory=ntDsDSA") … … 2317 2486 self.assertTrue(len(res) != 0) 2318 2487 2319 print "Testing objectClass attribute order on "+ self.base_dn2488 # Testing objectClass attribute order on "+ self.base_dn 2320 2489 res = ldb.search(expression="objectClass=domain", base=self.base_dn, 2321 2490 scope=SCOPE_BASE, attrs=["objectClass"]) … … 2326 2495 # check enumeration 2327 2496 2328 print "Testing ldb.search for objectCategory=person"2497 # Testing ldb.search for objectCategory=person" 2329 2498 res = ldb.search(self.base_dn, expression="objectCategory=person", scope=SCOPE_SUBTREE, attrs=["cn"]) 2330 2499 self.assertTrue(len(res) > 0) 2331 2500 2332 print "Testing ldb.search for objectCategory=person with domain scope control"2501 # Testing ldb.search for objectCategory=person with domain scope control" 2333 2502 res = ldb.search(self.base_dn, expression="objectCategory=person", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["domain_scope:1"]) 2334 2503 self.assertTrue(len(res) > 0) 2335 2504 2336 print "Testing ldb.search for objectCategory=user"2505 # Testing ldb.search for objectCategory=user" 2337 2506 res = ldb.search(self.base_dn, expression="objectCategory=user", scope=SCOPE_SUBTREE, attrs=["cn"]) 2338 2507 self.assertTrue(len(res) > 0) 2339 2508 2340 print "Testing ldb.search for objectCategory=user with domain scope control"2509 # Testing ldb.search for objectCategory=user with domain scope control" 2341 2510 res = ldb.search(self.base_dn, expression="objectCategory=user", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["domain_scope:1"]) 2342 2511 self.assertTrue(len(res) > 0) 2343 2512 2344 print "Testing ldb.search for objectCategory=group"2513 # Testing ldb.search for objectCategory=group" 2345 2514 res = ldb.search(self.base_dn, expression="objectCategory=group", scope=SCOPE_SUBTREE, attrs=["cn"]) 2346 2515 self.assertTrue(len(res) > 0) 2347 2516 2348 print "Testing ldb.search for objectCategory=group with domain scope control"2517 # Testing ldb.search for objectCategory=group with domain scope control" 2349 2518 res = ldb.search(self.base_dn, expression="objectCategory=group", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["domain_scope:1"]) 2350 2519 self.assertTrue(len(res) > 0) 2351 2520 2352 print "Testing creating a user with the posixAccount objectClass"2521 # Testing creating a user with the posixAccount objectClass" 2353 2522 self.ldb.add_ldif("""dn: cn=posixuser,CN=Users,%s 2354 2523 objectClass: top … … 2367 2536 description: A POSIX user"""% (self.base_dn)) 2368 2537 2369 print "Testing removing the posixAccount objectClass from an existing user"2538 # Testing removing the posixAccount objectClass from an existing user" 2370 2539 self.ldb.modify_ldif("""dn: cn=posixuser,CN=Users,%s 2371 2540 changetype: modify … … 2373 2542 objectClass: posixAccount"""% (self.base_dn)) 2374 2543 2375 print "Testing adding the posixAccount objectClass to an existing user"2544 # Testing adding the posixAccount objectClass to an existing user" 2376 2545 self.ldb.modify_ldif("""dn: cn=posixuser,CN=Users,%s 2377 2546 changetype: modify … … 2460 2629 delete_force(self.ldb, user_dn) 2461 2630 try: 2462 sddl = "O:DUG:DUD: PAI(A;;RPWP;;;AU)S:PAI"2631 sddl = "O:DUG:DUD:AI(A;;RPWP;;;AU)S:PAI" 2463 2632 desc = security.descriptor.from_sddl(sddl, security.dom_sid('S-1-5-21')) 2464 2633 desc_base64 = base64.b64encode( ndr_pack(desc) ) … … 2470 2639 res = self.ldb.search(base=user_dn, attrs=["nTSecurityDescriptor"]) 2471 2640 self.assertTrue("nTSecurityDescriptor" in res[0]) 2641 desc = res[0]["nTSecurityDescriptor"][0] 2642 desc = ndr_unpack(security.descriptor, desc) 2643 desc_sddl = desc.as_sddl(self.domain_sid) 2644 self.assertTrue("O:S-1-5-21-513G:S-1-5-21-513D:AI(A;;RPWP;;;AU)" in desc_sddl) 2472 2645 finally: 2473 2646 delete_force(self.ldb, user_dn) … … 2632 2805 def test_dsheuristics(self): 2633 2806 """Tests the 'dSHeuristics' attribute""" 2634 print "Tests the 'dSHeuristics' attribute"""2807 # Tests the 'dSHeuristics' attribute" 2635 2808 2636 2809 # Get the current value to restore it later 2637 2810 dsheuristics = self.ldb.get_dsheuristics() 2638 # Should not be longer than 18 chars? 2639 try: 2640 self.ldb.set_dsheuristics("123ABC-+!1asdfg@#^12") 2641 except LdbError, (num, _): 2642 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 2643 # If it is >= 10 chars, tenthChar should be 1 2644 try: 2645 self.ldb.set_dsheuristics("00020000000002") 2646 except LdbError, (num, _): 2647 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 2648 # apart from the above, all char values are accepted 2649 self.ldb.set_dsheuristics("123ABC-+!1asdfg@#^") 2650 self.assertEquals(self.ldb.get_dsheuristics(), "123ABC-+!1asdfg@#^") 2651 # restore old value 2652 self.ldb.set_dsheuristics(dsheuristics) 2811 # Perform the length checks: for each decade (except the 0th) we need 2812 # the first index to be the number. This goes till the 9th one, beyond 2813 # there does not seem to be another limitation. 2814 try: 2815 dshstr = "" 2816 for i in range(1,11): 2817 # This is in the range 2818 self.ldb.set_dsheuristics(dshstr + "x") 2819 self.ldb.set_dsheuristics(dshstr + "xxxxx") 2820 dshstr = dshstr + "xxxxxxxxx" 2821 if i < 10: 2822 # Not anymore in the range, new decade specifier needed 2823 try: 2824 self.ldb.set_dsheuristics(dshstr + "x") 2825 self.fail() 2826 except LdbError, (num, _): 2827 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 2828 dshstr = dshstr + str(i) 2829 else: 2830 # There does not seem to be an upper limit 2831 self.ldb.set_dsheuristics(dshstr + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") 2832 # apart from the above, all char values are accepted 2833 self.ldb.set_dsheuristics("123ABC-+!1asdfg@#^") 2834 self.assertEquals(self.ldb.get_dsheuristics(), "123ABC-+!1asdfg@#^") 2835 finally: 2836 # restore old value 2837 self.ldb.set_dsheuristics(dsheuristics) 2653 2838 2654 2839 def test_ldapControlReturn(self): … … 2656 2841 really return something""" 2657 2842 res = self.ldb.search(attrs=["cn"], 2658 controls=["paged_result :1:10"])2843 controls=["paged_results:1:10"]) 2659 2844 self.assertEquals(len(res.controls), 1) 2660 2845 self.assertEquals(res.controls[0].oid, "1.2.840.113556.1.4.319") 2846 s = str(res.controls[0]) 2661 2847 2662 2848 def test_operational(self): 2663 2849 """Tests operational attributes""" 2664 print "Tests operational attributes"""2850 # Tests operational attributes" 2665 2851 2666 2852 res = self.ldb.search(self.base_dn, scope=SCOPE_BASE, … … 2675 2861 self.assertTrue("whenChanged" in res[0]) 2676 2862 2677 class BaseDnTests(unittest.TestCase): 2863 def test_timevalues1(self): 2864 """Tests possible syntax of time attributes""" 2865 2866 user_name = "testtimevaluesuser1" 2867 user_dn = "CN=%s,CN=Users,%s" % (user_name, self.base_dn) 2868 2869 delete_force(self.ldb, user_dn) 2870 self.ldb.add({ "dn": user_dn, 2871 "objectClass": "user", 2872 "sAMAccountName": user_name }) 2873 2874 # 2875 # We check the following values: 2876 # 2877 # 370101000000Z => 20370101000000.0Z 2878 # 20370102000000.*Z => 20370102000000.0Z 2879 # 2880 ext = [ "Z", ".0Z", ".Z", ".000Z", ".RandomIgnoredCharacters...987654321Z" ] 2881 for i in range(0, len(ext)): 2882 v_raw = "203701%02d000000" % (i + 1) 2883 if ext[i] == "Z": 2884 v_set = v_raw[2:] + ext[i] 2885 else: 2886 v_set = v_raw + ext[i] 2887 v_get = v_raw + ".0Z" 2888 2889 m = Message() 2890 m.dn = Dn(ldb, user_dn) 2891 m["msTSExpireDate"] = MessageElement([v_set], 2892 FLAG_MOD_REPLACE, 2893 "msTSExpireDate") 2894 self.ldb.modify(m) 2895 2896 res = self.ldb.search(base=user_dn, scope=SCOPE_BASE, attrs=["msTSExpireDate"]) 2897 self.assertTrue(len(res) == 1) 2898 self.assertTrue("msTSExpireDate" in res[0]) 2899 self.assertTrue(len(res[0]["msTSExpireDate"]) == 1) 2900 self.assertEquals(res[0]["msTSExpireDate"][0], v_get) 2901 2902 class BaseDnTests(samba.tests.TestCase): 2678 2903 2679 2904 def setUp(self): … … 2758 2983 self.assertEquals(int(res[0]["domainFunctionality"][0]), int(res4[0]["msDS-Behavior-Version"][0])) 2759 2984 2760 res5 = self.ldb.search("cn=partitions, " + str(ldb.get_config_basedn()), scope=SCOPE_BASE, attrs=["msDS-Behavior-Version"])2985 res5 = self.ldb.search("cn=partitions,%s" % ldb.get_config_basedn(), scope=SCOPE_BASE, attrs=["msDS-Behavior-Version"]) 2761 2986 self.assertEquals(len(res5), 1) 2762 2987 self.assertEquals(len(res5[0]["msDS-Behavior-Version"]), 1) … … 2778 3003 """Testing the ldap service name in rootDSE""" 2779 3004 res = self.ldb.search("", scope=SCOPE_BASE, 2780 attrs=["ldapServiceName", "d NSHostName"])3005 attrs=["ldapServiceName", "dnsHostName"]) 2781 3006 self.assertEquals(len(res), 1) 2782 2783 (hostname, _, dns_domainname) = res[0]["dNSHostName"][0].partition(".") 2784 self.assertTrue(":%s$@%s" % (hostname, dns_domainname.upper()) 2785 in res[0]["ldapServiceName"][0]) 3007 self.assertTrue("ldapServiceName" in res[0]) 3008 self.assertTrue("dnsHostName" in res[0]) 3009 3010 (hostname, _, dns_domainname) = res[0]["dnsHostName"][0].partition(".") 3011 3012 given = res[0]["ldapServiceName"][0] 3013 expected = "%s:%s$@%s" % (dns_domainname.lower(), hostname.lower(), dns_domainname.upper()) 3014 self.assertEquals(given, expected) 2786 3015 2787 3016 if not "://" in host: … … 2798 3027 gc_ldb = None 2799 3028 2800 runner = SubunitTestRunner() 2801 rc = 0 2802 if not runner.run(unittest.makeSuite(BaseDnTests)).wasSuccessful(): 2803 rc = 1 2804 if not runner.run(unittest.makeSuite(BasicTests)).wasSuccessful(): 2805 rc = 1 2806 sys.exit(rc) 3029 TestProgram(module=__name__, opts=subunitopts) -
vendor/current/source4/dsdb/tests/python/ldap_schema.py
r740 r988 2 2 # -*- coding: utf-8 -*- 3 3 # This is a port of the original in testprogs/ejs/ldap.js 4 5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008-2011 6 # 7 # This program is free software; you can redistribute it and/or modify 8 # it under the terms of the GNU General Public License as published by 9 # the Free Software Foundation; either version 3 of the License, or 10 # (at your option) any later version. 11 # 12 # This program is distributed in the hope that it will be useful, 13 # but WITHOUT ANY WARRANTY; without even the implied warranty of 14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 # GNU General Public License for more details. 16 # 17 # You should have received a copy of the GNU General Public License 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 19 20 4 21 5 22 import optparse … … 11 28 sys.path.insert(0, "bin/python") 12 29 import samba 13 samba.ensure_external_module("testtools", "testtools") 14 samba.ensure_external_module("subunit", "subunit/python") 30 from samba.tests.subunitrun import TestProgram, SubunitOptions 15 31 16 32 import samba.getopt as options … … 26 42 from samba.dsdb import DS_DOMAIN_FUNCTION_2003 27 43 from samba.tests import delete_force 28 29 from subunit.run import SubunitTestRunner 30 import unittest 44 from samba.ndr import ndr_unpack 45 from samba.dcerpc import drsblobs 31 46 32 47 parser = optparse.OptionParser("ldap_schema.py [options] <host>") … … 37 52 credopts = options.CredentialsOptions(parser) 38 53 parser.add_option_group(credopts) 54 subunitopts = SubunitOptions(parser) 55 parser.add_option_group(subunitopts) 39 56 opts, args = parser.parse_args() 40 57 … … 49 66 50 67 51 class SchemaTests( unittest.TestCase):68 class SchemaTests(samba.tests.TestCase): 52 69 53 70 def setUp(self): 54 71 super(SchemaTests, self).setUp() 55 self.ldb = ldb 56 self.base_dn = ldb.domain_dn() 57 self.schema_dn = ldb.get_schema_basedn().get_linearized() 72 self.ldb = SamDB(host, credentials=creds, 73 session_info=system_session(lp), lp=lp, options=ldb_options) 74 self.base_dn = self.ldb.domain_dn() 75 self.schema_dn = self.ldb.get_schema_basedn().get_linearized() 58 76 59 77 def test_generated_schema(self): … … 68 86 def test_generated_schema_is_operational(self): 69 87 """Testing we don't get the generated schema via LDAP by default""" 88 # Must keep the "*" form 70 89 res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE, 71 attrs=["*"])90 attrs=["*"]) 72 91 self.assertEquals(len(res), 1) 73 92 self.assertFalse("dITContentRules" in res[0]) … … 95 114 """ 96 115 self.ldb.add_ldif(ldif) 116 # We must do a schemaUpdateNow otherwise it's not 100% sure that the schema 117 # will contain the new attribute 118 ldif = """ 119 dn: 120 changetype: modify 121 add: schemaUpdateNow 122 schemaUpdateNow: 1 123 """ 124 self.ldb.modify_ldif(ldif) 97 125 98 126 # Search for created attribute 99 127 res = [] 100 res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"]) 128 res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE, 129 attrs=["lDAPDisplayName","schemaIDGUID", "msDS-IntID"]) 101 130 self.assertEquals(len(res), 1) 102 131 self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name) 103 132 self.assertTrue("schemaIDGUID" in res[0]) 133 if "msDS-IntId" in res[0]: 134 msDS_IntId = int(res[0]["msDS-IntId"][0]) 135 if msDS_IntId < 0: 136 msDS_IntId += (1 << 32) 137 else: 138 msDS_IntId = None 104 139 105 140 class_name = "test-Class" + time.strftime("%s", time.gmtime()) … … 152 187 # Search for created objectclass 153 188 res = [] 154 res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"]) 189 res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, 190 attrs=["lDAPDisplayName", "defaultObjectCategory", "schemaIDGUID", "distinguishedName"]) 155 191 self.assertEquals(len(res), 1) 156 192 self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name) … … 184 220 185 221 # Search for created object 186 res = [] 187 res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["*"]) 188 self.assertEquals(len(res), 1) 222 obj_res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["replPropertyMetaData"]) 223 224 self.assertEquals(len(obj_res), 1) 225 self.assertTrue("replPropertyMetaData" in obj_res[0]) 226 val = obj_res[0]["replPropertyMetaData"][0] 227 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(val)) 228 obj = repl.ctr 229 230 # Windows 2000 functional level won't have this. It is too 231 # hard to work it out from the prefixmap however, so we skip 232 # this test in that case. 233 if msDS_IntId is not None: 234 found = False 235 for o in repl.ctr.array: 236 if o.attid == msDS_IntId: 237 found = True 238 break 239 self.assertTrue(found, "Did not find 0x%08x in replPropertyMetaData" % msDS_IntId) 189 240 # Delete the object 190 241 delete_force(self.ldb, "cn=%s,cn=Users,%s" % (object_name, self.base_dn)) … … 215 266 # Search for created objectclass 216 267 res = [] 217 res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"]) 268 res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, 269 attrs=["lDAPDisplayName", "defaultObjectCategory", 270 "schemaIDGUID", "distinguishedName"]) 218 271 self.assertEquals(len(res), 1) 219 272 self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name) … … 241 294 # Search for created object 242 295 res = [] 243 res = self.ldb.search("ou=%s,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=[" *"])296 res = self.ldb.search("ou=%s,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["dn"]) 244 297 self.assertEquals(len(res), 1) 245 298 # Delete the object … … 247 300 248 301 249 class SchemaTests_msDS_IntId( unittest.TestCase):302 class SchemaTests_msDS_IntId(samba.tests.TestCase): 250 303 251 304 def setUp(self): 252 305 super(SchemaTests_msDS_IntId, self).setUp() 253 self.ldb = ldb 254 res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"]) 306 self.ldb = SamDB(host, credentials=creds, 307 session_info=system_session(lp), lp=lp, options=ldb_options) 308 res = self.ldb.search(base="", expression="", scope=SCOPE_BASE, 309 attrs=["schemaNamingContext", "defaultNamingContext", 310 "forestFunctionality"]) 255 311 self.assertEquals(len(res), 1) 256 312 self.schema_dn = res[0]["schemaNamingContext"][0] … … 331 387 # Search for created attribute 332 388 res = [] 333 res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"]) 389 res = self.ldb.search(attr_dn, scope=SCOPE_BASE, 390 attrs=["lDAPDisplayName", "msDS-IntId", "systemFlags"]) 334 391 self.assertEquals(len(res), 1) 335 392 self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name) … … 373 430 # Search for created attribute 374 431 res = [] 375 res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"]) 432 res = self.ldb.search(attr_dn, scope=SCOPE_BASE, 433 attrs=["lDAPDisplayName", "msDS-IntId"]) 376 434 self.assertEquals(len(res), 1) 377 435 self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name) … … 428 486 self._ldap_schemaUpdateNow() 429 487 430 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=[" *"])488 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"]) 431 489 self.assertEquals(len(res), 1) 432 490 self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831") … … 440 498 441 499 # Search for created Class 442 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=[" *"])500 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"]) 443 501 self.assertEquals(len(res), 1) 444 502 self.assertFalse("msDS-IntId" in res[0]) … … 465 523 self.ldb.add_ldif(ldif_add) 466 524 467 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=[" *"])525 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"]) 468 526 self.assertEquals(len(res), 1) 469 527 self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831") … … 478 536 479 537 # Search for created Class 480 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=[" *"])538 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"]) 481 539 self.assertEquals(len(res), 1) 482 540 self.assertFalse("msDS-IntId" in res[0]) … … 490 548 except LdbError, (num, _): 491 549 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 492 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=[" *"])550 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"]) 493 551 self.assertEquals(len(res), 1) 494 552 self.assertFalse("msDS-IntId" in res[0]) … … 519 577 520 578 521 class SchemaTests_msDS_isRODC( unittest.TestCase):579 class SchemaTests_msDS_isRODC(samba.tests.TestCase): 522 580 523 581 def setUp(self): 524 582 super(SchemaTests_msDS_isRODC, self).setUp() 525 self.ldb = ldb 526 res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"]) 583 self.ldb = SamDB(host, credentials=creds, 584 session_info=system_session(lp), lp=lp, options=ldb_options) 585 res = self.ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["defaultNamingContext"]) 527 586 self.assertEquals(len(res), 1) 528 587 self.base_dn = res[0]["defaultNamingContext"][0] … … 568 627 ldb_options = ["modules:paged_searches"] 569 628 570 ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp, options=ldb_options) 571 572 runner = SubunitTestRunner() 573 rc = 0 574 if not runner.run(unittest.makeSuite(SchemaTests)).wasSuccessful(): 575 rc = 1 576 if not runner.run(unittest.makeSuite(SchemaTests_msDS_IntId)).wasSuccessful(): 577 rc = 1 578 if not runner.run(unittest.makeSuite(SchemaTests_msDS_isRODC)).wasSuccessful(): 579 rc = 1 580 581 sys.exit(rc) 629 TestProgram(module=__name__, opts=subunitopts) -
vendor/current/source4/dsdb/tests/python/ldap_syntaxes.py
r740 r988 11 11 sys.path.insert(0, "bin/python") 12 12 import samba 13 samba.ensure_external_module("testtools", "testtools") 14 samba.ensure_external_module("subunit", "subunit/python") 13 14 from samba.tests.subunitrun import SubunitOptions, TestProgram 15 15 16 16 import samba.getopt as options … … 21 21 from ldb import ERR_INVALID_ATTRIBUTE_SYNTAX 22 22 from ldb import ERR_ENTRY_ALREADY_EXISTS 23 24 from subunit.run import SubunitTestRunner25 import unittest26 23 27 24 import samba.tests … … 34 31 credopts = options.CredentialsOptions(parser) 35 32 parser.add_option_group(credopts) 33 subunitopts = SubunitOptions(parser) 34 parser.add_option_group(subunitopts) 36 35 opts, args = parser.parse_args() 37 36 … … 45 44 46 45 47 class SyntaxTests( unittest.TestCase):46 class SyntaxTests(samba.tests.TestCase): 48 47 49 48 def setUp(self): 50 49 super(SyntaxTests, self).setUp() 51 self.ldb = ldb 52 self.base_dn = ldb.domain_dn() 53 self.schema_dn = ldb.get_schema_basedn().get_linearized() 50 self.ldb = samba.tests.connect_samdb(host, credentials=creds, 51 session_info=system_session(lp), lp=lp) 52 self.base_dn = self.ldb.domain_dn() 53 self.schema_dn = self.ldb.get_schema_basedn().get_linearized() 54 54 self._setup_dn_string_test() 55 55 self._setup_dn_binary_test() … … 193 193 194 194 def test_dn_string(self): 195 # add ob eject with correct value195 # add object with correct value 196 196 object_name1 = "obj-DN-String1" + time.strftime("%s", time.gmtime()) 197 197 ldif = self._get_object_ldif(object_name1, self.dn_string_class_name, self.dn_string_class_ldap_display_name, … … 200 200 201 201 # search by specifying the DN part only 202 res = ldb.search(base=self.base_dn,202 res = self.ldb.search(base=self.base_dn, 203 203 scope=SCOPE_SUBTREE, 204 204 expression="(%s=%s)" % (self.dn_string_attribute, self.base_dn)) … … 206 206 207 207 # search by specifying the string part only 208 res = ldb.search(base=self.base_dn,208 res = self.ldb.search(base=self.base_dn, 209 209 scope=SCOPE_SUBTREE, 210 210 expression="(%s=S:5:ABCDE)" % self.dn_string_attribute) … … 212 212 213 213 # search by DN+Stirng 214 res = ldb.search(base=self.base_dn,214 res = self.ldb.search(base=self.base_dn, 215 215 scope=SCOPE_SUBTREE, 216 216 expression="(%s=S:5:ABCDE:%s)" % (self.dn_string_attribute, self.base_dn)) … … 284 284 except LdbError, (num, _): 285 285 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 286 pass287 286 288 287 def test_dn_binary(self): … … 294 293 295 294 # search by specifyingthe DN part 296 res = ldb.search(base=self.base_dn,295 res = self.ldb.search(base=self.base_dn, 297 296 scope=SCOPE_SUBTREE, 298 297 expression="(%s=%s)" % (self.dn_binary_attribute, self.base_dn)) … … 300 299 301 300 # search by specifying the binary part 302 res = ldb.search(base=self.base_dn,301 res = self.ldb.search(base=self.base_dn, 303 302 scope=SCOPE_SUBTREE, 304 303 expression="(%s=B:4:1234)" % self.dn_binary_attribute) … … 306 305 307 306 # search by DN+Binary 308 res = ldb.search(base=self.base_dn,307 res = self.ldb.search(base=self.base_dn, 309 308 scope=SCOPE_SUBTREE, 310 309 expression="(%s=B:4:1234:%s)" % (self.dn_binary_attribute, self.base_dn)) … … 370 369 except LdbError, (num, _): 371 370 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 372 pass 373 374 ldb = samba.tests.connect_samdb(host, credentials=creds, session_info=system_session(lp), lp=lp) 375 runner = SubunitTestRunner() 376 rc = 0 377 if not runner.run(unittest.makeSuite(SyntaxTests)).wasSuccessful(): 378 rc = 1 379 380 sys.exit(rc) 371 372 TestProgram(module=__name__, opts=subunitopts) -
vendor/current/source4/dsdb/tests/python/passwords.py
r740 r988 17 17 sys.path.insert(0, "bin/python") 18 18 import samba 19 samba.ensure_external_module("testtools", "testtools") 20 samba.ensure_external_module("subunit", "subunit/python") 19 20 from samba.tests.subunitrun import SubunitOptions, TestProgram 21 21 22 22 import samba.getopt as options … … 35 35 import samba.tests 36 36 from samba.tests import delete_force 37 from subunit.run import SubunitTestRunner38 import unittest39 37 40 38 parser = optparse.OptionParser("passwords.py [options] <host>") … … 45 43 credopts = options.CredentialsOptions(parser) 46 44 parser.add_option_group(credopts) 45 subunitopts = SubunitOptions(parser) 46 parser.add_option_group(subunitopts) 47 47 48 opts, args = parser.parse_args() 48 49 … … 67 68 def setUp(self): 68 69 super(PasswordTests, self).setUp() 69 self.ldb = ldb 70 self.base_dn = ldb.domain_dn() 70 self.ldb = SamDB(url=host, session_info=system_session(lp), credentials=creds, lp=lp) 71 72 # Gets back the basedn 73 base_dn = self.ldb.domain_dn() 74 75 # Gets back the configuration basedn 76 configuration_dn = self.ldb.get_config_basedn().get_linearized() 77 78 # Get the old "dSHeuristics" if it was set 79 dsheuristics = self.ldb.get_dsheuristics() 80 81 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour 82 self.ldb.set_dsheuristics("000000001") 83 84 # Reset the "dSHeuristics" as they were before 85 self.addCleanup(self.ldb.set_dsheuristics, dsheuristics) 86 87 # Get the old "minPwdAge" 88 minPwdAge = self.ldb.get_minPwdAge() 89 90 # Set it temporarely to "0" 91 self.ldb.set_minPwdAge("0") 92 self.base_dn = self.ldb.domain_dn() 93 94 # Reset the "minPwdAge" as it was before 95 self.addCleanup(self.ldb.set_minPwdAge, minPwdAge) 71 96 72 97 # (Re)adds the test user "testuser" with no password atm … … 137 162 138 163 def test_unicodePwd_hash_set(self): 139 print "Performs a password hash set operation on 'unicodePwd' which should be prevented"164 """Performs a password hash set operation on 'unicodePwd' which should be prevented""" 140 165 # Notice: Direct hash password sets should never work 141 166 142 167 m = Message() 143 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)168 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 144 169 m["unicodePwd"] = MessageElement("XXXXXXXXXXXXXXXX", FLAG_MOD_REPLACE, 145 170 "unicodePwd") 146 171 try: 147 ldb.modify(m)172 self.ldb.modify(m) 148 173 self.fail() 149 174 except LdbError, (num, _): … … 151 176 152 177 def test_unicodePwd_hash_change(self): 153 print "Performs a password hash change operation on 'unicodePwd' which should be prevented"178 """Performs a password hash change operation on 'unicodePwd' which should be prevented""" 154 179 # Notice: Direct hash password changes should never work 155 180 … … 169 194 170 195 def test_unicodePwd_clear_set(self): 171 print "Performs a password cleartext set operation on 'unicodePwd'"172 173 m = Message() 174 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)196 """Performs a password cleartext set operation on 'unicodePwd'""" 197 198 m = Message() 199 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 175 200 m["unicodePwd"] = MessageElement("\"thatsAcomplPASS2\"".encode('utf-16-le'), 176 201 FLAG_MOD_REPLACE, "unicodePwd") 177 ldb.modify(m)202 self.ldb.modify(m) 178 203 179 204 def test_unicodePwd_clear_change(self): 180 print "Performs a password cleartext change operation on 'unicodePwd'"205 """Performs a password cleartext change operation on 'unicodePwd'""" 181 206 182 207 self.ldb2.modify_ldif(""" … … 220 245 221 246 def test_dBCSPwd_hash_set(self): 222 print "Performs a password hash set operation on 'dBCSPwd' which should be prevented"247 """Performs a password hash set operation on 'dBCSPwd' which should be prevented""" 223 248 # Notice: Direct hash password sets should never work 224 249 225 250 m = Message() 226 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)251 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 227 252 m["dBCSPwd"] = MessageElement("XXXXXXXXXXXXXXXX", FLAG_MOD_REPLACE, 228 253 "dBCSPwd") 229 254 try: 230 ldb.modify(m)255 self.ldb.modify(m) 231 256 self.fail() 232 257 except LdbError, (num, _): … … 234 259 235 260 def test_dBCSPwd_hash_change(self): 236 print "Performs a password hash change operation on 'dBCSPwd' which should be prevented"261 """Performs a password hash change operation on 'dBCSPwd' which should be prevented""" 237 262 # Notice: Direct hash password changes should never work 238 263 … … 251 276 252 277 def test_userPassword_clear_set(self): 253 print "Performs a password cleartext set operation on 'userPassword'"278 """Performs a password cleartext set operation on 'userPassword'""" 254 279 # Notice: This works only against Windows if "dSHeuristics" has been set 255 280 # properly 256 281 257 282 m = Message() 258 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)283 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 259 284 m["userPassword"] = MessageElement("thatsAcomplPASS2", FLAG_MOD_REPLACE, 260 285 "userPassword") 261 ldb.modify(m)286 self.ldb.modify(m) 262 287 263 288 def test_userPassword_clear_change(self): 264 print "Performs a password cleartext change operation on 'userPassword'"289 """Performs a password cleartext change operation on 'userPassword'""" 265 290 # Notice: This works only against Windows if "dSHeuristics" has been set 266 291 # properly … … 306 331 307 332 def test_clearTextPassword_clear_set(self): 308 print "Performs a password cleartext set operation on 'clearTextPassword'"333 """Performs a password cleartext set operation on 'clearTextPassword'""" 309 334 # Notice: This never works against Windows - only supported by us 310 335 311 336 try: 312 337 m = Message() 313 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)338 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 314 339 m["clearTextPassword"] = MessageElement("thatsAcomplPASS2".encode('utf-16-le'), 315 340 FLAG_MOD_REPLACE, "clearTextPassword") 316 ldb.modify(m)341 self.ldb.modify(m) 317 342 # this passes against s4 318 343 except LdbError, (num, msg): … … 322 347 323 348 def test_clearTextPassword_clear_change(self): 324 print "Performs a password cleartext change operation on 'clearTextPassword'"349 """Performs a password cleartext change operation on 'clearTextPassword'""" 325 350 # Notice: This never works against Windows - only supported by us 326 351 … … 375 400 376 401 def test_failures(self): 377 print "Performs some failure testing"378 379 try: 380 ldb.modify_ldif("""381 dn: cn=testuser,cn=users,""" + self.base_dn + """ 382 changetype: modify 383 delete: userPassword 384 userPassword: thatsAcomplPASS1 385 """) 386 self.fail() 387 except LdbError, (num, _): 388 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 389 390 try: 391 self.ldb2.modify_ldif(""" 392 dn: cn=testuser,cn=users,""" + self.base_dn + """ 393 changetype: modify 394 delete: userPassword 395 userPassword: thatsAcomplPASS1 396 """) 397 self.fail() 398 except LdbError, (num, _): 399 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 400 401 try: 402 ldb.modify_ldif("""403 dn: cn=testuser,cn=users,""" + self.base_dn + """ 404 changetype: modify 405 delete: userPassword 406 """) 407 self.fail() 408 except LdbError, (num, _): 409 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 410 411 try: 412 self.ldb2.modify_ldif(""" 413 dn: cn=testuser,cn=users,""" + self.base_dn + """ 414 changetype: modify 415 delete: userPassword 416 """) 417 self.fail() 418 except LdbError, (num, _): 419 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 420 421 try: 422 ldb.modify_ldif("""402 """Performs some failure testing""" 403 404 try: 405 self.ldb.modify_ldif(""" 406 dn: cn=testuser,cn=users,""" + self.base_dn + """ 407 changetype: modify 408 delete: userPassword 409 userPassword: thatsAcomplPASS1 410 """) 411 self.fail() 412 except LdbError, (num, _): 413 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 414 415 try: 416 self.ldb2.modify_ldif(""" 417 dn: cn=testuser,cn=users,""" + self.base_dn + """ 418 changetype: modify 419 delete: userPassword 420 userPassword: thatsAcomplPASS1 421 """) 422 self.fail() 423 except LdbError, (num, _): 424 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 425 426 try: 427 self.ldb.modify_ldif(""" 428 dn: cn=testuser,cn=users,""" + self.base_dn + """ 429 changetype: modify 430 delete: userPassword 431 """) 432 self.fail() 433 except LdbError, (num, _): 434 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 435 436 try: 437 self.ldb2.modify_ldif(""" 438 dn: cn=testuser,cn=users,""" + self.base_dn + """ 439 changetype: modify 440 delete: userPassword 441 """) 442 self.fail() 443 except LdbError, (num, _): 444 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 445 446 try: 447 self.ldb.modify_ldif(""" 423 448 dn: cn=testuser,cn=users,""" + self.base_dn + """ 424 449 changetype: modify … … 442 467 443 468 try: 444 ldb.modify_ldif("""445 dn: cn=testuser,cn=users,""" + self.base_dn + """ 446 changetype: modify 447 delete: userPassword 448 userPassword: thatsAcomplPASS1 449 add: userPassword 450 userPassword: thatsAcomplPASS2 451 userPassword: thatsAcomplPASS2 452 """) 453 self.fail() 454 except LdbError, (num, _): 455 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 456 457 try: 458 self.ldb2.modify_ldif(""" 459 dn: cn=testuser,cn=users,""" + self.base_dn + """ 460 changetype: modify 461 delete: userPassword 462 userPassword: thatsAcomplPASS1 463 add: userPassword 464 userPassword: thatsAcomplPASS2 465 userPassword: thatsAcomplPASS2 466 """) 467 self.fail() 468 except LdbError, (num, _): 469 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 470 471 try: 472 ldb.modify_ldif("""473 dn: cn=testuser,cn=users,""" + self.base_dn + """ 474 changetype: modify 475 delete: userPassword 476 userPassword: thatsAcomplPASS1 477 userPassword: thatsAcomplPASS1 478 add: userPassword 479 userPassword: thatsAcomplPASS2 480 """) 481 self.fail() 482 except LdbError, (num, _): 483 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 484 485 try: 486 self.ldb2.modify_ldif(""" 487 dn: cn=testuser,cn=users,""" + self.base_dn + """ 488 changetype: modify 489 delete: userPassword 490 userPassword: thatsAcomplPASS1 491 userPassword: thatsAcomplPASS1 492 add: userPassword 493 userPassword: thatsAcomplPASS2 494 """) 495 self.fail() 496 except LdbError, (num, _): 497 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 498 499 try: 500 ldb.modify_ldif("""469 self.ldb.modify_ldif(""" 470 dn: cn=testuser,cn=users,""" + self.base_dn + """ 471 changetype: modify 472 delete: userPassword 473 userPassword: thatsAcomplPASS1 474 add: userPassword 475 userPassword: thatsAcomplPASS2 476 userPassword: thatsAcomplPASS2 477 """) 478 self.fail() 479 except LdbError, (num, _): 480 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 481 482 try: 483 self.ldb2.modify_ldif(""" 484 dn: cn=testuser,cn=users,""" + self.base_dn + """ 485 changetype: modify 486 delete: userPassword 487 userPassword: thatsAcomplPASS1 488 add: userPassword 489 userPassword: thatsAcomplPASS2 490 userPassword: thatsAcomplPASS2 491 """) 492 self.fail() 493 except LdbError, (num, _): 494 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 495 496 try: 497 self.ldb.modify_ldif(""" 498 dn: cn=testuser,cn=users,""" + self.base_dn + """ 499 changetype: modify 500 delete: userPassword 501 userPassword: thatsAcomplPASS1 502 userPassword: thatsAcomplPASS1 503 add: userPassword 504 userPassword: thatsAcomplPASS2 505 """) 506 self.fail() 507 except LdbError, (num, _): 508 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 509 510 try: 511 self.ldb2.modify_ldif(""" 512 dn: cn=testuser,cn=users,""" + self.base_dn + """ 513 changetype: modify 514 delete: userPassword 515 userPassword: thatsAcomplPASS1 516 userPassword: thatsAcomplPASS1 517 add: userPassword 518 userPassword: thatsAcomplPASS2 519 """) 520 self.fail() 521 except LdbError, (num, _): 522 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 523 524 try: 525 self.ldb.modify_ldif(""" 501 526 dn: cn=testuser,cn=users,""" + self.base_dn + """ 502 527 changetype: modify … … 528 553 529 554 try: 530 ldb.modify_ldif("""555 self.ldb.modify_ldif(""" 531 556 dn: cn=testuser,cn=users,""" + self.base_dn + """ 532 557 changetype: modify … … 558 583 559 584 try: 560 ldb.modify_ldif("""585 self.ldb.modify_ldif(""" 561 586 dn: cn=testuser,cn=users,""" + self.base_dn + """ 562 587 changetype: modify … … 624 649 625 650 # Several password changes at once are allowed 626 ldb.modify_ldif("""651 self.ldb.modify_ldif(""" 627 652 dn: cn=testuser,cn=users,""" + self.base_dn + """ 628 653 changetype: modify … … 633 658 634 659 # Several password changes at once are allowed 635 ldb.modify_ldif("""660 self.ldb.modify_ldif(""" 636 661 dn: cn=testuser,cn=users,""" + self.base_dn + """ 637 662 changetype: modify … … 702 727 703 728 m = Message() 704 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)729 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 705 730 m["unicodePwd"] = MessageElement([], FLAG_MOD_ADD, "unicodePwd") 706 731 try: 707 ldb.modify(m)708 self.fail() 709 except LdbError, (num, _): 710 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 711 712 m = Message() 713 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)732 self.ldb.modify(m) 733 self.fail() 734 except LdbError, (num, _): 735 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 736 737 m = Message() 738 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 714 739 m["dBCSPwd"] = MessageElement([], FLAG_MOD_ADD, "dBCSPwd") 715 740 try: 716 ldb.modify(m)717 self.fail() 718 except LdbError, (num, _): 719 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 720 721 m = Message() 722 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)741 self.ldb.modify(m) 742 self.fail() 743 except LdbError, (num, _): 744 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 745 746 m = Message() 747 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 723 748 m["userPassword"] = MessageElement([], FLAG_MOD_ADD, "userPassword") 724 749 try: 725 ldb.modify(m)726 self.fail() 727 except LdbError, (num, _): 728 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 729 730 m = Message() 731 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)750 self.ldb.modify(m) 751 self.fail() 752 except LdbError, (num, _): 753 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 754 755 m = Message() 756 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 732 757 m["clearTextPassword"] = MessageElement([], FLAG_MOD_ADD, "clearTextPassword") 733 758 try: 734 ldb.modify(m)759 self.ldb.modify(m) 735 760 self.fail() 736 761 except LdbError, (num, _): … … 739 764 740 765 m = Message() 741 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)766 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 742 767 m["unicodePwd"] = MessageElement([], FLAG_MOD_REPLACE, "unicodePwd") 743 768 try: 744 ldb.modify(m)745 self.fail() 746 except LdbError, (num, _): 747 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 748 749 m = Message() 750 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)769 self.ldb.modify(m) 770 self.fail() 771 except LdbError, (num, _): 772 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 773 774 m = Message() 775 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 751 776 m["dBCSPwd"] = MessageElement([], FLAG_MOD_REPLACE, "dBCSPwd") 752 777 try: 753 ldb.modify(m)754 self.fail() 755 except LdbError, (num, _): 756 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 757 758 m = Message() 759 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)778 self.ldb.modify(m) 779 self.fail() 780 except LdbError, (num, _): 781 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 782 783 m = Message() 784 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 760 785 m["userPassword"] = MessageElement([], FLAG_MOD_REPLACE, "userPassword") 761 786 try: 762 ldb.modify(m)763 self.fail() 764 except LdbError, (num, _): 765 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 766 767 m = Message() 768 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)787 self.ldb.modify(m) 788 self.fail() 789 except LdbError, (num, _): 790 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 791 792 m = Message() 793 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 769 794 m["clearTextPassword"] = MessageElement([], FLAG_MOD_REPLACE, "clearTextPassword") 770 795 try: 771 ldb.modify(m)796 self.ldb.modify(m) 772 797 self.fail() 773 798 except LdbError, (num, _): … … 776 801 777 802 m = Message() 778 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)803 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 779 804 m["unicodePwd"] = MessageElement([], FLAG_MOD_DELETE, "unicodePwd") 780 805 try: 781 ldb.modify(m)782 self.fail() 783 except LdbError, (num, _): 784 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 785 786 m = Message() 787 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)806 self.ldb.modify(m) 807 self.fail() 808 except LdbError, (num, _): 809 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 810 811 m = Message() 812 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 788 813 m["dBCSPwd"] = MessageElement([], FLAG_MOD_DELETE, "dBCSPwd") 789 814 try: 790 ldb.modify(m)791 self.fail() 792 except LdbError, (num, _): 793 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 794 795 m = Message() 796 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)815 self.ldb.modify(m) 816 self.fail() 817 except LdbError, (num, _): 818 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 819 820 m = Message() 821 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 797 822 m["userPassword"] = MessageElement([], FLAG_MOD_DELETE, "userPassword") 798 823 try: 799 ldb.modify(m)800 self.fail() 801 except LdbError, (num, _): 802 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 803 804 m = Message() 805 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)824 self.ldb.modify(m) 825 self.fail() 826 except LdbError, (num, _): 827 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 828 829 m = Message() 830 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 806 831 m["clearTextPassword"] = MessageElement([], FLAG_MOD_DELETE, "clearTextPassword") 807 832 try: 808 ldb.modify(m)833 self.ldb.modify(m) 809 834 self.fail() 810 835 except LdbError, (num, _): … … 816 841 817 842 # Delete the "dSHeuristics" 818 ldb.set_dsheuristics(None)843 self.ldb.set_dsheuristics(None) 819 844 820 845 time.sleep(1) # This switching time is strictly needed! 821 846 822 847 m = Message() 823 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)848 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 824 849 m["userPassword"] = MessageElement("myPassword", FLAG_MOD_ADD, 825 850 "userPassword") 826 ldb.modify(m)827 828 res = ldb.search("cn=testuser,cn=users," + self.base_dn,851 self.ldb.modify(m) 852 853 res = self.ldb.search("cn=testuser,cn=users," + self.base_dn, 829 854 scope=SCOPE_BASE, attrs=["userPassword"]) 830 855 self.assertTrue(len(res) == 1) … … 833 858 834 859 m = Message() 835 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)860 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 836 861 m["userPassword"] = MessageElement("myPassword2", FLAG_MOD_REPLACE, 837 862 "userPassword") 838 ldb.modify(m)839 840 res = ldb.search("cn=testuser,cn=users," + self.base_dn,863 self.ldb.modify(m) 864 865 res = self.ldb.search("cn=testuser,cn=users," + self.base_dn, 841 866 scope=SCOPE_BASE, attrs=["userPassword"]) 842 867 self.assertTrue(len(res) == 1) … … 845 870 846 871 m = Message() 847 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)872 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 848 873 m["userPassword"] = MessageElement([], FLAG_MOD_DELETE, 849 874 "userPassword") 850 ldb.modify(m)851 852 res = ldb.search("cn=testuser,cn=users," + self.base_dn,875 self.ldb.modify(m) 876 877 res = self.ldb.search("cn=testuser,cn=users," + self.base_dn, 853 878 scope=SCOPE_BASE, attrs=["userPassword"]) 854 879 self.assertTrue(len(res) == 1) … … 856 881 857 882 # Set the test "dSHeuristics" to deactivate "userPassword" pwd changes 858 ldb.set_dsheuristics("000000000")859 860 m = Message() 861 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)883 self.ldb.set_dsheuristics("000000000") 884 885 m = Message() 886 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 862 887 m["userPassword"] = MessageElement("myPassword3", FLAG_MOD_REPLACE, 863 888 "userPassword") 864 ldb.modify(m)865 866 res = ldb.search("cn=testuser,cn=users," + self.base_dn,889 self.ldb.modify(m) 890 891 res = self.ldb.search("cn=testuser,cn=users," + self.base_dn, 867 892 scope=SCOPE_BASE, attrs=["userPassword"]) 868 893 self.assertTrue(len(res) == 1) … … 871 896 872 897 # Set the test "dSHeuristics" to deactivate "userPassword" pwd changes 873 ldb.set_dsheuristics("000000002")874 875 m = Message() 876 m.dn = Dn( ldb, "cn=testuser,cn=users," + self.base_dn)898 self.ldb.set_dsheuristics("000000002") 899 900 m = Message() 901 m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn) 877 902 m["userPassword"] = MessageElement("myPassword4", FLAG_MOD_REPLACE, 878 903 "userPassword") 879 ldb.modify(m)880 881 res = ldb.search("cn=testuser,cn=users," + self.base_dn,904 self.ldb.modify(m) 905 906 res = self.ldb.search("cn=testuser,cn=users," + self.base_dn, 882 907 scope=SCOPE_BASE, attrs=["userPassword"]) 883 908 self.assertTrue(len(res) == 1) … … 886 911 887 912 # Reset the test "dSHeuristics" (reactivate "userPassword" pwd changes) 888 ldb.set_dsheuristics("000000001")913 self.ldb.set_dsheuristics("000000001") 889 914 890 915 def test_zero_length(self): 891 916 # Get the old "minPwdLength" 892 minPwdLength = ldb.get_minPwdLength()917 minPwdLength = self.ldb.get_minPwdLength() 893 918 # Set it temporarely to "0" 894 ldb.set_minPwdLength("0")919 self.ldb.set_minPwdLength("0") 895 920 896 921 # Get the old "pwdProperties" 897 pwdProperties = ldb.get_pwdProperties()922 pwdProperties = self.ldb.get_pwdProperties() 898 923 # Set them temporarely to "0" (to deactivate eventually the complexity) 899 ldb.set_pwdProperties("0")900 901 ldb.setpassword("(sAMAccountName=testuser)", "")924 self.ldb.set_pwdProperties("0") 925 926 self.ldb.setpassword("(sAMAccountName=testuser)", "") 902 927 903 928 # Reset the "pwdProperties" as they were before 904 ldb.set_pwdProperties(pwdProperties)929 self.ldb.set_pwdProperties(pwdProperties) 905 930 906 931 # Reset the "minPwdLength" as it was before 907 ldb.set_minPwdLength(minPwdLength)932 self.ldb.set_minPwdLength(minPwdLength) 908 933 909 934 def tearDown(self): … … 920 945 host = "ldap://%s" % host 921 946 922 ldb = SamDB(url=host, session_info=system_session(lp), credentials=creds, lp=lp) 923 924 # Gets back the basedn 925 base_dn = ldb.domain_dn() 926 927 # Gets back the configuration basedn 928 configuration_dn = ldb.get_config_basedn().get_linearized() 929 930 # Get the old "dSHeuristics" if it was set 931 dsheuristics = ldb.get_dsheuristics() 932 933 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour 934 ldb.set_dsheuristics("000000001") 935 936 # Get the old "minPwdAge" 937 minPwdAge = ldb.get_minPwdAge() 938 939 # Set it temporarely to "0" 940 ldb.set_minPwdAge("0") 941 942 runner = SubunitTestRunner() 943 rc = 0 944 if not runner.run(unittest.makeSuite(PasswordTests)).wasSuccessful(): 945 rc = 1 946 947 # Reset the "dSHeuristics" as they were before 948 ldb.set_dsheuristics(dsheuristics) 949 950 # Reset the "minPwdAge" as it was before 951 ldb.set_minPwdAge(minPwdAge) 952 953 sys.exit(rc) 947 TestProgram(module=__name__, opts=subunitopts) -
vendor/current/source4/dsdb/tests/python/sam.py
r740 r988 9 9 sys.path.insert(0, "bin/python") 10 10 import samba 11 samba.ensure_external_module("testtools", "testtools") 12 samba.ensure_external_module("subunit", "subunit/python") 11 from samba.tests.subunitrun import SubunitOptions, TestProgram 13 12 14 13 import samba.getopt as options … … 22 21 from ldb import ERR_CONSTRAINT_VIOLATION 23 22 from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE 23 from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS 24 24 from ldb import Message, MessageElement, Dn 25 25 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE … … 28 28 UF_WORKSTATION_TRUST_ACCOUNT, UF_SERVER_TRUST_ACCOUNT, 29 29 UF_PARTIAL_SECRETS_ACCOUNT, UF_TEMP_DUPLICATE_ACCOUNT, 30 UF_PASSWD_NOTREQD, ATYPE_NORMAL_ACCOUNT, 30 UF_INTERDOMAIN_TRUST_ACCOUNT, 31 UF_PASSWD_NOTREQD, UF_LOCKOUT, UF_PASSWORD_EXPIRED, ATYPE_NORMAL_ACCOUNT, 31 32 GTYPE_SECURITY_BUILTIN_LOCAL_GROUP, GTYPE_SECURITY_DOMAIN_LOCAL_GROUP, 32 33 GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP, … … 37 38 ATYPE_DISTRIBUTION_UNIVERSAL_GROUP, ATYPE_DISTRIBUTION_LOCAL_GROUP, 38 39 ATYPE_WORKSTATION_TRUST) 39 from samba.dcerpc.security import (DOMAIN_RID_USERS, DOMAIN_RID_DOMAIN_MEMBERS, 40 DOMAIN_RID_DCS, DOMAIN_RID_READONLY_DCS) 41 42 from subunit.run import SubunitTestRunner 43 import unittest 40 from samba.dcerpc.security import (DOMAIN_RID_USERS, DOMAIN_RID_ADMINS, 41 DOMAIN_RID_DOMAIN_MEMBERS, DOMAIN_RID_DCS, DOMAIN_RID_READONLY_DCS) 44 42 45 43 from samba.dcerpc import security … … 53 51 credopts = options.CredentialsOptions(parser) 54 52 parser.add_option_group(credopts) 53 subunitopts = SubunitOptions(parser) 54 parser.add_option_group(subunitopts) 55 55 opts, args = parser.parse_args() 56 56 … … 64 64 creds = credopts.get_credentials(lp) 65 65 66 class SamTests( unittest.TestCase):66 class SamTests(samba.tests.TestCase): 67 67 68 68 def setUp(self): … … 587 587 def test_sam_attributes(self): 588 588 """Test the behaviour of special attributes of SAM objects""" 589 print "Testing the behaviour of special attributes of SAM objects\n" ""589 print "Testing the behaviour of special attributes of SAM objects\n" 590 590 591 591 ldb.add({ … … 1426 1426 # With SYSTEM rights you can set a interdomain trust account. 1427 1427 1428 # Invalid attribute 1429 try: 1430 ldb.add({ 1431 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1432 "objectclass": "user", 1433 "userAccountControl": "0"}) 1434 self.fail() 1435 except LdbError, (num, _): 1436 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1428 ldb.add({ 1429 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1430 "objectclass": "user", 1431 "userAccountControl": "0"}) 1432 1433 res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, 1434 scope=SCOPE_BASE, 1435 attrs=["sAMAccountType", "userAccountControl"]) 1436 self.assertTrue(len(res1) == 1) 1437 self.assertEquals(int(res1[0]["sAMAccountType"][0]), 1438 ATYPE_NORMAL_ACCOUNT) 1439 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) 1440 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_PASSWD_NOTREQD == 0) 1437 1441 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1438 1442 1439 # This has to wait until s4 supports it (needs a password module change) 1440 # try: 1441 # ldb.add({ 1442 # "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1443 # "objectclass": "user", 1444 # "userAccountControl": str(UF_NORMAL_ACCOUNT)}) 1445 # self.fail() 1446 # except LdbError, (num, _): 1447 # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1448 # delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1443 ldb.add({ 1444 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1445 "objectclass": "user", 1446 "userAccountControl": str(UF_NORMAL_ACCOUNT)}) 1447 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1449 1448 1450 1449 ldb.add({ … … 1460 1459 ATYPE_NORMAL_ACCOUNT) 1461 1460 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) 1461 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1462 1463 ldb.add({ 1464 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1465 "objectclass": "user", 1466 "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_LOCKOUT | UF_PASSWORD_EXPIRED)}) 1467 1468 res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, 1469 scope=SCOPE_BASE, 1470 attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) 1471 self.assertTrue(len(res1) == 1) 1472 self.assertEquals(int(res1[0]["sAMAccountType"][0]), 1473 ATYPE_NORMAL_ACCOUNT) 1474 self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) 1475 self.assertFalse("lockoutTime" in res1[0]) 1476 self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) 1462 1477 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1463 1478 … … 1472 1487 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1473 1488 1474 # This isn't supported yet in s4 1475 # try: 1476 # ldb.add({ 1477 # "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1478 # "objectclass": "user", 1479 # "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)}) 1480 # self.fail() 1481 # except LdbError, (num, _): 1482 # self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) 1483 # delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1484 # 1485 # try: 1486 # ldb.add({ 1487 # "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1488 # "objectclass": "user", 1489 # "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)}) 1490 # except LdbError, (num, _): 1491 # self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) 1492 # delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1493 1494 # This isn't supported yet in s4 - needs ACL module adaption 1495 # try: 1496 # ldb.add({ 1497 # "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1498 # "objectclass": "user", 1499 # "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)}) 1500 # self.fail() 1501 # except LdbError, (num, _): 1502 # self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1503 # delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1489 try: 1490 ldb.add({ 1491 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1492 "objectclass": "user", 1493 "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)}) 1494 self.fail() 1495 except LdbError, (num, _): 1496 self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) 1497 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1498 1499 try: 1500 ldb.add({ 1501 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1502 "objectclass": "user", 1503 "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)}) 1504 except LdbError, (num, _): 1505 self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) 1506 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1507 1508 try: 1509 ldb.add({ 1510 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1511 "objectclass": "user", 1512 "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT)}) 1513 except LdbError, (num, _): 1514 self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) 1515 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1516 1517 try: 1518 ldb.add({ 1519 "dn": "cn=ldaptestuser,cn=users," + self.base_dn, 1520 "objectclass": "user", 1521 "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)}) 1522 self.fail() 1523 except LdbError, (num, _): 1524 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1525 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1504 1526 1505 1527 # Modify operation … … 1534 1556 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1535 1557 1536 # This has to wait until s4 supports it (needs a password module change) 1537 # try: 1538 # m = Message() 1539 # m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1540 # m["userAccountControl"] = MessageElement( 1541 # str(UF_NORMAL_ACCOUNT), 1542 # FLAG_MOD_REPLACE, "userAccountControl") 1543 # ldb.modify(m) 1544 # except LdbError, (num, _): 1545 # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1558 try: 1559 m = Message() 1560 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1561 m["userAccountControl"] = MessageElement( 1562 str(UF_NORMAL_ACCOUNT), 1563 FLAG_MOD_REPLACE, "userAccountControl") 1564 ldb.modify(m) 1565 except LdbError, (num, _): 1566 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1546 1567 1547 1568 m = Message() … … 1559 1580 ATYPE_NORMAL_ACCOUNT) 1560 1581 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) 1582 1583 m = Message() 1584 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1585 m["userAccountControl"] = MessageElement( 1586 str(UF_ACCOUNTDISABLE), 1587 FLAG_MOD_REPLACE, "userAccountControl") 1588 ldb.modify(m) 1589 1590 res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, 1591 scope=SCOPE_BASE, 1592 attrs=["sAMAccountType", "userAccountControl"]) 1593 self.assertTrue(len(res1) == 1) 1594 self.assertEquals(int(res1[0]["sAMAccountType"][0]), 1595 ATYPE_NORMAL_ACCOUNT) 1596 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) 1597 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0) 1598 1599 m = Message() 1600 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1601 m["lockoutTime"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "lockoutTime") 1602 m["pwdLastSet"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "pwdLastSet") 1603 ldb.modify(m) 1604 1605 m = Message() 1606 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1607 m["userAccountControl"] = MessageElement( 1608 str(UF_LOCKOUT | UF_PASSWORD_EXPIRED), 1609 FLAG_MOD_REPLACE, "userAccountControl") 1610 ldb.modify(m) 1611 1612 res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, 1613 scope=SCOPE_BASE, 1614 attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) 1615 self.assertTrue(len(res1) == 1) 1616 self.assertEquals(int(res1[0]["sAMAccountType"][0]), 1617 ATYPE_NORMAL_ACCOUNT) 1618 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) 1619 self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) 1620 self.assertTrue(int(res1[0]["lockoutTime"][0]) == 0) 1621 self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) 1561 1622 1562 1623 try: … … 1571 1632 self.assertEquals(num, ERR_OTHER) 1572 1633 1573 # This isn't supported yet in s4 1574 # try: 1575 # m = Message() 1576 # m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1577 # m["userAccountControl"] = MessageElement( 1578 # str(UF_SERVER_TRUST_ACCOUNT), 1579 # FLAG_MOD_REPLACE, "userAccountControl") 1580 # ldb.modify(m) 1581 # self.fail() 1582 # except LdbError, (num, _): 1583 # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1634 try: 1635 m = Message() 1636 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1637 m["userAccountControl"] = MessageElement( 1638 str(UF_SERVER_TRUST_ACCOUNT), 1639 FLAG_MOD_REPLACE, "userAccountControl") 1640 ldb.modify(m) 1641 self.fail() 1642 except LdbError, (num, _): 1643 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1584 1644 1585 1645 m = Message() … … 1590 1650 ldb.modify(m) 1591 1651 1652 try: 1653 m = Message() 1654 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1655 m["userAccountControl"] = MessageElement( 1656 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT), 1657 FLAG_MOD_REPLACE, "userAccountControl") 1658 ldb.modify(m) 1659 self.fail() 1660 except LdbError, (num, _): 1661 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1662 1592 1663 res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, 1593 1664 scope=SCOPE_BASE, attrs=["sAMAccountType"]) … … 1609 1680 ATYPE_NORMAL_ACCOUNT) 1610 1681 1611 # This isn't supported yet in s4 - needs ACL module adaption 1612 # try: 1613 # m = Message() 1614 # m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1615 # m["userAccountControl"] = MessageElement( 1616 # str(UF_INTERDOMAIN_TRUST_ACCOUNT), 1617 # FLAG_MOD_REPLACE, "userAccountControl") 1618 # ldb.modify(m) 1619 # self.fail() 1620 # except LdbError, (num, _): 1621 # self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1682 try: 1683 m = Message() 1684 m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 1685 m["userAccountControl"] = MessageElement( 1686 str(UF_INTERDOMAIN_TRUST_ACCOUNT), 1687 FLAG_MOD_REPLACE, "userAccountControl") 1688 ldb.modify(m) 1689 self.fail() 1690 except LdbError, (num, _): 1691 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1622 1692 1623 1693 # With a computer object … … 1630 1700 # With SYSTEM rights you can set a interdomain trust account. 1631 1701 1632 # Invalid attribute 1633 try: 1634 ldb.add({ 1635 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 1636 "objectclass": "computer", 1637 "userAccountControl": "0"}) 1638 self.fail() 1639 except LdbError, (num, _): 1640 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1702 ldb.add({ 1703 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 1704 "objectclass": "computer", 1705 "userAccountControl": "0"}) 1706 1707 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 1708 scope=SCOPE_BASE, 1709 attrs=["sAMAccountType", "userAccountControl"]) 1710 self.assertTrue(len(res1) == 1) 1711 self.assertEquals(int(res1[0]["sAMAccountType"][0]), 1712 ATYPE_NORMAL_ACCOUNT) 1713 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) 1714 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_PASSWD_NOTREQD == 0) 1641 1715 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1642 1716 1643 # This has to wait until s4 supports it (needs a password module change) 1644 # try: 1645 # ldb.add({ 1646 # "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 1647 # "objectclass": "computer", 1648 # "userAccountControl": str(UF_NORMAL_ACCOUNT)}) 1649 # self.fail() 1650 # except LdbError, (num, _): 1651 # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1652 # delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1717 ldb.add({ 1718 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 1719 "objectclass": "computer", 1720 "userAccountControl": str(UF_NORMAL_ACCOUNT)}) 1721 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1653 1722 1654 1723 ldb.add({ … … 1664 1733 ATYPE_NORMAL_ACCOUNT) 1665 1734 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) 1735 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1736 1737 ldb.add({ 1738 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 1739 "objectclass": "computer", 1740 "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_LOCKOUT | UF_PASSWORD_EXPIRED)}) 1741 1742 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 1743 scope=SCOPE_BASE, 1744 attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) 1745 self.assertTrue(len(res1) == 1) 1746 self.assertEquals(int(res1[0]["sAMAccountType"][0]), 1747 ATYPE_NORMAL_ACCOUNT) 1748 self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) 1749 self.assertFalse("lockoutTime" in res1[0]) 1750 self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) 1666 1751 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1667 1752 … … 1697 1782 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1698 1783 1699 # This isn't supported yet in s4 - needs ACL module adaption 1700 # try: 1701 # ldb.add({ 1702 # "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 1703 # "objectclass": "computer", 1704 # "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)}) 1705 # self.fail() 1706 # except LdbError, (num, _): 1707 # self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1708 # delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1784 try: 1785 ldb.add({ 1786 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 1787 "objectclass": "computer", 1788 "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)}) 1789 self.fail() 1790 except LdbError, (num, _): 1791 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1792 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1709 1793 1710 1794 # Modify operation … … 1740 1824 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1741 1825 1742 # This has to wait until s4 supports it (needs a password module change) 1743 # try: 1744 # m = Message() 1745 # m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1746 # m["userAccountControl"] = MessageElement( 1747 # str(UF_NORMAL_ACCOUNT), 1748 # FLAG_MOD_REPLACE, "userAccountControl") 1749 # ldb.modify(m) 1750 # except LdbError, (num, _): 1751 # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1826 try: 1827 m = Message() 1828 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1829 m["userAccountControl"] = MessageElement( 1830 str(UF_NORMAL_ACCOUNT), 1831 FLAG_MOD_REPLACE, "userAccountControl") 1832 ldb.modify(m) 1833 except LdbError, (num, _): 1834 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 1752 1835 1753 1836 m = Message() … … 1765 1848 ATYPE_NORMAL_ACCOUNT) 1766 1849 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) 1850 1851 m = Message() 1852 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1853 m["userAccountControl"] = MessageElement( 1854 str(UF_ACCOUNTDISABLE), 1855 FLAG_MOD_REPLACE, "userAccountControl") 1856 ldb.modify(m) 1857 1858 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 1859 scope=SCOPE_BASE, 1860 attrs=["sAMAccountType", "userAccountControl"]) 1861 self.assertTrue(len(res1) == 1) 1862 self.assertEquals(int(res1[0]["sAMAccountType"][0]), 1863 ATYPE_NORMAL_ACCOUNT) 1864 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) 1865 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0) 1866 1867 m = Message() 1868 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1869 m["lockoutTime"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "lockoutTime") 1870 m["pwdLastSet"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "pwdLastSet") 1871 ldb.modify(m) 1872 1873 m = Message() 1874 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1875 m["userAccountControl"] = MessageElement( 1876 str(UF_LOCKOUT | UF_PASSWORD_EXPIRED), 1877 FLAG_MOD_REPLACE, "userAccountControl") 1878 ldb.modify(m) 1879 1880 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 1881 scope=SCOPE_BASE, 1882 attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"]) 1883 self.assertTrue(len(res1) == 1) 1884 self.assertEquals(int(res1[0]["sAMAccountType"][0]), 1885 ATYPE_NORMAL_ACCOUNT) 1886 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0) 1887 self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0) 1888 self.assertTrue(int(res1[0]["lockoutTime"][0]) == 0) 1889 self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0) 1767 1890 1768 1891 try: … … 1855 1978 ATYPE_WORKSTATION_TRUST) 1856 1979 1857 # This isn't supported yet in s4 - needs ACL module adaption 1858 # try: 1859 # m = Message() 1860 # m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1861 # m["userAccountControl"] = MessageElement( 1862 # str(UF_INTERDOMAIN_TRUST_ACCOUNT), 1863 # FLAG_MOD_REPLACE, "userAccountControl") 1864 # ldb.modify(m) 1865 # self.fail() 1866 # except LdbError, (num, _): 1867 # self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1980 try: 1981 m = Message() 1982 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1983 m["userAccountControl"] = MessageElement( 1984 str(UF_INTERDOMAIN_TRUST_ACCOUNT), 1985 FLAG_MOD_REPLACE, "userAccountControl") 1986 ldb.modify(m) 1987 self.fail() 1988 except LdbError, (num, _): 1989 self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) 1990 1991 # "primaryGroupID" does not change if account type remains the same 1992 1993 # For a user account 1994 1995 ldb.add({ 1996 "dn": "cn=ldaptestuser2,cn=users," + self.base_dn, 1997 "objectclass": "user", 1998 "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE)}) 1999 2000 res1 = ldb.search("cn=ldaptestuser2,cn=users," + self.base_dn, 2001 scope=SCOPE_BASE, 2002 attrs=["userAccountControl"]) 2003 self.assertTrue(len(res1) == 1) 2004 self.assertEquals(int(res1[0]["userAccountControl"][0]), 2005 UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE) 2006 2007 m = Message() 2008 m.dn = Dn(ldb, "<SID=" + ldb.get_domain_sid() + "-" + str(DOMAIN_RID_ADMINS) + ">") 2009 m["member"] = MessageElement( 2010 "cn=ldaptestuser2,cn=users," + self.base_dn, FLAG_MOD_ADD, "member") 2011 ldb.modify(m) 2012 2013 m = Message() 2014 m.dn = Dn(ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) 2015 m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_ADMINS), 2016 FLAG_MOD_REPLACE, "primaryGroupID") 2017 ldb.modify(m) 2018 2019 m = Message() 2020 m.dn = Dn(ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) 2021 m["userAccountControl"] = MessageElement( 2022 str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), 2023 FLAG_MOD_REPLACE, "userAccountControl") 2024 ldb.modify(m) 2025 2026 res1 = ldb.search("cn=ldaptestuser2,cn=users," + self.base_dn, 2027 scope=SCOPE_BASE, 2028 attrs=["userAccountControl", "primaryGroupID"]) 2029 self.assertTrue(len(res1) == 1) 2030 self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0) 2031 self.assertEquals(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_ADMINS) 2032 2033 # For a workstation account 2034 2035 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2036 scope=SCOPE_BASE, 2037 attrs=["primaryGroupID"]) 2038 self.assertTrue(len(res1) == 1) 2039 self.assertEquals(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DOMAIN_MEMBERS) 2040 2041 m = Message() 2042 m.dn = Dn(ldb, "<SID=" + ldb.get_domain_sid() + "-" + str(DOMAIN_RID_USERS) + ">") 2043 m["member"] = MessageElement( 2044 "cn=ldaptestcomputer,cn=computers," + self.base_dn, FLAG_MOD_ADD, "member") 2045 ldb.modify(m) 2046 2047 m = Message() 2048 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2049 m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_USERS), 2050 FLAG_MOD_REPLACE, "primaryGroupID") 2051 ldb.modify(m) 2052 2053 m = Message() 2054 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2055 m["userAccountControl"] = MessageElement( 2056 str(UF_WORKSTATION_TRUST_ACCOUNT), 2057 FLAG_MOD_REPLACE, "userAccountControl") 2058 ldb.modify(m) 2059 2060 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2061 scope=SCOPE_BASE, 2062 attrs=["primaryGroupID"]) 2063 self.assertTrue(len(res1) == 1) 2064 self.assertEquals(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS) 1868 2065 1869 2066 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 2067 delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) 2068 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2069 2070 def test_isCriticalSystemObject(self): 2071 """Test the isCriticalSystemObject behaviour""" 2072 print "Testing isCriticalSystemObject behaviour\n" 2073 2074 # Add tests 2075 2076 ldb.add({ 2077 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 2078 "objectclass": "computer"}) 2079 2080 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2081 scope=SCOPE_BASE, 2082 attrs=["isCriticalSystemObject"]) 2083 self.assertTrue(len(res1) == 1) 2084 self.assertTrue("isCriticalSystemObject" not in res1[0]) 2085 2086 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2087 2088 ldb.add({ 2089 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 2090 "objectclass": "computer", 2091 "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)}) 2092 2093 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2094 scope=SCOPE_BASE, 2095 attrs=["isCriticalSystemObject"]) 2096 self.assertTrue(len(res1) == 1) 2097 self.assertEquals(res1[0]["isCriticalSystemObject"][0], "FALSE") 2098 2099 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2100 2101 ldb.add({ 2102 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 2103 "objectclass": "computer", 2104 "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT)}) 2105 2106 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2107 scope=SCOPE_BASE, 2108 attrs=["isCriticalSystemObject"]) 2109 self.assertTrue(len(res1) == 1) 2110 self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE") 2111 2112 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2113 2114 ldb.add({ 2115 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, 2116 "objectclass": "computer", 2117 "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)}) 2118 2119 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2120 scope=SCOPE_BASE, 2121 attrs=["isCriticalSystemObject"]) 2122 self.assertTrue(len(res1) == 1) 2123 self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE") 2124 2125 # Modification tests 2126 2127 m = Message() 2128 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2129 m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), 2130 FLAG_MOD_REPLACE, "userAccountControl") 2131 ldb.modify(m) 2132 2133 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2134 scope=SCOPE_BASE, 2135 attrs=["isCriticalSystemObject"]) 2136 self.assertTrue(len(res1) == 1) 2137 self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE") 2138 2139 m = Message() 2140 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2141 m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT), 2142 FLAG_MOD_REPLACE, "userAccountControl") 2143 ldb.modify(m) 2144 2145 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2146 scope=SCOPE_BASE, 2147 attrs=["isCriticalSystemObject"]) 2148 self.assertTrue(len(res1) == 1) 2149 self.assertEquals(res1[0]["isCriticalSystemObject"][0], "FALSE") 2150 2151 m = Message() 2152 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2153 m["userAccountControl"] = MessageElement( 2154 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT), 2155 FLAG_MOD_REPLACE, "userAccountControl") 2156 ldb.modify(m) 2157 2158 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2159 scope=SCOPE_BASE, 2160 attrs=["isCriticalSystemObject"]) 2161 self.assertTrue(len(res1) == 1) 2162 self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE") 2163 2164 m = Message() 2165 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2166 m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), 2167 FLAG_MOD_REPLACE, "userAccountControl") 2168 ldb.modify(m) 2169 2170 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2171 scope=SCOPE_BASE, 2172 attrs=["isCriticalSystemObject"]) 2173 self.assertTrue(len(res1) == 1) 2174 self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE") 2175 2176 m = Message() 2177 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2178 m["userAccountControl"] = MessageElement(str(UF_SERVER_TRUST_ACCOUNT), 2179 FLAG_MOD_REPLACE, "userAccountControl") 2180 ldb.modify(m) 2181 2182 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2183 scope=SCOPE_BASE, 2184 attrs=["isCriticalSystemObject"]) 2185 self.assertTrue(len(res1) == 1) 2186 self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE") 2187 2188 m = Message() 2189 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2190 m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT), 2191 FLAG_MOD_REPLACE, "userAccountControl") 2192 ldb.modify(m) 2193 2194 res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2195 scope=SCOPE_BASE, 2196 attrs=["isCriticalSystemObject"]) 2197 self.assertTrue(len(res1) == 1) 2198 self.assertEquals(res1[0]["isCriticalSystemObject"][0], "FALSE") 2199 1870 2200 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 1871 2201 … … 2207 2537 self.assertEquals(res[0]["dNSHostName"][0], "testname2.testdom") 2208 2538 self.assertEquals(res[0]["sAMAccountName"][0], "testname2$") 2209 self.assertTrue(res[0]["servicePrincipalName"][0] == "HOST/testname2" or 2210 res[0]["servicePrincipalName"][1] == "HOST/testname2") 2211 self.assertTrue(res[0]["servicePrincipalName"][0] == "HOST/testname2.testdom" or 2212 res[0]["servicePrincipalName"][1] == "HOST/testname2.testdom") 2539 self.assertTrue(len(res[0]["servicePrincipalName"]) == 2) 2540 self.assertTrue("HOST/testname2" in res[0]["servicePrincipalName"]) 2541 self.assertTrue("HOST/testname2.testdom" in res[0]["servicePrincipalName"]) 2542 2543 m = Message() 2544 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2545 m["servicePrincipalName"] = MessageElement("HOST/testname2.testdom", 2546 FLAG_MOD_ADD, "servicePrincipalName") 2547 try: 2548 ldb.modify(m) 2549 self.fail() 2550 except LdbError, (num, _): 2551 self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) 2552 2553 m = Message() 2554 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2555 m["servicePrincipalName"] = MessageElement("HOST/testname3", 2556 FLAG_MOD_ADD, "servicePrincipalName") 2557 ldb.modify(m) 2558 2559 res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2560 scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) 2561 self.assertTrue(len(res) == 1) 2562 self.assertEquals(res[0]["dNSHostName"][0], "testname2.testdom") 2563 self.assertEquals(res[0]["sAMAccountName"][0], "testname2$") 2564 self.assertTrue(len(res[0]["servicePrincipalName"]) == 3) 2565 self.assertTrue("HOST/testname2" in res[0]["servicePrincipalName"]) 2566 self.assertTrue("HOST/testname3" in res[0]["servicePrincipalName"]) 2567 self.assertTrue("HOST/testname2.testdom" in res[0]["servicePrincipalName"]) 2568 2569 m = Message() 2570 m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) 2571 m["dNSHostName"] = MessageElement("testname3.testdom", 2572 FLAG_MOD_REPLACE, "dNSHostName") 2573 m["servicePrincipalName"] = MessageElement("HOST/testname3.testdom", 2574 FLAG_MOD_ADD, "servicePrincipalName") 2575 ldb.modify(m) 2576 2577 res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, 2578 scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"]) 2579 self.assertTrue(len(res) == 1) 2580 self.assertEquals(res[0]["dNSHostName"][0], "testname3.testdom") 2581 self.assertEquals(res[0]["sAMAccountName"][0], "testname2$") 2582 self.assertTrue(len(res[0]["servicePrincipalName"]) == 3) 2583 self.assertTrue("HOST/testname2" in res[0]["servicePrincipalName"]) 2584 self.assertTrue("HOST/testname3" in res[0]["servicePrincipalName"]) 2585 self.assertTrue("HOST/testname3.testdom" in res[0]["servicePrincipalName"]) 2213 2586 2214 2587 delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) … … 2216 2589 def test_sam_description_attribute(self): 2217 2590 """Test SAM description attribute""" 2218 print "Test SAM description attribute" ""2591 print "Test SAM description attribute" 2219 2592 2220 2593 self.ldb.add({ 2221 2594 "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, 2222 "description": "desc2",2223 2595 "objectclass": "group", 2224 "description": "desc1"}) 2596 "description": "desc1" 2597 }) 2225 2598 2226 2599 res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, … … 2382 2755 2383 2756 2757 def test_fSMORoleOwner_attribute(self): 2758 """Test fSMORoleOwner attribute""" 2759 print "Test fSMORoleOwner attribute" 2760 2761 ds_service_name = self.ldb.get_dsServiceName() 2762 2763 # The "fSMORoleOwner" attribute can only be set to "nTDSDSA" entries, 2764 # invalid DNs return ERR_UNWILLING_TO_PERFORM 2765 2766 try: 2767 self.ldb.add({ 2768 "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, 2769 "objectclass": "group", 2770 "fSMORoleOwner": self.base_dn}) 2771 self.fail() 2772 except LdbError, (num, _): 2773 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 2774 2775 try: 2776 self.ldb.add({ 2777 "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, 2778 "objectclass": "group", 2779 "fSMORoleOwner": [] }) 2780 self.fail() 2781 except LdbError, (num, _): 2782 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) 2783 2784 # We are able to set it to a valid "nTDSDSA" entry if the server is 2785 # capable of handling the role 2786 2787 self.ldb.add({ 2788 "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, 2789 "objectclass": "group", 2790 "fSMORoleOwner": ds_service_name }) 2791 2792 delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) 2793 2794 self.ldb.add({ 2795 "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, 2796 "objectclass": "group" }) 2797 2798 m = Message() 2799 m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) 2800 m.add(MessageElement(self.base_dn, FLAG_MOD_REPLACE, "fSMORoleOwner")) 2801 try: 2802 ldb.modify(m) 2803 self.fail() 2804 except LdbError, (num, _): 2805 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 2806 2807 m = Message() 2808 m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) 2809 m.add(MessageElement([], FLAG_MOD_REPLACE, "fSMORoleOwner")) 2810 try: 2811 ldb.modify(m) 2812 self.fail() 2813 except LdbError, (num, _): 2814 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) 2815 2816 # We are able to set it to a valid "nTDSDSA" entry if the server is 2817 # capable of handling the role 2818 2819 m = Message() 2820 m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) 2821 m.add(MessageElement(ds_service_name, FLAG_MOD_REPLACE, "fSMORoleOwner")) 2822 ldb.modify(m) 2823 2824 # A clean-out works on plain entries, not master (schema, PDC...) DNs 2825 2826 m = Message() 2827 m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) 2828 m.add(MessageElement([], FLAG_MOD_DELETE, "fSMORoleOwner")) 2829 ldb.modify(m) 2830 2831 delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) 2832 2833 def test_protected_sid_objects(self): 2834 """Test deletion of objects with RID < 1000""" 2835 # a list of some well-known sids 2836 # objects in Builtin are aready covered by objectclass 2837 protected_list = [ 2838 ["CN=Domain Admins","CN=Users,"], 2839 ["CN=Schema Admins","CN=Users,"], 2840 ["CN=Enterprise Admins","CN=Users,"], 2841 ["CN=Administrator","CN=Users,"], 2842 ["CN=Domain Controllers","CN=Users,"], 2843 ] 2844 2845 2846 2847 for pr_object in protected_list: 2848 try: 2849 self.ldb.delete(pr_object[0] + "," + pr_object[1] + self.base_dn) 2850 except LdbError, (num, _): 2851 self.assertEquals(num, ERR_OTHER) 2852 else: 2853 self.fail("Deleted " + pr_object[0]) 2854 2855 try: 2856 self.ldb.rename(pr_object[0] + "," + pr_object[1] + self.base_dn, 2857 pr_object[0] + "2," + pr_object[1] + self.base_dn) 2858 except LdbError, (num, _): 2859 self.fail("Could not rename " + pr_object[0]) 2860 2861 self.ldb.rename(pr_object[0] + "2," + pr_object[1] + self.base_dn, 2862 pr_object[0] + "," + pr_object[1] + self.base_dn) 2863 2864 def test_new_user_default_attributes(self): 2865 """Test default attributes for new user objects""" 2866 print "Test default attributes for new User objects\n" 2867 2868 user_name = "ldaptestuser" 2869 user_dn = "CN=%s,CN=Users,%s" % (user_name, self.base_dn) 2870 ldb.add({ 2871 "dn": user_dn, 2872 "objectclass": "user", 2873 "sAMAccountName": user_name}) 2874 2875 res = ldb.search(user_dn, scope=SCOPE_BASE) 2876 self.assertTrue(len(res) == 1) 2877 user_obj = res[0] 2878 2879 expected_attrs = {"primaryGroupID": MessageElement(["513"]), 2880 "logonCount": MessageElement(["0"]), 2881 "cn": MessageElement([user_name]), 2882 "countryCode": MessageElement(["0"]), 2883 "objectClass": MessageElement(["top","person","organizationalPerson","user"]), 2884 "instanceType": MessageElement(["4"]), 2885 "distinguishedName": MessageElement([user_dn]), 2886 "sAMAccountType": MessageElement(["805306368"]), 2887 "objectSid": "**SKIP**", 2888 "whenCreated": "**SKIP**", 2889 "uSNCreated": "**SKIP**", 2890 "badPasswordTime": MessageElement(["0"]), 2891 "dn": Dn(ldb, user_dn), 2892 "pwdLastSet": MessageElement(["0"]), 2893 "sAMAccountName": MessageElement([user_name]), 2894 "objectCategory": MessageElement(["CN=Person,%s" % ldb.get_schema_basedn().get_linearized()]), 2895 "objectGUID": "**SKIP**", 2896 "whenChanged": "**SKIP**", 2897 "badPwdCount": MessageElement(["0"]), 2898 "accountExpires": MessageElement(["9223372036854775807"]), 2899 "name": MessageElement([user_name]), 2900 "codePage": MessageElement(["0"]), 2901 "userAccountControl": MessageElement(["546"]), 2902 "lastLogon": MessageElement(["0"]), 2903 "uSNChanged": "**SKIP**", 2904 "lastLogoff": MessageElement(["0"])} 2905 # assert we have expected attribute names 2906 actual_names = set(user_obj.keys()) 2907 # Samba does not use 'dSCorePropagationData', so skip it 2908 actual_names -= set(['dSCorePropagationData']) 2909 self.assertEqual(set(expected_attrs.keys()), actual_names, "Actual object does not have expected attributes") 2910 # check attribute values 2911 for name in expected_attrs.keys(): 2912 actual_val = user_obj.get(name) 2913 self.assertFalse(actual_val is None, "No value for attribute '%s'" % name) 2914 expected_val = expected_attrs[name] 2915 if expected_val == "**SKIP**": 2916 # "**ANY**" values means "any" 2917 continue 2918 self.assertEqual(expected_val, actual_val, 2919 "Unexpected value for '%s'" % name) 2920 # clean up 2921 delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) 2922 2923 2384 2924 if not "://" in host: 2385 2925 if os.path.isfile(host): … … 2390 2930 ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp) 2391 2931 2392 runner = SubunitTestRunner() 2393 rc = 0 2394 if not runner.run(unittest.makeSuite(SamTests)).wasSuccessful(): 2395 rc = 1 2396 sys.exit(rc) 2932 TestProgram(module=__name__, opts=subunitopts) -
vendor/current/source4/dsdb/tests/python/sec_descriptor.py
r740 r988 11 11 sys.path.insert(0, "bin/python") 12 12 import samba 13 samba.ensure_external_module("testtools", "testtools") 14 samba.ensure_external_module("subunit", "subunit/python") 13 14 from samba.tests.subunitrun import SubunitOptions, TestProgram 15 15 16 16 import samba.getopt as options … … 20 20 21 21 # For running the test unit 22 from samba.ndr import ndr_pack 22 from samba.ndr import ndr_pack, ndr_unpack 23 23 from samba.dcerpc import security 24 24 25 25 from samba import gensec, sd_utils 26 26 from samba.samdb import SamDB 27 from samba.credentials import Credentials 27 from samba.credentials import Credentials, DONT_USE_KERBEROS 28 28 from samba.auth import system_session 29 29 from samba.dsdb import DS_DOMAIN_FUNCTION_2008 30 30 from samba.dcerpc.security import ( 31 31 SECINFO_OWNER, SECINFO_GROUP, SECINFO_DACL, SECINFO_SACL) 32 from subunit.run import SubunitTestRunner33 32 import samba.tests 34 33 from samba.tests import delete_force 35 import unittest36 34 37 35 parser = optparse.OptionParser("sec_descriptor.py [options] <host>") … … 43 41 credopts = options.CredentialsOptions(parser) 44 42 parser.add_option_group(credopts) 43 subunitopts = SubunitOptions(parser) 44 parser.add_option_group(subunitopts) 45 45 46 opts, args = parser.parse_args() 46 47 … … 137 138 creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() 138 139 | gensec.FEATURE_SEAL) 140 creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop 139 141 ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp) 140 142 return ldb_target … … 142 144 def setUp(self): 143 145 super(DescriptorTests, self).setUp() 144 self.ldb_admin = ldb 145 self.base_dn = ldb.domain_dn() 146 self.ldb_admin = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp, 147 options=ldb_options) 148 self.base_dn = self.ldb_admin.domain_dn() 146 149 self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized() 147 150 self.schema_dn = self.ldb_admin.get_schema_basedn().get_linearized() 148 151 self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid()) 149 self.sd_utils = sd_utils.SDUtils( ldb)152 self.sd_utils = sd_utils.SDUtils(self.ldb_admin) 150 153 print "baseDN: %s" % self.base_dn 151 154 … … 200 203 201 204 self.ldb_admin.add_remove_group_members("Enterprise Admins", 202 "testuser1,testuser5,testuser6,testuser8",205 ["testuser1", "testuser5", "testuser6", "testuser8"], 203 206 add_members_operation=True) 204 207 self.ldb_admin.add_remove_group_members("Domain Admins", 205 "testuser2,testuser5,testuser6,testuser7",208 ["testuser2","testuser5","testuser6","testuser7"], 206 209 add_members_operation=True) 207 210 self.ldb_admin.add_remove_group_members("Schema Admins", 208 "testuser3,testuser6,testuser7,testuser8",211 ["testuser3","testuser6","testuser7","testuser8"], 209 212 add_members_operation=True) 210 213 … … 313 316 }, 314 317 } 315 # Discover ' msDS-Behavior-Version'316 res = self.ldb_admin.search(base= self.base_dn, expression="distinguishedName=%s" % self.base_dn, \317 attrs=['msDS-Behavior-Version'])318 res = int(res[0][' msDS-Behavior-Version'][0])318 # Discover 'domainControllerFunctionality' 319 res = self.ldb_admin.search(base="", scope=SCOPE_BASE, 320 attrs=['domainControllerFunctionality']) 321 res = int(res[0]['domainControllerFunctionality'][0]) 319 322 if res < DS_DOMAIN_FUNCTION_2008: 320 323 self.DS_BEHAVIOR = "ds_behavior_win2003" … … 1848 1851 self.assertFalse("G:" in desc_sddl) 1849 1852 1853 def test_311(self): 1854 sd_flags = (SECINFO_OWNER | 1855 SECINFO_GROUP | 1856 SECINFO_DACL | 1857 SECINFO_SACL) 1858 1859 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1860 [], controls=None) 1861 self.assertFalse("nTSecurityDescriptor" in res[0]) 1862 1863 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1864 ["name"], controls=None) 1865 self.assertFalse("nTSecurityDescriptor" in res[0]) 1866 1867 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1868 ["name"], controls=["sd_flags:1:%d" % (sd_flags)]) 1869 self.assertFalse("nTSecurityDescriptor" in res[0]) 1870 1871 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1872 [], controls=["sd_flags:1:%d" % (sd_flags)]) 1873 self.assertTrue("nTSecurityDescriptor" in res[0]) 1874 tmp = res[0]["nTSecurityDescriptor"][0] 1875 sd = ndr_unpack(security.descriptor, tmp) 1876 sddl = sd.as_sddl(self.sd_utils.domain_sid) 1877 self.assertTrue("O:" in sddl) 1878 self.assertTrue("G:" in sddl) 1879 self.assertTrue("D:" in sddl) 1880 self.assertTrue("S:" in sddl) 1881 1882 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1883 ["*"], controls=["sd_flags:1:%d" % (sd_flags)]) 1884 self.assertTrue("nTSecurityDescriptor" in res[0]) 1885 tmp = res[0]["nTSecurityDescriptor"][0] 1886 sd = ndr_unpack(security.descriptor, tmp) 1887 sddl = sd.as_sddl(self.sd_utils.domain_sid) 1888 self.assertTrue("O:" in sddl) 1889 self.assertTrue("G:" in sddl) 1890 self.assertTrue("D:" in sddl) 1891 self.assertTrue("S:" in sddl) 1892 1893 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1894 ["nTSecurityDescriptor", "*"], controls=["sd_flags:1:%d" % (sd_flags)]) 1895 self.assertTrue("nTSecurityDescriptor" in res[0]) 1896 tmp = res[0]["nTSecurityDescriptor"][0] 1897 sd = ndr_unpack(security.descriptor, tmp) 1898 sddl = sd.as_sddl(self.sd_utils.domain_sid) 1899 self.assertTrue("O:" in sddl) 1900 self.assertTrue("G:" in sddl) 1901 self.assertTrue("D:" in sddl) 1902 self.assertTrue("S:" in sddl) 1903 1904 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1905 ["*", "nTSecurityDescriptor"], controls=["sd_flags:1:%d" % (sd_flags)]) 1906 self.assertTrue("nTSecurityDescriptor" in res[0]) 1907 tmp = res[0]["nTSecurityDescriptor"][0] 1908 sd = ndr_unpack(security.descriptor, tmp) 1909 sddl = sd.as_sddl(self.sd_utils.domain_sid) 1910 self.assertTrue("O:" in sddl) 1911 self.assertTrue("G:" in sddl) 1912 self.assertTrue("D:" in sddl) 1913 self.assertTrue("S:" in sddl) 1914 1915 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1916 ["nTSecurityDescriptor", "name"], controls=["sd_flags:1:%d" % (sd_flags)]) 1917 self.assertTrue("nTSecurityDescriptor" in res[0]) 1918 tmp = res[0]["nTSecurityDescriptor"][0] 1919 sd = ndr_unpack(security.descriptor, tmp) 1920 sddl = sd.as_sddl(self.sd_utils.domain_sid) 1921 self.assertTrue("O:" in sddl) 1922 self.assertTrue("G:" in sddl) 1923 self.assertTrue("D:" in sddl) 1924 self.assertTrue("S:" in sddl) 1925 1926 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1927 ["name", "nTSecurityDescriptor"], controls=["sd_flags:1:%d" % (sd_flags)]) 1928 self.assertTrue("nTSecurityDescriptor" in res[0]) 1929 tmp = res[0]["nTSecurityDescriptor"][0] 1930 sd = ndr_unpack(security.descriptor, tmp) 1931 sddl = sd.as_sddl(self.sd_utils.domain_sid) 1932 self.assertTrue("O:" in sddl) 1933 self.assertTrue("G:" in sddl) 1934 self.assertTrue("D:" in sddl) 1935 self.assertTrue("S:" in sddl) 1936 1937 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1938 ["nTSecurityDescriptor"], controls=None) 1939 self.assertTrue("nTSecurityDescriptor" in res[0]) 1940 tmp = res[0]["nTSecurityDescriptor"][0] 1941 sd = ndr_unpack(security.descriptor, tmp) 1942 sddl = sd.as_sddl(self.sd_utils.domain_sid) 1943 self.assertTrue("O:" in sddl) 1944 self.assertTrue("G:" in sddl) 1945 self.assertTrue("D:" in sddl) 1946 self.assertTrue("S:" in sddl) 1947 1948 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1949 ["name", "nTSecurityDescriptor"], controls=None) 1950 self.assertTrue("nTSecurityDescriptor" in res[0]) 1951 tmp = res[0]["nTSecurityDescriptor"][0] 1952 sd = ndr_unpack(security.descriptor, tmp) 1953 sddl = sd.as_sddl(self.sd_utils.domain_sid) 1954 self.assertTrue("O:" in sddl) 1955 self.assertTrue("G:" in sddl) 1956 self.assertTrue("D:" in sddl) 1957 self.assertTrue("S:" in sddl) 1958 1959 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, 1960 ["nTSecurityDescriptor", "name"], controls=None) 1961 self.assertTrue("nTSecurityDescriptor" in res[0]) 1962 tmp = res[0]["nTSecurityDescriptor"][0] 1963 sd = ndr_unpack(security.descriptor, tmp) 1964 sddl = sd.as_sddl(self.sd_utils.domain_sid) 1965 self.assertTrue("O:" in sddl) 1966 self.assertTrue("G:" in sddl) 1967 self.assertTrue("D:" in sddl) 1968 self.assertTrue("S:" in sddl) 1969 1970 def test_312(self): 1971 """This search is done by the windows dc join...""" 1972 1973 res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, ["1.1"], 1974 controls=["extended_dn:1:0", "sd_flags:1:0", "search_options:1:1"]) 1975 self.assertFalse("nTSecurityDescriptor" in res[0]) 1850 1976 1851 1977 class RightsAttributesTests(DescriptorTests): … … 1865 1991 self.ldb_admin.newuser("testuser_attr2", "samba123@") 1866 1992 self.ldb_admin.add_remove_group_members("Domain Admins", 1867 "testuser_attr2",1993 ["testuser_attr2"], 1868 1994 add_members_operation=True) 1869 1995 … … 1960 2086 self.assertTrue("managedBy" in res[0]["allowedAttributesEffective"]) 1961 2087 2088 class SdAutoInheritTests(DescriptorTests): 2089 def deleteAll(self): 2090 delete_force(self.ldb_admin, self.sub_dn) 2091 delete_force(self.ldb_admin, self.ou_dn) 2092 2093 def setUp(self): 2094 super(SdAutoInheritTests, self).setUp() 2095 self.ou_dn = "OU=test_SdAutoInherit_ou," + self.base_dn 2096 self.sub_dn = "OU=test_sub," + self.ou_dn 2097 self.deleteAll() 2098 2099 def test_301(self): 2100 """ Modify a descriptor with OWNER_SECURITY_INFORMATION set. 2101 See that only the owner has been changed. 2102 """ 2103 attrs = ["nTSecurityDescriptor", "replPropertyMetaData", "uSNChanged"] 2104 controls=["sd_flags:1:%d" % (SECINFO_DACL)] 2105 ace = "(A;CI;CC;;;NU)" 2106 sub_ace = "(A;CIID;CC;;;NU)" 2107 sd_sddl = "O:BAG:BAD:P(A;CI;0x000f01ff;;;AU)" 2108 sd = security.descriptor.from_sddl(sd_sddl, self.domain_sid) 2109 2110 self.ldb_admin.create_ou(self.ou_dn,sd=sd) 2111 self.ldb_admin.create_ou(self.sub_dn) 2112 2113 ou_res0 = self.sd_utils.ldb.search(self.ou_dn, SCOPE_BASE, 2114 None, attrs, controls=controls) 2115 sub_res0 = self.sd_utils.ldb.search(self.sub_dn, SCOPE_BASE, 2116 None, attrs, controls=controls) 2117 2118 ou_sd0 = ndr_unpack(security.descriptor, ou_res0[0]["nTSecurityDescriptor"][0]) 2119 sub_sd0 = ndr_unpack(security.descriptor, sub_res0[0]["nTSecurityDescriptor"][0]) 2120 2121 ou_sddl0 = ou_sd0.as_sddl(self.domain_sid) 2122 sub_sddl0 = sub_sd0.as_sddl(self.domain_sid) 2123 2124 self.assertFalse(ace in ou_sddl0) 2125 self.assertFalse(ace in sub_sddl0) 2126 2127 ou_sddl1 = (ou_sddl0[:ou_sddl0.index("(")] + ace + 2128 ou_sddl0[ou_sddl0.index("("):]) 2129 2130 sub_sddl1 = (sub_sddl0[:sub_sddl0.index("(")] + ace + 2131 sub_sddl0[sub_sddl0.index("("):]) 2132 2133 self.sd_utils.modify_sd_on_dn(self.ou_dn, ou_sddl1, controls=controls) 2134 2135 sub_res2 = self.sd_utils.ldb.search(self.sub_dn, SCOPE_BASE, 2136 None, attrs, controls=controls) 2137 ou_res2 = self.sd_utils.ldb.search(self.ou_dn, SCOPE_BASE, 2138 None, attrs, controls=controls) 2139 2140 ou_sd2 = ndr_unpack(security.descriptor, ou_res2[0]["nTSecurityDescriptor"][0]) 2141 sub_sd2 = ndr_unpack(security.descriptor, sub_res2[0]["nTSecurityDescriptor"][0]) 2142 2143 ou_sddl2 = ou_sd2.as_sddl(self.domain_sid) 2144 sub_sddl2 = sub_sd2.as_sddl(self.domain_sid) 2145 2146 self.assertFalse(ou_sddl2 == ou_sddl0) 2147 self.assertFalse(sub_sddl2 == sub_sddl0) 2148 2149 if ace not in ou_sddl2: 2150 print "ou0: %s" % ou_sddl0 2151 print "ou2: %s" % ou_sddl2 2152 2153 if sub_ace not in sub_sddl2: 2154 print "sub0: %s" % sub_sddl0 2155 print "sub2: %s" % sub_sddl2 2156 2157 self.assertTrue(ace in ou_sddl2) 2158 self.assertTrue(sub_ace in sub_sddl2) 2159 2160 ou_usn0 = int(ou_res0[0]["uSNChanged"][0]) 2161 ou_usn2 = int(ou_res2[0]["uSNChanged"][0]) 2162 self.assertTrue(ou_usn2 > ou_usn0) 2163 2164 sub_usn0 = int(sub_res0[0]["uSNChanged"][0]) 2165 sub_usn2 = int(sub_res2[0]["uSNChanged"][0]) 2166 self.assertTrue(sub_usn2 == sub_usn0) 2167 1962 2168 if not "://" in host: 1963 2169 if os.path.isfile(host): … … 1970 2176 ldb_options = ["modules:paged_searches"] 1971 2177 1972 ldb = SamDB(host, 1973 credentials=creds, 1974 session_info=system_session(lp), 1975 lp=lp, 1976 options=ldb_options) 1977 1978 runner = SubunitTestRunner() 1979 rc = 0 1980 if not runner.run(unittest.makeSuite(OwnerGroupDescriptorTests)).wasSuccessful(): 1981 rc = 1 1982 if not runner.run(unittest.makeSuite(DaclDescriptorTests)).wasSuccessful(): 1983 rc = 1 1984 if not runner.run(unittest.makeSuite(SdFlagsDescriptorTests)).wasSuccessful(): 1985 rc = 1 1986 if not runner.run(unittest.makeSuite(RightsAttributesTests)).wasSuccessful(): 1987 rc = 1 1988 sys.exit(rc) 2178 TestProgram(module=__name__, opts=subunitopts) -
vendor/current/source4/dsdb/tests/python/token_group.py
r740 r988 9 9 sys.path.insert(0, "bin/python") 10 10 import samba 11 samba.ensure_external_module("testtools", "testtools") 12 samba.ensure_external_module("subunit", "subunit/python") 11 12 from samba.tests.subunitrun import SubunitOptions, TestProgram 13 13 14 14 import samba.getopt as options 15 15 16 16 from samba.auth import system_session 17 from samba import ldb 17 from samba import ldb, dsdb 18 18 from samba.samdb import SamDB 19 19 from samba.auth import AuthContext 20 from samba.ndr import ndr_ pack, ndr_unpack20 from samba.ndr import ndr_unpack 21 21 from samba import gensec 22 from samba.credentials import Credentials 23 24 from subunit.run import SubunitTestRunner 25 import unittest 22 from samba.credentials import Credentials, DONT_USE_KERBEROS 23 from samba.dsdb import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP 26 24 import samba.tests 27 28 from samba.dcerpc import security 25 from samba.tests import delete_force 26 29 27 from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES 30 28 … … 37 35 credopts = options.CredentialsOptions(parser) 38 36 parser.add_option_group(credopts) 37 subunitopts = SubunitOptions(parser) 38 parser.add_option_group(subunitopts) 39 39 opts, args = parser.parse_args() 40 40 … … 47 47 lp = sambaopts.get_loadparm() 48 48 creds = credopts.get_credentials(lp) 49 50 class TokenTest(samba.tests.TestCase): 49 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) 50 51 def closure(vSet, wSet, aSet): 52 for edge in aSet: 53 start, end = edge 54 if start in wSet: 55 if end not in wSet and end in vSet: 56 wSet.add(end) 57 closure(vSet, wSet, aSet) 58 59 class StaticTokenTest(samba.tests.TestCase): 51 60 52 61 def setUp(self): 53 super( TokenTest, self).setUp()54 self.ldb = samdb55 self.base_dn = s amdb.domain_dn()62 super(StaticTokenTest, self).setUp() 63 self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp) 64 self.base_dn = self.ldb.domain_dn() 56 65 57 66 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) … … 79 88 self.assertEquals(len(res), 1) 80 89 81 print("Get ing tokenGroups from rootDSE")90 print("Getting tokenGroups from rootDSE") 82 91 tokengroups = [] 83 92 for sid in res[0]['tokenGroups']: … … 89 98 print("token sids don't match") 90 99 print("tokengroups: %s" % tokengroups) 91 print("calculated : %s" % self.user_sids) ;100 print("calculated : %s" % self.user_sids) 92 101 print("difference : %s" % sidset1.difference(sidset2)) 93 102 self.fail(msg="calculated groups don't match against rootDSE tokenGroups") 94 103 95 104 def test_dn_tokenGroups(self): 96 print("Get ing tokenGroups from user DN")105 print("Getting tokenGroups from user DN") 97 106 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) 98 107 self.assertEquals(len(res), 1) … … 106 115 if len(sidset1.difference(sidset2)): 107 116 print("token sids don't match") 108 print("tokengroups: %s" % tokengroups)109 print("calculated : %s" % sids);110 117 print("difference : %s" % sidset1.difference(sidset2)) 111 118 self.fail(msg="calculated groups don't match against user DN tokenGroups") 112 119 113 120 def test_pac_groups(self): 114 121 settings = {} … … 135 142 server_finished = False 136 143 server_to_client = "" 137 138 """Run the actual call loop"""144 145 # Run the actual call loop. 139 146 while client_finished == False and server_finished == False: 140 147 if not client_finished: … … 156 163 if len(sidset1.difference(sidset2)): 157 164 print("token sids don't match") 165 print("difference : %s" % sidset1.difference(sidset2)) 166 self.fail(msg="calculated groups don't match against user PAC tokenGroups") 167 168 class DynamicTokenTest(samba.tests.TestCase): 169 170 def get_creds(self, target_username, target_password): 171 creds_tmp = Credentials() 172 creds_tmp.set_username(target_username) 173 creds_tmp.set_password(target_password) 174 creds_tmp.set_domain(creds.get_domain()) 175 creds_tmp.set_realm(creds.get_realm()) 176 creds_tmp.set_workstation(creds.get_workstation()) 177 creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() 178 | gensec.FEATURE_SEAL) 179 return creds_tmp 180 181 def get_ldb_connection(self, target_username, target_password): 182 creds_tmp = self.get_creds(target_username, target_password) 183 ldb_target = SamDB(url=url, credentials=creds_tmp, lp=lp) 184 return ldb_target 185 186 def setUp(self): 187 super(DynamicTokenTest, self).setUp() 188 self.admin_ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp) 189 190 self.base_dn = self.admin_ldb.domain_dn() 191 192 self.test_user = "tokengroups_user1" 193 self.test_user_pass = "samba123@" 194 self.admin_ldb.newuser(self.test_user, self.test_user_pass) 195 self.test_group0 = "tokengroups_group0" 196 self.admin_ldb.newgroup(self.test_group0, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP) 197 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group0, self.base_dn), 198 attrs=["objectSid"], scope=ldb.SCOPE_BASE) 199 self.test_group0_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) 200 201 self.admin_ldb.add_remove_group_members(self.test_group0, [self.test_user], 202 add_members_operation=True) 203 204 self.test_group1 = "tokengroups_group1" 205 self.admin_ldb.newgroup(self.test_group1, grouptype=dsdb.GTYPE_SECURITY_GLOBAL_GROUP) 206 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group1, self.base_dn), 207 attrs=["objectSid"], scope=ldb.SCOPE_BASE) 208 self.test_group1_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) 209 210 self.admin_ldb.add_remove_group_members(self.test_group1, [self.test_user], 211 add_members_operation=True) 212 213 self.test_group2 = "tokengroups_group2" 214 self.admin_ldb.newgroup(self.test_group2, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP) 215 216 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group2, self.base_dn), 217 attrs=["objectSid"], scope=ldb.SCOPE_BASE) 218 self.test_group2_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0]) 219 220 self.admin_ldb.add_remove_group_members(self.test_group2, [self.test_user], 221 add_members_operation=True) 222 223 self.ldb = self.get_ldb_connection(self.test_user, self.test_user_pass) 224 225 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) 226 self.assertEquals(len(res), 1) 227 228 self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0])) 229 230 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[]) 231 self.assertEquals(len(res), 1) 232 233 self.test_user_dn = res[0].dn 234 235 session_info_flags = ( AUTH_SESSION_INFO_DEFAULT_GROUPS | 236 AUTH_SESSION_INFO_AUTHENTICATED | 237 AUTH_SESSION_INFO_SIMPLE_PRIVILEGES) 238 session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn, 239 session_info_flags=session_info_flags) 240 241 token = session.security_token 242 self.user_sids = [] 243 for s in token.sids: 244 self.user_sids.append(str(s)) 245 246 def tearDown(self): 247 super(DynamicTokenTest, self).tearDown() 248 delete_force(self.admin_ldb, "CN=%s,%s,%s" % 249 (self.test_user, "cn=users", self.base_dn)) 250 delete_force(self.admin_ldb, "CN=%s,%s,%s" % 251 (self.test_group0, "cn=users", self.base_dn)) 252 delete_force(self.admin_ldb, "CN=%s,%s,%s" % 253 (self.test_group1, "cn=users", self.base_dn)) 254 delete_force(self.admin_ldb, "CN=%s,%s,%s" % 255 (self.test_group2, "cn=users", self.base_dn)) 256 257 def test_rootDSE_tokenGroups(self): 258 """Testing rootDSE tokengroups against internal calculation""" 259 if not url.startswith("ldap"): 260 self.fail(msg="This test is only valid on ldap") 261 262 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) 263 self.assertEquals(len(res), 1) 264 265 print("Getting tokenGroups from rootDSE") 266 tokengroups = [] 267 for sid in res[0]['tokenGroups']: 268 tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid))) 269 270 sidset1 = set(tokengroups) 271 sidset2 = set(self.user_sids) 272 if len(sidset1.difference(sidset2)): 273 print("token sids don't match") 158 274 print("tokengroups: %s" % tokengroups) 159 print("calculated : %s" % sids); 275 print("calculated : %s" % self.user_sids) 276 print("difference : %s" % sidset1.difference(sidset2)) 277 self.fail(msg="calculated groups don't match against rootDSE tokenGroups") 278 279 def test_dn_tokenGroups(self): 280 print("Getting tokenGroups from user DN") 281 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) 282 self.assertEquals(len(res), 1) 283 284 dn_tokengroups = [] 285 for sid in res[0]['tokenGroups']: 286 dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid))) 287 288 sidset1 = set(dn_tokengroups) 289 sidset2 = set(self.user_sids) 290 if len(sidset1.difference(sidset2)): 291 print("token sids don't match") 292 print("difference : %s" % sidset1.difference(sidset2)) 293 self.fail(msg="calculated groups don't match against user DN tokenGroups") 294 295 def test_pac_groups(self): 296 settings = {} 297 settings["lp_ctx"] = lp 298 settings["target_hostname"] = lp.get("netbios name") 299 300 gensec_client = gensec.Security.start_client(settings) 301 gensec_client.set_credentials(self.get_creds(self.test_user, self.test_user_pass)) 302 gensec_client.want_feature(gensec.FEATURE_SEAL) 303 gensec_client.start_mech_by_sasl_name("GSSAPI") 304 305 auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[]) 306 307 gensec_server = gensec.Security.start_server(settings, auth_context) 308 machine_creds = Credentials() 309 machine_creds.guess(lp) 310 machine_creds.set_machine_account(lp) 311 gensec_server.set_credentials(machine_creds) 312 313 gensec_server.want_feature(gensec.FEATURE_SEAL) 314 gensec_server.start_mech_by_sasl_name("GSSAPI") 315 316 client_finished = False 317 server_finished = False 318 server_to_client = "" 319 320 # Run the actual call loop. 321 while client_finished == False and server_finished == False: 322 if not client_finished: 323 print "running client gensec_update" 324 (client_finished, client_to_server) = gensec_client.update(server_to_client) 325 if not server_finished: 326 print "running server gensec_update" 327 (server_finished, server_to_client) = gensec_server.update(client_to_server) 328 329 session = gensec_server.session_info() 330 331 token = session.security_token 332 pac_sids = [] 333 for s in token.sids: 334 pac_sids.append(str(s)) 335 336 sidset1 = set(pac_sids) 337 sidset2 = set(self.user_sids) 338 if len(sidset1.difference(sidset2)): 339 print("token sids don't match") 160 340 print("difference : %s" % sidset1.difference(sidset2)) 161 341 self.fail(msg="calculated groups don't match against user PAC tokenGroups") 162 342 343 344 def test_tokenGroups_manual(self): 345 # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3 346 # and compare the result 347 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE, 348 expression="(|(objectclass=user)(objectclass=group))", 349 attrs=["memberOf"]) 350 aSet = set() 351 aSetR = set() 352 vSet = set() 353 for obj in res: 354 if "memberOf" in obj: 355 for dn in obj["memberOf"]: 356 first = obj.dn.get_casefold() 357 second = ldb.Dn(self.admin_ldb, dn).get_casefold() 358 aSet.add((first, second)) 359 aSetR.add((second, first)) 360 vSet.add(first) 361 vSet.add(second) 362 363 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE, 364 expression="(objectclass=user)", 365 attrs=["primaryGroupID"]) 366 for obj in res: 367 if "primaryGroupID" in obj: 368 sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0])) 369 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, 370 attrs=[]) 371 first = obj.dn.get_casefold() 372 second = res2[0].dn.get_casefold() 373 374 aSet.add((first, second)) 375 aSetR.add((second, first)) 376 vSet.add(first) 377 vSet.add(second) 378 379 wSet = set() 380 wSet.add(self.test_user_dn.get_casefold()) 381 closure(vSet, wSet, aSet) 382 wSet.remove(self.test_user_dn.get_casefold()) 383 384 tokenGroupsSet = set() 385 386 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) 387 self.assertEquals(len(res), 1) 388 389 dn_tokengroups = [] 390 for sid in res[0]['tokenGroups']: 391 sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid) 392 res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, 393 attrs=[]) 394 tokenGroupsSet.add(res3[0].dn.get_casefold()) 395 396 if len(wSet.difference(tokenGroupsSet)): 397 self.fail(msg="additional calculated: %s" % wSet.difference(tokenGroupsSet)) 398 399 if len(tokenGroupsSet.difference(wSet)): 400 self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet)) 401 402 403 def filtered_closure(self, wSet, filter_grouptype): 404 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE, 405 expression="(|(objectclass=user)(objectclass=group))", 406 attrs=["memberOf"]) 407 aSet = set() 408 aSetR = set() 409 vSet = set() 410 for obj in res: 411 vSet.add(obj.dn.get_casefold()) 412 if "memberOf" in obj: 413 for dn in obj["memberOf"]: 414 first = obj.dn.get_casefold() 415 second = ldb.Dn(self.admin_ldb, dn).get_casefold() 416 aSet.add((first, second)) 417 aSetR.add((second, first)) 418 vSet.add(first) 419 vSet.add(second) 420 421 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE, 422 expression="(objectclass=user)", 423 attrs=["primaryGroupID"]) 424 for obj in res: 425 if "primaryGroupID" in obj: 426 sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0])) 427 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, 428 attrs=[]) 429 first = obj.dn.get_casefold() 430 second = res2[0].dn.get_casefold() 431 432 aSet.add((first, second)) 433 aSetR.add((second, first)) 434 vSet.add(first) 435 vSet.add(second) 436 437 uSet = set() 438 for v in vSet: 439 res_group = self.admin_ldb.search(base=v, scope=ldb.SCOPE_BASE, 440 attrs=["groupType"], 441 expression="objectClass=group") 442 if len(res_group) == 1: 443 if hex(int(res_group[0]["groupType"][0]) & 0x00000000FFFFFFFF) == hex(filter_grouptype): 444 uSet.add(v) 445 else: 446 uSet.add(v) 447 448 closure(uSet, wSet, aSet) 449 450 451 def test_tokenGroupsGlobalAndUniversal_manual(self): 452 # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3 453 # and compare the result 454 455 # The variable names come from MS-ADTS May 15, 2014 456 457 S = set() 458 S.add(self.test_user_dn.get_casefold()) 459 460 self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP) 461 462 T = set() 463 # Not really a SID, we do this on DNs... 464 for sid in S: 465 X = set() 466 X.add(sid) 467 self.filtered_closure(X, GTYPE_SECURITY_UNIVERSAL_GROUP) 468 469 T = T.union(X) 470 471 T.remove(self.test_user_dn.get_casefold()) 472 473 tokenGroupsSet = set() 474 475 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"]) 476 self.assertEquals(len(res), 1) 477 478 dn_tokengroups = [] 479 for sid in res[0]['tokenGroupsGlobalAndUniversal']: 480 sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid) 481 res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE, 482 attrs=[]) 483 tokenGroupsSet.add(res3[0].dn.get_casefold()) 484 485 if len(T.difference(tokenGroupsSet)): 486 self.fail(msg="additional calculated: %s" % T.difference(tokenGroupsSet)) 487 488 if len(tokenGroupsSet.difference(T)): 489 self.fail(msg="additional tokenGroupsGlobalAndUniversal: %s" % tokenGroupsSet.difference(T)) 163 490 164 491 if not "://" in url: … … 168 495 url = "ldap://%s" % url 169 496 170 samdb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp) 171 172 runner = SubunitTestRunner() 173 rc = 0 174 if not runner.run(unittest.makeSuite(TokenTest)).wasSuccessful(): 175 rc = 1 176 sys.exit(rc) 497 TestProgram(module=__name__, opts=subunitopts) -
vendor/current/source4/dsdb/tests/python/urgent_replication.py
r740 r988 4 4 import optparse 5 5 import sys 6 import os7 8 6 sys.path.insert(0, "bin/python") 9 7 import samba 10 samba.ensure_external_module("testtools", "testtools") 11 samba.ensure_external_module("subunit", "subunit/python") 12 13 import samba.getopt as options 14 15 from samba.auth import system_session 8 from samba.tests.subunitrun import TestProgram, SubunitOptions 9 16 10 from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message, 17 11 MessageElement, Dn, FLAG_MOD_REPLACE) 18 from samba.samdb import SamDB19 12 import samba.tests 20 13 import samba.dsdb as dsdb 21 22 from subunit.run import SubunitTestRunner 23 import unittest 14 import samba.getopt as options 24 15 25 16 parser = optparse.OptionParser("urgent_replication.py [options] <host>") … … 27 18 parser.add_option_group(sambaopts) 28 19 parser.add_option_group(options.VersionOptions(parser)) 20 29 21 # use command line creds if available 30 22 credopts = options.CredentialsOptions(parser) 31 23 parser.add_option_group(credopts) 24 subunitopts = SubunitOptions(parser) 25 parser.add_option_group(subunitopts) 32 26 opts, args = parser.parse_args() 33 27 … … 38 32 host = args[0] 39 33 40 lp = sambaopts.get_loadparm()41 creds = credopts.get_credentials(lp)42 34 43 35 class UrgentReplicationTests(samba.tests.TestCase): … … 51 43 def setUp(self): 52 44 super(UrgentReplicationTests, self).setUp() 53 self.ldb = ldb54 self.base_dn = ldb.domain_dn()45 self.ldb = samba.tests.connect_samdb(host, global_schema=False) 46 self.base_dn = self.ldb.domain_dn() 55 47 56 48 print "baseDN: %s\n" % self.base_dn 57 49 58 50 def test_nonurgent_object(self): 59 """Test if the urgent replication is not activated 60 when handling a non urgent object""" 51 """Test if the urgent replication is not activated when handling a non urgent object.""" 61 52 self.ldb.add({ 62 53 "dn": "cn=nonurgenttest,cn=users," + self.base_dn, … … 65 56 "description":"nonurgenttest description"}) 66 57 67 # urgent replication should not be enabled when creating 58 # urgent replication should not be enabled when creating 68 59 res = self.ldb.load_partition_usn(self.base_dn) 69 60 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]) … … 71 62 # urgent replication should not be enabled when modifying 72 63 m = Message() 73 m.dn = Dn( ldb, "cn=nonurgenttest,cn=users," + self.base_dn)64 m.dn = Dn(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn) 74 65 m["description"] = MessageElement("new description", FLAG_MOD_REPLACE, 75 66 "description") 76 ldb.modify(m)67 self.ldb.modify(m) 77 68 res = self.ldb.load_partition_usn(self.base_dn) 78 69 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]) … … 83 74 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]) 84 75 85 86 76 def test_nTDSDSA_object(self): 87 '''Test if the urgent replication is activated88 when handling a nTDSDSA object'''89 self.ldb.add({90 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,77 """Test if the urgent replication is activated when handling a nTDSDSA object.""" 78 self.ldb.add({ 79 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" % 80 self.ldb.get_config_basedn(), 91 81 "objectclass":"server", 92 82 "cn":"test server", … … 108 98 # urgent replication should NOT be enabled when modifying 109 99 m = Message() 110 m.dn = Dn( ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)100 m.dn = Dn(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn) 111 101 m["options"] = MessageElement("0", FLAG_MOD_REPLACE, 112 102 "options") 113 ldb.modify(m)103 self.ldb.modify(m) 114 104 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn) 115 105 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]) … … 122 112 self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn) 123 113 124 125 114 def test_crossRef_object(self): 126 '''Test if the urgent replication is activated 127 when handling a crossRef object''' 115 """Test if the urgent replication is activated when handling a crossRef object.""" 128 116 self.ldb.add({ 129 117 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn, 130 118 "objectClass": "crossRef", 131 119 "cn": "test crossRef", 132 "dnsRoot": lp.get("realm").lower(),120 "dnsRoot": self.get_loadparm().get("realm").lower(), 133 121 "instanceType": "4", 134 122 "nCName": self.base_dn, … … 143 131 # urgent replication should NOT be enabled when modifying 144 132 m = Message() 145 m.dn = Dn( ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)133 m.dn = Dn(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn) 146 134 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE, 147 135 "systemFlags") 148 ldb.modify(m)136 self.ldb.modify(m) 149 137 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn) 150 138 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]) … … 156 144 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]) 157 145 158 159 160 146 def test_attributeSchema_object(self): 161 '''Test if the urgent replication is activated 162 when handling an attributeSchema object''' 147 """Test if the urgent replication is activated when handling an attributeSchema object""" 163 148 164 149 try: … … 187 172 print "Not testing urgent replication when creating attributeSchema object ...\n" 188 173 189 # urgent replication should be enabled when modifying 190 m = Message() 191 m.dn = Dn( ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)174 # urgent replication should be enabled when modifying 175 m = Message() 176 m.dn = Dn(self.ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn) 192 177 m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE, 193 178 "lDAPDisplayName") 194 ldb.modify(m)179 self.ldb.modify(m) 195 180 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn) 196 181 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]) 197 182 198 199 183 def test_classSchema_object(self): 200 '''Test if the urgent replication is activated 201 when handling a classSchema object''' 184 """Test if the urgent replication is activated when handling a classSchema object.""" 202 185 try: 203 186 self.ldb.add_ldif( … … 232 215 # urgent replication should be enabled when modifying 233 216 m = Message() 234 m.dn = Dn( ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)217 m.dn = Dn(self.ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn) 235 218 m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE, 236 219 "lDAPDisplayName") 237 ldb.modify(m)220 self.ldb.modify(m) 238 221 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn) 239 222 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]) 240 223 241 242 224 def test_secret_object(self): 243 '''Test if the urgent replication is activated 244 when handling a secret object''' 225 """Test if the urgent replication is activated when handling a secret object.""" 245 226 246 227 self.ldb.add({ … … 257 238 # urgent replication should be enabled when modifying 258 239 m = Message() 259 m.dn = Dn( ldb, "cn=test secret,cn=System," + self.base_dn)240 m.dn = Dn(self.ldb, "cn=test secret,cn=System," + self.base_dn) 260 241 m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE, 261 242 "currentValue") 262 ldb.modify(m)263 res = self.ldb.load_partition_usn(self.base_dn) 264 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]) 265 266 # urgent replication should NOT be enabled when deleting 243 self.ldb.modify(m) 244 res = self.ldb.load_partition_usn(self.base_dn) 245 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]) 246 247 # urgent replication should NOT be enabled when deleting 267 248 self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn) 268 249 res = self.ldb.load_partition_usn(self.base_dn) 269 250 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]) 270 251 271 272 252 def test_rIDManager_object(self): 273 '''Test if the urgent replication is activated 274 when handling a rIDManager object''' 253 """Test if the urgent replication is activated when handling a rIDManager object.""" 275 254 self.ldb.add_ldif( 276 255 """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """ … … 290 269 # urgent replication should be enabled when modifying 291 270 m = Message() 292 m.dn = Dn( ldb, "CN=RID Manager test,CN=System," + self.base_dn)271 m.dn = Dn(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn) 293 272 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE, 294 273 "systemFlags") 295 ldb.modify(m)274 self.ldb.modify(m) 296 275 res = self.ldb.load_partition_usn(self.base_dn) 297 276 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]) … … 302 281 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]) 303 282 304 305 283 def test_urgent_attributes(self): 306 '''Test if the urgent replication is activated 307 when handling urgent attributes of an object''' 284 """Test if the urgent replication is activated when handling urgent attributes of an object.""" 308 285 309 286 self.ldb.add({ … … 320 297 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]) 321 298 322 # urgent replication should be enabled when modifying userAccountControl 323 m = Message() 324 m.dn = Dn( ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)299 # urgent replication should be enabled when modifying userAccountControl 300 m = Message() 301 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn) 325 302 m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT+dsdb.UF_SMARTCARD_REQUIRED), FLAG_MOD_REPLACE, 326 303 "userAccountControl") 327 ldb.modify(m)304 self.ldb.modify(m) 328 305 res = self.ldb.load_partition_usn(self.base_dn) 329 306 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]) … … 331 308 # urgent replication should be enabled when modifying lockoutTime 332 309 m = Message() 333 m.dn = Dn( ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)310 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn) 334 311 m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE, 335 312 "lockoutTime") 336 ldb.modify(m)313 self.ldb.modify(m) 337 314 res = self.ldb.load_partition_usn(self.base_dn) 338 315 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]) … … 340 317 # urgent replication should be enabled when modifying pwdLastSet 341 318 m = Message() 342 m.dn = Dn( ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)319 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn) 343 320 m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE, 344 321 "pwdLastSet") 345 ldb.modify(m)322 self.ldb.modify(m) 346 323 res = self.ldb.load_partition_usn(self.base_dn) 347 324 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]) … … 350 327 # attribute 351 328 m = Message() 352 m.dn = Dn( ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)329 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn) 353 330 m["description"] = MessageElement("updated urgent attributes test description", 354 331 FLAG_MOD_REPLACE, "description") 355 ldb.modify(m)332 self.ldb.modify(m) 356 333 res = self.ldb.load_partition_usn(self.base_dn) 357 334 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]) … … 363 340 364 341 365 if not "://" in host: 366 if os.path.isfile(host): 367 host = "tdb://%s" % host 368 else: 369 host = "ldap://%s" % host 370 371 372 ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp, 373 global_schema=False) 374 375 runner = SubunitTestRunner() 376 rc = 0 377 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful(): 378 rc = 1 379 sys.exit(rc) 342 TestProgram(module=__name__, opts=subunitopts)
Note:
See TracChangeset
for help on using the changeset viewer.