1 | #!/usr/bin/env python
|
---|
2 | # -*- coding: utf-8 -*-
|
---|
3 | # Unit tests for LDAP access checks
|
---|
4 |
|
---|
5 | import optparse
|
---|
6 | import sys
|
---|
7 | import base64
|
---|
8 | import re
|
---|
9 | sys.path.insert(0, "bin/python")
|
---|
10 | import samba
|
---|
11 | samba.ensure_external_module("testtools", "testtools")
|
---|
12 | samba.ensure_external_module("subunit", "subunit/python")
|
---|
13 |
|
---|
14 | import samba.getopt as options
|
---|
15 | from samba.join import dc_join
|
---|
16 |
|
---|
17 | from ldb import (
|
---|
18 | SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
|
---|
19 | ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
|
---|
20 | from ldb import ERR_CONSTRAINT_VIOLATION
|
---|
21 | from ldb import ERR_OPERATIONS_ERROR
|
---|
22 | from ldb import Message, MessageElement, Dn
|
---|
23 | from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD
|
---|
24 | from samba.dcerpc import security, drsuapi, misc
|
---|
25 |
|
---|
26 | from samba.auth import system_session
|
---|
27 | from samba import gensec, sd_utils
|
---|
28 | from samba.samdb import SamDB
|
---|
29 | from samba.credentials import Credentials
|
---|
30 | import samba.tests
|
---|
31 | from samba.tests import delete_force
|
---|
32 | from subunit.run import SubunitTestRunner
|
---|
33 | import unittest
|
---|
34 |
|
---|
# Command line: acl.py [options] <host>
parser = optparse.OptionParser("acl.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
opts, args = parser.parse_args()

# The target host is the single mandatory positional argument.
if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args[0]
if "://" not in host:
    # Bare host name: build an ldap:// URL for it.
    ldaphost = "ldap://%s" % host
else:
    # Already a URL: use it unchanged and keep only the part after the
    # scheme separator in `host`.
    ldaphost = host
    start = host.rindex("://")
    # str.lstrip() expects a set of characters, not an offset, and raises
    # TypeError when handed an int; slice past the "://" instead.
    host = host[start + 3:]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Add sealing to whatever gensec features the credentials already request.
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
60 |
|
---|
61 | #
|
---|
62 | # Tests start here
|
---|
63 | #
|
---|
64 |
|
---|
class AclTests(samba.tests.TestCase):
    """Shared fixture and helpers for the LDAP ACL test classes.

    NOTE(review): the name ``ldb`` used in setUp() is not defined in the
    part of the file visible here; it is presumably a module-level
    administrative connection created elsewhere -- confirm.
    """

    def setUp(self):
        """Store the admin connection, domain data and anonymous credentials."""
        super(AclTests, self).setUp()
        self.ldb_admin = ldb
        self.base_dn = ldb.domain_dn()
        self.domain_sid = security.dom_sid(ldb.get_domain_sid())
        self.user_pass = "samba123@"
        self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(ldb)
        # used for anonymous login: empty user name and password, with
        # domain, realm and workstation copied from the command-line creds
        self.creds_tmp = Credentials()
        self.creds_tmp.set_username("")
        self.creds_tmp.set_password("")
        self.creds_tmp.set_domain(creds.get_domain())
        self.creds_tmp.set_realm(creds.get_realm())
        self.creds_tmp.set_workstation(creds.get_workstation())
        # Single-argument call form prints the same text on Python 2 and 3.
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of object `name` inside CN=Users of the base DN."""
        return "CN=%s,CN=Users,%s" % (name, self.base_dn)

    def get_ldb_connection(self, target_username, target_password):
        """Open and return a SamDB connection to `ldaphost` as the given user.

        Domain, realm and workstation are copied from the command-line
        credentials; sealing is added to the gensec features.
        """
        creds_tmp = Credentials()
        creds_tmp.set_username(target_username)
        creds_tmp.set_password(target_password)
        creds_tmp.set_domain(creds.get_domain())
        creds_tmp.set_realm(creds.get_realm())
        creds_tmp.set_workstation(creds.get_workstation())
        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                      | gensec.FEATURE_SEAL)
        ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
        return ldb_target

    # Test if we have any additional groups for users than default ones
    def assert_user_no_group_member(self, username):
        """Fail the test if the user's entry carries a memberOf attribute."""
        res = self.ldb_admin.search(
            self.base_dn,
            expression="(distinguishedName=%s)" % self.get_user_dn(username))
        try:
            res[0]["memberOf"]
        except KeyError:
            # No memberOf attribute at all: this is the expected state.
            return
        self.fail("%s unexpectedly has a memberOf attribute" % username)
108 |
|
---|
109 | #tests on ldap add operations
|
---|
class AclAddTests(AclTests):
    """Access checks on LDAP add operations."""

    def setUp(self):
        """Create three accounts, make two of them Domain Admins, connect as each."""
        super(AclAddTests, self).setUp()
        # Domain admin that will be creator of OU parent-child structure
        self.usr_admin_owner = "acl_add_user1"
        # Second domain admin that will not be creator of OU parent-child structure
        self.usr_admin_not_owner = "acl_add_user2"
        # Regular user
        self.regular_user = "acl_add_user3"
        self.test_user1 = "test_add_user1"
        self.test_group1 = "test_add_group1"
        self.ou1 = "OU=test_add_ou1"
        self.ou2 = "OU=test_add_ou2,%s" % self.ou1
        # Locations (relative to the base DN) of the objects the tests create
        self._user1_path = "CN=%s,%s" % (self.test_user1, self.ou2)
        self._group1_path = "CN=%s,%s" % (self.test_group1, self.ou2)
        self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
        self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
        self.ldb_admin.newuser(self.regular_user, self.user_pass)

        # add admins to the Domain Admins group
        self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_owner,
                                                add_members_operation=True)
        self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_not_owner,
                                                add_members_operation=True)

        self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
        self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)

    def tearDown(self):
        """Delete every object the tests may have created, children first."""
        super(AclAddTests, self).tearDown()
        delete_force(self.ldb_admin, self._dn(self._user1_path))
        delete_force(self.ldb_admin, self._dn(self._group1_path))
        delete_force(self.ldb_admin, self._dn(self.ou2))
        delete_force(self.ldb_admin, self._dn(self.ou1))
        delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
        delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
        delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous"))

    def _dn(self, relative_dn):
        """Return `relative_dn` anchored under the base DN."""
        return "%s,%s" % (relative_dn, self.base_dn)

    def _count(self, relative_dn):
        """Return how many entries the admin connection finds at `relative_dn`."""
        res = self.ldb_admin.search(
            self.base_dn,
            expression="(distinguishedName=%s)" % self._dn(relative_dn))
        return len(res)

    # Make sure top OU is deleted (and so everything under it)
    def assert_top_ou_deleted(self):
        """Assert that the top-level test OU does not exist."""
        self.assertEqual(self._count(self.ou1), 0)

    def test_add_u1(self):
        """Testing OU with the rights of Domain Admin not creator of the OU """
        self.assert_top_ou_deleted()
        # Change descriptor for top level OU
        self.ldb_owner.create_ou(self._dn(self.ou1))
        self.ldb_owner.create_ou(self._dn(self.ou2))
        user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
        mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(self._dn(self.ou1), mod)
        # Test user and group creation with another domain admin's credentials
        self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
        self.ldb_notowner.newgroup(self.test_group1, groupou=self.ou2, grouptype=4)
        # Make sure we HAVE created the two objects -- user and group
        # !!! We should not be able to do that, but however because of ACE ordering our inherited Deny ACE
        # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
        self.assertTrue(self._count(self._user1_path) > 0)
        self.assertTrue(self._count(self._group1_path) > 0)

    def test_add_u2(self):
        """Testing OU with the regular user that has no rights granted over the OU """
        self.assert_top_ou_deleted()
        # Create a parent-child OU structure with domain admin credentials
        self.ldb_owner.create_ou(self._dn(self.ou1))
        self.ldb_owner.create_ou(self._dn(self.ou2))
        # Test user and group creation with regular user credentials
        # NOTE(review): both calls share one try block, so once newuser()
        # raises as expected the newgroup() call is never attempted.
        try:
            self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
            self.ldb_user.newgroup(self.test_group1, groupou=self.ou2, grouptype=4)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail("regular user was able to add objects without any rights")
        # Make sure we HAVEN'T created any of two objects -- user or group
        self.assertEqual(self._count(self._user1_path), 0)
        self.assertEqual(self._count(self._group1_path), 0)

    def test_add_u3(self):
        """Testing OU with the rights of regular user granted the right 'Create User child objects' """
        self.assert_top_ou_deleted()
        # Change descriptor for top level OU
        self.ldb_owner.create_ou(self._dn(self.ou1))
        user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(self._dn(self.ou1), mod)
        # The child OU is created after the ACE has been added to its parent.
        self.ldb_owner.create_ou(self._dn(self.ou2))
        # Test user and group creation with granted user only to one of the objects
        self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
        try:
            self.ldb_user.newgroup(self.test_group1, groupou=self.ou2, grouptype=4)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail("regular user was able to add a group")
        # Make sure we HAVE created the one of two objects -- user
        self.assertNotEqual(self._count(self._user1_path), 0)
        self.assertEqual(self._count(self._group1_path), 0)

    def test_add_u4(self):
        """ 4 Testing OU with the rights of Domain Admin creator of the OU"""
        self.assert_top_ou_deleted()
        self.ldb_owner.create_ou(self._dn(self.ou1))
        self.ldb_owner.create_ou(self._dn(self.ou2))
        self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
        self.ldb_owner.newgroup(self.test_group1, groupou=self.ou2, grouptype=4)
        # Make sure we have successfully created the two objects -- user and group
        self.assertTrue(self._count(self._user1_path) > 0)
        self.assertTrue(self._count(self._group1_path) > 0)

    def test_add_anonymous(self):
        """Test add operation with anonymous user"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        try:
            anonymous.newuser("test_add_anonymous", self.user_pass)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)
        else:
            self.fail("anonymous connection was able to add a user")
254 |
|
---|
255 | #tests on ldap modify operations
|
---|
class AclModifyTests(AclTests):
    """Access checks on LDAP modify operations."""

    def setUp(self):
        """Create the test accounts, their connections and the shared groups."""
        super(AclModifyTests, self).setUp()
        self.user_with_wp = "acl_mod_user1"
        self.user_with_sm = "acl_mod_user2"
        self.user_with_group_sm = "acl_mod_user3"
        self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
        self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
        self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
        self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
        self.ldb_admin.newgroup("test_modify_group2", grouptype=4)
        self.ldb_admin.newgroup("test_modify_group3", grouptype=4)
        self.ldb_admin.newuser("test_modify_user2", self.user_pass)

    def tearDown(self):
        """Delete every object created by setUp() or by the tests."""
        super(AclModifyTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
        delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
        delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
        delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
        delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
        delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
        delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))

    @staticmethod
    def _modify_ldif(dn, operation, attribute, *values):
        """Build the LDIF text for one modify operation.

        The result starts with a newline and has no trailing newline, e.g.
        "\\ndn: <dn>\\nchangetype: modify\\nreplace: url\\nurl: <value>".
        One "<attribute>: <value>" line is emitted per entry in `values`.
        """
        lines = ["", "dn: " + dn, "changetype: modify",
                 "%s: %s" % (operation, attribute)]
        lines.extend("%s: %s" % (attribute, value) for value in values)
        return "\n".join(lines)

    def _search_dn(self, dn, **kwargs):
        """Search for `dn` under the base DN using the admin connection."""
        return self.ldb_admin.search(self.base_dn,
                                     expression="(distinguishedName=%s)" % dn,
                                     **kwargs)

    def _assert_modify_denied(self, connection, ldif):
        """Apply `ldif` via `connection`; require ERR_INSUFFICIENT_ACCESS_RIGHTS."""
        try:
            connection.modify_ldif(ldif)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
            self.fail("modify succeeded but should have been denied")

    def _assert_display_name_writable(self, dn):
        """Replace displayName on `dn` as self.ldb_user and verify the new value."""
        ldif = self._modify_ldif(dn, "replace", "displayName", "test_changed")
        self.ldb_user.modify_ldif(ldif)
        res = self._search_dn(dn)
        self.assertEqual(res[0]["displayName"][0], "test_changed")

    def _assert_url_not_writable(self, dn):
        """Require that self.ldb_user is refused when replacing url on `dn`."""
        ldif = self._modify_ldif(dn, "replace", "url", "www.samba.org")
        self._assert_modify_denied(self.ldb_user, ldif)

    def test_modify_u1(self):
        """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
        mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
        user_dn = self.get_user_dn("test_modify_user1")
        group_dn = "CN=test_modify_group1,CN=Users," + self.base_dn
        ou_dn = "OU=test_modify_ou1," + self.base_dn
        # First test object -- User
        print("Testing modify on User object")
        self.ldb_admin.newuser("test_modify_user1", self.user_pass)
        self.sd_utils.dacl_add_ace(user_dn, mod)
        self._assert_display_name_writable(user_dn)
        # Second test object -- Group
        print("Testing modify on Group object")
        self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self._assert_display_name_writable(group_dn)
        # Third test object -- Organizational Unit
        print("Testing modify on OU object")
        self.ldb_admin.create_ou(ou_dn)
        self.sd_utils.dacl_add_ace(ou_dn, mod)
        self._assert_display_name_writable(ou_dn)

    def test_modify_u2(self):
        """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
        mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
        user_dn = self.get_user_dn("test_modify_user1")
        group_dn = "CN=test_modify_group1,CN=Users," + self.base_dn
        ou_dn = "OU=test_modify_ou1," + self.base_dn
        # First test object -- User
        print("Testing modify on User object")
        self.ldb_admin.newuser("test_modify_user1", self.user_pass)
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Modify on attribute you have rights for
        self._assert_display_name_writable(user_dn)
        # Modify on attribute you do not have rights for granted
        self._assert_url_not_writable(user_dn)
        # Second test object -- Group
        print("Testing modify on Group object")
        self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self._assert_display_name_writable(group_dn)
        # Modify on attribute you do not have rights for granted
        self._assert_url_not_writable(group_dn)
        # Third test object -- Organizational Unit
        print("Testing modify on OU object")
        self.ldb_admin.create_ou(ou_dn)
        self.sd_utils.dacl_add_ace(ou_dn, mod)
        self._assert_display_name_writable(ou_dn)
        # Modify on attribute you do not have rights for granted
        self._assert_url_not_writable(ou_dn)

    def test_modify_u3(self):
        """7 Modify one attribute as you have no what so ever rights granted"""
        # First test object -- User
        print("Testing modify on User object")
        self.ldb_admin.newuser("test_modify_user1", self.user_pass)
        # Modify on attribute you do not have rights for granted
        self._assert_url_not_writable(self.get_user_dn("test_modify_user1"))

        # Second test object -- Group
        print("Testing modify on Group object")
        self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
        # Modify on attribute you do not have rights for granted
        self._assert_url_not_writable("CN=test_modify_group1,CN=Users," + self.base_dn)

        # Third test object -- Organizational Unit
        print("Testing modify on OU object")
        self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
        # Modify on attribute you do not have rights for granted
        self._assert_url_not_writable("OU=test_modify_ou1," + self.base_dn)

    def test_modify_u4(self):
        """11 Grant WP to PRINCIPAL_SELF and test modify"""
        own_dn = self.get_user_dn(self.user_with_wp)
        ldif = self._modify_ldif(own_dn, "add", "adminDescription", "blah blah blah")
        # Before the ACE is added the user may not write the attribute.
        self._assert_modify_denied(self.ldb_user, ldif)

        mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
        self.sd_utils.dacl_add_ace(own_dn, mod)
        # Modify on attribute you have rights for
        self.ldb_user.modify_ldif(ldif)
        res = self._search_dn(own_dn, attrs=["adminDescription"])
        self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")

    def test_modify_u5(self):
        """12 test self membership"""
        group_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        own_dn = self.get_user_dn(self.user_with_sm)
        ldif = self._modify_ldif(group_dn, "add", "Member", own_dn)
        #the user has no rights granted, this should fail
        self._assert_modify_denied(self.ldb_user2, ldif)

        #grant self-membership, should be able to add himself
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self.ldb_user2.modify_ldif(ldif)
        res = self._search_dn(group_dn, attrs=["Member"])
        self.assertEqual(res[0]["Member"][0], own_dn)
        #but not other users
        ldif = self._modify_ldif(group_dn, "add", "Member",
                                 "CN=test_modify_user2,CN=Users," + self.base_dn)
        self._assert_modify_denied(self.ldb_user2, ldif)

    def test_modify_u6(self):
        """13 test self membership"""
        group_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        own_dn = self.get_user_dn(self.user_with_sm)
        # One request that adds the user himself AND another user.
        ldif = self._modify_ldif(group_dn, "add", "Member", own_dn,
                                 "CN=test_modify_user2,CN=Users," + self.base_dn)

        #grant self-membership, should be able to add himself but not others at the same time
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self._assert_modify_denied(self.ldb_user2, ldif)

    def test_modify_u7(self):
        """13 User with WP modifying Member"""
        group_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        own_dn = self.get_user_dn(self.user_with_wp)
        other_dn = "CN=test_modify_user2,CN=Users," + self.base_dn
        #a second user is given write property permission
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(A;;WP;;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        # Add himself to the group.
        self.ldb_user.modify_ldif(self._modify_ldif(group_dn, "add", "Member", own_dn))
        res = self._search_dn(group_dn, attrs=["Member"])
        self.assertEqual(res[0]["Member"][0], own_dn)
        # Delete the Member attribute, then add a different user.
        self.ldb_user.modify_ldif(self._modify_ldif(group_dn, "delete", "Member"))
        self.ldb_user.modify_ldif(self._modify_ldif(group_dn, "add", "Member", other_dn))
        res = self._search_dn(group_dn, attrs=["Member"])
        self.assertEqual(res[0]["Member"][0], other_dn)

    def test_modify_anonymous(self):
        """Test modify operation with anonymous user"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        self.ldb_admin.newuser("test_anonymous", "samba123@")
        m = Message()
        m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))

        m["description"] = MessageElement("sambauser2",
                                          FLAG_MOD_ADD,
                                          "description")
        try:
            anonymous.modify(m)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)
        else:
            self.fail("anonymous connection was able to modify a user")
597 |
|
---|
598 | #enable these when we have search implemented
|
---|
599 | class AclSearchTests(AclTests):
|
---|
600 |
|
---|
def setUp(self):
    """Create the search-test accounts, group, connections and expected OU list."""
    super(AclSearchTests, self).setUp()
    self.u1 = "search_u1"
    self.u2 = "search_u2"
    self.u3 = "search_u3"
    self.group1 = "group1"
    # Accounts are created in u1, u2, u3 order, all with the shared password.
    for account in (self.u1, self.u2, self.u3):
        self.ldb_admin.newuser(account, self.user_pass)
    self.ldb_admin.newgroup(self.group1, grouptype=-2147483646)
    # Only u2 is made a member of the group.
    self.ldb_admin.add_remove_group_members(self.group1, self.u2,
                                            add_members_operation=True)
    self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
    self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
    self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
    # DNs of the OU tree, each prefix being completed with the base DN.
    ou_prefixes = (
        "OU=ou2,OU=ou1,",
        "OU=ou1,",
        "OU=ou3,OU=ou2,OU=ou1,",
        "OU=ou4,OU=ou2,OU=ou1,",
        "OU=ou5,OU=ou3,OU=ou2,OU=ou1,",
        "OU=ou6,OU=ou4,OU=ou2,OU=ou1,",
    )
    self.full_list = [Dn(self.ldb_admin, prefix + self.base_dn)
                      for prefix in ou_prefixes]
    self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
    self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
624 |
|
---|
625 | def create_clean_ou(self, object_dn):
|
---|
626 | """ Base repeating setup for unittests to follow """
|
---|
627 | res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
|
---|
628 | expression="distinguishedName=%s" % object_dn)
|
---|
629 | # Make sure top testing OU has been deleted before starting the test
|
---|
630 | self.assertEqual(len(res), 0)
|
---|
631 | self.ldb_admin.create_ou(object_dn)
|
---|
632 | desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
|
---|
633 | # Make sure there are inheritable ACEs initially
|
---|
634 | self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
|
---|
635 | # Find and remove all inherit ACEs
|
---|
636 | res = re.findall("\(.*?\)", desc_sddl)
|
---|
637 | res = [x for x in res if ("CI" in x) or ("OI" in x)]
|
---|
638 | for x in res:
|
---|
639 | desc_sddl = desc_sddl.replace(x, "")
|
---|
640 | # Add flag 'protected' in both DACL and SACL so no inherit ACEs
|
---|
641 | # can propagate from above
|
---|
642 | # remove SACL, we are not interested
|
---|
643 | desc_sddl = desc_sddl.replace(":AI", ":AIP")
|
---|
644 | self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
|
---|
645 | # Verify all inheritable ACEs are gone
|
---|
646 | desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
|
---|
647 | self.assertFalse("CI" in desc_sddl)
|
---|
648 | self.assertFalse("OI" in desc_sddl)
|
---|
649 |
|
---|
650 | def tearDown(self):
|
---|
651 | super(AclSearchTests, self).tearDown()
|
---|
652 | delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
|
---|
653 | delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
|
---|
654 | delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
|
---|
655 | delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
|
---|
656 | delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
|
---|
657 | delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
|
---|
658 | delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
|
---|
659 | delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
|
---|
660 | delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
|
---|
661 | delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
|
---|
662 | delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
|
---|
663 | delete_force(self.ldb_admin, self.get_user_dn("group1"))
|
---|
664 |
|
---|
665 | def test_search_anonymous1(self):
|
---|
666 | """Verify access of rootDSE with the correct request"""
|
---|
667 | anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
|
---|
668 | res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
|
---|
669 | self.assertEquals(len(res), 1)
|
---|
670 | #verify some of the attributes
|
---|
671 | #dont care about values
|
---|
672 | self.assertTrue("ldapServiceName" in res[0])
|
---|
673 | self.assertTrue("namingContexts" in res[0])
|
---|
674 | self.assertTrue("isSynchronized" in res[0])
|
---|
675 | self.assertTrue("dsServiceName" in res[0])
|
---|
676 | self.assertTrue("supportedSASLMechanisms" in res[0])
|
---|
677 | self.assertTrue("isGlobalCatalogReady" in res[0])
|
---|
678 | self.assertTrue("domainControllerFunctionality" in res[0])
|
---|
679 | self.assertTrue("serverName" in res[0])
|
---|
680 |
|
---|
681 | def test_search_anonymous2(self):
|
---|
682 | """Make sure we cannot access anything else"""
|
---|
683 | anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
|
---|
684 | try:
|
---|
685 | res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
|
---|
686 | except LdbError, (num, _):
|
---|
687 | self.assertEquals(num, ERR_OPERATIONS_ERROR)
|
---|
688 | else:
|
---|
689 | self.fail()
|
---|
690 | try:
|
---|
691 | res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
|
---|
692 | except LdbError, (num, _):
|
---|
693 | self.assertEquals(num, ERR_OPERATIONS_ERROR)
|
---|
694 | else:
|
---|
695 | self.fail()
|
---|
696 | try:
|
---|
697 | res = anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
|
---|
698 | scope=SCOPE_SUBTREE)
|
---|
699 | except LdbError, (num, _):
|
---|
700 | self.assertEquals(num, ERR_OPERATIONS_ERROR)
|
---|
701 | else:
|
---|
702 | self.fail()
|
---|
703 |
|
---|
704 | def test_search_anonymous3(self):
|
---|
705 | """Set dsHeuristics and repeat"""
|
---|
706 | self.ldb_admin.set_dsheuristics("0000002")
|
---|
707 | self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
|
---|
708 | mod = "(A;CI;LC;;;AN)"
|
---|
709 | self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
|
---|
710 | self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
|
---|
711 | anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
|
---|
712 | res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
|
---|
713 | expression="(objectClass=*)", scope=SCOPE_SUBTREE)
|
---|
714 | self.assertEquals(len(res), 1)
|
---|
715 | self.assertTrue("dn" in res[0])
|
---|
716 | self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
|
---|
717 | "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
|
---|
718 | res = anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
|
---|
719 | scope=SCOPE_SUBTREE)
|
---|
720 | self.assertEquals(len(res), 1)
|
---|
721 | self.assertTrue("dn" in res[0])
|
---|
722 | self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
|
---|
723 |
|
---|
724 | def test_search1(self):
|
---|
725 | """Make sure users can see us if given LC to user and group"""
|
---|
726 | self.create_clean_ou("OU=ou1," + self.base_dn)
|
---|
727 | mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
|
---|
728 | self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
|
---|
729 | tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
|
---|
730 | self.domain_sid)
|
---|
731 | self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
732 | self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
733 | self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
734 | self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
735 | self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
736 |
|
---|
737 | #regular users must see only ou1 and ou2
|
---|
738 | res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
739 | scope=SCOPE_SUBTREE)
|
---|
740 | self.assertEquals(len(res), 2)
|
---|
741 | ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
|
---|
742 | Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
|
---|
743 |
|
---|
744 | res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
|
---|
745 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
746 |
|
---|
747 | #these users should see all ous
|
---|
748 | res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
749 | scope=SCOPE_SUBTREE)
|
---|
750 | self.assertEquals(len(res), 6)
|
---|
751 | res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
|
---|
752 | self.assertEquals(sorted(res_list), sorted(self.full_list))
|
---|
753 |
|
---|
754 | res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
755 | scope=SCOPE_SUBTREE)
|
---|
756 | self.assertEquals(len(res), 6)
|
---|
757 | res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
|
---|
758 | self.assertEquals(sorted(res_list), sorted(self.full_list))
|
---|
759 |
|
---|
760 | def test_search2(self):
|
---|
761 | """Make sure users can't see us if access is explicitly denied"""
|
---|
762 | self.create_clean_ou("OU=ou1," + self.base_dn)
|
---|
763 | self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
|
---|
764 | self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
|
---|
765 | self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
|
---|
766 | self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
|
---|
767 | self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
|
---|
768 | mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
|
---|
769 | self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
|
---|
770 | res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
771 | scope=SCOPE_SUBTREE)
|
---|
772 | #this user should see all ous
|
---|
773 | res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
|
---|
774 | self.assertEquals(sorted(res_list), sorted(self.full_list))
|
---|
775 |
|
---|
776 | #these users should see ou1, 2, 5 and 6 but not 3 and 4
|
---|
777 | res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
778 | scope=SCOPE_SUBTREE)
|
---|
779 | ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
|
---|
780 | Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
|
---|
781 | Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
|
---|
782 | Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
|
---|
783 | res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
|
---|
784 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
785 |
|
---|
786 | res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
787 | scope=SCOPE_SUBTREE)
|
---|
788 | self.assertEquals(len(res), 4)
|
---|
789 | res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
|
---|
790 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
791 |
|
---|
792 | def test_search3(self):
|
---|
793 | """Make sure users can't see ous if access is explicitly denied - 2"""
|
---|
794 | self.create_clean_ou("OU=ou1," + self.base_dn)
|
---|
795 | mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
|
---|
796 | self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
|
---|
797 | tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
|
---|
798 | self.domain_sid)
|
---|
799 | self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
800 | self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
801 | self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
802 | self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
803 | self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
804 |
|
---|
805 | print "Testing correct behavior on nonaccessible search base"
|
---|
806 | try:
|
---|
807 | self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
808 | scope=SCOPE_BASE)
|
---|
809 | except LdbError, (num, _):
|
---|
810 | self.assertEquals(num, ERR_NO_SUCH_OBJECT)
|
---|
811 | else:
|
---|
812 | self.fail()
|
---|
813 |
|
---|
814 | mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
|
---|
815 | self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
|
---|
816 |
|
---|
817 | ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
|
---|
818 | Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
|
---|
819 |
|
---|
820 | res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
821 | scope=SCOPE_SUBTREE)
|
---|
822 | res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
|
---|
823 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
824 |
|
---|
825 | ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
|
---|
826 | Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
|
---|
827 | Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
|
---|
828 | Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
|
---|
829 |
|
---|
830 | #should not see ou3 and ou4, but should see ou5 and ou6
|
---|
831 | res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
832 | scope=SCOPE_SUBTREE)
|
---|
833 | self.assertEquals(len(res), 4)
|
---|
834 | res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
|
---|
835 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
836 |
|
---|
837 | res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
838 | scope=SCOPE_SUBTREE)
|
---|
839 | self.assertEquals(len(res), 4)
|
---|
840 | res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
|
---|
841 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
842 |
|
---|
843 | def test_search4(self):
|
---|
844 | """There is no difference in visibility if the user is also creator"""
|
---|
845 | self.create_clean_ou("OU=ou1," + self.base_dn)
|
---|
846 | mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
|
---|
847 | self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
|
---|
848 | tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
|
---|
849 | self.domain_sid)
|
---|
850 | self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
851 | self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
852 | self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
853 | self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
854 | self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
855 |
|
---|
856 | ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
|
---|
857 | Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
|
---|
858 | res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
859 | scope=SCOPE_SUBTREE)
|
---|
860 | self.assertEquals(len(res), 2)
|
---|
861 | res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
|
---|
862 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
863 |
|
---|
864 | res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
865 | scope=SCOPE_SUBTREE)
|
---|
866 | self.assertEquals(len(res), 2)
|
---|
867 | res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
|
---|
868 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
869 |
|
---|
870 | def test_search5(self):
|
---|
871 | """Make sure users can see only attributes they are allowed to see"""
|
---|
872 | self.create_clean_ou("OU=ou1," + self.base_dn)
|
---|
873 | mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
|
---|
874 | self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
|
---|
875 | tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
|
---|
876 | self.domain_sid)
|
---|
877 | self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
878 | # assert user can only see dn
|
---|
879 | res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
880 | scope=SCOPE_SUBTREE)
|
---|
881 | ok_list = ['dn']
|
---|
882 | self.assertEquals(len(res), 1)
|
---|
883 | res_list = res[0].keys()
|
---|
884 | self.assertEquals(res_list, ok_list)
|
---|
885 |
|
---|
886 | res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
887 | scope=SCOPE_BASE, attrs=["ou"])
|
---|
888 |
|
---|
889 | self.assertEquals(len(res), 1)
|
---|
890 | res_list = res[0].keys()
|
---|
891 | self.assertEquals(res_list, ok_list)
|
---|
892 |
|
---|
893 | #give read property on ou and assert user can only see dn and ou
|
---|
894 | mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
|
---|
895 | self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
|
---|
896 | self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
|
---|
897 | res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
898 | scope=SCOPE_SUBTREE)
|
---|
899 | ok_list = ['dn', 'ou']
|
---|
900 | self.assertEquals(len(res), 1)
|
---|
901 | res_list = res[0].keys()
|
---|
902 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
903 |
|
---|
904 | #give read property on Public Information and assert user can see ou and other members
|
---|
905 | mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
|
---|
906 | self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
|
---|
907 | self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
|
---|
908 | res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
|
---|
909 | scope=SCOPE_SUBTREE)
|
---|
910 |
|
---|
911 | ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
|
---|
912 | res_list = res[0].keys()
|
---|
913 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
914 |
|
---|
915 | def test_search6(self):
|
---|
916 | """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
|
---|
917 | self.create_clean_ou("OU=ou1," + self.base_dn)
|
---|
918 | mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
|
---|
919 | self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
|
---|
920 | tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
|
---|
921 | self.domain_sid)
|
---|
922 | self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
923 | self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
|
---|
924 |
|
---|
925 | res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
|
---|
926 | scope=SCOPE_SUBTREE)
|
---|
927 | #nothing should be returned as ou is not accessible
|
---|
928 | self.assertEquals(len(res), 0)
|
---|
929 |
|
---|
930 | #give read property on ou and assert user can only see dn and ou
|
---|
931 | mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
|
---|
932 | self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
|
---|
933 | res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
|
---|
934 | scope=SCOPE_SUBTREE)
|
---|
935 | self.assertEquals(len(res), 1)
|
---|
936 | ok_list = ['dn', 'ou']
|
---|
937 | res_list = res[0].keys()
|
---|
938 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
939 |
|
---|
940 | #give read property on Public Information and assert user can see ou and other members
|
---|
941 | mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
|
---|
942 | self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
|
---|
943 | res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
|
---|
944 | scope=SCOPE_SUBTREE)
|
---|
945 | self.assertEquals(len(res), 1)
|
---|
946 | ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
|
---|
947 | res_list = res[0].keys()
|
---|
948 | self.assertEquals(sorted(res_list), sorted(ok_list))
|
---|
949 |
|
---|
950 | #tests on ldap delete operations
|
---|
class AclDeleteTests(AclTests):
    """Tests on LDAP delete operations.

    setUp creates one regular user and a connection (``ldb_user``) bound as
    that user; each test creates the object it then tries to delete.
    SDDL terms used below: SD = delete right, AU = authenticated users.
    """

    def setUp(self):
        """Create the regular user and connect as that user."""
        super(AclDeleteTests, self).setUp()
        self.regular_user = "acl_delete_user1"
        # Create regular user
        self.ldb_admin.newuser(self.regular_user, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)

    def tearDown(self):
        """Remove every user a test of this class may have created."""
        super(AclDeleteTests, self).tearDown()
        for name in ("test_delete_user1", self.regular_user, "test_anonymous"):
            delete_force(self.ldb_admin, self.get_user_dn(name))

    def test_delete_u1(self):
        """User is prohibited by default to delete another User object"""
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        # Here delete User object should ALWAYS throw an exception
        try:
            self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
        except LdbError as e:
            # Valid on Python 2.6+ and Python 3; the error number is the
            # first element of the exception arguments.
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail("delete without rights unexpectedly succeeded")

    def test_delete_u2(self):
        """User's group has RIGHT_DELETE to another User object"""
        user_dn = self.get_user_dn("test_delete_user1")
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        mod = "(A;;SD;;;AU)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Try to delete User object
        self.ldb_user.delete(user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)

    def test_delete_u3(self):
        """User identified by SID has RIGHT_DELETE to another User object"""
        user_dn = self.get_user_dn("test_delete_user1")
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Try to delete User object
        self.ldb_user.delete(user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)

    def test_delete_anonymous(self):
        """Test delete operation with anonymous user"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        self.ldb_admin.newuser("test_anonymous", "samba123@")

        try:
            anonymous.delete(self.get_user_dn("test_anonymous"))
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)
        else:
            self.fail("anonymous delete unexpectedly succeeded")
|
---|
1015 |
|
---|
1016 | #tests on ldap rename operations
|
---|
1017 | class AclRenameTests(AclTests):
|
---|
1018 |
|
---|
def setUp(self):
    """Define the object names used by the rename tests and log in.

    Only the regular user is created here; the OUs and the users to be
    renamed are created by the individual tests.
    """
    super(AclRenameTests, self).setUp()
    self.regular_user = "acl_rename_user1"
    # OU names are kept relative to the base DN; ou3 lives below ou2
    self.ou1 = "OU=test_rename_ou1"
    self.ou2 = "OU=test_rename_ou2"
    self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
    # Account names of the users that get created and renamed
    (self.testuser1,
     self.testuser2,
     self.testuser3,
     self.testuser4,
     self.testuser5) = ("test_rename_user1",
                        "test_rename_user2",
                        "test_rename_user3",
                        "test_rename_user4",
                        "test_rename_user5")
    # Create regular user and open a connection bound as that user
    self.ldb_admin.newuser(self.regular_user, self.user_pass)
    self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
|
---|
1033 |
|
---|
def tearDown(self):
    """Delete every user and OU a rename test may have left behind.

    For each OU the possible user objects are removed first, then any
    nested OU, then the OU itself; ou3 is handled before its parent ou2.
    """
    super(AclRenameTests, self).tearDown()
    candidates = (self.testuser1, self.testuser2, self.testuser5)
    # (OU, nested OUs that have to go before the OU itself)
    cleanup_plan = (
        (self.ou3, ()),
        (self.ou2, ()),
        (self.ou1, ("OU=test_rename_ou3,%s" % self.ou1,)),
    )
    for ou, nested_ous in cleanup_plan:
        for account in candidates:
            delete_force(self.ldb_admin,
                         "CN=%s,%s,%s" % (account, ou, self.base_dn))
        for nested in nested_ous:
            delete_force(self.ldb_admin, "%s,%s" % (nested, self.base_dn))
        delete_force(self.ldb_admin, "%s,%s" % (ou, self.base_dn))
    # Finally drop the regular user created in setUp
    delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
|
---|
1053 |
|
---|
def test_rename_u1(self):
    """Regular user fails to rename 'User object' within single OU"""
    # Create OU structure
    self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
    self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
    old_dn = "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn)
    new_dn = "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn)
    # No rights were granted, so the rename has to be refused
    try:
        self.ldb_user.rename(old_dn, new_dn)
    except LdbError as e:
        # Valid on Python 2.6+ and Python 3; the error number is the first
        # element of the exception arguments.
        (num, _) = e.args
        self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
    else:
        self.fail("rename without rights unexpectedly succeeded")
|
---|
1066 |
|
---|
def test_rename_u2(self):
    """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
    container_dn = "OU=test_rename_ou1," + self.base_dn
    source_dn = "CN=test_rename_user1," + container_dn
    target_dn = "CN=test_rename_user5," + container_dn
    dn_filter = "(distinguishedName=%s)"
    # Create the OU and the user that is going to be renamed
    self.ldb_admin.create_ou(container_dn)
    self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
    # WP (write property) on the user object for AU (authenticated users)
    self.sd_utils.dacl_add_ace(source_dn, "(A;;WP;;;AU)")
    self.ldb_user.rename(source_dn, target_dn)
    # The old DN must be gone and the new one must exist
    old_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % source_dn)
    self.assertEqual(len(old_entries), 0)
    new_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % target_dn)
    self.assertNotEqual(len(new_entries), 0)
|
---|
1085 |
|
---|
def test_rename_u3(self):
    """Test rename with rights granted to 'User object' SID"""
    container_dn = "OU=test_rename_ou1," + self.base_dn
    source_dn = "CN=test_rename_user1," + container_dn
    target_dn = "CN=test_rename_user5," + container_dn
    dn_filter = "(distinguishedName=%s)"
    # Create the OU and the user that is going to be renamed
    self.ldb_admin.create_ou(container_dn)
    self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
    # WP (write property) on the user object, granted to the regular
    # user's own SID rather than to a well-known group
    regular_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
    self.sd_utils.dacl_add_ace(source_dn, "(A;;WP;;;%s)" % str(regular_sid))
    self.ldb_user.rename(source_dn, target_dn)
    # The old DN must be gone and the new one must exist
    old_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % source_dn)
    self.assertEqual(len(old_entries), 0)
    new_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % target_dn)
    self.assertNotEqual(len(new_entries), 0)
|
---|
1105 |
|
---|
def test_rename_u4(self):
    """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
    source_ou = "OU=test_rename_ou1," + self.base_dn
    target_ou = "OU=test_rename_ou2," + self.base_dn
    source_dn = "CN=test_rename_user2," + source_ou
    target_dn = "CN=test_rename_user5," + target_ou
    dn_filter = "(distinguishedName=%s)"
    # Create both OUs and the user inside the first one
    for ou_dn in (source_ou, target_ou):
        self.ldb_admin.create_ou(ou_dn)
    self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
    # WP + SD on the user object, CC (create child) on the target OU,
    # all for AU (authenticated users)
    self.sd_utils.dacl_add_ace(source_dn, "(A;;WPSD;;;AU)")
    self.sd_utils.dacl_add_ace(target_ou, "(A;;CC;;;AU)")
    self.ldb_user.rename(source_dn, target_dn)
    # The old DN must be gone and the new one must exist
    old_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % source_dn)
    self.assertEqual(len(old_entries), 0)
    new_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % target_dn)
    self.assertNotEqual(len(new_entries), 0)
|
---|
1128 |
|
---|
def test_rename_u5(self):
    """Test rename with rights granted to 'User object' SID"""
    source_ou = "OU=test_rename_ou1," + self.base_dn
    target_ou = "OU=test_rename_ou2," + self.base_dn
    source_dn = "CN=test_rename_user2," + source_ou
    target_dn = "CN=test_rename_user5," + target_ou
    dn_filter = "(distinguishedName=%s)"
    # Create both OUs and the user inside the first one
    for ou_dn in (source_ou, target_ou):
        self.ldb_admin.create_ou(ou_dn)
    self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
    # WP + SD on the user object and CC (create child) on the target OU,
    # both granted to the regular user's own SID
    regular_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
    self.sd_utils.dacl_add_ace(source_dn, "(A;;WPSD;;;%s)" % str(regular_sid))
    self.sd_utils.dacl_add_ace(target_ou, "(A;;CC;;;%s)" % str(regular_sid))
    self.ldb_user.rename(source_dn, target_dn)
    # The old DN must be gone and the new one must exist
    old_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % source_dn)
    self.assertEqual(len(old_entries), 0)
    new_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % target_dn)
    self.assertNotEqual(len(new_entries), 0)
|
---|
1152 |
|
---|
def test_rename_u6(self):
    """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
    source_ou = "OU=test_rename_ou1," + self.base_dn
    target_ou = "OU=test_rename_ou2," + self.base_dn
    source_dn = "CN=test_rename_user2," + source_ou
    target_dn = "CN=test_rename_user2," + target_ou
    dn_filter = "(distinguishedName=%s)"
    # Create both OUs
    for ou_dn in (source_ou, target_ou):
        self.ldb_admin.create_ou(ou_dn)
    # DC (delete child) on the source OU and CC (create child) on the
    # target OU, both for AU (authenticated users)
    self.sd_utils.dacl_add_ace(source_ou, "(A;;DC;;;AU)")
    self.sd_utils.dacl_add_ace(target_ou, "(A;;CC;;;AU)")
    # The user is created after the OU ACEs are in place, then gets WP
    self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
    self.sd_utils.dacl_add_ace(source_dn, "(A;;WP;;;AU)")
    self.ldb_user.rename(source_dn, target_dn)
    # The old DN must be gone and the new one must exist
    old_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % source_dn)
    self.assertEqual(len(old_entries), 0)
    new_entries = self.ldb_admin.search(self.base_dn,
                                        expression=dn_filter % target_dn)
    self.assertNotEqual(len(new_entries), 0)
|
---|
1178 |
|
---|
def test_rename_u7(self):
    """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
    source_ou = "OU=test_rename_ou1," + self.base_dn
    middle_ou = "OU=test_rename_ou2," + self.base_dn
    target_ou = "OU=test_rename_ou3," + middle_ou
    old_dn = "CN=test_rename_user2," + source_ou
    new_dn = "CN=test_rename_user5," + target_ou

    # Create OU structure (the target OU lives one level below middle_ou)
    for ou_dn in (source_ou, middle_ou, target_ou):
        self.ldb_admin.create_ou(ou_dn)

    # AU gets WP+DC on the source OU, flagged CI (container inherit), and
    # CC on the second-level target OU only.
    self.sd_utils.dacl_add_ace(source_ou, "(A;CI;WPDC;;;AU)")
    self.sd_utils.dacl_add_ace(target_ou, "(A;;CC;;;AU)")

    # presumably self.testuser2 / self.ou1 (assigned outside this method)
    # make the new account end up at old_dn - TODO confirm against setUp
    self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)

    # Rename (and move) through self.ldb_user, then verify through the admin
    # connection that the old DN is gone and the new DN exists.
    self.ldb_user.rename(old_dn, new_dn)
    leftovers = self.ldb_admin.search(
        self.base_dn, expression="(distinguishedName=%s)" % old_dn)
    self.assertEqual(len(leftovers), 0)
    moved = self.ldb_admin.search(
        self.base_dn, expression="(distinguishedName=%s)" % new_dn)
    self.assertNotEqual(len(moved), 0)
1203 |
|
---|
def test_rename_u8(self):
    """Test rename on an object with and without modify access on the RDN attribute"""
    parent_ou = "OU=test_rename_ou1," + self.base_dn
    old_dn = "OU=test_rename_ou2," + parent_ou
    new_dn = "OU=test_rename_ou3," + parent_ou

    # Create OU structure (new_dn is only the rename target, never created)
    self.ldb_admin.create_ou(parent_ou)
    self.ldb_admin.create_ou(old_dn)

    # The SID is looked up once and reused for all three ACEs.
    sid = str(self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user)))

    # Allow WP on one attribute GUID but explicitly deny WP on a second one.
    # NOTE(review): presumably these GUIDs are the two naming attributes a
    # rename of an OU has to write - confirm against the schema.
    self.sd_utils.dacl_add_ace(
        old_dn, "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % sid)
    self.sd_utils.dacl_add_ace(
        old_dn, "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % sid)
    try:
        self.ldb_user.rename(old_dn, new_dn)
    except LdbError as e:
        (num, _) = e.args
        self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
    else:
        # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
        self.fail()

    # Add an allow ACE for the GUID that was denied above; the same rename
    # is now expected to go through (no exception handling on purpose).
    self.sd_utils.dacl_add_ace(
        old_dn, "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % sid)
    self.ldb_user.rename(old_dn, new_dn)
    res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % old_dn)
    self.assertEqual(len(res), 0)
    res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % new_dn)
    self.assertNotEqual(len(res), 0)
1232 |
|
---|
1233 | #tests on Control Access Rights
|
---|
class AclCARTests(AclTests):
    """Tests on Control Access Rights (CARs) for password change and reset.

    setUp creates two accounts: ``user_with_wp`` (bound as ``self.ldb_user``)
    and ``user_with_pc`` (bound as ``self.ldb_user2``); tearDown deletes them.
    All password operations are sent as LDIF modifications.
    """

    # Object ACEs that carry the "change password" CAR for Everyone (WD) and
    # for the account itself (PS).  Users have them by default; the negative
    # tests strip them from the descriptor.
    CHANGE_PWD_ACE_WD = "(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)"
    CHANGE_PWD_ACE_PS = "(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)"
    # Object ACE carrying the CAR that the password reset tests grant to PS.
    RESET_PWD_ACE_PS = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"

    def setUp(self):
        super(AclCARTests, self).setUp()
        self.user_with_wp = "acl_car_user1"
        self.user_with_pc = "acl_car_user2"
        self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
        self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)

    def tearDown(self):
        super(AclCARTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))

    # ------------------------------------------------------------------
    # private helpers (names start with "_" so they are not run as tests)
    # ------------------------------------------------------------------

    def _b64_unicode_pwd(self, password):
        """Return *password* wrapped in double quotes, UTF-16-LE encoded, as base64."""
        return base64.b64encode(("\"%s\"" % password).encode('utf-16-le'))

    def _revoke_default_change_password(self, user_dn):
        """Strip the default change-password ACEs (WD and PS) from *user_dn*."""
        desc = self.sd_utils.read_sd_on_dn(user_dn)
        sddl = desc.as_sddl(self.domain_sid)
        sddl = sddl.replace(self.CHANGE_PWD_ACE_WD, "")
        sddl = sddl.replace(self.CHANGE_PWD_ACE_PS, "")
        self.sd_utils.modify_sd_on_dn(user_dn, sddl)

    def _change_unicode_pwd(self, ldb, username, old_password, new_password):
        """Password change on *username* via *ldb*: delete old + add new unicodePwd."""
        ldb.modify_ldif("""
dn: """ + self.get_user_dn(username) + """
changetype: modify
delete: unicodePwd
unicodePwd:: """ + self._b64_unicode_pwd(old_password) + """
add: unicodePwd
unicodePwd:: """ + self._b64_unicode_pwd(new_password) + """
""")

    def _reset_unicode_pwd(self, ldb, username, new_password):
        """Password reset on *username* via *ldb*: replace unicodePwd."""
        ldb.modify_ldif("""
dn: """ + self.get_user_dn(username) + """
changetype: modify
replace: unicodePwd
unicodePwd:: """ + self._b64_unicode_pwd(new_password) + """
""")

    def _reset_user_password(self, ldb, username, new_password):
        """Password reset on *username* via *ldb*: replace userPassword."""
        ldb.modify_ldif("""
dn: """ + self.get_user_dn(username) + """
changetype: modify
replace: userPassword
userPassword: """ + new_password + """
""")

    def _uneven_user_password_change(self, ldb, username):
        """Send a userPassword change with two deletes but only one add."""
        ldb.modify_ldif("""
dn: """ + self.get_user_dn(username) + """
changetype: modify
delete: userPassword
userPassword: thatsAcomplPASS1
delete: userPassword
userPassword: thatsAcomplPASS1
add: userPassword
userPassword: thatsAcomplPASS2
""")

    # ------------------------------------------------------------------
    # password change tests
    # ------------------------------------------------------------------

    def test_change_password1(self):
        """Try a password change operation without any CARs given"""
        # users have change password by default - remove for negative testing
        self._revoke_default_change_password(self.get_user_dn(self.user_with_wp))
        try:
            self._change_unicode_pwd(self.ldb_user, self.user_with_wp,
                                     "samba123@", "thatsAcomplPASS2")
        except LdbError as e:
            (num, _) = e.args
            # for some reason we get constraint violation instead of
            # insufficient access error
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail()

    def test_change_password2(self):
        """Make sure WP has no influence"""
        user_dn = self.get_user_dn(self.user_with_wp)
        self._revoke_default_change_password(user_dn)
        # Granting plain WP must not make up for the missing CAR.
        self.sd_utils.dacl_add_ace(user_dn, "(A;;WP;;;PS)")
        # The descriptor is read back after the change; the SDDL text itself
        # is not used any further.
        self.sd_utils.read_sd_on_dn(user_dn).as_sddl(self.domain_sid)
        try:
            self._change_unicode_pwd(self.ldb_user, self.user_with_wp,
                                     "samba123@", "thatsAcomplPASS2")
        except LdbError as e:
            (num, _) = e.args
            # for some reason we get constraint violation instead of
            # insufficient access error
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail()

    def test_change_password3(self):
        """Make sure WP has no influence"""
        user_dn = self.get_user_dn(self.user_with_wp)
        # Deny WP but leave the default change-password ACEs in place.
        self.sd_utils.dacl_add_ace(user_dn, "(D;;WP;;;PS)")
        # The descriptor is read back after the change; the SDDL text itself
        # is not used any further.
        self.sd_utils.read_sd_on_dn(user_dn).as_sddl(self.domain_sid)
        # No exception handling: the change is expected to succeed.
        self._change_unicode_pwd(self.ldb_user, self.user_with_wp,
                                 "samba123@", "thatsAcomplPASS2")

    def test_change_password5(self):
        """Make sure rights have no influence on dBCSPwd"""
        user_dn = self.get_user_dn(self.user_with_wp)
        self._revoke_default_change_password(user_dn)
        self.sd_utils.dacl_add_ace(user_dn, "(D;;WP;;;PS)")
        try:
            self.ldb_user.modify_ldif("""
dn: """ + self.get_user_dn(self.user_with_wp) + """
changetype: modify
delete: dBCSPwd
dBCSPwd: XXXXXXXXXXXXXXXX
add: dBCSPwd
dBCSPwd: YYYYYYYYYYYYYYYY
""")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
        else:
            self.fail()

    def test_change_password6(self):
        """Test uneven delete/adds"""
        # Without the reset CAR the request is refused on access grounds.
        try:
            self._uneven_user_password_change(self.ldb_user, self.user_with_wp)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()
        self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp),
                                   self.RESET_PWD_ACE_PS)
        # With the CAR granted the request must still fail, but with a
        # different error.
        try:
            self._uneven_user_password_change(self.ldb_user, self.user_with_wp)
        except LdbError as e:
            (num, _) = e.args
            # This fails on Windows 2000 domain level with constraint violation
            self.assertTrue(num in (ERR_CONSTRAINT_VIOLATION,
                                    ERR_UNWILLING_TO_PERFORM))
        else:
            self.fail()

    def test_change_password7(self):
        """Change the own password and then another user's with default rights"""
        # users have change password by default; here the descriptor is read
        # and written back as is, nothing is removed from it
        user_dn = self.get_user_dn(self.user_with_wp)
        desc = self.sd_utils.read_sd_on_dn(user_dn)
        sddl = desc.as_sddl(self.domain_sid)
        self.sd_utils.modify_sd_on_dn(user_dn, sddl)
        # first change our own password
        self._change_unicode_pwd(self.ldb_user2, self.user_with_pc,
                                 "samba123@", "thatsAcomplPASS1")
        # then someone else's
        self._change_unicode_pwd(self.ldb_user2, self.user_with_wp,
                                 "samba123@", "thatsAcomplPASS2")

    # ------------------------------------------------------------------
    # password reset tests
    # ------------------------------------------------------------------

    def test_reset_password1(self):
        """Try a user password reset operation (unicodePwd) before and after granting CAR"""
        try:
            self._reset_unicode_pwd(self.ldb_user, self.user_with_wp,
                                    "thatsAcomplPASS1")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()
        self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp),
                                   self.RESET_PWD_ACE_PS)
        # No exception handling: with the CAR the reset is expected to succeed.
        self._reset_unicode_pwd(self.ldb_user, self.user_with_wp,
                                "thatsAcomplPASS1")

    def test_reset_password2(self):
        """Try a user password reset operation (userPassword) before and after granting CAR"""
        try:
            self._reset_user_password(self.ldb_user, self.user_with_wp,
                                      "thatsAcomplPASS1")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()
        self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp),
                                   self.RESET_PWD_ACE_PS)
        # Success is accepted here; only the kind of failure is checked.
        try:
            self._reset_user_password(self.ldb_user, self.user_with_wp,
                                      "thatsAcomplPASS1")
        except LdbError as e:
            (num, _) = e.args
            # This fails on Windows 2000 domain level with constraint violation
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)

    def test_reset_password3(self):
        """Grant WP and see what happens (unicodePwd)"""
        self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp),
                                   "(A;;WP;;;PS)")
        try:
            self._reset_unicode_pwd(self.ldb_user, self.user_with_wp,
                                    "thatsAcomplPASS1")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

    def test_reset_password4(self):
        """Grant WP and see what happens (userPassword)"""
        self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp),
                                   "(A;;WP;;;PS)")
        try:
            self._reset_user_password(self.ldb_user, self.user_with_wp,
                                      "thatsAcomplPASS1")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

    def test_reset_password5(self):
        """Explicitly deny WP but grant CAR (unicodePwd)"""
        self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp),
                                   "(D;;WP;;;PS)" + self.RESET_PWD_ACE_PS)
        # No exception handling: the reset is expected to succeed.
        self._reset_unicode_pwd(self.ldb_user, self.user_with_wp,
                                "thatsAcomplPASS1")

    def test_reset_password6(self):
        """Explicitly deny WP but grant CAR (userPassword)"""
        self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp),
                                   "(D;;WP;;;PS)" + self.RESET_PWD_ACE_PS)
        # Success is accepted here; only the kind of failure is checked.
        try:
            self._reset_user_password(self.ldb_user, self.user_with_wp,
                                      "thatsAcomplPASS1")
        except LdbError as e:
            (num, _) = e.args
            # This fails on Windows 2000 domain level with constraint violation
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
1505 |
|
---|
class AclExtendedTests(AclTests):
    """Checks which users get to read the nTSecurityDescriptor attribute."""

    def setUp(self):
        super(AclExtendedTests, self).setUp()
        # u1: regular user, will be the creator
        # u2: regular user
        # u3: admin user
        self.u1, self.u2, self.u3 = "ext_u1", "ext_u2", "ext_u3"
        for account in (self.u1, self.u2, self.u3):
            self.ldb_admin.newuser(account, self.user_pass)
        self.ldb_admin.add_remove_group_members(
            "Domain Admins", self.u3, add_members_operation=True)
        self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
        self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
        self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))

    def tearDown(self):
        super(AclExtendedTests, self).tearDown()
        for account in (self.u1, self.u2, self.u3):
            delete_force(self.ldb_admin, self.get_user_dn(account))
        # the group has to go before the OU that contains it
        delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
        delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)

    def test_ntSecurityDescriptor(self):
        """Read nTSecurityDescriptor of a group as u2 (RP, then RP+RC) and as u3"""
        ou_dn = "OU=ext_ou1," + self.base_dn
        group_dn = "CN=ext_group1,OU=ext_ou1," + self.base_dn

        def fetch_sd(connection):
            # base-scope search on the group asking only for the descriptor
            return connection.search(group_dn, SCOPE_BASE, None,
                                     ["nTSecurityDescriptor"])

        # create empty ou
        self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
        # give u1 Create children access, and LC to u2
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;CC;;;%s)" % str(self.user_sid1))
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;LC;;;%s)" % str(self.user_sid2))
        # create a group under that, grant RP to u2
        self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1", grouptype=4)
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RP;;;%s)" % str(self.user_sid2))

        # u2 must not read the descriptor
        found = fetch_sd(self.ldb_user2)
        self.assertNotEqual(len(found), 0)
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())

        # grant RC to u2 - still no access
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RC;;;%s)" % str(self.user_sid2))
        found = fetch_sd(self.ldb_user2)
        self.assertNotEqual(len(found), 0)
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())

        # u3 is member of administrators group, should be able to read sd
        found = fetch_sd(self.ldb_user3)
        self.assertEqual(len(found), 1)
        self.assertTrue("nTSecurityDescriptor" in found[0].keys())
1564 |
|
---|
1565 |
|
---|
1566 | class AclSPNTests(AclTests):
|
---|
1567 |
|
---|
def setUp(self):
    """Create the fixtures of the SPN tests.

    Builds two dc_join contexts (one later set up as RODC, one as DC), a
    test user with its own connection, a plain computer account, and then
    adds the RODC and DC objects.
    """
    super(AclSPNTests, self).setUp()

    # names of the objects the tests work on
    self.dcname = "TESTSRV8"
    self.rodcname = "TESTRODC8"
    self.computername = "testcomp8"
    self.test_user = "spn_test_user8"
    self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
    self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
    self.site = "Default-First-Site-Name"

    # both join contexts differ in their netbios name only
    join_args = dict(server=host, creds=creds, lp=lp, site=self.site,
                     targetdir=None, domain=None)
    self.rodcctx = dc_join(netbios_name=self.rodcname, **join_args)
    self.dcctx = dc_join(netbios_name=self.dcname, **join_args)

    # the user whose rights on the accounts get tested
    self.ldb_admin.newuser(self.test_user, self.user_pass)
    self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
    self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))

    # the accounts themselves: computer, RODC, DC
    self.create_computer(self.computername, self.dcctx.dnsdomain)
    self.create_rodc(self.rodcctx)
    self.create_dc(self.dcctx)
1588 |
|
---|
def tearDown(self):
    """Undo setUp: clean up both join contexts, then drop computer and user."""
    super(AclSPNTests, self).tearDown()
    for join_ctx in (self.rodcctx, self.dcctx):
        join_ctx.cleanup_old_join()
    computer_dn = "cn=%s,cn=computers,%s" % (self.computername, self.base_dn)
    delete_force(self.ldb_admin, computer_dn)
    delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
1595 |
|
---|
def replace_spn(self, _ldb, dn, spn):
    """Set servicePrincipalName of *dn* to *spn* using the connection *_ldb*.

    The current state of the attribute is looked up through the admin
    connection: if the object already has a servicePrincipalName the value
    is replaced, otherwise it is added.  The modification itself is sent
    through *_ldb*, so any LdbError it raises reflects the rights of that
    connection and is left to the caller.
    """
    # Single-argument call form: prints the same text under Python 2 and is
    # also valid Python 3 syntax.
    print("Setting spn %s on %s" % (spn, dn))
    res = self.ldb_admin.search(dn, expression="(objectClass=*)",
                                scope=SCOPE_BASE, attrs=["servicePrincipalName"])
    if "servicePrincipalName" in res[0].keys():
        flag = FLAG_MOD_REPLACE
    else:
        flag = FLAG_MOD_ADD

    msg = Message()
    msg.dn = Dn(self.ldb_admin, dn)
    msg["servicePrincipalName"] = MessageElement(spn, flag,
                                                 "servicePrincipalName")
    _ldb.modify(msg)
1610 |
|
---|
def create_computer(self, computername, domainname):
    """Add a workstation trust account named *computername* under CN=computers.

    sAMAccountName is the name followed by "$"; dNSHostName is
    "<computername>.<domainname>".
    """
    account = {
        "dn": "CN=%s,CN=computers,%s" % (computername, self.base_dn),
        "objectclass": "computer",
        "sAMAccountName": computername + "$",
        "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
        "dNSHostName": "%s.%s" % (computername, domainname),
    }
    self.ldb_admin.add(account)
1621 |
|
---|
1622 | # same as for join_RODC, but do not set any SPNs
|
---|
def create_rodc(self, ctx):
    """Fill in the RODC specific attributes of *ctx* and add its objects.

    Only attributes of the join context are assigned here; the objects are
    created by the final ctx.join_add_objects() call.
    """
    ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)

    # never reveal: the domain's RODC deny RID plus four builtin groups
    builtin_groups = (security.SID_BUILTIN_ADMINISTRATORS,
                      security.SID_BUILTIN_SERVER_OPERATORS,
                      security.SID_BUILTIN_BACKUP_OPERATORS,
                      security.SID_BUILTIN_ACCOUNT_OPERATORS)
    deny_sids = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY)]
    deny_sids.extend(["<SID=%s>" % group for group in builtin_groups])
    ctx.never_reveal_sid = deny_sids
    ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)

    # managed by whatever SID ctx.get_mysid() reports
    ctx.managedby = "<SID=%s>" % ctx.get_mysid()

    uac = samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT
    uac |= samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
    uac |= samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT
    ctx.userAccountControl = uac

    ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
    ctx.secure_channel_type = misc.SEC_CHAN_RODC
    ctx.RODC = True

    replica_flags = drsuapi.DRSUAPI_DRS_INIT_SYNC
    replica_flags |= drsuapi.DRSUAPI_DRS_PER_SYNC
    replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
    replica_flags |= drsuapi.DRSUAPI_DRS_NEVER_SYNCED
    replica_flags |= drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING
    ctx.replica_flags = replica_flags

    ctx.join_add_objects()
1651 |
|
---|
def create_dc(self, ctx):
    """Fill in the (writable) DC specific attributes of *ctx* and add its objects."""
    uac = samba.dsdb.UF_SERVER_TRUST_ACCOUNT
    uac |= samba.dsdb.UF_TRUSTED_FOR_DELEGATION
    ctx.userAccountControl = uac
    ctx.secure_channel_type = misc.SEC_CHAN_BDC

    replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
    replica_flags |= drsuapi.DRSUAPI_DRS_INIT_SYNC
    replica_flags |= drsuapi.DRSUAPI_DRS_PER_SYNC
    replica_flags |= drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS
    replica_flags |= drsuapi.DRSUAPI_DRS_NEVER_SYNCED
    ctx.replica_flags = replica_flags

    # the objects are created by the join context itself
    ctx.join_add_objects()
1662 |
|
---|
def dc_spn_test(self, ctx):
    """Exercise servicePrincipalName writes on the account of join context *ctx*.

    All writes go through self.ldb_user1 and target ctx.acct_dn:

    1. before any right is granted, a write may only fail with
       ERR_INSUFFICIENT_ACCESS_RIGHTS;
    2. after an (OA;;SW;...) ACE for the test user is added, a list of SPNs
       built from the account name, the DNS domain and the NetBIOS domain
       must be accepted (any LdbError propagates and fails the test);
    3. a second list of SPNs may only fail with ERR_CONSTRAINT_VIOLATION.

    The NetBIOS domain name is always taken from self.dcctx, whichever
    context is passed in.
    """
    netbiosdomain = self.dcctx.get_domain_name()
    name = ctx.myname
    dns = ctx.dnsdomain

    def set_spn(spn):
        self.replace_spn(self.ldb_user1, ctx.acct_dn, spn)

    # NOTE(review): here and in the "should fail" loop below a write that
    # unexpectedly succeeds is not reported - only the error code of a
    # failure is checked.  Kept as in the original; confirm whether a
    # self.fail() is wanted.
    try:
        set_spn("HOST/%s/%s" % (name, netbiosdomain))
    except LdbError as e:
        (num, _) = e.args
        self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)

    mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
    self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)

    # SPNs that have to be accepted, in the order they are written
    accepted = [
        "HOST/%s/%s" % (name, netbiosdomain),
        "HOST/%s" % (name),
        "HOST/%s.%s/%s" % (name, dns, netbiosdomain),
        "HOST/%s/%s" % (name, dns),
        "HOST/%s.%s/%s" % (name, dns, dns),
        "GC/%s.%s/%s" % (name, dns, dns),
        "ldap/%s/%s" % (name, netbiosdomain),
        "ldap/%s.%s/%s" % (name, dns, netbiosdomain),
        "ldap/%s" % (name),
        "ldap/%s/%s" % (name, dns),
        "ldap/%s.%s/%s" % (name, dns, dns),
        "DNS/%s/%s" % (name, dns),
        "RestrictedKrbHost/%s/%s" % (name, dns),
        "RestrictedKrbHost/%s" % (name),
        "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" % (name, dns),
        "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" % (name, dns),
        "ldap/%s._msdcs.%s" % (ctx.ntds_guid, dns),
    ]
    for spn in accepted:
        set_spn(spn)

    # the following spns do not match the restrictions and should fail
    rejected = [
        "ldap/%s.%s/ForestDnsZones.%s" % (name, dns, dns),
        "ldap/%s.%s/DomainDnsZones.%s" % (name, dns, dns),
        "nosuchservice/%s/%s" % ("abcd", "abcd"),
        "GC/%s.%s/%s" % (name, dns, netbiosdomain),
        "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" % (ctx.ntds_guid, dns),
    ]
    for spn in rejected:
        try:
            set_spn(spn)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
1725 |
|
---|
1726 | def test_computer_spn(self):
|
---|
1727 | # with WP, any value can be set
|
---|
1728 | netbiosdomain = self.dcctx.get_domain_name()
|
---|
1729 | self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
|
---|
1730 | (self.computername, netbiosdomain))
|
---|
1731 | self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
|
---|
1732 | self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
|
---|
1733 | (self.computername, self.dcctx.dnsdomain, netbiosdomain))
|
---|
1734 | self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
|
---|
1735 | (self.computername, self.dcctx.dnsdomain))
|
---|
1736 | self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
|
---|
1737 | (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
|
---|
1738 | self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
|
---|
1739 | (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
|
---|
1740 | self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
|
---|
1741 | self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
|
---|
1742 | (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
|
---|
1743 | self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
|
---|
1744 | (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
|
---|
1745 | self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
|
---|
1746 | (self.computername, self.dcctx.dnsdomain, netbiosdomain))
|
---|
1747 | self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
|
---|
1748 | self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
|
---|
1749 | (self.computername, self.dcctx.dnsdomain))
|
---|
1750 | self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
|
---|
1751 | (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
|
---|
1752 | self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
|
---|
1753 | (self.computername, self.dcctx.dnsdomain))
|
---|
1754 | self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
|
---|
1755 | (self.computername, self.dcctx.dnsdomain))
|
---|
1756 | self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
|
---|
1757 | (self.computername))
|
---|
1758 | self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
|
---|
1759 | (self.computername, self.dcctx.dnsdomain))
|
---|
1760 | self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
|
---|
1761 | (self.computername, self.dcctx.dnsdomain))
|
---|
1762 | self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
|
---|
1763 |
|
---|
1764 | #user has neither WP nor Validated-SPN, access denied expected
|
---|
1765 | try:
|
---|
1766 | self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
|
---|
1767 | except LdbError, (num, _):
|
---|
1768 | self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
|
---|
1769 |
|
---|
1770 | mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
|
---|
1771 | self.sd_utils.dacl_add_ace(self.computerdn, mod)
|
---|
1772 | #grant Validated-SPN and check which values are accepted
|
---|
1773 | #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
|
---|
1774 |
|
---|
1775 | # for regular computer objects we should always get constraint violation
|
---|
1776 |
|
---|
1777 | # This does not pass against Windows, although it should according to docs
|
---|
1778 | self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
|
---|
1779 | self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
|
---|
1780 | (self.computername, self.dcctx.dnsdomain))
|
---|
1781 |
|
---|
1782 | try:
|
---|
1783 | self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
|
---|
1784 | except LdbError, (num, _):
|
---|
1785 | self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
|
---|
1786 | try:
|
---|
1787 | self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
|
---|
1788 | (self.computername, self.dcctx.dnsdomain, netbiosdomain))
|
---|
1789 | except LdbError, (num, _):
|
---|
1790 | self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
|
---|
1791 | try:
|
---|
1792 | self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
|
---|
1793 | (self.computername, self.dcctx.dnsdomain))
|
---|
1794 | except LdbError, (num, _):
|
---|
1795 | self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
|
---|
1796 | try:
|
---|
1797 | self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
|
---|
1798 | (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
|
---|
1799 | except LdbError, (num, _):
|
---|
1800 | self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
|
---|
1801 | try:
|
---|
1802 | self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
|
---|
1803 | (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
|
---|
1804 | except LdbError, (num, _):
|
---|
1805 | self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
|
---|
1806 | try:
|
---|
1807 | self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
|
---|
1808 | except LdbError, (num, _):
|
---|
1809 | self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
|
---|
1810 | try:
|
---|
1811 | self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
|
---|
1812 | (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
|
---|
1813 | except LdbError, (num, _):
|
---|
1814 | self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
|
---|
1815 |
|
---|
def test_spn_rwdc(self):
    # Run the shared SPN checks (self.dc_spn_test) against the context
    # object stored in self.dcctx.
    run_checks = self.dc_spn_test
    run_checks(self.dcctx)
1818 |
|
---|
def test_spn_rodc(self):
    # Run the shared SPN checks (self.dc_spn_test) against the context
    # object stored in self.rodcctx.
    run_checks = self.dc_spn_test
    run_checks(self.rodcctx)
1821 |
|
---|
1822 |
|
---|
# Important unit running information

# Connection used below to read, change and restore the directory settings
# that some suites need.
ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)

runner = SubunitTestRunner()
# Process exit status: stays 0 only if every suite below succeeds.
rc = 0

# Suites that run with the directory settings left untouched.
for test_class in (AclAddTests, AclModifyTests, AclDeleteTests, AclRenameTests):
    if not runner.run(unittest.makeSuite(test_class)).wasSuccessful():
        rc = 1

# Get the old "dSHeuristics" if it was set
dsheuristics = ldb.get_dsheuristics()
# Set the "dSHeuristics" to activate the correct "userPassword" behaviour
ldb.set_dsheuristics("000000001")
try:
    # Get the old "minPwdAge"
    minPwdAge = ldb.get_minPwdAge()
    # Set it temporarily to "0"
    ldb.set_minPwdAge("0")
    try:
        for test_class in (AclCARTests, AclSearchTests):
            if not runner.run(unittest.makeSuite(test_class)).wasSuccessful():
                rc = 1
    finally:
        # Reset the "minPwdAge" as it was before.  Done in a finally clause
        # so an exception or interrupt during the runs above cannot leave
        # the temporary value behind.
        ldb.set_minPwdAge(minPwdAge)
finally:
    # Reset the "dSHeuristics" as they were before, for the same reason.
    ldb.set_dsheuristics(dsheuristics)

# Remaining suites run after the original settings have been restored.
for test_class in (AclExtendedTests, AclSPNTests):
    if not runner.run(unittest.makeSuite(test_class)).wasSuccessful():
        rc = 1

sys.exit(rc)