odsclient/core.py source

# Authors: Sylvain MARIE <sylvain.marie@se.com>
# + All contributors to <https://github.com/smarie/python-odsclient>
#
# License: 3-clause BSD, <https://github.com/smarie/python-odsclient/blob/master/LICENSE>
import warnings
from ast import literal_eval
from getpass import getpass
import io
import os
from shutil import copyfile
from threading import Lock

try:
    # Python 3
    from urllib.parse import urlparse
except ImportError:
    # Python 2
    from urlparse import urlparse

try:
    from pathlib import Path
except ImportError:
    from pathlib2 import Path  # python 2

import sys

if sys.version_info < (3,):
    # py2: we need a version of `open` that supports encoding and newlines
    # See https://stackoverflow.com/a/10975371/7262247
    from io import open
    from StringIO import StringIO
    string_types = (str, bytes)
else:
    from io import StringIO
    string_types = (str,)

try:
    from json import loads, JSONDecodeError, dumps
except ImportError:
    # python 2
    from json import loads, dumps
    JSONDecodeError = ValueError

try:
    FileNotFoundError
except NameError:
    # python 2
    FileNotFoundError = IOError

from requests import Session, HTTPError

try:
    # noinspection PyUnresolvedReferences
    from typing import Dict, Union, Iterable
except ImportError:
    pass

CACHE_ROOT_FOLDER = ".odsclient"
CACHE_ENCODING = "utf-8"
ODS_BASE_URL_TEMPLATE = "https://%s.opendatasoft.com"
ENV_ODS_APIKEY = 'ODS_APIKEY'
KR_DEFAULT_USERNAME = 'apikey_user'


class ODSClient(object):
    """
    An `ODSClient` is a client for a given OpenDataSoft (ODS) platform. By default the target platform base url is
    `https://<platform_id>.opendatasoft.com` with `platform_id='public'`. One can either customize the platform id
    through the `platform_id` constructor argument, or the whole base url with `base_url`.

    A client instance offers methods to interact with the various ODS APIs. Currently three high-level methods are
    provided: `<client>.get_whole_dataset(dataset_id, ...)`, `<client>.get_whole_dataframe(dataset_id, ...)`
    and `<client>.push_dataset_realtime(dataset_id, ...)`.

    A file cache can be activated for the two `get` methods by setting `file_cache` to `True` or to a path-like
    (string or `Path`) indicating a custom cache root path. `True` will use the default cache root folder
    `.odsclient`. `<client>.get_cached_dataset_entry` can be used to get a `CacheEntry` object representing the
    (possibly non-existing) cache entry for a given dataset.

    You can customize the `requests.Session` object used for the HTTPS transport with `requests_session`. If you
    do so, remember to close it yourself or to switch `auto_close_session` to `True`.

    A client is meant to use a single api key at a time, or none. You can make the api key mandatory by passing
    `enforce_apikey=True`. There are 4 ways to pass an api key; they are looked up in the following order:

    - explicitly, with the `apikey` argument

    - through a text file containing the key. This file, if present, should be named `ods.apikey` (the name can be
      changed using `apikey_filepath`; this does not make the file mandatory)

    - if `keyring` is installed (`pip install keyring`), an apikey can be created as an entry in it for service
      `<base_url>` and username `'apikey_user'`. `keyring` leverages your OS' vault (Windows Credential Locker,
      macOS Keychain, etc.). This is the **most secure** method available. You can override the default keyring
      entry username with the `keyring_entries_username=...` argument. You can easily add or remove an entry in
      the keyring through the OS interface, with the `odskeys` commandline utility (`odskeys --help`), or with the
      `<client>.store_apikey_in_keyring` / `<client>.get_apikey_from_keyring` / `<client>.remove_apikey_from_keyring`
      methods.

    - through the `'ODS_APIKEY'` OS environment variable. It should either contain the key without quotes or a
      dict-like structure where keys can be `platform_id`, `base_url`, or the special fallback key `'default'`

    For debugging purposes, you may wish to use `<client>.get_apikey()` to check if the api key that is actually
    used is the one you think you have configured through one of the above methods.
    """
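    # A minimal usage sketch (hedged): 'some-dataset-id' is a placeholder dataset id, and the
    # shortcut import below assumes the package-level export of `ODSClient`.
    #
    #     from odsclient import ODSClient
    #     client = ODSClient()                                    # targets https://public.opendatasoft.com
    #     csv_str = client.get_whole_dataset('some-dataset-id')   # whole dataset as a csv string
    #     df = client.get_whole_dataframe('some-dataset-id')      # whole dataset as a dataframe (needs pandas)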

    def __del__(self):
        """
        This is called when the garbage collector deletes this object.
        Let's use this opportunity to close the underlying `requests.Session`, to avoid
        leaving hanging sockets. See https://github.com/smarie/python-odsclient/issues/27
        """
        if self.auto_close_session and self.session is not None:
            try:
                # close the underlying `requests.Session`
                self.session.close()
            except Exception as e:
                warnings.warn("Error while closing session: %r" % e)

    def __init__(self,
                 platform_id='public',                          # type: str
                 base_url=None,                                 # type: str
                 enforce_apikey=False,                          # type: bool
                 apikey=None,                                   # type: str
                 apikey_filepath='ods.apikey',                  # type: Union[str, Path]
                 use_keyring=True,                              # type: bool
                 keyring_entries_username=KR_DEFAULT_USERNAME,  # type: str
                 requests_session=None,                         # type: Session
                 auto_close_session=None                        # type: bool
                 ):
        """
        Constructor for `ODSClient`s.

        :param platform_id: the ods platform id to use. This id is used to construct the base URL based on the
            pattern https://<platform_id>.opendatasoft.com. Default is `'public'`, which leads to the base url
            https://public.opendatasoft.com
        :param base_url: an explicit base url to use instead of the one generated from `platform_id`
        :param enforce_apikey: an optional boolean indicating if an error should be raised if no apikey is found
            at all (not in the explicit argument, nor in a file, environment variable, or keyring)
            (default `False`)
        :param apikey: an explicit api key as a string.
        :param apikey_filepath: the path that should be used to look for api keys on the file system. Such files
            are optional; other (safer) methods exist to pass the api key, see the documentation for details.
        :param use_keyring: an optional boolean (default `True`) specifying whether the `keyring` library should
            be used to look up existing api keys. Keys should be stored using `store_apikey_in_keyring()`.
        :param keyring_entries_username: keyring stores secrets with a key made of a service id and a username. We
            use the base url for the service id, but the username can be anything. By default we use the string
            'apikey_user'.
        :param requests_session: an optional `Session` object to use (from the `requests` lib). If `None` is
            provided, a new `Session` will be created and deleted when this object is garbage-collected. If a
            custom object is provided, you should close it yourself or switch `auto_close_session` to `True`
            explicitly.
        :param auto_close_session: an optional boolean indicating if `self.session` should be closed when this
            object is garbage-collected. By default this is `None`, meaning "`True` if no custom
            `requests_session` is passed, else `False`". Turning this to `False` can leave hanging sockets
            unclosed.
        """
        # keyring option
        self.use_keyring = use_keyring
        self.keyring_entries_username = keyring_entries_username

        # Construct the base url
        if base_url is not None:
            if platform_id != 'public' and platform_id is not None:
                raise ValueError("Only one of `platform_id` and `base_url` should be provided. Received "
                                 "platform_id='%s' and base_url='%s'" % (platform_id, base_url))
            # remove trailing slashes
            while base_url.endswith('/'):
                base_url = base_url[:-1]
            self.base_url = base_url
            self.platform_id = None
        else:
            self.platform_id = platform_id
            self.base_url = ODS_BASE_URL_TEMPLATE % platform_id

        # Load the apikey from a file if relevant, and validate it
        self.apikey_filepath = apikey_filepath
        if apikey is not None:
            # api key passed as argument
            if apikey_filepath != 'ods.apikey':
                raise ValueError("Only one of `apikey` and custom `apikey_filepath` should be provided.")
            self.apikey = apikey

        elif apikey_filepath is not None:
            try:
                # read the api key from the file
                with open(str(apikey_filepath)) as f:
                    self.apikey = f.read()
            except FileNotFoundError:
                self.apikey = None
            else:
                # remove trailing new lines or blanks if any
                self.apikey = self.apikey.rstrip()
        else:
            # no explicit api key: the keyring or environment variable lookups will apply
            self.apikey = None

        if self.apikey is not None and len(self.apikey) == 0:
            raise ValueError('The provided api key is empty!')

        # checker flag
        self.enforce_apikey = enforce_apikey

        # create and store a session
        self.session = requests_session or Session()
        # auto-close behaviour
        if auto_close_session is None:
            # default: only auto-close if this session was created by us
            auto_close_session = requests_session is None
        self.auto_close_session = auto_close_session

    def get_whole_dataframe(self,
                            dataset_id,                  # type: str
                            use_labels_for_header=True,  # type: bool
                            tqdm=False,                  # type: bool
                            block_size=1024,             # type: int
                            file_cache=False,            # type: bool
                            **other_opts
                            ):
        """
        Returns a dataset as a pandas dataframe. `pandas` must be installed.

        :param dataset_id:
        :param use_labels_for_header:
        :param tqdm: a boolean indicating if a progress bar using tqdm should be displayed. tqdm must be installed
        :param block_size: an int block size used in streaming mode when tqdm is used
        :param file_cache: a boolean (default `False`) indicating whether the file should be written to a local
            cache `.odsclient/<base_url>_<dataset_id>.<format>`, or a path-like object indicating a custom cache
            root folder.
        :param other_opts:
        :return:
        """
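        # A hedged usage sketch ('some-dataset-id' is a placeholder dataset id):
        #
        #     df = client.get_whole_dataframe('some-dataset-id', file_cache=True, tqdm=True)
        #
        # With `file_cache=True`, the csv is cached under `.odsclient/` and read back from there on later calls.
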
        try:
            import pandas as pd
        except ImportError as e:
            raise Exception("`get_whole_dataframe` requires `pandas` to be installed. [%s] %s" % (e.__class__, e))

        # Combine all the options
        opts = other_opts
        apikey = self.get_apikey()
        if apikey is not None:
            opts['apikey'] = apikey
        if use_labels_for_header is not None:
            opts['use_labels_for_header'] = use_labels_for_header

        # hardcoded options
        if 'timezone' in opts:
            raise ValueError("'timezone' should not be specified with this method")
        if 'format' in opts:
            raise ValueError("'format' should not be specified with this method")
        opts['format'] = format = 'csv'
        if 'csv_separator' in opts:
            raise ValueError("'csv_separator' should not be specified with this method")
        opts['csv_separator'] = ';'

        # Cache usage
        if file_cache:
            # it can be a boolean or a path
            if file_cache is True:
                file_cache = CACHE_ROOT_FOLDER
            cached_file = self.get_cached_dataset_entry(dataset_id=dataset_id, format=format, cache_root=file_cache)
            try:
                # try to read the cached file in a thread-safe operation
                with cached_file.rw_lock:
                    cached_file.assert_exists()
                    df = pd.read_csv(str(cached_file.file_path), sep=';')
                return df
            except CacheFileNotFoundError:
                pass  # does not exist: continue to query
        else:
            cached_file = None
        del file_cache

        # The URL to call
        url = self.get_download_url(dataset_id)

        # Execute the call in stream mode, with automatic content-type decoding
        result = self._http_call(url, params=opts, stream=True, decode=True)

        # noinspection PyTypeChecker
        if tqdm:
            from tqdm import tqdm as _tqdm

            total_size = int(result.headers.get('Content-Length', 0))
            with _tqdm(desc=url, total=total_size,
                       unit='B' if block_size == 1024 else 'it',
                       unit_scale=True,
                       unit_divisor=block_size
                       ) as bar:
                if not cached_file:
                    # directly stream to a memory dataframe, updating the progress bar
                    df = pd.read_csv(iterable_to_stream(result.iter_content(block_size), buffer_size=block_size,
                                                        progressbar=bar), sep=';')
                else:
                    # stream to the cache file, then read the dataframe from the cache
                    # (we hold the lock for both steps, so the file is guaranteed to still be there)
                    with cached_file.rw_lock:
                        cached_file.fill_from_iterable(result.iter_content(block_size), it_encoding=result.encoding,
                                                       progress_bar=bar, lock=False)
                        df = pd.read_csv(str(cached_file.file_path), sep=';')
        else:
            if not cached_file:
                # directly stream to a memory dataframe
                df = pd.read_csv(iterable_to_stream(result.iter_content(block_size)), sep=';')
            else:
                # stream to the cache file, then read the dataframe from the cache
                # (we hold the lock for both steps, so the file is guaranteed to still be there)
                with cached_file.rw_lock:
                    cached_file.fill_from_iterable(result.iter_content(block_size), it_encoding=result.encoding,
                                                   lock=False)
                    df = pd.read_csv(str(cached_file.file_path), sep=';')

        return df

    # noinspection PyShadowingBuiltins
    def get_whole_dataset(self,
                          dataset_id,                  # type: str
                          format='csv',                # type: str
                          timezone=None,               # type: str
                          use_labels_for_header=True,  # type: bool
                          csv_separator=';',           # type: str
                          tqdm=False,                  # type: bool
                          to_path=None,                # type: Union[str, Path]
                          file_cache=False,            # type: bool
                          block_size=1024,             # type: int
                          **other_opts
                          ):
        """
        Returns a dataset as a csv string.

        :param dataset_id:
        :param format:
        :param timezone:
        :param use_labels_for_header:
        :param csv_separator: ';', ','...
        :param tqdm: a boolean indicating if a progress bar using tqdm should be displayed. tqdm must be installed
        :param to_path: a string indicating the file path where to write the csv. In that case `None` is returned
        :param file_cache: a boolean (default `False`) indicating whether the file should be written to a local
            cache `.odsclient/<base_url>_<dataset_id>.<format>`, or a path-like object indicating a custom cache
            root folder.
        :param block_size: an int block size used in streaming mode when `to_path` or `tqdm` is used
        :param other_opts:
        :return:
        """
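        # A hedged usage sketch ('some-dataset-id' is a placeholder dataset id):
        #
        #     csv_str = client.get_whole_dataset('some-dataset-id')                # csv string in memory
        #     client.get_whole_dataset('some-dataset-id', to_path='out/data.csv')  # streamed to file, returns None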

        # ------- To uncomment one day if headers and/or body are needed
        # headers = {'Authorization': ('Bearer ' + api_key)}
        #
        # if requestJsonBodyStr is not None:
        #     # first encode the string as bytes using the charset
        #     charset = 'utf-8'
        #     json_body_encoded_with_charset = str.encode(requestJsonBodyStr, encoding=charset)
        #     headers['Content-Type'] = 'application/json; charset=' + charset
        # else:
        #     json_body_encoded_with_charset = None
        # ------------------

        # Combine all the options
        opts = other_opts
        apikey = self.get_apikey()
        if apikey is not None:
            opts['apikey'] = apikey
        if format is not None:
            opts['format'] = format
        if timezone is not None:
            opts['timezone'] = timezone
        if use_labels_for_header is not None:
            opts['use_labels_for_header'] = use_labels_for_header
        if csv_separator is not None:
            opts['csv_separator'] = csv_separator

        # The URL to call
        url = self.get_download_url(dataset_id)

        # Should we write anything to disk?
        # -- because it is the target
        if to_path is not None:
            if isinstance(to_path, string_types):
                to_path = Path(to_path)
            to_path.parent.mkdir(parents=True, exist_ok=True)  # make sure the parents exist

        # -- because the cache is used
        if file_cache:
            # it can be a boolean or a path
            if file_cache is True:
                file_cache = CACHE_ROOT_FOLDER
            cached_file = self.get_cached_dataset_entry(dataset_id=dataset_id, format=format, cache_root=file_cache)
            try:
                # Do NOT call cached_file.exists() first: that would not be thread-safe
                if to_path is None:
                    return cached_file.read()  # this is atomic: thread-safe
                else:
                    cached_file.copy_to_file(to_path)  # this is atomic: thread-safe
                    return None
            except CacheFileNotFoundError:
                # does not exist: continue to query
                pass
        else:
            cached_file = None
        del file_cache

        # Execute the call, since no cache was used
        result = None
        if not tqdm:
            if to_path is None:
                # We need to return a csv string, so load everything in memory
                result, txt_encoding = self._http_call(url, params=opts, stream=False, decode=True)

                if cached_file:  # store it in the local cache if needed
                    cached_file.fill_from_str(txt_initial_encoding=txt_encoding, decoded_txt=result)
            else:
                # No need to return a csv string: stream directly to the csv file (no decoding/encoding)
                r = self._http_call(url, params=opts, stream=True, decode=False)
                with open(str(to_path), mode='wb') as f:
                    for data in r.iter_content(block_size):
                        f.write(data)

                if cached_file:  # store it in the local cache if needed
                    cached_file.fill_from_file(file_path=to_path, file_encoding=r.encoding)
        else:
            # A progress bar is needed: use streaming mode
            r = self._http_call(url, params=opts, stream=True, decode=False)
            total_size = int(r.headers.get('Content-Length', 0))

            from tqdm import tqdm as _tqdm
            with _tqdm(desc=url, total=total_size,
                       unit='B' if block_size == 1024 else 'it',
                       unit_scale=True,
                       unit_divisor=block_size
                       ) as bar:
                if to_path is None:
                    result = io.StringIO()                     # stream to a string in memory
                    for data in r.iter_content(block_size):    # block by block
                        bar.update(len(data))                  # - update the progress bar
                        result.write(data.decode(r.encoding))  # - decode with the proper encoding
                    result = result.getvalue()

                    if cached_file:  # store it in the local cache if needed
                        cached_file.fill_from_str(txt_initial_encoding=r.encoding, decoded_txt=result)
                else:
                    with open(str(to_path), 'wb') as f:          # stream to the csv file in binary mode
                        for data in r.iter_content(block_size):  # block by block
                            bar.update(len(data))                # - update the progress bar
                            f.write(data)                        # - direct copy (no decoding/encoding)

                    if cached_file:  # store it in the local cache if needed
                        cached_file.fill_from_file(file_path=to_path, file_encoding=r.encoding)

            if total_size != 0 and bar.n != total_size:
                raise ValueError("ERROR, something went wrong: the size received (%s) does not match the "
                                 "Content-Length announced by the server (%s)" % (bar.n, total_size))

        return result

    # noinspection PyShadowingBuiltins
    def push_dataset_realtime(self,
                              dataset_id,         # type: str
                              dataset,            # type: Union[str, pandas.DataFrame]
                              push_key,           # type: str
                              format='csv',       # type: str
                              csv_separator=';',  # type: str
                              **other_opts
                              ):
        """
        Pushes a dataset using the realtime push API. This function accepts either a pandas DataFrame or a CSV
        string with a header included.

        :param dataset_id:
        :param dataset: the dataset to push, either a pandas DataFrame or a CSV string with a header row
        :param push_key: the push key provided by the API for pushing this dataset. Warning: this key is
            independent from the API key. It can be acquired from the Realtime Push API URL section in ODS.
        :param format: the format of the `dataset` argument. Can be `pandas` or `csv`.
        :param csv_separator: the CSV separator character, in case of a csv dataset input.
        :returns: the HTTP response
        """
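        # A hedged usage sketch (the dataset id and push key are placeholders):
        #
        #     response = client.push_dataset_realtime('some-dataset-id', "a;b\n1;2\n",
        #                                             push_key='<push_key>', format='csv')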

        if format == 'pandas':
            try:
                import pandas as pd
            except ImportError as e:
                raise Exception("`push_dataset_realtime` with the `pandas` format requires `pandas` to be "
                                "installed. [%s] %s" % (e.__class__, e))

            if not isinstance(dataset, pd.DataFrame):
                raise TypeError('If format is set to "pandas" then `dataset` should be a DataFrame. Found %s'
                                % type(dataset))

            # noinspection PyStatementEffect
            dataset  # type: pd.DataFrame
            request_body = dataset.to_json(orient='records')
        elif format == 'csv':
            import csv  # `csv` is part of the standard library, no availability check needed

            # noinspection PyStatementEffect
            dataset  # type: str
            csv_reader = csv.DictReader(StringIO(dataset), delimiter=csv_separator)
            request_body = dumps([r for r in csv_reader])
        else:
            raise ValueError("Dataset format must be either `pandas` or `csv`")

        # Combine all the options
        opts = other_opts
        opts['pushkey'] = push_key

        # The URL to call
        url = self.get_realtime_push_url(dataset_id)

        # Execute the call
        return self._http_call(url, method='post', body=request_body, params=opts, decode=False)

    def store_apikey_in_keyring(self,
                                apikey=None  # type: str
                                ):
        """
        Convenience method to store an api key in the OS keyring using the `keyring` lib.

        This method is a shortcut for `keyring.set_password(<base_url>, <keyring_entries_username>, <apikey>)`.

        :param apikey: an explicit apikey string. If not provided, `getpass()` will be used to prompt the user for
            the api key
        :return:
        """
        import keyring
        if apikey is None:
            apikey = getpass(prompt="Please enter your api key: ")

        if apikey is None or len(apikey) == 0:
            raise ValueError("Empty api key provided.")

        keyring.set_password(self.base_url, self.keyring_entries_username, apikey)

    def remove_apikey_from_keyring(self):
        """
        Convenience method to remove a previously stored api key from the OS keyring using the `keyring` lib.

        :return:
        """
        import keyring
        keyring.delete_password(self.base_url, self.keyring_entries_username)

    def get_apikey_from_keyring(self, ignore_import_errors=False):
        """
        Looks for a keyring entry containing the api key and returns it.
        If none is found, returns `None`.

        :param ignore_import_errors: when this is `True`, the method will return `None` if `keyring` is not
            installed, instead of raising an `ImportError`.
        :return:
        """
        if ignore_import_errors:
            try:
                import keyring
            except ImportError as e:
                # not installed: simply warn instead of raising an exception
                warnings.warn("`keyring` is not installed but the `ODSClient` is configured to use it. You can "
                              "either set `use_keyring=False` explicitly or install keyring to remove this "
                              "warning. Caught: %r" % e)
                return None
        else:
            # do not catch any exception
            import keyring

        for _url in (self.base_url, self.base_url + '/'):
            apikey = keyring.get_password(_url, self.keyring_entries_username)
            if apikey is not None:
                return apikey

    def get_apikey_from_envvar(self):
        """
        Looks for the 'ODS_APIKEY' environment variable.

        - if it does not exist, return `None`
        - otherwise, if the env variable does not begin with '{', consider it as the key
        - if it begins with '{', load it as a dict and find a match in it, in the following order:
          `platform_id`, `base_url`, `'default'`

        If the key found is an empty string, a `ValueError` is raised.

        :return: the api key found in the 'ODS_APIKEY' env variable (possibly the one for this platform_id /
            base_url), or `None` if it does not exist.
        """
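        # Hedged examples of accepted values for this variable ('aaa'/'bbb'/'ccc' are placeholder keys):
        #
        #     ODS_APIKEY=aaa
        #     ODS_APIKEY={'public': 'aaa', 'https://myods.com': 'bbb', 'default': 'ccc'}
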
        try:
            env_api_key = os.environ[ENV_ODS_APIKEY]
        except KeyError:
            # no env var: return None
            return None

        if len(env_api_key) > 0 and env_api_key[0] == '{':
            # a dictionary: use ast.literal_eval, which is more permissive than json and as safe.
            apikeys_dct = literal_eval(env_api_key)
            if not isinstance(apikeys_dct, dict):
                raise TypeError("The '%s' environment variable contains something that is neither a str nor a dict"
                                % ENV_ODS_APIKEY)

            # remove trailing slashes in keys
            def _remove_trailing_slash(k):
                while k.endswith('/'):
                    k = k[:-1]
                return k

            apikeys_dct = {_remove_trailing_slash(k): v for k, v in apikeys_dct.items()}

            # Try to get a match in the dict: first the platform id, then the base url, then the default
            if self.platform_id in apikeys_dct:
                env_api_key = apikeys_dct[self.platform_id]
            elif self.base_url in apikeys_dct:
                env_api_key = apikeys_dct[self.base_url]
            elif 'default' in apikeys_dct:
                env_api_key = apikeys_dct['default']
            else:
                return None

        if len(env_api_key) == 0:
            raise ValueError("Empty api key found in '%s' environment variable." % ENV_ODS_APIKEY)

        return env_api_key

    def get_apikey(self):
        """
        Returns the api key that this client currently uses, looking at each possible source in order
        (explicit argument or file, keyring, environment variable). See the class documentation for details.

        :return:
        """
        # 1- if there is an overridden api key (explicit argument or file), use it
        if self.apikey is not None:
            return self.apikey

        # 2- if the keyring service is installed and contains an entry, use it
        if self.use_keyring:
            apikey = self.get_apikey_from_keyring(ignore_import_errors=True)
            if apikey is not None:
                return apikey

        # 3- check the existence of the reference environment variable
        apikey = self.get_apikey_from_envvar()
        if apikey is not None:
            return apikey

        # 4- finally, if no key was found, raise an exception if a key was required
        if self.enforce_apikey:
            raise NoODSAPIKeyFoundError(self)

    def get_download_url(self,
                         dataset_id  # type: str
                         ):
        # type: (...) -> str
        """
        Returns the download URL for the given dataset id.

        :param dataset_id:
        :return:
        """
        return "%s/explore/dataset/%s/download/" % (self.base_url, dataset_id)

    def get_cached_dataset_entry(self,
                                 dataset_id,      # type: str
                                 format,          # type: str
                                 cache_root=None  # type: Union[str, Path]
                                 ):
        # type: (...) -> CacheEntry
        """
        Returns a `CacheEntry` for the given dataset.

        :param dataset_id:
        :param format:
        :param cache_root:
        :return:
        """
        if self.platform_id is not None:
            p = self.platform_id
        else:
            p = baseurl_to_id_str(self.base_url)
        return CacheEntry(dataset_id=dataset_id, dataset_format=format, platform_pseudo_id=p, cache_root=cache_root)

    def get_realtime_push_url(self,
                              dataset_id,  # type: str
                              ):
        # type: (...) -> str
        """
        Returns the realtime push URL for the given dataset id.

        :param dataset_id:
        :return:
        """
        return "%s/api/push/1.0/%s/realtime/push/" % (self.base_url, dataset_id)

    def _http_call(self,
                   url,           # type: str
                   body=None,     # type: bytes
                   headers=None,  # type: Dict[str, str]
                   method='get',  # type: str
                   params=None,   # type: Dict[str, str]
                   decode=True,   # type: bool
                   stream=False   # type: bool
                   ):
        """
        Sub-routine for the HTTP web service call. If `body` is `None`, a GET is performed.

        :param url:
        :param body:
        :param headers:
        :param method:
        :param params:
        :param decode: a boolean (default `True`) indicating if the contents should be automatically decoded
            following the content-type encoding received in the HTTP response. If this is `True` and
            `stream=False` (default), the function returns a tuple (text, encoding).
        :param stream:
        :return: either a tuple (text, encoding) (if stream=False and decode=True), or the response object
        """
        try:
            # Send the request (do NOT encode the params, this is done automatically)
            response = self.session.request(method, url, headers=headers, data=body, params=params, stream=stream)

            # Read the status code, raise an HTTPError if the status is an error
            response.raise_for_status()

            # detect a "wrong 200 but true 401" (unauthorized)
            if 'html' in response.headers['Content-Type']:
                raise InsufficientRightsForODSResourceError(response.headers, response.text)

            if not stream:
                if decode:
                    # Contents (the encoding is automatically used to read the body when calling response.text)
                    result = response.text
                    return result, response.encoding
                else:
                    return response
            else:
                response.raw.decode_content = decode
                return response

        except HTTPError as error:
            try:
                body = error.response.text
                # Example error payload:
                # {
                #   "errorcode": 10002,
                #   "reset_time": "2017-10-17T00:00:00Z",
                #   "limit_time_unit": "day",
                #   "call_limit": 10000,
                #   "error": "Too many requests on the domain. Please contact the domain administrator."
                # }
                details = loads(body)
            except JSONDecodeError:
                # error while parsing the json payload: propagate the original HTTP error instead
                pass
            else:
                raise ODSException(error.response.status_code, error.response.headers, **details)

            raise error


class NoODSAPIKeyFoundError(Exception):
    """
    Raised when no api key was found (no explicit api key provided, no api key file, no env variable entry, no
    keyring entry).
    """

    def __init__(self,
                 odsclient  # type: ODSClient
                 ):
        self.odsclient = odsclient

    def __str__(self):
        return "ODS API key file not found, while it is marked as mandatory for this call (`enforce_apikey=True`). " \
               "It should either be put in a text file at path '%s', or in the `ODS_APIKEY` OS environment " \
               "variable, or (recommended, most secure) in the local `keyring`. " \
               "See documentation for details: %s. Note that you can generate an API key on this web page: " \
               "%s/account/my-api-keys/." \
               % (self.odsclient.apikey_filepath, "https://smarie.github.io/python-odsclient/#c-declaring-an-api-key",
                  self.odsclient.base_url)


class InsufficientRightsForODSResourceError(Exception):
    """
    Raised when an HTTP 200 is received from ODS together with an HTML page as body. This happens when the api key
    is missing or does not grant the appropriate rights for the requested resource.
    """

    def __init__(self, headers, contents):
        self.headers = headers
        self.contents = contents

    def __str__(self):
        return "An ODS query returned an HTTP 200 (OK) but with an html content-type. This is probably an " \
               "authentication problem: please check your api key using `get_apikey()`. " \
               "Headers:\n%s\nResponse:\n%s\n" % (self.headers, self.contents)


class ODSException(Exception):
    """
    An error returned by the ODS API.
    """

    def __init__(self, status_code, headers, **details):
        """

        :param status_code:
        :param headers:
        :param details:
        """
        super(ODSException, self).__init__()
        self.status_code = status_code
        self.headers = headers
        self.error_msg = details['error']
        self.details = details

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return "Request failed (%s): %s\nDetails: %s\nHeaders: %s" % (self.status_code, self.error_msg,
                                                                      self.details, self.headers)


def create_session_for_fiddler():
    # type: (...) -> Session
    return create_session_for_proxy(http_proxyhost='localhost', http_proxyport=8888,
                                    use_http_for_https_proxy=True, ssl_verify=False)


def create_session_for_proxy(http_proxyhost,                  # type: str
                             http_proxyport,                  # type: int
                             https_proxyhost=None,            # type: str
                             https_proxyport=None,            # type: int
                             use_http_for_https_proxy=False,  # type: bool
                             ssl_verify=None
                             ):
    # type: (...) -> Session
    """
    Helper method to configure the `requests` package to use the proxy of your choice and adapt the SSL
    certificate validation accordingly. Note that this is only needed if you do not wish to use the default
    configuration (inherited from environment variables, so that you can use
    https://smarie.github.io/develop-behind-proxy/switching/#envswitcher).

    :param http_proxyhost: mandatory proxy host for http
    :param http_proxyport: mandatory proxy port for http
    :param https_proxyhost: optional proxy host for https. If none is provided, http_proxyhost will be used
    :param https_proxyport: optional proxy port for https. If none is provided, http_proxyport will be used
    :param use_http_for_https_proxy: optional, if set to `True` the http protocol will be used to initiate
        communications with the proxy even for https calls (the calls themselves will still be done in https as
        usual).
    :param ssl_verify: optional ssl verification parameter. It may either be the path to an additional certificate
        to trust (recommended), or a boolean to enable (default)/disable (not recommended! use only in debug
        mode!) certificate validation.
        See https://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification for details.
    :return: a `requests.Session` object that you may use with the rest of the library
    """
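    # A hedged usage sketch (the proxy host, port and certificate path are placeholders):
    #
    #     session = create_session_for_proxy('10.0.0.1', 3128, ssl_verify='path/to/internal_ca.pem')
    #     client = ODSClient(requests_session=session, auto_close_session=True)
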
    # config and fallback
    https_proxyhost = https_proxyhost if https_proxyhost is not None else http_proxyhost
    https_proxyport = https_proxyport if https_proxyport is not None else http_proxyport
    https_proxy_protocol = 'http' if use_http_for_https_proxy else 'https'

    s = Session()
    s.proxies = {
        'http': 'http://%s:%s' % (http_proxyhost, http_proxyport),
        'https': '%s://%s:%s' % (https_proxy_protocol, https_proxyhost, https_proxyport),
    }
    if ssl_verify is not None:
        s.verify = ssl_verify

    # IMPORTANT: otherwise the environment variables would always have precedence over user settings
    s.trust_env = False

    return s


def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE, progressbar=None):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.

    Source: https://stackoverflow.com/a/20260030/7262247
    """
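    # A hedged illustration: wrap an in-memory iterator of bytestrings into a buffered binary stream
    #
    #     stream = iterable_to_stream(iter([b"col_a;col_b\n", b"1;2\n"]))
    #     stream.read()  # -> b'col_a;col_b\n1;2\n'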

    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = None

        def readable(self):
            return True

        def readinto(self, b):
            try:
                ln = len(b)  # We're supposed to return at most this much
                chunk = self.leftover or next(iterable)
                output, self.leftover = chunk[:ln], chunk[ln:]
                b[:len(output)] = output
                if progressbar:
                    progressbar.update(len(output))
                return len(output)
            except StopIteration:
                return 0  # indicate EOF

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)


class CacheFileNotFoundError(FileNotFoundError):
    pass


class CacheEntry(object):
    """
    Represents a cache entry for a dataset, under `cache_root` (default `CACHE_ROOT_FOLDER`).
    The corresponding file may or may not exist.

    Accesses to the file are thread-safe (atomic), to avoid collisions while the file is being updated.
    """
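    # Hedged example of the resulting cache layout, for platform 'public', dataset 'some-dataset-id', format 'csv':
    #
    #     .odsclient/public/some-dataset-id.csv
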
    __slots__ = ('dataset_id', 'dataset_format', 'platform_pseudo_id', '_cache_root', 'rw_lock')

    def __init__(self,
                 dataset_id,          # type: str
                 dataset_format,      # type: str
                 platform_pseudo_id,  # type: str
                 cache_root=None      # type: Union[str, Path]
                 ):
        """Constructor from a dataset id and an optional root cache path"""
        self.dataset_id = dataset_id
        self.dataset_format = dataset_format
        self.platform_pseudo_id = platform_pseudo_id
        self.rw_lock = Lock()

        if cache_root is None:
            self._cache_root = None
        else:
            if isinstance(cache_root, string_types):
                cache_root = Path(cache_root)
            self._cache_root = cache_root

    def __repr__(self):
        return "CacheEntry(path='%s')" % self.file_path

    def exists(self):
        """Returns True if there is a file for this cache entry, at self.file_path. Note that in a multithreaded
        context this does not ensure that this will still be True at the next python code step :)"""
        return self.file_path.exists()

    @property
    def cache_root(self):
        # type: (...) -> Path
        """The root folder of the cache where this entry sits"""
        return self._cache_root if self._cache_root is not None else Path(CACHE_ROOT_FOLDER)

    @property
    def file_path(self):
        # type: (...) -> Path
        """The file where this entry sits (it may exist or not)"""
        return Path("%s/%s/%s.%s" % (self.cache_root, self.platform_pseudo_id, self.dataset_id, self.dataset_format))

    def assert_exists(self):
        """Raises a `CacheFileNotFoundError` if the file does not exist"""
        if not self.exists():
            raise CacheFileNotFoundError("Cached file entry can not be read as it does not exist: '%s'"
                                         % self.file_path)

    def read(self):
        # type: (...) -> str
        """
        Returns a string read from the cached file.
        Line endings are preserved thanks to newline='', see https://stackoverflow.com/a/50996542/7262247
        """
        with self.rw_lock:  # potentially wait for an ongoing write/read to complete, and prevent new ones
            self.assert_exists()
            with self.file_path.open(mode="rt", newline='', encoding=CACHE_ENCODING) as f:
                result = f.read()
        return result

    def copy_to_file(self,
                     file_path  # type: Union[str, Path]
                     ):
        """
        Copies this cached file to `file_path`. Note that it will be encoded using CACHE_ENCODING,
        but a warning was already issued at dataset retrieval time if the original encoding was different.
        """
        with self.rw_lock:  # potentially wait for an ongoing write/read to complete, and prevent new ones
            self.assert_exists()
            copyfile(str(self.file_path), str(file_path))

    def delete(self):
        """
        Removes this cache entry, with thread-safe protection.
        """
        with self.rw_lock:
            if not self.exists():
                warnings.warn("Can not delete file entry: file does not exist: '%s'" % self.file_path)
            else:
                os.remove(str(self.file_path))

    def prepare_for_writing(self):
        """
        Makes all parent directories if needed.
        Issues a warning if the file exists and will therefore be overridden.
        """
        if self.exists():
            warnings.warn("Cached file entry already exists and will be overridden: '%s'" % self.file_path)

        # make sure the parents exist
        self.file_path.parent.mkdir(parents=True, exist_ok=True)

    def fill_from_str(self,
                      txt_initial_encoding,  # type: str
                      decoded_txt,           # type: str
                      ):
        """Writes a text string (already decoded) to the cache, using the cache's encoding.

        If the original encoding is not equal to the cache encoding, a warning is issued.
        """
        with self.rw_lock:  # potentially wait for an ongoing write/read to complete, and prevent new ones
            self.prepare_for_writing()
            # Our cache uses utf-8 for all files, in order not to have to remember encodings to read back
            if txt_initial_encoding != CACHE_ENCODING:
                self.warn_encoding(original_encoding=txt_initial_encoding, cache_encoding=CACHE_ENCODING)

            # write with the cache encoding
            self.file_path.write_bytes(decoded_txt.encode(CACHE_ENCODING))

    def _fill_from_it_no_lock(self,
                              it,           # type: Iterable
                              it_encoding,  # type: str
                              progress_bar=None,
                              ):
        """The no-lock version of `fill_from_iterable`"""

        self.prepare_for_writing()
        if it_encoding == CACHE_ENCODING:
            # no encoding change: direct copy
            with open(str(self.file_path), 'wb') as f:  # stream to the cache file in binary mode
                for data in it:                         # block by block
                    if progress_bar:
                        progress_bar.update(len(data))  # - update the progress bar
                    f.write(data)                       # - direct copy (no decoding/encoding)
        else:
            # Our cache uses utf-8 for all files, in order not to have to remember encodings to read back
            self.warn_encoding(original_encoding=it_encoding, cache_encoding=CACHE_ENCODING)

            # transcoding is needed: fully stream to a memory string, then dump it to the cache
            csv_str_io = io.StringIO()                      # stream to a string in memory
            for data in it:                                 # block by block
                if progress_bar:
                    progress_bar.update(len(data))          # - update the progress bar
                csv_str_io.write(data.decode(it_encoding))  # - decode with the proper encoding
            csv_str = csv_str_io.getvalue()

            # store in the cache with the proper encoding
            self.file_path.write_bytes(csv_str.encode(CACHE_ENCODING))

    def fill_from_iterable(self,
                           it,           # type: Iterable
                           it_encoding,  # type: str
                           progress_bar=None,
                           lock=True
                           ):
        """
        Fills this cache entry from an iterable of bytes.

        :param it:
        :param it_encoding:
        :param progress_bar:
        :param lock: whether the read/write lock should be acquired (default `True`). Pass `False` if the caller
            already holds it.
        :return:
        """
        if lock:
            with self.rw_lock:  # potentially wait for an ongoing write/read to complete, and prevent new ones
                self._fill_from_it_no_lock(it=it, it_encoding=it_encoding, progress_bar=progress_bar)
        else:
            self._fill_from_it_no_lock(it=it, it_encoding=it_encoding, progress_bar=progress_bar)

    def fill_from_file(self,
                       file_path,      # type: Union[str, Path]
                       file_encoding,  # type: str
                       ):
        """Copies a file to the cache.

        If the original encoding is not equal to the cache encoding, conversion happens and a warning is issued.
        """
        with self.rw_lock:  # potentially wait for an ongoing write/read to complete, and prevent new ones
            self.prepare_for_writing()
            if file_encoding == CACHE_ENCODING:
                # no encoding change: direct copy
                copyfile(str(file_path), str(self.file_path))
            else:
                # Our cache uses utf-8 for all files, in order not to have to remember encodings to read back
                self.warn_encoding(original_encoding=file_encoding, cache_encoding=CACHE_ENCODING)
                # read with newline-preservation and the original encoding
                with open(str(file_path), mode="rt", newline='', encoding=file_encoding) as f_src:
                    contents = f_src.read()
                # write with the cache encoding
                with open(str(self.file_path), mode='wt', encoding=CACHE_ENCODING) as f_dest:
                    f_dest.write(contents)

    def warn_encoding(self, original_encoding, cache_encoding):
        """
        Issues a warning when the original encoding is different from the one used in the cache, so that a
        conversion occurred.
        """
        warnings.warn(
            "[odsclient-cache] Cached file for dataset %r will use %r encoding while the original encoding on"
            " ODS was %r. This will most probably have no side effect, except if your dataset"
            " contains characters that can not be encoded in utf-8 such as old/alternative"
            " forms of east asian kanji. See https://en.wikipedia.org/wiki/Unicode#Issues"
            % (self.dataset_id, cache_encoding, original_encoding))


def baseurl_to_id_str(base_url):
    """Transforms an ODS platform url into an identifier string usable for example as a file/folder name"""
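    # Hedged examples of the mapping implemented below:
    #
    #     'https://public.opendatasoft.com' -> 'public'
    #     'https://myods.com/sub/path'      -> 'myods.com_sub_path'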

    o = urlparse(base_url)

    # start with the host name
    result_str = o.netloc

    # simplify the public ODS site
    if result_str.endswith(".opendatasoft.com"):
        result_str = result_str.replace(".opendatasoft.com", "")

    # optionally add the custom sub-path
    if o.path and o.path != "/":
        _path = o.path.replace("/", "_")

        # ensure a leading _
        if not _path.startswith("_"):
            _path = "_" + _path

        # ensure no trailing _
        if _path.endswith("_"):
            _path = _path[:-1]

        result_str += _path

    return result_str