summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPiotr Szarmanski2023-08-25 19:09:50 +0200
committerPiotr Szarmanski2023-08-25 19:09:50 +0200
commitfed34b8df2b19ba996ef810f44d5666437c90fb7 (patch)
tree6d5aac31bbfb7eab265490c8e37856382ebdd2ba
parentda0e1aa69defa7cbc87209966c751918f523f1fb (diff)
Add the parallel encoder.
-rw-r--r--NEWS3
-rw-r--r--README16
-rw-r--r--eris.asd29
-rw-r--r--src/backend.lisp21
-rw-r--r--src/eris-parallel.lisp94
-rw-r--r--src/eris.lisp41
-rw-r--r--src/file-backend.lisp2
-rw-r--r--src/package.lisp4
-rw-r--r--tests/decode-tests.lisp7
-rw-r--r--tests/encode-parallel.lisp58
10 files changed, 238 insertions, 37 deletions
diff --git a/NEWS b/NEWS
index e64efb6..dacb30a 100644
--- a/NEWS
+++ b/NEWS
@@ -6,6 +6,9 @@
+ Removed the :hash-output, as it is rather niche (outputting identical blocks
practically does not happen outside of testing) and in the cases where it
might be useful, it's simpler to do within the output-function.
++ Added a parallel encoder, accessible through ~p/eris-encode~ as well as
+ ~p/{fetch,store}-data~.
++ ~lparallel~ added as a dependency rather than ~bordeaux-threads~.
* 0.2
*Backwards incompatible*:
+ The eris-encode function now takes an OUTPUT-FUNCTION
diff --git a/README b/README
index 53d4c6d..e3262f5 100644
--- a/README
+++ b/README
@@ -32,7 +32,16 @@ function can be used to decode an ERIS read-capability. It returns a stream of
the class ERIS-DECODE-STREAM: this class implements the Gray streams protocol.
See the docstrings of the specific functions for more details. However, you
-should only use these to write custom backends; otherwise, see below..
+should only use these to write custom backends; otherwise, see below.
+
+NOTES ON ERIS-DECODE-STREAM:
+
+The implementation of all the methods behaves similarly to regular ones, except
+for FILE-POSITION with two arguments. On SBCL and a few other CL
+implementations, the implementation of the ordinary FILE-POSITION never fails.
+However, per the spec, it could fail in some cases. As such, the (SETF
+STREAM-FILE-POSITION) method on ERIS-DECODE-STREAM does fail if the index is
+larger or equal to EOF.
@@ -64,6 +73,11 @@ decoding-backend
An instance of file-backend can be instanced using (make-instance 'file-backend
:directory "my/dir/with/eris/chunks/").
+There are also parallel equivalents, p/fetch-data and p/store-data. These can be
+used on backends marked with p/encoding-backend; for example the file-backend.
+In practice, any backend that allows concurrent writes should has
+p/encoding-backend as a superclass.
+
For further information, see the docstrings.
diff --git a/eris.asd b/eris.asd
index 7212699..405c9c4 100644
--- a/eris.asd
+++ b/eris.asd
@@ -2,21 +2,23 @@
:name "eris"
:author "mail@ykonai.net"
:license "LGPLv3 or later"
- :depends-on ("ironclad" "alexandria" "serapeum" "trivial-gray-streams" "function-cache" "bordeaux-threads" #+unix "osicat" #+unix "mmap")
+ :depends-on ("ironclad" "alexandria" "serapeum" "trivial-gray-streams"
+ "function-cache" "lparallel" #+unix "osicat" #+unix "mmap")
:components
((:module "src"
- :serial t
- :components ((:file "cache")
- (:file "package")
- (:file "common")
- (:file "conditions")
- (:file "base32")
- (:file "eris")
- (:file "eris-decode")
- (:file "backend")
- (:file "file-backend")
- (:file "hash-backend")
- #+nil (:file "parallel-decoder"))))
+ :serial t
+ :components ((:file "cache")
+ (:file "package")
+ (:file "common")
+ (:file "conditions")
+ (:file "base32")
+ (:file "eris")
+ (:file "eris-decode")
+ (:file "backend")
+ (:file "file-backend")
+ (:file "hash-backend")
+ (:file "eris-parallel")
+ #+nil (:file "parallel-decoder"))))
:in-order-to ((test-op (test-op :eris/test))))
(defsystem "eris/test"
@@ -33,4 +35,5 @@
(:file "rfc")
(:file "autogenerated-tests")
(:file "backend-tests")
+ (:file "encode-parallel")
#+nil (:file "parallel-tests")))))
diff --git a/src/backend.lisp b/src/backend.lisp
index d2b81b2..8571e66 100644
--- a/src/backend.lisp
+++ b/src/backend.lisp
@@ -22,6 +22,9 @@
(defclass decoding-backend ()
((fetch-function :type function)))
+(defclass p/encoding-backend ()
+ ())
+
(defgeneric fetch-data (read-capability backend &key &allow-other-keys)
(:documentation
"Using the BACKEND, return a stream that decodes the provided READ-CAPABILITY
@@ -58,3 +61,21 @@ size less than 16kib. It should be set either to 1024b or 32kib."))
1kib)
output-function
:secret secret)))
+
+(defgeneric p/store-data (input backend &key secret block-size threads)
+ (:documentation "Like store-data but parallel. lparallel:*kernel* has to be bound before
+calling."))
+
+(defmethod p/store-data (input (backend p/encoding-backend)
+ &key (secret null-secret) (block-size 32kib) &allow-other-keys)
+ (with-slots (output-function) backend
+ (p/eris-encode input
+ (if (> (etypecase input
+ (pathname (file-size input))
+ (file-stream (file-length input))
+ (vector (length input))
+ (t block-size))
+ 16384)
+ 32kib
+ 1kib)
+ output-function :secret secret)))
diff --git a/src/eris-parallel.lisp b/src/eris-parallel.lisp
new file mode 100644
index 0000000..7c2f702
--- /dev/null
+++ b/src/eris-parallel.lisp
@@ -0,0 +1,94 @@
+;; This file is part of eris-cl.
+;; Copyright (C) 2023 Piotr Szarmański
+
+;; eris-cl is free software: you can redistribute it and/or modify it under the
+;; terms of the GNU Lesser General Public License as published by the Free
+;; Software Foundation, either version 3 of the License, or (at your option) any
+;; later version.
+
+;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY
+;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+;; A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+;; You should have received a copy of the GNU General Public License along with
+;; eris-cl. If not, see <https://www.gnu.org/licenses/>.
+
+(in-package :eris)
+
+
+(defun split-into-n (i parts size)
+ "Splits an integer I into PARTS parts. Returns a vector of length 1- PARTS with
+those parts.."
+ (declare (type integer i parts size))
+ (let* ((k (* size (truncate (/ (/ i size) parts))))
+ (vec (make-array
+ parts
+ :element-type 'integer
+ :initial-element k)))
+ (nlet rec ((remaining (- i (* k parts)))
+ (i 0))
+ (if (< remaining size)
+ (setf (aref vec (1- parts)) (+ remaining (aref vec (1- parts))))
+ (progn (setf (aref vec i) (+ size (aref vec i)))
+ (rec (- remaining size) (1+ i)))))
+ vec))
+
+(defgeneric p/eris-encode (input block-size output-function &key secret &allow-other-keys)
+ (:documentation "Encode INPUT in parallel using LPARALLEL. Requires lparallel:*kernel* to be bound."))
+
+;; eris-create-tree is not parallelized because it is insignificant on any large
+;; input; for 32kib blocks, there's about 512 level 0 blocks for any higher
+;; level block, almost three orders of magnitude.
+(defmethod p/eris-encode ((input vector) block-size output-function
+ &key (secret null-secret) &allow-other-keys)
+ (let ((v (split-into-n (length input) (lparallel:kernel-worker-count) block-size)))
+ (eris-create-tree
+ (apply #'concatenate
+ 'simple-vector
+ (map
+ 'list
+ #'lparallel:force
+ ;; Recursion to avoid demons
+ (nreverse
+ (nlet rec ((i 0) (k 0) (l nil))
+ (if (eql i (length v)) l
+ (rec
+ (1+ i)
+ (+ k (aref v i))
+ (cons (lparallel:future
+ (chunk-array
+ input block-size output-function secret
+ :start k :end (+ k (aref v i))
+ :pad (if (eq i (1- (length v))) t nil)))
+ l)))))))
+ block-size output-function)))
+
+(defmethod p/eris-encode ((input pathname) block-size output-function
+ &key (secret null-secret) &allow-other-keys)
+ (let ((v (split-into-n (file-size input) (lparallel:kernel-worker-count) block-size)))
+ (eris-create-tree
+ (apply #'concatenate
+ 'simple-vector
+ (map
+ 'list
+ #'lparallel:force
+ (nreverse
+ (nlet rec ((i 0) (k 0) (l nil))
+ (if (eql i (length v)) l
+ (rec
+ (1+ i)
+ (+ k (aref v i))
+ (cons (lparallel:future
+ (with-open-file (f input :element-type 'octet)
+ (file-position f k)
+ (chunk-stream
+ f block-size output-function (aref v i) secret
+ :pad (if (eq i (1- (length v))) t nil))))
+ l)))))))
+ block-size output-function)))
+
+;; No methods for streams. A stream implementation could be done by allocating n
+;; buffers, reading sequentially into each buffer and chunk-array'ing them until
+;; eof, but it may be quite unoptimal, depending on the stream, buffer size,
+;; etc.
+
diff --git a/src/eris.lisp b/src/eris.lisp
index 196bcce..b23caac 100644
--- a/src/eris.lisp
+++ b/src/eris.lisp
@@ -151,29 +151,34 @@ versioning bytes are not supported by eris-cl."
;; These CHUNK- functions are written in order to allow processing files in
;; parallel.
-(defun chunk-array (array block-size output-function secret &key pad)
+(defun chunk-array (array block-size output-function secret &key pad (start 0) (end (length array)))
"Split (SIMPLE-ARRAY (UNSIGNED-BYTE 8) that is a multiple of BLOCK-SIZE into
chunks, output them and collect references. Returns a vector of references.
+START and END behave as expected.
Pass PAD as T if the output should be padded."
(declare (type block-size block-size)
(type octet-vector array))
- (let ((blocks (if pad
- (/ (+ (length array) (- block-size (mod (length array) block-size))) block-size)
- (/ (length array) block-size))))
- (let ((block (make-octet-vector block-size))
- (rks (make-array blocks :element-type 'octet-vector :initial-element null-secret)))
- (loop for i from 0 below (1- blocks)
- do (progn
- (replace block array :start2 (* block-size i))
- (setf block (output-block rks i))))
- ;; handle last block
- (replace block array :start2 (* block-size (1- blocks)))
- (when pad
- (setf (aref block (mod (length array) block-size)) #x80)
- (fill block 0 :start (1+ (mod (length array) block-size))))
- (output-block rks (1- blocks))
- rks)))
+ (when (and (not pad) (zerop (- end start))) ;; need this because of the loop unrolling
+ (return-from chunk-array (make-array 0 :element-type 'octet-vector)))
+
+ (let ((length (- end start)))
+ (let ((blocks (if pad
+ (/ (+ length (- block-size (mod length block-size))) block-size)
+ (/ length block-size))))
+ (let ((block (make-octet-vector block-size))
+ (rks (make-array blocks :element-type 'octet-vector :initial-element null-secret)))
+ (loop for i from 0 below (1- blocks)
+ do (progn
+ (replace block array :start2 (+ start (* block-size i)))
+ (setf block (output-block rks i))))
+ ;; handle last block
+ (replace block array :start2 (+ start (* block-size (1- blocks))))
+ (when pad
+ (setf (aref block (mod length block-size)) #x80)
+ (fill block 0 :start (1+ (mod length block-size))))
+ (output-block rks (1- blocks))
+ rks))))
;; Implementation note: This is CHUNK-ARRAY but copypasted with (LENGTH ARRAY)
@@ -185,6 +190,8 @@ Pass PAD as T if the output should be padded."
read and should be a multiple of BLOCK-SIZE unless PAD is T."
(declare (type block-size block-size)
(type integer length))
+ (when (and (not pad) (zerop length)) ;; need this because of the loop unrolling
+ (return-from chunk-stream (make-array 0 :element-type 'octet-vector)))
(let ((blocks (if pad
(/ (+ length (- block-size (mod length block-size))) block-size)
(/ length block-size))))
diff --git a/src/file-backend.lisp b/src/file-backend.lisp
index 6174cf7..db90d82 100644
--- a/src/file-backend.lisp
+++ b/src/file-backend.lisp
@@ -15,7 +15,7 @@
(in-package :eris)
-(defclass file-backend (encoding-backend decoding-backend)
+(defclass file-backend (encoding-backend decoding-backend p/encoding-backend)
((directory :initarg :directory :type (or string pathname)
:documentation
"Directory containing ERIS data.")))
diff --git a/src/package.lisp b/src/package.lisp
index dd6aa0f..51ca5c7 100644
--- a/src/package.lisp
+++ b/src/package.lisp
@@ -19,7 +19,7 @@
(:export
#:eris-encode
#:eris-decode
- #:eris-decode-parallel
+ #:p/eris-encode
#:32kib
#:1kib
#:null-secret
@@ -51,8 +51,10 @@
#:store-data
#:fetch-data
+ #:p/store-data
#:encoding-backend
#:decoding-backend
+ #:p/encoding-backend
#:file-backend
#:hash-backend
#:output-function
diff --git a/tests/decode-tests.lisp b/tests/decode-tests.lisp
index 918cd82..162f32b 100644
--- a/tests/decode-tests.lisp
+++ b/tests/decode-tests.lisp
@@ -97,7 +97,7 @@
(assert-array-decode (make-octets 32767 :element 9) 1024)
(assert-array-decode (make-octets 32768 :element 10) 1024)
(assert-array-decode (make-octets 131072 :element 11) 1024)
- (for-all ((buffer (gen-buffer :length (gen-integer :min 0 :max 40000))))
+ (for-all ((buffer (gen-buffer :length (gen-integer :min 1 :max 40000))))
(assert-array-decode buffer 1024)))
(test simple-decoding-32kib
@@ -107,7 +107,7 @@
(assert-array-decode (make-octets 32769 :element 2) 32kib)
(assert-array-decode (make-octets 32768 :element 2) 32kib)
(assert-array-decode (make-octets 16777216 :element 2) 32kib)
- (for-all ((buffer (gen-buffer :length (gen-integer :min 0 :max 70000))))
+ (for-all ((buffer (gen-buffer :length (gen-integer :min 1 :max 70000))))
(assert-array-decode buffer 32kib)))
(test proper-return-values
@@ -199,7 +199,7 @@
(assert-length (make-array 2048 :element-type '(unsigned-byte 8) :initial-element 2) 1024)
(assert-length (make-array 16383 :element-type '(unsigned-byte 8) :initial-element 2) 1024)
(assert-length (make-array 16384 :element-type '(unsigned-byte 8) :initial-element 2) 1024)
- (for-all ((buffer (gen-buffer :length (gen-integer :min 0 :max 40000))))
+ (for-all ((buffer (gen-buffer :length (gen-integer :min 1 :max 40000))))
(assert-length buffer 1024)))
@@ -243,4 +243,3 @@
(assert-read-byte-error (make-octets 1024 :element 2) 32kib)
(assert-read-byte-error (make-octets 32767 :element 2) 32kib)
(assert-read-byte-error (make-octets 32768 :element 2) 32kib))
-
diff --git a/tests/encode-parallel.lisp b/tests/encode-parallel.lisp
new file mode 100644
index 0000000..4bc6c23
--- /dev/null
+++ b/tests/encode-parallel.lisp
@@ -0,0 +1,58 @@
+;; This file is part of eris-cl.
+;; Copyright (C) 2023 Piotr Szarmański
+
+;; eris-cl is free software: you can redistribute it and/or modify it under the
+;; terms of the GNU Lesser General Public License as published by the Free
+;; Software Foundation, either version 3 of the License, or (at your option) any
+;; later version.
+
+;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY
+;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+;; A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+;; You should have received a copy of the GNU General Public License along with
+;; eris-cl. If not, see <https://www.gnu.org/licenses/>.
+
+(in-package :eris/test)
+
+(def-suite* parallel-encoding-tests :in eris-tests)
+
+(defmacro with-lp-kernel (threads &body expr )
+ `(let ((lparallel:*kernel* (lparallel:make-kernel ,threads)))
+ (unwind-protect (progn ,@expr)
+ (lparallel:end-kernel))))
+
+(defmacro test-p/file-backend (array &optional (secret null-secret))
+ `(let ((tmpdir (make-temporary-dir)))
+ (unwind-protect
+ (let* ((backend (make-instance 'file-backend :directory tmpdir))
+ (array ,array)
+ (file (merge-pathnames (crypto:byte-array-to-hex-string (crypto:random-data 16)) tmpdir)))
+ (with-open-file (f file :direction :output :element-type 'serapeum:octet)
+ (write-sequence ,array f))
+ (let ((list
+ (list
+ (read-capability-to-octets
+ (store-data array backend :secret ,secret))
+ (read-capability-to-octets
+ (p/store-data array backend :secret ,secret))
+ (read-capability-to-octets
+ (p/store-data file backend :secret ,secret)))))
+ (is (apply #'serapeum:equalp* list))))
+ (uiop:delete-directory-tree tmpdir :validate t))))
+
+
+(test simple-parallel-tests
+ (with-lp-kernel 2
+ (test-p/file-backend (make-octets 1023 :element 1) (make-octets 32 :element 1))
+ (test-p/file-backend (make-octets 1025 :element 2) (make-octets 32 :element 2))
+ (test-p/file-backend (make-octets 16383 :element 3) (make-octets 32 :element 3))
+ (test-p/file-backend (make-octets 16384 :element 4) (make-octets 32 :element 4))
+ (test-p/file-backend (make-octets 1 :element 5) (make-octets 32 :element 5))
+ (test-p/file-backend (make-octets 16834 :element 5) (make-octets 32 :element 6))
+ (test-p/file-backend (make-octets 96000 :element 5) (make-octets 32 :element 7))))
+
+(test simple-parallel-tests2
+ (with-lp-kernel 3
+ (for-all ((buffer (gen-buffer :length (gen-integer :min 1 :max 130000))))
+ (test-p/file-backend buffer))))