ajax.py - Runestone API¶
Most of this file is Deprecated
Most of the endpoints in this file are no longer used. See BookServer ** **Do not make any changes to the following functions. They will be removed in an upcoming release. This module implements the API that the Runestone Components use to communicate with a Runestone Server. If you are trying to debug some browser to server API stuff you almost certainly want to check the BookServer not here. def compareAndUpdateCookieData(sid: str): def hsblog(): def runlog(): def gethist(): def getuser(): def set_tz_offset(): def updatelastpage(): def getCompletionStatus(): def getAllCompletionStatus(): def getlastpage(): def _getCorrectStats(miscdata, event): def _getStudentResults(question: str): def getaggregateresults(): def getpollresults(): def gettop10Answers(): def getassignmentgrade(): def _canonicalize_tz(tstring): def getAssessResults(): def tookTimedAssessment(): def get_datafile(): def _same_class(user1: str, user2: str) -> bool: def login_status(): def get_question_source():
TODO: Move these to a new controller file (maybe admin.py) def preview_question(): def save_donate(): def did_donate(): def broadcast_code(): def update_selected_question(): # Imports ======= These are listed in the order prescribed by PEP 8.
Third-party imports¶
Local application imports¶
from feedback import is_server_feedback, fitb_feedback
from rs_practice import _get_qualified_questions
logger = logging.getLogger(settings.logger)
logger.setLevel(settings.log_level)
EVENT_TABLE = {
"mChoice": "mchoice_answers",
"fillb": "fitb_answers",
"dragNdrop": "dragndrop_answers",
"clickableArea": "clickablearea_answers",
"parsons": "parsons_answers",
"codelensq": "codelens_answers",
"shortanswer": "shortanswer_answers",
"fillintheblank": "fitb_answers",
"mchoice": "mchoice_answers",
"dragndrop": "dragndrop_answers",
"clickablearea": "clickablearea_answers",
"parsonsprob": "parsons_answers",
}
COMMENT_MAP = {
"sql": "--",
"python": "#",
"java": "//",
"javascript": "//",
"c": "//",
"cpp": "//",
}
def compareAndUpdateCookieData(sid: str):
if (
"ipuser" in request.cookies
and request.cookies["ipuser"].value != sid
and request.cookies["ipuser"].value.endswith("@" + request.client)
):
db.useinfo.update_or_insert(
db.useinfo.sid == request.cookies["ipuser"].value, sid=sid
)
Endpoints¶
hsblog endpoint¶
Given a JSON record of a clickstream event record the event in the useinfo
table.
If the event is an answer to a runestone question record that answer in the database in
one of the xxx_answers tables.
def hsblog():
setCookie = False
if auth.user:
if request.vars.course != auth.user.course_name:
return json.dumps(
dict(
log=False,
message="You appear to have changed courses in another tab. Please switch to this course",
)
)
sid = auth.user.username
compareAndUpdateCookieData(sid)
setCookie = True # we set our own cookie anyway to eliminate many of the extraneous anonymous
log entries that come from auth timing out even but the user hasn’t reloaded the page. If the incoming data contains an sid then prefer that.
if request.vars.sid:
sid = request.vars.sid
else:
if request.vars.clientLoginStatus == "true":
logger.error("Session Expired")
return json.dumps(dict(log=False, message="Session Expired"))
if "ipuser" in request.cookies:
sid = request.cookies["ipuser"].value
setCookie = True
else:
sid = str(uuid.uuid1().int) + "@" + request.client
setCookie = True
act = request.vars.get("act", "")
div_id = request.vars.div_id
event = request.vars.event
course = request.vars.course
Get the current time, rounded to the nearest second – this is how time time will be stored in the database.
ts = datetime.datetime.utcnow()
ts -= datetime.timedelta(microseconds=ts.microsecond)
tt = request.vars.time
if not tt:
tt = 0
try:
db.useinfo.insert(
sid=sid,
act=act[0:512],
div_id=div_id,
event=event,
timestamp=ts,
course_id=course,
)
except Exception as e:
logger.error(
"failed to insert log record for {} in {} : {} {} {}".format(
sid, course, div_id, event, act
)
)
logger.error("Details: {}".format(e))
if event == "timedExam" and (act == "finish" or act == "reset" or act == "start"):
logger.debug(act)
if act == "reset":
r = "T"
else:
r = None
try:
db.timed_exam.insert(
sid=sid,
course_name=course,
correct=int(request.vars.correct or 0),
incorrect=int(request.vars.incorrect or 0),
skipped=int(request.vars.skipped or 0),
time_taken=int(tt),
timestamp=ts,
div_id=div_id,
reset=r,
)
except Exception as e:
logger.debug(
"failed to insert a timed exam record for {} in {} : {}".format(
sid, course, div_id
)
)
logger.debug(
"correct {} incorrect {} skipped {} time {}".format(
request.vars.correct,
request.vars.incorrect,
request.vars.skipped,
request.vars.time,
)
)
logger.debug("Error: {}".format(e.message))
Produce a default result.
Process this event.
if event == "mChoice" and auth.user:
answer = request.vars.answer or ""
correct = request.vars.correct or False
db.mchoice_answers.insert(
sid=sid,
timestamp=ts,
div_id=div_id,
answer=answer,
correct=correct,
course_name=course,
percent=pct,
)
elif event == "fillb" and auth.user:
answer_json = request.vars.answer
correct = request.vars.correct or False
Grade on the server if needed.
Save this data.
db.fitb_answers.insert(
sid=sid,
timestamp=ts,
div_id=div_id,
answer=answer_json,
correct=correct,
course_name=course,
percent=pct,
)
elif event == "dragNdrop" and auth.user:
answers = request.vars.answer
minHeight = request.vars.minHeight
correct = request.vars.correct or False
db.dragndrop_answers.insert(
sid=sid,
timestamp=ts,
div_id=div_id,
answer=answers,
correct=correct,
course_name=course,
min_height=minHeight,
percent=pct,
)
elif event == "clickableArea" and auth.user:
correct = request.vars.correct or False
db.clickablearea_answers.insert(
sid=sid,
timestamp=ts,
div_id=div_id,
answer=act,
correct=correct,
course_name=course,
percent=pct,
)
elif event == "parsons" and auth.user:
correct = request.vars.correct or False
answer = request.vars.answer or ""
source = request.vars.source
db.parsons_answers.insert(
sid=sid,
timestamp=ts,
div_id=div_id,
answer=answer,
source=source,
correct=correct,
course_name=course,
percent=pct,
)
elif event == "codelensq" and auth.user:
correct = request.vars.correct or False
answer = request.vars.answer or ""
source = request.vars.source
db.codelens_answers.insert(
sid=sid,
timestamp=ts,
div_id=div_id,
answer=answer,
source=source,
correct=correct,
course_name=course,
percent=pct,
)
elif event == "shortanswer" and auth.user:
db.shortanswer_answers.insert(
sid=sid,
answer=act,
div_id=div_id,
timestamp=ts,
course_name=course,
)
elif event == "unittest" and auth.user:
statslist = act.split(":")
if "undefined" not in act:
pct = float(statslist[1])
passed = int(statslist[3])
failed = int(statslist[5])
if math.isnan(pct):
pct = 0
else:
pct = passed = failed = 0
logger.error(f"Got undefined unittest results for {div_id} {sid}")
if pct >= 99.99999:
correct = "T"
else:
correct = "F"
db.unittest_answers.insert(
sid=sid,
timestamp=ts,
div_id=div_id,
correct=correct,
passed=passed,
failed=failed,
course_name=course,
percent=pct,
)
response.headers["content-type"] = "application/json"
if setCookie:
response.cookies["ipuser"] = sid
response.cookies["ipuser"]["expires"] = 24 * 3600 * 90
response.cookies["ipuser"]["path"] = "/"
if auth.user:
response.cookies["last_course"] = auth.user.course_name
response.cookies["last_course"]["expires"] = 24 * 3600 * 90
response.cookies["last_course"]["path"] = "/"
return json.dumps(res)
runlog endpoint¶
The logRunEvent
client-side function calls this endpoint to record TODO…
response.headers[‘content-type’] = ‘application/json’
setCookie = False
if auth.user:
if request.vars.course != auth.user.course_name:
return json.dumps(
dict(
log=False,
message="You appear to have changed courses in another tab. Please switch to this course",
)
)
if request.vars.sid:
sid = request.vars.sid
else:
sid = auth.user.username
setCookie = True
else:
if request.vars.clientLoginStatus == "true":
logger.error("Session Expired")
return json.dumps(dict(log=False, message="Session Expired"))
if "ipuser" in request.cookies:
sid = request.cookies["ipuser"].value
setCookie = True
else:
sid = str(uuid.uuid1().int) + "@" + request.client
setCookie = True
div_id = request.vars.div_id
course = request.vars.course
code = request.vars.code if request.vars.code else ""
ts = datetime.datetime.utcnow()
error_info = request.vars.errinfo
pre = request.vars.prefix if request.vars.prefix else ""
post = request.vars.suffix if request.vars.suffix else ""
if error_info != "success":
event = "ac_error"
act = str(error_info)[:512]
else:
act = "run"
if request.vars.event:
event = request.vars.event
else:
event = "activecode"
num_tries = 3
done = False
while num_tries > 0 and not done:
try:
db.useinfo.insert(
sid=sid,
act=act,
div_id=div_id,
event=event,
timestamp=ts,
course_id=course,
)
done = True
except Exception as e:
logger.error(
"probable Too Long problem trying to insert sid={} act={} div_id={} event={} timestamp={} course_id={} exception={}".format(
sid, act, div_id, event, ts, course, e
)
)
num_tries -= 1
if num_tries == 0:
raise Exception("Runlog Failed to insert into useinfo")
if auth.user:
if "to_save" in request.vars and (
request.vars.to_save == "True" or request.vars.to_save == "true"
):
num_tries = 3
done = False
dbcourse = (
db(db.courses.course_name == course).select(**SELECT_CACHE).first()
)
while num_tries > 0 and not done:
try:
db.code.insert(
sid=sid,
acid=div_id,
code=code,
emessage=error_info,
timestamp=ts,
course_id=dbcourse,
language=request.vars.lang,
)
if request.vars.partner:
if _same_class(sid, request.vars.partner):
comchar = COMMENT_MAP.get(request.vars.lang, "#")
newcode = (
"{} This code was shared by {}\n\n".format(comchar, sid)
+ code
)
db.code.insert(
sid=request.vars.partner,
acid=div_id,
code=newcode,
emessage=error_info,
timestamp=ts,
course_id=dbcourse,
language=request.vars.lang,
)
else:
res = {
"message": "You must be enrolled in the same class as your partner"
}
return json.dumps(res)
done = True
except Exception as e:
num_tries -= 1
logger.error("INSERT into code FAILED retrying -- {}".format(e))
if num_tries == 0:
raise Exception("Runlog Failed to insert into code")
res = {"log": True}
if setCookie:
response.cookies["ipuser"] = sid
response.cookies["ipuser"]["expires"] = 24 * 3600 * 90
response.cookies["ipuser"]["path"] = "/"
return json.dumps(res)
Ajax Handlers for saving and restoring active code blocks
return the history of saved code by this user for a particular acid :Parameters:
acid
: id of the active code block
user
: optional identifier for the owner of the code
- Return
json object containing a list/array of source texts
if vars.sid then we know this is being called from the grading interface
if request.vars.sid:
sid = request.vars.sid
if auth.user and verifyInstructorStatus(
auth.user.course_name, auth.user.id
): # noqa: F405
course_id = auth.user.course_id
else:
course_id = None
elif auth.user:
sid = auth.user.username
course_id = auth.user.course_id
else:
sid = None
course_id = None
res = {}
if sid:
query = (
(codetbl.sid == sid)
& (codetbl.acid == acid)
& (codetbl.course_id == course_id)
& (codetbl.timestamp != None) # noqa: E711
)
res["acid"] = acid
res["sid"] = sid
get the code they saved in chronological order; id order gets that for us
@auth.requires_login() This function is deprecated as of June 2019 We need to keep it in place as long as we continue to serve books from runestone/static/ When that period is over we can eliminate
return the list of courses that auth.user is registered for to keep them from accidentally wandering into courses they are not registered for.
cres = db(
(db.user_courses.user_id == auth.user.id)
& (db.user_courses.course_id == db.courses.id)
).select(db.courses.course_name)
clist = []
for row in cres:
clist.append(row.course_name)
res = {
"email": auth.user.email,
"nick": auth.user.username,
"donated": auth.user.donated,
"isInstructor": verifyInstructorStatus( # noqa: F405
auth.user.course_name, auth.user.id
),
"course_list": clist,
}
session.timezoneoffset = request.vars.timezoneoffset
logger.debug(
"setting timezone offset in session %s hours" % session.timezoneoffset
)
except Exception:
res = dict(redirect=auth.settings.login_url) # ?_next=....
else:
res = dict(redirect=auth.settings.login_url) # ?_next=....
if session.readings:
res["readings"] = session.readings
logger.debug("returning login info: %s" % res)
return json.dumps([res])
def set_tz_offset():
session.timezoneoffset = request.vars.timezoneoffset
logger.debug("setting timezone offset in session %s hours" % session.timezoneoffset)
return "done"
Ajax Handlers to update and retrieve the last position of the user in the course
def updatelastpage():
lastPageUrl = request.vars.lastPageUrl
lastPageScrollLocation = request.vars.lastPageScrollLocation
if lastPageUrl is None:
return # todo: log request.vars, request.args and request.env.path_info
course = request.vars.course
completionFlag = request.vars.completionFlag
lastPageChapter = lastPageUrl.split("/")[-2]
lastPageSubchapter = ".".join(lastPageUrl.split("/")[-1].split(".")[:-1])
if auth.user:
done = False
num_tries = 3
while not done and num_tries > 0:
try:
db(
(db.user_state.user_id == auth.user.id)
& (db.user_state.course_name == course)
).update(
last_page_url=lastPageUrl,
last_page_chapter=lastPageChapter,
last_page_subchapter=lastPageSubchapter,
last_page_scroll_location=lastPageScrollLocation,
last_page_accessed_on=datetime.datetime.utcnow(),
)
done = True
except Exception:
num_tries -= 1
if num_tries == 0:
raise Exception("Failed to save the user state in update_last_page")
done = False
num_tries = 3
while not done and num_tries > 0:
try:
db(
(db.user_sub_chapter_progress.user_id == auth.user.id)
& (db.user_sub_chapter_progress.chapter_id == lastPageChapter)
& (
db.user_sub_chapter_progress.sub_chapter_id
== lastPageSubchapter
)
& (
(db.user_sub_chapter_progress.course_name == course)
| (
db.user_sub_chapter_progress.course_name == None
) # Back fill for old entries without course
)
).update(
status=completionFlag,
end_date=datetime.datetime.utcnow(),
course_name=course,
)
done = True
except Exception:
num_tries -= 1
if num_tries == 0:
raise Exception("Failed to save sub chapter progress in update_last_page")
practice_settings = db(db.course_practice.course_name == auth.user.course_name)
if (
practice_settings.count() != 0
and practice_settings.select().first().flashcard_creation_method == 0
):
Since each authenticated user has only one active course, we retrieve the course this way.
We only retrieve questions to be used in flashcards if they are marked for practice purpose.
questions = _get_qualified_questions(
course.base_course, lastPageChapter, lastPageSubchapter, db
)
if len(questions) > 0:
now = datetime.datetime.utcnow()
now_local = now - datetime.timedelta(
hours=float(session.timezoneoffset)
if "timezoneoffset" in session
else 0
)
existing_flashcards = db(
(db.user_topic_practice.user_id == auth.user.id)
& (db.user_topic_practice.course_name == auth.user.course_name)
& (db.user_topic_practice.chapter_label == lastPageChapter)
& (db.user_topic_practice.sub_chapter_label == lastPageSubchapter)
& (db.user_topic_practice.question_name == questions[0].name)
)
There is at least one qualified question in this subchapter, so insert a flashcard for the subchapter.
Treat it as if the first eligible question is the last one asked.
add as if yesterday, so can practice right away
last_presented=now - datetime.timedelta(1),
last_completed=now - datetime.timedelta(1),
creation_time=now,
timezoneoffset=float(session.timezoneoffset)
if "timezoneoffset" in session
else 0,
)
if completionFlag == "0" and not existing_flashcards.isempty():
existing_flashcards.delete()
def getCompletionStatus():
if auth.user:
lastPageUrl = request.vars.lastPageUrl
lastPageChapter = lastPageUrl.split("/")[-2]
lastPageSubchapter = ".".join(lastPageUrl.split("/")[-1].split(".")[:-1])
result = db(
(db.user_sub_chapter_progress.user_id == auth.user.id)
& (db.user_sub_chapter_progress.chapter_id == lastPageChapter)
& (db.user_sub_chapter_progress.sub_chapter_id == lastPageSubchapter)
& (
(db.user_sub_chapter_progress.course_name == auth.user.course_name)
| (
db.user_sub_chapter_progress.course_name == None
) # for backward compatibility
)
).select(db.user_sub_chapter_progress.status)
rowarray_list = []
if result:
for row in result:
res = {"completionStatus": row.status}
rowarray_list.append(res)
question: since the javascript in user-highlights.js is going to look only at the first row, shouldn’t we be returning just the last status? Or is there no history of status kept anyway?
haven’t seen this Chapter/Subchapter before make the insertions into the DB as necessary
we know the subchapter doesn’t exist
the chapter might exist without the subchapter
result = db(
(db.user_chapter_progress.user_id == auth.user.id)
& (db.user_chapter_progress.chapter_id == lastPageChapter)
).select()
if not result:
db.user_chapter_progress.insert(
user_id=auth.user.id, chapter_id=lastPageChapter, status=-1
)
return json.dumps([{"completionStatus": -1}])
def getAllCompletionStatus():
if auth.user:
result = db(
(db.user_sub_chapter_progress.user_id == auth.user.id)
& (db.user_sub_chapter_progress.course_name == auth.user.course_name)
).select(
db.user_sub_chapter_progress.chapter_id,
db.user_sub_chapter_progress.sub_chapter_id,
db.user_sub_chapter_progress.status,
db.user_sub_chapter_progress.status,
db.user_sub_chapter_progress.end_date,
)
rowarray_list = []
if result:
for row in result:
if row.end_date is None:
endDate = 0
else:
endDate = row.end_date.strftime("%d %b, %Y")
res = {
"chapterName": row.chapter_id,
"subChapterName": row.sub_chapter_id,
"completionStatus": row.status,
"endDate": endDate,
}
rowarray_list.append(res)
return json.dumps(rowarray_list)
@auth.requires_login()
def getlastpage():
course = request.vars.course
course = db(db.courses.course_name == course).select(**SELECT_CACHE).first()
result = db(
(db.user_state.user_id == auth.user.id)
& (db.user_state.course_name == course.course_name)
& (db.chapters.course_id == course.base_course)
& (db.user_state.last_page_chapter == db.chapters.chapter_label)
& (db.sub_chapters.chapter_id == db.chapters.id)
& (db.user_state.last_page_subchapter == db.sub_chapters.sub_chapter_label)
).select(
db.user_state.last_page_url,
db.user_state.last_page_hash,
db.chapters.chapter_name,
db.user_state.last_page_scroll_location,
db.sub_chapters.sub_chapter_name,
)
rowarray_list = []
if result:
for row in result:
res = {
"lastPageUrl": row.user_state.last_page_url,
"lastPageHash": row.user_state.last_page_hash,
"lastPageChapter": row.chapters.chapter_name,
"lastPageSubchapter": row.sub_chapters.sub_chapter_name,
"lastPageScrollLocation": row.user_state.last_page_scroll_location,
}
rowarray_list.append(res)
return json.dumps(rowarray_list)
else:
db.user_state.insert(user_id=auth.user.id, course_name=course.course_name)
def _getCorrectStats(miscdata, event):
TODO: update this to use the xxx_answer table select and count grouping by the correct column this version can suffer from division by zero error
sid = None
dbtable = EVENT_TABLE[event] # translate event to correct table
if auth.user:
sid = auth.user.username
else:
if "ipuser" in request.cookies:
sid = request.cookies["ipuser"].value
if sid:
course = (
db(db.courses.course_name == miscdata["course"])
.select(**SELECT_CACHE)
.first()
)
tbl = db[dbtable]
count_expr = tbl.correct.count()
rows = db((tbl.sid == sid) & (tbl.timestamp > course.term_start_date)).select(
tbl.correct, count_expr, groupby=tbl.correct
)
total = 0
correct = 0
for row in rows:
count = row[count_expr]
total += count
if row[dbtable].correct:
correct = count
if total > 0:
pctcorr = round(float(correct) / total * 100)
else:
pctcorr = "unavailable"
else:
pctcorr = "unavailable"
miscdata["yourpct"] = pctcorr
def _getStudentResults(question: str):
Internal function to collect student answers
cc = db(db.courses.id == auth.user.course_id).select().first()
qst = (
db(
(db.questions.name == question)
& (db.questions.base_course == cc.base_course)
)
.select()
.first()
)
tbl_name = EVENT_TABLE[qst.question_type]
tbl = db[tbl_name]
res = db(
(tbl.div_id == question)
& (tbl.course_name == cc.course_name)
& (tbl.timestamp >= cc.term_start_date)
).select(tbl.sid, tbl.answer, orderby=tbl.sid)
resultList = []
if len(res) > 0:
currentSid = res[0].sid
currentAnswers = []
for row in res:
if row.answer:
answer = clean(row.answer)
else:
answer = None
if row.sid == currentSid:
if answer is not None:
currentAnswers.append(answer)
else:
currentAnswers.sort()
resultList.append((currentSid, currentAnswers))
currentAnswers = [answer] if answer is not None else []
currentSid = row.sid
currentAnswers.sort()
resultList.append((currentSid, currentAnswers))
return resultList
def getaggregateresults():
course = request.vars.course
question = request.vars.div_id
select act, count(*) from useinfo where div_id = ‘question4_2_1’ group by act;
Yes, these two things could be done as a join. but this may be better for performance
if course in (
"thinkcspy",
"pythonds",
"fopp",
"csawesome",
"apcsareview",
"StudentCSP",
):
start_date = datetime.datetime.utcnow() - datetime.timedelta(days=90)
else:
start_date = (
db(db.courses.course_name == course)
.select(db.courses.term_start_date)
.first()
.term_start_date
)
count = db.useinfo.id.count()
try:
result = db(
(db.useinfo.div_id == question)
& (db.useinfo.course_id == course)
& (db.useinfo.timestamp >= start_date)
).select(db.useinfo.act, count, groupby=db.useinfo.act)
except Exception:
return json.dumps(
[dict(answerDict={}, misc={}, emess="Sorry, the request timed out")]
)
tdata = {}
tot = 0
for row in result:
tdata[clean(row.useinfo.act)] = row[count]
tot += row[count]
tot = float(tot)
rdata = {}
miscdata = {}
correct = ""
if tot > 0:
for key in tdata:
all_a = key.split(":")
try:
answer = all_a[1]
if "correct" in key:
correct = answer
count = int(tdata[key])
if answer in rdata:
count += rdata[answer] / 100.0 * tot
pct = round(count / tot * 100.0)
if answer != "undefined" and answer != "":
rdata[answer] = pct
except Exception as e:
logger.error("Bad data for %s data is %s -- %s" % (question, key, e))
miscdata["correct"] = correct
miscdata["course"] = course
_getCorrectStats(miscdata, "mChoice")
returnDict = dict(answerDict=rdata, misc=miscdata)
if auth.user and is_instructor:
resultList = _getStudentResults(question)
returnDict["reslist"] = resultList
return json.dumps([returnDict])
def getpollresults():
course = request.vars.course
div_id = request.vars.div_id
response.headers["content-type"] = "application/json"
query = """select act from useinfo
join (select sid, max(id) mid
from useinfo where event='poll' and div_id = %s and course_id = %s group by sid) as T
on id = T.mid"""
rows = db.executesql(query, (div_id, course))
result_list = []
for row in rows:
val = row[0].split(":")[0]
result_list.append(int(val))
maps option : count
opt_list holds the option numbers from smallest to largest count_list[i] holds the count of responses that chose option i
opt_list = sorted(opt_counts.keys())
count_list = []
for i in opt_list:
count_list.append(opt_counts[i])
user_res = None
if auth.user:
user_res = (
db(
(db.useinfo.sid == auth.user.username)
& (db.useinfo.course_id == course)
& (db.useinfo.div_id == div_id)
)
.select(db.useinfo.act, orderby=~db.useinfo.id)
.first()
)
if user_res:
my_vote = user_res.act
else:
my_vote = -1
return json.dumps([len(result_list), opt_list, count_list, div_id, my_vote])
def gettop10Answers():
course = request.vars.course
question = request.vars.div_id
response.headers["content-type"] = "application/json"
rows = []
try:
dbcourse = db(db.courses.course_name == course).select(**SELECT_CACHE).first()
count_expr = db.fitb_answers.answer.count()
rows = db(
(db.fitb_answers.div_id == question)
& (db.fitb_answers.course_name == course)
& (db.fitb_answers.timestamp > dbcourse.term_start_date)
).select(
db.fitb_answers.answer,
count_expr,
groupby=db.fitb_answers.answer,
orderby=~count_expr,
limitby=(0, 10),
)
res = [
{"answer": clean(row.fitb_answers.answer), "count": row[count_expr]}
for row in rows
]
except Exception as e:
logger.debug(e)
res = "error in query"
miscdata = {"course": course}
_getCorrectStats(
miscdata, "fillb"
) # TODO: rewrite _getCorrectStats to use xxx_answers
if auth.user and verifyInstructorStatus(course, auth.user.id): # noqa: F405
resultList = _getStudentResults(question)
miscdata["reslist"] = resultList
return json.dumps([res, miscdata])
def getassignmentgrade():
response.headers["content-type"] = "application/json"
if not auth.user:
return json.dumps([dict(message="not logged in")])
divid = request.vars.div_id
ret = {
"grade": "Not graded yet",
"comment": "No Comments",
"avg": "None",
"count": "None",
"released": False,
}
check that the assignment is released
a_q = (
db(
(db.assignments.course == auth.user.course_id)
& (db.assignment_questions.assignment_id == db.assignments.id)
& (db.assignment_questions.question_id == db.questions.id)
& (db.questions.name == divid)
)
.select(
db.assignments.released, db.assignments.id, db.assignment_questions.points
)
.first()
)
if there is no assignment_question try new way that we store scores and comments divid is a question; find question_grades row
say that we’re sending back result styles in new version, so they can be processed differently without affecting old way during transition.
ret["version"] = 2
ret["released"] = a_q.assignments.released if a_q else False
if a_q and not a_q.assignments.released:
ret["grade"] = "Not graded yet"
elif a_q and a_q.assignments.released:
ret["grade"] = result.score or "Written Feedback Only"
if a_q and a_q.assignments.released == True:
ret["max"] = a_q.assignment_questions.points
else:
ret["max"] = ""
if result.comment:
ret["comment"] = result.comment
return json.dumps([ret])
def _canonicalize_tz(tstring):
x = re.search(r"\((.*)\)", tstring)
x = x.group(1)
y = x.split()
if len(y) == 1:
return tstring
else:
zstring = "".join([i[0] for i in y])
return re.sub(r"(.*)\((.*)\)", r"\1({})".format(zstring), tstring)
getAssessResults¶
can’t query for user’s answers if we don’t know who the user is, so just load from local storage
TODO This whole thing is messy - get the deadline from the assignment in the db
if request.vars.deadline:
try:
deadline = parse(_canonicalize_tz(request.vars.deadline))
tzoff = session.timezoneoffset if session.timezoneoffset else 0
deadline = deadline + datetime.timedelta(hours=float(tzoff))
deadline = deadline.replace(tzinfo=None)
except Exception:
logger.error("Bad Timezone - {}".format(request.vars.deadline))
deadline = datetime.datetime.utcnow()
else:
deadline = datetime.datetime.utcnow()
response.headers["content-type"] = "application/json"
Identify the correct event and query the database so we can load it from the server
if event == "fillb":
rows = (
db(
(db.fitb_answers.div_id == div_id)
& (db.fitb_answers.course_name == course)
& (db.fitb_answers.sid == sid)
)
.select(
db.fitb_answers.answer,
db.fitb_answers.timestamp,
orderby=~db.fitb_answers.id,
)
.first()
)
if not rows:
return "" # server doesn't have it so we load from local storage instead
res = {"answer": rows.answer, "timestamp": str(rows.timestamp)}
do_server_feedback, feedback = is_server_feedback(div_id, course)
if do_server_feedback and rows.answer != None:
correct, res_update = fitb_feedback(rows.answer, feedback)
res.update(res_update)
return json.dumps(res)
elif event == "mChoice":
rows = (
db(
(db.mchoice_answers.div_id == div_id)
& (db.mchoice_answers.course_name == course)
& (db.mchoice_answers.sid == sid)
)
.select(
db.mchoice_answers.answer,
db.mchoice_answers.timestamp,
db.mchoice_answers.correct,
orderby=~db.mchoice_answers.id,
)
.first()
)
if not rows:
return ""
res = {
"answer": rows.answer,
"timestamp": str(rows.timestamp),
"correct": rows.correct,
}
return json.dumps(res)
elif event == "dragNdrop":
rows = (
db(
(db.dragndrop_answers.div_id == div_id)
& (db.dragndrop_answers.course_name == course)
& (db.dragndrop_answers.sid == sid)
)
.select(
db.dragndrop_answers.answer,
db.dragndrop_answers.timestamp,
db.dragndrop_answers.correct,
db.dragndrop_answers.min_height,
orderby=~db.dragndrop_answers.id,
)
.first()
)
if not rows:
return ""
res = {
"answer": rows.answer,
"timestamp": str(rows.timestamp),
"correct": rows.correct,
"minHeight": str(rows.min_height),
}
return json.dumps(res)
elif event == "clickableArea":
rows = (
db(
(db.clickablearea_answers.div_id == div_id)
& (db.clickablearea_answers.course_name == course)
& (db.clickablearea_answers.sid == sid)
)
.select(
db.clickablearea_answers.answer,
db.clickablearea_answers.timestamp,
db.clickablearea_answers.correct,
orderby=~db.clickablearea_answers.id,
)
.first()
)
if not rows:
return ""
res = {
"answer": rows.answer,
"timestamp": str(rows.timestamp),
"correct": rows.correct,
}
return json.dumps(res)
elif event == "timedExam":
rows = (
db(
(db.timed_exam.reset == None) # noqa: E711
& (db.timed_exam.div_id == div_id)
& (db.timed_exam.course_name == course)
& (db.timed_exam.sid == sid)
)
.select(
db.timed_exam.correct,
db.timed_exam.incorrect,
db.timed_exam.skipped,
db.timed_exam.time_taken,
db.timed_exam.timestamp,
db.timed_exam.reset,
orderby=~db.timed_exam.id,
)
.first()
)
if not rows:
return ""
res = {
"correct": rows.correct,
"incorrect": rows.incorrect,
"skipped": str(rows.skipped),
"timeTaken": str(rows.time_taken),
"timestamp": str(rows.timestamp),
"reset": str(rows.reset),
}
return json.dumps(res)
elif event == "parsons":
rows = (
db(
(db.parsons_answers.div_id == div_id)
& (db.parsons_answers.course_name == course)
& (db.parsons_answers.sid == sid)
)
.select(
db.parsons_answers.answer,
db.parsons_answers.source,
db.parsons_answers.timestamp,
orderby=~db.parsons_answers.id,
)
.first()
)
if not rows:
return ""
res = {
"answer": rows.answer,
"source": rows.source,
"timestamp": str(rows.timestamp),
}
return json.dumps(res)
elif event == "shortanswer":
logger.debug(f"Getting shortanswer: deadline is {deadline} ")
rows = db(
(db.shortanswer_answers.sid == sid)
& (db.shortanswer_answers.div_id == div_id)
& (db.shortanswer_answers.course_name == course)
).select(orderby=~db.shortanswer_answers.id)
if not rows:
return ""
last_answer = None
if not request.vars.deadline:
row = rows[0]
else:
last_answer = rows[0]
for row in rows:
if row.timestamp <= deadline:
break
if row.timestamp > deadline:
row = None
if row and row == last_answer:
res = {"answer": row.answer, "timestamp": row.timestamp.isoformat()}
else:
if row and row.timestamp <= deadline:
res = {"answer": row.answer, "timestamp": row.timestamp.isoformat()}
else:
res = {
"answer": "",
"timestamp": None,
"last_answer": last_answer.answer,
"last_timestamp": last_answer.timestamp.isoformat(),
}
srow = (
db(
(db.question_grades.sid == sid)
& (db.question_grades.div_id == div_id)
& (db.question_grades.course_name == course)
)
.select()
.first()
)
if srow:
res["score"] = srow.score
res["comment"] = srow.comment
return json.dumps(res)
def tookTimedAssessment():
if auth.user:
sid = auth.user.username
else:
return json.dumps({"tookAssessment": False})
exam_id = request.vars.div_id
course = request.vars.course_name
rows = (
db(
(db.timed_exam.div_id == exam_id)
& (db.timed_exam.sid == sid)
& (db.timed_exam.course_name == course)
)
.select(orderby=~db.timed_exam.id)
.first()
)
logger.debug(f"checking {exam_id} {sid} {course} {rows}")
if rows:
return json.dumps({"tookAssessment": True})
else:
return json.dumps({"tookAssessment": False})
The request variable code
must contain JSON-encoded RST to be rendered by Runestone. Only the HTML containing the actual Runestone component will be returned.
def preview_question():
begin = """
.. raw:: html
<begin_directive>
"""
end = """
.. raw:: html
<end_directive>
"""
try:
code = begin + dedent(json.loads(request.vars.code)) + end
with open(
"applications/{}/build/preview/_sources/index.rst".format(
request.application
),
"w",
encoding="utf-8",
) as ixf:
ixf.write(code)
Note that os.environ
isn’t a dict, it’s an object whose setter modifies environment variables. So, modifications of a copy/deepcopy still modify the original environment. Therefore, convert it to a dict, where modifications will not affect the environment.
Prevent any changes to the database when building a preview question.
Run a runestone build. We would like to use sys.executable But when we run web2py in uwsgi then sys.executable is uwsgi which doesn’t work. Why not just run runestone?
if "python" not in settings.python_interpreter:
logger.error(f"Error {settings.python_interpreter} is not a valid python")
return json.dumps(
f"Error: settings.python_interpreter must be set to a valid interpreter not {settings.python_interpreter}"
)
popen_obj = subprocess.Popen(
[settings.python_interpreter, "-m", "runestone", "build"],
The build must be run from the directory containing a conf.py
and all the needed support files.
Capture the build output as text in case of an error.
Pass the modified environment which doesn’t contain DBURL
.
If there was an error, return stdout and stderr from the build.
if popen_obj.returncode != 0:
return json.dumps(
"Error: Runestone build failed:\n\n" + stdout + "\n" + stderr
)
with open(
"applications/{}/build/preview/build/preview/index.html".format(
request.application
),
"r",
encoding="utf-8",
) as ixf:
src = ixf.read()
tree = html.fromstring(src)
if len(tree.cssselect(".runestone")) == 0:
src = ""
result = re.search(
"<begin_directive>(.*)<end_directive>", src, flags=re.DOTALL
)
if result:
ctext = result.group(1)
else:
component = tree.cssselect(".system-message")
if len(component) > 0:
ctext = html.tostring(component[0]).decode("utf-8")
logger.debug("error - ", ctext)
else:
ctext = "Error: Runestone content missing."
return json.dumps(ctext)
except Exception as ex:
return json.dumps("Error: {}".format(ex))
def save_donate():
if auth.user:
db(db.auth_user.id == auth.user.id).update(donated=True)
def did_donate():
if auth.user:
d_status = (
db(db.auth_user.id == auth.user.id).select(db.auth_user.donated).first()
)
return json.dumps(dict(donate=d_status.donated))
return json.dumps(dict(donate=False))
def get_datafile():
course_id - string, the name of the course acid - the acid of this datafile
course = request.vars.course_id # the course name
the_course = db(db.courses.course_name == course).select(**SELECT_CACHE).first()
acid = request.vars.acid
file_contents = (
db(
(db.source_code.acid == acid)
& (
(db.source_code.course_id == the_course.base_course)
| (db.source_code.course_id == course)
)
)
.select(db.source_code.main_code)
.first()
)
if file_contents:
file_contents = file_contents.main_code
else:
file_contents = None
return json.dumps(dict(data=file_contents))
@auth.requires(
lambda: verifyInstructorStatus(auth.user.course_name, auth.user),
requires_login=True,
)
def broadcast_code():
Callable by an instructor to send the code in their scratch activecode to all students in the class.
the_course = (
db(db.courses.course_name == auth.user.course_name)
.select(**SELECT_CACHE)
.first()
)
cid = the_course.id
student_list = db(
(db.user_courses.course_id == cid)
& (db.auth_user.id == db.user_courses.user_id)
).select()
shared_code = (
"{} Instructor shared code on {}\n".format(
COMMENT_MAP.get(request.vars.lang, "#"), datetime.datetime.utcnow().date()
)
+ request.vars.code
)
counter = 0
for student in student_list:
if student.auth_user.id == auth.user.id:
continue
sid = student.auth_user.username
try:
db.code.insert(
sid=sid,
acid=request.vars.divid,
code=shared_code,
emessage="",
timestamp=datetime.datetime.utcnow(),
course_id=cid,
language=request.vars.lang,
comment="Instructor shared code",
)
except Exception as e:
logger.error("Failed to insert instructor code! details: {}".format(e))
return json.dumps(dict(mess="failed"))
counter += 1
return json.dumps(dict(mess="success", share_count=counter))
def _same_class(user1: str, user2: str) -> bool:
user1_course = (
db(db.auth_user.username == user1).select(db.auth_user.course_id).first()
)
user2_course = (
db(db.auth_user.username == user2).select(db.auth_user.course_id).first()
)
return user1_course == user2_course
def login_status():
if auth.user:
return json.dumps(dict(status="loggedin", course_name=auth.user.course_name))
else:
return json.dumps(dict(status="loggedout", course_name=auth.user.course_name))
auto_gradable_q = [
"clickablearea",
"mchoice",
"parsonsprob",
"dragndrop",
"fillintheblank",
"quizly",
"khanex",
]
def get_question_source():
Called from the selectquestion directive There are 4 cases:
If there is only 1 question in the question list then return the html source for it.
If there are multiple questions then choose a question at random
If a proficiency is selected then select a random question that tests that proficiency
If the question is an AB question then see if this student is an A or a B or assign them to one randomly.
In the last two cases, first check to see if there is a question for this student for this component that was previously selected.
- Returns:
json: html source for this question
prof = False
points = request.vars.points or 0 # no nulls allowed
logger.debug(f"POINTS = {points}")
min_difficulty = request.vars.min_difficulty
max_difficulty = request.vars.max_difficulty
not_seen_ever = request.vars.not_seen_ever
autogradable = request.vars.autogradable
is_primary = request.vars.primary
is_ab = request.vars.AB
selector_id = request.vars["selector_id"]
assignment_name = request.vars["timedWrapper"]
toggle = request.vars["toggleOptions"]
questionlist = []
If the question has a :points: option then those points are the default however sometimes questions are entered in the web ui without the :points: and points are assigned in the UI instead. If this is part of an assignment or timed exam AND the points are set in the web UI we will use the points from the UI over the :points: If this is an assignment or exam that is totally written in RST then the points in the UI will match the points from the assignment anyway.
if assignment_name:
ui_points = (
db(
(db.assignments.name == assignment_name)
& (db.assignments.id == db.assignment_questions.assignment_id)
& (db.assignment_questions.question_id == db.questions.id)
& (db.questions.name == selector_id)
)
.select(db.assignment_questions.points)
.first()
)
logger.debug(
f"Assignment Points for {assignment_name}, {selector_id} = {ui_points}"
)
if ui_points:
points = ui_points.points
if request.vars["questions"]:
questionlist = request.vars["questions"].split(",")
questionlist = [q.strip() for q in questionlist]
elif request.vars["proficiency"]:
prof = request.vars["proficiency"]
query = (db.competency.competency == prof) & (
db.competency.question == db.questions.id
)
if is_primary:
query = query & (db.competency.is_primary == True)
if min_difficulty:
query = query & (db.questions.difficulty >= float(min_difficulty))
if max_difficulty:
query = query & (db.questions.difficulty <= float(max_difficulty))
if autogradable:
query = query & (
(db.questions.autograde == "unittest")
| db.questions.question_type.contains(auto_gradable_q, all=False)
)
res = db(query).select(db.questions.name)
logger.debug(f"Query was {db._lastsql}")
if res:
questionlist = [row.name for row in res]
else:
questionlist = []
logger.error(f"No questions found for proficiency {prof}")
return json.dumps(f"<p>No Questions found for proficiency: {prof}</p>")
if not auth.user:
user is not logged in so just give them a random question from questions list and be done with it.
if questionlist:
q = random.choice(questionlist)
res = db(db.questions.name == q).select(db.questions.htmlsrc).first()
if res:
return json.dumps(res.htmlsrc)
else:
return json.dumps(f"<p>Question {q} is not in the database.</p>")
else:
return json.dumps(f"<p>No Questions available</p>")
logger.debug(f"is_ab is {is_ab}")
if is_ab:
res = db(
(db.user_experiment.sid == auth.user.username)
& (db.user_experiment.experiment_id == is_ab)
).select(orderby=db.user_experiment.id)
if not res:
exp_group = random.randrange(2)
db.user_experiment.insert(
sid=auth.user.username, experiment_id=is_ab, exp_group=exp_group
)
logger.debug(f"added {auth.user.username} to {is_ab} group {exp_group}")
else:
exp_group = res[0].exp_group
logger.debug(f"experimental group is {exp_group}")
prev_selection = (
db(
(db.selected_questions.sid == auth.user.username)
& (db.selected_questions.selector_id == selector_id)
)
.select()
.first()
)
if prev_selection:
questionid = prev_selection.selected_id
else:
questionid = questionlist[exp_group]
if not is_ab:
poss = set()
if not_seen_ever:
seenq = db(
(db.useinfo.sid == auth.user.username)
& (db.useinfo.div_id.contains(questionlist, all=False))
).select(db.useinfo.div_id)
seen = set([x.div_id for x in seenq])
poss = set(questionlist)
questionlist = list(poss - seen)
if len(questionlist) == 0 and len(poss) > 0:
questionlist = list(poss)
htmlsrc = ""
prev_selection = (
db(
(db.selected_questions.sid == auth.user.username)
& (db.selected_questions.selector_id == selector_id)
)
.select()
.first()
)
if prev_selection:
questionid = prev_selection.selected_id
else:
Eliminate any previous exam questions for this student
prev_questions = db(db.selected_questions.sid == auth.user.username).select(
db.selected_questions.selected_id
)
prev_questions = set([row.selected_id for row in prev_questions])
possible = set(questionlist)
questionlist = list(possible - prev_questions)
if questionlist:
questionid = random.choice(questionlist)
else:
If there are no questions left we should still return a random question.
questionid = random.choice(list(possible))
logger.debug(f"toggle is {toggle}")
if toggle:
prev_selection = (
db(
(db.selected_questions.sid == auth.user.username)
& (db.selected_questions.selector_id == selector_id)
)
.select()
.first()
)
if prev_selection:
questionid = prev_selection.selected_id
else:
questionid = request.vars["questions"].split(",")[0]
- else:
- logger.error(
f”Question ID ‘{questionid}’ not found in select question list of ‘{selector_id}’.”
) return json.dumps(
f”<p>Question ID ‘{questionid}’ not found in select question list of ‘{selector_id}’.</p>”
)
res = db((db.questions.name == questionid)).select(db.questions.htmlsrc).first()
if res and not prev_selection:
qid = db.selected_questions.insert(
selector_id=selector_id,
sid=auth.user.username,
selected_id=questionid,
points=points,
)
if not qid:
logger.error(
f"Failed to insert a selected question for {selector_id} and {auth.user.username}"
)
else:
logger.debug(
f"Did not insert a record for {selector_id}, {questionid} Conditions are {res} QL: {questionlist} PREV: {prev_selection}"
)
if res and res.htmlsrc:
htmlsrc = res.htmlsrc
else:
logger.error(
f"HTML Source not found for {questionid} in course {auth.user.course_name} for {auth.user.username}"
)
htmlsrc = "<p>No preview available</p>"
return json.dumps(htmlsrc)
@auth.requires_login()
def update_selected_question():
This endpoint is used by the selectquestion problems that allow the student to select the problem they work on. For example they may have a programming problem that can be solved with writing code, or they can switch to a parsons problem if necessary.
Caller must provide:
* metaid
– the id of the selectquestion
* selected
– the id of the real question chosen by the student
sid = auth.user.username
selector_id = request.vars.metaid
selected_id = request.vars.selected
logger.debug(f"USQ - {selector_id} --> {selected_id} for {sid}")
db.selected_questions.update_or_insert(
(db.selected_questions.selector_id == selector_id)
& (db.selected_questions.sid == sid),
selected_id=selected_id,
selector_id=selector_id,
sid=sid,
)