Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Authors: Sylvain MARIE <sylvain.marie@se.com> 

2# + All contributors to <https://github.com/smarie/python-azureml-client> 

3# 

4# License: 3-clause BSD, <https://github.com/smarie/python-azureml-client/blob/master/LICENSE> 

5from abc import abstractmethod, ABCMeta 

6 

7from valid8 import validate 

8from six import with_metaclass 

9 

10try: # python 3.5+ 

11 from typing import Dict, List 

12except ImportError: 

13 pass 

14 

15from requests import Session 

16import pandas as pd 

17 

18from azmlclient.base import execute_bes, execute_rr 

19from azmlclient.clients_config import ServiceConfig 

20 

21 

22class CallMode(with_metaclass(ABCMeta, object)): 

23 """ 

24 Abstract class representing a call mode 

25 """ 

26 pass 

27 

28 

29class LocalCallMode(CallMode): 

30 """ 

31 A "local" call mode 

32 """ 

33 pass 

34 

35 

36class RemoteCallMode(CallMode): 

37 """ 

38 Represents a way to call a service. It is composed of a main mode, and various options 

39 """ 

40 @abstractmethod 

41 def call_azureml(self, 

42 service_id, # type: str 

43 service_config, # type: ServiceConfig 

44 ws_inputs, # type: Dict[str, pd.DataFrame] 

45 ws_params=None, # type: Dict[str, str] 

46 ws_output_names=None, # type: List[str] 

47 session=None, # type: Session 

48 by_ref_inputs=None, # type: Dict[str, str] 

49 **kwargs 

50 ): 

51 # type: (...) -> Dict[str, pd.DataFrame] 

52 """ 

53 This method is called by `AzureMLClient` instances when their current call mode is a remote call mode. 

54 

55 :param service_id: the service id that will be used in error messages 

56 :param service_config: 

57 :param ws_inputs: 

58 :param ws_params: 

59 :param ws_output_names: 

60 :param session: 

61 :param by_ref_inputs: 

62 :param kwargs: 

63 :return: 

64 """ 

65 pass 

66 

67 

68class RequestResponse(RemoteCallMode): 

69 """ 

70 Represents the request-response call mode 

71 """ 

72 

73 def __init__(self, 

74 use_swagger_format=False 

75 ): 

76 """ 

77 

78 :param use_swagger_format: a boolean (default False) indicating if the 'swagger' azureml format should be used 

79 to format the data tables in json payloads. 

80 """ 

81 self.use_swagger_format = use_swagger_format 

82 

83 # noinspection PyMethodOverriding 

84 def call_azureml(self, 

85 service_id, # type: str 

86 service_config, # type: ServiceConfig 

87 ws_inputs, # type: Dict[str, pd.DataFrame] 

88 ws_params=None, # type: Dict[str, str] 

89 ws_output_names=None, # type: List[str] 

90 session=None, # type: Session 

91 ): 

92 # type: (...) -> Dict[str, pd.DataFrame] 

93 """ 

94 (See super for description) 

95 """ 

96 validate("%s:base_url" % service_id, service_config.base_url) 

97 validate("%s:api_key" % service_id, service_config.api_key) 

98 

99 # standard azureml request-response call 

100 return execute_rr(api_key=service_config.api_key, base_url=service_config.base_url, 

101 inputs=ws_inputs, params=ws_params, output_names=ws_output_names, 

102 use_swagger_format=self.use_swagger_format, requests_session=session) 

103 

104 

105class Batch(RemoteCallMode): 

106 """ 

107 Represents the "Batch" call mode. 

108 """ 

109 def __init__(self, 

110 polling_period_seconds=5 # type: int 

111 ): 

112 self.polling_period_seconds = polling_period_seconds 

113 

114 # noinspection PyMethodOverriding 

115 def call_azureml(self, 

116 service_id, # type: str 

117 service_config, # type: ServiceConfig 

118 ws_inputs, # type: Dict[str, pd.DataFrame] 

119 ws_params=None, # type: Dict[str, str] 

120 ws_output_names=None, # type: List[str] 

121 session=None, # type: Session 

122 ): 

123 """ 

124 (See super for base description) 

125 :return: 

126 """ 

127 validate("%s:base_url" % service_id, service_config.base_url) 

128 validate("%s:api_key" % service_id, service_config.api_key) 

129 validate("%s:blob_account" % service_id, service_config.blob_account) 

130 validate("%s:blob_api_key" % service_id, service_config.blob_api_key) 

131 validate("%s:blob_container" % service_id, service_config.blob_container) 

132 

133 return execute_bes( 

134 # all of this is filled using the `service_config` 

135 api_key=service_config.api_key, base_url=service_config.base_url, 

136 blob_storage_account=service_config.blob_account, blob_storage_apikey=service_config.blob_api_key, 

137 blob_container=service_config.blob_container, blob_path_prefix=service_config.blob_path_prefix, 

138 # blob_charset=None, 

139 # ------- 

140 inputs=ws_inputs, params=ws_params, output_names=ws_output_names, 

141 nb_seconds_between_status_queries=self.polling_period_seconds, 

142 requests_session=session 

143 ) 

144 

145 

146# class RequestResponseInputsByRef(RequestResponse): 

147# """ 

148# Represents the "Request Response" call mode with additional capability to pass some of the inputs "by reference". 

149# Note that the web service has to be designed accordingly. 

150# """ 

151# # noinspection PyMethodOverriding 

152# def call_azureml(self, 

153# service_config, # type: ServiceConfig 

154# ws_inputs, # type: Dict[str, pd.DataFrame] 

155# ws_params=None, # type: Dict[str, str] 

156# ws_output_names=None, # type: List[str] 

157# session=None, # type: Session 

158# by_ref_inputs=None, # type: Dict[str, str] 

159# ): 

160# # type: (...) -> Dict[str, pd.DataFrame] 

161# """ 

162# (See super for base description) 

163# 

164# :param by_ref_inputs: a dictionary {<input_name>: <param_name>} containing one entry for each input to send 

165# "by reference" rather than "by value". Each such input will be removed from the service inputs (the names 

166# have to be valid input names), its contents will be stored in the blob storage (the same used for batch 

167# mode), and the blob URL will be passed to a new parameter named <param_name> 

168# :return: 

169# """ 

170# # by reference: we have to upload some inputs to the blob storage first 

171# if by_ref_inputs is None: 

172# by_ref_inputs = dict() 

173# 

174# # copy inputs and params since we will modify them 

175# ws_inputs = copy(ws_inputs) 

176# ws_params = copy(ws_params) 

177# 

178# for by_ref_input_name, by_ref_refparam_name in by_ref_inputs.items(): 

179# # -- push input in blob and get a reference 

180# input_to_be_ref = ws_inputs.pop(by_ref_input_name) 

181# sas_url = push_blob_and_get_ref(input_to_be_ref, service_config=service_config, session=session) 

182# 

183# # -- create the new param containing the reference 

184# ws_params[by_ref_refparam_name] = sas_url 

185# 

186# # -- execute Request-Response on alternate 'by ref' endpoint 

187# return execute_rr(api_key=service_config.rr_by_ref_api_key, 

188# base_url=service_config.rr_by_ref_base_url, 

189# inputs=ws_inputs, params=ws_params, output_names=ws_output_names, 

190# requests_session=session) 

191# 

192# 

193# def push_blob_and_get_ref(input, 

194# service_config: ServiceConfig, 

195# session: Session): 

196# """ 

197# Uploads input to the blob storage defined in service_config (blob_account_for_batch, blob_apikey_for_batch). 

198# Generates a temporary shared access key valid for two hours, and returns the corresponding blob URL. 

199# 

200# :param input: 

201# :param service_config: 

202# :param session: 

203# :return: 

204# """ 

205# # a dummy name used only in this method 

206# by_ref_input_name = 'foo' 

207# 

208# # -- first upload the input to a blob storage and get the absolute reference to it. 

209# blob_service = BlockBlobService(account_name=service_config.blob_account_for_batch, 

210# account_key=service_config.blob_apikey_for_batch, 

211# request_session=session) 

212# batch_client = BatchClient(requests_session=session) 

213# input_refs, output_refs = batch_client. \ 

214# push_inputs_to_blob__and__create_output_references({by_ref_input_name: input}, 

215# output_names=[], 

216# blob_service=blob_service, 

217# blob_container=service_config.blob_containername_for_batch, 

218# blob_path_prefix='') 

219# 

220# # -- then generate shared access key (public SAS access to blob) 

221# blob_relative_loc = input_refs[by_ref_input_name]['RelativeLocation'] 

222# blob_name = blob_relative_loc[blob_relative_loc.find('/') + 1:] 

223# expiry_date = datetime.now() + timedelta(hours=2) # expires in 2 hours 

224# sas_token = blob_service.generate_blob_shared_access_signature( 

225# container_name=service_config.blob_containername_for_batch, 

226# blob_name=blob_name, expiry=expiry_date, 

227# permission=BlobPermissions.READ) 

228# sas_url = blob_service.make_blob_url(container_name=service_config.blob_containername_for_batch, 

229# blob_name=blob_name, sas_token=sas_token) 

230# return sas_url