aboutsummaryrefslogtreecommitdiff
#! /usr/bin/env guile
!#
;; To run this script run:
;;
;; $ guix shell guile guile-hashing guile3-dbi guile3-dbd-mysql \
;; guile-lmdb -- guile lmdb-publishdata-export.scm
;; conn.scm
;;
;; Example conn.scm:
;; ((sql-username . "webqtlout")
;;  (sql-password . "xxxx")
;;  (sql-database . "db_webqtl")
;;  (sql-host . "localhost")
;;  (sql-port . 3306)
;;  (output-dir . "/tmp/data")
;;  (log-file . "/tmp/export.log"))

(use-modules (dbi dbi)
             (rnrs bytevectors)
             (system foreign)
             (ice-9 match)
             (srfi srfi-1)
             (srfi srfi-26)
	     (srfi srfi-43)
             (rnrs io ports)
             (hashing md5)
             ((lmdb lmdb) #:prefix mdb:)
             (ice-9 format)
             (ice-9 exceptions)
	     (json)
             (logging logger)
             (logging rotating-log)
             (logging port-log)
             (oop goops))


;; Set up logging
(define* (setup-logging #:key (log-file "lmdb-dump-log"))
  "Initialize the logging system with rotating file logs and error port output.
   Creates a new logger, adds rotating and error port handlers,
   sets it as the default logger, and opens the log for writing."
  (let ((lgr (make <logger>))
	(rotating (make <rotating-log>
		    #:num-files 3
		    #:size-limit 1024
		    #:file-name log-file))
	(err (make <port-log> #:port (current-error-port))))
    ;; add the handlers to our logger
    (add-handler! lgr rotating)
    (add-handler! lgr err)
    ;; make this the application's default logger
    (set-default-logger! lgr)
    (open-log! lgr)))

(define (shutdown-logging)
  "Properly shutdown the logging system.
   Flushes any pending log messages, closes the log handlers,
   and removes the default logger reference."
  (flush-log)  ;; since no args, it uses the default
  (close-log!) ;; since no args, it uses the default
  (set-default-logger! #f))

(define (call-with-database backend connection-string proc)
  "Execute PROC with an open database connection.  BACKEND is the
database type (e.g. \"mysql\").  CONNECTION-STRING is the database
connection string."
  (let ((db #f))
    (dynamic-wind
      (lambda ()
        (set! db (dbi-open backend connection-string)))
      (cut proc db)
      (lambda ()
        (when db
          (dbi-close db))))))

(define (call-with-target-database connection-settings proc)
  "Connect to the target database using CONNECTION-SETTINGS and execute
PROC."
  (call-with-database "mysql" (string-join
                               (list (assq-ref connection-settings 'sql-username)
                                     (assq-ref connection-settings 'sql-password)
                                     (assq-ref connection-settings 'sql-database)
                                     "tcp"
                                     (assq-ref connection-settings 'sql-host)
                                     (number->string
                                      (assq-ref connection-settings 'sql-port)))
                               ":")
                      proc))

(define* (lmdb-save path key value)
  "Save a NUM with KEY to PATH."
  (mdb:with-env-and-txn
     (path) (env txn)
     (let ((dbi (mdb:dbi-open txn #f 0)))
       (mdb:put txn dbi key
		(if (number? value)
                    (number->string value)
		    value)))))

(define (sql-exec db statement)
  "Execute an SQL STATEMENT on database connection DB.  Throws an error
if the statement execution fails."
  (dbi-query db statement)
  (database-check-status db))

(define (sql-fold proc init db statement)
  "Fold over SQL query results."
  (sql-exec db statement)
  (let loop ((result init))
    (let ((row (dbi-get_row db)))
      (if row
          (loop (proc row result))
          result))))

(define (sql-for-each proc db statement)
  "Apply PROC to each row returned by STATEMENT."
  (sql-fold (lambda (row _)
              (proc row))
            #f db statement))

(define (sql-map proc db statement)
  "Map PROC over rows returned by STATEMENT."
  (sql-fold (lambda (row result)
              (cons (proc row) result))
            (list) db statement))

(define (sql-find db statement)
  (sql-exec db statement)
  (dbi-get_row db))

(define (database-check-status db)
  "Check the status of the last database operation on DB.  Throws an
error if the status code is non-zero."
  (match (dbi-get_status db)
    ((code . str)
     (unless (zero? code)
       (error str)))))

(define* (save-dataset-values settings)
  "Main function to extract and save dataset values.  Queries the
database for datasets and their values, computes MD5 hashes for
dataset-trait combinations, and saves strain values to LMDB files in
/export5/lmdb-data-hashes/."
  (dynamic-wind
    (lambda ()
      (setup-logging #:log-file (assq-ref settings 'log-file))
      (log-msg 'INFO "Starting dataset value extraction"))
    (lambda ()
      (call-with-target-database
       settings
       (lambda (db)
         (sql-for-each
          (lambda (row)
	    (match row
	      ((("Name" . dataset-name)
		("Id" . trait-id))
			    (let* ((data-query (format #f "SELECT
JSON_ARRAYAGG(JSON_ARRAY(Strain.Name, PublishData.Value)) AS data,
 MD5(JSON_ARRAY(Strain.Name, PublishData.Value)) as md5hash
FROM
    PublishData
    INNER JOIN Strain ON PublishData.StrainId = Strain.Id
    INNER JOIN PublishXRef ON PublishData.Id = PublishXRef.DataId
    INNER JOIN PublishFreeze ON PublishXRef.InbredSetId = PublishFreeze.InbredSetId
LEFT JOIN PublishSE ON
    PublishSE.DataId = PublishData.Id AND
    PublishSE.StrainId = PublishData.StrainId
LEFT JOIN NStrain ON
    NStrain.DataId = PublishData.Id AND
    NStrain.StrainId = PublishData.StrainId
WHERE
    PublishFreeze.Name = \"~a\" AND
    PublishXRef.Id = ~a AND
    PublishFreeze.public > 0 AND
    PublishData.value IS NOT NULL AND
    PublishFreeze.confidentiality < 1
ORDER BY
    LENGTH(Strain.Name), Strain.Name" dataset-name trait-id)))
			      (match (call-with-target-database
				      settings
				      (lambda (db2) (sql-find db2 data-query)))
				((("data" . data)
				  ("md5hash" . md5-hash))
				 (let* ((trait-name (format #f "~a~a" dataset-name trait-id))
					(base-dir (assq-ref settings 'output-dir))
					(out (format #f "~a-~a" trait-name
						     (substring md5-hash 0 12)))
					(out-dir (format #f "~a/~a" base-dir out)))
				   (log-msg
				    'INFO (format #f "Writing ~a to: ~a" trait-name out-dir))
				   (unless (file-exists? out-dir)
				     (mkdir out-dir))
				   (lmdb-save (format #f "~a/index" base-dir) trait-name out)
				   (vector-for-each
				    (lambda (_ x)
				      (match x
					(#(strain value)
					 (lmdb-save out-dir strain value))))
				    (json-string->scm data)))))))))
          db
          "SELECT DISTINCT PublishFreeze.Name, PublishXRef.Id FROM
PublishData INNER JOIN Strain ON PublishData.StrainId = Strain.Id
INNER JOIN PublishXRef ON PublishData.Id = PublishXRef.DataId
INNER JOIN PublishFreeze ON PublishXRef.InbredSetId = PublishFreeze.InbredSetId
LEFT JOIN PublishSE ON
    PublishSE.DataId = PublishData.Id AND
    PublishSE.StrainId = PublishData.StrainId
LEFT JOIN NStrain ON
    NStrain.DataId = PublishData.Id AND
    NStrain.StrainId = PublishData.StrainId
WHERE
    PublishFreeze.public > 0 AND
    PublishFreeze.confidentiality < 1
ORDER BY
    PublishFreeze.Id, PublishXRef.Id"))))
    (lambda ()
      (shutdown-logging))))

(define main
  (match-lambda*
    ((_ connection-settings-file)
     (save-dataset-values (call-with-input-file connection-settings-file
			    read)))
    ((arg0 _ ...)
     (display (format "Usage: ~a CONNECTION-SETTINGS-FILE~%" arg0)
	      (current-error-port))
     (exit #f))))

(apply main (command-line))