1 # Authors: Sylvain MARIE <sylvain.marie@se.com>
2 # + All contributors to <https://github.com/smarie/python-azureml-client>
3 #
4 # License: 3-clause BSD, <https://github.com/smarie/python-azureml-client/blob/master/LICENSE>
5 import json
6 import time
7
8 from datetime import datetime
9 from warnings import warn
10
11 from six import raise_from
12
13 try:
14 from urllib.error import HTTPError as Urllib_HTTPError
15 except ImportError:
16 # create a dummy class
17 class Urllib_HTTPError(Exception):
18 pass
19
20 import pandas as pd
21 import requests
22
23 try: # python 3.5+
24 from typing import List, Dict, Tuple, Any, Union, Optional
25 except ImportError:
26 pass
27
28 from .requests_utils import set_http_proxy
29 from .base_databinding import AzmlException, dfs_to_azmltables, params_df_to_params_dict, azmltable_to_json, \
30 json_to_azmltable, azmltables_to_dfs
31
32
class IllegalJobStateException(Exception):
    """ This is raised whenever a job is in an illegal or unknown state, so that its outcome cannot be read. """
35
36
class JobExecutionException(Exception):
    """ This is raised whenever a job ended in failed mode. """
39
40
def create_session_for_proxy(http_proxyhost,  # type: str
                             http_proxyport,  # type: int
                             https_proxyhost=None,  # type: str
                             https_proxyport=None,  # type: int
                             use_http_for_https_proxy=False,  # type: bool
                             ssl_verify=None  # type: Union[bool, str]
                             ):
    # type: (...) -> requests.Session
    """
    DEPRECATED - users should rather create a Session() and use set_http_proxy(session, **kwargs) instead

    Helper method to configure the request package to use the proxy of your choice and adapt the SSL certificate
    validation accordingly.

    ```python
    # create a temporary Session to use Fiddler as the network proxy
    debug_session = create_session_for_proxy('localhost', 8888, use_http_for_https_proxy=True, ssl_verify=False)

    # use that session in an AzureML web service call
    execute_rr(..., requests_session=debug_session)
    ```

    :param http_proxyhost: mandatory proxy host for http
    :param http_proxyport: mandatory proxy port for http
    :param https_proxyhost: optional proxy host for https. If none is provided, http_proxyhost will be used
    :param https_proxyport: optional proxy port for https. If none is provided, http_proxyport will be used
    :param use_http_for_https_proxy: optional, if set to true the http protocol will be used to initiate communications
        with the proxy even for https calls (then calls will be done in https as usual).
    :param ssl_verify: optional ssl verification parameter. It may either be the path to an additional certificate
        to trust (recommended), or a boolean to enable (default)/disable (not recommended ! use only in debug mode !)
        certificate validation.
        See here for details : http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification
    :return: a requests.Session object that you may use with the rest of the library
    """
    # use the proper DeprecationWarning category and point at the caller's line; the original call used the
    # default UserWarning category, which does not identify the warning as a deprecation
    warn("This method is deprecated - please create a Session() and use set_http_proxy(session, **kwargs) instead",
         category=DeprecationWarning, stacklevel=2)

    session = requests.Session()

    # delegate the proxy configuration to the non-deprecated helper
    set_http_proxy(session,
                   http_proxyhost=http_proxyhost, http_proxyport=http_proxyport,
                   https_proxyhost=https_proxyhost, https_proxyport=https_proxyport,
                   use_http_proxy_for_https_requests=use_http_for_https_proxy)

    # only override the session's default SSL verification when explicitly requested
    if ssl_verify is not None:
        session.verify = ssl_verify

    return session
88
89
def execute_rr(api_key,  # type: str
               base_url,  # type: str
               inputs=None,  # type: Dict[str, pd.DataFrame]
               params=None,  # type: Union[pd.DataFrame, Dict[str, Any]]
               output_names=None,  # type: List[str]
               only_keep_selected_output_names=False,  # type: bool
               use_swagger_format=False,  # type: bool
               replace_NaN_with=None,  # type: Any
               replace_NaT_with=None,  # type: Any
               requests_session=None  # type: requests.Session
               ):
    # type: (...) -> Dict[str, pd.DataFrame]
    """
    Executes an AzureMl web service in request-response (RR) mode. This mode is typically used when the web service
    does not take too long to execute. For longer operations you should use the batch mode (BES).

    :param api_key: the api key for the AzureML web service to call. For example 'fdjmxkqktcuhifljflkdmw'
    :param base_url: the URL of the AzureML web service to call. It should not contain the "execute". This is
        typically in the form 'https://<geo>.services.azureml.net/workspaces/<wId>/services/<sId>'.
    :param inputs: an optional dictionary containing the inputs, by name. Inputs should be DataFrames.
    :param params: an optional dictionary containing the parameters by name, or a DataFrame containing the parameters.
    :param output_names: an optional list of expected output names, for automatic validation.
    :param only_keep_selected_output_names: a boolean (default False) to indicate if only the outputs selected in
        `output_names` should be kept in the returned dictionary.
    :param use_swagger_format: a boolean (default False) indicating if the 'swagger' azureml format should be used
        to format the data tables in json payloads.
    :param replace_NaN_with: an optional value, forwarded to the underlying `RequestResponseClient` to replace `NaN`
        values when the inputs are serialized.
    :param replace_NaT_with: an optional value, forwarded to the underlying `RequestResponseClient` to replace `NaT`
        values when the inputs are serialized.
    :param requests_session: an optional requests.Session object, for example created from create_session_for_proxy()
    :return: a dictionary of outputs, by name. Outputs are DataFrames
    """
    # fail fast: filtering the outputs requires an explicit list of names
    if only_keep_selected_output_names and output_names is None:
        raise ValueError("`only_keep_selected_output_names` can only be used with a non-None list of "
                         "`output_names`")

    # the generic request-response client handles body creation, execution and response parsing
    rr_client = RequestResponseClient(requests_session=requests_session, use_swagger_format=use_swagger_format,
                                      replace_NaN_with=replace_NaN_with, replace_NaT_with=replace_NaT_with)

    # build the json request body from the inputs and parameters
    body = rr_client.create_request_body(inputs, params)

    # perform the actual web service call
    response = rr_client.execute_rr(base_url, api_key, body)

    # parse the response into a dictionary of DataFrames (validates `output_names` when provided)
    result_dfs = rr_client.read_response_json_body(response, output_names)

    if not only_keep_selected_output_names:
        return result_dfs

    # restrict the returned dictionary to the selected outputs only
    return {name: result_dfs[name] for name in output_names}
143
144
def execute_bes(api_key,  # type: str
                base_url,  # type: str
                blob_storage_account,  # type: str
                blob_storage_apikey,  # type: str
                blob_container,  # type: str
                blob_path_prefix=None,  # type: str
                blob_charset=None,  # type: str
                inputs=None,  # type: Dict[str, pd.DataFrame]
                params=None,  # type: Union[pd.DataFrame, Dict[str, Any]]
                output_names=None,  # type: List[str]
                nb_seconds_between_status_queries=5,  # type: int
                requests_session=None  # type: requests.Session
                ):
    # type: (...) -> Dict[str, pd.DataFrame]
    """
    Executes an AzureMl web service in batch mode (BES: Batch Execution Service).

    Its inputs are the same than `execute_rr` but in addition it takes information about the blob storage service to
    use. Indeed in batch mode, all inputs and outputs go through an intermediate blob storage.

    The AzureML job status is queried every 5 seconds by default, you may wish to change that number with
    `nb_seconds_between_status_queries`.

    :param api_key: the api key for the service to call
    :param base_url: the URL of the service to call
    :param blob_storage_account: the storage account to use to store the inputs and outputs
    :param blob_storage_apikey: the storage api key to use to store the inputs and outputs
    :param blob_container: the container in the blob storage, that will be used to store the inputs and outputs
    :param blob_path_prefix: an optional prefix that will be used to store the blobs
    :param blob_charset: optional encoding of files used on the blob storage
    :param inputs: an optional dictionary containing the inputs, by name. Inputs should be DataFrames.
    :param params: an optional dictionary containing the parameters by name, or a DataFrame containing the parameters.
    :param output_names: an optional list of expected output names. Note that contrary to rr mode, no outputs will be
        provided if this is empty.
    :param nb_seconds_between_status_queries: nb of seconds that the engine waits between job status queries. By
        default this is set to 5.
    :param requests_session: an optional requests.Session object, for example created from create_session_for_proxy()
    :return: a dictionary of outputs, by name. Outputs are DataFrames
    """
    # 0- create the generic batch mode client (raises a ValueError if `azure-storage` is not installed)
    batch_client = BatchClient(requests_session=requests_session)

    # if we're here without error that means that `azure-storage` is available
    from azure.storage.blob import BlockBlobService
    from azmlclient.base_databinding_blobs import blob_refs_to_dfs

    blob_service = BlockBlobService(account_name=blob_storage_account, account_key=blob_storage_apikey,
                                    request_session=requests_session)

    # 1- Push inputs to blob storage and create output references
    print('Pushing inputs to blob storage')
    input_refs, output_refs = batch_client.push_inputs_to_blob__and__create_output_references(
        inputs,
        output_names=output_names,
        blob_service=blob_service,
        blob_container=blob_container,
        blob_path_prefix=blob_path_prefix,
        charset=blob_charset
    )

    # 2- Create the query body
    request_body = batch_client.create_request_body(input_refs, params, output_refs)

    # 3- Perform the call
    json_job_id = None
    try:
        # -- a) create the job
        print('Creating job')
        json_job_id = batch_client.execute_batch_createJob(base_url, api_key, request_body)

        # -- b) start the job
        print('Starting job %s' % json_job_id)
        batch_client.execute_batch_startJob(base_url, api_key, json_job_id)
        print('Job %s started' % json_job_id)

        # -- c) polling loop: a non-None result means that the job has terminated
        outputs_refs2 = None
        while outputs_refs2 is None:
            print('Polling job status for job %s' % json_job_id)
            status_or_result = batch_client.execute_batch_getJobStatusOrResult(base_url, api_key, json_job_id)

            # -- d) check the job status and read response into a dictionary
            outputs_refs2 = batch_client.read_status_or_result(status_or_result)

            # only wait when the job has not terminated yet. (The previous implementation slept unconditionally,
            # adding a useless `nb_seconds_between_status_queries` delay after the result was already available.)
            if outputs_refs2 is None:
                print('Waiting %ss until next call.' % nb_seconds_between_status_queries)
                time.sleep(nb_seconds_between_status_queries)

    finally:
        # -- e) always delete the job once it has been created, even on error
        if json_job_id is not None:
            print('Deleting job %s' % json_job_id)
            batch_client.execute_batch_deleteJob(base_url, api_key, json_job_id)

    # 4- Retrieve the outputs
    print('Job %s completed, results: ' % json_job_id)
    print(json.dumps(outputs_refs2, indent=4))

    print('Retrieving the outputs from the blob storage')

    # dont use the output of the job status (outputs_refs2), it does not contain the connectionString
    result_dfs = blob_refs_to_dfs(output_refs, requests_session=requests_session)

    return result_dfs
250
251
class BaseHttpClient(object):
    """
    Base class for the http clients in this module. It wraps a `requests.Session` object and provides the low-level
    HTTP routines used to call AzureML web services: Authorization header creation, optional body encoding, and
    translation of HTTP errors into `AzmlException`.
    """
    def __init__(self,
                 requests_session=None,  # type: requests.Session
                 ):
        """
        Constructor with an optional `requests.Session` to use for subsequent calls. If no session is provided, a
        new one is created with default settings.

        :param requests_session: an optional `requests.Session` object to use for the HTTP calls.
        """
        # optionally create a session
        if requests_session is None:
            requests_session = requests.Session()

        # store it
        self.session = requests_session

        # if one day we want to reuse Microsoft's Http client to align with blockblobservice, they have this:
        # self._httpclient = _HTTPClient(
        #     protocol=DEFAULT_PROTOCOL,
        #     session=request_session,
        #     timeout=SOCKET_TIMEOUT,
        # )

    def azureml_http_call(self,
                          url,  # type: str
                          api_key,  # type: str
                          method,  # type: str
                          body_str=None,  # type: Optional[str]
                          charset='utf-8'  # type: str
                          ):
        # type: (...) -> str
        """
        Performs an HTTP(s) request to an AzureML web service, whatever it is.

        This method

        - sets the Authorization header with the api key
        - optionally encodes the input body according to the charset selected
        - performs the call through `http_call` and returns the response body

        :param url: the url to call
        :param api_key: the api key for this AzureML call.
        :param method: the HTTP verb to use ('GET', 'PUT', 'POST'...)
        :param body_str: the input body, for PUT and POST methods
        :param charset: the optional charset to use to encode the body. Default is 'utf-8'
        :return: the response body
        """
        # fill the information about the query to perform
        headers = {'Authorization': ('Bearer ' + api_key)}

        # encode the string as bytes using the charset
        if body_str is not None:
            json_body_encoded_with_charset = str.encode(body_str, encoding=charset)
            headers['Content-Type'] = 'application/json; charset=' + charset
        else:
            json_body_encoded_with_charset = None

        # finally execute
        json_result = self.http_call(json_body_encoded_with_charset, headers, method, url)

        return json_result

    def http_call(self,
                  body,
                  headers,
                  method,  # type: str
                  url
                  ):
        # type: (...) -> str
        """
        Sub-routine for HTTP web service call, using the provided HTTP verb.

        :param body: the (already encoded) request body bytes, or None for requests without a body
        :param headers: the headers dictionary to send
        :param method: the HTTP verb to use ('GET', 'PUT', 'POST'...)
        :param url: the url to call
        :return: the response body, decoded as text
        """
        try:
            # Send the request
            response = self.session.request(method, url, headers=headers, data=body or None)

            # Possibly raise associated exceptions (4xx and 5xx status codes)
            response.raise_for_status()

            # Decode contents
            # headers not useful anymore : encoding is automatically used to read the body when calling response.text
            jsonResult = response.text
            return jsonResult

        except requests.exceptions.HTTPError as error:
            print("The request failed with status code: %s" % error.response.status_code)
            # Print the headers - they include the request ID and timestamp, which are useful for debugging the failure
            print(error.response.headers)
            raise AzmlException(error)

        except Urllib_HTTPError as error:
            # Bug fix: the original used '+' between the format string and the int `error.code`, which raised a
            # TypeError instead of printing the status code.
            print("The request failed with status code: %s" % error.code)
            raise AzmlException(error)
361
362
class RequestResponseClient(BaseHttpClient):
    """
    A class providing methods to perform Request-response calls to AzureML web services: creation of the json
    request body from DataFrames and parameters, execution of the call, and parsing of the json response back into
    DataFrames.
    """

    def __init__(self,
                 requests_session=None,  # type: requests.Session
                 use_swagger_format=False,  # type: bool
                 replace_NaN_with=None,  # type: Any
                 replace_NaT_with=None,  # type: Any
                 ):
        """
        Constructor with an optional `requests.Session` to use for subsequent calls.
        Also you can declare to use the 'swagger' AzureML format for data table formatting (not enabled by default
        because it is more verbose).

        :param requests_session: an optional `requests.Session` object to use for the HTTP calls.
        :param use_swagger_format: a boolean (default False) indicating if the 'swagger' azureml format should be used
            to format the data tables in json payloads.
        :param replace_NaN_with: an optional value used to replace `NaN` values when serializing the data tables.
        :param replace_NaT_with: an optional value used to replace `NaT` values when serializing the data tables.
        """
        # save the data table formatting options
        self.use_swagger_format = use_swagger_format
        self.replace_NaN_with = replace_NaN_with
        self.replace_NaT_with = replace_NaT_with

        # super constructor
        super(RequestResponseClient, self).__init__(requests_session=requests_session)

    def create_request_body(self,
                            input_df_dict=None,  # type: Dict[str, pd.DataFrame]
                            params_df_or_dict=None,  # type: Union[pd.DataFrame, Dict[str, Any]]
                            ):
        # type: (...) -> str
        """
        Helper method to create a JSON AzureML web service input from inputs and parameters DataFrames

        :param input_df_dict: a dictionary containing input names and input content (each input content is a DataFrame)
        :param params_df_or_dict: a dictionary of parameter names and values, or a single-row DataFrame of parameters
        :return: a string representation of the request JSON body (not yet encoded in bytes)
        :raises TypeError: if `params_df_or_dict` is neither a dict, a DataFrame nor None
        """
        # handle optional arguments
        if input_df_dict is None:
            input_df_dict = {}
        if params_df_or_dict is None:
            params_df_or_dict = {}

        # inputs
        inputs = dfs_to_azmltables(input_df_dict, swagger_format=self.use_swagger_format,
                                   replace_NaN_with=self.replace_NaN_with, replace_NaT_with=self.replace_NaT_with)

        # params
        if isinstance(params_df_or_dict, dict):
            params = params_df_or_dict
        elif isinstance(params_df_or_dict, pd.DataFrame):
            params = params_df_to_params_dict(params_df_or_dict)
        else:
            # fixed: the error message previously referred to a nonexistent parameter name `paramsDfOrDict`
            raise TypeError('params_df_or_dict should be a DataFrame or a dictionary, or None, found: '
                            + str(type(params_df_or_dict)))

        # final body : combine them into a single dictionary ...
        body_dict = {'Inputs': inputs, 'GlobalParameters': params}

        # ... and serialize as Json
        json_body_str = azmltable_to_json(body_dict)
        return json_body_str

    def create_response_body(self,
                             output_df_dict=None,  # type: Dict[str, pd.DataFrame]
                             ):
        """
        Creates a fake server response mimicking AzureML behaviour, from a dictionary of output dataframes.

        :param output_df_dict: a dictionary of {output_name: dataframe}
        :return: a dictionary with a single 'Results' entry containing the azureml-formatted tables
        """
        res_dict = dfs_to_azmltables(output_df_dict, swagger_format=self.use_swagger_format, mimic_azml_output=True)
        return {"Results": res_dict}

    def execute_rr(self,
                   base_url,  # type: str
                   api_key,  # type: str
                   request_body_json,  # type: str
                   ):
        # type: (...) -> str
        """
        Performs a web service call to AzureML using Request-response mode (synchronous, by value).
        Supports Fiddler capture for debug.

        :param base_url: the URL of the AzureML web service, without the trailing "/execute"
        :param api_key: the api key for the AzureML web service to call
        :param request_body_json: the json body of the web service request, as a string.
        :return: the json body of the response, as a string
        """
        rr_url = base_url + '/execute?api-version=2.0&details=true'
        # ask the server to reply in swagger format too when it was used for the request
        if self.use_swagger_format:
            rr_url += '&format=swagger'

        json_result = self.azureml_http_call(url=rr_url, api_key=api_key, method='POST', body_str=request_body_json)

        return json_result

    @staticmethod
    def read_response_json_body(body_json,  # type: str
                                output_names=None,  # type: List[str]
                                ):
        # type: (...) -> Dict[str, pd.DataFrame]
        """
        Reads a response body from a request-response web service call, into a dictionary of pandas DataFrame

        :param body_json: the response body, already decoded as a string
        :param output_names: if a non-None list of output names is provided, each of these names must be present in
            the outputs dictionary, otherwise an error is raised.
        :return: the dictionary of corresponding DataFrames mapped to the output names
        :raises ValueError: if any of the `output_names` is missing from the results
        """
        # first read the json as a dictionary
        result_dict = json_to_azmltable(body_json)

        # then transform it into a DataFrame
        result_dfs = azmltables_to_dfs(result_dict['Results'], is_azureml_output=True)

        if output_names is not None:
            # check that all requested names are present
            missing = list(set(output_names) - set(result_dfs.keys()))
            if len(missing) > 0:
                # fixed: raise the more precise ValueError instead of the bare Exception used previously
                # (backward-compatible: ValueError is an Exception subclass)
                raise ValueError("Error : the following outputs are missing in the results: %s. Found outputs: %s"
                                 "" % (missing, set(result_dfs.keys())))

        # return all outputs
        return result_dfs

    @staticmethod
    def decode_request_json_body(body_json  # type: str
                                 ):
        # type: (...) -> Tuple[Dict[str, pd.DataFrame], Dict]
        """
        Reads a request body from a request-response web service call, into a dictionary of pandas DataFrame + a
        dictionary of parameters. This is typically useful if you want to debug a request provided by someone else.

        :param body_json: the request body, already decoded as a string
        :return: a tuple containing (1) the dictionary of input DataFrames and (2) the dictionary of parameters
        """
        # first read the json as a dictionary
        result_dct = json_to_azmltable(body_json)

        return azmltables_to_dfs(result_dct['Inputs']), result_dct['GlobalParameters']
508
509
class BatchClient(BaseHttpClient):
    """ This class provides methods to call AzureML services in batch mode (BES), where inputs and outputs transit
    through an azure blob storage. """

    def __init__(self,
                 requests_session=None  # type: requests.Session
                 ):
        """
        Constructor with an optional `requests.Session` to use for subsequent calls.

        :param requests_session: an optional `requests.Session` object to use for the HTTP calls.
        :raises ValueError: if the `azure-storage` package (required for batch mode) is not installed.
        """
        # check that the `azure-storage` package is installed
        try:
            from azure.storage.blob import BlockBlobService  # noqa
        except ImportError as e:
            raise_from(ValueError("Please install `azure-storage==0.33` to use BATCH mode"), e)

        super(BatchClient, self).__init__(requests_session=requests_session)

    def push_inputs_to_blob__and__create_output_references(self,
                                                           inputs_df_dict,  # type: Dict[str, pd.DataFrame]
                                                           blob_service,  # type: BlockBlobService # noqa
                                                           blob_container,  # type: str
                                                           blob_path_prefix=None,  # type: str
                                                           charset=None,  # type: str
                                                           output_names=None  # type: List[str]
                                                           ):
        # type: (...) -> Tuple[Dict[str, Dict[str, str]], Dict[str, Dict[str, str]]]
        """
        Utility method to push all inputs from the provided dictionary into the selected blob storage on the cloud.
        Each input is an entry of the dictionary and should be a Dataframe.
        The inputs will be written to the blob using the provided charset.

        Blob names created by this call share a prefix generated from the current time, so that all inputs and
        outputs belonging to the same call can be quickly identified on the storage.

        :param inputs_df_dict: a dictionary of {input_name: DataFrame} to push to the blob storage
        :param blob_service: the `BlockBlobService` to use to write the blobs
        :param blob_container: the blob container name
        :param blob_path_prefix: the prefix to use for all blobs
        :param charset: the charset to use to encode the blobs
        :param output_names: the list of expected output names, for which references will be created
        :return: a tuple containing (1) a dictionary of "by reference" input descriptions
            and (2) a dictionary of "by reference" output descriptions
        """
        from azmlclient.base_databinding_blobs import dfs_to_blob_refs, create_blob_refs

        if output_names is None:
            output_names = []

        # 1- create unique blob naming prefix from the current time
        now = datetime.now()
        unique_blob_name_prefix = now.strftime("%Y-%m-%d_%H%M%S_%f")

        # 2- store INPUTS and retrieve references
        input_refs = dfs_to_blob_refs(inputs_df_dict, blob_service=blob_service, blob_container=blob_container,
                                      blob_path_prefix=blob_path_prefix,
                                      blob_name_prefix=unique_blob_name_prefix + '-input-', charset=charset)

        # 3- create OUTPUT references
        output_refs = create_blob_refs(blob_names=output_names, blob_service=blob_service,
                                       blob_container=blob_container, blob_path_prefix=blob_path_prefix,
                                       blob_name_prefix=unique_blob_name_prefix + '-output-')

        return input_refs, output_refs

    @staticmethod
    def create_request_body(input_refs=None,  # type: Dict[str, Dict[str, str]]
                            params_df_or_dict=None,  # type: Union[Dict[str, Any], pd.DataFrame]
                            output_refs=None  # type: Dict[str, Dict[str, str]]
                            ):
        # type: (...) -> str
        """
        Helper method to create a JSON AzureML web service input in Batch mode, from 'by reference' inputs, and
        parameters as DataFrame

        :param input_refs: a dictionary containing input names and input references (each input reference is a
            dictionary)
        :param params_df_or_dict: a dictionary of parameter names and values, or a single-row DataFrame of parameters
        :param output_refs: a dictionary containing output names and output references (each output reference is a
            dictionary)
        :return: a string representation of the request JSON body (not yet encoded in bytes)
        :raises TypeError: if `params_df_or_dict` is neither a dict, a DataFrame nor None
        """
        # params
        if params_df_or_dict is None:
            params_df_or_dict = {}

        if isinstance(params_df_or_dict, dict):
            params = params_df_or_dict
        elif isinstance(params_df_or_dict, pd.DataFrame):
            params = params_df_to_params_dict(params_df_or_dict)
        else:
            # fixed: the error message previously referred to a nonexistent parameter name `paramsDfOrDict`
            raise TypeError('params_df_or_dict should be a DataFrame or a dictionary, or None, found: '
                            + str(type(params_df_or_dict)))

        # final body : combine them into a single dictionary ...
        body_dict = {'Inputs': input_refs, 'GlobalParameters': params, 'Outputs': output_refs}

        # ... and serialize as Json
        json_body_str = azmltable_to_json(body_dict)
        return json_body_str

    def execute_batch_createJob(self,
                                base_url,  # type: str
                                api_key,  # type: str
                                request_json_body,  # type: str
                                ):
        # type: (...) -> str
        """
        Creates an AzureML Batch job (asynchronous, by reference).
        Supports Fiddler capture for debug.

        :param base_url: the URL of the AzureML web service
        :param api_key: the api key for the AzureML web service to call
        :param request_json_body: the json body of the job creation request, as a string
        :return: the job id, unquoted
        """
        batch_url = base_url + '/jobs?api-version=2.0'
        json_job_id = self.azureml_http_call(url=batch_url, api_key=api_key, method='POST',
                                             body_str=request_json_body)

        # the server returns the job id as a json string: remove the surrounding double quotes if present
        if json_job_id.startswith('"') and json_job_id.endswith('"'):
            return json_job_id[1:-1]
        else:
            return json_job_id

    def execute_batch_startJob(self,
                               base_url,  # type: str
                               api_key,  # type: str
                               job_id,  # type: str
                               ):
        """
        Starts an AzureML Batch job (asynchronous, by reference).
        Supports Fiddler capture for debug.

        :param base_url: the URL of the AzureML web service
        :param api_key: the api key for the AzureML web service to call
        :param job_id: the id of the job to start, as returned by `execute_batch_createJob`
        :return:
        """
        batch_url = base_url + '/jobs/' + job_id + '/start?api-version=2.0'

        self.azureml_http_call(url=batch_url, api_key=api_key, method='POST', body_str=None)
        return

    def execute_batch_getJobStatusOrResult(self,
                                           base_url,  # type: str
                                           api_key,  # type: str
                                           job_id,  # type: str
                                           ):
        # type: (...) -> str
        """
        Gets the status or the result of an AzureML Batch job (asynchronous, by reference).
        Supports Fiddler capture for debug.

        :param base_url: the URL of the AzureML web service
        :param api_key: the api key for the AzureML web service to call
        :param job_id: the id of the job, as returned by `execute_batch_createJob`
        :return: the json body of the status-or-result response, as a string
        """
        batch_url = base_url + '/jobs/' + job_id + '?api-version=2.0'
        json_job_status_or_result = self.azureml_http_call(url=batch_url, api_key=api_key, method='GET',
                                                           body_str=None)
        return json_job_status_or_result

    def read_status_or_result(self,
                              jobstatus_or_result_json  # type: str
                              ):
        # type: (...) -> Dict[str, Dict[str, str]]
        """
        An alias to the static method `read_status_or_result_static`

        :param jobstatus_or_result_json: the json body of the status-or-result response, as a string
        :return: see `read_status_or_result_static`
        """
        return BatchClient.read_status_or_result_static(jobstatus_or_result_json)

    @staticmethod
    def read_status_or_result_static(jobstatus_or_result_json  # type: str
                                     ):
        # type: (...) -> Dict[str, Dict[str, str]]
        """
        Reads the status or the result of an AzureML Batch job (asynchronous, by reference).
        Throws an error if the status is an error state (cancelled, failed or unknown), otherwise returns the
        'Results' entry of the status.

        :param jobstatus_or_result_json: the json body of the status-or-result response, as a string
        :return: the results as a dictionary, and throws an error if the job had an error
        :raises IllegalJobStateException: if the job was cancelled or is in an unknown state
        :raises JobExecutionException: if the job ended with an error
        :raises ValueError: if the received payload could not be understood
        """
        # first read the json as a dictionary
        result_dict = json_to_azmltable(jobstatus_or_result_json)

        try:
            status_code = result_dict['StatusCode']

            if status_code in ('3', 'Cancelled'):
                raise IllegalJobStateException("The job state is '%s' : cannot read the outcome" % status_code)

            elif status_code in ('2', 'Failed'):
                raise JobExecutionException("The job ended with an error : %s" % result_dict['Details'])

            elif status_code in ('0', 'NotStarted', '1', 'Running', '4', 'Finished'):
                jobstatus_or_result_json = result_dict['Results']

            else:
                raise IllegalJobStateException(
                    'The job state is ' + status_code + ' : unknown state')

            return jobstatus_or_result_json

        except KeyError as e:
            # fixed: chain the original KeyError so that the root cause is not lost
            raise_from(ValueError("Error reading job state : received %s" % result_dict), e)

    def execute_batch_deleteJob(self,
                                base_url,  # type: str
                                api_key,  # type: str
                                job_id,  # type: str
                                ):
        """
        Deletes an AzureML Batch job (asynchronous, by reference).
        Supports Fiddler capture for debug.

        :param base_url: the URL of the AzureML web service
        :param api_key: the api key for the AzureML web service to call
        :param job_id: the id of the job to delete, as returned by `execute_batch_createJob`
        :return:
        """
        batch_url = base_url + '/jobs/' + job_id + '?api-version=2.0'

        self.azureml_http_call(url=batch_url, api_key=api_key, method='DELETE', body_str=None)
        return
739
740
# Backwards-compatibility aliases: older releases of this library exposed the clients under these names,
# so they are kept so that existing imports keep working.
RR_Client = RequestResponseClient
"""Legacy alias"""

Batch_Client = BatchClient
"""Legacy alias"""