login here
top

OpenID test consumer

There's a nice consumer test server here

This is what I'm using for public tests of prooveme.com

But something else is needed when you're developing a site that the Internet can't see. For example, my laptop sits behind a NATed network. So clearly the openid test consumer couldn't authenticate to it even if it wanted to.

This is where OpenIDTestConsumer comes in.

It's a simple little Python HTTP server which will authenticate against whatever OpenID you ask it to. For example:

 http://localhost:8000/?login=http://prooveme.com/id/nicferrier

will cause it to authenticate against my OpenID on prooveme.com.

But I could also authenticate an OpenID behind my NAT point:

http://localhost:8000/?login=http://localhost/openid/nic

Simple registration

This test consumer can also do simple registration requests:

http://localhost:8000/?login=http://localhost/openid/nic&openid.sreg.optional=fullname,dob

or:

http://localhost:8000/?login=http://localhost/openid/nic&openid.sreg.required=email&openid.sreg.optional=fullname,dob

Source code

The code isn't long so in the interests of indexing here it is:

#!/usr/bin/python
import BaseHTTPServer
import Cookie
import cgi
import os
import os.path
import sys
import re
import urllib
import logging
from StringIO import StringIO
# HTTP server
import openid.consumer.consumer
import openid.store.filestore
# Session dict handed to every openid Consumer the request handler creates.
# It lives at module level so that the login request and the later callback
# request are given the same dict.
sess_store = {}
def sreg_data_as_html(params):
    """Render the simple registration fields in params as HTML table rows.

    params -- dict mapping request parameter names to their values.

    Returns one "<tr>" row per key that starts with "openid.sreg.", joined
    with newlines; an empty string when there are no such keys.  Keys and
    values are HTML-escaped because they come straight from the request.
    """
    # Imported locally so the function does not rely on a new file-level
    # import; xml.sax.saxutils is in the standard library.
    from xml.sax.saxutils import escape

    def rowize(key):
        # "%s" % (value,) coerces non-string values the same way the
        # original row template did, before escaping &, < and >.
        value = "%s" % (params[key],)
        return "<tr><td>%s</td><td>%s</td></tr>" % (escape(key), escape(value))

    rows = [rowize(key) for key in params if key.startswith("openid.sreg.")]
    return "\n".join(rows)
class OpenIDRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP handler implementing a minimal OpenID test consumer.

    A request carrying a "login" parameter starts an OpenID authentication
    and answers with a redirect to the identity provider.  Any other request
    is treated as the provider's callback and is completed through the
    openid consumer, with the outcome reported as a small HTML page.
    """

    # Page sent when the callback reports success; %s receives the table rows.
    _SUCCESS_PAGE = """<html>
<body>
 <p>Authentication succeeded</p>
 <table>
   %s
 </table>
</body>
</html>"""

    def handle_one_request(self):
        """Handle a single HTTP request. Pinched from python."""
        self.raw_requestline = self.rfile.readline()
        if not self.raw_requestline:
            self.close_connection = 1
            return
        if not self.parse_request(): # An error code has been sent, just exit
            return

        path_part, params = self._collect_params()

        # Setup openid
        store_dir = os.path.join(os.getcwd(), ".openidstore")
        store = openid.store.filestore.FileOpenIDStore(store_dir)
        con = openid.consumer.consumer.Consumer(sess_store, store)

        # What request is this? login or callback from the openid provider?
        if "login" in params:
            self._begin_login(con, params, path_part)
        else:
            self._finish_login(con, params)
        return

    def _collect_params(self):
        """Return (path, params) for the current request.

        path is self.path with any query string removed.  params maps each
        parameter name to its first value; parameters from the query string
        override same-named parameters from the request body.
        """
        params = {}
        # FieldStorage MUST be told about the HTTP method.  QUERY_STRING is
        # set explicitly because, for GET/HEAD, FieldStorage otherwise falls
        # back to sys.argv[1]; the query string is parsed separately below.
        spoof_env = {"REQUEST_METHOD": self.command, "QUERY_STRING": ""}
        try:
            body_params = cgi.FieldStorage(self.rfile, self.headers, "", spoof_env)
            for name in body_params.keys():
                # getfirst() yields a plain value rather than a field object.
                params[name] = body_params.getfirst(name)
        except Exception:
            # Body parsing is best effort: report the problem and carry on
            # with whatever the query string provides.
            sys.stderr.write("failed to parse the request body: %s\n"
                             % (sys.exc_info()[1],))

        # Split on the FIRST "?" so a "?" inside a parameter value (such as
        # the login URL) stays part of the query.
        pieces = self.path.split("?", 1)
        path_part = pieces[0]
        if len(pieces) == 2:
            for name, values in cgi.parse_qs(pieces[1]).items():
                params[name] = values[0]
        return path_part, params

    def _send_html(self, code, body):
        """Send a complete HTML response with status code and body."""
        self.send_response(code, self.responses[code][0])
        self.send_header("Content-Type", "text/html")
        self.send_header("XXX-done", "done")
        self.end_headers()
        self.wfile.write(body + "\n")

    def _begin_login(self, con, params, path_part):
        """Start authentication for params["login"] and send the redirect."""
        # NOTE(review): con.begin() may raise when the identity URL cannot be
        # used; that is not handled here - confirm against the openid library.
        auth_req = con.begin(params["login"])
        # Any non-blank "immediate" parameter switches immediate mode on.
        immediate_on = "immediate" in params
        # The provider is asked to return to this same URL, minus the query.
        serverurl = "http://" + self.headers["host"] + path_part
        redirect_url = auth_req.redirectURL(serverurl, serverurl, immediate = immediate_on)
        # Should the redirect url have simple registration fields added to it?
        for field in ("openid.sreg.required", "openid.sreg.optional"):
            if field in params:
                # Quote the value so it cannot inject extra parameters; the
                # commas separating field names are left readable.
                redirect_url = redirect_url + "&" + field + "=" + urllib.quote(params[field], safe=",")
        sys.stderr.write("redirect url: %s\n" % (redirect_url,))
        # Send the response
        self.send_response(302, "Moved")
        self.send_header("Location", redirect_url)
        self.end_headers()

    def _finish_login(self, con, params):
        """Complete the provider's callback and report the outcome as HTML."""
        response = con.complete(params)
        status = response.status
        if status == 'success':
            self._send_html(200, self._SUCCESS_PAGE % (sreg_data_as_html(params)))
        elif status == "failure":
            # The message is escaped before being embedded in the page.
            message = cgi.escape("%s" % (response.message,))
            self._send_html(400, "<html><body>failed: %s</body></html>" % message)
        elif status == "setup_needed":
            self._send_html(400, "<html><body>setup needed.</body></html>")
        elif status == "cancel":
            self._send_html(400, "<html><body>authentication failed.</body></html>")
        else:
            # Never leave the client without a reply.
            self._send_html(500, "<html><body>unexpected response status.</body></html>")
def run(HandlerClass=OpenIDRequestHandler, ServerClass=BaseHTTPServer.HTTPServer):
    """Start the test consumer by handing both classes to BaseHTTPServer.test.

    HandlerClass -- request handler class passed through to the test server.
    ServerClass -- HTTP server class passed through to the test server.
    """
    BaseHTTPServer.test(HandlerClass=HandlerClass, ServerClass=ServerClass)
if __name__ == '__main__':
    # Run the server under the debugger only when DEBUG=yes is set in the
    # environment; the value is actually compared, so other values (or no
    # DEBUG at all) start the server normally.
    if os.environ.get("DEBUG") == "yes":
        import pdb
        pdb.runcall(run)
    else:
        run()
# End
Nic Ferrier