Logo Search packages:      
Sourcecode: ubuntu-sso-client version File versions  Download package

main.py

#!/usr/bin/python

# ubuntu_sso.main - main login handling interface
#
# Author: Stuart Langridge <stuart.langridge@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
"""OAuth login handler.

A command-line utility which accepts requests for OAuth login over D-Bus,
handles the OAuth process (including adding the OAuth access token to the
gnome keyring), and then alerts the calling app (and others) with a D-Bus
signal so they can retrieve the new token.
"""

import dbus.service, urlparse, time, gobject
import pynotify

from dbus.mainloop.glib import DBusGMainLoop

from ubuntu_sso import DBUS_IFACE_AUTH_NAME
from ubuntu_sso.config import get_config

from ubuntu_sso.logger import setupLogging
logger = setupLogging("ubuntu_sso.main")

DBusGMainLoop(set_as_default=True)

# Disable the invalid name warning, as we have a lot of DBus style names
# pylint: disable-msg=C0103

00044 class NoDefaultConfigError(Exception):
    """No default section in configuration file"""
    pass

00048 class BadRealmError(Exception):
    """Realm must be a URL"""
    pass

00052 class LoginProcessor:
    """Actually do the work of processing passed parameters"""
00054     def __init__(self, dbus_object, use_libnotify=True):
        """Initialize the login processor."""
        logger.debug("Creating a LoginProcessor")
        self.use_libnotify = use_libnotify
        if self.use_libnotify and pynotify:
            logger.debug("Hooking libnotify")
            pynotify.init("UbuntuOne Login")
        self.note1 = None
        self.realm = None
        self.consumer_key = None
        self.dbus_object = dbus_object
        logger.debug("Getting configuration")
        self.config = get_config()

00068     def login(self, realm, consumer_key, do_login=True):
        """Initiate an OAuth login"""
        logger.debug("Initiating OAuth login in LoginProcessor")
        self.realm = str(realm) # because they are dbus.Strings, not str
        self.consumer_key = str(consumer_key)

        logger.debug("Obtaining OAuth urls")
        (request_token_url, user_authorisation_url,
          access_token_url, consumer_secret) =  self.get_config_urls(realm)
        logger.debug("OAuth URLs are: request='%s', userauth='%s', " +\
                     "access='%s', secret='%s'", request_token_url,
                     user_authorisation_url, access_token_url, consumer_secret)

        from ubuntu_sso.auth import AuthorisationClient
        client = AuthorisationClient(self.realm,
                                     request_token_url,
                                     user_authorisation_url,
                                     access_token_url, self.consumer_key,
                                     consumer_secret,
                                     callback_parent=self.got_token,
                                     callback_denied=self.got_denial,
                                     callback_notoken=self.got_no_token,
                                     callback_error=self.got_error,
                                     do_login=do_login)

        logger.debug("Calling auth.client.ensure_access_token in thread")
        gobject.timeout_add_seconds(1, client.ensure_access_token)

00096     def clear_token(self, realm, consumer_key):
        """Remove the currently stored OAuth token from the keyring."""
        self.realm = str(realm)
        self.consumer_key = str(consumer_key)
        (request_token_url, user_authorisation_url,
          access_token_url, consumer_secret) =  self.get_config_urls(self.realm)
        from ubuntu_sso.auth import AuthorisationClient
        client = AuthorisationClient(self.realm,
                                     request_token_url,
                                     user_authorisation_url,
                                     access_token_url,
                                     self.consumer_key, consumer_secret,
                                     callback_parent=self.got_token,
                                     callback_denied=self.got_denial,
                                     callback_notoken=self.got_no_token,
                                     callback_error=self.got_error)
        gobject.timeout_add_seconds(1, client.clear_token)

00114     def error_handler(self, failure):
        """Deal with errors returned from auth process"""
        logger.debug("Error returned from auth process")
        self.dbus_object.currently_authing = False # not block future requests

00119     def get_config_urls(self, realm):
        """Look up the URLs to use in the config file"""
        logger.debug("Fetching config URLs for realm='%s'", realm)
        if self.config.has_section(realm):
            logger.debug("Realm '%s' is in config", realm)
            request_token_url = self.__get_url(realm, "request_token_url")
            user_authorisation_url = self.__get_url(realm,
              "user_authorisation_url")
            access_token_url = self.__get_url(realm, "access_token_url")
            consumer_secret = self.__get_option(realm, "consumer_secret")
        elif realm.startswith("http://localhost") and \
          self.config.has_section("http://localhost"):
            logger.debug("Realm is localhost and is in config")
            request_token_url = self.__get_url("http://localhost",
              "request_token_url", realm)
            user_authorisation_url = self.__get_url("http://localhost",
              "user_authorisation_url", realm)
            access_token_url = self.__get_url("http://localhost",
              "access_token_url", realm)
            consumer_secret = self.__get_option("http://localhost",
              "consumer_secret")
        elif self.is_valid_url(realm):
            logger.debug("Realm '%s' is not in config", realm)
            request_token_url = self.__get_url("default",
              "request_token_url", realm)
            user_authorisation_url = self.__get_url("default",
              "user_authorisation_url", realm)
            access_token_url = self.__get_url("default",
              "access_token_url", realm)
            consumer_secret = self.__get_option(realm, "consumer_secret")
        else:
            logger.debug("Realm '%s' is a bad realm", realm)
            raise BadRealmError
        return (request_token_url, user_authorisation_url,
                access_token_url, consumer_secret)

00155     def is_valid_url(self, url):
        """Simple check for URL validity"""
        # pylint: disable-msg=W0612
        scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
        if scheme and netloc:
            return True
        else:
            return False

00164     def got_token(self, access_token):
        """Callback function when access token has been retrieved"""
        logger.debug("Token retrieved, calling NewCredentials function")
        self.dbus_object.NewCredentials(self.realm, self.consumer_key)

00169     def got_denial(self):
        """Callback function when request token has been denied"""
        self.dbus_object.AuthorizationDenied()

00173     def got_no_token(self):
        """Callback function when access token is not in keyring."""
        self.dbus_object.NoCredentials()

00177     def got_error(self, message):
        """Callback function to emit an error message over DBus."""
        self.dbus_object.OAuthError(message)

00181     def __get_url(self, realm, option, actual_realm=None):
        """Construct a full URL from realm and a URLpath for that realm in
           the config file."""
        if actual_realm:
            realm_to_use = actual_realm
        else:
            realm_to_use = realm
        urlstub = self.__get_option(realm, option)
        return urlparse.urljoin(realm_to_use, urlstub)

00191     def __get_option(self, realm, option):
        """Return a specific option for that realm in
           the config file. If the realm does not exist in the config file,
           fall back to the [default] section."""
        if self.config.has_section(realm) and \
           self.config.has_option(realm, option):
            urlstub = self.config.get(realm, option)
            return urlstub

        # either the realm exists and this url does not, or
        # the realm doesn't exist; either way, fall back to [default] section
        urlstub = self.config.get("default", option, None)
        if urlstub is not None:
            return urlstub

        # this url does not exist in default section either
        # this shouldn't happen
        raise NoDefaultConfigError("No default configuration for %s" % option)


00211 class Login(dbus.service.Object):
    """Object which listens for D-Bus OAuth requests"""
00213     def __init__(self, bus_name):
        """Initiate the Login object."""
        dbus.service.Object.__init__(self, object_path="/", bus_name=bus_name)
        self.processor = LoginProcessor(self)
        self.currently_authing = False
        logger.debug("Login D-Bus service starting up")

    @dbus.service.method(dbus_interface=DBUS_IFACE_AUTH_NAME,
                                           in_signature='ss', out_signature='')
00222     def login(self, realm, consumer_key):
        """D-Bus method, exported over the bus, to initiate an OAuth login"""
        logger.debug("login() D-Bus message received with realm='%s', " +
                     "consumer_key='%s'", realm, consumer_key)
        if self.currently_authing:
            logger.debug("Currently in the middle of OAuth: rejecting this")
            return
        self.currently_authing = True
        self.processor.login(realm, consumer_key)

    @dbus.service.method(dbus_interface=DBUS_IFACE_AUTH_NAME,
                                           in_signature='ssb', out_signature='')
00234     def maybe_login(self, realm, consumer_key, do_login):
        """
        D-Bus method, exported over the bus, to maybe initiate an OAuth login
        """
        logger.debug("maybe_login() D-Bus message received with realm='%s', " +
                     "consumer_key='%s'", realm, consumer_key)
        if self.currently_authing:
            logger.debug("Currently in the middle of OAuth: rejecting this")
            return
        self.currently_authing = True
        self.processor.login(realm, consumer_key, do_login)

    @dbus.service.method(dbus_interface=DBUS_IFACE_AUTH_NAME,
                         in_signature='ss', out_signature='')
00248     def clear_token(self, realm, consumer_key):
        """
        D-Bus method, exported over the bus, to clear the existing token.
        """
        self.processor.clear_token(realm, consumer_key)

    @dbus.service.signal(dbus_interface=DBUS_IFACE_AUTH_NAME, signature='ss')
00255     def NewCredentials(self, realm, consumer_key):
        """Fire D-Bus signal when the user accepts authorization."""
        logger.debug("Firing the NewCredentials signal")
        self.currently_authing = False
        return (self.processor.realm, self.processor.consumer_key)

    @dbus.service.signal(dbus_interface=DBUS_IFACE_AUTH_NAME)
00262     def AuthorizationDenied(self):
        """Fire the signal when the user denies authorization."""
        self.currently_authing = False

    @dbus.service.signal(dbus_interface=DBUS_IFACE_AUTH_NAME)
00267     def NoCredentials(self):
        """Fired when the user does not have a token in the keyring."""
        self.currently_authing = False

    @dbus.service.signal(dbus_interface=DBUS_IFACE_AUTH_NAME, signature='s')
00272     def OAuthError(self, message):
        """Fire the signal when an error needs to be propagated to the user."""
        self.currently_authing = False
        return message

def main():
    """Start everything"""
    logger.debug("Starting up at %s", time.asctime())
    logger.debug("Installing the Twisted glib2reactor")
    from twisted.internet import glib2reactor # for non-GUI apps
    glib2reactor.install()
    from twisted.internet import reactor

    logger.debug("Creating the D-Bus service")
    Login(dbus.service.BusName(DBUS_IFACE_AUTH_NAME,
                               bus=dbus.SessionBus()))
    # cleverness here to say:
    # am I already running (bound to this d-bus name)?
    # if so, send a signal to the already running instance
    # this means that this app can be started from an x-ubutnuone: URL
    # to kick off the signin process
    logger.debug("Starting the reactor mainloop")
    reactor.run()


Generated by  Doxygen 1.6.0   Back to index