lti.py - LTI Endpoint for integrating with LMS¶
Imports¶
These are listed in the order prescribed by PEP 8.
Standard library¶
Third-party imports¶
Local application imports¶
Code¶
For some reason, URL query parameters are being processed twice by Canvas and returned as a list, like [23, 23]. So, just take the first element in the list.
Main LTI Launch Endpoint¶
Basic processing of the LTI request starts here. This first block gets the user information provided by the LMS.
myrecord = None
consumer = None
masterapp = None
userinfo = None
user_id = request.vars.get("user_id", None)
last_name = request.vars.get("lis_person_name_family", None)
first_name = request.vars.get("lis_person_name_given", None)
full_name = request.vars.get("lis_person_name_full", None)
message_type = request.vars.get("lti_message_type")
course_id = _param_converter(request.vars.get("custom_course_id", None))
if course_id:
Allow the course_id to be either a number or the name of the course, as a string.
try:
course_id = int(course_id)
except ValueError:
course_id = (
db(db.courses.course_name == course_id).select(**SELECT_CACHE).first()
)
course_id = course_id.id
if full_name and not last_name:
names = full_name.strip().split()
last_name = names[-1]
first_name = " ".join(names[:-1])
email = request.vars.get("lis_person_contact_email_primary", None)
instructor = ("Instructor" in request.vars.get("roles", [])) or (
"TeachingAssistant" in request.vars.get("roles", [])
)
result_source_did = request.vars.get("lis_result_sourcedid", None)
outcome_url = request.vars.get("lis_outcome_service_url", None)
Deprecated: the use of the non-LTI-compliant name assignment_id. The parameter should be custom_assignment_id.
assignment_id = _param_converter(
request.vars.get(
"custom_assignment_id", request.vars.get("assignment_id", None)
)
)
practice = request.vars.get("practice", None)
if user_id is None:
return dict(
logged_in=False,
lti_errors=["user_id is required for this tool to function", request.vars],
masterapp=masterapp,
)
elif first_name is None:
return dict(
logged_in=False,
lti_errors=[
"First Name is required for this tool to function",
request.vars,
],
masterapp=masterapp,
)
elif last_name is None:
return dict(
logged_in=False,
lti_errors=[
"Last Name is required for this tool to function",
request.vars,
],
masterapp=masterapp,
)
elif email is None:
return dict(
logged_in=False,
lti_errors=["Email is required for this tool to function", request.vars],
masterapp=masterapp,
)
else:
userinfo = dict()
userinfo["first_name"] = first_name
userinfo["last_name"] = last_name
In the Canvas Student View as of 7-Jan-2019, the lis_person_contact_email_primary
is an empty string. In this case, use the userid instead.
Now we need to get some security info oauth_consumer_key
key = request.vars.get("oauth_consumer_key", None)
if key is not None:
myrecord = db(db.lti_keys.consumer == key).select().first()
if myrecord is None:
return dict(
logged_in=False,
lti_errors=["Could not find oauth_consumer_key", request.vars],
masterapp=masterapp,
)
else:
session.oauth_consumer_key = key
if myrecord is not None:
masterapp = myrecord.application
if len(masterapp) < 1:
masterapp = "welcome"
session.connect(request, response, masterapp=masterapp, db=db)
oauth_server = oauth2.Server()
oauth_server.add_signature_method(oauth2.SignatureMethod_PLAINTEXT())
oauth_server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())
Use setting.lti_uri
if it’s defined; otherwise, use the current URI (which must be built from its components). Don’t include query parameters, which causes a failure in OAuth security validation.
Fix encoding – the signed keys are in bytes, but the oauth2 Request constructor translates everything to a string. Therefore, they never compare as equal. ???
if isinstance(oauth_request.get("oauth_signature"), str):
oauth_request["oauth_signature"] = oauth_request["oauth_signature"].encode(
"utf-8"
)
consumer = oauth2.Consumer(myrecord.consumer, myrecord.secret)
try:
oauth_server.verify_request(oauth_request, consumer, None)
except oauth2.Error as err:
return dict(
logged_in=False,
lti_errors=[
"OAuth Security Validation failed:" + err.message,
request.vars,
],
masterapp=masterapp,
)
consumer = None
###############################################################################
I think everything from the beginning to here could/should be refactored into a validate function. Or make use of the lti package
Time to create / update / login the user
Only assign a password if we’re creating the user. The get_or_create_user method checks for an existing user using both the username and the email.
update_fields = ["email", "first_name", "last_name"]
if (
not db(
(db.auth_user.username == userinfo["username"])
| (db.auth_user.email == userinfo["email"])
)
.select(db.auth_user.id)
.first()
):
pw = db.auth_user.password.validate(str(uuid.uuid4()))[0]
userinfo["password"] = pw
update_fields.append("password")
user = auth.get_or_create_user(userinfo, update_fields=update_fields)
if user is None:
return dict(
logged_in=False,
lti_errors=["Unable to create user record", request.vars],
masterapp=masterapp,
)
user exists; make sure course name and id are set based on custom parameters passed, if this is for runestone. As noted for assignment_id
, parameters are passed as a two-element list.
course_id = _param_converter(request.vars.get(“custom_course_id”, None))
if the instructor uses their course name instead of its id number then get the number.
Update instructor status. TODO: this block should be removed. The only way to become an instructor is through Runestone
Give the instructor free access to the book.
Make sure previous instructors are removed from the instructor list.
Before creating a new user_courses record:
In academy mode, present payment or donation options, per the discussion at https://github.com/RunestoneInteractive/RunestoneServer/pull/1322.
To do so, store the current URL, so this request can be completed after creating the user.
TODO: this doesn’t work, since the course_id and assignment_id aren’t saved in this redirect. Therefore, these should be stored (perhaps in session) and then used after a user pays / donates.
Otherwise, simply create the user.
At this point the user has logged in; add a JWT cookie for compatibility with bookserver.
If the assignment is released, but this is the first time a student has visited the assignment, auto-upload the grade.
If we got here, the assignment wasn’t launched.
else just redirect to the book index
redirect(get_course_url("index.html"))
def _launch_practice(outcome_url, result_source_did, user, course_id):
    """Record a practice launch for ``user`` and redirect to the practice page.

    Updates or inserts the ``practice_grades`` row matching ``user.id``.

    :param outcome_url: value stored as ``lis_outcome_url`` (may be empty).
    :param result_source_did: value stored as ``lis_result_sourcedid`` (may
        be empty).
    :param user: user record; ``user.id`` and ``user["course_name"]`` are read.
    :param course_id: passed to ``getCourseNameFromId`` to obtain the course
        name stored in the row.
    """
    fields = {"auth_user": user.id}
    # Only write the grade-reporting fields when both were supplied, so a
    # launch without them doesn't overwrite outcome_url and result_source_did.
    if outcome_url and result_source_did:
        fields["lis_result_sourcedid"] = result_source_did
        fields["lis_outcome_url"] = outcome_url
    fields["course_name"] = getCourseNameFromId(course_id)

    db.practice_grades.update_or_insert(
        (db.practice_grades.auth_user == user.id), **fields
    )

    practice_url = URL(
        "assignments",
        "settz_then_practice",
        vars={"course_name": user["course_name"]},
    )
    redirect(practice_url)
def _launch_assignment(assignment_id, user, result_source_did, outcome_url):
    """Store the grade-reporting details for an assignment launch, then redirect.

    :param assignment_id: id looked up in ``db.assignments``.
    :param user: user record; ``user.id`` is read.
    :param result_source_did: value stored as ``lis_result_sourcedid``.
    :param outcome_url: value stored as ``lis_outcome_url``.
    :return: ``None`` without redirecting when no assignment with this id is
        found; otherwise the function ends by calling ``redirect`` to
        ``assignments/doAssignment``.
    """
    assignment_row = (
        db(db.assignments.id == assignment_id).select(db.assignments.released).first()
    )
    # If the assignment isn't valid, return instead of redirecting. The caller
    # will report the error.
    if not assignment_row:
        return

    grade_query = (db.grades.auth_user == user.id) & (
        db.grades.assignment == assignment_id
    )
    existing_grade = (
        db(grade_query)
        .select(db.grades.lis_result_sourcedid, db.grades.lis_outcome_url)
        .first()
    )

    # A grade row exists but neither reporting field has been stored yet.
    # This is checked before the update below fills those fields in.
    unreported = bool(existing_grade) and not (
        existing_grade.lis_result_sourcedid or existing_grade.lis_outcome_url
    )
    send_grade = assignment_row.released and unreported

    # Save the guid and url for reporting back the grade.
    db.grades.update_or_insert(
        grade_query,
        auth_user=user.id,
        assignment=assignment_id,
        lis_result_sourcedid=result_source_did,
        lis_outcome_url=outcome_url,
    )

    if send_grade:
        _try_to_send_lti_grade(user.id, assignment_id)

    redirect(URL("assignments", "doAssignment", vars={"assignment_id": assignment_id}))
def _provide_assignment_list(course_id, consumer):
    """Return an auto-submitting ContentItemSelection form for a course.

    Gather all of the assignments for this course, package them up per
    https://www.imsglobal.org/specs/lticiv1p0/specification and return a form.
    This form is then auto-submitted by JavaScript. The key element of the
    form is the ``content_items`` structure, which should look like this:

    .. code-block::

        {
          "@context" : "http://purl.imsglobal.org/ctx/lti/v1/ContentItem",
          "@graph" : [
            { "@type" : "LtiLinkItem",
              "@id" : ":item2",
              "icon" : {                                  OPTIONAL
                "@id" : "http://tool.provider.com/icons/small.png",
                "width" : 50,
                "height" : 50
              },
              "thumbnail" : {                             OPTIONAL
                "@id" : "http://tool.provider.com/images/thumb.jpg",
                "width" : 100,
                "height" : 150
              },
              "title" : "Open sIMSon application",
              "text" : "The <em>sIMSon</em> application provides a collaborative space for developing semantic modelling skills.",
              "mediaType" : "application/vnd.ims.lti.v1.ltilink",
              "custom" : {
                "level" : "novice",
                "mode" : "interactive"
              },
            },
          ]
        }

    The keys are to include custom parameters for course_id and
    assignment_id. Using mediaType as specified will allow the TC to use the
    usual LTI launch mechanism.

    :param course_id: course whose assignments are listed; compared against
        ``db.assignments.course``.
    :param consumer: consumer object used to sign the request; its ``key``
        attribute is sent as ``oauth_consumer_key``.
    :return: an HTML document (string) containing the signed form and the
        script that submits it.
    """
    rdict = {
        "oauth_timestamp": str(int(time.time())),
        "oauth_nonce": str(uuid.uuid1().int),
        "oauth_consumer_key": consumer.key,
        "oauth_signature_method": "HMAC-SHA1",
        "lti_message_type": "ContentItemSelection",
        "lti_version": "LTI-1p0",
        "oauth_version": "1.0",
        "oauth_callback": "about:blank",
    }
    # ``data`` comes from the launch request and is echoed back in the form.
    extra_data = request.vars.get("data", None)
    if extra_data:
        rdict["data"] = extra_data

    return_url = request.vars.get("content_item_return_url")
    # Debugging aid:
    # return_url = "http://dev.runestoneinteractive.org/runestone/lti/fakestore"

    query_res = db(db.assignments.course == course_id).select(
        orderby=~db.assignments.duedate
    )
    graph = [
        {
            "@type": "LtiLinkItem",
            "mediaType": "application/vnd.ims.lti.v1.ltilink",
            "@id": assignment.id,
            "title": assignment.name,
            "text": assignment.description,
            "custom": {
                "custom_course_id": course_id,
                "assignment_id": assignment.id,
            },
        }
        for assignment in query_res
    ]
    result = json.dumps(
        {
            "@context": "http://purl.imsglobal.org/ctx/lti/v1/ContentItem",
            "@graph": graph,
        }
    )
    rdict["content_items"] = result

    # Debugging aid:
    # response.view = "/srv/web2py/applications/runestone/views/lti/store.html"
    # req = oauth2.Request("post", return_url, rdict, is_form_encoded=True)
    req = oauth2.Request.from_consumer_and_token(
        consumer,
        token=None,
        http_method="POST",
        http_url=return_url,
        parameters=rdict,
        is_form_encoded=True,
    )
    req.sign_request(oauth2.SignatureMethod_HMAC_SHA1(), consumer, None)

    # The signature is computed over the raw values above. For rendering,
    # HTML-escape *every* value placed in an attribute (not just
    # content_items): the browser un-escapes them on submit, so the posted
    # values still match the signature, while quotes or ``&`` in ``data`` or
    # the return URL can no longer break out of the attribute.
    form_values = {name: html.escape(str(value)) for name, value in rdict.items()}
    form_values["return_url"] = html.escape(str(return_url))
    form_values["oauth_signature"] = html.escape(
        req["oauth_signature"].decode("utf8")
    )

    tplate = """
    <!DOCTYPE html>
    <html>
    <body>
    <form name="storeForm" action="{return_url}" method="post" encType="application/x-www-form-urlencoded">
    <input type="hidden" name="lti_message_type" value="ContentItemSelection" />
    <input type="hidden" name="lti_version" value="LTI-1p0" />
    <input type="hidden" name="content_items" value="{content_items}" />
    """
    tplate += (
        """ <input type="hidden" name="data" value="{data}" /> """
        if extra_data
        else ""
    )
    tplate += """
    <input type="hidden" name="oauth_version" value="1.0" />
    <input type="hidden" name="oauth_nonce" value="{oauth_nonce}" />
    <input type="hidden" name="oauth_timestamp" value="{oauth_timestamp}" />
    <input type="hidden" name="oauth_consumer_key" value="{oauth_consumer_key}" />
    <input type="hidden" name="oauth_callback" value="about:blank" />
    <input type="hidden" name="oauth_signature_method" value="HMAC-SHA1" />
    <input type="hidden" name="oauth_signature" value="{oauth_signature}" />
    </form>
    """
    tplate = tplate.format(**form_values)

    # Kept separate from the template so its braces are not seen by format().
    scpt = """
    <script type="text/javascript">
    window.onload=function(){
        var auto = setTimeout(function(){ submitform(); }, 1000);
        function submitform(){
            console.log(document.forms["storeForm"]);
            document.forms["storeForm"].submit();
        }
    }
    </script>
    </body>
    </html>
    """
    return tplate + scpt
def fakestore():
    """Debugging endpoint that shows what a ContentItemSelection POST contains.

    Define this function just to show what is coming through. It is kept
    around as it may be useful for future debugging. It re-signs the posted
    variables with a fixed test consumer, prints the signed request details
    and returns a string comparing the sent and computed signatures.

    :return: a string with the sent signature (``None`` when the request
        carried no ``oauth_signature``), the computed signature and the
        posted variables.
    """
    # NOTE(review): hard-coded test credentials; debugging use only. Confirm
    # this endpoint is not reachable in production.
    consumer = oauth2.Consumer("bnm.runestone", "supersecret")
    return_url = "http://dev.runestoneinteractive.org/runestone/lti/fakestore"
    d = dict(request.vars)
    req = oauth2.Request.from_consumer_and_token(
        consumer,
        token=None,
        http_method="POST",
        http_url=return_url,
        parameters=d,
        is_form_encoded=True,
    )
    req.sign_request(oauth2.SignatureMethod_HMAC_SHA1(), consumer, None)
    for k, v in req.items():
        print(f"{k} : {v}")
    print(req.method)
    print(req.normalized_url)
    print(req.get_normalized_parameters())
    # Use .get so a request without a signature is reported, not a KeyError.
    sent_sig = d.get("oauth_signature")
    computed_sig = req["oauth_signature"].decode("utf8")
    return f" sent sig = {sent_sig} computed sig {computed_sig} {d}"