Ignore:
Timestamp:
Nov 24, 2016, 1:14:11 PM (9 years ago)
Author:
Silvan Scherrer
Message:

Samba Server: update vendor to version 4.4.3

Location:
vendor/current/source4/dsdb/tests/python
Files:
5 added
11 edited

Legend:

Unmodified
Added
Removed
  • vendor/current/source4/dsdb/tests/python/acl.py

    r740 r988  
    99sys.path.insert(0, "bin/python")
    1010import samba
    11 samba.ensure_external_module("testtools", "testtools")
    12 samba.ensure_external_module("subunit", "subunit/python")
     11
     12from samba.tests.subunitrun import SubunitOptions, TestProgram
    1313
    1414import samba.getopt as options
     
    2121from ldb import ERR_OPERATIONS_ERROR
    2222from ldb import Message, MessageElement, Dn
    23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD
     23from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE
    2424from samba.dcerpc import security, drsuapi, misc
    2525
     
    2727from samba import gensec, sd_utils
    2828from samba.samdb import SamDB
    29 from samba.credentials import Credentials
     29from samba.credentials import Credentials, DONT_USE_KERBEROS
    3030import samba.tests
    3131from samba.tests import delete_force
    32 from subunit.run import SubunitTestRunner
    33 import unittest
     32import samba.dsdb
    3433
    3534parser = optparse.OptionParser("acl.py [options] <host>")
     
    4140credopts = options.CredentialsOptions(parser)
    4241parser.add_option_group(credopts)
     42subunitopts = SubunitOptions(parser)
     43parser.add_option_group(subunitopts)
     44
    4345opts, args = parser.parse_args()
    4446
     
    6769    def setUp(self):
    6870        super(AclTests, self).setUp()
    69         self.ldb_admin = ldb
    70         self.base_dn = ldb.domain_dn()
    71         self.domain_sid = security.dom_sid(ldb.get_domain_sid())
     71        self.ldb_admin = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
     72        self.base_dn = self.ldb_admin.domain_dn()
     73        self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
    7274        self.user_pass = "samba123@"
    7375        self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
    74         self.sd_utils = sd_utils.SDUtils(ldb)
     76        self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
    7577        #used for anonymous login
    7678        self.creds_tmp = Credentials()
     
    9496        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
    9597                                      | gensec.FEATURE_SEAL)
     98        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
    9699        ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
    97100        return ldb_target
     
    127130
    128131        # add admins to the Domain Admins group
    129         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_owner,
     132        self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_owner],
    130133                       add_members_operation=True)
    131         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_not_owner,
     134        self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_not_owner],
    132135                       add_members_operation=True)
    133136
     
    168171        self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
    169172        self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
    170                                    grouptype=4)
     173                                   grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
    171174        # Make sure we HAVE created the two objects -- user and group
    172175        # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
     
    187190            self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
    188191            self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
    189                                    grouptype=4)
     192                                   grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
    190193        except LdbError, (num, _):
    191194            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     
    211214        try:
    212215            self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
    213                                    grouptype=4)
     216                                   grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
    214217        except LdbError, (num, _):
    215218            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     
    235238        self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
    236239        self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
    237                                  grouptype=4)
     240                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
    238241        # Make sure we have successfully created the two objects -- user and group
    239242        res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
     
    268271        self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
    269272        self.user_sid = self.sd_utils.get_object_sid( self.get_user_dn(self.user_with_wp))
    270         self.ldb_admin.newgroup("test_modify_group2", grouptype=4)
    271         self.ldb_admin.newgroup("test_modify_group3", grouptype=4)
     273        self.ldb_admin.newgroup("test_modify_group2", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
     274        self.ldb_admin.newgroup("test_modify_group3", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
    272275        self.ldb_admin.newuser("test_modify_user2", self.user_pass)
    273276
     
    303306        # Second test object -- Group
    304307        print "Testing modify on Group object"
    305         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
     308        self.ldb_admin.newgroup("test_modify_group1",
     309                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
    306310        self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
    307311        ldif = """
     
    361365        # Second test object -- Group
    362366        print "Testing modify on Group object"
    363         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
     367        self.ldb_admin.newgroup("test_modify_group1",
     368                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
    364369        self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
    365370        ldif = """
     
    379384replace: url
    380385url: www.samba.org"""
     386        try:
     387            self.ldb_user.modify_ldif(ldif)
     388        except LdbError, (num, _):
     389            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     390        else:
     391            # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
     392            self.fail()
     393        # Modify on attribute you do not have rights for granted while also modifying something you do have rights for
     394        ldif = """
     395dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
     396changetype: modify
     397replace: url
     398url: www.samba.org
     399replace: displayName
     400displayName: test_changed"""
    381401        try:
    382402            self.ldb_user.modify_ldif(ldif)
     
    435455        # Second test object -- Group
    436456        print "Testing modify on Group object"
    437         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
     457        self.ldb_admin.newgroup("test_modify_group1",
     458                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
    438459        # Modify on attribute you do not have rights for granted
    439460        ldif = """
     
    601622    def setUp(self):
    602623        super(AclSearchTests, self).setUp()
     624        # Get the old "dSHeuristics" if it was set
     625        dsheuristics = self.ldb_admin.get_dsheuristics()
     626        # Reset the "dSHeuristics" as they were before
     627        self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics)
     628        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
     629        self.ldb_admin.set_dsheuristics("000000001")
     630        # Get the old "minPwdAge"
     631        minPwdAge = self.ldb_admin.get_minPwdAge()
     632        # Reset the "minPwdAge" as it was before
     633        self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge)
     634        # Set it temporarely to "0"
     635        self.ldb_admin.set_minPwdAge("0")
     636
    603637        self.u1 = "search_u1"
    604638        self.u2 = "search_u2"
     
    608642        self.ldb_admin.newuser(self.u2, self.user_pass)
    609643        self.ldb_admin.newuser(self.u3, self.user_pass)
    610         self.ldb_admin.newgroup(self.group1, grouptype=-2147483646)
    611         self.ldb_admin.add_remove_group_members(self.group1, self.u2,
     644        self.ldb_admin.newgroup(self.group1, grouptype=samba.dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
     645        self.ldb_admin.add_remove_group_members(self.group1, [self.u2],
    612646                                                add_members_operation=True)
    613647        self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
     
    669703        self.assertEquals(len(res), 1)
    670704        #verify some of the attributes
    671         #dont care about values
     705        #don't care about values
    672706        self.assertTrue("ldapServiceName" in res[0])
    673707        self.assertTrue("namingContexts" in res[0])
     
    695729            self.fail()
    696730        try:
    697             res = anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
     731            res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
    698732                                        scope=SCOPE_SUBTREE)
    699733        except LdbError, (num, _):
     
    716750        self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
    717751                                           "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
    718         res = anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
     752        res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
    719753                               scope=SCOPE_SUBTREE)
    720754        self.assertEquals(len(res), 1)
     
    12311265        self.assertNotEqual(len(res), 0)
    12321266
     1267    def test_rename_u9(self):
     1268        """Rename 'User object' cross OU, with explicit deny on sd and dc"""
     1269        ou1_dn = "OU=test_rename_ou1," + self.base_dn
     1270        ou2_dn = "OU=test_rename_ou2," + self.base_dn
     1271        user_dn = "CN=test_rename_user2," + ou1_dn
     1272        rename_user_dn = "CN=test_rename_user5," + ou2_dn
     1273        # Create OU structure
     1274        self.ldb_admin.create_ou(ou1_dn)
     1275        self.ldb_admin.create_ou(ou2_dn)
     1276        self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
     1277        mod = "(D;;SD;;;DA)"
     1278        self.sd_utils.dacl_add_ace(user_dn, mod)
     1279        mod = "(D;;DC;;;DA)"
     1280        self.sd_utils.dacl_add_ace(ou1_dn, mod)
     1281        # Rename 'User object' having SD and CC to AU
     1282        try:
     1283            self.ldb_admin.rename(user_dn, rename_user_dn)
     1284        except LdbError, (num, _):
     1285            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     1286        else:
     1287            self.fail()
     1288        #add an allow ace so we can delete this ou
     1289        mod = "(A;;DC;;;DA)"
     1290        self.sd_utils.dacl_add_ace(ou1_dn, mod)
     1291
     1292
    12331293#tests on Control Access Rights
    12341294class AclCARTests(AclTests):
     
    12361296    def setUp(self):
    12371297        super(AclCARTests, self).setUp()
     1298
     1299        # Get the old "dSHeuristics" if it was set
     1300        dsheuristics = self.ldb_admin.get_dsheuristics()
     1301        # Reset the "dSHeuristics" as they were before
     1302        self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics)
     1303        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
     1304        self.ldb_admin.set_dsheuristics("000000001")
     1305        # Get the old "minPwdAge"
     1306        minPwdAge = self.ldb_admin.get_minPwdAge()
     1307        # Reset the "minPwdAge" as it was before
     1308        self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge)
     1309        # Set it temporarely to "0"
     1310        self.ldb_admin.set_minPwdAge("0")
     1311
    12381312        self.user_with_wp = "acl_car_user1"
    12391313        self.user_with_pc = "acl_car_user2"
     
    15171591        self.ldb_admin.newuser(self.u2, self.user_pass)
    15181592        self.ldb_admin.newuser(self.u3, self.user_pass)
    1519         self.ldb_admin.add_remove_group_members("Domain Admins", self.u3,
     1593        self.ldb_admin.add_remove_group_members("Domain Admins", [self.u3],
    15201594                                                add_members_operation=True)
    15211595        self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
     
    15421616        self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
    15431617        #create a group under that, grant RP to u2
    1544         self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1", grouptype=4)
     1618        self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1",
     1619                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
    15451620        mod = "(A;;RP;;;%s)" % str(self.user_sid2)
    15461621        self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
     
    15631638        self.assertTrue("nTSecurityDescriptor" in res[0].keys())
    15641639
     1640class AclUndeleteTests(AclTests):
     1641
     1642    def setUp(self):
     1643        super(AclUndeleteTests, self).setUp()
     1644        self.regular_user = "undeleter1"
     1645        self.ou1 = "OU=undeleted_ou,"
     1646        self.testuser1 = "to_be_undeleted1"
     1647        self.testuser2 = "to_be_undeleted2"
     1648        self.testuser3 = "to_be_undeleted3"
     1649        self.testuser4 = "to_be_undeleted4"
     1650        self.testuser5 = "to_be_undeleted5"
     1651        self.testuser6 = "to_be_undeleted6"
     1652
     1653        self.new_dn_ou = "CN="+ self.testuser4 + "," + self.ou1 + self.base_dn
     1654
     1655        # Create regular user
     1656        self.testuser1_dn = self.get_user_dn(self.testuser1)
     1657        self.testuser2_dn = self.get_user_dn(self.testuser2)
     1658        self.testuser3_dn = self.get_user_dn(self.testuser3)
     1659        self.testuser4_dn = self.get_user_dn(self.testuser4)
     1660        self.testuser5_dn = self.get_user_dn(self.testuser5)
     1661        self.deleted_dn1 = self.create_delete_user(self.testuser1)
     1662        self.deleted_dn2 = self.create_delete_user(self.testuser2)
     1663        self.deleted_dn3 = self.create_delete_user(self.testuser3)
     1664        self.deleted_dn4 = self.create_delete_user(self.testuser4)
     1665        self.deleted_dn5 = self.create_delete_user(self.testuser5)
     1666
     1667        self.ldb_admin.create_ou(self.ou1 + self.base_dn)
     1668
     1669        self.ldb_admin.newuser(self.regular_user, self.user_pass)
     1670        self.ldb_admin.add_remove_group_members("Domain Admins", [self.regular_user],
     1671                       add_members_operation=True)
     1672        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
     1673        self.sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
     1674
     1675    def tearDown(self):
     1676        super(AclUndeleteTests, self).tearDown()
     1677        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
     1678        delete_force(self.ldb_admin, self.get_user_dn(self.testuser1))
     1679        delete_force(self.ldb_admin, self.get_user_dn(self.testuser2))
     1680        delete_force(self.ldb_admin, self.get_user_dn(self.testuser3))
     1681        delete_force(self.ldb_admin, self.get_user_dn(self.testuser4))
     1682        delete_force(self.ldb_admin, self.get_user_dn(self.testuser5))
     1683        delete_force(self.ldb_admin, self.new_dn_ou)
     1684        delete_force(self.ldb_admin, self.ou1 + self.base_dn)
     1685
     1686    def GUID_string(self, guid):
     1687        return ldb.schema_format_value("objectGUID", guid)
     1688
     1689    def create_delete_user(self, new_user):
     1690        self.ldb_admin.newuser(new_user, self.user_pass)
     1691
     1692        res = self.ldb_admin.search(expression="(objectClass=*)",
     1693                                    base=self.get_user_dn(new_user),
     1694                                    scope=SCOPE_BASE,
     1695                                    controls=["show_deleted:1"])
     1696        guid = res[0]["objectGUID"][0]
     1697        self.ldb_admin.delete(self.get_user_dn(new_user))
     1698        res = self.ldb_admin.search(base="<GUID=%s>" % self.GUID_string(guid),
     1699                         scope=SCOPE_BASE, controls=["show_deleted:1"])
     1700        self.assertEquals(len(res), 1)
     1701        return str(res[0].dn)
     1702
     1703    def undelete_deleted(self, olddn, newdn):
     1704        msg = Message()
     1705        msg.dn = Dn(self.ldb_user, olddn)
     1706        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
     1707        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
     1708        res = self.ldb_user.modify(msg, ["show_recycled:1"])
     1709
     1710    def undelete_deleted_with_mod(self, olddn, newdn):
     1711        msg = Message()
     1712        msg.dn = Dn(ldb, olddn)
     1713        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
     1714        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
     1715        msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
     1716        res = self.ldb_user.modify(msg, ["show_deleted:1"])
     1717
     1718    def test_undelete(self):
     1719        # it appears the user has to have LC on the old parent to be able to move the object
     1720        # otherwise we get no such object. Since only System can modify the SD on deleted object
     1721        # we cannot grant this permission via LDAP, and this leaves us with "negative" tests at the moment
     1722
     1723        # deny write property on rdn, should fail
     1724        mod = "(OD;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
     1725        self.sd_utils.dacl_add_ace(self.deleted_dn1, mod)
     1726        try:
     1727            self.undelete_deleted(self.deleted_dn1, self.testuser1_dn)
     1728            self.fail()
     1729        except LdbError, (num, _):
     1730            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     1731
     1732        # seems that permissions on isDeleted and distinguishedName are irrelevant
     1733        mod = "(OD;;WP;bf96798f-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
     1734        self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
     1735        mod = "(OD;;WP;bf9679e4-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
     1736        self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
     1737        self.undelete_deleted(self.deleted_dn2, self.testuser2_dn)
     1738
     1739        # attempt undelete with simultanious addition of url, WP to which is denied
     1740        mod = "(OD;;WP;9a9a0221-4a5b-11d1-a9c3-0000f80367c1;;%s)" % str(self.sid)
     1741        self.sd_utils.dacl_add_ace(self.deleted_dn3, mod)
     1742        try:
     1743            self.undelete_deleted_with_mod(self.deleted_dn3, self.testuser3_dn)
     1744            self.fail()
     1745        except LdbError, (num, _):
     1746            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     1747
     1748        # undelete in an ou, in which we have no right to create children
     1749        mod = "(D;;CC;;;%s)" % str(self.sid)
     1750        self.sd_utils.dacl_add_ace(self.ou1 + self.base_dn, mod)
     1751        try:
     1752            self.undelete_deleted(self.deleted_dn4, self.new_dn_ou)
     1753            self.fail()
     1754        except LdbError, (num, _):
     1755            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     1756
     1757        # delete is not required
     1758        mod = "(D;;SD;;;%s)" % str(self.sid)
     1759        self.sd_utils.dacl_add_ace(self.deleted_dn5, mod)
     1760        self.undelete_deleted(self.deleted_dn5, self.testuser5_dn)
     1761
     1762        # deny Reanimate-Tombstone, should fail
     1763        mod = "(OD;;CR;45ec5156-db7e-47bb-b53f-dbeb2d03c40f;;%s)" % str(self.sid)
     1764        self.sd_utils.dacl_add_ace(self.base_dn, mod)
     1765        try:
     1766            self.undelete_deleted(self.deleted_dn4, self.testuser4_dn)
     1767            self.fail()
     1768        except LdbError, (num, _):
     1769            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
    15651770
    15661771class AclSPNTests(AclTests):
     
    16221827    # same as for join_RODC, but do not set any SPNs
    16231828    def create_rodc(self, ctx):
     1829         ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
     1830         ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
    16241831         ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
    16251832
     
    16511858
    16521859    def create_dc(self, ctx):
     1860        ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
     1861        ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
    16531862        ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
    16541863        ctx.secure_channel_type = misc.SEC_CHAN_BDC
     
    16781887                         (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
    16791888        self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
    1680                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
     1889                         (ctx.myname, ctx.dnsdomain, ctx.dnsforest))
    16811890        self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
    16821891        self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
     
    17371946                         (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
    17381947        self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
    1739                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
     1948                         (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
    17401949        self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
    17411950        self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
     
    18012010        try:
    18022011            self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
    1803                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
     2012                             (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
    18042013        except LdbError, (num, _):
    18052014            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     
    18252034ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
    18262035
    1827 runner = SubunitTestRunner()
    1828 rc = 0
    1829 if not runner.run(unittest.makeSuite(AclAddTests)).wasSuccessful():
    1830     rc = 1
    1831 if not runner.run(unittest.makeSuite(AclModifyTests)).wasSuccessful():
    1832     rc = 1
    1833 if not runner.run(unittest.makeSuite(AclDeleteTests)).wasSuccessful():
    1834     rc = 1
    1835 if not runner.run(unittest.makeSuite(AclRenameTests)).wasSuccessful():
    1836     rc = 1
    1837 
    1838 # Get the old "dSHeuristics" if it was set
    1839 dsheuristics = ldb.get_dsheuristics()
    1840 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
    1841 ldb.set_dsheuristics("000000001")
    1842 # Get the old "minPwdAge"
    1843 minPwdAge = ldb.get_minPwdAge()
    1844 # Set it temporarely to "0"
    1845 ldb.set_minPwdAge("0")
    1846 if not runner.run(unittest.makeSuite(AclCARTests)).wasSuccessful():
    1847     rc = 1
    1848 if not runner.run(unittest.makeSuite(AclSearchTests)).wasSuccessful():
    1849     rc = 1
    1850 # Reset the "dSHeuristics" as they were before
    1851 ldb.set_dsheuristics(dsheuristics)
    1852 # Reset the "minPwdAge" as it was before
    1853 ldb.set_minPwdAge(minPwdAge)
    1854 
    1855 if not runner.run(unittest.makeSuite(AclExtendedTests)).wasSuccessful():
    1856     rc = 1
    1857 if not runner.run(unittest.makeSuite(AclSPNTests)).wasSuccessful():
    1858     rc = 1
    1859 sys.exit(rc)
     2036TestProgram(module=__name__, opts=subunitopts)
  • vendor/current/source4/dsdb/tests/python/deletetest.py

    r740 r988  
    88sys.path.insert(0, "bin/python")
    99import samba
    10 samba.ensure_external_module("testtools", "testtools")
    11 samba.ensure_external_module("subunit", "subunit/python")
     10
     11from samba.tests.subunitrun import SubunitOptions, TestProgram
    1212
    1313import samba.getopt as options
    1414
    1515from samba.auth import system_session
    16 from ldb import SCOPE_BASE, LdbError
    17 from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF
    18 from ldb import ERR_UNWILLING_TO_PERFORM
     16from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
     17from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS
     18from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR
    1919from samba.samdb import SamDB
    2020from samba.tests import delete_force
    21 
    22 from subunit.run import SubunitTestRunner
    23 import unittest
    2421
    2522parser = optparse.OptionParser("deletetest.py [options] <host|file>")
     
    3027credopts = options.CredentialsOptions(parser)
    3128parser.add_option_group(credopts)
     29subunitopts = SubunitOptions(parser)
     30parser.add_option_group(subunitopts)
    3231opts, args = parser.parse_args()
    3332
     
    4140creds = credopts.get_credentials(lp)
    4241
    43 class BasicDeleteTests(unittest.TestCase):
    44 
     42class BaseDeleteTests(samba.tests.TestCase):
    4543
    4644    def GUID_string(self, guid):
     
    4846
    4947    def setUp(self):
    50         self.ldb = ldb
    51         self.base_dn = ldb.domain_dn()
    52         self.configuration_dn = ldb.get_config_basedn().get_linearized()
     48        super(BaseDeleteTests, self).setUp()
     49        self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
     50
     51        self.base_dn = self.ldb.domain_dn()
     52        self.configuration_dn = self.ldb.get_config_basedn().get_linearized()
    5353
    5454    def search_guid(self, guid):
    5555        print "SEARCH by GUID %s" % self.GUID_string(guid)
    5656
    57         res = ldb.search(base="<GUID=%s>" % self.GUID_string(guid),
     57        res = self.ldb.search(base="<GUID=%s>" % self.GUID_string(guid),
    5858                         scope=SCOPE_BASE, controls=["show_deleted:1"])
    5959        self.assertEquals(len(res), 1)
     
    6363        print "SEARCH by DN %s" % dn
    6464
    65         res = ldb.search(expression="(objectClass=*)",
     65        res = self.ldb.search(expression="(objectClass=*)",
    6666                         base=dn,
    6767                         scope=SCOPE_BASE,
     
    6969        self.assertEquals(len(res), 1)
    7070        return res[0]
     71
     72
     73class BasicDeleteTests(BaseDeleteTests):
     74
     75    def setUp(self):
     76        super(BasicDeleteTests, self).setUp()
    7177
    7278    def del_attr_values(self, delObj):
     
    119125        delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
    120126
    121         ldb.add({
     127        self.ldb.add({
    122128            "dn": "cn=ldaptestcontainer," + self.base_dn,
    123129            "objectclass": "container"})
    124         ldb.add({
     130        self.ldb.add({
    125131            "dn": "cn=entry1,cn=ldaptestcontainer," + self.base_dn,
    126132            "objectclass": "container"})
    127         ldb.add({
     133        self.ldb.add({
    128134            "dn": "cn=entry2,cn=ldaptestcontainer," + self.base_dn,
    129135            "objectclass": "container"})
    130136
    131137        try:
    132             ldb.delete("cn=ldaptestcontainer," + self.base_dn)
    133             self.fail()
    134         except LdbError, (num, _):
    135             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
    136 
    137         ldb.delete("cn=ldaptestcontainer," + self.base_dn, ["tree_delete:1"])
    138 
    139         try:
    140             res = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     138            self.ldb.delete("cn=ldaptestcontainer," + self.base_dn)
     139            self.fail()
     140        except LdbError, (num, _):
     141            self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
     142
     143        self.ldb.delete("cn=ldaptestcontainer," + self.base_dn, ["tree_delete:1"])
     144
     145        try:
     146            res = self.ldb.search("cn=ldaptestcontainer," + self.base_dn,
    141147                             scope=SCOPE_BASE, attrs=[])
    142148            self.fail()
     
    144150            self.assertEquals(num, ERR_NO_SUCH_OBJECT)
    145151        try:
    146             res = ldb.search("cn=entry1,cn=ldaptestcontainer," + self.base_dn,
     152            res = self.ldb.search("cn=entry1,cn=ldaptestcontainer," + self.base_dn,
    147153                             scope=SCOPE_BASE, attrs=[])
    148154            self.fail()
     
    150156            self.assertEquals(num, ERR_NO_SUCH_OBJECT)
    151157        try:
    152             res = ldb.search("cn=entry2,cn=ldaptestcontainer," + self.base_dn,
     158            res = self.ldb.search("cn=entry2,cn=ldaptestcontainer," + self.base_dn,
    153159                             scope=SCOPE_BASE, attrs=[])
    154160            self.fail()
     
    162168        # Performs some protected object delete testing
    163169
    164         res = ldb.search(base="", expression="", scope=SCOPE_BASE,
     170        res = self.ldb.search(base="", expression="", scope=SCOPE_BASE,
    165171                         attrs=["dsServiceName", "dNSHostName"])
    166172        self.assertEquals(len(res), 1)
     
    168174        # Delete failing since DC's nTDSDSA object is protected
    169175        try:
    170             ldb.delete(res[0]["dsServiceName"][0])
    171             self.fail()
    172         except LdbError, (num, _):
    173             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    174 
    175         res = ldb.search(self.base_dn, attrs=["rIDSetReferences"],
     176            self.ldb.delete(res[0]["dsServiceName"][0])
     177            self.fail()
     178        except LdbError, (num, _):
     179            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     180
     181        res = self.ldb.search(self.base_dn, attrs=["rIDSetReferences"],
    176182                         expression="(&(objectClass=computer)(dNSHostName=" + res[0]["dNSHostName"][0] + "))")
    177183        self.assertEquals(len(res), 1)
     
    179185        # Deletes failing since DC's rIDSet object is protected
    180186        try:
    181             ldb.delete(res[0]["rIDSetReferences"][0])
    182             self.fail()
    183         except LdbError, (num, _):
    184             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    185         try:
    186             ldb.delete(res[0]["rIDSetReferences"][0], ["tree_delete:1"])
     187            self.ldb.delete(res[0]["rIDSetReferences"][0])
     188            self.fail()
     189        except LdbError, (num, _):
     190            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     191        try:
     192            self.ldb.delete(res[0]["rIDSetReferences"][0], ["tree_delete:1"])
    187193            self.fail()
    188194        except LdbError, (num, _):
     
    192198
    193199        try:
    194             ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn)
    195             self.fail()
    196         except LdbError, (num, _):
    197             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    198         try:
    199             ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
    200             self.fail()
    201         except LdbError, (num, _):
    202             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    203 
    204         try:
    205             ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn)
    206             self.fail()
    207         except LdbError, (num, _):
    208             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
    209         try:
    210             ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
    211             self.fail()
    212         except LdbError, (num, _):
    213             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
    214 
    215         res = ldb.search("cn=Partitions," + self.configuration_dn, attrs=[],
     200            self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn)
     201            self.fail()
     202        except LdbError, (num, _):
     203            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     204        try:
     205            self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
     206            self.fail()
     207        except LdbError, (num, _):
     208            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     209
     210        try:
     211            self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn)
     212            self.fail()
     213        except LdbError, (num, _):
     214            self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
     215        try:
     216            self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
     217            self.fail()
     218        except LdbError, (num, _):
     219            self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
     220
     221        res = self.ldb.search("cn=Partitions," + self.configuration_dn, attrs=[],
    216222                         expression="(nCName=%s)" % self.base_dn)
    217223        self.assertEquals(len(res), 1)
    218224
    219225        try:
    220             ldb.delete(res[0].dn)
    221             self.fail()
    222         except LdbError, (num, _):
    223             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
    224         try:
    225             ldb.delete(res[0].dn, ["tree_delete:1"])
     226            self.ldb.delete(res[0].dn)
     227            self.fail()
     228        except LdbError, (num, _):
     229            self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
     230        try:
     231            self.ldb.delete(res[0].dn, ["tree_delete:1"])
    226232            self.fail()
    227233        except LdbError, (num, _):
     
    230236        # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE"
    231237        try:
    232             ldb.delete("CN=Users," + self.base_dn)
     238            self.ldb.delete("CN=Users," + self.base_dn)
    233239            self.fail()
    234240        except LdbError, (num, _):
     
    237243        # Tree-delete failing since "isCriticalSystemObject"
    238244        try:
    239             ldb.delete("CN=Computers," + self.base_dn, ["tree_delete:1"])
     245            self.ldb.delete("CN=Computers," + self.base_dn, ["tree_delete:1"])
    240246            self.fail()
    241247        except LdbError, (num, _):
     
    247253        print self.base_dn
    248254
    249         usr1="cn=testuser,cn=users," + self.base_dn
    250         usr2="cn=testuser2,cn=users," + self.base_dn
    251         grp1="cn=testdelgroup1,cn=users," + self.base_dn
    252         sit1="cn=testsite1,cn=sites," + self.configuration_dn
    253         ss1="cn=NTDS Site Settings,cn=testsite1,cn=sites," + self.configuration_dn
    254         srv1="cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn
    255         srv2="cn=TESTSRV,cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn
     255        # user current time in ms to make unique objects
     256        import time
     257        marker = str(int(round(time.time()*1000)))
     258        usr1_name = "u_" + marker
     259        usr2_name = "u2_" + marker
     260        grp_name = "g1_" + marker
     261        site_name = "s1_" + marker
     262
     263        usr1 = "cn=%s,cn=users,%s" % (usr1_name, self.base_dn)
     264        usr2 = "cn=%s,cn=users,%s" % (usr2_name, self.base_dn)
     265        grp1 = "cn=%s,cn=users,%s" % (grp_name, self.base_dn)
     266        sit1 = "cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
     267        ss1 = "cn=NTDS Site Settings,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
     268        srv1 = "cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
     269        srv2 = "cn=TESTSRV,cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
    256270
    257271        delete_force(self.ldb, usr1)
     
    263277        delete_force(self.ldb, sit1)
    264278
    265         ldb.add({
     279        self.ldb.add({
    266280            "dn": usr1,
    267281            "objectclass": "user",
    268282            "description": "test user description",
    269             "samaccountname": "testuser"})
    270 
    271         ldb.add({
     283            "samaccountname": usr1_name})
     284
     285        self.ldb.add({
    272286            "dn": usr2,
    273287            "objectclass": "user",
    274288            "description": "test user 2 description",
    275             "samaccountname": "testuser2"})
    276 
    277         ldb.add({
     289            "samaccountname": usr2_name})
     290
     291        self.ldb.add({
    278292            "dn": grp1,
    279293            "objectclass": "group",
    280294            "description": "test group",
    281             "samaccountname": "testdelgroup1",
     295            "samaccountname": grp_name,
    282296            "member": [ usr1, usr2 ],
    283297            "isDeleted": "FALSE" })
    284298
    285         ldb.add({
     299        self.ldb.add({
    286300            "dn": sit1,
    287301            "objectclass": "site" })
    288302
    289         ldb.add({
     303        self.ldb.add({
    290304            "dn": ss1,
    291305            "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"] })
    292306
    293         ldb.add({
     307        self.ldb.add({
    294308            "dn": srv1,
    295309            "objectclass": "serversContainer" })
    296310
    297         ldb.add({
     311        self.ldb.add({
    298312            "dn": srv2,
    299313            "objectClass": "server" })
     
    320334        guid7=objLive7["objectGUID"][0]
    321335
    322         ldb.delete(usr1)
    323         ldb.delete(usr2)
    324         ldb.delete(grp1)
    325         ldb.delete(srv1, ["tree_delete:1"])
    326         ldb.delete(sit1, ["tree_delete:1"])
     336        self.ldb.delete(usr1)
     337        self.ldb.delete(usr2)
     338        self.ldb.delete(grp1)
     339        self.ldb.delete(srv1, ["tree_delete:1"])
     340        self.ldb.delete(sit1, ["tree_delete:1"])
    327341
    328342        objDeleted1 = self.search_guid(guid1)
     
    358372        self.check_rdn(objLive7, objDeleted7, "cn")
    359373
    360         self.delete_deleted(ldb, usr1)
    361         self.delete_deleted(ldb, usr2)
    362         self.delete_deleted(ldb, grp1)
    363         self.delete_deleted(ldb, sit1)
    364         self.delete_deleted(ldb, ss1)
    365         self.delete_deleted(ldb, srv1)
    366         self.delete_deleted(ldb, srv2)
     374        self.delete_deleted(self.ldb, usr1)
     375        self.delete_deleted(self.ldb, usr2)
     376        self.delete_deleted(self.ldb, grp1)
     377        self.delete_deleted(self.ldb, sit1)
     378        self.delete_deleted(self.ldb, ss1)
     379        self.delete_deleted(self.ldb, srv1)
     380        self.delete_deleted(self.ldb, srv2)
    367381
    368382        self.assertTrue("CN=Deleted Objects" in str(objDeleted1.dn))
     
    374388        self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
    375389
     390
    376391if not "://" in host:
    377392    if os.path.isfile(host):
     
    380395        host = "ldap://%s" % host
    381396
    382 ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
    383 
    384 runner = SubunitTestRunner()
    385 rc = 0
    386 if not runner.run(unittest.makeSuite(BasicDeleteTests)).wasSuccessful():
    387     rc = 1
    388 
    389 sys.exit(rc)
     397TestProgram(module=__name__, opts=subunitopts)
  • vendor/current/source4/dsdb/tests/python/dsdb_schema_info.py

    r740 r988  
    3131
    3232sys.path.insert(0, "bin/python")
    33 import samba
    34 samba.ensure_external_module("testtools", "testtools")
     33import samba.tests
    3534
    3635from ldb import SCOPE_BASE, LdbError
  • vendor/current/source4/dsdb/tests/python/ldap.py

    r740 r988  
    22# -*- coding: utf-8 -*-
    33# This is a port of the original in testprogs/ejs/ldap.js
     4
     5# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008-2011
     6#
     7# This program is free software; you can redistribute it and/or modify
     8# it under the terms of the GNU General Public License as published by
     9# the Free Software Foundation; either version 3 of the License, or
     10# (at your option) any later version.
     11#
     12# This program is distributed in the hope that it will be useful,
     13# but WITHOUT ANY WARRANTY; without even the implied warranty of
     14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     15# GNU General Public License for more details.
     16#
     17# You should have received a copy of the GNU General Public License
     18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
    419
    520import optparse
     
    1126sys.path.insert(0, "bin/python")
    1227import samba
    13 samba.ensure_external_module("testtools", "testtools")
    14 samba.ensure_external_module("subunit", "subunit/python")
    15 
     28from samba.tests.subunitrun import SubunitOptions, TestProgram
    1629import samba.getopt as options
    1730
     
    3649    SYSTEM_FLAG_CONFIG_ALLOW_LIMITED_MOVE)
    3750
    38 from subunit.run import SubunitTestRunner
    39 import unittest
    40 
    4151from samba.ndr import ndr_pack, ndr_unpack
    4252from samba.dcerpc import security, lsa
     
    5060credopts = options.CredentialsOptions(parser)
    5161parser.add_option_group(credopts)
     62subunitopts = SubunitOptions(parser)
     63parser.add_option_group(subunitopts)
    5264opts, args = parser.parse_args()
    5365
     
    6173creds = credopts.get_credentials(lp)
    6274
    63 class BasicTests(unittest.TestCase):
     75class BasicTests(samba.tests.TestCase):
    6476
    6577    def setUp(self):
     
    7183        self.schema_dn = ldb.get_schema_basedn().get_linearized()
    7284        self.domain_sid = security.dom_sid(ldb.get_domain_sid())
    73 
    74         print "baseDN: %s\n" % self.base_dn
    7585
    7686        delete_force(self.ldb, "cn=posixuser,cn=users," + self.base_dn)
     
    97107        delete_force(self.ldb, "ou=testou,cn=users," + self.base_dn)
    98108        delete_force(self.ldb, "cn=Test Secret,cn=system," + self.base_dn)
     109        delete_force(self.ldb, "cn=testtimevaluesuser1,cn=users," + self.base_dn)
    99110
    100111    def test_objectclasses(self):
    101112        """Test objectClass behaviour"""
    102         print "Test objectClass behaviour"""
    103 
    104         # We cannot create LSA-specific objects (oc "secret" or "trustedDomain")
    105         try:
    106             self.ldb.add({
    107                 "dn": "cn=Test Secret,cn=system," + self.base_dn,
    108                 "objectClass": "secret" })
    109             self.fail()
    110         except LdbError, (num, _):
    111             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    112 
    113113        # Invalid objectclass specified
    114114        try:
     
    149149            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    150150
    151         # We cannot instanciate from an abstract objectclass
     151        # We cannot instanciate from an abstract object class ("connectionPoint"
     152        # or "leaf"). In the first case we use "connectionPoint" (subclass of
     153        # "leaf") to prevent a naming violation - this returns us a
     154        # "ERR_UNWILLING_TO_PERFORM" since it is not structural. In the second
     155        # case however we get "ERR_OBJECT_CLASS_VIOLATION" since an abstract
     156        # class is also not allowed to be auxiliary.
    152157        try:
    153158            self.ldb.add({
     
    157162        except LdbError, (num, _):
    158163            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     164        try:
     165            self.ldb.add({
     166                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     167                "objectClass": ["person", "leaf"] })
     168            self.fail()
     169        except LdbError, (num, _):
     170            self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
     171
     172        # Objects instanciated using "satisfied" abstract classes (concrete
     173        # subclasses) are allowed
     174        self.ldb.add({
     175             "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     176             "objectClass": ["top", "leaf", "connectionPoint", "serviceConnectionPoint"] })
     177
     178        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     179
     180        # Two disjoint top-most structural object classes aren't allowed
     181        try:
     182            self.ldb.add({
     183                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     184                "objectClass": ["person", "container"] })
     185            self.fail()
     186        except LdbError, (num, _):
     187            self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
    159188
    160189        # Test allowed system flags
     
    222251            self.assertEquals(num, ERR_NO_SUCH_ATTRIBUTE)
    223252
    224         # The top-most structural class cannot be changed by adding another
    225         # structural one
     253        # We cannot add a the new top-most structural class "user" here since
     254        # we are missing at least one new mandatory attribute (in this case
     255        # "sAMAccountName")
    226256        m = Message()
    227257        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     
    252282        ldb.modify(m)
    253283
    254         # It's only possible to replace with the same objectclass combination.
    255         # So the replace action on "objectClass" attributes is really useless.
     284        # This does not work since object class "leaf" is not auxiliary nor it
     285        # stands in direct relation to "person" (and it is abstract too!)
     286        m = Message()
     287        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     288        m["objectClass"] = MessageElement("leaf", FLAG_MOD_ADD,
     289          "objectClass")
     290        try:
     291            ldb.modify(m)
     292            self.fail()
     293        except LdbError, (num, _):
     294            self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
     295
     296        # Objectclass replace operations can be performed as well
    256297        m = Message()
    257298        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     
    266307        ldb.modify(m)
    267308
     309        # This does not work since object class "leaf" is not auxiliary nor it
     310        # stands in direct relation to "person" (and it is abstract too!)
    268311        m = Message()
    269312        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    270313        m["objectClass"] = MessageElement(["top", "person", "bootableDevice",
    271           "connectionPoint"], FLAG_MOD_REPLACE, "objectClass")
     314          "leaf"], FLAG_MOD_REPLACE, "objectClass")
    272315        try:
    273316            ldb.modify(m)
     
    356399        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    357400
     401        self.ldb.add({
     402             "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     403             "objectClass": "user" })
     404
     405        # Add a new top-most structural class "container". This does not work
     406        # since it stands in no direct relation to the current one.
     407        m = Message()
     408        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     409        m["objectClass"] = MessageElement("container", FLAG_MOD_ADD,
     410          "objectClass")
     411        try:
     412            ldb.modify(m)
     413            self.fail()
     414        except LdbError, (num, _):
     415            self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
     416
     417        # Add a new top-most structural class "inetOrgPerson" and remove it
     418        # afterwards
     419        m = Message()
     420        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     421        m["objectClass"] = MessageElement("inetOrgPerson", FLAG_MOD_ADD,
     422          "objectClass")
     423        ldb.modify(m)
     424
     425        m = Message()
     426        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     427        m["objectClass"] = MessageElement("inetOrgPerson", FLAG_MOD_DELETE,
     428          "objectClass")
     429        ldb.modify(m)
     430
     431        # Replace top-most structural class to "inetOrgPerson" and reset it
     432        # back to "user"
     433        m = Message()
     434        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     435        m["objectClass"] = MessageElement("inetOrgPerson", FLAG_MOD_REPLACE,
     436          "objectClass")
     437        ldb.modify(m)
     438
     439        m = Message()
     440        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     441        m["objectClass"] = MessageElement("user", FLAG_MOD_REPLACE,
     442          "objectClass")
     443        ldb.modify(m)
     444
     445        # Add a new auxiliary object class "posixAccount" to "ldaptestuser"
     446        m = Message()
     447        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     448        m["objectClass"] = MessageElement("posixAccount", FLAG_MOD_ADD,
     449          "objectClass")
     450        ldb.modify(m)
     451
     452        # Be sure that "top" is the first and the (most) structural object class
     453        # the last value of the "objectClass" attribute - MS-ADTS 3.1.1.1.4
     454        res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
     455                         scope=SCOPE_BASE, attrs=["objectClass"])
     456        self.assertTrue(len(res) == 1)
     457        self.assertEquals(res[0]["objectClass"][0], "top")
     458        self.assertEquals(res[0]["objectClass"][len(res[0]["objectClass"])-1], "user")
     459
     460        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     461
    358462    def test_system_only(self):
    359463        """Test systemOnly objects"""
    360         print "Test systemOnly objects"""
    361 
    362464        try:
    363465            self.ldb.add({
     
    445547    def test_invalid_parent(self):
    446548        """Test adding an object with invalid parent"""
    447         print "Test adding an object with invalid parent"""
    448 
    449549        try:
    450550            self.ldb.add({
     
    471571    def test_invalid_attribute(self):
    472572        """Test invalid attributes on schema/objectclasses"""
    473         print "Test invalid attributes on schema/objectclasses"""
    474 
    475573        # attributes not in schema test
    476574
     
    573671    def test_single_valued_attributes(self):
    574672        """Test single-valued attributes"""
    575         print "Test single-valued attributes"""
    576 
    577673        try:
    578674            self.ldb.add({
     
    618714    def test_attribute_ranges(self):
    619715        """Test attribute ranges"""
    620         print "Test attribute ranges"""
    621 
    622716        # Too short (min. 1)
    623717        try:
     
    673767    def test_empty_messages(self):
    674768        """Test empty messages"""
    675         print "Test empty messages"""
    676 
    677769        m = Message()
    678770        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
     
    694786    def test_empty_attributes(self):
    695787        """Test empty attributes"""
    696         print "Test empty attributes"""
    697 
    698788        m = Message()
    699789        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
     
    739829    def test_instanceType(self):
    740830        """Tests the 'instanceType' attribute"""
    741         print "Tests the 'instanceType' attribute"""
    742 
    743831        # The instance type is single-valued
    744832        try:
     
    806894        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
    807895
     896        #only write is allowed with NC_HEAD for originating updates
     897        try:
     898            self.ldb.add({
     899                "dn": "cn=ldaptestuser2,cn=users," + self.base_dn,
     900                "objectclass": "user",
     901                "instanceType": "3" })
     902            self.fail()
     903        except LdbError, (num, _):
     904            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     905        delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
     906
    808907    def test_distinguished_name(self):
    809908        """Tests the 'distinguishedName' attribute"""
    810         print "Tests the 'distinguishedName' attribute"""
    811 
    812909        # The "dn" shortcut isn't supported
    813910        m = Message()
     
    858955            self.fail()
    859956        except LdbError, (num, _):
    860             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     957            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    861958
    862959        m = Message()
     
    882979            self.fail()
    883980        except LdbError, (num, _):
    884             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     981            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    885982
    886983        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
     
    888985    def test_rdn_name(self):
    889986        """Tests the RDN"""
    890         print "Tests the RDN"""
    891 
    892987        # Search
    893988
     
    10591154    def DISABLED_test_largeRDN(self):
    10601155        """Testing large rDN (limit 64 characters)"""
    1061         rdn = "CN=a012345678901234567890123456789012345678901234567890123456789012";
     1156        rdn = "CN=a012345678901234567890123456789012345678901234567890123456789012"
    10621157        delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
    10631158        ldif = """
     
    10681163        delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
    10691164
    1070         rdn = "CN=a0123456789012345678901234567890123456789012345678901234567890120";
     1165        rdn = "CN=a0123456789012345678901234567890123456789012345678901234567890120"
    10711166        delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
    10721167        try:
     
    10831178    def test_rename(self):
    10841179        """Tests the rename operation"""
    1085         print "Tests the rename operations"""
    1086 
    10871180        try:
    10881181            # cannot rename to be a child of itself
     
    11941287    def test_rename_twice(self):
    11951288        """Tests the rename operation twice - this corresponds to a past bug"""
    1196         print "Tests the rename twice operation"""
    1197 
    11981289        self.ldb.add({
    11991290             "dn": "cn=ldaptestuser5,cn=users," + self.base_dn,
     
    12071298        ldb.rename("cn=ldaptestuser5,cn=Users," + self.base_dn, "cn=ldaptestUSER5,cn=users," + self.base_dn)
    12081299        res = ldb.search(expression="cn=ldaptestuser5")
    1209         print "Found %u records" % len(res)
    12101300        self.assertEquals(len(res), 1, "Wrong number of hits for cn=ldaptestuser5")
    12111301        res = ldb.search(expression="(&(cn=ldaptestuser5)(objectclass=user))")
    1212         print "Found %u records" % len(res)
    12131302        self.assertEquals(len(res), 1, "Wrong number of hits for (&(cn=ldaptestuser5)(objectclass=user))")
    12141303        delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn)
     
    12161305    def test_objectGUID(self):
    12171306        """Test objectGUID behaviour"""
    1218         print "Testing objectGUID behaviour\n"
    1219 
    12201307        # The objectGUID cannot directly be set
    12211308        try:
     
    12321319            "dn": "cn=ldaptestcontainer," + self.base_dn,
    12331320            "objectClass": "container" })
    1234 
    1235         res = ldb.search("cn=ldaptestcontainer," + self.base_dn,
    1236                          scope=SCOPE_BASE,
    1237                          attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged"])
    1238         self.assertTrue(len(res) == 1)
    1239         self.assertTrue("objectGUID" in res[0])
    1240         self.assertTrue("uSNCreated" in res[0])
    1241         self.assertTrue("uSNChanged" in res[0])
    1242         self.assertTrue("whenCreated" in res[0])
    1243         self.assertTrue("whenChanged" in res[0])
    1244 
    1245         delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
    1246 
    1247         # All the following attributes are specificable on add operations
    1248         self.ldb.add({
    1249             "dn": "cn=ldaptestcontainer," + self.base_dn,
    1250             "objectClass": "container",
    1251             "uSNCreated" : "1",
    1252             "uSNChanged" : "1",
    1253             "whenCreated": timestring(long(time.time())),
    1254             "whenChanged": timestring(long(time.time())) })
    1255 
    1256         res = ldb.search("cn=ldaptestcontainer," + self.base_dn,
    1257                          scope=SCOPE_BASE,
    1258                          attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged"])
    1259         self.assertTrue(len(res) == 1)
    1260         self.assertTrue("objectGUID" in res[0])
    1261         self.assertTrue("uSNCreated" in res[0])
    1262         self.assertFalse(res[0]["uSNCreated"][0] == "1") # these are corrected
    1263         self.assertTrue("uSNChanged" in res[0])
    1264         self.assertFalse(res[0]["uSNChanged"][0] == "1") # these are corrected
    1265 
    1266         delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
    1267 
    1268         # All this attributes are specificable on add operations
    1269         self.ldb.add({
    1270             "dn": "cn=ldaptestcontainer," + self.base_dn,
    1271             "objectclass": "container",
    1272             "uSNCreated" : "1",
    1273             "uSNChanged" : "1",
    1274             "whenCreated": timestring(long(time.time())),
    1275             "whenChanged": timestring(long(time.time())) })
    1276 
    1277         res = ldb.search("cn=ldaptestcontainer," + self.base_dn,
    1278                          scope=SCOPE_BASE,
    1279                          attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged"])
    1280         self.assertTrue(len(res) == 1)
    1281         self.assertTrue("objectGUID" in res[0])
    1282         self.assertTrue("uSNCreated" in res[0])
    1283         self.assertFalse(res[0]["uSNCreated"][0] == "1") # these are corrected
    1284         self.assertTrue("uSNChanged" in res[0])
    1285         self.assertFalse(res[0]["uSNChanged"][0] == "1") # these are corrected
    1286         self.assertTrue("whenCreated" in res[0])
    1287         self.assertTrue("whenChanged" in res[0])
    12881321
    12891322        # The objectGUID cannot directly be changed
     
    13031336    def test_parentGUID(self):
    13041337        """Test parentGUID behaviour"""
    1305         print "Testing parentGUID behaviour\n"
    1306 
    13071338        self.ldb.add({
    13081339            "dn": "cn=parentguidtest,cn=users," + self.base_dn,
    13091340            "objectclass":"user",
    1310             "samaccountname":"parentguidtest"});
     1341            "samaccountname":"parentguidtest"})
    13111342        res1 = ldb.search(base="cn=parentguidtest,cn=users," + self.base_dn, scope=SCOPE_BASE,
    1312                           attrs=["parentGUID", "samaccountname"]);
     1343                          attrs=["parentGUID", "samaccountname"])
    13131344        res2 = ldb.search(base="cn=users," + self.base_dn,scope=SCOPE_BASE,
    1314                           attrs=["objectGUID"]);
     1345                          attrs=["objectGUID"])
    13151346        res3 = ldb.search(base=self.base_dn, scope=SCOPE_BASE,
    1316                           attrs=["parentGUID"]);
     1347                          attrs=["parentGUID"])
    13171348        res4 = ldb.search(base=self.configuration_dn, scope=SCOPE_BASE,
    1318                           attrs=["parentGUID"]);
     1349                          attrs=["parentGUID"])
    13191350        res5 = ldb.search(base=self.schema_dn, scope=SCOPE_BASE,
    1320                           attrs=["parentGUID"]);
     1351                          attrs=["parentGUID"])
    13211352
    13221353        """Check if the parentGUID is valid """
    1323         self.assertEquals(res1[0]["parentGUID"], res2[0]["objectGUID"]);
     1354        self.assertEquals(res1[0]["parentGUID"], res2[0]["objectGUID"])
    13241355
    13251356        """Check if it returns nothing when there is no parent object - default NC"""
     
    13291360                has_parentGUID = True
    13301361                break
    1331         self.assertFalse(has_parentGUID);
     1362        self.assertFalse(has_parentGUID)
    13321363
    13331364        """Check if it returns nothing when there is no parent object - configuration NC"""
     
    13371368                has_parentGUID = True
    13381369                break
    1339         self.assertFalse(has_parentGUID);
     1370        self.assertFalse(has_parentGUID)
    13401371
    13411372        """Check if it returns nothing when there is no parent object - schema NC"""
     
    13451376                has_parentGUID = True
    13461377                break
    1347         self.assertFalse(has_parentGUID);
     1378        self.assertFalse(has_parentGUID)
    13481379
    13491380        """Ensures that if you look for another object attribute after the constructed
     
    13561387        self.assertTrue(has_another_attribute)
    13571388        self.assertTrue(len(res1[0]["samaccountname"]) == 1)
    1358         self.assertEquals(res1[0]["samaccountname"][0], "parentguidtest");
    1359 
    1360         print "Testing parentGUID behaviour on rename\n"
     1389        self.assertEquals(res1[0]["samaccountname"][0], "parentguidtest")
     1390
     1391        # Testing parentGUID behaviour on rename\
    13611392
    13621393        self.ldb.add({
    13631394            "dn": "cn=testotherusers," + self.base_dn,
    1364             "objectclass":"container"});
     1395            "objectclass":"container"})
    13651396        res1 = ldb.search(base="cn=testotherusers," + self.base_dn,scope=SCOPE_BASE,
    1366                           attrs=["objectGUID"]);
     1397                          attrs=["objectGUID"])
    13671398        ldb.rename("cn=parentguidtest,cn=users," + self.base_dn,
    1368                    "cn=parentguidtest,cn=testotherusers," + self.base_dn);
     1399                   "cn=parentguidtest,cn=testotherusers," + self.base_dn)
    13691400        res2 = ldb.search(base="cn=parentguidtest,cn=testotherusers," + self.base_dn,
    13701401                          scope=SCOPE_BASE,
    1371                           attrs=["parentGUID"]);
    1372         self.assertEquals(res1[0]["objectGUID"], res2[0]["parentGUID"]);
     1402                          attrs=["parentGUID"])
     1403        self.assertEquals(res1[0]["objectGUID"], res2[0]["parentGUID"])
    13731404
    13741405        delete_force(self.ldb, "cn=parentguidtest,cn=testotherusers," + self.base_dn)
    13751406        delete_force(self.ldb, "cn=testotherusers," + self.base_dn)
    13761407
     1408    def test_usnChanged(self):
     1409        """Test usnChanged behaviour"""
     1410
     1411        self.ldb.add({
     1412            "dn": "cn=ldaptestcontainer," + self.base_dn,
     1413            "objectClass": "container" })
     1414
     1415        res = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     1416                         scope=SCOPE_BASE,
     1417                         attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged", "description"])
     1418        self.assertTrue(len(res) == 1)
     1419        self.assertFalse("description" in res[0])
     1420        self.assertTrue("objectGUID" in res[0])
     1421        self.assertTrue("uSNCreated" in res[0])
     1422        self.assertTrue("uSNChanged" in res[0])
     1423        self.assertTrue("whenCreated" in res[0])
     1424        self.assertTrue("whenChanged" in res[0])
     1425
     1426        delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
     1427
     1428        # All this attributes are specificable on add operations
     1429        self.ldb.add({
     1430            "dn": "cn=ldaptestcontainer," + self.base_dn,
     1431            "objectclass": "container",
     1432            "uSNCreated" : "1",
     1433            "uSNChanged" : "1",
     1434            "whenCreated": timestring(long(time.time())),
     1435            "whenChanged": timestring(long(time.time())) })
     1436
     1437        res = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     1438                         scope=SCOPE_BASE,
     1439                         attrs=["objectGUID", "uSNCreated", "uSNChanged", "whenCreated", "whenChanged", "description"])
     1440        self.assertTrue(len(res) == 1)
     1441        self.assertFalse("description" in res[0])
     1442        self.assertTrue("objectGUID" in res[0])
     1443        self.assertTrue("uSNCreated" in res[0])
     1444        self.assertFalse(res[0]["uSNCreated"][0] == "1") # these are corrected
     1445        self.assertTrue("uSNChanged" in res[0])
     1446        self.assertFalse(res[0]["uSNChanged"][0] == "1") # these are corrected
     1447        self.assertTrue("whenCreated" in res[0])
     1448        self.assertTrue("whenChanged" in res[0])
     1449
     1450        ldb.modify_ldif("""
     1451dn: cn=ldaptestcontainer,""" + self.base_dn + """
     1452changetype: modify
     1453replace: description
     1454""")
     1455
     1456        res2 = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     1457                         scope=SCOPE_BASE,
     1458                         attrs=["uSNCreated", "uSNChanged", "description"])
     1459        self.assertTrue(len(res) == 1)
     1460        self.assertFalse("description" in res2[0])
     1461        self.assertEqual(res[0]["usnCreated"], res2[0]["usnCreated"])
     1462        self.assertEqual(res[0]["usnCreated"], res2[0]["usnChanged"])
     1463        self.assertEqual(res[0]["usnChanged"], res2[0]["usnChanged"])
     1464
     1465        ldb.modify_ldif("""
     1466dn: cn=ldaptestcontainer,""" + self.base_dn + """
     1467changetype: modify
     1468replace: description
     1469description: test
     1470""")
     1471
     1472        res3 = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     1473                         scope=SCOPE_BASE,
     1474                         attrs=["uSNCreated", "uSNChanged", "description"])
     1475        self.assertTrue(len(res) == 1)
     1476        self.assertTrue("description" in res3[0])
     1477        self.assertEqual("test", str(res3[0]["description"][0]))
     1478        self.assertEqual(res[0]["usnCreated"], res3[0]["usnCreated"])
     1479        self.assertNotEqual(res[0]["usnCreated"], res3[0]["usnChanged"])
     1480        self.assertNotEqual(res[0]["usnChanged"], res3[0]["usnChanged"])
     1481
     1482        ldb.modify_ldif("""
     1483dn: cn=ldaptestcontainer,""" + self.base_dn + """
     1484changetype: modify
     1485replace: description
     1486description: test
     1487""")
     1488
     1489        res4 = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     1490                         scope=SCOPE_BASE,
     1491                         attrs=["uSNCreated", "uSNChanged", "description"])
     1492        self.assertTrue(len(res) == 1)
     1493        self.assertTrue("description" in res4[0])
     1494        self.assertEqual("test", str(res4[0]["description"][0]))
     1495        self.assertEqual(res[0]["usnCreated"], res4[0]["usnCreated"])
     1496        self.assertNotEqual(res3[0]["usnCreated"], res4[0]["usnChanged"])
     1497        self.assertEqual(res3[0]["usnChanged"], res4[0]["usnChanged"])
     1498
     1499        ldb.modify_ldif("""
     1500dn: cn=ldaptestcontainer,""" + self.base_dn + """
     1501changetype: modify
     1502replace: description
     1503description: test2
     1504""")
     1505
     1506        res5 = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     1507                         scope=SCOPE_BASE,
     1508                         attrs=["uSNCreated", "uSNChanged", "description"])
     1509        self.assertTrue(len(res) == 1)
     1510        self.assertTrue("description" in res5[0])
     1511        self.assertEqual("test2", str(res5[0]["description"][0]))
     1512        self.assertEqual(res[0]["usnCreated"], res5[0]["usnCreated"])
     1513        self.assertNotEqual(res3[0]["usnChanged"], res5[0]["usnChanged"])
     1514
     1515        ldb.modify_ldif("""
     1516dn: cn=ldaptestcontainer,""" + self.base_dn + """
     1517changetype: modify
     1518delete: description
     1519description: test2
     1520""")
     1521
     1522        res6 = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     1523                         scope=SCOPE_BASE,
     1524                         attrs=["uSNCreated", "uSNChanged", "description"])
     1525        self.assertTrue(len(res) == 1)
     1526        self.assertFalse("description" in res6[0])
     1527        self.assertEqual(res[0]["usnCreated"], res6[0]["usnCreated"])
     1528        self.assertNotEqual(res5[0]["usnChanged"], res6[0]["usnChanged"])
     1529
     1530        ldb.modify_ldif("""
     1531dn: cn=ldaptestcontainer,""" + self.base_dn + """
     1532changetype: modify
     1533add: description
     1534description: test3
     1535""")
     1536
     1537        res7 = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     1538                         scope=SCOPE_BASE,
     1539                         attrs=["uSNCreated", "uSNChanged", "description"])
     1540        self.assertTrue(len(res) == 1)
     1541        self.assertTrue("description" in res7[0])
     1542        self.assertEqual("test3", str(res7[0]["description"][0]))
     1543        self.assertEqual(res[0]["usnCreated"], res7[0]["usnCreated"])
     1544        self.assertNotEqual(res6[0]["usnChanged"], res7[0]["usnChanged"])
     1545
     1546        ldb.modify_ldif("""
     1547dn: cn=ldaptestcontainer,""" + self.base_dn + """
     1548changetype: modify
     1549delete: description
     1550""")
     1551
     1552        res8 = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     1553                         scope=SCOPE_BASE,
     1554                         attrs=["uSNCreated", "uSNChanged", "description"])
     1555        self.assertTrue(len(res) == 1)
     1556        self.assertFalse("description" in res8[0])
     1557        self.assertEqual(res[0]["usnCreated"], res8[0]["usnCreated"])
     1558        self.assertNotEqual(res7[0]["usnChanged"], res8[0]["usnChanged"])
     1559
     1560        delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
     1561
    13771562    def test_groupType_int32(self):
    13781563        """Test groupType (int32) behaviour (should appear to be casted to a 32 bit signed integer before comparsion)"""
    1379         print "Testing groupType (int32) behaviour\n"
    13801564
    13811565        res1 = ldb.search(base=self.base_dn, scope=SCOPE_SUBTREE,
    1382                           attrs=["groupType"], expression="groupType=2147483653");
     1566                          attrs=["groupType"], expression="groupType=2147483653")
    13831567
    13841568        res2 = ldb.search(base=self.base_dn, scope=SCOPE_SUBTREE,
    1385                           attrs=["groupType"], expression="groupType=-2147483643");
     1569                          attrs=["groupType"], expression="groupType=-2147483643")
    13861570
    13871571        self.assertEquals(len(res1), len(res2))
     
    13931577    def test_linked_attributes(self):
    13941578        """This tests the linked attribute behaviour"""
    1395         print "Testing linked attribute behaviour\n"
    13961579
    13971580        ldb.add({
     
    14821665    def test_wkguid(self):
    14831666        """Test Well known GUID behaviours (including DN+Binary)"""
    1484         print "Test Well known GUID behaviours (including DN+Binary)"""
    14851667
    14861668        res = self.ldb.search(base=("<WKGUID=ab1d30f3768811d1aded00c04fd8d5cd,%s>" % self.base_dn), scope=SCOPE_BASE, attrs=[])
     
    14991681    def test_subschemasubentry(self):
    15001682        """Test subSchemaSubEntry appears when requested, but not when not requested"""
    1501         print "Test subSchemaSubEntry"""
    15021683
    15031684        res = self.ldb.search(base=self.base_dn, scope=SCOPE_BASE, attrs=["subSchemaSubEntry"])
     
    15121693        """Basic tests"""
    15131694
    1514         print "Testing user add"
     1695        # Testing user add
    15151696
    15161697        ldb.add({
     
    15611742                 })
    15621743
    1563         print "Testing ldb.search for (&(cn=ldaptestcomputer3)(objectClass=user))";
    1564         res = ldb.search(self.base_dn, expression="(&(cn=ldaptestcomputer3)(objectClass=user))");
     1744        # Testing ldb.search for (&(cn=ldaptestcomputer3)(objectClass=user))
     1745        res = ldb.search(self.base_dn, expression="(&(cn=ldaptestcomputer3)(objectClass=user))")
    15651746        self.assertEquals(len(res), 1, "Found only %d for (&(cn=ldaptestcomputer3)(objectClass=user))" % len(res))
    15661747
    1567         self.assertEquals(str(res[0].dn), ("CN=ldaptestcomputer3,CN=Computers," + self.base_dn));
    1568         self.assertEquals(res[0]["cn"][0], "ldaptestcomputer3");
    1569         self.assertEquals(res[0]["name"][0], "ldaptestcomputer3");
    1570         self.assertEquals(res[0]["objectClass"][0], "top");
    1571         self.assertEquals(res[0]["objectClass"][1], "person");
    1572         self.assertEquals(res[0]["objectClass"][2], "organizationalPerson");
    1573         self.assertEquals(res[0]["objectClass"][3], "user");
    1574         self.assertEquals(res[0]["objectClass"][4], "computer");
     1748        self.assertEquals(str(res[0].dn), ("CN=ldaptestcomputer3,CN=Computers," + self.base_dn))
     1749        self.assertEquals(res[0]["cn"][0], "ldaptestcomputer3")
     1750        self.assertEquals(res[0]["name"][0], "ldaptestcomputer3")
     1751        self.assertEquals(res[0]["objectClass"][0], "top")
     1752        self.assertEquals(res[0]["objectClass"][1], "person")
     1753        self.assertEquals(res[0]["objectClass"][2], "organizationalPerson")
     1754        self.assertEquals(res[0]["objectClass"][3], "user")
     1755        self.assertEquals(res[0]["objectClass"][4], "computer")
    15751756        self.assertTrue("objectGUID" in res[0])
    15761757        self.assertTrue("whenCreated" in res[0])
    1577         self.assertEquals(res[0]["objectCategory"][0], ("CN=Computer,CN=Schema,CN=Configuration," + self.base_dn));
    1578         self.assertEquals(int(res[0]["primaryGroupID"][0]), 513);
    1579         self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT);
    1580         self.assertEquals(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE);
     1758        self.assertEquals(res[0]["objectCategory"][0], ("CN=Computer,%s" % ldb.get_schema_basedn()))
     1759        self.assertEquals(int(res[0]["primaryGroupID"][0]), 513)
     1760        self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
     1761        self.assertEquals(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE)
    15811762
    15821763        delete_force(self.ldb, "cn=ldaptestcomputer3,cn=computers," + self.base_dn)
    15831764
    1584         print "Testing attribute or value exists behaviour"
     1765        # Testing attribute or value exists behaviour
    15851766        try:
    15861767            ldb.modify_ldif("""
     
    16141795            self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
    16151796
    1616         print "Testing ranged results"
     1797        # Testing ranged results
    16171798        ldb.modify_ldif("""
    16181799dn: cn=ldaptest2computer,cn=computers,""" + self.base_dn + """
     
    16601841                         attrs=["servicePrincipalName;range=0-*"])
    16611842        self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)")
    1662         #print len(res[0]["servicePrincipalName;range=0-*"])
    16631843        self.assertEquals(len(res[0]["servicePrincipalName;range=0-*"]), 30)
    16641844
    16651845        res = ldb.search(self.base_dn, expression="(cn=ldaptest2computer))", scope=SCOPE_SUBTREE, attrs=["servicePrincipalName;range=0-19"])
    16661846        self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)")
    1667             # print res[0]["servicePrincipalName;range=0-19"].length
    16681847        self.assertEquals(len(res[0]["servicePrincipalName;range=0-19"]), 20)
    16691848
     
    16901869        self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)")
    16911870        self.assertEquals(len(res[0]["servicePrincipalName;range=11-*"]), 19)
    1692             # print res[0]["servicePrincipalName;range=11-*"][18]
    1693             # print pos_11
    16941871            # self.assertEquals((res[0]["servicePrincipalName;range=11-*"][18]), pos_11)
    16951872
     
    17011878        res = ldb.search(self.base_dn, expression="(cn=ldaptest2computer))", scope=SCOPE_SUBTREE, attrs=["servicePrincipalName"])
    17021879        self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)")
    1703             # print res[0]["servicePrincipalName"][18]
    1704             # print pos_11
    17051880        self.assertEquals(len(res[0]["servicePrincipalName"]), 30)
    17061881            # self.assertEquals(res[0]["servicePrincipalName"][18], pos_11)
     
    17141889            "sn": "ldap user2"})
    17151890
    1716         print "Testing Ambigious Name Resolution"
     1891        # Testing Ambigious Name Resolution
    17171892        # Testing ldb.search for (&(anr=ldap testy)(objectClass=user))
    17181893        res = ldb.search(expression="(&(anr=ldap testy)(objectClass=user))")
     
    17961971#        self.assertEquals(len(res), 0, "Found (&(anr==\"testy ldap\")(objectClass=user))")
    17971972
    1798         print "Testing Renames"
     1973        # Testing Renames
    17991974
    18001975        attrs = ["objectGUID", "objectSid"]
    1801         print "Testing ldb.search for (&(cn=ldaptestUSer2)(objectClass=user))"
     1976        # Testing ldb.search for (&(cn=ldaptestUSer2)(objectClass=user))
    18021977        res_user = ldb.search(self.base_dn, expression="(&(cn=ldaptestUSer2)(objectClass=user))", scope=SCOPE_SUBTREE, attrs=attrs)
    18031978        self.assertEquals(len(res_user), 1, "Could not find (&(cn=ldaptestUSer2)(objectClass=user))")
     
    18061981        ldb.rename("<SID=" + ldb.schema_format_value("objectSID", res_user[0]["objectSID"][0]) + ">" , "cn=ldaptestUSER3,cn=users," + self.base_dn)
    18071982
    1808         print "Testing ldb.search for (&(cn=ldaptestuser3)(objectClass=user))"
     1983        # Testing ldb.search for (&(cn=ldaptestuser3)(objectClass=user))
    18091984        res = ldb.search(expression="(&(cn=ldaptestuser3)(objectClass=user))")
    18101985        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser3)(objectClass=user))")
     
    18342009        self.assertEquals(len(res), 0, "(&(&(cn=ldaptestuser3)(userAccountControl=547))(objectClass=user))")
    18352010
    1836         # This is a Samba special, and does not exist in real AD
    1837         #    print "Testing ldb.search for (dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")"
    1838         #    res = ldb.search("(dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")")
    1839         #    if (res.error != 0 || len(res) != 1) {
    1840         #        print "Could not find (dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")"
    1841         #        self.assertEquals(len(res), 1)
    1842         #    }
    1843         #    self.assertEquals(res[0].dn, ("CN=ldaptestUSER3,CN=Users," + self.base_dn))
    1844         #    self.assertEquals(res[0].cn, "ldaptestUSER3")
    1845         #    self.assertEquals(res[0].name, "ldaptestUSER3")
    1846 
    1847         print "Testing ldb.search for (distinguishedName=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")"
     2011        # Testing ldb.search for (dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ") - should not work
     2012        res = ldb.search(expression="(dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")")
     2013        self.assertEquals(len(res), 0, "Could find (dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")")
     2014
     2015        # Testing ldb.search for (distinguishedName=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")
    18482016        res = ldb.search(expression="(distinguishedName=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")")
    1849         self.assertEquals(len(res), 1, "Could not find (dn=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")")
     2017        self.assertEquals(len(res), 1, "Could not find (distinguishedName=CN=ldaptestUSER3,CN=Users," + self.base_dn + ")")
    18502018        self.assertEquals(str(res[0].dn), ("CN=ldaptestUSER3,CN=Users," + self.base_dn))
    18512019        self.assertEquals(str(res[0]["cn"]), "ldaptestUSER3")
     
    18842052            self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS)
    18852053        try:
    1886             ldb.rename("cn=ldaptestuser3,cn=users," + self.base_dn, "cn=ldaptestuser3,cn=configuration," + self.base_dn)
     2054            ldb.rename("cn=ldaptestuser3,cn=users,%s" % self.base_dn, "cn=ldaptestuser3,%s" % ldb.get_config_basedn())
    18872055            self.fail()
    18882056        except LdbError, (num, _):
     
    18972065        ldb.rename("cn=ldaptestgroup,cn=users," + self.base_dn, "cn=ldaptestgroup2,cn=users," + self.base_dn)
    18982066
    1899         print "Testing subtree renames"
     2067        # Testing subtree renames
    19002068
    19012069        ldb.add({"dn": "cn=ldaptestcontainer," + self.base_dn,
     
    19242092""")
    19252093
    1926         print "Testing ldb.rename of cn=ldaptestcontainer," + self.base_dn + " to cn=ldaptestcontainer2," + self.base_dn
     2094        # Testing ldb.rename of cn=ldaptestcontainer," + self.base_dn + " to cn=ldaptestcontainer2," + self.base_dn
    19272095        ldb.rename("CN=ldaptestcontainer," + self.base_dn, "CN=ldaptestcontainer2," + self.base_dn)
    19282096
    1929         print "Testing ldb.search for (&(cn=ldaptestuser4)(objectClass=user))"
     2097        # Testing ldb.search for (&(cn=ldaptestuser4)(objectClass=user))
    19302098        res = ldb.search(expression="(&(cn=ldaptestuser4)(objectClass=user))")
    19312099        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser4)(objectClass=user))")
    19322100
    1933         print "Testing subtree ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in (just renamed from) cn=ldaptestcontainer," + self.base_dn
     2101        # Testing subtree ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in (just renamed from) cn=ldaptestcontainer," + self.base_dn
    19342102        try:
    19352103            res = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     
    19402108            self.assertEquals(num, ERR_NO_SUCH_OBJECT)
    19412109
    1942         print "Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in (just renamed from) cn=ldaptestcontainer," + self.base_dn
     2110        # Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in (just renamed from) cn=ldaptestcontainer," + self.base_dn
    19432111        try:
    19442112            res = ldb.search("cn=ldaptestcontainer," + self.base_dn,
     
    19482116            self.assertEquals(num, ERR_NO_SUCH_OBJECT)
    19492117
    1950         print "Testing ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in renamed container"
     2118        # Testing ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in renamed container"
    19512119        res = ldb.search("cn=ldaptestcontainer2," + self.base_dn, expression="(&(cn=ldaptestuser4)(objectClass=user))", scope=SCOPE_SUBTREE)
    19522120        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser4)(objectClass=user)) under cn=ldaptestcontainer2," + self.base_dn)
     
    19572125        time.sleep(4)
    19582126
    1959         print "Testing ldb.search for (&(member=CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn + ")(objectclass=group)) to check subtree renames and linked attributes"
     2127        # Testing ldb.search for (&(member=CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn + ")(objectclass=group)) to check subtree renames and linked attributes"
    19602128        res = ldb.search(self.base_dn, expression="(&(member=CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn + ")(objectclass=group))", scope=SCOPE_SUBTREE)
    19612129        self.assertEquals(len(res), 1, "Could not find (&(member=CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn + ")(objectclass=group)), perhaps linked attributes are not consistant with subtree renames?")
    19622130
    1963         print "Testing ldb.rename (into itself) of cn=ldaptestcontainer2," + self.base_dn + " to cn=ldaptestcontainer,cn=ldaptestcontainer2," + self.base_dn
     2131        # Testing ldb.rename (into itself) of cn=ldaptestcontainer2," + self.base_dn + " to cn=ldaptestcontainer,cn=ldaptestcontainer2," + self.base_dn
    19642132        try:
    19652133            ldb.rename("cn=ldaptestcontainer2," + self.base_dn, "cn=ldaptestcontainer,cn=ldaptestcontainer2," + self.base_dn)
     
    19682136            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    19692137
    1970         print "Testing ldb.rename (into non-existent container) of cn=ldaptestcontainer2," + self.base_dn + " to cn=ldaptestcontainer,cn=ldaptestcontainer3," + self.base_dn
     2138        # Testing ldb.rename (into non-existent container) of cn=ldaptestcontainer2," + self.base_dn + " to cn=ldaptestcontainer,cn=ldaptestcontainer3," + self.base_dn
    19712139        try:
    19722140            ldb.rename("cn=ldaptestcontainer2," + self.base_dn, "cn=ldaptestcontainer,cn=ldaptestcontainer3," + self.base_dn)
     
    19752143            self.assertTrue(num in (ERR_UNWILLING_TO_PERFORM, ERR_OTHER))
    19762144
    1977         print "Testing delete (should fail, not a leaf node) of renamed cn=ldaptestcontainer2," + self.base_dn
     2145        # Testing delete (should fail, not a leaf node) of renamed cn=ldaptestcontainer2," + self.base_dn
    19782146        try:
    19792147            ldb.delete("cn=ldaptestcontainer2," + self.base_dn)
     
    19822150            self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
    19832151
    1984         print "Testing base ldb.search for CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn
     2152        # Testing base ldb.search for CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn
    19852153        res = ldb.search(expression="(objectclass=*)", base=("CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn), scope=SCOPE_BASE)
    19862154        self.assertEquals(len(res), 1)
     
    19882156        self.assertEquals(len(res), 0)
    19892157
    1990         print "Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in cn=ldaptestcontainer2," + self.base_dn
     2158        # Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in cn=ldaptestcontainer2," + self.base_dn
    19912159        res = ldb.search(expression="(&(cn=ldaptestuser4)(objectClass=user))", base=("cn=ldaptestcontainer2," + self.base_dn), scope=SCOPE_ONELEVEL)
    1992         # FIXME: self.assertEquals(len(res), 0)
    1993 
    1994         print "Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in cn=ldaptestcontainer2," + self.base_dn
     2160        self.assertEquals(len(res), 1)
     2161
     2162        # Testing one-level ldb.search for (&(cn=ldaptestuser4)(objectClass=user)) in cn=ldaptestcontainer2," + self.base_dn
    19952163        res = ldb.search(expression="(&(cn=ldaptestuser4)(objectClass=user))", base=("cn=ldaptestcontainer2," + self.base_dn), scope=SCOPE_SUBTREE)
    1996         # FIXME: self.assertEquals(len(res), 0)
    1997 
    1998         print "Testing delete of subtree renamed "+("CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn)
     2164        self.assertEquals(len(res), 1)
     2165
     2166        # Testing delete of subtree renamed "+("CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn)
    19992167        ldb.delete(("CN=ldaptestuser4,CN=ldaptestcontainer2," + self.base_dn))
    2000         print "Testing delete of renamed cn=ldaptestcontainer2," + self.base_dn
     2168        # Testing delete of renamed cn=ldaptestcontainer2," + self.base_dn
    20012169        ldb.delete("cn=ldaptestcontainer2," + self.base_dn)
    20022170
     
    20052173        ldb.add({"dn": "cn=ldaptestutf8user2  Úùéìòà,cn=users," + self.base_dn, "objectClass": "user"})
    20062174
    2007         print "Testing ldb.search for (&(cn=ldaptestuser)(objectClass=user))"
     2175        # Testing ldb.search for (&(cn=ldaptestuser)(objectClass=user))"
    20082176        res = ldb.search(expression="(&(cn=ldaptestuser)(objectClass=user))")
    20092177        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser)(objectClass=user))")
     
    20152183        self.assertTrue("objectGUID" in res[0])
    20162184        self.assertTrue("whenCreated" in res[0])
    2017         self.assertEquals(str(res[0]["objectCategory"]), ("CN=Person,CN=Schema,CN=Configuration," + self.base_dn))
     2185        self.assertEquals(str(res[0]["objectCategory"]), ("CN=Person,%s" % ldb.get_schema_basedn()))
    20182186        self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
    20192187        self.assertEquals(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE)
     
    20212189        self.assertEquals(len(res[0]["memberOf"]), 1)
    20222190
    2023         print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=cn=person,cn=schema,cn=configuration," + self.base_dn + "))"
    2024         res2 = ldb.search(expression="(&(cn=ldaptestuser)(objectCategory=cn=person,cn=schema,cn=configuration," + self.base_dn + "))")
    2025         self.assertEquals(len(res2), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=cn=person,cn=schema,cn=configuration," + self.base_dn + "))")
     2191        # Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=cn=person,%s))" % ldb.get_schema_basedn()
     2192        res2 = ldb.search(expression="(&(cn=ldaptestuser)(objectCategory=cn=person,%s))" % ldb.get_schema_basedn())
     2193        self.assertEquals(len(res2), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=cn=person,%s))" % ldb.get_schema_basedn())
    20262194
    20272195        self.assertEquals(res[0].dn, res2[0].dn)
    20282196
    2029         print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon))"
     2197        # Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon))"
    20302198        res3 = ldb.search(expression="(&(cn=ldaptestuser)(objectCategory=PerSon))")
    20312199        self.assertEquals(len(res3), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=PerSon)): matched %d" % len(res3))
     
    20342202
    20352203        if gc_ldb is not None:
    2036             print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in Global Catalog"
     2204            # Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in Global Catalog"
    20372205            res3gc = gc_ldb.search(expression="(&(cn=ldaptestuser)(objectCategory=PerSon))")
    20382206            self.assertEquals(len(res3gc), 1)
     
    20402208            self.assertEquals(res[0].dn, res3gc[0].dn)
    20412209
    2042         print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in with 'phantom root' control"
     2210        # Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in with 'phantom root' control"
    20432211
    20442212        if gc_ldb is not None:
     
    20502218        ldb.delete(res[0].dn)
    20512219
    2052         print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectClass=user))"
     2220        # Testing ldb.search for (&(cn=ldaptestcomputer)(objectClass=user))"
    20532221        res = ldb.search(expression="(&(cn=ldaptestcomputer)(objectClass=user))")
    20542222        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestuser)(objectClass=user))")
     
    20602228        self.assertTrue("objectGUID" in res[0])
    20612229        self.assertTrue("whenCreated" in res[0])
    2062         self.assertEquals(str(res[0]["objectCategory"]), ("CN=Computer,CN=Schema,CN=Configuration," + self.base_dn))
     2230        self.assertEquals(str(res[0]["objectCategory"]), ("CN=Computer,%s" % ldb.get_schema_basedn()))
    20632231        self.assertEquals(int(res[0]["primaryGroupID"][0]), 513)
    20642232        self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
     
    20672235        self.assertEquals(len(res[0]["memberOf"]), 1)
    20682236
    2069         print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=cn=computer,cn=schema,cn=configuration," + self.base_dn + "))"
    2070         res2 = ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=cn=computer,cn=schema,cn=configuration," + self.base_dn + "))")
    2071         self.assertEquals(len(res2), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=cn=computer,cn=schema,cn=configuration," + self.base_dn + "))")
     2237        # Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s))" % ldb.get_schema_basedn()
     2238        res2 = ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s))" % ldb.get_schema_basedn())
     2239        self.assertEquals(len(res2), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s))" % ldb.get_schema_basedn())
    20722240
    20732241        self.assertEquals(res[0].dn, res2[0].dn)
    20742242
    20752243        if gc_ldb is not None:
    2076             print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=cn=computer,cn=schema,cn=configuration," + self.base_dn + ")) in Global Catlog"
    2077             res2gc = gc_ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=cn=computer,cn=schema,cn=configuration," + self.base_dn + "))")
    2078             self.assertEquals(len(res2gc), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=cn=computer,cn=schema,cn=configuration," + self.base_dn + ")) in Global Catlog")
     2244            # Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s)) in Global Catalog" % gc_ldb.get_schema_basedn()
     2245            res2gc = gc_ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s))" % gc_ldb.get_schema_basedn())
     2246            self.assertEquals(len(res2gc), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=cn=computer,%s)) In Global Catalog" % gc_ldb.get_schema_basedn())
    20792247
    20802248            self.assertEquals(res[0].dn, res2gc[0].dn)
    20812249
    2082         print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=compuTER))"
     2250        # Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=compuTER))"
    20832251        res3 = ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=compuTER))")
    20842252        self.assertEquals(len(res3), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=compuTER))")
     
    20872255
    20882256        if gc_ldb is not None:
    2089             print "Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=compuTER)) in Global Catalog"
     2257            # Testing ldb.search for (&(cn=ldaptestcomputer)(objectCategory=compuTER)) in Global Catalog"
    20902258            res3gc = gc_ldb.search(expression="(&(cn=ldaptestcomputer)(objectCategory=compuTER))")
    20912259            self.assertEquals(len(res3gc), 1, "Could not find (&(cn=ldaptestcomputer)(objectCategory=compuTER)) in Global Catalog")
     
    20932261            self.assertEquals(res[0].dn, res3gc[0].dn)
    20942262
    2095         print "Testing ldb.search for (&(cn=ldaptestcomp*r)(objectCategory=compuTER))"
     2263        # Testing ldb.search for (&(cn=ldaptestcomp*r)(objectCategory=compuTER))"
    20962264        res4 = ldb.search(expression="(&(cn=ldaptestcomp*r)(objectCategory=compuTER))")
    20972265        self.assertEquals(len(res4), 1, "Could not find (&(cn=ldaptestcomp*r)(objectCategory=compuTER))")
     
    20992267        self.assertEquals(res[0].dn, res4[0].dn)
    21002268
    2101         print "Testing ldb.search for (&(cn=ldaptestcomput*)(objectCategory=compuTER))"
     2269        # Testing ldb.search for (&(cn=ldaptestcomput*)(objectCategory=compuTER))"
    21022270        res5 = ldb.search(expression="(&(cn=ldaptestcomput*)(objectCategory=compuTER))")
    21032271        self.assertEquals(len(res5), 1, "Could not find (&(cn=ldaptestcomput*)(objectCategory=compuTER))")
     
    21052273        self.assertEquals(res[0].dn, res5[0].dn)
    21062274
    2107         print "Testing ldb.search for (&(cn=*daptestcomputer)(objectCategory=compuTER))"
     2275        # Testing ldb.search for (&(cn=*daptestcomputer)(objectCategory=compuTER))"
    21082276        res6 = ldb.search(expression="(&(cn=*daptestcomputer)(objectCategory=compuTER))")
    21092277        self.assertEquals(len(res6), 1, "Could not find (&(cn=*daptestcomputer)(objectCategory=compuTER))")
     
    21132281        ldb.delete("<GUID=" + ldb.schema_format_value("objectGUID", res[0]["objectGUID"][0]) + ">")
    21142282
    2115         print "Testing ldb.search for (&(cn=ldaptest2computer)(objectClass=user))"
     2283        # Testing ldb.search for (&(cn=ldaptest2computer)(objectClass=user))"
    21162284        res = ldb.search(expression="(&(cn=ldaptest2computer)(objectClass=user))")
    21172285        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptest2computer)(objectClass=user))")
     
    21232291        self.assertTrue("objectGUID" in res[0])
    21242292        self.assertTrue("whenCreated" in res[0])
    2125         self.assertEquals(res[0]["objectCategory"][0], "CN=Computer,CN=Schema,CN=Configuration," + self.base_dn)
     2293        self.assertEquals(res[0]["objectCategory"][0], "CN=Computer,%s" % ldb.get_schema_basedn())
    21262294        self.assertEquals(int(res[0]["sAMAccountType"][0]), ATYPE_WORKSTATION_TRUST)
    21272295        self.assertEquals(int(res[0]["userAccountControl"][0]), UF_WORKSTATION_TRUST_ACCOUNT)
     
    21302298
    21312299        attrs = ["cn", "name", "objectClass", "objectGUID", "objectSID", "whenCreated", "nTSecurityDescriptor", "memberOf", "allowedAttributes", "allowedAttributesEffective"]
    2132         print "Testing ldb.search for (&(cn=ldaptestUSer2)(objectClass=user))"
     2300        # Testing ldb.search for (&(cn=ldaptestUSer2)(objectClass=user))"
    21332301        res_user = ldb.search(self.base_dn, expression="(&(cn=ldaptestUSer2)(objectClass=user))", scope=SCOPE_SUBTREE, attrs=attrs)
    21342302        self.assertEquals(len(res_user), 1, "Could not find (&(cn=ldaptestUSer2)(objectClass=user))")
     
    21502318
    21512319        attrs = ["cn", "name", "objectClass", "objectGUID", "objectSID", "whenCreated", "nTSecurityDescriptor", "member", "allowedAttributes", "allowedAttributesEffective"]
    2152         print "Testing ldb.search for (&(cn=ldaptestgroup2)(objectClass=group))"
     2320        # Testing ldb.search for (&(cn=ldaptestgroup2)(objectClass=group))"
    21532321        res = ldb.search(self.base_dn, expression="(&(cn=ldaptestgroup2)(objectClass=group))", scope=SCOPE_SUBTREE, attrs=attrs)
    21542322        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestgroup2)(objectClass=group))")
     
    21802348        self.assertTrue(("<GUID=" + ldb.schema_format_value("objectGUID", ldaptestuser2_guid) + ">;<SID=" + ldb.schema_format_value("objectSid", ldaptestuser2_sid) + ">;CN=ldaptestuser2,CN=Users," + self.base_dn).upper() in memberUP)
    21812349
    2182         print "Quicktest for linked attributes"
     2350        # Quicktest for linked attributes"
    21832351        ldb.modify_ldif("""
    21842352dn: cn=ldaptestgroup2,cn=users,""" + self.base_dn + """
     
    22432411
    22442412        attrs = ["cn", "name", "objectClass", "objectGUID", "whenCreated", "nTSecurityDescriptor", "member"]
    2245         print "Testing ldb.search for (&(cn=ldaptestgroup2)(objectClass=group)) to check linked delete"
     2413        # Testing ldb.search for (&(cn=ldaptestgroup2)(objectClass=group)) to check linked delete"
    22462414        res = ldb.search(self.base_dn, expression="(&(cn=ldaptestgroup2)(objectClass=group))", scope=SCOPE_SUBTREE, attrs=attrs)
    22472415        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestgroup2)(objectClass=group)) to check linked delete")
     
    22502418        self.assertTrue("member" not in res[0])
    22512419
    2252         print "Testing ldb.search for (&(cn=ldaptestutf8user ÈÙÉÌÒÀ)(objectClass=user))"
    2253 # TODO UTF8 users don't seem to work fully anymore
    2254 #        res = ldb.search(expression="(&(cn=ldaptestutf8user ÈÙÉÌÒÀ)(objectClass=user))")
     2420        # Testing ldb.search for (&(cn=ldaptestutf8user ÈÙÉÌÒÀ)(objectClass=user))"
     2421        res = ldb.search(expression="(&(cn=ldaptestutf8user ÈÙÉÌÒÀ)(objectClass=user))")
     2422        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user ÈÙÉÌÒÀ)(objectClass=user))")
    22552423        res = ldb.search(expression="(&(cn=ldaptestutf8user Úùéìòà)(objectclass=user))")
    22562424        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user ÈÙÉÌÒÀ)(objectClass=user))")
     
    22632431        self.assertTrue("whenCreated" in res[0])
    22642432
     2433        # delete "ldaptestutf8user"
    22652434        ldb.delete(res[0].dn)
    22662435
    2267         print "Testing ldb.search for (&(cn=ldaptestutf8user2*)(objectClass=user))"
     2436        # Testing ldb.search for (&(cn=ldaptestutf8user2*)(objectClass=user))"
    22682437        res = ldb.search(expression="(&(cn=ldaptestutf8user2*)(objectClass=user))")
    22692438        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user2*)(objectClass=user))")
    22702439
     2440        # Testing ldb.search for (&(cn=ldaptestutf8user2  ÈÙÉÌÒÀ)(objectClass=user))"
     2441        res = ldb.search(expression="(&(cn=ldaptestutf8user2  ÈÙÉÌÒÀ)(objectClass=user))")
     2442        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user2  ÈÙÉÌÒÀ)(objectClass=user))")
     2443
     2444        # delete "ldaptestutf8user2 "
    22712445        ldb.delete(res[0].dn)
    22722446
    22732447        ldb.delete(("CN=ldaptestgroup2,CN=Users," + self.base_dn))
    22742448
    2275         print "Testing ldb.search for (&(cn=ldaptestutf8user2 ÈÙÉÌÒÀ)(objectClass=user))"
    2276 # TODO UTF8 users don't seem to work fully anymore
    2277 #        res = ldb.search(expression="(&(cn=ldaptestutf8user ÈÙÉÌÒÀ)(objectClass=user))")
    2278 #        self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestutf8user ÈÙÉÌÒÀ)(objectClass=user))")
    2279 
    2280         print "Testing that we can't get at the configuration DN from the main search base"
     2449        # Testing that we can't get at the configuration DN from the main search base"
    22812450        res = ldb.search(self.base_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"])
    22822451        self.assertEquals(len(res), 0)
    22832452
    2284         print "Testing that we can get at the configuration DN from the main search base on the LDAP port with the 'phantom root' search_options control"
     2453        # Testing that we can get at the configuration DN from the main search base on the LDAP port with the 'phantom root' search_options control"
    22852454        res = ldb.search(self.base_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["search_options:1:2"])
    22862455        self.assertTrue(len(res) > 0)
    22872456
    22882457        if gc_ldb is not None:
    2289             print "Testing that we can get at the configuration DN from the main search base on the GC port with the search_options control == 0"
     2458            # Testing that we can get at the configuration DN from the main search base on the GC port with the search_options control == 0"
    22902459
    22912460            res = gc_ldb.search(self.base_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["search_options:1:0"])
    22922461            self.assertTrue(len(res) > 0)
    22932462
    2294             print "Testing that we do find configuration elements in the global catlog"
     2463            # Testing that we do find configuration elements in the global catlog"
    22952464            res = gc_ldb.search(self.base_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"])
    22962465            self.assertTrue(len(res) > 0)
    22972466
    2298             print "Testing that we do find configuration elements and user elements at the same time"
     2467            # Testing that we do find configuration elements and user elements at the same time"
    22992468            res = gc_ldb.search(self.base_dn, expression="(|(objectClass=crossRef)(objectClass=person))", scope=SCOPE_SUBTREE, attrs=["cn"])
    23002469            self.assertTrue(len(res) > 0)
    23012470
    2302             print "Testing that we do find configuration elements in the global catlog, with the configuration basedn"
     2471            # Testing that we do find configuration elements in the global catlog, with the configuration basedn"
    23032472            res = gc_ldb.search(self.configuration_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"])
    23042473            self.assertTrue(len(res) > 0)
    23052474
    2306         print "Testing that we can get at the configuration DN on the main LDAP port"
     2475        # Testing that we can get at the configuration DN on the main LDAP port"
    23072476        res = ldb.search(self.configuration_dn, expression="objectClass=crossRef", scope=SCOPE_SUBTREE, attrs=["cn"])
    23082477        self.assertTrue(len(res) > 0)
    23092478
    2310         print "Testing objectCategory canonacolisation"
     2479        # Testing objectCategory canonacolisation"
    23112480        res = ldb.search(self.configuration_dn, expression="objectCategory=ntDsDSA", scope=SCOPE_SUBTREE, attrs=["cn"])
    23122481        self.assertTrue(len(res) > 0, "Didn't find any records with objectCategory=ntDsDSA")
     
    23172486        self.assertTrue(len(res) != 0)
    23182487
    2319         print "Testing objectClass attribute order on "+ self.base_dn
     2488        # Testing objectClass attribute order on "+ self.base_dn
    23202489        res = ldb.search(expression="objectClass=domain", base=self.base_dn,
    23212490                         scope=SCOPE_BASE, attrs=["objectClass"])
     
    23262495    #  check enumeration
    23272496
    2328         print "Testing ldb.search for objectCategory=person"
     2497        # Testing ldb.search for objectCategory=person"
    23292498        res = ldb.search(self.base_dn, expression="objectCategory=person", scope=SCOPE_SUBTREE, attrs=["cn"])
    23302499        self.assertTrue(len(res) > 0)
    23312500
    2332         print "Testing ldb.search for objectCategory=person with domain scope control"
     2501        # Testing ldb.search for objectCategory=person with domain scope control"
    23332502        res = ldb.search(self.base_dn, expression="objectCategory=person", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["domain_scope:1"])
    23342503        self.assertTrue(len(res) > 0)
    23352504
    2336         print "Testing ldb.search for objectCategory=user"
     2505        # Testing ldb.search for objectCategory=user"
    23372506        res = ldb.search(self.base_dn, expression="objectCategory=user", scope=SCOPE_SUBTREE, attrs=["cn"])
    23382507        self.assertTrue(len(res) > 0)
    23392508
    2340         print "Testing ldb.search for objectCategory=user with domain scope control"
     2509        # Testing ldb.search for objectCategory=user with domain scope control"
    23412510        res = ldb.search(self.base_dn, expression="objectCategory=user", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["domain_scope:1"])
    23422511        self.assertTrue(len(res) > 0)
    23432512
    2344         print "Testing ldb.search for objectCategory=group"
     2513        # Testing ldb.search for objectCategory=group"
    23452514        res = ldb.search(self.base_dn, expression="objectCategory=group", scope=SCOPE_SUBTREE, attrs=["cn"])
    23462515        self.assertTrue(len(res) > 0)
    23472516
    2348         print "Testing ldb.search for objectCategory=group with domain scope control"
     2517        # Testing ldb.search for objectCategory=group with domain scope control"
    23492518        res = ldb.search(self.base_dn, expression="objectCategory=group", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["domain_scope:1"])
    23502519        self.assertTrue(len(res) > 0)
    23512520
    2352         print "Testing creating a user with the posixAccount objectClass"
     2521        # Testing creating a user with the posixAccount objectClass"
    23532522        self.ldb.add_ldif("""dn: cn=posixuser,CN=Users,%s
    23542523objectClass: top
     
    23672536description: A POSIX user"""% (self.base_dn))
    23682537
    2369         print "Testing removing the posixAccount objectClass from an existing user"
     2538        # Testing removing the posixAccount objectClass from an existing user"
    23702539        self.ldb.modify_ldif("""dn: cn=posixuser,CN=Users,%s
    23712540changetype: modify
     
    23732542objectClass: posixAccount"""% (self.base_dn))
    23742543
    2375         print "Testing adding the posixAccount objectClass to an existing user"
     2544        # Testing adding the posixAccount objectClass to an existing user"
    23762545        self.ldb.modify_ldif("""dn: cn=posixuser,CN=Users,%s
    23772546changetype: modify
     
    24602629        delete_force(self.ldb, user_dn)
    24612630        try:
    2462             sddl = "O:DUG:DUD:PAI(A;;RPWP;;;AU)S:PAI"
     2631            sddl = "O:DUG:DUD:AI(A;;RPWP;;;AU)S:PAI"
    24632632            desc = security.descriptor.from_sddl(sddl, security.dom_sid('S-1-5-21'))
    24642633            desc_base64 = base64.b64encode( ndr_pack(desc) )
     
    24702639            res = self.ldb.search(base=user_dn, attrs=["nTSecurityDescriptor"])
    24712640            self.assertTrue("nTSecurityDescriptor" in res[0])
     2641            desc = res[0]["nTSecurityDescriptor"][0]
     2642            desc = ndr_unpack(security.descriptor, desc)
     2643            desc_sddl = desc.as_sddl(self.domain_sid)
     2644            self.assertTrue("O:S-1-5-21-513G:S-1-5-21-513D:AI(A;;RPWP;;;AU)" in desc_sddl)
    24722645        finally:
    24732646            delete_force(self.ldb, user_dn)
     
    26322805    def test_dsheuristics(self):
    26332806        """Tests the 'dSHeuristics' attribute"""
    2634         print "Tests the 'dSHeuristics' attribute"""
     2807        # Tests the 'dSHeuristics' attribute"
    26352808
    26362809        # Get the current value to restore it later
    26372810        dsheuristics = self.ldb.get_dsheuristics()
    2638         # Should not be longer than 18 chars?
    2639         try:
    2640             self.ldb.set_dsheuristics("123ABC-+!1asdfg@#^12")
    2641         except LdbError, (num, _):
    2642             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    2643         # If it is >= 10 chars, tenthChar should be 1
    2644         try:
    2645             self.ldb.set_dsheuristics("00020000000002")
    2646         except LdbError, (num, _):
    2647             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    2648         # apart from the above, all char values are accepted
    2649         self.ldb.set_dsheuristics("123ABC-+!1asdfg@#^")
    2650         self.assertEquals(self.ldb.get_dsheuristics(), "123ABC-+!1asdfg@#^")
    2651         # restore old value
    2652         self.ldb.set_dsheuristics(dsheuristics)
     2811        # Perform the length checks: for each decade (except the 0th) we need
     2812        # the first index to be the number. This goes till the 9th one, beyond
     2813        # there does not seem to be another limitation.
     2814        try:
     2815            dshstr = ""
     2816            for i in range(1,11):
     2817                # This is in the range
     2818                self.ldb.set_dsheuristics(dshstr + "x")
     2819                self.ldb.set_dsheuristics(dshstr + "xxxxx")
     2820                dshstr = dshstr + "xxxxxxxxx"
     2821                if i < 10:
     2822                    # Not anymore in the range, new decade specifier needed
     2823                    try:
     2824                        self.ldb.set_dsheuristics(dshstr + "x")
     2825                        self.fail()
     2826                    except LdbError, (num, _):
     2827                        self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     2828                    dshstr = dshstr + str(i)
     2829                else:
     2830                    # There does not seem to be an upper limit
     2831                    self.ldb.set_dsheuristics(dshstr + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
     2832            # apart from the above, all char values are accepted
     2833            self.ldb.set_dsheuristics("123ABC-+!1asdfg@#^")
     2834            self.assertEquals(self.ldb.get_dsheuristics(), "123ABC-+!1asdfg@#^")
     2835        finally:
     2836            # restore old value
     2837            self.ldb.set_dsheuristics(dsheuristics)
    26532838
    26542839    def test_ldapControlReturn(self):
     
    26562841           really return something"""
    26572842        res = self.ldb.search(attrs=["cn"],
    2658                               controls=["paged_result:1:10"])
     2843                              controls=["paged_results:1:10"])
    26592844        self.assertEquals(len(res.controls), 1)
    26602845        self.assertEquals(res.controls[0].oid, "1.2.840.113556.1.4.319")
     2846        s = str(res.controls[0])
    26612847
    26622848    def test_operational(self):
    26632849        """Tests operational attributes"""
    2664         print "Tests operational attributes"""
     2850        # Tests operational attributes"
    26652851
    26662852        res = self.ldb.search(self.base_dn, scope=SCOPE_BASE,
     
    26752861        self.assertTrue("whenChanged" in res[0])
    26762862
    2677 class BaseDnTests(unittest.TestCase):
     2863    def test_timevalues1(self):
     2864        """Tests possible syntax of time attributes"""
     2865
     2866        user_name = "testtimevaluesuser1"
     2867        user_dn = "CN=%s,CN=Users,%s" % (user_name, self.base_dn)
     2868
     2869        delete_force(self.ldb, user_dn)
     2870        self.ldb.add({ "dn": user_dn,
     2871                       "objectClass": "user",
     2872                       "sAMAccountName": user_name })
     2873
     2874        #
     2875        # We check the following values:
     2876        #
     2877        #   370101000000Z     => 20370101000000.0Z
     2878        # 20370102000000.*Z   => 20370102000000.0Z
     2879        #
     2880        ext = [ "Z", ".0Z", ".Z", ".000Z", ".RandomIgnoredCharacters...987654321Z" ]
     2881        for i in range(0, len(ext)):
     2882            v_raw = "203701%02d000000" % (i + 1)
     2883            if ext[i] == "Z":
     2884                v_set = v_raw[2:] + ext[i]
     2885            else:
     2886                v_set = v_raw + ext[i]
     2887            v_get = v_raw + ".0Z"
     2888
     2889            m = Message()
     2890            m.dn = Dn(ldb, user_dn)
     2891            m["msTSExpireDate"] = MessageElement([v_set],
     2892                                                 FLAG_MOD_REPLACE,
     2893                                                 "msTSExpireDate")
     2894            self.ldb.modify(m)
     2895
     2896            res = self.ldb.search(base=user_dn, scope=SCOPE_BASE, attrs=["msTSExpireDate"])
     2897            self.assertTrue(len(res) == 1)
     2898            self.assertTrue("msTSExpireDate" in res[0])
     2899            self.assertTrue(len(res[0]["msTSExpireDate"]) == 1)
     2900            self.assertEquals(res[0]["msTSExpireDate"][0], v_get)
     2901
     2902class BaseDnTests(samba.tests.TestCase):
    26782903
    26792904    def setUp(self):
     
    27582983        self.assertEquals(int(res[0]["domainFunctionality"][0]), int(res4[0]["msDS-Behavior-Version"][0]))
    27592984
    2760         res5 = self.ldb.search("cn=partitions," + str(ldb.get_config_basedn()), scope=SCOPE_BASE, attrs=["msDS-Behavior-Version"])
     2985        res5 = self.ldb.search("cn=partitions,%s" % ldb.get_config_basedn(), scope=SCOPE_BASE, attrs=["msDS-Behavior-Version"])
    27612986        self.assertEquals(len(res5), 1)
    27622987        self.assertEquals(len(res5[0]["msDS-Behavior-Version"]), 1)
     
    27783003        """Testing the ldap service name in rootDSE"""
    27793004        res = self.ldb.search("", scope=SCOPE_BASE,
    2780                               attrs=["ldapServiceName", "dNSHostName"])
     3005                              attrs=["ldapServiceName", "dnsHostName"])
    27813006        self.assertEquals(len(res), 1)
    2782 
    2783         (hostname, _, dns_domainname) = res[0]["dNSHostName"][0].partition(".")
    2784         self.assertTrue(":%s$@%s" % (hostname, dns_domainname.upper())
    2785                         in res[0]["ldapServiceName"][0])
     3007        self.assertTrue("ldapServiceName" in res[0])
     3008        self.assertTrue("dnsHostName" in res[0])
     3009
     3010        (hostname, _, dns_domainname) = res[0]["dnsHostName"][0].partition(".")
     3011
     3012        given = res[0]["ldapServiceName"][0]
     3013        expected = "%s:%s$@%s" % (dns_domainname.lower(), hostname.lower(), dns_domainname.upper())
     3014        self.assertEquals(given, expected)
    27863015
    27873016if not "://" in host:
     
    27983027    gc_ldb = None
    27993028
    2800 runner = SubunitTestRunner()
    2801 rc = 0
    2802 if not runner.run(unittest.makeSuite(BaseDnTests)).wasSuccessful():
    2803     rc = 1
    2804 if not runner.run(unittest.makeSuite(BasicTests)).wasSuccessful():
    2805     rc = 1
    2806 sys.exit(rc)
     3029TestProgram(module=__name__, opts=subunitopts)
  • vendor/current/source4/dsdb/tests/python/ldap_schema.py

    r740 r988  
    22# -*- coding: utf-8 -*-
    33# This is a port of the original in testprogs/ejs/ldap.js
     4
     5# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008-2011
     6#
     7# This program is free software; you can redistribute it and/or modify
     8# it under the terms of the GNU General Public License as published by
     9# the Free Software Foundation; either version 3 of the License, or
     10# (at your option) any later version.
     11#
     12# This program is distributed in the hope that it will be useful,
     13# but WITHOUT ANY WARRANTY; without even the implied warranty of
     14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     15# GNU General Public License for more details.
     16#
     17# You should have received a copy of the GNU General Public License
     18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
     19
     20
    421
    522import optparse
     
    1128sys.path.insert(0, "bin/python")
    1229import samba
    13 samba.ensure_external_module("testtools", "testtools")
    14 samba.ensure_external_module("subunit", "subunit/python")
     30from samba.tests.subunitrun import TestProgram, SubunitOptions
    1531
    1632import samba.getopt as options
     
    2642from samba.dsdb import DS_DOMAIN_FUNCTION_2003
    2743from samba.tests import delete_force
    28 
    29 from subunit.run import SubunitTestRunner
    30 import unittest
     44from samba.ndr import ndr_unpack
     45from samba.dcerpc import drsblobs
    3146
    3247parser = optparse.OptionParser("ldap_schema.py [options] <host>")
     
    3752credopts = options.CredentialsOptions(parser)
    3853parser.add_option_group(credopts)
     54subunitopts = SubunitOptions(parser)
     55parser.add_option_group(subunitopts)
    3956opts, args = parser.parse_args()
    4057
     
    4966
    5067
    51 class SchemaTests(unittest.TestCase):
     68class SchemaTests(samba.tests.TestCase):
    5269
    5370    def setUp(self):
    5471        super(SchemaTests, self).setUp()
    55         self.ldb = ldb
    56         self.base_dn = ldb.domain_dn()
    57         self.schema_dn = ldb.get_schema_basedn().get_linearized()
     72        self.ldb = SamDB(host, credentials=creds,
     73            session_info=system_session(lp), lp=lp, options=ldb_options)
     74        self.base_dn = self.ldb.domain_dn()
     75        self.schema_dn = self.ldb.get_schema_basedn().get_linearized()
    5876
    5977    def test_generated_schema(self):
     
    6886    def test_generated_schema_is_operational(self):
    6987        """Testing we don't get the generated schema via LDAP by default"""
     88        # Must keep the "*" form
    7089        res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
    71                 attrs=["*"])
     90                              attrs=["*"])
    7291        self.assertEquals(len(res), 1)
    7392        self.assertFalse("dITContentRules" in res[0])
     
    95114"""
    96115        self.ldb.add_ldif(ldif)
     116        # We must do a schemaUpdateNow otherwise it's not 100% sure that the schema
     117        # will contain the new attribute
     118        ldif = """
     119dn:
     120changetype: modify
     121add: schemaUpdateNow
     122schemaUpdateNow: 1
     123"""
     124        self.ldb.modify_ldif(ldif)
    97125
    98126        # Search for created attribute
    99127        res = []
    100         res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
     128        res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE,
     129                              attrs=["lDAPDisplayName","schemaIDGUID", "msDS-IntID"])
    101130        self.assertEquals(len(res), 1)
    102131        self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name)
    103132        self.assertTrue("schemaIDGUID" in res[0])
     133        if "msDS-IntId" in res[0]:
     134            msDS_IntId = int(res[0]["msDS-IntId"][0])
     135            if msDS_IntId < 0:
     136                msDS_IntId += (1 << 32)
     137        else:
     138            msDS_IntId = None
    104139
    105140        class_name = "test-Class" + time.strftime("%s", time.gmtime())
     
    152187        # Search for created objectclass
    153188        res = []
    154         res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
     189        res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE,
     190                              attrs=["lDAPDisplayName", "defaultObjectCategory", "schemaIDGUID", "distinguishedName"])
    155191        self.assertEquals(len(res), 1)
    156192        self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name)
     
    184220
    185221        # Search for created object
    186         res = []
    187         res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["*"])
    188         self.assertEquals(len(res), 1)
     222        obj_res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["replPropertyMetaData"])
     223
     224        self.assertEquals(len(obj_res), 1)
     225        self.assertTrue("replPropertyMetaData" in obj_res[0])
     226        val = obj_res[0]["replPropertyMetaData"][0]
     227        repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(val))
     228        obj = repl.ctr
     229
     230        # Windows 2000 functional level won't have this.  It is too
     231        # hard to work it out from the prefixmap however, so we skip
     232        # this test in that case.
     233        if msDS_IntId is not None:
     234            found = False
     235            for o in repl.ctr.array:
     236                if o.attid == msDS_IntId:
     237                    found = True
     238                    break
     239            self.assertTrue(found, "Did not find 0x%08x in replPropertyMetaData" % msDS_IntId)
    189240        # Delete the object
    190241        delete_force(self.ldb, "cn=%s,cn=Users,%s" % (object_name, self.base_dn))
     
    215266        # Search for created objectclass
    216267        res = []
    217         res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
     268        res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE,
     269                              attrs=["lDAPDisplayName", "defaultObjectCategory",
     270                                     "schemaIDGUID", "distinguishedName"])
    218271        self.assertEquals(len(res), 1)
    219272        self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name)
     
    241294        # Search for created object
    242295        res = []
    243         res = self.ldb.search("ou=%s,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["*"])
     296        res = self.ldb.search("ou=%s,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["dn"])
    244297        self.assertEquals(len(res), 1)
    245298        # Delete the object
     
    247300
    248301
    249 class SchemaTests_msDS_IntId(unittest.TestCase):
     302class SchemaTests_msDS_IntId(samba.tests.TestCase):
    250303
    251304    def setUp(self):
    252305        super(SchemaTests_msDS_IntId, self).setUp()
    253         self.ldb = ldb
    254         res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
     306        self.ldb = SamDB(host, credentials=creds,
     307            session_info=system_session(lp), lp=lp, options=ldb_options)
     308        res = self.ldb.search(base="", expression="", scope=SCOPE_BASE,
     309                         attrs=["schemaNamingContext", "defaultNamingContext",
     310                                "forestFunctionality"])
    255311        self.assertEquals(len(res), 1)
    256312        self.schema_dn = res[0]["schemaNamingContext"][0]
     
    331387        # Search for created attribute
    332388        res = []
    333         res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
     389        res = self.ldb.search(attr_dn, scope=SCOPE_BASE,
     390                              attrs=["lDAPDisplayName", "msDS-IntId", "systemFlags"])
    334391        self.assertEquals(len(res), 1)
    335392        self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
     
    373430        # Search for created attribute
    374431        res = []
    375         res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
     432        res = self.ldb.search(attr_dn, scope=SCOPE_BASE,
     433                              attrs=["lDAPDisplayName", "msDS-IntId"])
    376434        self.assertEquals(len(res), 1)
    377435        self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
     
    428486        self._ldap_schemaUpdateNow()
    429487
    430         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
     488        res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"])
    431489        self.assertEquals(len(res), 1)
    432490        self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
     
    440498
    441499        # Search for created Class
    442         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
     500        res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"])
    443501        self.assertEquals(len(res), 1)
    444502        self.assertFalse("msDS-IntId" in res[0])
     
    465523        self.ldb.add_ldif(ldif_add)
    466524
    467         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
     525        res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"])
    468526        self.assertEquals(len(res), 1)
    469527        self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
     
    478536
    479537        # Search for created Class
    480         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
     538        res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"])
    481539        self.assertEquals(len(res), 1)
    482540        self.assertFalse("msDS-IntId" in res[0])
     
    490548        except LdbError, (num, _):
    491549            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    492         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
     550        res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["msDS-IntId"])
    493551        self.assertEquals(len(res), 1)
    494552        self.assertFalse("msDS-IntId" in res[0])
     
    519577
    520578
    521 class SchemaTests_msDS_isRODC(unittest.TestCase):
     579class SchemaTests_msDS_isRODC(samba.tests.TestCase):
    522580
    523581    def setUp(self):
    524582        super(SchemaTests_msDS_isRODC, self).setUp()
    525         self.ldb = ldb
    526         res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
     583        self.ldb =  SamDB(host, credentials=creds,
     584            session_info=system_session(lp), lp=lp, options=ldb_options)
     585        res = self.ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["defaultNamingContext"])
    527586        self.assertEquals(len(res), 1)
    528587        self.base_dn = res[0]["defaultNamingContext"][0]
     
    568627    ldb_options = ["modules:paged_searches"]
    569628
    570 ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp, options=ldb_options)
    571 
    572 runner = SubunitTestRunner()
    573 rc = 0
    574 if not runner.run(unittest.makeSuite(SchemaTests)).wasSuccessful():
    575     rc = 1
    576 if not runner.run(unittest.makeSuite(SchemaTests_msDS_IntId)).wasSuccessful():
    577     rc = 1
    578 if not runner.run(unittest.makeSuite(SchemaTests_msDS_isRODC)).wasSuccessful():
    579     rc = 1
    580 
    581 sys.exit(rc)
     629TestProgram(module=__name__, opts=subunitopts)
  • vendor/current/source4/dsdb/tests/python/ldap_syntaxes.py

    r740 r988  
    1111sys.path.insert(0, "bin/python")
    1212import samba
    13 samba.ensure_external_module("testtools", "testtools")
    14 samba.ensure_external_module("subunit", "subunit/python")
     13
     14from samba.tests.subunitrun import SubunitOptions, TestProgram
    1515
    1616import samba.getopt as options
     
    2121from ldb import ERR_INVALID_ATTRIBUTE_SYNTAX
    2222from ldb import ERR_ENTRY_ALREADY_EXISTS
    23 
    24 from subunit.run import SubunitTestRunner
    25 import unittest
    2623
    2724import samba.tests
     
    3431credopts = options.CredentialsOptions(parser)
    3532parser.add_option_group(credopts)
     33subunitopts = SubunitOptions(parser)
     34parser.add_option_group(subunitopts)
    3635opts, args = parser.parse_args()
    3736
     
    4544
    4645
    47 class SyntaxTests(unittest.TestCase):
     46class SyntaxTests(samba.tests.TestCase):
    4847
    4948    def setUp(self):
    5049        super(SyntaxTests, self).setUp()
    51         self.ldb = ldb
    52         self.base_dn = ldb.domain_dn()
    53         self.schema_dn = ldb.get_schema_basedn().get_linearized()
     50        self.ldb = samba.tests.connect_samdb(host, credentials=creds,
     51            session_info=system_session(lp), lp=lp)
     52        self.base_dn = self.ldb.domain_dn()
     53        self.schema_dn = self.ldb.get_schema_basedn().get_linearized()
    5454        self._setup_dn_string_test()
    5555        self._setup_dn_binary_test()
     
    193193
    194194    def test_dn_string(self):
    195         # add obeject with correct value
     195        # add object with correct value
    196196        object_name1 = "obj-DN-String1" + time.strftime("%s", time.gmtime())
    197197        ldif = self._get_object_ldif(object_name1, self.dn_string_class_name, self.dn_string_class_ldap_display_name,
     
    200200
    201201        # search by specifying the DN part only
    202         res = ldb.search(base=self.base_dn,
     202        res = self.ldb.search(base=self.base_dn,
    203203                         scope=SCOPE_SUBTREE,
    204204                         expression="(%s=%s)" % (self.dn_string_attribute, self.base_dn))
     
    206206
    207207        # search by specifying the string part only
    208         res = ldb.search(base=self.base_dn,
     208        res = self.ldb.search(base=self.base_dn,
    209209                         scope=SCOPE_SUBTREE,
    210210                         expression="(%s=S:5:ABCDE)" % self.dn_string_attribute)
     
    212212
    213213        # search by DN+Stirng
    214         res = ldb.search(base=self.base_dn,
     214        res = self.ldb.search(base=self.base_dn,
    215215                         scope=SCOPE_SUBTREE,
    216216                         expression="(%s=S:5:ABCDE:%s)" % (self.dn_string_attribute, self.base_dn))
     
    284284        except LdbError, (num, _):
    285285            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    286         pass
    287286
    288287    def test_dn_binary(self):
     
    294293
    295294        # search by specifyingthe DN part
    296         res = ldb.search(base=self.base_dn,
     295        res = self.ldb.search(base=self.base_dn,
    297296                         scope=SCOPE_SUBTREE,
    298297                         expression="(%s=%s)" % (self.dn_binary_attribute, self.base_dn))
     
    300299
    301300        # search by specifying the binary part
    302         res = ldb.search(base=self.base_dn,
     301        res = self.ldb.search(base=self.base_dn,
    303302                         scope=SCOPE_SUBTREE,
    304303                         expression="(%s=B:4:1234)" % self.dn_binary_attribute)
     
    306305
    307306        # search by DN+Binary
    308         res = ldb.search(base=self.base_dn,
     307        res = self.ldb.search(base=self.base_dn,
    309308                         scope=SCOPE_SUBTREE,
    310309                         expression="(%s=B:4:1234:%s)" % (self.dn_binary_attribute, self.base_dn))
     
    370369        except LdbError, (num, _):
    371370            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    372         pass
    373 
    374 ldb = samba.tests.connect_samdb(host, credentials=creds, session_info=system_session(lp), lp=lp)
    375 runner = SubunitTestRunner()
    376 rc = 0
    377 if not runner.run(unittest.makeSuite(SyntaxTests)).wasSuccessful():
    378     rc = 1
    379 
    380 sys.exit(rc)
     371
     372TestProgram(module=__name__, opts=subunitopts)
  • vendor/current/source4/dsdb/tests/python/passwords.py

    r740 r988  
    1717sys.path.insert(0, "bin/python")
    1818import samba
    19 samba.ensure_external_module("testtools", "testtools")
    20 samba.ensure_external_module("subunit", "subunit/python")
     19
     20from samba.tests.subunitrun import SubunitOptions, TestProgram
    2121
    2222import samba.getopt as options
     
    3535import samba.tests
    3636from samba.tests import delete_force
    37 from subunit.run import SubunitTestRunner
    38 import unittest
    3937
    4038parser = optparse.OptionParser("passwords.py [options] <host>")
     
    4543credopts = options.CredentialsOptions(parser)
    4644parser.add_option_group(credopts)
     45subunitopts = SubunitOptions(parser)
     46parser.add_option_group(subunitopts)
     47
    4748opts, args = parser.parse_args()
    4849
     
    6768    def setUp(self):
    6869        super(PasswordTests, self).setUp()
    69         self.ldb = ldb
    70         self.base_dn = ldb.domain_dn()
     70        self.ldb = SamDB(url=host, session_info=system_session(lp), credentials=creds, lp=lp)
     71
     72        # Gets back the basedn
     73        base_dn = self.ldb.domain_dn()
     74
     75        # Gets back the configuration basedn
     76        configuration_dn = self.ldb.get_config_basedn().get_linearized()
     77
     78        # Get the old "dSHeuristics" if it was set
     79        dsheuristics = self.ldb.get_dsheuristics()
     80
     81        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
     82        self.ldb.set_dsheuristics("000000001")
     83
     84        # Reset the "dSHeuristics" as they were before
     85        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)
     86
     87        # Get the old "minPwdAge"
     88        minPwdAge = self.ldb.get_minPwdAge()
     89
     90        # Set it temporarely to "0"
     91        self.ldb.set_minPwdAge("0")
     92        self.base_dn = self.ldb.domain_dn()
     93
     94        # Reset the "minPwdAge" as it was before
     95        self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)
    7196
    7297        # (Re)adds the test user "testuser" with no password atm
     
    137162
    138163    def test_unicodePwd_hash_set(self):
    139         print "Performs a password hash set operation on 'unicodePwd' which should be prevented"
     164        """Performs a password hash set operation on 'unicodePwd' which should be prevented"""
    140165        # Notice: Direct hash password sets should never work
    141166
    142167        m = Message()
    143         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     168        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    144169        m["unicodePwd"] = MessageElement("XXXXXXXXXXXXXXXX", FLAG_MOD_REPLACE,
    145170          "unicodePwd")
    146171        try:
    147             ldb.modify(m)
     172            self.ldb.modify(m)
    148173            self.fail()
    149174        except LdbError, (num, _):
     
    151176
    152177    def test_unicodePwd_hash_change(self):
    153         print "Performs a password hash change operation on 'unicodePwd' which should be prevented"
     178        """Performs a password hash change operation on 'unicodePwd' which should be prevented"""
    154179        # Notice: Direct hash password changes should never work
    155180
     
    169194
    170195    def test_unicodePwd_clear_set(self):
    171         print "Performs a password cleartext set operation on 'unicodePwd'"
    172 
    173         m = Message()
    174         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     196        """Performs a password cleartext set operation on 'unicodePwd'"""
     197
     198        m = Message()
     199        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    175200        m["unicodePwd"] = MessageElement("\"thatsAcomplPASS2\"".encode('utf-16-le'),
    176201          FLAG_MOD_REPLACE, "unicodePwd")
    177         ldb.modify(m)
     202        self.ldb.modify(m)
    178203
    179204    def test_unicodePwd_clear_change(self):
    180         print "Performs a password cleartext change operation on 'unicodePwd'"
     205        """Performs a password cleartext change operation on 'unicodePwd'"""
    181206
    182207        self.ldb2.modify_ldif("""
     
    220245
    221246    def test_dBCSPwd_hash_set(self):
    222         print "Performs a password hash set operation on 'dBCSPwd' which should be prevented"
     247        """Performs a password hash set operation on 'dBCSPwd' which should be prevented"""
    223248        # Notice: Direct hash password sets should never work
    224249
    225250        m = Message()
    226         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     251        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    227252        m["dBCSPwd"] = MessageElement("XXXXXXXXXXXXXXXX", FLAG_MOD_REPLACE,
    228253          "dBCSPwd")
    229254        try:
    230             ldb.modify(m)
     255            self.ldb.modify(m)
    231256            self.fail()
    232257        except LdbError, (num, _):
     
    234259
    235260    def test_dBCSPwd_hash_change(self):
    236         print "Performs a password hash change operation on 'dBCSPwd' which should be prevented"
     261        """Performs a password hash change operation on 'dBCSPwd' which should be prevented"""
    237262        # Notice: Direct hash password changes should never work
    238263
     
    251276
    252277    def test_userPassword_clear_set(self):
    253         print "Performs a password cleartext set operation on 'userPassword'"
     278        """Performs a password cleartext set operation on 'userPassword'"""
    254279        # Notice: This works only against Windows if "dSHeuristics" has been set
    255280        # properly
    256281
    257282        m = Message()
    258         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     283        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    259284        m["userPassword"] = MessageElement("thatsAcomplPASS2", FLAG_MOD_REPLACE,
    260285          "userPassword")
    261         ldb.modify(m)
     286        self.ldb.modify(m)
    262287
    263288    def test_userPassword_clear_change(self):
    264         print "Performs a password cleartext change operation on 'userPassword'"
     289        """Performs a password cleartext change operation on 'userPassword'"""
    265290        # Notice: This works only against Windows if "dSHeuristics" has been set
    266291        # properly
     
    306331
    307332    def test_clearTextPassword_clear_set(self):
    308         print "Performs a password cleartext set operation on 'clearTextPassword'"
     333        """Performs a password cleartext set operation on 'clearTextPassword'"""
    309334        # Notice: This never works against Windows - only supported by us
    310335
    311336        try:
    312337            m = Message()
    313             m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     338            m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    314339            m["clearTextPassword"] = MessageElement("thatsAcomplPASS2".encode('utf-16-le'),
    315340              FLAG_MOD_REPLACE, "clearTextPassword")
    316             ldb.modify(m)
     341            self.ldb.modify(m)
    317342            # this passes against s4
    318343        except LdbError, (num, msg):
     
    322347
    323348    def test_clearTextPassword_clear_change(self):
    324         print "Performs a password cleartext change operation on 'clearTextPassword'"
     349        """Performs a password cleartext change operation on 'clearTextPassword'"""
    325350        # Notice: This never works against Windows - only supported by us
    326351
     
    375400
    376401    def test_failures(self):
    377         print "Performs some failure testing"
    378 
    379         try:
    380             ldb.modify_ldif("""
    381 dn: cn=testuser,cn=users,""" + self.base_dn + """
    382 changetype: modify
    383 delete: userPassword
    384 userPassword: thatsAcomplPASS1
    385 """)
    386             self.fail()
    387         except LdbError, (num, _):
    388             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    389 
    390         try:
    391             self.ldb2.modify_ldif("""
    392 dn: cn=testuser,cn=users,""" + self.base_dn + """
    393 changetype: modify
    394 delete: userPassword
    395 userPassword: thatsAcomplPASS1
    396 """)
    397             self.fail()
    398         except LdbError, (num, _):
    399             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    400 
    401         try:
    402             ldb.modify_ldif("""
    403 dn: cn=testuser,cn=users,""" + self.base_dn + """
    404 changetype: modify
    405 delete: userPassword
    406 """)
    407             self.fail()
    408         except LdbError, (num, _):
    409             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    410 
    411         try:
    412             self.ldb2.modify_ldif("""
    413 dn: cn=testuser,cn=users,""" + self.base_dn + """
    414 changetype: modify
    415 delete: userPassword
    416 """)
    417             self.fail()
    418         except LdbError, (num, _):
    419             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    420 
    421         try:
    422             ldb.modify_ldif("""
     402        """Performs some failure testing"""
     403
     404        try:
     405            self.ldb.modify_ldif("""
     406dn: cn=testuser,cn=users,""" + self.base_dn + """
     407changetype: modify
     408delete: userPassword
     409userPassword: thatsAcomplPASS1
     410""")
     411            self.fail()
     412        except LdbError, (num, _):
     413            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     414
     415        try:
     416            self.ldb2.modify_ldif("""
     417dn: cn=testuser,cn=users,""" + self.base_dn + """
     418changetype: modify
     419delete: userPassword
     420userPassword: thatsAcomplPASS1
     421""")
     422            self.fail()
     423        except LdbError, (num, _):
     424            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     425
     426        try:
     427            self.ldb.modify_ldif("""
     428dn: cn=testuser,cn=users,""" + self.base_dn + """
     429changetype: modify
     430delete: userPassword
     431""")
     432            self.fail()
     433        except LdbError, (num, _):
     434            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     435
     436        try:
     437            self.ldb2.modify_ldif("""
     438dn: cn=testuser,cn=users,""" + self.base_dn + """
     439changetype: modify
     440delete: userPassword
     441""")
     442            self.fail()
     443        except LdbError, (num, _):
     444            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     445
     446        try:
     447            self.ldb.modify_ldif("""
    423448dn: cn=testuser,cn=users,""" + self.base_dn + """
    424449changetype: modify
     
    442467
    443468        try:
    444             ldb.modify_ldif("""
    445 dn: cn=testuser,cn=users,""" + self.base_dn + """
    446 changetype: modify
    447 delete: userPassword
    448 userPassword: thatsAcomplPASS1
    449 add: userPassword
    450 userPassword: thatsAcomplPASS2
    451 userPassword: thatsAcomplPASS2
    452 """)
    453             self.fail()
    454         except LdbError, (num, _):
    455             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    456 
    457         try:
    458             self.ldb2.modify_ldif("""
    459 dn: cn=testuser,cn=users,""" + self.base_dn + """
    460 changetype: modify
    461 delete: userPassword
    462 userPassword: thatsAcomplPASS1
    463 add: userPassword
    464 userPassword: thatsAcomplPASS2
    465 userPassword: thatsAcomplPASS2
    466 """)
    467             self.fail()
    468         except LdbError, (num, _):
    469             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    470 
    471         try:
    472             ldb.modify_ldif("""
    473 dn: cn=testuser,cn=users,""" + self.base_dn + """
    474 changetype: modify
    475 delete: userPassword
    476 userPassword: thatsAcomplPASS1
    477 userPassword: thatsAcomplPASS1
    478 add: userPassword
    479 userPassword: thatsAcomplPASS2
    480 """)
    481             self.fail()
    482         except LdbError, (num, _):
    483             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    484 
    485         try:
    486             self.ldb2.modify_ldif("""
    487 dn: cn=testuser,cn=users,""" + self.base_dn + """
    488 changetype: modify
    489 delete: userPassword
    490 userPassword: thatsAcomplPASS1
    491 userPassword: thatsAcomplPASS1
    492 add: userPassword
    493 userPassword: thatsAcomplPASS2
    494 """)
    495             self.fail()
    496         except LdbError, (num, _):
    497             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    498 
    499         try:
    500             ldb.modify_ldif("""
     469            self.ldb.modify_ldif("""
     470dn: cn=testuser,cn=users,""" + self.base_dn + """
     471changetype: modify
     472delete: userPassword
     473userPassword: thatsAcomplPASS1
     474add: userPassword
     475userPassword: thatsAcomplPASS2
     476userPassword: thatsAcomplPASS2
     477""")
     478            self.fail()
     479        except LdbError, (num, _):
     480            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     481
     482        try:
     483            self.ldb2.modify_ldif("""
     484dn: cn=testuser,cn=users,""" + self.base_dn + """
     485changetype: modify
     486delete: userPassword
     487userPassword: thatsAcomplPASS1
     488add: userPassword
     489userPassword: thatsAcomplPASS2
     490userPassword: thatsAcomplPASS2
     491""")
     492            self.fail()
     493        except LdbError, (num, _):
     494            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     495
     496        try:
     497            self.ldb.modify_ldif("""
     498dn: cn=testuser,cn=users,""" + self.base_dn + """
     499changetype: modify
     500delete: userPassword
     501userPassword: thatsAcomplPASS1
     502userPassword: thatsAcomplPASS1
     503add: userPassword
     504userPassword: thatsAcomplPASS2
     505""")
     506            self.fail()
     507        except LdbError, (num, _):
     508            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     509
     510        try:
     511            self.ldb2.modify_ldif("""
     512dn: cn=testuser,cn=users,""" + self.base_dn + """
     513changetype: modify
     514delete: userPassword
     515userPassword: thatsAcomplPASS1
     516userPassword: thatsAcomplPASS1
     517add: userPassword
     518userPassword: thatsAcomplPASS2
     519""")
     520            self.fail()
     521        except LdbError, (num, _):
     522            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     523
     524        try:
     525            self.ldb.modify_ldif("""
    501526dn: cn=testuser,cn=users,""" + self.base_dn + """
    502527changetype: modify
     
    528553
    529554        try:
    530             ldb.modify_ldif("""
     555            self.ldb.modify_ldif("""
    531556dn: cn=testuser,cn=users,""" + self.base_dn + """
    532557changetype: modify
     
    558583
    559584        try:
    560             ldb.modify_ldif("""
     585            self.ldb.modify_ldif("""
    561586dn: cn=testuser,cn=users,""" + self.base_dn + """
    562587changetype: modify
     
    624649
    625650        # Several password changes at once are allowed
    626         ldb.modify_ldif("""
     651        self.ldb.modify_ldif("""
    627652dn: cn=testuser,cn=users,""" + self.base_dn + """
    628653changetype: modify
     
    633658
    634659        # Several password changes at once are allowed
    635         ldb.modify_ldif("""
     660        self.ldb.modify_ldif("""
    636661dn: cn=testuser,cn=users,""" + self.base_dn + """
    637662changetype: modify
     
    702727
    703728        m = Message()
    704         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     729        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    705730        m["unicodePwd"] = MessageElement([], FLAG_MOD_ADD, "unicodePwd")
    706731        try:
    707             ldb.modify(m)
    708             self.fail()
    709         except LdbError, (num, _):
    710             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    711 
    712         m = Message()
    713         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     732            self.ldb.modify(m)
     733            self.fail()
     734        except LdbError, (num, _):
     735            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     736
     737        m = Message()
     738        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    714739        m["dBCSPwd"] = MessageElement([], FLAG_MOD_ADD, "dBCSPwd")
    715740        try:
    716             ldb.modify(m)
    717             self.fail()
    718         except LdbError, (num, _):
    719             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    720 
    721         m = Message()
    722         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     741            self.ldb.modify(m)
     742            self.fail()
     743        except LdbError, (num, _):
     744            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     745
     746        m = Message()
     747        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    723748        m["userPassword"] = MessageElement([], FLAG_MOD_ADD, "userPassword")
    724749        try:
    725             ldb.modify(m)
    726             self.fail()
    727         except LdbError, (num, _):
    728             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    729 
    730         m = Message()
    731         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     750            self.ldb.modify(m)
     751            self.fail()
     752        except LdbError, (num, _):
     753            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     754
     755        m = Message()
     756        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    732757        m["clearTextPassword"] = MessageElement([], FLAG_MOD_ADD, "clearTextPassword")
    733758        try:
    734             ldb.modify(m)
     759            self.ldb.modify(m)
    735760            self.fail()
    736761        except LdbError, (num, _):
     
    739764
    740765        m = Message()
    741         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     766        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    742767        m["unicodePwd"] = MessageElement([], FLAG_MOD_REPLACE, "unicodePwd")
    743768        try:
    744             ldb.modify(m)
    745             self.fail()
    746         except LdbError, (num, _):
    747             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    748 
    749         m = Message()
    750         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     769            self.ldb.modify(m)
     770            self.fail()
     771        except LdbError, (num, _):
     772            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     773
     774        m = Message()
     775        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    751776        m["dBCSPwd"] = MessageElement([], FLAG_MOD_REPLACE, "dBCSPwd")
    752777        try:
    753             ldb.modify(m)
    754             self.fail()
    755         except LdbError, (num, _):
    756             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    757 
    758         m = Message()
    759         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     778            self.ldb.modify(m)
     779            self.fail()
     780        except LdbError, (num, _):
     781            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     782
     783        m = Message()
     784        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    760785        m["userPassword"] = MessageElement([], FLAG_MOD_REPLACE, "userPassword")
    761786        try:
    762             ldb.modify(m)
    763             self.fail()
    764         except LdbError, (num, _):
    765             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    766 
    767         m = Message()
    768         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     787            self.ldb.modify(m)
     788            self.fail()
     789        except LdbError, (num, _):
     790            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     791
     792        m = Message()
     793        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    769794        m["clearTextPassword"] = MessageElement([], FLAG_MOD_REPLACE, "clearTextPassword")
    770795        try:
    771             ldb.modify(m)
     796            self.ldb.modify(m)
    772797            self.fail()
    773798        except LdbError, (num, _):
     
    776801
    777802        m = Message()
    778         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     803        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    779804        m["unicodePwd"] = MessageElement([], FLAG_MOD_DELETE, "unicodePwd")
    780805        try:
    781             ldb.modify(m)
    782             self.fail()
    783         except LdbError, (num, _):
    784             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    785 
    786         m = Message()
    787         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     806            self.ldb.modify(m)
     807            self.fail()
     808        except LdbError, (num, _):
     809            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     810
     811        m = Message()
     812        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    788813        m["dBCSPwd"] = MessageElement([], FLAG_MOD_DELETE, "dBCSPwd")
    789814        try:
    790             ldb.modify(m)
    791             self.fail()
    792         except LdbError, (num, _):
    793             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    794 
    795         m = Message()
    796         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     815            self.ldb.modify(m)
     816            self.fail()
     817        except LdbError, (num, _):
     818            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     819
     820        m = Message()
     821        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    797822        m["userPassword"] = MessageElement([], FLAG_MOD_DELETE, "userPassword")
    798823        try:
    799             ldb.modify(m)
    800             self.fail()
    801         except LdbError, (num, _):
    802             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
    803 
    804         m = Message()
    805         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     824            self.ldb.modify(m)
     825            self.fail()
     826        except LdbError, (num, _):
     827            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     828
     829        m = Message()
     830        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    806831        m["clearTextPassword"] = MessageElement([], FLAG_MOD_DELETE, "clearTextPassword")
    807832        try:
    808             ldb.modify(m)
     833            self.ldb.modify(m)
    809834            self.fail()
    810835        except LdbError, (num, _):
     
    816841
    817842        # Delete the "dSHeuristics"
    818         ldb.set_dsheuristics(None)
     843        self.ldb.set_dsheuristics(None)
    819844
    820845        time.sleep(1) # This switching time is strictly needed!
    821846
    822847        m = Message()
    823         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     848        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    824849        m["userPassword"] = MessageElement("myPassword", FLAG_MOD_ADD,
    825850          "userPassword")
    826         ldb.modify(m)
    827 
    828         res = ldb.search("cn=testuser,cn=users," + self.base_dn,
     851        self.ldb.modify(m)
     852
     853        res = self.ldb.search("cn=testuser,cn=users," + self.base_dn,
    829854                         scope=SCOPE_BASE, attrs=["userPassword"])
    830855        self.assertTrue(len(res) == 1)
     
    833858
    834859        m = Message()
    835         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     860        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    836861        m["userPassword"] = MessageElement("myPassword2", FLAG_MOD_REPLACE,
    837862          "userPassword")
    838         ldb.modify(m)
    839 
    840         res = ldb.search("cn=testuser,cn=users," + self.base_dn,
     863        self.ldb.modify(m)
     864
     865        res = self.ldb.search("cn=testuser,cn=users," + self.base_dn,
    841866                         scope=SCOPE_BASE, attrs=["userPassword"])
    842867        self.assertTrue(len(res) == 1)
     
    845870
    846871        m = Message()
    847         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     872        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    848873        m["userPassword"] = MessageElement([], FLAG_MOD_DELETE,
    849874          "userPassword")
    850         ldb.modify(m)
    851 
    852         res = ldb.search("cn=testuser,cn=users," + self.base_dn,
     875        self.ldb.modify(m)
     876
     877        res = self.ldb.search("cn=testuser,cn=users," + self.base_dn,
    853878                         scope=SCOPE_BASE, attrs=["userPassword"])
    854879        self.assertTrue(len(res) == 1)
     
    856881
    857882        # Set the test "dSHeuristics" to deactivate "userPassword" pwd changes
    858         ldb.set_dsheuristics("000000000")
    859 
    860         m = Message()
    861         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     883        self.ldb.set_dsheuristics("000000000")
     884
     885        m = Message()
     886        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    862887        m["userPassword"] = MessageElement("myPassword3", FLAG_MOD_REPLACE,
    863888          "userPassword")
    864         ldb.modify(m)
    865 
    866         res = ldb.search("cn=testuser,cn=users," + self.base_dn,
     889        self.ldb.modify(m)
     890
     891        res = self.ldb.search("cn=testuser,cn=users," + self.base_dn,
    867892                         scope=SCOPE_BASE, attrs=["userPassword"])
    868893        self.assertTrue(len(res) == 1)
     
    871896
    872897        # Set the test "dSHeuristics" to deactivate "userPassword" pwd changes
    873         ldb.set_dsheuristics("000000002")
    874 
    875         m = Message()
    876         m.dn = Dn(ldb, "cn=testuser,cn=users," + self.base_dn)
     898        self.ldb.set_dsheuristics("000000002")
     899
     900        m = Message()
     901        m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
    877902        m["userPassword"] = MessageElement("myPassword4", FLAG_MOD_REPLACE,
    878903          "userPassword")
    879         ldb.modify(m)
    880 
    881         res = ldb.search("cn=testuser,cn=users," + self.base_dn,
     904        self.ldb.modify(m)
     905
     906        res = self.ldb.search("cn=testuser,cn=users," + self.base_dn,
    882907                         scope=SCOPE_BASE, attrs=["userPassword"])
    883908        self.assertTrue(len(res) == 1)
     
    886911
    887912        # Reset the test "dSHeuristics" (reactivate "userPassword" pwd changes)
    888         ldb.set_dsheuristics("000000001")
     913        self.ldb.set_dsheuristics("000000001")
    889914
    890915    def test_zero_length(self):
    891916        # Get the old "minPwdLength"
    892         minPwdLength = ldb.get_minPwdLength()
     917        minPwdLength = self.ldb.get_minPwdLength()
    893918        # Set it temporarely to "0"
    894         ldb.set_minPwdLength("0")
     919        self.ldb.set_minPwdLength("0")
    895920
    896921        # Get the old "pwdProperties"
    897         pwdProperties = ldb.get_pwdProperties()
     922        pwdProperties = self.ldb.get_pwdProperties()
    898923        # Set them temporarely to "0" (to deactivate eventually the complexity)
    899         ldb.set_pwdProperties("0")
    900 
    901         ldb.setpassword("(sAMAccountName=testuser)", "")
     924        self.ldb.set_pwdProperties("0")
     925
     926        self.ldb.setpassword("(sAMAccountName=testuser)", "")
    902927
    903928        # Reset the "pwdProperties" as they were before
    904         ldb.set_pwdProperties(pwdProperties)
     929        self.ldb.set_pwdProperties(pwdProperties)
    905930
    906931        # Reset the "minPwdLength" as it was before
    907         ldb.set_minPwdLength(minPwdLength)
     932        self.ldb.set_minPwdLength(minPwdLength)
    908933
    909934    def tearDown(self):
     
    920945        host = "ldap://%s" % host
    921946
    922 ldb = SamDB(url=host, session_info=system_session(lp), credentials=creds, lp=lp)
    923 
    924 # Gets back the basedn
    925 base_dn = ldb.domain_dn()
    926 
    927 # Gets back the configuration basedn
    928 configuration_dn = ldb.get_config_basedn().get_linearized()
    929 
    930 # Get the old "dSHeuristics" if it was set
    931 dsheuristics = ldb.get_dsheuristics()
    932 
    933 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
    934 ldb.set_dsheuristics("000000001")
    935 
    936 # Get the old "minPwdAge"
    937 minPwdAge = ldb.get_minPwdAge()
    938 
    939 # Set it temporarely to "0"
    940 ldb.set_minPwdAge("0")
    941 
    942 runner = SubunitTestRunner()
    943 rc = 0
    944 if not runner.run(unittest.makeSuite(PasswordTests)).wasSuccessful():
    945     rc = 1
    946 
    947 # Reset the "dSHeuristics" as they were before
    948 ldb.set_dsheuristics(dsheuristics)
    949 
    950 # Reset the "minPwdAge" as it was before
    951 ldb.set_minPwdAge(minPwdAge)
    952 
    953 sys.exit(rc)
     947TestProgram(module=__name__, opts=subunitopts)
  • vendor/current/source4/dsdb/tests/python/sam.py

    r740 r988  
    99sys.path.insert(0, "bin/python")
    1010import samba
    11 samba.ensure_external_module("testtools", "testtools")
    12 samba.ensure_external_module("subunit", "subunit/python")
     11from samba.tests.subunitrun import SubunitOptions, TestProgram
    1312
    1413import samba.getopt as options
     
    2221from ldb import ERR_CONSTRAINT_VIOLATION
    2322from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
     23from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS
    2424from ldb import Message, MessageElement, Dn
    2525from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
     
    2828    UF_WORKSTATION_TRUST_ACCOUNT, UF_SERVER_TRUST_ACCOUNT,
    2929    UF_PARTIAL_SECRETS_ACCOUNT, UF_TEMP_DUPLICATE_ACCOUNT,
    30     UF_PASSWD_NOTREQD, ATYPE_NORMAL_ACCOUNT,
     30    UF_INTERDOMAIN_TRUST_ACCOUNT,
     31    UF_PASSWD_NOTREQD, UF_LOCKOUT, UF_PASSWORD_EXPIRED, ATYPE_NORMAL_ACCOUNT,
    3132    GTYPE_SECURITY_BUILTIN_LOCAL_GROUP, GTYPE_SECURITY_DOMAIN_LOCAL_GROUP,
    3233    GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP,
     
    3738    ATYPE_DISTRIBUTION_UNIVERSAL_GROUP, ATYPE_DISTRIBUTION_LOCAL_GROUP,
    3839    ATYPE_WORKSTATION_TRUST)
    39 from samba.dcerpc.security import (DOMAIN_RID_USERS, DOMAIN_RID_DOMAIN_MEMBERS,
    40     DOMAIN_RID_DCS, DOMAIN_RID_READONLY_DCS)
    41 
    42 from subunit.run import SubunitTestRunner
    43 import unittest
     40from samba.dcerpc.security import (DOMAIN_RID_USERS, DOMAIN_RID_ADMINS,
     41    DOMAIN_RID_DOMAIN_MEMBERS, DOMAIN_RID_DCS, DOMAIN_RID_READONLY_DCS)
    4442
    4543from samba.dcerpc import security
     
    5351credopts = options.CredentialsOptions(parser)
    5452parser.add_option_group(credopts)
     53subunitopts = SubunitOptions(parser)
     54parser.add_option_group(subunitopts)
    5555opts, args = parser.parse_args()
    5656
     
    6464creds = credopts.get_credentials(lp)
    6565
    66 class SamTests(unittest.TestCase):
     66class SamTests(samba.tests.TestCase):
    6767
    6868    def setUp(self):
     
    587587    def test_sam_attributes(self):
    588588        """Test the behaviour of special attributes of SAM objects"""
    589         print "Testing the behaviour of special attributes of SAM objects\n"""
     589        print "Testing the behaviour of special attributes of SAM objects\n"
    590590
    591591        ldb.add({
     
    14261426        # With SYSTEM rights you can set a interdomain trust account.
    14271427
    1428         # Invalid attribute
    1429         try:
    1430             ldb.add({
    1431                 "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
    1432                 "objectclass": "user",
    1433                 "userAccountControl": "0"})
    1434             self.fail()
    1435         except LdbError, (num, _):
    1436             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     1428        ldb.add({
     1429            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     1430            "objectclass": "user",
     1431            "userAccountControl": "0"})
     1432
     1433        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
     1434                          scope=SCOPE_BASE,
     1435                          attrs=["sAMAccountType", "userAccountControl"])
     1436        self.assertTrue(len(res1) == 1)
     1437        self.assertEquals(int(res1[0]["sAMAccountType"][0]),
     1438          ATYPE_NORMAL_ACCOUNT)
     1439        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0)
     1440        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_PASSWD_NOTREQD == 0)
    14371441        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    14381442
    1439 # This has to wait until s4 supports it (needs a password module change)
    1440 #        try:
    1441 #            ldb.add({
    1442 #                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
    1443 #                "objectclass": "user",
    1444 #                "userAccountControl": str(UF_NORMAL_ACCOUNT)})
    1445 #            self.fail()
    1446 #        except LdbError, (num, _):
    1447 #            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    1448 #        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1443        ldb.add({
     1444            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     1445            "objectclass": "user",
     1446            "userAccountControl": str(UF_NORMAL_ACCOUNT)})
     1447        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    14491448
    14501449        ldb.add({
     
    14601459          ATYPE_NORMAL_ACCOUNT)
    14611460        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0)
     1461        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1462
     1463        ldb.add({
     1464            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     1465            "objectclass": "user",
     1466            "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_LOCKOUT | UF_PASSWORD_EXPIRED)})
     1467
     1468        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
     1469                          scope=SCOPE_BASE,
     1470                          attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"])
     1471        self.assertTrue(len(res1) == 1)
     1472        self.assertEquals(int(res1[0]["sAMAccountType"][0]),
     1473          ATYPE_NORMAL_ACCOUNT)
     1474        self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0)
     1475        self.assertFalse("lockoutTime" in res1[0])
     1476        self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0)
    14621477        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    14631478
     
    14721487        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    14731488
    1474 # This isn't supported yet in s4
    1475 #        try:
    1476 #            ldb.add({
    1477 #                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
    1478 #                "objectclass": "user",
    1479 #                "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)})
    1480 #            self.fail()
    1481 #        except LdbError, (num, _):
    1482 #            self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
    1483 #        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    1484 #
    1485 #        try:
    1486 #            ldb.add({
    1487 #                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
    1488 #                "objectclass": "user",
    1489 #                "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)})
    1490 #        except LdbError, (num, _):
    1491 #            self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
    1492 #        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    1493 
    1494 # This isn't supported yet in s4 - needs ACL module adaption
    1495 #        try:
    1496 #            ldb.add({
    1497 #                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
    1498 #                "objectclass": "user",
    1499 #                "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)})
    1500 #            self.fail()
    1501 #        except LdbError, (num, _):
    1502 #            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
    1503 #        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1489        try:
     1490            ldb.add({
     1491                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     1492                "objectclass": "user",
     1493                "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)})
     1494            self.fail()
     1495        except LdbError, (num, _):
     1496            self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
     1497        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1498
     1499        try:
     1500            ldb.add({
     1501                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     1502                "objectclass": "user",
     1503                "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)})
     1504        except LdbError, (num, _):
     1505            self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
     1506        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1507
     1508        try:
     1509            ldb.add({
     1510                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     1511                "objectclass": "user",
     1512                "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT)})
     1513        except LdbError, (num, _):
     1514            self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
     1515        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1516
     1517        try:
     1518            ldb.add({
     1519                "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
     1520                "objectclass": "user",
     1521                "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)})
     1522            self.fail()
     1523        except LdbError, (num, _):
     1524            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     1525        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    15041526
    15051527        # Modify operation
     
    15341556            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    15351557
    1536 # This has to wait until s4 supports it (needs a password module change)
    1537 #        try:
    1538 #            m = Message()
    1539 #            m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    1540 #            m["userAccountControl"] = MessageElement(
    1541 #              str(UF_NORMAL_ACCOUNT),
    1542 #              FLAG_MOD_REPLACE, "userAccountControl")
    1543 #            ldb.modify(m)
    1544 #        except LdbError, (num, _):
    1545 #            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     1558        try:
     1559            m = Message()
     1560            m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1561            m["userAccountControl"] = MessageElement(
     1562              str(UF_NORMAL_ACCOUNT),
     1563              FLAG_MOD_REPLACE, "userAccountControl")
     1564            ldb.modify(m)
     1565        except LdbError, (num, _):
     1566            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    15461567
    15471568        m = Message()
     
    15591580          ATYPE_NORMAL_ACCOUNT)
    15601581        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0)
     1582
     1583        m = Message()
     1584        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1585        m["userAccountControl"] = MessageElement(
     1586          str(UF_ACCOUNTDISABLE),
     1587          FLAG_MOD_REPLACE, "userAccountControl")
     1588        ldb.modify(m)
     1589
     1590        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
     1591                          scope=SCOPE_BASE,
     1592                          attrs=["sAMAccountType", "userAccountControl"])
     1593        self.assertTrue(len(res1) == 1)
     1594        self.assertEquals(int(res1[0]["sAMAccountType"][0]),
     1595          ATYPE_NORMAL_ACCOUNT)
     1596        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0)
     1597        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0)
     1598
     1599        m = Message()
     1600        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1601        m["lockoutTime"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "lockoutTime")
     1602        m["pwdLastSet"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "pwdLastSet")
     1603        ldb.modify(m)
     1604
     1605        m = Message()
     1606        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1607        m["userAccountControl"] = MessageElement(
     1608          str(UF_LOCKOUT | UF_PASSWORD_EXPIRED),
     1609          FLAG_MOD_REPLACE, "userAccountControl")
     1610        ldb.modify(m)
     1611
     1612        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
     1613                          scope=SCOPE_BASE,
     1614                          attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"])
     1615        self.assertTrue(len(res1) == 1)
     1616        self.assertEquals(int(res1[0]["sAMAccountType"][0]),
     1617          ATYPE_NORMAL_ACCOUNT)
     1618        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0)
     1619        self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0)
     1620        self.assertTrue(int(res1[0]["lockoutTime"][0]) == 0)
     1621        self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0)
    15611622
    15621623        try:
     
    15711632            self.assertEquals(num, ERR_OTHER)
    15721633
    1573 # This isn't supported yet in s4
    1574 #        try:
    1575 #            m = Message()
    1576 #            m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    1577 #            m["userAccountControl"] = MessageElement(
    1578 #              str(UF_SERVER_TRUST_ACCOUNT),
    1579 #              FLAG_MOD_REPLACE, "userAccountControl")
    1580 #            ldb.modify(m)
    1581 #            self.fail()
    1582 #        except LdbError, (num, _):
    1583 #            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     1634        try:
     1635            m = Message()
     1636            m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1637            m["userAccountControl"] = MessageElement(
     1638              str(UF_SERVER_TRUST_ACCOUNT),
     1639              FLAG_MOD_REPLACE, "userAccountControl")
     1640            ldb.modify(m)
     1641            self.fail()
     1642        except LdbError, (num, _):
     1643            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    15841644
    15851645        m = Message()
     
    15901650        ldb.modify(m)
    15911651
     1652        try:
     1653            m = Message()
     1654            m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1655            m["userAccountControl"] = MessageElement(
     1656              str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT),
     1657              FLAG_MOD_REPLACE, "userAccountControl")
     1658            ldb.modify(m)
     1659            self.fail()
     1660        except LdbError, (num, _):
     1661            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     1662
    15921663        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
    15931664                          scope=SCOPE_BASE, attrs=["sAMAccountType"])
     
    16091680          ATYPE_NORMAL_ACCOUNT)
    16101681
    1611 # This isn't supported yet in s4 - needs ACL module adaption
    1612 #        try:
    1613 #            m = Message()
    1614 #            m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
    1615 #            m["userAccountControl"] = MessageElement(
    1616 #              str(UF_INTERDOMAIN_TRUST_ACCOUNT),
    1617 #              FLAG_MOD_REPLACE, "userAccountControl")
    1618 #            ldb.modify(m)
    1619 #            self.fail()
    1620 #        except LdbError, (num, _):
    1621 #            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     1682        try:
     1683            m = Message()
     1684            m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     1685            m["userAccountControl"] = MessageElement(
     1686              str(UF_INTERDOMAIN_TRUST_ACCOUNT),
     1687              FLAG_MOD_REPLACE, "userAccountControl")
     1688            ldb.modify(m)
     1689            self.fail()
     1690        except LdbError, (num, _):
     1691            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
    16221692
    16231693        # With a computer object
     
    16301700        # With SYSTEM rights you can set a interdomain trust account.
    16311701
    1632         # Invalid attribute
    1633         try:
    1634             ldb.add({
    1635                 "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
    1636                 "objectclass": "computer",
    1637                 "userAccountControl": "0"})
    1638             self.fail()
    1639         except LdbError, (num, _):
    1640             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     1702        ldb.add({
     1703            "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
     1704            "objectclass": "computer",
     1705            "userAccountControl": "0"})
     1706
     1707        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     1708                          scope=SCOPE_BASE,
     1709                          attrs=["sAMAccountType", "userAccountControl"])
     1710        self.assertTrue(len(res1) == 1)
     1711        self.assertEquals(int(res1[0]["sAMAccountType"][0]),
     1712          ATYPE_NORMAL_ACCOUNT)
     1713        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0)
     1714        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_PASSWD_NOTREQD == 0)
    16411715        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
    16421716
    1643 # This has to wait until s4 supports it (needs a password module change)
    1644 #        try:
    1645 #            ldb.add({
    1646 #                "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
    1647 #                "objectclass": "computer",
    1648 #                "userAccountControl": str(UF_NORMAL_ACCOUNT)})
    1649 #            self.fail()
    1650 #        except LdbError, (num, _):
    1651 #            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    1652 #        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     1717        ldb.add({
     1718            "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
     1719            "objectclass": "computer",
     1720            "userAccountControl": str(UF_NORMAL_ACCOUNT)})
     1721        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
    16531722
    16541723        ldb.add({
     
    16641733          ATYPE_NORMAL_ACCOUNT)
    16651734        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0)
     1735        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     1736
     1737        ldb.add({
     1738            "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
     1739            "objectclass": "computer",
     1740            "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_LOCKOUT | UF_PASSWORD_EXPIRED)})
     1741
     1742        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     1743                          scope=SCOPE_BASE,
     1744                          attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"])
     1745        self.assertTrue(len(res1) == 1)
     1746        self.assertEquals(int(res1[0]["sAMAccountType"][0]),
     1747          ATYPE_NORMAL_ACCOUNT)
     1748        self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0)
     1749        self.assertFalse("lockoutTime" in res1[0])
     1750        self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0)
    16661751        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
    16671752
     
    16971782        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
    16981783
    1699 # This isn't supported yet in s4 - needs ACL module adaption
    1700 #        try:
    1701 #            ldb.add({
    1702 #                "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
    1703 #                "objectclass": "computer",
    1704 #                "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)})
    1705 #            self.fail()
    1706 #        except LdbError, (num, _):
    1707 #            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
    1708 #        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     1784        try:
     1785            ldb.add({
     1786                "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
     1787                "objectclass": "computer",
     1788                "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)})
     1789            self.fail()
     1790        except LdbError, (num, _):
     1791            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     1792        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
    17091793
    17101794        # Modify operation
     
    17401824            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    17411825
    1742 # This has to wait until s4 supports it (needs a password module change)
    1743 #        try:
    1744 #            m = Message()
    1745 #            m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
    1746 #            m["userAccountControl"] = MessageElement(
    1747 #              str(UF_NORMAL_ACCOUNT),
    1748 #              FLAG_MOD_REPLACE, "userAccountControl")
    1749 #            ldb.modify(m)
    1750 #        except LdbError, (num, _):
    1751 #            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     1826        try:
     1827            m = Message()
     1828            m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     1829            m["userAccountControl"] = MessageElement(
     1830              str(UF_NORMAL_ACCOUNT),
     1831              FLAG_MOD_REPLACE, "userAccountControl")
     1832            ldb.modify(m)
     1833        except LdbError, (num, _):
     1834            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
    17521835
    17531836        m = Message()
     
    17651848          ATYPE_NORMAL_ACCOUNT)
    17661849        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0)
     1850
     1851        m = Message()
     1852        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     1853        m["userAccountControl"] = MessageElement(
     1854          str(UF_ACCOUNTDISABLE),
     1855          FLAG_MOD_REPLACE, "userAccountControl")
     1856        ldb.modify(m)
     1857
     1858        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     1859                          scope=SCOPE_BASE,
     1860                          attrs=["sAMAccountType", "userAccountControl"])
     1861        self.assertTrue(len(res1) == 1)
     1862        self.assertEquals(int(res1[0]["sAMAccountType"][0]),
     1863          ATYPE_NORMAL_ACCOUNT)
     1864        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0)
     1865        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE != 0)
     1866
     1867        m = Message()
     1868        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     1869        m["lockoutTime"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "lockoutTime")
     1870        m["pwdLastSet"] = MessageElement(str(samba.unix2nttime(0)), FLAG_MOD_REPLACE, "pwdLastSet")
     1871        ldb.modify(m)
     1872
     1873        m = Message()
     1874        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     1875        m["userAccountControl"] = MessageElement(
     1876          str(UF_LOCKOUT | UF_PASSWORD_EXPIRED),
     1877          FLAG_MOD_REPLACE, "userAccountControl")
     1878        ldb.modify(m)
     1879
     1880        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     1881                          scope=SCOPE_BASE,
     1882                          attrs=["sAMAccountType", "userAccountControl", "lockoutTime", "pwdLastSet"])
     1883        self.assertTrue(len(res1) == 1)
     1884        self.assertEquals(int(res1[0]["sAMAccountType"][0]),
     1885          ATYPE_NORMAL_ACCOUNT)
     1886        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_NORMAL_ACCOUNT != 0)
     1887        self.assertTrue(int(res1[0]["userAccountControl"][0]) & (UF_LOCKOUT | UF_PASSWORD_EXPIRED) == 0)
     1888        self.assertTrue(int(res1[0]["lockoutTime"][0]) == 0)
     1889        self.assertTrue(int(res1[0]["pwdLastSet"][0]) == 0)
    17671890
    17681891        try:
     
    18551978          ATYPE_WORKSTATION_TRUST)
    18561979
    1857 # This isn't supported yet in s4 - needs ACL module adaption
    1858 #        try:
    1859 #            m = Message()
    1860 #            m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
    1861 #            m["userAccountControl"] = MessageElement(
    1862 #              str(UF_INTERDOMAIN_TRUST_ACCOUNT),
    1863 #              FLAG_MOD_REPLACE, "userAccountControl")
    1864 #            ldb.modify(m)
    1865 #            self.fail()
    1866 #        except LdbError, (num, _):
    1867 #            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     1980        try:
     1981            m = Message()
     1982            m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     1983            m["userAccountControl"] = MessageElement(
     1984              str(UF_INTERDOMAIN_TRUST_ACCOUNT),
     1985              FLAG_MOD_REPLACE, "userAccountControl")
     1986            ldb.modify(m)
     1987            self.fail()
     1988        except LdbError, (num, _):
     1989            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
     1990
     1991        # "primaryGroupID" does not change if account type remains the same
     1992
     1993        # For a user account
     1994
     1995        ldb.add({
     1996            "dn": "cn=ldaptestuser2,cn=users," + self.base_dn,
     1997            "objectclass": "user",
     1998            "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE)})
     1999
     2000        res1 = ldb.search("cn=ldaptestuser2,cn=users," + self.base_dn,
     2001                          scope=SCOPE_BASE,
     2002                          attrs=["userAccountControl"])
     2003        self.assertTrue(len(res1) == 1)
     2004        self.assertEquals(int(res1[0]["userAccountControl"][0]),
     2005           UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE)
     2006
     2007        m = Message()
     2008        m.dn = Dn(ldb, "<SID=" + ldb.get_domain_sid() + "-" + str(DOMAIN_RID_ADMINS) + ">")
     2009        m["member"] = MessageElement(
     2010          "cn=ldaptestuser2,cn=users," + self.base_dn, FLAG_MOD_ADD, "member")
     2011        ldb.modify(m)
     2012
     2013        m = Message()
     2014        m.dn = Dn(ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
     2015        m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_ADMINS),
     2016          FLAG_MOD_REPLACE, "primaryGroupID")
     2017        ldb.modify(m)
     2018
     2019        m = Message()
     2020        m.dn = Dn(ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
     2021        m["userAccountControl"] = MessageElement(
     2022          str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD),
     2023          FLAG_MOD_REPLACE, "userAccountControl")
     2024        ldb.modify(m)
     2025
     2026        res1 = ldb.search("cn=ldaptestuser2,cn=users," + self.base_dn,
     2027                          scope=SCOPE_BASE,
     2028                          attrs=["userAccountControl", "primaryGroupID"])
     2029        self.assertTrue(len(res1) == 1)
     2030        self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0)
     2031        self.assertEquals(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_ADMINS)
     2032
     2033        # For a workstation account
     2034
     2035        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2036                          scope=SCOPE_BASE,
     2037                          attrs=["primaryGroupID"])
     2038        self.assertTrue(len(res1) == 1)
     2039        self.assertEquals(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DOMAIN_MEMBERS)
     2040
     2041        m = Message()
     2042        m.dn = Dn(ldb, "<SID=" + ldb.get_domain_sid() + "-" + str(DOMAIN_RID_USERS) + ">")
     2043        m["member"] = MessageElement(
     2044          "cn=ldaptestcomputer,cn=computers," + self.base_dn, FLAG_MOD_ADD, "member")
     2045        ldb.modify(m)
     2046
     2047        m = Message()
     2048        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2049        m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_USERS),
     2050          FLAG_MOD_REPLACE, "primaryGroupID")
     2051        ldb.modify(m)
     2052
     2053        m = Message()
     2054        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2055        m["userAccountControl"] = MessageElement(
     2056          str(UF_WORKSTATION_TRUST_ACCOUNT),
     2057          FLAG_MOD_REPLACE, "userAccountControl")
     2058        ldb.modify(m)
     2059
     2060        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2061                          scope=SCOPE_BASE,
     2062                          attrs=["primaryGroupID"])
     2063        self.assertTrue(len(res1) == 1)
     2064        self.assertEquals(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS)
    18682065
    18692066        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     2067        delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
     2068        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2069
     2070    def test_isCriticalSystemObject(self):
     2071        """Test the isCriticalSystemObject behaviour"""
     2072        print "Testing isCriticalSystemObject behaviour\n"
     2073
     2074        # Add tests
     2075
     2076        ldb.add({
     2077            "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2078            "objectclass": "computer"})
     2079
     2080        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2081                          scope=SCOPE_BASE,
     2082                          attrs=["isCriticalSystemObject"])
     2083        self.assertTrue(len(res1) == 1)
     2084        self.assertTrue("isCriticalSystemObject" not in res1[0])
     2085
     2086        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2087
     2088        ldb.add({
     2089            "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2090            "objectclass": "computer",
     2091            "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)})
     2092
     2093        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2094                          scope=SCOPE_BASE,
     2095                          attrs=["isCriticalSystemObject"])
     2096        self.assertTrue(len(res1) == 1)
     2097        self.assertEquals(res1[0]["isCriticalSystemObject"][0], "FALSE")
     2098
     2099        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2100
     2101        ldb.add({
     2102            "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2103            "objectclass": "computer",
     2104            "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT)})
     2105
     2106        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2107                          scope=SCOPE_BASE,
     2108                          attrs=["isCriticalSystemObject"])
     2109        self.assertTrue(len(res1) == 1)
     2110        self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
     2111
     2112        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2113
     2114        ldb.add({
     2115            "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2116            "objectclass": "computer",
     2117            "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)})
     2118
     2119        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2120                          scope=SCOPE_BASE,
     2121                          attrs=["isCriticalSystemObject"])
     2122        self.assertTrue(len(res1) == 1)
     2123        self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
     2124
     2125        # Modification tests
     2126
     2127        m = Message()
     2128        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2129        m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD),
     2130          FLAG_MOD_REPLACE, "userAccountControl")
     2131        ldb.modify(m)
     2132
     2133        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2134                          scope=SCOPE_BASE,
     2135                          attrs=["isCriticalSystemObject"])
     2136        self.assertTrue(len(res1) == 1)
     2137        self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
     2138
     2139        m = Message()
     2140        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2141        m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT),
     2142          FLAG_MOD_REPLACE, "userAccountControl")
     2143        ldb.modify(m)
     2144
     2145        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2146                          scope=SCOPE_BASE,
     2147                          attrs=["isCriticalSystemObject"])
     2148        self.assertTrue(len(res1) == 1)
     2149        self.assertEquals(res1[0]["isCriticalSystemObject"][0], "FALSE")
     2150
     2151        m = Message()
     2152        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2153        m["userAccountControl"] = MessageElement(
     2154          str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT),
     2155          FLAG_MOD_REPLACE, "userAccountControl")
     2156        ldb.modify(m)
     2157
     2158        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2159                          scope=SCOPE_BASE,
     2160                          attrs=["isCriticalSystemObject"])
     2161        self.assertTrue(len(res1) == 1)
     2162        self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
     2163
     2164        m = Message()
     2165        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2166        m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD),
     2167          FLAG_MOD_REPLACE, "userAccountControl")
     2168        ldb.modify(m)
     2169
     2170        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2171                          scope=SCOPE_BASE,
     2172                          attrs=["isCriticalSystemObject"])
     2173        self.assertTrue(len(res1) == 1)
     2174        self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
     2175
     2176        m = Message()
     2177        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2178        m["userAccountControl"] = MessageElement(str(UF_SERVER_TRUST_ACCOUNT),
     2179          FLAG_MOD_REPLACE, "userAccountControl")
     2180        ldb.modify(m)
     2181
     2182        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2183                          scope=SCOPE_BASE,
     2184                          attrs=["isCriticalSystemObject"])
     2185        self.assertTrue(len(res1) == 1)
     2186        self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
     2187
     2188        m = Message()
     2189        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2190        m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT),
     2191          FLAG_MOD_REPLACE, "userAccountControl")
     2192        ldb.modify(m)
     2193
     2194        res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2195                          scope=SCOPE_BASE,
     2196                          attrs=["isCriticalSystemObject"])
     2197        self.assertTrue(len(res1) == 1)
     2198        self.assertEquals(res1[0]["isCriticalSystemObject"][0], "FALSE")
     2199
    18702200        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
    18712201
     
    22072537        self.assertEquals(res[0]["dNSHostName"][0], "testname2.testdom")
    22082538        self.assertEquals(res[0]["sAMAccountName"][0], "testname2$")
    2209         self.assertTrue(res[0]["servicePrincipalName"][0] == "HOST/testname2" or
    2210                         res[0]["servicePrincipalName"][1] == "HOST/testname2")
    2211         self.assertTrue(res[0]["servicePrincipalName"][0] == "HOST/testname2.testdom" or
    2212                         res[0]["servicePrincipalName"][1] == "HOST/testname2.testdom")
     2539        self.assertTrue(len(res[0]["servicePrincipalName"]) == 2)
     2540        self.assertTrue("HOST/testname2" in res[0]["servicePrincipalName"])
     2541        self.assertTrue("HOST/testname2.testdom" in res[0]["servicePrincipalName"])
     2542
     2543        m = Message()
     2544        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2545        m["servicePrincipalName"] = MessageElement("HOST/testname2.testdom",
     2546                                                   FLAG_MOD_ADD, "servicePrincipalName")
     2547        try:
     2548            ldb.modify(m)
     2549            self.fail()
     2550        except LdbError, (num, _):
     2551            self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
     2552
     2553        m = Message()
     2554        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2555        m["servicePrincipalName"] = MessageElement("HOST/testname3",
     2556                                                   FLAG_MOD_ADD, "servicePrincipalName")
     2557        ldb.modify(m)
     2558
     2559        res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2560                         scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"])
     2561        self.assertTrue(len(res) == 1)
     2562        self.assertEquals(res[0]["dNSHostName"][0], "testname2.testdom")
     2563        self.assertEquals(res[0]["sAMAccountName"][0], "testname2$")
     2564        self.assertTrue(len(res[0]["servicePrincipalName"]) == 3)
     2565        self.assertTrue("HOST/testname2" in res[0]["servicePrincipalName"])
     2566        self.assertTrue("HOST/testname3" in res[0]["servicePrincipalName"])
     2567        self.assertTrue("HOST/testname2.testdom" in res[0]["servicePrincipalName"])
     2568
     2569        m = Message()
     2570        m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     2571        m["dNSHostName"] = MessageElement("testname3.testdom",
     2572                                          FLAG_MOD_REPLACE, "dNSHostName")
     2573        m["servicePrincipalName"] = MessageElement("HOST/testname3.testdom",
     2574                                                   FLAG_MOD_ADD, "servicePrincipalName")
     2575        ldb.modify(m)
     2576
     2577        res = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
     2578                         scope=SCOPE_BASE, attrs=["dNSHostName", "sAMAccountName", "servicePrincipalName"])
     2579        self.assertTrue(len(res) == 1)
     2580        self.assertEquals(res[0]["dNSHostName"][0], "testname3.testdom")
     2581        self.assertEquals(res[0]["sAMAccountName"][0], "testname2$")
     2582        self.assertTrue(len(res[0]["servicePrincipalName"]) == 3)
     2583        self.assertTrue("HOST/testname2" in res[0]["servicePrincipalName"])
     2584        self.assertTrue("HOST/testname3" in res[0]["servicePrincipalName"])
     2585        self.assertTrue("HOST/testname3.testdom" in res[0]["servicePrincipalName"])
    22132586
    22142587        delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
     
    22162589    def test_sam_description_attribute(self):
    22172590        """Test SAM description attribute"""
    2218         print "Test SAM description attribute"""
     2591        print "Test SAM description attribute"
    22192592
    22202593        self.ldb.add({
    22212594            "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
    2222             "description": "desc2",
    22232595            "objectclass": "group",
    2224             "description": "desc1"})
     2596            "description": "desc1"
     2597        })
    22252598
    22262599        res = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn,
     
    23822755
    23832756
     2757    def test_fSMORoleOwner_attribute(self):
     2758        """Test fSMORoleOwner attribute"""
     2759        print "Test fSMORoleOwner attribute"
     2760
     2761        ds_service_name = self.ldb.get_dsServiceName()
     2762
     2763        # The "fSMORoleOwner" attribute can only be set to "nTDSDSA" entries,
     2764        # invalid DNs return ERR_UNWILLING_TO_PERFORM
     2765
     2766        try:
     2767            self.ldb.add({
     2768                "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
     2769                "objectclass": "group",
     2770                "fSMORoleOwner": self.base_dn})
     2771            self.fail()
     2772        except LdbError, (num, _):
     2773            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     2774
     2775        try:
     2776            self.ldb.add({
     2777                "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
     2778                "objectclass": "group",
     2779                "fSMORoleOwner": [] })
     2780            self.fail()
     2781        except LdbError, (num, _):
     2782            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
     2783
     2784        # We are able to set it to a valid "nTDSDSA" entry if the server is
     2785        # capable of handling the role
     2786
     2787        self.ldb.add({
     2788            "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
     2789            "objectclass": "group",
     2790            "fSMORoleOwner": ds_service_name })
     2791
     2792        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
     2793
     2794        self.ldb.add({
     2795            "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
     2796            "objectclass": "group" })
     2797
     2798        m = Message()
     2799        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
     2800        m.add(MessageElement(self.base_dn, FLAG_MOD_REPLACE, "fSMORoleOwner"))
     2801        try:
     2802            ldb.modify(m)
     2803            self.fail()
     2804        except LdbError, (num, _):
     2805            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     2806
     2807        m = Message()
     2808        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
     2809        m.add(MessageElement([], FLAG_MOD_REPLACE, "fSMORoleOwner"))
     2810        try:
     2811            ldb.modify(m)
     2812            self.fail()
     2813        except LdbError, (num, _):
     2814            self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
     2815
     2816        # We are able to set it to a valid "nTDSDSA" entry if the server is
     2817        # capable of handling the role
     2818
     2819        m = Message()
     2820        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
     2821        m.add(MessageElement(ds_service_name, FLAG_MOD_REPLACE, "fSMORoleOwner"))
     2822        ldb.modify(m)
     2823
     2824        # A clean-out works on plain entries, not master (schema, PDC...) DNs
     2825
     2826        m = Message()
     2827        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
     2828        m.add(MessageElement([], FLAG_MOD_DELETE, "fSMORoleOwner"))
     2829        ldb.modify(m)
     2830
     2831        delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
     2832
     2833    def test_protected_sid_objects(self):
     2834        """Test deletion of objects with RID < 1000"""
     2835        # a list of some well-known sids
     2836        # objects in Builtin are aready covered by objectclass
     2837        protected_list = [
     2838            ["CN=Domain Admins","CN=Users,"],
     2839            ["CN=Schema Admins","CN=Users,"],
     2840            ["CN=Enterprise Admins","CN=Users,"],
     2841            ["CN=Administrator","CN=Users,"],
     2842            ["CN=Domain Controllers","CN=Users,"],
     2843            ]
     2844
     2845
     2846
     2847        for pr_object in protected_list:
     2848            try:
     2849                self.ldb.delete(pr_object[0] + "," + pr_object[1] + self.base_dn)
     2850            except LdbError, (num, _):
     2851                self.assertEquals(num, ERR_OTHER)
     2852            else:
     2853                self.fail("Deleted " + pr_object[0])
     2854
     2855            try:
     2856                self.ldb.rename(pr_object[0] + "," + pr_object[1] + self.base_dn,
     2857                                pr_object[0] + "2," + pr_object[1] + self.base_dn)
     2858            except LdbError, (num, _):
     2859                self.fail("Could not rename " + pr_object[0])
     2860
     2861            self.ldb.rename(pr_object[0] + "2," + pr_object[1] + self.base_dn,
     2862                            pr_object[0] + "," + pr_object[1] + self.base_dn)
     2863
     2864    def test_new_user_default_attributes(self):
     2865        """Test default attributes for new user objects"""
     2866        print "Test default attributes for new User objects\n"
     2867
     2868        user_name = "ldaptestuser"
     2869        user_dn = "CN=%s,CN=Users,%s" % (user_name, self.base_dn)
     2870        ldb.add({
     2871            "dn": user_dn,
     2872            "objectclass": "user",
     2873            "sAMAccountName": user_name})
     2874
     2875        res = ldb.search(user_dn, scope=SCOPE_BASE)
     2876        self.assertTrue(len(res) == 1)
     2877        user_obj = res[0]
     2878
     2879        expected_attrs = {"primaryGroupID": MessageElement(["513"]),
     2880                          "logonCount": MessageElement(["0"]),
     2881                          "cn": MessageElement([user_name]),
     2882                          "countryCode": MessageElement(["0"]),
     2883                          "objectClass": MessageElement(["top","person","organizationalPerson","user"]),
     2884                          "instanceType": MessageElement(["4"]),
     2885                          "distinguishedName": MessageElement([user_dn]),
     2886                          "sAMAccountType": MessageElement(["805306368"]),
     2887                          "objectSid": "**SKIP**",
     2888                          "whenCreated": "**SKIP**",
     2889                          "uSNCreated": "**SKIP**",
     2890                          "badPasswordTime": MessageElement(["0"]),
     2891                          "dn": Dn(ldb, user_dn),
     2892                          "pwdLastSet": MessageElement(["0"]),
     2893                          "sAMAccountName": MessageElement([user_name]),
     2894                          "objectCategory": MessageElement(["CN=Person,%s" % ldb.get_schema_basedn().get_linearized()]),
     2895                          "objectGUID": "**SKIP**",
     2896                          "whenChanged": "**SKIP**",
     2897                          "badPwdCount": MessageElement(["0"]),
     2898                          "accountExpires": MessageElement(["9223372036854775807"]),
     2899                          "name": MessageElement([user_name]),
     2900                          "codePage": MessageElement(["0"]),
     2901                          "userAccountControl": MessageElement(["546"]),
     2902                          "lastLogon": MessageElement(["0"]),
     2903                          "uSNChanged": "**SKIP**",
     2904                          "lastLogoff": MessageElement(["0"])}
     2905        # assert we have expected attribute names
     2906        actual_names = set(user_obj.keys())
     2907        # Samba does not use 'dSCorePropagationData', so skip it
     2908        actual_names -= set(['dSCorePropagationData'])
     2909        self.assertEqual(set(expected_attrs.keys()), actual_names, "Actual object does not have expected attributes")
     2910        # check attribute values
     2911        for name in expected_attrs.keys():
     2912            actual_val = user_obj.get(name)
     2913            self.assertFalse(actual_val is None, "No value for attribute '%s'" % name)
     2914            expected_val = expected_attrs[name]
     2915            if expected_val == "**SKIP**":
     2916                # "**ANY**" values means "any"
     2917                continue
     2918            self.assertEqual(expected_val, actual_val,
     2919                             "Unexpected value for '%s'" % name)
     2920        # clean up
     2921        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
     2922
     2923
    23842924if not "://" in host:
    23852925    if os.path.isfile(host):
     
    23902930ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
    23912931
    2392 runner = SubunitTestRunner()
    2393 rc = 0
    2394 if not runner.run(unittest.makeSuite(SamTests)).wasSuccessful():
    2395     rc = 1
    2396 sys.exit(rc)
     2932TestProgram(module=__name__, opts=subunitopts)
  • vendor/current/source4/dsdb/tests/python/sec_descriptor.py

    r740 r988  
    1111sys.path.insert(0, "bin/python")
    1212import samba
    13 samba.ensure_external_module("testtools", "testtools")
    14 samba.ensure_external_module("subunit", "subunit/python")
     13
     14from samba.tests.subunitrun import SubunitOptions, TestProgram
    1515
    1616import samba.getopt as options
     
    2020
    2121# For running the test unit
    22 from samba.ndr import ndr_pack
     22from samba.ndr import ndr_pack, ndr_unpack
    2323from samba.dcerpc import security
    2424
    2525from samba import gensec, sd_utils
    2626from samba.samdb import SamDB
    27 from samba.credentials import Credentials
     27from samba.credentials import Credentials, DONT_USE_KERBEROS
    2828from samba.auth import system_session
    2929from samba.dsdb import DS_DOMAIN_FUNCTION_2008
    3030from samba.dcerpc.security import (
    3131    SECINFO_OWNER, SECINFO_GROUP, SECINFO_DACL, SECINFO_SACL)
    32 from subunit.run import SubunitTestRunner
    3332import samba.tests
    3433from samba.tests import delete_force
    35 import unittest
    3634
    3735parser = optparse.OptionParser("sec_descriptor.py [options] <host>")
     
    4341credopts = options.CredentialsOptions(parser)
    4442parser.add_option_group(credopts)
     43subunitopts = SubunitOptions(parser)
     44parser.add_option_group(subunitopts)
     45
    4546opts, args = parser.parse_args()
    4647
     
    137138        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
    138139                                      | gensec.FEATURE_SEAL)
     140        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
    139141        ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
    140142        return ldb_target
     
    142144    def setUp(self):
    143145        super(DescriptorTests, self).setUp()
    144         self.ldb_admin = ldb
    145         self.base_dn = ldb.domain_dn()
     146        self.ldb_admin = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp,
     147            options=ldb_options)
     148        self.base_dn = self.ldb_admin.domain_dn()
    146149        self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
    147150        self.schema_dn = self.ldb_admin.get_schema_basedn().get_linearized()
    148151        self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
    149         self.sd_utils = sd_utils.SDUtils(ldb)
     152        self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
    150153        print "baseDN: %s" % self.base_dn
    151154
     
    200203
    201204        self.ldb_admin.add_remove_group_members("Enterprise Admins",
    202                                                 "testuser1,testuser5,testuser6,testuser8",
     205                                                ["testuser1", "testuser5", "testuser6", "testuser8"],
    203206                                                add_members_operation=True)
    204207        self.ldb_admin.add_remove_group_members("Domain Admins",
    205                                                 "testuser2,testuser5,testuser6,testuser7",
     208                                                ["testuser2","testuser5","testuser6","testuser7"],
    206209                                                add_members_operation=True)
    207210        self.ldb_admin.add_remove_group_members("Schema Admins",
    208                                                 "testuser3,testuser6,testuser7,testuser8",
     211                                                ["testuser3","testuser6","testuser7","testuser8"],
    209212                                                add_members_operation=True)
    210213
     
    313316            },
    314317        }
    315         # Discover 'msDS-Behavior-Version'
    316         res = self.ldb_admin.search(base=self.base_dn, expression="distinguishedName=%s" % self.base_dn, \
    317                 attrs=['msDS-Behavior-Version'])
    318         res = int(res[0]['msDS-Behavior-Version'][0])
     318        # Discover 'domainControllerFunctionality'
     319        res = self.ldb_admin.search(base="", scope=SCOPE_BASE,
     320                                    attrs=['domainControllerFunctionality'])
     321        res = int(res[0]['domainControllerFunctionality'][0])
    319322        if res < DS_DOMAIN_FUNCTION_2008:
    320323            self.DS_BEHAVIOR = "ds_behavior_win2003"
     
    18481851        self.assertFalse("G:" in desc_sddl)
    18491852
     1853    def test_311(self):
     1854        sd_flags = (SECINFO_OWNER |
     1855                    SECINFO_GROUP |
     1856                    SECINFO_DACL |
     1857                    SECINFO_SACL)
     1858
     1859        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1860                [], controls=None)
     1861        self.assertFalse("nTSecurityDescriptor" in res[0])
     1862
     1863        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1864                ["name"], controls=None)
     1865        self.assertFalse("nTSecurityDescriptor" in res[0])
     1866
     1867        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1868                ["name"], controls=["sd_flags:1:%d" % (sd_flags)])
     1869        self.assertFalse("nTSecurityDescriptor" in res[0])
     1870
     1871        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1872                [], controls=["sd_flags:1:%d" % (sd_flags)])
     1873        self.assertTrue("nTSecurityDescriptor" in res[0])
     1874        tmp = res[0]["nTSecurityDescriptor"][0]
     1875        sd = ndr_unpack(security.descriptor, tmp)
     1876        sddl = sd.as_sddl(self.sd_utils.domain_sid)
     1877        self.assertTrue("O:" in sddl)
     1878        self.assertTrue("G:" in sddl)
     1879        self.assertTrue("D:" in sddl)
     1880        self.assertTrue("S:" in sddl)
     1881
     1882        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1883                ["*"], controls=["sd_flags:1:%d" % (sd_flags)])
     1884        self.assertTrue("nTSecurityDescriptor" in res[0])
     1885        tmp = res[0]["nTSecurityDescriptor"][0]
     1886        sd = ndr_unpack(security.descriptor, tmp)
     1887        sddl = sd.as_sddl(self.sd_utils.domain_sid)
     1888        self.assertTrue("O:" in sddl)
     1889        self.assertTrue("G:" in sddl)
     1890        self.assertTrue("D:" in sddl)
     1891        self.assertTrue("S:" in sddl)
     1892
     1893        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1894                ["nTSecurityDescriptor", "*"], controls=["sd_flags:1:%d" % (sd_flags)])
     1895        self.assertTrue("nTSecurityDescriptor" in res[0])
     1896        tmp = res[0]["nTSecurityDescriptor"][0]
     1897        sd = ndr_unpack(security.descriptor, tmp)
     1898        sddl = sd.as_sddl(self.sd_utils.domain_sid)
     1899        self.assertTrue("O:" in sddl)
     1900        self.assertTrue("G:" in sddl)
     1901        self.assertTrue("D:" in sddl)
     1902        self.assertTrue("S:" in sddl)
     1903
     1904        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1905                ["*", "nTSecurityDescriptor"], controls=["sd_flags:1:%d" % (sd_flags)])
     1906        self.assertTrue("nTSecurityDescriptor" in res[0])
     1907        tmp = res[0]["nTSecurityDescriptor"][0]
     1908        sd = ndr_unpack(security.descriptor, tmp)
     1909        sddl = sd.as_sddl(self.sd_utils.domain_sid)
     1910        self.assertTrue("O:" in sddl)
     1911        self.assertTrue("G:" in sddl)
     1912        self.assertTrue("D:" in sddl)
     1913        self.assertTrue("S:" in sddl)
     1914
     1915        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1916                ["nTSecurityDescriptor", "name"], controls=["sd_flags:1:%d" % (sd_flags)])
     1917        self.assertTrue("nTSecurityDescriptor" in res[0])
     1918        tmp = res[0]["nTSecurityDescriptor"][0]
     1919        sd = ndr_unpack(security.descriptor, tmp)
     1920        sddl = sd.as_sddl(self.sd_utils.domain_sid)
     1921        self.assertTrue("O:" in sddl)
     1922        self.assertTrue("G:" in sddl)
     1923        self.assertTrue("D:" in sddl)
     1924        self.assertTrue("S:" in sddl)
     1925
     1926        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1927                ["name", "nTSecurityDescriptor"], controls=["sd_flags:1:%d" % (sd_flags)])
     1928        self.assertTrue("nTSecurityDescriptor" in res[0])
     1929        tmp = res[0]["nTSecurityDescriptor"][0]
     1930        sd = ndr_unpack(security.descriptor, tmp)
     1931        sddl = sd.as_sddl(self.sd_utils.domain_sid)
     1932        self.assertTrue("O:" in sddl)
     1933        self.assertTrue("G:" in sddl)
     1934        self.assertTrue("D:" in sddl)
     1935        self.assertTrue("S:" in sddl)
     1936
     1937        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1938                ["nTSecurityDescriptor"], controls=None)
     1939        self.assertTrue("nTSecurityDescriptor" in res[0])
     1940        tmp = res[0]["nTSecurityDescriptor"][0]
     1941        sd = ndr_unpack(security.descriptor, tmp)
     1942        sddl = sd.as_sddl(self.sd_utils.domain_sid)
     1943        self.assertTrue("O:" in sddl)
     1944        self.assertTrue("G:" in sddl)
     1945        self.assertTrue("D:" in sddl)
     1946        self.assertTrue("S:" in sddl)
     1947
     1948        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1949                ["name", "nTSecurityDescriptor"], controls=None)
     1950        self.assertTrue("nTSecurityDescriptor" in res[0])
     1951        tmp = res[0]["nTSecurityDescriptor"][0]
     1952        sd = ndr_unpack(security.descriptor, tmp)
     1953        sddl = sd.as_sddl(self.sd_utils.domain_sid)
     1954        self.assertTrue("O:" in sddl)
     1955        self.assertTrue("G:" in sddl)
     1956        self.assertTrue("D:" in sddl)
     1957        self.assertTrue("S:" in sddl)
     1958
     1959        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None,
     1960                ["nTSecurityDescriptor", "name"], controls=None)
     1961        self.assertTrue("nTSecurityDescriptor" in res[0])
     1962        tmp = res[0]["nTSecurityDescriptor"][0]
     1963        sd = ndr_unpack(security.descriptor, tmp)
     1964        sddl = sd.as_sddl(self.sd_utils.domain_sid)
     1965        self.assertTrue("O:" in sddl)
     1966        self.assertTrue("G:" in sddl)
     1967        self.assertTrue("D:" in sddl)
     1968        self.assertTrue("S:" in sddl)
     1969
     1970    def test_312(self):
     1971        """This search is done by the windows dc join..."""
     1972
     1973        res = self.ldb_admin.search(self.base_dn, SCOPE_BASE, None, ["1.1"],
     1974                controls=["extended_dn:1:0", "sd_flags:1:0", "search_options:1:1"])
     1975        self.assertFalse("nTSecurityDescriptor" in res[0])
    18501976
    18511977class RightsAttributesTests(DescriptorTests):
     
    18651991        self.ldb_admin.newuser("testuser_attr2", "samba123@")
    18661992        self.ldb_admin.add_remove_group_members("Domain Admins",
    1867                                                 "testuser_attr2",
     1993                                                ["testuser_attr2"],
    18681994                                                add_members_operation=True)
    18691995
     
    19602086        self.assertTrue("managedBy" in res[0]["allowedAttributesEffective"])
    19612087
     2088class SdAutoInheritTests(DescriptorTests):
     2089    def deleteAll(self):
     2090        delete_force(self.ldb_admin, self.sub_dn)
     2091        delete_force(self.ldb_admin, self.ou_dn)
     2092
     2093    def setUp(self):
     2094        super(SdAutoInheritTests, self).setUp()
     2095        self.ou_dn = "OU=test_SdAutoInherit_ou," + self.base_dn
     2096        self.sub_dn = "OU=test_sub," + self.ou_dn
     2097        self.deleteAll()
     2098
     2099    def test_301(self):
     2100        """ Modify a descriptor with OWNER_SECURITY_INFORMATION set.
     2101            See that only the owner has been changed.
     2102        """
     2103        attrs = ["nTSecurityDescriptor", "replPropertyMetaData", "uSNChanged"]
     2104        controls=["sd_flags:1:%d" % (SECINFO_DACL)]
     2105        ace = "(A;CI;CC;;;NU)"
     2106        sub_ace = "(A;CIID;CC;;;NU)"
     2107        sd_sddl = "O:BAG:BAD:P(A;CI;0x000f01ff;;;AU)"
     2108        sd = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
     2109
     2110        self.ldb_admin.create_ou(self.ou_dn,sd=sd)
     2111        self.ldb_admin.create_ou(self.sub_dn)
     2112
     2113        ou_res0 = self.sd_utils.ldb.search(self.ou_dn, SCOPE_BASE,
     2114                                           None, attrs, controls=controls)
     2115        sub_res0 = self.sd_utils.ldb.search(self.sub_dn, SCOPE_BASE,
     2116                                            None, attrs, controls=controls)
     2117
     2118        ou_sd0 = ndr_unpack(security.descriptor, ou_res0[0]["nTSecurityDescriptor"][0])
     2119        sub_sd0 = ndr_unpack(security.descriptor, sub_res0[0]["nTSecurityDescriptor"][0])
     2120
     2121        ou_sddl0 = ou_sd0.as_sddl(self.domain_sid)
     2122        sub_sddl0 = sub_sd0.as_sddl(self.domain_sid)
     2123
     2124        self.assertFalse(ace in ou_sddl0)
     2125        self.assertFalse(ace in sub_sddl0)
     2126
     2127        ou_sddl1 = (ou_sddl0[:ou_sddl0.index("(")] + ace +
     2128                    ou_sddl0[ou_sddl0.index("("):])
     2129
     2130        sub_sddl1 = (sub_sddl0[:sub_sddl0.index("(")] + ace +
     2131                     sub_sddl0[sub_sddl0.index("("):])
     2132
     2133        self.sd_utils.modify_sd_on_dn(self.ou_dn, ou_sddl1, controls=controls)
     2134
     2135        sub_res2 = self.sd_utils.ldb.search(self.sub_dn, SCOPE_BASE,
     2136                                            None, attrs, controls=controls)
     2137        ou_res2 = self.sd_utils.ldb.search(self.ou_dn, SCOPE_BASE,
     2138                                           None, attrs, controls=controls)
     2139
     2140        ou_sd2 = ndr_unpack(security.descriptor, ou_res2[0]["nTSecurityDescriptor"][0])
     2141        sub_sd2 = ndr_unpack(security.descriptor, sub_res2[0]["nTSecurityDescriptor"][0])
     2142
     2143        ou_sddl2 = ou_sd2.as_sddl(self.domain_sid)
     2144        sub_sddl2 = sub_sd2.as_sddl(self.domain_sid)
     2145
     2146        self.assertFalse(ou_sddl2 == ou_sddl0)
     2147        self.assertFalse(sub_sddl2 == sub_sddl0)
     2148
     2149        if ace not in ou_sddl2:
     2150            print "ou0: %s" % ou_sddl0
     2151            print "ou2: %s" % ou_sddl2
     2152
     2153        if sub_ace not in sub_sddl2:
     2154            print "sub0: %s" % sub_sddl0
     2155            print "sub2: %s" % sub_sddl2
     2156
     2157        self.assertTrue(ace in ou_sddl2)
     2158        self.assertTrue(sub_ace in sub_sddl2)
     2159
     2160        ou_usn0 = int(ou_res0[0]["uSNChanged"][0])
     2161        ou_usn2 = int(ou_res2[0]["uSNChanged"][0])
     2162        self.assertTrue(ou_usn2 > ou_usn0)
     2163
     2164        sub_usn0 = int(sub_res0[0]["uSNChanged"][0])
     2165        sub_usn2 = int(sub_res2[0]["uSNChanged"][0])
     2166        self.assertTrue(sub_usn2 == sub_usn0)
     2167
    19622168if not "://" in host:
    19632169    if os.path.isfile(host):
     
    19702176    ldb_options = ["modules:paged_searches"]
    19712177
    1972 ldb = SamDB(host,
    1973             credentials=creds,
    1974             session_info=system_session(lp),
    1975             lp=lp,
    1976             options=ldb_options)
    1977 
    1978 runner = SubunitTestRunner()
    1979 rc = 0
    1980 if not runner.run(unittest.makeSuite(OwnerGroupDescriptorTests)).wasSuccessful():
    1981     rc = 1
    1982 if not runner.run(unittest.makeSuite(DaclDescriptorTests)).wasSuccessful():
    1983     rc = 1
    1984 if not runner.run(unittest.makeSuite(SdFlagsDescriptorTests)).wasSuccessful():
    1985     rc = 1
    1986 if not runner.run(unittest.makeSuite(RightsAttributesTests)).wasSuccessful():
    1987     rc = 1
    1988 sys.exit(rc)
     2178TestProgram(module=__name__, opts=subunitopts)
  • vendor/current/source4/dsdb/tests/python/token_group.py

    r740 r988  
    99sys.path.insert(0, "bin/python")
    1010import samba
    11 samba.ensure_external_module("testtools", "testtools")
    12 samba.ensure_external_module("subunit", "subunit/python")
     11
     12from samba.tests.subunitrun import SubunitOptions, TestProgram
    1313
    1414import samba.getopt as options
    1515
    1616from samba.auth import system_session
    17 from samba import ldb
     17from samba import ldb, dsdb
    1818from samba.samdb import SamDB
    1919from samba.auth import AuthContext
    20 from samba.ndr import ndr_pack, ndr_unpack
     20from samba.ndr import ndr_unpack
    2121from samba import gensec
    22 from samba.credentials import Credentials
    23 
    24 from subunit.run import SubunitTestRunner
    25 import unittest
     22from samba.credentials import Credentials, DONT_USE_KERBEROS
     23from samba.dsdb import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP
    2624import samba.tests
    27 
    28 from samba.dcerpc import security
     25from samba.tests import delete_force
     26
    2927from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
    3028
     
    3735credopts = options.CredentialsOptions(parser)
    3836parser.add_option_group(credopts)
     37subunitopts = SubunitOptions(parser)
     38parser.add_option_group(subunitopts)
    3939opts, args = parser.parse_args()
    4040
     
    4747lp = sambaopts.get_loadparm()
    4848creds = credopts.get_credentials(lp)
    49 
    50 class TokenTest(samba.tests.TestCase):
     49creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
     50
     51def closure(vSet, wSet, aSet):
     52    for edge in aSet:
     53        start, end = edge
     54        if start in wSet:
     55            if end not in wSet and end in vSet:
     56                wSet.add(end)
     57                closure(vSet, wSet, aSet)
     58
     59class StaticTokenTest(samba.tests.TestCase):
    5160
    5261    def setUp(self):
    53         super(TokenTest, self).setUp()
    54         self.ldb = samdb
    55         self.base_dn = samdb.domain_dn()
     62        super(StaticTokenTest, self).setUp()
     63        self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
     64        self.base_dn = self.ldb.domain_dn()
    5665
    5766        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
     
    7988        self.assertEquals(len(res), 1)
    8089
    81         print("Geting tokenGroups from rootDSE")
     90        print("Getting tokenGroups from rootDSE")
    8291        tokengroups = []
    8392        for sid in res[0]['tokenGroups']:
     
    8998            print("token sids don't match")
    9099            print("tokengroups: %s" % tokengroups)
    91             print("calculated : %s" % self.user_sids);
     100            print("calculated : %s" % self.user_sids)
    92101            print("difference : %s" % sidset1.difference(sidset2))
    93102            self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
    94103
    95104    def test_dn_tokenGroups(self):
    96         print("Geting tokenGroups from user DN")
     105        print("Getting tokenGroups from user DN")
    97106        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
    98107        self.assertEquals(len(res), 1)
     
    106115        if len(sidset1.difference(sidset2)):
    107116            print("token sids don't match")
    108             print("tokengroups: %s" % tokengroups)
    109             print("calculated : %s" % sids);
    110117            print("difference : %s" % sidset1.difference(sidset2))
    111118            self.fail(msg="calculated groups don't match against user DN tokenGroups")
    112        
     119
    113120    def test_pac_groups(self):
    114121        settings = {}
     
    135142        server_finished = False
    136143        server_to_client = ""
    137        
    138         """Run the actual call loop"""
     144
     145        # Run the actual call loop.
    139146        while client_finished == False and server_finished == False:
    140147            if not client_finished:
     
    156163        if len(sidset1.difference(sidset2)):
    157164            print("token sids don't match")
     165            print("difference : %s" % sidset1.difference(sidset2))
     166            self.fail(msg="calculated groups don't match against user PAC tokenGroups")
     167
     168class DynamicTokenTest(samba.tests.TestCase):
     169
     170    def get_creds(self, target_username, target_password):
     171        creds_tmp = Credentials()
     172        creds_tmp.set_username(target_username)
     173        creds_tmp.set_password(target_password)
     174        creds_tmp.set_domain(creds.get_domain())
     175        creds_tmp.set_realm(creds.get_realm())
     176        creds_tmp.set_workstation(creds.get_workstation())
     177        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
     178                                      | gensec.FEATURE_SEAL)
     179        return creds_tmp
     180
     181    def get_ldb_connection(self, target_username, target_password):
     182        creds_tmp = self.get_creds(target_username, target_password)
     183        ldb_target = SamDB(url=url, credentials=creds_tmp, lp=lp)
     184        return ldb_target
     185
     186    def setUp(self):
     187        super(DynamicTokenTest, self).setUp()
     188        self.admin_ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
     189
     190        self.base_dn = self.admin_ldb.domain_dn()
     191
     192        self.test_user = "tokengroups_user1"
     193        self.test_user_pass = "samba123@"
     194        self.admin_ldb.newuser(self.test_user, self.test_user_pass)
     195        self.test_group0 = "tokengroups_group0"
     196        self.admin_ldb.newgroup(self.test_group0, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
     197        res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group0, self.base_dn),
     198                                    attrs=["objectSid"], scope=ldb.SCOPE_BASE)
     199        self.test_group0_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
     200
     201        self.admin_ldb.add_remove_group_members(self.test_group0, [self.test_user],
     202                                       add_members_operation=True)
     203
     204        self.test_group1 = "tokengroups_group1"
     205        self.admin_ldb.newgroup(self.test_group1, grouptype=dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
     206        res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group1, self.base_dn),
     207                                    attrs=["objectSid"], scope=ldb.SCOPE_BASE)
     208        self.test_group1_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
     209
     210        self.admin_ldb.add_remove_group_members(self.test_group1, [self.test_user],
     211                                       add_members_operation=True)
     212
     213        self.test_group2 = "tokengroups_group2"
     214        self.admin_ldb.newgroup(self.test_group2, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
     215
     216        res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group2, self.base_dn),
     217                                    attrs=["objectSid"], scope=ldb.SCOPE_BASE)
     218        self.test_group2_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
     219
     220        self.admin_ldb.add_remove_group_members(self.test_group2, [self.test_user],
     221                                       add_members_operation=True)
     222
     223        self.ldb = self.get_ldb_connection(self.test_user, self.test_user_pass)
     224
     225        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
     226        self.assertEquals(len(res), 1)
     227
     228        self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
     229
     230        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[])
     231        self.assertEquals(len(res), 1)
     232
     233        self.test_user_dn = res[0].dn
     234
     235        session_info_flags = ( AUTH_SESSION_INFO_DEFAULT_GROUPS |
     236                               AUTH_SESSION_INFO_AUTHENTICATED |
     237                               AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
     238        session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
     239                                          session_info_flags=session_info_flags)
     240
     241        token = session.security_token
     242        self.user_sids = []
     243        for s in token.sids:
     244            self.user_sids.append(str(s))
     245
     246    def tearDown(self):
     247        super(DynamicTokenTest, self).tearDown()
     248        delete_force(self.admin_ldb, "CN=%s,%s,%s" %
     249                          (self.test_user, "cn=users", self.base_dn))
     250        delete_force(self.admin_ldb, "CN=%s,%s,%s" %
     251                          (self.test_group0, "cn=users", self.base_dn))
     252        delete_force(self.admin_ldb, "CN=%s,%s,%s" %
     253                          (self.test_group1, "cn=users", self.base_dn))
     254        delete_force(self.admin_ldb, "CN=%s,%s,%s" %
     255                          (self.test_group2, "cn=users", self.base_dn))
     256
     257    def test_rootDSE_tokenGroups(self):
     258        """Testing rootDSE tokengroups against internal calculation"""
     259        if not url.startswith("ldap"):
     260            self.fail(msg="This test is only valid on ldap")
     261
     262        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
     263        self.assertEquals(len(res), 1)
     264
     265        print("Getting tokenGroups from rootDSE")
     266        tokengroups = []
     267        for sid in res[0]['tokenGroups']:
     268            tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
     269
     270        sidset1 = set(tokengroups)
     271        sidset2 = set(self.user_sids)
     272        if len(sidset1.difference(sidset2)):
     273            print("token sids don't match")
    158274            print("tokengroups: %s" % tokengroups)
    159             print("calculated : %s" % sids);
     275            print("calculated : %s" % self.user_sids)
     276            print("difference : %s" % sidset1.difference(sidset2))
     277            self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
     278
     279    def test_dn_tokenGroups(self):
     280        print("Getting tokenGroups from user DN")
     281        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
     282        self.assertEquals(len(res), 1)
     283
     284        dn_tokengroups = []
     285        for sid in res[0]['tokenGroups']:
     286            dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
     287
     288        sidset1 = set(dn_tokengroups)
     289        sidset2 = set(self.user_sids)
     290        if len(sidset1.difference(sidset2)):
     291            print("token sids don't match")
     292            print("difference : %s" % sidset1.difference(sidset2))
     293            self.fail(msg="calculated groups don't match against user DN tokenGroups")
     294
     295    def test_pac_groups(self):
     296        settings = {}
     297        settings["lp_ctx"] = lp
     298        settings["target_hostname"] = lp.get("netbios name")
     299
     300        gensec_client = gensec.Security.start_client(settings)
     301        gensec_client.set_credentials(self.get_creds(self.test_user, self.test_user_pass))
     302        gensec_client.want_feature(gensec.FEATURE_SEAL)
     303        gensec_client.start_mech_by_sasl_name("GSSAPI")
     304
     305        auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])
     306
     307        gensec_server = gensec.Security.start_server(settings, auth_context)
     308        machine_creds = Credentials()
     309        machine_creds.guess(lp)
     310        machine_creds.set_machine_account(lp)
     311        gensec_server.set_credentials(machine_creds)
     312
     313        gensec_server.want_feature(gensec.FEATURE_SEAL)
     314        gensec_server.start_mech_by_sasl_name("GSSAPI")
     315
     316        client_finished = False
     317        server_finished = False
     318        server_to_client = ""
     319
     320        # Run the actual call loop.
     321        while client_finished == False and server_finished == False:
     322            if not client_finished:
     323                print "running client gensec_update"
     324                (client_finished, client_to_server) = gensec_client.update(server_to_client)
     325            if not server_finished:
     326                print "running server gensec_update"
     327                (server_finished, server_to_client) = gensec_server.update(client_to_server)
     328
     329        session = gensec_server.session_info()
     330
     331        token = session.security_token
     332        pac_sids = []
     333        for s in token.sids:
     334            pac_sids.append(str(s))
     335
     336        sidset1 = set(pac_sids)
     337        sidset2 = set(self.user_sids)
     338        if len(sidset1.difference(sidset2)):
     339            print("token sids don't match")
    160340            print("difference : %s" % sidset1.difference(sidset2))
    161341            self.fail(msg="calculated groups don't match against user PAC tokenGroups")
    162342
     343
     344    def test_tokenGroups_manual(self):
     345        # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
     346        # and compare the result
     347        res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
     348                                    expression="(|(objectclass=user)(objectclass=group))",
     349                                    attrs=["memberOf"])
     350        aSet = set()
     351        aSetR = set()
     352        vSet = set()
     353        for obj in res:
     354            if "memberOf" in obj:
     355                for dn in obj["memberOf"]:
     356                    first = obj.dn.get_casefold()
     357                    second = ldb.Dn(self.admin_ldb, dn).get_casefold()
     358                    aSet.add((first, second))
     359                    aSetR.add((second, first))
     360                    vSet.add(first)
     361                    vSet.add(second)
     362
     363        res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
     364                                    expression="(objectclass=user)",
     365                                    attrs=["primaryGroupID"])
     366        for obj in res:
     367            if "primaryGroupID" in obj:
     368                sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
     369                res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
     370                                             attrs=[])
     371                first = obj.dn.get_casefold()
     372                second = res2[0].dn.get_casefold()
     373
     374                aSet.add((first, second))
     375                aSetR.add((second, first))
     376                vSet.add(first)
     377                vSet.add(second)
     378
     379        wSet = set()
     380        wSet.add(self.test_user_dn.get_casefold())
     381        closure(vSet, wSet, aSet)
     382        wSet.remove(self.test_user_dn.get_casefold())
     383
     384        tokenGroupsSet = set()
     385
     386        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
     387        self.assertEquals(len(res), 1)
     388
     389        dn_tokengroups = []
     390        for sid in res[0]['tokenGroups']:
     391            sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
     392            res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
     393                                         attrs=[])
     394            tokenGroupsSet.add(res3[0].dn.get_casefold())
     395
     396        if len(wSet.difference(tokenGroupsSet)):
     397            self.fail(msg="additional calculated: %s" % wSet.difference(tokenGroupsSet))
     398
     399        if len(tokenGroupsSet.difference(wSet)):
     400            self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet))
     401
     402
     403    def filtered_closure(self, wSet, filter_grouptype):
     404        res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
     405                                    expression="(|(objectclass=user)(objectclass=group))",
     406                                    attrs=["memberOf"])
     407        aSet = set()
     408        aSetR = set()
     409        vSet = set()
     410        for obj in res:
     411            vSet.add(obj.dn.get_casefold())
     412            if "memberOf" in obj:
     413                for dn in obj["memberOf"]:
     414                    first = obj.dn.get_casefold()
     415                    second = ldb.Dn(self.admin_ldb, dn).get_casefold()
     416                    aSet.add((first, second))
     417                    aSetR.add((second, first))
     418                    vSet.add(first)
     419                    vSet.add(second)
     420
     421        res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
     422                                    expression="(objectclass=user)",
     423                                    attrs=["primaryGroupID"])
     424        for obj in res:
     425            if "primaryGroupID" in obj:
     426                sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
     427                res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
     428                                             attrs=[])
     429                first = obj.dn.get_casefold()
     430                second = res2[0].dn.get_casefold()
     431
     432                aSet.add((first, second))
     433                aSetR.add((second, first))
     434                vSet.add(first)
     435                vSet.add(second)
     436
     437        uSet = set()
     438        for v in vSet:
     439            res_group = self.admin_ldb.search(base=v, scope=ldb.SCOPE_BASE,
     440                                              attrs=["groupType"],
     441                                              expression="objectClass=group")
     442            if len(res_group) == 1:
     443                if hex(int(res_group[0]["groupType"][0]) & 0x00000000FFFFFFFF) == hex(filter_grouptype):
     444                    uSet.add(v)
     445            else:
     446                uSet.add(v)
     447
     448        closure(uSet, wSet, aSet)
     449
     450
     451    def test_tokenGroupsGlobalAndUniversal_manual(self):
     452        # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
     453        # and compare the result
     454
     455        # The variable names come from MS-ADTS May 15, 2014
     456
     457        S = set()
     458        S.add(self.test_user_dn.get_casefold())
     459
     460        self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)
     461
     462        T = set()
     463        # Not really a SID, we do this on DNs...
     464        for sid in S:
     465            X = set()
     466            X.add(sid)
     467            self.filtered_closure(X, GTYPE_SECURITY_UNIVERSAL_GROUP)
     468
     469            T = T.union(X)
     470
     471        T.remove(self.test_user_dn.get_casefold())
     472
     473        tokenGroupsSet = set()
     474
     475        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
     476        self.assertEquals(len(res), 1)
     477
     478        dn_tokengroups = []
     479        for sid in res[0]['tokenGroupsGlobalAndUniversal']:
     480            sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
     481            res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
     482                                         attrs=[])
     483            tokenGroupsSet.add(res3[0].dn.get_casefold())
     484
     485        if len(T.difference(tokenGroupsSet)):
     486            self.fail(msg="additional calculated: %s" % T.difference(tokenGroupsSet))
     487
     488        if len(tokenGroupsSet.difference(T)):
     489            self.fail(msg="additional tokenGroupsGlobalAndUniversal: %s" % tokenGroupsSet.difference(T))
    163490
    164491if not "://" in url:
     
    168495        url = "ldap://%s" % url
    169496
    170 samdb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
    171 
    172 runner = SubunitTestRunner()
    173 rc = 0
    174 if not runner.run(unittest.makeSuite(TokenTest)).wasSuccessful():
    175     rc = 1
    176 sys.exit(rc)
     497TestProgram(module=__name__, opts=subunitopts)
  • vendor/current/source4/dsdb/tests/python/urgent_replication.py

    r740 r988  
    44import optparse
    55import sys
    6 import os
    7 
    86sys.path.insert(0, "bin/python")
    97import samba
    10 samba.ensure_external_module("testtools", "testtools")
    11 samba.ensure_external_module("subunit", "subunit/python")
    12 
    13 import samba.getopt as options
    14 
    15 from samba.auth import system_session
     8from samba.tests.subunitrun import TestProgram, SubunitOptions
     9
    1610from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message,
    1711    MessageElement, Dn, FLAG_MOD_REPLACE)
    18 from samba.samdb import SamDB
    1912import samba.tests
    2013import samba.dsdb as dsdb
    21 
    22 from subunit.run import SubunitTestRunner
    23 import unittest
     14import samba.getopt as options
    2415
    2516parser = optparse.OptionParser("urgent_replication.py [options] <host>")
     
    2718parser.add_option_group(sambaopts)
    2819parser.add_option_group(options.VersionOptions(parser))
     20
    2921# use command line creds if available
    3022credopts = options.CredentialsOptions(parser)
    3123parser.add_option_group(credopts)
     24subunitopts = SubunitOptions(parser)
     25parser.add_option_group(subunitopts)
    3226opts, args = parser.parse_args()
    3327
     
    3832host = args[0]
    3933
    40 lp = sambaopts.get_loadparm()
    41 creds = credopts.get_credentials(lp)
    4234
    4335class UrgentReplicationTests(samba.tests.TestCase):
     
    5143    def setUp(self):
    5244        super(UrgentReplicationTests, self).setUp()
    53         self.ldb = ldb
    54         self.base_dn = ldb.domain_dn()
     45        self.ldb = samba.tests.connect_samdb(host, global_schema=False)
     46        self.base_dn = self.ldb.domain_dn()
    5547
    5648        print "baseDN: %s\n" % self.base_dn
    5749
    5850    def test_nonurgent_object(self):
    59         """Test if the urgent replication is not activated
    60            when handling a non urgent object"""
     51        """Test if the urgent replication is not activated when handling a non urgent object."""
    6152        self.ldb.add({
    6253            "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
     
    6556            "description":"nonurgenttest description"})
    6657
    67         # urgent replication should not be enabled when creating 
     58        # urgent replication should not be enabled when creating
    6859        res = self.ldb.load_partition_usn(self.base_dn)
    6960        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
     
    7162        # urgent replication should not be enabled when modifying
    7263        m = Message()
    73         m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
     64        m.dn = Dn(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
    7465        m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
    7566          "description")
    76         ldb.modify(m)
     67        self.ldb.modify(m)
    7768        res = self.ldb.load_partition_usn(self.base_dn)
    7869        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
     
    8374        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
    8475
    85 
    8676    def test_nTDSDSA_object(self):
    87         '''Test if the urgent replication is activated
    88            when handling a nTDSDSA object'''
    89         self.ldb.add({
    90             "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
     77        """Test if the urgent replication is activated when handling a nTDSDSA object."""
     78        self.ldb.add({
     79            "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
     80                self.ldb.get_config_basedn(),
    9181            "objectclass":"server",
    9282            "cn":"test server",
     
    10898        # urgent replication should NOT be enabled when modifying
    10999        m = Message()
    110         m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
     100        m.dn = Dn(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
    111101        m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
    112102          "options")
    113         ldb.modify(m)
     103        self.ldb.modify(m)
    114104        res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
    115105        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
     
    122112        self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
    123113
    124 
    125114    def test_crossRef_object(self):
    126         '''Test if the urgent replication is activated
    127            when handling a crossRef object'''
     115        """Test if the urgent replication is activated when handling a crossRef object."""
    128116        self.ldb.add({
    129117                      "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
    130118                      "objectClass": "crossRef",
    131119                      "cn": "test crossRef",
    132                       "dnsRoot": lp.get("realm").lower(),
     120                      "dnsRoot": self.get_loadparm().get("realm").lower(),
    133121                      "instanceType": "4",
    134122                      "nCName": self.base_dn,
     
    143131        # urgent replication should NOT be enabled when modifying
    144132        m = Message()
    145         m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
     133        m.dn = Dn(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
    146134        m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
    147135          "systemFlags")
    148         ldb.modify(m)
     136        self.ldb.modify(m)
    149137        res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
    150138        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
     
    156144        self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
    157145
    158 
    159 
    160146    def test_attributeSchema_object(self):
    161         '''Test if the urgent replication is activated
    162            when handling an attributeSchema object'''
     147        """Test if the urgent replication is activated when handling an attributeSchema object"""
    163148
    164149        try:
     
    187172            print "Not testing urgent replication when creating attributeSchema object ...\n"
    188173
    189         # urgent replication should be enabled when modifying 
    190         m = Message()
    191         m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
     174        # urgent replication should be enabled when modifying
     175        m = Message()
     176        m.dn = Dn(self.ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
    192177        m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
    193178          "lDAPDisplayName")
    194         ldb.modify(m)
     179        self.ldb.modify(m)
    195180        res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
    196181        self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
    197182
    198 
    199183    def test_classSchema_object(self):
    200         '''Test if the urgent replication is activated
    201            when handling a classSchema object'''
     184        """Test if the urgent replication is activated when handling a classSchema object."""
    202185        try:
    203186            self.ldb.add_ldif(
     
    232215        # urgent replication should be enabled when modifying
    233216        m = Message()
    234         m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
     217        m.dn = Dn(self.ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
    235218        m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
    236219          "lDAPDisplayName")
    237         ldb.modify(m)
     220        self.ldb.modify(m)
    238221        res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
    239222        self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
    240223
    241 
    242224    def test_secret_object(self):
    243         '''Test if the urgent replication is activated
    244            when handling a secret object'''
     225        """Test if the urgent replication is activated when handling a secret object."""
    245226
    246227        self.ldb.add({
     
    257238        # urgent replication should be enabled when modifying
    258239        m = Message()
    259         m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
     240        m.dn = Dn(self.ldb, "cn=test secret,cn=System," + self.base_dn)
    260241        m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
    261242          "currentValue")
    262         ldb.modify(m)
    263         res = self.ldb.load_partition_usn(self.base_dn)
    264         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
    265 
    266         # urgent replication should NOT be enabled when deleting 
     243        self.ldb.modify(m)
     244        res = self.ldb.load_partition_usn(self.base_dn)
     245        self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
     246
     247        # urgent replication should NOT be enabled when deleting
    267248        self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
    268249        res = self.ldb.load_partition_usn(self.base_dn)
    269250        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
    270251
    271 
    272252    def test_rIDManager_object(self):
    273         '''Test if the urgent replication is activated
    274             when handling a rIDManager object'''
     253        """Test if the urgent replication is activated when handling a rIDManager object."""
    275254        self.ldb.add_ldif(
    276255            """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
     
    290269        # urgent replication should be enabled when modifying
    291270        m = Message()
    292         m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
     271        m.dn = Dn(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
    293272        m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
    294273          "systemFlags")
    295         ldb.modify(m)
     274        self.ldb.modify(m)
    296275        res = self.ldb.load_partition_usn(self.base_dn)
    297276        self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
     
    302281        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
    303282
    304 
    305283    def test_urgent_attributes(self):
    306         '''Test if the urgent replication is activated
    307             when handling urgent attributes of an object'''
     284        """Test if the urgent replication is activated when handling urgent attributes of an object."""
    308285
    309286        self.ldb.add({
     
    320297        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
    321298
    322         # urgent replication should be enabled when modifying userAccountControl 
    323         m = Message()
    324         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
     299        # urgent replication should be enabled when modifying userAccountControl
     300        m = Message()
     301        m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
    325302        m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT+dsdb.UF_SMARTCARD_REQUIRED), FLAG_MOD_REPLACE,
    326303          "userAccountControl")
    327         ldb.modify(m)
     304        self.ldb.modify(m)
    328305        res = self.ldb.load_partition_usn(self.base_dn)
    329306        self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
     
    331308        # urgent replication should be enabled when modifying lockoutTime
    332309        m = Message()
    333         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
     310        m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
    334311        m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
    335312          "lockoutTime")
    336         ldb.modify(m)
     313        self.ldb.modify(m)
    337314        res = self.ldb.load_partition_usn(self.base_dn)
    338315        self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
     
    340317        # urgent replication should be enabled when modifying pwdLastSet
    341318        m = Message()
    342         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
     319        m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
    343320        m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
    344321          "pwdLastSet")
    345         ldb.modify(m)
     322        self.ldb.modify(m)
    346323        res = self.ldb.load_partition_usn(self.base_dn)
    347324        self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
     
    350327        # attribute
    351328        m = Message()
    352         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
     329        m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
    353330        m["description"] = MessageElement("updated urgent attributes test description",
    354331                                          FLAG_MOD_REPLACE, "description")
    355         ldb.modify(m)
     332        self.ldb.modify(m)
    356333        res = self.ldb.load_partition_usn(self.base_dn)
    357334        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
     
    363340
    364341
    365 if not "://" in host:
    366     if os.path.isfile(host):
    367         host = "tdb://%s" % host
    368     else:
    369         host = "ldap://%s" % host
    370 
    371 
    372 ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp,
    373             global_schema=False)
    374 
    375 runner = SubunitTestRunner()
    376 rc = 0
    377 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
    378     rc = 1
    379 sys.exit(rc)
     342TestProgram(module=__name__, opts=subunitopts)
Note: See TracChangeset for help on using the changeset viewer.