Source code for requests_ecp.auth

# -*- coding: utf-8 -*-
# Copyright (C) Cardiff University (2020-2022)
#
# This file is part of requests_ecp.
#
# requests_ecp is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# requests_ecp is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with requests_ecp.  If not, see <http://www.gnu.org/licenses/>.

"""Auth plugin for ECP requests.
"""

__author__ = "Duncan Macleod <duncan.macleod@ligo.org>"

from getpass import getpass
from urllib.parse import (
    parse_qs,
    urlparse,
    urlunsplit,
)

from requests import auth as requests_auth

from .ecp import authenticate as ecp_authenticate

GITLAB_AUTH_SHIB_CALLBACK_PATH = "/users/auth/shibboleth/callback"


# -- Auth utilities ---------

def _prompt_username_password(host, username=None):
    """Prompt for a username and password from the console
    """
    if not username:
        username = input("Enter username for {0}: ".format(host))
    password = getpass(
        "Enter password for {0!r} on {1}: ".format(username, host),
    )
    return username, password


# -- Response interception --

def is_ecp_auth_redirect(response):
    """Return `True` if a response indicates a request for ECP authentication.

    Parameters
    ----------
    response : `requests.Response`
        The response object to inspect.

    Returns
    -------
    state : bool
        `True` if ``response`` looks like a redirect initiated by Shibboleth,
        otherwise `False`.
    """
    if not response.is_redirect:
        return False

    # strip out the redirect location and parse it
    target = response.headers['location']
    query = parse_qs(urlparse(target).query)

    return (
        # Identity Provider
        "SAMLRequest" in query
        # Shibboleth discovery service
        or "Shibboleth.sso" in target
    )


def is_gitlab_auth_redirect(response):
    """Return `True` if a response indicates a gitlab auth redirect.

    Parameters
    ----------
    response : `requests.Response`
        The response object to inspect.

    Returns
    -------
    state : bool
        `True` if ``response`` looks like a redirect initiated by GitLab,
        otherwise `False`.
    """
    if not response.is_redirect:
        return False

    # if the redirect cam from the callback, then the callback doesn't work
    # so let's not get stuck in an infinite loop
    if urlparse(response.url).path == GITLAB_AUTH_SHIB_CALLBACK_PATH:
        return False

    # parse the redirect target to get the gitlab host name
    uparts = urlparse(response.headers['location'])

    # if not redirecting to login, this isn't meant for us
    if not uparts.path == "/users/sign_in":
        return False

    # only redirect if there is a _gitlab_session cookie to use later
    for cookie in response.cookies:
        if (
            cookie.name == "_gitlab_session"
            and cookie.domain == uparts.hostname
        ):
            return True


# -- Auth -------------------

def _import_kerberos_auth():
    try:
        from requests_gssapi import HTTPKerberosAuth
    except ModuleNotFoundError as exc:  # pragma: no cover
        try:
            # debian doesn't have requests-gssapi
            from requests_kerberos import HTTPKerberosAuth
        except ModuleNotFoundError:
            # no kerberos interface, display a useful error message
            raise ModuleNotFoundError(
                f"{exc.msg}; you must install requests-gssapi "
                "to use Kerberos auth"
            ) from exc
    return HTTPKerberosAuth


def _kerberos_auth(url):
    """Intialise a `requests_gssapi.HTTPKerberosAuth`.
    """
    HTTPKerberosAuth = _import_kerberos_auth()
    loginhost = urlparse(url).hostname
    return HTTPKerberosAuth(
        force_preemptive=True,
        hostname_override=loginhost,
    )


[docs] class HTTPECPAuth(requests_auth.AuthBase): """SAML2/ECP authorisation plugin for :mod:`requests`. This auth plugin intercepts ``302 Found`` redirect responses that target a `SAMLRequest` authorisation or a `Shibboleth.sso` discovery service, and executes a SAML2/ECP workflow against the configured Identity Provider (``idp``). Kerberos authentication is supported via the `requests GSSAPI <https://github.com/pythongssapi/requests-gssapi>`__ module. >>> from requests import Session >>> from requests_ecp import HTTPECPAuth >>> with Session() as sess: ... sess.auth = HTTPECPAuth(idp="https://idp.example.com/SAML2/SOAP/ECP") ... sess.get("https://private.example.com/data") """ # noqa: E501 def __init__( self, idp, kerberos=False, username=None, password=None, ): #: Address of Identity Provider ECP endpoint. self.idp = idp #: Authentication object to attach to requests made directly #: to the IdP. if kerberos: # raise an ImportError early _import_kerberos_auth() self.kerberos = kerberos self.username = username self.password = password self._idpauth = None #: counter for authentication attemps for a single request self._num_ecp_auth = 0 @staticmethod def _init_auth(idp, kerberos=False, username=None, password=None): if kerberos: return _kerberos_auth( kerberos if isinstance(kerberos, str) else idp, ) elif username and password: return requests_auth.HTTPBasicAuth(username, password) return requests_auth.HTTPBasicAuth(*_prompt_username_password( urlparse(idp).hostname, username, ))
[docs] def reset(self): self._num_ecp_auth = 0
# -- auth method -------- def _authenticate_session( self, session, endpoint=None, url=None, **kwargs, ): """Execute ECP authenticate for a `requests.Session`. """ url = url or endpoint or self.idp self._authenticate(session, url=url) def _authenticate_response(self, response, endpoint=None, **kwargs): """Execute ECP authenticate based on a `requests.Response`. Returns ------- response : `requests.Response` The final response from the service provider that should be a `302 Found` redirect back to the original resource URL. """ response.raw.read() response.raw.release_conn() new = list(self._authenticate( response.connection, endpoint=endpoint, url=response.url, **kwargs, )) r = new.pop(-1) r.history.extend([response] + new) return r def _authenticate( self, connection, endpoint=None, url=None, **kwargs ): """Handle user authentication with ECP. """ if self._idpauth is None: # init auth now self._idpauth = self._init_auth( self.idp, kerberos=self.kerberos, username=self.username, password=self.password, ) # authenticate return ecp_authenticate( connection, self._idpauth, endpoint or self.idp, url=url, **kwargs, ) # -- event handling -----
[docs] def handle_response(self, response, **kwargs): """Handle ECP authentication based on a transation response """ # if we've already tried, don't try again, # otherwise we end up in an infinite loop if self._num_ecp_auth: return response # if the redirect looks like gitlab trying to go through ECP auth, # redirect to the shibboleth callback for gitlab if is_gitlab_auth_redirect(response): # redirect the redirect to the shibboleth callback URL parts = urlparse(response.headers['location']) response.headers['location'] = urlunsplit(( parts.scheme, parts.netloc, GITLAB_AUTH_SHIB_CALLBACK_PATH, parts.query, parts.fragment, )) # if the request was redirected in a way that looks like the SP # is asking for ECP authentication, then handle that here: # (but only do that once) elif is_ecp_auth_redirect(response): # authenticate and return the final redirect response = self._authenticate_response(response, **kwargs) self._num_ecp_auth += 1 return response
[docs] def deregister(self, response): """Deregister the response handler """ response.request.deregister_hook('response', self.handle_response) self.reset()
[docs] def __call__(self, request): """Register the response handler """ self.reset() request.register_hook('response', self.handle_response) return request