Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Authors: Sylvain MARIE <sylvain.marie@se.com> 

2# + All contributors to <https://github.com/smarie/python-azureml-client> 

3# 

4# License: 3-clause BSD, <https://github.com/smarie/python-azureml-client/blob/master/LICENSE> 

5import json 

6import time 

7 

8from datetime import datetime 

9from warnings import warn 

10 

11from six import raise_from 

12 

13try: 

14 from urllib.error import HTTPError as Urllib_HTTPError 

15except ImportError: 

16 # create a dummy class 

17 class Urllib_HTTPError(Exception): 

18 pass 

19 

20import pandas as pd 

21import requests 

22 

23try: # python 3.5+ 

24 from typing import List, Dict, Tuple, Any, Union, Optional 

25except ImportError: 

26 pass 

27 

28from .requests_utils import set_http_proxy 

29from .base_databinding import AzmlException, dfs_to_azmltables, params_df_to_params_dict, azmltable_to_json, \ 

30 json_to_azmltable, azmltables_to_dfs 

31 

32 

33class IllegalJobStateException(Exception): 

34 """ This is raised whenever a job has illegal state""" 

35 

36 

37class JobExecutionException(Exception): 

38 """ This is raised whenever a job ended in failed mode""" 

39 

40 

41def create_session_for_proxy(http_proxyhost, # type: str 

42 http_proxyport, # type: int 

43 https_proxyhost=None, # type: str 

44 https_proxyport=None, # type: int 

45 use_http_for_https_proxy=False, # type: bool 

46 ssl_verify=None # type: Union[bool, str] 

47 ): 

48 # type: (...) -> requests.Session 

49 """ 

50 DEPRECATED - users should rather create a Session() and use set_http_proxy(session, **kwargs) instead 

51 

52 Helper method to configure the request package to use the proxy of your choice and adapt the SSL certificate 

53 validation accordingly. 

54 

55 ```python 

56 # create a temporary Session to use Fiddler as the network proxy 

57 debug_session = create_session_for_proxy('localhost', 8888, use_http_for_https_proxy=True, ssl_verify=False) 

58 

59 # use that session in a, AzureML web service call 

60 execute_rr(..., requests_session=debug_session) 

61 ``` 

62 

63 :param http_proxyhost: mandatory proxy host for http 

64 :param http_proxyport: mandatory proxy port for http 

65 :param https_proxyhost: optional proxy host for https. If none is provided, http_proxyhost will be used 

66 :param https_proxyport: optional proxy port for https. If none is provided, http_proxyport will be used 

67 :param use_http_for_https_proxy: optional, if set to true the http protocol will be used to initiate communications 

68 with the proxy even for https calls (then calls will be done in https as usual). 

69 :param ssl_verify: optional ssl verification parameter. It may either be the path to an additional certificate 

70 to trust (recommended), or a boolean to enable (default)/disable (not recommended ! use only in debug mode !) 

71 certificate validation. 

72 See here for details : http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification 

73 :return: a requests.Session object that you may use with the rest of the library 

74 """ 

75 warn("This method is deprecated - please create a Session() and use set_http_proxy(session, **kwargs) instead") 

76 

77 session = requests.Session() 

78 

79 set_http_proxy(session, 

80 http_proxyhost=http_proxyhost, http_proxyport=http_proxyport, 

81 https_proxyhost=https_proxyhost, https_proxyport=https_proxyport, 

82 use_http_proxy_for_https_requests=use_http_for_https_proxy) 

83 

84 if ssl_verify is not None: 

85 session.verify = ssl_verify 

86 

87 return session 

88 

89 

90def execute_rr(api_key, # type: str 

91 base_url, # type: str 

92 inputs=None, # type: Dict[str, pd.DataFrame] 

93 params=None, # type: Union[pd.DataFrame, Dict[str, Any]] 

94 output_names=None, # type: List[str] 

95 only_keep_selected_output_names=False, # type: bool 

96 use_swagger_format=False, # type: bool 

97 replace_NaN_with=None, # type: Any 

98 replace_NaT_with=None, # type: Any 

99 requests_session=None # type: requests.Session 

100 ): 

101 # type: (...) -> Dict[str, pd.DataFrame] 

102 """ 

103 Executes an AzureMl web service in request-response (RR) mode. This mode is typically used when the web service does 

104 not take too long to execute. For longer operations you should use the batch mode (BES). 

105 

106 :param api_key: the api key for the AzureML web service to call. For example 'fdjmxkqktcuhifljflkdmw' 

107 :param base_url: the URL of the AzureML web service to call. It should not contain the "execute". This is typically 

108 in the form 'https://<geo>.services.azureml.net/workspaces/<wId>/services/<sId>'. 

109 :param inputs: an optional dictionary containing the inputs, by name. Inputs should be DataFrames. 

110 :param params: an optional dictionary containing the parameters by name, or a DataFrame containing the parameters. 

111 :param output_names: an optional list of expected output names, for automatic validation. 

112 :param only_keep_selected_output_names: a boolean (default False) to indicate if only the outputs selected in 

113 `output_names` should be kept in the returned dictionary. 

114 :param use_swagger_format: a boolean (default False) indicating if the 'swagger' azureml format should be used 

115 to format the data tables in json payloads. 

116 :param requests_session: an optional requests.Session object, for example created from create_session_for_proxy() 

117 :return: a dictionary of outputs, by name. Outputs are DataFrames 

118 """ 

119 # quick check before spending time with the query 

120 if output_names is None and only_keep_selected_output_names: 120 ↛ 121line 120 didn't jump to line 121, because the condition on line 120 was never true

121 raise ValueError("`only_keep_selected_output_names` can only be used with a non-None list of " 

122 "`output_names`") 

123 

124 # 0- Create the generic request-response client 

125 rr_client = RequestResponseClient(requests_session=requests_session, use_swagger_format=use_swagger_format, 

126 replace_NaN_with=replace_NaN_with, replace_NaT_with=replace_NaT_with) 

127 

128 # 1- Create the query body 

129 request_body = rr_client.create_request_body(inputs, params) 

130 

131 # 2- Execute the query and receive the response body 

132 response_body = rr_client.execute_rr(base_url, api_key, request_body) 

133 

134 # 3- parse the response body into a dictionary of DataFrames 

135 result_dfs = rr_client.read_response_json_body(response_body, output_names) 

136 

137 # 4- possibly filter outputs 

138 if only_keep_selected_output_names: 138 ↛ 139line 138 didn't jump to line 139, because the condition on line 138 was never true

139 selected_dfs = {k: result_dfs[k] for k in output_names} 

140 return selected_dfs 

141 else: 

142 return result_dfs 

143 

144 

145def execute_bes(api_key, # type: str 

146 base_url, # type: str 

147 blob_storage_account, # type: str 

148 blob_storage_apikey, # type: str 

149 blob_container, # type: str 

150 blob_path_prefix=None, # type: str 

151 blob_charset=None, # type: str 

152 inputs=None, # type: Dict[str, pd.DataFrame] 

153 params=None, 

154 output_names=None, # type: List[str] 

155 nb_seconds_between_status_queries=5, # type: int 

156 requests_session=None # type: requests.Session 

157 ): 

158 """ 

159 Executes an AzureMl web service in batch mode (BES: Batch Execution Service). 

160 

161 Its inputs are the same than `execute_rr` but in addition it takes information about the blob storage service to 

162 use. Indeed in batch mode, all inputs and outputs go through an intermediate blob storage. 

163 

164 The AzureML job status is queried every 5 seconds by default, you may wish to change that number with 

165 `nb_seconds_between_status_queries`. 

166 

167 :param api_key: the api key for the service to call 

168 :param base_url: the URL of the service to call 

169 :param blob_storage_account: the storage account to use to store the inputs and outputs 

170 :param blob_storage_apikey: the storage api key to use to store the inputs and outputs 

171 :param blob_container: the container in the blob storage, that will be used to store the inputs and outputs 

172 :param blob_path_prefix: an optional prefix that will be used to store the blobs 

173 :param blob_charset: optional encoding of files used on the blob storage 

174 :param inputs: an optional dictionary containing the inputs, by name. Inputs should be DataFrames. 

175 :param params: an optional dictionary containing the parameters by name, or a DataFrame containing the parameters. 

176 :param output_names: an optional list of expected output names. Note that contrary to rr mode, no outputs will be 

177 provided if this is empty. 

178 :param nb_seconds_between_status_queries: nb of seconds that the engine waits between job status queries. By 

179 default this is set to 5. 

180 :param requests_session: an optional requests.Session object, for example created from create_session_for_proxy() 

181 :return: a dictionary of outputs, by name. Outputs are DataFrames 

182 """ 

183 

184 # 0 create the blob service client and the generic batch mode client 

185 batch_client = BatchClient(requests_session=requests_session) 

186 

187 # if we're here without error that means that `azure-storage` is available 

188 from azure.storage.blob import BlockBlobService 

189 from azmlclient.base_databinding_blobs import blob_refs_to_dfs 

190 

191 blob_service = BlockBlobService(account_name=blob_storage_account, account_key=blob_storage_apikey, 

192 request_session=requests_session) 

193 

194 # 1- Push inputs to blob storage and create output references 

195 print('Pushing inputs to blob storage') 

196 input_refs, output_refs = batch_client.push_inputs_to_blob__and__create_output_references( 

197 inputs, 

198 output_names=output_names, 

199 blob_service=blob_service, 

200 blob_container=blob_container, 

201 blob_path_prefix=blob_path_prefix, 

202 charset=blob_charset 

203 ) 

204 

205 # 2- Create the query body 

206 request_body = batch_client.create_request_body(input_refs, params, output_refs) 

207 

208 # 3- Perform the call 

209 json_job_id = None 

210 try: 

211 # -- a) create the job 

212 print('Creating job') 

213 json_job_id = batch_client.execute_batch_createJob(base_url, api_key, request_body) 

214 

215 # -- b) start the job 

216 print('Starting job ' + str(json_job_id)) 

217 batch_client.execute_batch_startJob(base_url, api_key, json_job_id) 

218 print('Job ' + str(json_job_id) + ' started') 

219 

220 # -- polling loop 

221 outputs_refs2 = None 

222 while outputs_refs2 is None: 

223 # -- c) poll job status 

224 print('Polling job status for job ' + str(json_job_id)) 

225 statusOrResult = batch_client.execute_batch_getJobStatusOrResult(base_url, api_key, json_job_id) 

226 

227 # -- e) check the job status and read response into a dictionary 

228 outputs_refs2 = batch_client.read_status_or_result(statusOrResult) 

229 

230 # wait 

231 print('Waiting ' + str(nb_seconds_between_status_queries) + 's until next call.') 

232 time.sleep(nb_seconds_between_status_queries) 

233 

234 finally: 

235 # -- e) delete the job 

236 if not (json_job_id is None): 

237 print('Deleting job ' + str(json_job_id)) 

238 batch_client.execute_batch_deleteJob(base_url, api_key, json_job_id) 

239 

240 # 4- Retrieve the outputs 

241 print('Job ' + str(json_job_id) + ' completed, results: ') 

242 print(json.dumps(outputs_refs2, indent=4)) 

243 

244 print('Retrieving the outputs from the blob storage') 

245 

246 # dont use the output of the job status (outputs_refs2), it does not contain the connectionString 

247 result_dfs = blob_refs_to_dfs(output_refs, requests_session=requests_session) 

248 

249 return result_dfs 

250 

251 

252class BaseHttpClient(object): 

253 """ 

254 Base class for our http clients. It contains a `requests.Session` object and 

255 """ 

256 def __init__(self, 

257 requests_session=None, # type: requests.Session 

258 ): 

259 """ 

260 Constructor with an optional `requests.Session` to use for subsequent calls. 

261 Also you can declare to use the 'swagger' AzureML format for data table formatting (not enabled by default 

262 because it is more verbose). 

263 

264 :param requests_session: 

265 """ 

266 # optionally create a session 

267 if requests_session is None: 

268 requests_session = requests.Session() 

269 

270 # store it 

271 self.session = requests_session 

272 

273 # if one day we want to reuse Microsoft's Http client to align with blockblobservice, they have this: 

274 # self._httpclient = _HTTPClient( 

275 # protocol=DEFAULT_PROTOCOL, 

276 # session=request_session, 

277 # timeout=SOCKET_TIMEOUT, 

278 # ) 

279 

280 def azureml_http_call(self, 

281 url, # type: str 

282 api_key, # type: str 

283 method, # type: str 

284 body_str=None, # type: Optional[str] 

285 charset='utf-8' # type: str 

286 ): 

287 # type: (...) -> str 

288 """ 

289 Performs an HTTP(s) request to an AzureML web service, whatever it is. 

290 

291 This method 

292 

293 - sets the Authorization header wth the api key 

294 - optionally encodes the input body according to the charset selected 

295 - performs 

296 

297 :param api_key: the api key for this AzureML call. 

298 :param body_str: the input body, for PUT and POST methods 

299 :param url: the url to call 

300 :param method: the HTTP verb to use ('GET', 'PUT', 'POST'...) 

301 :param charset: the optional charset to use to encode the body. Default is 'utf-8' 

302 :return: the response body 

303 """ 

304 # fill the information about the query to perform 

305 headers = {'Authorization': ('Bearer ' + api_key)} 

306 

307 # encode the string as bytes using the charset 

308 if body_str is not None: 308 ↛ 312line 308 didn't jump to line 312, because the condition on line 308 was never false

309 json_body_encoded_with_charset = str.encode(body_str, encoding=charset) 

310 headers['Content-Type'] = 'application/json; charset=' + charset 

311 else: 

312 json_body_encoded_with_charset = None 

313 

314 # finally execute 

315 json_result = self.http_call(json_body_encoded_with_charset, headers, method, url) 

316 

317 return json_result 

318 

319 def http_call(self, 

320 body, 

321 headers, 

322 method, # type: str 

323 url 

324 ): 

325 """ 

326 Sub-routine for HTTP web service call. If Body is None, a GET is performed 

327 

328 :param body: 

329 :param headers: 

330 :param method 

331 :param url: 

332 :return: 

333 """ 

334 try: 

335 # Send the request 

336 response = self.session.request(method, url, headers=headers, data=body or None) 

337 

338 # Parse the response 

339 # http_status = int(response.status_code) 

340 

341 # Possibly raise associated exceptions 

342 response.raise_for_status() 

343 

344 # Decode contents 

345 # headers not useful anymore : encoding is automatically used to read the body when calling response.text 

346 # respheaders = {key.lower(): name for key, name in response.headers.items()} 

347 jsonResult = response.text 

348 return jsonResult 

349 

350 except requests.exceptions.HTTPError as error: 350 ↛ 351line 350 didn't jump to line 351, because the exception caught by line 350 didn't happen

351 print("The request failed with status code: %s" % error.response.status_code) 

352 # Print the headers - they include the request ID and timestamp, which are useful for debugging the failure 

353 print(error.response.headers) 

354 raise AzmlException(error) 

355 

356 except Urllib_HTTPError as error: 

357 print("The request failed with status code: %s" + error.code) 

358 # Print the headers - they include the request ID and timestamp, which are useful for debugging the failure 

359 print(error.info()) 

360 raise AzmlException(error) 

361 

362 

363class RequestResponseClient(BaseHttpClient): 

364 """ 

365 A class providing static methods to perform Request-response calls to AzureML web services 

366 """ 

367 

368 def __init__(self, 

369 requests_session=None, # type: requests.Session 

370 use_swagger_format=False, # type: bool 

371 replace_NaN_with=None, # type: Any 

372 replace_NaT_with=None, # type: Any 

373 ): 

374 """ 

375 Constructor with an optional `requests.Session` to use for subsequent calls. 

376 Also you can declare to use the 'swagger' AzureML format for data table formatting (not enabled by default 

377 because it is more verbose). 

378 

379 :param requests_session: 

380 :param use_swagger_format: a boolean (default False) indicating if the 'swagger' azureml format should be used 

381 to format the data tables in json payloads. 

382 """ 

383 # save swagger format 

384 self.use_swagger_format = use_swagger_format 

385 self.replace_NaN_with = replace_NaN_with 

386 self.replace_NaT_with = replace_NaT_with 

387 

388 # super constructor 

389 super(RequestResponseClient, self).__init__(requests_session=requests_session) 

390 

391 def create_request_body(self, 

392 input_df_dict=None, # type: Dict[str, pd.DataFrame] 

393 params_df_or_dict=None, # type: Union[pd.DataFrame, Dict[str, Any]] 

394 ): 

395 # type (...) -> str 

396 """ 

397 Helper method to create a JSON AzureML web service input from inputs and parameters DataFrames 

398 

399 :param input_df_dict: a dictionary containing input names and input content (each input content is a DataFrame) 

400 :param params_df_or_dict: a dictionary of parameter names and values 

401 :return: a string representation of the request JSON body (not yet encoded in bytes) 

402 """ 

403 # handle optional arguments 

404 if input_df_dict is None: 404 ↛ 405line 404 didn't jump to line 405, because the condition on line 404 was never true

405 input_df_dict = {} 

406 if params_df_or_dict is None: 406 ↛ 407line 406 didn't jump to line 407, because the condition on line 406 was never true

407 params_df_or_dict = {} 

408 

409 # inputs 

410 inputs = dfs_to_azmltables(input_df_dict, swagger_format=self.use_swagger_format, 

411 replace_NaN_with=self.replace_NaN_with, replace_NaT_with=self.replace_NaT_with) 

412 

413 # params 

414 if isinstance(params_df_or_dict, dict): 414 ↛ 416line 414 didn't jump to line 416, because the condition on line 414 was never false

415 params = params_df_or_dict 

416 elif isinstance(params_df_or_dict, pd.DataFrame): 

417 params = params_df_to_params_dict(params_df_or_dict) 

418 else: 

419 raise TypeError('paramsDfOrDict should be a DataFrame or a dictionary, or None, found: ' 

420 + str(type(params_df_or_dict))) 

421 

422 # final body : combine them into a single dictionary ... 

423 body_dict = {'Inputs': inputs, 'GlobalParameters': params} 

424 

425 # ... and serialize as Json 

426 json_body_str = azmltable_to_json(body_dict) 

427 return json_body_str 

428 

429 def create_response_body(self, 

430 output_df_dict=None, # type: Dict[str, pd.DataFrame] 

431 ): 

432 """ 

433 Creates a fake server response mimicking AzureML behaviour, from a dictionary of output dataframes. 

434 

435 :param output_df_dict: a dictionary of {output_name: dataframe} 

436 :return: 

437 """ 

438 res_dict = dfs_to_azmltables(output_df_dict, swagger_format=self.use_swagger_format, mimic_azml_output=True) 

439 return {"Results": res_dict} 

440 

441 def execute_rr(self, 

442 base_url, # type: str 

443 api_key, # type: str 

444 request_body_json, # type: str 

445 ): 

446 # type: (...) -> str 

447 """ 

448 Performs a web service call to AzureML using Request-response mode (synchronous, by value). 

449 Supports Fiddler capture for debug. 

450 

451 :param base_url: 

452 :param api_key: 

453 :param request_body_json: the json body of the web service request, as a string. 

454 :return: the json body of the response, as a string 

455 """ 

456 rr_url = base_url + '/execute?api-version=2.0&details=true' 

457 if self.use_swagger_format: 

458 rr_url += '&format=swagger' 

459 

460 json_result = self.azureml_http_call(url=rr_url, api_key=api_key, method='POST', body_str=request_body_json) 

461 

462 return json_result 

463 

464 @staticmethod 

465 def read_response_json_body(body_json, # type: str 

466 output_names=None, # type: List[str] 

467 ): 

468 # type: (...) -> Dict[str, pd.DataFrame] 

469 """ 

470 Reads a response body from a request-response web service call, into a dictionary of pandas DataFrame 

471 

472 :param body_json: the response body, already decoded as a string 

473 :param output_names: if a non-None list of output names is provided, each of these names must be present in 

474 the outputs dictionary, otherwise an error is raised. 

475 :return: the dictionary of corresponding DataFrames mapped to the output names 

476 """ 

477 # first read the json as a dictionary 

478 result_dict = json_to_azmltable(body_json) 

479 

480 # then transform it into a DataFrame 

481 result_dfs = azmltables_to_dfs(result_dict['Results'], is_azureml_output=True) 

482 

483 if output_names is not None: 483 ↛ 491line 483 didn't jump to line 491, because the condition on line 483 was never false

484 # check the names 

485 missing = list(set(output_names) - set(result_dfs.keys())) 

486 if len(missing) > 0: 486 ↛ 487line 486 didn't jump to line 487, because the condition on line 486 was never true

487 raise Exception("Error : the following outputs are missing in the results: %s. Found outputs: %s" 

488 "" % (missing, set(result_dfs.keys()))) 

489 

490 # return all outputs 

491 return result_dfs 

492 

493 @staticmethod 

494 def decode_request_json_body(body_json # type: str 

495 ): 

496 # type: (...) -> Tuple[Dict[str, pd.DataFrame], Dict] 

497 """ 

498 Reads a request body from a request-response web service call, into a dictionary of pandas DataFrame + a 

499 dictionary of parameters. This is typically useful if you want to debug a request provided by someone else. 

500 

501 :param body_json: 

502 :return: 

503 """ 

504 # first read the json as a dictionary 

505 result_dct = json_to_azmltable(body_json) 

506 

507 return azmltables_to_dfs(result_dct['Inputs']), result_dct['GlobalParameters'] 

508 

509 

510class BatchClient(BaseHttpClient): 

511 """ This class provides static methods to call AzureML services in batch mode""" 

512 

513 def __init__(self, 

514 requests_session=None # type: requests.Session 

515 ): 

516 # check that the `azure-storage` package is installed 

517 try: 

518 from azure.storage.blob import BlockBlobService # noqa 

519 except ImportError as e: 

520 raise_from(ValueError("Please install `azure-storage==0.33` to use BATCH mode"), e) 

521 

522 super(BatchClient, self).__init__(requests_session=requests_session) 

523 

524 def push_inputs_to_blob__and__create_output_references(self, 

525 inputs_df_dict, # type: Dict[str, pd.DataFrame] 

526 blob_service, # type: BlockBlobService # noqa 

527 blob_container, # type: str 

528 blob_path_prefix=None, # type: str 

529 charset=None, # type: str 

530 output_names=None # type: List[str] 

531 ): 

532 # type: (...) -> Tuple[Dict[str, Dict[str, str]], Dict[str, Dict[str, str]]] 

533 """ 

534 Utility method to push all inputs from the provided dictionary into the selected blob storage on the cloud. 

535 Each input is an entry of the dictionary and should be a Dataframe. 

536 The inputs will be written to the blob using the provided charset. 

537 

538 Files created on the blob storage will have a prefix generated from the current time, in order to 

539 quickly identify inputs pushed at the same time. For convenience, this prefix is provided as an output of this 

540 function so that outputs may be 

541 

542 :param inputs_df_dict: 

543 :param blob_service: 

544 :param blob_container: the blob container name 

545 :param blob_path_prefix: the prefix to use for all blobs 

546 :param charset: 

547 :param output_names: 

548 :return: a tuple containing (1) a dictionary of "by reference" input descriptions 

549 and (2) a dictionary of "by reference" output descriptions 

550 """ 

551 from azmlclient.base_databinding_blobs import dfs_to_blob_refs, create_blob_refs 

552 

553 if output_names is None: 

554 output_names = [] 

555 

556 # 1- create unique blob naming prefix 

557 now = datetime.now() 

558 unique_blob_name_prefix = now.strftime("%Y-%m-%d_%H%M%S_%f") 

559 

560 # 2- store INPUTS and retrieve references 

561 input_refs = dfs_to_blob_refs(inputs_df_dict, blob_service=blob_service, blob_container=blob_container, 

562 blob_path_prefix=blob_path_prefix, 

563 blob_name_prefix=unique_blob_name_prefix + '-input-', charset=charset) 

564 

565 # 3- create OUTPUT references 

566 output_refs = create_blob_refs(blob_names=output_names, blob_service=blob_service, 

567 blob_container=blob_container, blob_path_prefix=blob_path_prefix, 

568 blob_name_prefix=unique_blob_name_prefix + '-output-') 

569 

570 return input_refs, output_refs 

571 

572 @staticmethod 

573 def create_request_body(input_refs=None, # type: Dict[str, Dict[str, str]] 

574 params_df_or_dict=None, # type: Union[Dict[str, Any], pd.DataFrame] 

575 output_refs=None # type: Dict[str, Dict[str, str]] 

576 ): 

577 # type: (...) -> str 

578 """ 

579 Helper method to create a JSON AzureML web service input in Batch mode, from 'by reference' inputs, and 

580 parameters as DataFrame 

581 

582 :param input_refs: a dictionary containing input names and input references (each input reference is a 

583 dictionary) 

584 :param params_df_or_dict: a dictionary of parameter names and values 

585 :param output_refs: a dictionary containing output names and output references (each output reference is a 

586 dictionary) 

587 :return: a string representation of the request JSON body (not yet encoded in bytes) 

588 """ 

589 

590 # params 

591 if params_df_or_dict is None: 

592 params_df_or_dict = {} 

593 

594 if isinstance(params_df_or_dict, dict): 

595 params = params_df_or_dict 

596 elif isinstance(params_df_or_dict, pd.DataFrame): 

597 params = params_df_to_params_dict(params_df_or_dict) 

598 else: 

599 raise TypeError( 

600 'paramsDfOrDict should be a DataFrame or a dictionary, or None, found: ' + str(type(params_df_or_dict))) 

601 

602 # final body : combine them into a single dictionary ... 

603 body_dict = {'Inputs': input_refs, 'GlobalParameters': params, 'Outputs': output_refs} 

604 

605 # ... and serialize as Json 

606 json_body_str = azmltable_to_json(body_dict) 

607 return json_body_str 

608 

609 def execute_batch_createJob(self, 

610 base_url, # type: str 

611 api_key, # type: str 

612 request_json_body, # type: str 

613 ): 

614 # type: (...) -> str 

615 """ 

616 Performs a web service call to AzureML using Batch mode (asynchronous, by reference). 

617 Supports Fiddler capture for debug. 

618 

619 :param base_url: 

620 :param api_key: 

621 :param request_json_body: 

622 :return: 

623 """ 

624 

625 batch_url = base_url + '/jobs?api-version=2.0' 

626 jsonJobId = self.azureml_http_call(url=batch_url, api_key=api_key, method='POST', body_str=request_json_body) 

627 

628 # unquote the json Job Id 

629 if jsonJobId.startswith('"') and jsonJobId.endswith('"'): 

630 return jsonJobId[1:-1] 

631 else: 

632 return jsonJobId 

633 

634 def execute_batch_startJob(self, 

635 base_url, # type: str 

636 api_key, # type: str 

637 job_id, # type: str 

638 ): 

639 """ 

640 Starts an AzureML Batch job (asynchronous, by reference). 

641 Supports Fiddler capture for debug. 

642 

643 :param base_url: 

644 :param api_key: 

645 :param job_id: 

646 :return: 

647 """ 

648 

649 batch_url = base_url + '/jobs/' + job_id + '/start?api-version=2.0' 

650 

651 self.azureml_http_call(url=batch_url, api_key=api_key, method='POST', body_str=None) 

652 return 

653 

654 def execute_batch_getJobStatusOrResult(self, 

655 base_url, # type: str 

656 api_key, # type: str 

657 job_id, # type: str 

658 ): 

659 # type: (...) -> str 

660 """ 

661 Gets the status or the result of an AzureML Batch job (asynchronous, by reference). 

662 Supports Fiddler capture for debug. 

663 

664 :param base_url: 

665 :param api_key: 

666 :param job_id: 

667 :return: 

668 """ 

669 batch_url = base_url + '/jobs/' + job_id + '?api-version=2.0' 

670 json_job_status_or_result = self.azureml_http_call(url=batch_url, api_key=api_key, method='GET', body_str=None) 

671 return json_job_status_or_result 

672 

673 def read_status_or_result(self, 

674 jobstatus_or_result_json # type: str 

675 ): 

676 # type: (...) -> Dict[str, Dict[str, str]] 

677 """ 

678 An alias to the static method 

679 

680 :param jobstatus_or_result_json: 

681 :return: 

682 """ 

683 return BatchClient.read_status_or_result_static(jobstatus_or_result_json) 

684 

685 @staticmethod 

686 def read_status_or_result_static(jobstatus_or_result_json # type: str 

687 ): 

688 # type: (...) -> Dict[str, Dict[str, str]] 

689 """ 

690 Reads the status or the result of an AzureML Batch job (asynchronous, by reference). 

691 Throws an error if the status is an error, or an empty result if the status is a 

692 

693 :param jobstatus_or_result_json: 

694 :return: the status as a dictionary, and throws an error if the job had an error 

695 """ 

696 

697 # first read the json as a dictionary 

698 result_dict = json_to_azmltable(jobstatus_or_result_json) 

699 

700 try: 

701 status_code = result_dict['StatusCode'] 

702 

703 if status_code in ('3', 'Cancelled'): 

704 raise IllegalJobStateException("The job state is '%s' : cannot read the outcome" % status_code) 

705 

706 elif status_code in ('2', 'Failed'): 

707 raise JobExecutionException("The job ended with an error : %s" % result_dict['Details']) 

708 

709 elif status_code in ('0', 'NotStarted', '1', 'Running', '4', 'Finished'): 

710 jobstatus_or_result_json = result_dict['Results'] 

711 

712 else: 

713 raise IllegalJobStateException( 

714 'The job state is ' + status_code + ' : unknown state') 

715 

716 return jobstatus_or_result_json 

717 

718 except KeyError: 

719 raise ValueError("Error reading job state : received %s" % result_dict) 

720 

721 def execute_batch_deleteJob(self, 

722 base_url, # type: str 

723 api_key, # type: str 

724 job_id, # type: str 

725 ): 

726 """ 

727 Deletes an AzureML Batch job (asynchronous, by reference). 

728 Supports Fiddler capture for debug. 

729 

730 :param base_url: 

731 :param api_key: 

732 :param job_id: 

733 :return: 

734 """ 

735 batch_url = base_url + '/jobs/' + job_id + '?api-version=2.0' 

736 

737 self.azureml_http_call(url=batch_url, api_key=api_key, method='DELETE', body_str=None) 

738 return 

739 

740 

741RR_Client = RequestResponseClient 

742"""Legacy alias""" 

743 

744Batch_Client = BatchClient 

745"""Legacy alias"""