From 8db501749d47964b436bc63cafb80a17bfd42396 Mon Sep 17 00:00:00 2001 From: Piotr Szarmanski Date: Sun, 25 Sep 2022 19:54:42 +0200 Subject: Add parallel decoder. --- eris.asd | 5 +- src/eris-decode.lisp | 55 +++++++++++-------- src/package.lisp | 3 +- src/parallel-decoder.lisp | 136 ++++++++++++++++++++++++++++++++++++++++++++++ tests/decode-tests.lisp | 12 ++-- tests/encode-tests.lisp | 4 +- tests/package.lisp | 2 +- tests/parallel-tests.lisp | 38 +++++++++++++ 8 files changed, 219 insertions(+), 36 deletions(-) create mode 100644 src/parallel-decoder.lisp create mode 100644 tests/parallel-tests.lisp diff --git a/eris.asd b/eris.asd index 7f80a49..a1bdbb9 100644 --- a/eris.asd +++ b/eris.asd @@ -2,7 +2,7 @@ :name "eris" :author "mail@ykonai.net" :license "LGPLv3 or later" - :depends-on ("ironclad" "alexandria" "function-cache") + :depends-on ("ironclad" "alexandria" "trivial-gray-streams" "function-cache" "bordeaux-threads" #+unix "osicat" #+unix "mmap") :components ((:module "src" :serial t @@ -12,7 +12,8 @@ (:file "conditions") (:file "base32") (:file "eris") - (:file "eris-decode")))) + (:file "eris-decode") + #+unix (:file "parallel-decoder")))) :in-order-to ((test-op (test-op :eris/test)))) (defsystem "eris/test" diff --git a/src/eris-decode.lisp b/src/eris-decode.lisp index fe2dcbe..351da72 100644 --- a/src/eris-decode.lisp +++ b/src/eris-decode.lisp @@ -26,6 +26,10 @@ fetched from a trusted party.") (unless (equalp ,hash hash) (error 'hash-mismatch :reference ,hash :hash hash ))))) +(defmacro execute-fetch-function (fetch-function &rest args) + `(restart-case (funcall ,fetch-function ,@args) + (use-value (value) value))) + (defun key-reference-null? (kr) (and (equalp (reference kr) null-secret) (equalp (key kr) null-secret))) @@ -222,7 +226,7 @@ cache." :capacity cache-capacity :table (make-hash-table :size (1+ cache-capacity) :test #'equalp)) (reference key &optional nonce) - (let* ((block (funcall fetch-function reference))) + (let* ((block (execute-fetch-function fetch-function reference))) (unless block (error 'missing-block :reference reference)) (hash-check block reference) (decrypt-block block key nonce)))) @@ -262,34 +266,17 @@ cache." :eof (find-eof root get-block block-size level) :nonce-array (initialize-nonce-array level))))))) -(defmethod stream-file-position ((stream eris-decode-stream) &optional (set-position nil)) - "Provides the file position of the stream. If the optional second argument is -set, try to move the stream to that position. It may signal an EOF condition if -the new position is beyond the end of file.." - ;; NOTE: this should accept a "file-spec", which I believe is either an int, a - ;; :start or an :end. This only accepts a number. - (with-slots (position block-size buffer eof) stream - (when set-position - (let ((buffer-pos (mod set-position block-size))) - (if (< set-position eof) - (cond - ;; If the pos is within the buffer (and initialized): - ((and (<= 0 (- set-position (- position (pos buffer))) (1- block-size)) - (not (minusp (pos buffer)))) - (setf (pos buffer) buffer-pos - position set-position)) - - (t (reupdate-block stream set-position) - (setf (pos buffer) buffer-pos))) - (error 'eof :eof eof :position position)))) - position)) +(defmethod stream-file-position ((stream eris-decode-stream)) + "Provides the file position of the stream. This method is setf-able in order to +change the position." + (pos stream)) -(defmethod stream-read-sequence ((stream eris-decode-stream) seq &optional (start 0) (end (length seq))) +(defmethod stream-read-sequence ((stream eris-decode-stream) seq start end &key) (when (minusp (pos (buffer stream))) ;; initializes the buffer (reupdate-block stream (pos stream))) (with-slots (buffer position) stream - (read-to-seq seq buffer :start start :end (if end end (length seq)) :stream stream))) + (read-to-seq seq buffer :start start :end end :stream stream))) (defmethod stream-read-byte ((stream eris-decode-stream)) (when (minusp (pos (buffer stream))) @@ -313,3 +300,23 @@ the new position is beyond the end of file.." (defun eris-file-length (stream) "This is the equivalent of \"file-length\" for eris-decode-stream." (eof stream)) + +(defmethod (setf stream-file-position) (set-position (stream eris-decode-stream)) + (with-slots (position block-size buffer eof) stream + (when set-position + (case set-position + (:end (setf set-position eof)) + (:start (setf set-position 0))) + (let ((buffer-pos (mod set-position block-size))) + (if (< set-position eof) + (cond + ;; If the pos is within the buffer (and initialized): + ((and (<= 0 (- set-position (- position (pos buffer))) (1- block-size)) + (not (minusp (pos buffer)))) + (setf (pos buffer) buffer-pos + position set-position)) + + (t (reupdate-block stream set-position) + (setf (pos buffer) buffer-pos))) + (error 'eof :eof eof :position position)))) + set-position)) diff --git a/src/package.lisp b/src/package.lisp index b97799c..3041219 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -15,10 +15,11 @@ (defpackage eris - (:use common-lisp sb-gray alexandria trivia function-cache) + (:use common-lisp trivial-gray-streams alexandria trivia function-cache) (:export #:eris-encode #:eris-decode + #:eris-decode-parallel #:32kib #:1kib #:null-secret diff --git a/src/parallel-decoder.lisp b/src/parallel-decoder.lisp new file mode 100644 index 0000000..9af1ceb --- /dev/null +++ b/src/parallel-decoder.lisp @@ -0,0 +1,136 @@ +;; This file is part of eris-cl. +;; Copyright (C) 2022 Piotr Szarmański + +;; eris-cl is free software: you can redistribute it and/or modify it under the +;; terms of the GNU Lesser General Public License as published by the Free +;; Software Foundation, either version 3 of the License, or (at your option) any +;; later versqion. + +;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY +;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +;; A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +;; You should have received a copy of the GNU General Public License along with +;; eris-cl. If not, see . + +(in-package :eris) + +(defun split-list-equally (list parts) + (let* ((len (length list)) + (mod (mod len parts)) + (base (/ (- len mod) parts))) + (if (< len parts) + (map 'list #'list list) + (loop with pos = 0 + for i from (1- parts) downto 0 + collecting (subseq + list + pos + (if (<= mod i) + (setf pos (+ pos base)) + (setf pos (+ pos base 1)))))))) + +(defun mem-write-vector (vector ptr &optional (offset 0) (count (length vector))) + (declare (type (simple-array (unsigned-byte 8)) vector) + (type fixnum offset count)) + (declare (optimize ;; (speed 3) (safety 0) (space 0) + (debug 3))) + (loop for i below count + for off from offset + do (setf (cffi:mem-ref ptr :unsigned-char off) (aref vector i)))) + +(defclass reference-pair+ (reference-pair) + ((index :initarg :index :accessor index :type (integer 0 32768)))) + +(defun map-over-key-references (function block) + (loop for i from 0 to (1- (/ (length block) 64)) + for key-ref = (octets-to-reference-pair (subseq-shared block (* 64 i))) + until (key-reference-null? key-ref) + do (funcall function key-ref i))) + +(defun decode-blocks (reference-pair-list level block-capacity fetch-function output-file cache-capacity last-block) + (lambda () + (mmap:with-mmap (addr fd size output-file :open :write :protection :write :mmap :shared) + (let ((get-block (cached-lambda (:cache-class 'lru-cache + :capacity cache-capacity + :table (make-hash-table :size (1+ cache-capacity) :test #'equalp)) + (reference key &optional nonce) + (let* ((block (execute-fetch-function fetch-function reference))) + (unless block (error 'missing-block :reference reference)) + (hash-check block reference) + (decrypt-block block key nonce)))) + (nonce-array (initialize-nonce-array level))) + (labels ((descend (level reference-pair block-id) + (let ((block (funcall get-block (reference reference-pair) (key reference-pair) (aref nonce-array level)))) + (if (zerop level) + (if (= last-block block-id) + (mem-write-vector block addr (* 64 block-capacity block-id) (unpad-block block)) + (mem-write-vector block addr (* 64 block-capacity block-id))) + ;; (bordeaux-threads:with-lock-held (lock) + ;; (file-position stream (* 64 block-capacity block-id)) + ;; (write-sequence block stream)) + (map-over-key-references + (lambda (key-ref i) + (descend (1- level) key-ref (+ i (* block-capacity block-id)))) + block))))) + (mapc (lambda (key-ref) + (descend level key-ref (index key-ref))) + reference-pair-list)))))) + +(defun eris-decode-parallel (read-capability fetch-function output-file + &key (cache-capacity 4096) (threads 4) (initial-bindings bordeaux-threads:*default-special-bindings*)) + "Decode an ERIS READ-CAPABILITY in parallel using THREADS threads into a file +designated by OUTPUT-FILE. + +Fetch-function must be a function with one argument, the reference octet, which +returns a (simple-array (unsigned-byte 8)) containing the block. The block will +be destructively modified, so you MUST provide a fresh array every time. In +addition, the function MUST be thread-safe. + +CACHE-CAPACITY indicates the total amount of blocks stored for all threads. Each +thread has its own cache." + (declare (type read-capability read-capability) + (type function fetch-function) + (type integer cache-capacity)) + (with-slots (level block-size root-reference-pair) read-capability + (let ((root (decrypt-block (execute-fetch-function fetch-function (reference root-reference-pair)) + (key root-reference-pair) + (make-nonce level)))) + (when (> level 0) (hash-check root (key root-reference-pair))) + (case level + (0 (with-open-file (file output-file :direction :output :element-type '(unsigned-byte 8)) + (write-sequence root file :end (unpad-block root)))) + (t (let* ((initial-list + (loop for i from 0 to (/ block-size 64) + for key-ref = (octets-to-reference-pair (subseq-shared root (* 64 i))) + until (key-reference-null? key-ref) + collect key-ref)) + (list (split-list-equally + (loop for i from 0 to (1- (length initial-list)) + collecting (change-class (elt initial-list i) 'reference-pair+ :index i)) + threads)) + ;; (lock (bordeaux-threads:make-lock "stream-lock")) + (eof (find-eof root + (lambda (reference key nonce) + (let* ((block (execute-fetch-function fetch-function reference))) + (unless block (error 'missing-block :reference reference)) + (hash-check block reference) + (decrypt-block block key nonce))) + block-size + level))) + (let ((fd (osicat-posix:creat output-file #o666))) + (osicat-posix:posix-fallocate fd 0 eof) + (osicat-posix:close fd)) + (map 'nil #'bordeaux-threads:join-thread + (map 'list (lambda (reference-pairs) + (bordeaux-threads:make-thread + (decode-blocks reference-pairs + (1- level) + (/ block-size 64) + fetch-function + output-file + (truncate (/ cache-capacity threads)) + (truncate (/ eof block-size))) + :initial-bindings initial-bindings)) + list)))))))) + diff --git a/tests/decode-tests.lisp b/tests/decode-tests.lisp index af3edb0..ab9b56a 100644 --- a/tests/decode-tests.lisp +++ b/tests/decode-tests.lisp @@ -33,9 +33,9 @@ (read-capability (eris-encode array ,block-size #'hashtable-encode)) (decoded-array (make-array (length array) :element-type '(unsigned-byte 8))) (stream (eris-decode read-capability #'hashtable-decode))) - (stream-read-sequence stream decoded-array) + (stream-read-sequence stream decoded-array 0 (length decoded-array)) (is (equalp decoded-array array)) - (file-position stream 0) + (setf (stream-file-position stream) 0) (is (equalp array (alexandria:read-stream-content-into-byte-vector stream))))) @@ -124,8 +124,8 @@ (read-capability (eris-encode array ,block-size #'hashtable-encode)) (buf (make-array 24 :element-type '(unsigned-byte 8))) (stream (eris-decode read-capability #'hashtable-decode))) - (stream-file-position stream ,pos) - (stream-read-sequence stream buf) + (setf (stream-file-position stream) ,pos) + (stream-read-sequence stream buf 0 (length buf)) ;; (print (pos (buffer stream))) ;; (print (+ 24 ,buffer-pos)) ;; (print (pos stream)) @@ -146,7 +146,7 @@ (read-capability (eris-encode array ,block-size #'hashtable-encode)) (stream (eris-decode read-capability #'hashtable-decode))) (signals ,condition - (stream-file-position stream ,pos)))) + (setf (stream-file-position stream) ,pos)))) (test random-access-eof-1kib (assert-random-access-condition (make-octets 512 :element 1) 1024 512 eof) @@ -189,7 +189,7 @@ (read-capability (eris-encode array ,block-size #'hashtable-encode)) (decoded-array (make-array (length array) :element-type '(unsigned-byte 8))) (stream (eris-decode read-capability #'hashtable-decode))) - (stream-read-sequence stream decoded-array) + (stream-read-sequence stream decoded-array 0 (length decoded-array)) (is (equalp (length array) (eof stream))))) diff --git a/tests/encode-tests.lisp b/tests/encode-tests.lisp index c6b50d4..6c04444 100644 --- a/tests/encode-tests.lisp +++ b/tests/encode-tests.lisp @@ -37,11 +37,11 @@ "urn:eris:BIAD77QDJMFAKZYH2DXBUZYAP3MXZ3DJZVFYQ5DFWC6T65WSFCU5S2IT4YZGJ7AC4SYQMP2DM2ANS2ZTCP3DJJIRV733CRAAHOSWIYZM3M")) ;; simple gray stream class for this particular construction. -(defclass null-stream (fundamental-binary-stream) +(defclass null-stream (fundamental-binary-input-stream) ((counter :initform 0 :accessor counter) (max-counter :initarg :max-counter))) -(defmethod stream-read-sequence ((stream null-stream) seq &optional start end) +(defmethod stream-read-sequence ((stream null-stream) seq start end &key) (with-slots (counter max-counter) stream (if (eql counter max-counter) 0 diff --git a/tests/package.lisp b/tests/package.lisp index 71d6c82..98fdd81 100644 --- a/tests/package.lisp +++ b/tests/package.lisp @@ -1,5 +1,5 @@ (defpackage eris/test - (:use common-lisp eris fiveam sb-gray ironclad)) + (:use common-lisp eris fiveam trivial-gray-streams ironclad)) (in-package :eris/test) diff --git a/tests/parallel-tests.lisp b/tests/parallel-tests.lisp new file mode 100644 index 0000000..c139b31 --- /dev/null +++ b/tests/parallel-tests.lisp @@ -0,0 +1,38 @@ +;; This file is part of eris-cl. +;; Copyright (C) 2022 Piotr Szarmański + +;; eris-cl is free software: you can redistribute it and/or modify it under the +;; terms of the GNU Lesser General Public License as published by the Free +;; Software Foundation, either version 3 of the License, or (at your option) any +;; later versqion. + +;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY +;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +;; A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +;; You should have received a copy of the GNU General Public License along with +;; eris-cl. If not, see . + +(in-package :eris/test) + +(def-suite* parallel-tests :in eris-tests) + +(defmacro assert-parallel-decode (array block-size) + `(uiop:with-temporary-file (:stream output-file :pathname pathname :direction :io) + (let* ((*table* (make-hash-table :test #'equalp)) + (array ,array) + (read-capability (eris-encode array ,block-size #'hashtable-encode))) + (eris-decode-parallel read-capability #'hashtable-decode pathname + :initial-bindings (acons '*table* *table* bordeaux-threads:*default-special-bindings*) + :threads 4) + (is (equalp array + (alexandria:read-stream-content-into-byte-vector output-file)))))) + +(test simple-parallel-decode + (assert-parallel-decode (make-octets 4096 :element 101) 1024) + (assert-parallel-decode (make-octets 4095 :element 102) 1024) + (assert-parallel-decode (make-octets 18000 :element 103) 1024) + (assert-parallel-decode (make-octets 128000 :element 104) 32768) + (assert-parallel-decode (make-octets 131071 :element 104) 32768) + (assert-parallel-decode (make-octets 131072 :element 104) 32768) + (assert-parallel-decode (make-octets 131073 :element 104) 32768)) -- cgit v1.2.3