path: root/wqflask/utility
diff options
authorArun Isaac2023-12-29 18:55:37 +0000
committerArun Isaac2023-12-29 19:01:46 +0000
commit204a308be0f741726b9a620d88fbc22b22124c81 (patch)
treeb3cf66906674020b530c844c2bb4982c8a0e2d39 /wqflask/utility
parent83062c75442160427b50420161bfcae2c5c34c84 (diff)
Namespace all modules under gn2.
We move all modules under a gn2 directory. This is important for "correct" packaging and deployment as a Guix service.
Diffstat (limited to 'wqflask/utility')
27 files changed, 0 insertions, 3546 deletions
diff --git a/wqflask/utility/Plot.py b/wqflask/utility/Plot.py
deleted file mode 100644
index df7156b4..00000000
--- a/wqflask/utility/Plot.py
+++ /dev/null
@@ -1,343 +0,0 @@
-# Copyright (C) University of Tennessee Health Science Center, Memphis, TN.
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# See the GNU Affero General Public License for more details.
-# This program is available from Source Forge: at GeneNetwork Project
-# (sourceforge.net/projects/genenetwork/).
-# Contact Drs. Robert W. Williams and Xiaodong Zhou (2010)
-# at rwilliams@uthsc.edu and xzhou15@uthsc.edu
-# This module is used by GeneNetwork project (www.genenetwork.org)
-# Created by GeneNetwork Core Team 2010/08/10
-# Last updated by GeneNetwork Core Team 2010/10/20
-from PIL import ImageColor
-from PIL import ImageDraw
-from PIL import ImageFont
-from math import *
-import utility.corestats as corestats
-from base import webqtlConfig
-from utility.pillow_utils import draw_rotated_text
-# ---- Define common colours ---- #
-BLUE = ImageColor.getrgb("blue")
-BLACK = ImageColor.getrgb("black")
-# ---- END: Define common colours ---- #
-# ---- FONT FILES ---- #
-VERDANA_FILE = "./wqflask/static/fonts/verdana.ttf"
-COUR_FILE = "./wqflask/static/fonts/courbd.ttf"
-TAHOMA_FILE = "./wqflask/static/fonts/tahoma.ttf"
-# ---- END: FONT FILES ---- #
-def cformat(d, rank=0):
- 'custom string format'
- strD = "%2.6f" % d
- if rank == 0:
- while strD[-1] in ('0', '.'):
- if strD[-1] == '0' and strD[-2] == '.' and len(strD) <= 4:
- break
- elif strD[-1] == '.':
- strD = strD[:-1]
- break
- else:
- strD = strD[:-1]
- else:
- strD = strD.split(".")[0]
- if strD == '-0.0':
- strD = '0.0'
- return strD
-def frange(start, end=None, inc=1.0):
- "A faster range-like function that does accept float increments..."
- if end == None:
- end = start + 0.0
- start = 0.0
- else:
- start += 0.0 # force it to be a float
- count = int((end - start) / inc)
- if start + count * inc != end:
- # Need to adjust the count. AFAICT, it always comes up one short.
- count += 1
- L = [start] * count
- for i in range(1, count):
- L[i] = start + i * inc
- return L
-def find_outliers(vals):
- """Calculates the upper and lower bounds of a set of sample/case values
- >>> find_outliers([3.504, 5.234, 6.123, 7.234, 3.542, 5.341, 7.852, 4.555, 12.537])
- (11.252500000000001, 0.5364999999999993)
- >>> find_outliers([9,12,15,17,31,50,7,5,6,8])
- (32.0, -8.0)
- If there are no vals, returns None for the upper and lower bounds,
- which code that calls it will have to deal with.
- >>> find_outliers([])
- (None, None)
- """
- if vals:
- stats = corestats.Stats(vals)
- low_hinge = stats.percentile(25)
- up_hinge = stats.percentile(75)
- hstep = 1.5 * (up_hinge - low_hinge)
- upper_bound = up_hinge + hstep
- lower_bound = low_hinge - hstep
- else:
- upper_bound = None
- lower_bound = None
- return upper_bound, lower_bound
-# parameter: data is either object returned by reaper permutation function (called by MarkerRegressionPage.py)
-# or the first object returned by direct (pair-scan) permu function (called by DirectPlotPage.py)
-def plotBar(canvas, data, barColor=BLUE, axesColor=BLACK, labelColor=BLACK, XLabel=None, YLabel=None, title=None, offset=(60, 20, 40, 40), zoom=1):
- im_drawer = ImageDraw.Draw(canvas)
- xLeftOffset, xRightOffset, yTopOffset, yBottomOffset = offset
- plotWidth = canvas.size[0] - xLeftOffset - xRightOffset
- plotHeight = canvas.size[1] - yTopOffset - yBottomOffset
- if plotHeight <= 0 or plotWidth <= 0:
- return
- if len(data) < 2:
- return
- max_D = max(data)
- min_D = min(data)
- # add by NL 06-20-2011: fix the error: when max_D is infinite, log function in detScale will go wrong
- if (max_D == float('inf') or max_D > webqtlConfig.MAXLRS) and min_D < webqtlConfig.MAXLRS:
- max_D = webqtlConfig.MAXLRS # maximum LRS value
- xLow, xTop, stepX = detScale(min_D, max_D)
- # reduce data
- # ZS: Used to determine number of bins for permutation output
- step = ceil((xTop - xLow) / 50.0)
- j = xLow
- dataXY = []
- Count = []
- while j <= xTop:
- dataXY.append(j)
- Count.append(0)
- j += step
- for i, item in enumerate(data):
- if (item == float('inf') or item > webqtlConfig.MAXLRS) and min_D < webqtlConfig.MAXLRS:
- item = webqtlConfig.MAXLRS # maximum LRS value
- j = int((item - xLow) / step)
- Count[j] += 1
- yLow, yTop, stepY = detScale(0, max(Count))
- # draw data
- xScale = plotWidth / (xTop - xLow)
- yScale = plotHeight / (yTop - yLow)
- barWidth = xScale * step
- for i, count in enumerate(Count):
- if count:
- xc = (dataXY[i] - xLow) * xScale + xLeftOffset
- yc = -(count - yLow) * yScale + yTopOffset + plotHeight
- im_drawer.rectangle(
- xy=((xc + 2, yc), (xc + barWidth - 2, yTopOffset + plotHeight)),
- outline=barColor, fill=barColor)
- # draw drawing region
- im_drawer.rectangle(
- xy=((xLeftOffset, yTopOffset),
- (xLeftOffset + plotWidth, yTopOffset + plotHeight))
- )
- # draw scale
- scaleFont = ImageFont.truetype(font=COUR_FILE, size=11)
- x = xLow
- for i in range(int(stepX) + 1):
- xc = xLeftOffset + (x - xLow) * xScale
- im_drawer.line(
- xy=((xc, yTopOffset + plotHeight),
- (xc, yTopOffset + plotHeight + 5)),
- fill=axesColor)
- strX = cformat(d=x, rank=0)
- im_drawer.text(
- text=strX,
- xy=(xc - im_drawer.textsize(strX, font=scaleFont)[0] / 2,
- yTopOffset + plotHeight + 14), font=scaleFont)
- x += (xTop - xLow) / stepX
- y = yLow
- for i in range(int(stepY) + 1):
- yc = yTopOffset + plotHeight - (y - yLow) * yScale
- im_drawer.line(
- xy=((xLeftOffset, yc), (xLeftOffset - 5, yc)), fill=axesColor)
- strY = "%d" % y
- im_drawer.text(
- text=strY,
- xy=(xLeftOffset - im_drawer.textsize(strY,
- font=scaleFont)[0] - 6, yc + 5),
- font=scaleFont)
- y += (yTop - yLow) / stepY
- # draw label
- labelFont = ImageFont.truetype(font=TAHOMA_FILE, size=17)
- if XLabel:
- im_drawer.text(
- text=XLabel,
- xy=(xLeftOffset + (
- plotWidth - im_drawer.textsize(XLabel, font=labelFont)[0]) / 2.0,
- yTopOffset + plotHeight + yBottomOffset - 10),
- font=labelFont, fill=labelColor)
- if YLabel:
- draw_rotated_text(canvas, text=YLabel,
- xy=(19,
- yTopOffset + plotHeight - (
- plotHeight - im_drawer.textsize(
- YLabel, font=labelFont)[0]) / 2.0),
- font=labelFont, fill=labelColor, angle=90)
- labelFont = ImageFont.truetype(font=VERDANA_FILE, size=16)
- if title:
- im_drawer.text(
- text=title,
- xy=(xLeftOffset + (plotWidth - im_drawer.textsize(
- title, font=labelFont)[0]) / 2.0,
- 20),
- font=labelFont, fill=labelColor)
-# This function determines the scale of the plot
-def detScaleOld(min, max):
- if min >= max:
- return None
- elif min == -1.0 and max == 1.0:
- return [-1.2, 1.2, 12]
- else:
- a = max - min
- b = floor(log10(a))
- c = pow(10.0, b)
- if a < c * 5.0:
- c /= 2.0
- # print a,b,c
- low = c * floor(min / c)
- high = c * ceil(max / c)
- return [low, high, round((high - low) / c)]
-def detScale(min=0, max=0):
- if min >= max:
- return None
- elif min == -1.0 and max == 1.0:
- return [-1.2, 1.2, 12]
- else:
- a = max - min
- if max != 0:
- max += 0.1 * a
- if min != 0:
- if min > 0 and min < 0.1 * a:
- min = 0.0
- else:
- min -= 0.1 * a
- a = max - min
- b = floor(log10(a))
- c = pow(10.0, b)
- low = c * floor(min / c)
- high = c * ceil(max / c)
- n = round((high - low) / c)
- div = 2.0
- while n < 5 or n > 15:
- if n < 5:
- c /= div
- else:
- c *= div
- if div == 2.0:
- div = 5.0
- else:
- div = 2.0
- low = c * floor(min / c)
- high = c * ceil(max / c)
- n = round((high - low) / c)
- return [low, high, n]
-def bluefunc(x):
- return 1.0 / (1.0 + exp(-10 * (x - 0.6)))
-def redfunc(x):
- return 1.0 / (1.0 + exp(10 * (x - 0.5)))
-def greenfunc(x):
- return 1 - pow(redfunc(x + 0.2), 2) - bluefunc(x - 0.3)
-def colorSpectrum(n=100):
- multiple = 10
- if n == 1:
- return [ImageColor.getrgb("rgb(100%,0%,0%)")]
- elif n == 2:
- return [ImageColor.getrgb("100%,0%,0%)"),
- ImageColor.getrgb("rgb(0%,0%,100%)")]
- elif n == 3:
- return [ImageColor.getrgb("rgb(100%,0%,0%)"),
- ImageColor.getrgb("rgb(0%,100%,0%)"),
- ImageColor.getrgb("rgb(0%,0%,100%)")]
- N = n * multiple
- out = [None] * N
- for i in range(N):
- x = float(i) / N
- out[i] = ImageColor.getrgb("rgb({}%,{}%,{}%".format(
- *[int(i * 100) for i in (
- redfunc(x), greenfunc(x), bluefunc(x))]))
- out2 = [out[0]]
- step = N / float(n - 1)
- j = 0
- for i in range(n - 2):
- j += step
- out2.append(out[int(j)])
- out2.append(out[-1])
- return out2
-def _test():
- import doctest
- doctest.testmod()
-if __name__ == "__main__":
- _test()
diff --git a/wqflask/utility/TDCell.py b/wqflask/utility/TDCell.py
deleted file mode 100644
index 4b0f4b1d..00000000
--- a/wqflask/utility/TDCell.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# Copyright (C) University of Tennessee Health Science Center, Memphis, TN.
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# See the GNU Affero General Public License for more details.
-# This program is available from Source Forge: at GeneNetwork Project
-# (sourceforge.net/projects/genenetwork/).
-# Contact Drs. Robert W. Williams and Xiaodong Zhou (2010)
-# at rwilliams@uthsc.edu and xzhou15@uthsc.edu
-# This module is used by GeneNetwork project (www.genenetwork.org)
-# Created by GeneNetwork Core Team 2010/08/10
-# Last updated by GeneNetwork Core Team 2010/10/20
-# Table Cell Class
-class TDCell:
- def __init__(self, html="", text="", val=0.0):
- self.html = html # html, for web page
- self.text = text # text value, for output to a text file
- self.val = val # sort by value
- def __str__(self):
- return self.text
diff --git a/wqflask/utility/THCell.py b/wqflask/utility/THCell.py
deleted file mode 100644
index f533dcb8..00000000
--- a/wqflask/utility/THCell.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright (C) University of Tennessee Health Science Center, Memphis, TN.
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# See the GNU Affero General Public License for more details.
-# This program is available from Source Forge: at GeneNetwork Project
-# (sourceforge.net/projects/genenetwork/).
-# Contact Drs. Robert W. Williams and Xiaodong Zhou (2010)
-# at rwilliams@uthsc.edu and xzhou15@uthsc.edu
-# This module is used by GeneNetwork project (www.genenetwork.org)
-# Created by GeneNetwork Core Team 2010/08/10
-# Last updated by GeneNetwork Core Team 2010/10/20
-# Table Header Class
-class THCell:
- def __init__(self, html="", text="", sort=1, idx=-1):
- self.html = html # html, for web page
- self.text = text # Column text value
- self.sort = sort # 0: not sortable, 1: yes
- self.idx = idx # sort by value
- def __str__(self):
- return self.text
diff --git a/wqflask/utility/__init__.py b/wqflask/utility/__init__.py
deleted file mode 100644
index 25273fa0..00000000
--- a/wqflask/utility/__init__.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from pprint import pformat as pf
-# Todo: Move these out of __init__
-class Bunch:
- """Like a dictionary but using object notation"""
- def __init__(self, **kw):
- self.__dict__ = kw
- def __repr__(self):
- return pf(self.__dict__)
-class Struct:
- '''The recursive class for building and representing objects with.
- From http://stackoverflow.com/a/6573827/1175849
- '''
- def __init__(self, obj):
- for k, v in list(obj.items()):
- if isinstance(v, dict):
- setattr(self, k, Struct(v))
- else:
- setattr(self, k, v)
- def __getitem__(self, val):
- return self.__dict__[val]
- def __repr__(self):
- return '{%s}' % str(', '.join('%s : %s' % (k, repr(v)) for
- (k, v) in list(self.__dict__.items())))
diff --git a/wqflask/utility/after.py b/wqflask/utility/after.py
deleted file mode 100644
index 2b560e48..00000000
--- a/wqflask/utility/after.py
+++ /dev/null
@@ -1,15 +0,0 @@
-See: http://flask.pocoo.org/docs/patterns/deferredcallbacks/#deferred-callbacks
-from flask import g
-from wqflask import app
-def after_this_request(f):
- if not hasattr(g, 'after_request_callbacks'):
- g.after_request_callbacks = []
- g.after_request_callbacks.append(f)
- return f
diff --git a/wqflask/utility/authentication_tools.py b/wqflask/utility/authentication_tools.py
deleted file mode 100644
index 5d00d600..00000000
--- a/wqflask/utility/authentication_tools.py
+++ /dev/null
@@ -1,150 +0,0 @@
-import json
-import requests
-from flask import g
-from wqflask.database import database_connection
-from base import webqtlConfig
-from utility.redis_tools import (get_redis_conn,
- get_resource_info,
- get_resource_id,
- add_resource)
-from utility.tools import get_setting, GN_PROXY_URL
-Redis = get_redis_conn()
-def check_resource_availability(dataset, user_id, trait_id=None):
- # At least for now assume temporary entered traits are accessible
- if type(dataset) == str or dataset.type == "Temp":
- return webqtlConfig.DEFAULT_PRIVILEGES
- resource_id = get_resource_id(dataset, trait_id)
- # ZS: This should never be false, but it's technically possible if
- # a non-Temp dataset somehow had a type other than
- # Publish/ProbeSet/Geno
- if resource_id:
- resource_info = get_resource_info(resource_id)
- # If resource isn't already in redis, add it with default
- # privileges
- if not resource_info:
- resource_info = add_new_resource(dataset, trait_id)
- # Check if super-user - we should probably come up with some
- # way to integrate this into the proxy
- if user_id in Redis.smembers("super_users"):
- return webqtlConfig.SUPER_PRIVILEGES
- response = None
- the_url = f"{GN_PROXY_URL}available?resource={resource_id}&user={user_id}"
- try:
- response = json.loads(requests.get(the_url).content)
- except:
- response = resource_info['default_mask']
- return response
-def add_new_resource(dataset, trait_id=None):
- resource_ob = {
- 'owner_id': "none", # webqtlConfig.DEFAULT_OWNER_ID,
- 'default_mask': webqtlConfig.DEFAULT_PRIVILEGES,
- 'group_masks': {}
- }
- if dataset.type == "Publish":
- group_code = get_group_code(dataset)
- if group_code is None:
- group_code = ""
- resource_ob['name'] = group_code + "_" + str(trait_id)
- resource_ob['data'] = {
- 'dataset': dataset.id,
- 'trait': trait_id
- }
- resource_ob['type'] = 'dataset-publish'
- elif dataset.type == "Geno":
- resource_ob['name'] = dataset.name
- resource_ob['data'] = {
- 'dataset': dataset.id
- }
- resource_ob['type'] = 'dataset-geno'
- else:
- resource_ob['name'] = dataset.name
- resource_ob['data'] = {
- 'dataset': dataset.id
- }
- resource_ob['type'] = 'dataset-probeset'
- resource_info = add_resource(resource_ob, update=False)
- return resource_info
-def get_group_code(dataset):
- with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
- cursor.execute(
- "SELECT InbredSetCode FROM InbredSet WHERE Name=%s",
- (dataset.group.name,)
- )
- if results := cursor.fetchone():
- return results[0]
- return ""
-def check_admin(resource_id=None):
- the_url = GN_PROXY_URL + "available?resource={}&user={}".format(
- resource_id, g.user_session.user_id)
- try:
- response = json.loads(requests.get(the_url).content)['admin']
- except:
- resource_info = get_resource_info(resource_id)
- response = resource_info['default_mask']['admin']
- if type(response) is list:
- if 'edit-admins' in response:
- return 'edit_admins'
- elif 'edit-access' in response:
- return 'edit-access'
- return response
-def check_owner(dataset=None, trait_id=None, resource_id=None):
- if resource_id:
- resource_info = get_resource_info(resource_id)
- if g.user_session.user_id == resource_info['owner_id']:
- return resource_id
- else:
- resource_id = get_resource_id(dataset, trait_id)
- if resource_id:
- resource_info = get_resource_info(resource_id)
- if g.user_session.user_id == resource_info['owner_id']:
- return resource_id
- return False
-def check_owner_or_admin(dataset=None, trait_id=None, resource_id=None):
- if not resource_id:
- if dataset.type == "Temp":
- return "not-admin"
- else:
- resource_id = get_resource_id(dataset, trait_id)
- try:
- user_id = g.user_session.user_id.encode('utf-8')
- except:
- user_id = g.user_session.user_id
- if user_id in Redis.smembers("super_users"):
- return "owner"
- resource_info = get_resource_info(resource_id)
- if resource_info:
- if user_id == resource_info['owner_id']:
- return "owner"
- else:
- return check_admin(resource_id)
- return "not-admin"
diff --git a/wqflask/utility/chunks.py b/wqflask/utility/chunks.py
deleted file mode 100644
index f6e88cbe..00000000
--- a/wqflask/utility/chunks.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import math
-def divide_into_chunks(the_list, number_chunks):
- """Divides a list into approximately number_chunks smaller lists
- >>> divide_into_chunks([1, 2, 7, 3, 22, 8, 5, 22, 333], 3)
- [[1, 2, 7], [3, 22, 8], [5, 22, 333]]
- >>> divide_into_chunks([1, 2, 7, 3, 22, 8, 5, 22, 333], 4)
- [[1, 2, 7], [3, 22, 8], [5, 22, 333]]
- >>> divide_into_chunks([1, 2, 7, 3, 22, 8, 5, 22, 333], 5)
- [[1, 2], [7, 3], [22, 8], [5, 22], [333]]
- >>>
- """
- length = len(the_list)
- if length == 0:
- return [[]]
- if length <= number_chunks:
- number_chunks = length
- chunksize = int(math.ceil(length / number_chunks))
- chunks = []
- for counter in range(0, length, chunksize):
- chunks.append(the_list[counter:counter + chunksize])
- return chunks
diff --git a/wqflask/utility/corestats.py b/wqflask/utility/corestats.py
deleted file mode 100644
index da0a21db..00000000
--- a/wqflask/utility/corestats.py
+++ /dev/null
@@ -1,86 +0,0 @@
-#!/usr/bin/env python
-# corestats.py (COREy STATS)
-# Copyright (c) 2006-2007, Corey Goldberg (corey@goldb.org)
-# statistical calculation class
-# for processing numeric sequences
-# license: GNU LGPL
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-import sys
-# ZS: Should switch to using some third party library for this; maybe scipy has an equivalent
-class Stats:
- def __init__(self, sequence):
- # sequence of numbers we will process
- # convert all items to floats for numerical processing
- self.sequence = [float(item) for item in sequence]
- def sum(self):
- if len(self.sequence) < 1:
- return None
- else:
- return sum(self.sequence)
- def count(self):
- return len(self.sequence)
- def min(self):
- if len(self.sequence) < 1:
- return None
- else:
- return min(self.sequence)
- def max(self):
- if len(self.sequence) < 1:
- return None
- else:
- return max(self.sequence)
- def avg(self):
- if len(self.sequence) < 1:
- return None
- else:
- return sum(self.sequence) / len(self.sequence)
- def stdev(self):
- if len(self.sequence) < 1:
- return None
- else:
- avg = self.avg()
- sdsq = sum([(i - avg) ** 2 for i in self.sequence])
- stdev = (sdsq / (len(self.sequence) - 1)) ** .5
- return stdev
- def percentile(self, percentile):
- if len(self.sequence) < 1:
- value = None
- elif (percentile >= 100):
- sys.stderr.write(
- 'ERROR: percentile must be < 100. you supplied: %s\n' % percentile)
- value = None
- else:
- element_idx = int(len(self.sequence) * (percentile / 100.0))
- self.sequence.sort()
- value = self.sequence[element_idx]
- return value
-# Sample script using this class:
-# -------------------------------------------
-# #!/usr/bin/env python
-# import corestats
-# sequence = [1, 2.5, 7, 13.4, 8.0]
-# stats = corestats.Stats(sequence)
-# print stats.avg()
-# print stats.percentile(90)
-# -------------------------------------------
diff --git a/wqflask/utility/corr_result_helpers.py b/wqflask/utility/corr_result_helpers.py
deleted file mode 100644
index ea3ababf..00000000
--- a/wqflask/utility/corr_result_helpers.py
+++ /dev/null
@@ -1,42 +0,0 @@
-def normalize_values(a_values, b_values):
- """
- Trim two lists of values to contain only the values they both share
- Given two lists of sample values, trim each list so that it contains
- only the samples that contain a value in both lists. Also returns
- the number of such samples.
- >>> normalize_values([2.3, None, None, 3.2, 4.1, 5], [3.4, 7.2, 1.3, None, 6.2, 4.1])
- ([2.3, 4.1, 5], [3.4, 6.2, 4.1], 3)
- """
- min_length = min(len(a_values), len(b_values))
- a_new = []
- b_new = []
- for a, b in zip(a_values, b_values):
- if not (a == None or b == None):
- a_new.append(a)
- b_new.append(b)
- return a_new, b_new, len(a_new)
-def common_keys(a_samples, b_samples):
- """
- >>> a = dict(BXD1 = 9.113, BXD2 = 9.825, BXD14 = 8.985, BXD15 = 9.300)
- >>> b = dict(BXD1 = 9.723, BXD3 = 9.825, BXD14 = 9.124, BXD16 = 9.300)
- >>> sorted(common_keys(a, b))
- ['BXD1', 'BXD14']
- """
- return set(a_samples.keys()).intersection(set(b_samples.keys()))
-def normalize_values_with_samples(a_samples, b_samples):
- common_samples = common_keys(a_samples, b_samples)
- a_new = {}
- b_new = {}
- for sample in common_samples:
- a_new[sample] = a_samples[sample]
- b_new[sample] = b_samples[sample]
- return a_new, b_new, len(a_new)
diff --git a/wqflask/utility/db_tools.py b/wqflask/utility/db_tools.py
deleted file mode 100644
index 98da33f2..00000000
--- a/wqflask/utility/db_tools.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from utility.tools import get_setting
-from wqflask.database import database_connection
-def escape_(string):
- with database_connection(get_setting("SQL_URI")) as conn:
- return conn.escape_string(str(string))
-def create_in_clause(items):
- """Create an in clause for mysql"""
- in_clause = ', '.join("'{}'".format(x) for x in mescape(*items))
- in_clause = '( {} )'.format(in_clause)
- return in_clause
-def mescape(*items):
- """Multiple escape"""
- return [escape_(str(item)).decode('utf8') for item in items]
-def escape(string_):
- return escape_(string_).decode('utf8')
diff --git a/wqflask/utility/external.py b/wqflask/utility/external.py
deleted file mode 100644
index 805d2ffe..00000000
--- a/wqflask/utility/external.py
+++ /dev/null
@@ -1,10 +0,0 @@
-# Call external program
-import os
-import sys
-import subprocess
-def shell(command):
- if subprocess.call(command, shell=True) != 0:
- raise Exception("ERROR: failed on " + command)
diff --git a/wqflask/utility/formatting.py b/wqflask/utility/formatting.py
deleted file mode 100644
index 1da3e9b7..00000000
--- a/wqflask/utility/formatting.py
+++ /dev/null
@@ -1,121 +0,0 @@
-def numify(number, singular=None, plural=None):
- """Turn a number into a word if less than 13 and optionally add a singular or plural word
- >>> numify(3)
- 'three'
- >>> numify(1, 'item', 'items')
- 'one item'
- >>> numify(9, 'book', 'books')
- 'nine books'
- You can add capitalize to change the capitalization
- >>> numify(9, 'book', 'books').capitalize()
- 'Nine books'
- Or capitalize every word using title
- >>> numify(9, 'book', 'books').title()
- 'Nine Books'
- >>> numify(15)
- '15'
- >>> numify(0)
- '0'
- >>> numify(12334, 'hippopotamus', 'hippopotami')
- '12,334 hippopotami'
- """
- num_repr = {0: "zero",
- 1: "one",
- 2: "two",
- 3: "three",
- 4: "four",
- 5: "five",
- 6: "six",
- 7: "seven",
- 8: "eight",
- 9: "nine",
- 10: "ten",
- 11: "eleven",
- 12: "twelve"}
- if number == 1:
- word = singular
- else:
- word = plural
- if number in num_repr:
- number = num_repr[number]
- elif number > 9999:
- number = commify(number)
- if word:
- return "%s %s" % (number, word)
- else:
- return str(number)
-def commify(n):
- """Add commas to an integer n.
- See http://stackoverflow.com/questions/3909457/whats-the-easiest-way-to-add-commas-to-an-integer-in-python
- But I (Sam) made some small changes based on http://www.grammarbook.com/numbers/numbers.asp
- >>> commify(1)
- '1'
- >>> commify(123)
- '123'
- >>> commify(1234)
- '1234'
- >>> commify(12345)
- '12,345'
- >>> commify(1234567890)
- '1,234,567,890'
- >>> commify(123.0)
- '123.0'
- >>> commify(1234.5)
- '1234.5'
- >>> commify(1234.56789)
- '1234.56789'
- >>> commify(123456.789)
- '123,456.789'
- >>> commify('%.2f' % 1234.5)
- '1234.50'
- >>> commify(None)
- >>>
- """
- if n is None:
- return None
- n = str(n)
- if len(n) <= 4: # Might as well do this early
- return n
- if '.' in n:
- dollars, cents = n.split('.')
- else:
- dollars, cents = n, None
- # Don't commify numbers less than 10000
- if len(dollars) <= 4:
- return n
- r = []
- for i, c in enumerate(reversed(str(dollars))):
- if i and (not (i % 3)):
- r.insert(0, ',')
- r.insert(0, c)
- out = ''.join(r)
- if cents:
- out += '.' + cents
- return out
-if __name__ == '__main__':
- import doctest
- doctest.testmod()
diff --git a/wqflask/utility/gen_geno_ob.py b/wqflask/utility/gen_geno_ob.py
deleted file mode 100644
index c7a1ea59..00000000
--- a/wqflask/utility/gen_geno_ob.py
+++ /dev/null
@@ -1,182 +0,0 @@
-class genotype:
- """
- Replacement for reaper.Dataset so we can remove qtlreaper use while still generating mapping output figure
- """
- def __init__(self, filename):
- self.group = None
- self.type = "riset"
- self.prgy = []
- self.nprgy = 0
- self.mat = -1
- self.pat = 1
- self.het = 0
- self.unk = "U"
- self.filler = False
- self.mb_exists = False
- # ZS: This is because I'm not sure if some files switch the column that contains Mb/cM positions; might be unnecessary
- self.cm_column = 2
- self.mb_column = 3
- self.chromosomes = []
- self.read_file(filename)
- def __iter__(self):
- return iter(self.chromosomes)
- def __getitem__(self, index):
- return self.chromosomes[index]
- def __len__(self):
- return len(self.chromosomes)
- def read_rdata_output(self, qtl_results):
- # ZS: This is necessary because R/qtl requires centimorgan marker positions, which it normally gets from the .geno file, but that doesn't exist for HET3-ITP (which only has RData), so it needs to read in the marker cM positions from the results
- # ZS: Overwriting since the .geno file's contents are just placeholders
- self.chromosomes = []
- this_chr = "" # ZS: This is so it can track when the chromosome changes as it iterates through markers
- chr_ob = None
- for marker in qtl_results:
- locus = Locus(self)
- # ZS: This is really awkward but works as a temporary fix
- if (str(marker['chr']) != this_chr) and this_chr != "X":
- if this_chr != "":
- self.chromosomes.append(chr_ob)
- this_chr = str(marker['chr'])
- if this_chr == "20":
- this_chr = "X"
- chr_ob = Chr(this_chr, self)
- if 'chr' in marker:
- locus.chr = str(marker['chr'])
- if 'name' in marker:
- locus.name = marker['name']
- if 'Mb' in marker:
- locus.Mb = marker['Mb']
- if 'cM' in marker:
- locus.cM = marker['cM']
- chr_ob.loci.append(locus)
- self.chromosomes.append(chr_ob)
- return self
- def read_file(self, filename):
- with open(filename, 'r') as geno_file:
- lines = geno_file.readlines()
- this_chr = "" # ZS: This is so it can track when the chromosome changes as it iterates through markers
- chr_ob = None
- for line in lines:
- if line[0] == "#":
- continue
- elif line[0] == "@":
- label = line.split(":")[0][1:]
- if label == "name":
- self.group = line.split(":")[1].strip()
- elif label == "filler":
- if line.split(":")[1].strip() == "yes":
- self.filler = True
- elif label == "type":
- self.type = line.split(":")[1].strip()
- elif label == "mat":
- self.mat = line.split(":")[1].strip()
- elif label == "pat":
- self.pat = line.split(":")[1].strip()
- elif label == "het":
- self.het = line.split(":")[1].strip()
- elif label == "unk":
- self.unk = line.split(":")[1].strip()
- else:
- continue
- elif line[:3] == "Chr":
- header_row = line.split("\t")
- if header_row[2] == "Mb":
- self.mb_exists = True
- self.mb_column = 2
- self.cm_column = 3
- elif header_row[3] == "Mb":
- self.mb_exists = True
- self.mb_column = 3
- elif header_row[2] == "cM":
- self.cm_column = 2
- if self.mb_exists:
- self.prgy = header_row[4:]
- else:
- self.prgy = header_row[3:]
- self.nprgy = len(self.prgy)
- else:
- if line.split("\t")[0] != this_chr:
- if this_chr != "":
- self.chromosomes.append(chr_ob)
- this_chr = line.split("\t")[0]
- chr_ob = Chr(line.split("\t")[0], self)
- chr_ob.add_marker(line.split("\t"))
- self.chromosomes.append(chr_ob)
-class Chr:
- def __init__(self, name, geno_ob):
- self.name = name
- self.loci = []
- self.mb_exists = geno_ob.mb_exists
- self.cm_column = geno_ob.cm_column
- self.mb_column = geno_ob.mb_column
- self.geno_ob = geno_ob
- def __iter__(self):
- return iter(self.loci)
- def __getitem__(self, index):
- return self.loci[index]
- def __len__(self):
- return len(self.loci)
- def add_marker(self, marker_row):
- self.loci.append(Locus(self.geno_ob, marker_row))
-class Locus:
- def __init__(self, geno_ob, marker_row=None):
- self.chr = None
- self.name = None
- self.cM = None
- self.Mb = None
- self.genotype = []
- if marker_row:
- self.chr = marker_row[0]
- self.name = marker_row[1]
- try:
- self.cM = float(marker_row[geno_ob.cm_column])
- except:
- self.cM = float(
- marker_row[geno_ob.mb_column]) if geno_ob.mb_exists else 0
- try:
- self.Mb = float(
- marker_row[geno_ob.mb_column]) if geno_ob.mb_exists else None
- except:
- self.Mb = self.cM
- geno_table = {
- geno_ob.mat: -1,
- geno_ob.pat: 1,
- geno_ob.het: 0,
- geno_ob.unk: "U"
- }
- self.genotype = []
- if geno_ob.mb_exists:
- start_pos = 4
- else:
- start_pos = 3
- for allele in marker_row[start_pos:]:
- if allele in list(geno_table.keys()):
- self.genotype.append(geno_table[allele])
- else: # ZS: Some genotype appears that isn't specified in the metadata, make it unknown
- self.genotype.append("U")
diff --git a/wqflask/utility/genofile_parser.py b/wqflask/utility/genofile_parser.py
deleted file mode 100644
index 86d9823e..00000000
--- a/wqflask/utility/genofile_parser.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# CTL analysis for GN2
-# Author / Maintainer: Danny Arends <Danny.Arends@gmail.com>
-import sys
-import os
-import glob
-import traceback
-import gzip
-import simplejson as json
-from pprint import pformat as pf
-class Marker:
- def __init__(self):
- self.name = None
- self.chr = None
- self.cM = None
- self.Mb = None
- self.genotypes = []
-class ConvertGenoFile:
- def __init__(self, input_file):
- self.mb_exists = False
- self.cm_exists = False
- self.markers = []
- self.latest_row_pos = None
- self.latest_col_pos = None
- self.latest_row_value = None
- self.latest_col_value = None
- self.input_fh = open(input_file)
- print("!!!!!!!!!!!!!!!!PARSER!!!!!!!!!!!!!!!!!!")
- self.haplotype_notation = {
- '@mat': "1",
- '@pat': "2",
- '@het': "-999",
- '@unk': "-999"
- }
- self.configurations = {}
- def process_rows(self):
- for self.latest_row_pos, row in enumerate(self.input_fh):
- self.latest_row_value = row
- # Take care of headers
- if not row.strip():
- continue
- if row.startswith('#'):
- continue
- if row.startswith('Chr'):
- if 'Mb' in row.split():
- self.mb_exists = True
- if 'cM' in row.split():
- self.cm_exists = True
- skip = 2 + self.cm_exists + self.mb_exists
- self.individuals = row.split()[skip:]
- continue
- if row.startswith('@'):
- key, _separater, value = row.partition(':')
- key = key.strip()
- value = value.strip()
- if key in self.haplotype_notation:
- self.configurations[value] = self.haplotype_notation[key]
- continue
- if not len(self.configurations):
- raise EmptyConfigurations
- yield row
- def process_csv(self):
- for row in self.process_rows():
- row_items = row.split("\t")
- this_marker = Marker()
- this_marker.name = row_items[1]
- this_marker.chr = row_items[0]
- if self.cm_exists and self.mb_exists:
- this_marker.cM = row_items[2]
- this_marker.Mb = row_items[3]
- genotypes = row_items[4:]
- elif self.cm_exists:
- this_marker.cM = row_items[2]
- genotypes = row_items[3:]
- elif self.mb_exists:
- this_marker.Mb = row_items[2]
- genotypes = row_items[3:]
- else:
- genotypes = row_items[2:]
- for item_count, genotype in enumerate(genotypes):
- if genotype.upper().strip() in self.configurations:
- this_marker.genotypes.append(
- self.configurations[genotype.upper().strip()])
- else:
- print("WARNING:", genotype.upper())
- this_marker.genotypes.append("NA")
- self.markers.append(this_marker.__dict__)
diff --git a/wqflask/utility/helper_functions.py b/wqflask/utility/helper_functions.py
deleted file mode 100644
index 8b20bd74..00000000
--- a/wqflask/utility/helper_functions.py
+++ /dev/null
@@ -1,69 +0,0 @@
-from base import data_set
-from base.trait import create_trait
-from base.species import TheSpecies
-from utility import hmac
-from utility.tools import get_setting
-from wqflask.database import database_connection
-def get_species_dataset_trait(self, start_vars):
- if "temp_trait" in list(start_vars.keys()):
- if start_vars['temp_trait'] == "True":
- self.dataset = data_set.create_dataset(
- dataset_name="Temp",
- dataset_type="Temp",
- group_name=start_vars['group'])
- else:
- self.dataset = data_set.create_dataset(start_vars['dataset'])
- else:
- self.dataset = data_set.create_dataset(start_vars['dataset'])
- self.species = TheSpecies(dataset=self.dataset)
- self.this_trait = create_trait(dataset=self.dataset,
- name=start_vars['trait_id'],
- cellid=None,
- get_qtl_info=True)
-def get_trait_db_obs(self, trait_db_list):
- if isinstance(trait_db_list, str):
- trait_db_list = trait_db_list.split(",")
- self.trait_list = []
- for trait in trait_db_list:
- data, _separator, hmac_string = trait.rpartition(':')
- data = data.strip()
- assert hmac_string == hmac.hmac_creation(data), "Data tampering?"
- trait_name, dataset_name = data.split(":")[:2]
- if dataset_name == "Temp":
- dataset_ob = data_set.create_dataset(
- dataset_name=dataset_name, dataset_type="Temp",
- group_name=trait_name.split("_")[2])
- else:
- dataset_ob = data_set.create_dataset(dataset_name)
- trait_ob = create_trait(dataset=dataset_ob,
- name=trait_name,
- cellid=None)
- if trait_ob:
- self.trait_list.append((trait_ob, dataset_ob))
-def get_species_groups():
- """Group each species into a group"""
- _menu = {}
- species, group_name = None, None
- with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
- cursor.execute(
- "SELECT s.MenuName, i.InbredSetName FROM InbredSet i "
- "INNER JOIN Species s ON s.SpeciesId = i.SpeciesId "
- "ORDER BY i.SpeciesId ASC, i.Name ASC"
- )
- for species, group_name in cursor.fetchall():
- if species in _menu:
- if _menu.get(species):
- _menu = _menu[species].append(group_name)
- else:
- _menu[species] = [group_name]
- return [{"species": key,
- "groups": value} for key, value in
- list(_menu.items())]
diff --git a/wqflask/utility/hmac.py b/wqflask/utility/hmac.py
deleted file mode 100644
index 29891677..00000000
--- a/wqflask/utility/hmac.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import hmac
-import hashlib
-from flask import url_for
-from wqflask import app
-def hmac_creation(stringy):
- """Helper function to create the actual hmac"""
- secret = app.config['SECRET_HMAC_CODE']
- hmaced = hmac.new(bytearray(secret, "latin-1"),
- bytearray(stringy, "utf-8"),
- hashlib.sha1)
- hm = hmaced.hexdigest()
- # ZS: Leaving the below comment here to ask Pjotr about
- # "Conventional wisdom is that you don't lose much in terms of security if you throw away up to half of the output."
- # http://www.w3.org/QA/2009/07/hmac_truncation_in_xml_signatu.html
- hm = hm[:20]
- return hm
-def data_hmac(stringy):
- """Takes arbitrary data string and appends :hmac so we know data hasn't been tampered with"""
- return stringy + ":" + hmac_creation(stringy)
-def url_for_hmac(endpoint, **values):
- """Like url_for but adds an hmac at the end to insure the url hasn't been tampered with"""
- url = url_for(endpoint, **values)
- hm = hmac_creation(url)
- if '?' in url:
- combiner = "&"
- else:
- combiner = "?"
- return url + combiner + "hm=" + hm
- data_hmac=data_hmac)
diff --git a/wqflask/utility/json/__init__.py b/wqflask/utility/json/__init__.py
deleted file mode 100644
index b1141a34..00000000
--- a/wqflask/utility/json/__init__.py
+++ /dev/null
@@ -1,3 +0,0 @@
-"""Local JSON utilities."""
-from .encoders import CustomJSONEncoder
diff --git a/wqflask/utility/json/encoders.py b/wqflask/utility/json/encoders.py
deleted file mode 100644
index 7c5839ac..00000000
--- a/wqflask/utility/json/encoders.py
+++ /dev/null
@@ -1,17 +0,0 @@
-"""Custom JSON encoders"""
-from uuid import UUID
-from json import JSONEncoder
-# Do not use this `__ENCODERS__` variable outside of this module.
-__ENCODERS__ = {
- UUID: lambda obj: str(obj)
-class CustomJSONEncoder(JSONEncoder):
- """Custom JSONEncoder class."""
- def default(self, obj):
- """Serialise `obj` to a JSON representation."""
- obj_type = type(obj)
- if obj_type in __ENCODERS__:
- return __ENCODERS__[obj_type](obj)
- return JSONEncoder.default(self, obj)
diff --git a/wqflask/utility/monads.py b/wqflask/utility/monads.py
deleted file mode 100644
index 2d708261..00000000
--- a/wqflask/utility/monads.py
+++ /dev/null
@@ -1,114 +0,0 @@
-"""Monadic utilities
-This module is a collection of monadic utilities for use in
-GeneNetwork. It includes:
-* MonadicDict - monadic version of the built-in dictionary
-* MonadicDictCursor - monadic version of MySQLdb.cursors.DictCursor
- that returns a MonadicDict instead of the built-in dictionary
-from collections import UserDict
-from functools import partial
-from MySQLdb.cursors import DictCursor
-from pymonad.maybe import Just, Nothing
-class MonadicDict(UserDict):
- """
- Monadic version of the built-in dictionary.
- Keys in this dictionary can be any python object, but values must
- be monadic values.
- from pymonad.maybe import Just, Nothing
- Initialize by setting individual keys to monadic values.
- >>> d = MonadicDict()
- >>> d["foo"] = Just(1)
- >>> d["bar"] = Nothing
- >>> d
- {'foo': 1}
- Initialize by converting a built-in dictionary object.
- >>> MonadicDict({"foo": 1})
- {'foo': 1}
- >>> MonadicDict({"foo": 1, "bar": None})
- {'foo': 1}
- Initialize from a built-in dictionary object with monadic values.
- >>> MonadicDict({"foo": Just(1)}, convert=False)
- {'foo': 1}
- >>> MonadicDict({"foo": Just(1), "bar": Nothing}, convert=False)
- {'foo': 1}
- Get values. For non-existent keys, Nothing is returned. Else, a
- Just value is returned.
- >>> d["foo"]
- Just 1
- >>> d["bar"]
- Nothing
- Convert MonadicDict object to a built-in dictionary object.
- >>> d.data
- {'foo': 1}
- >>> type(d)
- <class 'utility.monads.MonadicDict'>
- >>> type(d.data)
- <class 'dict'>
- Delete keys. Deleting non-existent keys does nothing.
- >>> del d["bar"]
- >>> d
- {'foo': 1}
- >>> del d["foo"]
- >>> d
- {}
- """
- def __init__(self, d={}, convert=True):
- """Initialize monadic dictionary.
- If convert is False, values in dictionary d must be
- monadic. If convert is True, values in dictionary d are
- converted to monadic values.
- """
- if convert:
- super().__init__({key:Just(value) for key, value in d.items()
- if value is not None})
- else:
- super().__init__(d)
- def __getitem__(self, key):
- """Get key from dictionary.
- If key exists in the dictionary, return a Just value. Else,
- return Nothing.
- """
- try:
- return Just(self.data[key])
- except KeyError:
- return Nothing
- def __setitem__(self, key, value):
- """Set key in dictionary.
- value must be a monadic value---either Nothing or a Just
- value. If value is a Just value, set it in the dictionary. If
- value is Nothing, do nothing.
- """
- value.bind(partial(super().__setitem__, key))
- def __delitem__(self, key):
- """Delete key from dictionary.
- If key exists in the dictionary, delete it. Else, do nothing.
- """
- try:
- super().__delitem__(key)
- except KeyError:
- pass
-def sql_query_mdict(conn, query):
- """Execute SQL query and return a generator of MonadicDict objects."""
- with conn.cursor(DictCursor) as cursor:
- cursor.execute(query)
- while (row := cursor.fetchone()):
- yield MonadicDict(row)
diff --git a/wqflask/utility/pillow_utils.py b/wqflask/utility/pillow_utils.py
deleted file mode 100644
index e302df18..00000000
--- a/wqflask/utility/pillow_utils.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from PIL import Image, ImageColor, ImageDraw, ImageFont
-from utility.tools import TEMPDIR
-BLACK = ImageColor.getrgb("black")
-WHITE = ImageColor.getrgb("white")
-# def draw_rotated_text(canvas: Image, text: str, font: ImageFont, xy: tuple, fill: ImageColor=BLACK, angle: int=-90):
-def draw_rotated_text(canvas, text, font, xy, fill=BLACK, angle=-90):
- # type: (Image, str, ImageFont, tuple, ImageColor, int)
- """Utility function draw rotated text"""
- tmp_img = Image.new("RGBA", font.getsize(text), color=(0, 0, 0, 0))
- draw_text = ImageDraw.Draw(tmp_img)
- draw_text.text(text=text, xy=(0, 0), font=font, fill=fill)
- tmp_img2 = tmp_img.rotate(angle, expand=1)
- tmp_img2.save("/{0}/{1}.png".format(TEMPDIR, text), format="png")
- canvas.paste(im=tmp_img2, box=tuple([int(i) for i in xy]))
-# def draw_open_polygon(canvas: Image, xy: tuple, fill: ImageColor=WHITE, outline: ImageColor=BLACK):
-def draw_open_polygon(canvas, xy, fill=None, outline=BLACK, width=0):
- # type: (Image, tuple, ImageColor, ImageColor)
- draw_ctx = ImageDraw.Draw(canvas)
- draw_ctx.polygon(xy, fill=fill)
- draw_ctx.line(xy, fill=outline, width=width)
diff --git a/wqflask/utility/redis_tools.py b/wqflask/utility/redis_tools.py
deleted file mode 100644
index 945efbbd..00000000
--- a/wqflask/utility/redis_tools.py
+++ /dev/null
@@ -1,285 +0,0 @@
-import uuid
-import simplejson as json
-import datetime
-import redis # used for collections
-from utility.hmac import hmac_creation
-def get_redis_conn():
- Redis = redis.StrictRedis(port=6379)
- return Redis
-Redis = get_redis_conn()
-def is_redis_available():
- try:
- Redis.ping()
- except:
- return False
- return True
-def load_json_from_redis(item_list, column_value):
- if type(column_value) == str:
- column_value = str.encode(column_value)
- try:
- return json.loads(item_list[column_value])
- except:
- return None
-def get_user_id(column_name, column_value):
- user_list = Redis.hgetall("users")
- key_list = []
- for key in user_list:
- user_ob = json.loads(user_list[key])
- if column_name in user_ob and user_ob[column_name] == column_value:
- return key
- return None
-def get_user_by_unique_column(column_name, column_value):
- item_details = None
- user_list = Redis.hgetall("users")
- if column_name != "user_id":
- for key in user_list:
- user_ob = json.loads(user_list[key])
- if column_name in user_ob and user_ob[column_name] == column_value:
- item_details = user_ob
- else:
- item_details = load_json_from_redis(user_list, column_value)
- return item_details
-def set_user_attribute(user_id, column_name, column_value):
- user_info = json.loads(Redis.hget("users", user_id))
- user_info[column_name] = column_value
- Redis.hset("users", user_id, json.dumps(user_info))
-def get_user_collections(user_id):
- collections = None
- collections = Redis.hget("collections", user_id)
- if collections:
- return json.loads(collections)
- else:
- return []
-def save_user(user, user_id):
- Redis.hset("users", user_id, json.dumps(user))
-def save_collections(user_id, collections_ob):
- Redis.hset("collections", user_id, collections_ob)
-def save_verification_code(user_email, code):
- Redis.hset("verification_codes", code, user_email)
-def check_verification_code(code):
- email_address = None
- user_details = None
- email_address = Redis.hget("verification_codes", code)
- if email_address:
- user_details = get_user_by_unique_column(
- 'email_address', email_address)
- if user_details:
- return user_details
- else:
- return None
- else:
- return None
-def get_user_groups(user_id):
- # Get the groups where a user is an admin or a member and
- # return lists corresponding to those two sets of groups
- admin_group_ids = [] # Group IDs where user is an admin
- user_group_ids = [] # Group IDs where user is a regular user
- groups_list = Redis.hgetall("groups")
- for group_id, group_details in groups_list.items():
- try:
- _details = json.loads(group_details)
- group_admins = set([this_admin if this_admin else None for this_admin in _details['admins']])
- group_members = set([this_member if this_member else None for this_member in _details['members']])
- if user_id in group_admins:
- admin_group_ids.append(group_id)
- elif user_id in group_members:
- user_group_ids.append(group_id)
- else:
- continue
- except:
- continue
- admin_groups = []
- user_groups = []
- for the_id in admin_group_ids:
- admin_groups.append(get_group_info(the_id))
- for the_id in user_group_ids:
- user_groups.append(get_group_info(the_id))
- return admin_groups, user_groups
-def get_group_info(group_id):
- group_json = Redis.hget("groups", group_id)
- group_info = None
- if group_json:
- group_info = json.loads(group_json)
- return group_info
-def create_group(admin_user_ids, member_user_ids=[],
- group_name="Default Group Name"):
- group_id = str(uuid.uuid4())
- new_group = {
- "id": group_id,
- "admins": admin_user_ids,
- "members": member_user_ids,
- "name": group_name,
- "created_timestamp": datetime.datetime.utcnow().strftime('%b %d %Y %I:%M%p'),
- "changed_timestamp": datetime.datetime.utcnow().strftime('%b %d %Y %I:%M%p')
- }
- Redis.hset("groups", group_id, json.dumps(new_group))
- return new_group
-def delete_group(user_id, group_id):
- # ZS: If user is an admin of a group, remove it from the groups hash
- group_info = get_group_info(group_id)
- if user_id in group_info["admins"]:
- Redis.hdel("groups", group_id)
- return get_user_groups(user_id)
- else:
- None
-# ZS "admins" is just to indicate whether the users should be added to
-# the groups admins or regular users set
-def add_users_to_group(user_id, group_id, user_emails=[], admins=False):
- group_info = get_group_info(group_id)
- # ZS: Just to make sure that the user is an admin for the group,
- # even though they shouldn't be able to reach this point unless
- # they are
- if user_id in group_info["admins"]:
- if admins:
- group_users = set(group_info["admins"])
- else:
- group_users = set(group_info["members"])
- for email in user_emails:
- user_id = get_user_id("email_address", email)
- group_users.add(user_id)
- if admins:
- group_info["admins"] = list(group_users)
- else:
- group_info["members"] = list(group_users)
- group_info["changed_timestamp"] = datetime.datetime.utcnow().strftime(
- '%b %d %Y %I:%M%p')
- Redis.hset("groups", group_id, json.dumps(group_info))
- return group_info
- else:
- return None
-# ZS: User type is because I assume admins can remove other admins
-def remove_users_from_group(user_id,
- users_to_remove_ids,
- group_id,
- user_type="members"):
- group_info = get_group_info(group_id)
- if user_id in group_info["admins"]:
- users_to_remove_set = set(users_to_remove_ids)
- # ZS: Make sure an admin can't remove themselves from a group,
- # since I imagine we don't want groups to be able to become
- # admin-less
- if user_type == "admins" and user_id in users_to_remove_set:
- users_to_remove_set.remove(user_id)
- group_users = set(group_info[user_type])
- group_users -= users_to_remove_set
- group_info[user_type] = list(group_users)
- group_info["changed_timestamp"] = datetime.datetime.utcnow().strftime(
- '%b %d %Y %I:%M%p')
- Redis.hset("groups", group_id, json.dumps(group_info))
-def change_group_name(user_id, group_id, new_name):
- group_info = get_group_info(group_id)
- if user_id in group_info["admins"]:
- group_info["name"] = new_name
- Redis.hset("groups", group_id, json.dumps(group_info))
- return group_info
- else:
- return None
-def get_resources():
- resource_list = Redis.hgetall("resources")
- return resource_list
-def get_resource_id(dataset, trait_id=None):
- resource_id = False
- if dataset.type == "Publish":
- if trait_id:
- resource_id = hmac_creation("{}:{}:{}".format(
- 'dataset-publish', dataset.id, trait_id))
- elif dataset.type == "ProbeSet":
- resource_id = hmac_creation(
- "{}:{}".format('dataset-probeset', dataset.id))
- elif dataset.type == "Geno":
- resource_id = hmac_creation(
- "{}:{}".format('dataset-geno', dataset.id))
- return resource_id
-def get_resource_info(resource_id):
- resource_info = Redis.hget("resources", resource_id)
- if resource_info:
- return json.loads(resource_info)
- else:
- return None
-def add_resource(resource_info, update=True):
- if 'trait' in resource_info['data']:
- resource_id = hmac_creation('{}:{}:{}'.format(
- str(resource_info['type']), str(
- resource_info['data']['dataset']),
- str(resource_info['data']['trait'])))
- else:
- resource_id = hmac_creation('{}:{}'.format(
- str(resource_info['type']), str(resource_info['data']['dataset'])))
- if update or not Redis.hexists("resources", resource_id):
- Redis.hset("resources", resource_id, json.dumps(resource_info))
- return resource_info
-def add_access_mask(resource_id, group_id, access_mask):
- the_resource = get_resource_info(resource_id)
- the_resource['group_masks'][group_id] = access_mask
- Redis.hset("resources", resource_id, json.dumps(the_resource))
- return the_resource
diff --git a/wqflask/utility/startup_config.py b/wqflask/utility/startup_config.py
deleted file mode 100644
index 69cac124..00000000
--- a/wqflask/utility/startup_config.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from wqflask import app
-from utility.tools import WEBSERVER_MODE
-from utility.tools import show_settings
-from utility.tools import get_setting_int
-from utility.tools import get_setting
-from utility.tools import get_setting_bool
-BLUE = '\033[94m'
-GREEN = '\033[92m'
-BOLD = '\033[1m'
-ENDC = '\033[0m'
-def app_config():
- app.config['SESSION_TYPE'] = app.config.get('SESSION_TYPE', 'filesystem')
- if not app.config.get('SECRET_KEY'):
- import os
- app.config['SECRET_KEY'] = str(os.urandom(24))
- if mode in ["DEV", "DEBUG"]:
- app.config['TEMPLATES_AUTO_RELOAD'] = True
- if mode == "DEBUG":
- app.debug = True
- print("==========================================")
- show_settings()
- port = get_setting_int("SERVER_PORT")
- if get_setting_bool("USE_GN_SERVER"):
- print(f"GN2 API server URL is [{BLUE}GN_SERVER_URL{ENDC}]")
- import requests
- page = requests.get(get_setting("GN_SERVER_URL"))
- if page.status_code != 200:
- raise Exception("API server not found!")
- print(f"GN2 is running. Visit {BLUE}"
- f"[http://localhost:{str(port)}/{ENDC}]"
- f"({get_setting('WEBSERVER_URL')})")
diff --git a/wqflask/utility/svg.py b/wqflask/utility/svg.py
deleted file mode 100644
index 912cd04c..00000000
--- a/wqflask/utility/svg.py
+++ /dev/null
@@ -1,1179 +0,0 @@
-# Copyright (C) University of Tennessee Health Science Center, Memphis, TN.
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# See the GNU Affero General Public License for more details.
-# This program is available from Source Forge: at GeneNetwork Project
-# (sourceforge.net/projects/genenetwork/).
-# Contact Drs. Robert W. Williams and Xiaodong Zhou (2010)
-# at rwilliams@uthsc.edu and xzhou15@uthsc.edu
-# This module is used by GeneNetwork project (www.genenetwork.org)
-# Created by GeneNetwork Core Team 2010/08/10
-# Last updated by GeneNetwork Core Team 2010/10/20
-#!/usr/bin/env python
-# Copyright (c) 2002, Fedor Baart & Hans de Wit (Stichting Farmaceutische Kengetallen)
-# All rights reserved.
-# Redistribution and use in source and binary forms, with or without modification,
-# are permitted provided that the following conditions are met:
-# Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
-# Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation and/or
-# other materials provided with the distribution.
-# Neither the name of the Stichting Farmaceutische Kengetallen nor the names of
-# its contributors may be used to endorse or promote products derived from this
-# software without specific prior written permission.
-# Thanks to Gerald Rosennfellner for his help and useful comments.
-import sys
-import exceptions
-__doc__ = """Use SVGdraw to generate your SVGdrawings.
-SVGdraw uses an object model drawing and a method toXML to create SVG graphics
-by using easy to use classes and methods usualy you start by creating a drawing eg
- d=drawing()
- # then you create a SVG root element
- s=svg()
- # then you add some elements eg a circle and add it to the svg root element
- c=circle()
- # you can supply attributes by using named arguments.
- c=circle(fill='red',stroke='blue')
- # or by updating the attributes attribute:
- c.attributes['stroke-width']=1
- s.addElement(c)
- # then you add the svg root element to the drawing
- d.setSVG(s)
- # and finaly you xmlify the drawing
- d.toXml()
-this results in the svg source of the drawing, which consists of a circle
-on a white background. Its as easy as that;)
-This module was created using the SVG specification of www.w3c.org and the
-O'Reilly (www.oreilly.com) python books as information sources. A svg viewer
-is available from www.adobe.com"""
-__version__ = "1.0"
-# there are two possibilities to generate svg:
-# via a dom implementation and directly using <element>text</element> strings
-# the latter is way faster (and shorter in coding)
-# the former is only used in debugging svg programs
-# maybe it will be removed alltogether after a while
-# with the following variable you indicate whether to use the dom implementation
-# Note that PyXML is required for using the dom implementation.
-# It is also possible to use the standard minidom. But I didn't try that one.
-# Anyway the text based approach is about 60 times faster than using the full dom implementation.
-use_dom_implementation = 0
-if use_dom_implementation != 0:
- try:
- from xml.dom import implementation
- from xml.dom.ext import PrettyPrint
- except:
- raise exceptions.ImportError(
- "PyXML is required for using the dom implementation")
-# The implementation is used for the creating the XML document.
-# The prettyprint module is used for converting the xml document object to a xml file
-sys.setrecursionlimit = 50
-# The recursion limit is set conservative so mistakes like s=svg() s.addElement(s)
-# won't eat up too much processor time.
-# the following code is pasted form xml.sax.saxutils
-# it makes it possible to run the code without the xml sax package installed
-# To make it possible to have <rubbish> in your text elements, it is necessary to escape the texts
-def _escape(data, entities={}):
- """Escape &, <, and > in a string of data.
- You can escape other strings of data by passing a dictionary as
- the optional entities parameter. The keys and values must all be
- strings; each key will be replaced with its corresponding value.
- """
- # data = data.replace("&", "&amp;")
- data = data.replace("<", "&lt;")
- data = data.replace(">", "&gt;")
- for chars, entity in list(entities.items()):
- data = data.replace(chars, entity)
- return data
-def _quoteattr(data, entities={}):
- """Escape and quote an attribute value.
- Escape &, <, and > in a string of data, then quote it for use as
- an attribute value. The \" character will be escaped as well, if
- necessary.
- You can escape other strings of data by passing a dictionary as
- the optional entities parameter. The keys and values must all be
- strings; each key will be replaced with its corresponding value.
- """
- data = _escape(data, entities)
- if '"' in data:
- if "'" in data:
- data = '"%s"' % data.replace('"', "&quot;")
- else:
- data = "'%s'" % data
- else:
- data = '"%s"' % data
- return data
-def _xypointlist(a):
- """formats a list of xy pairs"""
- s = ''
- for e in a: # this could be done more elegant
- s += str(e)[1:-1] + ' '
- return s
-def _viewboxlist(a):
- """formats a tuple"""
- s = ''
- for e in a:
- s += str(e) + ' '
- return s
-def _pointlist(a):
- """formats a list of numbers"""
- return str(a)[1:-1]
-class pathdata:
- """class used to create a pathdata object which can be used for a path.
- although most methods are pretty straightforward it might be useful to look at the SVG specification."""
- # I didn't test the methods below.
- def __init__(self, x=None, y=None):
- self.path = []
- if x is not None and y is not None:
- self.path.append('M ' + str(x) + ' ' + str(y))
- def closepath(self):
- """ends the path"""
- self.path.append('z')
- def move(self, x, y):
- """move to absolute"""
- self.path.append('M ' + str(x) + ' ' + str(y))
- def relmove(self, x, y):
- """move to relative"""
- self.path.append('m ' + str(x) + ' ' + str(y))
- def line(self, x, y):
- """line to absolute"""
- self.path.append('L ' + str(x) + ' ' + str(y))
- def relline(self, x, y):
- """line to relative"""
- self.path.append('l ' + str(x) + ' ' + str(y))
- def hline(self, x):
- """horizontal line to absolute"""
- self.path.append('H' + str(x))
- def relhline(self, x):
- """horizontal line to relative"""
- self.path.append('h' + str(x))
- def vline(self, y):
- """verical line to absolute"""
- self.path.append('V' + str(y))
- def relvline(self, y):
- """vertical line to relative"""
- self.path.append('v' + str(y))
- def bezier(self, x1, y1, x2, y2, x, y):
- """bezier with xy1 and xy2 to xy absolut"""
- self.path.append('C' + str(x1) + ',' + str(y1) + ' ' + str(x2)
- + ',' + str(y2) + ' ' + str(x) + ',' + str(y))
- def relbezier(self, x1, y1, x2, y2, x, y):
- """bezier with xy1 and xy2 to xy relative"""
- self.path.append('c' + str(x1) + ',' + str(y1) + ' ' + str(x2)
- + ',' + str(y2) + ' ' + str(x) + ',' + str(y))
- def smbezier(self, x2, y2, x, y):
- """smooth bezier with xy2 to xy absolut"""
- self.path.append('S' + str(x2) + ',' + str(y2) + \
- ' ' + str(x) + ',' + str(y))
- def relsmbezier(self, x2, y2, x, y):
- """smooth bezier with xy2 to xy relative"""
- self.path.append('s' + str(x2) + ',' + str(y2) + \
- ' ' + str(x) + ',' + str(y))
- def qbezier(self, x1, y1, x, y):
- """quadratic bezier with xy1 to xy absolut"""
- self.path.append('Q' + str(x1) + ',' + str(y1) + \
- ' ' + str(x) + ',' + str(y))
- def relqbezier(self, x1, y1, x, y):
- """quadratic bezier with xy1 to xy relative"""
- self.path.append('q' + str(x1) + ',' + str(y1) + \
- ' ' + str(x) + ',' + str(y))
- def smqbezier(self, x, y):
- """smooth quadratic bezier to xy absolut"""
- self.path.append('T' + str(x) + ',' + str(y))
- def relsmqbezier(self, x, y):
- """smooth quadratic bezier to xy relative"""
- self.path.append('t' + str(x) + ',' + str(y))
- def ellarc(self, rx, ry, xrot, laf, sf, x, y):
- """elliptival arc with rx and ry rotating with xrot using large-arc-flag and sweep-flag to xy absolut"""
- self.path.append('A' + str(rx) + ',' + str(ry) + ' ' + str(xrot)
- + ' ' + str(laf) + ' ' + str(sf) + ' ' + str(x) + ' ' + str(y))
- def relellarc(self, rx, ry, xrot, laf, sf, x, y):
- """elliptival arc with rx and ry rotating with xrot using large-arc-flag and sweep-flag to xy relative"""
- self.path.append('a' + str(rx) + ',' + str(ry) + ' ' + str(xrot)
- + ' ' + str(laf) + ' ' + str(sf) + ' ' + str(x) + ' ' + str(y))
- def __repr__(self):
- return ' '.join(self.path)
-class SVGelement:
- """SVGelement(type,attributes,elements,text,namespace,**args)
- Creates a arbitrary svg element and is intended to be subclassed not used on its own.
- This element is the base of every svg element it defines a class which resembles
- a xml-element. The main advantage of this kind of implementation is that you don't
- have to create a toXML method for every different graph object. Every element
- consists of a type, attribute, optional subelements, optional text and an optional
- namespace. Note the elements==None, if elements = None:self.elements=[] construction.
- This is done because if you default to elements=[] every object has a reference
- to the same empty list."""
- def __init__(self, type='', attributes=None, elements=None, text='', namespace='', cdata=None, **args):
- self.type = type
- if attributes == None:
- self.attributes = {}
- else:
- self.attributes = attributes
- if elements == None:
- self.elements = []
- else:
- self.elements = elements
- self.text = text
- self.namespace = namespace
- self.cdata = cdata
- for arg in list(args.keys()):
- arg2 = arg.replace("__", ":")
- arg2 = arg2.replace("_", "-")
- self.attributes[arg2] = args[arg]
- def addElement(self, SVGelement):
- """adds an element to a SVGelement
- SVGelement.addElement(SVGelement)
- """
- self.elements.append(SVGelement)
- def toXml(self, level, f):
- f.write('\t' * level)
- f.write('<' + self.type)
- for attkey in list(self.attributes.keys()):
- f.write(' ' + _escape(str(attkey)) + '='
- + _quoteattr(str(self.attributes[attkey])))
- if self.namespace:
- f.write(' xmlns="' + _escape(str(self.namespace))
- + '" xmlns:xlink="http://www.w3.org/1999/xlink"')
- if self.elements or self.text or self.cdata:
- f.write('>')
- if self.elements:
- f.write('\n')
- for element in self.elements:
- element.toXml(level + 1, f)
- if self.cdata:
- f.write('\n' + '\t' * (level + 1) + '<![CDATA[')
- for line in self.cdata.splitlines():
- f.write('\n' + '\t' * (level + 2) + line)
- f.write('\n' + '\t' * (level + 1) + ']]>\n')
- if self.text:
- if isinstance(self.text, type('')): # If the text is only text
- f.write(_escape(str(self.text)))
- else: # If the text is a spannedtext class
- f.write(str(self.text))
- if self.elements:
- f.write('\t' * level + '</' + self.type + '>\n')
- elif self.text:
- f.write('</' + self.type + '>\n')
- elif self.cdata:
- f.write('\t' * level + '</' + self.type + '>\n')
- else:
- f.write('/>\n')
-class tspan(SVGelement):
- """ts=tspan(text='',**args)
- a tspan element can be used for applying formatting to a textsection
- usage:
- ts=tspan('this text is bold')
- ts.attributes['font-weight']='bold'
- st=spannedtext()
- st.addtspan(ts)
- t=text(3,5,st)
- """
- def __init__(self, text=None, **args):
- SVGelement.__init__(self, 'tspan', **args)
- if self.text != None:
- self.text = text
- def __repr__(self):
- s = "<tspan"
- for key, value in list(self.attributes.items()):
- s += ' %s="%s"' % (key, value)
- s += '>'
- s += self.text
- s += '</tspan>'
- return s
-class tref(SVGelement):
- """tr=tref(link='',**args)
- a tref element can be used for referencing text by a link to its id.
- usage:
- tr=tref('#linktotext')
- st=spannedtext()
- st.addtref(tr)
- t=text(3,5,st)
- """
- def __init__(self, link, **args):
- SVGelement.__init__(self, 'tref', {'xlink:href': link}, **args)
- def __repr__(self):
- s = "<tref"
- for key, value in list(self.attributes.items()):
- s += ' %s="%s"' % (key, value)
- s += '/>'
- return s
-class spannedtext:
- """st=spannedtext(textlist=[])
- a spannedtext can be used for text which consists of text, tspan's and tref's
- You can use it to add to a text element or path element. Don't add it directly
- to a svg or a group element.
- usage:
- ts=tspan('this text is bold')
- ts.attributes['font-weight']='bold'
- tr=tref('#linktotext')
- tr.attributes['fill']='red'
- st=spannedtext()
- st.addtspan(ts)
- st.addtref(tr)
- st.addtext('This text is not bold')
- t=text(3,5,st)
- """
- def __init__(self, textlist=None):
- if textlist == None:
- self.textlist = []
- else:
- self.textlist = textlist
- def addtext(self, text=''):
- self.textlist.append(text)
- def addtspan(self, tspan):
- self.textlist.append(tspan)
- def addtref(self, tref):
- self.textlist.append(tref)
- def __repr__(self):
- s = ""
- for element in self.textlist:
- s += str(element)
- return s
-class rect(SVGelement):
- """r=rect(width,height,x,y,fill,stroke,stroke_width,**args)
- a rectangle is defined by a width and height and a xy pair
- """
- def __init__(self, x=None, y=None, width=None, height=None, fill=None, stroke=None, stroke_width=None, **args):
- if width == None or height == None:
- raise ValueError('both height and width are required')
- SVGelement.__init__(
- self, 'rect', {'width': width, 'height': height}, **args)
- if x != None:
- self.attributes['x'] = x
- if y != None:
- self.attributes['y'] = y
- if fill != None:
- self.attributes['fill'] = fill
- if stroke != None:
- self.attributes['stroke'] = stroke
- if stroke_width != None:
- self.attributes['stroke-width'] = stroke_width
-class ellipse(SVGelement):
- """e=ellipse(rx,ry,x,y,fill,stroke,stroke_width,**args)
- an ellipse is defined as a center and a x and y radius.
- """
- def __init__(self, cx=None, cy=None, rx=None, ry=None, fill=None, stroke=None, stroke_width=None, **args):
- if rx == None or ry == None:
- raise ValueError('both rx and ry are required')
- SVGelement.__init__(self, 'ellipse', {'rx': rx, 'ry': ry}, **args)
- if cx != None:
- self.attributes['cx'] = cx
- if cy != None:
- self.attributes['cy'] = cy
- if fill != None:
- self.attributes['fill'] = fill
- if stroke != None:
- self.attributes['stroke'] = stroke
- if stroke_width != None:
- self.attributes['stroke-width'] = stroke_width
-class circle(SVGelement):
- """c=circle(x,y,radius,fill,stroke,stroke_width,**args)
- The circle creates an element using a x, y and radius values eg
- """
- def __init__(self, cx=None, cy=None, r=None, fill=None, stroke=None, stroke_width=None, **args):
- if r == None:
- raise ValueError('r is required')
- SVGelement.__init__(self, 'circle', {'r': r}, **args)
- if cx != None:
- self.attributes['cx'] = cx
- if cy != None:
- self.attributes['cy'] = cy
- if fill != None:
- self.attributes['fill'] = fill
- if stroke != None:
- self.attributes['stroke'] = stroke
- if stroke_width != None:
- self.attributes['stroke-width'] = stroke_width
-class point(circle):
- """p=point(x,y,color)
- A point is defined as a circle with a size 1 radius. It may be more efficient to use a
- very small rectangle if you use many points because a circle is difficult to render.
- """
- def __init__(self, x, y, fill='black', **args):
- circle.__init__(self, x, y, 1, fill, **args)
-class line(SVGelement):
- """l=line(x1,y1,x2,y2,stroke,stroke_width,**args)
- A line is defined by a begin x,y pair and an end x,y pair
- """
- def __init__(self, x1=None, y1=None, x2=None, y2=None, stroke=None, stroke_width=None, **args):
- SVGelement.__init__(self, 'line', **args)
- if x1 != None:
- self.attributes['x1'] = x1
- if y1 != None:
- self.attributes['y1'] = y1
- if x2 != None:
- self.attributes['x2'] = x2
- if y2 != None:
- self.attributes['y2'] = y2
- if stroke_width != None:
- self.attributes['stroke-width'] = stroke_width
- if stroke != None:
- self.attributes['stroke'] = stroke
-class polyline(SVGelement):
- """pl=polyline([[x1,y1],[x2,y2],...],fill,stroke,stroke_width,**args)
- a polyline is defined by a list of xy pairs
- """
- def __init__(self, points, fill=None, stroke=None, stroke_width=None, **args):
- SVGelement.__init__(self, 'polyline', {
- 'points': _xypointlist(points)}, **args)
- if fill != None:
- self.attributes['fill'] = fill
- if stroke_width != None:
- self.attributes['stroke-width'] = stroke_width
- if stroke != None:
- self.attributes['stroke'] = stroke
-class polygon(SVGelement):
- """pl=polyline([[x1,y1],[x2,y2],...],fill,stroke,stroke_width,**args)
- a polygon is defined by a list of xy pairs
- """
- def __init__(self, points, fill=None, stroke=None, stroke_width=None, **args):
- SVGelement.__init__(
- self, 'polygon', {'points': _xypointlist(points)}, **args)
- if fill != None:
- self.attributes['fill'] = fill
- if stroke_width != None:
- self.attributes['stroke-width'] = stroke_width
- if stroke != None:
- self.attributes['stroke'] = stroke
-class path(SVGelement):
- """p=path(path,fill,stroke,stroke_width,**args)
- a path is defined by a path object and optional width, stroke and fillcolor
- """
- def __init__(self, pathdata, fill=None, stroke=None, stroke_width=None, id=None, **args):
- SVGelement.__init__(self, 'path', {'d': str(pathdata)}, **args)
- if stroke != None:
- self.attributes['stroke'] = stroke
- if fill != None:
- self.attributes['fill'] = fill
- if stroke_width != None:
- self.attributes['stroke-width'] = stroke_width
- if id != None:
- self.attributes['id'] = id
-class text(SVGelement):
- """t=text(x,y,text,font_size,font_family,**args)
- a text element can bge used for displaying text on the screen
- """
- def __init__(self, x=None, y=None, text=None, font_size=None, font_family=None, text_anchor=None, **args):
- SVGelement.__init__(self, 'text', **args)
- if x != None:
- self.attributes['x'] = x
- if y != None:
- self.attributes['y'] = y
- if font_size != None:
- self.attributes['font-size'] = font_size
- if font_family != None:
- self.attributes['font-family'] = font_family
- if text != None:
- self.text = text
- if text_anchor != None:
- self.attributes['text-anchor'] = text_anchor
-class textpath(SVGelement):
- """tp=textpath(text,link,**args)
- a textpath places a text on a path which is referenced by a link.
- """
- def __init__(self, link, text=None, **args):
- SVGelement.__init__(self, 'textPath', {'xlink:href': link}, **args)
- if text != None:
- self.text = text
-class pattern(SVGelement):
- """p=pattern(x,y,width,height,patternUnits,**args)
- A pattern is used to fill or stroke an object using a pre-defined
- graphic object which can be replicated ("tiled") at fixed intervals
- in x and y to cover the areas to be painted.
- """
- def __init__(self, x=None, y=None, width=None, height=None, patternUnits=None, **args):
- SVGelement.__init__(self, 'pattern', **args)
- if x != None:
- self.attributes['x'] = x
- if y != None:
- self.attributes['y'] = y
- if width != None:
- self.attributes['width'] = width
- if height != None:
- self.attributes['height'] = height
- if patternUnits != None:
- self.attributes['patternUnits'] = patternUnits
-class title(SVGelement):
- """t=title(text,**args)
- a title is a text element. The text is displayed in the title bar
- add at least one to the root svg element
- """
- def __init__(self, text=None, **args):
- SVGelement.__init__(self, 'title', **args)
- if text != None:
- self.text = text
-class description(SVGelement):
- """d=description(text,**args)
- a description can be added to any element and is used for a tooltip
- Add this element before adding other elements.
- """
- def __init__(self, text=None, **args):
- SVGelement.__init__(self, 'desc', **args)
- if text != None:
- self.text = text
-class lineargradient(SVGelement):
- """lg=lineargradient(x1,y1,x2,y2,id,**args)
- defines a lineargradient using two xy pairs.
- stop elements van be added to define the gradient colors.
- """
- def __init__(self, x1=None, y1=None, x2=None, y2=None, id=None, **args):
- SVGelement.__init__(self, 'linearGradient', **args)
- if x1 != None:
- self.attributes['x1'] = x1
- if y1 != None:
- self.attributes['y1'] = y1
- if x2 != None:
- self.attributes['x2'] = x2
- if y2 != None:
- self.attributes['y2'] = y2
- if id != None:
- self.attributes['id'] = id
-class radialgradient(SVGelement):
- """rg=radialgradient(cx,cy,r,fx,fy,id,**args)
- defines a radial gradient using a outer circle which are defined by a cx,cy and r and by using a focalpoint.
- stop elements van be added to define the gradient colors.
- """
- def __init__(self, cx=None, cy=None, r=None, fx=None, fy=None, id=None, **args):
- SVGelement.__init__(self, 'radialGradient', **args)
- if cx != None:
- self.attributes['cx'] = cx
- if cy != None:
- self.attributes['cy'] = cy
- if r != None:
- self.attributes['r'] = r
- if fx != None:
- self.attributes['fx'] = fx
- if fy != None:
- self.attributes['fy'] = fy
- if id != None:
- self.attributes['id'] = id
-class stop(SVGelement):
- """st=stop(offset,stop_color,**args)
- Puts a stop color at the specified radius
- """
- def __init__(self, offset, stop_color=None, **args):
- SVGelement.__init__(self, 'stop', {'offset': offset}, **args)
- if stop_color != None:
- self.attributes['stop-color'] = stop_color
-class style(SVGelement):
- """st=style(type,cdata=None,**args)
- Add a CDATA element to this element for defing in line stylesheets etc..
- """
- def __init__(self, type, cdata=None, **args):
- SVGelement.__init__(self, 'style', {'type': type}, cdata=cdata, **args)
-class image(SVGelement):
- """im=image(url,width,height,x,y,**args)
- adds an image to the drawing. Supported formats are .png, .jpg and .svg.
- """
- def __init__(self, url, x=None, y=None, width=None, height=None, **args):
- if width == None or height == None:
- raise ValueError('both height and width are required')
- SVGelement.__init__(
- self, 'image', {'xlink:href': url, 'width': width, 'height': height}, **args)
- if x != None:
- self.attributes['x'] = x
- if y != None:
- self.attributes['y'] = y
-class cursor(SVGelement):
- """c=cursor(url,**args)
- defines a custom cursor for a element or a drawing
- """
- def __init__(self, url, **args):
- SVGelement.__init__(self, 'cursor', {'xlink:href': url}, **args)
-class marker(SVGelement):
- """m=marker(id,viewbox,refX,refY,markerWidth,markerHeight,**args)
- defines a marker which can be used as an endpoint for a line or other pathtypes
- add an element to it which should be used as a marker.
- """
- def __init__(self, id=None, viewBox=None, refx=None, refy=None, markerWidth=None, markerHeight=None, **args):
- SVGelement.__init__(self, 'marker', **args)
- if id != None:
- self.attributes['id'] = id
- if viewBox != None:
- self.attributes['viewBox'] = _viewboxlist(viewBox)
- if refx != None:
- self.attributes['refX'] = refx
- if refy != None:
- self.attributes['refY'] = refy
- if markerWidth != None:
- self.attributes['markerWidth'] = markerWidth
- if markerHeight != None:
- self.attributes['markerHeight'] = markerHeight
-class group(SVGelement):
- """g=group(id,**args)
- a group is defined by an id and is used to contain elements
- g.addElement(SVGelement)
- """
- def __init__(self, id=None, **args):
- SVGelement.__init__(self, 'g', **args)
- if id != None:
- self.attributes['id'] = id
-class symbol(SVGelement):
- """sy=symbol(id,viewbox,**args)
- defines a symbol which can be used on different places in your graph using
- the use element. A symbol is not rendered but you can use 'use' elements to
- display it by referencing its id.
- sy.addElement(SVGelement)
- """
- def __init__(self, id=None, viewBox=None, **args):
- SVGelement.__init__(self, 'symbol', **args)
- if id != None:
- self.attributes['id'] = id
- if viewBox != None:
- self.attributes['viewBox'] = _viewboxlist(viewBox)
-class defs(SVGelement):
- """d=defs(**args)
- container for defining elements
- """
- def __init__(self, **args):
- SVGelement.__init__(self, 'defs', **args)
-class switch(SVGelement):
- """sw=switch(**args)
- Elements added to a switch element which are "switched" by the attributes
- requiredFeatures, requiredExtensions and systemLanguage.
- Refer to the SVG specification for details.
- """
- def __init__(self, **args):
- SVGelement.__init__(self, 'switch', **args)
-class use(SVGelement):
- """u=use(link,x,y,width,height,**args)
- references a symbol by linking to its id and its position, height and width
- """
- def __init__(self, link, x=None, y=None, width=None, height=None, **args):
- SVGelement.__init__(self, 'use', {'xlink:href': link}, **args)
- if x != None:
- self.attributes['x'] = x
- if y != None:
- self.attributes['y'] = y
- if width != None:
- self.attributes['width'] = width
- if height != None:
- self.attributes['height'] = height
-class link(SVGelement):
- """a=link(url,**args)
- a link is defined by a hyperlink. add elements which have to be linked
- a.addElement(SVGelement)
- """
- def __init__(self, link='', **args):
- SVGelement.__init__(self, 'a', {'xlink:href': link}, **args)
-class view(SVGelement):
- """v=view(id,**args)
- a view can be used to create a view with different attributes"""
- def __init__(self, id=None, **args):
- SVGelement.__init__(self, 'view', **args)
- if id != None:
- self.attributes['id'] = id
-class script(SVGelement):
- """sc=script(type,type,cdata,**args)
- adds a script element which contains CDATA to the SVG drawing
- """
- def __init__(self, type, cdata=None, **args):
- SVGelement.__init__(
- self, 'script', {'type': type}, cdata=cdata, **args)
-class animate(SVGelement):
- """an=animate(attribute,from,to,during,**args)
- animates an attribute.
- """
- def __init__(self, attribute, fr=None, to=None, dur=None, **args):
- SVGelement.__init__(
- self, 'animate', {'attributeName': attribute}, **args)
- if fr != None:
- self.attributes['from'] = fr
- if to != None:
- self.attributes['to'] = to
- if dur != None:
- self.attributes['dur'] = dur
-class animateMotion(SVGelement):
- """an=animateMotion(pathdata,dur,**args)
- animates a SVGelement over the given path in dur seconds
- """
- def __init__(self, pathdata, dur, **args):
- SVGelement.__init__(self, 'animateMotion', **args)
- if pathdata != None:
- self.attributes['path'] = str(pathdata)
- if dur != None:
- self.attributes['dur'] = dur
-class animateTransform(SVGelement):
- """antr=animateTransform(type,from,to,dur,**args)
- transform an element from and to a value.
- """
- def __init__(self, type=None, fr=None, to=None, dur=None, **args):
- SVGelement.__init__(self, 'animateTransform', {
- 'attributeName': 'transform'}, **args)
- # As far as I know the attributeName is always transform
- if type != None:
- self.attributes['type'] = type
- if fr != None:
- self.attributes['from'] = fr
- if to != None:
- self.attributes['to'] = to
- if dur != None:
- self.attributes['dur'] = dur
-class animateColor(SVGelement):
- """ac=animateColor(attribute,type,from,to,dur,**args)
- Animates the color of a element
- """
- def __init__(self, attribute, type=None, fr=None, to=None, dur=None, **args):
- SVGelement.__init__(self, 'animateColor', {
- 'attributeName': attribute}, **args)
- if type != None:
- self.attributes['type'] = type
- if fr != None:
- self.attributes['from'] = fr
- if to != None:
- self.attributes['to'] = to
- if dur != None:
- self.attributes['dur'] = dur
-class set(SVGelement):
- """st=set(attribute,to,during,**args)
- sets an attribute to a value for a
- """
- def __init__(self, attribute, to=None, dur=None, **args):
- SVGelement.__init__(self, 'set', {'attributeName': attribute}, **args)
- if to != None:
- self.attributes['to'] = to
- if dur != None:
- self.attributes['dur'] = dur
-class svg(SVGelement):
- """s=svg(viewbox,width,height,**args)
- a svg or element is the root of a drawing add all elements to a svg element.
- You can have different svg elements in one svg file
- s.addElement(SVGelement)
- eg
- d=drawing()
- s=svg((0,0,100,100),'100%','100%')
- c=circle(50,50,20)
- s.addElement(c)
- d.setSVG(s)
- d.toXml()
- """
- def __init__(self, viewBox=None, width=None, height=None, **args):
- SVGelement.__init__(self, 'svg', **args)
- if viewBox != None:
- self.attributes['viewBox'] = _viewboxlist(viewBox)
- if width != None:
- self.attributes['width'] = width
- if height != None:
- self.attributes['height'] = height
- self.namespace = "http://www.w3.org/2000/svg"
-class drawing:
- """d=drawing()
- this is the actual SVG document. It needs a svg element as a root.
- Use the addSVG method to set the svg to the root. Use the toXml method to write the SVG
- source to the screen or to a file
- d=drawing()
- d.addSVG(svg)
- d.toXml(optionalfilename)
- """
- def __init__(self, entity={}):
- self.svg = None
- self.entity = entity
- def setSVG(self, svg):
- self.svg = svg
- # Voeg een element toe aan de grafiek toe.
- if use_dom_implementation == 0:
- def toXml(self, filename='', compress=False):
- import io
- xml = io.StringIO()
- xml.write("<?xml version='1.0' encoding='UTF-8'?>\n")
- xml.write(
- "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.0//EN\" \"http://www.w3.org/TR/2001/REC-SVG-20010904/DTD/svg10.dtd\"")
- if self.entity:
- xml.write(" [\n")
- for item in list(self.entity.keys()):
- xml.write("<!ENTITY %s \"%s\">\n" %
- (item, self.entity[item]))
- xml.write("]")
- xml.write(">\n")
- self.svg.toXml(0, xml)
- if not filename:
- if compress:
- import gzip
- f = io.StringIO()
- zf = gzip.GzipFile(fileobj=f, mode='wb')
- zf.write(xml.getvalue())
- zf.close()
- f.seek(0)
- return f.read()
- else:
- return xml.getvalue()
- else:
- if filename[-4:] == 'svgz':
- import gzip
- f = gzip.GzipFile(filename=filename,
- mode="wb", compresslevel=9)
- f.write(xml.getvalue())
- f.close()
- else:
- f = file(filename, 'w')
- f.write(xml.getvalue())
- f.close()
- else:
- def toXml(self, filename='', compress=False):
- """drawing.toXml() ---->to the screen
- drawing.toXml(filename)---->to the file
- writes a svg drawing to the screen or to a file
- compresses if filename ends with svgz or if compress is true
- """
- doctype = implementation.createDocumentType(
- 'svg', "-//W3C//DTD SVG 1.0//EN""", 'http://www.w3.org/TR/2001/REC-SVG-20010904/DTD/svg10.dtd ')
- global root
- # root is defined global so it can be used by the appender. Its also possible to use it as an arugument but
- # that is a bit messy.
- root = implementation.createDocument(None, None, doctype)
- # Create the xml document.
- global appender
- def appender(element, elementroot):
- """This recursive function appends elements to an element and sets the attributes
- and type. It stops when alle elements have been appended"""
- if element.namespace:
- e = root.createElementNS(element.namespace, element.type)
- else:
- e = root.createElement(element.type)
- if element.text:
- textnode = root.createTextNode(element.text)
- e.appendChild(textnode)
- # in element.attributes is supported from python 2.2
- for attribute in list(element.attributes.keys()):
- e.setAttribute(attribute, str(
- element.attributes[attribute]))
- if element.elements:
- for el in element.elements:
- e = appender(el, e)
- elementroot.appendChild(e)
- return elementroot
- root = appender(self.svg, root)
- if not filename:
- import io
- xml = io.StringIO()
- PrettyPrint(root, xml)
- if compress:
- import gzip
- f = io.StringIO()
- zf = gzip.GzipFile(fileobj=f, mode='wb')
- zf.write(xml.getvalue())
- zf.close()
- f.seek(0)
- return f.read()
- else:
- return xml.getvalue()
- else:
- try:
- if filename[-4:] == 'svgz':
- import gzip
- import io
- xml = io.StringIO()
- PrettyPrint(root, xml)
- f = gzip.GzipFile(filename=filename,
- mode='wb', compresslevel=9)
- f.write(xml.getvalue())
- f.close()
- else:
- f = open(filename, 'w')
- PrettyPrint(root, f)
- f.close()
- except:
- print(("Cannot write SVG file: " + filename))
- def validate(self):
- try:
- import xml.parsers.xmlproc.xmlval
- except:
- raise exceptions.ImportError(
- 'PyXml is required for validating SVG')
- svg = self.toXml()
- xv = xml.parsers.xmlproc.xmlval.XMLValidator()
- try:
- xv.feed(svg)
- except:
- raise Exception("SVG is not well formed, see messages above")
- else:
- print("SVG well formed")
-if __name__ == '__main__':
- d = drawing()
- s = svg((0, 0, 100, 100))
- r = rect(-100, -100, 300, 300, 'cyan')
- s.addElement(r)
- t = title('SVGdraw Demo')
- s.addElement(t)
- g = group('animations')
- e = ellipse(0, 0, 5, 2)
- g.addElement(e)
- c = circle(0, 0, 1, 'red')
- g.addElement(c)
- pd = pathdata(0, -10)
- for i in range(6):
- pd.relsmbezier(10, 5, 0, 10)
- pd.relsmbezier(-10, 5, 0, 10)
- an = animateMotion(pd, 10)
- an.attributes['rotate'] = 'auto-reverse'
- an.attributes['repeatCount'] = "indefinite"
- g.addElement(an)
- s.addElement(g)
- for i in range(20, 120, 20):
- u = use('#animations', i, 0)
- s.addElement(u)
- for i in range(0, 120, 20):
- for j in range(5, 105, 10):
- c = circle(i, j, 1, 'red', 'black', .5)
- s.addElement(c)
- d.setSVG(s)
- print((d.toXml()))
diff --git a/wqflask/utility/temp_data.py b/wqflask/utility/temp_data.py
deleted file mode 100644
index 07c5a318..00000000
--- a/wqflask/utility/temp_data.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from redis import Redis
-import simplejson as json
-class TempData:
- def __init__(self, temp_uuid):
- self.temp_uuid = temp_uuid
- self.redis = Redis()
- self.key = "tempdata:{}".format(self.temp_uuid)
- def store(self, field, value):
- self.redis.hset(self.key, field, value)
- self.redis.expire(self.key, 60 * 15) # Expire in 15 minutes
- def get_all(self):
- return self.redis.hgetall(self.key)
-if __name__ == "__main__":
- redis = Redis()
- for key in list(redis.keys()):
- for field in redis.hkeys(key):
- print("{}.{}={}".format(key, field, redis.hget(key, field)))
diff --git a/wqflask/utility/tools.py b/wqflask/utility/tools.py
deleted file mode 100644
index 283b0408..00000000
--- a/wqflask/utility/tools.py
+++ /dev/null
@@ -1,357 +0,0 @@
-# Tools/paths finder resolves external paths from settings and/or environment
-# variables
-import os
-import sys
-import json
-import socket
-from pathlib import Path
-from wqflask import app
-# Use the standard logger here to avoid a circular dependency
-import logging
-logger = logging.getLogger(__name__)
-def app_set(command_id, value):
- """Set application wide value"""
- app.config.setdefault(command_id, value)
- return value
-def get_setting(command_id, guess=None):
- """Resolve a setting from the environment or the global settings in
- app.config, with valid_path is a function checking whether the
- path points to an expected directory and returns the full path to
- the binary command
- guess = os.environ.get('HOME')+'/pylmm'
- valid_path(get_setting('PYLMM_PATH',guess))
- first tries the environment variable in +id+, next gets the Flask
- app setting for the same +id+ and finally does an educated
- +guess+.
- In all, the environment overrides the others, next is the flask
- setting, then the guess. A valid path to the binary command is
- returned. If none is resolved an exception is thrown.
- Note that we do not use the system path. This is on purpose
- because it will mess up controlled (reproducible) deployment. The
- proper way is to either use the GNU Guix defaults as listed in
- etc/default_settings.py or override them yourself by creating a
- different settings.py file (or setting the environment).
- """
- def value(command):
- if command:
- # sys.stderr.write("Found "+command+"\n")
- app_set(command_id, command)
- return command
- else:
- return app.config.get(command_id)
- # ---- Check whether environment exists
- # print("Looking for "+command_id+"\n")
- command = value(os.environ.get(command_id))
- if command is None or command == "":
- # ---- Check whether setting exists in app
- command = value(app.config.get(command_id))
- if command is None:
- command = value(guess)
- if command is None or command == "":
- # print command
- raise Exception(
- command_id + ' setting unknown or faulty (update default_settings.py?).')
- # print("Set "+command_id+"="+str(command))
- return command
-def get_setting_bool(id):
- v = get_setting(id)
- if v not in [0, False, 'False', 'FALSE', None]:
- return True
- return False
-def get_setting_int(id):
- v = get_setting(id)
- if isinstance(v, str):
- return int(v)
- if v is None:
- return 0
- return v
-def valid_bin(bin):
- if os.path.islink(bin) or valid_file(bin):
- return bin
- return None
-def valid_file(fn):
- if os.path.isfile(fn):
- return fn
- return None
-def valid_path(dir):
- if os.path.isdir(dir):
- return dir
- return None
-def js_path(module=None):
- """
- Find the JS module in the two paths
- """
- try_gn = get_setting("JS_GN_PATH") + "/" + module
- if valid_path(try_gn):
- return try_gn
- try_guix = get_setting("JS_GUIX_PATH") + "/" + module
- if valid_path(try_guix):
- return try_guix
- raise "No JS path found for " + module + \
- " (if not in Guix check JS_GN_PATH)"
-def reaper_command(guess=None):
- return get_setting("REAPER_COMMAND", guess)
-def gemma_command(guess=None):
- return assert_bin(get_setting("GEMMA_COMMAND", guess))
-def gemma_wrapper_command(guess=None):
- return assert_bin(get_setting("GEMMA_WRAPPER_COMMAND", guess))
-def plink_command(guess=None):
- return assert_bin(get_setting("PLINK_COMMAND", guess))
-def flat_file_exists(subdir):
- base = get_setting("GENENETWORK_FILES")
- return valid_path(base + "/" + subdir)
-def flat_files(subdir=None):
- base = get_setting("GENENETWORK_FILES")
- if subdir:
- return assert_dir(base + "/" + subdir)
- return assert_dir(base)
-def assert_bin(fn):
- if not valid_bin(fn):
- raise Exception("ERROR: can not find binary " + fn)
- return fn
-def assert_dir(the_dir):
- if not valid_path(the_dir):
- raise FileNotFoundError(f"ERROR: can not find directory '{the_dir}'")
- return the_dir
-def assert_writable_dir(dir):
- try:
- fn = os.path.join(dir, "test.txt")
- fh = open(fn, 'w')
- fh.write("I am writing this text to the file\n")
- fh.close()
- os.remove(fn)
- except IOError:
- raise Exception('Unable to write test.txt to directory ' + dir)
- return dir
-def assert_file(fn):
- if not valid_file(fn):
- raise FileNotFoundError(f"Unable to find file '{fn}'")
- return fn
-def mk_dir(dir):
- if not valid_path(dir):
- os.makedirs(dir)
- return assert_dir(dir)
-def locate(name, subdir=None):
- """
- Locate a static flat file in the GENENETWORK_FILES environment.
- This function throws an error when the file is not found.
- """
- base = get_setting("GENENETWORK_FILES")
- if subdir:
- base = base + "/" + subdir
- if valid_path(base):
- lookfor = base + "/" + name
- if valid_file(lookfor):
- return lookfor
- else:
- raise Exception("Can not locate " + lookfor)
- if subdir:
- sys.stderr.write(subdir)
- raise Exception("Can not locate " + name + " in " + base)
-def locate_phewas(name, subdir=None):
- return locate(name, '/phewas/' + subdir)
-def locate_ignore_error(name, subdir=None):
- """
- Locate a static flat file in the GENENETWORK_FILES environment.
- This function does not throw an error when the file is not found
- but returns None.
- """
- base = get_setting("GENENETWORK_FILES")
- if subdir:
- base = base + "/" + subdir
- if valid_path(base):
- lookfor = base + "/" + name
- if valid_file(lookfor):
- return lookfor
- return None
-def tempdir():
- """
- Get UNIX TMPDIR by default
- """
- return valid_path(get_setting("TMPDIR", "/tmp"))
-BLUE = '\033[94m'
-GREEN = '\033[92m'
-BOLD = '\033[1m'
-ENDC = '\033[0m'
-def show_settings():
- from utility.tools import LOG_LEVEL
- print(("Set global log level to " + BLUE + LOG_LEVEL + ENDC),
- file=sys.stderr)
- log_level = getattr(logging, LOG_LEVEL.upper())
- logging.basicConfig(level=log_level)
- logger.info(BLUE + "Mr. Mojo Risin 2" + ENDC)
- keylist = list(app.config.keys())
- print("runserver.py: ****** Webserver configuration - k,v pairs from app.config ******",
- file=sys.stderr)
- keylist.sort()
- for k in keylist:
- try:
- print(("%s: %s%s%s%s" % (k, BLUE, BOLD, get_setting(k), ENDC)),
- file=sys.stderr)
- except:
- print(("%s: %s%s%s%s" % (k, GREEN, BOLD, app.config[k], ENDC)),
- file=sys.stderr)
-def gn_version_repo_info(root_dir):
- """retrieve the branch name and abbreviated commit hash."""
- try:
- from git import Repo
- repo = Repo(root_dir)
- return f"{repo.head.ref.name}-{repo.head.commit.hexsha[0:9]}"
- except:
- return ""
-def gn_version():
- """Compute and return the version of the application."""
- hostname = socket.gethostname()
- basedir = Path(__file__).absolute().parent.parent.parent
- with open(Path(basedir, "etc", "VERSION"), encoding="utf8") as version_file:
- version_contents = version_file.read().strip()
- base_version = f"{hostname}:{basedir.name}:{version_contents}"
- repo_info = gn_version_repo_info(basedir)
- return f"{base_version}-{repo_info}" if bool(repo_info) else base_version
-# Cached values
-GN_VERSION = gn_version()
-HOME = get_setting('HOME')
-SERVER_PORT = get_setting('SERVER_PORT')
-GN2_BASE_URL = get_setting('GN2_BASE_URL')
-GN2_BRANCH_URL = get_setting('GN2_BRANCH_URL')
-GN_SERVER_URL = get_setting('GN_SERVER_URL')
-GN_PROXY_URL = get_setting('GN_PROXY_URL')
-GN3_LOCAL_URL = get_setting('GN3_LOCAL_URL')
-SERVER_PORT = get_setting_int('SERVER_PORT')
-SQL_URI = get_setting('SQL_URI')
-LOG_LEVEL = get_setting('LOG_LEVEL')
-LOG_LEVEL_DEBUG = get_setting_int('LOG_LEVEL_DEBUG')
-LOG_SQL = get_setting_bool('LOG_SQL')
-LOG_SQL_ALCHEMY = get_setting_bool('LOG_SQL_ALCHEMY')
-LOG_BENCH = get_setting_bool('LOG_BENCH')
-LOG_FORMAT = "%(message)s" # not yet in use
-USE_REDIS = get_setting_bool('USE_REDIS')
-REDIS_URL = get_setting('REDIS_URL')
-USE_GN_SERVER = get_setting_bool('USE_GN_SERVER')
-JS_GUIX_PATH = get_setting('JS_GUIX_PATH')
-JS_GN_PATH = get_setting('JS_GN_PATH')
-# assert_dir(JS_GN_PATH)
- GITHUB_AUTH_URL = "https://github.com/login/oauth/authorize?client_id=" + \
- GITHUB_API_URL = get_setting('GITHUB_API_URL')
- ORCID_AUTH_URL = "https://orcid.org/oauth/authorize?response_type=code&scope=/authenticate&show_login=true&client_id=" + \
- ORCID_CLIENT_ID + "&client_secret=" + ORCID_CLIENT_SECRET + \
- "&redirect_uri=" + GN2_BRANCH_URL + "n/login/orcid_oauth2"
-REAPER_COMMAND = app_set("REAPER_COMMAND", reaper_command())
-GEMMA_COMMAND = app_set("GEMMA_COMMAND", gemma_command())
-assert(GEMMA_COMMAND is not None)
-PLINK_COMMAND = app_set("PLINK_COMMAND", plink_command())
-GEMMA_WRAPPER_COMMAND = gemma_wrapper_command()
-TEMPDIR = tempdir() # defaults to UNIX TMPDIR
-# ---- Handle specific JS modules
-JS_GUIX_PATH = get_setting("JS_GUIX_PATH")
-assert_dir(JS_GUIX_PATH + '/cytoscape-panzoom')
-CSS_PATH = JS_GUIX_PATH # The CSS is bundled together with the JS
-# assert_dir(JS_PATH)
- "JS_TWITTER_POST_FETCHER_PATH", js_path("javascript-twitter-post-fetcher"))
-assert_file(JS_TWITTER_POST_FETCHER_PATH + "/js/twitterFetcher_min.js")
-JS_CYTOSCAPE_PATH = get_setting("JS_CYTOSCAPE_PATH", js_path("cytoscape"))
-assert_file(JS_CYTOSCAPE_PATH + '/cytoscape.min.js')
-# assert_file(PHEWAS_FILES+"/auwerx/PheWAS_pval_EMMA_norm.RData")
diff --git a/wqflask/utility/type_checking.py b/wqflask/utility/type_checking.py
deleted file mode 100644
index 5e98f4ae..00000000
--- a/wqflask/utility/type_checking.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Type checking functions
-def is_float(value):
- try:
- float(value)
- return True
- except:
- return False
-def is_int(value):
- try:
- int(value)
- return True
- except:
- return False
-def is_str(value):
- if value is None:
- return False
- try:
- str(value)
- return True
- except:
- return False
-def get_float(vars_obj, name, default=None):
- if name in vars_obj:
- if is_float(vars_obj[name]):
- return float(vars_obj[name])
- return default
-def get_int(vars_obj, name, default=None):
- if name in vars_obj:
- if is_int(vars_obj[name]):
- return float(vars_obj[name])
- return default
-def get_string(vars_obj, name, default=None):
- if name in vars_obj:
- if not vars_obj[name] is None and not vars_obj[name] == "":
- return str(vars_obj[name])
- return default
diff --git a/wqflask/utility/webqtlUtil.py b/wqflask/utility/webqtlUtil.py
deleted file mode 100644
index 0cb71567..00000000
--- a/wqflask/utility/webqtlUtil.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# Copyright (C) University of Tennessee Health Science Center, Memphis, TN.
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# See the GNU Affero General Public License for more details.
-# This program is available from Source Forge: at GeneNetwork Project
-# (sourceforge.net/projects/genenetwork/).
-# Contact Drs. Robert W. Williams and Xiaodong Zhou (2010)
-# at rwilliams@uthsc.edu and xzhou15@uthsc.edu
-# This module is used by GeneNetwork project (www.genenetwork.org)
-# Created by GeneNetwork Core Team 2010/08/10
-# Last updated by GeneNetwork Core Team 2010/10/20
-import string
-import time
-import re
-import math
-from math import *
-from base import webqtlConfig
-# NL, 07/27/2010. moved from webqtlForm.py
-# Dict of Parents and F1 information, In the order of [F1, Mat, Pat]
-ParInfo = {
- 'BXH': ['BHF1', 'HBF1', 'C57BL/6J', 'C3H/HeJ'],
- 'AKXD': ['AKF1', 'KAF1', 'AKR/J', 'DBA/2J'],
- 'BXD': ['B6D2F1', 'D2B6F1', 'C57BL/6J', 'DBA/2J'],
- 'C57BL-6JxC57BL-6NJF2': ['', '', 'C57BL/6J', 'C57BL/6NJ'],
- 'BXD300': ['B6D2F1', 'D2B6F1', 'C57BL/6J', 'DBA/2J'],
- 'B6BTBRF2': ['B6BTBRF1', 'BTBRB6F1', 'C57BL/6J', 'BTBRT<+>tf/J'],
- 'BHHBF2': ['B6HF2', 'HB6F2', 'C57BL/6J', 'C3H/HeJ'],
- 'BHF2': ['B6HF2', 'HB6F2', 'C57BL/6J', 'C3H/HeJ'],
- 'B6D2F2': ['B6D2F1', 'D2B6F1', 'C57BL/6J', 'DBA/2J'],
- 'BDF2-1999': ['B6D2F2', 'D2B6F2', 'C57BL/6J', 'DBA/2J'],
- 'BDF2-2005': ['B6D2F1', 'D2B6F1', 'C57BL/6J', 'DBA/2J'],
- 'CTB6F2': ['CTB6F2', 'B6CTF2', 'C57BL/6J', 'Castaneous'],
- 'CXB': ['CBF1', 'BCF1', 'C57BL/6ByJ', 'BALB/cByJ'],
- 'AXBXA': ['ABF1', 'BAF1', 'C57BL/6J', 'A/J'],
- 'AXB': ['ABF1', 'BAF1', 'C57BL/6J', 'A/J'],
- 'BXA': ['BAF1', 'ABF1', 'C57BL/6J', 'A/J'],
- 'LXS': ['LSF1', 'SLF1', 'ISS', 'ILS'],
- 'HXBBXH': ['SHR_BNF1', 'BN_SHRF1', 'BN-Lx/Cub', 'SHR/OlaIpcv'],
- 'BayXSha': ['BayXShaF1', 'ShaXBayF1', 'Bay-0', 'Shahdara'],
- 'ColXBur': ['ColXBurF1', 'BurXColF1', 'Col-0', 'Bur-0'],
- 'ColXCvi': ['ColXCviF1', 'CviXColF1', 'Col-0', 'Cvi'],
- 'SXM': ['SMF1', 'MSF1', 'Steptoe', 'Morex'],
- 'HRDP': ['SHR_BNF1', 'BN_SHRF1', 'BN-Lx/Cub', 'SHR/OlaIpcv']
-# Accessory Functions
-def genRandStr(prefix="", length=8, chars=string.ascii_letters + string.digits):
- from random import choice
- _str = prefix[:]
- for i in range(length):
- _str += choice(chars)
- return _str
-def ListNotNull(lst):
- '''Obsolete - Use built in function any (or all or whatever)
- Determine if the elements in a list are all null
- '''
- for item in lst:
- if item is not None:
- return 1
- return None
-def readLineCSV(line): # dcrowell July 2008
- """Parses a CSV string of text and returns a list containing each element as a string.
- Used by correlationPage"""
- returnList = line.split('","')
- returnList[-1] = returnList[-1][:-2]
- returnList[0] = returnList[0][1:]
- return returnList
-def cmpEigenValue(A, B):
- try:
- if A[0] > B[0]:
- return -1
- elif A[0] == B[0]:
- return 0
- else:
- return 1
- except:
- return 0
-def hasAccessToConfidentialPhenotypeTrait(privilege, userName, authorized_users):
- access_to_confidential_phenotype_trait = 0
- if webqtlConfig.USERDICT[privilege] > webqtlConfig.USERDICT['user']:
- access_to_confidential_phenotype_trait = 1
- else:
- AuthorizedUsersList = [x.strip() for x in authorized_users.split(',')]
- if userName in AuthorizedUsersList:
- access_to_confidential_phenotype_trait = 1
- return access_to_confidential_phenotype_trait