Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

# Authors: Sylvain MARIE <sylvain.marie@se.com>
#          + All contributors to <https://github.com/smarie/python-azureml-client>
#
# License: 3-clause BSD, <https://github.com/smarie/python-azureml-client/blob/master/LICENSE>

5from io import BytesIO # to handle byte strings 

6from io import StringIO # to handle unicode strings 

7 

8from requests import Session 

9from valid8 import validate 

10import pandas as pd 

11 

12try: # python 3.5+ 

13 from typing import Dict, Union, List, Any, Tuple 

14 

15 # a few predefined type hints 

16 SwaggerModeAzmlTable = List[Dict[str, Any]] 

17 NonSwaggerModeAzmlTable = Dict[str, Union[List[str], List[List[Any]]]] 

18 AzmlTable = Union[SwaggerModeAzmlTable, NonSwaggerModeAzmlTable] 

19 AzmlOutputTable = Dict[str, Union[str, AzmlTable]] 

20 AzmlBlobTable = Dict[str, str] 

21except ImportError: 

22 pass 

23 

24 

25from azure.storage.blob import BlockBlobService, ContentSettings 

26from azmlclient.base_databinding import csv_to_df, df_to_csv 

27 

28 

def csv_to_blob_ref(csv_str,                # type: str
                    blob_service,           # type: BlockBlobService
                    blob_container,         # type: str
                    blob_name,              # type: str
                    blob_path_prefix=None,  # type: str
                    charset=None            # type: str
                    ):
    # type: (...) -> AzmlBlobTable
    """
    Uploads the provided CSV to the selected Blob Storage service, and returns a reference to the created blob in
    case of success.

    :param csv_str: the csv contents to upload, as a (unicode) string
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use. This is the "root folder" in azure blob
        storage wording.
    :param blob_name: the "file name" of the blob, ending with .csv or not (in which case the .csv suffix will be
        appended)
    :param blob_path_prefix: an optional folder prefix that will be used to store your blob inside the container.
        For example "path/to/my/"
    :param charset: the charset used to encode the blob contents (default: 'utf-8')
    :return: the AzureML blob reference (a dict with 'ConnectionString' and 'RelativeLocation' entries)
    """
    # setup the charset used for file encoding
    if charset is None:
        charset = 'utf-8'
    elif charset != 'utf-8':
        print("Warning: blobs can be written in any charset but currently only utf-8 blobs may be read back into "
              "DataFrames. We recommend setting charset to None or utf-8 ")

    # validate inputs (the only one that is not validated below)
    validate('csv_str', csv_str, instance_of=str)

    # 1- first create the references in order to check all params are ok
    blob_reference, blob_full_name = create_blob_ref(blob_service=blob_service, blob_container=blob_container,
                                                     blob_path_prefix=blob_path_prefix, blob_name=blob_name)

    # -- push blob
    blob_stream = BytesIO(csv_str.encode(encoding=charset))
    # noinspection PyTypeChecker
    # BUGFIX: use the IANA-registered MIME type 'text/csv' ('text.csv' is not a valid content type)
    blob_service.create_blob_from_stream(blob_container, blob_full_name, blob_stream,
                                         content_settings=ContentSettings(content_type='text/csv',
                                                                          content_encoding=charset))
    # (For old method with temporary files: see git history)

    return blob_reference

75 

76 

def csvs_to_blob_refs(csvs_dict,              # type: Dict[str, str]
                      blob_service,           # type: BlockBlobService
                      blob_container,         # type: str
                      blob_path_prefix=None,  # type: str
                      blob_name_prefix=None,  # type: str
                      charset=None            # type: str
                      ):
    # type: (...) -> Dict[str, Dict[str, str]]
    """
    Pushes each csv string in `csvs_dict` into the selected blob storage container, one blob per entry.

    Each dictionary entry maps an input name to its csv contents; the returned dictionary maps the same names
    to the "by reference" blob descriptions (dicts) produced by `csv_to_blob_ref`. Blob names are built by
    prepending `blob_name_prefix` to each input name.

    :param csvs_dict: a dictionary of csv contents (str) indexed by input name
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use
    :param blob_path_prefix: the optional prefix that will be prepended to all created blobs in the container
    :param blob_name_prefix: the optional prefix that will be prepended to all created blob names in the container
    :param charset: an optional charset to be used, by default utf-8 is used
    :return: a dictionary of "by reference" input descriptions as dictionaries
    """
    validate('csvs_dict', csvs_dict, instance_of=dict)

    # normalize the name prefix: empty string when absent, otherwise it must be a str
    if blob_name_prefix is None:
        blob_name_prefix = ""
    else:
        validate('blob_name_prefix', blob_name_prefix, instance_of=str)

    # upload every csv and collect the blob references under the same keys
    blob_refs = {}
    for input_name, csv_content in csvs_dict.items():
        blob_refs[input_name] = csv_to_blob_ref(csv_content, blob_service=blob_service,
                                                blob_container=blob_container,
                                                blob_path_prefix=blob_path_prefix,
                                                blob_name=blob_name_prefix + input_name,
                                                charset=charset)
    return blob_refs

111 

112 

def blob_ref_to_csv(blob_reference,        # type: AzmlBlobTable
                    blob_name=None,        # type: str
                    encoding=None,         # type: str
                    requests_session=None  # type: Session
                    ):
    # type: (...) -> str
    """
    Reads a CSV stored in a Blob Storage and referenced according to the format defined by AzureML, and returns
    its contents as a (unicode) string.

    Note: contrary to `blob_ref_to_df`, no DataFrame conversion is performed — the raw csv text is returned.
    (The previous docstring incorrectly claimed a DataFrame was returned.)

    :param blob_reference: a (AzureML json-like) dictionary representing a table stored as a csv in a blob storage.
        It must contain 'ConnectionString' and 'RelativeLocation' entries.
    :param blob_name: blob name, used in error messages only
    :param encoding: an optional encoding to use to read the blob. Only 'utf-8' is supported for now.
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: the csv contents of the blob, as a string
    :raises ValueError: if the encoding is unsupported or the blob reference is malformed
    """
    # BUGFIX: fall back to a meaningful validation label when no blob_name is provided (was: None)
    validate(blob_name if blob_name is not None else 'blob_reference', blob_reference, instance_of=dict)

    if encoding is not None and encoding != 'utf-8':
        raise ValueError("Unsupported encoding to retrieve blobs : %s" % encoding)

    if ('ConnectionString' in blob_reference.keys()) and ('RelativeLocation' in blob_reference.keys()):

        # create the Blob storage client for this account
        blob_service = BlockBlobService(connection_string=blob_reference['ConnectionString'],
                                        request_session=requests_session)

        # find the container and blob path
        container, name = blob_reference['RelativeLocation'].split(sep='/', maxsplit=1)

        # retrieve it and convert
        # -- this works but is probably less optimized for big blobs that get chunked, than using streaming
        blob_string = blob_service.get_blob_to_text(blob_name=name, container_name=container)
        return blob_string.content

    else:
        raise ValueError(
            'Blob reference is invalid: it should contain ConnectionString and RelativeLocation fields')

150 

151 

def blob_refs_to_csvs(blob_refs,             # type: Dict[str, Dict[str, str]]
                      charset=None,          # type: str
                      requests_session=None  # type: Session
                      ):
    # type: (...) -> Dict[str, str]
    """
    Downloads every AzureML blob reference in `blob_refs` and returns the csv contents indexed by the same names.

    :param blob_refs: a dictionary of AzureML blob references indexed by name
    :param charset: an optional encoding used to read the blobs (only utf-8 is supported for now)
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: a dictionary of csv contents (str) indexed by name
    """
    validate('blob_refs', blob_refs, instance_of=dict)

    # fetch each referenced blob, keeping the original keys
    contents = {}
    for ref_name, blob_ref in blob_refs.items():
        contents[ref_name] = blob_ref_to_csv(blob_ref, encoding=charset, blob_name=ref_name,
                                             requests_session=requests_session)
    return contents

170 

171 

def df_to_blob_ref(df,                     # type: pd.DataFrame
                   blob_service,           # type: BlockBlobService
                   blob_container,         # type: str
                   blob_name,              # type: str
                   blob_path_prefix=None,  # type: str
                   charset=None            # type: str
                   ):
    # type: (...) -> Dict[str, str]
    """
    Uploads the provided DataFrame to the selected Blob Storage service as a CSV file blob, and returns a
    reference to the created blob in case of success.

    :param df: the DataFrame to upload
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use. This is the "root folder" in azure
        blob storage wording.
    :param blob_name: the "file name" of the blob, ending with .csv or not (in which case the .csv suffix will
        be appended)
    :param blob_path_prefix: an optional folder prefix that will be used to store your blob inside the
        container. For example "path/to/my/"
    :param charset: the charset to use to encode the blob (default and recommended: 'utf-8')
    :return: the AzureML blob reference (dict) to the created blob
    """
    # serialize the DataFrame to csv text, then delegate the upload to csv_to_blob_ref
    csv_content = df_to_csv(df, df_name=blob_name, charset=charset)
    return csv_to_blob_ref(csv_content, blob_service=blob_service, blob_container=blob_container,
                           blob_path_prefix=blob_path_prefix, blob_name=blob_name, charset=charset)

202 

203 

def dfs_to_blob_refs(dfs_dict,               # type: Dict[str, pd.DataFrame]
                     blob_service,           # type: BlockBlobService
                     blob_container,         # type: str
                     blob_path_prefix=None,  # type: str
                     blob_name_prefix=None,  # type: str
                     charset=None            # type: str
                     ):
    # type: (...) -> Dict[str, Dict[str, str]]
    """
    Uploads all DataFrames in `dfs_dict` as csv file blobs to the selected blob storage container, and returns
    the corresponding blob references indexed by the same names.

    :param dfs_dict: a dictionary of DataFrames indexed by input name
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use
    :param blob_path_prefix: the optional prefix that will be prepended to all created blobs in the container
    :param blob_name_prefix: the optional prefix that will be prepended to all created blob names in the container
    :param charset: an optional charset to be used, by default utf-8 is used
    :return: a dictionary of "by reference" blob descriptions as dictionaries
    """
    # validation label made consistent with the other *_dict validations in this module
    validate('dfs_dict', dfs_dict, instance_of=dict)

    # BUGFIX: previously the default blob_name_prefix=None crashed with a TypeError on
    # `blob_name_prefix + blobName`; normalize to "" like csvs_to_blob_refs does.
    if blob_name_prefix is None:
        blob_name_prefix = ""
    else:
        validate('blob_name_prefix', blob_name_prefix, instance_of=str)

    return {blob_name: df_to_blob_ref(df, blob_service=blob_service, blob_container=blob_container,
                                      blob_path_prefix=blob_path_prefix,
                                      blob_name=blob_name_prefix + blob_name,
                                      charset=charset)
            for blob_name, df in dfs_dict.items()}

219 

220 

def blob_ref_to_df(blob_reference,        # type: AzmlBlobTable
                   blob_name=None,        # type: str
                   encoding=None,         # type: str
                   requests_session=None  # type: Session
                   ):
    # type: (...) -> pd.DataFrame
    """
    Reads a CSV blob referenced according to the format defined by AzureML, and transforms it into a DataFrame.

    :param blob_reference: a (AzureML json-like) dictionary representing a table stored as a csv in a blob storage.
    :param blob_name: blob name for error messages
    :param encoding: an optional encoding to use to read the blob
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: the blob contents as a pandas DataFrame (an empty DataFrame for an empty blob)
    """
    # TODO handle the blob in streaming mode to be big-blob/chunking compliant (see blob_ref_to_csv). The
    #  difficulty is managing the buffer and its encoding correctly (e.g. wrapping with io.TextIOWrapper and
    #  blob_service.get_blob_to_stream) and knowing how many chunks pandas.read_csv should consume.

    # download the raw csv text first
    csv_text = blob_ref_to_csv(blob_reference, blob_name=blob_name, encoding=encoding,
                               requests_session=requests_session)

    if not csv_text:
        # an empty blob maps to an empty DataFrame
        return pd.DataFrame()

    # parse the csv text into a DataFrame
    return csv_to_df(StringIO(csv_text), blob_name)

253 

254 

def blob_refs_to_dfs(blob_refs,             # type: Dict[str, Dict[str, str]]
                     charset=None,          # type: str
                     requests_session=None  # type: Session
                     ):
    # type: (...) -> Dict[str, pd.DataFrame]
    """
    Reads Blob references, for example responses from an AzureMl Batch web service call, into a dictionary of
    pandas DataFrame.

    :param blob_refs: the json output description by reference for each output
    :param charset: an optional encoding used to read the blobs
    :param requests_session: an optional Session object that should be used for the HTTP communication
    :return: the dictionary of corresponding DataFrames mapped to the output names
    """
    validate('blob_refs', blob_refs, instance_of=dict)

    # download and parse each referenced blob, keeping the original output names as keys
    dfs = {}
    for output_name, blob_ref in blob_refs.items():
        dfs[output_name] = blob_ref_to_df(blob_ref, encoding=charset, blob_name=output_name,
                                          requests_session=requests_session)
    return dfs

274 

275 

def create_blob_ref(blob_service,           # type: BlockBlobService
                    blob_container,         # type: str
                    blob_name,              # type: str
                    blob_path_prefix=None,  # type: str
                    ):
    # type: (...) -> Tuple[Dict[str, str], str]
    """
    Creates a reference in the AzureML format, to a csv blob stored on Azure Blob Storage, whether it exists or
    not. The blob name can end with '.csv' or not, the code handles both.

    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use. This is the "root folder" in azure
        blob storage wording.
    :param blob_name: the "file name" of the blob, ending with .csv or not (in which case the .csv suffix will
        be appended)
    :param blob_path_prefix: an optional folder prefix that will be used to store your blob inside the
        container. For example "path/to/my/"
    :return: a tuple. First element is the AzureML blob reference (a dict). Second element is the full blob name
    """
    # validate input (blob_service and blob_path_prefix are validated by the helpers below)
    validate('blob_container', blob_container, instance_of=str)
    validate('blob_name', blob_name, instance_of=str)

    # strip any '.csv' suffix (case-insensitive); it is appended back when building the full name
    if blob_name.lower().endswith('.csv'):
        blob_name = blob_name[:-4]

    # validate the blob service and get its connection string
    connection_str = _get_blob_service_connection_string(blob_service)

    # normalize the path prefix (trailing slash appended when needed)
    blob_path_prefix = _get_valid_blob_path_prefix(blob_path_prefix)

    # build the full blob name and the AzureML-style reference
    blob_full_name = "{}{}.csv".format(blob_path_prefix, blob_name)
    output_ref = {'ConnectionString': connection_str,
                  'RelativeLocation': "{}/{}".format(blob_container, blob_full_name)}

    return output_ref, blob_full_name

316 

317 

def create_blob_refs(blob_service,           # type: BlockBlobService
                     blob_container,         # type: str
                     blob_names,             # type: List[str]
                     blob_path_prefix=None,  # type: str
                     blob_name_prefix=None   # type: str
                     ):
    # type: (...) -> Dict[str, AzmlBlobTable]
    """
    Utility method to create one or several blob references on the same container on the same blob storage
    service.

    :param blob_names: the list of blob names to create references for
    :param blob_service: the BlockBlobService to use, defining the connection string
    :param blob_container: the name of the blob storage container to use
    :param blob_path_prefix: optional path prefix prepended to all blob names
    :param blob_name_prefix: optional name prefix prepended to all blob names
    :return: a dictionary of AzureML blob references, indexed by the (un-prefixed) blob names
    """
    validate('blob_names', blob_names, instance_of=list)

    # normalize the name prefix: empty string when absent, otherwise it must be a str
    if blob_name_prefix is None:
        blob_name_prefix = ""
    else:
        validate('blob_name_prefix', blob_name_prefix, instance_of=str)

    # create one reference per name; create_blob_ref returns (reference, full_name) — keep the reference only
    refs = {}
    for name in blob_names:
        ref, _ = create_blob_ref(blob_service, blob_container, blob_name_prefix + name,
                                 blob_path_prefix=blob_path_prefix)
        refs[name] = ref
    return refs

345 

346 

def _get_valid_blob_path_prefix(blob_path_prefix  # type: str
                                ):
    # type: (...) -> str
    """
    Utility method to get a valid blob path prefix from a provided one. A trailing slash is added if non-empty.

    :param blob_path_prefix: the prefix to normalize, or None (treated as an empty prefix)
    :return: '' when the input is None, otherwise the input with a trailing '/' guaranteed when non-empty
    """
    # validate raises for any value that is neither None nor a str, so no further type check is needed below
    # (the previous `else: raise TypeError` branch was unreachable)
    validate('blob_path_prefix', blob_path_prefix, instance_of=str, enforce_not_none=False)

    if blob_path_prefix is None:
        return ''

    # append the trailing slash when the prefix is non-empty and does not already end with one
    if len(blob_path_prefix) > 0 and not blob_path_prefix.endswith('/'):
        return blob_path_prefix + '/'

    return blob_path_prefix

367 

368 

def _get_blob_service_connection_string(blob_service  # type: BlockBlobService
                                        ):
    # type: (...) -> str
    """
    Utility method to rebuild the connection string for a blob storage service from its account name and key
    (currently the BlockBlobService does not provide any method to do that).

    :param blob_service: the BlockBlobService whose connection string should be reconstructed
    :return: the connection string
    """
    validate('blob_service', blob_service, instance_of=BlockBlobService)

    return "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s" % (blob_service.account_name,
                                                                            blob_service.account_key)