#! /usr/bin/env guile !# ;; To run this script run: ;; ;; $ guix shell guile guile-hashing guile3-dbi guile3-dbd-mysql \ ;; guile-lmdb -- guile lmdb-publishdata-export.scm ;; conn.scm ;; ;; Example conn.scm: ;; ((sql-username . "webqtlout") ;; (sql-password . "xxxx") ;; (sql-database . "db_webqtl") ;; (sql-host . "localhost") ;; (sql-port . 3306) ;; (output-dir . "/tmp/data") ;; (log-file . "/tmp/export.log")) (use-modules (dbi dbi) (rnrs bytevectors) (system foreign) (ice-9 match) (srfi srfi-1) (srfi srfi-26) (srfi srfi-43) (rnrs io ports) (hashing md5) ((lmdb lmdb) #:prefix mdb:) (ice-9 format) (ice-9 exceptions) (json) (logging logger) (logging rotating-log) (logging port-log) (oop goops)) ;; Set up logging (define* (setup-logging #:key (log-file "lmdb-dump-log")) "Initialize the logging system with rotating file logs and error port output. Creates a new logger, adds rotating and error port handlers, sets it as the default logger, and opens the log for writing." (let ((lgr (make )) (rotating (make #:num-files 3 #:size-limit 1024 #:file-name log-file)) (err (make #:port (current-error-port)))) ;; add the handlers to our logger (add-handler! lgr rotating) (add-handler! lgr err) ;; make this the application's default logger (set-default-logger! lgr) (open-log! lgr))) (define (shutdown-logging) "Properly shutdown the logging system. Flushes any pending log messages, closes the log handlers, and removes the default logger reference." (flush-log) ;; since no args, it uses the default (close-log!) ;; since no args, it uses the default (set-default-logger! #f)) (define (call-with-database backend connection-string proc) "Execute PROC with an open database connection. BACKEND is the database type (e.g. \"mysql\"). CONNECTION-STRING is the database connection string." (let ((db #f)) (dynamic-wind (lambda () (set! db (dbi-open backend connection-string))) (cut proc db) (lambda () (when db (dbi-close db)))))) (define (call-with-target-database connection-settings proc) "Connect to the target database using CONNECTION-SETTINGS and execute PROC." (call-with-database "mysql" (string-join (list (assq-ref connection-settings 'sql-username) (assq-ref connection-settings 'sql-password) (assq-ref connection-settings 'sql-database) "tcp" (assq-ref connection-settings 'sql-host) (number->string (assq-ref connection-settings 'sql-port))) ":") proc)) (define* (lmdb-save path key value) "Save a NUM with KEY to PATH." (mdb:with-env-and-txn (path) (env txn) (let ((dbi (mdb:dbi-open txn #f 0))) (mdb:put txn dbi key (if (number? value) (number->string value) value))))) (define (sql-exec db statement) "Execute an SQL STATEMENT on database connection DB. Throws an error if the statement execution fails." (dbi-query db statement) (database-check-status db)) (define (sql-fold proc init db statement) "Fold over SQL query results." (sql-exec db statement) (let loop ((result init)) (let ((row (dbi-get_row db))) (if row (loop (proc row result)) result)))) (define (sql-for-each proc db statement) "Apply PROC to each row returned by STATEMENT." (sql-fold (lambda (row _) (proc row)) #f db statement)) (define (sql-map proc db statement) "Map PROC over rows returned by STATEMENT." (sql-fold (lambda (row result) (cons (proc row) result)) (list) db statement)) (define (sql-find db statement) (sql-exec db statement) (dbi-get_row db)) (define (database-check-status db) "Check the status of the last database operation on DB. Throws an error if the status code is non-zero." (match (dbi-get_status db) ((code . str) (unless (zero? code) (error str))))) (define* (save-dataset-values settings) "Main function to extract and save dataset values. Queries the database for datasets and their values, computes MD5 hashes for dataset-trait combinations, and saves strain values to LMDB files in /export5/lmdb-data-hashes/." (dynamic-wind (lambda () (setup-logging #:log-file (assq-ref settings 'log-file)) (log-msg 'INFO "Starting dataset value extraction")) (lambda () (call-with-target-database settings (lambda (db) (sql-for-each (lambda (row) (match row ((("Name" . dataset-name) ("Id" . trait-id)) (let* ((md5-hash (md5->string (md5 (string->bytevector (format #f "~a-~a" dataset-name trait-id) (make-transcoder (utf-8-codec)))))) (data-dir (assq-ref settings 'output-dir)) (md5-hash-dir (format #f "~a/~a" data-dir md5-hash)) (data-query (format #f "SELECT JSON_ARRAYAGG(JSON_ARRAY(Strain.Name, PublishData.Value)) AS data, MD5(JSON_ARRAY(Strain.Name, PublishData.Value)) as md5hash FROM PublishData INNER JOIN Strain ON PublishData.StrainId = Strain.Id INNER JOIN PublishXRef ON PublishData.Id = PublishXRef.DataId INNER JOIN PublishFreeze ON PublishXRef.InbredSetId = PublishFreeze.InbredSetId LEFT JOIN PublishSE ON PublishSE.DataId = PublishData.Id AND PublishSE.StrainId = PublishData.StrainId LEFT JOIN NStrain ON NStrain.DataId = PublishData.Id AND NStrain.StrainId = PublishData.StrainId WHERE PublishFreeze.Name = \"~a\" AND PublishXRef.Id = ~a AND PublishFreeze.public > 0 AND PublishData.value IS NOT NULL AND PublishFreeze.confidentiality < 1 ORDER BY LENGTH(Strain.Name), Strain.Name" dataset-name trait-id))) (match (call-with-target-database settings (lambda (db2) (sql-find db2 data-query))) ((("data" . data) ("md5hash" . dataset-hash)) (let ((lmdb-dir (string-join data-dir "/" md5-hash "-" dataset-hash))) (log-msg 'INFO (format #f "Writing ~a-~a to: ~a" dataset-name trait-id lmdb-dir)) (unless (file-exists? data-dir) (mkdir data-dir)) (lmdb-save (string-join data-dir "/index") (string-join (list dataset-name "-" trait-id)) (string-join (list md5-hash "-" dataset-hash))) (vector-for-each (lambda (_ x) (match x (#(strain value) (lmdb-save lmdb-dir strain value)))) (json-string->scm data))))))))) db "SELECT DISTINCT PublishFreeze.Name, PublishXRef.Id FROM PublishData INNER JOIN Strain ON PublishData.StrainId = Strain.Id INNER JOIN PublishXRef ON PublishData.Id = PublishXRef.DataId INNER JOIN PublishFreeze ON PublishXRef.InbredSetId = PublishFreeze.InbredSetId LEFT JOIN PublishSE ON PublishSE.DataId = PublishData.Id AND PublishSE.StrainId = PublishData.StrainId LEFT JOIN NStrain ON NStrain.DataId = PublishData.Id AND NStrain.StrainId = PublishData.StrainId WHERE PublishFreeze.public > 0 AND PublishFreeze.confidentiality < 1 ORDER BY PublishFreeze.Id, PublishXRef.Id")))) (lambda () (shutdown-logging)))) (define main (match-lambda* ((_ connection-settings-file) (save-dataset-values (call-with-input-file connection-settings-file read))) ((arg0 _ ...) (display (format "Usage: ~a CONNECTION-SETTINGS-FILE~%" arg0) (current-error-port)) (exit #f)))) (apply main (command-line))