Coverage for src/pytest_cases/plugin.py: 85%

524 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-09-26 21:52 +0000

1# Authors: Sylvain MARIE <sylvain.marie@se.com> 

2# + All contributors to <https://github.com/smarie/python-pytest-cases> 

3# 

4# License: 3-clause BSD, <https://github.com/smarie/python-pytest-cases/blob/master/LICENSE> 

5from collections import OrderedDict, namedtuple 

6from copy import copy 

7from functools import partial 

8from warnings import warn 

9 

10try: 

11 from collections.abc import MutableSequence 

12except: # noqa 

13 from collections import MutableSequence 

14 

15import pytest 

16 

17try: # python 3.3+ 

18 from inspect import signature 

19except ImportError: 

20 from funcsigs import signature # noqa 

21 

22try: # python 3.3+ type hints 

23 from typing import List, Tuple, Union, Iterable, MutableMapping, Mapping, Optional # noqa 

24 from _pytest.python import CallSpec2 

25 from _pytest.config import Config 

26except ImportError: 

27 pass 

28 

29from .common_mini_six import string_types 

30from .common_pytest_lazy_values import get_lazy_args 

31from .common_pytest_marks import PYTEST35_OR_GREATER, PYTEST46_OR_GREATER, PYTEST37_OR_GREATER, PYTEST7_OR_GREATER, PYTEST8_OR_GREATER 

32from .common_pytest import get_pytest_nodeid, get_pytest_function_scopeval, is_function_node, get_param_names, \ 

33 get_param_argnames_as_list, has_function_scope, set_callspec_arg_scope_to_function, in_callspec_explicit_args 

34 

35from .fixture_core1_unions import NOT_USED, USED, is_fixture_union_params, UnionFixtureAlternative 

36 

37# if PYTEST54_OR_GREATER: 

38# # we will need to clean the empty ids explicitly in the plugin :'( 

39from .fixture_parametrize_plus import remove_empty_ids 

40 

41from .case_parametrizer_new import get_current_cases 

42 

43 

44_DEBUG = False 

45"""Note: this is a manual flag to turn when developing (do not forget to also call pytest with -s)""" 

46 

47 

48# @pytest.hookimpl(hookwrapper=True, tryfirst=True) 

49# def pytest_pycollect_makeitem(collector, name, obj): 

50# # custom collection of additional things - we could use it one day for Cases ? 

51# # see also https://hackebrot.github.io/pytest-tricks/customize_class_collection/ 

52# outcome = yield 

53# res = outcome.get_result() 

54# if res is not None: 

55# return 

56# # nothing was collected elsewhere, let's do it here 

57# if safe_isclass(obj): 

58# if collector.istestclass(obj, name): 

59# outcome.force_result(Class(name, parent=collector)) 

60# elif collector.istestfunction(obj, name): 

61# ... 

62 

63 

@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_setup(item):
    """Replace every `lazy_value` found in the test item's function arguments by its resolved value.

    This is a hook wrapper: everything after the ``yield`` runs once all other
    ``pytest_runtest_setup`` implementations have executed.

    :param item: the pytest test item being set up
    """
    # let the other hooks perform the actual setup first
    yield

    # items that did not receive a `funcargs` attribute are left untouched
    if not hasattr(item, "funcargs"):
        return

    resolved_args = {}
    for name, value in item.funcargs.items():
        resolved_args[name] = get_lazy_args(value, item)
    item.funcargs = resolved_args

74 

75 

76# @pytest.hookimpl(tryfirst=True, hookwrapper=True) 

def pytest_collection(session):
    """HACK: swap the fixture manager's `getfixtureclosure` method for our own implementation.

    :param session: the pytest session whose `_fixturemanager` is patched
    """
    # Note for reference: another way to access the fm is `metafunc.config.pluginmanager.get_plugin('funcmanage')`
    fixture_manager = session._fixturemanager  # noqa
    # bind the manager as first argument so that the replacement behaves like a bound method
    fixture_manager.getfixtureclosure = partial(getfixtureclosure, fixture_manager)

82 

83 

class FixtureDefsCache(object):
    """
    A 'cache' for the fixture definitions obtained from the FixtureManager `fm`, for the test node `node`.

    Results of `fm.getfixturedefs` are memoized per fixture name in `cached_fix_defs`.
    """
    __slots__ = 'fm', 'node', 'cached_fix_defs'

    def __init__(self, fm, node):
        self.cached_fix_defs = dict()
        self.node = node
        self.fm = fm

    def get_fixture_defs(self, fixname):
        """Return the fixture definitions known for `fixname`, asking the fixture manager only once per name."""
        known = self.cached_fix_defs
        if fixname in known:
            # already retrieved earlier
            return known[fixname]

        # not cached yet: pytest 8.1+ is queried with the node itself, older versions with the node id
        modern_api = hasattr(pytest, "version_tuple") and pytest.version_tuple >= (8, 1)
        lookup_target = self.node if modern_api else self.node.nodeid
        found = self.fm.getfixturedefs(fixname, lookup_target)

        # remember it for next time
        known[fixname] = found
        return found

108 

109 

class FixtureClosureNode(object):
    """
    A node in a fixture closure Tree.

     - its `fixture_defs` is a {name: def} ordered dict containing all fixtures AND args that are required at this node
       (*before* a union is required). Note that some of them have def=None when the fixture manager has no definition
       for them (same behaviour than in pytest). `get_all_fixture_names` and `get_all_fixture_defs` helper functions
       allow to either return the full ordered list (equivalent to pytest `fixture_names`) or the dictionary of
       definitions (equivalent to pytest `arg2fixturedefs`)

     - if a union appears at this node, `split_fixture_name` is set to the name of the union fixture,
       `split_fixture_alternatives` is the list of alternative fixture names, and `children` is the list of child
       nodes, one per alternative and in the same order (a list and not a dict, since several children can use
       the same union value).
    """
    __slots__ = 'parent', 'fixture_defs_mgr', \
                'fixture_defs', 'split_fixture_name', 'split_fixture_alternatives', 'children'

    def __init__(self,
                 fixture_defs_mgr=None,  # type: FixtureDefsCache
                 parent_node=None        # type: FixtureClosureNode
                 ):
        """
        :param fixture_defs_mgr: the fixture definitions cache. Mandatory for a root node; child nodes inherit
            the one of their parent when it is not provided.
        :param parent_node: the parent node, or None for a root node
        :raises ValueError: if neither `fixture_defs_mgr` nor `parent_node` is provided
        """
        if fixture_defs_mgr is None:
            if parent_node is None:
                raise ValueError("root node should have a fixture defs manager")
            fixture_defs_mgr = parent_node.fixture_defs_mgr
        else:
            assert isinstance(fixture_defs_mgr, FixtureDefsCache)

        self.fixture_defs_mgr = fixture_defs_mgr
        self.parent = parent_node

        # these will be set after closure has been built
        self.fixture_defs = None  # type: OrderedDict
        self.split_fixture_name = None  # type: str
        self.split_fixture_alternatives = []
        # we do not use a dict any more as several children can use the same union value (doubled unions)
        self.children = []  # type: List[FixtureClosureNode]

    # ------ tree ------------------

    def get_leaves(self):
        """Return the list of leaf nodes (nodes without split) of the subtree rooted at this node."""
        if self.has_split():
            return [n for c in self.children for n in c.get_leaves()]
        else:
            return [self]

    # ------ str / repr ---------------

    def to_str(self, indent_nb=0, with_children=True):
        """ a string representation, either with all the subtree (default) or without (with_children=False) """

        indent = " " * indent_nb

        if not self.is_closure_built():
            str_repr = "<pending, incomplete>"
        else:
            str_repr = "%s(%s)" % (indent, ",".join("%s" % f for f in self.fixture_defs))

        if self.has_split() and with_children:
            children_str_prefix = "\n%s - " % indent
            children_str = children_str_prefix + children_str_prefix.join([c.to_str(indent_nb=indent_nb + 1)
                                                                           for c in self.children])
            str_repr = str_repr + " split: " + self.split_fixture_name + children_str

        return str_repr

    def __repr__(self):
        return self.to_str()

    # ---- getters to read the "super" closure (used in SuperClosure)

    def get_all_fixture_names(self, try_to_sort_by_scope=True):
        """
        Return a list containing all unique fixture names used by this tree.

        :param try_to_sort_by_scope: if True (default) names are sorted by scope as done in
            `get_all_fixture_defs`; otherwise they are returned in top to bottom tree order.
        """
        if not try_to_sort_by_scope:
            # The same fixture can be required in several branches of the tree: going through an ordered dict
            # keeps the first occurrence of each name only, so that the result is really made of unique names.
            return list(OrderedDict(self.gen_all_fixture_defs(drop_fake_fixtures=False)))
        else:
            return list(self.get_all_fixture_defs(drop_fake_fixtures=False, try_to_sort=True))

    def get_all_fixture_defs(self, drop_fake_fixtures=True, try_to_sort=True):
        """
        Return a dict containing all fixture definitions for fixtures used in this tree

        :param drop_fake_fixtures: if True, entries without definition (def=None) are dropped
        :param try_to_sort: if True, entries are sorted by scope when the pytest version supports it
        :return: an OrderedDict {fixture name: fixture defs or None}
        """
        # get all pairs
        items = self.gen_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures)

        # sort by scope as in pytest fixture closure creator (pytest did not do it in early versions, align with this)
        if try_to_sort:
            if PYTEST7_OR_GREATER:
                # Scope is an enum, values are in reversed order, and the field is _scope
                f_scope = get_pytest_function_scopeval()

                def sort_by_scope(kv_pair):
                    fixture_name, fixture_defs = kv_pair
                    return fixture_defs[-1]._scope if fixture_defs is not None else f_scope
                items = sorted(items, key=sort_by_scope, reverse=True)

            elif PYTEST35_OR_GREATER:
                # scopes is a list, values are indices in the list, and the field is scopenum
                f_scope = get_pytest_function_scopeval()

                def sort_by_scope(kv_pair):  # noqa
                    fixture_name, fixture_defs = kv_pair
                    return fixture_defs[-1].scopenum if fixture_defs is not None else f_scope
                items = sorted(items, key=sort_by_scope)

        return OrderedDict(items)

    def gen_all_fixture_defs(self, drop_fake_fixtures=True):
        """
        Generate all pairs of (fixture name, fixture def or none) used in the tree in top to bottom order
        Note that this method could be generalized to also yield the parent defs, so as to be used to replace
        the engine in `self.gather_all_required`. But this is micro-optimization, really.
        Note: `gather_all_required` was not built to be concerned with ordering because it is only used as a set.

        The same name may be generated several times if it is required in several branches.
        """

        # fixtures required at this node
        for k, v in self.fixture_defs.items():
            if not drop_fake_fixtures or v is not None:
                yield k, v

        # split fixture: not needed since it is the last entry in self.fixture_defs

        # fixtures required by children if any
        for c in self.children:
            for k, v in c.gen_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures):
                yield k, v

    # ---- utils to build the closure

    def build_closure(self,
                      initial_fixture_names,  # type: Iterable[str]
                      ignore_args=()
                      ):
        """
        Updates this Node with the fixture names provided as argument.
        Fixture names and definitions will be stored in self.fixture_defs.

        If some fixtures are Union fixtures, this node will become a "split" node
        and have children. If new fixtures are added to the node after that,
        they will be added to the child nodes rather than self.

        :param initial_fixture_names: the fixture names to add, with their dependencies
        :param ignore_args: arguments to keep in the names but not to put in the fixture defs, because they correspond
            to "direct parametrization"
        :return: nothing
        """
        self._build_closure(self.fixture_defs_mgr, initial_fixture_names, ignore_args=ignore_args)

    def is_closure_built(self):
        """Return True once `fixture_defs` has been initialized by a closure build."""
        return self.fixture_defs is not None

    def already_knows_fixture(self, fixture_name):
        """ Return True if this fixture is known by this node or one of its parents """
        if fixture_name in self.fixture_defs:
            return True
        elif self.parent is None:
            return False
        else:
            return self.parent.already_knows_fixture(fixture_name)

    def _build_closure(self,
                       fixture_defs_mgr,       # type: FixtureDefsCache
                       initial_fixture_names,  # type: Iterable[str]
                       ignore_args
                       ):
        """
        Grab all dependencies of all fixtures named in `initial_fixture_names` and add them either to this node
        or to the nodes below.

        :param fixture_defs_mgr: the cache used to retrieve the fixture definitions
        :param initial_fixture_names: the fixture names to process
        :param ignore_args: arguments to keep in the names but not to put in the fixture defs
        :return: nothing (this node, and possibly new child nodes, are modified)
        """
        # -- first switch this object from 'pending' to 'under construction' if needed
        # (indeed we now authorize and use the possibility to call this twice. see split() )
        if self.fixture_defs is None:
            self.fixture_defs = OrderedDict()

        # -- then for all pending, add them with their dependencies
        pending_fixture_names = list(initial_fixture_names)
        while len(pending_fixture_names) > 0:
            fixname = pending_fixture_names.pop(0)

            # if the fixture is already known in this node or above, do not care
            if self.already_knows_fixture(fixname):
                continue

            # new ignore_args option in pytest 4.6+. Not really a fixture but a test function parameter, it seems.
            if fixname in ignore_args:
                self.add_required_fixture(fixname, None)
                continue

            # else grab the fixture definition(s) for this fixture name for this test node id
            fixturedefs = fixture_defs_mgr.get_fixture_defs(fixname)
            if not fixturedefs:
                # fixture without definition: add it. This can happen with e.g. "requests", etc.
                self.add_required_fixture(fixname, None)
                continue
            else:
                # the actual definition is the last one
                _fixdef = fixturedefs[-1]
                _params = _fixdef.params

                if _params is not None and is_fixture_union_params(_params):
                    # create an UNION fixture

                    # transform the _params into a list of names
                    alternative_f_names = UnionFixtureAlternative.to_list_of_fixture_names(_params)

                    # TO DO if only one name, simplify ? >> No, we leave such "optimization" to the end user

                    # if there are direct dependencies that are not the union members, add them to pending
                    non_member_dependencies = [f for f in _fixdef.argnames if f not in alternative_f_names]
                    # currently we only have 'requests' in this list but future impl of fixture_union may act otherwise
                    pending_fixture_names += non_member_dependencies

                    # propagate WITH the pending
                    self.split_and_build(fixture_defs_mgr, fixname, fixturedefs, alternative_f_names,
                                         pending_fixture_names, ignore_args=ignore_args)

                    # empty the pending because all of them have been propagated on all children with their dependencies
                    pending_fixture_names = []
                    continue

                else:
                    # normal fixture
                    self.add_required_fixture(fixname, fixturedefs)

                    # add all dependencies in the to do list
                    dependencies = _fixdef.argnames
                    # - append: was pytest default
                    # pending_fixture_names += dependencies
                    # - prepend: makes much more sense
                    pending_fixture_names = list(dependencies) + pending_fixture_names
                    continue

    # ------ tools to add new fixture names during closure construction

    def remove_fixtures(self, fixture_names_to_remove):
        """
        Remove some fixture names from all nodes in this subtree. These fixtures should not be split fixtures

        :param fixture_names_to_remove: an iterable of fixture names
        :raises NotImplementedError: if one of the names is the split (union) fixture of a visited node
        """
        _to_remove_in_children = []
        for f in fixture_names_to_remove:
            if self.split_fixture_name == f:
                raise NotImplementedError("It is not currently possible to remove a split fixture name from a closure "
                                          "with splits")
            try:
                del self.fixture_defs[f]
            except KeyError:
                # not required at this node: maybe it is required further down
                _to_remove_in_children.append(f)

        # propagate to children if any
        if len(_to_remove_in_children) > 0:
            for c in self.children:
                c.remove_fixtures(_to_remove_in_children)

    def add_required_fixture(self, new_fixture_name, new_fixture_defs):
        """Add some required fixture names to all leaves under this node"""
        if self.already_knows_fixture(new_fixture_name):
            return
        elif not self.has_split():
            # add_required_fixture locally
            if new_fixture_name not in self.fixture_defs:
                self.fixture_defs[new_fixture_name] = new_fixture_defs
        else:
            # add_required_fixture in each child
            for c in self.children:
                c.add_required_fixture(new_fixture_name, new_fixture_defs)

    def split_and_build(self,
                        fixture_defs_mgr,           # type: FixtureDefsCache
                        split_fixture_name,         # type: str
                        split_fixture_defs,         # type: Tuple[FixtureDefinition]  # noqa
                        alternative_fixture_names,  # type: List[str]
                        pending_fixtures_list,      #
                        ignore_args
                        ):
        """
        Declares that this node contains a union with alternatives (child nodes=subtrees)

        One child node is created per alternative fixture name, and the closure of each child is built from the
        alternative followed by the pending fixtures.

        :raises ValueError: if this node already has a split
        """
        if self.has_split():
            raise ValueError("This should not happen anymore")
        else:
            # add the split (union) name to known fixtures
            self.add_required_fixture(split_fixture_name, split_fixture_defs)

            # remember it
            self.split_fixture_name = split_fixture_name
            self.split_fixture_alternatives = alternative_fixture_names

            # create the child nodes
            for f in alternative_fixture_names:
                # create the child node
                new_c = FixtureClosureNode(parent_node=self)
                self.children.append(new_c)

                # perform the propagation:
                # (a) first propagate all child's dependencies, (b) then the ones required by parent
                # we need to do both at the same time in order to propagate the "pending for child" on all subbranches
                pending_for_child = [f] + pending_fixtures_list
                new_c._build_closure(fixture_defs_mgr, pending_for_child, ignore_args=ignore_args)

    def has_split(self):
        """Return True if a union fixture splits the tree at this node."""
        return self.split_fixture_name is not None

    # ----------- for calls parametrization

    def get_not_always_used(self):
        """Return the list of fixtures used by this subtree, that are used in *some* leaves only, not all"""
        results_list = []

        # candidates are the fixtures required at this node and in its children
        remaining = self.gather_all_required(include_parents=False)

        for leaf in self.get_leaves():
            # computed once per leaf; it is only used for membership tests
            required_by_leaf = set(leaf.gather_all_required())

            still_candidates = []
            for fixture_name in remaining:
                if fixture_name in required_by_leaf:
                    still_candidates.append(fixture_name)
                else:
                    # this leaf does not use it: it is "not always used". No need to check it again.
                    results_list.append(fixture_name)
            remaining = still_candidates

        return results_list

    def gather_all_required(self, include_children=True, include_parents=True):
        """
        Return a list of all fixtures required by the subtree containing this node
        and all of its parents (if include_parents=True) and all of its children (if include_children=True)

        See also `self.gen_all_fixture_defs`, that could be generalized to tackle this use case too
        (micro-optimization, not really urgent)
        """
        # first the fixtures required by this node
        required = list(self.fixture_defs.keys())

        # then the ones required by the parents
        if include_parents and self.parent is not None:
            required = required + self.parent.gather_all_required(include_children=False)

        # then the ones from all the children
        if include_children:
            for child in self.children:
                required = required + child.gather_all_required(include_parents=False)

        return required

    def requires(self, fixturename):
        """ Return True if the fixture with this name is required by the subtree at this node """
        return fixturename in self.gather_all_required()

    # ------ tools to see the tree as a list of alternatives (used in SuperClosure)

    def get_alternatives(self):
        """
        Returns the tree "flattened" as a list of alternatives (one per leaf).
        Each entry in the list consists of a tuple made of:

         - an ordered dictionary {union_fixture_name: (idx, value)} representing the active union filters in this
           alternative
         - a list of fixture names effectively used in this alternative

        :return: a list of alternatives
        """
        alternatives = self._get_alternatives()
        for i, a in enumerate(alternatives):
            # replace the first entry in the tuple with a reversed order one
            alternatives[i] = (OrderedDict(reversed(list(a[0].items()))), a[1])
        return alternatives

    def _get_alternatives(self):
        """Same as `get_alternatives` except that the filters are ordered from the deepest union to the top one."""
        if self.has_split():
            alternatives_list = []
            for c_idx, (c_split_alternative, c_node) in enumerate(zip(self.split_fixture_alternatives, self.children)):
                # for all alternatives in this subtree
                for f_dct, n_lst in c_node._get_alternatives():
                    # - filter
                    _f_dct = f_dct.copy()
                    _f_dct[self.split_fixture_name] = (c_idx, c_split_alternative)

                    # - unique fixtures used
                    _n_lst = list(self.fixture_defs) + [_i for _i in n_lst if _i not in self.fixture_defs]

                    alternatives_list.append((_f_dct, _n_lst))

            return alternatives_list
        else:
            # return a single partition containing no filter and all fixture names
            return [(OrderedDict(), self.get_all_fixture_names())]

545 

546 

547class SuperClosure(MutableSequence): 

548 """ 

549 A "super closure" is a closure made of several closures, each induced by a fixture union parameter value. 

550 The number of alternative closures is `self.nb_alternative_closures` 

551 

552 This object behaves like a list (a mutable sequence), so that we can pass it to pytest in place of the list of 

553 fixture names that is returned in `getfixtureclosure`. 

554 

555 In this implementation, it is backed by a fixture closure tree, that we have to preserve in order to get 

556 parametrization right. In another branch of this project ('super_closure' branch) we tried to forget the tree 

557 and only keep the partitions, but parametrization order was not as intuitive for the end user as all unions 

558 appeared as parametrized first (since they induced the partitions). 

559 """ 

560 __slots__ = 'tree', 'all_fixture_defs' 

561 

562 def __init__(self, 

563 root_node # type: FixtureClosureNode 

564 ): 

565 # if we wish to drop the tree - but we do not anymore to get a better paramz order 

566 # filters_list, partitions_list = root_node._get_alternatives() 

567 

568 # save the fixture closure tree root 

569 self.tree = root_node 

570 # retrieve/sort fixture defs for quicker access 

571 self._update_fixture_defs() 

572 

573 def _update_fixture_defs(self): 

574 # get a list of all fixture defs, for quicker access (and sorted) 

575 # sort by scope as in pytest fixture closure creator, if scope information is available 

576 all_fixture_defs = self.tree.get_all_fixture_defs(drop_fake_fixtures=False, try_to_sort=True) 

577 

578 # # also sort all partitions (note that we cannot rely on the order in all_fixture_defs when scopes are same!) 

579 # if Version(pytest.__version__) >= Version('3.5.0'): 

580 # f_scope = get_pytest_function_scopeval() 

581 # for p in self.partitions: 

582 # def sort_by_scope2(fixture_name): # noqa 

583 # fixture_defs = all_fixture_defs[fixture_name] 

584 # return fixture_defs[-1].scopenum if fixture_defs is not None else f_scope 

585 # p.sort(key=sort_by_scope2) 

586 

587 self.all_fixture_defs = all_fixture_defs 

588 

589 # --- visualization tools ---- 

590 

591 @property 

592 def nb_alternative_closures(self): 

593 """ Return the number of alternative closures induced by fixture unions """ 

594 filters, partitions = self.tree.get_alternatives() 

595 return len(partitions) 

596 

597 def __repr__(self): 

598 """ Return a synthetic view, and a detailed tree view, of this closure """ 

599 alternatives = self.tree.get_alternatives() 

600 nb_alternative_closures = len(alternatives) 

601 return "SuperClosure with %s alternative closures:\n" % nb_alternative_closures \ 

602 + "\n".join(" - %s (filters: %s)" % (p, ", ".join("%s=%s[%s]=%s" % (k, k, v[0], v[1]) 

603 for k, v in f.items())) 

604 for f, p in alternatives) \ 

605 + "\nThe 'super closure list' is %s\n\nThe fixture tree is :\n%s\n" % (list(self), self.tree) 

606 

607 def get_all_fixture_defs(self, drop_fake_fixtures=True): 

608 """ 

609 Return a dictionary of all fixture defs used in this super closure 

610 

611 note: this is equivalent to 

612 self.tree.get_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures, try_to_sort=True) 

613 """ 

614 if drop_fake_fixtures: 614 ↛ 619line 614 didn't jump to line 619, because the condition on line 614 was never false

615 # remove the "fixtures" that are actually test function parameter args 

616 return {k: v for k, v in self.all_fixture_defs.items() if v is not None} 

617 else: 

618 # all fixtures AND pseudo-fixtures (test function parameters) 

619 return self.all_fixture_defs 

620 

621 # ---- list (MutableSequence) facade: behaves like a list of fixture names ------ 

622 

623 def __len__(self): 

624 return len(self.all_fixture_defs) 

625 

626 def __getitem__(self, i): 

627 # return the key (fixture name) associated with the i-th pair 

628 # try: 

629 # return next(islice(self.all_fixture_defs.keys(), i, i+1)) 

630 # except StopIteration: 

631 # raise IndexError(i) 

632 return list(self.all_fixture_defs.keys())[i] 

633 

634 def __setitem__(self, i, o): 

635 # try: 

636 # # pytest performs a full replacement using [:] so we handle at least this case 

637 # full_replace = i == slice(None, None, None) 

638 # except: # noqa 

639 # full_replace = False 

640 

641 # Get the existing value(s) that we wish to replace 

642 ref = list(self)[i] 

643 

644 if o == ref: 

645 # no change at all: of course we accept. 

646 return 

647 

648 if not isinstance(i, slice): 648 ↛ 657line 648 didn't jump to line 657, because the condition on line 648 was never true

649 # In-place change of a single item: let's be conservative and reject for now 

650 # if i == 0: 

651 # self.remove(ref) 

652 # self.insert(0, o) 

653 # elif i == len(self) - 1: 

654 # self.remove(ref) 

655 # self.append(o) 

656 # else: 

657 raise NotImplementedError("Replacing an element in a super fixture closure is not currently implemented. " 

658 "Please report this issue to the `pytest-cases` project.") 

659 else: 

660 # Replacement of multiple items at once: support reordering (ignored) and removal (actually done) 

661 new_set = set(o) 

662 ref_set = set(ref) 

663 if new_set == ref_set: 663 ↛ 670line 663 didn't jump to line 670, because the condition on line 663 was never false

664 # A change is required in the order of fixtures. Ignore but continue 

665 warn("WARNING: An attempt was made to reorder a super fixture closure with unions. This is not yet " 

666 "supported since the partitions use subsets of the fixtures ; please report it so that we can " 

667 "find a suitable solution for your need.") 

668 return 

669 

670 added = new_set.difference(ref_set) 

671 removed = ref_set.difference(new_set) 

672 if len(added) == 0: 

673 # Pure removal: ok. 

674 self.remove_all(removed) 

675 return 

676 else: 

677 # self.append_all(added) 

678 # Rather be conservative for now 

679 raise NotImplementedError("Adding elements to a super fixture closure with a slice is not currently" 

680 "implemented. Please report this issue to the `pytest-cases` project.") 

681 

682 def __delitem__(self, i): 

683 self.remove(self[i]) 

684 

def insert(self, index, fixture_name):
    """
    Try to transparently support inserts. Since the underlying structure is a tree, only two cases
    are supported: inserting at position 0 and appending at position len(self).

    Note that while appending has no restrictions, inserting at position 0 is only allowed for now if the
    fixture to insert does not have a union in its associated closure.

    :param index: the position where to insert. Only `0` (prepend) and `len(self)` (append) are supported.
    :param fixture_name: the name of the fixture to insert in the closure.
    :return: None
    :raises NotImplementedError: if `index` is neither `0` nor `len(self)`, or if `index` is `0` and the
        closure built for `fixture_name` contains a split (a union).
    """
    if index == 0:
        # build the closure associated with this new fixture name, in a separate tree
        fixture_defs_mgr = FixtureDefsCache(self.tree.fixture_defs_mgr.fm, self.tree.fixture_defs_mgr.node)
        closure_tree = FixtureClosureNode(fixture_defs_mgr=fixture_defs_mgr)
        closure_tree.build_closure((fixture_name,))
        if closure_tree.has_split():
            # note: every fragment ends with a space so that the concatenated message reads correctly
            raise NotImplementedError("When fixture unions are present, inserting a fixture in the closure at "
                                      "position 0 is currently only supported if that fixture's closure does not "
                                      "contain a union. Please report this so that we can find a suitable solution"
                                      " for your need.")
        else:
            # remove those fixture definitions from all nodes in the tree
            self.tree.remove_fixtures(closure_tree.fixture_defs.keys())

            # finally prepend the defs at the beginning of the dictionary in the first node
            self.tree.fixture_defs = OrderedDict(list(closure_tree.fixture_defs.items())
                                                 + list(self.tree.fixture_defs.items()))

    elif index == len(self):
        # appending is natively supported in our tree growing method
        self.tree.build_closure((fixture_name,))
    else:
        raise NotImplementedError("When fixture unions are present, inserting a fixture in the closure at a "
                                  "position different from 0 (prepend) or <end> (append) is non-trivial. Please "
                                  "report this so that we can find a suitable solution for your need.")

    # Finally update self.fixture_defs so that the "list" view reflects the changes in self.tree
    self._update_fixture_defs()

725 

def append_all(self, fixture_names):
    """Append several fixture names to the closure in a single operation."""
    # growing the tree is what `build_closure` does natively, so appending needs nothing special
    names = tuple(fixture_names)
    self.tree.build_closure(names)

    # refresh self.fixture_defs so that the "list" view is in sync with self.tree
    self._update_fixture_defs()

733 

def remove(self, value):
    """
    Try to transparently support removal of a single fixture name.

    Note: since the underlying structure is a tree, removing "union" fixtures is non-trivial so for now
    it is not supported.

    :param value: the fixture name to remove from the closure
    :return: None
    """
    # first drop the name from the tree...
    names_to_drop = (value,)
    self.tree.remove_fixtures(names_to_drop)

    # ...then refresh the fixture defs so that they reflect the new tree contents
    self._update_fixture_defs()

747 

def remove_all(self, values):
    """Remove several fixture names at once: same as `remove`, but with a single refresh at the end."""
    # first drop all the names from the tree...
    names_to_drop = tuple(values)
    self.tree.remove_fixtures(names_to_drop)

    # ...then refresh the fixture defs so that they reflect the new tree contents
    self._update_fixture_defs()

755 

756 

def _getfixtureclosure(fm, fixturenames, parentnode, ignore_args=()):
    """
    Replaces pytest's getfixtureclosure method to handle unions.

    The original implementation (found on `fm.__class__`) is called first so that its output can be compared
    with the one from `create_super_closure`. The signature and the number of outputs of the original method
    depend on the pytest version, hence the branches on the `PYTEST*_OR_GREATER` flags.

    :param fm: the fixture manager
    :param fixturenames: the initial fixture names
    :param parentnode: the node for which the closure is built
    :param ignore_args: forwarded to the original method only when `PYTEST46_OR_GREATER` is set
    :return: `(initial names, closure, arg2fixturedefs)` when `PYTEST37_OR_GREATER and not PYTEST8_OR_GREATER`,
        otherwise `(closure, arg2fixturedefs)`
    """
    pytest_impl = fm.__class__.getfixtureclosure

    # (1) first retrieve the normal pytest output for comparison
    # "ignore_args" is a new argument in 4.6+
    extra = {'ignore_args': ignore_args} if PYTEST46_OR_GREATER else {}

    if PYTEST8_OR_GREATER:
        # two outputs, and the order of arguments changed
        ref_fixturenames, ref_arg2fixturedefs = pytest_impl(fm, parentnode, fixturenames, **extra)
    elif PYTEST37_OR_GREATER:
        # three outputs, the first one is not needed for the comparison
        _, ref_fixturenames, ref_arg2fixturedefs = pytest_impl(fm, fixturenames, parentnode, **extra)
    else:
        # two outputs
        ref_fixturenames, ref_arg2fixturedefs = pytest_impl(fm, fixturenames, parentnode)

    # (2) now let's do it by ourselves to support fixture unions
    init_names, super_closure, arg2fixturedefs = create_super_closure(fm, parentnode, fixturenames, ignore_args)

    # Compare with the previous behaviour TODO remove when in 'production' ?
    # NOTE only the *contents* are compared: a different order happens all the time because of our "prepend"
    # strategy in the closure building, which makes much more sense/intuition than pytest default
    assert set(super_closure) == set(ref_fixturenames)
    assert dict(arg2fixturedefs) == ref_arg2fixturedefs

    # return the same number of outputs as the pytest method that we replace
    three_outputs = PYTEST37_OR_GREATER and not PYTEST8_OR_GREATER
    if three_outputs:
        return init_names, super_closure, arg2fixturedefs
    return super_closure, arg2fixturedefs

792 

793 

if not PYTEST8_OR_GREATER:
    # the signature of `_getfixtureclosure` can be used as is
    getfixtureclosure = _getfixtureclosure
else:
    def getfixtureclosure(fm, parentnode, initialnames, ignore_args):
        """Variant with the argument order used when `PYTEST8_OR_GREATER`; delegates to `_getfixtureclosure`."""
        return _getfixtureclosure(fm, initialnames, parentnode, ignore_args)

799 

800 

def create_super_closure(fm,
                         parentnode,
                         fixturenames,
                         ignore_args
                         ):
    # type: (...) -> Tuple[List, Union[List, SuperClosure], Mapping]
    """
    Builds the fixture closure for `parentnode`, as a tree (`FixtureClosureNode`) wrapped in a `SuperClosure`.

    :param fm: the fixture manager. It provides the auto-use fixture names and is used to create the
        `FixtureDefsCache`.
    :param parentnode: the node for which the closure is created
    :param fixturenames: the fixture names to include, in addition to the auto-use ones
    :param ignore_args: passed as is to `FixtureClosureNode.build_closure`
    :return: a tuple (initial fixture names, closure, fixture definitions). The closure is a plain list when
        the tree has no split, and a `SuperClosure` otherwise.
    """
    parentid = parentnode.nodeid

    if _DEBUG:
        print("Creating closure for %s:" % parentid)

    # -- auto-use fixtures: the argument expected by `_getautousenames` depends on the pytest version
    use_node = hasattr(pytest, "version_tuple") and pytest.version_tuple >= (8, 1)
    init_fixnames = list(fm._getautousenames(parentnode if use_node else parentid))  # noqa

    # -- required fixtures/params.
    # ********* fix the order of initial fixtures: indeed this order may not be the right one ************
    # this only works when pytest version is > 3.4, otherwise the parent node is a Module
    names_to_merge = fixturenames
    if is_function_node(parentnode):
        # grab all the parametrization on that node and fix the order.
        # Note: on pytest >= 4 the list of param_names is probably the same than the `ignore_args` input
        names_to_merge = sort_according_to_ref_list(fixturenames, get_param_names(parentnode))
    # else: we cannot sort yet

    # merge the names into the initial ones, only if they are not already there
    for name in names_to_merge:
        if name not in init_fixnames:
            init_fixnames.append(name)

    # Bugfix GH#330 in progress...
    # TODO analyze why in the test "fixture_union_0simplest
    #   the first node contains second, and the second contains first
    #   or TODO check the test for get_callspecs, it is maybe simpler

    # Finally create the closure
    closure_tree = FixtureClosureNode(fixture_defs_mgr=FixtureDefsCache(fm, parentnode))
    closure_tree.build_closure(init_fixnames, ignore_args=ignore_args)
    super_closure = SuperClosure(closure_tree)
    all_fixture_defs = super_closure.get_all_fixture_defs(drop_fake_fixtures=True)

    # possibly simplify into a list
    if not closure_tree.has_split():
        super_closure = list(super_closure)

    if _DEBUG:
        print("Closure for %s completed:" % parentid)
        print(closure_tree)
        print(super_closure)

    return init_fixnames, super_closure, all_fixture_defs

871 

872 

@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_generate_tests(metafunc):
    """
    Hook wrapper replacing the `parametrize` function of `metafunc` with our own `parametrize` (defined in
    this module), before it is called by pytest.

    Note we could do it in a static way in pytest_sessionstart or plugin init hook, but this way the original
    method remains accessible using `metafunc.__class__.parametrize`.
    """
    # override the parametrize method on this instance only
    custom_parametrize = partial(parametrize, metafunc)
    metafunc.parametrize = custom_parametrize

    # now let pytest parametrize the call as usual. The outcome of the hook is not used.
    yield

885 

886 

class UnionParamz(namedtuple('UnionParamz', ['union_fixture_name', 'alternative_names', 'ids', 'scope', 'kwargs'])):
    """ A pending parametrization order for a union fixture: fixture name, alternatives, ids, scope, kwargs """

    __slots__ = ()

    def __str__(self):
        # the alternatives are rendered with `str` and joined with commas, without spaces
        alternatives = ','.join(str(alt) for alt in self.alternative_names)
        template = "[UNION] %s=[%s], ids=%s, scope=%s, kwargs=%s"
        return template % (self.union_fixture_name, alternatives, self.ids, self.scope, self.kwargs)

896 

897 

class NormalParamz(namedtuple('NormalParamz', ['argnames', 'argvalues', 'indirect', 'ids', 'scope', 'kwargs'])):
    """ A pending "normal" parametrization order: argnames, argvalues, indirect, ids, scope, kwargs """

    __slots__ = ()

    def __str__(self):
        # all fields are rendered with `%s`, in their declaration order
        fields = (self.argnames, self.argvalues, self.indirect, self.ids, self.scope, self.kwargs)
        return "[NORMAL] %s=[%s], indirect=%s, ids=%s, scope=%s, kwargs=%s" % fields

906 

907 

def parametrize(metafunc, argnames, argvalues, indirect=False, ids=None, scope=None, **kwargs):
    """
    This alternate implementation of metafunc.parametrize creates a list of calls that is not just the cartesian
    product of all parameters (like the pytest behaviour). Instead, it offers an alternate list of calls taking into
    account all "union" fixtures.

    For this, it replaces the `metafunc._calls` attribute with a `CallsReactor` instance, and feeds it with all
    parameters and parametrized fixtures independently (not doing any cross-product during this call). The resulting
    `CallsReactor` instance is then able to dynamically behave like the correct list of calls, lazy-creating that list
    when it is used.

    When `metafunc.fixturenames` is not a `SuperClosure`, the original `metafunc.__class__.parametrize` is simply
    called with the same arguments.

    :raises ValueError: if `metafunc._calls` is a non-empty container that is not a `CallsReactor`, or if a
        union fixture parametrization is received with several argnames, with `indirect=False`, or with kwargs.
    """
    if not isinstance(metafunc.fixturenames, SuperClosure):
        # legacy method
        # note: no need to clean EMPTY_ID here, they are never set by us in a normal parametrize.
        metafunc.__class__.parametrize(metafunc, argnames, argvalues, indirect=indirect, ids=ids, scope=scope,
                                       **kwargs)
        return

    # get or create our special container object
    reactor = metafunc._calls  # noqa
    if not isinstance(reactor, CallsReactor):
        # first call: should be an empty list
        if len(reactor) > 0:
            # If this happens, it is most probably because another plugin has called 'parametrize' before our hook
            # plugin.py/pytest_generate_tests has replaced it with this function. It can be due to a regression
            # in pluggy too, see https://github.com/smarie/python-pytest-cases/issues/302
            raise ValueError("This should not happen - please file an issue")
        reactor = CallsReactor(metafunc)
        metafunc._calls = reactor

    if not is_fixture_union_params(argvalues):
        # add a normal parametrization in the queue (but do not apply it now)
        reactor.append(NormalParamz(argnames, argvalues, indirect, ids, scope, kwargs))
        return

    # this is a union fixture: a single argname is expected...
    if ',' in argnames or not isinstance(argnames, string_types):
        raise ValueError("Union fixtures can not be parametrized")
    # ...and `indirect` must have been set to something else than `False`, without extra kwargs
    if indirect is False or len(kwargs) > 0:
        raise ValueError("indirect cannot be set on a union fixture, as well as unknown kwargs")

    # add a union parametrization in the queue (but do not apply it now)
    reactor.append(UnionParamz(argnames, argvalues, ids, scope, kwargs))

953 

954 

class CallsReactor(object):
    """
    This object replaces the list of calls that was in `metafunc._calls`.
    It can be iterated and indexed like a list, but it actually builds that list dynamically based on all
    parametrizations collected from the custom `metafunc.parametrize` above.

    There are therefore three steps:

     - when `metafunc.parametrize` is called, `append` is called on this object. The parametrization order
       gets stored in `self._pending`

     - when this object is first read as a list, all parametrization orders in `self._pending` are applied in
       `create_call_list_from_pending_parametrizations`. The resulting list of calls is stored in
       `self._call_list`, and `self._pending` is discarded.

     - in subsequent usages of this object, the list stored in `self._call_list` is reused.
    """
    __slots__ = 'metafunc', '_pending', '_call_list'

    def __init__(self, metafunc):
        self.metafunc = metafunc
        # the parametrization orders received so far. Set to None once they have been applied.
        self._pending = []  # type: List[Union[UnionParamz, NormalParamz]]
        # the list of calls. None until it has been created.
        self._call_list = None

    # -- methods to provide parametrization orders without executing them --

    def append(self,
               parametrization  # type: Union[UnionParamz, NormalParamz]
               ):
        """Stores a new parametrization order, to be applied later."""
        self._pending.append(parametrization)

    def print_parametrization_list(self):
        """Helper method to print all pending parametrizations in this reactor """
        print("\n".join([str(p) for p in self._pending]))

    # -- list facade --

    def __iter__(self):
        return iter(self.calls_list)

    def __getitem__(self, item):
        return self.calls_list[item]

    @property
    def calls_list(self):
        """
        Returns the list of calls. It is lazily created on first access from the pending parametrizations,
        and reused afterwards.
        :return:
        """
        if self._call_list is None:
            # first access: create the definitive list.
            self.create_call_list_from_pending_parametrizations()

        return self._call_list

    # --- list creation (executed once the first time this object is used as a list)

    def create_call_list_from_pending_parametrizations(self):
        """
        Takes all parametrization operations that are pending in `self._pending`,
        and creates the list of calls out of them using `get_calls_for_tree`.

        self._pending is set to None afterwards
        :return:
        """
        # self is expected to be on the _calls field, and is set back on it at the end of this call
        assert self.metafunc._calls is self

        # ------ parametrize the calls --------
        # create a dictionary of pending fixturenames/argnames to parametrize.
        # We remember one of the argnames only (the first), so that we are able to detect where in the fixture tree
        # the parametrization applies (it will still be applied for all of its argnames, no worries: see
        # _process_node)
        pending_dct = OrderedDict((get_param_argnames_as_list(p[0])[0], p) for p in self._pending)

        if _DEBUG:
            print("\n---- pending parametrization ----")
            self.print_parametrization_list()
            print("---------------------------------\n")
            print("Applying all of them in the closure tree nodes:")

        # grab the "super fixtures closure" created previously (see getfixtureclosure above)
        super_closure = self.metafunc.fixturenames
        assert isinstance(super_closure, SuperClosure)

        # Apply parametrization for calls
        calls = get_calls_for_tree(self.metafunc, super_closure.tree, pending_dct)
        # Alternative: use the partitions for parametrization. The issue is that this leads to a less intuitive order
        # calls = []
        # for i in range(super_closure.nb_alternative_closures):
        #     calls += get_calls_for_partition(self.metafunc, super_closure, i, pending.copy())

        if _DEBUG:
            nodeid = get_pytest_nodeid(self.metafunc)
            debug_lines = ["%s[%s]: funcargs=%s, params=%s" % (nodeid, c.id,
                                                              c.params if PYTEST8_OR_GREATER else c.funcargs,
                                                              c.params)
                           for c in calls]
            print("\n".join(debug_lines) + "\n")

        # clean EMPTY_ID set by @parametrize when there is at least a MultiParamsAlternative
        # if PYTEST54_OR_GREATER:
        for callspec in calls:
            remove_empty_ids(callspec)

        # save the list and put back self as the _calls facade
        self._call_list = calls
        self.metafunc._calls = self
        # forget about all parametrizations now - this won't happen again
        self._pending = None

1068 

1069 

def get_calls_for_tree(metafunc,
                       fix_closure_tree,  # type: FixtureClosureNode
                       pending_dct  # type: MutableMapping[str, Union[UnionParamz, NormalParamz]]
                       ):
    """
    Creates the list of calls for `metafunc` based on the fixture closure tree and on the pending
    parametrizations. The calls are first created by `_process_node`, then completed by `_cleanup_calls_list`.

    :param metafunc:
    :param fix_closure_tree: the root node of the fixture closure tree
    :param pending_dct: the pending parametrizations. A copy is made, so that this mapping is left untouched.
    :return: the list of calls
    """
    remaining = pending_dct.copy()
    # for each call in calls, the node at the same position in leaf_nodes is the corresponding tree leaf.
    calls, leaf_nodes = _process_node(metafunc, fix_closure_tree, remaining, [])
    # `remaining` is the same object as the one received by `_process_node` above
    _cleanup_calls_list(metafunc, fix_closure_tree, calls, leaf_nodes, remaining)
    return calls

1086 

1087 

def _cleanup_calls_list(metafunc,
                        fix_closure_tree,  # type: FixtureClosureNode
                        calls,  # type: List[CallSpec2]
                        nodes,  # type: List[FixtureClosureNode]
                        pending_dct  # type: MutableMapping[str, Union[UnionParamz, NormalParamz]]
                        ):
    """
    Cleans the calls list so that all calls contain a value for all parameters. This is basically
    about adding "NOT_USED" parametrization everywhere relevant. `calls` is modified in place.

    :param metafunc:
    :param fix_closure_tree: the tree, used to get the fixtures that are not always used
    :param calls: the list of calls to clean
    :param nodes: a list of same length than `calls`, containing the tree node associated with each call
    :param pending_dct: the parametrizations to apply with a "not used" value on the calls where the
        corresponding argname is not already present
    :return: None
    :raises ValueError: if `calls` and `nodes` do not have the same length
    """
    if len(calls) != len(nodes):
        raise ValueError("This should not happen !")

    # create the reference list of function-scoped fixtures that are not always used
    func_scoped_optional = []
    for fixture_name in fix_closure_tree.get_not_always_used():
        try:
            fixdefs = metafunc._arg2fixturedefs[fixture_name]  # noqa
        except KeyError:
            # dont raise any error here and let pytest say "not found" later
            continue
        if has_function_scope(fixdefs[-1]):
            func_scoped_optional.append(fixture_name)

    for idx, leaf in enumerate(nodes):
        callspec = calls[idx]

        # A/ set to "not used" all parametrized fixtures that were not used in some branches
        for fixture, paramz in pending_dct.items():
            if in_callspec_explicit_args(callspec, fixture):
                continue

            # parametrize with a single "not used" value and discard the id
            if isinstance(paramz, UnionParamz):
                names, values, is_indirect = paramz.union_fixture_name, [NOT_USED], True
            else:
                names = paramz.argnames
                nb_names = len(get_param_argnames_as_list(names))
                # one "not used" value per argname
                values = [(NOT_USED,) * nb_names] if nb_names > 1 else [NOT_USED]
                is_indirect = paramz.indirect

            replaced = _parametrize_calls(metafunc, [callspec], names, values, indirect=is_indirect,
                                          discard_id=True, scope=paramz.scope, **paramz.kwargs)
            assert len(replaced) == 1
            # from now on, work on the new callspec
            callspec = calls[idx] = replaced[0]

        # B/ function-scoped non-parametrized fixtures also need to be explicitly deactivated in the callspecs
        # where they are not required, otherwise they will be setup/teardown.
        #
        # For this we use a dirty hack: we add a parameter with they name in the callspec, it seems to be propagated
        # in the `request`. TODO is there a better way?
        for fixture_name in func_scoped_optional:
            if in_callspec_explicit_args(callspec, fixture_name):
                continue

            # explicitly add it, either as active or as discarded, by creating a parameter value for it.
            is_required = leaf.requires(fixture_name)
            callspec.params[fixture_name] = USED if is_required else NOT_USED
            callspec.indices[fixture_name] = 0 if is_required else 1
            set_callspec_arg_scope_to_function(callspec, fixture_name)

1163 

1164 # finally, if there are some session or module-scoped fixtures that 

1165 # are used in *none* of the calls, they could be deactivated too 

1166 # (see https://github.com/smarie/python-pytest-cases/issues/137) 

1167 # 

1168 # for fixture_name in _not_always_used_other_scoped: 

1169 # _scopenum = metafunc._arg2fixturedefs[fixture_name][-1].scopenum 

1170 # 

1171 # # check if there is at least one call that actually uses the fixture and is not skipped... 

1172 # # this seems a bit "too much" !! > WON'T FIX 

1173 # used = False 

1174 # for i in range(nb_calls): 

1175 # c, n = calls[i], nodes[i] 

1176 # if fixture_name in c.params or fixture_name in c.funcargs or n.requires(fixture_name): 

1177 # if not is_skipped_or_failed(c): # HOW can we implement this based on call (and not item) ??? 

1178 # used = True 

1179 # break 

1180 # 

1181 # if not used: 

1182 # # explicitly add it as discarded everywhere by creating a parameter value for it. 

1183 # for i in range(nb_calls): 

1184 # c = calls[i] 

1185 # c.params[fixture_name] = NOT_USED 

1186 # c.indices[fixture_name] = 0 

1187 # c._arg2scopenum[fixture_name] = _scopenum # noqa 

1188 

1189 

1190# def get_calls_for_partition(metafunc, super_closure, p_idx, pending): 

1191# """ 

1192# Parametrizes all fixtures that are actually used in this partition 

1193# Cleans the calls list so that all calls contain a value for all parameters. This is basically 

1194# about adding "NOT_USED" parametrization everywhere relevant. 

1195# 

1196# :return: a list of CallSpec2 

1197# """ 

1198# calls = [] 

1199# 

1200# # A/ parametrize all fixtures that are actually used in this partition 

1201# for fixture_name in super_closure.partitions[p_idx]: 

1202# try: 

1203# # pop it from pending - do not rely the order in pending but rather the order in the closure 

1204# p_to_apply = pending.pop(fixture_name) 

1205# except KeyError: 

1206# # not a parametrized fixture 

1207# continue 

1208# else: 

1209# if isinstance(p_to_apply, UnionParamz): 

1210# # ******** Union parametrization ********** 

1211# # selected_ids, selected_alternative = super_closure.get_parameter_to_apply(p_to_apply, p_idx) 

1212# num, selected_filter = super_closure.filters[p_idx][p_to_apply.union_fixture_name] 

1213# # in order to get the *actual* id to use (with all pytest subtleties in case of two identical ids 

1214# # appearing in the list), we create a fake calls list 

1215# fake_calls = _parametrize_calls(metafunc, [], p_to_apply.union_fixture_name, 

1216# p_to_apply.alternative_names, ids=p_to_apply.ids, 

1217# scope=p_to_apply.scope, indirect=True, **p_to_apply.kwargs) 

1218# selected_id = fake_calls[num].id 

1219# selected_alternative = p_to_apply.alternative_names[num] 

1220# # assert selected_alternative.alternative_name == selected_filter 

1221# 

1222# if _DEBUG: 

1223# print("[Partition %s] Applying parametrization for UNION fixture %r=%r" 

1224# "" % (p_idx, p_to_apply.union_fixture_name, selected_alternative)) 

1225# 

1226# # always use 'indirect' since that's a fixture. 

1227# calls = _parametrize_calls(metafunc, calls, p_to_apply.union_fixture_name, 

1228# [selected_alternative], ids=[selected_id], scope=p_to_apply.scope, 

1229# indirect=True, **p_to_apply.kwargs) 

1230# 

1231# elif isinstance(p_to_apply, NormalParamz): 

1232# # ******** Normal parametrization ********** 

1233# if _DEBUG: 

1234# print("[Partition %s] Applying parametrization for NORMAL %s" 

1235# "" % (p_idx, p_to_apply.argnames)) 

1236# 

1237# calls = _parametrize_calls(metafunc, calls, p_to_apply.argnames, p_to_apply.argvalues, 

1238# indirect=p_to_apply.indirect, ids=p_to_apply.ids, 

1239# scope=p_to_apply.scope, **p_to_apply.kwargs) 

1240# else: 

1241# raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__) 

1242# 

1243# # Cleaning 

1244# for i in range(len(calls)): 

1245# c = calls[i] 

1246# 

1247# # B/ set to "not used" all parametrized fixtures that were not used in some branches 

1248# for fixture_name, p_to_apply in pending.items(): 

1249# if fixture_name not in c.params and fixture_name not in c.funcargs: 

1250# # parametrize with a single "not used" value and discard the id 

1251# if isinstance(p_to_apply, UnionParamz): 

1252# c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.union_fixture_name, [NOT_USED], 

1253# indirect=True, discard_id=True, scope=p_to_apply.scope, 

1254# **p_to_apply.kwargs) 

1255# else: 

1256# _nb_argnames = len(get_param_argnames_as_list(p_to_apply.argnames)) 

1257# if _nb_argnames > 1: 

1258# _vals = [(NOT_USED,) * _nb_argnames] 

1259# else: 

1260# _vals = [NOT_USED] 

1261# c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.argnames, _vals, 

1262# indirect=p_to_apply.indirect, discard_id=True, 

1263# scope=p_to_apply.scope, **p_to_apply.kwargs) 

1264# assert len(c_with_dummy) == 1 

1265# calls[i] = c_with_dummy[0] 

1266# c = calls[i] 

1267# 

1268# # C/ some non-parametrized fixtures may also need to be explicitly deactivated in some callspecs 

1269# # otherwise they will be setup/teardown. 

1270# # 

1271# # For this we use a dirty hack: we add a parameter with they name in the callspec, it seems to be propagated 

1272# # in the `request`. TODO is there a better way? 

1273# for fixture_name in super_closure.get_not_always_used(): 

1274# try: 

1275# fixdef = metafunc._arg2fixturedefs[fixture_name] # noqa 

1276# except KeyError: 

1277# continue # dont raise any error here and instead let pytest say "not found" 

1278# 

1279# if fixture_name not in c.params and fixture_name not in c.funcargs: 

1280# if not super_closure.requires(fixture_name, p_idx): 

1281# # explicitly add it as discarded by creating a parameter value for it. 

1282# c.params[fixture_name] = NOT_USED 

1283# c.indices[fixture_name] = 1 

1284# c._arg2scopenum[fixture_name] = get_pytest_scopenum(fixdef[-1].scope) # noqa 

1285# else: 

1286# # explicitly add it as active by creating a parameter value for it. 

1287# c.params[fixture_name] = 'used' 

1288# c.indices[fixture_name] = 0 

1289# c._arg2scopenum[fixture_name] = get_pytest_scopenum(fixdef[-1].scope) # noqa 

1290# 

1291# return calls 

1292 

1293 

@property
def id(self):
    # legacy _CallSpec2 id was filtering empty strings, we'll put it back on the class below
    # https://github.com/pytest-dev/pytest/blob/5.3.4/src/_pytest/python.py#L861
    # note: all falsy entries of `_idlist` are dropped, the remaining ones are joined with dashes
    kept = [str(part) for part in self._idlist if part]
    return "-".join(kept)

1299 

1300 

def _parametrize_calls(metafunc, init_calls, argnames, argvalues, discard_id=False, indirect=False, ids=None,
                       scope=None, **kwargs):
    """
    Parametrizes the initial `calls` with the provided information and returns the resulting new calls.

    `metafunc._calls` is temporarily replaced with `init_calls` while the original
    `metafunc.__class__.parametrize` runs. It is always restored afterwards, even if that method raises.

    :param metafunc:
    :param init_calls: the calls to start from. `None` is replaced with an empty list.
    :param argnames: passed as is to `metafunc.__class__.parametrize`
    :param argvalues: passed as is to `metafunc.__class__.parametrize`
    :param discard_id: if True, the last entry of `_idlist` is removed on each of the new calls
    :param indirect: passed as is to `metafunc.__class__.parametrize`
    :param ids: passed as is to `metafunc.__class__.parametrize`
    :param scope: passed as is to `metafunc.__class__.parametrize`
    :param kwargs: passed as is to `metafunc.__class__.parametrize`
    :return: the contents of `metafunc._calls` right after parametrization
    """
    # make a backup so that we can restore the metafunc at the end
    bak = metafunc._calls  # noqa

    # place the initial calls on the metafunc
    metafunc._calls = init_calls if init_calls is not None else []

    try:
        # parametrize the metafunc. Since we replaced the `parametrize` method on `metafunc` we have to call super
        metafunc.__class__.parametrize(metafunc, argnames, argvalues, indirect=indirect, ids=ids, scope=scope,
                                       **kwargs)

        # extract the result
        new_calls = metafunc._calls  # noqa
    finally:
        # restore the metafunc in all cases, so that an error does not leave it with the temporary calls
        metafunc._calls = bak

    # If the user wants to discard the newly created id, remove the last id in all these callspecs in this node
    if discard_id:
        for callspec in new_calls:
            callspec._idlist.pop(-1)  # noqa

    return new_calls

1325 

1326 

def _process_node(metafunc,
                  current_node,  # type: FixtureClosureNode
                  pending,  # type: MutableMapping[str, Union[UnionParamz, NormalParamz]]
                  calls  # type: List[CallSpec2]
                  ):
    """
    Routine to apply all the parametrization tasks in `pending` that are relevant to `current_node`,
    to `calls` (a list of pytest CallSpec2).

    It first applies all parametrization that correspond to current node (normal parameters),
    then applies the "split" parametrization if needed and recurses into each tree branch.

    It returns a tuple containing a list of calls and a list of same length containing which leaf node each one
    corresponds to.

    :param metafunc:
    :param current_node: the closure tree node we're focusing on
    :param pending: a mapping of parametrization orders to apply. The ones applied at this node are popped from
        it; each child branch receives its own copy.
    :param calls: the calls to start from
    :return: a tuple (calls, nodes) of two lists of the same length. So that for each CallSpec calls[i], you can see
        the corresponding leaf node in nodes[i]
    :raises ValueError: if a union parametrization is found for a non-split fixture, or if the parametrization
        of the split fixture is missing or is a normal one
    :raises TypeError: if a parametrization order is neither a `UnionParamz` nor a `NormalParamz`
    """

    # (1) first apply all **non-split** fixtures at this node = NORMAL PARAMETERS
    # in the order defined in the closure tree, do not trust the order of the received parametrize (`pending`)
    # Note: names are compared by value (`!=`): identity is not guaranteed for two equal strings.
    split_name = current_node.split_fixture_name
    fixtures_at_this_node = [f for f in current_node.fixture_defs.keys() if f != split_name]
    for fixturename in fixtures_at_this_node:
        try:
            # pop the corresponding parametrization from pending - do not trust the order
            p_to_apply = pending.pop(fixturename)
        except KeyError:
            # fixturename is not a parametrized fixture, nothing to do
            continue
        else:
            if isinstance(p_to_apply, UnionParamz):
                raise ValueError("This should not happen! Only Normal parameters should be in fixtures_at_this_node")
            elif isinstance(p_to_apply, NormalParamz):
                # ******** Normal parametrization **********
                if _DEBUG:
                    print("[Node %s] Applying parametrization for NORMAL %s"
                          "" % (current_node.to_str(with_children=False), p_to_apply.argnames))

                calls = _parametrize_calls(metafunc, calls, p_to_apply.argnames, p_to_apply.argvalues,
                                           indirect=p_to_apply.indirect, ids=p_to_apply.ids,
                                           scope=p_to_apply.scope, **p_to_apply.kwargs)
            else:
                raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__)

    # (2) then is there a "union" = a split between two sub-branches in the tree ?
    if not current_node.has_split():
        # No split = tree leaf: return
        nodes = [current_node] * len(calls)
        return calls, nodes

    # There is a **split** : apply its parametrization (a UNION parameter)
    try:
        # pop the corresponding parametrization from pending - do not trust the order
        p_to_apply = pending.pop(current_node.split_fixture_name)
    except KeyError:
        raise ValueError("This should not happen! fixture union parametrization missing, but this is a split node")

    if isinstance(p_to_apply, NormalParamz):
        raise ValueError("This should not happen! Split nodes correspond to Union parameters, not Normal ones.")
    elif not isinstance(p_to_apply, UnionParamz):
        # same error as for the normal parameters above, instead of silently returning None
        raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__)

    # ******** Union parametrization **********
    if _DEBUG:
        print("[Node %s] Applying parametrization for UNION %s"
              "" % (current_node.to_str(with_children=False), p_to_apply.union_fixture_name))

    # always use 'indirect' since that's a fixture.
    calls = _parametrize_calls(metafunc, calls, p_to_apply.union_fixture_name,
                               p_to_apply.alternative_names, indirect=True,
                               ids=p_to_apply.ids, scope=p_to_apply.scope, **p_to_apply.kwargs)

    # now move to the children: each call is processed by the branch matching its active alternative,
    # and the results of all branches are concatenated in the same order as the calls.
    all_calls, all_nodes = [], []
    for callspec in calls:
        active_alternative = callspec.params[p_to_apply.union_fixture_name]
        child_indices = [_i for _i, x in enumerate(current_node.split_fixture_alternatives)
                         if x == active_alternative.alternative_name]
        # only use the first matching child, since the subtrees are identical.
        child_node = current_node.children[child_indices[0]]

        # each branch works on its own copy of the remaining parametrizations
        child_calls, child_nodes = _process_node(metafunc, child_node, pending.copy(), [callspec])
        all_calls.extend(child_calls)
        all_nodes.extend(child_nodes)

    return all_calls, all_nodes

1426 

1427 

1428# def _make_unique(lst): 

1429# _set = set() 

1430# 

1431# def _first_time_met(v): 

1432# if v not in _set: 

1433# _set.add(v) 

1434# return True 

1435# else: 

1436# return False 

1437# 

1438# return [v for v in lst if _first_time_met(v)] 

1439 

1440 

def flatten_list(lst):
    """
    Flattens a single level of nesting.

    :param lst: an iterable whose items are themselves iterables
    :return: a new list containing the elements of each nested iterable, in iteration order
    """
    flat = []
    for nested in lst:
        flat.extend(nested)
    return flat

1443 

1444 

def sort_according_to_ref_list(fixturenames, param_names):
    """
    Sorts items in the first list, according to their position in the second.
    Items that are not in the second list stay in the same position, the others are just swapped.
    A new list is returned; `fixturenames` is not modified.

    Names appearing several times in `param_names` are only taken into account once (first occurrence).
    Otherwise the same slot would be written twice and some entries of `fixturenames` would be lost.

    :param fixturenames: the list of names to reorder
    :param param_names: the reference list defining the relative order of the names it contains
    :return: a new list with the same contents as `fixturenames`, reordered
    """
    cur_indices = []
    seen_indices = set()
    for pname in param_names:
        try:
            idx = fixturenames.index(pname)
        except (ValueError, IndexError):
            # can happen in case of indirect parametrization: a parameter is not in the fixture name.
            # TODO we should maybe rather add the pname to fixturenames in this case ?
            continue
        if idx not in seen_indices:
            seen_indices.add(idx)
            cur_indices.append(idx)

    # the slots occupied by the referenced names are reused, but filled in the order given by `param_names`
    sorted_fixturenames = list(fixturenames)
    for old_i, new_i in zip(cur_indices, sorted(cur_indices)):
        sorted_fixturenames[new_i] = fixturenames[old_i]
    return sorted_fixturenames

1468 

1469 

# name of the reordering option (as used with `config.getoption`), and its allowed values
_OPTION_NAME = 'with_reorder'
_SKIP = 'skip'
_NORMAL = 'normal'

# allowed option value -> description inserted in the command-line help
_OPTIONS = {
    _NORMAL: (
        "(default) the usual reordering done by pytest to optimize setup/teardown of session- / module-\n"
        "/ class- fixtures, as well as all the modifications made by other plugins (e.g. pytest-reorder)"
    ),
    _SKIP: (
        "skips *all* reordering, even the one done by pytest itself or installed plugins\n"
        "(e.g. pytest-reorder)"
    )
}

1479 

1480 

1481# @hookspec(historic=True) 

def pytest_addoption(parser):
    """
    Pytest hook: registers the reordering command-line option in a dedicated 'pytest-cases ordering' group.

    The flag is derived from `_OPTION_NAME` by replacing underscores with dashes, and its help text lists all
    alternatives described in `_OPTIONS`.

    :param parser: the pytest option parser
    :return:
    """
    alternatives = ["%s: %s" % (name, descr) for name, descr in _OPTIONS.items()]
    help_str = "String specifying one of the reordering alternatives to use. Should be one of :\n - %s" \
               % "\n - ".join(alternatives)
    flag = '--%s' % _OPTION_NAME.replace('_', '-')

    group = parser.getgroup('pytest-cases ordering', 'pytest-cases reordering options', after='general')
    group.addoption(flag, type=str, default=_NORMAL, help=help_str)

1489 

1490 

# The pytest config object. It is `None` until the `pytest_load_initial_conftests` hook below stores it.
PYTEST_CONFIG = None  # type: Optional[Config]


def pytest_load_initial_conftests(early_config):
    """
    Pytest hook: keeps a module-level reference to the received config object in `PYTEST_CONFIG`,
    for future use; see #165 #166 #196

    :param early_config: the config object received from pytest
    :return:
    """
    global PYTEST_CONFIG
    PYTEST_CONFIG = early_config

1499 

1500 

1501# @hookspec(historic=True) 

def pytest_configure(config):
    """
    Pytest hook: validates the value received for the reordering option.

    :param config: the pytest config object
    :return:
    :raises ValueError: if the option value is not one of the allowed ones
    """
    # validate the config
    allowed_values = (_NORMAL, _SKIP)
    reordering_choice = config.getoption(_OPTION_NAME)
    if reordering_choice not in allowed_values:
        # report the flag as it is registered on the command line (with dashes), not the underscored option name
        raise ValueError("[pytest-cases] Wrong --%s option: %s. Allowed values: %s"
                         "" % (_OPTION_NAME.replace('_', '-'), reordering_choice, allowed_values))

1509 

1510 

@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_collection_modifyitems(session, config, items):  # noqa
    """
    An alternative to the `reorder_items` function in fixtures.py
    (https://github.com/pytest-dev/pytest/blob/master/src/_pytest/fixtures.py#L209)

    We basically set back the previous order once the pytest ordering routine has completed.
    This only happens when the reordering option is set to 'skip'; otherwise this hook does nothing.

    TODO we should set back an optimal ordering, but current PR https://github.com/pytest-dev/pytest/pull/3551
     will probably not be relevant to handle our "union" fixtures > need to integrate the NOT_USED markers in the method

    :param session:
    :param config:
    :param items: the list of collected items. It is modified in place.
    :return:
    """
    ordering_choice = config.getoption(_OPTION_NAME)

    if ordering_choice != _SKIP:
        # do nothing
        yield
        return

    # remember initial order
    initial_order = copy(items)
    yield

    # put back the initial order but keep the filter: items removed in the meantime must not come back.
    # A set is used so that each membership test is O(1) instead of scanning the whole list of items.
    remaining = set(items)
    to_return = [item for item in initial_order if item in remaining]

    # sanity check: every item still present was already there before the other hooks ran
    assert len(to_return) == len(items)
    items[:] = to_return

1546 

1547 

@pytest.fixture
def current_cases(request):
    """
    A fixture returning the result of `get_current_cases(request)`.

    The result is a dictionary describing all case parameters of the currently active `pytest` item.
    Each test function argument parametrized with `@parametrize_with_cases(<argname>, ...)` has an entry
    `{<argname>: (case_id, case_function, case_params)}` ; when several argnames are parametrized this way, there is
    one entry per argname. Each entry is a `namedtuple` with the following fields:

     - `id` a string: the actual case id constructed by `@parametrize_with_cases`.
     - `function` the original case function.
     - `params` a dictionary with the parameters of the case, if the case is itself parametrized. Note that if the
       case is parametrized with `@parametrize_with_cases`, the associated parameter value in the dictionary will
       also be `(actual_id, case_function, case_params)`.

    When a fixture parametrized with cases is active, the dictionary contains an entry `{<fixturename>: <dct>}` where
    `<dct>` is a dictionary `{<argname>: (case_id, case_function, case_params)}`.

    More information on a case function can be obtained with `get_case_marks(f)` and `get_case_tags(f)`.
    `matches_tag_query` can be used to check if a case function matches some expectations either concerning its id
    or its tags. See https://smarie.github.io/python-pytest-cases/#filters-and-tags
    """
    cases_dct = get_current_cases(request)
    return cases_dct