Coverage for odsclient/core.py: 77%
1# Authors: Sylvain MARIE <sylvain.marie@se.com>
2# + All contributors to <https://github.com/smarie/python-odsclient>
3#
4# License: 3-clause BSD, <https://github.com/smarie/python-odsclient/blob/master/LICENSE>
5import warnings
6from ast import literal_eval
7from getpass import getpass
8import io
9import os
10from shutil import copyfile
11from threading import Lock
13try:
14 # Python 3
15 from urllib.parse import urlparse
16except ImportError:
17 # Python 2
18 from urlparse import urlparse
20try:
21 from pathlib import Path
22except ImportError:
23 from pathlib2 import Path # python 2
25import sys
27if sys.version_info < (3,): 27 ↛ 30
28 # py2: we need a version of open that supports encoding and newline endings
29 # See https://stackoverflow.com/a/10975371/7262247
30 from io import open
31 from StringIO import StringIO
32 string_types = (str, bytes)
33else:
34 from io import StringIO
35 string_types = (str,)
37try:
38 from json import loads, JSONDecodeError, dumps
39except ImportError:
40 # python 2
41 from json import loads, dumps
42 JSONDecodeError = ValueError
43try:
44 FileNotFoundError
45except NameError:
46 # python 2
47 FileNotFoundError = IOError
49from requests import Session, HTTPError
51try:
52 # noinspection PyUnresolvedReferences
53 from typing import Dict, Union, Iterable
54except ImportError:
55 pass
57CACHE_ROOT_FOLDER = ".odsclient"
58CACHE_ENCODING = "utf-8"
59ODS_BASE_URL_TEMPLATE = "https://%s.opendatasoft.com"
60ENV_ODS_APIKEY = 'ODS_APIKEY'
61KR_DEFAULT_USERNAME = 'apikey_user'
64class ODSClient(object):
65 """
66 An `ODSClient` is a client for a given OpenDataSoft (ODS) platform. By default the target platform base url is
67 `https://<platform_id>.opendatasoft.com` with `platform_id='public'`. One can either customize the platform
68 id through the `platform_id` constructor argument, or replace the whole base url with `base_url`.
70 A client instance offers methods to interact with the various ODS APIs. Currently three high-level methods are
71 provided: `<client>.get_whole_dataset(dataset_id, ...)`, `<client>.get_whole_dataframe(dataset_id, ...)`
72 and `<client>.push_dataset_realtime(dataset_id, ...)`.
74 A file cache can be activated for the two `get` methods by setting `file_cache` to `True` or to a path-like (string
75 or `Path`) indicating a custom cache root path. `True` will use the default cache root folder `.odsclient`.
76 `<client>.get_cached_dataset_entry` can be used to get a `CacheEntry` object representing the (possibly
77 non-existing) cache entry for a given dataset.
79 You can customize the `requests.Session` object used for the HTTPS transport using `requests_session`. If you do so,
80 remember to close it yourself or to switch `auto_close_session` to True.
82 A client is meant to use a single api key at a time, or none. You can force the api key to be mandatory using
83 `enforce_apikey=True`. There are 4 ways to pass an api key; they are used in the following order:
85 - explicitly with the `apikey` argument
87 - through a text file containing the key. If present, this file should be named `ods.apikey` (the name can be
88 changed using `apikey_filepath`; providing a custom name does not make the file mandatory)
90 - if `keyring` is installed (`pip install keyring`), an apikey can be created as an entry in it for service
91 `<base_url>` and username `'apikey_user'`. `keyring` leverages your OS' vault (Windows Credential Locker,
92 macOS Keychain, etc.). This is the **most secure** method available. You can override the default keyring entry
93 username with the `keyring_entries_username=...` argument. You can easily add or remove an entry in the keyring
94 through the OS interface, with the `odskeys` commandline utility (`odskeys --help`) or with the
95 `<client>.store_apikey_in_keyring` / `<client>.get_apikey_from_keyring` / `<client>.remove_apikey_from_keyring`
96 methods.
98 - through the `'ODS_APIKEY'` OS environment variable. It should either contain the key without quotes or a
99 dict-like structure where keys can either be `platform_id`, `base_url`, or the special fallback key `'default'`.
101 For debugging purposes, you may wish to use `<client>.get_apikey()` to check if the api key that is actually used
102 is the one you think you have configured through one of the above methods.
104 """
106 def __del__(self):
107 """
108 This is called when the garbage collector deletes this object.
109 Let's use this opportunity to close the requests Session to avoid
110 leaving hanging Sockets, see https://github.com/smarie/python-odsclient/issues/27
111 """
112 if self.auto_close_session and self.session is not None:
113 try:
114 # close the underlying `requests.Session`
115 self.session.close()
116 except Exception as e:
117 warnings.warn("Error while closing session: %r" % e)
119 def __init__(self,
120 platform_id='public', # type: str
121 base_url=None, # type: str
122 enforce_apikey=False, # type: bool
123 apikey=None, # type: str
124 apikey_filepath='ods.apikey', # type: Union[str, Path]
125 use_keyring=True, # type: bool
126 keyring_entries_username=KR_DEFAULT_USERNAME, # type: str
127 requests_session=None, # type: Session
128 auto_close_session=None # type: bool
129 ):
130 """
131 Constructor for `ODSClient`s
133 :param platform_id: the ods platform id to use. This id is used to construct the base URL based on the pattern
134 https://<platform_id>.opendatasoft.com. Default is `'public'` which leads to the base url
135 https://public.opendatasoft.com
136 :param base_url: an explicit base url to use instead of the one generated from `platform_id`
137 :param enforce_apikey: an optional boolean indicating if an error should be raised if no apikey is found at all
138 (not in the explicit argument, not in a file, environment variable, nor keyring) (default `False`)
139 :param apikey: an explicit api key as a string.
140 :param apikey_filepath: the path that should be used to look for api keys on the file system. Such files are
141 optional, other (safer) methods exist to pass the api key, see documentation for details.
142 :param use_keyring: an optional boolean (default `True`) specifying whether the `keyring` library should be
143 used to lookup existing api keys. Keys should be stored using `store_apikey_in_keyring()`.
144 :param keyring_entries_username: keyring stores secrets with a key made of a service id and a username. We use
145 the base url for the service id; the user name, however, can be anything. By default we use the string
146 'apikey_user'.
147 :param requests_session: an optional `Session` object to use (from `requests` lib). If `None` is provided,
148 a new `Session` will be used and deleted when this object is garbage-collected. If a custom object is provided,
149 you should close it yourself or switch `auto_close_session` to `True` explicitly.
150 :param auto_close_session: an optional boolean indicating if `self.session` should be closed when this object
151 is garbage-collected. By default this is `None` and means "`True` if no custom `requests_session` is passed, else
152 `False`". Turning this to `False` can leave hanging Sockets unclosed.
153 """
154 # keyring option
155 self.use_keyring = use_keyring
156 self.keyring_entries_username = keyring_entries_username
158 # Construct the base url
159 if base_url is not None:
160 if platform_id != 'public' and platform_id is not None: 160 ↛ 161
161 raise ValueError("Only one of `platform_id` and `base_url` should be provided. Received "
162 "platform_id='%s' and base_url='%s'" % (platform_id, base_url))
163 # remove trailing slashes
164 while base_url.endswith('/'):
165 base_url = base_url[:-1]
166 self.base_url = base_url
167 self.platform_id = None
168 else:
169 self.platform_id = platform_id
170 self.base_url = ODS_BASE_URL_TEMPLATE % platform_id
172 # Load apikey from file and validate it
173 self.apikey_filepath = apikey_filepath
174 if apikey is not None:
175 # api key passed as argument
176 if apikey_filepath != 'ods.apikey': 176 ↛ 177
177 raise ValueError("Only one of `apikey` and custom `apikey_filepath` should be provided.")
178 self.apikey = apikey
180 elif apikey_filepath is not None: 180 ↛ 192
181 try:
182 # read the api key from the file
183 with open(str(apikey_filepath)) as f:
184 self.apikey = f.read()
185 except FileNotFoundError:
186 self.apikey = None
187 else:
188 # remove trailing new lines or blanks if any
189 self.apikey = self.apikey.rstrip()
190 else:
191 # no explicit api key. Environment variable will apply
192 self.apikey = None
194 if self.apikey is not None and len(self.apikey) == 0: 194 ↛ 195
195 raise ValueError('The provided api key is empty!')
197 # checker flag
198 self.enforce_apikey = enforce_apikey
200 # create and store a session
201 self.session = requests_session or Session()
202 # auto-close behaviour
203 if auto_close_session is None: 203 ↛ 206
204 # default: only auto-close if this session was created by us.
205 auto_close_session = requests_session is None
206 self.auto_close_session = auto_close_session
208 def get_whole_dataframe(self,
209 dataset_id, # type: str
210 use_labels_for_header=True, # type: bool
211 tqdm=False, # type: bool
212 block_size=1024, # type: int
213 file_cache=False, # type: bool
214 **other_opts
215 ):
216 """
217 Returns a dataset as a pandas dataframe. pandas must be installed.
219 :param dataset_id:
220 :param use_labels_for_header:
221 :param tqdm: a boolean indicating if a progress bar using tqdm should be displayed. tqdm should be installed
222 :param block_size: an int block size used in streaming mode when tqdm is used
223 :param file_cache: a boolean (default False) indicating whether the file should be written to a local cache
224 `.odsclient/<platform_pseudo_id>/<dataset_id>.<format>`. Or a path-like object with the custom cache root folder.
225 :param other_opts:
226 :return:
227 """
228 try:
229 import pandas as pd
230 except ImportError as e:
231 raise Exception("`get_whole_dataframe` requires `pandas` to be installed. [%s] %s" % (e.__class__, e))
233 # Combine all the options
234 opts = other_opts
235 apikey = self.get_apikey()
236 if apikey is not None: 236 ↛ 237
237 opts['apikey'] = apikey
238 if use_labels_for_header is not None: 238 ↛ 242
239 opts['use_labels_for_header'] = use_labels_for_header
241 # hardcoded
242 if 'timezone' in opts: 242 ↛ 243
243 raise ValueError("'timezone' should not be specified with this method")
244 if 'format' in opts: 244 ↛ 245
245 raise ValueError("'format' should not be specified with this method")
246 opts['format'] = format = 'csv'
247 if 'csv_separator' in opts: 247 ↛ 248
248 raise ValueError("'csv_separator' should not be specified with this method")
249 opts['csv_separator'] = ';'
251 # Cache usage
252 if file_cache:
253 # it can be a boolean or a path
254 if file_cache is True:
255 file_cache = CACHE_ROOT_FOLDER
256 cached_file = self.get_cached_dataset_entry(dataset_id=dataset_id, format=format, cache_root=file_cache)
257 try:
258 # try to read the cached file in a thread-safe operation
259 with cached_file.rw_lock:
260 cached_file.assert_exists()
261 df = pd.read_csv(str(cached_file.file_path), sep=';')
262 return df
263 except CacheFileNotFoundError:
264 pass # does not exist. continue to query
265 else:
266 cached_file = None
267 del file_cache
269 # The URL to call
270 url = self.get_download_url(dataset_id)
272 # Execute call in stream mode with automatic content-type decoding
273 result = self._http_call(url, params=opts, stream=True, decode=True)
274 # print(iterable_to_stream(result.iter_content()).read())
275 # noinspection PyTypeChecker
277 if tqdm:
278 from tqdm import tqdm as _tqdm
280 total_size = int(result.headers.get('Content-Length', 0))
281 with _tqdm(desc=url, total=total_size,
282 unit='B' if block_size == 1024 else 'it',
283 unit_scale=True,
284 unit_divisor=block_size
285 ) as bar:
286 if not cached_file:
287 # Directly stream to memory with updates of the progress bar
288 df = pd.read_csv(iterable_to_stream(result.iter_content(), buffer_size=block_size, progressbar=bar),
289 sep=';')
290 else:
291 # stream to cache file and read the dataframe from the cache (use the lock to make sure it is here)
292 with cached_file.rw_lock:
293 cached_file.fill_from_iterable(result.iter_content(block_size), it_encoding=result.encoding,
294 progress_bar=bar, lock=False)
295 df = pd.read_csv(str(cached_file.file_path), sep=';')
296 else:
297 if not cached_file:
298 # directly stream to memory dataframe
299 df = pd.read_csv(iterable_to_stream(result.iter_content(block_size)), sep=';')
300 else:
301 # stream to cache file and read the dataframe from the cache (use the lock to make sure it is here)
302 with cached_file.rw_lock:
303 cached_file.fill_from_iterable(result.iter_content(block_size), it_encoding=result.encoding,
304 lock=False)
305 df = pd.read_csv(str(cached_file.file_path), sep=';')
307 return df
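# Usage sketch for `get_whole_dataframe` (illustrative only, requires `pandas`; the dataset id is a placeholder):
#
#     client = ODSClient()
#     df = client.get_whole_dataframe("some-dataset-id", file_cache=True)   # later calls read the .odsclient cache
#     print(df.head())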
309 # noinspection PyShadowingBuiltins
310 def get_whole_dataset(self,
311 dataset_id, # type: str
312 format='csv', # type: str
313 timezone=None, # type: str
314 use_labels_for_header=True, # type: bool
315 csv_separator=';', # type: str
316 tqdm=False, # type: bool
317 to_path=None, # type: Union[str, Path]
318 file_cache=False, # type: bool
319 block_size=1024, # type: int
320 **other_opts
321 ):
322 """
323 Returns a dataset as a string (csv format by default), or `None` if `to_path` is provided.
325 :param dataset_id:
326 :param format:
327 :param timezone:
328 :param use_labels_for_header:
329 :param csv_separator: ';', ','...
330 :param tqdm: a boolean indicating if a progress bar using tqdm should be displayed. tqdm should be installed
331 :param to_path: a string or `Path` indicating the file path where the csv should be written. In that case `None` is returned
332 :param file_cache: a boolean (default False) indicating whether the file should be written to a local cache
333 `.odsclient/<platform_pseudo_id>/<dataset_id>.<format>`. Or a path-like object with the custom cache root folder.
334 :param block_size: an int block size used in streaming mode when `to_path` or `tqdm` is used
335 :param other_opts:
336 :return:
337 """
339 # ------- To uncomment one day if headers and/or body are needed
340 # headers = {'Authorization': ('Bearer ' + api_key)}
341 #
342 # if not (requestJsonBodyStr is None):
343 # # first encode the string as bytes using the charset
344 # charset = 'utf-8'
345 # json_body_encoded_with_charset = str.encode(requestJsonBodyStr, encoding=charset)
346 # headers['Content-Type'] = 'application/json; charset=' + charset
347 # else:
348 # json_body_encoded_with_charset = None
349 # ------------------
351 # Combine all the options
352 opts = other_opts
353 apikey = self.get_apikey()
354 if apikey is not None:
355 opts['apikey'] = apikey
356 if format is not None: 356 ↛ 358
357 opts['format'] = format
358 if timezone is not None: 358 ↛ 359
359 opts['timezone'] = timezone
360 if use_labels_for_header is not None: 360 ↛ 362
361 opts['use_labels_for_header'] = use_labels_for_header
362 if csv_separator is not None: 362 ↛ 366
363 opts['csv_separator'] = csv_separator
365 # The URL to call
366 url = self.get_download_url(dataset_id)
368 # Should we write anything to disk?
369 # -- Because it is the target
370 if to_path is not None:
371 if isinstance(to_path, string_types): 371 ↛ 372
372 to_path = Path(to_path)
373 to_path.parent.mkdir(parents=True, exist_ok=True) # make sure the parents exist
375 # -- Because the cache is used
376 if file_cache:
377 # it can be a boolean or a path
378 if file_cache is True:
379 file_cache = CACHE_ROOT_FOLDER
380 cached_file = self.get_cached_dataset_entry(dataset_id=dataset_id, format=format, cache_root=file_cache)
381 try:
382 # Do NOT call cached_file.exists(): not thread-safe
383 if to_path is None:
384 return cached_file.read() # this is atomic: thread-safe
385 else:
386 cached_file.copy_to_file(to_path) # this is atomic: thread-safe
387 return None
388 except CacheFileNotFoundError:
389 # does not exist. continue to query
390 pass
391 else:
392 cached_file = None
393 del file_cache
395 # Execute call, since no cache was used
396 result = None
397 if not tqdm:
398 if to_path is None:
399 # We need to return a csv string, so load everything in memory
400 result, encoding = self._http_call(url, params=opts, stream=False, decode=True)
402 if cached_file: # cache it in local cache if needed
403 cached_file.fill_from_str(txt_initial_encoding=encoding, decoded_txt=result)
404 else:
405 # No need to return a csv string: stream directly to csv file (no decoding/encoding)
406 r = self._http_call(url, params=opts, stream=True, decode=False)
407 with open(str(to_path), mode='wb') as f:
408 for data in r.iter_content(block_size):
409 f.write(data)
411 if cached_file: # cache it in local cache if needed
412 cached_file.fill_from_file(file_path=to_path, file_encoding=r.encoding)
413 else:
414 # Progress bar is needed: we need streaming mode
415 r = self._http_call(url, params=opts, stream=True, decode=False)
416 total_size = int(r.headers.get('Content-Length', 0))
418 from tqdm import tqdm as _tqdm
419 with _tqdm(desc=url, total=total_size,
420 unit='B' if block_size == 1024 else 'it',
421 unit_scale=True,
422 unit_divisor=block_size
423 ) as bar:
424 if to_path is None:
425 result = io.StringIO() # stream to a string in memory
426 for data in r.iter_content(block_size): # block by block
427 bar.update(len(data)) # - update progress bar
428 result.write(data.decode(r.encoding)) # - decode with proper encoding
429 result = result.getvalue()
431 if cached_file: # cache it in local cache if needed
432 cached_file.fill_from_str(txt_initial_encoding=r.encoding, decoded_txt=result)
433 else:
434 with open(str(to_path), 'wb') as f: # stream to csv file in binary mode
435 for data in r.iter_content(block_size): # block by block
436 bar.update(len(data)) # - update progress bar
437 f.write(data) # - direct copy (no decoding/encoding)
439 if cached_file: # cache it in local cache if needed
440 cached_file.fill_from_file(file_path=to_path, file_encoding=r.encoding)
442 if total_size != 0 and bar.n != total_size: 442 ↛ 443
443 raise ValueError("Incomplete download: received %s bytes while the announced Content-Length was %s" % (bar.n, total_size))
445 return result
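# Usage sketch for `get_whole_dataset` (illustrative only; the dataset id and output path are placeholders,
# and `tqdm=True` requires the `tqdm` package):
#
#     client = ODSClient()
#     csv_str = client.get_whole_dataset("some-dataset-id", file_cache=True)          # returns the csv text
#     client.get_whole_dataset("some-dataset-id", to_path="out/data.csv", tqdm=True)  # writes the file, returns None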
447 # noinspection PyShadowingBuiltins
448 def push_dataset_realtime(self,
449 dataset_id, # type: str
450 dataset, # type: Union[str, pandas.DataFrame]
451 push_key, # type: str
452 format='csv', # type: str
453 csv_separator=';', # type: str
454 **other_opts
455 ):
456 """
457 Pushes a dataset. This function accepts either a pandas DataFrame or a CSV string with the header included.
459 :param dataset_id:
460 :param dataset: The dataset to push: either a pandas DataFrame or a CSV string (with header), see `format`
461 :param push_key: The Push Key provided by the API for pushing this dataset. Warning: This key is independent
462 from the API key. It can be acquired from the Realtime Push API URL section in ODS.
463 :param format: The format of the dataset to be pushed. Can be `pandas` or `csv`.
464 :param csv_separator: CSV separator character in case of a csv dataset input.
465 :returns: the HTTP response object
466 """
468 if format == 'pandas':
469 try:
470 import pandas as pd
471 except ImportError as e:
472 raise Exception("`push_dataset_realtime` with the `pandas` format requires `pandas` to be installed. "
473 "[%s] %s" % (e.__class__, e))
475 if not isinstance(dataset, pd.DataFrame):
476 raise TypeError('If format is set to "pandas" then `dataset` should be a DataFrame. Found %s'
477 % type(dataset))
479 dataset # type:pandas.DataFrame
480 request_body = dataset.to_json(orient='records')
481 elif format == 'csv':
482 try:
483 import csv
484 except ImportError as e:
485 raise Exception("`push_dataset_realtime` with the `csv` format requires `csv` to be installed. [%s] %s"
486 % (e.__class__, e))
487 # noinspection PyStatementEffect
488 dataset # type:str
489 csv_reader = csv.DictReader(StringIO(dataset), delimiter=csv_separator)
490 request_body = dumps([r for r in csv_reader])
491 else:
492 raise ValueError("Dataset format must be either `pandas` or `csv`")
494 # Combine all the options
495 opts = other_opts
496 opts['pushkey'] = push_key
498 # The URL to call
499 url = self.get_realtime_push_url(dataset_id)
501 # Execute call
502 return self._http_call(url, method='post', body=request_body, params=opts, decode=False)
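# Usage sketch for `push_dataset_realtime` (illustrative only; the dataset id and push key are placeholders):
#
#     client = ODSClient(platform_id="mycompany")
#     csv_data = "name;value\nfoo;1\nbar;2"
#     response = client.push_dataset_realtime("my-realtime-dataset", csv_data,
#                                             push_key="<push-api-key>", format="csv")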
504 def store_apikey_in_keyring(self,
505 apikey=None # type: str
506 ):
507 """
508 Convenience method to store a password in the OS keyring using `keyring` lib.
510 This method is a shortcut for `keyring.set_password(<base_url>, <keyring_entries_username>, <apikey>)`.
512 :param apikey: an explicit apikey string. If not provided, `getpass()` will be used to prompt the user for the
513 api key
514 :return:
515 """
516 import keyring
517 if apikey is None: 517 ↛ 518
518 apikey = getpass(prompt="Please enter your api key: ")
520 if apikey is None or len(apikey) == 0: 520 ↛ 521
521 raise ValueError("Empty api key provided.")
523 keyring.set_password(self.base_url, self.keyring_entries_username, apikey)
525 def remove_apikey_from_keyring(self):
526 """
527 Convenience method to remove a previously stored password in the OS keyring using `keyring` lib.
529 :return:
530 """
531 import keyring
532 keyring.delete_password(self.base_url, self.keyring_entries_username)
534 def get_apikey_from_keyring(self, ignore_import_errors=False):
535 """
536 Looks for a keyring entry containing the api key and returns it.
537 If not found, returns `None`
539 :param ignore_import_errors: when this is `True`, the method will return `None` if `keyring` is not installed
540 instead of raising an `ImportError`.
541 :return:
542 """
543 if ignore_import_errors:
544 try:
545 import keyring
546 except ImportError as e:
547 # not installed: simply warn instead of raising exception
548 warnings.warn("`keyring` is not installed but the `ODSClient` is configured to use it. You can either "
549 "set `use_keyring=False` explicitly or install keyring to disable this warning. "
550 "Caught: %r" % e)
551 return None
552 else:
553 # do not catch any exception
554 import keyring
556 for _url in (self.base_url, self.base_url + '/'):
557 apikey = keyring.get_password(_url, self.keyring_entries_username)
558 if apikey is not None:
559 return apikey
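# Usage sketch for the keyring helpers (illustrative only, requires the `keyring` package;
# the api key below is a placeholder):
#
#     client = ODSClient(platform_id="mycompany")
#     client.store_apikey_in_keyring("<my-api-key>")    # omit the argument to be prompted via getpass
#     assert client.get_apikey_from_keyring() == "<my-api-key>"
#     client.remove_apikey_from_keyring()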
561 def get_apikey_from_envvar(self):
562 """
563 Looks for the 'ODS_APIKEY' environment variable.
565 - if it does not exist return None
566 - otherwise if the env variable does not begin with '{', consider it as the key
567 - if it begins with '{', load it as a dict and look for a match in it, in the following order:
568 platform_id, base_url, 'default'
570 If the found key is an empty string, a ValueError is raised.
572 :return: the api key found in the 'ODS_APIKEY' env variable (possibly for this platform_id /
573 base_url), or None if it does not exist.
574 """
575 try:
576 env_api_key = os.environ[ENV_ODS_APIKEY]
577 except KeyError:
578 # no env var - return None
579 return None
581 if len(env_api_key) > 0 and env_api_key[0] == '{':
582 # a dictionary: use ast.literal_eval: more permissive than json and as safe.
583 apikeys_dct = literal_eval(env_api_key)
584 if not isinstance(apikeys_dct, dict): 584 ↛ 585
585 raise TypeError("Environment variable contains something that is neither a str nor a dict")
587 # remove trailing slash in keys
588 def _remove_trailing_slash(k):
589 while k.endswith('/'):
590 k = k[:-1]
591 return k
593 apikeys_dct = {_remove_trailing_slash(k): v for k, v in apikeys_dct.items()}
595 # Try to get a match in the dict: first platform id, then base url, then default
596 if self.platform_id in apikeys_dct: 596 ↛ 597
597 env_api_key = apikeys_dct[self.platform_id]
598 elif self.base_url in apikeys_dct:
599 env_api_key = apikeys_dct[self.base_url]
600 elif 'default' in apikeys_dct: 600 ↛ 603
601 env_api_key = apikeys_dct['default']
602 else:
603 return None
605 if len(env_api_key) == 0: 605 ↛ 606
606 raise ValueError("Empty api key found in '%s' environment variable." % ENV_ODS_APIKEY)
608 return env_api_key
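# Usage sketch for the `ODS_APIKEY` environment variable (illustrative only; keys are placeholders).
# Both the plain form and the dict-like form described above are shown:
#
#     os.environ["ODS_APIKEY"] = "<my-api-key>"                                   # plain form
#     os.environ["ODS_APIKEY"] = "{'public': '<key-1>', 'default': '<key-2>'}"    # per-platform form
#     ODSClient(platform_id="public").get_apikey_from_envvar()                    # -> '<key-1>'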
610 def get_apikey(self):
611 """
612 Returns the api key that this client currently uses.
614 :return:
615 """
616 # 1- if there is an overridden api key, use it
617 if self.apikey is not None:
618 return self.apikey
620 # 2- if keyring service is installed and contains an entry, use it
621 if self.use_keyring: 621 ↛ 627
622 apikey = self.get_apikey_from_keyring(ignore_import_errors=True)
623 if apikey is not None:
624 return apikey
626 # 3- check existence of the reference environment variable
627 apikey = self.get_apikey_from_envvar()
628 if apikey is not None:
629 return apikey
631 # 4- finally if no key was found, raise an exception if a key was required
632 if self.enforce_apikey:
633 raise NoODSAPIKeyFoundError(self)
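# Usage sketch for the api key resolution (illustrative only):
#
#     client = ODSClient(enforce_apikey=True)
#     client.get_apikey()   # returns the first key found (argument, file, keyring, env var),
#                           # or raises NoODSAPIKeyFoundError if none is found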
635 def get_download_url(self,
636 dataset_id # type: str
637 ):
638 # type: (...) -> str
639 """
641 :param dataset_id:
642 :return:
643 """
644 return "%s/explore/dataset/%s/download/" % (self.base_url, dataset_id)
646 def get_cached_dataset_entry(self,
647 dataset_id, # type: str
648 format, # type: str
649 cache_root=None # type: Union[str, Path]
650 ):
651 # type: (...) -> CacheEntry
652 """
653 Returns a `CacheEntry` for the given dataset
654 :param dataset_id:
655 :param format:
656 :param cache_root:
657 :return:
658 """
659 if self.platform_id is not None:
660 p = self.platform_id
661 else:
662 p = baseurl_to_id_str(self.base_url)
663 return CacheEntry(dataset_id=dataset_id, dataset_format=format, platform_pseudo_id=p, cache_root=cache_root)
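# Usage sketch for cache entries (illustrative only; the dataset id is a placeholder):
#
#     client = ODSClient()
#     entry = client.get_cached_dataset_entry("some-dataset-id", "csv")
#     if entry.exists():
#         print(entry.file_path)    # e.g. .odsclient/public/some-dataset-id.csv
#         entry.delete()            # thread-safe removal of the cached file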
665 def get_realtime_push_url(self,
666 dataset_id, # type: str
667 ):
668 # type: (...) -> str
669 """
671 :param dataset_id:
672 :return:
673 """
674 return "%s/api/push/1.0/%s/realtime/push/" % (self.base_url, dataset_id)
676 def _http_call(self,
677 url, # type: str
678 body=None, # type: bytes
679 headers=None, # type: Dict[str, str]
680 method='get', # type: str
681 params=None, # type: Dict[str, str]
682 decode=True, # type: bool
683 stream=False # type: bool
684 ):
685 """
686 Sub-routine for HTTP web service calls. The HTTP method is 'get' by default and can be changed with `method`.
688 :param url:
689 :param body:
690 :param headers:
691 :param method:
692 :param params:
693 :param decode: a boolean (default True) indicating if the contents should be automatically decoded following
694 the content-type encoding received in the HTTP response. If this is True and stream=False (default), the
695 function returns a tuple (body, encoding)
696 :param stream:
697 :return: either a tuple (text, encoding) (if stream=False and decode=True), or the response object
698 """
699 try:
700 # Send the request (DO NOT encode the params, this is done automatically)
701 response = self.session.request(method, url, headers=headers, data=body, params=params, stream=stream)
703 # Success ? Read status code, raise an HTTPError if status is error
704 # status = int(response.status_code)
705 response.raise_for_status()
707 # detect a "wrong 200 but true 401" (unauthorized)
708 if 'html' in response.headers['Content-Type']:
709 raise InsufficientRightsForODSResourceError(response.headers, response.text)
711 if not stream:
712 if decode: 712 ↛ 717
713 # Contents (encoding is automatically used to read the body when calling response.text)
714 result = response.text
715 return result, response.encoding
716 else:
717 return response
718 else:
719 response.raw.decode_content = decode
720 return response
722 except HTTPError as error:
723 try:
724 body = error.response.text
725 # {
726 # "errorcode": 10002,
727 # "reset_time": "2017-10-17T00:00:00Z",
728 # "limit_time_unit": "day",
729 # "call_limit": 10000,
730 # "error": "Too many requests on the domain. Please contact the domain administrator."
731 # }
732 details = loads(body)
733 except JSONDecodeError:
734 # error parsing the json payload?
735 pass
736 else:
737 raise ODSException(error.response.status_code, error.response.headers, **details)
739 raise error
742class NoODSAPIKeyFoundError(Exception):
743 """
744 Raised when no api key was found (no explicit api key provided, no api key file, no env variable entry, no keyring
745 entry)
746 """
748 def __init__(self,
749 odsclient # type: ODSClient
750 ):
751 self.odsclient = odsclient
753 def __str__(self):
754 return "ODS API key file not found, while it is marked as mandatory for this call (`enforce_apikey=True`). " \
755 "It should either be put in a text file at path '%s', or in the `ODS_APIKEY` OS environment variable, " \
756 "or (recommended, most secure) in the local `keyring`. " \
757 "See documentation for details: %s. Note that you can generate an API key on this web page: " \
758 "%s/account/my-api-keys/." \
759 % (self.odsclient.apikey_filepath, "https://smarie.github.io/python-odsclient/#c-declaring-an-api-key",
760 self.odsclient.base_url)
763class InsufficientRightsForODSResourceError(Exception):
764 """
765 Raised when an HTTP 200 is received from ODS together with an HTML page as body. This happens when the api key is
766 missing or does not grant the appropriate rights for the required resource.
767 """
769 def __init__(self, headers, contents):
770 self.headers = headers
771 self.contents = contents
773 def __str__(self):
774 return "An ODS query returned an HTTP 200 (OK) but with an html content-type. This is probably an" \
775 "authentication problem, please check your api key using `get_apikey()`. " \
776 "Headers:\n%s\nResponse:\n%s\n" % (self.headers, self.contents)
779class ODSException(Exception):
780 """
781 An error returned by the ODS API
782 """
784 def __init__(self, status_code, headers, **details):
785 """
787 :param status_code:
788 :param headers:
789 :param details:
790 """
791 super(ODSException, self).__init__()
792 self.status_code = status_code
793 self.headers = headers
794 self.error_msg = details['error']
795 self.details = details
797 def __str__(self):
798 return repr(self)
800 def __repr__(self):
801 return "Request failed (%s): %s\nDetails: %s\nHeaders: %s" % (self.status_code, self.error_msg,
802 self.details, self.headers)
805def create_session_for_fiddler():
806 # type: (...) -> Session
807 return create_session_for_proxy(http_proxyhost='localhost', http_proxyport=8888,
808 use_http_for_https_proxy=True, ssl_verify=False)
811def create_session_for_proxy(http_proxyhost, # type: str
812 http_proxyport, # type: int
813 https_proxyhost=None, # type: str
814 https_proxyport=None, # type: int
815 use_http_for_https_proxy=False, # type: bool
816 ssl_verify=None
817 ):
818 # type: (...) -> Session
819 """
820 Helper method to configure the `requests` package to use the proxy of your choice and adapt the SSL certificate
821 validation accordingly. Note that this is only needed if you do not wish to use the default configuration (inherited
822 from environment variables, so you can use https://smarie.github.io/develop-behind-proxy/switching/#envswitcher)
824 :param http_proxyhost: mandatory proxy host for http
825 :param http_proxyport: mandatory proxy port for http
826 :param https_proxyhost: optional proxy host for https. If none is provided, http_proxyhost will be used
827 :param https_proxyport: optional proxy port for https. If none is provided, http_proxyport will be used
828 :param use_http_for_https_proxy: optional, if set to true the http protocol will be used to initiate communications
829 with the proxy even for https calls (then calls will be done in https as usual).
830 :param ssl_verify: optional ssl verification parameter. It may either be the path to an additional certificate
831 to trust (recommended), or a boolean to enable (default)/disable (not recommended! use only in debug mode!)
832 certificate validation.
833 See here for details: http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification
834 :return: a requests.Session object that you may use with the rest of the library
835 """
836 # config and fallback
837 https_proxyhost = https_proxyhost if https_proxyhost is not None else http_proxyhost
838 https_proxyport = https_proxyport if https_proxyport is not None else http_proxyport
839 https_proxy_protocol = 'http' if use_http_for_https_proxy else 'https'
841 s = Session()
842 s.proxies = {
843 'http': 'http://%s:%s' % (http_proxyhost, http_proxyport),
844 'https': '%s://%s:%s' % (https_proxy_protocol, https_proxyhost, https_proxyport),
845 }
846 if not (ssl_verify is None):
847 s.verify = ssl_verify
849 # IMPORTANT: otherwise the environment variables will always have precedence over user settings
850 s.trust_env = False
852 return s
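# Usage sketch (illustrative only; the proxy host, port and certificate path are placeholders):
#
#     s = create_session_for_proxy("proxy.example.com", 3128, ssl_verify="/path/to/corporate_ca.pem")
#     client = ODSClient(requests_session=s, auto_close_session=True)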
855def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE, progressbar=None):
856 """
857 Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
858 input stream.
860 The stream implements Python 3's newer I/O API (available in Python 2's io module).
861 For efficiency, the stream is buffered.
863 Source: https://stackoverflow.com/a/20260030/7262247
864 """
866 class IterStream(io.RawIOBase):
867 def __init__(self):
868 self.leftover = None
870 def readable(self):
871 return True
873 def readinto(self, b):
874 try:
875 ln = len(b) # We're supposed to return at most this much
876 chunk = self.leftover or next(iterable)
877 output, self.leftover = chunk[:ln], chunk[ln:]
878 b[:len(output)] = output
879 if progressbar: 879 ↛ 880
880 progressbar.update(len(output))
881 return len(output)
882 except StopIteration:
883 return 0 # indicate EOF
885 return io.BufferedReader(IterStream(), buffer_size=buffer_size)
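# Usage sketch for `iterable_to_stream` (illustrative only):
#
#     chunks = iter([b"a;b\n", b"1;2\n"])
#     stream = iterable_to_stream(chunks)
#     assert stream.read() == b"a;b\n1;2\n"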
888class CacheFileNotFoundError(FileNotFoundError):
889 pass
892class CacheEntry(object):
893 """
894 Represents a cache entry for a dataset, under `cache_root` (default CACHE_ROOT_FOLDER).
895 It may not exist.
897 Accesses to the file are thread-safe (atomic) to avoid collisions while the file is updated.
898 """
899 __slots__ = ('dataset_id', 'dataset_format', 'platform_pseudo_id', '_cache_root', 'rw_lock')
901 def __init__(self,
902 dataset_id, # type: str
903 dataset_format, # type: str
904 platform_pseudo_id, # type: str
905 cache_root=None # type: Union[str, Path]
906 ):
907 """Constructor from a dataset id and an optional root cache path"""
908 self.dataset_id = dataset_id
909 self.dataset_format = dataset_format
910 self.platform_pseudo_id = platform_pseudo_id
911 self.rw_lock = Lock()
913 if cache_root is None:
914 self._cache_root = None
915 else:
916 if isinstance(cache_root, string_types): 916 ↛ 918
917 cache_root = Path(cache_root)
918 self._cache_root = cache_root
920 def __repr__(self):
921 return "CacheEntry(path='%s')" % self.file_path
923 def exists(self):
924 """Return True if there is a file for this cache entry, at self.file_path. Note that in multithread context
925 this does not ensure that this will remain True at next python code step :) """
926 return self.file_path.exists()
928 @property
929 def cache_root(self):
930 # type: (...) -> Path
931 """The root folder of the cache where this entry sits"""
932 return self._cache_root if self._cache_root is not None else CACHE_ROOT_FOLDER
934 @property
935 def file_path(self):
936 # type: (...) -> Path
937 """The file where this entry sits (it may exist or not)"""
938 return Path("%s/%s/%s.%s" % (self.cache_root, self.platform_pseudo_id, self.dataset_id, self.dataset_format))
940 def assert_exists(self):
941 """Raises an error if the file does not exist"""
942 if not self.exists():
943 raise CacheFileNotFoundError("Cached file entry can not be read as it does not exist: '%s'"
944 % self.file_path)
946 def read(self):
947 # type: (...) -> str
948 """
949 Returns a string read from the cached file.
950 Preserves line endings thanks to newline='', see https://stackoverflow.com/a/50996542/7262247
951 """
952 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen
953 self.assert_exists()
954 with self.file_path.open(mode="rt", newline='', encoding=CACHE_ENCODING) as f:
955 result = f.read()
956 return result
958 def copy_to_file(self,
959 file_path # type: Union[str, Path]
960 ):
961 """
962 Copy this cached file to `file_path`. Note that it will be encoded using CACHE_ENCODING,
963 but a warning was already issued at dataset retrieval time if the original encoding was different.
964 """
965 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen
966 self.assert_exists()
967 copyfile(str(self.file_path), str(file_path))
969 def delete(self):
970 """
971 Removes this cache entry with thread-safe protection
972 """
973 with self.rw_lock:
974 if not self.exists(): 974 ↛ 975
975 warnings.warn("Can not delete file entry: file does not exist: '%s'" % self.file_path)
976 else:
977 os.remove(str(self.file_path))
979 def prepare_for_writing(self):
980 """
981 Makes all parent directories if needed
982 Issues a warning if the file exists and is therefore overwritten
983 """
984 if self.exists(): 984 ↛ 985
985 warnings.warn("Cached file entry already exists and will be overridden: '%s'" % self.file_path)
987 # make sure the parents exist
988 self.file_path.parent.mkdir(parents=True, exist_ok=True)
990 def fill_from_str(self,
991 txt_initial_encoding, # type: str
992 decoded_txt, # type: str
993 ):
994 """Writes a text string (already decoded) to the cache, according to the cache's encoding
996 If the original encoding is not equal to the cache encoding, a warning is issued.
997 """
998 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen
999 self.prepare_for_writing()
1000 # Our cache uses utf-8 for all files, in order not to have to remember encodings to read back
1001 if txt_initial_encoding != CACHE_ENCODING: 1001 ↛ 1002
1002 self.warn_encoding(original_encoding=txt_initial_encoding, cache_encoding=CACHE_ENCODING)
1004 # copy with the correct encoding
1005 self.file_path.write_bytes(decoded_txt.encode(CACHE_ENCODING))
1007 def _fill_from_it_no_lock(self,
1008 it, # type: Iterable
1009 it_encoding, # type: str
1010 progress_bar=None,
1011 ):
1012 """The no-lock version of fill from iterable"""
1014 self.prepare_for_writing()
1015 if it_encoding == CACHE_ENCODING: 1015 ↛ 1024
1016 # no encoding change: direct copy
1017 with open(str(self.file_path), 'wb') as f: # stream to csv file in binary mode
1018 for data in it: # block by block
1019 if progress_bar: 1019 ↛ 1020
1020 progress_bar.update(len(data)) # - update progress bar
1021 f.write(data) # - direct copy (no decoding/encoding)
1022 else:
1023 # Our cache uses utf-8 for all files, in order not to have to remember encodings to read back
1024 self.warn_encoding(original_encoding=it_encoding, cache_encoding=CACHE_ENCODING)
1026 # we will need transcoding. Fully stream to a string in memory and dump it to the cache afterwards
1027 csv_str_io = io.StringIO() # stream to a string in memory
1028 for data in it: # block by block
1029 if progress_bar:
1030 progress_bar.update(len(data)) # - update progress bar
1031 csv_str_io.write(data.decode(it_encoding)) # - decode with proper encoding
1032 csv_str = csv_str_io.getvalue()
1034 # store in cache with proper encoding
1035 self.file_path.write_bytes(csv_str.encode(CACHE_ENCODING))
1037 def fill_from_iterable(self,
1038 it, # type: Iterable
1039 it_encoding, # type: str
1040 progress_bar=None,
1041 lock=True
1042 ):
1043 """
1044 Fill this cache entry from an iterable of bytes
1045 :param it:
1046 :param it_encoding:
1047 :param progress_bar:
1048 :param lock: a boolean (default `True`) indicating if the thread-safety lock should be acquired
1049 """
1050 if lock: 1050 ↛ 1051
1051 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen
1052 self._fill_from_it_no_lock(it=it, it_encoding=it_encoding, progress_bar=progress_bar)
1053 else:
1054 self._fill_from_it_no_lock(it=it, it_encoding=it_encoding, progress_bar=progress_bar)
1056 def fill_from_file(self,
1057 file_path, # type: Union[str, Path]
1058 file_encoding, # type: str
1059 ):
1060 """Copies a file to the cache.
1062 If the original encoding is not equal to the cache encoding, conversion happens and a warning is issued.
1063 """
1064 with self.rw_lock: # potentially wait for ongoing write/read to be completed, and prevent others to happen
1065 self.prepare_for_writing()
1066 if file_encoding == CACHE_ENCODING: 1066 ↛ 1071
1067 # no encoding change: direct copy
1068 copyfile(str(file_path), str(self.file_path))
1069 else:
1070 # Our cache uses utf-8 for all files, in order not to have to remember encodings to read back
1071 self.warn_encoding(original_encoding=file_encoding, cache_encoding=CACHE_ENCODING)
1072 # read with newline-preserve and with original encoding
1073 with open(str(file_path), mode="rt", newline='', encoding=file_encoding) as f_src:
1074 contents = f_src.read()
1075 # write with the cache encoding
1076 with open(str(self.file_path), mode='wt', encoding=CACHE_ENCODING) as f_dest:
1077 f_dest.write(contents)
1079 def warn_encoding(self, original_encoding, cache_encoding):
1080 """
1081 Issues a warning when the original encoding was different from the one in the cache and a conversion occurred
1082 """
1083 warnings.warn(
1084 "[odsclient-cache] Cached file for dataset %r will use %r encoding while original encoding on "
1085 " ODS was %r. This will most probably have no side effects except if your dataset"
1086 " contains characters that can not be encoded in utf-8 such as old/alternative"
1087 " forms of east asian kanji. See https://en.wikipedia.org/wiki/Unicode#Issues"
1088 % (self.dataset_id, cache_encoding, original_encoding))
1091def baseurl_to_id_str(base_url):
1092 """ Transform an ODS platform url into an identifier string usable for example as file/folder name"""
1094 o = urlparse(base_url)
1096 # start with host name
1097 result_str = o.netloc
1099 # simplify the public ODS site
1100 if result_str.endswith(".opendatasoft.com"):
1101 result_str = result_str.replace(".opendatasoft.com", "")
1103 # optionally add custom sub-path
1104 if o.path and o.path != "/":
1105 _path = o.path.replace("/", "_")
1107 # ensure leading _
1108 if not _path.startswith("_"): 1108 ↛ 1109
1109 _path = "_" + _path
1111 # ensure no trailing _
1112 if _path.endswith("_"):
1113 _path = _path[:-1]
1115 result_str += _path
1117 return result_str
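# Usage sketch for `baseurl_to_id_str` (illustrative only; the custom domain is a placeholder):
#
#     baseurl_to_id_str("https://public.opendatasoft.com")      # -> 'public'
#     baseurl_to_id_str("https://data.example.com/ods/portal")  # -> 'data.example.com_ods_portal'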