⬅ azmlclient/base_databinding_blobs.py source

1 # Authors: Sylvain MARIE <sylvain.marie@se.com>
2 # + All contributors to <https://github.com/smarie/python-azureml-client>
3 #
4 # License: 3-clause BSD, <https://github.com/smarie/python-azureml-client/blob/master/LICENSE>
5 from io import BytesIO # to handle byte strings
6 from io import StringIO # to handle unicode strings
7  
8 from requests import Session
9 from valid8 import validate
10 import pandas as pd
11  
12 try: # python 3.5+
13 from typing import Dict, Union, List, Any, Tuple
14  
15 # a few predefined type hints
16 SwaggerModeAzmlTable = List[Dict[str, Any]]
17 NonSwaggerModeAzmlTable = Dict[str, Union[List[str], List[List[Any]]]]
18 AzmlTable = Union[SwaggerModeAzmlTable, NonSwaggerModeAzmlTable]
19 AzmlOutputTable = Dict[str, Union[str, AzmlTable]]
20 AzmlBlobTable = Dict[str, str]
21 except ImportError:
22 pass
23  
24  
25 from azure.storage.blob import BlockBlobService, ContentSettings
26 from azmlclient.base_databinding import csv_to_df, df_to_csv
27  
28  
def csv_to_blob_ref(csv_str,                # type: str
                    blob_service,           # type: BlockBlobService
                    blob_container,         # type: str
                    blob_name,              # type: str
                    blob_path_prefix=None,  # type: str
                    charset=None            # type: str
                    ):
    # type: (...) -> AzmlBlobTable
    """
    Uploads the provided CSV to the selected Blob Storage service, and returns a reference to the created blob in
    case of success.

    :param csv_str: the CSV contents to upload, as a string
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use. This is the "root folder" in azure blob
        storage wording.
    :param blob_name: the "file name" of the blob, ending with .csv or not (in which case the .csv suffix will be
        appended)
    :param blob_path_prefix: an optional folder prefix that will be used to store your blob inside the container.
        For example "path/to/my/"
    :param charset: the charset used to encode the blob contents. Default (None) means 'utf-8'.
    :return: the AzureML blob reference, a dict with 'ConnectionString' and 'RelativeLocation' entries
    """
    import warnings

    # setup the charset used for file encoding
    if charset is None:
        charset = 'utf-8'
    elif charset != 'utf-8':
        # use the warnings machinery (not print) so that users can filter or silence this message
        warnings.warn("Blobs can be written in any charset but currently only utf-8 blobs may be read back into "
                      "DataFrames. We recommend setting charset to None or utf-8 ")

    # validate inputs (the only one that is not validated below)
    validate('csv_str', csv_str, instance_of=str)

    # 1- first create the references in order to check all params are ok
    blob_reference, blob_full_name = create_blob_ref(blob_service=blob_service, blob_container=blob_container,
                                                     blob_path_prefix=blob_path_prefix, blob_name=blob_name)

    # -- push the blob, streaming the encoded CSV from memory
    blob_stream = BytesIO(csv_str.encode(encoding=charset))
    # noinspection PyTypeChecker
    blob_service.create_blob_from_stream(blob_container, blob_full_name, blob_stream,
                                         # 'text/csv' is the proper MIME type (was 'text.csv', which is not valid)
                                         content_settings=ContentSettings(content_type='text/csv',
                                                                          content_encoding=charset))
    # (For old method with temporary files: see git history)

    return blob_reference
75  
76  
def csvs_to_blob_refs(csvs_dict,              # type: Dict[str, str]
                      blob_service,           # type: BlockBlobService
                      blob_container,         # type: str
                      blob_path_prefix=None,  # type: str
                      blob_name_prefix=None,  # type: str
                      charset=None            # type: str
                      ):
    # type: (...) -> Dict[str, Dict[str, str]]
    """
    Pushes all CSV strings in the provided dictionary to the selected blob storage container, one blob per entry.
    Each entry maps an input name to its CSV contents; the resulting dictionary maps the same names to the
    "by reference" blob descriptions (dicts) created by `csv_to_blob_ref`.

    :param csvs_dict: a dictionary mapping input names to CSV contents (strings)
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use
    :param blob_path_prefix: the optional prefix that will be prepended to all created blobs in the container
    :param blob_name_prefix: the optional prefix that will be prepended to all created blob names in the container
    :param charset: an optional charset to be used, by default utf-8 is used
    :return: a dictionary of "by reference" input descriptions as dictionaries
    """
    validate('csvs_dict', csvs_dict, instance_of=dict)

    # a missing name prefix means "no prefix"
    if blob_name_prefix is None:
        blob_name_prefix = ""
    else:
        validate('blob_name_prefix', blob_name_prefix, instance_of=str)

    # upload each CSV and collect the resulting blob references under the same keys
    blob_refs = {}
    for input_name, csv_content in csvs_dict.items():
        blob_refs[input_name] = csv_to_blob_ref(csv_content, blob_service=blob_service,
                                                blob_container=blob_container,
                                                blob_path_prefix=blob_path_prefix,
                                                blob_name=blob_name_prefix + input_name,
                                                charset=charset)
    return blob_refs
111  
112  
def blob_ref_to_csv(blob_reference,        # type: AzmlBlobTable
                    blob_name=None,        # type: str
                    encoding=None,         # type: str
                    requests_session=None  # type: Session
                    ):
    # type: (...) -> str
    """
    Reads a CSV stored in a Blob Storage and referenced according to the format defined by AzureML, and returns
    its contents as a string.

    :param blob_reference: a (AzureML json-like) dictionary representing a table stored as a csv in a blob storage.
        It must contain 'ConnectionString' and 'RelativeLocation' fields.
    :param blob_name: optional blob name, used in validation error messages only
    :param encoding: an optional encoding to use to read the blob. Only None and 'utf-8' are currently supported.
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: the decoded contents of the blob, as a string
    :raises ValueError: if the encoding is unsupported or the blob reference misses mandatory fields
    """
    # `validate`'s first argument is the *name* used in error messages and must be a string; the previous code
    # passed `blob_name` directly, which may be None. Fall back to a fixed name when no blob name was given.
    validate(blob_name or 'blob_reference', blob_reference, instance_of=dict)

    if encoding is not None and encoding != 'utf-8':
        raise ValueError("Unsupported encoding to retrieve blobs : %s" % encoding)

    if ('ConnectionString' in blob_reference.keys()) and ('RelativeLocation' in blob_reference.keys()):

        # create the Blob storage client for this account
        blob_service = BlockBlobService(connection_string=blob_reference['ConnectionString'],
                                        request_session=requests_session)

        # find the container and blob path ('container/path/to/blob.csv' -> 'container', 'path/to/blob.csv')
        container, name = blob_reference['RelativeLocation'].split(sep='/', maxsplit=1)

        # retrieve it and convert
        # -- this works but is probably less optimized for big blobs that get chunked, than using streaming
        blob_string = blob_service.get_blob_to_text(blob_name=name, container_name=container)
        return blob_string.content

    else:
        raise ValueError(
            'Blob reference is invalid: it should contain ConnectionString and RelativeLocation fields')
150  
151  
def blob_refs_to_csvs(blob_refs,             # type: Dict[str, Dict[str, str]]
                      charset=None,          # type: str
                      requests_session=None  # type: Session
                      ):
    # type: (...) -> Dict[str, str]
    """
    Reads several CSV blobs, each referenced in the AzureML "by reference" dictionary format, and returns their
    contents as strings mapped to the same names.

    :param blob_refs: a dictionary mapping names to AzureML blob references (dicts)
    :param charset: an optional encoding used to read the blobs (only utf-8 is currently supported)
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: a dictionary mapping the same names to the blob contents (strings)
    """
    validate('blob_refs', blob_refs, instance_of=dict)

    # download each referenced blob and collect the contents under the same keys
    contents = {}
    for ref_name, blob_ref in blob_refs.items():
        contents[ref_name] = blob_ref_to_csv(blob_ref, encoding=charset, blob_name=ref_name,
                                             requests_session=requests_session)
    return contents
170  
171  
def df_to_blob_ref(df,                     # type: pd.DataFrame
                   blob_service,           # type: BlockBlobService
                   blob_container,         # type: str
                   blob_name,              # type: str
                   blob_path_prefix=None,  # type: str
                   charset=None            # type: str
                   ):
    # type: (...) -> Dict[str, str]
    """
    Uploads the provided DataFrame to the selected Blob Storage service as a CSV file blob, and returns a reference
    to the created blob in case of success.

    :param df: the DataFrame to upload
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use. This is the "root folder" in azure blob
        storage wording.
    :param blob_name: the "file name" of the blob, ending with .csv or not (in which case the .csv suffix will be
        appended)
    :param blob_path_prefix: an optional folder prefix that will be used to store your blob inside the container.
        For example "path/to/my/"
    :param charset: the charset to use to encode the blob (default and recommended: 'utf-8')
    :return: the AzureML blob reference (a dict)
    """
    # render the DataFrame as a CSV string (the blob name is only used for error reporting here)
    csv_str = df_to_csv(df, df_name=blob_name, charset=charset)

    # delegate the actual upload
    blob_ref = csv_to_blob_ref(csv_str, blob_service=blob_service, blob_container=blob_container,
                               blob_path_prefix=blob_path_prefix, blob_name=blob_name, charset=charset)
    return blob_ref
202  
203  
def dfs_to_blob_refs(dfs_dict,               # type: Dict[str, pd.DataFrame]
                     blob_service,           # type: BlockBlobService
                     blob_container,         # type: str
                     blob_path_prefix=None,  # type: str
                     blob_name_prefix=None,  # type: str
                     charset=None            # type: str
                     ):
    # type: (...) -> Dict[str, Dict[str, str]]
    """
    Utility method to push all DataFrames in the provided dictionary to the selected blob storage container as
    CSV file blobs, one blob per entry. Returns the corresponding AzureML blob references mapped to the same names.

    :param dfs_dict: a dictionary mapping names to DataFrames
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use
    :param blob_path_prefix: the optional prefix that will be prepended to all created blobs in the container
    :param blob_name_prefix: the optional prefix that will be prepended to all created blob names in the container
    :param charset: an optional charset to be used, by default utf-8 is used
    :return: a dictionary of "by reference" blob descriptions as dictionaries
    """
    validate('DataFramesDict', dfs_dict, instance_of=dict)

    # normalize a None prefix to "" - consistent with `csvs_to_blob_refs`; the previous code raised a TypeError
    # on `None + blobName` when no prefix was provided
    if blob_name_prefix is None:
        blob_name_prefix = ""
    else:
        validate('blob_name_prefix', blob_name_prefix, instance_of=str)

    return {blob_name: df_to_blob_ref(df, blob_service=blob_service, blob_container=blob_container,
                                      blob_path_prefix=blob_path_prefix, blob_name=blob_name_prefix + blob_name,
                                      charset=charset)
            for blob_name, df in dfs_dict.items()}
219  
220  
def blob_ref_to_df(blob_reference,        # type: AzmlBlobTable
                   blob_name=None,        # type: str
                   encoding=None,         # type: str
                   requests_session=None  # type: Session
                   ):
    # type: (...) -> pd.DataFrame
    """
    Reads a CSV blob referenced according to the format defined by AzureML, and transforms it into a DataFrame.
    An empty blob is mapped to an empty DataFrame.

    :param blob_reference: a (AzureML json-like) dictionary representing a table stored as a csv in a blob storage.
    :param blob_name: blob name for error messages
    :param encoding: an optional encoding to use to read the blob
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: the DataFrame built from the blob contents
    """
    # TODO copy the blob_ref_to_csv method here and handle the blob in streaming mode to be big blobs
    # chunking-compliant. However how to manage the buffer correctly, create the StringIO with correct encoding,
    # and know the number of chunks that should be read in pandas.read_csv ? A lot to dig here to get it right...
    #
    # from io import TextIOWrapper
    # contents = TextIOWrapper(buffer, encoding=charset, ...)
    # blob = blob_service.get_blob_to_stream(blob_name=name, container_name=container, encoding=charset,
    # stream=contents)

    # download the whole blob as a string first
    csv_content = blob_ref_to_csv(blob_reference, blob_name=blob_name, encoding=encoding,
                                  requests_session=requests_session)

    # guard: an empty blob becomes an empty DataFrame
    if len(csv_content) == 0:
        return pd.DataFrame()

    # parse the CSV string into a DataFrame
    return csv_to_df(StringIO(csv_content), blob_name)
253  
254  
def blob_refs_to_dfs(blob_refs,             # type: Dict[str, Dict[str, str]]
                     charset=None,          # type: str
                     requests_session=None  # type: Session
                     ):
    # type: (...) -> Dict[str, pd.DataFrame]
    """
    Reads Blob references, for example responses from an AzureMl Batch web service call, into a dictionary of
    pandas DataFrame.

    :param blob_refs: the json output description by reference for each output
    :param charset: an optional encoding used to read the blobs (only utf-8 is currently supported)
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: the dictionary of corresponding DataFrames mapped to the output names
    """
    validate('blob_refs', blob_refs, instance_of=dict)

    # convert each referenced blob into a DataFrame, keyed by the same output names
    dfs = {}
    for output_name, blob_ref in blob_refs.items():
        dfs[output_name] = blob_ref_to_df(blob_ref, encoding=charset, blob_name=output_name,
                                          requests_session=requests_session)
    return dfs
274  
275  
def create_blob_ref(blob_service,           # type: BlockBlobService
                    blob_container,         # type: str
                    blob_name,              # type: str
                    blob_path_prefix=None,  # type: str
                    ):
    # type: (...) -> Tuple[Dict[str, str], str]
    """
    Creates a reference in the AzureML format, to a csv blob stored on Azure Blob Storage, whether it exists or not.
    The blob name can end with '.csv' or not, the code handles both.

    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use. This is the "root folder" in azure blob
        storage wording.
    :param blob_name: the "file name" of the blob, ending with .csv or not (in which case the .csv suffix will be
        appended)
    :param blob_path_prefix: an optional folder prefix that will be used to store your blob inside the container.
        For example "path/to/my/"
    :return: a tuple. First element is the AzureML blob reference (a dict). Second element is the full blob name
    """
    # validate input (blob_service and blob_path_prefix are done below)
    validate('blob_container', blob_container, instance_of=str)
    validate('blob_name', blob_name, instance_of=str)

    # strip a possible '.csv' suffix - it is re-appended once, below
    if blob_name.lower().endswith('.csv'):
        blob_name = blob_name[:-len('.csv')]

    # validate blob service and get connection string
    connection_str = _get_blob_service_connection_string(blob_service)

    # check the blob path prefix, append a trailing slash if necessary
    prefix = _get_valid_blob_path_prefix(blob_path_prefix)

    # build the full blob name and the AzureML-style reference dict
    blob_full_name = '{}{}.csv'.format(prefix, blob_name)
    output_ref = {'ConnectionString': connection_str,
                  'RelativeLocation': '{}/{}'.format(blob_container, blob_full_name)}

    return output_ref, blob_full_name
316  
317  
def create_blob_refs(blob_service,           # type: BlockBlobService
                     blob_container,         # type: str
                     blob_names,             # type: List[str]
                     blob_path_prefix=None,  # type: str
                     blob_name_prefix=None   # type: str
                     ):
    # type: (...) -> Dict[str, AzmlBlobTable]
    """
    Utility method to create one or several blob references on the same container on the same blob storage service.

    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use
    :param blob_names: the list of blob names to create references for
    :param blob_path_prefix: optional prefix to the blob names
    :param blob_name_prefix: optional prefix prepended to each blob name
    :return: a dictionary mapping each (un-prefixed) blob name to its AzureML blob reference
    """
    validate('blob_names', blob_names, instance_of=list)

    # a missing name prefix means "no prefix"
    if blob_name_prefix is None:
        blob_name_prefix = ""
    else:
        validate('blob_name_prefix', blob_name_prefix, instance_of=str)

    # create each reference; `create_blob_ref` returns (reference, full name) - only the reference is kept
    refs = {}
    for b_name in blob_names:
        blob_ref, _full_name = create_blob_ref(blob_service, blob_container, blob_name_prefix + b_name,
                                               blob_path_prefix=blob_path_prefix)
        refs[b_name] = blob_ref
    return refs
345  
346  
def _get_valid_blob_path_prefix(blob_path_prefix  # type: str
                                ):
    # type: (...) -> str
    """
    Utility method to get a valid blob path prefix from a provided one. A trailing slash is added if non-empty;
    None is mapped to the empty string.

    :param blob_path_prefix: the prefix to normalize, or None
    :return: the normalized prefix, always a string
    """
    validate('blob_path_prefix', blob_path_prefix, instance_of=str, enforce_not_none=False)

    # no prefix provided: use the empty string
    if blob_path_prefix is None:
        return ''

    if isinstance(blob_path_prefix, str):
        # ensure a non-empty prefix acts as a folder, i.e. ends with a slash
        if len(blob_path_prefix) > 0 and not blob_path_prefix.endswith('/'):
            return blob_path_prefix + '/'
        return blob_path_prefix

    # defensive: unreachable in practice since `validate` above already rejects non-str, non-None values
    raise TypeError("Blob path prefix should be a valid string or not be provided (default is empty string)")
367  
368  
def _get_blob_service_connection_string(blob_service  # type: BlockBlobService
                                        ):
    # type: (...) -> str
    """
    Utility method to get the connection string for a blob storage service (currently the BlockBlobService does
    not provide any method to do that).

    :param blob_service: the BlockBlobService to extract the connection string from
    :return: the reconstructed connection string
    """
    validate('blob_service', blob_service, instance_of=BlockBlobService)

    # rebuild the connection string from the account credentials held by the service object
    return ("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"
            % (blob_service.account_name, blob_service.account_key))