# Authors: Sylvain MARIE <sylvain.marie@se.com>
#          + All contributors to <https://github.com/smarie/python-azureml-client>
#
# License: 3-clause BSD, <https://github.com/smarie/python-azureml-client/blob/master/LICENSE>
from __future__ import print_function
import csv
import json
import sys
from collections import OrderedDict
from datetime import datetime
from io import BytesIO   # to handle byte strings
from io import StringIO  # to handle unicode strings
from math import isnan

try:  # python 3.5+
    from typing import Dict, Union, List, Any, Tuple

    # a few predefined type hints
    SwaggerModeAzmlTable = List[Dict[str, Any]]
    NonSwaggerModeAzmlTable = Dict[str, Union[List[str], List[List[Any]]]]
    AzmlTable = Union[SwaggerModeAzmlTable, NonSwaggerModeAzmlTable]
    AzmlOutputTable = Dict[str, Union[str, AzmlTable]]
except ImportError:
    pass

import numpy as np
import pandas
import requests
from valid8 import validate


try:
    from csv import unix_dialect
except ImportError:
    # noinspection PyPep8Naming,SpellCheckingInspection
    class unix_dialect(csv.Dialect):
        """Describe the usual properties of Unix-generated CSV files."""
        delimiter = ','
        quotechar = '"'
        doublequote = True
        skipinitialspace = False
        lineterminator = '\n'
        quoting = csv.QUOTE_ALL
    csv.register_dialect("unix", unix_dialect)


if sys.version_info >= (3, 0):
    def create_dest_buffer_for_csv():
        return StringIO(newline='')

    def create_reading_buffer(value):
        return StringIO(value)
else:
    def create_dest_buffer_for_csv():
        return BytesIO()  # StringIO(newline='')

    def create_reading_buffer(value):
        return BytesIO(value)


class AzmlException(Exception):
    """
    Represents an AzureML exception, built from an HTTP error body received from AzureML.
    Once constructed from an HTTPError, the error details appear in the exception fields.
    """

    def __init__(self,
                 http_error  # type: requests.exceptions.HTTPError
                 ):
        """
        Constructor from an http error received from `requests`.

        :param http_error:
        """
        # extract the error contents from the http json body
        json_error = http_error.response.text
        error_as_dict = json_to_azmltable(json_error)

        # main error elements
        try:
            self.error_dict = error_as_dict['error']
            # noinspection PyTypeChecker
            self.error_code = self.error_dict['code']
            # noinspection PyTypeChecker
            self.error_message = self.error_dict['message']
            # try:
            self.details = self.error_dict['details']
            # except KeyError:
            #     # legacy format ?
            #     self.details = error_as_dict['details']
        except KeyError:
            raise ValueError("Unrecognized format for AzureML http error. JSON content is:\n %s" % error_as_dict)

        # create the message based on contents
        try:
            details_dict = error_as_dict['details'][0]
            # noinspection PyTypeChecker
            details_code = details_dict['code']
            # noinspection PyTypeChecker
            details_msg = details_dict['message']
        except (IndexError, KeyError):
            msg = 'Error [%s]: %s' % (self.error_code, self.error_message)
        else:
            msg = 'Error [%s][%s]: %s. %s' % (self.error_code, details_code, self.error_message, details_msg)

        # finally call super
        super(AzmlException, self).__init__(msg)

    def __str__(self):
        # if 'error' in self.__errorAsDict:
        #     # this is an azureML standard error
        #     if self.__errorAsDict['error']['code'] == 'LibraryExecutionError':
        #         if self.__errorAsDict['error']['details'][0]['code'] == 'TableSchemaColumnCountMismatch':
        #             return 'Dynamic schema validation is not supported in Request-Response mode, you should maybe
        #                     use the BATCH response mode by setting useBatchMode to true in python'
        return json.dumps(self.error_dict, indent=4)
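

# Usage sketch (illustrative only; the URL below is hypothetical): wrapping an
# HTTP error returned by an AzureML web service. This assumes the response body
# contains an AzureML-style JSON error; otherwise the constructor raises ValueError.
#
#     import requests
#     try:
#         response = requests.post("https://example.azureml.net/score", json={})
#         response.raise_for_status()
#     except requests.exceptions.HTTPError as http_err:
#         raise AzmlException(http_err)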


def df_to_csv(df,            # type: pandas.DataFrame
              df_name=None,  # type: str
              charset=None   # type: str
              ):
    # type: (...) -> str
    """
    Converts the provided DataFrame to a csv, typically to store it on blob storage for Batch AzureML calls.
    WARNING: datetime columns are converted to ISO format but the milliseconds are ignored and set to zero.

    :param df:
    :param df_name: the name of the DataFrame, for error messages
    :param charset: the charset to use for encoding
    :return:
    """
    validate(df_name, df, instance_of=pandas.DataFrame)

    # TODO what about the timezone detail if not present, will the %z be ok?
    return df.to_csv(path_or_buf=None, sep=',', decimal='.', na_rep='', encoding=charset,
                     index=False, date_format='%Y-%m-%dT%H:%M:%S.000%z')
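

# Usage sketch (illustrative): a small DataFrame becomes a comma-separated string
# with a header row and no index column.
#
#     df = pandas.DataFrame({"a": [1, 2], "b": ["x", "y"]})
#     df_to_csv(df, df_name="input1")
#     # 'a,b\n1,x\n2,y\n'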


def dfs_to_csvs(dfs,          # type: Dict[str, pandas.DataFrame]
                charset=None  # type: str
                ):
    # type: (...) -> Dict[str, str]
    """
    Converts each of the DataFrames in the provided dictionary to a csv, typically to store it on blob storage for
    Batch AzureML calls. All CSVs are returned in a dictionary with the same keys.

    WARNING: datetime columns are converted to ISO format but the milliseconds are ignored and set to zero.
    See `df_to_csv` for details.

    :param dfs: a dictionary containing input names and input content (each input content is a DataFrame)
    :param charset: the charset to use for csv encoding
    :return: a dictionary containing the string representations of the CSV inputs to store on the blob storage
    """
    validate('dfs', dfs, instance_of=dict)

    return {input_name: df_to_csv(inputDf, df_name=input_name, charset=charset)
            for input_name, inputDf in dfs.items()}


def csv_to_df(csv_buffer_or_str_or_filepath,  # type: Union[str, StringIO, BytesIO]
              csv_name=None                   # type: str
              ):
    # type: (...) -> pandas.DataFrame
    """
    Converts the provided csv to a DataFrame, typically to read it from blob storage for Batch AzureML calls.
    Helper method to ensure consistent reading, in particular for timezones and datetime parsing.

    :param csv_buffer_or_str_or_filepath:
    :param csv_name: the name of the DataFrame, for error messages
    :return:
    """
    validate(csv_name, csv_buffer_or_str_or_filepath)

    # pandas does not accept a string: create a buffer
    if isinstance(csv_buffer_or_str_or_filepath, str):
        csv_buffer_or_str_or_filepath = create_reading_buffer(csv_buffer_or_str_or_filepath)

    # read without parsing dates
    res = pandas.read_csv(csv_buffer_or_str_or_filepath, sep=',', decimal='.')  # infer_dt_format=True, parse_dates=[0]

    # -- try to infer datetime columns
    convert_all_datetime_columns(res)

    # -- additionally we automatically configure the timezone as UTC
    localize_all_datetime_columns(res)

    return res
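

# Round-trip sketch (illustrative): `csv_to_df` is the inverse of `df_to_csv`,
# with datetime columns re-inferred and localized to UTC on the way back.
#
#     df = pandas.DataFrame({"a": [1, 2], "b": ["x", "y"]})
#     df2 = csv_to_df(df_to_csv(df, df_name="t"), csv_name="t")
#     # df2 has the same columns and values as df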


def csvs_to_dfs(csv_dict  # type: Dict[str, str]
                ):
    # type: (...) -> Dict[str, pandas.DataFrame]
    """
    Helper method to read CSVs compliant with AzureML web service BATCH inputs/outputs into a dictionary of
    DataFrames.

    :param csv_dict:
    :return:
    """
    validate('csv_dict', csv_dict, instance_of=dict)

    return {input_name: csv_to_df(inputCsv, csv_name=input_name)
            for input_name, inputCsv in csv_dict.items()}


def df_to_azmltable(df,                       # type: pandas.DataFrame
                    table_name=None,          # type: str
                    swagger_format=False,     # type: bool
                    mimic_azml_output=False,  # type: bool
                    replace_NaN_with=None,    # type: Any
                    replace_NaT_with=None,    # type: Any
                    ):
    # type: (...) -> Union[AzmlTable, AzmlOutputTable]
    """
    Converts the provided DataFrame to a dictionary or list in the same format as the JSON expected by AzureML in
    the Request-Response services. Note that contents are kept as is (values are not converted to string yet).

    :param df: the DataFrame to convert
    :param table_name: the table name for error messages
    :param swagger_format: a boolean (default: False) indicating if the swagger format should be used (more verbose).
    :param mimic_azml_output: set this to True if the result should be wrapped in a dictionary like AzureML outputs.
        This is typically needed if you wish to mimic an AzureML web service's behaviour, for a mock web server.
    :param replace_NaN_with: an optional replacement value for NaN floats
    :param replace_NaT_with: an optional replacement value for NaT datetimes
    :return:
    """
    validate(table_name, df, instance_of=pandas.DataFrame)

    # only 2-dimensional tables are supported
    validate("%s_nb_dimensions" % table_name, len(df.shape), equals=2,
             help_msg="Only 2-dimensional tables are supported for AzureML format conversion.")

    if mimic_azml_output:
        # use this method recursively, in 'not output' mode
        return {'type': 'table', 'value': df_to_azmltable(df, table_name=table_name, swagger_format=swagger_format,
                                                          replace_NaN_with=replace_NaN_with,
                                                          replace_NaT_with=replace_NaT_with)}
    else:
        col_names = df.columns.values.tolist()

        # Convert the table entries to a json-able format.
        if swagger_format:
            # swagger mode: the table is a list of object rows

            def _get_item_in_df(df, col_name, row_idx):
                """ Internal routine to convert items to python primitives by asking numpy, when possible.
                Pandas types do not support it, so they are returned 'as is'. """
                cell = df[col_name].iloc[row_idx]
                try:
                    return cell.item()
                except AttributeError:
                    return cell

            return [OrderedDict([(col_name, to_jsonable_primitive(_get_item_in_df(df, col_name, i),
                                                                  replace_NaN_with=replace_NaN_with,
                                                                  replace_NaT_with=replace_NaT_with))
                                 for col_name in col_names])
                    for i in range(df.shape[0])]
        else:
            # non-swagger mode: the columns and values are separate attributes.

            # "ColumnTypes": [dtype_to_azmltyp(dt) for dt in df.dtypes],
            # --> don't do type conversion, AzureML type mapping does not seem to be reliable enough.

            # convert all values in the table to primitives so that the json serializer supports them
            list_of_rows = df.values.tolist()

            def to_js_prim(obj):
                return to_jsonable_primitive(obj, replace_NaN_with=replace_NaN_with,
                                             replace_NaT_with=replace_NaT_with)
            values = [list(map(to_js_prim, row)) for row in list_of_rows]

            return {'ColumnNames': col_names, "Values": values}
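

# Shape sketch (illustrative): the three output structures for the same one-row table.
#
#     df = pandas.DataFrame({"a": [1], "b": ["x"]})
#     df_to_azmltable(df)
#     # {'ColumnNames': ['a', 'b'], 'Values': [[1, 'x']]}
#     df_to_azmltable(df, swagger_format=True)
#     # [OrderedDict([('a', 1), ('b', 'x')])]
#     df_to_azmltable(df, mimic_azml_output=True)
#     # {'type': 'table', 'value': {'ColumnNames': ['a', 'b'], 'Values': [[1, 'x']]}}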


def dfs_to_azmltables(dfs,                      # type: Dict[str, pandas.DataFrame]
                      swagger_format=False,     # type: bool
                      mimic_azml_output=False,  # type: bool
                      replace_NaN_with=None,    # type: Any
                      replace_NaT_with=None,    # type: Any
                      ):
    # type: (...) -> Dict[str, Dict[str, Union[str, Dict[str, List]]]]
    """
    Converts a dictionary of DataFrames into a dictionary of dictionaries following the structure
    required for AzureML JSON conversion.

    :param dfs: a dictionary containing input names and input content (each input content is a DataFrame)
    :param swagger_format: a boolean (default: False) indicating if the 'swagger' azureml format should be used
    :return: a dictionary of tables represented as dictionaries
    """
    validate('dfs', dfs, instance_of=dict)

    # resultsDict = {}
    # for dfName, df in DataFramesDict.items():
    #     resultsDict[dfName] = Df_to_AzmlTable(df, dfName)
    # return resultsDict

    return {df_name: df_to_azmltable(df, table_name=df_name, swagger_format=swagger_format,
                                     mimic_azml_output=mimic_azml_output, replace_NaN_with=replace_NaN_with,
                                     replace_NaT_with=replace_NaT_with)
            for df_name, df in dfs.items()}


def azmltable_to_df(azmltable,             # type: Union[AzmlTable, AzmlOutputTable]
                    is_azml_output=False,  # type: bool
                    table_name=None,       # type: str
                    swagger_mode=None      # type: bool
                    ):
    # type: (...) -> pandas.DataFrame
    """
    Converts a parsed AzureML table (JSON-like dictionary or list obtained from parsing the json body) into a
    DataFrame. Since two formats exist (one for inputs and one for outputs), there is a parameter you can use to
    specify which one to use.

    :param azmltable: the AzureML table to convert
    :param is_azml_output: set this to True if the `azmltable` was received from an actual AzureML web service.
        Indeed in this case the table is usually wrapped in a dictionary that needs to be unwrapped.
    :param table_name: the table name for error messages
    :param swagger_mode: a boolean (default None) indicating if the 'swagger' azureml format should be used
        to read the data table. If None is provided, no check will be performed. Otherwise an error will be raised if
        the actual format does not correspond.
    :return:
    """
    validate(table_name, azmltable, instance_of=(list, dict))

    is_swagger_format = isinstance(azmltable, list)

    if not is_swagger_format and is_azml_output:
        if 'type' in azmltable.keys() and 'value' in azmltable.keys():
            if azmltable['type'] == 'table':
                # use this method recursively, in 'not output' mode
                # noinspection PyTypeChecker
                return azmltable_to_df(azmltable['value'], table_name=table_name)
            else:
                raise ValueError("This method is able to read table objects, found type=%s" % azmltable['type'])
        else:
            raise ValueError("object should be a dictionary with two fields 'type' and 'value', found: %s for "
                             "table object: %s" % (azmltable.keys(), table_name))
    else:
        if is_swagger_format:
            # swagger format
            if swagger_mode is not None and not swagger_mode:
                raise ValueError("Data table is in swagger format while non-swagger format is supposed to be received")
            values = []
            if len(azmltable) > 0:
                col_names = list(azmltable[0].keys())
                for i, row in enumerate(azmltable):
                    try:
                        row_vals = [row[k] for k in col_names]
                        values.append(row_vals)
                        if len(row) > len(col_names):
                            new_cols = set(row.keys()) - set(col_names)
                            raise ValueError("Columns are present in row #%s but not in the first row: "
                                             "%s" % (i + 1, new_cols))
                    except KeyError as e:
                        raise ValueError("A column is missing in row #%s: %s" % (i + 1, e))
            else:
                col_names = []
        else:
            if 'ColumnNames' in azmltable.keys() and 'Values' in azmltable.keys():
                # non-swagger format
                if swagger_mode is not None and swagger_mode:
                    raise ValueError(
                        "Data table is in non-swagger format while swagger format is supposed to be received")

                values = azmltable['Values']
                col_names = azmltable['ColumnNames']
            else:
                raise ValueError("object should be a list or a dictionary with two fields ColumnNames and Values, "
                                 "found: %s for table object: %s" % (azmltable.keys(), table_name))

    if len(values) > 0:
        # # create the DataFrame manually
        # c = pandas.DataFrame(np.array(values), columns=dictio['ColumnNames'])
        #
        # # auto-parse dates and floats
        # for column in dictio['ColumnNames']:
        #     # try to parse as datetime
        #     try:
        #         c[column] = c[column].apply(dateutil.parser.parse)
        #     except ValueError:
        #         pass
        #
        # # try to parse as float
        # # ...

        # Easier: use the pandas csv parser to infer most of the types
        # -- for that we first dump to a buffer in CSV format
        buffer = create_dest_buffer_for_csv()
        writer = csv.writer(buffer, dialect='unix')
        writer.writerows([col_names])
        writer.writerows(values)
        # -- and then we parse with pandas
        res = csv_to_df(create_reading_buffer(buffer.getvalue()))  # StringIO
        buffer.close()
    else:
        # empty DataFrame
        res = pandas.DataFrame(columns=col_names)

    return res
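

# Round-trip sketch (illustrative): both wire formats parse back to an equivalent
# DataFrame, and wrapped outputs are unwrapped with `is_azml_output=True`.
#
#     df = pandas.DataFrame({"a": [1, 2]})
#     azmltable_to_df(df_to_azmltable(df))                       # non-swagger
#     azmltable_to_df(df_to_azmltable(df, swagger_format=True))  # swagger
#     azmltable_to_df(df_to_azmltable(df, mimic_azml_output=True), is_azml_output=True)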


def azmltables_to_dfs(azmltables_dict,         # type: Dict[str, Dict[str, Union[str, Dict[str, List]]]]
                      is_azureml_output=False  # type: bool
                      ):
    # type: (...) -> Dict[str, pandas.DataFrame]
    """
    Converts a dictionary of AzureML tables into a dictionary of DataFrames, using `azmltable_to_df` on each entry.

    :param azmltables_dict: a dictionary of AzureML tables, indexed by table name
    :param is_azureml_output: set this to True if the tables were received from an actual AzureML web service
    :return: a dictionary of DataFrames, indexed by table name
    """
    validate('azmltables_dict', azmltables_dict, instance_of=dict)

    return {input_name: azmltable_to_df(dict_table, is_azml_output=is_azureml_output, table_name=input_name)
            for input_name, dict_table in azmltables_dict.items()}


def params_df_to_params_dict(params_df  # type: pandas.DataFrame
                             ):
    # type: (...) -> Dict[str, str]
    """
    Converts a parameters DataFrame into a dictionary following the structure required for JSON conversion.

    :param params_df: a single-row DataFrame of parameter names and values
    :return: a dictionary of parameter names and values
    """
    validate('params_df', params_df, instance_of=pandas.DataFrame)

    return {param_name: params_df.at[0, param_name] for param_name in params_df.columns.values}


def params_dict_to_params_df(params_dict  # type: Dict[str, Any]
                             ):
    # type: (...) -> pandas.DataFrame
    """
    Converts a parameter dictionary into a parameter DataFrame.

    :param params_dict:
    :return:
    """
    validate('params_dict', params_dict, instance_of=dict)

    # create a single-row DataFrame
    return pandas.DataFrame(params_dict, index=[0])
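

# Round-trip sketch (illustrative): web service parameters travel as a single-row
# DataFrame.
#
#     params = {"alpha": 0.5, "mode": "fast"}
#     params_df = params_dict_to_params_df(params)   # 1 row, one column per parameter
#     params_df_to_params_dict(params_df) == params  # True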


def azmltable_to_json(azmltable  # type: Union[AzmlTable, AzmlOutputTable]
                      ):
    # type: (...) -> str
    """
    Transforms an AzureML table to a JSON string.
    Datetimes are converted using ISO format.

    :param azmltable:
    :return:
    """
    # dump using our custom serializer so that types are supported by AzureML
    return json.dumps(azmltable, default=azml_json_serializer)


def json_to_azmltable(json_str  # type: str
                      ):
    # type: (...) -> Union[AzmlTable, AzmlOutputTable]
    """
    Creates an AzureML table from a json string.

    :param json_str:
    :return:
    """
    # load, but keep order: use an ordered dict
    return json.loads(json_str, object_pairs_hook=OrderedDict)
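

# Serialization sketch (illustrative): `azmltable_to_json` and `json_to_azmltable`
# are inverses, and the OrderedDict hook preserves column order.
#
#     table = {'ColumnNames': ['a', 'b'], 'Values': [[1, 'x']]}
#     json_to_azmltable(azmltable_to_json(table)) == table  # True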


if sys.version_info >= (3, 0, 0):
    PRIM_TYPES = (int, str, bool)
else:
    PRIM_TYPES = (int, str, bool, eval('long'))


def to_jsonable_primitive(obj,
                          replace_NaN_with=None,  # type: Any
                          replace_NaT_with=None   # type: Any
                          ):
    """
    Converts the given item (should NOT be a container) to a json-able one.

    :param obj:
    :param replace_NaN_with:
    :param replace_NaT_with:
    :return:
    """
    if isinstance(obj, float):
        if isnan(obj):
            # note: because of `or`, a falsy replacement (e.g. 0) is ignored and the NaN is kept
            return replace_NaN_with or obj
        else:
            return obj
    elif isinstance(obj, PRIM_TYPES):  # , dict, list, tuple, set
        return obj
    else:
        return azml_json_serializer(obj, replace_NaT_with=replace_NaT_with)
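

# Behaviour sketch (illustrative): only float NaN is replaced; other primitives
# pass through unchanged, and everything else goes to `azml_json_serializer`.
#
#     to_jsonable_primitive(float('nan'), replace_NaN_with="null")  # 'null'
#     to_jsonable_primitive(1.5)                                    # 1.5
#     to_jsonable_primitive(np.int64(3))                            # 3 (via the serializer)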


def azml_json_serializer(obj,
                         replace_NaT_with=None  # type: Any
                         ):
    """
    JSON custom serializer for objects not serializable by the default json code.

    :param obj:
    :param replace_NaT_with: an optional replacement value for NaT datetimes
    :return:
    """
    if isinstance(obj, np.integer):
        # since numpy ints are also bools, do ints first
        return int(obj)
    elif isinstance(obj, bool):
        return bool(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, datetime):  # or isinstance(obj, np.generic) and obj.kind='M':
        # Datetimes are written as ISO format strings
        if pandas.isnull(obj):
            # for NaT, isoformat() returns "NaT", hence the optional replacement
            return replace_NaT_with or obj.isoformat()
        else:
            return obj.isoformat()
    else:
        raise TypeError("Type not serializable: " + str(obj))
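

# Usage sketch (illustrative): plugging the serializer into the standard `json`
# module so that numpy scalars and datetimes can be dumped.
#
#     json.dumps({"t": datetime(2020, 1, 1), "n": np.int64(3)},
#                default=azml_json_serializer)
#     # '{"t": "2020-01-01T00:00:00", "n": 3}'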


def convert_all_datetime_columns(df):
    """
    Utility method to try to convert all datetime columns in the provided DataFrame, inplace.
    Note that only columns with dtype 'object' are considered as possible candidates.

    :param df:
    :return:
    """
    objColumns = [colName for colName, colType in df.dtypes.iteritems() if colType == np.dtype('O')]  # noqa
    for obj_col_name in objColumns:
        try:
            df[obj_col_name] = pandas.to_datetime(df[obj_col_name])
        except Exception:
            # silently escape, do not convert
            pass


def localize_all_datetime_columns(df):
    """
    Localizes all datetime columns in df, inplace.

    :param df:
    :return:
    """
    datetime_cols = [colName for colName, colType in df.dtypes.iteritems() if is_datetime_dtype(colType)]  # noqa
    for datetime_col in datetime_cols:
        # time is in ISO format, so the time column after import is UTC. We just have to declare it
        try:
            df[datetime_col] = df[datetime_col].dt.tz_localize(tz="UTC")
        except TypeError:
            df[datetime_col] = df[datetime_col].dt.tz_convert(tz="UTC")
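

# Pipeline sketch (illustrative): `csv_to_df` applies these two helpers in sequence,
# so string timestamps end up as UTC-aware datetime columns.
#
#     df = pandas.DataFrame({"t": ["2020-01-01T00:00:00"]})
#     convert_all_datetime_columns(df)   # 't' becomes datetime64[ns]
#     localize_all_datetime_columns(df)  # 't' becomes datetime64[ns, UTC]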


def is_datetime_dtype(dtyp):
    """
    Returns True if the given dtype is a datetime dtype.

    :param dtyp:
    :return:
    """
    # return np.issubdtype(dtyp.base, np.dtype(np.datetime64))  --> does not work for int64
    return dtyp.kind == 'M'
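

# Check sketch (illustrative): kind 'M' covers both naive and tz-aware datetime dtypes.
#
#     is_datetime_dtype(pandas.Series(pandas.to_datetime(["2020-01-01"])).dtype)  # True
#     is_datetime_dtype(pandas.Series([1, 2]).dtype)                              # False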