Greetings, I've written the attached simple web app. It serves as a more pleasant user interface to a file located on our journal proxy server. It relies on the Paramiko library for sftp and Mako for templating. Basic file functions are as follows: * ule: The user starts here. A page is produced based on the template, which includes the deciphered output of any GET messages received. * ule_post: The view/controller for any posted actions. Acts on POST and redirects the user to "ule". * ule_model.py: The model -- all the real data handling goes on here. * sftp_wrapper.py: A wrapper for the Paramiko library to simplify sftp calls. * ule_template.html: The Mako template for the main page of the app. * redirect.html: Another Mako template for the redirect.
It functions without any major bugs that I'm aware of. I took advantage of the manageable scale of this project to really focus on writing clean code. I hope it will help others looking to do anything similar. I would appreciate any comments on its style and organization. Gabe
#!/usr/bin/python
'''
ule -- page display for the EZProxy User List Editor -- reads GET
form to determine variables for display. Processing is sent via
POST to ule_post.
'''
# uncomment cgitb for testing
#import cgitb; cgitb.enable()
import cgi
import sys
sys.path.insert(0, 'lib/python')
from mako.template import Template
from ule_model import UserList
# this form should be GET
form = cgi.FieldStorage()
ul = UserList()
def main():
try:
userpass_list = ul.getList()
if form.getvalue('submit') == 'please wait':
userpass_list = ul.check_against_innopac()
finally:
ul.closeSFTP()
tmpl = Template(filename='ule_template.html')
out_stream = tmpl.render(
form = form,
userpass_list = userpass_list,
)
print 'Content-Type: text/html\n'
print out_stream
if __name__ == '__main__':
main()
#!/usr/bin/python
'''
ule_post -- for handling POST from ule and redirecting back to ule with
proper GET.
'''
# uncomment cgitb for testing
#import cgitb; cgitb.enable()
import cgi
import re
from urllib import urlencode
import sys
sys.path.insert(0, 'lib/python')
from mako.template import Template
from ule_model import UserList, UleError
form = cgi.FieldStorage()
# initialize GET list -- will be appended to ule url
GET_params = []
ul = UserList()
def add():
    '''
    Validate the posted username and password and add them to the list.

    The outcome is reported by appending (key, value) pairs to the
    module-level GET_params:
        username_status -- 'empty', 'illegal', 'in_list' or 'added'
        password_status -- 'empty' or 'illegal'
        added           -- the username, when the pair was added
        username, password -- the submitted values, echoed back when
                              nothing was added
    '''
    username = ''
    password = ''
    username_valid = False
    password_valid = False
    # matches any character outside the set the model allows
    not_allowed = re.compile(r'[^%s]' % ul.allowed)

    if 'username' not in form:
        GET_params.append(('username_status', 'empty'))
    else:
        # getfirst() returns a single string even when the field was
        # posted more than once; form['username'] is a list in that
        # case and .value on it raises AttributeError
        username = form.getfirst('username')
        if not_allowed.search(username):
            GET_params.append(('username_status', 'illegal'))
        elif username in [userpass[0] for userpass in ul.getList()]:
            GET_params.append(('username_status', 'in_list'))
        else:
            username_valid = True

    if 'password' not in form:
        GET_params.append(('password_status', 'empty'))
    else:
        password = form.getfirst('password')
        if not_allowed.search(password):
            GET_params.append(('password_status', 'illegal'))
        else:
            password_valid = True

    if username_valid and password_valid:
        ul.add(username, password)
        GET_params.append(('username_status', 'added'))
        GET_params.append(('added', username))
    else:
        # hand the rejected values back so they can be shown again
        if username:
            GET_params.append(('username', username))
        # NOTE(review): GET_params is appended to the ule url, so this
        # puts the submitted password in a query string -- confirm that
        # is acceptable
        if password:
            GET_params.append(('password', password))
def remove():
    '''
    Remove the users named in the posted 'userlist' field.

    Appends ('removed', username) to GET_params for every username that
    ul.remove() returns, or ('none_removed', True) when no 'userlist'
    field was posted.
    '''
    if 'userlist' in form:
        selected = form.getlist('userlist')
        GET_params.extend([('removed', name) for name in ul.remove(selected)])
        return
    GET_params.append(('none_removed', True))
def undo():
try:
bak_days, bak_seconds = ul.undo()
GET_params.append(('bak_days', bak_days))
GET_params.append(('bak_seconds', bak_seconds))
except UleError, e:
if str(e) == 'no backups to revert to':
GET_params.append(('no_backups', True))
def redirect():
url_add = urlencode(GET_params)
url = 'ule?' + url_add
print '''Status: 303 See Other
Location: %s
Pragma: no-cache
Content-Type: text/html
''' % url
redirect_template = Template(filename='redirect.html')
print redirect_template.render(
url = url,
)
def main():
    '''
    Run the action named by the posted 'submit' value, redirect the
    browser back to ule, then sync the servers.

    The SFTP session is closed on the way out even if a step fails.
    '''
    try:
        action = form.getvalue('submit')
        if action == 'add':
            add()
        elif action == 'remove':
            remove()
        elif action == 'undo':
            undo()
        redirect()
        # stdout is buffered when it is not a terminal, so push the
        # response out now; otherwise it stays in the buffer until
        # sync() has finished and the point of syncing last is lost
        sys.stdout.flush()
        # sync after everything else so render doesn't wait for it
        ul.sync()
    finally:
        ul.closeSFTP()
if __name__ == '__main__':
main()
'''
ule_model -- for maintaining the integrity of ule's data.
'''
import re
from datetime import datetime
import os
import posixpath
from time import strptime
from sftp_wrapper import Session

MAIN_SERVER = 'ezproxy.library.xxxxxx.edu'
# NOTE(review): the account credentials live in plain text in this
# module -- consider reading them from a file outside the web tree
USERNAME = 'xxxxxx'
PASSWORD = 'xxxxxxx'
LDAP_USR_PATH = '/usr/local/ezproxy/ldap.usr'
DUMPURL = 'http://innopac.library.xxxxxx.edu:4500/PATRONAPI/%s/dump'
SECONDARY_SERVER = 'ezproxy2.library.xxxxxx.edu'

# backups are named ldap.usr.<timestamp>.bak next to ldap.usr
_BACKUP_PREFIX = 'ldap.usr.'
_BACKUP_SUFFIX = '.bak'
_BACKUP_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'


def _read_file(session, path):
    'return the contents of path on session, always closing the handle'
    remote_file = session.open(path)
    try:
        return remote_file.read()
    finally:
        remote_file.close()


def _write_file(session, path, data):
    'replace path on session with data, always closing the handle'
    remote_file = session.open(path, 'w')
    try:
        remote_file.write(data)
    finally:
        remote_file.close()


def _is_backup(file_name):
    'true when file_name looks like one of our ldap.usr backups'
    return (file_name.startswith(_BACKUP_PREFIX)
            and file_name.endswith(_BACKUP_SUFFIX))


class UleError(Exception):
    pass


class UserList(object):
    '''
    The username:password list stored in ldap.usr on MAIN_SERVER.

    An SFTP session is opened on construction; call closeSFTP() when
    done.  Every write is preceded by a timestamped backup of the
    contents last read by getList().
    '''

    def __init__(self):
        # characters allowed in usernames and passwords
        self.allowed = r'\w-'
        # userpass is the regex to parse ldap.usr for usernames and
        # passwords
        self.userpass_re = re.compile(r'''
            ^           # start of line
            ([%s]*)     # username
            :           # colon
            ([%s]*)     # password
            $           # end of line
            ''' % (self.allowed, self.allowed), re.VERBOSE | re.MULTILINE)
        self.openSFTP()

    def openSFTP(self):
        'open the SFTP session to MAIN_SERVER'
        self.ss = Session(MAIN_SERVER, USERNAME, PASSWORD)

    def closeSFTP(self):
        'close the SFTP session to MAIN_SERVER'
        self.ss.close()

    def getList(self):
        '''
        Read ldap.usr and return its (username, password) tuples.

        The raw file contents are kept in self.ldap_usr_file_str for
        the backup and rewrite steps.
        '''
        self.ldap_usr_file_str = _read_file(self.ss, LDAP_USR_PATH)
        return self.userpass_re.findall(self.ldap_usr_file_str)

    def _getAdministrativa(self):
        'return everything in ldap.usr before the first user line'
        admin_section = self.userpass_re.split(self.ldap_usr_file_str,
                                               maxsplit = 1)[0]
        return admin_section

    def _backup(self):
        'save the contents last read by getList() as a timestamped backup'
        now = datetime.now().strftime(_BACKUP_TIME_FORMAT)
        backup_file_name = LDAP_USR_PATH + '.' + now + _BACKUP_SUFFIX
        _write_file(self.ss, backup_file_name, self.ldap_usr_file_str)

    def _write(self, userpasses):
        '''
        Back up ldap.usr, then rewrite it as the administrative section
        followed by one username:password line per entry in userpasses.
        '''
        self._backup()
        userpass_lines = [username + ':' + password
                          for (username, password) in userpasses]
        _write_file(self.ss, LDAP_USR_PATH,
                    self._getAdministrativa() + '\n'.join(userpass_lines))

    def add(self, username, password):
        'add username:password, keeping the list sorted case-insensitively'
        userpasses = self.getList()
        userpasses.append((username, password))
        # Decorate-Sort-Undecorate
        dsu_list = [(x[0].lower(), x[0], x[1]) for x in userpasses]
        dsu_list.sort()
        userpasses = [(x[1], x[2]) for x in dsu_list]
        self._write(userpasses)

    def remove(self, user_list):
        '''
        Remove each 'username:password' string in user_list that is
        present in the list and return the usernames actually removed.
        '''
        userpasses = self.getList()
        removed_list = []
        for removal in user_list:
            removal_tuple = tuple(removal.split(':'))
            username = removal_tuple[0]
            if removal_tuple in userpasses:
                userpasses.remove(removal_tuple)
                removed_list.append(username)
        self._write(userpasses)
        return removed_list

    def getBackups(self):
        '''
        Return (backup_list, ldap_dir): the backup file names in the
        directory holding ldap.usr, sorted oldest first, and that
        directory.
        '''
        # remote paths are always POSIX-style, whatever runs this code
        ldap_dir = posixpath.dirname(LDAP_USR_PATH)
        backup_list = [x for x in self.ss.listdir(ldap_dir) if _is_backup(x)]
        backup_list.sort()
        return (backup_list, ldap_dir)

    def undo(self):
        '''
        Restore ldap.usr from the most recent backup and delete that
        backup.

        Returns (days, seconds) of the backup's age.  Raises UleError
        when there is no backup to revert to.
        '''
        backup_list, ldap_dir = self.getBackups()
        if not backup_list:
            raise UleError('no backups to revert to')
        backup_name = backup_list[-1]
        most_recent = posixpath.join(ldap_dir, backup_name)
        # _write_file closes the handle, so the restored contents are
        # written out before the backup is removed
        _write_file(self.ss, LDAP_USR_PATH,
                    _read_file(self.ss, most_recent))
        self.ss.remove(most_recent)
        # take the timestamp from the file name alone so a dot in the
        # directory path cannot shift it
        bak_time = backup_name[len(_BACKUP_PREFIX):].split('.')[0]
        now = datetime.now()
        bak_time_obj = datetime(*strptime(bak_time,
                                          _BACKUP_TIME_FORMAT)[0:6])
        bak_time_delta = now - bak_time_obj
        return (bak_time_delta.days, bak_time_delta.seconds)

    def check_against_innopac(self):
        '''
        Return the user list with a third element, 1, appended to every
        entry whose username has no 'REC INFO' line in its innopac dump.
        '''
        import urllib2

        def innocheck(username):
            'check username against innopac dump'
            dump = urllib2.urlopen(DUMPURL % username)
            try:
                for line in dump:
                    if 'REC INFO' in line:
                        return True
                return False
            finally:
                dump.close()

        new_userlist = []
        for userpass in self.getList():
            if not innocheck(userpass[0]):
                # the flag is len(userpass) == 3
                userpass = userpass + (1,)
            new_userlist.append(userpass)
        return new_userlist

    def _cleanup(self, max=10):
        'delete backups in excess of max'
        backup_list, ldap_dir = self.getBackups()
        # the list is sorted oldest first, so the surplus is at the front
        excess = len(backup_list) - max
        if excess > 0:
            for backup in backup_list[:excess]:
                self.ss.remove(posixpath.join(ldap_dir, backup))

    def sync(self):
        'sync MAIN_SERVER and SECONDARY_SERVER'
        self._cleanup()
        ss2 = Session(SECONDARY_SERVER, USERNAME, PASSWORD)
        try:
            def remoteCopy(path):
                _write_file(ss2, path, _read_file(self.ss, path))

            # copy ldap.usr from main to secondary
            remoteCopy(LDAP_USR_PATH)
            # remove old backups and copy new
            backup_list, ldap_dir = self.getBackups()
            backup_list2 = [x for x in ss2.listdir(ldap_dir)
                            if _is_backup(x)]
            for backup in backup_list2:
                ss2.remove(posixpath.join(ldap_dir, backup))
            for backup in backup_list:
                remoteCopy(posixpath.join(ldap_dir, backup))
        finally:
            # close the secondary session even when a copy fails
            ss2.close()
'''
sftp_wrapper is a wrapper class for the sftp portion of the
paramiko library, located in lib/python/
'''
import sys
sys.path.insert(0, 'lib/python')
import paramiko
class Session(object):
    '''
    A small convenience layer over a paramiko SFTP connection.

    The connection is made on construction, authenticating with a
    password and checking the server against the 'ssh-rsa' entry for
    hostname in the local known_hosts file.  Call close() when done.
    '''

    def __init__(self, hostname, username, password):
        self.make_sftp(hostname, username, password)
        # uncomment to save log -- for testing
        # hmm, doesn't seem to be working -- gsf 20061220
        #paramiko.util.log_to_file('sftp.log')

    def make_sftp(self, hostname, username, password):
        '''
        Connect to hostname on port 22 and open an SFTP client on it.

        Raises KeyError when known_hosts has no ssh-rsa key for
        hostname.  If connecting fails the transport is closed before
        the error is re-raised.
        '''
        # look the key up before opening the socket so a missing
        # known_hosts entry cannot leave a connection dangling
        hostkeys = paramiko.util.load_host_keys('known_hosts')
        hostkey = hostkeys[hostname]['ssh-rsa']
        self.tunnel = paramiko.Transport((hostname, 22))
        try:
            self.tunnel.connect(username=username, password=password,
                                hostkey=hostkey)
            self.sftp = paramiko.SFTPClient.from_transport(self.tunnel)
        except Exception:
            self.tunnel.close()
            raise

    def open(self, filename, mode='r'):
        'open a remote file and return the file object'
        return self.sftp.open(filename, mode=mode)

    def chdir(self, dir):
        'change the remote working directory'
        self.sftp.chdir(dir)

    def get(self, remote, local=None):
        'download remote to local; local defaults to the same path'
        if not local:
            local = remote
        self.sftp.get(remote, local)

    def put(self, local, remote=None):
        'upload local to remote; remote defaults to the same path'
        if not remote:
            remote = local
        self.sftp.put(local, remote)

    def listdir(self, remote):
        'return the names of the entries in the remote directory'
        return self.sftp.listdir(remote)

    def remove(self, remote):
        'delete the remote file'
        self.sftp.remove(remote)

    def close(self):
        'close the underlying transport'
        self.tunnel.close()
Title: EZProxy User List Editor
EZProxy User List Editor (beta)
Manually created users must be added to this EZProxy User List in order to get access to online journals.
% for userpass in userpass_list:
${userpass[0]}:${userpass[1]}
% endfor
Revert to previous backup.
Users in red identify patron records with a duplicate or missing f: field (USERID). Patron access will not work until a unique f: field (USERID) is established.
This tool is in beta testing. Please send any questions or comments to Gabriel Farrell.
303 See Other
The response to your request is located at ${url | h}.
_______________________________________________ Tutor maillist - [email protected] http://mail.python.org/mailman/listinfo/tutor
