From fed34b8df2b19ba996ef810f44d5666437c90fb7 Mon Sep 17 00:00:00 2001 From: Piotr Szarmanski Date: Fri, 25 Aug 2023 19:09:50 +0200 Subject: Add the parallel encoder. --- NEWS | 3 ++ README | 16 +++++++- eris.asd | 29 +++++++------- src/backend.lisp | 21 +++++++++++ src/eris-parallel.lisp | 94 ++++++++++++++++++++++++++++++++++++++++++++++ src/eris.lisp | 41 +++++++++++--------- src/file-backend.lisp | 2 +- src/package.lisp | 4 +- tests/decode-tests.lisp | 7 ++-- tests/encode-parallel.lisp | 58 ++++++++++++++++++++++++++++ 10 files changed, 238 insertions(+), 37 deletions(-) create mode 100644 src/eris-parallel.lisp create mode 100644 tests/encode-parallel.lisp diff --git a/NEWS b/NEWS index e64efb6..dacb30a 100644 --- a/NEWS +++ b/NEWS @@ -6,6 +6,9 @@ + Removed the :hash-output, as it is rather niche (outputting identical blocks practically does not happen outside of testing) and in the cases where it might be useful, it's simpler to do within the output-function. ++ Added a parallel encoder, accessible through ~p/eris-encode~ as well as + ~p/{fetch,store}-data~. ++ ~lparallel~ added as a dependency rather than ~bordeaux-threads~. * 0.2 *Backwards incompatible*: + The eris-encode function now takes an OUTPUT-FUNCTION diff --git a/README b/README index 53d4c6d..e3262f5 100644 --- a/README +++ b/README @@ -32,7 +32,16 @@ function can be used to decode an ERIS read-capability. It returns a stream of the class ERIS-DECODE-STREAM: this class implements the Gray streams protocol. See the docstrings of the specific functions for more details. However, you -should only use these to write custom backends; otherwise, see below.. +should only use these to write custom backends; otherwise, see below. + +NOTES ON ERIS-DECODE-STREAM: + +The implementation of all the methods behaves similarly to regular ones, except +for FILE-POSITION with two arguments. On SBCL and a few other CL +implementations, the implementation of the ordinary FILE-POSITION never fails. +However, per the spec, it could fail in some cases. As such, the (SETF +STREAM-FILE-POSITION) method on ERIS-DECODE-STREAM does fail if the index is +larger or equal to EOF. @@ -64,6 +73,11 @@ decoding-backend An instance of file-backend can be instanced using (make-instance 'file-backend :directory "my/dir/with/eris/chunks/"). +There are also parallel equivalents, p/fetch-data and p/store-data. These can be +used on backends marked with p/encoding-backend; for example the file-backend. +In practice, any backend that allows concurrent writes should has +p/encoding-backend as a superclass. + For further information, see the docstrings. diff --git a/eris.asd b/eris.asd index 7212699..405c9c4 100644 --- a/eris.asd +++ b/eris.asd @@ -2,21 +2,23 @@ :name "eris" :author "mail@ykonai.net" :license "LGPLv3 or later" - :depends-on ("ironclad" "alexandria" "serapeum" "trivial-gray-streams" "function-cache" "bordeaux-threads" #+unix "osicat" #+unix "mmap") + :depends-on ("ironclad" "alexandria" "serapeum" "trivial-gray-streams" + "function-cache" "lparallel" #+unix "osicat" #+unix "mmap") :components ((:module "src" - :serial t - :components ((:file "cache") - (:file "package") - (:file "common") - (:file "conditions") - (:file "base32") - (:file "eris") - (:file "eris-decode") - (:file "backend") - (:file "file-backend") - (:file "hash-backend") - #+nil (:file "parallel-decoder")))) + :serial t + :components ((:file "cache") + (:file "package") + (:file "common") + (:file "conditions") + (:file "base32") + (:file "eris") + (:file "eris-decode") + (:file "backend") + (:file "file-backend") + (:file "hash-backend") + (:file "eris-parallel") + #+nil (:file "parallel-decoder")))) :in-order-to ((test-op (test-op :eris/test)))) (defsystem "eris/test" @@ -33,4 +35,5 @@ (:file "rfc") (:file "autogenerated-tests") (:file "backend-tests") + (:file "encode-parallel") #+nil (:file "parallel-tests"))))) diff --git a/src/backend.lisp b/src/backend.lisp index d2b81b2..8571e66 100644 --- a/src/backend.lisp +++ b/src/backend.lisp @@ -22,6 +22,9 @@ (defclass decoding-backend () ((fetch-function :type function))) +(defclass p/encoding-backend () + ()) + (defgeneric fetch-data (read-capability backend &key &allow-other-keys) (:documentation "Using the BACKEND, return a stream that decodes the provided READ-CAPABILITY @@ -58,3 +61,21 @@ size less than 16kib. It should be set either to 1024b or 32kib.")) 1kib) output-function :secret secret))) + +(defgeneric p/store-data (input backend &key secret block-size threads) + (:documentation "Like store-data but parallel. lparallel:*kernel* has to be bound before +calling.")) + +(defmethod p/store-data (input (backend p/encoding-backend) + &key (secret null-secret) (block-size 32kib) &allow-other-keys) + (with-slots (output-function) backend + (p/eris-encode input + (if (> (etypecase input + (pathname (file-size input)) + (file-stream (file-length input)) + (vector (length input)) + (t block-size)) + 16384) + 32kib + 1kib) + output-function :secret secret))) diff --git a/src/eris-parallel.lisp b/src/eris-parallel.lisp new file mode 100644 index 0000000..7c2f702 --- /dev/null +++ b/src/eris-parallel.lisp @@ -0,0 +1,94 @@ +;; This file is part of eris-cl. +;; Copyright (C) 2023 Piotr Szarmański + +;; eris-cl is free software: you can redistribute it and/or modify it under the +;; terms of the GNU Lesser General Public License as published by the Free +;; Software Foundation, either version 3 of the License, or (at your option) any +;; later version. + +;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY +;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +;; A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +;; You should have received a copy of the GNU General Public License along with +;; eris-cl. If not, see . + +(in-package :eris) + + +(defun split-into-n (i parts size) + "Splits an integer I into PARTS parts. Returns a vector of length 1- PARTS with +those parts.." + (declare (type integer i parts size)) + (let* ((k (* size (truncate (/ (/ i size) parts)))) + (vec (make-array + parts + :element-type 'integer + :initial-element k))) + (nlet rec ((remaining (- i (* k parts))) + (i 0)) + (if (< remaining size) + (setf (aref vec (1- parts)) (+ remaining (aref vec (1- parts)))) + (progn (setf (aref vec i) (+ size (aref vec i))) + (rec (- remaining size) (1+ i))))) + vec)) + +(defgeneric p/eris-encode (input block-size output-function &key secret &allow-other-keys) + (:documentation "Encode INPUT in parallel using LPARALLEL. Requires lparallel:*kernel* to be bound.")) + +;; eris-create-tree is not parallelized because it is insignificant on any large +;; input; for 32kib blocks, there's about 512 level 0 blocks for any higher +;; level block, almost three orders of magnitude. +(defmethod p/eris-encode ((input vector) block-size output-function + &key (secret null-secret) &allow-other-keys) + (let ((v (split-into-n (length input) (lparallel:kernel-worker-count) block-size))) + (eris-create-tree + (apply #'concatenate + 'simple-vector + (map + 'list + #'lparallel:force + ;; Recursion to avoid demons + (nreverse + (nlet rec ((i 0) (k 0) (l nil)) + (if (eql i (length v)) l + (rec + (1+ i) + (+ k (aref v i)) + (cons (lparallel:future + (chunk-array + input block-size output-function secret + :start k :end (+ k (aref v i)) + :pad (if (eq i (1- (length v))) t nil))) + l))))))) + block-size output-function))) + +(defmethod p/eris-encode ((input pathname) block-size output-function + &key (secret null-secret) &allow-other-keys) + (let ((v (split-into-n (file-size input) (lparallel:kernel-worker-count) block-size))) + (eris-create-tree + (apply #'concatenate + 'simple-vector + (map + 'list + #'lparallel:force + (nreverse + (nlet rec ((i 0) (k 0) (l nil)) + (if (eql i (length v)) l + (rec + (1+ i) + (+ k (aref v i)) + (cons (lparallel:future + (with-open-file (f input :element-type 'octet) + (file-position f k) + (chunk-stream + f block-size output-function (aref v i) secret + :pad (if (eq i (1- (length v))) t nil)))) + l))))))) + block-size output-function))) + +;; No methods for streams. A stream implementation could be done by allocating n +;; buffers, reading sequentially into each buffer and chunk-array'ing them until +;; eof, but it may be quite unoptimal, depending on the stream, buffer size, +;; etc. + diff --git a/src/eris.lisp b/src/eris.lisp index 196bcce..b23caac 100644 --- a/src/eris.lisp +++ b/src/eris.lisp @@ -151,29 +151,34 @@ versioning bytes are not supported by eris-cl." ;; These CHUNK- functions are written in order to allow processing files in ;; parallel. -(defun chunk-array (array block-size output-function secret &key pad) +(defun chunk-array (array block-size output-function secret &key pad (start 0) (end (length array))) "Split (SIMPLE-ARRAY (UNSIGNED-BYTE 8) that is a multiple of BLOCK-SIZE into chunks, output them and collect references. Returns a vector of references. +START and END behave as expected. Pass PAD as T if the output should be padded." (declare (type block-size block-size) (type octet-vector array)) - (let ((blocks (if pad - (/ (+ (length array) (- block-size (mod (length array) block-size))) block-size) - (/ (length array) block-size)))) - (let ((block (make-octet-vector block-size)) - (rks (make-array blocks :element-type 'octet-vector :initial-element null-secret))) - (loop for i from 0 below (1- blocks) - do (progn - (replace block array :start2 (* block-size i)) - (setf block (output-block rks i)))) - ;; handle last block - (replace block array :start2 (* block-size (1- blocks))) - (when pad - (setf (aref block (mod (length array) block-size)) #x80) - (fill block 0 :start (1+ (mod (length array) block-size)))) - (output-block rks (1- blocks)) - rks))) + (when (and (not pad) (zerop (- end start))) ;; need this because of the loop unrolling + (return-from chunk-array (make-array 0 :element-type 'octet-vector))) + + (let ((length (- end start))) + (let ((blocks (if pad + (/ (+ length (- block-size (mod length block-size))) block-size) + (/ length block-size)))) + (let ((block (make-octet-vector block-size)) + (rks (make-array blocks :element-type 'octet-vector :initial-element null-secret))) + (loop for i from 0 below (1- blocks) + do (progn + (replace block array :start2 (+ start (* block-size i))) + (setf block (output-block rks i)))) + ;; handle last block + (replace block array :start2 (+ start (* block-size (1- blocks)))) + (when pad + (setf (aref block (mod length block-size)) #x80) + (fill block 0 :start (1+ (mod length block-size)))) + (output-block rks (1- blocks)) + rks)))) ;; Implementation note: This is CHUNK-ARRAY but copypasted with (LENGTH ARRAY) @@ -185,6 +190,8 @@ Pass PAD as T if the output should be padded." read and should be a multiple of BLOCK-SIZE unless PAD is T." (declare (type block-size block-size) (type integer length)) + (when (and (not pad) (zerop length)) ;; need this because of the loop unrolling + (return-from chunk-stream (make-array 0 :element-type 'octet-vector))) (let ((blocks (if pad (/ (+ length (- block-size (mod length block-size))) block-size) (/ length block-size)))) diff --git a/src/file-backend.lisp b/src/file-backend.lisp index 6174cf7..db90d82 100644 --- a/src/file-backend.lisp +++ b/src/file-backend.lisp @@ -15,7 +15,7 @@ (in-package :eris) -(defclass file-backend (encoding-backend decoding-backend) +(defclass file-backend (encoding-backend decoding-backend p/encoding-backend) ((directory :initarg :directory :type (or string pathname) :documentation "Directory containing ERIS data."))) diff --git a/src/package.lisp b/src/package.lisp index dd6aa0f..51ca5c7 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -19,7 +19,7 @@ (:export #:eris-encode #:eris-decode - #:eris-decode-parallel + #:p/eris-encode #:32kib #:1kib #:null-secret @@ -51,8 +51,10 @@ #:store-data #:fetch-data + #:p/store-data #:encoding-backend #:decoding-backend + #:p/encoding-backend #:file-backend #:hash-backend #:output-function diff --git a/tests/decode-tests.lisp b/tests/decode-tests.lisp index 918cd82..162f32b 100644 --- a/tests/decode-tests.lisp +++ b/tests/decode-tests.lisp @@ -97,7 +97,7 @@ (assert-array-decode (make-octets 32767 :element 9) 1024) (assert-array-decode (make-octets 32768 :element 10) 1024) (assert-array-decode (make-octets 131072 :element 11) 1024) - (for-all ((buffer (gen-buffer :length (gen-integer :min 0 :max 40000)))) + (for-all ((buffer (gen-buffer :length (gen-integer :min 1 :max 40000)))) (assert-array-decode buffer 1024))) (test simple-decoding-32kib @@ -107,7 +107,7 @@ (assert-array-decode (make-octets 32769 :element 2) 32kib) (assert-array-decode (make-octets 32768 :element 2) 32kib) (assert-array-decode (make-octets 16777216 :element 2) 32kib) - (for-all ((buffer (gen-buffer :length (gen-integer :min 0 :max 70000)))) + (for-all ((buffer (gen-buffer :length (gen-integer :min 1 :max 70000)))) (assert-array-decode buffer 32kib))) (test proper-return-values @@ -199,7 +199,7 @@ (assert-length (make-array 2048 :element-type '(unsigned-byte 8) :initial-element 2) 1024) (assert-length (make-array 16383 :element-type '(unsigned-byte 8) :initial-element 2) 1024) (assert-length (make-array 16384 :element-type '(unsigned-byte 8) :initial-element 2) 1024) - (for-all ((buffer (gen-buffer :length (gen-integer :min 0 :max 40000)))) + (for-all ((buffer (gen-buffer :length (gen-integer :min 1 :max 40000)))) (assert-length buffer 1024))) @@ -243,4 +243,3 @@ (assert-read-byte-error (make-octets 1024 :element 2) 32kib) (assert-read-byte-error (make-octets 32767 :element 2) 32kib) (assert-read-byte-error (make-octets 32768 :element 2) 32kib)) - diff --git a/tests/encode-parallel.lisp b/tests/encode-parallel.lisp new file mode 100644 index 0000000..4bc6c23 --- /dev/null +++ b/tests/encode-parallel.lisp @@ -0,0 +1,58 @@ +;; This file is part of eris-cl. +;; Copyright (C) 2023 Piotr Szarmański + +;; eris-cl is free software: you can redistribute it and/or modify it under the +;; terms of the GNU Lesser General Public License as published by the Free +;; Software Foundation, either version 3 of the License, or (at your option) any +;; later version. + +;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY +;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +;; A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +;; You should have received a copy of the GNU General Public License along with +;; eris-cl. If not, see . + +(in-package :eris/test) + +(def-suite* parallel-encoding-tests :in eris-tests) + +(defmacro with-lp-kernel (threads &body expr ) + `(let ((lparallel:*kernel* (lparallel:make-kernel ,threads))) + (unwind-protect (progn ,@expr) + (lparallel:end-kernel)))) + +(defmacro test-p/file-backend (array &optional (secret null-secret)) + `(let ((tmpdir (make-temporary-dir))) + (unwind-protect + (let* ((backend (make-instance 'file-backend :directory tmpdir)) + (array ,array) + (file (merge-pathnames (crypto:byte-array-to-hex-string (crypto:random-data 16)) tmpdir))) + (with-open-file (f file :direction :output :element-type 'serapeum:octet) + (write-sequence ,array f)) + (let ((list + (list + (read-capability-to-octets + (store-data array backend :secret ,secret)) + (read-capability-to-octets + (p/store-data array backend :secret ,secret)) + (read-capability-to-octets + (p/store-data file backend :secret ,secret))))) + (is (apply #'serapeum:equalp* list)))) + (uiop:delete-directory-tree tmpdir :validate t)))) + + +(test simple-parallel-tests + (with-lp-kernel 2 + (test-p/file-backend (make-octets 1023 :element 1) (make-octets 32 :element 1)) + (test-p/file-backend (make-octets 1025 :element 2) (make-octets 32 :element 2)) + (test-p/file-backend (make-octets 16383 :element 3) (make-octets 32 :element 3)) + (test-p/file-backend (make-octets 16384 :element 4) (make-octets 32 :element 4)) + (test-p/file-backend (make-octets 1 :element 5) (make-octets 32 :element 5)) + (test-p/file-backend (make-octets 16834 :element 5) (make-octets 32 :element 6)) + (test-p/file-backend (make-octets 96000 :element 5) (make-octets 32 :element 7)))) + +(test simple-parallel-tests2 + (with-lp-kernel 3 + (for-all ((buffer (gen-buffer :length (gen-integer :min 1 :max 130000)))) + (test-p/file-backend buffer)))) -- cgit v1.2.3