Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Authors: Sylvain MARIE <sylvain.marie@se.com> 

2# + All contributors to <https://github.com/smarie/python-pytest-steps> 

3# 

4# License: 3-clause BSD, <https://github.com/smarie/python-pytest-steps/blob/master/LICENSE> 

5 

6# WARNING do not import pandas here: it should remain optional 

7# WARNING do not import pytest-harvest here: it should remain optional 

8 

9from .common_mini_six import string_types 

10from .steps import CROSS_STEPS_MARK 

11from .steps_harvest import _get_step_param_names_or_default, get_all_pytest_param_names_except_step_id, \ 

12 remove_step_from_test_id 

13 

14try: # type hints for python 3.5+ 

15 from typing import List, Any, Iterable, Union 

16except ImportError: 

17 pass 

18 

19 

def pivot_steps_on_df(results_df,
                      pytest_session=None,
                      pytest_session_filter=None,  # type: Any
                      cross_steps_columns=None,  # type: List[str]
                      error_if_not_present=True  # type: bool
                      ):
    """
    Pivots the dataframe so that there is one row per pytest_obj[params except step id] containing all steps info.
    The input dataframe should have a multilevel index with two levels (test id, step id) and with names
    (`results_df.index.names` should be set). The test id should be independent of the step id.

    Exactly one of `pytest_session` and `cross_steps_columns` has to be provided.

    :param results_df: a synthesis dataframe created by `pytest-harvest`.
    :param pytest_session: If this is provided, the cross_steps_columns will be inferred from the pytest session
        information (only one of `pytest_session` or `cross_steps_columns` should be provided). In that case
        `error_if_not_present` is forced to `False`.
    :param pytest_session_filter: if this is provided, the cross_steps_columns will be better inferred from the pytest
        session information, by only using the filtered elements. This has the same behaviour as `pytest-harvest`
        filters.
    :param cross_steps_columns: a list of columns in the dataframe that are stable across steps. Provide this only if
        the pytest session is not provided.
    :param error_if_not_present: a boolean (default True) indicating if the function should raise an error if a name
        provided in `cross_steps_columns` is not present in the dataframe.
    :return: a new dataframe indexed by test id. The cross-steps columns keep their original name, and the other
        columns are labelled with `(step id, original column name)` tuples, grouped by step in the order in which
        the steps first appear in `results_df`.
    :raises ValueError: if both or none of `pytest_session` and `cross_steps_columns` are provided, if a required
        column is missing (and `error_if_not_present` is True), or if one of the cross-steps columns actually
        varies across the steps of a test.
    """
    # pandas is imported locally so that it remains an optional dependency of this module
    import pandas as pd

    # check params
    if pytest_session is not None and cross_steps_columns is not None:
        raise ValueError("Only one of `pytest_session` and `cross_steps_columns` should be provided")
    if pytest_session is None and cross_steps_columns is None:
        # fail early with an explicit message instead of an obscure "NoneType is not iterable" below
        raise ValueError("One of `pytest_session` and `cross_steps_columns` should be provided")

    # auto-extract from session
    if pytest_session is not None:
        # Gather all names of columns that we know are cross-steps
        pytest_other_names = ['pytest_obj']
        param_names = get_all_pytest_param_names_except_step_id(pytest_session, filter=pytest_session_filter)
        fixture_names = get_all_cross_steps_fixture_names(pytest_session, filter=pytest_session_filter)
        cross_steps_columns = pytest_other_names + param_names + fixture_names
        # names inferred from the session are not necessarily all present in the dataframe
        error_if_not_present = False

    # materialize once: the names are used several times below
    cross_steps_columns = list(cross_steps_columns)
    cross_steps_set = set(cross_steps_columns)

    # check column names provided or guessed from session
    non_present = cross_steps_set - set(results_df.columns)
    if error_if_not_present and len(non_present) > 0:
        raise ValueError("Columns %s are not present in the provided dataframe. If this is normal set "
                         "`error_if_not_present=False`. Available columns: %s"
                         "" % (non_present, list(results_df.columns)))
    # keep the dataframe's column order
    cross_steps_cols_list = [c for c in results_df.columns if c in cross_steps_set]

    # extract the names of the two index levels
    test_id_name, step_id_name = results_df.index.names

    # remember the original steps order
    all_step_ids = results_df.index.get_level_values(step_id_name).unique()

    # split the df in two parts: the columns that do not depend on steps and the ones that have one value per step
    # --these do not depend on steps
    remaining_df = (results_df[cross_steps_cols_list]
                    .reset_index()
                    .set_index(test_id_name)
                    .drop(step_id_name, axis=1))
    remaining_df = remaining_df.drop_duplicates()

    # after de-duplication there must be a single row left per test id, otherwise a column is not cross-steps
    if remaining_df.index.has_duplicates:
        raise ValueError("At least one of the columns listed in '%s' varies across steps." % cross_steps_cols_list)

    # --these depend on steps
    one_per_step_df = results_df.drop(cross_steps_columns, axis=1,
                                      errors='raise' if error_if_not_present else 'ignore')

    # perform the pivot operation, and clean up (change levels order, drop nan columns, sort columns)
    # Note: pandas 'pivot' does not work with multiindex in many pandas versions so we use "unstack" instead
    one_per_step_df = one_per_step_df.unstack(step_id_name) \
        .reorder_levels([1, 0], axis=1) \
        .dropna(axis=1, how='all') \
        .reindex(columns=all_step_ids, level=0)

    # join the two. `DataFrame.join` can not be used here: it relies on `merge`, which refuses to combine
    # single-level columns with multi-level ones in pandas>=2.0. So we align the rows explicitly (left join on
    # the test id) and concatenate the columns.
    one_per_step_df = one_per_step_df.reindex(remaining_df.index)
    return pd.concat([remaining_df, one_per_step_df], axis=1)

93 

94 

def flatten_multilevel_columns(df,
                               sep='/'  # type: str
                               ):
    """
    Flattens, in place, the multilevel columns of `df` (typically obtained after a pivot) into single-level ones.
    Each new name is made of all the levels joined with the separator `sep`: for example the two levels `foo` and
    `bar` become the single name `foo/bar`.

    This is a shortcut for `df.columns = get_flattened_multilevel_columns(df)`.

    :param df: the dataframe whose columns should be flattened. It is modified in place.
    :param sep: the separator used to join the names of the levels
    :return: the same `df` object, for convenience
    """
    flat_names = get_flattened_multilevel_columns(df, sep=sep)
    df.columns = flat_names
    return df

111 

112 

def get_flattened_multilevel_columns(df,
                                     sep='/'  # type: str
                                     ):
    """
    Computes single-level column names for the provided dataframe, without modifying it.
    A column label made of several levels is turned into one string, where the string representations of all
    levels are joined with the separator `sep`.

    The result is meant to be used as the new column names:

    `df.columns = get_flattened_multilevel_columns(df)`

    :param df: the dataframe whose column labels should be flattened
    :param sep: the separator to use when joining the names of several levels into one unique name
    :return: a list with one entry per column of `df`, in the same order
    """
    def _flatten_label(label):
        # a plain string label is already flat
        if isinstance(label, string_types):
            return label

        try:
            parts = [str(level) for level in label]
        except TypeError:
            # neither a string nor an iterable: keep the label untouched
            return label
        return sep.join(parts)

    return [_flatten_label(label) for label in df.columns.values]

139 

140 

def handle_steps_in_results_df(results_df,
                               raise_if_one_test_without_step_id=False,  # type: bool
                               no_step_id='-',  # type: str
                               step_param_names=None,  # type: Union[str, Iterable[str]]
                               keep_orig_id=True,  # type: bool
                               no_steps_policy='raise',  # type: str
                               inplace=False
                               ):
    """
    Equivalent of `handle_steps_in_results_dct`

    Improves the synthesis dataframe so that
     - the test_id index is replaced with a multilevel index (new_test_id, step_id) where new_test_id is a
       step-independent test id. A 'pytest_id' column remains with the original id except if keep_orig_id=False
       (default=True)
     - the 'step_id' parameter is removed from the contents

    The step id is identified by looking at the columns, and finding one with a name included in the
    `step_param_names` list (`None` uses the default names). If no step id is found on an entry, it is replaced with
    the value of `no_step_id` except if `raise_if_one_test_without_step_id=True` - in which case an error is raised.

    If no step id column is present at all, `no_steps_policy` determines what happens: it can either skip the whole
    function and leave the contents untouched ('skip'), or behave as usual ('ignore'), or raise an error ('raise').

    If `keep_orig_id` is set to True (default), the original id is added as a new column.

    If `inplace` is `False` (default), a new dataframe will be returned. Otherwise the input dataframe will
    be modified inplace and nothing will be returned.

    :param results_df: the synthesis dataframe, indexed by the original pytest test id
    :param raise_if_one_test_without_step_id: if this is set to `True` and at least one step id can not be found in the
        tests, an error will be raised. By default this is set to `False`: in that case, when the step id is not found
        it is replaced with value of the `no_step_id` parameter.
    :param no_step_id: the identifier to use when the step id is not found (if
        `raise_if_one_test_without_step_id` is `False`)
    :param step_param_names: a singleton or iterable containing the names of the test step parameters used in the
        tests. By default the list is `[GENERATOR_MODE_STEP_ARGNAME, TEST_STEP_ARGNAME_DEFAULT]` to cover both
        generator-mode and legacy manual mode.
    :param keep_orig_id: if True (default) the original test id will appear in the df under 'pytest_id' column
    :param no_steps_policy: if 'ignore' the returned dataframe will be multilevel (test id, step id) in all
        cases, even if no step is present. If 'skip' and no step is present, the method will not modify anything in the
        dataframe. If 'raise' (default) and no step column is present, an error is raised.
    :param inplace: if this is `False` (default), a new dataframe will be returned. Otherwise the input dataframe will
        be modified inplace and None will be returned
    :return: the new dataframe if `inplace` is `False`, `None` otherwise
    :raises ValueError: if `no_steps_policy` is invalid, if several step name columns are found, if no step name
        column is found and `no_steps_policy` is 'raise', or if some entries have no step id and
        `raise_if_one_test_without_step_id` is `True`.
    """
    import pandas as pd

    # validate parameters
    step_param_names = _get_step_param_names_or_default(step_param_names)
    if not isinstance(no_steps_policy, str):
        # python 2 compatibility: unicode literals
        no_steps_policy = str(no_steps_policy)
    if no_steps_policy not in {'ignore', 'raise', 'skip'}:
        raise ValueError("`no_steps_policy` should be one of {'ignore', 'raise', 'skip'}")

    if not inplace:
        results_df = results_df.copy()

    # find the unique column containing "step id" parameter
    step_name_columns = set(step_param_names).intersection(set(results_df.columns))
    if len(step_name_columns) == 1:
        step_name_col = step_name_columns.pop()
    elif len(step_name_columns) == 0:
        if no_steps_policy == 'raise':
            raise ValueError("The synthesis dataframe provided does not seem to contain step name columns. You can "
                             "ignore this error by switching to `no_steps_policy`='ignore'. Available "
                             "columns: %s" % list(results_df.columns))
        elif no_steps_policy == 'skip':
            if inplace:
                return
            else:
                return results_df
        else:
            # no steps column - create one with only none values
            step_name_col = '__step_id'
            results_df[step_name_col] = None
    else:
        raise ValueError("The synthesis dataframe provided contains several 'step name' columns: %s"
                         "" % step_name_columns)

    # check that the column has at least one non-null value
    null_steps_indexer = results_df[step_name_col].isnull()
    nb_without_step_id = null_steps_indexer.sum()
    if nb_without_step_id > 0:
        if raise_if_one_test_without_step_id:
            # At this stage the original test ids are still in the index: the 'pytest_id' column is only created
            # by the `reset_index` below, so the ids have to be read from the index.
            raise ValueError("The synthesis DataFrame provided does not seem to contain step name parameters for "
                             "test nodes %s" % list(results_df.index[null_steps_indexer.values]))
        else:
            # replace missing values with `no_step_id`
            results_df.loc[null_steps_indexer, step_name_col] = no_step_id

    # original test id column is the current index
    results_df.index.name = 'pytest_id'
    results_df.reset_index(inplace=True)

    # split the id in two and use it as multiindex
    def _remove_step_from_test_id(s):
        test_id, step_id = s
        if pd.isnull(step_id):
            step_id = no_step_id
        return remove_step_from_test_id(test_id, step_id)

    results_df['test_id'] = results_df[['pytest_id', step_name_col]].apply(_remove_step_from_test_id,
                                                                           axis=1, raw=True)
    results_df.rename(columns={step_name_col: 'step_id'}, inplace=True)
    results_df.set_index(['test_id', 'step_id'], inplace=True)

    # drop original id if required
    if not keep_orig_id:
        results_df.drop('pytest_id', axis=1, inplace=True)

    # return
    if not inplace:
        return results_df

260 

261 

def get_all_cross_steps_fixture_names(pytest_session, filter=None):
    """
    Returns the names of the fixtures used in the session for which at least one fixture definition has a function
    carrying the `CROSS_STEPS_MARK` attribute.

    The candidate fixture names are obtained from `pytest_harvest.get_all_pytest_fixture_names`, to which
    `pytest_session` and `filter` are passed as is.

    :param pytest_session: the pytest session to inspect
    :param filter: an optional filter forwarded to `pytest-harvest` to restrict the elements taken into account
    :return: a list of fixture names, without duplicates and in no particular order
    :raises ImportError: if `pytest-harvest` can not be imported
    """
    # pytest-harvest is imported lazily so that it remains optional. Only the import itself is guarded, so that
    # an ImportError raised later on is not mistaken for a missing pytest-harvest installation.
    try:
        from pytest_harvest import get_all_pytest_fixture_names
    except ImportError:
        raise ImportError("pytest-harvest>=1.0.0 is required to use "
                          "`get_all_cross_steps_fixture_names`")

    fixture_names = get_all_pytest_fixture_names(pytest_session,
                                                 filter=filter)
    returned_set = set()
    for name in fixture_names:
        all_fixtures = pytest_session._fixturemanager._arg2fixturedefs[name]

        # the name is kept as soon as one of its fixture functions is marked as cross-steps
        if any(hasattr(f.func, CROSS_STEPS_MARK) for f in all_fixtures):
            returned_set.add(name)

    return list(returned_set)