Coverage for odsclient/core.py: 77%


443 statements  

1# Authors: Sylvain MARIE <sylvain.marie@se.com> 

2# + All contributors to <https://github.com/smarie/python-odsclient> 

3# 

4# License: 3-clause BSD, <https://github.com/smarie/python-odsclient/blob/master/LICENSE> 

5import warnings 

6from ast import literal_eval 

7from getpass import getpass 

8import io 

9import os 

10from shutil import copyfile 

11from threading import Lock 

12 

13try: 

14 # Python 3 

15 from urllib.parse import urlparse 

16except ImportError: 

17 # Python 2 

18 from urlparse import urlparse 

19 

20try: 

21 from pathlib import Path 

22except ImportError: 

23 from pathlib2 import Path # python 2 

24 

25import sys 

26 

27if sys.version_info < (3,):  # 27 ↛ 30: line 27 didn't jump to line 30, because the condition on line 27 was never true

28 # py2: we need a version of open that supports encoding and newlines

29 # See https://stackoverflow.com/a/10975371/7262247 

30 from io import open 

31 from StringIO import StringIO 

32 string_types = (str, bytes) 

33else: 

34 from io import StringIO 

35 string_types = (str,) 

36 

37try: 

38 from json import loads, JSONDecodeError, dumps 

39except ImportError: 

40 # python 2 

41 from json import loads, dumps 

42 JSONDecodeError = ValueError 

43try: 

44 FileNotFoundError 

45except NameError: 

46 # python 2 

47 FileNotFoundError = IOError 

48 

49from requests import Session, HTTPError 

50 

51try: 

52 # noinspection PyUnresolvedReferences 

53 from typing import Dict, Union, Iterable 

54except ImportError: 

55 pass 

56 

57CACHE_ROOT_FOLDER = ".odsclient" 

58CACHE_ENCODING = "utf-8" 

59ODS_BASE_URL_TEMPLATE = "https://%s.opendatasoft.com" 

60ENV_ODS_APIKEY = 'ODS_APIKEY' 

61KR_DEFAULT_USERNAME = 'apikey_user' 

62 

63 

64class ODSClient(object): 

65 """ 

66 An `ODSClient` is a client for a given OpenDataSoft (ODS) platform. By default the target platform base url is 

67 `https://<platform_id>.opendatasoft.com` with `platform_id='public'`. One can either customize the platform

68 id through the `platform_id` constructor argument, or the whole base url with `base_url`. 

69 

70 A client instance offers methods to interact with the various ODS API. Currently three high-level methods are 

71 provided: `<client>.get_whole_dataset(dataset_id, ...)`, `<client>.get_whole_dataframe(dataset_id, ...)` 

72 and `<client>.push_dataset_realtime(dataset_id, ...)`. 

73 

74 A file cache can be activated for the two `get` methods by setting `file_cache` to `True` or to a path-like (string 

75 or `Path`) indicating a custom cache root path. `True` will use the default cache root folder `.odsclient`. 

76 `<client>.get_cached_dataset_entry` can be used to get a `CacheEntry` object representing the (possibly 

77 non-existing) cache entry for a given dataset. 

78 

79 You can customize the `requests.Session` object used for the HTTPS transport using `requests_session`. If you do so, 

80 remember to close it yourself or to switch `auto_close_session` to True. 

81 

82 A client is meant to use a single api key at a time, or none. You can force the api key to be mandatory using 

83 `enforce_apikey=True`. There are 4 ways to pass an api key; they are tried in the following order:

84 

85 - explicitly with the `apikey` argument 

86 

87 - through a text file containing the key. This file, if present, should be named `ods.apikey` (the name can be changed

88 using `apikey_filepath`; a custom name does not make the file mandatory)

89 

90 - if `keyring` is installed (`pip install keyring`), an apikey can be created as an entry in it for service 

91 `<base_url>` and username `'apikey_user'`. `keyring` leverages your OS' vault (Windows Credential Locker, 

92 macOS Keychain, etc.). This is the **most secure** method available. You can override the default keyring entry

93 username with the `keyring_entries_username=...` argument. You can easily add or remove an entry in the keyring 

94 through the OS interface, with the `odskeys` commandline utility (`odskeys --help`) or with the 

95 `<client>.store_apikey_in_keyring` / `<client>.get_apikey_from_keyring` / `<client>.remove_apikey_from_keyring` 

96 methods. 

97 

98 - through the `'ODS_APIKEY'` OS environment variable. It should either contain the key without quotes or a 

99 dict-like structure where keys can either be `platform_id`, `base_url`, or the special fallback key `'default'` 

100 

101 For debugging purposes, you may wish to use `<client>.get_apikey()` to check if the api key that is actually used 

102 is the one you think you have configured through one of the above methods. 

103 

104 """ 

105 

106 def __del__(self): 

107 """ 

108 This is called when the garbage collector deletes this object. 

109 Let's use this opportunity to close the requests Session to avoid 

110 leaving hanging Sockets, see https://github.com/smarie/python-odsclient/issues/27 

111 """ 

112 if self.auto_close_session and self.session is not None: 

113 try: 

114 # close the underlying `requests.Session` 

115 self.session.close() 

116 except Exception as e: 

117 warnings.warn("Error while closing session: %r" % e) 

118 

119 def __init__(self, 

120 platform_id='public', # type: str 

121 base_url=None, # type: str 

122 enforce_apikey=False, # type: bool 

123 apikey=None, # type: str 

124 apikey_filepath='ods.apikey', # type: Union[str, Path] 

125 use_keyring=True, # type: bool 

126 keyring_entries_username=KR_DEFAULT_USERNAME, # type: str 

127 requests_session=None, # type: Session 

128 auto_close_session=None # type: bool 

129 ): 

130 """ 

131 Constructor for `ODSClient`s 

132 

133 :param platform_id: the ods platform id to use. This id is used to construct the base URL based on the pattern 

134 https://<platform_id>.opendatasoft.com. Default is `'public'` which leads to the base url 

135 https://public.opendatasoft.com 

136 :param base_url: an explicit base url to use instead of the one generated from `platform_id` 

137 :param enforce_apikey: an optional boolean indicating if an error should be raised if no apikey is found at all 

138 (neither in the explicit argument, nor in a file, an environment variable, or the keyring) (default `False`)

139 :param apikey: an explicit api key as a string. 

140 :param apikey_filepath: the path that should be used to look for api keys on the file system. Such files are 

141 optional, other (safer) methods exist to pass the api key, see documentation for details. 

142 :param use_keyring: an optional boolean (default `True`) specifying whether the `keyring` library should be 

143 used to lookup existing api keys. Keys should be stored using `store_apikey_in_keyring()`. 

144 :param keyring_entries_username: keyring stores secrets with a key made of a service id and a username. We use 

145 the base url for the service id, however the user name can be anything. By default we use a string: 

146 'apikey_user'. 

147 :param requests_session: an optional `Session` object to use (from `requests` lib). If `None` is provided, 

148 a new `Session` will be used and deleted when this object is garbaged out. If a custom object is provided, 

149 you should close it yourself or switch `auto_close_session` to `True` explicitly. 

150 :param auto_close_session: an optional boolean indicating if `self.session` should be closed when this object 

151 is garbaged out. By default this is `None` and means "`True` if no custom `requests_session` is passed, else 

152 `False`". Turning this to `False` can leave hanging Sockets unclosed.

153 """ 

154 # keyring option 

155 self.use_keyring = use_keyring 

156 self.keyring_entries_username = keyring_entries_username 

157 

158 # Construct the base url 

159 if base_url is not None: 

160 if platform_id != 'public' and platform_id is not None:  # 160 ↛ 161: line 160 didn't jump to line 161, because the condition on line 160 was never true

161 raise ValueError("Only one of `platform_id` and `base_url` should be provided. Received " 

162 "platform_id='%s' and base_url='%s'" % (platform_id, base_url)) 

163 # remove trailing slashes 

164 while base_url.endswith('/'): 

165 base_url = base_url[:-1] 

166 self.base_url = base_url 

167 self.platform_id = None 

168 else: 

169 self.platform_id = platform_id 

170 self.base_url = ODS_BASE_URL_TEMPLATE % platform_id 

171 

172 # Load apikey from file and validate it 

173 self.apikey_filepath = apikey_filepath 

174 if apikey is not None: 

175 # api key passed as argument 

176 if apikey_filepath != 'ods.apikey':  # 176 ↛ 177: line 176 didn't jump to line 177, because the condition on line 176 was never true

177 raise ValueError("Only one of `apikey` and custom `apikey_filepath` should be provided.") 

178 self.apikey = apikey 

179 

180 elif apikey_filepath is not None:  # 180 ↛ 192: line 180 didn't jump to line 192, because the condition on line 180 was never false

181 try: 

182 # read the api key from the file 

183 with open(str(apikey_filepath)) as f: 

184 self.apikey = f.read() 

185 except FileNotFoundError: 

186 self.apikey = None 

187 else: 

188 # remove trailing new lines or blanks if any 

189 self.apikey = self.apikey.rstrip() 

190 else: 

191 # no explicit api key. Environment variable will apply 

192 self.apikey = None 

193 

194 if self.apikey is not None and len(self.apikey) == 0:  # 194 ↛ 195: line 194 didn't jump to line 195, because the condition on line 194 was never true

195 raise ValueError('The provided api key is empty!') 

196 

197 # checker flag 

198 self.enforce_apikey = enforce_apikey 

199 

200 # create and store a session 

201 self.session = requests_session or Session() 

202 # auto-close behaviour 

203 if auto_close_session is None:  # 203 ↛ 206: line 203 didn't jump to line 206, because the condition on line 203 was never false

204 # default: only auto-close if this session was created by us. 

205 auto_close_session = requests_session is None 

206 self.auto_close_session = auto_close_session 

207 
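For illustration, a minimal construction sketch (the platform id and base url below are invented examples, and the import path assumes the `odsclient/core.py` layout shown in this report):

    from odsclient.core import ODSClient

    client = ODSClient()                                      # targets https://public.opendatasoft.com
    client = ODSClient(platform_id="mycompany")               # hypothetical id -> https://mycompany.opendatasoft.com
    client = ODSClient(base_url="https://data.example.com/")  # custom base url; the trailing slash is stripped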

208 def get_whole_dataframe(self, 

209 dataset_id, # type: str 

210 use_labels_for_header=True, # type: bool 

211 tqdm=False, # type: bool 

212 block_size=1024, # type: int 

213 file_cache=False, # type: bool 

214 **other_opts 

215 ): 

216 """ 

217 Returns a dataset as a pandas dataframe. pandas must be installed. 

218 

219 :param dataset_id: 

220 :param use_labels_for_header: 

221 :param tqdm: a boolean indicating if a progress bar using tqdm should be displayed. tqdm should be installed 

222 :param block_size: an int block size used in streaming mode when tqdm is used 

223 :param file_cache: a boolean (default False) indicating whether the file should be written to a local cache 

224 `.odsclient/<platform_pseudo_id>/<dataset_id>.<format>`. Or a path-like object with the custom cache root folder.

225 :param other_opts: 

226 :return: 

227 """ 

228 try: 

229 import pandas as pd 

230 except ImportError as e: 

231 raise Exception("`get_whole_dataframe` requires `pandas` to be installed. [%s] %s" % (e.__class__, e)) 

232 

233 # Combine all the options 

234 opts = other_opts 

235 apikey = self.get_apikey() 

236 if apikey is not None:  # 236 ↛ 237: line 236 didn't jump to line 237, because the condition on line 236 was never true

237 opts['apikey'] = apikey 

238 if use_labels_for_header is not None:  # 238 ↛ 242: line 238 didn't jump to line 242, because the condition on line 238 was never false

239 opts['use_labels_for_header'] = use_labels_for_header 

240 

241 # hardcoded 

242 if 'timezone' in opts:  # 242 ↛ 243: line 242 didn't jump to line 243, because the condition on line 242 was never true

243 raise ValueError("'timezone' should not be specified with this method") 

244 if 'format' in opts:  # 244 ↛ 245: line 244 didn't jump to line 245, because the condition on line 244 was never true

245 raise ValueError("'format' should not be specified with this method") 

246 opts['format'] = format = 'csv' 

247 if 'csv_separator' in opts:  # 247 ↛ 248: line 247 didn't jump to line 248, because the condition on line 247 was never true

248 raise ValueError("'csv_separator' should not be specified with this method") 

249 opts['csv_separator'] = ';' 

250 

251 # Cache usage 

252 if file_cache: 

253 # it can be a boolean or a path 

254 if file_cache is True: 

255 file_cache = CACHE_ROOT_FOLDER 

256 cached_file = self.get_cached_dataset_entry(dataset_id=dataset_id, format=format, cache_root=file_cache) 

257 try: 

258 # try to read the cached file in a thread-safe operation 

259 with cached_file.rw_lock: 

260 cached_file.assert_exists() 

261 df = pd.read_csv(str(cached_file.file_path), sep=';') 

262 return df 

263 except CacheFileNotFoundError: 

264 pass # does not exist. continue to query 

265 else: 

266 cached_file = None 

267 del file_cache 

268 

269 # The URL to call 

270 url = self.get_download_url(dataset_id) 

271 

272 # Execute call in stream mode with automatic content-type decoding 

273 result = self._http_call(url, params=opts, stream=True, decode=True) 

274 # print(iterable_to_stream(result.iter_content()).read()) 

275 # noinspection PyTypeChecker 

276 

277 if tqdm: 

278 from tqdm import tqdm as _tqdm 

279 

280 total_size = int(result.headers.get('Content-Length', 0)) 

281 with _tqdm(desc=url, total=total_size, 

282 unit='B' if block_size == 1024 else 'it', 

283 unit_scale=True, 

284 unit_divisor=block_size 

285 ) as bar: 

286 if not cached_file: 

287 # Directly stream to memory with updates of the progress bar 

288 df = pd.read_csv(iterable_to_stream(result.iter_content(), buffer_size=block_size, progressbar=bar), 

289 sep=';') 

290 else: 

291 # stream to cache file and read the dataframe from the cache (use the lock to make sure it is here) 

292 with cached_file.rw_lock: 

293 cached_file.fill_from_iterable(result.iter_content(block_size), it_encoding=result.encoding, 

294 progress_bar=bar, lock=False) 

295 df = pd.read_csv(str(cached_file.file_path), sep=';') 

296 else: 

297 if not cached_file: 

298 # directly stream to memory dataframe 

299 df = pd.read_csv(iterable_to_stream(result.iter_content(block_size)), sep=';') 

300 else: 

301 # stream to cache file and read the dataframe from the cache (use the lock to make sure it is here) 

302 with cached_file.rw_lock: 

303 cached_file.fill_from_iterable(result.iter_content(block_size), it_encoding=result.encoding, 

304 lock=False) 

305 df = pd.read_csv(str(cached_file.file_path), sep=';') 

306 

307 return df 

308 
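A usage sketch for this method (the dataset id is a hypothetical example; `pandas` is required, and `tqdm` only when `tqdm=True`):

    df = client.get_whole_dataframe("my-dataset-id", file_cache=True)  # first call fills .odsclient/, later calls read it
    print(df.head())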

309 # noinspection PyShadowingBuiltins 

310 def get_whole_dataset(self, 

311 dataset_id, # type: str 

312 format='csv', # type: str 

313 timezone=None, # type: str 

314 use_labels_for_header=True, # type: bool 

315 csv_separator=';', # type: str 

316 tqdm=False, # type: bool 

317 to_path=None, # type: Union[str, Path] 

318 file_cache=False, # type: bool 

319 block_size=1024, # type: int 

320 **other_opts 

321 ): 

322 """ 

323 Returns a dataset as a csv string. 

324 

325 :param dataset_id: 

326 :param format: 

327 :param timezone: 

328 :param use_labels_for_header: 

329 :param csv_separator: ';', ','... 

330 :param tqdm: a boolean indicating if a progress bar using tqdm should be displayed. tqdm should be installed 

331 :param to_path: a string or `Path` indicating the file path where to write the csv. In that case None is returned

332 :param file_cache: a boolean (default False) indicating whether the file should be written to a local cache 

333 `.odsclient/<platform_pseudo_id>/<dataset_id>.<format>`. Or a path-like object with the custom cache root folder.

334 :param block_size: an int block size used in streaming mode when to_path or tqdm is used

335 :param other_opts: 

336 :return: 

337 """ 

338 

339 # ------- To uncomment one day if headers and/or body are needed 

340 # headers = {'Authorization': ('Bearer ' + api_key)} 

341 # 

342 # if not (requestJsonBodyStr is None): 

343 # # first encode the string as bytes using the charset 

344 # charset = 'utf-8' 

345 # json_body_encoded_with_charset = str.encode(requestJsonBodyStr, encoding=charset) 

346 # headers['Content-Type'] = 'application/json; charset=' + charset 

347 # else: 

348 # json_body_encoded_with_charset = None 

349 # ------------------ 

350 

351 # Combine all the options 

352 opts = other_opts 

353 apikey = self.get_apikey() 

354 if apikey is not None: 

355 opts['apikey'] = apikey 

356 if format is not None:  # 356 ↛ 358: line 356 didn't jump to line 358, because the condition on line 356 was never false

357 opts['format'] = format 

358 if timezone is not None:  # 358 ↛ 359: line 358 didn't jump to line 359, because the condition on line 358 was never true

359 opts['timezone'] = timezone 

360 if use_labels_for_header is not None:  # 360 ↛ 362: line 360 didn't jump to line 362, because the condition on line 360 was never false

361 opts['use_labels_for_header'] = use_labels_for_header 

362 if csv_separator is not None:  # 362 ↛ 366: line 362 didn't jump to line 366, because the condition on line 362 was never false

363 opts['csv_separator'] = csv_separator 

364 

365 # The URL to call 

366 url = self.get_download_url(dataset_id) 

367 

368 # Should we write anything to disk?

369 # -- Because it is the target 

370 if to_path is not None: 

371 if isinstance(to_path, string_types):  # 371 ↛ 372: line 371 didn't jump to line 372, because the condition on line 371 was never true

372 to_path = Path(to_path) 

373 to_path.parent.mkdir(parents=True, exist_ok=True) # make sure the parents exist 

374 

375 # -- Because the cache is used 

376 if file_cache: 

377 # it can be a boolean or a path 

378 if file_cache is True: 

379 file_cache = CACHE_ROOT_FOLDER 

380 cached_file = self.get_cached_dataset_entry(dataset_id=dataset_id, format=format, cache_root=file_cache) 

381 try: 

382 # Do NOT call cached_file.exists(): not thread-safe 

383 if to_path is None: 

384 return cached_file.read() # this is atomic: thread-safe 

385 else: 

386 cached_file.copy_to_file(to_path) # this is atomic: thread-safe 

387 return None 

388 except CacheFileNotFoundError: 

389 # does not exist. continue to query 

390 pass 

391 else: 

392 cached_file = None 

393 del file_cache 

394 

395 # Execute call, since no cache was used 

396 result = None 

397 if not tqdm: 

398 if to_path is None: 

399 # We need to return a csv string, so load everything in memory 

400 result, encoding = self._http_call(url, params=opts, stream=False, decode=True)

401 

402 if cached_file: # cache it in local cache if needed 

403 cached_file.fill_from_str(txt_initial_encoding=encoding, decoded_txt=result)

404 else: 

405 # No need to return a csv string: stream directly to csv file (no decoding/encoding) 

406 r = self._http_call(url, params=opts, stream=True, decode=False) 

407 with open(str(to_path), mode='wb') as f: 

408 for data in r.iter_content(block_size): 

409 f.write(data) 

410 

411 if cached_file: # cache it in local cache if needed 

412 cached_file.fill_from_file(file_path=to_path, file_encoding=r.encoding) 

413 else: 

414 # Progress bar is needed: we need streaming mode 

415 r = self._http_call(url, params=opts, stream=True, decode=False) 

416 total_size = int(r.headers.get('Content-Length', 0)) 

417 

418 from tqdm import tqdm as _tqdm 

419 with _tqdm(desc=url, total=total_size, 

420 unit='B' if block_size == 1024 else 'it', 

421 unit_scale=True, 

422 unit_divisor=block_size 

423 ) as bar: 

424 if to_path is None: 

425 result = io.StringIO() # stream to a string in memory 

426 for data in r.iter_content(block_size): # block by block 

427 bar.update(len(data)) # - update progress bar 

428 result.write(data.decode(r.encoding)) # - decode with proper encoding 

429 result = result.getvalue() 

430 

431 if cached_file: # cache it in local cache if needed 

432 cached_file.fill_from_str(txt_initial_encoding=r.encoding, decoded_txt=result) 

433 else: 

434 with open(str(to_path), 'wb') as f: # stream to csv file in binary mode 

435 for data in r.iter_content(block_size): # block by block 

436 bar.update(len(data)) # - update progress bar 

437 f.write(data) # - direct copy (no decoding/encoding) 

438 

439 if cached_file: # cache it in local cache if needed 

440 cached_file.fill_from_file(file_path=to_path, file_encoding=r.encoding) 

441 

442 if total_size != 0 and bar.n != total_size:  # 442 ↛ 443: line 442 didn't jump to line 443, because the condition on line 442 was never true

443 raise ValueError("ERROR, something went wrong") 

444 

445 return result 

446 
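For illustration (the dataset id and target path are hypothetical examples):

    csv_str = client.get_whole_dataset("my-dataset-id", csv_separator=",")   # whole dataset as a csv string
    client.get_whole_dataset("my-dataset-id", to_path="out/my-dataset.csv")  # streamed to a file; returns None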

447 # noinspection PyShadowingBuiltins 

448 def push_dataset_realtime(self, 

449 dataset_id, # type: str 

450 dataset, # type: Union[str, pandas.DataFrame] 

451 push_key, # type: str 

452 format='csv', # type: str 

453 csv_separator=';', # type: str 

454 **other_opts 

455 ): 

456 """ 

457 Pushes a Dataset. This function accepts either a Pandas Dataframe or a CSV string with header included.

458 

459 :param dataset_id: 

460 :param dataset: The dataset to push, either a pandas DataFrame or a CSV string with header included

461 :param push_key: The Push Key provided by the API for pushing this dataset. Warning: This key is independent 

462 from the API key. It can be acquired from the Realtime Push API URL section in ODS. 

463 :param format: The format of the dataset to be pushed. Can be `pandas` or `csv`. 

464 :param csv_separator: CSV separator character in case of a csv dataset input. 

465 :returns: the HTTP response

466 """ 

467 

468 if format == 'pandas': 

469 try: 

470 import pandas as pd 

471 except ImportError as e: 

472 raise Exception("`push_dataset_realtime` with the `pandas` format requires `pandas` to be installed. " 

473 "[%s] %s" % (e.__class__, e)) 

474 

475 if not isinstance(dataset, pd.DataFrame): 

476 raise TypeError('If format is set to "pandas" then `dataset` should be a DataFrame. Found %s' 

477 % type(dataset)) 

478 

479 dataset # type:pandas.DataFrame 

480 request_body = dataset.to_json(orient='records') 

481 elif format == 'csv': 

482 try: 

483 import csv 

484 except ImportError as e: 

485 raise Exception("`push_dataset_realtime` with the `csv` format requires `csv` to be installed. [%s] %s" 

486 % (e.__class__, e)) 

487 # noinspection PyStatementEffect 

488 dataset # type:str 

489 csv_reader = csv.DictReader(StringIO(dataset), delimiter=csv_separator) 

490 request_body = dumps([r for r in csv_reader]) 

491 else: 

492 raise ValueError("Dataset format must be either `pandas` or `csv`") 

493 

494 # Combine all the options 

495 opts = other_opts 

496 opts['pushkey'] = push_key 

497 

498 # The URL to call 

499 url = self.get_realtime_push_url(dataset_id) 

500 

501 # Execute call 

502 return self._http_call(url, method='post', body=request_body, params=opts, decode=False) 

503 
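A sketch of a realtime push from a csv string (the dataset id and push key below are placeholders, not real credentials):

    csv_data = "name;value\nfoo;1\nbar;2"
    response = client.push_dataset_realtime("my-realtime-dataset", csv_data,
                                            push_key="<your-push-key>", format="csv", csv_separator=";")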

504 def store_apikey_in_keyring(self, 

505 apikey=None # type: str 

506 ): 

507 """ 

508 Convenience method to store a password in the OS keyring using `keyring` lib. 

509 

510 This method is a shortcut for `keyring.set_password(<base_url>, <keyring_entries_username>, <apikey>)`. 

511 

512 :param apikey: an explicit apikey string. If not provided, `getpass()` will be used to prompt the user for the 

513 api key 

514 :return: 

515 """ 

516 import keyring 

517 if apikey is None:  # 517 ↛ 518: line 517 didn't jump to line 518, because the condition on line 517 was never true

518 apikey = getpass(prompt="Please enter your api key: ") 

519 

520 if apikey is None or len(apikey) == 0:  # 520 ↛ 521: line 520 didn't jump to line 521, because the condition on line 520 was never true

521 raise ValueError("Empty api key provided.") 

522 

523 keyring.set_password(self.base_url, self.keyring_entries_username, apikey) 

524 

525 def remove_apikey_from_keyring(self): 

526 """ 

527 Convenience method to remove a previously stored password in the OS keyring using `keyring` lib. 

528 

529 :return: 

530 """ 

531 import keyring 

532 keyring.delete_password(self.base_url, self.keyring_entries_username) 

533 

534 def get_apikey_from_keyring(self, ignore_import_errors=False): 

535 """ 

536 Looks for a keyring entry containing the api key and returns it. 

537 If not found, returns `None` 

538 

539 :param ignore_import_errors: when this is `True`, the method will return `None` if `keyring` is not installed 

540 instead of raising an `ImportError`. 

541 :return: 

542 """ 

543 if ignore_import_errors: 

544 try: 

545 import keyring 

546 except ImportError as e: 

547 # not installed: simply warn instead of raising exception 

548 warnings.warn("`keyring` is not installed but the `ODSClient` is configured to use it. You can either "

549 "set `use_keyring=False` explicitly or install keyring to disable this warning. " 

550 "Caught: %r" % e) 

551 return None 

552 else: 

553 # do not catch any exception 

554 import keyring 

555 

556 for _url in (self.base_url, self.base_url + '/'): 

557 apikey = keyring.get_password(_url, self.keyring_entries_username) 

558 if apikey is not None: 

559 return apikey 

560 
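These three helpers round-trip as sketched below, assuming `keyring` is installed (the key value is a placeholder):

    client.store_apikey_in_keyring("my-secret-api-key")  # or call it without argument to be prompted via getpass
    assert client.get_apikey_from_keyring() == "my-secret-api-key"
    client.remove_apikey_from_keyring()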

561 def get_apikey_from_envvar(self): 

562 """ 

563 Looks for the 'ODS_APIKEY' environment variable. 

564 

565 - if it does not exist return None 

566 - otherwise if the env variable does not begin with '{', consider it as the key 

567 - if it begins with '{', load it as a dict and find a match in it, in the following order:

568 platform_id, base_url, 'default' 

569 

570 If the found key is an empty string, a ValueError is raised. 

571 

572 :return: the api key found in the 'ODS_APIKEY' env variable (possibly for this platform_id / 

573 base_url), or None if it does not exist. 

574 """ 

575 try: 

576 env_api_key = os.environ[ENV_ODS_APIKEY] 

577 except KeyError: 

578 # no env var - return None 

579 return None 

580 

581 if len(env_api_key) > 0 and env_api_key[0] == '{': 

582 # a dictionary: use ast.literal_eval: more permissive than json and as safe. 

583 apikeys_dct = literal_eval(env_api_key) 

584 if not isinstance(apikeys_dct, dict):  # 584 ↛ 585: line 584 didn't jump to line 585, because the condition on line 584 was never true

585 raise TypeError("Environment variable contains something that is neither a str nor a dict")

586 

587 # remove trailing slash in keys 

588 def _remove_trailing_slash(k): 

589 while k.endswith('/'): 

590 k = k[:-1] 

591 return k 

592 

593 apikeys_dct = {_remove_trailing_slash(k): v for k, v in apikeys_dct.items()} 

594 

595 # Try to get a match in the dict: first platform id, then base url, then default 

596 if self.platform_id in apikeys_dct:  # 596 ↛ 597: line 596 didn't jump to line 597, because the condition on line 596 was never true

597 env_api_key = apikeys_dct[self.platform_id] 

598 elif self.base_url in apikeys_dct: 

599 env_api_key = apikeys_dct[self.base_url] 

600 elif 'default' in apikeys_dct:  # 600 ↛ 603: line 600 didn't jump to line 603, because the condition on line 600 was never false

601 env_api_key = apikeys_dct['default'] 

602 else: 

603 return None 

604 

605 if len(env_api_key) == 0:  # 605 ↛ 606: line 605 didn't jump to line 606, because the condition on line 605 was never true

606 raise ValueError("Empty api key found in '%s' environment variable." % ENV_ODS_APIKEY) 

607 

608 return env_api_key 

609 
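The two accepted shapes of this variable, sketched with placeholder keys and urls:

    import os

    # a plain key, used for any platform:
    os.environ["ODS_APIKEY"] = "my-secret-api-key"
    # or a dict literal, matched by platform_id, then base_url, then the 'default' fallback:
    os.environ["ODS_APIKEY"] = "{'public': 'key1', 'https://data.example.com': 'key2', 'default': 'key3'}"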

610 def get_apikey(self): 

611 """ 

612 Returns the api key that this client currently uses. 

613 

614 :return: 

615 """ 

616 # 1- if there is an overridden api key, use it 

617 if self.apikey is not None: 

618 return self.apikey 

619 

620 # 2- if keyring service is installed and contains an entry, use it 

621 if self.use_keyring:  # 621 ↛ 627: line 621 didn't jump to line 627, because the condition on line 621 was never false

622 apikey = self.get_apikey_from_keyring(ignore_import_errors=True) 

623 if apikey is not None: 

624 return apikey 

625 

626 # 3- check existence of the reference environment variable 

627 apikey = self.get_apikey_from_envvar() 

628 if apikey is not None: 

629 return apikey 

630 

631 # 4- finally if no key was found, raise an exception if a key was required 

632 if self.enforce_apikey: 

633 raise NoODSAPIKeyFoundError(self) 

634 

635 def get_download_url(self, 

636 dataset_id # type: str 

637 ): 

638 # type: (...) -> str 

639 """ 

640 

641 :param dataset_id: 

642 :return: 

643 """ 

644 return "%s/explore/dataset/%s/download/" % (self.base_url, dataset_id) 

645 

646 def get_cached_dataset_entry(self, 

647 dataset_id, # type: str 

648 format, # type: str 

649 cache_root=None # type: Union[str, Path] 

650 ): 

651 # type: (...) -> CacheEntry 

652 """ 

653 Returns a `CacheEntry` for the given dataset 

654 :param dataset_id: 

655 :param format: 

656 :param cache_root: 

657 :return: 

658 """ 

659 if self.platform_id is not None: 

660 p = self.platform_id 

661 else: 

662 p = baseurl_to_id_str(self.base_url) 

663 return CacheEntry(dataset_id=dataset_id, dataset_format=format, platform_pseudo_id=p, cache_root=cache_root) 

664 
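For illustration, inspecting the cache entry behind a download (the dataset id is a hypothetical example):

    entry = client.get_cached_dataset_entry("my-dataset-id", format="csv")
    print(entry.file_path)  # e.g. .odsclient/public/my-dataset-id.csv for the default platform
    if entry.exists():
        entry.delete()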

665 def get_realtime_push_url(self, 

666 dataset_id, # type: str 

667 ): 

668 # type: (...) -> str 

669 """ 

670 

671 :param dataset_id: 

672 :return: 

673 """ 

674 return "%s/api/push/1.0/%s/realtime/push/" % (self.base_url, dataset_id) 

675 

676 def _http_call(self, 

677 url, # type: str 

678 body=None, # type: bytes 

679 headers=None, # type: Dict[str, str] 

680 method='get', # type: str 

681 params=None, # type: Dict[str, str] 

682 decode=True, # type: bool 

683 stream=False # type: bool 

684 ): 

685 """ 

686 Sub-routine for HTTP web service calls. If body is None, a GET is performed

687 

688 :param url: 

689 :param body: 

690 :param headers: 

691 :param method: 

692 :param params: 

693 :param decode: a boolean (default True) indicating if the contents should be automatically decoded following 

694 the content-type encoding received in the HTTP response. If this is True and stream=False (default), the 

695 function returns a tuple (body, encoding)

696 :param stream: 

697 :return: either a tuple (text, encoding) (if stream=False and decode=True), or the response object 

698 """ 

699 try: 

700 # Send the request (DO NOT encode the params, this is done automatically) 

701 response = self.session.request(method, url, headers=headers, data=body, params=params, stream=stream) 

702 

703 # Success? Read the status code, raise an HTTPError if the status is an error

704 # status = int(response.status_code) 

705 response.raise_for_status() 

706 

707 # detect a "wrong 200 but true 401" (unauthorized) 

708 if 'html' in response.headers['Content-Type']: 

709 raise InsufficientRightsForODSResourceError(response.headers, response.text) 

710 

711 if not stream: 

712 if decode:  # 712 ↛ 717: line 712 didn't jump to line 717, because the condition on line 712 was never false

713 # Contents (encoding is automatically used to read the body when calling response.text) 

714 result = response.text 

715 return result, response.encoding 

716 else: 

717 return response 

718 else: 

719 response.raw.decode_content = decode 

720 return response 

721 

722 except HTTPError as error: 

723 try: 

724 body = error.response.text 

725 # { 

726 # "errorcode": 10002, 

727 # "reset_time": "2017-10-17T00:00:00Z", 

728 # "limit_time_unit": "day", 

729 # "call_limit": 10000, 

730 # "error": "Too many requests on the domain. Please contact the domain administrator." 

731 # } 

732 details = loads(body) 

733 except JSONDecodeError: 

734 # error parsing the json payload? 

735 pass 

736 else: 

737 raise ODSException(error.response.status_code, error.response.headers, **details) 

738 

739 raise error 

740 

741 

742class NoODSAPIKeyFoundError(Exception): 

743 """ 

744 Raised when no api key was found (no explicit api key provided, no api key file, no env variable entry, no keyring 

745 entry) 

746 """ 

747 

748 def __init__(self, 

749 odsclient # type: ODSClient 

750 ): 

751 self.odsclient = odsclient 

752 

753 def __str__(self): 

754 return "ODS API key not found, while it is marked as mandatory for this call (`enforce_apikey=True`). " \

755 "It should either be put in a text file at path '%s', or in the `ODS_APIKEY` OS environment variable, " \ 

756 "or (recommended, most secure) in the local `keyring`. " \ 

757 "See documentation for details: %s. Note that you can generate an API key on this web page: " \ 

758 "%s/account/my-api-keys/." \ 

759 % (self.odsclient.apikey_filepath, "https://smarie.github.io/python-odsclient/#c-declaring-an-api-key", 

760 self.odsclient.base_url) 

761 

762 

763class InsufficientRightsForODSResourceError(Exception): 

764 """ 

765 Raised when an HTTP 200 is received from ODS together with an HTML page as body. This happens when the api key is

766 missing or does not grant the appropriate rights for the required resource. 

767 """ 

768 

769 def __init__(self, headers, contents): 

770 self.headers = headers 

771 self.contents = contents 

772 

773 def __str__(self): 

774 return "An ODS query returned an HTTP 200 (OK) but with an html content-type. This is probably an " \

775 "authentication problem, please check your api key using `get_apikey()`. " \ 

776 "Headers:\n%s\nResponse:\n%s\n" % (self.headers, self.contents) 

777 

778 

779class ODSException(Exception): 

780 """ 

781 An error returned by the ODS API 

782 """ 

783 

784 def __init__(self, status_code, headers, **details): 

785 """ 

786 

787 :param status_code: 

788 :param headers: 

789 :param details: 

790 """ 

791 super(ODSException, self).__init__() 

792 self.status_code = status_code 

793 self.headers = headers 

794 self.error_msg = details['error'] 

795 self.details = details 

796 

797 def __str__(self): 

798 return repr(self) 

799 

800 def __repr__(self): 

801 return "Request failed (%s): %s\nDetails: %s\nHeaders: %s" % (self.status_code, self.error_msg, 

802 self.details, self.headers) 

803 

804 

805def create_session_for_fiddler(): 

806 # type: (...) -> Session 

807 return create_session_for_proxy(http_proxyhost='localhost', http_proxyport=8888, 

808 use_http_for_https_proxy=True, ssl_verify=False) 

809 

810 

811def create_session_for_proxy(http_proxyhost, # type: str 

812 http_proxyport, # type: int 

813 https_proxyhost=None, # type: str 

814 https_proxyport=None, # type: int 

815 use_http_for_https_proxy=False, # type: bool 

816 ssl_verify=None 

817 ): 

818 # type: (...) -> Session 

819 """ 

820 Helper method to configure the `requests` package to use the proxy of your choice and adapt the SSL certificate

821 validation accordingly. Note that this is only needed if you do not wish to use the default configuration (inherited

822 from environment variables, so you can use https://smarie.github.io/develop-behind-proxy/switching/#envswitcher) 

823 

824 :param http_proxyhost: mandatory proxy host for http 

825 :param http_proxyport: mandatory proxy port for http 

826 :param https_proxyhost: optional proxy host for https. If none is provided, http_proxyhost will be used 

827 :param https_proxyport: optional proxy port for https. If none is provided, http_proxyport will be used 

828 :param use_http_for_https_proxy: optional, if set to true the http protocol will be used to initiate communications 

829 with the proxy even for https calls (then calls will be done in https as usual). 

830 :param ssl_verify: optional ssl verification parameter. It may either be the path to an additional certificate 

831 to trust (recommended), or a boolean to enable (default)/disable (not recommended! use only in debug mode!)

832 certificate validation. 

833 See here for details: http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification

834 :return: a requests.Session object that you may use with the rest of the library 

835 """ 

836 # config and fallback 

837 https_proxyhost = https_proxyhost if https_proxyhost is not None else http_proxyhost 

838 https_proxyport = https_proxyport if https_proxyport is not None else http_proxyport 

839 https_proxy_protocol = 'http' if use_http_for_https_proxy else 'https' 

840 

841 s = Session() 

842 s.proxies = { 

843 'http': 'http://%s:%s' % (http_proxyhost, http_proxyport), 

844 'https': '%s://%s:%s' % (https_proxy_protocol, https_proxyhost, https_proxyport), 

845 } 

846 if not (ssl_verify is None): 

847 s.verify = ssl_verify 

848 

849 # IMPORTANT: otherwise the environment variables will always have precedence over user settings

850 s.trust_env = False 

851 

852 return s 

853 
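A sketch of plugging such a session into a client (the proxy host and port are invented examples):

    s = create_session_for_proxy("localhost", 3128, ssl_verify=False)
    client = ODSClient(platform_id="public", requests_session=s, auto_close_session=True)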

854 

855def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE, progressbar=None): 

856 """ 

857 Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only 

858 input stream. 

859 

860 The stream implements Python 3's newer I/O API (available in Python 2's io module). 

861 For efficiency, the stream is buffered. 

862 

863 Source: https://stackoverflow.com/a/20260030/7262247 

864 """ 

865 

866 class IterStream(io.RawIOBase): 

867 def __init__(self): 

868 self.leftover = None 

869 

870 def readable(self): 

871 return True 

872 

873 def readinto(self, b): 

874 try: 

875 ln = len(b) # We're supposed to return at most this much 

876 chunk = self.leftover or next(iterable) 

877 output, self.leftover = chunk[:ln], chunk[ln:] 

878 b[:len(output)] = output 

879 if progressbar:  # 879 ↛ 880: line 879 didn't jump to line 880, because the condition on line 879 was never true

880 progressbar.update(len(output)) 

881 return len(output) 

882 except StopIteration: 

883 return 0 # indicate EOF 

884 

885 return io.BufferedReader(IterStream(), buffer_size=buffer_size) 

886 
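A self-contained sketch of what this helper does:

    chunks = iter([b"hello ", b"world"])
    stream = iterable_to_stream(chunks)
    assert stream.read() == b"hello world"  # read like a regular binary file object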

887 

888class CacheFileNotFoundError(FileNotFoundError): 

889 pass 

890 

891 

892class CacheEntry(object): 

893 """ 

894 Represents a cache entry for a dataset, under `cache_root` (default CACHE_ROOT_FOLDER). 

895 It may not exist. 

896 

897 Accesses to the file are thread-safe (atomic) to avoid collisions while the file is updated.

898 """ 

899 __slots__ = ('dataset_id', 'dataset_format', 'platform_pseudo_id', '_cache_root', 'rw_lock') 

900 

901 def __init__(self, 

902 dataset_id, # type: str 

903 dataset_format, # type: str 

904 platform_pseudo_id, # type: str 

905 cache_root=None # type: Union[str, Path] 

906 ): 

907 """Constructor from a dataset id and an optional root cache path""" 

908 self.dataset_id = dataset_id 

909 self.dataset_format = dataset_format 

910 self.platform_pseudo_id = platform_pseudo_id 

911 self.rw_lock = Lock() 

912 

913 if cache_root is None: 

914 self._cache_root = None 

915 else: 

916 if isinstance(cache_root, string_types):  # 916 ↛ 918: line 916 didn't jump to line 918, because the condition on line 916 was never false

917 cache_root = Path(cache_root) 

918 self._cache_root = cache_root 

919 

920 def __repr__(self): 

921 return "CacheEntry(path='%s')" % self.file_path 

922 

923 def exists(self): 

924 """Return True if there is a file for this cache entry, at self.file_path. Note that in a multithread context

925 this does not ensure that this will remain True at next python code step :) """ 

926 return self.file_path.exists() 

927 

928 @property 

929 def cache_root(self): 

930 # type: (...) -> Path 

931 """The root folder of the cache where this entry sits""" 

932 return self._cache_root if self._cache_root is not None else CACHE_ROOT_FOLDER 

933 

934 @property 

935 def file_path(self): 

936 # type: (...) -> Path 

937 """The file where this entry sits (it may exist or not)""" 

938 return Path("%s/%s/%s.%s" % (self.cache_root, self.platform_pseudo_id, self.dataset_id, self.dataset_format)) 

939 

940 def assert_exists(self): 

941 """Raises an error if the file does not exist""" 

942 if not self.exists(): 

943 raise CacheFileNotFoundError("Cached file entry can not be read as it does not exist: '%s'" 

944 % self.file_path) 

945 

946 def read(self): 

947 # type: (...) -> str 

948 """ 

949 Returns a string read from the cached file. 

950 Preserves line endings thanks to newline=''. See https://stackoverflow.com/a/50996542/7262247

951 """ 

952 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen 

953 self.assert_exists() 

954 with self.file_path.open(mode="rt", newline='', encoding=CACHE_ENCODING) as f: 

955 result = f.read() 

956 return result 

957 

958 def copy_to_file(self, 

959 file_path # type: Union[str, Path] 

960 ): 

961 """ 

962 Copy this cached file to `file_path`. Note that it will be encoded using CACHE_ENCODING

963 but a warning was already issued at dataset retrieval time if original encoding was different 

964 """ 

965 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen 

966 self.assert_exists() 

967 copyfile(str(self.file_path), str(file_path)) 

968 

969 def delete(self): 

970 """ 

971 Removes this cache entry with thread-safe protection 

972 """ 

973 with self.rw_lock: 

974 if not self.exists():  # 974 ↛ 975: line 974 didn't jump to line 975, because the condition on line 974 was never true

975 warnings.warn("Can not delete file entry: file does not exist: '%s'" % self.file_path) 

976 else: 

977 os.remove(str(self.file_path)) 

978 

979 def prepare_for_writing(self): 

980 """ 

981 Makes all parent directories if needed 

982 Issues a warning if the file exists and is therefore overridden 

983 """ 

984 if self.exists():  # 984 ↛ 985: line 984 didn't jump to line 985, because the condition on line 984 was never true

985 warnings.warn("Cached file entry already exists and will be overridden: '%s'" % self.file_path) 

986 

987 # make sure the parents exist 

988 self.file_path.parent.mkdir(parents=True, exist_ok=True) 

989 

990 def fill_from_str(self, 

991 txt_initial_encoding, # type: str 

992 decoded_txt, # type: str 

993 ): 

994 """Writes a text string (already decoded) to the cache, according to the cache's encoding 

995 

996 If the original encoding is not equal to the cache encoding, a warning is issued. 

997 """ 

998 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen 

999 self.prepare_for_writing() 

1000 # Our cache uses utf-8 for all files, in order not to have to remember encodings to read back 

1001 if txt_initial_encoding != CACHE_ENCODING:  # 1001 ↛ 1002: line 1001 didn't jump to line 1002, because the condition on line 1001 was never true

1002 self.warn_encoding(original_encoding=txt_initial_encoding, cache_encoding=CACHE_ENCODING) 

1003 

1004 # copy with the correct encoding 

1005 self.file_path.write_bytes(decoded_txt.encode(CACHE_ENCODING)) 

1006 

1007 def _fill_from_it_no_lock(self, 

1008 it, # type: Iterable 

1009 it_encoding, # type: str 

1010 progress_bar=None, 

1011 ): 

1012 """The no-lock version of fill from iterable""" 

1013 

1014 self.prepare_for_writing() 

1015 if it_encoding == CACHE_ENCODING:  # 1015 ↛ 1024: line 1015 didn't jump to line 1024, because the condition on line 1015 was never false

1016 # no encoding change: direct copy 

1017 with open(str(self.file_path), 'wb') as f: # stream to csv file in binary mode 

1018 for data in it: # block by block 

1019 if progress_bar:  # 1019 ↛ 1020: line 1019 didn't jump to line 1020, because the condition on line 1019 was never true

1020 progress_bar.update(len(data)) # - update progress bar 

1021 f.write(data) # - direct copy (no decoding/encoding) 

1022 else: 

1023 # Our cache uses utf-8 for all files, in order not to have to remember encodings to read back 

1024 self.warn_encoding(original_encoding=it_encoding, cache_encoding=CACHE_ENCODING) 

1025 

1026 # we will need transcoding. Fully stream to a memory string, then dump it to the cache (and dataframe) after

1027 csv_str_io = io.StringIO() # stream to a string in memory 

1028 for data in it: # block by block 

1029 if progress_bar: 

1030 progress_bar.update(len(data)) # - update progress bar 

1031 csv_str_io.write(data.decode(it_encoding)) # - decode with proper encoding 

1032 csv_str = csv_str_io.getvalue() 

1033 

1034 # store in cache with proper encoding 

1035 self.file_path.write_bytes(csv_str.encode(CACHE_ENCODING)) 

1036 

1037 def fill_from_iterable(self, 

1038 it, # type: Iterable 

1039 it_encoding, # type: str 

1040 progress_bar=None, 

1041 lock=True 

1042 ): 

1043 """ 

1044 Fill this cache entry from an iterable of bytes 

1045 :param it: 

1046 :param it_encoding: 

1047 :param progress_bar: 

1048 :return: csv_str if it was streamed to memory in the process (if transcoding was needed) 

1049 """ 

1050 if lock:  # 1050 ↛ 1051: line 1050 didn't jump to line 1051, because the condition on line 1050 was never true

1051 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen 

1052 self._fill_from_it_no_lock(it=it, it_encoding=it_encoding, progress_bar=progress_bar) 

1053 else: 

1054 self._fill_from_it_no_lock(it=it, it_encoding=it_encoding, progress_bar=progress_bar) 

1055 

1056 def fill_from_file(self, 

1057 file_path, # type: Union[str, Path] 

1058 file_encoding, # type: str 

1059 ): 

1060 """Copies a file to the cache. 

1061 

1062 If the original encoding is not equal to the cache encoding, conversion happens and a warning is issued. 

1063 """ 

1064 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen 

1065 self.prepare_for_writing() 

1066 if file_encoding == CACHE_ENCODING:  # 1066 ↛ 1071: line 1066 didn't jump to line 1071, because the condition on line 1066 was never false

1067 # no encoding change: direct copy 

1068 copyfile(str(file_path), str(self.file_path)) 

1069 else: 

1070 # Our cache uses utf-8 for all files, in order not to have to remember encodings to read back 

1071 self.warn_encoding(original_encoding=file_encoding, cache_encoding=CACHE_ENCODING) 

1072 # read with newline-preserve and with original encoding 

1073 with open(str(file_path), mode="rt", newline='', encoding=file_encoding) as f_src: 

1074 contents = f_src.read() 

1075 # write with the cache encoding 

1076 with open(str(self.file_path), mode='wt', encoding=CACHE_ENCODING) as f_dest: 

1077 f_dest.write(contents) 

1078 

1079 def warn_encoding(self, original_encoding, cache_encoding): 

1080 """ 

1081 Issues a warning when the original encoding was different from the one in the cache and a conversion occurred

1082 """ 

1083 warnings.warn( 

1084 "[odsclient-cache] Cached file for dataset %r will use %r encoding while original encoding on " 

1085 "ODS was %r. This will most probably have no side effects except if your dataset"

1086 " contains characters that can not be encoded in utf-8 such as old/alternative" 

1087 " forms of east asian kanji. See https://en.wikipedia.org/wiki/Unicode#Issues" 

1088 % (self.dataset_id, cache_encoding, original_encoding)) 

1089 

1090 

1091def baseurl_to_id_str(base_url): 

1092 """ Transform an ODS platform url into an identifier string usable for example as file/folder name""" 

1093 

1094 o = urlparse(base_url) 

1095 

1096 # start with host name 

1097 result_str = o.netloc 

1098 

1099 # simplify the public ODS site 

1100 if result_str.endswith(".opendatasoft.com"): 

1101 result_str = result_str.replace(".opendatasoft.com", "") 

1102 

1103 # optionally add custom sub-path 

1104 if o.path and o.path != "/": 

1105 _path = o.path.replace("/", "_") 

1106 

1107 # ensure leading _

1108 if not _path.startswith("_"):  # 1108 ↛ 1109: line 1108 didn't jump to line 1109, because the condition on line 1108 was never true

1109 _path = "_" + _path 

1110 

1111 # ensure no trailing _

1112 if _path.endswith("_"): 

1113 _path = _path[:-1] 

1114 

1115 result_str += _path 

1116 

1117 return result_str
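For illustration, the mapping applied by this helper (the urls are invented examples):

    assert baseurl_to_id_str("https://public.opendatasoft.com") == "public"
    assert baseurl_to_id_str("https://data.example.com/ods") == "data.example.com_ods"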