Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Authors: Sylvain MARIE <sylvain.marie@se.com> 

2# + All contributors to <https://github.com/smarie/python-azureml-client> 

3# 

4# License: 3-clause BSD, <https://github.com/smarie/python-azureml-client/blob/master/LICENSE> 

5from __future__ import print_function 

6import csv 

7import json 

8import sys 

9from collections import OrderedDict 

10from datetime import datetime 

11from io import BytesIO # to handle byte strings 

12from io import StringIO # to handle unicode strings 

13from math import isnan 

14 

try:  # typing is available on python 3.5+
    from typing import Dict, Union, List, Any, Tuple

    # a few predefined type hints for AzureML tables
    SwaggerModeAzmlTable = List[Dict[str, Any]]
    NonSwaggerModeAzmlTable = Dict[str, Union[List[str], List[List[Any]]]]
    AzmlTable = Union[SwaggerModeAzmlTable, NonSwaggerModeAzmlTable]
    AzmlOutputTable = Dict[str, Union[str, AzmlTable]]
except ImportError:
    # typing not available: type hints are comments only, nothing else to do
    pass

25 

26import numpy as np 

27import pandas 

28import requests 

29from valid8 import validate 

30 

31 

try:
    from csv import unix_dialect
except ImportError:
    # Python 2: `csv.unix_dialect` does not exist yet, so provide a backport
    # noinspection PyPep8Naming,SpellCheckingInspection
    class unix_dialect(csv.Dialect):
        """Describe the usual properties of Unix-generated CSV files."""
        delimiter = ','
        quotechar = '"'
        doublequote = True
        skipinitialspace = False
        lineterminator = '\n'
        quoting = csv.QUOTE_ALL

    csv.register_dialect("unix", unix_dialect)

45 

if sys.version_info >= (3, 0):
    # Python 3: csv reading/writing is done on text (unicode) buffers
    def create_dest_buffer_for_csv():
        """Return a new text buffer suitable as destination for a csv writer."""
        return StringIO(newline='')

    def create_reading_buffer(value):
        """Wrap the given string in a text buffer so it can be read like a file."""
        return StringIO(value)
else:
    # Python 2: csv reading/writing is done on byte buffers
    def create_dest_buffer_for_csv():
        """Return a new byte buffer suitable as destination for a csv writer."""
        return BytesIO()  # StringIO(newline='')

    def create_reading_buffer(value):
        """Wrap the given byte string in a byte buffer so it can be read like a file."""
        return BytesIO(value)

58 

59 

class AzmlException(Exception):
    """
    Represents an AzureMl exception, built from an HTTP error body received from AzureML.
    Once constructed from an HTTPError, the error details appear in the exception fields.
    """

    def __init__(self,
                 http_error  # type: requests.exceptions.HTTPError
                 ):
        """
        Constructor from an http error received from `requests`.

        :param http_error: the HTTP error whose response body contains the AzureML json error
        """
        # parse the json error body sent back by AzureML
        error_contents = json_to_azmltable(http_error.response.text)

        # main error elements; a missing key means the payload format is unknown
        try:
            self.error_dict = error_contents['error']
            # noinspection PyTypeChecker
            self.error_code = self.error_dict['code']
            # noinspection PyTypeChecker
            self.error_message = self.error_dict['message']
            self.details = self.error_dict['details']
        except KeyError:
            raise ValueError("Unrecognized format for AzureML http error. JSON content is :\n %s" % error_contents)

        # build the exception message, including the first "details" entry when present
        try:
            first_detail = error_contents['details'][0]
            # noinspection PyTypeChecker
            details_code = first_detail['code']
            # noinspection PyTypeChecker
            details_msg = first_detail['message']
        except (IndexError, KeyError):
            msg = 'Error [%s]: %s' % (self.error_code, self.error_message)
        else:
            msg = 'Error [%s][%s]: %s. %s' % (self.error_code, details_code, self.error_message, details_msg)

        # finally call super with the constructed message
        super(AzmlException, self).__init__(msg)

    def __str__(self):
        # pretty-print the raw AzureML error dictionary
        return json.dumps(self.error_dict, indent=4)

116 

117 

def df_to_csv(df,  # type: pandas.DataFrame
              df_name=None,  # type: str
              charset=None  # type: str
              ):
    # type: (...) -> str
    """
    Converts the provided DataFrame to a csv, typically to store it on blob storage for Batch AzureML calls.
    WARNING: datetime columns are converted in ISO format but the milliseconds are ignored and set to zero.

    :param df: the DataFrame to convert
    :param df_name: the name of the DataFrame, for error messages
    :param charset: the charset to use for encoding
    :return: the csv contents as a string
    """
    validate(df_name, df, instance_of=pandas.DataFrame)

    # milliseconds are forced to zero in the ISO output
    # TODO what about timezone detail if not present, will the %z be ok ?
    iso_no_millis = '%Y-%m-%dT%H:%M:%S.000%z'
    return df.to_csv(path_or_buf=None, index=False, sep=',', decimal='.', na_rep='',
                     encoding=charset, date_format=iso_no_millis)

137 

138 

def dfs_to_csvs(dfs,  # type: Dict[str, pandas.DataFrame]
                charset=None  # type: str
                ):
    # type: (...) -> Dict[str, str]
    """
    Converts each of the DataFrames in the provided dictionary to a csv, typically to store it on blob storage for
    Batch AzureML calls. All CSV are returned in a dictionary with the same keys.

    WARNING: datetime columns are converted in ISO format but the milliseconds are ignored and set to zero.
    See `df_to_csv` for details

    :param dfs: a dictionary containing input names and input content (each input content is a DataFrame)
    :param charset: the charset to use for csv encoding
    :return: a dictionary containing the string representations of the Csv inputs to store on the blob storage
    """
    validate('dfs', dfs, instance_of=dict)

    csvs = dict()
    for input_name, input_df in dfs.items():
        csvs[input_name] = df_to_csv(input_df, df_name=input_name, charset=charset)
    return csvs

158 

159 

def csv_to_df(csv_buffer_or_str_or_filepath,  # type: Union[str, StringIO, BytesIO]
              csv_name=None  # type: str
              ):
    # type: (...) -> pandas.DataFrame
    """
    Converts the provided csv to a DataFrame, typically to read it from blob storage for Batch AzureML calls.
    Helper method to ensure consistent reading in particular for timezones and datetime parsing

    :param csv_buffer_or_str_or_filepath: a csv string, a readable buffer, or a file path
    :param csv_name: the name of the DataFrame, for error messages
    :return: the parsed DataFrame
    """
    validate(csv_name, csv_buffer_or_str_or_filepath)

    # pandas does not accept a raw csv string: wrap it in a reading buffer first
    source = csv_buffer_or_str_or_filepath
    if isinstance(source, str):
        source = create_reading_buffer(source)

    # first parse without inferring dates
    res = pandas.read_csv(source, sep=',', decimal='.')  # infer_dt_format=True, parse_dates=[0]

    # then try to infer datetime columns...
    convert_all_datetime_columns(res)

    # ...and automatically declare their timezone as UTC
    localize_all_datetime_columns(res)

    return res

188 

189 

def csvs_to_dfs(csv_dict  # type: Dict[str, str]
                ):
    # type: (...) -> Dict[str, pandas.DataFrame]
    """
    Helper method to read CSVs compliant with AzureML web service BATCH inputs/outputs, into a dictionary of DataFrames

    :param csv_dict: a dictionary of csv names and csv contents
    :return: a dictionary of the same names mapped to parsed DataFrames
    """
    validate('csv_dict', csv_dict, instance_of=dict)

    dfs = dict()
    for input_name, input_csv in csv_dict.items():
        dfs[input_name] = csv_to_df(input_csv, csv_name=input_name)
    return dfs

203 

204 

def df_to_azmltable(df,  # type: pandas.DataFrame
                    table_name=None,  # type: str
                    swagger_format=False,  # type: bool
                    mimic_azml_output=False,  # type: bool
                    replace_NaN_with=None,  # type: Any
                    replace_NaT_with=None,  # type: Any
                    ):
    # type: (...) -> Union[AzmlTable, AzmlOutputTable]
    """
    Converts the provided DataFrame to a dictionary or list in the same format than the JSON expected by AzureML in
    the Request-Response services. Note that contents are kept as is (values are not converted to string yet)

    :param df: the DataFrame to convert
    :param table_name: the table name for error messages
    :param swagger_format: a boolean (default: False) indicating if the swagger format should be used (more verbose).
    :param mimic_azml_output: set this to True if the result should be wrapped in a dictionary like AzureML outputs.
        This is typically needed if you wish to mimic an AzureML web service's behaviour, for a mock web server.
    :param replace_NaN_with: optional replacement value for NaN floats
    :param replace_NaT_with: optional replacement value for NaT datetimes
    :return: the AzureML table representation
    """
    validate(table_name, df, instance_of=pandas.DataFrame)

    # only 2-dimensions tables are supported
    validate("%s_nb_dimensions" % table_name, len(df.shape), equals=2,
             help_msg="Only 2-dimensional tables are supported for AzureML format conversion.")

    if mimic_azml_output:
        # recurse in 'not output' mode and wrap the result like an AzureML output
        inner = df_to_azmltable(df, table_name=table_name, swagger_format=swagger_format,
                                replace_NaN_with=replace_NaN_with, replace_NaT_with=replace_NaT_with)
        return {'type': 'table', 'value': inner}

    col_names = df.columns.values.tolist()

    def _jsonable(cell):
        # convert one table entry to a json-able format
        return to_jsonable_primitive(cell, replace_NaN_with=replace_NaN_with, replace_NaT_with=replace_NaT_with)

    if swagger_format:
        # swagger mode: the table is a list of {column: value} object rows
        def _cell(col_name, row_idx):
            # convert items to python primitives by asking numpy when possible;
            # pandas types do not support `.item()` so they are returned as is
            value = df[col_name].iloc[row_idx]
            try:
                return value.item()
            except AttributeError:
                return value

        return [OrderedDict((col_name, _jsonable(_cell(col_name, i))) for col_name in col_names)
                for i in range(df.shape[0])]
    else:
        # non-swagger mode: the columns and values are separate attributes.
        # note: no dtype conversion here ("ColumnTypes"), AzureML type mapping
        # does not seem to be reliable enough.
        values = [[_jsonable(cell) for cell in row] for row in df.values.tolist()]
        return {'ColumnNames': col_names, "Values": values}

269 

270 

def dfs_to_azmltables(dfs,  # type: Dict[str, pandas.DataFrame]
                      swagger_format=False,  # type: bool
                      mimic_azml_output=False,  # type: bool
                      replace_NaN_with=None,  # type: Any
                      replace_NaT_with=None,  # type: Any
                      ):
    # type: (...) -> Dict[str, Dict[str, Union[str, Dict[str, List]]]]
    """
    Converts a dictionary of DataFrames into a dictionary of dictionaries following the structure
    required for AzureML JSON conversion

    :param dfs: a dictionary containing input names and input content (each input content is a DataFrame)
    :param swagger_format: a boolean (default: False) indicating if the 'swagger' azureml format should be used
    :param mimic_azml_output: set to True to wrap each table like an AzureML web service output
    :param replace_NaN_with: optional replacement value for NaN floats
    :param replace_NaT_with: optional replacement value for NaT datetimes
    :return: a dictionary of tables represented as dictionaries
    """
    validate('dfs', dfs, instance_of=dict)

    azml_tables = dict()
    for df_name, df in dfs.items():
        azml_tables[df_name] = df_to_azmltable(df, table_name=df_name, swagger_format=swagger_format,
                                               mimic_azml_output=mimic_azml_output,
                                               replace_NaN_with=replace_NaN_with,
                                               replace_NaT_with=replace_NaT_with)
    return azml_tables

297 

298 

def azmltable_to_df(azmltable,  # type: Union[AzmlTable, AzmlOutputTable]
                    is_azml_output=False,  # type: bool
                    table_name=None,  # type: str
                    swagger_mode=None  # type: bool
                    ):
    # type: (...) -> pandas.DataFrame
    """
    Converts a parsed AzureML table (JSON-like dictionary or list obtained from parsing the json body) into a
    DataFrame. Since two formats exist (one for inputs and one for outputs), there is a parameter you can use to
    specify which one to use.

    :param azmltable: the AzureML table to convert
    :param is_azml_output: set this to True if the `azmltable` was received from an actual AzureML web service.
        Indeed in this case the table is usually wrapped in a dictionary that needs to be unwrapped.
    :param table_name: the table name for error messages
    :param swagger_mode: a boolean (default None) indicating if the 'swagger' azureml format should be used
        to read the data table. If None is provided, no check will be performed. Otherwise an error will be raised if
        the actual format does not correspond.
    :return: the parsed DataFrame
    """
    validate(table_name, azmltable, instance_of=(list, dict))

    # a list of rows means 'swagger' format, a dict means 'non-swagger' format
    is_swagger_format = isinstance(azmltable, list)

    if not is_swagger_format and is_azml_output:
        # unwrap the {'type': 'table', 'value': ...} envelope used by AzureML outputs
        if 'type' in azmltable.keys() and 'value' in azmltable.keys():
            if azmltable['type'] == 'table':
                # use this method recursively, in 'not output' mode
                # noinspection PyTypeChecker
                return azmltable_to_df(azmltable['value'], table_name=table_name)
            else:
                raise ValueError("This method is able to read table objects, found type=%s" % azmltable['type'])
        else:
            raise ValueError("object should be a dictionary with two fields 'type' and 'value', found: %s for "
                             "table object: %s" % (azmltable.keys(), table_name))

    if is_swagger_format:
        # swagger format: a list of {column: value} rows
        if swagger_mode is not None and not swagger_mode:
            raise ValueError("Data table is in swagger format while non-swagger format is supposed to be received")
        values = []
        if len(azmltable) > 0:
            # the first row fixes the reference column set
            col_names = list(azmltable[0].keys())
            for i, row in enumerate(azmltable):
                try:
                    values.append([row[k] for k in col_names])
                    if len(row) > len(col_names):
                        new_cols = set(row.keys()) - set(col_names)
                        raise ValueError("Columns are present in row #%s but not in the first row: "
                                         "%s" % (i + 1, new_cols))
                except KeyError as e:
                    raise ValueError("A column is missing in row #%s: %s" % (i + 1, e))
        else:
            col_names = []
    else:
        if 'ColumnNames' in azmltable.keys() and 'Values' in azmltable.keys():
            # non-swagger format: columns and values as separate fields
            if swagger_mode is not None and swagger_mode:
                raise ValueError(
                    "Data table is in non-swagger format while swagger format is supposed to be received")

            values = azmltable['Values']
            col_names = azmltable['ColumnNames']
        else:
            raise ValueError("object should be a list or a dictionary with two fields ColumnNames and Values, "
                             "found: %s for table object: %s" % (azmltable.keys(), table_name))

    if len(values) == 0:
        # no rows: empty DataFrame with the right columns
        return pandas.DataFrame(columns=col_names)

    # Reuse the pandas csv parser to infer most of the types:
    # -- first dump everything to an in-memory buffer in CSV format
    buffer = create_dest_buffer_for_csv()
    writer = csv.writer(buffer, dialect='unix')
    writer.writerows([col_names])
    writer.writerows(values)
    # -- and then parse it back with pandas
    res = csv_to_df(create_reading_buffer(buffer.getvalue()))  # StringIO
    buffer.close()

    return res

398 

399 

def azmltables_to_dfs(azmltables_dict,  # type: Dict[str, Dict[str, Union[str, Dict[str, List]]]]
                      is_azureml_output=False  # type: bool
                      ):
    # type: (...) -> Dict[str, pandas.DataFrame]
    """
    Converts a dictionary of AzureML tables into a dictionary of DataFrames with the same keys.

    :param azmltables_dict: a dictionary of table names and AzureML table contents
    :param is_azureml_output: set to True if the tables were received from an actual AzureML web service
    :return: a dictionary of the same names mapped to DataFrames
    """
    validate('azmltables_dict', azmltables_dict, instance_of=dict)

    dfs = dict()
    for input_name, dict_table in azmltables_dict.items():
        dfs[input_name] = azmltable_to_df(dict_table, is_azml_output=is_azureml_output, table_name=input_name)
    return dfs

409 

410 

def params_df_to_params_dict(params_df  # type: pandas.DataFrame
                             ):
    # type: (...) -> Dict[str, str]
    """
    Converts a parameters DataFrame into a dictionary following the structure required for JSON conversion

    :param params_df: a dictionary of parameter names and values
    :return: a dictionary of parameter names and values
    """
    validate('params_df', params_df, instance_of=pandas.DataFrame)

    # parameters are in the first (and only) row of the DataFrame
    params = dict()
    for param_name in params_df.columns.values:
        params[param_name] = params_df.at[0, param_name]
    return params

422 

423 

def params_dict_to_params_df(params_dict  # type: Dict[str, Any]
                             ):
    # type: (...) -> pandas.DataFrame
    """
    Converts a parameter dictionary into a parameter DataFrame

    :param params_dict: a dictionary of parameter names and values
    :return: a single-row DataFrame with one column per parameter
    """
    validate('params_dict', params_dict, instance_of=dict)

    # a single-row DataFrame holds all the parameters
    single_row_index = [0]
    return pandas.DataFrame(params_dict, index=single_row_index)

437 

438 

def azmltable_to_json(azmltable  # type: Union[AzmlTable, AzmlOutputTable]
                      ):
    # type: (...) -> str
    """
    Transforms an AzureML table to a JSON string.
    Datetimes are converted using ISO format.

    :param azmltable: the AzureML table to serialize
    :return: the JSON string
    """
    # dump using our custom serializer so that types used by AzureML (numpy scalars,
    # datetimes...) are supported
    return json.dumps(azmltable, default=azml_json_serializer)

451 

452 

def json_to_azmltable(json_str  # type: str
                      ):
    # type: (...) -> Union[AzmlTable, AzmlOutputTable]
    """
    Creates an AzureML table from a json string.

    :param json_str: the json document to parse
    :return: the parsed AzureML table
    """
    # an OrderedDict keeps the column order found in the json document
    return json.loads(json_str, object_pairs_hook=OrderedDict)

464 

465 

# json-able primitive types; python 2 additionally has `long`
if sys.version_info < (3, 0, 0):
    PRIM_TYPES = (int, str, bool, eval('long'))  # noqa
else:
    PRIM_TYPES = (int, str, bool)

470 

471 

def to_jsonable_primitive(obj,
                          replace_NaN_with=None,  # type: Any
                          replace_NaT_with=None  # type: Any
                          ):
    """
    Converts the given item (should NOT be a container) to a json-able one.

    :param obj: the item to convert
    :param replace_NaN_with: if not None, the value to return instead of NaN floats.
        Falsy replacements such as 0 or "" are honored.
    :param replace_NaT_with: if not None, the value to return instead of NaT datetimes
        (handled by `azml_json_serializer`)
    :return: a json-able equivalent of `obj`
    """
    if isinstance(obj, float):
        # bugfix: explicit `is not None` test, so that falsy replacements (0, "")
        # are used too (the previous `replace_NaN_with or obj` ignored them)
        if isnan(obj) and replace_NaN_with is not None:
            return replace_NaN_with
        return obj
    elif isinstance(obj, PRIM_TYPES):  # , dict, list, tuple, set
        return obj
    else:
        # delegate to the custom serializer for numpy scalars, datetimes, arrays...
        return azml_json_serializer(obj, replace_NaT_with=replace_NaT_with)

493 

494 

def azml_json_serializer(obj,
                         replace_NaT_with=None  # type: Any
                         ):
    """
    JSON custom serializer for objects not serializable by default json code

    :param obj: the object to serialize
    :param replace_NaT_with: if not None, the value to return instead of NaT datetimes.
        Falsy replacements such as 0 or "" are honored.
    :return: a json-serializable equivalent of `obj`
    :raises TypeError: if `obj` is of an unsupported type
    """
    if isinstance(obj, np.integer):
        # since numpy ints are also bools, do ints first
        return int(obj)
    elif isinstance(obj, bool):
        return bool(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, datetime):  # or isinstance(obj, np.generic) and obj.kind='M':
        # Datetime are written as ISO format string; NaT would yield the string "NaT"
        if pandas.isnull(obj) and replace_NaT_with is not None:
            # bugfix: explicit `is not None` test, so that falsy replacements (0, "")
            # are used too (the previous `replace_NaT_with or ...` ignored them)
            return replace_NaT_with
        return obj.isoformat()
    else:
        raise TypeError("Type not serializable : " + str(obj))

521 

522 

def convert_all_datetime_columns(df):
    """
    Utility method to try to convert all datetime columns in the provided DataFrame, inplace.
    Note that only columns with dtype 'object' are considered as possible candidates.

    :param df: the DataFrame to convert, modified in place
    :return:
    """
    # bugfix: `Series.items()` replaces `iteritems()`, which was removed in pandas 2.0
    obj_columns = [col_name for col_name, col_type in df.dtypes.items() if col_type == np.dtype('O')]  # noqa
    for obj_col_name in obj_columns:
        try:
            df[obj_col_name] = pandas.to_datetime(df[obj_col_name])
        except Exception:
            # not a datetime column: silently escape, do not convert
            pass

538 

539 

def localize_all_datetime_columns(df):
    """
    Localizes all datetime columns in df, inplace: naive datetime columns are declared
    as UTC, already tz-aware ones are converted to UTC.

    :param df: the DataFrame to localize, modified in place
    :return:
    """
    # bugfix: `Series.items()` replaces `iteritems()`, which was removed in pandas 2.0.
    # kind 'M' is the datetime64 dtype kind (same check as `is_datetime_dtype`)
    datetime_cols = [col_name for col_name, col_type in df.dtypes.items() if col_type.kind == 'M']  # noqa
    for datetime_col in datetime_cols:
        # time is in ISO format, so the time column after import is UTC. We just have to declare it
        try:
            df[datetime_col] = df[datetime_col].dt.tz_localize(tz="UTC")
        except TypeError:
            # already tz-aware: convert instead of localize
            df[datetime_col] = df[datetime_col].dt.tz_convert(tz="UTC")

553 

554 

def is_datetime_dtype(dtyp):
    """
    Returns True if the given dtype is a datetime dtype

    :param dtyp: the numpy/pandas dtype to inspect
    :return: True for datetime64 dtypes (kind 'M'), False otherwise
    """
    # note: np.issubdtype(dtyp.base, np.dtype(np.datetime64)) does not work for int64,
    # hence the kind check
    return dtyp.kind == 'M'