⬅ azmlclient/base_databinding.py source

1 # Authors: Sylvain MARIE <sylvain.marie@se.com>
2 # + All contributors to <https://github.com/smarie/python-azureml-client>
3 #
4 # License: 3-clause BSD, <https://github.com/smarie/python-azureml-client/blob/master/LICENSE>
5 from __future__ import print_function
6 import csv
7 import json
8 import sys
9 from collections import OrderedDict
10 from datetime import datetime
11 from io import BytesIO # to handle byte strings
12 from io import StringIO # to handle unicode strings
13 from math import isnan
14  
15 try: # python 3.5+
  • F401 'typing.Tuple' imported but unused
16 from typing import Dict, Union, List, Any, Tuple
17  
18 # a few predefined type hints
19 SwaggerModeAzmlTable = List[Dict[str, Any]]
20 NonSwaggerModeAzmlTable = Dict[str, Union[List[str], List[List[Any]]]]
21 AzmlTable = Union[SwaggerModeAzmlTable, NonSwaggerModeAzmlTable]
22 AzmlOutputTable = Dict[str, Union[str, AzmlTable]]
23 except ImportError:
24 pass
25  
26 import numpy as np
27 import pandas
28 import requests
29 from valid8 import validate
30  
31  
# Compatibility shim: `csv.unix_dialect` exists in Python 3 but not in Python 2.
# When missing, define an equivalent dialect and register it under the name
# "unix" so that `csv.writer(..., dialect='unix')` works everywhere.
try:
    from csv import unix_dialect
except ImportError:
    # noinspection PyPep8Naming,SpellCheckingInspection
    class unix_dialect(csv.Dialect):
        """Describe the usual properties of Unix-generated CSV files."""
        delimiter = ','
        quotechar = '"'
        doublequote = True
        skipinitialspace = False
        lineterminator = '\n'
        quoting = csv.QUOTE_ALL
    # make the dialect available by name, as in Python 3
    csv.register_dialect("unix", unix_dialect)
45  
# Buffer factories: the csv machinery works on unicode (text) buffers under
# Python 3 but on byte buffers under Python 2.
if sys.version_info < (3, 0):
    def create_dest_buffer_for_csv():
        """Return a new writable byte buffer suitable for csv output (py2)."""
        return BytesIO()

    def create_reading_buffer(value):
        """Wrap `value` (bytes) in a readable buffer (py2)."""
        return BytesIO(value)
else:
    def create_dest_buffer_for_csv():
        """Return a new writable text buffer suitable for csv output (py3)."""
        return StringIO(newline='')

    def create_reading_buffer(value):
        """Wrap `value` (str) in a readable buffer (py3)."""
        return StringIO(value)
58  
59  
class AzmlException(Exception):
    """
    Represents an AzureMl exception, built from an HTTP error body received from AzureML.
    Once constructed from an HTTPError, the error details appear in the exception fields.
    """

    def __init__(self,
                 http_error  # type: requests.exceptions.HTTPError
                 ):
        """
        Constructor from an http error received from `requests`.

        :param http_error: the HTTP error whose response body contains the AzureML
            JSON error description
        """
        # the error details are sent as a JSON body on the HTTP response
        error_as_dict = json_to_azmltable(http_error.response.text)

        # mandatory top-level elements of an AzureML error
        try:
            self.error_dict = error_as_dict['error']
            # noinspection PyTypeChecker
            self.error_code = self.error_dict['code']
            # noinspection PyTypeChecker
            self.error_message = self.error_dict['message']
            self.details = self.error_dict['details']
        except KeyError:
            raise ValueError("Unrecognized format for AzureML http error. JSON content is :\n %s" % error_as_dict)

        # when a first detail entry is available, enrich the message with it
        try:
            first_detail = error_as_dict['details'][0]
            # noinspection PyTypeChecker
            details_code = first_detail['code']
            # noinspection PyTypeChecker
            details_msg = first_detail['message']
        except (IndexError, KeyError):
            msg = 'Error [%s]: %s' % (self.error_code, self.error_message)
        else:
            msg = 'Error [%s][%s]: %s. %s' % (self.error_code, details_code, self.error_message, details_msg)

        super(AzmlException, self).__init__(msg)

    def __str__(self):
        """Return the pretty-printed JSON of the inner AzureML error dictionary."""
        return json.dumps(self.error_dict, indent=4)
116  
117  
def df_to_csv(df,            # type: pandas.DataFrame
              df_name=None,  # type: str
              charset=None   # type: str
              ):
    # type: (...) -> str
    """
    Converts the provided DataFrame to a csv string, typically to store it on blob
    storage for Batch AzureML calls.
    WARNING: datetime columns are converted in ISO format but the milliseconds are
    ignored and set to zero.

    :param df: the DataFrame to convert
    :param df_name: the name of the DataFrame, for error messages
    :param charset: the charset to use for encoding
    :return: the csv contents as a string
    """
    validate(df_name, df, instance_of=pandas.DataFrame)

    # TODO what about timezone detail if not present, will the %z be ok ?
    iso_date_fmt = '%Y-%m-%dT%H:%M:%S.000%z'  # milliseconds forced to zero
    return df.to_csv(path_or_buf=None, index=False, sep=',', decimal='.',
                     na_rep='', encoding=charset, date_format=iso_date_fmt)
137  
138  
def dfs_to_csvs(dfs,          # type: Dict[str, pandas.DataFrame]
                charset=None  # type: str
                ):
    # type: (...) -> Dict[str, str]
    """
    Converts each of the DataFrames in the provided dictionary to a csv, typically to
    store them on blob storage for Batch AzureML calls. All CSVs are returned in a
    dictionary with the same keys.

    WARNING: datetime columns are converted in ISO format but the milliseconds are
    ignored and set to zero. See `df_to_csv` for details.

    :param dfs: a dictionary containing input names and input content (each input content is a DataFrame)
    :param charset: the charset to use for csv encoding
    :return: a dictionary containing the string representations of the Csv inputs to store on the blob storage
    """
    validate('dfs', dfs, instance_of=dict)

    csvs = {}
    for input_name, input_df in dfs.items():
        csvs[input_name] = df_to_csv(input_df, df_name=input_name, charset=charset)
    return csvs
158  
159  
def csv_to_df(csv_buffer_or_str_or_filepath,  # type: Union[str, StringIO, BytesIO]
              csv_name=None                   # type: str
              ):
    # type: (...) -> pandas.DataFrame
    """
    Converts the provided csv (string, buffer or file path) to a DataFrame, typically
    to read it from blob storage for Batch AzureML calls. Helper method to ensure
    consistent reading, in particular for timezones and datetime parsing.

    :param csv_buffer_or_str_or_filepath: the csv contents (str) or a readable buffer / file path
    :param csv_name: the name of the DataFrame, for error messages
    :return: the parsed DataFrame
    """
    validate(csv_name, csv_buffer_or_str_or_filepath)

    # pandas does not accept a raw string: wrap it in a reading buffer
    source = csv_buffer_or_str_or_filepath
    if isinstance(source, str):
        source = create_reading_buffer(source)

    # first pass: parse without any date inference
    res = pandas.read_csv(source, sep=',', decimal='.')

    # second pass: best-effort conversion of object columns to datetimes...
    convert_all_datetime_columns(res)

    # ...which are then declared/converted as UTC
    localize_all_datetime_columns(res)

    return res
188  
189  
def csvs_to_dfs(csv_dict  # type: Dict[str, str]
                ):
    # type: (...) -> Dict[str, pandas.DataFrame]
    """
    Helper method to read CSVs compliant with AzureML web service BATCH inputs/outputs,
    into a dictionary of DataFrames with the same keys.

    :param csv_dict: a dictionary of input names to csv contents
    :return: a dictionary of input names to DataFrames
    """
    validate('csv_dict', csv_dict, instance_of=dict)

    dfs = {}
    for input_name, input_csv in csv_dict.items():
        dfs[input_name] = csv_to_df(input_csv, csv_name=input_name)
    return dfs
203  
204  
def df_to_azmltable(df,                       # type: pandas.DataFrame
                    table_name=None,          # type: str
                    swagger_format=False,     # type: bool
                    mimic_azml_output=False,  # type: bool
                    replace_NaN_with=None,    # type: Any
                    replace_NaT_with=None,    # type: Any
                    ):
    # type: (...) -> Union[AzmlTable, AzmlOutputTable]
    """
    Converts the provided DataFrame to a dictionary or list in the same format than the
    JSON expected by AzureML in the Request-Response services. Note that contents are
    kept as is (values are not converted to string yet).

    :param df: the DataFrame to convert
    :param table_name: the table name for error messages
    :param swagger_format: a boolean (default: False) indicating if the swagger format should be used (more verbose).
    :param mimic_azml_output: set this to True if the result should be wrapped in a dictionary like AzureML outputs.
        This is typically needed if you wish to mimic an AzureML web service's behaviour, for a mock web server.
    :param replace_NaN_with: optional replacement value for NaN cells
    :param replace_NaT_with: optional replacement value for NaT cells
    :return: the AzureML-formatted table
    """
    validate(table_name, df, instance_of=pandas.DataFrame)

    # only 2-dimensions tables are supported
    validate("%s_nb_dimensions" % table_name, len(df.shape), equals=2,
             help_msg="Only 2-dimensional tables are supported for AzureML format conversion.")

    if mimic_azml_output:
        # wrap the converted table into the AzureML output envelope, reusing
        # this very method in 'not output' mode for the inner table
        inner = df_to_azmltable(df, table_name=table_name, swagger_format=swagger_format,
                                replace_NaN_with=replace_NaN_with, replace_NaT_with=replace_NaT_with)
        return {'type': 'table', 'value': inner}

    def _to_prim(cell):
        """Make a single cell json-able, applying the configured NaN/NaT replacements."""
        return to_jsonable_primitive(cell, replace_NaN_with=replace_NaN_with, replace_NaT_with=replace_NaT_with)

    col_names = df.columns.values.tolist()

    if swagger_format:
        # swagger mode: the table is a list of {column: value} object rows

        def _cell(col, idx):
            """Ask numpy scalars to convert themselves to python primitives via `.item()`.
            Pandas types do not support it, so they are returned as is."""
            item = df[col].iloc[idx]
            try:
                return item.item()
            except AttributeError:
                return item

        return [OrderedDict([(col, _to_prim(_cell(col, i))) for col in col_names])
                for i in range(df.shape[0])]
    else:
        # non-swagger mode: the columns and values are separate attributes.
        # Note: no "ColumnTypes" entry is emitted on purpose, AzureML type
        # mapping does not seem to be reliable enough.
        values = [[_to_prim(cell) for cell in row] for row in df.values.tolist()]
        return {'ColumnNames': col_names, "Values": values}
269  
270  
def dfs_to_azmltables(dfs,                      # type: Dict[str, pandas.DataFrame]
                      swagger_format=False,     # type: bool
                      mimic_azml_output=False,  # type: bool
                      replace_NaN_with=None,    # type: Any
                      replace_NaT_with=None,    # type: Any
                      ):
    # type: (...) -> Dict[str, Dict[str, Union[str, Dict[str, List]]]]
    """
    Converts a dictionary of DataFrames into a dictionary of AzureML tables following
    the structure required for AzureML JSON conversion (see `df_to_azmltable`).

    :param dfs: a dictionary containing input names and input content (each input content is a DataFrame)
    :param swagger_format: a boolean (default: False) indicating if the 'swagger' azureml format should be used
    :param mimic_azml_output: set to True to wrap each table in an AzureML output envelope
    :param replace_NaN_with: optional replacement value for NaN cells
    :param replace_NaT_with: optional replacement value for NaT cells
    :return: a dictionary of tables represented as dictionaries
    """
    validate('dfs', dfs, instance_of=dict)

    azml_tables = {}
    for df_name, df in dfs.items():
        azml_tables[df_name] = df_to_azmltable(df, table_name=df_name, swagger_format=swagger_format,
                                               mimic_azml_output=mimic_azml_output,
                                               replace_NaN_with=replace_NaN_with,
                                               replace_NaT_with=replace_NaT_with)
    return azml_tables
297  
298  
def azmltable_to_df(azmltable,             # type: Union[AzmlTable, AzmlOutputTable]
                    is_azml_output=False,  # type: bool
                    table_name=None,       # type: str
                    swagger_mode=None      # type: bool
                    ):
    # type: (...) -> pandas.DataFrame
    """
    Converts a parsed AzureML table (JSON-like dictionary or list obtained from parsing the json body) into a
    DataFrame. Since two formats exist (one for inputs and one for outputs), there is a parameter you can use to
    specify which one to use.

    :param azmltable: the AzureML table to convert. A list means 'swagger' format (one dict per row);
        a dict means non-swagger format ('ColumnNames' + 'Values').
    :param is_azml_output: set this to True if the `azmltable` was received from an actual AzureML web service.
        Indeed in this case the table is usually wrapped in a dictionary that needs to be unwrapped.
    :param table_name: the table name for error messages
    :param swagger_mode: a boolean (default None) indicating if the 'swagger' azureml format should be used
        to read the data table. If None is provided, no check will be performed. Otherwise an error will be raised if
        the actual format does not correspond.
    :return: the resulting DataFrame
    :raises ValueError: if the table structure does not match the expected format
    """
    validate(table_name, azmltable, instance_of=(list, dict))

    # a list of row objects means 'swagger' format; a dict means non-swagger format
    is_swagger_format = isinstance(azmltable, list)

    if not is_swagger_format and is_azml_output:
        # output envelope: a dict {'type': 'table', 'value': <actual table>}
        if 'type' in azmltable.keys() and 'value' in azmltable.keys():
            if azmltable['type'] == 'table':
                # use this method recursively, in 'not output' mode
                # noinspection PyTypeChecker
                return azmltable_to_df(azmltable['value'], table_name=table_name)
            else:
                raise ValueError("This method is able to read table objects, found type=%s" % azmltable['type'])
        else:
            raise ValueError("object should be a dictionary with two fields 'type' and 'value', found: %s for "
                             "table object: %s" % (azmltable.keys(), table_name))
    else:
        if is_swagger_format:
            # swagger format: a list of {column: value} rows.
            # The first row fixes the column set; every row must contain exactly those columns.
            if swagger_mode is not None and not swagger_mode:
                raise ValueError("Data table is in swagger format while non-swagger format is supposed to be received")
            values = []
            if len(azmltable) > 0:
                col_names = list(azmltable[0].keys())
                for i, row in enumerate(azmltable):
                    try:
                        row_vals = [row[k] for k in col_names]
                        values.append(row_vals)
                        if len(row) > len(col_names):
                            # extra keys not present in the first row are an error
                            new_cols = set(row.keys()) - set(col_names)
                            raise ValueError("Columns are present in row #%s but not in the first row: "
                                             "%s" % (i + 1, new_cols))
                    except KeyError as e:
                        raise ValueError("A column is missing in row #%s: %s" % (i + 1, e))
            else:
                col_names = []

        else:
            if 'ColumnNames' in azmltable.keys() and 'Values' in azmltable.keys():
                # non-swagger format: column names and the value matrix are separate attributes
                if swagger_mode is not None and swagger_mode:
                    raise ValueError(
                        "Data table is in non-swagger format while swagger format is supposed to be received")

                values = azmltable['Values']
                col_names = azmltable['ColumnNames']
            else:
                raise ValueError("object should be a list or a dictionary with two fields ColumnNames and Values, "
                                 "found: %s for table object: %s" % (azmltable.keys(), table_name))

        if len(values) > 0:
            # Rather than creating the DataFrame manually (and hand-rolling datetime/float
            # parsing for each column), reuse the pandas csv parser to infer the types:
            # -- first dump the table into a buffer in CSV format...
            buffer = create_dest_buffer_for_csv()
            writer = csv.writer(buffer, dialect='unix')
            writer.writerows([col_names])
            writer.writerows(values)
            # -- ...then parse it back with pandas (consistent datetime/UTC handling)
            res = csv_to_df(create_reading_buffer(buffer.getvalue()))  # StringIO
            buffer.close()

        else:
            # no rows: empty DataFrame with just the columns
            res = pandas.DataFrame(columns=col_names)

        return res
398  
399  
def azmltables_to_dfs(azmltables_dict,         # type: Dict[str, Dict[str, Union[str, Dict[str, List]]]]
                      is_azureml_output=False  # type: bool
                      ):
    # type: (...) -> Dict[str, pandas.DataFrame]
    """
    Converts a dictionary of AzureML tables into a dictionary of DataFrames with the
    same keys (see `azmltable_to_df` for the supported table formats).

    :param azmltables_dict: a dictionary of table names to AzureML tables
    :param is_azureml_output: set to True if the tables were received from an actual AzureML web service
    :return: a dictionary of DataFrames
    """
    validate('azmltables_dict', azmltables_dict, instance_of=dict)

    dfs = {}
    for input_name, dict_table in azmltables_dict.items():
        dfs[input_name] = azmltable_to_df(dict_table, is_azml_output=is_azureml_output, table_name=input_name)
    return dfs
409  
410  
def params_df_to_params_dict(params_df  # type: pandas.DataFrame
                             ):
    # type: (...) -> Dict[str, str]
    """
    Converts a parameters DataFrame into a dictionary following the structure required for JSON conversion.

    The parameter values are taken from the first row of the DataFrame.

    :param params_df: a single-row DataFrame of parameter names and values
    :return: a dictionary of parameter names and values
    """
    validate('params_df', params_df, instance_of=pandas.DataFrame)
    # use positional access (.iloc[0]) instead of label access (.at[0, ...]) so that
    # this works whatever the index labels are (the row label is not always 0)
    return {param_name: params_df[param_name].iloc[0] for param_name in params_df.columns.values}
422  
423  
def params_dict_to_params_df(params_dict  # type: Dict[str, Any]
                             ):
    # type: (...) -> pandas.DataFrame
    """
    Converts a parameter dictionary into a parameter DataFrame with a single row.

    :param params_dict: a dictionary of parameter names and values
    :return: a single-row DataFrame whose columns are the parameter names
    """
    validate('params_dict', params_dict, instance_of=dict)

    # one row holding all values, labelled 0
    return pandas.DataFrame(data=params_dict, index=[0])
437  
438  
def azmltable_to_json(azmltable  # type: Union[AzmlTable, AzmlOutputTable]
                      ):
    # type: (...) -> str
    """
    Transforms an AzureML table to a JSON string.
    Datetimes are converted using ISO format.

    :param azmltable: the (parsed) AzureML table to serialize
    :return: the JSON string
    """
    # dump using our custom serializer so that types not handled by the default json
    # encoder (numpy scalars/arrays, datetimes) are supported by AzureML
    return json.dumps(azmltable, default=azml_json_serializer)
451  
  • W293 Blank line contains whitespace
452
def json_to_azmltable(json_str  # type: str
                      ):
    # type: (...) -> Union[AzmlTable, AzmlOutputTable]
    """
    Creates an AzureML table by parsing a json string.

    :param json_str: the json string to parse
    :return: the parsed AzureML table
    """
    # an OrderedDict hook is used so that the column/field order is preserved
    return json.loads(json_str, object_pairs_hook=OrderedDict)
464  
465  
# Primitive types that are already json-able and are passed through unchanged
# by `to_jsonable_primitive` (floats are handled separately because of NaN).
if sys.version_info >= (3, 0, 0):
    PRIM_TYPES = (int, str, bool)
else:
    # Python 2 also has `long`; `eval` is used because the bare name `long`
    # would not parse under Python 3. This is a trusted constant, not user input.
    PRIM_TYPES = (int, str, bool, eval('long'))
470  
471  
def to_jsonable_primitive(obj,
                          replace_NaN_with=None,  # type: Any
                          replace_NaT_with=None   # type: Any
                          ):
    """
    Converts the given item (should NOT be a container) to a json-able one.

    :param obj: the object to convert
    :param replace_NaN_with: optional value substituted for float NaN values.
        `None` (default) keeps the NaN as is.
    :param replace_NaT_with: optional value substituted for NaT datetimes
        (forwarded to `azml_json_serializer`)
    :return: a json-able equivalent of `obj`
    """
    if isinstance(obj, float):
        if isnan(obj):
            # explicit None-check: the previous `replace_NaN_with or obj` silently
            # ignored falsy replacement values such as 0 or ''
            return obj if replace_NaN_with is None else replace_NaN_with
        else:
            return obj
    elif isinstance(obj, PRIM_TYPES):  # , dict, list, tuple, set
        # already json-able: pass through unchanged
        return obj
    else:
        # delegate numpy scalars/arrays, bools and datetimes to the custom serializer
        return azml_json_serializer(obj, replace_NaT_with=replace_NaT_with)
493  
494  
def azml_json_serializer(obj,
                         replace_NaT_with=None  # type: Any
                         ):
    """
    JSON custom serializer for objects not serializable by default json code.

    :param obj: the object to serialize (numpy scalar/array, bool, or datetime)
    :param replace_NaT_with: optional value substituted for NaT datetimes.
        `None` (default) keeps `isoformat()`'s result (the string "NaT").
    :return: a json-able equivalent of `obj`
    :raises TypeError: for unsupported types
    """
    if isinstance(obj, np.integer):
        # since numpy ints are also bools, do ints first
        return int(obj)
    elif isinstance(obj, (bool, np.bool_)):
        # np.bool_ is NOT a python bool subclass: without the explicit check it
        # previously fell through to the TypeError below
        return bool(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, datetime):  # or isinstance(obj, np.generic) and obj.kind='M':
        # Datetimes are written as ISO format strings
        if pandas.isnull(obj):
            # NaT: explicit None-check, the previous `replace_NaT_with or ...` silently
            # ignored falsy replacement values such as 0 or ''
            return obj.isoformat() if replace_NaT_with is None else replace_NaT_with  # isoformat() returns "NaT"
        else:
            return obj.isoformat()
    else:
        raise TypeError("Type not serializable : " + str(obj))
521  
522  
def convert_all_datetime_columns(df):
    """
    Utility method to try to convert all datetime columns in the provided DataFrame, inplace.
    Note that only columns with dtype 'object' are considered as possible candidates.
    Columns that cannot be parsed as datetimes are deliberately left untouched (best-effort).

    :param df: the DataFrame to fix, modified in place
    :return: None
    """
    # `Series.iteritems` was removed in pandas 2.0 -- `items` is the supported spelling
    obj_columns = [col_name for col_name, col_type in df.dtypes.items() if col_type == np.dtype('O')]
    for obj_col_name in obj_columns:
        try:
            df[obj_col_name] = pandas.to_datetime(df[obj_col_name])
        except Exception:
            # deliberate best-effort: silently leave the column unconverted
            pass
538  
539  
def localize_all_datetime_columns(df):
    """
    Localizes all datetime columns in df, inplace: naive columns are declared as UTC
    and tz-aware columns are converted to UTC.

    :param df: the DataFrame to fix, modified in place
    :return: None
    """
    # `Series.iteritems` was removed in pandas 2.0 -- `items` is the supported spelling
    datetime_cols = [col_name for col_name, col_type in df.dtypes.items() if is_datetime_dtype(col_type)]
    for datetime_col in datetime_cols:
        # time is in ISO format, so the time column after import is UTC. We just have to declare it
        try:
            df[datetime_col] = df[datetime_col].dt.tz_localize(tz="UTC")
        except TypeError:
            # already tz-aware: convert instead of localizing
            df[datetime_col] = df[datetime_col].dt.tz_convert(tz="UTC")
553  
554  
def is_datetime_dtype(dtyp):
    """
    Returns True if the given dtype is a datetime dtype.

    :param dtyp: the numpy/pandas dtype to inspect
    :return: True for datetime64 dtypes, False otherwise
    """
    # `np.issubdtype(dtyp.base, np.datetime64)` is not reliable here (it fails for
    # int64): the dtype 'kind' character is -- 'M' identifies datetime64
    return 'M' == dtyp.kind