# db.py - Core tables and functions
import datetime
import jwt
import os
import random
import re
from gluon import current
import logging
# Logger for this model file; its name and level are read from ``settings``
# (``settings`` is not defined in this file -- presumably an earlier model
# file provides it; confirm).
logger = logging.getLogger(settings.logger)
logger.setLevel(settings.log_level)
## if you need to use OpenID, Facebook, MySpace, Twitter, Linkedin, etc.
## register with janrain.com, write your domain:api_key in private/janrain.key
## from gluon.contrib.login_methods.rpx_account import use_janrain
## use_janrain(auth, filename='private/janrain.key')
try:
from gluon.contrib.login_methods.janrain_account import RPXAccount
except ImportError:
print("Warning you should upgrade to a newer web2py for better janrain support")
from gluon.contrib.login_methods.rpx_account import RPXAccount # noqa: F401
from gluon.contrib.login_methods.extended_login_form import ( # noqa: F401
ExtendedLoginForm,
)
from gluon.tools import Auth, Crud, Service, PluginManager, prettydate # noqa: F401
#########################################################################
## This scaffolding model makes your app work on Google App Engine too
## File is released under public domain and you can use without limitations
#########################################################################
## if SSL/HTTPS is properly configured and you want all HTTP requests to
## be redirected to HTTPS, uncomment the line below:
# request.requires_https()
# If the bookserver owns (performs migrations on) this table, then don't do web2py migrations on it.
def bookserver_owned(table_name):
    """Return the ``migrate`` flag to use when defining ``table_name``.

    The result is passed as ``migrate=`` to ``db.define_table`` by the table
    definitions in this file. It is currently ``False`` for every table; the
    ``table_name`` argument is accepted but not consulted.
    """
    return False
# Prefix used to build the ``migrate=`` values handed to ``session.connect``
# and ``auth.define_tables``; the test configuration replaces it below.
table_migrate_prefix = "runestone_"
if request.env.web2py_runtime_gae:
    ## connect to Google BigTable (optional 'google:datastore://namespace')
    db = DAL("google:datastore")
    ## store sessions and tickets there
    session.connect(request, response, db=db)
    ## or store session in Memcache, Redis, etc.
    ## from gluon.contrib.memdb import MEMDB
    ## from google.appengine.api.memcache import Client
    ## session.connect(request, response, db = MEMDB(Client()))
else:
    ## if NOT running on Google App Engine use SQLite or other DB
    # WEB2PY_MIGRATE is either "Yes", "No", "Fake", or missing (a missing
    # value is read as "Yes"). Migrations are enabled for "Yes" and "Fake".
    if os.environ.get("WEB2PY_CONFIG", "") == "test":
        # Test runs: smaller pool, a dedicated migration log file, and a
        # separate prefix for the migration metadata.
        db = DAL(
            settings.database_uri,
            migrate=False,
            pool_size=5,
            adapter_args=dict(logfile="test_runestone_migrate.log"),
            migrate_enabled=(
                os.environ.get("WEB2PY_MIGRATE", "Yes") in ["Yes", "Fake"]
            ),
        )
        table_migrate_prefix = "test_runestone_"
    else:
        db = DAL(
            settings.database_uri,
            pool_size=10,
            fake_migrate_all=(os.environ.get("WEB2PY_MIGRATE", "Yes") == "Fake"),
            migrate=False,
            migrate_enabled=(
                os.environ.get("WEB2PY_MIGRATE", "Yes") in ["Yes", "Fake"]
            ),
        )
    # Keep sessions in the database selected above.
    session.connect(
        request,
        response,
        db,
        masterapp=None,
        migrate=table_migrate_prefix + "web2py_sessions.table",
    )
# For LTI you may want to open Runestone in an iframe. This is tricky and can run afoul of browser settings that disable third-party tracking. However, this seems to do the trick, at least from the test tool at https://lti.tools/saltire/tc. More testing with Canvas and company is required. The Content Request launch also works in an iframe.
# This seems like it should allow us to share the session cookie across subdomains, and it seems to work for every browser except Safari. I'm not sure what the issue is, so I'm commenting it out until I understand what is going on.
# if settings.session_domain and "session_id_runestone" in response.cookies:
#     response.cookies["session_id_runestone"]["domain"] = settings.session_domain
# response.optimize_css = 'concat,minify,inline'
# response.optimize_js = 'concat,minify,inline'
#########################################################################
## Here is sample code if you need for
## - email capabilities
## - authentication (registration, login, logout, ... )
## - authorization (role based authorization)
## - services (xml, csv, json, xmlrpc, jsonrpc, amf, rss)
## - old style crud actions
## (more options discussed in gluon/tools.py)
#########################################################################
# Authentication object bound to the database connection created above.
auth = Auth(db, hmac_key=Auth.get_or_create_key())
crud = Crud(db)
service = Service()
plugins = PluginManager()

# Make the settings and database available in modules.
current.db = db
current.settings = settings
current.auth = auth
if settings.enable_captchas:
    ## Enable captcha's :-(
    from gluon.tools import Recaptcha

    # NOTE(review): the two Recaptcha keys are hard-coded in this file.
    auth.settings.captcha = Recaptcha(
        request,
        "6Lfb_t4SAAAAAB9pG_o1CwrMB40YPsdBsD8GsvlD",
        "6Lfb_t4SAAAAAGvAHwmkahQ6s44478AL5Cf-fI-x",
        options="theme:'blackglass'",
    )
    # No captcha on login, password retrieval or username retrieval.
    auth.settings.login_captcha = False
    auth.settings.retrieve_password_captcha = False
    auth.settings.retrieve_username_captcha = False
# Set up for two-factor authentication.
# auth.settings.auth_two_factor_enabled = True
# auth.settings.two_factor_methods = [lambda user, auth_two_factor: 'password_here']
# courses table
## create all tables needed by auth if not custom tables
# One row per course. ``course_name`` is unique; ``base_course`` holds the
# name of the course it is based on.
db.define_table(
    "courses",
    Field("course_name", type="string", unique=True),
    Field("term_start_date", type="date"),
    Field("institution", type="string"),
    Field("base_course", type="string"),
    Field("python3", type="boolean", default=True),
    Field("login_required", type="boolean", default=True),
    Field("allow_pairs", type="boolean", default=False),
    Field("student_price", type="integer"),
    Field("downloads_enabled", type="boolean", default=False),
    Field("courselevel", type="string"),
    Field("new_server", type="boolean", default=False),
    # The migration flag is decided by bookserver_owned().
    migrate=bookserver_owned("courses"),
)
Provide a common query. Pass db.courses.ALL
to retrieve all fields; otherwise, only the course_name
and base_course
are selected.
Make this available to modules.
# Provide the correct URL to a book, based on whether it is statically or dynamically served. This function returns URL(*args) and provides the correct controller/function based on the type of the current course (static vs. dynamic).
def get_course_url(*args):
    """Build the URL of a page inside the logged-in user's course book.

    Args:
        *args: Path components appended after the course name. Components
            equal to the empty string are dropped.

    Returns:
        ``URL(a=settings.bks, c="books", f="published", args=...)`` with the
        course name as the first path component, or ``URL(c="default")``
        when nobody is logged in or the user's course row cannot be found.
    """
    path_parts = tuple(part for part in args if part != "")

    # Without a logged-in user there is no course to look up; fall back to
    # the default controller instead of failing on ``auth.user.course_id``.
    course = None
    if auth.user:
        course = db(db.courses.id == auth.user.course_id).select().first()

    if not course:
        return URL(c="default")

    return URL(
        a=settings.bks,
        c="books",
        f="published",
        args=(course.course_name,) + path_parts,
    )
########################################
def getCourseNameFromId(courseid):
    """Return the ``course_name`` of the course whose id is ``courseid``.

    Used to compute the ``auth.user.course_name`` field. Returns ``""`` when
    no course has that id.
    """
    course_row = db(db.courses.id == courseid).select().first()
    if not course_row:
        return ""
    return course_row.course_name
def getCourseOrigin(base_course):
    """Look up the ``markup_system`` attribute of the course ``base_course``.

    Joins ``course_attributes`` to ``courses`` on the course id and selects
    only ``course_attributes.value`` (using ``SELECT_CACHE``).

    Returns:
        The first matching row, or ``None`` when there is no match.
    """
    markup_query = (
        (db.course_attributes.course_id == db.courses.id)
        & (db.courses.course_name == base_course)
        & (db.course_attributes.attr == "markup_system")
    )
    matches = db(markup_query).select(db.course_attributes.value, **SELECT_CACHE)
    return matches.first()
def getCourseAttributesDict(course_id):
    """Return the attributes of ``course_id`` as an ``{attr: value}`` dict.

    Reads every ``course_attributes`` row for the course (using
    ``SELECT_CACHE``); when an ``attr`` occurs more than once the last row
    read wins.
    """
    rows = db(db.course_attributes.course_id == course_id).select(**SELECT_CACHE)
    return dict((row.attr, row.value) for row in rows)
def verifyInstructorStatus(course, instructor):
    """Make sure that ``instructor`` is actually an instructor for ``course``.

    Args:
        course: Either a course name (``str``), which is resolved to its
            ``courses`` row first, or a value that can be compared directly
            against ``course_instructor.course``.
        instructor: Value compared against ``course_instructor.instructor``.

    Returns:
        ``True`` when at least one matching ``course_instructor`` row exists,
        otherwise ``False``. An unknown course name yields ``False``.
    """
    if isinstance(course, str):
        course = (
            db(db.courses.course_name == course)
            .select(db.courses.id, **SELECT_CACHE)
            .first()
        )
        if course is None:
            # No such course, so nobody can be its instructor. Returning here
            # avoids running the membership query against ``None``.
            return False

    def _has_instructor_row():
        # True when a course_instructor row links this course and instructor.
        return (
            db(
                (db.course_instructor.course == course)
                & (db.course_instructor.instructor == instructor)
            ).count(**COUNT_CACHE)
            > 0
        )

    try:
        return _has_instructor_row()
    except Exception as e:
        # Best effort: log, roll the transaction back and try once more. A
        # failure of the second attempt propagates to the caller.
        logger.error(f"VIS -- {e}")
        db.rollback()
        return _has_instructor_row()
def is_editor(userid):
    """Return ``True`` when ``userid`` is a member of the "editor" auth group."""
    editor_group = (
        db(db.auth_group.role == "editor").select(db.auth_group.id).first()
    )
    membership_query = (db.auth_membership.user_id == userid) & (
        db.auth_membership.group_id == editor_group
    )
    membership = db(membership_query).select().first()
    return bool(membership)
def is_author(userid):
    """Return ``True`` when ``userid`` is a member of the "author" auth group."""
    author_group = (
        db(db.auth_group.role == "author").select(db.auth_group.id).first()
    )
    membership_query = (db.auth_membership.user_id == userid) & (
        db.auth_membership.group_id == author_group
    )
    membership = db(membership_query).select().first()
    return bool(membership)
class IS_COURSE_ID:
used to validate that a course name entered (e.g. devcourse) corresponds to a valid course ID (i.e. db.courses.id)
Do not allow any of the reserved CSS characters in a username.
class HAS_NO_DOTS:
    """Validator that rejects usernames containing spaces or special characters.

    Instances are callable in the web2py validator style: calling one with a
    value returns ``(value, None)`` on success or ``(value, error_message)``
    on failure.
    """

    # Characters that may not appear in a new username. Compiled once for
    # the class instead of on every call.
    _FORBIDDEN = re.compile(r"""[!"#$%&'()*+,./:;<=>?@[\]^`{|}~ ]""")

    def __init__(
        self,
        error_message=r"""Your username may not contain spaces or any other special characters: !"#$%&'()*+,./:;<=>?@[\]^`{|}~ just letters and numbers""",
    ):
        # Stored for inspection. The message returned on failure is built per
        # call so that it can name the offending character.
        self.e = error_message

    def __call__(self, value):
        """Validate ``value``.

        Returns:
            ``(value, None)`` when ``value`` has no forbidden character, or
            when it does but a user with exactly that username already
            exists; otherwise ``(value, message)`` where ``message`` names
            the first forbidden character found.
        """
        match = self._FORBIDDEN.search(value)
        if match is None:
            return (value, None)

        # user already registered give them a pass
        if db(db.auth_user.username == value).count() > 0:
            return (value, None)

        # Use a local instead of overwriting ``self.e`` so that one rejection
        # does not change the validator's configured state.
        message = f"""Your username may not contain a {match.group(0).replace(" ","space")} or any other special characters except - or _"""
        return (value, message)

    def formatter(self, value):
        """Return ``value`` unchanged."""
        return value
# Custom auth_user table (defined before auth.define_tables is called).
db.define_table(
    "auth_user",
    Field("username", type="string", label=T("Username")),
    Field("first_name", type="string", label=T("First Name")),
    Field("last_name", type="string", label=T("Last Name")),
    Field(
        "email",
        type="string",
        requires=IS_EMAIL(banned="^.*shoeonlineblog\\.com$"),
        label=T("Email"),
    ),
    Field("password", type="password", readable=False, label=T("Password")),
    Field(
        "created_on",
        type="datetime",
        default=request.now,
        label=T("Created On"),
        writable=False,
        readable=False,
    ),
    Field(
        "modified_on",
        type="datetime",
        default=request.now,
        label=T("Modified On"),
        writable=False,
        readable=False,
        update=request.now,
    ),
    Field("registration_key", default="", writable=False, readable=False),
    Field("reset_password_key", default="", writable=False, readable=False),
    Field("registration_id", default="", writable=False, readable=False),
    Field(
        "course_id",
        type="reference courses",
        label=T("Course Name"),
        required=True,
        default=1,
    ),
    # course_name is derived from course_id via getCourseNameFromId().
    Field(
        "course_name",
        compute=lambda row: getCourseNameFromId(row.course_id),
        readable=False,
        writable=False,
    ),
    Field(
        "accept_tcp", required=True, type="boolean", default=True, label=T("I Accept")
    ),
    Field("active", type="boolean", writable=False, readable=False, default=True),
    Field("donated", type="boolean", writable=False, readable=False, default=False),
    # format='%(username)s',
    format=lambda u: (u.first_name or "") + " " + (u.last_name or ""),
    migrate=bookserver_owned("auth_user"),
)
# Validators for the custom auth_user table, followed by the definition of
# the remaining auth tables.
db.auth_user.first_name.requires = IS_NOT_EMPTY(error_message=auth.messages.is_empty)
db.auth_user.last_name.requires = IS_NOT_EMPTY(error_message=auth.messages.is_empty)
db.auth_user.password.requires = CRYPT(key=auth.settings.hmac_key)
db.auth_user.username.requires = (
    HAS_NO_DOTS(),
    IS_NOT_IN_DB(db, db.auth_user.username),
)
db.auth_user.registration_id.requires = IS_NOT_IN_DB(db, db.auth_user.registration_id)
# This assignment replaces the ``requires`` given on the email Field itself,
# so the banned-domain pattern has to be repeated here or it is lost.
db.auth_user.email.requires = (
    IS_EMAIL(
        banned="^.*shoeonlineblog\\.com$",
        error_message=auth.messages.invalid_email,
    ),
    IS_NOT_IN_DB(db, db.auth_user.email),
)
db.auth_user.course_id.requires = IS_COURSE_ID()
# Define the remaining auth tables; ``migrate`` is the metadata prefix.
auth.define_tables(username=True, signature=False, migrate=table_migrate_prefix)
Because so many pages rely on views/_sphinx_static_file.html
it makes
sense to provide some default values for variables used in the template here
The latex_preamble attribute can be used for any custom latex macros used in
the text, that need to be available for grading, assignments, and practice
This is used in nearly every PreTeXt book.
See models/db_ebook.py
for course_attributes table
# Look up the "latex_macros" attribute of the base course and expose it as
# ``request.latex_preamble`` (empty string when the course or the attribute
# is missing).
# NOTE(review): ``base_course`` is not defined anywhere in the visible part of
# this file -- these statements look like the body of a function whose
# ``def`` line is missing; confirm against the original source.
bc = db(db.courses.course_name == base_course).select().first()
res = (
    db(
        (db.course_attributes.course_id == bc.id)
        & (db.course_attributes.attr == "latex_macros")
    )
    .select()
    .first()
    # An unknown base course has no attributes; skip the query rather than
    # fail on ``bc.id``.
    if bc
    else None
)
request.latex_preamble = res.value if res else ""
## configure email
mail = auth.settings.mailer
mail.settings.server = settings.email_server
mail.settings.sender = settings.email_sender
mail.settings.login = settings.email_login

## configure auth policy
auth.settings.registration_requires_verification = False
auth.settings.registration_requires_approval = False
auth.settings.reset_password_requires_verification = True
auth.settings.register_next = URL("default", "index")

# change default session login time from 1 hour to 24 hours
auth.settings.expiration = 24 * 3600

# Login URL of this application on the current host.
janrain_url = "http://%s/%s/default/user/login" % (request.env.http_host, request.application)
# Links users to the courses they belong to. Both references are declared
# with ON DELETE CASCADE. The second, redundant pair of ``user_id`` /
# ``course_id`` declarations (without ``ondelete``) has been removed: a table
# can only hold each field name once.
db.define_table(
    "user_courses",
    Field("user_id", db.auth_user, ondelete="CASCADE"),
    Field("course_id", db.courses, ondelete="CASCADE"),
    migrate=bookserver_owned("user_courses"),
)
# For whatever reason the automatic migration of this table failed. The following manual statements are needed:
#   alter table user_courses alter column user_id type integer using user_id::integer;
#   alter table user_courses alter column course_id type integer using course_id::integer;
#########################################################################
## Define your tables below (or better in another model file) for example
##
## >>> db.define_table('mytable',Field('myfield','string'))
##
## Fields can be 'string','text','password','integer','double','boolean'
## 'date','time','datetime','blob','upload', 'reference TABLENAME'
## There is an implicit 'id integer autoincrement' field
## Consult manual for more options, validators, etc.
##
## More API examples for controllers:
##
## >>> db.mytable.insert(myfield='value')
## >>> rows=db(db.mytable.myfield=='value').select(db.mytable.ALL)
## >>> for row in rows: print(row.id, row.myfield)
#########################################################################
# mail.settings.server = settings.email_server
# mail.settings.sender = settings.email_sender
# mail.settings.login = settings.email_login
# Subjects of the account e-mails.
auth.messages.retrieve_username_subject = "Runestone Academy username"
auth.messages.reset_password_subject = "Runestone Academy password"
# HTML body of the "retrieve username" e-mail. It contains a
# ``%(username)s`` placeholder (presumably filled in when the mail is sent --
# confirm against web2py's Auth).
auth.messages.retrieve_username = """<html>
Hello,
<br>
<p>We received your request to retrieve your username. According to our files
Your username is: %(username)s </p>
<p>If you have any trouble with this automated system you can also ask your instructor
and they can help you retrieve your username or reset your password. If you are
an instructor, you can (as a last resort) contact Runestone by creating an issue
on <a href="https://github.com/RunestoneInteractive/RunestoneServer/issues">Github</a>.</p>
<p>This message was generated automatically and comes from an unmonitored email address. If you reply to this message a human will not see it. Use the github link above if you need help from a real person.</p>
Thanks for using Runestone!<br><br>
Brad Miller
</html>
"""
# HTML body of the "reset password" e-mail. It contains a ``%(link)s``
# placeholder for the reset link (presumably filled in when the mail is
# sent -- confirm against web2py's Auth).
auth.messages.reset_password = """<html>
Hello, <br>
<p>If you click on <a href="%(link)s">this link</a> you will reset your password. Sometimes schools have software that tries to sanitize the previous link and makes it useless.</p>
<p>If you get a 404 try changing the http to https in the link provided.</p>
<p>If you have any trouble with the link you can also ask your instructor
and they can help you retrieve your username or reset your password. If you are
an instructor, you can (as a last resort) contact Runestone by creating an issue
on <a href="https://github.com/RunestoneInteractive/RunestoneServer/issues">Github</a>.</p>
<p>This message was generated automatically and comes from an unmonitored email address. If you reply to this message a human will not see it. Use the github link above if you need help from a real person.</p>
Thanks for using Runestone!<br><br>
Brad Miller
</html>
"""
Make sure the latest version of admin is always loaded.
# response.headers["Access-Control-Allow-Origin"] = "*"
def check_for_donate_or_build(field_dict, id_of_insert):
    """Copy donation / course-build choices from the request into the session.

    Registered as an ``_after_insert`` callback on ``auth_user``; neither
    ``field_dict`` nor ``id_of_insert`` is used.

    * ``donate`` in the request variables -> stored as ``session.donate``.
    * ``ccn_checkbox`` in the request variables -> ``session.build_course``
      is set to ``True``.
    """
    submitted = request.vars
    if "donate" in submitted:
        session.donate = submitted.donate
    if "ccn_checkbox" in submitted:
        session.build_course = True
# Run check_for_donate_or_build after every insert into auth_user, provided
# that table is defined on this connection.
if "auth_user" in db:
    db.auth_user._after_insert.append(check_for_donate_or_build)
def admin_logger(logger):
    """Record the current request in the ``useinfo`` table.

    Does nothing unless ``settings.academy_mode`` is set. The row stores the
    controller as ``event``, the function as ``act`` and the query string (or
    "no params") as ``div_id``. Anonymous requests are logged as user
    "Anonymous" in course "boguscourse".

    Args:
        logger: Logger used to report a failed insert. The failure is logged
            and the transaction rolled back; nothing is raised.
    """
    if not settings.academy_mode:
        return

    if auth.user:
        sid, course = auth.user.username, auth.user.course_name
    else:
        sid, course = "Anonymous", "boguscourse"

    try:
        db.useinfo.insert(
            sid=sid,
            act=request.function,
            div_id=request.env.query_string or "no params",
            event=request.controller,
            timestamp=datetime.datetime.utcnow(),
            course_id=course,
        )
    except Exception as e:
        logger.error(
            f"failed to insert log record for {request.controller} {request.function}: {e}"
        )
        db.rollback()
def createUser(username, password, fname, lname, email, course_name, instructor=False):
    """Create a user enrolled in ``course_name``.

    Inserts the ``auth_user`` row (the password is hashed with ``CRYPT`` and
    the auth HMAC key) and the matching ``user_courses`` row. When
    ``instructor`` is true the user is also added to the "instructor" auth
    group and to ``course_instructor`` for the course.

    Raises:
        ValueError: if no course is named ``course_name``.
    """
    course_row = db(db.courses.course_name == course_name).select().first()
    if not course_row:
        raise ValueError("Course {} does not exist".format(course_name))

    hashed_pw = CRYPT(auth.settings.hmac_key)(password)[0]
    new_user = dict(
        username=username,
        password=hashed_pw,
        first_name=fname,
        last_name=lname,
        email=email,
        course_id=course_row.id,
        course_name=course_name,
        active="T",
        created_on=datetime.datetime.now(),
    )
    uid = db.auth_user.insert(**new_user)
    db.user_courses.insert(user_id=uid, course_id=course_row.id)

    if not instructor:
        return

    instructor_role = (
        db(db.auth_group.role == "instructor").select(db.auth_group.id).first()
    )
    db.auth_membership.insert(user_id=uid, group_id=instructor_role)
    db.course_instructor.insert(course=course_row.id, instructor=uid)
def _validateUser(username, password, fname, lname, email, course_name, line):
    """Collect the problems found in one user record.

    Checks, in order: the course matches the logged-in user's course, the
    course exists, the username has no forbidden character, the username is
    not taken, the password is not blank and the e-mail contains "@".
    ``fname`` and ``lname`` are not checked.

    Args:
        line: Identifier of the record, quoted in every message.

    Returns:
        A list of error strings; empty when nothing is wrong.
    """
    problems = []

    if auth.user.course_name != course_name:
        problems.append(f"Course name does not match your course on line {line}")

    if not db(db.courses.course_name == course_name).select().first():
        problems.append(f"Course {course_name} does not exist on line {line}")

    match = re.search(r"""[!"#$%&'()*+,./:;<=>?@[\]^`{|}~ ]""", username)
    if match:
        problems.append(
            f"""Username cannot contain a {match.group(0).replace(" ", "space")} on line {line}"""
        )

    if db(db.auth_user.username == username).count() > 0:
        problems.append(f"Username {username} already exists on line {line}")

    if password == "":
        problems.append(f"password cannot be blank on line {line}")

    if "@" not in email:
        problems.append(f"Email address missing @ on line {line}")

    return problems
def create_rs_token():
    """Create an access token for the logged-in user.

    Calls ``_create_access_token`` with ``{"sub": <username>}`` and an expiry
    of 105 days. The result of that call is not returned.
    """
    _create_access_token(
        dict(sub=auth.user.username),
        expires=datetime.timedelta(days=105),
    )
This function is basically copied from the fastapi_login plugin see their github repo
Helper function to create the encoded access token using the provided secret and the algorithm of the LoginManager instance
- Args:
data (dict): The data which should be stored in the token expires (datetime.timedelta): An optional timedelta in which the token expires.
Defaults to 15 minutes
scopes (Collection): Optional scopes the token user has access to.
- Returns:
The encoded JWT with the data and the expiry. The expiry is available under the ‘exp’ key
default to 105 days expiry times about a semester
the secret key value should be set in 1.py as part of the web2py installation.
Added Runestone-only code: set cookie
if encoded_jwt:
response.cookies["access_token"] = encoded_jwt
response.cookies["access_token"]["expires"] = 24 * 3600 * 105 # 15 weeks
response.cookies["access_token"]["path"] = "/"
if "LOAD_BALANCER_HOST" in os.environ:
response.cookies["access_token"]["domain"] = os.environ[
"LOAD_BALANCER_HOST"
]
decode here decodes the byte str to a normal str not the token
This may be a workaround for the bad urls in email resets