Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# Authors: Sylvain MARIE <sylvain.marie@se.com>
#          + All contributors to <https://github.com/smarie/python-pytest-steps>
#
# License: 3-clause BSD, <https://github.com/smarie/python-pytest-steps/blob/master/LICENSE>

# WARNING do not import pandas here: it should remain optional
# WARNING do not import pytest-harvest here: it should remain optional
9from .common_mini_six import string_types
10from .steps import CROSS_STEPS_MARK
11from .steps_harvest import _get_step_param_names_or_default, get_all_pytest_param_names_except_step_id, \
12 remove_step_from_test_id
14try: # type hints for python 3.5+
15 from typing import List, Any, Iterable, Union
16except ImportError:
17 pass
def pivot_steps_on_df(results_df,
                      pytest_session=None,
                      pytest_session_filter=None,  # type: Any
                      cross_steps_columns=None,  # type: List[str]
                      error_if_not_present=True  # type: bool
                      ):
    """
    Pivots the dataframe so that there is one row per test id, containing the information of all of its steps.

    The input dataframe should have a multilevel index with exactly two *named* levels (test id, step id), i.e.
    `results_df.index.names` should be set. The test id should be independent of the step id.

    The columns listed in `cross_steps_columns` (or inferred from `pytest_session`) are kept as they are, one value
    per test id. All other columns are pivoted: each of them is replaced with one column per step, identified by the
    tuple `(step id, original column name)`. The steps appear in the order in which they first appear in the index.
    Pivoted columns that only contain missing values are dropped.

    :param results_df: a synthesis dataframe created by `pytest-harvest`.
    :param pytest_session: If this is provided, the cross_steps_columns will be inferred from the pytest session
        information, and `error_if_not_present` is forced to `False`. Exactly one of `pytest_session` or
        `cross_steps_columns` should be provided.
    :param pytest_session_filter: if this is provided, the cross_steps_columns will be better inferred from the pytest
        session information, by only using the filtered elements. It is forwarded as the `filter` argument of the
        helpers inspecting the session.
    :param cross_steps_columns: a list of columns in the dataframe that are stable across steps. Provide this only if
        the pytest session is not provided.
    :param error_if_not_present: a boolean (default True) indicating if the function should raise an error if a name
        provided in `cross_steps_columns` is not present in the dataframe.
    :return: a new dataframe indexed by test id, with the cross-steps columns first and then the pivoted columns.
    :raises ValueError: if both or none of `pytest_session` and `cross_steps_columns` are provided, if a requested
        column is missing while `error_if_not_present` is True, or if a so-called cross-steps column actually takes
        several values within the same test id.
    """
    # check params
    if pytest_session is not None and cross_steps_columns is not None:
        raise ValueError("Only one of `pytest_session` and `cross_steps_columns` should be provided")

    if pytest_session is not None:
        # auto-extract from session: gather all names of columns that we know are cross-steps
        pytest_other_names = ['pytest_obj']
        param_names = get_all_pytest_param_names_except_step_id(pytest_session, filter=pytest_session_filter)
        fixture_names = get_all_cross_steps_fixture_names(pytest_session, filter=pytest_session_filter)
        cross_steps_columns = pytest_other_names + param_names + fixture_names
        # guessed names are not all expected to be present in the dataframe
        error_if_not_present = False
    elif cross_steps_columns is None:
        # fail early with an explicit message rather than with a `TypeError` on `set(None)` below
        raise ValueError("One of `pytest_session` and `cross_steps_columns` should be provided")

    # check column names provided or guessed from session
    cross_steps_set = set(cross_steps_columns)
    non_present = cross_steps_set - set(results_df.columns)
    if error_if_not_present and len(non_present) > 0:
        raise ValueError("Columns %s are not present in the provided dataframe. If this is normal set "
                         "`error_if_not_present=False`. Available columns: %s"
                         "" % (non_present, list(results_df.columns)))
    # keep the dataframe's own column order, restricted to the columns that are actually present
    cross_steps_cols_list = [c for c in results_df.columns if c in cross_steps_set]

    # extract the names of the two index levels
    test_id_name, step_id_name = results_df.index.names

    # remember the original steps order
    all_step_ids = results_df.index.get_level_values(step_id_name).unique()

    # split the df in two parts: the columns that do not depend on steps and the ones that have one value per step
    # --these do not depend on steps.
    # Duplicates are dropped while the test id is still a regular column: `drop_duplicates` ignores the index, so
    # doing it after `set_index` would merge distinct tests that happen to share the same cross-steps values.
    remaining_df = (results_df[cross_steps_cols_list]
                    .reset_index()
                    .drop(step_id_name, axis=1)
                    .drop_duplicates()
                    .set_index(test_id_name))

    # a test id still present several times means that a column takes several values for that test
    if remaining_df.index.has_duplicates:
        raise ValueError("At least one of the columns listed in '%s' varies across steps." % cross_steps_cols_list)

    # --these depend on steps. Missing names were already reported above (or are to be ignored), so only the
    # columns that are present need to be dropped.
    one_per_step_df = results_df.drop(cross_steps_cols_list, axis=1)

    # perform the pivot operation, and clean up (change levels order, drop nan columns, sort columns)
    # Note: pandas 'pivot' does not work with multiindex in many pandas versions so we use "unstack" instead
    one_per_step_df = one_per_step_df.unstack(step_id_name) \
        .reorder_levels([1, 0], axis=1) \
        .dropna(axis=1, how='all') \
        .reindex(columns=all_step_ids, level=0)

    # Recent pandas versions refuse to join frames whose columns have a different number of levels. Replace the
    # two-level columns with a single level of `(step id, name)` tuples so that the join is always legal.
    # Very old pandas versions without `to_flat_index` keep their historical behaviour.
    pivoted_columns = one_per_step_df.columns
    if hasattr(pivoted_columns, 'to_flat_index'):
        one_per_step_df.columns = pivoted_columns.to_flat_index()

    # join the two
    return remaining_df.join(one_per_step_df)
def flatten_multilevel_columns(df,
                               sep='/'  # type: str
                               ):
    """
    Flattens the columns of `df` in place: multilevel column names (typically obtained after a pivot) are replaced
    with single-level ones made of all levels concatenated with the separator `sep`. For instance a column with the
    two levels `foo` and `bar` is renamed `foo/bar`.

    This is a shortcut for `df.columns = get_flattened_multilevel_columns(df)`.

    :param df: the dataframe whose columns should be renamed. It is modified in place.
    :param sep: the separator inserted between the names of the levels
    :return: the same dataframe object `df`, for convenience
    """
    flat_names = get_flattened_multilevel_columns(df, sep=sep)
    df.columns = flat_names
    return df
def get_flattened_multilevel_columns(df,
                                     sep='/'  # type: str
                                     ):
    """
    Computes single-level column names for the provided dataframe, without modifying it.
    Every column whose name is made of several levels gets a single string name, obtained by joining the string
    representation of all its levels with `sep`. String names, and names that are not iterable, are left untouched.

    The result is meant to be used as the new column names:

    `df.columns = get_flattened_multilevel_columns(df)`

    :param df: the dataframe whose column names should be flattened
    :param sep: the separator to use when joining the names of several levels into one unique name
    :return: a list with one name per column of `df`, in the same order
    """
    def _to_single_name(col):
        # a plain string is already a flat name (and must not be split into characters)
        if isinstance(col, string_types):
            return col
        try:
            return sep.join(map(str, col))
        except TypeError:
            # neither a string nor an iterable: keep the name as it is
            return col

    return list(map(_to_single_name, df.columns.values))
def handle_steps_in_results_df(results_df,
                               raise_if_one_test_without_step_id=False,  # type: bool
                               no_step_id='-',  # type: str
                               step_param_names=None,  # type: Union[str, Iterable[str]]
                               keep_orig_id=True,  # type: bool
                               no_steps_policy='raise',  # type: str
                               inplace=False
                               ):
    """
    Equivalent of `handle_steps_in_results_dct`

    Improves the synthesis dataframe so that
     - the test_id index is replaced with a multilevel index (test_id, step_id) where test_id is a
       step-independent test id. A 'pytest_id' column remains with the original id except if keep_orig_id=False
       (default=True)
     - the step id column is removed from the contents since it becomes the 'step_id' index level

    The step id is identified by looking at the columns, and finding one with a name included in the
    `step_param_names` list (`None` uses the default names). If no step id is found on an entry, it is replaced with
    the value of `no_step_id` except if `raise_if_one_test_without_step_id=True` - in which case an error is raised.

    If no step id column is present at all, `no_steps_policy` determines what happens: it can either skip the whole
    function and return a copy of the input ('skip'), or behave as usual ('ignore'), or raise an error ('raise').

    If `keep_orig_id` is set to True (default), the original id is added as a new column.

    If `inplace` is `False` (default), a new dataframe will be returned. Otherwise the input dataframe will
    be modified inplace and nothing will be returned.

    :param results_df: the synthesis dataframe, indexed by the original pytest test id.
    :param raise_if_one_test_without_step_id: if this is set to `True` and at least one step id can not be found in the
        tests, an error will be raised. By default this is set to `False`: in that case, when the step id is not found
        it is replaced with value of the `no_step_id` parameter.
    :param no_step_id: the identifier to use when the step id is not found (if
        `raise_if_one_test_without_step_id` is `False`)
    :param step_param_names: a singleton or iterable containing the names of the test step parameters used in the
        tests. `None` (default) uses the default names provided by `_get_step_param_names_or_default`.
    :param keep_orig_id: if True (default) the original test id will appear in the df under 'pytest_id' column
    :param no_steps_policy: if 'ignore' the returned dataframe will be multilevel (test id, step id) in all
        cases, even if no step is present. If 'skip' and no step is present, the method will not modify anything in the
        dataframe. If 'raise' (default) and no step column is present, an error is raised.
    :param inplace: if this is `False` (default), a new dataframe will be returned. Otherwise the input dataframe will
        be modified inplace and None will be returned
    :return: the new dataframe if `inplace` is `False`, `None` otherwise
    :raises ValueError: if `no_steps_policy` is invalid, if no step column is found while `no_steps_policy='raise'`,
        if several step columns are found, or if some entries have no step id while
        `raise_if_one_test_without_step_id=True`.
    """
    import pandas as pd

    # validate parameters
    step_param_names = _get_step_param_names_or_default(step_param_names)
    if not isinstance(no_steps_policy, str):
        # python 2 compatibility: unicode literals
        no_steps_policy = str(no_steps_policy)
    if no_steps_policy not in {'ignore', 'raise', 'skip'}:
        raise ValueError("`no_steps_policy` should be one of {'ignore', 'raise', 'skip'}")

    if not inplace:
        # work on a copy so that the caller's dataframe is left untouched
        results_df = results_df.copy()

    # find the unique column containing "step id" parameter
    step_name_columns = set(step_param_names).intersection(set(results_df.columns))
    if len(step_name_columns) == 1:
        step_name_col = step_name_columns.pop()
    elif len(step_name_columns) == 0:
        if no_steps_policy == 'raise':
            raise ValueError("The synthesis dataframe provided does not seem to contain step name columns. You can "
                             "ignore this error by switching to `no_steps_policy`='ignore'. Available "
                             "columns: %s" % list(results_df.columns))
        elif no_steps_policy == 'skip':
            # nothing to do: hand back the (copied) dataframe unchanged
            return None if inplace else results_df
        else:
            # no steps column - create one with only none values
            step_name_col = '__step_id'
            results_df[step_name_col] = None
    else:
        raise ValueError("The synthesis dataframe provided contains several 'step name' columns: %s"
                         "" % step_name_columns)

    # detect the entries that have no step id
    null_steps_indexer = results_df[step_name_col].isnull()
    nb_without_step_id = null_steps_indexer.sum()
    if nb_without_step_id > 0:
        if raise_if_one_test_without_step_id:
            # at this stage the original test ids are still in the index: the 'pytest_id' column is only
            # created by the `reset_index` below, so the ids have to be read from the index.
            raise ValueError("The synthesis DataFrame provided does not seem to contain step name parameters for "
                             "test nodes %s" % list(results_df.index[null_steps_indexer.values]))
        # replace missing values with `no_step_id`
        results_df.loc[null_steps_indexer, step_name_col] = no_step_id

    # original test id column is the current index
    results_df.index.name = 'pytest_id'
    results_df.reset_index(inplace=True)

    # split the id in two and use it as multiindex
    def _remove_step_from_test_id(s):
        # `s` is a raw (pytest id, step id) pair since `apply` is called with `raw=True`
        test_id, step_id = s
        if pd.isnull(step_id):
            step_id = no_step_id
        return remove_step_from_test_id(test_id, step_id)

    results_df['test_id'] = results_df[['pytest_id', step_name_col]].apply(_remove_step_from_test_id, axis=1, raw=True)
    results_df.rename(columns={step_name_col: 'step_id'}, inplace=True)
    results_df.set_index(['test_id', 'step_id'], inplace=True)

    # drop original id if required
    if not keep_orig_id:
        results_df.drop('pytest_id', axis=1, inplace=True)

    # in-place mode returns nothing, following the usual pandas convention
    return None if inplace else results_df
def get_all_cross_steps_fixture_names(pytest_session, filter=None):
    """
    Returns the names of all fixtures used in the session that are "cross-steps", i.e. for which at least one of
    the fixture definitions registered under that name has a function carrying the `CROSS_STEPS_MARK` attribute.

    The candidate fixture names are obtained from `pytest_harvest.get_all_pytest_fixture_names`.

    :param pytest_session: the pytest session to inspect
    :param filter: optional filter, forwarded as is to `pytest_harvest.get_all_pytest_fixture_names`
    :return: a sorted list of fixture names, without duplicates
    :raises ImportError: if `pytest-harvest` or its `get_all_pytest_fixture_names` function can not be imported
    """
    # only the import is guarded, so that an unrelated `ImportError` raised while inspecting the fixtures is not
    # misreported as a missing `pytest-harvest`
    try:
        from pytest_harvest import get_all_pytest_fixture_names
    except ImportError:
        raise ImportError("pytest-harvest>=1.0.0 is required to use "
                          "`get_all_cross_steps_fixture_names`")

    fixture_names = get_all_pytest_fixture_names(pytest_session, filter=filter)
    # NOTE(review): this relies on private pytest attributes (`_fixturemanager`, `_arg2fixturedefs`)
    arg2fixturedefs = pytest_session._fixturemanager._arg2fixturedefs

    cross_steps_names = set()
    for name in fixture_names:
        # a name is cross-steps as soon as one of its fixture functions carries the mark
        if any(hasattr(fixture_def.func, CROSS_STEPS_MARK) for fixture_def in arg2fixturedefs[name]):
            cross_steps_names.add(name)

    # sorted so that the result does not depend on set iteration order
    return sorted(cross_steps_names)