Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Authors: Sylvain MARIE <sylvain.marie@se.com>
2# + All contributors to <https://github.com/smarie/python-azureml-client>
3#
4# License: 3-clause BSD, <https://github.com/smarie/python-azureml-client/blob/master/LICENSE>
5from io import BytesIO # to handle byte strings
6from io import StringIO # to handle unicode strings
8from requests import Session
9from valid8 import validate
10import pandas as pd
12try: # python 3.5+
13 from typing import Dict, Union, List, Any, Tuple
15 # a few predefined type hints
16 SwaggerModeAzmlTable = List[Dict[str, Any]]
17 NonSwaggerModeAzmlTable = Dict[str, Union[List[str], List[List[Any]]]]
18 AzmlTable = Union[SwaggerModeAzmlTable, NonSwaggerModeAzmlTable]
19 AzmlOutputTable = Dict[str, Union[str, AzmlTable]]
20 AzmlBlobTable = Dict[str, str]
21except ImportError:
22 pass
25from azure.storage.blob import BlockBlobService, ContentSettings
26from azmlclient.base_databinding import csv_to_df, df_to_csv
def csv_to_blob_ref(csv_str,                # type: str
                    blob_service,           # type: BlockBlobService
                    blob_container,         # type: str
                    blob_name,              # type: str
                    blob_path_prefix=None,  # type: str
                    charset=None            # type: str
                    ):
    # type: (...) -> AzmlBlobTable
    """
    Uploads the provided CSV to the selected Blob Storage service, and returns a reference to the created blob in
    case of success.

    :param csv_str: the CSV contents to upload, as a (unicode) string
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use. This is the "root folder" in azure blob
        storage wording.
    :param blob_name: the "file name" of the blob, ending with .csv or not (in which case the .csv suffix will be
        appended)
    :param blob_path_prefix: an optional folder prefix that will be used to store your blob inside the container.
        For example "path/to/my/"
    :param charset: the charset used to encode the blob contents (default: 'utf-8')
    :return: the AzureML blob reference (a dict with 'ConnectionString' and 'RelativeLocation' entries)
    """
    # setup the charset used for file encoding
    if charset is None:
        charset = 'utf-8'
    elif charset != 'utf-8':
        print("Warning: blobs can be written in any charset but currently only utf-8 blobs may be read back into "
              "DataFrames. We recommend setting charset to None or utf-8 ")

    # validate inputs (the only one that is not validated below)
    validate('csv_str', csv_str, instance_of=str)

    # 1- first create the references in order to check all params are ok
    blob_reference, blob_full_name = create_blob_ref(blob_service=blob_service, blob_container=blob_container,
                                                     blob_path_prefix=blob_path_prefix, blob_name=blob_name)

    # -- push blob
    blob_stream = BytesIO(csv_str.encode(encoding=charset))
    # fix: the content type was 'text.csv', which is not a valid MIME type; the IANA-registered type is 'text/csv'
    # noinspection PyTypeChecker
    blob_service.create_blob_from_stream(blob_container, blob_full_name, blob_stream,
                                         content_settings=ContentSettings(content_type='text/csv',
                                                                          content_encoding=charset))
    # (For old method with temporary files: see git history)

    return blob_reference
def csvs_to_blob_refs(csvs_dict,              # type: Dict[str, str]
                      blob_service,           # type: BlockBlobService
                      blob_container,         # type: str
                      blob_path_prefix=None,  # type: str
                      blob_name_prefix=None,  # type: str
                      charset=None            # type: str
                      ):
    # type: (...) -> Dict[str, Dict[str, str]]
    """
    Pushes every CSV string in `csvs_dict` to the given blob storage container, one blob per dictionary entry.
    Each value is written using the provided charset, and the returned dictionary maps each input name to the
    "by reference" description (dict) of the blob that was created for it.

    :param csvs_dict: a dictionary mapping input names to CSV strings
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use
    :param blob_path_prefix: the optional prefix that will be prepended to all created blobs in the container
    :param blob_name_prefix: the optional prefix that will be prepended to all created blob names in the container
    :param charset: an optional charset to be used, by default utf-8 is used
    :return: a dictionary of "by reference" input descriptions as dictionaries
    """
    validate('csvs_dict', csvs_dict, instance_of=dict)

    # normalize the name prefix: None means "no prefix"
    if blob_name_prefix is None:
        blob_name_prefix = ""
    else:
        validate('blob_name_prefix', blob_name_prefix, instance_of=str)

    # upload each entry and collect the resulting blob references
    refs = {}
    for name, csv_str in csvs_dict.items():
        refs[name] = csv_to_blob_ref(csv_str, blob_service=blob_service, blob_container=blob_container,
                                     blob_path_prefix=blob_path_prefix, blob_name=blob_name_prefix + name,
                                     charset=charset)
    return refs
def blob_ref_to_csv(blob_reference,         # type: AzmlBlobTable
                    blob_name=None,         # type: str
                    encoding=None,          # type: str
                    requests_session=None   # type: Session
                    ):
    """
    Reads a CSV stored in a Blob Storage and referenced according to the format defined by AzureML, and returns
    its contents as a string.

    :param blob_reference: a (AzureML json-like) dictionary representing a table stored as a csv in a blob storage.
    :param blob_name: blob name for error messages
    :param encoding: an optional encoding to use to read the blob (only 'utf-8' is supported)
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: the blob contents as a string
    """
    # NOTE(review): the blob name (possibly None) is passed as the validation context name here — presumably
    # intentional, so that error messages mention which blob failed.
    validate(blob_name, blob_reference, instance_of=dict)

    if encoding is not None and encoding != 'utf-8':
        raise ValueError("Unsupported encoding to retrieve blobs : %s" % encoding)

    # guard clause: both fields are mandatory in an AzureML blob reference
    if ('ConnectionString' not in blob_reference) or ('RelativeLocation' not in blob_reference):
        raise ValueError(
            'Blob reference is invalid: it should contain ConnectionString and RelativeLocation fields')

    # create the Blob storage client for this account
    blob_service = BlockBlobService(connection_string=blob_reference['ConnectionString'],
                                    request_session=requests_session)

    # the relative location is "<container>/<blob path>"
    container_name, blob_path = blob_reference['RelativeLocation'].split(sep='/', maxsplit=1)

    # download and return the text contents
    # -- this works but is probably less optimized for big blobs that get chunked, than using streaming
    downloaded = blob_service.get_blob_to_text(blob_name=blob_path, container_name=container_name)
    return downloaded.content
def blob_refs_to_csvs(blob_refs,             # type: Dict[str, Dict[str, str]]
                      charset=None,          # type: str
                      requests_session=None  # type: Session
                      ):
    # type: (...) -> Dict[str, str]
    """
    Reads each AzureML blob reference in `blob_refs` and returns the corresponding CSV contents, mapped to the
    same names.

    :param blob_refs: a dictionary mapping names to AzureML blob reference dictionaries
    :param charset: an optional encoding used to read the blobs
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: a dictionary mapping each name to the CSV contents of its blob
    """
    validate('blob_refs', blob_refs, instance_of=dict)

    # download every referenced blob, keyed by its name
    contents = {}
    for name, blob_ref in blob_refs.items():
        contents[name] = blob_ref_to_csv(blob_ref, encoding=charset, blob_name=name,
                                         requests_session=requests_session)
    return contents
def df_to_blob_ref(df,                     # type: pd.DataFrame
                   blob_service,           # type: BlockBlobService
                   blob_container,         # type: str
                   blob_name,              # type: str
                   blob_path_prefix=None,  # type: str
                   charset=None            # type: str
                   ):
    # type: (...) -> Dict[str, str]
    """
    Uploads the provided DataFrame to the selected Blob Storage service as a CSV file blob, and returns a reference
    to the created blob in case of success.

    :param df: the DataFrame to upload
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use. This is the "root folder" in azure blob
        storage wording.
    :param blob_name: the "file name" of the blob, ending with .csv or not (in which case the .csv suffix will be
        appended)
    :param blob_path_prefix: an optional folder prefix that will be used to store your blob inside the container.
        For example "path/to/my/"
    :param charset: the charset to use to encode the blob (default and recommended: 'utf-8')
    :return: the AzureML blob reference (a dict)
    """
    # serialize the DataFrame to CSV, then delegate the upload
    csv_contents = df_to_csv(df, df_name=blob_name, charset=charset)
    return csv_to_blob_ref(csv_contents, blob_service=blob_service, blob_container=blob_container,
                           blob_path_prefix=blob_path_prefix, blob_name=blob_name, charset=charset)
def dfs_to_blob_refs(dfs_dict,               # type: Dict[str, pd.DataFrame]
                     blob_service,           # type: BlockBlobService
                     blob_container,         # type: str
                     blob_path_prefix=None,  # type: str
                     blob_name_prefix=None,  # type: str
                     charset=None            # type: str
                     ):
    # type: (...) -> Dict[str, Dict[str, str]]
    """
    Uploads all DataFrames in the provided dictionary to the selected blob storage as CSV blobs, one blob per
    entry, and returns the dictionary of corresponding blob references mapped to the same names.

    :param dfs_dict: a dictionary mapping names to DataFrames
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use
    :param blob_path_prefix: the optional prefix that will be prepended to all created blobs in the container
    :param blob_name_prefix: the optional prefix that will be prepended to all created blob names in the container
    :param charset: an optional charset to be used, by default utf-8 is used
    :return: a dictionary of "by reference" input descriptions as dictionaries
    """
    validate('DataFramesDict', dfs_dict, instance_of=dict)

    # fix: the default blob_name_prefix=None used to raise a TypeError in `blob_name_prefix + blob_name` below.
    # Normalize it to "" for consistency with `csvs_to_blob_refs` and `create_blob_refs`.
    if blob_name_prefix is None:
        blob_name_prefix = ""
    else:
        validate('blob_name_prefix', blob_name_prefix, instance_of=str)

    return {blob_name: df_to_blob_ref(df, blob_service=blob_service, blob_container=blob_container,
                                      blob_path_prefix=blob_path_prefix, blob_name=blob_name_prefix + blob_name,
                                      charset=charset)
            for blob_name, df in dfs_dict.items()}
def blob_ref_to_df(blob_reference,         # type: AzmlBlobTable
                   blob_name=None,         # type: str
                   encoding=None,          # type: str
                   requests_session=None   # type: Session
                   ):
    """
    Reads a CSV blob referenced according to the format defined by AzureML, and transforms it into a DataFrame.

    :param blob_reference: a (AzureML json-like) dictionary representing a table stored as a csv in a blob storage.
    :param blob_name: blob name for error messages
    :param encoding: an optional encoding to use to read the blob
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: a DataFrame built from the blob contents (empty DataFrame for an empty blob)
    """
    # TODO handle the blob in streaming mode here (see blob_ref_to_csv) so that big, chunked blobs are
    #  supported. Getting the buffer management, StringIO encoding and pandas.read_csv chunk count right is
    #  non-trivial, though — a lot to dig here.
    #
    # from io import TextIOWrapper
    # contents = TextIOWrapper(buffer, encoding=charset, ...)
    # blob = blob_service.get_blob_to_stream(blob_name=name, container_name=container, encoding=charset,
    #                                        stream=contents)

    csv_contents = blob_ref_to_csv(blob_reference, blob_name=blob_name, encoding=encoding,
                                   requests_session=requests_session)

    # an empty blob maps to an empty DataFrame
    if not csv_contents:
        return pd.DataFrame()

    return csv_to_df(StringIO(csv_contents), blob_name)
def blob_refs_to_dfs(blob_refs,             # type: Dict[str, Dict[str, str]]
                     charset=None,          # type: str
                     requests_session=None  # type: Session
                     ):
    # type: (...) -> Dict[str, pd.DataFrame]
    """
    Reads Blob references, for example responses from an AzureMl Batch web service call, into a dictionary of
    pandas DataFrame.

    :param blob_refs: the json output description by reference for each output
    :param charset: an optional encoding used to read the blobs
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: the dictionary of corresponding DataFrames mapped to the output names
    """
    validate('blob_refs', blob_refs, instance_of=dict)

    # convert each referenced blob into a DataFrame, keyed by its name
    frames = {}
    for name, blob_ref in blob_refs.items():
        frames[name] = blob_ref_to_df(blob_ref, encoding=charset, blob_name=name,
                                      requests_session=requests_session)
    return frames
def create_blob_ref(blob_service,           # type: BlockBlobService
                    blob_container,         # type: str
                    blob_name,              # type: str
                    blob_path_prefix=None,  # type: str
                    ):
    # type: (...) -> Tuple[Dict[str, str], str]
    """
    Creates a reference in the AzureML format, to a csv blob stored on Azure Blob Storage, whether it exists or not.
    The blob name can end with '.csv' or not, the code handles both.

    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use. This is the "root folder" in azure blob
        storage wording.
    :param blob_name: the "file name" of the blob, ending with .csv or not (in which case the .csv suffix will be
        appended)
    :param blob_path_prefix: an optional folder prefix that will be used to store your blob inside the container.
        For example "path/to/my/"
    :return: a tuple. First element is the AzureML blob reference (a dict). Second element is the full blob name
    """
    # validate input (blob_service and blob_path_prefix are done below)
    validate('blob_container', blob_container, instance_of=str)
    validate('blob_name', blob_name, instance_of=str)

    # strip an optional '.csv' suffix (case-insensitive); it is always re-appended below
    if blob_name.lower().endswith('.csv'):
        blob_name = blob_name[:-4]

    # validate blob service and get connection string
    connection_str = _get_blob_service_connection_string(blob_service)

    # check the blob path prefix, append a trailing slash if necessary
    prefix = _get_valid_blob_path_prefix(blob_path_prefix)

    # assemble the full blob name and the AzureML-style reference
    blob_full_name = '%s%s.csv' % (prefix, blob_name)
    output_ref = {'ConnectionString': connection_str,
                  'RelativeLocation': "%s/%s" % (blob_container, blob_full_name)}

    return output_ref, blob_full_name
def create_blob_refs(blob_service,           # type: BlockBlobService
                     blob_container,        # type: str
                     blob_names,            # type: List[str]
                     blob_path_prefix=None,  # type: str
                     blob_name_prefix=None   # type: str
                     ):
    # type: (...) -> Dict[str, AzmlBlobTable]
    """
    Utility method to create one or several blob references on the same container on the same blob storage service.

    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use
    :param blob_names: the list of blob names to create references for
    :param blob_path_prefix: optional prefix to the blob names
    :param blob_name_prefix: optional prefix prepended to each blob name
    :return: a dictionary mapping each blob name to its AzureML blob reference
    """
    validate('blob_names', blob_names, instance_of=list)

    # normalize the name prefix: None means "no prefix"
    if blob_name_prefix is None:
        blob_name_prefix = ""
    else:
        validate('blob_name_prefix', blob_name_prefix, instance_of=str)

    # build one reference per name; create_blob_ref returns (reference, full_name), keep only the reference
    refs = {}
    for name in blob_names:
        refs[name] = create_blob_ref(blob_service, blob_container, blob_name_prefix + name,
                                     blob_path_prefix=blob_path_prefix)[0]
    return refs
def _get_valid_blob_path_prefix(blob_path_prefix  # type: str
                                ):
    # type: (...) -> str
    """
    Utility method to get a valid blob path prefix from a provided one. A trailing slash is added if non-empty.

    :param blob_path_prefix: a string prefix, or None (interpreted as no prefix)
    :return: '' if None was provided, otherwise the prefix with a trailing '/' appended when non-empty
    """
    # `validate` guarantees the value is a str or None; a later manual isinstance check (with its TypeError
    # branch) was unreachable dead code and has been removed.
    validate('blob_path_prefix', blob_path_prefix, instance_of=str, enforce_not_none=False)

    if blob_path_prefix is None:
        return ''
    if len(blob_path_prefix) > 0 and not blob_path_prefix.endswith('/'):
        return blob_path_prefix + '/'
    return blob_path_prefix
def _get_blob_service_connection_string(blob_service  # type: BlockBlobService
                                        ):
    # type: (...) -> str
    """
    Utility method to get the connection string for a blob storage service (currently the BlockBlobService does
    not provide any method to do that).

    :param blob_service: the BlockBlobService to extract the connection string from
    :return: the connection string, rebuilt from the service's account name and key
    """
    validate('blob_service', blob_service, instance_of=BlockBlobService)

    # rebuild the connection string from the account credentials held by the service object
    return ("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"
            % (blob_service.account_name, blob_service.account_key))