Coverage for src / pytest_cases / plugin.py: 87%

538 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-02 23:01 +0000

1# Authors: Sylvain MARIE <sylvain.marie@se.com> 

2# + All contributors to <https://github.com/smarie/python-pytest-cases> 

3# 

4# License: 3-clause BSD, <https://github.com/smarie/python-pytest-cases/blob/master/LICENSE> 

5from collections import OrderedDict, namedtuple 

6from copy import copy 

7from functools import partial 

8from warnings import warn 

9 

10try: 

11 from collections.abc import MutableSequence 

12except: # noqa 

13 from collections import MutableSequence 

14 

15import pytest 

16 

17try: # python 3.3+ 

18 from inspect import signature 

19except ImportError: 

20 from funcsigs import signature # noqa 

21 

22try: # python 3.3+ type hints 

23 from typing import List, Tuple, Union, Iterable, MutableMapping, Mapping, Optional # noqa 

24 from _pytest.python import CallSpec2 

25 from _pytest.config import Config 

26except ImportError: 

27 pass 

28 

29from .common_mini_six import string_types 

30from .common_pytest_lazy_values import get_lazy_args 

31from .common_pytest_marks import PYTEST35_OR_GREATER, PYTEST46_OR_GREATER, PYTEST37_OR_GREATER, PYTEST7_OR_GREATER, \ 

32 PYTEST8_OR_GREATER, PYTEST9_OR_GREATER 

33from .common_pytest import get_pytest_nodeid, get_pytest_function_scopeval, is_function_node, get_param_names, \ 

34 get_param_argnames_as_list, has_function_scope, set_callspec_arg_scope_to_function, in_callspec_explicit_args 

35 

36from .fixture_core1_unions import NOT_USED, USED, is_fixture_union_params, UnionFixtureAlternative 

37 

38# if PYTEST54_OR_GREATER: 

39# # we will need to clean the empty ids explicitly in the plugin :'( 

40from .fixture_parametrize_plus import remove_empty_ids 

41 

42from .case_parametrizer_new import get_current_cases 

43 

44 

45_DEBUG = False 

46"""Note: this is a manual flag to turn when developing (do not forget to also call pytest with -s)""" 

47 

48 

49# @pytest.hookimpl(hookwrapper=True, tryfirst=True) 

50# def pytest_pycollect_makeitem(collector, name, obj): 

51# # custom collection of additional things - we could use it one day for Cases ? 

52# # see also https://hackebrot.github.io/pytest-tricks/customize_class_collection/ 

53# outcome = yield 

54# res = outcome.get_result() 

55# if res is not None: 

56# return 

57# # nothing was collected elsewhere, let's do it here 

58# if safe_isclass(obj): 

59# if collector.istestclass(obj, name): 

60# outcome.force_result(Class(name, parent=collector)) 

61# elif collector.istestfunction(obj, name): 

62# ... 

63 

64 

65@pytest.hookimpl(tryfirst=True, hookwrapper=True) 

66def pytest_runtest_setup(item): 

67 """ Resolve all `lazy_value` in the dictionary of function args """ 

68 

69 yield # first let all other hooks run, they will do the setup etc. 

70 

71 # now item.funcargs exists so we can handle it 

72 if hasattr(item, "funcargs"): 72 ↛ exitline 72 didn't return from function 'pytest_runtest_setup' because the condition on line 72 was always true

73 item.funcargs = {argname: get_lazy_args(argvalue, item) 

74 for argname, argvalue in item.funcargs.items()} 

75 

76 

77# @pytest.hookimpl(tryfirst=True, hookwrapper=True) 

78def pytest_collection(session): 

79 """ HACK: override the fixture manager's `getfixtureclosure` method to replace it with ours """ 

80 

81 # Note for reference: another way to access the fm is `metafunc.config.pluginmanager.get_plugin('funcmanage')` 

82 session._fixturemanager.getfixtureclosure = partial(getfixtureclosure, session._fixturemanager) # noqa 

83 

84 

85class FixtureDefsCache(object): 

86 """ 

87 A 'cache' for fixture definitions obtained from the FixtureManager `fm`, for test node `nodeid` 

88 """ 

89 __slots__ = 'fm', 'node', 'cached_fix_defs' 

90 

91 def __init__(self, fm, node): 

92 self.fm = fm 

93 self.node = node 

94 self.cached_fix_defs = dict() 

95 

96 def get_fixture_defs(self, fixname): 

97 try: 

98 # try to retrieve it from cache 

99 fixdefs = self.cached_fix_defs[fixname] 

100 except KeyError: 

101 # otherwise get it and store for next time 

102 if hasattr(pytest, "version_tuple") and pytest.version_tuple >= (8, 1): 102 ↛ 105line 102 didn't jump to line 105 because the condition on line 102 was always true

103 fixdefs = self.fm.getfixturedefs(fixname, self.node) 

104 else: 

105 fixdefs = self.fm.getfixturedefs(fixname, self.node.nodeid) 

106 self.cached_fix_defs[fixname] = fixdefs 

107 

108 return fixdefs 

109 

110 

111class FixtureClosureNode(object): 

112 """ 

113 A node in a fixture closure Tree. 

114 

115 - its `fixture_defs` is a {name: def} ordered dict containing all fixtures AND args that are required at this node 

116 (*before* a union is required). Note that some of them have def=None when the fixture manager has no definition 

117 for them (same behaviour than in pytest). `get_all_fixture_names` and `get_all_fixture_defs` helper functions 

118 allow to either return the full ordered list (equivalent to pytest `fixture_names`) or the dictionary of non-none 

119 definitions (equivalent to pytest `arg2fixturedefs`) 

120 

121 - if a union appears at this node, `split_fixture_name` is set to the name of the union fixture, and `children` 

122 contains an ordered dict of {split_fixture_alternative: node} 

123 

124 """ 

125 __slots__ = 'parent', 'fixture_defs_mgr', \ 

126 'fixture_defs', 'split_fixture_name', 'split_fixture_alternatives', 'children' 

127 

128 def __init__(self, 

129 fixture_defs_mgr=None, # type: FixtureDefsCache 

130 parent_node=None # type: FixtureClosureNode 

131 ): 

132 if fixture_defs_mgr is None: 

133 if parent_node is None: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true

134 raise ValueError("root node should have a fixture defs manager") 

135 fixture_defs_mgr = parent_node.fixture_defs_mgr 

136 else: 

137 assert isinstance(fixture_defs_mgr, FixtureDefsCache) 

138 

139 self.fixture_defs_mgr = fixture_defs_mgr 

140 self.parent = parent_node 

141 

142 # these will be set after closure has been built 

143 self.fixture_defs = None # type: OrderedDict 

144 self.split_fixture_name = None # type: str 

145 self.split_fixture_alternatives = [] 

146 # we do not use a dict any more as several children can use the same union value (doubled unions) 

147 self.children = [] # type: List[FixtureClosureNode] 

148 

149 # ------ tree ------------------ 

150 

151 def get_leaves(self): 

152 if self.has_split(): 

153 return [n for c in self.children for n in c.get_leaves()] 

154 else: 

155 return [self] 

156 

157 # ------ str / repr --------------- 

158 

159 def to_str(self, indent_nb=0, with_children=True): 

160 """ a string representation, either with all the subtree (default) or without (with_children=False) """ 

161 

162 indent = " " * indent_nb 

163 

164 if not self.is_closure_built(): 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true

165 str_repr = "<pending, incomplete>" 

166 else: 

167 str_repr = "%s(%s)" % (indent, ",".join([("%s" % f) for f in self.fixture_defs.keys()])) 

168 

169 if self.has_split() and with_children: 

170 children_str_prefix = "\n%s - " % indent 

171 children_str = children_str_prefix + children_str_prefix.join([c.to_str(indent_nb=indent_nb + 1) 

172 for c in self.children]) 

173 str_repr = str_repr + " split: " + self.split_fixture_name + children_str 

174 

175 return str_repr 

176 

177 def __repr__(self): 

178 return self.to_str() 

179 

180 # ---- getters to read the "super" closure (used in SuperClosure) 

181 

182 def get_all_fixture_names(self, try_to_sort_by_scope=True): 

183 """ Return a list containing all unique fixture names used by this tree""" 

184 if not try_to_sort_by_scope: 184 ↛ 185line 184 didn't jump to line 185 because the condition on line 184 was never true

185 return [k for k, _ in self.gen_all_fixture_defs(drop_fake_fixtures=False)] 

186 else: 

187 return list(self.get_all_fixture_defs(drop_fake_fixtures=False, try_to_sort=True)) 

188 

189 def get_all_fixture_defs(self, drop_fake_fixtures=True, try_to_sort=True): 

190 """ Return a dict containing all fixture definitions for fixtures used in this tree""" 

191 # get all pairs 

192 items = self.gen_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures) 

193 

194 # sort by scope as in pytest fixture closure creator (pytest did not do it in early versions, align with this) 

195 if try_to_sort: 195 ↛ 213line 195 didn't jump to line 213 because the condition on line 195 was always true

196 if PYTEST7_OR_GREATER: 196 ↛ 205line 196 didn't jump to line 205 because the condition on line 196 was always true

197 # Scope is an enum, values are in reversed order, and the field is _scope 

198 f_scope = get_pytest_function_scopeval() 

199 

200 def sort_by_scope(kv_pair): 

201 fixture_name, fixture_defs = kv_pair 

202 return fixture_defs[-1]._scope if fixture_defs is not None else f_scope 

203 items = sorted(list(items), key=sort_by_scope, reverse=True) 

204 

205 elif PYTEST35_OR_GREATER: 

206 # scopes is a list, values are indices in the list, and the field is scopenum 

207 f_scope = get_pytest_function_scopeval() 

208 def sort_by_scope(kv_pair): # noqa 

209 fixture_name, fixture_defs = kv_pair 

210 return fixture_defs[-1].scopenum if fixture_defs is not None else f_scope 

211 items = sorted(list(items), key=sort_by_scope) 

212 

213 return OrderedDict(items) 

214 

215 def gen_all_fixture_defs(self, drop_fake_fixtures=True): 

216 """ 

217 Generate all pairs of (fixture name, fixture def or none) used in the tree in top to bottom order 

218 Note that this method could be generalized to also yield the parent defs, so as to be used to replace 

219 the engine in `self.gather_all_required`. But this is micro-optimization, really. 

220 Note: `gather_all_required` was not built to be concerned with ordering because it is only used as a set. 

221 """ 

222 

223 # fixtures required at this node 

224 for k, v in self.fixture_defs.items(): 

225 if not drop_fake_fixtures or v is not None: 225 ↛ 224line 225 didn't jump to line 224 because the condition on line 225 was always true

226 yield k, v 

227 

228 # split fixture: not needed since it is the last entry in self.fixture_defs 

229 

230 # fixtures required by children if any 

231 for c in self.children: 

232 for k, v in c.gen_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures): 

233 yield k, v 

234 

235 # ---- utils to build the closure 

236 

237 def build_closure(self, 

238 initial_fixture_names, # type: Iterable[str] 

239 ignore_args=() 

240 ): 

241 """ 

242 Updates this Node with the fixture names provided as argument. 

243 Fixture names and definitions will be stored in self.fixture_defs. 

244 

245 If some fixtures are Union fixtures, this node will become a "split" node 

246 and have children. If new fixtures are added to the node after that, 

247 they will be added to the child nodes rather than self. 

248 

249 :param initial_fixture_names: 

250 :param ignore_args: arguments to keep in the names but not to put in the fixture defs, because they correspond 

251 to "direct parametrization" 

252 :return: 

253 """ 

254 self._build_closure(self.fixture_defs_mgr, initial_fixture_names, ignore_args=ignore_args) 

255 

256 def is_closure_built(self): 

257 return self.fixture_defs is not None 

258 

259 def already_knows_fixture(self, fixture_name): 

260 """ Return True if this fixture is known by this node or one of its parents """ 

261 if fixture_name in self.fixture_defs: 

262 return True 

263 elif self.parent is None: 

264 return False 

265 else: 

266 return self.parent.already_knows_fixture(fixture_name) 

267 

268 def _build_closure(self, 

269 fixture_defs_mgr, # type: FixtureDefsCache 

270 initial_fixture_names, # type: Iterable[str] 

271 ignore_args 

272 ): 

273 """ 

274 

275 :param fixture_defs_mgr: 

276 :param initial_fixture_names: 

277 :param ignore_args: arguments to keep in the names but not to put in the fixture defs 

278 :return: nothing (the input arg2fixturedefs is modified) 

279 """ 

280 

281 # Grab all dependencies of all fixtures present at this node and add them to either this or to nodes below. 

282 

283 # -- first switch this object from 'pending' to 'under construction' if needed 

284 # (indeed we now authorize and use the possibility to call this twice. see split() ) 

285 if self.fixture_defs is None: 

286 self.fixture_defs = OrderedDict() 

287 

288 # -- then for all pending, add them with their dependencies 

289 pending_fixture_names = list(initial_fixture_names) 

290 while len(pending_fixture_names) > 0: 

291 fixname = pending_fixture_names.pop(0) 

292 

293 # if the fixture is already known in this node or above, do not care 

294 if self.already_knows_fixture(fixname): 

295 continue 

296 

297 # new ignore_args option in pytest 4.6+. Not really a fixture but a test function parameter, it seems. 

298 if fixname in ignore_args: 

299 self.add_required_fixture(fixname, None) 

300 continue 

301 

302 # else grab the fixture definition(s) for this fixture name for this test node id 

303 fixturedefs = fixture_defs_mgr.get_fixture_defs(fixname) 

304 if not fixturedefs: 

305 # fixture without definition: add it. This can happen with e.g. "requests", etc. 

306 self.add_required_fixture(fixname, None) 

307 continue 

308 else: 

309 # the actual definition is the last one 

310 _fixdef = fixturedefs[-1] 

311 _params = _fixdef.params 

312 

313 if _params is not None and is_fixture_union_params(_params): 

314 # create an UNION fixture 

315 

316 # transform the _params into a list of names 

317 alternative_f_names = UnionFixtureAlternative.to_list_of_fixture_names(_params) 

318 

319 # TO DO if only one name, simplify ? >> No, we leave such "optimization" to the end user 

320 

321 # if there are direct dependencies that are not the union members, add them to pending 

322 non_member_dependencies = [f for f in _fixdef.argnames if f not in alternative_f_names] 

323 # currently we only have 'requests' in this list but future impl of fixture_union may act otherwise 

324 pending_fixture_names += non_member_dependencies 

325 

326 # propagate WITH the pending 

327 self.split_and_build(fixture_defs_mgr, fixname, fixturedefs, alternative_f_names, 

328 pending_fixture_names, ignore_args=ignore_args) 

329 

330 # empty the pending because all of them have been propagated on all children with their dependencies 

331 pending_fixture_names = [] 

332 continue 

333 

334 else: 

335 # normal fixture 

336 self.add_required_fixture(fixname, fixturedefs) 

337 

338 # add all dependencies, accounting for overrides 

339 if PYTEST9_OR_GREATER: 339 ↛ 348line 339 didn't jump to line 348 because the condition on line 339 was always true

340 dependencies = [] 

341 for _fixture_or_overridden in reversed(fixturedefs): 341 ↛ 353line 341 didn't jump to line 353 because the loop on line 341 didn't complete

342 dependencies = list(_fixture_or_overridden.argnames) + dependencies 

343 # If there's an override and doesn't depend on the overridden fixture, 

344 # ignore remaining definitions 

345 if fixname not in _fixture_or_overridden.argnames: 

346 break 

347 else: 

348 dependencies = _fixdef.argnames 

349 

350 # - append: was pytest default 

351 # pending_fixture_names += dependencies 

352 # - prepend: makes much more sense 

353 pending_fixture_names = list(dependencies) + pending_fixture_names 

354 continue 

355 

356 # ------ tools to add new fixture names during closure construction 

357 

358 # def prepend_fixture_without_dependencies(self, fixname): 

359 # """""" 

360 # fixturedefs = self.fixture_defs_mgr.get_fixture_defs(fixname) 

361 # if not fixturedefs: 

362 # # fixture without definition: add it. This can happen with e.g. "requests", etc. 

363 # self.fixture_defs.insert((fixname, None)) 

364 # else: 

365 # # the actual definition is the last one 

366 # _fixdef = fixturedefs[-1] 

367 # _params = _fixdef.params 

368 # 

369 # if _params is not None and is_fixture_union_params(_params): 

370 # # union fixture 

371 # raise ValueError("It is not possible to add a union fixture after the initial closure has been built") 

372 # else: 

373 # # normal fixture 

374 # self.add_required_fixture(fixname, fixturedefs) 

375 # 

376 # # add all dependencies in the to do list 

377 # dependencies = _fixdef.argnames 

378 

379 # def add_fixture_without_dependencies(self, fixname): 

380 # """Used for later addition, once the closure has been built""" 

381 # fixturedefs = self.fixture_defs_mgr.get_fixture_defs(fixname) 

382 # if not fixturedefs: 

383 # # fixture without definition: add it. This can happen with e.g. "requests", etc. 

384 # self.add_required_fixture(fixname, None) 

385 # else: 

386 # # the actual definition is the last one 

387 # _fixdef = fixturedefs[-1] 

388 # _params = _fixdef.params 

389 # 

390 # if _params is not None and is_fixture_union_params(_params): 

391 # # union fixture 

392 # raise ValueError("It is not possible to add a union fixture after the initial closure has been built") 

393 # else: 

394 # # normal fixture 

395 # self.add_required_fixture(fixname, fixturedefs) 

396 

397 def remove_fixtures(self, fixture_names_to_remove): 

398 """Remove some fixture names from all nodes in this subtree. These fixtures should not be split fixtures""" 

399 _to_remove_in_children = [] 

400 for f in fixture_names_to_remove: 

401 if self.split_fixture_name == f: 

402 raise NotImplementedError("It is not currently possible to remove a split fixture name from a closure " 

403 "with splits") 

404 try: 

405 del self.fixture_defs[f] 

406 except KeyError: 

407 _to_remove_in_children.append(f) 

408 

409 # propagate to children if any 

410 if len(_to_remove_in_children) > 0: 

411 for c in self.children: 

412 c.remove_fixtures(_to_remove_in_children) 

413 

414 def add_required_fixture(self, new_fixture_name, new_fixture_defs): 

415 """Add some required fixture names to all leaves under this node""" 

416 if self.already_knows_fixture(new_fixture_name): 

417 return 

418 elif not self.has_split(): 

419 # add_required_fixture locally 

420 if new_fixture_name not in self.fixture_defs: 420 ↛ exitline 420 didn't return from function 'add_required_fixture' because the condition on line 420 was always true

421 self.fixture_defs[new_fixture_name] = new_fixture_defs 

422 else: 

423 # add_required_fixture in each child 

424 for c in self.children: 

425 c.add_required_fixture(new_fixture_name, new_fixture_defs) 

426 

427 def split_and_build(self, 

428 fixture_defs_mgr, # type: FixtureDefsCache 

429 split_fixture_name, # type: str 

430 split_fixture_defs, # type: Tuple[FixtureDefinition] # noqa 

431 alternative_fixture_names, # type: List[str] 

432 pending_fixtures_list, # 

433 ignore_args 

434 ): 

435 """ Declares that this node contains a union with alternatives (child nodes=subtrees) """ 

436 

437 if self.has_split(): 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true

438 raise ValueError("This should not happen anymore") 

439 # # propagate the split on the children: split each of them 

440 # for n in self.children: 

441 # n.split_and_build(fm, nodeid, split_fixture_name, split_fixture_defs, alternative_fixture_names) 

442 else: 

443 # add the split (union) name to known fixtures 

444 self.add_required_fixture(split_fixture_name, split_fixture_defs) 

445 

446 # remember it 

447 self.split_fixture_name = split_fixture_name 

448 self.split_fixture_alternatives = alternative_fixture_names 

449 

450 # create the child nodes 

451 for f in alternative_fixture_names: 

452 # create the child node 

453 new_c = FixtureClosureNode(parent_node=self) 

454 self.children.append(new_c) 

455 

456 # set the discarded fixture names 

457 # new_c.split_fixture_discarded_names = [g for g in alternative_fixture_names if g != f] 

458 

459 # perform the propagation: 

460 # (a) first propagate all child's dependencies, (b) then the ones required by parent 

461 # we need to do both at the same time in order to propagate the "pending for child" on all subbranches 

462 pending_for_child = [f] + pending_fixtures_list 

463 new_c._build_closure(fixture_defs_mgr, pending_for_child, ignore_args=ignore_args) 

464 

465 def has_split(self): 

466 return self.split_fixture_name is not None 

467 

468 # ----------- for calls parametrization 

469 

470 def get_not_always_used(self): 

471 """Return the list of fixtures used by this subtree, that are used in *some* leaves only, not all""" 

472 results_list = [] 

473 

474 # initial list is made of fixtures that are in the children 

475 initial_list = self.gather_all_required(include_parents=False) 

476 

477 for c in self.get_leaves(): 

478 j = 0 

479 for _ in range(len(initial_list)): 

480 # get next element in the list (but the list may reduce in size during the loop) 

481 fixture_name = initial_list[j] 

482 if fixture_name not in c.gather_all_required(): 

483 # Remove element from the list. Therefore, do not increment j 

484 del initial_list[j] 

485 results_list.append(fixture_name) 

486 else: 

487 # Do not remove from the list: increment j 

488 j += 1 

489 

490 return results_list 

491 

492 def gather_all_required(self, include_children=True, include_parents=True): 

493 """ 

494 Return a list of all fixtures required by the subtree containing this node 

495 and all of its parents (if include_parents=True) and all of its children (if include_children=True) 

496 

497 See also `self.gen_all_fixture_defs`, that could be generalized to tackle this use case too 

498 (micro-optimization, not really urgent) 

499 """ 

500 # first the fixtures required by this node 

501 required = list(self.fixture_defs.keys()) 

502 

503 # then the ones required by the parents 

504 if include_parents and self.parent is not None: 

505 required = required + self.parent.gather_all_required(include_children=False) 

506 

507 # then the ones from all the children 

508 if include_children: 

509 for child in self.children: 

510 required = required + child.gather_all_required(include_parents=False) 

511 

512 return required 

513 

514 def requires(self, fixturename): 

515 """ Return True if the fixture with this name is required by the subtree at this node """ 

516 return fixturename in self.gather_all_required() 

517 

518 # ------ tools to see the tree as a list of alternatives (used in SuperClosure) 

519 

520 def get_alternatives(self): 

521 """ 

522 Returns the tree "flattened" as a list of alternatives (one per leaf). 

523 Each entry in the list consists of: 

524 

525 - an ordered dictionary {union_fixture_name: (idx, value)} representing the active union filters in this 

526 alternative 

527 - a list of fixture names effectively used in this alternative 

528 

529 :return: a list of alternatives 

530 """ 

531 alternatives = self._get_alternatives() 

532 for i, a in enumerate(alternatives): 

533 # replace the first entry in the tuple with a reversed order one 

534 alternatives[i] = (OrderedDict(reversed(list(a[0].items()))), a[1]) 

535 return alternatives 

536 

537 def _get_alternatives(self): 

538 if self.has_split(): 

539 alternatives_list = [] 

540 for c_idx, (c_split_alternative, c_node) in enumerate(zip(self.split_fixture_alternatives, self.children)): 

541 # for all alternatives in this subtree 

542 for f_dct, n_lst in c_node._get_alternatives(): 

543 # - filter 

544 _f_dct = f_dct.copy() 

545 _f_dct[self.split_fixture_name] = (c_idx, c_split_alternative) 

546 

547 # - unique fixtures used 

548 _n_lst = list(self.fixture_defs) + [_i for _i in n_lst if _i not in self.fixture_defs] 

549 

550 alternatives_list.append((_f_dct, _n_lst)) 

551 

552 return alternatives_list 

553 else: 

554 # return a single partition containing no filter and all fixture names 

555 return [(OrderedDict(), self.get_all_fixture_names())] 

556 

557 

558class SuperClosure(MutableSequence): 

559 """ 

560 A "super closure" is a closure made of several closures, each induced by a fixture union parameter value. 

561 The number of alternative closures is `self.nb_alternative_closures` 

562 

563 This object behaves like a list (a mutable sequence), so that we can pass it to pytest in place of the list of 

564 fixture names that is returned in `getfixtureclosure`. 

565 

566 In this implementation, it is backed by a fixture closure tree, that we have to preserve in order to get 

567 parametrization right. In another branch of this project ('super_closure' branch) we tried to forget the tree 

568 and only keep the partitions, but parametrization order was not as intuitive for the end user as all unions 

569 appeared as parametrized first (since they induced the partitions). 

570 """ 

571 __slots__ = 'tree', 'all_fixture_defs' 

572 

573 def __init__(self, 

574 root_node # type: FixtureClosureNode 

575 ): 

576 # if we wish to drop the tree - but we do not anymore to get a better paramz order 

577 # filters_list, partitions_list = root_node._get_alternatives() 

578 

579 # save the fixture closure tree root 

580 self.tree = root_node 

581 # retrieve/sort fixture defs for quicker access 

582 self._update_fixture_defs() 

583 

584 def _update_fixture_defs(self): 

585 # get a list of all fixture defs, for quicker access (and sorted) 

586 # sort by scope as in pytest fixture closure creator, if scope information is available 

587 all_fixture_defs = self.tree.get_all_fixture_defs(drop_fake_fixtures=False, try_to_sort=True) 

588 

589 # # also sort all partitions (note that we cannot rely on the order in all_fixture_defs when scopes are same!) 

590 # if Version(pytest.__version__) >= Version('3.5.0'): 

591 # f_scope = get_pytest_function_scopeval() 

592 # for p in self.partitions: 

593 # def sort_by_scope2(fixture_name): # noqa 

594 # fixture_defs = all_fixture_defs[fixture_name] 

595 # return fixture_defs[-1].scopenum if fixture_defs is not None else f_scope 

596 # p.sort(key=sort_by_scope2) 

597 

598 self.all_fixture_defs = all_fixture_defs 

599 

600 # --- visualization tools ---- 

601 

602 @property 

603 def nb_alternative_closures(self): 

604 """ Return the number of alternative closures induced by fixture unions """ 

605 filters, partitions = self.tree.get_alternatives() 

606 return len(partitions) 

607 

608 def __repr__(self): 

609 """ Return a synthetic view, and a detailed tree view, of this closure """ 

610 alternatives = self.tree.get_alternatives() 

611 nb_alternative_closures = len(alternatives) 

612 return "SuperClosure with %s alternative closures:\n" % nb_alternative_closures \ 

613 + "\n".join(" - %s (filters: %s)" % (p, ", ".join("%s=%s[%s]=%s" % (k, k, v[0], v[1]) 

614 for k, v in f.items())) 

615 for f, p in alternatives) \ 

616 + "\nThe 'super closure list' is %s\n\nThe fixture tree is :\n%s\n" % (list(self), self.tree) 

617 

618 def get_all_fixture_defs(self, drop_fake_fixtures=True): 

619 """ 

620 Return a dictionary of all fixture defs used in this super closure 

621 

622 note: this is equivalent to 

623 self.tree.get_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures, try_to_sort=True) 

624 """ 

625 if drop_fake_fixtures: 625 ↛ 630line 625 didn't jump to line 630 because the condition on line 625 was always true

626 # remove the "fixtures" that are actually test function parameter args 

627 return {k: v for k, v in self.all_fixture_defs.items() if v is not None} 

628 else: 

629 # all fixtures AND pseudo-fixtures (test function parameters) 

630 return self.all_fixture_defs 

631 

632 # ---- list (MutableSequence) facade: behaves like a list of fixture names ------ 

633 

634 def __len__(self): 

635 return len(self.all_fixture_defs) 

636 

637 def __getitem__(self, i): 

638 # return the key (fixture name) associated with the i-th pair 

639 # try: 

640 # return next(islice(self.all_fixture_defs.keys(), i, i+1)) 

641 # except StopIteration: 

642 # raise IndexError(i) 

643 return list(self.all_fixture_defs.keys())[i] 

644 

645 def __setitem__(self, i, o): 

646 # try: 

647 # # pytest performs a full replacement using [:] so we handle at least this case 

648 # full_replace = i == slice(None, None, None) 

649 # except: # noqa 

650 # full_replace = False 

651 

652 # Get the existing value(s) that we wish to replace 

653 ref = list(self)[i] 

654 

655 if o == ref: 

656 # no change at all: of course we accept. 

657 return 

658 

659 if not isinstance(i, slice): 659 ↛ 668line 659 didn't jump to line 668 because the condition on line 659 was never true

660 # In-place change of a single item: let's be conservative and reject for now 

661 # if i == 0: 

662 # self.remove(ref) 

663 # self.insert(0, o) 

664 # elif i == len(self) - 1: 

665 # self.remove(ref) 

666 # self.append(o) 

667 # else: 

668 raise NotImplementedError("Replacing an element in a super fixture closure is not currently implemented. " 

669 "Please report this issue to the `pytest-cases` project.") 

670 else: 

671 # Replacement of multiple items at once: support reordering (ignored) and removal (actually done) 

672 new_set = set(o) 

673 ref_set = set(ref) 

674 if new_set == ref_set: 

675 # A change is required in the order of fixtures. Ignore but continue 

676 warn("WARNING: An attempt was made to reorder a super fixture closure with unions. This is not yet " 

677 "supported since the partitions use subsets of the fixtures ; please report it so that we can " 

678 "find a suitable solution for your need.") 

679 return 

680 

681 added = new_set.difference(ref_set) 

682 removed = ref_set.difference(new_set) 

683 if len(added) == 0: 683 ↛ 690line 683 didn't jump to line 690 because the condition on line 683 was always true

684 # Pure removal: ok. 

685 self.remove_all(removed) 

686 return 

687 else: 

688 # self.append_all(added) 

689 # Rather be conservative for now 

690 raise NotImplementedError("Adding elements to a super fixture closure with a slice is not currently" 

691 "implemented. Please report this issue to the `pytest-cases` project.") 

692 

693 def __delitem__(self, i): 

694 self.remove(self[i]) 

695 

696 def insert(self, index, fixture_name): 

697 """ 

698 Try to transparently support inserts. Since the underlying structure is a tree, only two cases 

699 are supported: inserting at position 0 and appending at position len(self). 

700 

701 Note that while appending has no restrictions, inserting at position 0 is only allowed for now if the 

702 fixture to insert does not have a union in its associated closure. 

703 

704 :param index: 

705 :param fixture_name: 

706 :return: 

707 """ 

708 if index == 0: 

709 # build the closure associated with this new fixture name 

710 fixture_defs_mgr = FixtureDefsCache(self.tree.fixture_defs_mgr.fm, self.tree.fixture_defs_mgr.node) 

711 closure_tree = FixtureClosureNode(fixture_defs_mgr=fixture_defs_mgr) 

712 closure_tree.build_closure((fixture_name,)) 

713 if closure_tree.has_split(): 713 ↛ 714line 713 didn't jump to line 714 because the condition on line 713 was never true

714 raise NotImplementedError("When fixture unions are present, inserting a fixture in the closure at " 

715 "position 0 is currently only supported if that fixture's closure does not" 

716 "contain a union. Please report this so that we can find a suitable solution" 

717 " for your need.") 

718 else: 

719 # remove those fixture definitions from all nodes in the tree 

720 self.tree.remove_fixtures(closure_tree.fixture_defs.keys()) 

721 

722 # finally prepend the defs at the beginning of the dictionary in the first node 

723 self.tree.fixture_defs = OrderedDict(list(closure_tree.fixture_defs.items()) 

724 + list(self.tree.fixture_defs.items())) 

725 

726 elif index == len(self): 

727 # appending is natively supported in our tree growing method 

728 self.tree.build_closure((fixture_name,)) 

729 else: 

730 raise NotImplementedError("When fixture unions are present, inserting a fixture in the closure at a " 

731 "position different from 0 (prepend) or <end> (append) is non-trivial. Please" 

732 "report this so that we can find a suitable solution for your need.") 

733 

734 # Finally update self.fixture_defs so that the "list" view reflects the changes in self.tree 

735 self._update_fixture_defs() 

736 

737 def append_all(self, fixture_names): 

738 """Append various fixture names to the closure""" 

739 # appending is natively supported in our tree growing method 

740 self.tree.build_closure(tuple(fixture_names)) 

741 

742 # Finally update self.fixture_defs so that the "list" view reflects the changes in self.tree 

743 self._update_fixture_defs() 

744 

745 def remove(self, value): 

746 """ 

747 Try to transparently support removal. Note: since the underlying structure is a tree, 

748 removing "union" fixtures is non-trivial so for now it is not supported. 

749 

750 :param value: 

751 :return: 

752 """ 

753 # remove in the tree 

754 self.tree.remove_fixtures((value,)) 

755 

756 # update fixture defs 

757 self._update_fixture_defs() 

758 

759 def remove_all(self, values): 

760 """Multiple `remove` operations at once.""" 

761 # remove in the tree 

762 self.tree.remove_fixtures(tuple(values)) 

763 

764 # update fixture defs 

765 self._update_fixture_defs() 

766 

767 

768def _getfixtureclosure(fm, fixturenames, parentnode, ignore_args=()): 

769 """ 

770 Replaces pytest's getfixtureclosure method to handle unions. 

771 """ 

772 

773 # (1) first retrieve the normal pytest output for comparison 

774 kwargs = dict() 

775 if PYTEST46_OR_GREATER: 775 ↛ 779line 775 didn't jump to line 779 because the condition on line 775 was always true

776 # new argument "ignore_args" in 4.6+ 

777 kwargs['ignore_args'] = ignore_args 

778 

779 if PYTEST8_OR_GREATER: 779 ↛ 782line 779 didn't jump to line 782 because the condition on line 779 was always true

780 # two outputs and sig change 

781 ref_fixturenames, ref_arg2fixturedefs = fm.__class__.getfixtureclosure(fm, parentnode, fixturenames, **kwargs) 

782 elif PYTEST37_OR_GREATER: 

783 # three outputs 

784 initial_names, ref_fixturenames, ref_arg2fixturedefs = \ 

785 fm.__class__.getfixtureclosure(fm, fixturenames, parentnode, **kwargs) 

786 else: 

787 # two outputs 

788 ref_fixturenames, ref_arg2fixturedefs = fm.__class__.getfixtureclosure(fm, fixturenames, parentnode) 

789 

790 # (2) now let's do it by ourselves to support fixture unions 

791 _init_fixnames, super_closure, arg2fixturedefs = create_super_closure(fm, parentnode, fixturenames, ignore_args) 

792 

793 # Compare with the previous behaviour TODO remove when in 'production' ? 

794 # NOTE different order happens all the time because of our "prepend" strategy in the closure building 

795 # which makes much more sense/intuition than pytest default 

796 assert set(super_closure) == set(ref_fixturenames) 

797 assert dict(arg2fixturedefs) == ref_arg2fixturedefs 

798 

799 if PYTEST37_OR_GREATER and not PYTEST8_OR_GREATER: 799 ↛ 800line 799 didn't jump to line 800 because the condition on line 799 was never true

800 return _init_fixnames, super_closure, arg2fixturedefs 

801 else: 

802 return super_closure, arg2fixturedefs 

803 

804 

805if PYTEST8_OR_GREATER: 805 ↛ 809line 805 didn't jump to line 809 because the condition on line 805 was always true

806 def getfixtureclosure(fm, parentnode, initialnames, ignore_args): 

807 return _getfixtureclosure(fm, fixturenames=initialnames, parentnode=parentnode, ignore_args=ignore_args) 

808else: 

809 getfixtureclosure = _getfixtureclosure 

810 

811 

812def create_super_closure(fm, 

813 parentnode, 

814 fixturenames, 

815 ignore_args 

816 ): 

817 # type: (...) -> Tuple[List, Union[List, SuperClosure], Mapping] 

818 """ 

819 

820 :param fm: 

821 :param parentnode: 

822 :param fixturenames: 

823 :param ignore_args: 

824 :return: 

825 """ 

826 

827 parentid = parentnode.nodeid 

828 

829 if _DEBUG: 829 ↛ 830line 829 didn't jump to line 830 because the condition on line 829 was never true

830 print("Creating closure for %s:" % parentid) 

831 

832 # -- auto-use fixtures 

833 if hasattr(pytest, "version_tuple") and pytest.version_tuple >= (8, 1): 833 ↛ 836line 833 didn't jump to line 836 because the condition on line 833 was always true

834 _init_fixnames = list(fm._getautousenames(parentnode)) # noqa 

835 else: 

836 _init_fixnames = list(fm._getautousenames(parentid)) # noqa 

837 

838 def _merge(new_items, into_list): 

839 """ Appends items from `new_items` into `into_list`, only if they are not already there. """ 

840 for item in new_items: 

841 if item not in into_list: 

842 into_list.append(item) 

843 

844 # -- required fixtures/params. 

845 # ********* fix the order of initial fixtures: indeed this order may not be the right one ************ 

846 # this only works when pytest version is > 3.4, otherwise the parent node is a Module 

847 if is_function_node(parentnode): 

848 # grab all the parametrization on that node and fix the order. 

849 # Note: on pytest >= 4 the list of param_names is probably the same than the `ignore_args` input 

850 param_names = get_param_names(parentnode) 

851 

852 sorted_fixturenames = sort_according_to_ref_list(fixturenames, param_names) 

853 # ********** 

854 # merge the fixture names in correct order into the _init_fixnames 

855 _merge(sorted_fixturenames, _init_fixnames) 

856 else: 

857 # we cannot sort yet - merge the fixture names into the _init_fixnames 

858 _merge(fixturenames, _init_fixnames) 

859 

860 # Bugfix GH#330 in progress... 

861 # TODO analyze why in the test "fixture_union_0simplest 

862 # the first node contains second, and the second contains first 

863 # or TODO check the test for get_callspecs, it is maybe simpler 

864 

865 # Finally create the closure 

866 fixture_defs_mgr = FixtureDefsCache(fm, parentnode) 

867 closure_tree = FixtureClosureNode(fixture_defs_mgr=fixture_defs_mgr) 

868 closure_tree.build_closure(_init_fixnames, ignore_args=ignore_args) 

869 super_closure = SuperClosure(closure_tree) 

870 all_fixture_defs = super_closure.get_all_fixture_defs(drop_fake_fixtures=True) 

871 

872 # possibly simplify into a list 

873 if not closure_tree.has_split(): 

874 super_closure = list(super_closure) 

875 

876 if _DEBUG: 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true

877 print("Closure for %s completed:" % parentid) 

878 print(closure_tree) 

879 print(super_closure) 

880 

881 return _init_fixnames, super_closure, all_fixture_defs 

882 

883 

884@pytest.hookimpl(tryfirst=True, hookwrapper=True) 

885def pytest_generate_tests(metafunc): 

886 """ 

887 We use this hook to replace the 'parametrize' function of `metafunc` with our own below, before it is called 

888 by pytest. Note we could do it in a static way in pytest_sessionstart or plugin init hook but 

889 that way we can still access the original method using metafunc.__class__.parametrize 

890 """ 

891 # override the parametrize method. 

892 metafunc.parametrize = partial(parametrize, metafunc) 

893 

894 # now let pytest parametrize the call as usual 

895 _ = yield 

896 

897 

898class UnionParamz(namedtuple('UnionParamz', ['union_fixture_name', 'alternative_names', 'ids', 'scope', 'kwargs'])): 

899 """ Represents some parametrization to be applied, for a union fixture """ 

900 

901 __slots__ = () 

902 

903 def __str__(self): 

904 return "[UNION] %s=[%s], ids=%s, scope=%s, kwargs=%s" \ 

905 "" % (self.union_fixture_name, ','.join([str(a) for a in self.alternative_names]), 

906 self.ids, self.scope, self.kwargs) 

907 

908 

909class NormalParamz(namedtuple('NormalParamz', ['argnames', 'argvalues', 'indirect', 'ids', 'scope', 'kwargs'])): 

910 """ Represents some parametrization to be applied """ 

911 

912 __slots__ = () 

913 

914 def __str__(self): 

915 return "[NORMAL] %s=[%s], indirect=%s, ids=%s, scope=%s, kwargs=%s" \ 

916 "" % (self.argnames, self.argvalues, self.indirect, self.ids, self.scope, self.kwargs) 

917 

918 

919def parametrize(metafunc, argnames, argvalues, indirect=False, ids=None, scope=None, **kwargs): 

920 """ 

921 This alternate implementation of metafunc.parametrize creates a list of calls that is not just the cartesian 

922 product of all parameters (like the pytest behaviour). Instead, it offers an alternate list of calls taking into 

923 account all "union" fixtures. 

924 

925 For this, it replaces the `metafunc._calls` attribute with a `CallsReactor` instance, and feeds it with all 

926 parameters and parametrized fixtures independently (not doing any cross-product during this call). The resulting 

927 `CallsReactor` instance is then able to dynamically behave like the correct list of calls, lazy-creating that list 

928 when it is used. 

929 """ 

930 if not isinstance(metafunc.fixturenames, SuperClosure): 

931 # legacy method 

932 metafunc.__class__.parametrize(metafunc, argnames, argvalues, indirect=indirect, ids=ids, scope=scope, **kwargs) 

933 

934 # clean EMPTY_ID : since they are never set by us in a normal parametrize, no need to do this here. 

935 # if PYTEST54_OR_GREATER: 

936 # for callspec in metafunc._calls: 

937 # remove_empty_ids(callspec) 

938 else: 

939 # get or create our special container object 

940 if not isinstance(metafunc._calls, CallsReactor): # noqa 

941 # first call: should be an empty list 

942 if len(metafunc._calls) > 0: # noqa 942 ↛ 946line 942 didn't jump to line 946 because the condition on line 942 was never true

943 # If this happens, it is most probably because another plugin has called 'parametrize' before our hook 

944 # plugin.py/pytest_generate_tests has replaced it with this function. It can be due to a regression 

945 # in pluggy too, see https://github.com/smarie/python-pytest-cases/issues/302 

946 raise ValueError("This should not happen - please file an issue") 

947 metafunc._calls = CallsReactor(metafunc) 

948 calls_reactor = metafunc._calls # noqa 

949 

950 # detect union fixtures 

951 if is_fixture_union_params(argvalues): 

952 if ',' in argnames or not isinstance(argnames, string_types): 952 ↛ 953line 952 didn't jump to line 953 because the condition on line 952 was never true

953 raise ValueError("Union fixtures can not be parametrized") 

954 union_fixture_name = argnames 

955 union_fixture_alternatives = argvalues 

956 if indirect is False or len(kwargs) > 0: 956 ↛ 957line 956 didn't jump to line 957 because the condition on line 956 was never true

957 raise ValueError("indirect cannot be set on a union fixture, as well as unknown kwargs") 

958 

959 # add a union parametrization in the queue (but do not apply it now) 

960 calls_reactor.append(UnionParamz(union_fixture_name, union_fixture_alternatives, ids, scope, kwargs)) 

961 else: 

962 # add a normal parametrization in the queue (but do not apply it now) 

963 calls_reactor.append(NormalParamz(argnames, argvalues, indirect, ids, scope, kwargs)) 

964 

965 

966class CallsReactor(object): 

967 """ 

968 This object replaces the list of calls that was in `metafunc._calls`. 

969 It behaves like a list, but it actually builds that list dynamically based on all parametrizations collected 

970 from the custom `metafunc.parametrize` above. 

971 

972 There are therefore three steps: 

973 

974 - when `metafunc.parametrize` is called, this object gets called on `add_union` or `add_param`. A parametrization 

975 order gets stored in `self._pending` 

976 

977 - when this object is first read as a list, all parametrization orders in `self._pending` are transformed into a 

978 tree in `self._tree`, and `self._pending` is discarded. This is done in `create_tree_from_pending_parametrization`. 

979 

980 - finally, the list is built from the tree using `self._tree.to_call_list()`. This will also be the case in 

981 subsequent usages of this object. 

982 

983 """ 

984 __slots__ = 'metafunc', '_pending', '_call_list' 

985 

986 def __init__(self, metafunc): 

987 self.metafunc = metafunc 

988 self._pending = [] # type: List[Union[UnionParamz, NormalParamz]] 

989 self._call_list = None 

990 

991 # -- methods to provising parametrization orders without executing them -- 

992 

993 def append(self, 

994 parametrization # type: Union[UnionParamz, NormalParamz] 

995 ): 

996 self._pending.append(parametrization) 

997 

998 def print_parametrization_list(self): 

999 """Helper method to print all pending parametrizations in this reactor """ 

1000 print("\n".join([str(p) for p in self._pending])) 

1001 

1002 # -- list facade -- 

1003 

1004 def __iter__(self): 

1005 return iter(self.calls_list) 

1006 

1007 def __getitem__(self, item): 

1008 return self.calls_list[item] 

1009 

1010 @property 

1011 def calls_list(self): 

1012 """ 

1013 Returns the list of calls. This property relies on self._tree, that is lazily created on first access, 

1014 based on `self.parametrizations`. 

1015 :return: 

1016 """ 

1017 if self._call_list is None: 1017 ↛ 1021line 1017 didn't jump to line 1021 because the condition on line 1017 was always true

1018 # create the definitive tree. 

1019 self.create_call_list_from_pending_parametrizations() 

1020 

1021 return self._call_list 

1022 

1023 # --- tree creation (executed once the first time this object is used as a list) 

1024 

1025 def create_call_list_from_pending_parametrizations(self): 

1026 """ 

1027 Takes all parametrization operations that are pending in `self._pending`, 

1028 and creates a parametrization tree out of them. 

1029 

1030 self._pending is set to None afterwards 

1031 :return: 

1032 """ 

1033 # self is on the _calls field, we'll temporarily remove it and finally set it back at the end of this call 

1034 assert self.metafunc._calls is self 

1035 

1036 # ------ parametrize the calls -------- 

1037 # create a dictionary of pending fixturenames/argnames to parametrize. 

1038 pending_dct = OrderedDict() 

1039 for p in self._pending: 

1040 k = get_param_argnames_as_list(p[0]) 

1041 # remember one of the argnames only so that we are able to detect where in the fixture tree the 

1042 # parametrization applies (it will still be applied for all of its argnames, no worries: see _process_node) 

1043 k = k[0] 

1044 pending_dct[k] = p 

1045 

1046 if _DEBUG: 1046 ↛ 1047line 1046 didn't jump to line 1047 because the condition on line 1046 was never true

1047 print("\n---- pending parametrization ----") 

1048 self.print_parametrization_list() 

1049 print("---------------------------------\n") 

1050 print("Applying all of them in the closure tree nodes:") 

1051 

1052 # grab the "super fixtures closure" created previously (see getfixtureclosure above) 

1053 super_closure = self.metafunc.fixturenames 

1054 assert isinstance(super_closure, SuperClosure) 

1055 

1056 # Apply parametrization for calls 

1057 calls = get_calls_for_tree(self.metafunc, super_closure.tree, pending_dct) 

1058 # Alternative: use the partitions for parametrization. The issue is that this leads to a less intuitive order 

1059 # calls = [] 

1060 # for i in range(super_closure.nb_alternative_closures): 

1061 # calls += get_calls_for_partition(self.metafunc, super_closure, i, pending.copy()) 

1062 

1063 if _DEBUG: 1063 ↛ 1064line 1063 didn't jump to line 1064 because the condition on line 1063 was never true

1064 print("\n".join(["%s[%s]: funcargs=%s, params=%s" % (get_pytest_nodeid(self.metafunc), 

1065 c.id, c.params if PYTEST8_OR_GREATER else c.funcargs, 

1066 c.params) 

1067 for c in calls]) + "\n") 

1068 

1069 # clean EMPTY_ID set by @parametrize when there is at least a MultiParamsAlternative 

1070 # if PYTEST54_OR_GREATER: 

1071 for callspec in calls: 

1072 remove_empty_ids(callspec) 

1073 

1074 # save the list and put back self as the _calls facade 

1075 self._call_list = calls 

1076 self.metafunc._calls = self 

1077 # forget about all parametrizations now - this won't happen again 

1078 self._pending = None 

1079 

1080 

1081def get_calls_for_tree(metafunc, 

1082 fix_closure_tree, # type: FixtureClosureNode 

1083 pending_dct # type: MutableMapping[str, Union[UnionParamz, NormalParamz]] 

1084 ): 

1085 """ 

1086 Creates the list of calls for `metafunc` based on 

1087 :param metafunc: 

1088 :param fix_closure_tree: 

1089 :param pending: 

1090 :return: 

1091 """ 

1092 pending_dct = pending_dct.copy() 

1093 calls, nodes_used_by_calls = _process_node(metafunc, fix_closure_tree, pending_dct, []) 

1094 # for each call in calls, the node in nodes_used_by_calls is the corresponding tree leaf. 

1095 _cleanup_calls_list(metafunc, fix_closure_tree, calls, nodes_used_by_calls, pending_dct) 

1096 return calls 

1097 

1098 

1099def _cleanup_calls_list(metafunc, 

1100 fix_closure_tree, # type: FixtureClosureNode 

1101 calls, # type: List[CallSpec2] 

1102 nodes, # type: List[FixtureClosureNode] 

1103 pending_dct # type: MutableMapping[str, Union[UnionParamz, NormalParamz]] 

1104 ): 

1105 """ 

1106 Cleans the calls list so that all calls contain a value for all parameters. This is basically 

1107 about adding "NOT_USED" parametrization everywhere relevant. 

1108 

1109 :param calls: 

1110 :param nodes: 

1111 :param pending: 

1112 :return: 

1113 """ 

1114 

1115 nb_calls = len(calls) 

1116 if nb_calls != len(nodes): 1116 ↛ 1117line 1116 didn't jump to line 1117 because the condition on line 1116 was never true

1117 raise ValueError("This should not happen !") 

1118 

1119 # create ref lists of fixtures per scope 

1120 _not_always_used_func_scoped = [] 

1121 # _not_always_used_other_scoped = [] 

1122 for fixture_name in fix_closure_tree.get_not_always_used(): 

1123 try: 

1124 fixdef = metafunc._arg2fixturedefs[fixture_name] # noqa 

1125 except KeyError: 

1126 continue # dont raise any error here and let pytest say "not found" later 

1127 else: 

1128 if has_function_scope(fixdef[-1]): 

1129 _not_always_used_func_scoped.append(fixture_name) 

1130 # else: 

1131 # _not_always_used_other_scoped.append(fixture_name) 

1132 

1133 for i in range(nb_calls): 

1134 c, n = calls[i], nodes[i] 

1135 

1136 # A/ set to "not used" all parametrized fixtures that were not used in some branches 

1137 for fixture, p_to_apply in pending_dct.items(): 

1138 if not in_callspec_explicit_args(c, fixture): 

1139 # parametrize with a single "not used" value and discard the id 

1140 if isinstance(p_to_apply, UnionParamz): 

1141 c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.union_fixture_name, [NOT_USED], 

1142 indirect=True, discard_id=True, scope=p_to_apply.scope, 

1143 **p_to_apply.kwargs) 

1144 else: 

1145 _nb_argnames = len(get_param_argnames_as_list(p_to_apply.argnames)) 

1146 if _nb_argnames > 1: 1146 ↛ 1147line 1146 didn't jump to line 1147 because the condition on line 1146 was never true

1147 _vals = [(NOT_USED,) * _nb_argnames] 

1148 else: 

1149 _vals = [NOT_USED] 

1150 c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.argnames, _vals, 

1151 indirect=p_to_apply.indirect, discard_id=True, 

1152 scope=p_to_apply.scope, **p_to_apply.kwargs) 

1153 assert len(c_with_dummy) == 1 

1154 calls[i] = c_with_dummy[0] 

1155 c = calls[i] 

1156 

1157 # B/ function-scoped non-parametrized fixtures also need to be explicitly deactivated in the callspecs 

1158 # where they are not required, otherwise they will be setup/teardown. 

1159 # 

1160 # For this we use a dirty hack: we add a parameter with they name in the callspec, it seems to be propagated 

1161 # in the `request`. TODO is there a better way? 

1162 for fixture_name in _not_always_used_func_scoped: 

1163 if not in_callspec_explicit_args(c, fixture_name): 

1164 if not n.requires(fixture_name): 

1165 # explicitly add it as discarded by creating a parameter value for it. 

1166 c.params[fixture_name] = NOT_USED 

1167 c.indices[fixture_name] = 1 

1168 set_callspec_arg_scope_to_function(c, fixture_name) 

1169 else: 

1170 # explicitly add it as active 

1171 c.params[fixture_name] = USED 

1172 c.indices[fixture_name] = 0 

1173 set_callspec_arg_scope_to_function(c, fixture_name) 

1174 

1175 # finally, if there are some session or module-scoped fixtures that 

1176 # are used in *none* of the calls, they could be deactivated too 

1177 # (see https://github.com/smarie/python-pytest-cases/issues/137) 

1178 # 

1179 # for fixture_name in _not_always_used_other_scoped: 

1180 # _scopenum = metafunc._arg2fixturedefs[fixture_name][-1].scopenum 

1181 # 

1182 # # check if there is at least one call that actually uses the fixture and is not skipped... 

1183 # # this seems a bit "too much" !! > WON'T FIX 

1184 # used = False 

1185 # for i in range(nb_calls): 

1186 # c, n = calls[i], nodes[i] 

1187 # if fixture_name in c.params or fixture_name in c.funcargs or n.requires(fixture_name): 

1188 # if not is_skipped_or_failed(c): # HOW can we implement this based on call (and not item) ??? 

1189 # used = True 

1190 # break 

1191 # 

1192 # if not used: 

1193 # # explicitly add it as discarded everywhere by creating a parameter value for it. 

1194 # for i in range(nb_calls): 

1195 # c = calls[i] 

1196 # c.params[fixture_name] = NOT_USED 

1197 # c.indices[fixture_name] = 0 

1198 # c._arg2scopenum[fixture_name] = _scopenum # noqa 

1199 

1200 

1201# def get_calls_for_partition(metafunc, super_closure, p_idx, pending): 

1202# """ 

1203# Parametrizes all fixtures that are actually used in this partition 

1204# Cleans the calls list so that all calls contain a value for all parameters. This is basically 

1205# about adding "NOT_USED" parametrization everywhere relevant. 

1206# 

1207# :return: a list of CallSpec2 

1208# """ 

1209# calls = [] 

1210# 

1211# # A/ parametrize all fixtures that are actually used in this partition 

1212# for fixture_name in super_closure.partitions[p_idx]: 

1213# try: 

1214# # pop it from pending - do not rely the order in pending but rather the order in the closure 

1215# p_to_apply = pending.pop(fixture_name) 

1216# except KeyError: 

1217# # not a parametrized fixture 

1218# continue 

1219# else: 

1220# if isinstance(p_to_apply, UnionParamz): 

1221# # ******** Union parametrization ********** 

1222# # selected_ids, selected_alternative = super_closure.get_parameter_to_apply(p_to_apply, p_idx) 

1223# num, selected_filter = super_closure.filters[p_idx][p_to_apply.union_fixture_name] 

1224# # in order to get the *actual* id to use (with all pytest subtleties in case of two identical ids 

1225# # appearing in the list), we create a fake calls list 

1226# fake_calls = _parametrize_calls(metafunc, [], p_to_apply.union_fixture_name, 

1227# p_to_apply.alternative_names, ids=p_to_apply.ids, 

1228# scope=p_to_apply.scope, indirect=True, **p_to_apply.kwargs) 

1229# selected_id = fake_calls[num].id 

1230# selected_alternative = p_to_apply.alternative_names[num] 

1231# # assert selected_alternative.alternative_name == selected_filter 

1232# 

1233# if _DEBUG: 

1234# print("[Partition %s] Applying parametrization for UNION fixture %r=%r" 

1235# "" % (p_idx, p_to_apply.union_fixture_name, selected_alternative)) 

1236# 

1237# # always use 'indirect' since that's a fixture. 

1238# calls = _parametrize_calls(metafunc, calls, p_to_apply.union_fixture_name, 

1239# [selected_alternative], ids=[selected_id], scope=p_to_apply.scope, 

1240# indirect=True, **p_to_apply.kwargs) 

1241# 

1242# elif isinstance(p_to_apply, NormalParamz): 

1243# # ******** Normal parametrization ********** 

1244# if _DEBUG: 

1245# print("[Partition %s] Applying parametrization for NORMAL %s" 

1246# "" % (p_idx, p_to_apply.argnames)) 

1247# 

1248# calls = _parametrize_calls(metafunc, calls, p_to_apply.argnames, p_to_apply.argvalues, 

1249# indirect=p_to_apply.indirect, ids=p_to_apply.ids, 

1250# scope=p_to_apply.scope, **p_to_apply.kwargs) 

1251# else: 

1252# raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__) 

1253# 

1254# # Cleaning 

1255# for i in range(len(calls)): 

1256# c = calls[i] 

1257# 

1258# # B/ set to "not used" all parametrized fixtures that were not used in some branches 

1259# for fixture_name, p_to_apply in pending.items(): 

1260# if fixture_name not in c.params and fixture_name not in c.funcargs: 

1261# # parametrize with a single "not used" value and discard the id 

1262# if isinstance(p_to_apply, UnionParamz): 

1263# c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.union_fixture_name, [NOT_USED], 

1264# indirect=True, discard_id=True, scope=p_to_apply.scope, 

1265# **p_to_apply.kwargs) 

1266# else: 

1267# _nb_argnames = len(get_param_argnames_as_list(p_to_apply.argnames)) 

1268# if _nb_argnames > 1: 

1269# _vals = [(NOT_USED,) * _nb_argnames] 

1270# else: 

1271# _vals = [NOT_USED] 

1272# c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.argnames, _vals, 

1273# indirect=p_to_apply.indirect, discard_id=True, 

1274# scope=p_to_apply.scope, **p_to_apply.kwargs) 

1275# assert len(c_with_dummy) == 1 

1276# calls[i] = c_with_dummy[0] 

1277# c = calls[i] 

1278# 

1279# # C/ some non-parametrized fixtures may also need to be explicitly deactivated in some callspecs 

1280# # otherwise they will be setup/teardown. 

1281# # 

1282# # For this we use a dirty hack: we add a parameter with they name in the callspec, it seems to be propagated 

1283# # in the `request`. TODO is there a better way? 

1284# for fixture_name in super_closure.get_not_always_used(): 

1285# try: 

1286# fixdef = metafunc._arg2fixturedefs[fixture_name] # noqa 

1287# except KeyError: 

1288# continue # dont raise any error here and instead let pytest say "not found" 

1289# 

1290# if fixture_name not in c.params and fixture_name not in c.funcargs: 

1291# if not super_closure.requires(fixture_name, p_idx): 

1292# # explicitly add it as discarded by creating a parameter value for it. 

1293# c.params[fixture_name] = NOT_USED 

1294# c.indices[fixture_name] = 1 

1295# c._arg2scopenum[fixture_name] = get_pytest_scopenum(fixdef[-1].scope) # noqa 

1296# else: 

1297# # explicitly add it as active by creating a parameter value for it. 

1298# c.params[fixture_name] = 'used' 

1299# c.indices[fixture_name] = 0 

1300# c._arg2scopenum[fixture_name] = get_pytest_scopenum(fixdef[-1].scope) # noqa 

1301# 

1302# return calls 

1303 

1304 

1305@property 

1306def id(self): 

1307 # legacy _CallSpec2 id was filtering empty strings, we'll put it back on the class below 

1308 # https://github.com/pytest-dev/pytest/blob/5.3.4/src/_pytest/python.py#L861 

1309 return "-".join(map(str, filter(None, self._idlist))) 

1310 

1311 

1312def _parametrize_calls(metafunc, init_calls, argnames, argvalues, discard_id=False, indirect=False, ids=None, 

1313 scope=None, **kwargs): 

1314 """Parametrizes the initial `calls` with the provided information and returns the resulting new calls""" 

1315 

1316 # make a backup so that we can restore the metafunc at the end 

1317 bak = metafunc._calls # noqa 

1318 

1319 # place the initial calls on the metafunc 

1320 metafunc._calls = init_calls if init_calls is not None else [] 

1321 

1322 # parametrize the metafunc. Since we replaced the `parametrize` method on `metafunc` we have to call super 

1323 metafunc.__class__.parametrize(metafunc, argnames, argvalues, indirect=indirect, ids=ids, scope=scope, **kwargs) 

1324 

1325 # extract the result 

1326 new_calls = metafunc._calls # noqa 

1327 

1328 # If the user wants to discard the newly created id, remove the last id in all these callspecs in this node 

1329 if discard_id: 

1330 for callspec in new_calls: 

1331 callspec._idlist.pop(-1) # noqa 

1332 

1333 # restore the metafunc and return the new calls 

1334 metafunc._calls = bak 

1335 return new_calls 

1336 

1337 

1338def _process_node(metafunc, 

1339 current_node, # type: FixtureClosureNode 

1340 pending, # type: MutableMapping[str, Union[UnionParamz, NormalParamz]] 

1341 calls # type: List[CallSpec2] 

1342 ): 

1343 """ 

1344 Routine to apply all the parametrization tasks in `pending` that are relevant to `current_node`, 

1345 to `calls` (a list of pytest CallSpec2). 

1346 

1347 It first applies all parametrization that correspond to current node (normal parameters), 

1348 then applies the "split" parametrization if needed and recurses into each tree branch. 

1349 

1350 It returns a tuple containing a list of calls and a list of same length containing which leaf node each one 

1351 corresponds to. 

1352 

1353 :param metafunc: 

1354 :param current_node: the closure tree node we're focusing on 

1355 :param pending: a list of parametrization orders to apply 

1356 :param calls: 

1357 :return: a tuple (calls, nodes) of two lists of the same length. So that for each CallSpec calls[i], you can see 

1358 the corresponding leaf node in nodes[i] 

1359 """ 

1360 

1361 # (1) first apply all **non-split** fixtures at this node = NORMAL PARAMETERS 

1362 # in the order defined in the closure tree, do not trust the order of the received parametrize (`pending`) 

1363 fixtures_at_this_node = [f for f in current_node.fixture_defs.keys() 

1364 if f is not current_node.split_fixture_name] 

1365 for fixturename in fixtures_at_this_node: 

1366 try: 

1367 # pop the corresponding parametrization from pending - do not trust the order 

1368 p_to_apply = pending.pop(fixturename) 

1369 except KeyError: 

1370 # fixturename is not a parametrized fixture, nothing to do 

1371 continue 

1372 else: 

1373 if isinstance(p_to_apply, UnionParamz): 1373 ↛ 1374line 1373 didn't jump to line 1374 because the condition on line 1373 was never true

1374 raise ValueError("This should not happen! Only Normal parameters should be in fixtures_at_this_node") 

1375 elif isinstance(p_to_apply, NormalParamz): 1375 ↛ 1385line 1375 didn't jump to line 1385 because the condition on line 1375 was always true

1376 # ******** Normal parametrization ********** 

1377 if _DEBUG: 1377 ↛ 1378line 1377 didn't jump to line 1378 because the condition on line 1377 was never true

1378 print("[Node %s] Applying parametrization for NORMAL %s" 

1379 "" % (current_node.to_str(with_children=False), p_to_apply.argnames)) 

1380 

1381 calls = _parametrize_calls(metafunc, calls, p_to_apply.argnames, p_to_apply.argvalues, 

1382 indirect=p_to_apply.indirect, ids=p_to_apply.ids, 

1383 scope=p_to_apply.scope, **p_to_apply.kwargs) 

1384 else: 

1385 raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__) 

1386 

1387 # (2) then is there a "union" = a split between two sub-branches in the tree ? 

1388 if not current_node.has_split(): 

1389 # No split = tree leaf: return 

1390 nodes = [current_node] * len(calls) 

1391 return calls, nodes 

1392 else: 

1393 # There is a **split** : apply its parametrization (a UNION parameter) 

1394 try: 

1395 # pop the corresponding parametrization from pending - do not trust the order 

1396 p_to_apply = pending.pop(current_node.split_fixture_name) 

1397 except KeyError: 

1398 raise ValueError("This should not happen! fixture union parametrization missing, but this is a split node") 

1399 else: 

1400 if isinstance(p_to_apply, NormalParamz): 1400 ↛ 1401line 1400 didn't jump to line 1401 because the condition on line 1400 was never true

1401 raise ValueError("This should not happen! Split nodes correspond to Union parameters, not Normal ones.") 

1402 elif isinstance(p_to_apply, UnionParamz): 1402 ↛ exitline 1402 didn't return from function '_process_node' because the condition on line 1402 was always true

1403 # ******** Union parametrization ********** 

1404 if _DEBUG: 1404 ↛ 1405line 1404 didn't jump to line 1405 because the condition on line 1404 was never true

1405 print("[Node %s] Applying parametrization for UNION %s" 

1406 "" % (current_node.to_str(with_children=False), p_to_apply.union_fixture_name)) 

1407 

1408 # always use 'indirect' since that's a fixture. 

1409 calls = _parametrize_calls(metafunc, calls, p_to_apply.union_fixture_name, 

1410 p_to_apply.alternative_names, indirect=True, 

1411 ids=p_to_apply.ids, scope=p_to_apply.scope, **p_to_apply.kwargs) 

1412 

1413 # now move to the children 

1414 nodes_children = [None] * len(calls) 

1415 for i in range(len(calls)): 

1416 active_alternative = calls[i].params[p_to_apply.union_fixture_name] 

1417 child_indices = [_i for _i, x in enumerate(current_node.split_fixture_alternatives) 

1418 if x == active_alternative.alternative_name] 

1419 # only use the first matching child, since the subtrees are identical. 

1420 child_node = current_node.children[child_indices[0]] 

1421 child_pending = pending.copy() 

1422 

1423 # place the children parameter in the first position if it is in the list 

1424 # not needed anymore - already automatic 

1425 # try: 

1426 # child_pending.move_to_end(child_alternative, last=False) 

1427 # except KeyError: 

1428 # # not in the list: the child alternative is a non-parametrized fixture 

1429 # pass 

1430 

1431 calls[i], nodes_children[i] = _process_node(metafunc, child_node, child_pending, [calls[i]]) 

1432 

1433 # finally flatten the list if needed 

1434 calls = flatten_list(calls) 

1435 nodes_children = flatten_list(nodes_children) 

1436 return calls, nodes_children 

1437 

1438 

1439# def _make_unique(lst): 

1440# _set = set() 

1441# 

1442# def _first_time_met(v): 

1443# if v not in _set: 

1444# _set.add(v) 

1445# return True 

1446# else: 

1447# return False 

1448# 

1449# return [v for v in lst if _first_time_met(v)] 

1450 

1451 

1452def flatten_list(lst): 

1453 return [v for nested_list in lst for v in nested_list] 

1454 

1455 

1456def sort_according_to_ref_list(fixturenames, param_names): 

1457 """ 

1458 Sorts items in the first list, according to their position in the second. 

1459 Items that are not in the second list stay in the same position, the others are just swapped. 

1460 A new list is returned. 

1461 

1462 :param fixturenames: 

1463 :param param_names: 

1464 :return: 

1465 """ 

1466 cur_indices = [] 

1467 for pname in param_names: 

1468 try: 

1469 cur_indices.append(fixturenames.index(pname)) 

1470 except (ValueError, IndexError): 

1471 # can happen in case of indirect parametrization: a parameter is not in the fixture name. 

1472 # TODO we should maybe rather add the pname to fixturenames in this case ? 

1473 pass 

1474 target_indices = sorted(cur_indices) 

1475 sorted_fixturenames = list(fixturenames) 

1476 for old_i, new_i in zip(cur_indices, target_indices): 

1477 sorted_fixturenames[new_i] = fixturenames[old_i] 

1478 return sorted_fixturenames 

1479 

1480 

1481_OPTION_NAME = 'with_reorder' 

1482_SKIP = 'skip' 

1483_NORMAL = 'normal' 

1484_OPTIONS = { 

1485 _NORMAL: """(default) the usual reordering done by pytest to optimize setup/teardown of session- / module- 

1486/ class- fixtures, as well as all the modifications made by other plugins (e.g. pytest-reorder)""", 

1487 _SKIP: """skips *all* reordering, even the one done by pytest itself or installed plugins 

1488(e.g. pytest-reorder)""" 

1489} 

1490 

1491 

1492# @hookspec(historic=True) 

1493def pytest_addoption(parser): 

1494 group = parser.getgroup('pytest-cases ordering', 'pytest-cases reordering options', after='general') 

1495 help_str = """String specifying one of the reordering alternatives to use. Should be one of : 

1496 - %s""" % ("\n - ".join(["%s: %s" % (k, v) for k, v in _OPTIONS.items()])) 

1497 group.addoption( 

1498 '--%s' % _OPTION_NAME.replace('_', '-'), type=str, default='normal', help=help_str 

1499 ) 

1500 

1501 

1502# will be loaded when the pytest_configure hook below is called 

1503PYTEST_CONFIG = None # type: Optional[Config] 

1504 

1505 

1506def pytest_load_initial_conftests(early_config): 

1507 # store the received config object for future use; see #165 #166 #196 

1508 global PYTEST_CONFIG 

1509 PYTEST_CONFIG = early_config 

1510 

1511 

1512# @hookspec(historic=True) 

1513def pytest_configure(config): 

1514 # validate the config 

1515 allowed_values = ('normal', 'skip') 

1516 reordering_choice = config.getoption(_OPTION_NAME) 

1517 if reordering_choice not in allowed_values: 1517 ↛ 1518line 1517 didn't jump to line 1518 because the condition on line 1517 was never true

1518 raise ValueError("[pytest-cases] Wrong --%s option: %s. Allowed values: %s" 

1519 "" % (_OPTION_NAME, reordering_choice, allowed_values)) 

1520 

1521 

1522@pytest.hookimpl(tryfirst=True, hookwrapper=True) 

1523def pytest_collection_modifyitems(session, config, items): # noqa 

1524 """ 

1525 An alternative to the `reorder_items` function in fixtures.py 

1526 (https://github.com/pytest-dev/pytest/blob/master/src/_pytest/fixtures.py#L209) 

1527 

1528 We basically set back the previous order once the pytest ordering routine has completed. 

1529 

1530 TODO we should set back an optimal ordering, but current PR https://github.com/pytest-dev/pytest/pull/3551 

1531 will probably not be relevant to handle our "union" fixtures > need to integrate the NOT_USED markers in the method 

1532 

1533 :param session: 

1534 :param config: 

1535 :param items: 

1536 :return: 

1537 """ 

1538 ordering_choice = config.getoption(_OPTION_NAME) 

1539 

1540 if ordering_choice == _SKIP: 

1541 # remember initial order 

1542 initial_order = copy(items) 

1543 yield 

1544 # put back the initial order but keep the filter 

1545 to_return = [None] * len(items) 

1546 i = 0 

1547 for item in initial_order: 

1548 if item in items: 

1549 to_return[i] = item 

1550 i += 1 

1551 assert i == len(items) 

1552 items[:] = to_return 

1553 

1554 else: 

1555 # do nothing 

1556 yield 

1557 

1558 

1559@pytest.fixture 

1560def current_cases(request): 

1561 """ 

1562 A fixture containing `get_current_cases(request)` 

1563 

1564 This is a dictionary containing all case parameters for the currently active `pytest` item. 

1565 For each test function argument parametrized using a `@parametrize_with_case(<argname>, ...)` this dictionary 

1566 contains an entry `{<argname>: (case_id, case_function, case_params)}`. If several argnames are parametrized this 

1567 way, a dedicated entry will be present for each argname. The tuple is a `namedtuple` containing 

1568 

1569 - `id` a string containing the actual case id constructed by `@parametrize_with_cases`. 

1570 - `function` the original case function. 

1571 - `params` a dictionary, containing the parameters of the case, if itself is parametrized. Note that if the 

1572 case is parametrized with `@parametrize_with_cases`, the associated parameter value in the dictionary will also be 

1573 `(actual_id, case_function, case_params)`. 

1574 

1575 If a fixture parametrized with cases is active, the dictionary will contain an entry `{<fixturename>: <dct>}` where 

1576 `<dct>` is a dictionary `{<argname>: (case_id, case_function, case_params)}`. 

1577 

1578 To get more information on a case function, you can use `get_case_marks(f)`, `get_case_tags(f)`. 

1579 You can also use `matches_tag_query` to check if a case function matches some expectations either concerning its id 

1580 or its tags. See https://smarie.github.io/python-pytest-cases/#filters-and-tags 

1581 """ 

1582 return get_current_cases(request)