From a5c1958d5b1abfaaa4ae2a2749ce7f8b0ed3055c Mon Sep 17 00:00:00 2001 From: Munyoki Kilyungi Date: Wed, 4 Mar 2026 18:16:59 +0300 Subject: Revert "transform: add keyset pagination and streaming emit path" This reverts commit ba347362d64f9e634fad7f1aa8ab0a7753ff7a81. Causes and endless loop. See: https://ci.genenetwork.org/jobs/transform-genenetwork-database/99 Signed-off-by: Munyoki Kilyungi --- examples/probesets-experiment-metadata.scm | 9 +- transform/special-forms.scm | 525 ++++++++--------------------- 2 files changed, 143 insertions(+), 391 deletions(-) diff --git a/examples/probesets-experiment-metadata.scm b/examples/probesets-experiment-metadata.scm index d7c0764..b2eb93e 100755 --- a/examples/probesets-experiment-metadata.scm +++ b/examples/probesets-experiment-metadata.scm @@ -70,11 +70,10 @@ (name "ProbeSet Experiments Metadata") (connection %connection-settings) (table-metadata? #f) - ;; Use composite keyset pagination over joined table identifiers. - (keyset-pagination? #t) - (chunk-key-field '("ProbeSetFreeze.Id" "ProbeSet.Id")) - ;; (total-rows 1000000) - (rows-per-chunk 10000) + (total-rows (assoc-ref + (sql-find db "SELECT count(*) AS count from ProbeSetXRef") + "count")) + (rows-per-chunk 1000000) (prefixes '(("dcat:" "") ("gn:" "") diff --git a/transform/special-forms.scm b/transform/special-forms.scm index 3cf6a24..0c07a0a 100644 --- a/transform/special-forms.scm +++ b/transform/special-forms.scm @@ -31,16 +31,19 @@ (else #t))) (define (emit-short-turtle subject po-alist) - ;; Hot path: avoid rescanning remaining pairs on each step (O(n^2)). - ;; Build filtered pairs and count in one pass. - (let collect ((pairs po-alist) (kept '()) (count 0)) - (if (null? pairs) - (let loop ((pairs (reverse kept)) (index 0)) - (match pairs - (((p . o) rest ...) - (if (zero? index) - (format #t "~a " subject) - (display "\t")) + (let loop ((pairs po-alist) (first? #t)) + (match pairs + (((p . o) rest ...) + (if (not (emittable-object? o)) + (loop rest first?) ; skip malformed or empty objects + (begin + ;; subject only once + (when first? + (format #t "~a " subject)) + (when (not first?) + (format #t "\t")) + + ;; emit predicate–object (match o ((? symbol?) (format #t "~a ~a" p (symbol->string o))) @@ -48,47 +51,16 @@ (format #t "~a \"~a\"" p o)) (_ (format #t "~a ~s" p o))) - (if (< (+ index 1) count) - (display " ;\n") - (display " .\n")) - (loop rest (+ index 1))) - (() #f))) - (match (car pairs) - ((p . o) - (if (emittable-object? o) - (collect (cdr pairs) (cons (cons p o) kept) (+ count 1)) - (collect (cdr pairs) kept count))))))) - -(define (make-short-turtle-emitter subject) - "Return two procedures: EMIT! and FINISH! for streaming short-turtle output." - (let ((first? #t) - (pending #f)) - (define (emit-one pair tail) - (let ((p (car pair)) - (o (cdr pair))) - (if first? - (format #t "~a " subject) - (display "\t")) - (match o - ((? symbol?) - (format #t "~a ~a" p (symbol->string o))) - ((? string?) - (format #t "~a \"~a\"" p o)) - (_ - (format #t "~a ~s" p o))) - (display tail) - (set! first? #f))) - (define (emit! predicate object) - (when (emittable-object? object) - (let ((current (cons predicate object))) - (when pending - (emit-one pending " ;\n")) - (set! pending current)))) - (define (finish!) - (when pending - (emit-one pending " .\n") - (set! pending #f))) - (values emit! finish!))) + + ;; separator depends on *remaining emittable pairs* + (if (any (match-lambda + ((p . o) (emittable-object? o))) + rest) + (format #t " ;~%") + (format #t " .~%")) + + (loop rest #f)))) + (() #f)))) @@ -212,76 +184,65 @@ Example: (syntax-case x () ((_ alist actions ...) ;; TODO: Check that all actions are valid. - (let* ((actions #'(actions ...)) - (filter-set-keys - (filter-map (lambda (action) - (syntax-case action (filter-set) - ((filter-set key expression) #'key) - (_ #f))) - actions)) - (set/filter-pairs - (filter-map (lambda (action) - (syntax-case action (set filter-set) - ((set key expression) - #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression))) - ((filter-set key expression) - #`(cons 'key #,(key->assoc-ref #'evaluated-alist #'expression))) - (_ #f))) - actions)) - (multiset-exprs - (filter-map (lambda (action) - (syntax-case action (multiset) - ((multiset key expression) - #`(map (cut cons 'key <>) - #,(key->assoc-ref #'evaluated-alist #'expression))) - (_ #f))) - actions)) - (else-proc - (any (lambda (action) - (syntax-case action (else=>) - ((else=> proc) #'proc) - (_ #f))) - actions)) - (unspecified-keys - #`(list - ;; Keys that were referenced - #,@(append-map (lambda (action) - (syntax-case action () - ((_ key expression) - (collect-keys #'expression)) - (_ '()))) - actions) - ;; Keys that were deleted - #,@(filter-map (lambda (action) - (syntax-case action (remove) - ((remove key) #''key) - (_ #f))) - actions) - ;; Keys that were set - #,@(filter-map (lambda (action) - (syntax-case action () - ((_ key expression) #''key) - (_ #f))) - actions)))) - #`(let ((evaluated-alist alist)) - (append - ;; Do set and filter-set. - #,(if (null? filter-set-keys) - #`(list #,@set/filter-pairs) - #`(remove (match-lambda - ((key . #f) - (member key '#,filter-set-keys)) - (_ #f)) - (list #,@set/filter-pairs))) - ;; Do multiset. - #,@multiset-exprs - ;; Apply else=> only when explicitly present. - #,@(if else-proc - (list - #`(filter-map #,else-proc - (alist-delete* evaluated-alist - #,unspecified-keys))) - '())))))))) + #`(let ((evaluated-alist alist)) + (append (remove (match-lambda + ;; Filter out results of filter-set actions. + ((key . #f) + (member key '#,(filter-map (lambda (action) + (syntax-case action (filter-set) + ((filter-set key expression) #'key) + (_ #f))) + #'(actions ...)))) + (_ #f)) + ;; Do set and filter-set. + `#,(filter-map (lambda (action) + (syntax-case action (set filter-set) + ((set key expression) + #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression))) + ((filter-set key expression) + #`(key . ,#,(key->assoc-ref #'evaluated-alist #'expression))) + (_ #f))) + #'(actions ...))) + ;; Do multiset. + #,@(filter-map (lambda (action) + (syntax-case action (multiset) + ((multiset key expression) + #`(map (cut cons 'key <>) + #,(key->assoc-ref #'evaluated-alist #'expression))) + (_ #f))) + #'(actions ...)) + ;; Apply else=> procedure on unspecified keys. If + ;; no else=> procedure is specified, delete + ;; unspecified keys. + (filter-map #,(or (any (lambda (action) + (syntax-case action (else=>) + ((else=> proc) #'proc) + (_ #f))) + #'(actions ...)) + #'(const #f)) + ;; The unspecified part of the input + ;; alist + (alist-delete* evaluated-alist + (list + ;; Keys that were referenced + #,@(append-map (lambda (action) + (syntax-case action () + ((_ key expression) + (collect-keys #'expression)) + (_ '()))) + #'(actions ...)) + ;; Keys that were deleted + #,@(filter-map (lambda (action) + (syntax-case action (remove) + ((remove key) #''key) + (_ #f))) + #'(actions ...)) + ;; Keys that were set + #,@(filter-map (lambda (action) + (syntax-case action () + ((_ key expression) #''key) + (_ #f))) + #'(actions ...))))))))))) @@ -455,56 +416,22 @@ must be remedied." #'(schema-triples))) ((triples subject predicate-clauses ...) (triples) (find-clause #'(clauses ...) 'triples))) - #`(define* (name db #:key - (metadata? #f) - (data? #t) - (documentation? #f) - (keyset? #f) - (chunk-key "Id") - (limit #f) - (offset #f) - (cursor #f)) - (let* ((base-sql - (select-query #,(collect-fields #'(subject predicate-clauses ...)) - (primary-table other-tables ...) - tables-raw ...)) - (chunk-keys - (if (list? chunk-key) - chunk-key - (list chunk-key))) - (chunk-key-exprs - (map (lambda (k) - (if (string-index k #\.) - k - (format #f "~a.~a" - #,(symbol->string (syntax->datum #'primary-table)) - k))) - chunk-keys)) - (sql-for-keyset - (prepend-select-expr - base-sql - (string-join - (map (lambda (expr idx) - (format #f "~a AS __chunk_key_~a" expr (+ idx 1))) - chunk-key-exprs - (iota (length chunk-key-exprs))) - ", "))) - (sql - (cond - (keyset? - (append-order-limit - (if cursor - (append-filter-clause - sql-for-keyset - (build-keyset-where chunk-key-exprs cursor)) - sql-for-keyset) - (build-keyset-order chunk-key-exprs) - (or limit 100000))) - ((and limit offset) - (format #f "~a LIMIT ~a OFFSET ~a" - base-sql limit offset)) - (else base-sql)))) - (when metadata? + #`(define* (name db #:key + (metadata? #f) + (data? #t) + (documentation? #f) + (limit #f) + (offset #f)) + (let* ((base-sql + (select-query #,(collect-fields #'(subject predicate-clauses ...)) + (primary-table other-tables ...) + tables-raw ...)) + (sql + (if (and limit offset) + (format #f "~a LIMIT ~a OFFSET ~a" + base-sql limit offset) + base-sql))) + (when metadata? #,@(let ((table (symbol->string (syntax->datum #'primary-table))) (subject-type (any (lambda (predicate) (syntax-case predicate (rdf:type) @@ -630,144 +557,16 @@ The above query results to triples that have the form: (list 'triple-predicate ...) (list 'triple-object ...))) (_ (error "Invalid schema triples clause:" #'schema-triples-clause))) - (let ((rows-written 0) - (last-cursor cursor)) - (sql-for-each - (lambda (row) - (set! rows-written (+ rows-written 1)) - (when keyset? - (set! last-cursor - (map (lambda (idx) - (assoc-ref row (format #f "__chunk_key_~a" (+ idx 1)))) - (iota (if (list? chunk-key) - (length chunk-key) - 1))))) - (let ((subject-val #,(field->assoc-ref #'row #'subject))) - (call-with-values - (lambda () (make-short-turtle-emitter subject-val)) - (lambda (emit! finish!) - #,@(map (lambda (predicate-clause) - (syntax-case predicate-clause (set multiset filter-set) - ((set predicate object) - #`(emit! 'predicate - #,(field->assoc-ref #'row #'object))) - ((filter-set predicate object) - #`(let ((val #,(field->assoc-ref #'row #'object))) - (when val - (emit! 'predicate val)))) - ((multiset predicate object) - #`(for-each (lambda (val) - (emit! 'predicate val)) - #,(field->assoc-ref #'row #'object))) - (_ (error "Invalid predicate clause:" predicate-clause)))) - #'(predicate-clauses ...)) - (finish!))))) - db - sql) - (if keyset? - (cons rows-written last-cursor) - rows-written))))))) + (sql-for-each + (lambda (row) + (let* ((subject-val #,(field->assoc-ref #'row #'subject)) + (po-alist + (map-alist row #,@(field->key #'(predicate-clauses ...))))) + (emit-short-turtle subject-val po-alist))) + db + sql)))))) (_ (error "Invalid define-transformer syntax:" (syntax->datum x)))))) -(define (sql-literal v) - (cond - ((number? v) (number->string v)) - ((string? v) - (format #f "'~a'" (replace-substrings v '(("'" . "''"))))) - ((symbol? v) - (format #f "'~a'" (symbol->string v))) - (else - (format #f "'~a'" v)))) - -(define (normalize-keyset-cursor cursor key-count) - (cond - ((not cursor) #f) - ((and (= key-count 1) (not (list? cursor))) - (list cursor)) - ((and (list? cursor) (= (length cursor) key-count)) - cursor) - (else - (error "Invalid keyset cursor, expected " - key-count " values, got: " cursor)))) - -(define (normalize-key-exprs key-spec) - "Accept either a list of SQL key expressions or a key count." - (cond - ((list? key-spec) key-spec) - ((number? key-spec) - (map (lambda (idx) - (format #f "__chunk_key_~a" (+ idx 1))) - (iota key-spec))) - (else - (error "Invalid key specification: " key-spec)))) - -(define (build-keyset-order key-spec) - (string-join - (normalize-key-exprs key-spec) - ", ")) - -(define (build-keyset-where key-spec cursor) - (let* ((key-exprs (normalize-key-exprs key-spec)) - (key-count (length key-exprs)) - (vals (normalize-keyset-cursor cursor key-count))) - (unless vals - (error "build-keyset-where called with empty cursor")) - (string-join - (map (lambda (i) - (let ((eq-prefix - (string-join - (map (lambda (j) - (format #f "~a = ~a" - (list-ref key-exprs j) - (sql-literal (list-ref vals j)))) - (iota i)) - " AND ")) - (gt-part - (format #f "~a > ~a" - (list-ref key-exprs i) - (sql-literal (list-ref vals i))))) - (if (zero? i) - gt-part - (string-append "(" eq-prefix " AND " gt-part ")")))) - (iota key-count)) - " OR "))) - -(define (find-sql-tail-start sql) - (let ((positions - (filter-map identity - (list (string-contains sql " GROUP BY ") - (string-contains sql " ORDER BY ") - (string-contains sql " LIMIT "))))) - (if (null? positions) #f (apply min positions)))) - -(define (append-filter-clause sql clause) - (let* ((tail-start (find-sql-tail-start sql)) - (head (if tail-start (substring sql 0 tail-start) sql)) - (tail (if tail-start (substring sql tail-start) ""))) - (if (string-contains head " WHERE ") - (string-append head " AND (" clause ")" tail) - (string-append head " WHERE (" clause ")" tail)))) - -(define (append-order-limit sql order-expr limit) - (let* ((tail-start (find-sql-tail-start sql)) - (head (if tail-start (substring sql 0 tail-start) sql)) - (tail (if tail-start (substring sql tail-start) ""))) - (string-append head tail - " ORDER BY " order-expr - " LIMIT " (number->string limit)))) - -(define (prepend-select-expr sql expr) - "Prepend EXPR to the SELECT list of SQL." - (cond - ((and (>= (string-length sql) 16) - (string=? (substring sql 0 16) "SELECT DISTINCT ")) - (string-append "SELECT DISTINCT " expr ", " (substring sql 16))) - ((and (>= (string-length sql) 7) - (string=? (substring sql 0 7) "SELECT ")) - (string-append "SELECT " expr ", " (substring sql 7))) - (else - (error "Expected SQL SELECT query" sql)))) - (define (get-keyword-value args keyword default) (let ((kv (memq keyword args))) (if (and kv (>= (length kv) 2)) @@ -783,12 +582,10 @@ The above query results to triples that have the form: (table-metadata? (assoc-ref alist 'table-metadata?)) (prefixes (assoc-ref alist 'prefixes)) (inputs (assoc-ref alist 'inputs)) - (outputs (assoc-ref alist 'outputs)) - (total-rows (assoc-ref alist 'total-rows)) - (rows-per-chunk (assoc-ref alist 'rows-per-chunk)) - (keyset-pagination? (assoc-ref alist 'keyset-pagination?)) - (chunk-key-field (assoc-ref alist 'chunk-key-field)) - (chunking? (and total-rows rows-per-chunk)) + (outputs (assoc-ref alist 'outputs)) + (total-rows (assoc-ref alist 'total-rows)) + (rows-per-chunk (assoc-ref alist 'rows-per-chunk)) + (chunking? (and total-rows rows-per-chunk)) (chunks (if chunking? (ceiling (/ total-rows rows-per-chunk)) 1)) @@ -817,76 +614,32 @@ The above query results to triples that have the form: inputs)) #:encoding "UTF-8")) - ;; Dumping the actual data - (when rdf-path - (if keyset-pagination? - ;; Cursor-based chunking with configurable key column. - (let loop ((chunk-index 0) - (states (map (lambda (_) (cons #f #f)) inputs))) - (unless (every cdr states) - (let* ((out-file - (if (= chunk-index 0) - rdf-path - (string-append (path-without-extension rdf-path) - "." (number->string (+ chunk-index 1)) ".ttl"))) - (next-states - (with-output-to-file - out-file - (lambda () - (for-each - (match-lambda - ((k v) - (prefix k v))) - prefixes) - (newline) - (let walk ((procs inputs) - (st states) - (acc '())) - (if (null? procs) - (reverse acc) - (let* ((state (car st)) - (cursor (car state)) - (done? (cdr state))) - (if done? - (walk (cdr procs) (cdr st) (cons state acc)) - (let* ((result ((car procs) - db - #:metadata? table-metadata? - #:keyset? #t - #:chunk-key (or chunk-key-field "Id") - #:limit rows-per-chunk - #:cursor cursor)) - (rows (car result)) - (next-cursor (cdr result)) - (proc-done? (or (= rows 0) - (< rows rows-per-chunk)))) - (walk (cdr procs) - (cdr st) - (cons (cons next-cursor proc-done?) acc)))))))) - #:encoding "UTF-8"))) - (loop (+ chunk-index 1) next-states)))) - ;; Legacy OFFSET chunking. - (do ((i 0 (+ i 1))) - ((>= i chunks)) - (let* ((offset (* i (or rows-per-chunk 0))) - (out-file - (if (= chunks 1) - rdf-path - (string-append (path-without-extension rdf-path) - "." (number->string (+ i 1)) ".ttl")))) - (with-output-to-file - out-file - (lambda () - (for-each - (match-lambda - ((k v) - (prefix k v))) - prefixes) - (newline) - (for-each - (lambda (proc) - (proc db #:metadata? table-metadata? - #:limit rows-per-chunk - #:offset offset)) - inputs)) - #:encoding "UTF-8"))))))))))) + ;; Dumping the actual data + (when rdf-path + (do ((i 0 (+ i 1))) + ((>= i chunks)) + (let* ((offset (* i (or rows-per-chunk 0))) + (out-file + (if (= chunks 1) + rdf-path + (string-append (path-without-extension rdf-path) + "." (number->string (+ i 1)) ".ttl")))) + (with-output-to-file + out-file + (lambda () + ;; Add the prefixes + (for-each + (match-lambda + ((k v) + (begin + (prefix k v)))) + prefixes) + (newline) + (for-each + (lambda (proc) + (proc db #:metadata? table-metadata? + #:limit rows-per-chunk + #:offset offset)) + inputs)) + #:encoding "UTF-8")))))))))) + -- cgit 1.4.1