⬅ pytest_cases/plugin.py source

1 # Authors: Sylvain MARIE <sylvain.marie@se.com>
2 # + All contributors to <https://github.com/smarie/python-pytest-cases>
3 #
4 # License: 3-clause BSD, <https://github.com/smarie/python-pytest-cases/blob/master/LICENSE>
5 from collections import OrderedDict, namedtuple
6 from copy import copy
7 from functools import partial
8 from warnings import warn
9  
10 try:
11 from collections.abc import MutableSequence
12 except: # noqa
13 from collections import MutableSequence
14  
15 import pytest
16  
17 try: # python 3.3+
18 from inspect import signature
19 except ImportError:
20 from funcsigs import signature # noqa
21  
22 try: # python 3.3+ type hints
23 from typing import List, Tuple, Union, Iterable, MutableMapping, Mapping, Optional # noqa
24 from _pytest.python import CallSpec2
25 from _pytest.config import Config
26 except ImportError:
27 pass
28  
29 from .common_mini_six import string_types
30 from .common_pytest_lazy_values import get_lazy_args
  • E501 Line too long (134 > 120 characters)
31 from .common_pytest_marks import PYTEST35_OR_GREATER, PYTEST46_OR_GREATER, PYTEST37_OR_GREATER, PYTEST7_OR_GREATER, PYTEST8_OR_GREATER
32 from .common_pytest import get_pytest_nodeid, get_pytest_function_scopeval, is_function_node, get_param_names, \
33 get_param_argnames_as_list, has_function_scope, set_callspec_arg_scope_to_function, in_callspec_explicit_args
34  
35 from .fixture_core1_unions import NOT_USED, USED, is_fixture_union_params, UnionFixtureAlternative
36  
37 # if PYTEST54_OR_GREATER:
38 # # we will need to clean the empty ids explicitly in the plugin :'(
39 from .fixture_parametrize_plus import remove_empty_ids
40  
41 from .case_parametrizer_new import get_current_cases
42  
43  
# Module-level debug switch: when True, `create_super_closure` prints the node id of the closure being created.
_DEBUG = False
"""Note: this is a manual flag to turn when developing (do not forget to also call pytest with -s)"""
46  
47  
48 # @pytest.hookimpl(hookwrapper=True, tryfirst=True)
49 # def pytest_pycollect_makeitem(collector, name, obj):
50 # # custom collection of additional things - we could use it one day for Cases ?
51 # # see also https://hackebrot.github.io/pytest-tricks/customize_class_collection/
52 # outcome = yield
53 # res = outcome.get_result()
54 # if res is not None:
55 # return
56 # # nothing was collected elsewhere, let's do it here
57 # if safe_isclass(obj):
58 # if collector.istestclass(obj, name):
59 # outcome.force_result(Class(name, parent=collector))
60 # elif collector.istestfunction(obj, name):
61 # ...
62  
63  
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_setup(item):
    """
    Hook wrapper replacing every `lazy_value` found in the test item's function arguments.

    The wrapped hooks run first (at the `yield`); only then, if the item exposes a `funcargs`
    dictionary, each of its values is passed through `get_lazy_args` and the dictionary is replaced.
    """
    # Let all other setup hooks run: they are the ones creating `item.funcargs`
    yield

    if not hasattr(item, "funcargs"):
        # nothing to resolve on this kind of item
        return

    resolved_funcargs = {}
    for name, value in item.funcargs.items():
        resolved_funcargs[name] = get_lazy_args(value, item)

    # swap the whole dictionary at once, as a new object
    item.funcargs = resolved_funcargs
74  
75  
76 # @pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_collection(session):
    """
    HACK: replace the `getfixtureclosure` method of the session's fixture manager with ours.

    The replacement is the module-level `getfixtureclosure` function, partially applied so that the fixture
    manager is passed as its first argument.
    """
    # Note for reference: another way to access the fm is `metafunc.config.pluginmanager.get_plugin('funcmanage')`
    fixture_manager = session._fixturemanager  # noqa
    fixture_manager.getfixtureclosure = partial(getfixtureclosure, fixture_manager)
82  
83  
class FixtureDefsCache(object):
    """
    Memoizes the fixture definitions returned by the FixtureManager `fm` for the test node `node`.

    Each fixture name is looked up at most once through `fm.getfixturedefs`; the result (whatever it is,
    including an empty/None answer) is stored in `cached_fix_defs` and served from there afterwards.
    """
    __slots__ = 'fm', 'node', 'cached_fix_defs'

    def __init__(self, fm, node):
        self.fm = fm
        self.node = node
        self.cached_fix_defs = {}

    def get_fixture_defs(self, fixname):
        """Return the fixture definitions known for `fixname`, querying the fixture manager on first access."""
        known = self.cached_fix_defs
        if fixname in known:
            return known[fixname]

        # Cache miss. From pytest 8.1 on `getfixturedefs` is given the node, before that the node id.
        if hasattr(pytest, "version_tuple") and pytest.version_tuple >= (8, 1):
            found = self.fm.getfixturedefs(fixname, self.node)
        else:
            found = self.fm.getfixturedefs(fixname, self.node.nodeid)

        known[fixname] = found
        return found
108  
109  
class FixtureClosureNode(object):
    """
    A node in a fixture closure Tree.

     - its `fixture_defs` is a {name: def} ordered dict containing all fixtures AND args that are required at this
       node (*before* a union is required). Some of them have def=None when the fixture manager has no definition
       for them, or when they are listed in `ignore_args`. `get_all_fixture_names` and `get_all_fixture_defs`
       return respectively the full ordered list of names (equivalent to pytest `fixture_names`) and the
       dictionary of definitions (equivalent to pytest `arg2fixturedefs` when fake fixtures are dropped).

     - if a union appears at this node, `split_fixture_name` is set to the name of the union fixture,
       `split_fixture_alternatives` contains the names of the alternatives, and `children` contains one child
       node per alternative, in the same order.
    """
    __slots__ = 'parent', 'fixture_defs_mgr', \
                'fixture_defs', 'split_fixture_name', 'split_fixture_alternatives', 'children'

    def __init__(self,
                 fixture_defs_mgr=None,  # type: FixtureDefsCache
                 parent_node=None        # type: FixtureClosureNode
                 ):
        """
        :param fixture_defs_mgr: the cache used to get fixture definitions. If None, the one of `parent_node`
            is used.
        :param parent_node: the parent node, or None for a root node.
        :raises ValueError: if both `fixture_defs_mgr` and `parent_node` are None
        :raises TypeError: if `fixture_defs_mgr` is provided but is not a `FixtureDefsCache`
        """
        if fixture_defs_mgr is None:
            if parent_node is None:
                raise ValueError("root node should have a fixture defs manager")
            fixture_defs_mgr = parent_node.fixture_defs_mgr
        elif not isinstance(fixture_defs_mgr, FixtureDefsCache):
            # explicit check rather than `assert`, so that it is not stripped when running with -O
            raise TypeError("fixture_defs_mgr should be a FixtureDefsCache, found %r" % (fixture_defs_mgr,))

        self.fixture_defs_mgr = fixture_defs_mgr
        self.parent = parent_node

        # these will be set after closure has been built
        self.fixture_defs = None  # type: OrderedDict
        self.split_fixture_name = None  # type: str
        self.split_fixture_alternatives = []
        # we do not use a dict any more as several children can use the same union value (doubled unions)
        self.children = []  # type: List[FixtureClosureNode]

    # ------ tree ------------------

    def get_leaves(self):
        """Return the list of leaf nodes below this node, or `[self]` if this node has no split."""
        if self.has_split():
            return [n for c in self.children for n in c.get_leaves()]
        else:
            return [self]

    # ------ str / repr ---------------

    def to_str(self, indent_nb=0, with_children=True):
        """ a string representation, either with all the subtree (default) or without (with_children=False) """

        indent = " " * indent_nb

        if not self.is_closure_built():
            str_repr = "<pending, incomplete>"
        else:
            str_repr = "%s(%s)" % (indent, ",".join([("%s" % f) for f in self.fixture_defs.keys()]))

        if self.has_split() and with_children:
            children_str_prefix = "\n%s - " % indent
            children_str = children_str_prefix + children_str_prefix.join([c.to_str(indent_nb=indent_nb + 1)
                                                                           for c in self.children])
            str_repr = str_repr + " split: " + self.split_fixture_name + children_str

        return str_repr

    def __repr__(self):
        return self.to_str()

    # ---- getters to read the "super" closure (used in SuperClosure)

    def get_all_fixture_names(self, try_to_sort_by_scope=True):
        """ Return a list containing all unique fixture names used by this tree"""
        if not try_to_sort_by_scope:
            return [k for k, _ in self.gen_all_fixture_defs(drop_fake_fixtures=False)]
        else:
            return list(self.get_all_fixture_defs(drop_fake_fixtures=False, try_to_sort=True))

    def get_all_fixture_defs(self, drop_fake_fixtures=True, try_to_sort=True):
        """
        Return an ordered dict containing all fixture definitions for fixtures used in this tree.

        :param drop_fake_fixtures: if True (default), entries whose definition is None are not included.
        :param try_to_sort: if True (default) and the pytest version allows it, entries are sorted by scope.
            Entries without definition are sorted as if they had the function scope.
        """
        # get all pairs
        items = self.gen_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures)

        # sort by scope as in pytest fixture closure creator (pytest did not do it in early versions, align with this)
        if try_to_sort:
            if PYTEST7_OR_GREATER:
                # Scope is an enum, values are in reversed order, and the field is _scope
                f_scope = get_pytest_function_scopeval()

                def sort_by_scope(kv_pair):
                    fixture_defs = kv_pair[1]
                    return fixture_defs[-1]._scope if fixture_defs is not None else f_scope
                items = sorted(items, key=sort_by_scope, reverse=True)

            elif PYTEST35_OR_GREATER:
                # scopes is a list, values are indices in the list, and the field is scopenum
                f_scope = get_pytest_function_scopeval()

                def sort_by_scope(kv_pair):  # noqa
                    fixture_defs = kv_pair[1]
                    return fixture_defs[-1].scopenum if fixture_defs is not None else f_scope
                items = sorted(items, key=sort_by_scope)

        return OrderedDict(items)

    def gen_all_fixture_defs(self, drop_fake_fixtures=True):
        """
        Generate all pairs of (fixture name, fixture def or none) used in the tree in top to bottom order
        Note that this method could be generalized to also yield the parent defs, so as to be used to replace
        the engine in `self.gather_all_required`. But this is micro-optimization, really.
        Note: `gather_all_required` was not built to be concerned with ordering because it is only used as a set.
        """

        # fixtures required at this node
        for k, v in self.fixture_defs.items():
            if not drop_fake_fixtures or v is not None:
                yield k, v

        # split fixture: not needed since it is the last entry in self.fixture_defs

        # fixtures required by children if any
        for c in self.children:
            for k, v in c.gen_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures):
                yield k, v

    # ---- utils to build the closure

    def build_closure(self,
                      initial_fixture_names,  # type: Iterable[str]
                      ignore_args=()
                      ):
        """
        Updates this Node with the fixture names provided as argument.
        Fixture names and definitions will be stored in self.fixture_defs.

        If some fixtures are Union fixtures, this node will become a "split" node
        and have children. If new fixtures are added to the node after that,
        they will be added to the child nodes rather than self.

        :param initial_fixture_names:
        :param ignore_args: arguments to keep in the names but not to put in the fixture defs, because they correspond
            to "direct parametrization"
        :return:
        """
        self._build_closure(self.fixture_defs_mgr, initial_fixture_names, ignore_args=ignore_args)

    def is_closure_built(self):
        """Return True once `fixture_defs` has been initialized, i.e. once closure construction has started."""
        return self.fixture_defs is not None

    def already_knows_fixture(self, fixture_name):
        """ Return True if this fixture is known by this node or one of its parents """
        if fixture_name in self.fixture_defs:
            return True
        elif self.parent is None:
            return False
        else:
            return self.parent.already_knows_fixture(fixture_name)

    def _build_closure(self,
                       fixture_defs_mgr,       # type: FixtureDefsCache
                       initial_fixture_names,  # type: Iterable[str]
                       ignore_args
                       ):
        """
        Grab all dependencies of all fixtures named in `initial_fixture_names` and add them either to this node
        or to the nodes below.

        :param fixture_defs_mgr: the object used to get the fixture definitions from their names
        :param initial_fixture_names: the names to process, in order
        :param ignore_args: arguments to keep in the names but not to put in the fixture defs
        :return: nothing (this node, and possibly new children, are modified)
        """
        # -- first switch this object from 'pending' to 'under construction' if needed
        # (indeed we now authorize and use the possibility to call this twice. see split() )
        if self.fixture_defs is None:
            self.fixture_defs = OrderedDict()

        # -- then for all pending, add them with their dependencies
        pending_fixture_names = list(initial_fixture_names)
        while len(pending_fixture_names) > 0:
            fixname = pending_fixture_names.pop(0)

            # if the fixture is already known in this node or above, do not care
            if self.already_knows_fixture(fixname):
                continue

            # new ignore_args option in pytest 4.6+. Not really a fixture but a test function parameter, it seems.
            if fixname in ignore_args:
                self.add_required_fixture(fixname, None)
                continue

            # else grab the fixture definition(s) for this fixture name for this test node id
            fixturedefs = fixture_defs_mgr.get_fixture_defs(fixname)
            if not fixturedefs:
                # fixture without definition: add it. This can happen with e.g. "requests", etc.
                self.add_required_fixture(fixname, None)
                continue

            # the actual definition is the last one
            _fixdef = fixturedefs[-1]
            _params = _fixdef.params

            if _params is not None and is_fixture_union_params(_params):
                # create an UNION fixture

                # transform the _params into a list of names
                alternative_f_names = UnionFixtureAlternative.to_list_of_fixture_names(_params)

                # TO DO if only one name, simplify ? >> No, we leave such "optimization" to the end user

                # if there are direct dependencies that are not the union members, add them to pending
                non_member_dependencies = [f for f in _fixdef.argnames if f not in alternative_f_names]
                # currently we only have 'requests' in this list but future impl of fixture_union may act otherwise
                pending_fixture_names += non_member_dependencies

                # propagate WITH the pending
                self.split_and_build(fixture_defs_mgr, fixname, fixturedefs, alternative_f_names,
                                     pending_fixture_names, ignore_args=ignore_args)

                # empty the pending because all of them have been propagated on all children with their dependencies
                pending_fixture_names = []

            else:
                # normal fixture
                self.add_required_fixture(fixname, fixturedefs)

                # add all dependencies in the to do list.
                # We prepend them (pytest default was to append): this makes much more sense
                dependencies = _fixdef.argnames
                pending_fixture_names = list(dependencies) + pending_fixture_names

    # ------ tools to add new fixture names during closure construction

    def remove_fixtures(self, fixture_names_to_remove):
        """
        Remove some fixture names from all nodes in this subtree. These fixtures should not be split fixtures

        :raises NotImplementedError: if one of the names is the split fixture name of a visited node
        """
        _to_remove_in_children = []
        for f in fixture_names_to_remove:
            if self.split_fixture_name == f:
                raise NotImplementedError("It is not currently possible to remove a split fixture name from a closure "
                                          "with splits")
            try:
                del self.fixture_defs[f]
            except KeyError:
                # not defined at this level: it may be in the children
                _to_remove_in_children.append(f)

        # propagate to children if any
        if len(_to_remove_in_children) > 0:
            for c in self.children:
                c.remove_fixtures(_to_remove_in_children)

    def add_required_fixture(self, new_fixture_name, new_fixture_defs):
        """Add some required fixture names to all leaves under this node"""
        if self.already_knows_fixture(new_fixture_name):
            # known here or in a parent: nothing to do
            return
        elif not self.has_split():
            # add_required_fixture locally (we just checked above that it is not in self.fixture_defs)
            self.fixture_defs[new_fixture_name] = new_fixture_defs
        else:
            # add_required_fixture in each child
            for c in self.children:
                c.add_required_fixture(new_fixture_name, new_fixture_defs)

    def split_and_build(self,
                        fixture_defs_mgr,           # type: FixtureDefsCache
                        split_fixture_name,         # type: str
                        split_fixture_defs,         # type: Tuple[FixtureDefinition]  # noqa
                        alternative_fixture_names,  # type: List[str]
                        pending_fixtures_list,      #
                        ignore_args
                        ):
        """
        Declares that this node contains a union with alternatives (child nodes=subtrees)

        One child node is created per name in `alternative_fixture_names`, and its closure is built from that
        name followed by all names in `pending_fixtures_list`.

        :raises ValueError: if this node already has a split
        """
        if self.has_split():
            raise ValueError("This should not happen anymore")

        # add the split (union) name to known fixtures
        self.add_required_fixture(split_fixture_name, split_fixture_defs)

        # remember it
        self.split_fixture_name = split_fixture_name
        self.split_fixture_alternatives = alternative_fixture_names

        # create the child nodes
        for f in alternative_fixture_names:
            # create the child node
            new_c = FixtureClosureNode(parent_node=self)
            self.children.append(new_c)

            # perform the propagation:
            # (a) first propagate all child's dependencies, (b) then the ones required by parent
            # we need to do both at the same time in order to propagate the "pending for child" on all subbranches
            pending_for_child = [f] + pending_fixtures_list
            new_c._build_closure(fixture_defs_mgr, pending_for_child, ignore_args=ignore_args)

    def has_split(self):
        """Return True if a union fixture has been declared at this node."""
        return self.split_fixture_name is not None

    # ----------- for calls parametrization

    def get_not_always_used(self):
        """Return the list of fixtures used by this subtree, that are used in *some* leaves only, not all"""
        results_list = []

        # initial candidates are the fixtures that are in this node and its children
        candidates = self.gather_all_required(include_parents=False)

        for leaf in self.get_leaves():
            # compute what this leaf requires only once, as a set for fast membership tests
            required_by_leaf = set(leaf.gather_all_required())

            still_candidates = []
            for fixture_name in candidates:
                if fixture_name in required_by_leaf:
                    # used in all leaves seen so far: keep it for the next leaf
                    still_candidates.append(fixture_name)
                else:
                    # at least this leaf does not use it
                    results_list.append(fixture_name)
            candidates = still_candidates

        return results_list

    def gather_all_required(self, include_children=True, include_parents=True):
        """
        Return a list of all fixtures required by the subtree containing this node
        and all of its parents (if include_parents=True) and all of its children (if include_children=True)

        The list may contain the same name several times, when several children require it.

        See also `self.gen_all_fixture_defs`, that could be generalized to tackle this use case too
        (micro-optimization, not really urgent)
        """
        # first the fixtures required by this node
        required = list(self.fixture_defs.keys())

        # then the ones required by the parents
        if include_parents and self.parent is not None:
            required = required + self.parent.gather_all_required(include_children=False)

        # then the ones from all the children
        if include_children:
            for child in self.children:
                required = required + child.gather_all_required(include_parents=False)

        return required

    def requires(self, fixturename):
        """ Return True if the fixture with this name is required by the subtree at this node """
        return fixturename in self.gather_all_required()

    # ------ tools to see the tree as a list of alternatives (used in SuperClosure)

    def get_alternatives(self):
        """
        Returns the tree "flattened" as a list of alternatives (one per leaf).
        Each entry in the list consists of:

         - an ordered dictionary {union_fixture_name: (idx, value)} representing the active union filters in this
           alternative
         - a list of fixture names effectively used in this alternative

        :return: a list of alternatives
        """
        # `_get_alternatives` fills the filters from the leaves up: reverse each of them
        return [(OrderedDict(reversed(list(filters.items()))), names)
                for filters, names in self._get_alternatives()]

    def _get_alternatives(self):
        """Same as `get_alternatives`, except that the entries of each filters dictionary are in reverse order."""
        if self.has_split():
            alternatives_list = []
            for c_idx, (c_split_alternative, c_node) in enumerate(zip(self.split_fixture_alternatives, self.children)):
                # for all alternatives in this subtree
                for f_dct, n_lst in c_node._get_alternatives():
                    # - filter
                    _f_dct = f_dct.copy()
                    _f_dct[self.split_fixture_name] = (c_idx, c_split_alternative)

                    # - unique fixtures used
                    _n_lst = list(self.fixture_defs) + [_i for _i in n_lst if _i not in self.fixture_defs]

                    alternatives_list.append((_f_dct, _n_lst))

            return alternatives_list
        else:
            # return a single partition containing no filter and all fixture names
            return [(OrderedDict(), self.get_all_fixture_names())]
545  
546  
class SuperClosure(MutableSequence):
    """
    A "super closure" is a closure made of several closures, each induced by a fixture union parameter value.
    The number of alternative closures is `self.nb_alternative_closures`

    This object behaves like a list (a mutable sequence), so that we can pass it to pytest in place of the list of
    fixture names that is returned in `getfixtureclosure`.

    In this implementation, it is backed by a fixture closure tree, that we have to preserve in order to get
    parametrization right. In another branch of this project ('super_closure' branch) we tried to forget the tree
    and only keep the partitions, but parametrization order was not as intuitive for the end user as all unions
    appeared as parametrized first (since they induced the partitions).
    """
    __slots__ = 'tree', 'all_fixture_defs'

    def __init__(self,
                 root_node  # type: FixtureClosureNode
                 ):
        """
        :param root_node: the root of the fixture closure tree backing this super closure
        """
        # save the fixture closure tree root
        self.tree = root_node
        # retrieve/sort fixture defs for quicker access
        self._update_fixture_defs()

    def _update_fixture_defs(self):
        """Refresh `self.all_fixture_defs` (the "list" view) from the current contents of `self.tree`."""
        # get a dict of all fixture defs, for quicker access (and sorted)
        # sort by scope as in pytest fixture closure creator, if scope information is available
        self.all_fixture_defs = self.tree.get_all_fixture_defs(drop_fake_fixtures=False, try_to_sort=True)

    # --- visualization tools ----

    @property
    def nb_alternative_closures(self):
        """ Return the number of alternative closures induced by fixture unions """
        # `get_alternatives` returns one (filters, fixture names) entry per alternative
        return len(self.tree.get_alternatives())

    def __repr__(self):
        """ Return a synthetic view, and a detailed tree view, of this closure """
        alternatives = self.tree.get_alternatives()
        nb_alternative_closures = len(alternatives)
        return "SuperClosure with %s alternative closures:\n" % nb_alternative_closures \
               + "\n".join(" - %s (filters: %s)" % (p, ", ".join("%s=%s[%s]=%s" % (k, k, v[0], v[1])
                                                                 for k, v in f.items()))
                           for f, p in alternatives) \
               + "\nThe 'super closure list' is %s\n\nThe fixture tree is :\n%s\n" % (list(self), self.tree)

    def get_all_fixture_defs(self, drop_fake_fixtures=True):
        """
        Return a dictionary of all fixture defs used in this super closure

        note: this is equivalent to
        self.tree.get_all_fixture_defs(drop_fake_fixtures=drop_fake_fixtures, try_to_sort=True)
        """
        if drop_fake_fixtures:
            # remove the "fixtures" that are actually test function parameter args
            return {k: v for k, v in self.all_fixture_defs.items() if v is not None}
        else:
            # all fixtures AND pseudo-fixtures (test function parameters)
            return self.all_fixture_defs

    # ---- list (MutableSequence) facade: behaves like a list of fixture names ------

    def __len__(self):
        return len(self.all_fixture_defs)

    def __getitem__(self, i):
        # return the key (fixture name) associated with the i-th pair, or a list of keys if `i` is a slice
        return list(self.all_fixture_defs.keys())[i]

    def __setitem__(self, i, o):
        """
        Only a few replacements are supported: a replacement by the same value(s) (no-op), and for slices a
        reordering (ignored with a warning) or a pure removal of some of the names.

        :raises NotImplementedError: for any other replacement
        """
        # Get the existing value(s) that we wish to replace
        ref = list(self)[i]

        if o == ref:
            # no change at all: of course we accept.
            return

        if not isinstance(i, slice):
            # In-place change of a single item: let's be conservative and reject for now
            raise NotImplementedError("Replacing an element in a super fixture closure is not currently implemented. "
                                      "Please report this issue to the `pytest-cases` project.")

        # Replacement of multiple items at once: support reordering (ignored) and removal (actually done)
        new_set = set(o)
        ref_set = set(ref)
        if new_set == ref_set:
            # A change is required in the order of fixtures. Ignore but continue
            warn("WARNING: An attempt was made to reorder a super fixture closure with unions. This is not yet "
                 "supported since the partitions use subsets of the fixtures ; please report it so that we can "
                 "find a suitable solution for your need.")
            return

        added = new_set.difference(ref_set)
        removed = ref_set.difference(new_set)
        if len(added) > 0:
            # We could use self.append_all(added), but we would rather be conservative for now
            raise NotImplementedError("Adding elements to a super fixture closure with a slice is not currently "
                                      "implemented. Please report this issue to the `pytest-cases` project.")

        # Pure removal: ok.
        self.remove_all(removed)

    def __delitem__(self, i):
        selected = self[i]
        if isinstance(i, slice):
            # `selected` is a list of names
            self.remove_all(selected)
        else:
            self.remove(selected)

    def insert(self, index, fixture_name):
        """
        Try to transparently support inserts. Since the underlying structure is a tree, only two cases
        are supported: inserting at position 0 and appending at position len(self).

        Note that while appending has no restrictions, inserting at position 0 is only allowed for now if the
        fixture to insert does not have a union in its associated closure.

        :param index:
        :param fixture_name:
        :return:
        :raises NotImplementedError: in all unsupported cases
        """
        if index == 0:
            # build the closure associated with this new fixture name
            fixture_defs_mgr = FixtureDefsCache(self.tree.fixture_defs_mgr.fm, self.tree.fixture_defs_mgr.node)
            closure_tree = FixtureClosureNode(fixture_defs_mgr=fixture_defs_mgr)
            closure_tree.build_closure((fixture_name,))
            if closure_tree.has_split():
                raise NotImplementedError("When fixture unions are present, inserting a fixture in the closure at "
                                          "position 0 is currently only supported if that fixture's closure does not "
                                          "contain a union. Please report this so that we can find a suitable solution"
                                          " for your need.")
            else:
                # remove those fixture definitions from all nodes in the tree
                self.tree.remove_fixtures(closure_tree.fixture_defs.keys())

                # finally prepend the defs at the beginning of the dictionary in the first node
                self.tree.fixture_defs = OrderedDict(list(closure_tree.fixture_defs.items())
                                                     + list(self.tree.fixture_defs.items()))

        elif index == len(self):
            # appending is natively supported in our tree growing method
            self.tree.build_closure((fixture_name,))
        else:
            raise NotImplementedError("When fixture unions are present, inserting a fixture in the closure at a "
                                      "position different from 0 (prepend) or <end> (append) is non-trivial. Please "
                                      "report this so that we can find a suitable solution for your need.")

        # Finally update self.all_fixture_defs so that the "list" view reflects the changes in self.tree
        self._update_fixture_defs()

    def append_all(self, fixture_names):
        """Append various fixture names to the closure"""
        # appending is natively supported in our tree growing method
        self.tree.build_closure(tuple(fixture_names))

        # Finally update self.all_fixture_defs so that the "list" view reflects the changes in self.tree
        self._update_fixture_defs()

    def remove(self, value):
        """
        Try to transparently support removal. Note: since the underlying structure is a tree,
        removing "union" fixtures is non-trivial so for now it is not supported.

        :param value:
        :return:
        """
        # remove in the tree
        self.tree.remove_fixtures((value,))

        # update fixture defs
        self._update_fixture_defs()

    def remove_all(self, values):
        """Multiple `remove` operations at once."""
        # remove in the tree
        self.tree.remove_fixtures(tuple(values))

        # update fixture defs
        self._update_fixture_defs()
755  
756  
def _getfixtureclosure(fm, fixturenames, parentnode, ignore_args=()):
    """
    Replaces pytest's getfixtureclosure method to handle unions.

    The original pytest method (taken from the class of `fm`) is called first and its result is only used to
    check ours, computed with `create_super_closure`. The number of returned items follows the pytest version:
    three items (initial names, closure, arg2fixturedefs) for pytest 3.7 to 7, two items otherwise.
    """
    pytest_getfixtureclosure = fm.__class__.getfixtureclosure

    # (1) first retrieve the normal pytest output for comparison
    # note: the "ignore_args" argument is new in pytest 4.6+
    extra_kwargs = {'ignore_args': ignore_args} if PYTEST46_OR_GREATER else {}

    if PYTEST8_OR_GREATER:
        # two outputs and sig change
        ref_fixturenames, ref_arg2fixturedefs = pytest_getfixtureclosure(fm, parentnode, fixturenames,
                                                                         **extra_kwargs)
    elif PYTEST37_OR_GREATER:
        # three outputs
        _ref_initial_names, ref_fixturenames, ref_arg2fixturedefs = \
            pytest_getfixtureclosure(fm, fixturenames, parentnode, **extra_kwargs)
    else:
        # two outputs
        ref_fixturenames, ref_arg2fixturedefs = pytest_getfixtureclosure(fm, fixturenames, parentnode)

    # (2) now let's do it by ourselves to support fixture unions
    _init_fixnames, super_closure, arg2fixturedefs = create_super_closure(fm, parentnode, fixturenames, ignore_args)

    # Compare with the previous behaviour TODO remove when in 'production' ?
    # NOTE only the *sets* of names are compared: a different order happens all the time because of our "prepend"
    # strategy in the closure building, which makes much more sense/intuition than pytest default
    assert set(super_closure) == set(ref_fixturenames)
    assert dict(arg2fixturedefs) == ref_arg2fixturedefs

    # (3) return the same number of outputs as the pytest version in use
    if PYTEST8_OR_GREATER or not PYTEST37_OR_GREATER:
        return super_closure, arg2fixturedefs
    return _init_fixnames, super_closure, arg2fixturedefs
792  
793  
if not PYTEST8_OR_GREATER:
    # same argument order as our implementation: use it directly
    getfixtureclosure = _getfixtureclosure
else:
    def getfixtureclosure(fm, parentnode, initialnames, ignore_args):
        """Adapter with the `(parentnode, initialnames, ignore_args)` argument order: forwards the arguments to
        `_getfixtureclosure`, which expects the fixture names before the parent node."""
        return _getfixtureclosure(fm, initialnames, parentnode, ignore_args)
799  
800  
def create_super_closure(fm,
                         parentnode,
                         fixturenames,
                         ignore_args
                         ):
    # type: (...) -> Tuple[List, Union[List, SuperClosure], Mapping]
    """
    Builds the fixture closure for `parentnode`, as a tree (`FixtureClosureNode`) wrapped in a `SuperClosure`.

    The initial fixture names are the auto-use fixture names returned by `fm._getautousenames`, followed by the
    names in `fixturenames` that are not already present. When `parentnode` is a function node, `fixturenames`
    is first reordered according to the parameter names found on that node.

    :param fm: the fixture manager
    :param parentnode: the node for which the closure is built
    :param fixturenames: the fixture names explicitly requested
    :param ignore_args: passed as is to `FixtureClosureNode.build_closure`
    :return: a tuple `(initial fixture names, closure, fixture definitions)`. The closure is a plain list when
        the tree has no split, and the `SuperClosure` otherwise.
    """
    parentid = parentnode.nodeid

    if _DEBUG:
        print("Creating closure for %s:" % parentid)

    # -- auto-use fixtures: depending on the pytest version the node or its id has to be provided
    autouse_from_node = hasattr(pytest, "version_tuple") and pytest.version_tuple >= (8, 1)
    _init_fixnames = list(fm._getautousenames(parentnode if autouse_from_node else parentid))  # noqa

    # -- required fixtures/params.
    # ********* fix the order of initial fixtures: indeed this order may not be the right one ************
    # this only works when pytest version is > 3.4, otherwise the parent node is a Module
    if is_function_node(parentnode):
        # grab all the parametrization on that node and fix the order.
        # Note: on pytest >= 4 the list of param_names is probably the same than the `ignore_args` input
        names_to_merge = sort_according_to_ref_list(fixturenames, get_param_names(parentnode))
    else:
        # we cannot sort yet
        names_to_merge = fixturenames

    # merge the fixture names into the _init_fixnames, skipping the ones that are already there
    for name in names_to_merge:
        if name not in _init_fixnames:
            _init_fixnames.append(name)

    # Bugfix GH#330 in progress...
    # TODO analyze why in the test "fixture_union_0simplest
    #   the first node contains second, and the second contains first
    #   or TODO check the test for get_callspecs, it is maybe simpler

    # Finally create the closure
    closure_tree = FixtureClosureNode(fixture_defs_mgr=FixtureDefsCache(fm, parentnode))
    closure_tree.build_closure(_init_fixnames, ignore_args=ignore_args)
    super_closure = SuperClosure(closure_tree)
    all_fixture_defs = super_closure.get_all_fixture_defs(drop_fake_fixtures=True)

    # possibly simplify into a list
    if not closure_tree.has_split():
        super_closure = list(super_closure)

    if _DEBUG:
        print("Closure for %s completed:" % parentid)
        print(closure_tree)
        print(super_closure)

    return _init_fixnames, super_closure, all_fixture_defs
871  
872  
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_generate_tests(metafunc):
    """
    Hook wrapper replacing the `parametrize` method of `metafunc` with our own `parametrize` function (bound to
    `metafunc` with `partial`), before pytest calls it.

    Note: this could be done in a static way in pytest_sessionstart or in a plugin init hook, but doing it per
    instance leaves the original method reachable through `metafunc.__class__.parametrize`.
    """
    # shadow the method on the instance only
    metafunc.parametrize = partial(parametrize, metafunc)

    # hand over to pytest: the call is parametrized as usual; the hook outcome is not needed
    yield
885  
886  
class UnionParamz(namedtuple('UnionParamz', ['union_fixture_name', 'alternative_names', 'ids', 'scope', 'kwargs'])):
    """ A pending parametrization order for a union fixture: the union fixture name, its alternatives, and the
    `ids`, `scope` and extra `kwargs` received by `parametrize`. """

    __slots__ = ()

    def __str__(self):
        # the alternatives are rendered one by one with `str`, comma-separated without spaces
        alternatives = ','.join(map(str, self.alternative_names))
        fields = (self.union_fixture_name, alternatives, self.ids, self.scope, self.kwargs)
        return "[UNION] %s=[%s], ids=%s, scope=%s, kwargs=%s" % fields
896  
897  
class NormalParamz(namedtuple('NormalParamz', ['argnames', 'argvalues', 'indirect', 'ids', 'scope', 'kwargs'])):
    """ A pending "normal" parametrization order: the arguments received by `parametrize`, stored as is. """

    __slots__ = ()

    def __str__(self):
        # the fields are rendered in their declaration order: a plain tuple copy feeds the format string
        return "[NORMAL] %s=[%s], indirect=%s, ids=%s, scope=%s, kwargs=%s" % tuple(self)
906  
907  
def parametrize(metafunc, argnames, argvalues, indirect=False, ids=None, scope=None, **kwargs):
    """
    Alternate implementation of `metafunc.parametrize`, taking "union" fixtures into account.

    When `metafunc.fixturenames` is not a `SuperClosure`, the call is simply forwarded to the native
    `metafunc.__class__.parametrize`.

    Otherwise nothing is applied right away: `metafunc._calls` is replaced (on the first call) with a
    `CallsReactor`, and the parametrization is queued in it, either as a `UnionParamz` (when `argvalues` are the
    parameters of a fixture union) or as a `NormalParamz`. No cross-product is done during this call: the
    `CallsReactor` lazily creates the list of calls when it is used.

    :raises ValueError: if `metafunc._calls` is not empty on the first call, or if a union fixture is
        parametrized with several argnames, with `indirect=False` or with extra kwargs
    """
    if not isinstance(metafunc.fixturenames, SuperClosure):
        # legacy method
        # Note: EMPTY_ID are never set by us in a normal parametrize, so there is nothing to clean here.
        metafunc.__class__.parametrize(metafunc, argnames, argvalues,
                                       indirect=indirect, ids=ids, scope=scope, **kwargs)
        return

    # get or create our special container object
    if not isinstance(metafunc._calls, CallsReactor):  # noqa
        # first call: should be an empty list
        if len(metafunc._calls) > 0:  # noqa
            # If this happens, it is most probably because another plugin has called 'parametrize' before our hook
            # plugin.py/pytest_generate_tests has replaced it with this function. It can be due to a regression
            # in pluggy too, see https://github.com/smarie/python-pytest-cases/issues/302
            raise ValueError("This should not happen - please file an issue")
        metafunc._calls = CallsReactor(metafunc)
    calls_reactor = metafunc._calls  # noqa

    if not is_fixture_union_params(argvalues):
        # add a normal parametrization in the queue (but do not apply it now)
        calls_reactor.append(NormalParamz(argnames, argvalues, indirect, ids, scope, kwargs))
        return

    # union fixture detected: it must be a single name, parametrized indirectly, without extra kwargs
    if ',' in argnames or not isinstance(argnames, string_types):
        raise ValueError("Union fixtures can not be parametrized")
    if indirect is False or kwargs:
        raise ValueError("indirect cannot be set on a union fixture, as well as unknown kwargs")

    # add a union parametrization in the queue (but do not apply it now)
    calls_reactor.append(UnionParamz(argnames, argvalues, ids, scope, kwargs))
953  
954  
class CallsReactor(object):
    """
    This object replaces the list of calls that was in `metafunc._calls`.
    It behaves like a list (iteration and item access), but it actually builds that list lazily, from all the
    parametrizations collected by the custom `metafunc.parametrize` above.

    There are therefore two steps:

     - when `metafunc.parametrize` is called, `append` is called on this object and the parametrization order
       gets stored in `self._pending`.

     - when this object is first read as a list, all parametrization orders in `self._pending` are applied by
       `create_call_list_from_pending_parametrizations`. The resulting list is stored in `self._call_list` and
       reused in all subsequent usages of this object.
    """
    __slots__ = ('metafunc', '_pending', '_call_list')

    def __init__(self, metafunc):
        self.metafunc = metafunc
        # the definitive list of calls; None until it is created
        self._call_list = None
        # the parametrization orders received so far, in reception order
        self._pending = []  # type: List[Union[UnionParamz, NormalParamz]]

    # -- methods to provide parametrization orders without executing them --

    def append(self,
               parametrization  # type: Union[UnionParamz, NormalParamz]
               ):
        """Queue a parametrization order; it is not applied now."""
        self._pending.append(parametrization)

    def print_parametrization_list(self):
        """Helper method to print all pending parametrizations in this reactor, one per line """
        print("\n".join(map(str, self._pending)))

    # -- list facade --

    def __iter__(self):
        return iter(self.calls_list)

    def __getitem__(self, item):
        return self.calls_list[item]

    @property
    def calls_list(self):
        """
        The list of calls. It is created on first access from the pending parametrizations, and cached in
        `self._call_list`.
        """
        if self._call_list is None:
            # first access: create the definitive list
            self.create_call_list_from_pending_parametrizations()

        return self._call_list

    # --- list creation (executed once, the first time this object is used as a list)

    def create_call_list_from_pending_parametrizations(self):
        """
        Applies all the parametrization orders that are pending in `self._pending`, following the closure tree
        found in `self.metafunc.fixturenames`, and stores the resulting list of calls in `self._call_list`.

        `self._pending` is set to None afterwards.
        """
        # self is expected to be on the `_calls` field of the metafunc; it is set back at the end of this call
        assert self.metafunc._calls is self

        # ------ parametrize the calls --------
        # create a dictionary of pending fixturenames/argnames to parametrize.
        # Only the first argname of each order is remembered, so that we are able to detect where in the fixture
        # tree the parametrization applies (it will still be applied for all of its argnames: see _process_node)
        pending_dct = OrderedDict((get_param_argnames_as_list(order[0])[0], order) for order in self._pending)

        if _DEBUG:
            print("\n---- pending parametrization ----")
            self.print_parametrization_list()
            print("---------------------------------\n")
            print("Applying all of them in the closure tree nodes:")

        # grab the "super fixtures closure" created previously (see getfixtureclosure above)
        super_closure = self.metafunc.fixturenames
        assert isinstance(super_closure, SuperClosure)

        # Apply parametrization for calls, following the tree.
        # Note: using the closure partitions instead would lead to a less intuitive order.
        calls = get_calls_for_tree(self.metafunc, super_closure.tree, pending_dct)

        if _DEBUG:
            print("\n".join(["%s[%s]: funcargs=%s, params=%s" % (get_pytest_nodeid(self.metafunc),
                                                                 c.id, c.params if PYTEST8_OR_GREATER else c.funcargs,
                                                                 c.params)
                             for c in calls]) + "\n")

        # clean EMPTY_ID set by @parametrize when there is at least a MultiParamsAlternative
        for callspec in calls:
            remove_empty_ids(callspec)

        # save the list and put back self as the _calls facade
        self._call_list = calls
        self.metafunc._calls = self
        # forget about all parametrizations now - this won't happen again
        self._pending = None
1068  
1069  
def get_calls_for_tree(metafunc,
                       fix_closure_tree,  # type: FixtureClosureNode
                       pending_dct  # type: MutableMapping[str, Union[UnionParamz, NormalParamz]]
                       ):
    """
    Creates the list of calls for `metafunc`, by applying the pending parametrizations along the closure tree
    (`_process_node`) and then completing the resulting calls (`_cleanup_calls_list`).

    :param metafunc:
    :param fix_closure_tree: the root node of the fixture closure tree
    :param pending_dct: the parametrization orders to apply. This mapping is copied, not modified.
    :return: the list of calls
    """
    # work on a copy so that the mapping received from the caller is left untouched
    remaining = pending_dct.copy()

    # start from the root with an empty list of calls.
    # For each call in `calls`, the node at the same position in `leaf_nodes` is the corresponding tree leaf.
    calls, leaf_nodes = _process_node(metafunc, fix_closure_tree, remaining, [])
    _cleanup_calls_list(metafunc, fix_closure_tree, calls, leaf_nodes, remaining)
    return calls
1086  
1087  
def _cleanup_calls_list(metafunc,
                        fix_closure_tree,  # type: FixtureClosureNode
                        calls,  # type: List[CallSpec2]
                        nodes,  # type: List[FixtureClosureNode]
                        pending_dct  # type: MutableMapping[str, Union[UnionParamz, NormalParamz]]
                        ):
    """
    Cleans the calls list in place so that all calls contain a value for all parameters. This is basically
    about adding "NOT_USED" parametrization everywhere relevant.

    :param metafunc:
    :param fix_closure_tree: the root of the closure tree, used to list the fixtures that are not always used
    :param calls: the list of calls to complete; its items may be replaced
    :param nodes: for each call, the closure tree node it corresponds to (same length as `calls`)
    :param pending_dct: the parametrizations that each call should have a value for
    :return: None
    :raises ValueError: if `calls` and `nodes` do not have the same length
    """
    if len(calls) != len(nodes):
        raise ValueError("This should not happen !")

    # reference list of the function-scoped fixtures that are not used in all branches
    func_scoped_not_always_used = []
    for fixture_name in fix_closure_tree.get_not_always_used():
        try:
            fixdefs = metafunc._arg2fixturedefs[fixture_name]  # noqa
        except KeyError:
            continue  # dont raise any error here and let pytest say "not found" later
        if has_function_scope(fixdefs[-1]):
            func_scoped_not_always_used.append(fixture_name)

    for i, (c, n) in enumerate(zip(calls, nodes)):

        # A/ set to "not used" all parametrized fixtures that were not used in some branches
        for fixture, p_to_apply in pending_dct.items():
            if in_callspec_explicit_args(c, fixture):
                continue

            # parametrize with a single "not used" value (one per argname) and discard the id
            if isinstance(p_to_apply, UnionParamz):
                dummy_argnames = p_to_apply.union_fixture_name
                dummy_values = [NOT_USED]
                dummy_indirect = True
            else:
                dummy_argnames = p_to_apply.argnames
                nb_argnames = len(get_param_argnames_as_list(dummy_argnames))
                dummy_values = [(NOT_USED,) * nb_argnames] if nb_argnames > 1 else [NOT_USED]
                dummy_indirect = p_to_apply.indirect

            c_with_dummy = _parametrize_calls(metafunc, [c], dummy_argnames, dummy_values,
                                              indirect=dummy_indirect, discard_id=True,
                                              scope=p_to_apply.scope, **p_to_apply.kwargs)
            assert len(c_with_dummy) == 1
            # the new callspec replaces the previous one, in the list and for the rest of this iteration
            c = calls[i] = c_with_dummy[0]

        # B/ function-scoped non-parametrized fixtures also need to be explicitly deactivated in the callspecs
        # where they are not required, otherwise they will be setup/teardown.
        #
        # For this we use a dirty hack: we add a parameter with their name in the callspec, it seems to be
        # propagated in the `request`. TODO is there a better way?
        for fixture_name in func_scoped_not_always_used:
            if in_callspec_explicit_args(c, fixture_name):
                continue

            if n.requires(fixture_name):
                # explicitly add it as active
                marker, index = USED, 0
            else:
                # explicitly add it as discarded
                marker, index = NOT_USED, 1

            c.params[fixture_name] = marker
            c.indices[fixture_name] = index
            set_callspec_arg_scope_to_function(c, fixture_name)
1163  
1164 # finally, if there are some session or module-scoped fixtures that
1165 # are used in *none* of the calls, they could be deactivated too
1166 # (see https://github.com/smarie/python-pytest-cases/issues/137)
1167 #
1168 # for fixture_name in _not_always_used_other_scoped:
1169 # _scopenum = metafunc._arg2fixturedefs[fixture_name][-1].scopenum
1170 #
1171 # # check if there is at least one call that actually uses the fixture and is not skipped...
1172 # # this seems a bit "too much" !! > WON'T FIX
1173 # used = False
1174 # for i in range(nb_calls):
1175 # c, n = calls[i], nodes[i]
1176 # if fixture_name in c.params or fixture_name in c.funcargs or n.requires(fixture_name):
1177 # if not is_skipped_or_failed(c): # HOW can we implement this based on call (and not item) ???
1178 # used = True
1179 # break
1180 #
1181 # if not used:
1182 # # explicitly add it as discarded everywhere by creating a parameter value for it.
1183 # for i in range(nb_calls):
1184 # c = calls[i]
1185 # c.params[fixture_name] = NOT_USED
1186 # c.indices[fixture_name] = 0
1187 # c._arg2scopenum[fixture_name] = _scopenum # noqa
1188  
1189  
1190 # def get_calls_for_partition(metafunc, super_closure, p_idx, pending):
1191 # """
1192 # Parametrizes all fixtures that are actually used in this partition
1193 # Cleans the calls list so that all calls contain a value for all parameters. This is basically
1194 # about adding "NOT_USED" parametrization everywhere relevant.
1195 #
1196 # :return: a list of CallSpec2
1197 # """
1198 # calls = []
1199 #
1200 # # A/ parametrize all fixtures that are actually used in this partition
1201 # for fixture_name in super_closure.partitions[p_idx]:
1202 # try:
1203 # # pop it from pending - do not rely the order in pending but rather the order in the closure
1204 # p_to_apply = pending.pop(fixture_name)
1205 # except KeyError:
1206 # # not a parametrized fixture
1207 # continue
1208 # else:
1209 # if isinstance(p_to_apply, UnionParamz):
1210 # # ******** Union parametrization **********
1211 # # selected_ids, selected_alternative = super_closure.get_parameter_to_apply(p_to_apply, p_idx)
1212 # num, selected_filter = super_closure.filters[p_idx][p_to_apply.union_fixture_name]
1213 # # in order to get the *actual* id to use (with all pytest subtleties in case of two identical ids
1214 # # appearing in the list), we create a fake calls list
1215 # fake_calls = _parametrize_calls(metafunc, [], p_to_apply.union_fixture_name,
1216 # p_to_apply.alternative_names, ids=p_to_apply.ids,
1217 # scope=p_to_apply.scope, indirect=True, **p_to_apply.kwargs)
1218 # selected_id = fake_calls[num].id
1219 # selected_alternative = p_to_apply.alternative_names[num]
1220 # # assert selected_alternative.alternative_name == selected_filter
1221 #
1222 # if _DEBUG:
1223 # print("[Partition %s] Applying parametrization for UNION fixture %r=%r"
1224 # "" % (p_idx, p_to_apply.union_fixture_name, selected_alternative))
1225 #
1226 # # always use 'indirect' since that's a fixture.
1227 # calls = _parametrize_calls(metafunc, calls, p_to_apply.union_fixture_name,
1228 # [selected_alternative], ids=[selected_id], scope=p_to_apply.scope,
1229 # indirect=True, **p_to_apply.kwargs)
1230 #
1231 # elif isinstance(p_to_apply, NormalParamz):
1232 # # ******** Normal parametrization **********
1233 # if _DEBUG:
1234 # print("[Partition %s] Applying parametrization for NORMAL %s"
1235 # "" % (p_idx, p_to_apply.argnames))
1236 #
1237 # calls = _parametrize_calls(metafunc, calls, p_to_apply.argnames, p_to_apply.argvalues,
1238 # indirect=p_to_apply.indirect, ids=p_to_apply.ids,
1239 # scope=p_to_apply.scope, **p_to_apply.kwargs)
1240 # else:
1241 # raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__)
1242 #
1243 # # Cleaning
1244 # for i in range(len(calls)):
1245 # c = calls[i]
1246 #
1247 # # B/ set to "not used" all parametrized fixtures that were not used in some branches
1248 # for fixture_name, p_to_apply in pending.items():
1249 # if fixture_name not in c.params and fixture_name not in c.funcargs:
1250 # # parametrize with a single "not used" value and discard the id
1251 # if isinstance(p_to_apply, UnionParamz):
1252 # c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.union_fixture_name, [NOT_USED],
1253 # indirect=True, discard_id=True, scope=p_to_apply.scope,
1254 # **p_to_apply.kwargs)
1255 # else:
1256 # _nb_argnames = len(get_param_argnames_as_list(p_to_apply.argnames))
1257 # if _nb_argnames > 1:
1258 # _vals = [(NOT_USED,) * _nb_argnames]
1259 # else:
1260 # _vals = [NOT_USED]
1261 # c_with_dummy = _parametrize_calls(metafunc, [c], p_to_apply.argnames, _vals,
1262 # indirect=p_to_apply.indirect, discard_id=True,
1263 # scope=p_to_apply.scope, **p_to_apply.kwargs)
1264 # assert len(c_with_dummy) == 1
1265 # calls[i] = c_with_dummy[0]
1266 # c = calls[i]
1267 #
1268 # # C/ some non-parametrized fixtures may also need to be explicitly deactivated in some callspecs
1269 # # otherwise they will be setup/teardown.
1270 # #
1271 # # For this we use a dirty hack: we add a parameter with they name in the callspec, it seems to be propagated
1272 # # in the `request`. TODO is there a better way?
1273 # for fixture_name in super_closure.get_not_always_used():
1274 # try:
1275 # fixdef = metafunc._arg2fixturedefs[fixture_name] # noqa
1276 # except KeyError:
1277 # continue # dont raise any error here and instead let pytest say "not found"
1278 #
1279 # if fixture_name not in c.params and fixture_name not in c.funcargs:
1280 # if not super_closure.requires(fixture_name, p_idx):
1281 # # explicitly add it as discarded by creating a parameter value for it.
1282 # c.params[fixture_name] = NOT_USED
1283 # c.indices[fixture_name] = 1
1284 # c._arg2scopenum[fixture_name] = get_pytest_scopenum(fixdef[-1].scope) # noqa
1285 # else:
1286 # # explicitly add it as active by creating a parameter value for it.
1287 # c.params[fixture_name] = 'used'
1288 # c.indices[fixture_name] = 0
1289 # c._arg2scopenum[fixture_name] = get_pytest_scopenum(fixdef[-1].scope) # noqa
1290 #
1291 # return calls
1292  
1293  
@property
def id(self):
    """The id made of all the non-empty items of `self._idlist`, joined with dashes."""
    # legacy _CallSpec2 id was filtering empty strings, we'll put it back on the class below
    # https://github.com/pytest-dev/pytest/blob/5.3.4/src/_pytest/python.py#L861
    kept_parts = [str(part) for part in self._idlist if part]
    return "-".join(kept_parts)
1299  
1300  
def _parametrize_calls(metafunc, init_calls, argnames, argvalues, discard_id=False, indirect=False, ids=None,
                       scope=None, **kwargs):
    """
    Parametrizes the initial `calls` with the provided information and returns the resulting new calls.

    `metafunc._calls` is temporarily replaced with `init_calls` so that the native
    `metafunc.__class__.parametrize` can be applied to them. It is always restored to its previous value before
    this function exits, even when the native `parametrize` raises.

    :param metafunc:
    :param init_calls: the calls to start from. `None` is the same as an empty list.
    :param argnames: passed to the native `parametrize`
    :param argvalues: passed to the native `parametrize`
    :param discard_id: if True, the last id of each resulting callspec (in `_idlist`) is removed
    :param indirect: passed to the native `parametrize`
    :param ids: passed to the native `parametrize`
    :param scope: passed to the native `parametrize`
    :param kwargs: passed to the native `parametrize`
    :return: the new calls
    """
    # make a backup so that we can restore the metafunc at the end
    bak = metafunc._calls  # noqa

    # place the initial calls on the metafunc
    metafunc._calls = init_calls if init_calls is not None else []

    try:
        # parametrize the metafunc. Since we replaced the `parametrize` method on `metafunc` we have to call super
        metafunc.__class__.parametrize(metafunc, argnames, argvalues,
                                       indirect=indirect, ids=ids, scope=scope, **kwargs)

        # extract the result
        new_calls = metafunc._calls  # noqa
    finally:
        # restore the metafunc whatever happens: an error should not leave the temporary list in place
        metafunc._calls = bak

    # If the user wants to discard the newly created id, remove the last id in all these callspecs
    if discard_id:
        for callspec in new_calls:
            callspec._idlist.pop(-1)  # noqa

    return new_calls
1325  
1326  
def _process_node(metafunc,
                  current_node,  # type: FixtureClosureNode
                  pending,  # type: MutableMapping[str, Union[UnionParamz, NormalParamz]]
                  calls  # type: List[CallSpec2]
                  ):
    """
    Routine to apply all the parametrization tasks in `pending` that are relevant to `current_node`,
    to `calls` (a list of pytest CallSpec2).

    It first applies all parametrization that correspond to current node (normal parameters),
    then applies the "split" parametrization if needed and recurses into each tree branch.

    It returns a tuple containing a list of calls and a list of same length containing which leaf node each one
    corresponds to.

    :param metafunc:
    :param current_node: the closure tree node we're focusing on
    :param pending: a mapping of parametrization orders to apply. The ones applied at this node are popped from
        it; each child branch receives its own copy of what remains.
    :param calls: the calls to start from
    :return: a tuple (calls, nodes) of two lists of the same length. So that for each CallSpec calls[i], you can see
        the corresponding leaf node in nodes[i]
    :raises ValueError: if a union parametrization is found for a non-split fixture, if a normal parametrization
        is found for the split fixture, or if the parametrization of the split fixture is missing
    :raises TypeError: if a parametrization order is neither a `NormalParamz` nor a `UnionParamz`
    """

    # (1) first apply all **non-split** fixtures at this node = NORMAL PARAMETERS
    # in the order defined in the closure tree, do not trust the order of the received parametrize (`pending`)
    # Note: names are compared by value, not by identity: two equal strings are not always the same object.
    split_fixture_name = current_node.split_fixture_name
    fixtures_at_this_node = [f for f in current_node.fixture_defs.keys() if f != split_fixture_name]
    for fixturename in fixtures_at_this_node:
        try:
            # pop the corresponding parametrization from pending - do not trust the order
            p_to_apply = pending.pop(fixturename)
        except KeyError:
            # fixturename is not a parametrized fixture, nothing to do
            continue

        if isinstance(p_to_apply, UnionParamz):
            raise ValueError("This should not happen! Only Normal parameters should be in fixtures_at_this_node")
        elif not isinstance(p_to_apply, NormalParamz):
            raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__)

        # ******** Normal parametrization **********
        if _DEBUG:
            print("[Node %s] Applying parametrization for NORMAL %s"
                  "" % (current_node.to_str(with_children=False), p_to_apply.argnames))

        calls = _parametrize_calls(metafunc, calls, p_to_apply.argnames, p_to_apply.argvalues,
                                   indirect=p_to_apply.indirect, ids=p_to_apply.ids,
                                   scope=p_to_apply.scope, **p_to_apply.kwargs)

    # (2) then is there a "union" = a split between two sub-branches in the tree ?
    if not current_node.has_split():
        # No split = tree leaf: all calls correspond to this node
        return calls, [current_node] * len(calls)

    # There is a **split** : apply its parametrization (a UNION parameter)
    try:
        # pop the corresponding parametrization from pending - do not trust the order
        p_to_apply = pending.pop(current_node.split_fixture_name)
    except KeyError:
        raise ValueError("This should not happen! fixture union parametrization missing, but this is a split node")

    if isinstance(p_to_apply, NormalParamz):
        raise ValueError("This should not happen! Split nodes correspond to Union parameters, not Normal ones.")
    elif not isinstance(p_to_apply, UnionParamz):
        # same rejection as for the normal parameters above
        raise TypeError("Invalid parametrization type: %s" % p_to_apply.__class__)

    # ******** Union parametrization **********
    if _DEBUG:
        print("[Node %s] Applying parametrization for UNION %s"
              "" % (current_node.to_str(with_children=False), p_to_apply.union_fixture_name))

    # always use 'indirect' since that's a fixture.
    calls = _parametrize_calls(metafunc, calls, p_to_apply.union_fixture_name,
                               p_to_apply.alternative_names, indirect=True,
                               ids=p_to_apply.ids, scope=p_to_apply.scope, **p_to_apply.kwargs)

    # now move to the children: each call is replaced with the list of calls created by its branch
    nodes_children = [None] * len(calls)
    for i, call in enumerate(calls):
        active_alternative = call.params[p_to_apply.union_fixture_name]
        child_indices = [_i for _i, x in enumerate(current_node.split_fixture_alternatives)
                         if x == active_alternative.alternative_name]
        # only use the first matching child, since the subtrees are identical.
        child_node = current_node.children[child_indices[0]]

        # each branch works on its own copy of the remaining parametrizations.
        # Note: there is no need to reorder them, the order of the closure tree is used.
        child_pending = pending.copy()

        calls[i], nodes_children[i] = _process_node(metafunc, child_node, child_pending, [call])

    # finally flatten the two lists of lists
    return flatten_list(calls), flatten_list(nodes_children)
1426  
1427  
1428 # def _make_unique(lst):
1429 # _set = set()
1430 #
1431 # def _first_time_met(v):
1432 # if v not in _set:
1433 # _set.add(v)
1434 # return True
1435 # else:
1436 # return False
1437 #
1438 # return [v for v in lst if _first_time_met(v)]
1439  
1440  
def flatten_list(lst):
    """Return a new list containing the items of all the nested iterables in `lst`, in order (one level only)."""
    flat = []
    for nested_list in lst:
        flat.extend(nested_list)
    return flat
1443  
1444  
def sort_according_to_ref_list(fixturenames, param_names):
    """
    Sorts items in the first list, according to their position in the second.
    Items that are not in the second list stay in the same position, the others are just swapped.
    A new list is returned, the inputs are not modified.

    :param fixturenames: the names to sort
    :param param_names: the reference names, in the desired order
    :return: a new list with the same items as `fixturenames`
    """
    # positions in `fixturenames` of the reference names, in reference order.
    # A reference name may be missing, in case of indirect parametrization: a parameter is not in the fixture name.
    # TODO we should maybe rather add the pname to fixturenames in this case ?
    found_positions = [fixturenames.index(pname) for pname in param_names if pname in fixturenames]

    # the same slots are reused, but filled in the reference order
    reordered = list(fixturenames)
    for destination, origin in zip(sorted(found_positions), found_positions):
        reordered[destination] = fixturenames[origin]
    return reordered
1468  
1469  
# name of the reordering option; `pytest_addoption` registers it as a command-line flag with '_' replaced by '-'
_OPTION_NAME = 'with_reorder'
# the two values accepted for this option
_SKIP = 'skip'
_NORMAL = 'normal'
# description of each accepted value, used to build the help text of the option
_OPTIONS = {
    _NORMAL: """(default) the usual reordering done by pytest to optimize setup/teardown of session- / module-
/ class- fixtures, as well as all the modifications made by other plugins (e.g. pytest-reorder)""",
    _SKIP: """skips *all* reordering, even the one done by pytest itself or installed plugins
(e.g. pytest-reorder)"""
}
1479  
1480  
1481 # @hookspec(historic=True)
def pytest_addoption(parser):
    """
    Registers the reordering command-line option (named after `_OPTION_NAME`, with '_' replaced by '-') in a
    dedicated 'pytest-cases ordering' option group. Its help text lists all the entries of `_OPTIONS`.

    :param parser: the parser on which the option group is retrieved with `getgroup`
    """
    group = parser.getgroup('pytest-cases ordering', 'pytest-cases reordering options', after='general')
    # one line per accepted value, formatted as "<value>: <description>"
    help_str = """String specifying one of the reordering alternatives to use. Should be one of :
- %s""" % ("\n - ".join(["%s: %s" % (k, v) for k, v in _OPTIONS.items()]))
    # NOTE(review): the accepted values are not declared here, they are checked in `pytest_configure`
    group.addoption(
        '--%s' % _OPTION_NAME.replace('_', '-'), type=str, default='normal', help=help_str
    )
1489  
1490  
# None until the `pytest_load_initial_conftests` hook below is called: it stores the received config here
PYTEST_CONFIG = None  # type: Optional[Config]


def pytest_load_initial_conftests(early_config):
    """
    Stores the received config object in the module-level `PYTEST_CONFIG` variable.

    :param early_config: the config object to remember
    """
    # store the received config object for future use; see #165 #166 #196
    global PYTEST_CONFIG
    PYTEST_CONFIG = early_config
1499  
1500  
1501 # @hookspec(historic=True)
def pytest_configure(config):
    """
    Validates the value received for the reordering option (`_OPTION_NAME`).

    :param config: the config object, on which the option value is read with `getoption`
    :raises ValueError: if the value is neither 'normal' nor 'skip'
    """
    allowed_values = ('normal', 'skip')
    reordering_choice = config.getoption(_OPTION_NAME)
    if reordering_choice in allowed_values:
        # valid: nothing else to do
        return

    raise ValueError("[pytest-cases] Wrong --%s option: %s. Allowed values: %s"
                     "" % (_OPTION_NAME, reordering_choice, allowed_values))
1509  
1510  
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_collection_modifyitems(session, config, items):  # noqa
    """
    An alternative to the `reorder_items` function in fixtures.py
    (https://github.com/pytest-dev/pytest/blob/master/src/_pytest/fixtures.py#L209)

    When the reordering option is set to 'skip', we basically set back the previous order once the other
    implementations of this hook have completed. Only the items that are still present in `items` at that
    time are kept. Otherwise this wrapper does nothing.

    TODO we should set back an optimal ordering, but current PR https://github.com/pytest-dev/pytest/pull/3551
    will probably not be relevant to handle our "union" fixtures > need to integrate the NOT_USED markers in the method

    :param session:
    :param config: the config object, on which the option value is read with `getoption`
    :param items: the list of collected items, modified in place
    :return:
    """
    if config.getoption(_OPTION_NAME) != _SKIP:
        # do nothing
        yield
        return

    # remember initial order
    snapshot = copy(items)
    yield

    # put back the initial order but keep the filter: items that were removed in the meantime are skipped
    restored = [None] * len(items)
    slot = 0
    for candidate in snapshot:
        if candidate not in items:
            continue
        restored[slot] = candidate
        slot += 1

    # every remaining item is expected to be found in the initial list
    assert slot == len(items)
    items[:] = restored
1546  
1547  
@pytest.fixture
def current_cases(request):
    """
    Fixture exposing the result of `get_current_cases(request)`.

    The value is a dictionary containing all case parameters for the currently active `pytest` item:

     - for each test function argument parametrized using `@parametrize_with_cases(<argname>, ...)`, it contains
       an entry `{<argname>: (case_id, case_function, case_params)}`. If several argnames are parametrized this
       way, each of them has its own entry.
     - if a fixture parametrized with cases is active, it contains an entry `{<fixturename>: <dct>}` where `<dct>`
       is itself a dictionary `{<argname>: (case_id, case_function, case_params)}`.

    Each `(case_id, case_function, case_params)` tuple is a `namedtuple` with the following fields:

     - `id`: a string containing the actual case id constructed by `@parametrize_with_cases`.
     - `function`: the original case function.
     - `params`: a dictionary containing the parameters of the case, if the case is itself parametrized. Note
       that if the case is parametrized with `@parametrize_with_cases`, the associated parameter value in this
       dictionary will also be `(actual_id, case_function, case_params)`.

    To get more information on a case function, you can use `get_case_marks(f)`, `get_case_tags(f)`.
    You can also use `matches_tag_query` to check if a case function matches some expectations either concerning
    its id or its tags. See https://smarie.github.io/python-pytest-cases/#filters-and-tags

    :param request: the fixture request, passed as is to `get_current_cases`.
    :return: the dictionary returned by `get_current_cases(request)`.
    """
    return get_current_cases(request)