From 886934b9cf18cf092c5d6cd0667d860aa30e64b8 Mon Sep 17 00:00:00 2001 From: Pjotr Prins Date: Mon, 16 Mar 2015 18:08:00 +0300 Subject: GWAS: results get returned --- wqflask/wqflask/my_pylmm/pyLMM/gwas.py | 56 +++++++++++++++++++--------------- wqflask/wqflask/my_pylmm/pyLMM/lmm2.py | 10 +++--- 2 files changed, 36 insertions(+), 30 deletions(-) (limited to 'wqflask') diff --git a/wqflask/wqflask/my_pylmm/pyLMM/gwas.py b/wqflask/wqflask/my_pylmm/pyLMM/gwas.py index b9d5db61..f8d77ab6 100644 --- a/wqflask/wqflask/my_pylmm/pyLMM/gwas.py +++ b/wqflask/wqflask/my_pylmm/pyLMM/gwas.py @@ -36,15 +36,17 @@ def formatResult(id,beta,betaSD,ts,ps): return "\t".join([str(x) for x in [id,beta,betaSD,ts,ps]]) + "\n" def compute_snp(j,n,snp_ids,lmm2,REML,q = None): - # print(j,snp_ids,"\n") + # print("COMPUTE SNP",j,snp_ids,"\n") result = [] for snp_id in snp_ids: snp,id = snp_id x = snp.reshape((n,1)) # all the SNPs + # print "X=",x # if refit: # L.fit(X=snp,REML=REML) ts,ps,beta,betaVar = lmm2.association(x,REML=REML,returnBeta=True) - result.append(formatResult(id,beta,np.sqrt(betaVar).sum(),ts,ps)) + # result.append(formatResult(id,beta,np.sqrt(betaVar).sum(),ts,ps)) + result.append( (ts,ps) ) if not q: q = compute_snp.q q.put([j,result]) @@ -95,7 +97,8 @@ def gwas(Y,G,K,restricted_max_likelihood=True,refit=False,verbose=True): out.write(formatResult(id,beta,betaSD,ts,ps)) def printOutHead(): out.write("\t".join(["SNP_ID","BETA","BETA_SD","F_STAT","P_VALUE"]) + "\n") - printOutHead() + # printOutHead() + res = [] # Set up the pool # mp.set_start_method('spawn') @@ -114,6 +117,7 @@ def gwas(Y,G,K,restricted_max_likelihood=True,refit=False,verbose=True): for snp in G: snp_id = (snp,'SNPID') count += 1 + # print count,snp_id if count % 1000 == 0: job = count/1000 if verbose: @@ -122,21 +126,23 @@ def gwas(Y,G,K,restricted_max_likelihood=True,refit=False,verbose=True): print "Running on 1 THREAD" compute_snp(job,n,collect,lmm2,reml,q) collect = [] - j,lines = q.get() + j,lst = q.get() if verbose: sys.stderr.write("Job "+str(j)+" finished\n") - for line in lines: - out.write(line) + # for line in lines: + # out.write(line) + res.append(lst) else: p.apply_async(compute_snp,(job,n,collect,lmm2,reml)) collect = [] while job > completed: try: - j,lines = q.get_nowait() + j,lst = q.get_nowait() if verbose: sys.stderr.write("Job "+str(j)+" finished\n") - for line in lines: - out.write(line) + # for line in lines: + # out.write(line) + res.append(lst) completed += 1 except Queue.Empty: pass @@ -148,23 +154,23 @@ def gwas(Y,G,K,restricted_max_likelihood=True,refit=False,verbose=True): collect.append(snp_id) - if not numThreads or numThreads > 1: + if numThreads==1: + print "Running on 1 THREAD" + compute_snp(count/1000,n,collect,lmm2,reml,q) + j,lst = q.get() + # for line in lines: + # out.write(line) + res.append(lst) + else: for job in range(int(count/1000)-completed): - j,lines = q.get(True,15) # time out + j,lst = q.get(True,15) # time out if verbose: sys.stderr.write("Job "+str(j)+" finished\n") - for line in lines: - out.write(line) - else: - print "Running on 1 THREAD" - # print collect - compute_snp(count/1000,n,collect,lmm2,reml,q) - j,lines = q.get() - for line in lines: - out.write(line) - - # print collect - ps = [item[1][3] for item in collect] - ts = [item[1][2] for item in collect] - print ps + res.append(lst) + + # print res + ts = [item[0] for res1 in res for item in res1] + ps = [item[1] for res1 in res for item in res1] + # ts = [item[1] for item in res] + # print ps return ts,ps diff --git a/wqflask/wqflask/my_pylmm/pyLMM/lmm2.py b/wqflask/wqflask/my_pylmm/pyLMM/lmm2.py index cba47a9b..d50b0111 100644 --- a/wqflask/wqflask/my_pylmm/pyLMM/lmm2.py +++ b/wqflask/wqflask/my_pylmm/pyLMM/lmm2.py @@ -329,11 +329,11 @@ class LMM2: """ if False: - print "X=",X - print "h=",h - print "q=",self.q - print "self.Kve=",self.Kve - print "X0t_stack=",self.X0t_stack.shape,self.X0t_stack + print "X=",X + print "h=",h + print "q=",self.q + print "self.Kve=",self.Kve + print "X0t_stack=",self.X0t_stack.shape,self.X0t_stack if stack: # X = np.hstack([self.X0t,matrixMult(self.Kve.T, X)]) -- cgit v1.2.3