about summary refs log tree commit diff
path: root/wqflask
diff options
context:
space:
mode:
Diffstat (limited to 'wqflask')
-rw-r--r--wqflask/wqflask/my_pylmm/pyLMM/kinship.py30
1 files changed, 19 insertions, 11 deletions
diff --git a/wqflask/wqflask/my_pylmm/pyLMM/kinship.py b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py
index a5497a77..967eca81 100644
--- a/wqflask/wqflask/my_pylmm/pyLMM/kinship.py
+++ b/wqflask/wqflask/my_pylmm/pyLMM/kinship.py
@@ -22,7 +22,8 @@ import sys
 import os
 import numpy as np
 import multiprocessing as mp # Multiprocessing is part of the Python stdlib
-import Queue 
+import Queue
+import time
 
 from optmatrix import matrix_initialize, matrixMultT
 
@@ -90,6 +91,8 @@ def kinship(G,options):
 
    q = mp.Queue()
    p = mp.Pool(numThreads, f_init, [q])
+   cpu_num = mp.cpu_count()
+   print "CPU cores:",cpu_num
    iterations = snps/options.computeSize+1
    if options.testing:
       iterations = 8
@@ -105,6 +108,7 @@ def kinship(G,options):
          sys.stderr.write("Processing job %d first %d SNPs\n" % (job, ((job+1)*options.computeSize)))
       W = compute_W(job,G,n,snps,options.computeSize)
       if numThreads == 1:
+         # Single-core
          compute_matrixMult(job,W,q)
          j,x = q.get()
          if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n")
@@ -112,25 +116,29 @@ def kinship(G,options):
          # print j,K_j[:,0]
          K = K + K_j
       else:
+         # Multi-core
          results.append(p.apply_async(compute_matrixMult, (job,W)))
          # Do we have a result?
-         try: 
-            j,x = q.get_nowait()
-            if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n")
-            K_j = x
-            # print j,K_j[:,0]
-            K = K + K_j
-            completed += 1
-         except Queue.Empty:
-            pass
-
+         while (len(results)-completed>cpu_num):
+            try: 
+               j,x = q.get_nowait()
+               if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n")
+               K_j = x
+               # print j,K_j[:,0]
+               K = K + K_j
+               completed += 1
+            except Queue.Empty:
+               pass
+         
    if numThreads == None or numThreads > 1:
+      # results contains the growing result set
       for job in range(len(results)-completed):
          j,x = q.get(True,15)
          if options.verbose: sys.stderr.write("Job "+str(j)+" finished\n")
          K_j = x
          # print j,K_j[:,0]
          K = K + K_j
+         completed += 1
 
    K = K / float(snps)
    outFile = 'runtest.kin'