Source code for gen3.auth
import json
from requests.auth import AuthBase
import requests
class Gen3AuthError(Exception):
pass
[docs]class Gen3Auth(AuthBase):
"""Gen3 auth helper class for use with requests auth.
Implements requests.auth.AuthBase in order to support JWT authentication.
Generates access tokens from the provided refresh token file or string.
Automatically refreshes access tokens when they expire.
Args:
endpoint (str): The URL of the data commons.
refresh_file (str): The file containing the downloaded json web token.
refresh_token (str): The json web token.
Examples:
This generates the Gen3Auth class pointed at the sandbox commons while
using the credentials.json downloaded from the commons profile page.
>>> auth = Gen3Auth("https://nci-crdc-demo.datacommons.io", refresh_file="credentials.json")
"""
def __init__(self, endpoint, refresh_file=None, refresh_token=None):
if not refresh_file and not refresh_token:
raise ValueError(
"Either parameter 'refresh_file' or parameter 'refresh_token' must be specified."
)
if refresh_file and refresh_token:
raise ValueError(
"Only one of 'refresh_file' and 'refresh_token' must be specified."
)
self._refresh_file = refresh_file
self._refresh_token = refresh_token
if refresh_file:
try:
file_data = open(self._refresh_file).read()
self._refresh_token = json.loads(file_data)
except Exception as e:
raise ValueError(
"Couldn't load your refresh token file: {}\n{}".format(
self._refresh_file, str(e)
)
)
self._access_token = None
self._endpoint = endpoint
def __call__(self, request):
"""Adds authorization header to the request
This gets called by the python.requests package on outbound requests
so that authentication can be added.
Args:
request (object): The incoming request object
"""
request.headers["Authorization"] = self._get_auth_value()
request.register_hook("response", self._handle_401)
return request
def _handle_401(self, response, **kwargs):
"""Handles failed requests when authorization failed.
This gets called after a failed request when an HTTP 401 error
occurs. This then tries to refresh the access token in the event
that it expired.
Args:
request (object): The failed request object
"""
if not response.status_code == 401 and not response.status_code == 403:
return response
# Free the original connection
response.content
response.close()
# copy the request to resend
newreq = response.request.copy()
self._access_token = None
newreq.headers["Authorization"] = self._get_auth_value()
_response = response.connection.send(newreq, **kwargs)
_response.history.append(response)
_response.request = newreq
return _response
def _get_auth_value(self):
"""Returns the Authorization header value for the request
This gets called when added the Authorization header to the request.
This fetches the access token from the refresh token if the access token is missing.
"""
if not self._access_token:
auth_url = "{}/user/credentials/cdis/access_token".format(self._endpoint)
try:
self._access_token = requests.post(
auth_url, json=self._refresh_token
).json()["access_token"]
except Exception as e:
raise Gen3AuthError(
"Failed to authenticate to {}\n{}".format(auth_url, str(e))
)
return "Bearer " + self._access_token