Coverage for src/pytest_cases/plugin.py: 85%
524 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-09-26 21:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-09-26 21:52 +0000
1# Authors: Sylvain MARIE <sylvain.marie@se.com>
2# + All contributors to <https://github.com/smarie/python-pytest-cases>
3#
4# License: 3-clause BSD, <https://github.com/smarie/python-pytest-cases/blob/master/LICENSE>
5from collections import OrderedDict, namedtuple
6from copy import copy
7from functools import partial
8from warnings import warn
10try:
11 from collections.abc import MutableSequence
12except: # noqa
13 from collections import MutableSequence
15import pytest
17try: # python 3.3+
18 from inspect import signature
19except ImportError:
20 from funcsigs import signature # noqa
22try: # python 3.3+ type hints
23 from typing import List, Tuple, Union, Iterable, MutableMapping, Mapping, Optional # noqa
24 from _pytest.python import CallSpec2
25 from _pytest.config import Config
26except ImportError:
27 pass
29from .common_mini_six import string_types
30from .common_pytest_lazy_values import get_lazy_args
31from .common_pytest_marks import PYTEST35_OR_GREATER, PYTEST46_OR_GREATER, PYTEST37_OR_GREATER, PYTEST7_OR_GREATER, PYTEST8_OR_GREATER
32from .common_pytest import get_pytest_nodeid, get_pytest_function_scopeval, is_function_node, get_param_names, \
33 get_param_argnames_as_list, has_function_scope, set_callspec_arg_scope_to_function, in_callspec_explicit_args
35from .fixture_core1_unions import NOT_USED, USED, is_fixture_union_params, UnionFixtureAlternative
37# if PYTEST54_OR_GREATER:
38# # we will need to clean the empty ids explicitly in the plugin :'(
39from .fixture_parametrize_plus import remove_empty_ids
41from .case_parametrizer_new import get_current_cases
44_DEBUG = False
45"""Note: this is a manual flag to turn when developing (do not forget to also call pytest with -s)"""
48# @pytest.hookimpl(hookwrapper=True, tryfirst=True)
49# def pytest_pycollect_makeitem(collector, name, obj):
50# # custom collection of additional things - we could use it one day for Cases ?
51# # see also https://hackebrot.github.io/pytest-tricks/customize_class_collection/
52# outcome = yield
53# res = outcome.get_result()
54# if res is not None:
55# return
56# # nothing was collected elsewhere, let's do it here
57# if safe_isclass(obj):
58# if collector.istestclass(obj, name):
59# outcome.force_result(Class(name, parent=collector))
60# elif collector.istestfunction(obj, name):
61# ...
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_setup(item):
    """
    Hook wrapper replacing every value in `item.funcargs` by the result of `get_lazy_args(value, item)`,
    so that `lazy_value` arguments are resolved once the setup is done.
    """
    # let all other hook implementations run first: they perform the actual setup
    yield

    # items without a `funcargs` attribute are left untouched
    if not hasattr(item, "funcargs"):
        return

    # build the full resolved dictionary first, then swap it in one go
    resolved_funcargs = {}
    for name, value in item.funcargs.items():
        resolved_funcargs[name] = get_lazy_args(value, item)
    item.funcargs = resolved_funcargs
# @pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_collection(session):
    """
    HACK: replace the `getfixtureclosure` method of the session's fixture manager with ours.

    The replacement is our module-level `getfixtureclosure`, pre-bound to the fixture manager so that it can be
    called like the original bound method.
    """
    # Note for reference: another way to access the fm is `metafunc.config.pluginmanager.get_plugin('funcmanage')`
    fixture_manager = session._fixturemanager  # noqa
    fixture_manager.getfixtureclosure = partial(getfixtureclosure, fixture_manager)
class FixtureDefsCache(object):
    """
    Memoizes the fixture definitions returned by the FixtureManager `fm` for the test node `node`.

    Each fixture name is looked up at most once in the fixture manager; the result (whatever it is, including
    an empty one) is stored in `cached_fix_defs` and served from there afterwards.
    """
    __slots__ = 'fm', 'node', 'cached_fix_defs'

    def __init__(self, fm, node):
        self.cached_fix_defs = dict()
        self.node = node
        self.fm = fm

    def get_fixture_defs(self, fixname):
        """Return the fixture definitions for `fixname`, asking the fixture manager only on a cache miss."""
        known = self.cached_fix_defs
        if fixname in known:
            # cache hit
            return known[fixname]

        # cache miss: pytest >= 8.1 expects the node itself, earlier versions expect its nodeid
        wants_node = hasattr(pytest, "version_tuple") and pytest.version_tuple >= (8, 1)
        lookup_target = self.node if wants_node else self.node.nodeid
        found = self.fm.getfixturedefs(fixname, lookup_target)

        # remember for next time
        known[fixname] = found
        return found
class FixtureClosureNode(object):
    """
    A node in a fixture closure Tree.

    - its `fixture_defs` is a {name: def} ordered dict containing all fixtures AND args that are required at this
      node (*before* a union is required). Note that some of them have def=None when the fixture manager has no
      definition for them (same behaviour as in pytest). `get_all_fixture_names` and `get_all_fixture_defs` helper
      functions allow to either return the full ordered list (equivalent to pytest `fixture_names`) or the
      dictionary of definitions (equivalent to pytest `arg2fixturedefs`)

    - if a union appears at this node, `split_fixture_name` is set to the name of the union fixture,
      `split_fixture_alternatives` is the list of alternative fixture names, and `children` is the list of child
      nodes, one per alternative and in the same order.
    """
    __slots__ = 'parent', 'fixture_defs_mgr', \
                'fixture_defs', 'split_fixture_name', 'split_fixture_alternatives', 'children'

    def __init__(self,
                 fixture_defs_mgr=None,  # type: FixtureDefsCache
                 parent_node=None        # type: FixtureClosureNode
                 ):
        """
        :param fixture_defs_mgr: the fixture definitions cache. Mandatory for a root node; when omitted, the one
            of `parent_node` is reused.
        :param parent_node: the parent node, or None for a root node
        :raises ValueError: if both arguments are None
        """
        if fixture_defs_mgr is None:
            if parent_node is None:
                raise ValueError("root node should have a fixture defs manager")
            fixture_defs_mgr = parent_node.fixture_defs_mgr
        else:
            assert isinstance(fixture_defs_mgr, FixtureDefsCache)

        self.fixture_defs_mgr = fixture_defs_mgr
        self.parent = parent_node

        # these will be set after closure has been built
        self.fixture_defs = None  # type: OrderedDict
        self.split_fixture_name = None  # type: str
        self.split_fixture_alternatives = []
        # we do not use a dict any more as several children can use the same union value (doubled unions)
        self.children = []  # type: List[FixtureClosureNode]

    # ------ tree ------------------

    def get_leaves(self):
        """Return the list of leaf nodes of the subtree rooted at this node (`[self]` if there is no split)."""
        if self.has_split():
            return [n for c in self.children for n in c.get_leaves()]
        else:
            return [self]

    # ------ str / repr ---------------

    def to_str(self, indent_nb=0, with_children=True):
        """ a string representation, either with all the subtree (default) or without (with_children=False) """

        indent = " " * indent_nb

        if not self.is_closure_built():
            str_repr = "<pending, incomplete>"
        else:
            str_repr = "%s(%s)" % (indent, ",".join([("%s" % f) for f in self.fixture_defs.keys()]))

        if self.has_split() and with_children:
            children_str_prefix = "\n%s - " % indent
            children_str = children_str_prefix + children_str_prefix.join([c.to_str(indent_nb=indent_nb + 1)
                                                                           for c in self.children])
            str_repr = str_repr + " split: " + self.split_fixture_name + children_str

        return str_repr

    def __repr__(self):
        return self.to_str()

    # ---- getters to read the "super" closure (used in SuperClosure)

    def get_all_fixture_names(self, try_to_sort_by_scope=True):
        """
        Return a list containing all unique fixture names used by this tree.

        :param try_to_sort_by_scope: if True (default) the names are in the order of `get_all_fixture_defs`;
            otherwise they are in tree traversal order (first occurrence wins).
        """
        if not try_to_sort_by_scope:
            # the same name can be yielded by several branches: keep the first occurrence only
            names = (k for k, _ in self.gen_all_fixture_defs(drop_fake_fixtures=False))
            return list(OrderedDict.fromkeys(names))
        else:
            return list(self.get_all_fixture_defs(drop_fake_fixtures=False, try_to_sort=True))

    def get_all_fixture_defs(self, drop_fake_fixtures=True, try_to_sort=True):
        """
        Return an ordered dict containing all fixture definitions for fixtures used in this tree.

        :param drop_fake_fixtures: if True, entries without definition (def=None) are dropped
        :param try_to_sort: if True, entries are sorted by scope when the pytest version allows it
        """
        # get all pairs
        items = self.gen_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures)

        # sort by scope as in pytest fixture closure creator (pytest did not do it in early versions, align with this)
        if try_to_sort:
            if PYTEST7_OR_GREATER:
                # Scope is an enum, values are in reversed order, and the field is _scope
                f_scope = get_pytest_function_scopeval()

                def sort_by_scope(kv_pair):
                    fixture_name, fixture_defs = kv_pair
                    return fixture_defs[-1]._scope if fixture_defs is not None else f_scope
                items = sorted(items, key=sort_by_scope, reverse=True)

            elif PYTEST35_OR_GREATER:
                # scopes is a list, values are indices in the list, and the field is scopenum
                f_scope = get_pytest_function_scopeval()

                def sort_by_scope(kv_pair):  # noqa
                    fixture_name, fixture_defs = kv_pair
                    return fixture_defs[-1].scopenum if fixture_defs is not None else f_scope
                items = sorted(items, key=sort_by_scope)

        # duplicated names (same fixture in several branches) are merged here
        return OrderedDict(items)

    def gen_all_fixture_defs(self, drop_fake_fixtures=True):
        """
        Generate all pairs of (fixture name, fixture def or none) used in the tree in top to bottom order
        Note that this method could be generalized to also yield the parent defs, so as to be used to replace
        the engine in `self.gather_all_required`. But this is micro-optimization, really.
        Note: `gather_all_required` was not built to be concerned with ordering because it is only used as a set.
        """
        # fixtures required at this node
        for k, v in self.fixture_defs.items():
            if not drop_fake_fixtures or v is not None:
                yield k, v

        # split fixture: not needed since it is the last entry in self.fixture_defs

        # fixtures required by children if any
        for c in self.children:
            for k, v in c.gen_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures):
                yield k, v

    # ---- utils to build the closure

    def build_closure(self,
                      initial_fixture_names,  # type: Iterable[str]
                      ignore_args=()
                      ):
        """
        Updates this Node with the fixture names provided as argument.
        Fixture names and definitions will be stored in self.fixture_defs.

        If some fixtures are Union fixtures, this node will become a "split" node
        and have children. If new fixtures are added to the node after that,
        they will be added to the child nodes rather than self.

        :param initial_fixture_names:
        :param ignore_args: arguments to keep in the names but not to put in the fixture defs, because they correspond
            to "direct parametrization"
        :return:
        """
        self._build_closure(self.fixture_defs_mgr, initial_fixture_names, ignore_args=ignore_args)

    def is_closure_built(self):
        """Return True once `fixture_defs` has been initialized, i.e. the closure construction has started."""
        return self.fixture_defs is not None

    def already_knows_fixture(self, fixture_name):
        """ Return True if this fixture is known by this node or one of its parents """
        node = self
        while node is not None:
            if fixture_name in node.fixture_defs:
                return True
            node = node.parent
        return False

    def _build_closure(self,
                       fixture_defs_mgr,       # type: FixtureDefsCache
                       initial_fixture_names,  # type: Iterable[str]
                       ignore_args
                       ):
        """
        Grab all dependencies of all fixtures in `initial_fixture_names` and add them either to this node or to
        the nodes below.

        :param fixture_defs_mgr:
        :param initial_fixture_names:
        :param ignore_args: arguments to keep in the names but not to put in the fixture defs
        :return: nothing (this node, and possibly its children, are modified)
        """
        # -- first switch this object from 'pending' to 'under construction' if needed
        # (indeed we now authorize and use the possibility to call this twice. see split() )
        if self.fixture_defs is None:
            self.fixture_defs = OrderedDict()

        # -- then for all pending, add them with their dependencies
        pending_fixture_names = list(initial_fixture_names)
        while len(pending_fixture_names) > 0:
            fixname = pending_fixture_names.pop(0)

            # if the fixture is already known in this node or above, do not care
            if self.already_knows_fixture(fixname):
                continue

            # new ignore_args option in pytest 4.6+. Not really a fixture but a test function parameter, it seems.
            if fixname in ignore_args:
                self.add_required_fixture(fixname, None)
                continue

            # else grab the fixture definition(s) for this fixture name for this test node id
            fixturedefs = fixture_defs_mgr.get_fixture_defs(fixname)
            if not fixturedefs:
                # fixture without definition: add it. This can happen with e.g. "requests", etc.
                self.add_required_fixture(fixname, None)
                continue

            # the actual definition is the last one
            _fixdef = fixturedefs[-1]
            _params = _fixdef.params

            if _params is not None and is_fixture_union_params(_params):
                # create an UNION fixture

                # transform the _params into a list of names
                alternative_f_names = UnionFixtureAlternative.to_list_of_fixture_names(_params)

                # TO DO if only one name, simplify ? >> No, we leave such "optimization" to the end user

                # if there are direct dependencies that are not the union members, add them to pending
                non_member_dependencies = [f for f in _fixdef.argnames if f not in alternative_f_names]
                # currently we only have 'requests' in this list but future impl of fixture_union may act otherwise
                pending_fixture_names += non_member_dependencies

                # propagate WITH the pending
                self.split_and_build(fixture_defs_mgr, fixname, fixturedefs, alternative_f_names,
                                     pending_fixture_names, ignore_args=ignore_args)

                # empty the pending because all of them have been propagated on all children with their dependencies
                pending_fixture_names = []
            else:
                # normal fixture
                self.add_required_fixture(fixname, fixturedefs)

                # add all dependencies in the to do list.
                # pytest default was to append; prepending makes much more sense
                pending_fixture_names = list(_fixdef.argnames) + pending_fixture_names

    # ------ tools to add new fixture names during closure construction

    def remove_fixtures(self, fixture_names_to_remove):
        """
        Remove some fixture names from all nodes in this subtree. These fixtures should not be split fixtures

        :raises NotImplementedError: if one of the names is the split fixture name of a visited node
        """
        _to_remove_in_children = []
        for f in fixture_names_to_remove:
            if self.split_fixture_name == f:
                raise NotImplementedError("It is not currently possible to remove a split fixture name from a closure "
                                          "with splits")
            try:
                del self.fixture_defs[f]
            except KeyError:
                # not at this node: look for it below
                _to_remove_in_children.append(f)

        # propagate to children if any
        if len(_to_remove_in_children) > 0:
            for c in self.children:
                c.remove_fixtures(_to_remove_in_children)

    def add_required_fixture(self, new_fixture_name, new_fixture_defs):
        """Add some required fixture names to all leaves under this node"""
        if self.already_knows_fixture(new_fixture_name):
            return
        elif not self.has_split():
            # add_required_fixture locally
            if new_fixture_name not in self.fixture_defs:
                self.fixture_defs[new_fixture_name] = new_fixture_defs
        else:
            # add_required_fixture in each child
            for c in self.children:
                c.add_required_fixture(new_fixture_name, new_fixture_defs)

    def split_and_build(self,
                        fixture_defs_mgr,           # type: FixtureDefsCache
                        split_fixture_name,         # type: str
                        split_fixture_defs,         # type: Tuple[FixtureDefinition]  # noqa
                        alternative_fixture_names,  # type: List[str]
                        pending_fixtures_list,      #
                        ignore_args
                        ):
        """
        Declares that this node contains a union with alternatives (child nodes=subtrees)

        :raises ValueError: if this node already has a split
        """
        if self.has_split():
            raise ValueError("This should not happen anymore")

        # add the split (union) name to known fixtures
        self.add_required_fixture(split_fixture_name, split_fixture_defs)

        # remember it
        self.split_fixture_name = split_fixture_name
        self.split_fixture_alternatives = alternative_fixture_names

        # create the child nodes
        for f in alternative_fixture_names:
            # create the child node
            new_c = FixtureClosureNode(parent_node=self)
            self.children.append(new_c)

            # perform the propagation:
            # (a) first propagate all child's dependencies, (b) then the ones required by parent
            # we need to do both at the same time in order to propagate the "pending for child" on all subbranches
            pending_for_child = [f] + pending_fixtures_list
            new_c._build_closure(fixture_defs_mgr, pending_for_child, ignore_args=ignore_args)

    def has_split(self):
        """Return True if a union fixture has been declared at this node."""
        return self.split_fixture_name is not None

    # ----------- for calls parametrization

    def get_not_always_used(self):
        """Return the list of fixtures used by this subtree, that are used in *some* leaves only, not all"""
        not_always_used = []

        # candidates are the fixtures of this node and of its children
        candidates = self.gather_all_required(include_parents=False)

        for leaf in self.get_leaves():
            # compute the closure of this leaf once, not once per candidate
            leaf_required = set(leaf.gather_all_required())

            still_candidates = []
            for fixture_name in candidates:
                if fixture_name in leaf_required:
                    still_candidates.append(fixture_name)
                else:
                    # missing from at least this leaf: report it and stop considering it
                    not_always_used.append(fixture_name)
            candidates = still_candidates

        return not_always_used

    def gather_all_required(self, include_children=True, include_parents=True):
        """
        Return a list of all fixtures required by the subtree containing this node
        and all of its parents (if include_parents=True) and all of its children (if include_children=True)

        See also `self.gen_all_fixture_defs`, that could be generalized to tackle this use case too
        (micro-optimization, not really urgent)
        """
        # first the fixtures required by this node
        required = list(self.fixture_defs.keys())

        # then the ones required by the parents
        if include_parents and self.parent is not None:
            required = required + self.parent.gather_all_required(include_children=False)

        # then the ones from all the children
        if include_children:
            for child in self.children:
                required = required + child.gather_all_required(include_parents=False)

        return required

    def requires(self, fixturename):
        """ Return True if the fixture with this name is required by the subtree at this node """
        return fixturename in self.gather_all_required()

    # ------ tools to see the tree as a list of alternatives (used in SuperClosure)

    def get_alternatives(self):
        """
        Returns the tree "flattened" as a list of alternatives (one per leaf).
        Each entry in the list consists of:

         - an ordered dictionary {union_fixture_name: (idx, value)} representing the active union filters in this
           alternative
         - a list of fixture names effectively used in this alternative

        :return: a list of alternatives
        """
        alternatives = self._get_alternatives()
        for i, a in enumerate(alternatives):
            # replace the first entry in the tuple with a reversed order one
            alternatives[i] = (OrderedDict(reversed(list(a[0].items()))), a[1])
        return alternatives

    def _get_alternatives(self):
        """Same as `get_alternatives`, except that the filters are not yet put in reversed order."""
        if self.has_split():
            alternatives_list = []
            for c_idx, (c_split_alternative, c_node) in enumerate(zip(self.split_fixture_alternatives, self.children)):
                # for all alternatives in this subtree
                for f_dct, n_lst in c_node._get_alternatives():
                    # - filter
                    _f_dct = f_dct.copy()
                    _f_dct[self.split_fixture_name] = (c_idx, c_split_alternative)

                    # - unique fixtures used
                    _n_lst = list(self.fixture_defs) + [_i for _i in n_lst if _i not in self.fixture_defs]

                    alternatives_list.append((_f_dct, _n_lst))

            return alternatives_list
        else:
            # return a single partition containing no filter and all fixture names
            return [(OrderedDict(), self.get_all_fixture_names())]
class SuperClosure(MutableSequence):
    """
    A "super closure" is a closure made of several closures, each induced by a fixture union parameter value.
    The number of alternative closures is `self.nb_alternative_closures`

    This object behaves like a list (a mutable sequence), so that we can pass it to pytest in place of the list of
    fixture names that is returned in `getfixtureclosure`.

    In this implementation, it is backed by a fixture closure tree, that we have to preserve in order to get
    parametrization right. In another branch of this project ('super_closure' branch) we tried to forget the tree
    and only keep the partitions, but parametrization order was not as intuitive for the end user as all unions
    appeared as parametrized first (since they induced the partitions).
    """
    __slots__ = 'tree', 'all_fixture_defs'

    def __init__(self,
                 root_node  # type: FixtureClosureNode
                 ):
        """
        :param root_node: the root of the fixture closure tree backing this super closure
        """
        # save the fixture closure tree root
        self.tree = root_node
        # retrieve/sort fixture defs for quicker access
        self._update_fixture_defs()

    def _update_fixture_defs(self):
        """Refresh `self.all_fixture_defs` (the "list" view) from the current contents of `self.tree`."""
        # get all fixture defs, for quicker access (and sorted by scope if scope information is available)
        self.all_fixture_defs = self.tree.get_all_fixture_defs(drop_fake_fixtures=False, try_to_sort=True)

    # --- visualization tools ----

    @property
    def nb_alternative_closures(self):
        """ Return the number of alternative closures induced by fixture unions """
        # `get_alternatives` returns one (filters, fixture names) pair per alternative
        return len(self.tree.get_alternatives())

    def __repr__(self):
        """ Return a synthetic view, and a detailed tree view, of this closure """
        alternatives = self.tree.get_alternatives()
        nb_alternative_closures = len(alternatives)
        return "SuperClosure with %s alternative closures:\n" % nb_alternative_closures \
               + "\n".join(" - %s (filters: %s)" % (p, ", ".join("%s=%s[%s]=%s" % (k, k, v[0], v[1])
                                                                  for k, v in f.items()))
                           for f, p in alternatives) \
               + "\nThe 'super closure list' is %s\n\nThe fixture tree is :\n%s\n" % (list(self), self.tree)

    def get_all_fixture_defs(self, drop_fake_fixtures=True):
        """
        Return a dictionary of all fixture defs used in this super closure

        note: this is equivalent to
            self.tree.get_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures, try_to_sort=True)
        """
        if drop_fake_fixtures:
            # remove the "fixtures" that are actually test function parameter args
            return {k: v for k, v in self.all_fixture_defs.items() if v is not None}
        else:
            # all fixtures AND pseudo-fixtures (test function parameters)
            return self.all_fixture_defs

    # ---- list (MutableSequence) facade: behaves like a list of fixture names ------

    def __len__(self):
        return len(self.all_fixture_defs)

    def __getitem__(self, i):
        # return the key(s) (fixture name(s)) found at index/slice `i`
        return list(self.all_fixture_defs.keys())[i]

    def __setitem__(self, i, o):
        """
        Only a subset of replacements is supported: no-op replacements, slice replacements that only reorder
        (ignored with a warning), and slice replacements that only remove names.

        :raises NotImplementedError: for a single item replacement, or when a slice replacement adds names
        """
        # Get the existing value(s) that we wish to replace
        ref = list(self.all_fixture_defs)[i]

        if o == ref:
            # no change at all: of course we accept.
            return

        if not isinstance(i, slice):
            # In-place change of a single item: let's be conservative and reject for now
            raise NotImplementedError("Replacing an element in a super fixture closure is not currently implemented. "
                                      "Please report this issue to the `pytest-cases` project.")

        # Replacement of multiple items at once: support reordering (ignored) and removal (actually done)
        new_set = set(o)
        ref_set = set(ref)
        if new_set == ref_set:
            # A change is required in the order of fixtures. Ignore but continue
            warn("WARNING: An attempt was made to reorder a super fixture closure with unions. This is not yet "
                 "supported since the partitions use subsets of the fixtures ; please report it so that we can "
                 "find a suitable solution for your need.")
            return

        added = new_set.difference(ref_set)
        removed = ref_set.difference(new_set)
        if len(added) == 0:
            # Pure removal: ok.
            self.remove_all(removed)
            return
        else:
            # Rather be conservative for now
            raise NotImplementedError("Adding elements to a super fixture closure with a slice is not currently "
                                      "implemented. Please report this issue to the `pytest-cases` project.")

    def __delitem__(self, i):
        if isinstance(i, slice):
            # a slice selects a list of names: remove them all at once
            self.remove_all(self[i])
        else:
            self.remove(self[i])

    def insert(self, index, fixture_name):
        """
        Try to transparently support inserts. Since the underlying structure is a tree, only two cases
        are supported: inserting at position 0 and appending at position len(self).

        Note that while appending has no restrictions, inserting at position 0 is only allowed for now if the
        fixture to insert does not have a union in its associated closure.

        :param index:
        :param fixture_name:
        :return:
        :raises NotImplementedError: for any unsupported case
        """
        if index == 0:
            # build the closure associated with this new fixture name
            fixture_defs_mgr = FixtureDefsCache(self.tree.fixture_defs_mgr.fm, self.tree.fixture_defs_mgr.node)
            closure_tree = FixtureClosureNode(fixture_defs_mgr=fixture_defs_mgr)
            closure_tree.build_closure((fixture_name,))
            if closure_tree.has_split():
                raise NotImplementedError("When fixture unions are present, inserting a fixture in the closure at "
                                          "position 0 is currently only supported if that fixture's closure does not "
                                          "contain a union. Please report this so that we can find a suitable solution"
                                          " for your need.")
            else:
                # remove those fixture definitions from all nodes in the tree
                self.tree.remove_fixtures(closure_tree.fixture_defs.keys())

                # finally prepend the defs at the beginning of the dictionary in the first node
                self.tree.fixture_defs = OrderedDict(list(closure_tree.fixture_defs.items())
                                                     + list(self.tree.fixture_defs.items()))

        elif index == len(self):
            # appending is natively supported in our tree growing method
            self.tree.build_closure((fixture_name,))
        else:
            raise NotImplementedError("When fixture unions are present, inserting a fixture in the closure at a "
                                      "position different from 0 (prepend) or <end> (append) is non-trivial. Please "
                                      "report this so that we can find a suitable solution for your need.")

        # Finally update self.all_fixture_defs so that the "list" view reflects the changes in self.tree
        self._update_fixture_defs()

    def append_all(self, fixture_names):
        """Append various fixture names to the closure"""
        # appending is natively supported in our tree growing method
        self.tree.build_closure(tuple(fixture_names))

        # Finally update self.all_fixture_defs so that the "list" view reflects the changes in self.tree
        self._update_fixture_defs()

    def remove(self, value):
        """
        Try to transparently support removal. Note: since the underlying structure is a tree,
        removing "union" fixtures is non-trivial so for now it is not supported.

        :param value:
        :return:
        """
        # remove in the tree
        self.tree.remove_fixtures((value,))

        # update fixture defs
        self._update_fixture_defs()

    def remove_all(self, values):
        """Multiple `remove` operations at once."""
        # remove in the tree
        self.tree.remove_fixtures(tuple(values))

        # update fixture defs
        self._update_fixture_defs()
def _getfixtureclosure(fm, fixturenames, parentnode, ignore_args=()):
    """
    Replaces pytest's getfixtureclosure method to handle unions.

    pytest's own implementation is still called first, and its result is compared with ours through assertions.
    The shape of the returned tuple follows the running pytest version: 3 items for pytest 3.7+ and < 8,
    2 items otherwise.
    """
    # (1) first retrieve the normal pytest output for comparison
    pytest_impl = fm.__class__.getfixtureclosure
    # new argument "ignore_args" in 4.6+
    extra_kwargs = {'ignore_args': ignore_args} if PYTEST46_OR_GREATER else {}

    if PYTEST8_OR_GREATER:
        # two outputs and sig change
        ref_fixturenames, ref_arg2fixturedefs = pytest_impl(fm, parentnode, fixturenames, **extra_kwargs)
    elif PYTEST37_OR_GREATER:
        # three outputs
        initial_names, ref_fixturenames, ref_arg2fixturedefs = pytest_impl(fm, fixturenames, parentnode,
                                                                           **extra_kwargs)
    else:
        # two outputs
        ref_fixturenames, ref_arg2fixturedefs = pytest_impl(fm, fixturenames, parentnode)

    # (2) now let's do it by ourselves to support fixture unions
    _init_fixnames, super_closure, arg2fixturedefs = create_super_closure(fm, parentnode, fixturenames, ignore_args)

    # Compare with the previous behaviour TODO remove when in 'production' ?
    # NOTE different order happens all the time because of our "prepend" strategy in the closure building
    # which makes much more sense/intuition than pytest default
    assert set(super_closure) == set(ref_fixturenames)
    assert dict(arg2fixturedefs) == ref_arg2fixturedefs

    # (3) return with the arity expected by this pytest version
    wants_three_outputs = PYTEST37_OR_GREATER and not PYTEST8_OR_GREATER
    if wants_three_outputs:
        return _init_fixnames, super_closure, arg2fixturedefs
    return super_closure, arg2fixturedefs
if not PYTEST8_OR_GREATER:
    # before pytest 8 our implementation already has the expected signature
    getfixtureclosure = _getfixtureclosure
else:
    def getfixtureclosure(fm, parentnode, initialnames, ignore_args):
        """Adapter with the pytest 8+ argument order and names, delegating to `_getfixtureclosure`."""
        return _getfixtureclosure(fm, initialnames, parentnode, ignore_args)
def create_super_closure(fm,
                         parentnode,
                         fixturenames,
                         ignore_args
                         ):
    # type: (...) -> Tuple[List, Union[List, SuperClosure], Mapping]
    """
    Build the fixture closure for `parentnode` as a tree, so that fixture unions ("splits") can be represented.

    :param fm: the fixture manager; its `_getautousenames` method provides the auto-use fixture names
    :param parentnode: the node for which the closure is built
    :param fixturenames: the initially requested fixture names
    :param ignore_args: forwarded as-is to `FixtureClosureNode.build_closure`
    :return: a tuple `(initial fixture names, closure, fixture defs)`. The closure is simplified into a plain list
        when the tree has no split; otherwise it is a `SuperClosure`.
    """
    parentid = parentnode.nodeid

    if _DEBUG:
        print("Creating closure for %s:" % parentid)

    # -- auto-use fixtures: looked up with the node itself on pytest >= 8.1, with the node id otherwise
    if getattr(pytest, "version_tuple", (0,)) >= (8, 1):
        autouse_key = parentnode
    else:
        autouse_key = parentid
    _init_fixnames = list(fm._getautousenames(autouse_key))  # noqa

    # -- required fixtures/params.
    # ********* fix the order of initial fixtures: indeed this order may not be the right one ************
    # this only works when pytest version is > 3.4, otherwise the parent node is a Module
    if is_function_node(parentnode):
        # grab all the parametrization on that node and fix the order.
        # Note: on pytest >= 4 the list of param_names is probably the same than the `ignore_args` input
        names_to_merge = sort_according_to_ref_list(fixturenames, get_param_names(parentnode))
    else:
        # we cannot sort yet
        names_to_merge = fixturenames

    # merge the names into _init_fixnames, skipping the ones that are already there
    for name in names_to_merge:
        if name not in _init_fixnames:
            _init_fixnames.append(name)

    # Bugfix GH#330 in progress...
    # TODO analyze why in the test "fixture_union_0simplest
    #   the first node contains second, and the second contains first
    #   or TODO check the test for get_callspecs, it is maybe simpler

    # Finally create the closure
    closure_tree = FixtureClosureNode(fixture_defs_mgr=FixtureDefsCache(fm, parentnode))
    closure_tree.build_closure(_init_fixnames, ignore_args=ignore_args)
    super_closure = SuperClosure(closure_tree)
    all_fixture_defs = super_closure.get_all_fixture_defs(drop_fake_fixtures=True)

    # possibly simplify into a list
    if not closure_tree.has_split():
        super_closure = list(super_closure)

    if _DEBUG:
        print("Closure for %s completed:" % parentid)
        print(closure_tree)
        print(super_closure)

    return _init_fixnames, super_closure, all_fixture_defs
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_generate_tests(metafunc):
    """
    Hook wrapper swapping the `parametrize` method of `metafunc` with our own `parametrize` function (defined in
    this module), before pytest calls it.

    Note we could do it in a static way in pytest_sessionstart or plugin init hook but
    that way we can still access the original method using metafunc.__class__.parametrize
    """
    # shadow the method on the instance only: the class attribute stays untouched
    metafunc.parametrize = partial(parametrize, metafunc)

    # now let pytest parametrize the call as usual; the outcome is not needed
    yield
class UnionParamz(namedtuple('UnionParamz', ['union_fixture_name', 'alternative_names', 'ids', 'scope', 'kwargs'])):
    """ A pending parametrization order for a union fixture """

    __slots__ = ()

    def __str__(self):
        # the alternatives are rendered comma-separated, without spaces
        alternatives = ','.join(str(alt) for alt in self.alternative_names)
        details = (self.union_fixture_name, alternatives, self.ids, self.scope, self.kwargs)
        return "[UNION] %s=[%s], ids=%s, scope=%s, kwargs=%s" % details
class NormalParamz(namedtuple('NormalParamz', ['argnames', 'argvalues', 'indirect', 'ids', 'scope', 'kwargs'])):
    """ A pending "normal" (non-union) parametrization order """

    __slots__ = ()

    def __str__(self):
        details = (self.argnames, self.argvalues, self.indirect, self.ids, self.scope, self.kwargs)
        return "[NORMAL] %s=[%s], indirect=%s, ids=%s, scope=%s, kwargs=%s" % details
def parametrize(metafunc, argnames, argvalues, indirect=False, ids=None, scope=None, **kwargs):
    """
    This alternate implementation of metafunc.parametrize creates a list of calls that is not just the cartesian
    product of all parameters (like the pytest behaviour). Instead, it offers an alternate list of calls taking into
    account all "union" fixtures.

    For this, it replaces the `metafunc._calls` attribute with a `CallsReactor` instance, and feeds it with all
    parameters and parametrized fixtures independently (not doing any cross-product during this call). The resulting
    `CallsReactor` instance is then able to dynamically behave like the correct list of calls, lazy-creating that list
    when it is used.

    When `metafunc.fixturenames` is not a `SuperClosure`, the native `parametrize` method is used unchanged.

    :raises ValueError: if `metafunc._calls` is already populated on the first call, or if a union fixture is
        parametrized with several argnames, with `indirect=False` or with extra kwargs.
    """
    if not isinstance(metafunc.fixturenames, SuperClosure):
        # legacy method
        metafunc.__class__.parametrize(metafunc, argnames, argvalues, indirect=indirect, ids=ids, scope=scope, **kwargs)

        # clean EMPTY_ID : since they are never set by us in a normal parametrize, no need to do this here.
        # if PYTEST54_OR_GREATER:
        #     for callspec in metafunc._calls:
        #         remove_empty_ids(callspec)
        return

    # get or create our special container object
    calls_reactor = metafunc._calls  # noqa
    if not isinstance(calls_reactor, CallsReactor):
        # first call: should be an empty list
        if len(calls_reactor) > 0:
            # If this happens, it is most probably because another plugin has called 'parametrize' before our hook
            # plugin.py/pytest_generate_tests has replaced it with this function. It can be due to a regression
            # in pluggy too, see https://github.com/smarie/python-pytest-cases/issues/302
            raise ValueError("This should not happen - please file an issue")
        calls_reactor = CallsReactor(metafunc)
        metafunc._calls = calls_reactor

    if not is_fixture_union_params(argvalues):
        # add a normal parametrization in the queue (but do not apply it now)
        calls_reactor.append(NormalParamz(argnames, argvalues, indirect, ids, scope, kwargs))
        return

    # this is a union fixture: it comes with a single name, and its argvalues are the alternatives
    if ',' in argnames or not isinstance(argnames, string_types):
        raise ValueError("Union fixtures can not be parametrized")
    if indirect is False or len(kwargs) > 0:
        raise ValueError("indirect cannot be set on a union fixture, as well as unknown kwargs")

    # add a union parametrization in the queue (but do not apply it now)
    calls_reactor.append(UnionParamz(argnames, argvalues, ids, scope, kwargs))
class CallsReactor(object):
    """
    This object replaces the list of calls that was in `metafunc._calls`.
    It behaves like a list (iteration and item access), but it actually builds that list dynamically based on all
    parametrizations collected from the custom `metafunc.parametrize` above.

    There are therefore three steps:

     - when `metafunc.parametrize` is called, the parametrization order is handed to `append`, which stores it in
       `self._pending`

     - when this object is first read as a list, all parametrization orders in `self._pending` are turned into the
       list of calls `self._call_list`, and `self._pending` is discarded. This is done in
       `create_call_list_from_pending_parametrizations`.

     - subsequent usages of this object reuse `self._call_list`.
    """
    __slots__ = 'metafunc', '_pending', '_call_list'

    def __init__(self, metafunc):
        self.metafunc = metafunc
        self._pending = []  # type: List[Union[UnionParamz, NormalParamz]]
        self._call_list = None

    # -- methods to provide parametrization orders without executing them --

    def append(self,
               parametrization  # type: Union[UnionParamz, NormalParamz]
               ):
        """Queue a parametrization order; it is not applied yet."""
        self._pending.append(parametrization)

    def print_parametrization_list(self):
        """Helper method to print all pending parametrizations in this reactor """
        print("\n".join(str(order) for order in self._pending))

    # -- list facade --

    def __iter__(self):
        return iter(self.calls_list)

    def __getitem__(self, item):
        return self.calls_list[item]

    @property
    def calls_list(self):
        """
        The list of calls. It is lazily created on first access from the pending parametrizations.

        :return:
        """
        if self._call_list is None:
            # first access: create the definitive list
            self.create_call_list_from_pending_parametrizations()

        return self._call_list

    # --- list creation (executed once, the first time this object is used as a list)

    def create_call_list_from_pending_parametrizations(self):
        """
        Takes all parametrization operations that are pending in `self._pending`,
        and creates the list of calls out of them, using the closure tree of `self.metafunc.fixturenames`.

        self._pending is set to None afterwards

        :return:
        """
        # self is expected to be on the _calls field; it is explicitly set back at the end of this call
        assert self.metafunc._calls is self

        # ------ parametrize the calls --------
        # create a dictionary of pending fixturenames/argnames to parametrize.
        pending_dct = OrderedDict()
        for order in self._pending:
            # remember one of the argnames only so that we are able to detect where in the fixture tree the
            # parametrization applies (it will still be applied for all of its argnames, no worries: see _process_node)
            first_argname = get_param_argnames_as_list(order[0])[0]
            pending_dct[first_argname] = order

        if _DEBUG:
            print("\n---- pending parametrization ----")
            self.print_parametrization_list()
            print("---------------------------------\n")
            print("Applying all of them in the closure tree nodes:")

        # grab the "super fixtures closure" created previously (see getfixtureclosure above)
        super_closure = self.metafunc.fixturenames
        assert isinstance(super_closure, SuperClosure)

        # Apply parametrization for calls
        calls = get_calls_for_tree(self.metafunc, super_closure.tree, pending_dct)
        # Alternative: use the partitions for parametrization. The issue is that this leads to a less intuitive order
        # calls = []
        # for i in range(super_closure.nb_alternative_closures):
        #     calls += get_calls_for_partition(self.metafunc, super_closure, i, pending.copy())

        if _DEBUG:
            print("\n".join(["%s[%s]: funcargs=%s, params=%s" % (get_pytest_nodeid(self.metafunc),
                                                                 c.id, c.params if PYTEST8_OR_GREATER else c.funcargs,
                                                                 c.params)
                             for c in calls]) + "\n")

        # clean EMPTY_ID set by @parametrize when there is at least a MultiParamsAlternative
        # if PYTEST54_OR_GREATER:
        for callspec in calls:
            remove_empty_ids(callspec)

        # save the list and put back self as the _calls facade
        self._call_list = calls
        self.metafunc._calls = self
        # forget about all parametrizations now - this won't happen again
        self._pending = None
def get_calls_for_tree(metafunc,
                       fix_closure_tree,  # type: FixtureClosureNode
                       pending_dct  # type: MutableMapping[str, Union[UnionParamz, NormalParamz]]
                       ):
    """
    Creates the list of calls for `metafunc` based on the fixture closure tree and on the pending parametrizations.

    :param metafunc:
    :param fix_closure_tree: the root node of the fixture closure tree
    :param pending_dct: the pending parametrizations. A copy is used, so the received mapping is left unchanged.
    :return: the list of calls
    """
    # work on a copy: `_process_node` pops the entries that it applies
    remaining = pending_dct.copy()
    calls, leaf_nodes = _process_node(metafunc, fix_closure_tree, remaining, [])
    # for each call in calls, the node at the same position in leaf_nodes is the corresponding tree leaf.
    _cleanup_calls_list(metafunc, fix_closure_tree, calls, leaf_nodes, remaining)
    return calls
def _cleanup_calls_list(metafunc,
                        fix_closure_tree,  # type: FixtureClosureNode
                        calls,  # type: List[CallSpec2]
                        nodes,  # type: List[FixtureClosureNode]
                        pending_dct  # type: MutableMapping[str, Union[UnionParamz, NormalParamz]]
                        ):
    """
    Cleans the calls list so that all calls contain a value for all parameters. This is basically
    about adding "NOT_USED" parametrization everywhere relevant. `calls` is modified in place.

    :param metafunc:
    :param fix_closure_tree: the root of the closure tree, used to list the fixtures that are not always used
    :param calls: the list of calls to clean
    :param nodes: for each call, the tree leaf that it corresponds to (same length as `calls`)
    :param pending_dct: the parametrizations that were not applied in every branch
    :return:
    :raises ValueError: if `calls` and `nodes` do not have the same length
    """
    nb_calls = len(calls)
    if nb_calls != len(nodes):
        raise ValueError("This should not happen !")

    # list the function-scoped fixtures that are not used in all branches
    func_scoped_optional = []
    for fixture_name in fix_closure_tree.get_not_always_used():
        try:
            fixdef = metafunc._arg2fixturedefs[fixture_name]  # noqa
        except KeyError:
            continue  # dont raise any error here and let pytest say "not found" later
        if has_function_scope(fixdef[-1]):
            func_scoped_optional.append(fixture_name)

    for idx in range(nb_calls):
        callspec, leaf = calls[idx], nodes[idx]

        # A/ set to "not used" all parametrized fixtures that were not used in some branches
        for fixture, p_to_apply in pending_dct.items():
            if in_callspec_explicit_args(callspec, fixture):
                continue

            # parametrize with a single "not used" value and discard the id
            if isinstance(p_to_apply, UnionParamz):
                padded = _parametrize_calls(metafunc, [callspec], p_to_apply.union_fixture_name, [NOT_USED],
                                            indirect=True, discard_id=True, scope=p_to_apply.scope,
                                            **p_to_apply.kwargs)
            else:
                nb_argnames = len(get_param_argnames_as_list(p_to_apply.argnames))
                # with several argnames, a tuple with one dummy value per argname is needed
                dummy_values = [(NOT_USED,) * nb_argnames] if nb_argnames > 1 else [NOT_USED]
                padded = _parametrize_calls(metafunc, [callspec], p_to_apply.argnames, dummy_values,
                                            indirect=p_to_apply.indirect, discard_id=True,
                                            scope=p_to_apply.scope, **p_to_apply.kwargs)
            assert len(padded) == 1
            calls[idx] = padded[0]
            callspec = calls[idx]

        # B/ function-scoped non-parametrized fixtures also need to be explicitly deactivated in the callspecs
        # where they are not required, otherwise they will be setup/teardown.
        #
        # For this we use a dirty hack: we add a parameter with their name in the callspec, it seems to be propagated
        # in the `request`. TODO is there a better way?
        for fixture_name in func_scoped_optional:
            if in_callspec_explicit_args(callspec, fixture_name):
                continue

            if leaf.requires(fixture_name):
                # explicitly add it as active
                marker, marker_index = USED, 0
            else:
                # explicitly add it as discarded
                marker, marker_index = NOT_USED, 1
            callspec.params[fixture_name] = marker
            callspec.indices[fixture_name] = marker_index
            set_callspec_arg_scope_to_function(callspec, fixture_name)
1164 # finally, if there are some session or module-scoped fixtures that
1165 # are used in *none* of the calls, they could be deactivated too
1166 # (see https://github.com/smarie/python-pytest-cases/issues/137)
1167 #
1168 # for fixture_name in _not_always_used_other_scoped:
1169 # _scopenum = metafunc._arg2fixturedefs[fixture_name][-1].scopenum
1170 #
1171 # # check if there is at least one call that actually uses the fixture and is not skipped...
1172 # # this seems a bit "too much" !! > WON'T FIX
1173 # used = False
1174 # for i in range(nb_calls):
1175 # c, n = calls[i], nodes[i]
1176 # if fixture_name in c.params or fixture_name in c.funcargs or n.requires(fixture_name):
1177 # if not is_skipped_or_failed(c): # HOW can we implement this based on call (and not item) ???
1178 # used = True
1179 # break
1180 #
1181 # if not used:
1182 # # explicitly add it as discarded everywhere by creating a parameter value for it.
1183 # for i in range(nb_calls):
1184 # c = calls[i]
1185 # c.params[fixture_name] = NOT_USED
1186 # c.indices[fixture_name] = 0
1187 # c._arg2scopenum[fixture_name] = _scopenum # noqa
1190# def get_calls_for_partition(metafunc, super_closure, p_idx, pending):
1191# """
1192# Parametrizes all fixtures that are actually used in this partition
1193# Cleans the calls list so that all calls contain a value for all parameters. This is basically
1194# about adding "NOT_USED" parametrization everywhere relevant.
1195#
1196# :return: a list of CallSpec2
1197# """
1198# calls = []
1199#
1200# # A/ parametrize all fixtures that are actually used in this partition
1201# for fixture_name in super_closure.partitions[p_idx]:
1202# try:
1203# # pop it from pending - do not rely the order in pending but rather the order in the closure
1204# p_to_apply = pending.pop(fixture_name)
1205# except KeyError:
1206# # not a parametrized fixture
1207# continue
1208# else:
1209# if isinstance(p_to_apply, UnionParamz):
1210# # ******** Union parametrization **********
1211# # selected_ids, selected_alternative = super_closure.get_parameter_to_apply(p_to_apply, p_idx)
1212# num, selected_filter = super_closure.filters[p_idx][p_to_apply.union_fixture_name]
1213# # in order to get the *actual* id to use (with all pytest subtleties in case of two identical ids
1214# # appearing in the list), we create a fake calls list
1215# fake_calls = _parametrize_calls(metafunc, [], p_to_apply.union_fixture_name,
1216# p_to_apply.alternative_names, ids=p_to_apply.ids,
1217# scope=p_to_apply.scope, indirect=True, **p_to_apply.kwargs)
1218# selected_id = fake_calls[num].id
1219# selected_alternative = p_to_apply.alternative_names[num]
1220# # assert selected_alternative.alternative_name == selected_filter
1221#
1222# if _DEBUG:
1223# print("[Partition %s] Applying parametrization for UNION fixture %r=%r"
1224# "" % (p_idx, p_to_apply.union_fixture_name, selected_alternative))
1225#
1226# # always use 'indirect' since that's a fixture.
1227# calls = _parametrize_calls(metafunc, calls, p_to_apply.union_fixture_name,
1228# [selected_alternative], ids=[selected_id], scope=p_to_apply.scope,
1229# indirect=True, **p_to_apply.kwargs)
1230#
1231# elif isinstance(p_to_apply, NormalParamz):
1232# # ******** Normal parametrization **********
1233# if _DEBUG:
1234# print("[Partition %s] Applying parametrization for NORMAL %s"
1235# "" % (p_idx, p_to_apply.argnames))
1236#
1237# calls = _parametrize_calls(metafunc, calls, p_to_apply.argnames, p_to_apply.argvalues,
1238# indirect=p_to_apply.indirect, ids=p_to_apply.ids,
1239# scope=p_to_apply.scope, **p_to_apply.kwargs)
1240# else:
1241# raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__)
1242#
1243# # Cleaning
1244# for i in range(len(calls)):
1245# c = calls[i]
1246#
1247# # B/ set to "not used" all parametrized fixtures that were not used in some branches
1248# for fixture_name, p_to_apply in pending.items():
1249# if fixture_name not in c.params and fixture_name not in c.funcargs:
1250# # parametrize with a single "not used" value and discard the id
1251# if isinstance(p_to_apply, UnionParamz):
1252# c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.union_fixture_name, [NOT_USED],
1253# indirect=True, discard_id=True, scope=p_to_apply.scope,
1254# **p_to_apply.kwargs)
1255# else:
1256# _nb_argnames = len(get_param_argnames_as_list(p_to_apply.argnames))
1257# if _nb_argnames > 1:
1258# _vals = [(NOT_USED,) * _nb_argnames]
1259# else:
1260# _vals = [NOT_USED]
1261# c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.argnames, _vals,
1262# indirect=p_to_apply.indirect, discard_id=True,
1263# scope=p_to_apply.scope, **p_to_apply.kwargs)
1264# assert len(c_with_dummy) == 1
1265# calls[i] = c_with_dummy[0]
1266# c = calls[i]
1267#
1268# # C/ some non-parametrized fixtures may also need to be explicitly deactivated in some callspecs
1269# # otherwise they will be setup/teardown.
1270# #
1271# # For this we use a dirty hack: we add a parameter with they name in the callspec, it seems to be propagated
1272# # in the `request`. TODO is there a better way?
1273# for fixture_name in super_closure.get_not_always_used():
1274# try:
1275# fixdef = metafunc._arg2fixturedefs[fixture_name] # noqa
1276# except KeyError:
1277# continue # dont raise any error here and instead let pytest say "not found"
1278#
1279# if fixture_name not in c.params and fixture_name not in c.funcargs:
1280# if not super_closure.requires(fixture_name, p_idx):
1281# # explicitly add it as discarded by creating a parameter value for it.
1282# c.params[fixture_name] = NOT_USED
1283# c.indices[fixture_name] = 1
1284# c._arg2scopenum[fixture_name] = get_pytest_scopenum(fixdef[-1].scope) # noqa
1285# else:
1286# # explicitly add it as active by creating a parameter value for it.
1287# c.params[fixture_name] = 'used'
1288# c.indices[fixture_name] = 0
1289# c._arg2scopenum[fixture_name] = get_pytest_scopenum(fixdef[-1].scope) # noqa
1290#
1291# return calls
@property
def id(self):
    """The call id: all non-empty entries of `self._idlist`, joined with dashes."""
    # legacy _CallSpec2 id was filtering empty strings, we'll put it back on the class below
    # https://github.com/pytest-dev/pytest/blob/5.3.4/src/_pytest/python.py#L861
    return "-".join(str(part) for part in self._idlist if part)
1301def _parametrize_calls(metafunc, init_calls, argnames, argvalues, discard_id=False, indirect=False, ids=None,
1302 scope=None, **kwargs):
1303 """Parametrizes the initial `calls` with the provided information and returns the resulting new calls"""
1305 # make a backup so that we can restore the metafunc at the end
1306 bak = metafunc._calls # noqa
1308 # place the initial calls on the metafunc
1309 metafunc._calls = init_calls if init_calls is not None else []
1311 # parametrize the metafunc. Since we replaced the `parametrize` method on `metafunc` we have to call super
1312 metafunc.__class__.parametrize(metafunc, argnames, argvalues, indirect=indirect, ids=ids, scope=scope, **kwargs)
1314 # extract the result
1315 new_calls = metafunc._calls # noqa
1317 # If the user wants to discard the newly created id, remove the last id in all these callspecs in this node
1318 if discard_id:
1319 for callspec in new_calls:
1320 callspec._idlist.pop(-1) # noqa
1322 # restore the metafunc and return the new calls
1323 metafunc._calls = bak
1324 return new_calls
def _process_node(metafunc,
                  current_node,  # type: FixtureClosureNode
                  pending,  # type: MutableMapping[str, Union[UnionParamz, NormalParamz]]
                  calls  # type: List[CallSpec2]
                  ):
    """
    Routine to apply all the parametrization tasks in `pending` that are relevant to `current_node`,
    to `calls` (a list of pytest CallSpec2).

    It first applies all parametrization that correspond to current node (normal parameters),
    then applies the "split" parametrization if needed and recurses into each tree branch.

    It returns a tuple containing a list of calls and a list of same length containing which leaf node each one
    corresponds to.

    :param metafunc:
    :param current_node: the closure tree node we're focusing on
    :param pending: a mapping of parametrization orders to apply. The orders applied at this node are popped from
        it; each child branch then receives its own copy.
    :param calls:
    :return: a tuple (calls, nodes) of two lists of the same length. So that for each CallSpec calls[i], you can see
        the corresponding leaf node in nodes[i]
    :raises ValueError: if a union parametrization is found for a non-split fixture, if the parametrization of the
        split fixture is missing, or if it is a normal one
    :raises TypeError: if a pending parametrization is neither a `NormalParamz` nor a `UnionParamz`
    """
    # (1) first apply all **non-split** fixtures at this node = NORMAL PARAMETERS
    # in the order defined in the closure tree, do not trust the order of the received parametrize (`pending`)
    # Note: names are compared by value; two equal strings are not guaranteed to be the same object.
    fixtures_at_this_node = [f for f in current_node.fixture_defs.keys()
                             if f != current_node.split_fixture_name]
    for fixturename in fixtures_at_this_node:
        try:
            # pop the corresponding parametrization from pending - do not trust the order
            p_to_apply = pending.pop(fixturename)
        except KeyError:
            # fixturename is not a parametrized fixture, nothing to do
            continue

        if isinstance(p_to_apply, UnionParamz):
            raise ValueError("This should not happen! Only Normal parameters should be in fixtures_at_this_node")
        elif isinstance(p_to_apply, NormalParamz):
            # ******** Normal parametrization **********
            if _DEBUG:
                print("[Node %s] Applying parametrization for NORMAL %s"
                      "" % (current_node.to_str(with_children=False), p_to_apply.argnames))

            calls = _parametrize_calls(metafunc, calls, p_to_apply.argnames, p_to_apply.argvalues,
                                       indirect=p_to_apply.indirect, ids=p_to_apply.ids,
                                       scope=p_to_apply.scope, **p_to_apply.kwargs)
        else:
            raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__)

    # (2) then is there a "union" = a split between two sub-branches in the tree ?
    if not current_node.has_split():
        # No split = tree leaf: return
        nodes = [current_node] * len(calls)
        return calls, nodes

    # There is a **split** : apply its parametrization (a UNION parameter)
    try:
        # pop the corresponding parametrization from pending - do not trust the order
        p_to_apply = pending.pop(current_node.split_fixture_name)
    except KeyError:
        raise ValueError("This should not happen! fixture union parametrization missing, but this is a split node")

    if isinstance(p_to_apply, NormalParamz):
        raise ValueError("This should not happen! Split nodes correspond to Union parameters, not Normal ones.")
    elif not isinstance(p_to_apply, UnionParamz):
        # same error as in step (1), instead of silently returning None
        raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__)

    # ******** Union parametrization **********
    if _DEBUG:
        print("[Node %s] Applying parametrization for UNION %s"
              "" % (current_node.to_str(with_children=False), p_to_apply.union_fixture_name))

    # always use 'indirect' since that's a fixture.
    calls = _parametrize_calls(metafunc, calls, p_to_apply.union_fixture_name,
                               p_to_apply.alternative_names, indirect=True,
                               ids=p_to_apply.ids, scope=p_to_apply.scope, **p_to_apply.kwargs)

    # now move to the children
    nodes_children = [None] * len(calls)
    for i in range(len(calls)):
        active_alternative = calls[i].params[p_to_apply.union_fixture_name]
        child_indices = [_i for _i, x in enumerate(current_node.split_fixture_alternatives)
                         if x == active_alternative.alternative_name]
        # only use the first matching child, since the subtrees are identical.
        child_node = current_node.children[child_indices[0]]
        # each branch works on its own copy, so that what is popped in one branch stays pending in the others
        child_pending = pending.copy()

        # place the children parameter in the first position if it is in the list
        # not needed anymore - already automatic
        # try:
        #     child_pending.move_to_end(child_alternative, last=False)
        # except KeyError:
        #     # not in the list: the child alternative is a non-parametrized fixture
        #     pass

        # each entry temporarily becomes a list of calls (resp. of leaf nodes) for that branch
        calls[i], nodes_children[i] = _process_node(metafunc, child_node, child_pending, [calls[i]])

    # finally flatten the lists of lists
    calls = flatten_list(calls)
    nodes_children = flatten_list(nodes_children)
    return calls, nodes_children
1428# def _make_unique(lst):
1429# _set = set()
1430#
1431# def _first_time_met(v):
1432# if v not in _set:
1433# _set.add(v)
1434# return True
1435# else:
1436# return False
1437#
1438# return [v for v in lst if _first_time_met(v)]
def flatten_list(lst):
    """Return a new list with the items of all the iterables contained in `lst`, in order (one level only)."""
    flat = []
    for nested in lst:
        flat.extend(nested)
    return flat
def sort_according_to_ref_list(fixturenames, param_names):
    """
    Reorders the items of the first list that are also in the second, so that they follow the order of the second.
    Only the positions already occupied by such items are reused: all other items stay where they are.
    A new list is returned.

    :param fixturenames: the sequence to reorder; it is not modified
    :param param_names: the reference order
    :return: the reordered list
    """
    # positions in `fixturenames` of the reference names, in reference order
    found_at = []
    for pname in param_names:
        try:
            position = fixturenames.index(pname)
        except (ValueError, IndexError):
            # can happen in case of indirect parametrization: a parameter is not in the fixture name.
            # TODO we should maybe rather add the pname to fixturenames in this case ?
            continue
        found_at.append(position)

    # fill the same positions, taken in increasing order, with the names taken in reference order
    reordered = list(fixturenames)
    for slot, source in zip(sorted(found_at), found_at):
        reordered[slot] = fixturenames[source]
    return reordered
1470_OPTION_NAME = 'with_reorder'
1471_SKIP = 'skip'
1472_NORMAL = 'normal'
1473_OPTIONS = {
1474 _NORMAL: """(default) the usual reordering done by pytest to optimize setup/teardown of session- / module-
1475/ class- fixtures, as well as all the modifications made by other plugins (e.g. pytest-reorder)""",
1476 _SKIP: """skips *all* reordering, even the one done by pytest itself or installed plugins
1477(e.g. pytest-reorder)"""
1478}
1481# @hookspec(historic=True)
def pytest_addoption(parser):
    """Register the command line option named after `_OPTION_NAME` (underscores replaced with dashes)."""
    group = parser.getgroup('pytest-cases ordering', 'pytest-cases reordering options', after='general')

    # one "<value>: <description>" entry per allowed value
    described_values = ["%s: %s" % (value, description) for value, description in _OPTIONS.items()]
    help_str = ("String specifying one of the reordering alternatives to use. Should be one of :\n"
                " - %s" % "\n - ".join(described_values))

    option_flag = '--%s' % _OPTION_NAME.replace('_', '-')
    group.addoption(option_flag, type=str, default='normal', help=help_str)
# will be set when the pytest_load_initial_conftests hook below is called
PYTEST_CONFIG = None  # type: Optional[Config]


def pytest_load_initial_conftests(early_config):
    """Keep a module-level reference to the received config object."""
    global PYTEST_CONFIG
    # store the received config object for future use; see #165 #166 #196
    PYTEST_CONFIG = early_config
# @hookspec(historic=True)
def pytest_configure(config):
    """
    Validates the value received for the reordering command line option.

    :param config: the pytest config object; only `getoption` is used here.
    :return:
    :raises ValueError: if the option value is neither `_NORMAL` nor `_SKIP`.
    """
    # validate the config, reusing the module constants so that the check cannot drift from the declared options
    allowed_values = (_NORMAL, _SKIP)
    reordering_choice = config.getoption(_OPTION_NAME)
    if reordering_choice not in allowed_values:
        raise ValueError("[pytest-cases] Wrong --%s option: %s. Allowed values: %s"
                         "" % (_OPTION_NAME, reordering_choice, allowed_values))
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_collection_modifyitems(session, config, items):  # noqa
    """
    An alternative to the `reorder_items` function in fixtures.py
    (https://github.com/pytest-dev/pytest/blob/master/src/_pytest/fixtures.py#L209)

    When the reordering option is `_SKIP`, we basically set back the previous order once the pytest ordering
    routine has completed: the order seen before the `yield` is restored, while items that were removed in the
    meantime stay removed. Items that were not present before the `yield` are kept, after the restored ones and in
    the order in which they now appear. For any other option value this hook does nothing.

    TODO we should set back an optimal ordering, but current PR https://github.com/pytest-dev/pytest/pull/3551
     will probably not be relevant to handle our "union" fixtures > need to integrate the NOT_USED markers in the method

    :param session: unused.
    :param config: the pytest config object, used to read the reordering option.
    :param items: the list of collected items. It is modified in place.
    :return:
    """
    if config.getoption(_OPTION_NAME) != _SKIP:
        # do nothing
        yield
        return

    # remember initial order
    initial_order = list(items)
    yield

    # put back the initial order but keep the filter.
    # Sets give constant-time membership tests instead of scanning the list once per item
    # (this relies on items being hashable, which is assumed for pytest nodes).
    still_selected = set(items)
    restored = [item for item in initial_order if item in still_selected]

    # keep items that appeared in the meantime rather than failing: they have no "initial" position
    previously_known = set(initial_order)
    restored.extend(item for item in items if item not in previously_known)

    items[:] = restored
@pytest.fixture
def current_cases(request):
    """
    A fixture containing `get_current_cases(request)`

    This is a dictionary containing all case parameters for the currently active `pytest` item.
    For each test function argument parametrized using a `@parametrize_with_cases(<argname>, ...)` this dictionary
    contains an entry `{<argname>: (case_id, case_function, case_params)}`. If several argnames are parametrized this
    way, a dedicated entry will be present for each argname. The tuple is a `namedtuple` containing

     - `id` a string containing the actual case id constructed by `@parametrize_with_cases`.
     - `function` the original case function.
     - `params` a dictionary, containing the parameters of the case, if the case itself is parametrized. Note that
       if the case is parametrized with `@parametrize_with_cases`, the associated parameter value in the dictionary
       will also be `(actual_id, case_function, case_params)`.

    If a fixture parametrized with cases is active, the dictionary will contain an entry `{<fixturename>: <dct>}` where
    `<dct>` is a dictionary `{<argname>: (case_id, case_function, case_params)}`.

    To get more information on a case function, you can use `get_case_marks(f)`, `get_case_tags(f)`.
    You can also use `matches_tag_query` to check if a case function matches some expectations either concerning its id
    or its tags. See https://smarie.github.io/python-pytest-cases/#filters-and-tags

    :param request: the pytest `request` fixture, passed as is to `get_current_cases`.
    :return: the result of `get_current_cases(request)`.
    """
    return get_current_cases(request)