diff options
| -rwxr-xr-x | examples/probesets-experiment-metadata.scm | 9 | ||||
| -rw-r--r-- | transform/special-forms.scm | 525 |
2 files changed, 391 insertions, 143 deletions
diff --git a/examples/probesets-experiment-metadata.scm b/examples/probesets-experiment-metadata.scm index b2eb93e..d7c0764 100755 --- a/examples/probesets-experiment-metadata.scm +++ b/examples/probesets-experiment-metadata.scm @@ -70,10 +70,11 @@ (name "ProbeSet Experiments Metadata") (connection %connection-settings) (table-metadata? #f) - (total-rows (assoc-ref - (sql-find db "SELECT count(*) AS count from ProbeSetXRef") - "count")) - (rows-per-chunk 1000000) + ;; Use composite keyset pagination over joined table identifiers. + (keyset-pagination? #t) + (chunk-key-field '("ProbeSetFreeze.Id" "ProbeSet.Id")) + ;; (total-rows 1000000) + (rows-per-chunk 10000) (prefixes '(("dcat:" "<http://www.w3.org/ns/dcat#>") ("gn:" "<http://rdf.genenetwork.org/v1/id/>") diff --git a/transform/special-forms.scm b/transform/special-forms.scm index 0c07a0a..3cf6a24 100644 --- a/transform/special-forms.scm +++ b/transform/special-forms.scm @@ -31,19 +31,16 @@ (else #t))) (define (emit-short-turtle subject po-alist) - (let loop ((pairs po-alist) (first? #t)) - (match pairs - (((p . o) rest ...) - (if (not (emittable-object? o)) - (loop rest first?) ; skip malformed or empty objects - (begin - ;; subject only once - (when first? - (format #t "~a " subject)) - (when (not first?) - (format #t "\t")) - - ;; emit predicate–object + ;; Hot path: avoid rescanning remaining pairs on each step (O(n^2)). + ;; Build filtered pairs and count in one pass. + (let collect ((pairs po-alist) (kept '()) (count 0)) + (if (null? pairs) + (let loop ((pairs (reverse kept)) (index 0)) + (match pairs + (((p . o) rest ...) + (if (zero? index) + (format #t "~a " subject) + (display "\t")) (match o ((? symbol?) (format #t "~a ~a" p (symbol->string o))) @@ -51,16 +48,47 @@ (format #t "~a \"~a\"" p o)) (_ (format #t "~a ~s" p o))) - - ;; separator depends on *remaining emittable pairs* - (if (any (match-lambda - ((p . o) (emittable-object? o))) - rest) - (format #t " ;~%") - (format #t " .~%")) - - (loop rest #f)))) - (() #f)))) + (if (< (+ index 1) count) + (display " ;\n") + (display " .\n")) + (loop rest (+ index 1))) + (() #f))) + (match (car pairs) + ((p . o) + (if (emittable-object? o) + (collect (cdr pairs) (cons (cons p o) kept) (+ count 1)) + (collect (cdr pairs) kept count))))))) + +(define (make-short-turtle-emitter subject) + "Return two procedures: EMIT! and FINISH! for streaming short-turtle output." + (let ((first? #t) + (pending #f)) + (define (emit-one pair tail) + (let ((p (car pair)) + (o (cdr pair))) + (if first? + (format #t "~a " subject) + (display "\t")) + (match o + ((? symbol?) + (format #t "~a ~a" p (symbol->string o))) + ((? string?) + (format #t "~a \"~a\"" p o)) + (_ + (format #t "~a ~s" p o))) + (display tail) + (set! first? #f))) + (define (emit! predicate object) + (when (emittable-object? object) + (let ((current (cons predicate object))) + (when pending + (emit-one pending " ;\n")) + (set! pending current)))) + (define (finish!) + (when pending + (emit-one pending " .\n") + (set! pending #f))) + (values emit! finish!))) @@ -184,65 +212,76 @@ Example: (syntax-case x () ((_ alist actions ...) ;; TODO: Check that all actions are valid. - #`(let ((evaluated-alist alist)) - (append (remove (match-lambda - ;; Filter out results of filter-set actions. - ((key . #f) - (member key '#,(filter-map (lambda (action) - (syntax-case action (filter-set) - ((filter-set key expression) #'key) - (_ #f))) - #'(actions ...)))) - (_ #f)) - ;; Do set and filter-set. - `#,(filter-map (lambda (action) - (syntax-case action (set filter-set) - ((set key expression) - #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression))) - ((filter-set key expression) - #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression))) - (_ #f))) - #'(actions ...))) - ;; Do multiset. - #,@(filter-map (lambda (action) - (syntax-case action (multiset) - ((multiset key expression) - #`(map (cut cons 'key <>) - #,(key->assoc-ref #'evaluated-alist #'expression))) - (_ #f))) - #'(actions ...)) - ;; Apply else=> procedure on unspecified keys. If - ;; no else=> procedure is specified, delete - ;; unspecified keys. - (filter-map #,(or (any (lambda (action) - (syntax-case action (else=>) - ((else=> proc) #'proc) - (_ #f))) - #'(actions ...)) - #'(const #f)) - ;; The unspecified part of the input - ;; alist - (alist-delete* evaluated-alist - (list - ;; Keys that were referenced - #,@(append-map (lambda (action) - (syntax-case action () - ((_ key expression) - (collect-keys #'expression)) - (_ '()))) - #'(actions ...)) - ;; Keys that were deleted - #,@(filter-map (lambda (action) - (syntax-case action (remove) - ((remove key) #''key) - (_ #f))) - #'(actions ...)) - ;; Keys that were set - #,@(filter-map (lambda (action) - (syntax-case action () - ((_ key expression) #''key) - (_ #f))) - #'(actions ...))))))))))) + (let* ((actions #'(actions ...)) + (filter-set-keys + (filter-map (lambda (action) + (syntax-case action (filter-set) + ((filter-set key expression) #'key) + (_ #f))) + actions)) + (set/filter-pairs + (filter-map (lambda (action) + (syntax-case action (set filter-set) + ((set key expression) + #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression))) + ((filter-set key expression) + #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression))) + (_ #f))) + actions)) + (multiset-exprs + (filter-map (lambda (action) + (syntax-case action (multiset) + ((multiset key expression) + #`(map (cut cons 'key <>) + #,(key->assoc-ref #'evaluated-alist #'expression))) + (_ #f))) + actions)) + (else-proc + (any (lambda (action) + (syntax-case action (else=>) + ((else=> proc) #'proc) + (_ #f))) + actions)) + (unspecified-keys + #`(list + ;; Keys that were referenced + #,@(append-map (lambda (action) + (syntax-case action () + ((_ key expression) + (collect-keys #'expression)) + (_ '()))) + actions) + ;; Keys that were deleted + #,@(filter-map (lambda (action) + (syntax-case action (remove) + ((remove key) #''key) + (_ #f))) + actions) + ;; Keys that were set + #,@(filter-map (lambda (action) + (syntax-case action () + ((_ key expression) #''key) + (_ #f))) + actions)))) + #`(let ((evaluated-alist alist)) + (append + ;; Do set and filter-set. + #,(if (null? filter-set-keys) + #`(list #,@set/filter-pairs) + #`(remove (match-lambda + ((key . #f) + (member key '#,filter-set-keys)) + (_ #f)) + (list #,@set/filter-pairs))) + ;; Do multiset. + #,@multiset-exprs + ;; Apply else=> only when explicitly present. + #,@(if else-proc + (list + #`(filter-map #,else-proc + (alist-delete* evaluated-alist + #,unspecified-keys))) + '())))))))) @@ -416,22 +455,56 @@ must be remedied." #'(schema-triples))) ((triples subject predicate-clauses ...) (triples) (find-clause #'(clauses ...) 'triples))) - #`(define* (name db #:key - (metadata? #f) - (data? #t) - (documentation? #f) - (limit #f) - (offset #f)) - (let* ((base-sql - (select-query #,(collect-fields #'(subject predicate-clauses ...)) - (primary-table other-tables ...) - tables-raw ...)) - (sql - (if (and limit offset) - (format #f "~a LIMIT ~a OFFSET ~a" - base-sql limit offset) - base-sql))) - (when metadata? + #`(define* (name db #:key + (metadata? #f) + (data? #t) + (documentation? #f) + (keyset? #f) + (chunk-key "Id") + (limit #f) + (offset #f) + (cursor #f)) + (let* ((base-sql + (select-query #,(collect-fields #'(subject predicate-clauses ...)) + (primary-table other-tables ...) + tables-raw ...)) + (chunk-keys + (if (list? chunk-key) + chunk-key + (list chunk-key))) + (chunk-key-exprs + (map (lambda (k) + (if (string-index k #\.) + k + (format #f "~a.~a" + #,(symbol->string (syntax->datum #'primary-table)) + k))) + chunk-keys)) + (sql-for-keyset + (prepend-select-expr + base-sql + (string-join + (map (lambda (expr idx) + (format #f "~a AS __chunk_key_~a" expr (+ idx 1))) + chunk-key-exprs + (iota (length chunk-key-exprs))) + ", "))) + (sql + (cond + (keyset? + (append-order-limit + (if cursor + (append-filter-clause + sql-for-keyset + (build-keyset-where chunk-key-exprs cursor)) + sql-for-keyset) + (build-keyset-order chunk-key-exprs) + (or limit 100000))) + ((and limit offset) + (format #f "~a LIMIT ~a OFFSET ~a" + base-sql limit offset)) + (else base-sql)))) + (when metadata? #,@(let ((table (symbol->string (syntax->datum #'primary-table))) (subject-type (any (lambda (predicate) (syntax-case predicate (rdf:type) @@ -557,16 +630,144 @@ The above query results to triples that have the form: (list 'triple-predicate ...) (list 'triple-object ...))) (_ (error "Invalid schema triples clause:" #'schema-triples-clause))) - (sql-for-each - (lambda (row) - (let* ((subject-val #,(field->assoc-ref #'row #'subject)) - (po-alist - (map-alist row #,@(field->key #'(predicate-clauses ...))))) - (emit-short-turtle subject-val po-alist))) - db - sql)))))) + (let ((rows-written 0) + (last-cursor cursor)) + (sql-for-each + (lambda (row) + (set! rows-written (+ rows-written 1)) + (when keyset? + (set! last-cursor + (map (lambda (idx) + (assoc-ref row (format #f "__chunk_key_~a" (+ idx 1)))) + (iota (if (list? chunk-key) + (length chunk-key) + 1))))) + (let ((subject-val #,(field->assoc-ref #'row #'subject))) + (call-with-values + (lambda () (make-short-turtle-emitter subject-val)) + (lambda (emit! finish!) + #,@(map (lambda (predicate-clause) + (syntax-case predicate-clause (set multiset filter-set) + ((set predicate object) + #`(emit! 'predicate + #,(field->assoc-ref #'row #'object))) + ((filter-set predicate object) + #`(let ((val #,(field->assoc-ref #'row #'object))) + (when val + (emit! 'predicate val)))) + ((multiset predicate object) + #`(for-each (lambda (val) + (emit! 'predicate val)) + #,(field->assoc-ref #'row #'object))) + (_ (error "Invalid predicate clause:" predicate-clause)))) + #'(predicate-clauses ...)) + (finish!))))) + db + sql) + (if keyset? + (cons rows-written last-cursor) + rows-written))))))) (_ (error "Invalid define-transformer syntax:" (syntax->datum x)))))) +(define (sql-literal v) + (cond + ((number? v) (number->string v)) + ((string? v) + (format #f "'~a'" (replace-substrings v '(("'" . "''"))))) + ((symbol? v) + (format #f "'~a'" (symbol->string v))) + (else + (format #f "'~a'" v)))) + +(define (normalize-keyset-cursor cursor key-count) + (cond + ((not cursor) #f) + ((and (= key-count 1) (not (list? cursor))) + (list cursor)) + ((and (list? cursor) (= (length cursor) key-count)) + cursor) + (else + (error "Invalid keyset cursor, expected " + key-count " values, got: " cursor)))) + +(define (normalize-key-exprs key-spec) + "Accept either a list of SQL key expressions or a key count." + (cond + ((list? key-spec) key-spec) + ((number? key-spec) + (map (lambda (idx) + (format #f "__chunk_key_~a" (+ idx 1))) + (iota key-spec))) + (else + (error "Invalid key specification: " key-spec)))) + +(define (build-keyset-order key-spec) + (string-join + (normalize-key-exprs key-spec) + ", ")) + +(define (build-keyset-where key-spec cursor) + (let* ((key-exprs (normalize-key-exprs key-spec)) + (key-count (length key-exprs)) + (vals (normalize-keyset-cursor cursor key-count))) + (unless vals + (error "build-keyset-where called with empty cursor")) + (string-join + (map (lambda (i) + (let ((eq-prefix + (string-join + (map (lambda (j) + (format #f "~a = ~a" + (list-ref key-exprs j) + (sql-literal (list-ref vals j)))) + (iota i)) + " AND ")) + (gt-part + (format #f "~a > ~a" + (list-ref key-exprs i) + (sql-literal (list-ref vals i))))) + (if (zero? i) + gt-part + (string-append "(" eq-prefix " AND " gt-part ")")))) + (iota key-count)) + " OR "))) + +(define (find-sql-tail-start sql) + (let ((positions + (filter-map identity + (list (string-contains sql " GROUP BY ") + (string-contains sql " ORDER BY ") + (string-contains sql " LIMIT "))))) + (if (null? positions) #f (apply min positions)))) + +(define (append-filter-clause sql clause) + (let* ((tail-start (find-sql-tail-start sql)) + (head (if tail-start (substring sql 0 tail-start) sql)) + (tail (if tail-start (substring sql tail-start) ""))) + (if (string-contains head " WHERE ") + (string-append head " AND (" clause ")" tail) + (string-append head " WHERE (" clause ")" tail)))) + +(define (append-order-limit sql order-expr limit) + (let* ((tail-start (find-sql-tail-start sql)) + (head (if tail-start (substring sql 0 tail-start) sql)) + (tail (if tail-start (substring sql tail-start) ""))) + (string-append head tail + " ORDER BY " order-expr + " LIMIT " (number->string limit)))) + +(define (prepend-select-expr sql expr) + "Prepend EXPR to the SELECT list of SQL." + (cond + ((and (>= (string-length sql) 16) + (string=? (substring sql 0 16) "SELECT DISTINCT ")) + (string-append "SELECT DISTINCT " expr ", " (substring sql 16))) + ((and (>= (string-length sql) 7) + (string=? (substring sql 0 7) "SELECT ")) + (string-append "SELECT " expr ", " (substring sql 7))) + (else + (error "Expected SQL SELECT query" sql)))) + (define (get-keyword-value args keyword default) (let ((kv (memq keyword args))) (if (and kv (>= (length kv) 2)) @@ -582,10 +783,12 @@ The above query results to triples that have the form: (table-metadata? (assoc-ref alist 'table-metadata?)) (prefixes (assoc-ref alist 'prefixes)) (inputs (assoc-ref alist 'inputs)) - (outputs (assoc-ref alist 'outputs)) - (total-rows (assoc-ref alist 'total-rows)) - (rows-per-chunk (assoc-ref alist 'rows-per-chunk)) - (chunking? (and total-rows rows-per-chunk)) + (outputs (assoc-ref alist 'outputs)) + (total-rows (assoc-ref alist 'total-rows)) + (rows-per-chunk (assoc-ref alist 'rows-per-chunk)) + (keyset-pagination? (assoc-ref alist 'keyset-pagination?)) + (chunk-key-field (assoc-ref alist 'chunk-key-field)) + (chunking? (and total-rows rows-per-chunk)) (chunks (if chunking? (ceiling (/ total-rows rows-per-chunk)) 1)) @@ -614,32 +817,76 @@ The above query results to triples that have the form: inputs)) #:encoding "UTF-8")) - ;; Dumping the actual data - (when rdf-path - (do ((i 0 (+ i 1))) - ((>= i chunks)) - (let* ((offset (* i (or rows-per-chunk 0))) - (out-file - (if (= chunks 1) - rdf-path - (string-append (path-without-extension rdf-path) - "." (number->string (+ i 1)) ".ttl")))) - (with-output-to-file - out-file - (lambda () - ;; Add the prefixes - (for-each - (match-lambda - ((k v) - (begin - (prefix k v)))) - prefixes) - (newline) - (for-each - (lambda (proc) - (proc db #:metadata? table-metadata? - #:limit rows-per-chunk - #:offset offset)) - inputs)) - #:encoding "UTF-8")))))))))) - + ;; Dumping the actual data + (when rdf-path + (if keyset-pagination? + ;; Cursor-based chunking with configurable key column. + (let loop ((chunk-index 0) + (states (map (lambda (_) (cons #f #f)) inputs))) + (unless (every cdr states) + (let* ((out-file + (if (= chunk-index 0) + rdf-path + (string-append (path-without-extension rdf-path) + "." (number->string (+ chunk-index 1)) ".ttl"))) + (next-states + (with-output-to-file + out-file + (lambda () + (for-each + (match-lambda + ((k v) + (prefix k v))) + prefixes) + (newline) + (let walk ((procs inputs) + (st states) + (acc '())) + (if (null? procs) + (reverse acc) + (let* ((state (car st)) + (cursor (car state)) + (done? (cdr state))) + (if done? + (walk (cdr procs) (cdr st) (cons state acc)) + (let* ((result ((car procs) + db + #:metadata? table-metadata? + #:keyset? #t + #:chunk-key (or chunk-key-field "Id") + #:limit rows-per-chunk + #:cursor cursor)) + (rows (car result)) + (next-cursor (cdr result)) + (proc-done? (or (= rows 0) + (< rows rows-per-chunk)))) + (walk (cdr procs) + (cdr st) + (cons (cons next-cursor proc-done?) acc)))))))) + #:encoding "UTF-8"))) + (loop (+ chunk-index 1) next-states)))) + ;; Legacy OFFSET chunking. + (do ((i 0 (+ i 1))) + ((>= i chunks)) + (let* ((offset (* i (or rows-per-chunk 0))) + (out-file + (if (= chunks 1) + rdf-path + (string-append (path-without-extension rdf-path) + "." (number->string (+ i 1)) ".ttl")))) + (with-output-to-file + out-file + (lambda () + (for-each + (match-lambda + ((k v) + (prefix k v))) + prefixes) + (newline) + (for-each + (lambda (proc) + (proc db #:metadata? table-metadata? + #:limit rows-per-chunk + #:offset offset)) + inputs)) + #:encoding "UTF-8"))))))))))) |
