From 8db501749d47964b436bc63cafb80a17bfd42396 Mon Sep 17 00:00:00 2001
From: Piotr Szarmanski
Date: Sun, 25 Sep 2022 19:54:42 +0200
Subject: Add parallel decoder.
---
eris.asd | 5 +-
src/eris-decode.lisp | 55 +++++++++++--------
src/package.lisp | 3 +-
src/parallel-decoder.lisp | 136 ++++++++++++++++++++++++++++++++++++++++++++++
tests/decode-tests.lisp | 12 ++--
tests/encode-tests.lisp | 4 +-
tests/package.lisp | 2 +-
tests/parallel-tests.lisp | 38 +++++++++++++
8 files changed, 219 insertions(+), 36 deletions(-)
create mode 100644 src/parallel-decoder.lisp
create mode 100644 tests/parallel-tests.lisp
diff --git a/eris.asd b/eris.asd
index 7f80a49..a1bdbb9 100644
--- a/eris.asd
+++ b/eris.asd
@@ -2,7 +2,7 @@
:name "eris"
:author "mail@ykonai.net"
:license "LGPLv3 or later"
- :depends-on ("ironclad" "alexandria" "function-cache")
+ :depends-on ("ironclad" "alexandria" "trivial-gray-streams" "function-cache" "bordeaux-threads" #+unix "osicat" #+unix "mmap")
:components
((:module "src"
:serial t
@@ -12,7 +12,8 @@
(:file "conditions")
(:file "base32")
(:file "eris")
- (:file "eris-decode"))))
+ (:file "eris-decode")
+ #+unix (:file "parallel-decoder"))))
:in-order-to ((test-op (test-op :eris/test))))
(defsystem "eris/test"
diff --git a/src/eris-decode.lisp b/src/eris-decode.lisp
index fe2dcbe..351da72 100644
--- a/src/eris-decode.lisp
+++ b/src/eris-decode.lisp
@@ -26,6 +26,10 @@ fetched from a trusted party.")
(unless (equalp ,hash hash)
(error 'hash-mismatch :reference ,hash :hash hash )))))
+(defmacro execute-fetch-function (fetch-function &rest args)
+ `(restart-case (funcall ,fetch-function ,@args)
+ (use-value (value) value)))
+
(defun key-reference-null? (kr)
(and (equalp (reference kr) null-secret)
(equalp (key kr) null-secret)))
@@ -222,7 +226,7 @@ cache."
:capacity cache-capacity
:table (make-hash-table :size (1+ cache-capacity) :test #'equalp))
(reference key &optional nonce)
- (let* ((block (funcall fetch-function reference)))
+ (let* ((block (execute-fetch-function fetch-function reference)))
(unless block (error 'missing-block :reference reference))
(hash-check block reference)
(decrypt-block block key nonce))))
@@ -262,34 +266,17 @@ cache."
:eof (find-eof root get-block block-size level)
:nonce-array (initialize-nonce-array level)))))))
-(defmethod stream-file-position ((stream eris-decode-stream) &optional (set-position nil))
- "Provides the file position of the stream. If the optional second argument is
-set, try to move the stream to that position. It may signal an EOF condition if
-the new position is beyond the end of file.."
- ;; NOTE: this should accept a "file-spec", which I believe is either an int, a
- ;; :start or an :end. This only accepts a number.
- (with-slots (position block-size buffer eof) stream
- (when set-position
- (let ((buffer-pos (mod set-position block-size)))
- (if (< set-position eof)
- (cond
- ;; If the pos is within the buffer (and initialized):
- ((and (<= 0 (- set-position (- position (pos buffer))) (1- block-size))
- (not (minusp (pos buffer))))
- (setf (pos buffer) buffer-pos
- position set-position))
-
- (t (reupdate-block stream set-position)
- (setf (pos buffer) buffer-pos)))
- (error 'eof :eof eof :position position))))
- position))
+(defmethod stream-file-position ((stream eris-decode-stream))
+ "Provides the file position of the stream. This method is setf-able in order to
+change the position."
+ (pos stream))
-(defmethod stream-read-sequence ((stream eris-decode-stream) seq &optional (start 0) (end (length seq)))
+(defmethod stream-read-sequence ((stream eris-decode-stream) seq start end &key)
(when (minusp (pos (buffer stream)))
;; initializes the buffer
(reupdate-block stream (pos stream)))
(with-slots (buffer position) stream
- (read-to-seq seq buffer :start start :end (if end end (length seq)) :stream stream)))
+ (read-to-seq seq buffer :start start :end end :stream stream)))
(defmethod stream-read-byte ((stream eris-decode-stream))
(when (minusp (pos (buffer stream)))
@@ -313,3 +300,23 @@ the new position is beyond the end of file.."
(defun eris-file-length (stream)
"This is the equivalent of \"file-length\" for eris-decode-stream."
(eof stream))
+
+(defmethod (setf stream-file-position) (set-position (stream eris-decode-stream))
+ (with-slots (position block-size buffer eof) stream
+ (when set-position
+ (case set-position
+ (:end (setf set-position eof))
+ (:start (setf set-position 0)))
+ (let ((buffer-pos (mod set-position block-size)))
+ (if (< set-position eof)
+ (cond
+ ;; If the pos is within the buffer (and initialized):
+ ((and (<= 0 (- set-position (- position (pos buffer))) (1- block-size))
+ (not (minusp (pos buffer))))
+ (setf (pos buffer) buffer-pos
+ position set-position))
+
+ (t (reupdate-block stream set-position)
+ (setf (pos buffer) buffer-pos)))
+ (error 'eof :eof eof :position position))))
+ set-position))
diff --git a/src/package.lisp b/src/package.lisp
index b97799c..3041219 100644
--- a/src/package.lisp
+++ b/src/package.lisp
@@ -15,10 +15,11 @@
(defpackage eris
- (:use common-lisp sb-gray alexandria trivia function-cache)
+ (:use common-lisp trivial-gray-streams alexandria trivia function-cache)
(:export
#:eris-encode
#:eris-decode
+ #:eris-decode-parallel
#:32kib
#:1kib
#:null-secret
diff --git a/src/parallel-decoder.lisp b/src/parallel-decoder.lisp
new file mode 100644
index 0000000..9af1ceb
--- /dev/null
+++ b/src/parallel-decoder.lisp
@@ -0,0 +1,136 @@
+;; This file is part of eris-cl.
+;; Copyright (C) 2022 Piotr Szarmański
+
+;; eris-cl is free software: you can redistribute it and/or modify it under the
+;; terms of the GNU Lesser General Public License as published by the Free
+;; Software Foundation, either version 3 of the License, or (at your option) any
+;; later versqion.
+
+;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY
+;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+;; A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+;; You should have received a copy of the GNU General Public License along with
+;; eris-cl. If not, see .
+
+(in-package :eris)
+
+(defun split-list-equally (list parts)
+ (let* ((len (length list))
+ (mod (mod len parts))
+ (base (/ (- len mod) parts)))
+ (if (< len parts)
+ (map 'list #'list list)
+ (loop with pos = 0
+ for i from (1- parts) downto 0
+ collecting (subseq
+ list
+ pos
+ (if (<= mod i)
+ (setf pos (+ pos base))
+ (setf pos (+ pos base 1))))))))
+
+(defun mem-write-vector (vector ptr &optional (offset 0) (count (length vector)))
+ (declare (type (simple-array (unsigned-byte 8)) vector)
+ (type fixnum offset count))
+ (declare (optimize ;; (speed 3) (safety 0) (space 0)
+ (debug 3)))
+ (loop for i below count
+ for off from offset
+ do (setf (cffi:mem-ref ptr :unsigned-char off) (aref vector i))))
+
+(defclass reference-pair+ (reference-pair)
+ ((index :initarg :index :accessor index :type (integer 0 32768))))
+
+(defun map-over-key-references (function block)
+ (loop for i from 0 to (1- (/ (length block) 64))
+ for key-ref = (octets-to-reference-pair (subseq-shared block (* 64 i)))
+ until (key-reference-null? key-ref)
+ do (funcall function key-ref i)))
+
+(defun decode-blocks (reference-pair-list level block-capacity fetch-function output-file cache-capacity last-block)
+ (lambda ()
+ (mmap:with-mmap (addr fd size output-file :open :write :protection :write :mmap :shared)
+ (let ((get-block (cached-lambda (:cache-class 'lru-cache
+ :capacity cache-capacity
+ :table (make-hash-table :size (1+ cache-capacity) :test #'equalp))
+ (reference key &optional nonce)
+ (let* ((block (execute-fetch-function fetch-function reference)))
+ (unless block (error 'missing-block :reference reference))
+ (hash-check block reference)
+ (decrypt-block block key nonce))))
+ (nonce-array (initialize-nonce-array level)))
+ (labels ((descend (level reference-pair block-id)
+ (let ((block (funcall get-block (reference reference-pair) (key reference-pair) (aref nonce-array level))))
+ (if (zerop level)
+ (if (= last-block block-id)
+ (mem-write-vector block addr (* 64 block-capacity block-id) (unpad-block block))
+ (mem-write-vector block addr (* 64 block-capacity block-id)))
+ ;; (bordeaux-threads:with-lock-held (lock)
+ ;; (file-position stream (* 64 block-capacity block-id))
+ ;; (write-sequence block stream))
+ (map-over-key-references
+ (lambda (key-ref i)
+ (descend (1- level) key-ref (+ i (* block-capacity block-id))))
+ block)))))
+ (mapc (lambda (key-ref)
+ (descend level key-ref (index key-ref)))
+ reference-pair-list))))))
+
+(defun eris-decode-parallel (read-capability fetch-function output-file
+ &key (cache-capacity 4096) (threads 4) (initial-bindings bordeaux-threads:*default-special-bindings*))
+ "Decode an ERIS READ-CAPABILITY in parallel using THREADS threads into a file
+designated by OUTPUT-FILE.
+
+Fetch-function must be a function with one argument, the reference octet, which
+returns a (simple-array (unsigned-byte 8)) containing the block. The block will
+be destructively modified, so you MUST provide a fresh array every time. In
+addition, the function MUST be thread-safe.
+
+CACHE-CAPACITY indicates the total amount of blocks stored for all threads. Each
+thread has its own cache."
+ (declare (type read-capability read-capability)
+ (type function fetch-function)
+ (type integer cache-capacity))
+ (with-slots (level block-size root-reference-pair) read-capability
+ (let ((root (decrypt-block (execute-fetch-function fetch-function (reference root-reference-pair))
+ (key root-reference-pair)
+ (make-nonce level))))
+ (when (> level 0) (hash-check root (key root-reference-pair)))
+ (case level
+ (0 (with-open-file (file output-file :direction :output :element-type '(unsigned-byte 8))
+ (write-sequence root file :end (unpad-block root))))
+ (t (let* ((initial-list
+ (loop for i from 0 to (/ block-size 64)
+ for key-ref = (octets-to-reference-pair (subseq-shared root (* 64 i)))
+ until (key-reference-null? key-ref)
+ collect key-ref))
+ (list (split-list-equally
+ (loop for i from 0 to (1- (length initial-list))
+ collecting (change-class (elt initial-list i) 'reference-pair+ :index i))
+ threads))
+ ;; (lock (bordeaux-threads:make-lock "stream-lock"))
+ (eof (find-eof root
+ (lambda (reference key nonce)
+ (let* ((block (execute-fetch-function fetch-function reference)))
+ (unless block (error 'missing-block :reference reference))
+ (hash-check block reference)
+ (decrypt-block block key nonce)))
+ block-size
+ level)))
+ (let ((fd (osicat-posix:creat output-file #o666)))
+ (osicat-posix:posix-fallocate fd 0 eof)
+ (osicat-posix:close fd))
+ (map 'nil #'bordeaux-threads:join-thread
+ (map 'list (lambda (reference-pairs)
+ (bordeaux-threads:make-thread
+ (decode-blocks reference-pairs
+ (1- level)
+ (/ block-size 64)
+ fetch-function
+ output-file
+ (truncate (/ cache-capacity threads))
+ (truncate (/ eof block-size)))
+ :initial-bindings initial-bindings))
+ list))))))))
+
diff --git a/tests/decode-tests.lisp b/tests/decode-tests.lisp
index af3edb0..ab9b56a 100644
--- a/tests/decode-tests.lisp
+++ b/tests/decode-tests.lisp
@@ -33,9 +33,9 @@
(read-capability (eris-encode array ,block-size #'hashtable-encode))
(decoded-array (make-array (length array) :element-type '(unsigned-byte 8)))
(stream (eris-decode read-capability #'hashtable-decode)))
- (stream-read-sequence stream decoded-array)
+ (stream-read-sequence stream decoded-array 0 (length decoded-array))
(is (equalp decoded-array array))
- (file-position stream 0)
+ (setf (stream-file-position stream) 0)
(is (equalp array
(alexandria:read-stream-content-into-byte-vector stream)))))
@@ -124,8 +124,8 @@
(read-capability (eris-encode array ,block-size #'hashtable-encode))
(buf (make-array 24 :element-type '(unsigned-byte 8)))
(stream (eris-decode read-capability #'hashtable-decode)))
- (stream-file-position stream ,pos)
- (stream-read-sequence stream buf)
+ (setf (stream-file-position stream) ,pos)
+ (stream-read-sequence stream buf 0 (length buf))
;; (print (pos (buffer stream)))
;; (print (+ 24 ,buffer-pos))
;; (print (pos stream))
@@ -146,7 +146,7 @@
(read-capability (eris-encode array ,block-size #'hashtable-encode))
(stream (eris-decode read-capability #'hashtable-decode)))
(signals ,condition
- (stream-file-position stream ,pos))))
+ (setf (stream-file-position stream) ,pos))))
(test random-access-eof-1kib
(assert-random-access-condition (make-octets 512 :element 1) 1024 512 eof)
@@ -189,7 +189,7 @@
(read-capability (eris-encode array ,block-size #'hashtable-encode))
(decoded-array (make-array (length array) :element-type '(unsigned-byte 8)))
(stream (eris-decode read-capability #'hashtable-decode)))
- (stream-read-sequence stream decoded-array)
+ (stream-read-sequence stream decoded-array 0 (length decoded-array))
(is (equalp (length array)
(eof stream)))))
diff --git a/tests/encode-tests.lisp b/tests/encode-tests.lisp
index c6b50d4..6c04444 100644
--- a/tests/encode-tests.lisp
+++ b/tests/encode-tests.lisp
@@ -37,11 +37,11 @@
"urn:eris:BIAD77QDJMFAKZYH2DXBUZYAP3MXZ3DJZVFYQ5DFWC6T65WSFCU5S2IT4YZGJ7AC4SYQMP2DM2ANS2ZTCP3DJJIRV733CRAAHOSWIYZM3M"))
;; simple gray stream class for this particular construction.
-(defclass null-stream (fundamental-binary-stream)
+(defclass null-stream (fundamental-binary-input-stream)
((counter :initform 0 :accessor counter)
(max-counter :initarg :max-counter)))
-(defmethod stream-read-sequence ((stream null-stream) seq &optional start end)
+(defmethod stream-read-sequence ((stream null-stream) seq start end &key)
(with-slots (counter max-counter) stream
(if (eql counter max-counter)
0
diff --git a/tests/package.lisp b/tests/package.lisp
index 71d6c82..98fdd81 100644
--- a/tests/package.lisp
+++ b/tests/package.lisp
@@ -1,5 +1,5 @@
(defpackage eris/test
- (:use common-lisp eris fiveam sb-gray ironclad))
+ (:use common-lisp eris fiveam trivial-gray-streams ironclad))
(in-package :eris/test)
diff --git a/tests/parallel-tests.lisp b/tests/parallel-tests.lisp
new file mode 100644
index 0000000..c139b31
--- /dev/null
+++ b/tests/parallel-tests.lisp
@@ -0,0 +1,38 @@
+;; This file is part of eris-cl.
+;; Copyright (C) 2022 Piotr Szarmański
+
+;; eris-cl is free software: you can redistribute it and/or modify it under the
+;; terms of the GNU Lesser General Public License as published by the Free
+;; Software Foundation, either version 3 of the License, or (at your option) any
+;; later versqion.
+
+;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY
+;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+;; A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+;; You should have received a copy of the GNU General Public License along with
+;; eris-cl. If not, see .
+
+(in-package :eris/test)
+
+(def-suite* parallel-tests :in eris-tests)
+
+(defmacro assert-parallel-decode (array block-size)
+ `(uiop:with-temporary-file (:stream output-file :pathname pathname :direction :io)
+ (let* ((*table* (make-hash-table :test #'equalp))
+ (array ,array)
+ (read-capability (eris-encode array ,block-size #'hashtable-encode)))
+ (eris-decode-parallel read-capability #'hashtable-decode pathname
+ :initial-bindings (acons '*table* *table* bordeaux-threads:*default-special-bindings*)
+ :threads 4)
+ (is (equalp array
+ (alexandria:read-stream-content-into-byte-vector output-file))))))
+
+(test simple-parallel-decode
+ (assert-parallel-decode (make-octets 4096 :element 101) 1024)
+ (assert-parallel-decode (make-octets 4095 :element 102) 1024)
+ (assert-parallel-decode (make-octets 18000 :element 103) 1024)
+ (assert-parallel-decode (make-octets 128000 :element 104) 32768)
+ (assert-parallel-decode (make-octets 131071 :element 104) 32768)
+ (assert-parallel-decode (make-octets 131072 :element 104) 32768)
+ (assert-parallel-decode (make-octets 131073 :element 104) 32768))
--
cgit v1.2.3