-- coding: utf-8 -- flake8: noqa

 

System Message: SEVERE/4 (/home/docs/checkouts/readthedocs.org/user_builds/runestoneserver/checkouts/latest/controllers/appadmin.py, line 4)

Title overline & underline mismatch.

##########################################################
## make sure administrator is on localhost
###########################################################
 
import os
import socket
import datetime
import copy
import gluon.contenttype
import gluon.fileutils
from gluon._compat import iteritems

is_gae = request.env.web2py_runtime_gae or False
 

## critical — make a copy of the environment

 
global_env = copy.copy(globals())
global_env["datetime"] = datetime

http_host = request.env.http_host.split(":")[0]
remote_addr = request.env.remote_addr
try:
    hosts = (
        http_host,
        socket.gethostname(),
        socket.gethostbyname(http_host),
        "::1",
        "127.0.0.1",
        "::ffff:127.0.0.1",
    )
except:
    hosts = (http_host,)

if request.is_https:
    session.secure()
elif (
    (remote_addr not in hosts)
    and (remote_addr != "127.0.0.1")
    and (request.function != "manage")
):
    raise HTTP(200, T("appadmin is disabled because insecure channel"))

if request.function == "manage":
    if not "auth" in globals() or not request.args:
        redirect(URL(request.controller, "index"))
    manager_action = auth.settings.manager_actions.get(request.args(0), None)
    if manager_action is None and request.args(0) == "auth":
        manager_action = dict(
            role=auth.settings.auth_manager_role,
            heading=T("Manage Access Control"),
            tables=[auth.table_user(), auth.table_group(), auth.table_permission()],
        )
    manager_role = manager_action.get("role", None) if manager_action else None
    if not (
        gluon.fileutils.check_credentials(request) or auth.has_membership(manager_role)
    ):
        raise HTTP(403, "Not authorized")
    menu = False
elif (request.application == "admin" and not session.authorized) or (
    request.application != "admin" and not gluon.fileutils.check_credentials(request)
):
    redirect(
        URL(
            "admin",
            "default",
            "index",
            vars=dict(send=URL(args=request.args, vars=request.vars)),
        )
    )
else:
    response.subtitle = T("Database Administration (appadmin)")
    menu = True

ignore_rw = True
response.view = "appadmin.html"
if menu:
    response.menu = [
        [
            T("design"),
            False,
            URL("admin", "default", "design", args=[request.application]),
        ],
        [T("db"), False, URL("index")],
        [T("state"), False, URL("state")],
        [T("cache"), False, URL("ccache")],
    ]
 

System Message: SEVERE/4 (/home/docs/checkouts/readthedocs.org/user_builds/runestoneserver/checkouts/latest/controllers/appadmin.py, line 91)

Title overline & underline mismatch.

##########################################################
## auxiliary functions
###########################################################
 
if False and request.tickets_db:
    from gluon.restricted import TicketStorage

    ts = TicketStorage()
    ts._get_table(request.tickets_db, ts.tablename, request.application)


def get_databases(request):
    dbs = {}
    for (key, value) in global_env.items():
        try:
            cond = isinstance(value, GQLDB)
        except:
            cond = isinstance(value, SQLDB)
        if cond:
            dbs[key] = value
    return dbs


databases = get_databases(None)


def eval_in_global_env(text):
    exec("_ret=%s" % text, {}, global_env)
    return global_env["_ret"]


def get_database(request):
    if request.args and request.args[0] in databases:
        return eval_in_global_env(request.args[0])
    else:
        session.flash = T("invalid request")
        redirect(URL("index"))


def get_table(request):
    db = get_database(request)
    if len(request.args) > 1 and request.args[1] in db.tables:
        return (db, request.args[1])
    else:
        session.flash = T("invalid request")
        redirect(URL("index"))


def get_query(request):
    try:
        return eval_in_global_env(request.vars.query)
    except Exception:
        return None


def query_by_table_type(tablename, db, request=request):
    keyed = hasattr(db[tablename], "_primarykey")
    if keyed:
        firstkey = db[tablename][db[tablename]._primarykey[0]]
        cond = ">0"
        if firstkey.type in ["string", "text"]:
            cond = '!=""'
        qry = "%s.%s.%s%s" % (request.args[0], request.args[1], firstkey.name, cond)
    else:
        qry = "%s.%s.id>0" % tuple(request.args[:2])
    return qry
 
 

System Message: SEVERE/4 (/home/docs/checkouts/readthedocs.org/user_builds/runestoneserver/checkouts/latest/controllers/appadmin.py, line 159)

Title overline & underline mismatch.

##########################################################
## list all databases and tables
###########################################################
def index():
    return dict(databases=databases)
 
 

System Message: SEVERE/4 (/home/docs/checkouts/readthedocs.org/user_builds/runestoneserver/checkouts/latest/controllers/appadmin.py, line 166)

Title overline & underline mismatch.

##########################################################
## insert a new record
###########################################################
 
 
def insert():
    (db, table) = get_table(request)
    form = SQLFORM(db[table], ignore_rw=ignore_rw)
    if form.accepts(request.vars, session):
        response.flash = T("new record inserted")
    return dict(form=form, table=db[table])
 
 

System Message: SEVERE/4 (/home/docs/checkouts/readthedocs.org/user_builds/runestoneserver/checkouts/latest/controllers/appadmin.py, line 179)

Title overline & underline mismatch.

##########################################################
## list all records in table and insert new record
###########################################################
 
 
def download():
    import os

    db = get_database(request)
    return response.download(request, db)


def csv():
    import gluon.contenttype

    response.headers["Content-Type"] = gluon.contenttype.contenttype(".csv")
    db = get_database(request)
    query = get_query(request)
    if not query:
        return None
    response.headers["Content-disposition"] = "attachment; filename=%s_%s.csv" % tuple(
        request.vars.query.split(".")[:2]
    )
    return str(db(query, ignore_common_filters=True).select())


def import_csv(table, file):
    table.import_from_csv_file(file)


def select():
    import re

    db = get_database(request)
    dbname = request.args[0]
    try:
        is_imap = db._uri.startswith("imap://")
    except (KeyError, AttributeError, TypeError):
        is_imap = False
    regex = re.compile(r"(?P<table>\w+)\.(?P<field>\w+)=(?P<value>\d+)")
    if len(request.args) > 1 and hasattr(db[request.args[1]], "_primarykey"):
        regex = re.compile(r"(?P<table>\w+)\.(?P<field>\w+)=(?P<value>.+)")
    if request.vars.query:
        match = regex.match(request.vars.query)
        if match:
            request.vars.query = "%s.%s.%s==%s" % (
                request.args[0],
                match.group("table"),
                match.group("field"),
                match.group("value"),
            )
    else:
        request.vars.query = session.last_query
    query = get_query(request)
    if request.vars.start:
        start = int(request.vars.start)
    else:
        start = 0
    nrows = 0

    step = 100
    fields = []

    if is_imap:
        step = 3

    stop = start + step

    table = None
    rows = []
    orderby = request.vars.orderby
    if orderby:
        orderby = dbname + "." + orderby
        if orderby == session.last_orderby:
            if orderby[0] == "~":
                orderby = orderby[1:]
            else:
                orderby = "~" + orderby
    session.last_orderby = orderby
    session.last_query = request.vars.query
    form = FORM(
        TABLE(
            TR(
                T("Query:"),
                "",
                INPUT(
                    _style="width:400px",
                    _name="query",
                    _value=request.vars.query or "",
                    _class="form-control",
                    requires=IS_NOT_EMPTY(error_message=T("Cannot be empty")),
                ),
            ),
            TR(
                T("Update:"),
                INPUT(_name="update_check", _type="checkbox", value=False),
                INPUT(
                    _style="width:400px",
                    _name="update_fields",
                    _value=request.vars.update_fields or "",
                    _class="form-control",
                ),
            ),
            TR(
                T("Delete:"),
                INPUT(
                    _name="delete_check", _class="delete", _type="checkbox", value=False
                ),
                "",
            ),
            TR(
                "",
                "",
                INPUT(_type="submit", _value=T("submit"), _class="btn btn-primary"),
            ),
        ),
        _action=URL(r=request, args=request.args),
    )

    tb = None
    if form.accepts(request.vars, formname=None):
        regex = re.compile(request.args[0] + r"\.(?P<table>\w+)\..+")
        match = regex.match(form.vars.query.strip())
        if match:
            table = match.group("table")
        try:
            nrows = db(query, ignore_common_filters=True).count()
            if form.vars.update_check and form.vars.update_fields:
                db(query, ignore_common_filters=True).update(
                    **eval_in_global_env("dict(%s)" % form.vars.update_fields)
                )
                response.flash = T("%s %%{row} updated", nrows)
            elif form.vars.delete_check:
                db(query, ignore_common_filters=True).delete()
                response.flash = T("%s %%{row} deleted", nrows)
            nrows = db(query, ignore_common_filters=True).count()

            if is_imap:
                fields = [
                    db[table][name]
                    for name in ("id", "uid", "created", "to", "sender", "subject")
                ]
            if orderby:
                rows = db(query, ignore_common_filters=True).select(
                    *fields, limitby=(start, stop), orderby=eval_in_global_env(orderby)
                )
            else:
                rows = db(query, ignore_common_filters=True).select(
                    *fields, limitby=(start, stop)
                )
        except Exception as e:
            import traceback

            tb = traceback.format_exc()
            (rows, nrows) = ([], 0)
            response.flash = DIV(T("Invalid Query"), PRE(str(e)))

begin handle upload csv

    csv_table = table or request.vars.table
    if csv_table:
        formcsv = FORM(
            str(T("or import from csv file")) + " ",
            INPUT(_type="file", _name="csvfile"),
            INPUT(_type="hidden", _value=csv_table, _name="table"),
            INPUT(_type="submit", _value=T("import"), _class="btn btn-primary"),
        )
    else:
        formcsv = None
    if formcsv and formcsv.process().accepted:
        try:
            import_csv(db[request.vars.table], request.vars.csvfile.file)
            response.flash = T("data uploaded")
        except Exception as e:
            response.flash = DIV(T("unable to parse csv file"), PRE(str(e)))

end handle upload csv

 
    return dict(
        form=form,
        table=table,
        start=start,
        stop=stop,
        step=step,
        nrows=nrows,
        rows=rows,
        query=request.vars.query,
        formcsv=formcsv,
        tb=tb,
    )
 
 

System Message: SEVERE/4 (/home/docs/checkouts/readthedocs.org/user_builds/runestoneserver/checkouts/latest/controllers/appadmin.py, line 368)

Title overline & underline mismatch.

##########################################################
## edit delete one record
###########################################################
 
 
def update():
    (db, table) = get_table(request)
    keyed = hasattr(db[table], "_primarykey")
    record = None
    db[table]._common_filter = None
    if keyed:
        key = [f for f in request.vars if f in db[table]._primarykey]
        if key:
            record = db(db[table][key[0]] == request.vars[key[0]]).select().first()
    else:
        record = db(db[table].id == request.args(2)).select().first()

    if not record:
        qry = query_by_table_type(table, db)
        session.flash = T("record does not exist")
        redirect(URL("select", args=request.args[:1], vars=dict(query=qry)))

    if keyed:
        for k in db[table]._primarykey:
            db[table][k].writable = False

    form = SQLFORM(
        db[table],
        record,
        deletable=True,
        delete_label=T("Check to delete"),
        ignore_rw=ignore_rw and not keyed,
        linkto=URL("select", args=request.args[:1]),
        upload=URL(r=request, f="download", args=request.args[:1]),
    )

    if form.accepts(request.vars, session):
        session.flash = T("done!")
        qry = query_by_table_type(table, db)
        redirect(URL("select", args=request.args[:1], vars=dict(query=qry)))
    return dict(form=form, table=db[table])
 
 

System Message: SEVERE/4 (/home/docs/checkouts/readthedocs.org/user_builds/runestoneserver/checkouts/latest/controllers/appadmin.py, line 411)

Title overline & underline mismatch.

##########################################################
## get global variables
###########################################################
 
 
def state():
    return dict()


def ccache():
    if is_gae:
        form = FORM(
            P(TAG.BUTTON(T("Clear CACHE?"), _type="submit", _name="yes", _value="yes"))
        )
    else:
        cache.ram.initialize()
        cache.disk.initialize()

        form = FORM(
            P(TAG.BUTTON(T("Clear CACHE?"), _type="submit", _name="yes", _value="yes")),
            P(TAG.BUTTON(T("Clear RAM"), _type="submit", _name="ram", _value="ram")),
            P(TAG.BUTTON(T("Clear DISK"), _type="submit", _name="disk", _value="disk")),
        )

    if form.accepts(request.vars, session):
        session.flash = ""
        if is_gae:
            if request.vars.yes:
                cache.ram.clear()
                session.flash += T("Cache Cleared")
        else:
            clear_ram = False
            clear_disk = False
            if request.vars.yes:
                clear_ram = clear_disk = True
            if request.vars.ram:
                clear_ram = True
            if request.vars.disk:
                clear_disk = True
            if clear_ram:
                cache.ram.clear()
                session.flash += T("Ram Cleared")
            if clear_disk:
                cache.disk.clear()
                session.flash += T("Disk Cleared")
        redirect(URL(r=request))

    try:
        from pympler.asizeof import asizeof
    except ImportError:
        asizeof = False

    import shelve
    import os
    import copy
    import time
    import math
    from pydal.contrib import portalocker

    ram = {
        "entries": 0,
        "bytes": 0,
        "objects": 0,
        "hits": 0,
        "misses": 0,
        "ratio": 0,
        "oldest": time.time(),
        "keys": [],
    }

    disk = copy.copy(ram)
    total = copy.copy(ram)
    disk["keys"] = []
    total["keys"] = []

    def GetInHMS(seconds):
        hours = math.floor(seconds / 3600)
        seconds -= hours * 3600
        minutes = math.floor(seconds / 60)
        seconds -= minutes * 60
        seconds = math.floor(seconds)

        return (hours, minutes, seconds)

    if is_gae:
        gae_stats = cache.ram.client.get_stats()
        try:
            gae_stats["ratio"] = (gae_stats["hits"] * 100) / (
                gae_stats["hits"] + gae_stats["misses"]
            )
        except ZeroDivisionError:
            gae_stats["ratio"] = T("?")
        gae_stats["oldest"] = GetInHMS(time.time() - gae_stats["oldest_item_age"])
        total.update(gae_stats)
    else:

get ram stats directly from the cache object

        ram_stats = cache.ram.stats[request.application]
        ram["hits"] = ram_stats["hit_total"] - ram_stats["misses"]
        ram["misses"] = ram_stats["misses"]
        try:
            ram["ratio"] = ram["hits"] * 100 / ram_stats["hit_total"]
        except (KeyError, ZeroDivisionError):
            ram["ratio"] = 0

        for key, value in iteritems(cache.ram.storage):
            if asizeof:
                ram["bytes"] += asizeof(value[1])
                ram["objects"] += 1
            ram["entries"] += 1
            if value[0] < ram["oldest"]:
                ram["oldest"] = value[0]
            ram["keys"].append((key, GetInHMS(time.time() - value[0])))

        for key in cache.disk.storage:
            value = cache.disk.storage[key]
            if key == "web2py_cache_statistics" and isinstance(value[1], dict):
                disk["hits"] = value[1]["hit_total"] - value[1]["misses"]
                disk["misses"] = value[1]["misses"]
                try:
                    disk["ratio"] = disk["hits"] * 100 / value[1]["hit_total"]
                except (KeyError, ZeroDivisionError):
                    disk["ratio"] = 0
            else:
                if asizeof:
                    disk["bytes"] += asizeof(value[1])
                    disk["objects"] += 1
                disk["entries"] += 1
                if value[0] < disk["oldest"]:
                    disk["oldest"] = value[0]
                disk["keys"].append((key, GetInHMS(time.time() - value[0])))

        ram_keys = list(
            ram
        )  # ['hits', 'objects', 'ratio', 'entries', 'keys', 'oldest', 'bytes', 'misses']
        ram_keys.remove("ratio")
        ram_keys.remove("oldest")
        for key in ram_keys:
            total[key] = ram[key] + disk[key]

        try:
            total["ratio"] = total["hits"] * 100 / (total["hits"] + total["misses"])
        except (KeyError, ZeroDivisionError):
            total["ratio"] = 0

        if disk["oldest"] < ram["oldest"]:
            total["oldest"] = disk["oldest"]
        else:
            total["oldest"] = ram["oldest"]

        ram["oldest"] = GetInHMS(time.time() - ram["oldest"])
        disk["oldest"] = GetInHMS(time.time() - disk["oldest"])
        total["oldest"] = GetInHMS(time.time() - total["oldest"])

    def key_table(keys):
        return TABLE(
            TR(TD(B(T("Key"))), TD(B(T("Time in Cache (h:m:s)")))),
            *[TR(TD(k[0]), TD("%02d:%02d:%02d" % k[1])) for k in keys],
            **dict(
                _class="cache-keys",
                _style="border-collapse: separate; border-spacing: .5em;",
            )
        )

    if not is_gae:
        ram["keys"] = key_table(ram["keys"])
        disk["keys"] = key_table(disk["keys"])
        total["keys"] = key_table(total["keys"])

    return dict(
        form=form, total=total, ram=ram, disk=disk, object_stats=asizeof != False
    )


def table_template(table):
    from gluon.html import TR, TD, TABLE, TAG

    def FONT(*args, **kwargs):
        return TAG.font(*args, **kwargs)

    def types(field):
        f_type = field.type
        if not isinstance(f_type, str):
            return " "
        elif f_type == "string":
            return field.length
        elif f_type == "id":
            return B("pk")
        elif f_type.startswith("reference") or f_type.startswith("list:reference"):
            return B("fk")
        else:
            return " "
 

This is horribe HTML but the only one graphiz understands

    rows = []
    cellpadding = 4
    color = "#000000"
    bgcolor = "#FFFFFF"
    face = "Helvetica"
    face_bold = "Helvetica Bold"
    border = 0

    rows.append(
        TR(
            TD(
                FONT(table, _face=face_bold, _color=bgcolor),
                _colspan=3,
                _cellpadding=cellpadding,
                _align="center",
                _bgcolor=color,
            )
        )
    )
    for row in db[table]:
        rows.append(
            TR(
                TD(
                    FONT(row.name, _color=color, _face=face_bold),
                    _align="left",
                    _cellpadding=cellpadding,
                    _border=border,
                ),
                TD(
                    FONT(row.type, _color=color, _face=face),
                    _align="left",
                    _cellpadding=cellpadding,
                    _border=border,
                ),
                TD(
                    FONT(types(row), _color=color, _face=face),
                    _align="center",
                    _cellpadding=cellpadding,
                    _border=border,
                ),
            )
        )
    return (
        "< %s >"
        % TABLE(
            *rows, **dict(_bgcolor=bgcolor, _border=1, _cellborder=0, _cellspacing=0)
        ).xml()
    )


def manage():
    tables = manager_action["tables"]
    if isinstance(tables[0], str):
        db = manager_action.get("db", auth.db)
        db = globals()[db] if isinstance(db, str) else db
        tables = [db[table] for table in tables]
    if request.args(0) == "auth":
        auth.table_user()._plural = T("Users")
        auth.table_group()._plural = T("Roles")
        auth.table_membership()._plural = T("Memberships")
        auth.table_permission()._plural = T("Permissions")
    if request.extension != "load":
        return dict(
            heading=manager_action.get(
                "heading",
                T("Manage %(action)s")
                % dict(action=request.args(0).replace("_", " ").title()),
            ),
            tablenames=[table._tablename for table in tables],
            labels=[table._plural.title() for table in tables],
        )

    table = tables[request.args(1, cast=int)]
    formname = "%s_grid" % table._tablename
    linked_tables = orderby = None
    if request.args(0) == "auth":
        auth.table_group()._id.readable = (
            auth.table_membership()._id.readable
        ) = auth.table_permission()._id.readable = False
        auth.table_membership().user_id.label = T("User")
        auth.table_membership().group_id.label = T("Role")
        auth.table_permission().group_id.label = T("Role")
        auth.table_permission().name.label = T("Permission")
        if table == auth.table_user():
            linked_tables = [auth.settings.table_membership_name]
        elif table == auth.table_group():
            orderby = (
                "role"
                if not request.args(3) or ".group_id" not in request.args(3)
                else None
            )
        elif table == auth.table_permission():
            orderby = "group_id"
    kwargs = dict(
        user_signature=True,
        maxtextlength=1000,
        orderby=orderby,
        linked_tables=linked_tables,
    )
    smartgrid_args = manager_action.get("smartgrid_args", {})
    kwargs.update(**smartgrid_args.get("DEFAULT", {}))
    kwargs.update(**smartgrid_args.get(table._tablename, {}))
    grid = SQLFORM.smartgrid(table, args=request.args[:2], formname=formname, **kwargs)
    return grid


def hooks():
    import functools
    import inspect

    list_op = [
        "_%s_%s" % (h, m)
        for h in ["before", "after"]
        for m in ["insert", "update", "delete"]
    ]
    tables = []
    with_build_it = False
    for db_str in sorted(databases):
        db = databases[db_str]
        for t in db.tables:
            method_hooks = []
            for op in list_op:
                functions = []
                for f in getattr(db[t], op):
                    if hasattr(f, "__call__"):
                        try:
                            if isinstance(f, (functools.partial)):
                                f = f.func
                            filename = inspect.getsourcefile(f)
                            details = {
                                "funcname": f.__name__,
                                "filename": filename[len(request.folder) :]
                                if request.folder in filename
                                else None,
                                "lineno": inspect.getsourcelines(f)[1],
                            }
                            if details[
                                "filename"
                            ]:  # Built in functions as delete_uploaded_files are not editable
                                details["url"] = URL(
                                    a="admin",
                                    c="default",
                                    f="edit",
                                    args=[request["application"], details["filename"]],
                                    vars={"lineno": details["lineno"]},
                                )
                            if details["filename"] or with_build_it:
                                functions.append(details)

compiled app and windows build don’t support code inspection

                        except:
                            pass
                if len(functions):
                    method_hooks.append({"name": op, "functions": functions})
            if len(method_hooks):
                tables.append(
                    {
                        "name": "%s.%s" % (db_str, t),
                        "slug": IS_SLUG()("%s.%s" % (db_str, t))[0],
                        "method_hooks": method_hooks,
                    }
                )

Render

    ul_main = UL(_class="nav nav-list")
    for t in tables:
        ul_main.append(A(t["name"], _onclick="collapse('a_%s')" % t["slug"]))
        ul_t = UL(_class="nav nav-list", _id="a_%s" % t["slug"], _style="display:none")
        for op in t["method_hooks"]:
            ul_t.append(LI(op["name"]))
            ul_t.append(
                UL(
                    [
                        LI(
                            A(
                                f["funcname"],
                                _class="editor_filelink",
                                _href=f["url"] if "url" in f else None,
                                **{"_data-lineno": f["lineno"] - 1}
                            )
                        )
                        for f in op["functions"]
                    ]
                )
            )
        ul_main.append(ul_t)
    return ul_main
 
 

System Message: SEVERE/4 (/home/docs/checkouts/readthedocs.org/user_builds/runestoneserver/checkouts/latest/controllers/appadmin.py, line 791)

Title overline & underline mismatch.

##########################################################
d3 based model visualizations
###########################################################
 
 
def d3_graph_model():

See https://www.facebook.com/web2py/posts/145613995589010 from Bruno Rocha and also the app_admin bg_graph_model function

Create a list of table dicts, called “nodes”

 
    nodes = []
    links = []

    for database in databases:
        db = eval_in_global_env(database)
        for tablename in db.tables:
            fields = []
            for field in db[tablename]:
                f_type = field.type
                if not isinstance(f_type, str):
                    disp = " "
                elif f_type == "string":
                    disp = field.length
                elif f_type == "id":
                    disp = "PK"
                elif f_type.startswith("reference") or f_type.startswith(
                    "list:reference"
                ):
                    disp = "FK"
                else:
                    disp = " "
                fields.append(dict(name=field.name, type=field.type, disp=disp))

                if isinstance(f_type, str) and (
                    f_type.startswith("reference")
                    or f_type.startswith("list:reference")
                ):
                    referenced_table = f_type.split()[1].split(".")[0]

                    links.append(dict(source=tablename, target=referenced_table))

            nodes.append(dict(name=tablename, type="table", fields=fields))
 

d3 v4 allows individual modules to be specified. The complete d3 library is included below.

    response.files.append(URL("admin", "static", "js/d3.min.js"))
    response.files.append(URL("admin", "static", "js/d3_graph.js"))
    return dict(databases=databases, nodes=nodes, links=links)