From e492c9d3812ee2f6d6c7b640e216af09710b91d7 Mon Sep 17 00:00:00 2001 From: Pjotr Prins Date: Wed, 11 Mar 2015 14:36:36 +0300 Subject: Adding multi-core kinship.py --- wqflask/wqflask/my_pylmm/pyLMM/kinship.py | 136 ++++++++++++++++++++++++++++ wqflask/wqflask/my_pylmm/pyLMM/lmm.py | 4 + wqflask/wqflask/my_pylmm/pyLMM/optmatrix.py | 2 +- wqflask/wqflask/my_pylmm/pyLMM/runlmm.py | 59 +++++++----- 4 files changed, 177 insertions(+), 24 deletions(-) create mode 100644 wqflask/wqflask/my_pylmm/pyLMM/kinship.py diff --git a/wqflask/wqflask/my_pylmm/pyLMM/kinship.py b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py new file mode 100644 index 00000000..dc2d717d --- /dev/null +++ b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py @@ -0,0 +1,136 @@ +# pylmm kinship calculation +# +# Copyright (C) 2013 Nicholas A. Furlotte (nick.furlotte@gmail.com) +# Copyright (C) 2015 Pjotr Prins (pjotr.prins@thebird.nl) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +# env PYTHONPATH=$pylmm_lib_path:./lib python $pylmm_lib_path/runlmm.py --pheno test.pheno --geno test9000.geno kinship --test + +import sys +import os +import numpy as np +import multiprocessing as mp # Multiprocessing is part of the Python stdlib +import Queue + +from optmatrix import matrix_initialize, matrixMultT + + +def compute_W(job,G,n,compute_size): + """ + Read 1000 SNPs at a time into matrix and return the result + """ + W = np.ones((n,compute_size)) * np.nan # W matrix has dimensions individuals x SNPs (initially all NaNs) + for j in range(0,compute_size): + row = job*compute_size + j + if row >= compute_size: + W = W[:,range(0,j)] + break + snp = G[job*compute_size+j] + # print snp.shape,snp + if snp.var() == 0: + continue + W[:,j] = snp # set row to list of SNPs + return W + +def compute_matrixMult(job,W,q = None): + """ + Compute Kinship(W)*j + + For every set of SNPs matrixMult is used to multiply matrices T(W)*W + """ + res = matrixMultT(W) + if not q: q=compute_matrixMult.q + q.put([job,res]) + return job + +def f_init(q): + compute_matrixMult.q = q + +# Calculate the kinship matrix from G (SNPs as rows!), returns K +# +def kinship(G,options): + numThreads = None + if options.numThreads: + numThreads = int(options.numThreads) + options.computeSize = 1000 + matrix_initialize(options.useBLAS) + + sys.stderr.write(str(G.shape)+"\n") + n = G.shape[1] # inds + inds = n + m = G.shape[0] # snps + snps = m + sys.stderr.write(str(m)+" SNPs\n") + + q = mp.Queue() + p = mp.Pool(numThreads, f_init, [q]) + iterations = snps/options.computeSize+1 + if options.testing: + iterations = 8 + # jobs = range(0,8) # range(0,iterations) + + results = [] + + K = np.zeros((n,n)) # The Kinship matrix has dimension individuals x individuals + + completed = 0 + for job in range(iterations): + if options.verbose: + sys.stderr.write("Processing job %d first %d SNPs\n" % (job, ((job+1)*options.computeSize))) + W = compute_W(job,G,n,options.computeSize) + if numThreads == 1: + compute_matrixMult(job,W,q) + j,x = q.get() + if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n") + K_j = x + # print j,K_j[:,0] + K = K + K_j + else: + results.append(p.apply_async(compute_matrixMult, (job,W))) + # Do we have a result? + try: + j,x = q.get_nowait() + if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n") + K_j = x + # print j,K_j[:,0] + K = K + K_j + completed += 1 + except Queue.Empty: + pass + + if numThreads == None or numThreads > 1: + for job in range(len(results)-completed): + j,x = q.get(True,15) + if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n") + K_j = x + # print j,K_j[:,0] + K = K + K_j + + print K.shape,K + K = K / float(snps) + outFile = 'runtest.kin' + if options.verbose: sys.stderr.write("Saving Kinship file to %s\n" % outFile) + np.savetxt(outFile,K) + + if options.saveKvaKve: + if options.verbose: sys.stderr.write("Obtaining Eigendecomposition\n") + Kva,Kve = linalg.eigh(K) + if options.verbose: sys.stderr.write("Saving eigendecomposition to %s.[kva | kve]\n" % outFile) + np.savetxt(outFile+".kva",Kva) + np.savetxt(outFile+".kve",Kve) + + + + diff --git a/wqflask/wqflask/my_pylmm/pyLMM/lmm.py b/wqflask/wqflask/my_pylmm/pyLMM/lmm.py index 58d7593d..65b989a8 100644 --- a/wqflask/wqflask/my_pylmm/pyLMM/lmm.py +++ b/wqflask/wqflask/my_pylmm/pyLMM/lmm.py @@ -406,6 +406,8 @@ def GWAS(pheno_vector, v = np.isnan(pheno_vector) if v.sum(): keep = True - v + print(pheno_vector.shape,pheno_vector) + print(keep.shape,keep) pheno_vector = pheno_vector[keep] #genotype_matrix = genotype_matrix[keep,:] #covariate_matrix = covariate_matrix[keep,:] @@ -437,6 +439,8 @@ def GWAS(pheno_vector, p_values.append(0) t_statistics.append(np.nan) continue + + print(genotype_matrix.shape,pheno_vector.shape,keep.shape) pheno_vector = pheno_vector[keep] covariate_matrix = covariate_matrix[keep,:] diff --git a/wqflask/wqflask/my_pylmm/pyLMM/optmatrix.py b/wqflask/wqflask/my_pylmm/pyLMM/optmatrix.py index abb72081..5c71db6a 100644 --- a/wqflask/wqflask/my_pylmm/pyLMM/optmatrix.py +++ b/wqflask/wqflask/my_pylmm/pyLMM/optmatrix.py @@ -9,7 +9,7 @@ from scipy import stats useNumpy = None hasBLAS = None -def matrix_initialize(useBLAS): +def matrix_initialize(useBLAS=True): global useNumpy # module based variable if useBLAS and useNumpy == None: print get_info('blas_opt') diff --git a/wqflask/wqflask/my_pylmm/pyLMM/runlmm.py b/wqflask/wqflask/my_pylmm/pyLMM/runlmm.py index 738268be..bd6c55fc 100644 --- a/wqflask/wqflask/my_pylmm/pyLMM/runlmm.py +++ b/wqflask/wqflask/my_pylmm/pyLMM/runlmm.py @@ -22,17 +22,19 @@ import sys import tsvreader import numpy as np from lmm import gn2_load_redis +from kinship import kinship usage = """ python runlmm.py [options] command runlmm.py processing multiplexer reads standardised input formats - and calls the different routines + and calls the different routines (writes to stdout) Current commands are: parse : only parse input files redis : use Redis to call into GN2 + kinship : calculate (new) kinship matrix try --help for more information """ @@ -50,6 +52,13 @@ parser.add_option("--geno",dest="geno", parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=True, help="don't print status messages to stdout") +parser.add_option("--blas", action="store_true", default=False, dest="useBLAS", help="Use BLAS instead of numpy matrix multiplication") +parser.add_option("-t", "--threads", + type="int", dest="numThreads", + help="Threads to use") +parser.add_option("--saveKvaKve", + action="store_true", dest="saveKvaKve", default=False, + help="Testing mode") parser.add_option("--test", action="store_true", dest="testing", default=False, help="Testing mode") @@ -63,6 +72,10 @@ if len(args) != 1: cmd = args[0] print "Command: ",cmd +k = None +y = None +g = None + if options.kinship: k = tsvreader.kinship(options.kinship) print k.shape @@ -74,7 +87,7 @@ if options.pheno: if options.geno: g = tsvreader.geno(options.geno) print g.shape - + def normalizeGenotype(G): # Run for every SNP list (num individuals) x = True - np.isnan(G) # Matrix of True/False @@ -89,32 +102,32 @@ def normalizeGenotype(G): G = (G - m) / s # Normalize the deviation return G -# g = g.reshape((1, -1))[0] print("Before",g) -gn = [] -for snp in g: - gn.append( normalizeGenotype(snp) ) - -gn = g -gn = np.array(gn) -print("After1",gn) -gnT = gn.T -print("After",gnT) -# G = gnT -G = gnT + +gT = g.T +print("After",gT) +G = np.array(gT) print "G shape",G.shape -# sys.exit(1) -# assert(G[0,0]==-2.25726341) # Remove individuals with missing phenotypes -v = np.isnan(y) -keep = True - v -if v.sum(): - if options.verbose: sys.stderr.write("Cleaning the phenotype vector by removing %d individuals...\n" % (v.sum())) - y = y[keep] - G = G[keep,:] - k = k[keep,:][:,keep] +if y != None: + v = np.isnan(y) + keep = True - v + if v.sum(): + if options.verbose: sys.stderr.write("Cleaning the phenotype vector by removing %d individuals...\n" % (v.sum())) + y = y[keep] + G = G[keep,:] + if k != None: k = k[keep,:][:,keep] if cmd == 'redis': ps, ts = gn2_load_redis('testrun','other',np.array(k),y,G,options.testing) print np.array(ps) +elif cmd == 'kinship': + gn = [] + for snp in G.T: + gn.append( normalizeGenotype(snp) ) + G2 = np.array(gn) + print G2.shape,G2 + K = kinship(G2,options) +else: + print "Doing nothing" -- cgit v1.2.3