From 8db501749d47964b436bc63cafb80a17bfd42396 Mon Sep 17 00:00:00 2001
From: Piotr Szarmanski
Date: Sun, 25 Sep 2022 19:54:42 +0200
Subject: Add parallel decoder.

---
 eris.asd                  |   5 +-
 src/eris-decode.lisp      |  55 +++++++++++--------
 src/package.lisp          |   3 +-
 src/parallel-decoder.lisp | 136 ++++++++++++++++++++++++++++++++++++++++++++++
 tests/decode-tests.lisp   |  12 ++--
 tests/encode-tests.lisp   |   4 +-
 tests/package.lisp        |   2 +-
 tests/parallel-tests.lisp |  38 +++++++++++++
 8 files changed, 219 insertions(+), 36 deletions(-)
 create mode 100644 src/parallel-decoder.lisp
 create mode 100644 tests/parallel-tests.lisp

diff --git a/eris.asd b/eris.asd
index 7f80a49..a1bdbb9 100644
--- a/eris.asd
+++ b/eris.asd
@@ -2,7 +2,7 @@
   :name "eris"
   :author "mail@ykonai.net"
   :license "LGPLv3 or later"
-  :depends-on ("ironclad" "alexandria" "function-cache")
+  :depends-on ("ironclad" "alexandria" "trivial-gray-streams" "function-cache" "bordeaux-threads" #+unix "osicat" #+unix "mmap")
   :components
   ((:module "src"
             :serial t
@@ -12,7 +12,8 @@
                          (:file "conditions")
                          (:file "base32")
                          (:file "eris")
-                         (:file "eris-decode"))))
+                         (:file "eris-decode")
+                         #+unix (:file "parallel-decoder"))))
   :in-order-to ((test-op (test-op :eris/test))))
 
 (defsystem "eris/test"
diff --git a/src/eris-decode.lisp b/src/eris-decode.lisp
index fe2dcbe..351da72 100644
--- a/src/eris-decode.lisp
+++ b/src/eris-decode.lisp
@@ -26,6 +26,10 @@ fetched from a trusted party.")
        (unless (equalp ,hash hash)
          (error 'hash-mismatch :reference ,hash :hash hash )))))
 
+(defmacro execute-fetch-function (fetch-function &rest args) ; expands to a fetch call wrapped in a USE-VALUE restart
+  `(restart-case (funcall ,fetch-function ,@args)             ; normal path: the value of FETCH-FUNCTION applied to ARGS
+     (use-value (value) value)))                              ; restart path: VALUE replaces the result of the call
+
 (defun key-reference-null? (kr)
   (and (equalp (reference kr) null-secret)
        (equalp (key kr) null-secret)))
@@ -222,7 +226,7 @@ cache."
                                       :capacity cache-capacity
                                       :table (make-hash-table :size (1+ cache-capacity) :test #'equalp))
                           (reference key &optional nonce)
-                        (let* ((block (funcall fetch-function reference)))
+                        (let* ((block (execute-fetch-function fetch-function reference)))
                           (unless block (error 'missing-block :reference reference))
                           (hash-check block reference)
                           (decrypt-block block key nonce))))
@@ -262,34 +266,17 @@ cache."
                           :eof (find-eof root get-block block-size level)
                           :nonce-array (initialize-nonce-array level)))))))
 
-(defmethod stream-file-position ((stream eris-decode-stream) &optional (set-position nil))
-  "Provides the file position of the stream. If the optional second argument is
-set, try to move the stream to that position. It may signal an EOF condition if
-the new position is beyond the end of file.."
-  ;; NOTE: this should accept a "file-spec", which I believe is either an int, a
-  ;; :start or an :end. This only accepts a number.
-  (with-slots (position block-size buffer eof) stream
-    (when set-position 
-      (let ((buffer-pos (mod set-position block-size)))
-        (if (< set-position eof)
-            (cond
-              ;; If the pos is within the buffer (and initialized): 
-              ((and (<= 0 (- set-position (- position (pos buffer))) (1- block-size))
-                    (not (minusp (pos buffer))))
-               (setf (pos buffer) buffer-pos
-                     position set-position))
-              
-              (t (reupdate-block stream set-position)
-                 (setf (pos buffer) buffer-pos)))
-            (error 'eof :eof eof :position position))))
-    position))
+(defmethod stream-file-position ((stream eris-decode-stream))
+  "Return the current position of STREAM, as reported by its POS accessor. The
+position is changed with (setf stream-file-position)."
+  (pos stream))
 
-(defmethod stream-read-sequence ((stream eris-decode-stream) seq &optional (start 0) (end (length seq)))
+(defmethod stream-read-sequence ((stream eris-decode-stream) seq start end &key)
   (when (minusp (pos (buffer stream)))
     ;; initializes the buffer
     (reupdate-block stream (pos stream)))
   (with-slots (buffer position) stream
-    (read-to-seq seq buffer :start start :end (if end end (length seq)) :stream stream)))
+    (read-to-seq seq buffer :start start :end end :stream stream))) ; NOTE(review): END is passed through unchanged; confirm no caller supplies NIL
 
 (defmethod stream-read-byte ((stream eris-decode-stream))
   (when (minusp (pos (buffer stream)))
@@ -313,3 +300,23 @@ the new position is beyond the end of file.."
 (defun eris-file-length (stream)
   "This is the equivalent of \"file-length\" for eris-decode-stream."
   (eof stream))
+
+(defmethod (setf stream-file-position) (set-position (stream eris-decode-stream))
+  "Move STREAM to SET-POSITION (an integer, :START or :END) and return the resolved
+position. Signals EOF unless that position is below the EOF slot; NIL is a no-op."
+  (with-slots (position block-size buffer eof) stream
+    (let ((target (case set-position
+                    (:end eof)
+                    (:start 0)
+                    (t set-position))))
+      (when target
+        (let ((buffer-pos (mod target block-size)))
+          (unless (< target eof)
+            (error 'eof :eof eof :position position))
+          (if (and (<= 0 (- target (- position (pos buffer))) (1- block-size))
+                   (not (minusp (pos buffer))))
+              (setf (pos buffer) buffer-pos
+                    position target)
+              (progn (reupdate-block stream target)
+                     (setf (pos buffer) buffer-pos)))))
+      target)))
diff --git a/src/package.lisp b/src/package.lisp
index b97799c..3041219 100644
--- a/src/package.lisp
+++ b/src/package.lisp
@@ -15,10 +15,11 @@
 
 
 (defpackage eris
-  (:use common-lisp sb-gray alexandria trivia function-cache)
+  (:use common-lisp trivial-gray-streams alexandria trivia function-cache)
   (:export
    #:eris-encode
    #:eris-decode
+   #:eris-decode-parallel 
    #:32kib
    #:1kib
    #:null-secret
diff --git a/src/parallel-decoder.lisp b/src/parallel-decoder.lisp
new file mode 100644
index 0000000..9af1ceb
--- /dev/null
+++ b/src/parallel-decoder.lisp
@@ -0,0 +1,136 @@
+;; This file is part of eris-cl.
+;; Copyright (C) 2022 Piotr Szarmański
+
+;; eris-cl is free software: you can redistribute it and/or modify it under the
+;; terms of the GNU Lesser General Public License as published by the Free
+;; Software Foundation, either version 3 of the License, or (at your option) any
+;; later version.
+
+;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY
+;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+;; A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+;; You should have received a copy of the GNU General Public License along with
+;; eris-cl. If not, see <https://www.gnu.org/licenses/>.
+
+(in-package :eris)
+
+(defun split-list-equally (list parts)
+  "Split LIST into PARTS consecutive sublists whose lengths differ by at most one;
+when the lengths are uneven the longer sublists come last. A LIST shorter than
+PARTS is split into one-element sublists instead."
+  (let ((len (length list)))
+    (if (< len parts)
+        (map 'list #'list list)
+        (multiple-value-bind (base extra) (floor len parts)
+          ;; CHUNK-INDEX counts down, so the final EXTRA chunks are the ones
+          ;; that each take one additional element.
+          (loop for chunk-index from (1- parts) downto 0
+                for from-index = 0 then to-index
+                for to-index = (+ from-index base (if (< chunk-index extra) 1 0))
+                collect (subseq list from-index to-index))))))
+
+(defun mem-write-vector (vector ptr &optional (offset 0) (count (length vector)))
+  "Copy the first COUNT octets of VECTOR to foreign memory at PTR + OFFSET."
+  (declare (type (simple-array (unsigned-byte 8)) vector)
+           (type fixnum offset count)
+           ;; (speed 3) (safety 0) (space 0) stay disabled; only (debug 3) is active.
+           (optimize (debug 3)))
+  (dotimes (source-index count)
+    (setf (cffi:mem-ref ptr :unsigned-char (+ offset source-index)) (aref vector source-index))))
+
+(defclass reference-pair+ (reference-pair) ; a REFERENCE-PAIR that additionally carries an INDEX
+  ((index :initarg :index :accessor index :type (integer 0 32768)))) ; ERIS-DECODE-PARALLEL sets it to the pair's position in the root node
+
+(defun map-over-key-references (function block)
+  "Call FUNCTION with each key-reference pair in BLOCK and its index, until a null pair is found."
+  (loop for index below (floor (length block) 64)
+        for pair = (octets-to-reference-pair (subseq-shared block (* 64 index)))
+        until (key-reference-null? pair) do (funcall function pair index)))
+
+(defun decode-blocks (reference-pair-list level block-capacity fetch-function output-file cache-capacity last-block)
+  (lambda () ; returns a thunk (run in a worker thread by ERIS-DECODE-PARALLEL) that decodes the subtrees in REFERENCE-PAIR-LIST into OUTPUT-FILE
+    (mmap:with-mmap (addr fd size output-file :open :write :protection :write :mmap :shared) ; ADDR is the destination pointer for all writes below
+      (let ((get-block (cached-lambda (:cache-class 'lru-cache 
+                                       :capacity cache-capacity
+                                       :table (make-hash-table :size (1+ cache-capacity) :test #'equalp))
+                           (reference key &optional nonce)
+                         (let* ((block (execute-fetch-function fetch-function reference)))
+                           (unless block (error 'missing-block :reference reference))
+                           (hash-check block reference) ; check the fetched block against its reference before decrypting it
+                           (decrypt-block block key nonce))))
+            (nonce-array (initialize-nonce-array level))) ; indexed by tree level in DESCEND
+        (labels ((descend (level reference-pair block-id) ; BLOCK-ID is the block's position among the blocks of its level
+                   (let ((block (funcall get-block (reference reference-pair) (key reference-pair) (aref nonce-array level))))
+                     (if (zerop level) ; level 0: BLOCK is content, copy it into the file mapping
+                         (if (= last-block block-id) ; the last content block is written only up to (UNPAD-BLOCK BLOCK) octets
+                             (mem-write-vector block addr (* 64 block-capacity block-id) (unpad-block block)) ; (* 64 BLOCK-CAPACITY) is the block size in octets
+                             (mem-write-vector block addr (* 64 block-capacity block-id)))
+                         ;; (bordeaux-threads:with-lock-held (lock)
+                         ;;   (file-position stream (* 64 block-capacity block-id))
+                         ;;   (write-sequence block stream))
+                         (map-over-key-references
+                          (lambda (key-ref i)
+                            (descend (1- level) key-ref (+ i (* block-capacity block-id)))) ; child I of node BLOCK-ID gets id I + BLOCK-CAPACITY * BLOCK-ID
+                          block)))))
+          (mapc (lambda (key-ref)
+                  (descend level key-ref (index key-ref))) ; INDEX was assigned by ERIS-DECODE-PARALLEL
+                reference-pair-list))))))
+
+(defun eris-decode-parallel (read-capability fetch-function output-file
+                             &key (cache-capacity 4096) (threads 4) (initial-bindings bordeaux-threads:*default-special-bindings*))
+  "Decode an ERIS READ-CAPABILITY in parallel using THREADS threads into a file
+designated by OUTPUT-FILE. An existing OUTPUT-FILE is overwritten.
+
+Fetch-function must be a function with one argument, the reference octet, which
+returns a (simple-array (unsigned-byte 8)) containing the block. The block will
+be destructively modified, so you MUST provide a fresh array every time. In
+addition, the function MUST be thread-safe.
+
+CACHE-CAPACITY indicates the total amount of blocks stored for all threads. Each
+thread has its own cache. INITIAL-BINDINGS is passed to every thread created."
+  (declare (type read-capability read-capability)
+           (type function fetch-function)
+           (type integer cache-capacity))
+  (with-slots (level block-size root-reference-pair) read-capability
+    (flet ((fetch-and-decrypt (reference key nonce)
+             ;; Fetch, check for a missing block, verify the reference, then decrypt.
+             (let ((block (execute-fetch-function fetch-function reference)))
+               (unless block (error 'missing-block :reference reference))
+               (hash-check block reference)
+               (decrypt-block block key nonce))))
+      (let ((root (fetch-and-decrypt (reference root-reference-pair)
+                                     (key root-reference-pair)
+                                     (make-nonce level))))
+        (when (> level 0) (hash-check root (key root-reference-pair)))
+        (if (zerop level)
+            ;; The root is the only content block, so it is written directly.
+            (with-open-file (file output-file :direction :output :if-exists :supersede
+                                              :element-type '(unsigned-byte 8))
+              (write-sequence root file :end (unpad-block root)))
+            (let ((work-lists
+                    (split-list-equally
+                     ;; A node holds at most (/ block-size 64) references: stop BELOW it.
+                     (loop for i below (/ block-size 64)
+                           for key-ref = (octets-to-reference-pair (subseq-shared root (* 64 i)))
+                           until (key-reference-null? key-ref)
+                           collect (change-class key-ref 'reference-pair+ :index i))
+                     threads))
+                  (eof (find-eof root #'fetch-and-decrypt block-size level)))
+              ;; Create the file at its final size before the threads map it.
+              (let ((fd (osicat-posix:creat output-file #o666)))
+                (unwind-protect (osicat-posix:posix-fallocate fd 0 eof)
+                  (osicat-posix:close fd)))
+              (map 'nil #'bordeaux-threads:join-thread
+                   (map 'list (lambda (reference-pairs)
+                                (bordeaux-threads:make-thread
+                                 (decode-blocks reference-pairs
+                                                (1- level)
+                                                (/ block-size 64)
+                                                fetch-function
+                                                output-file
+                                                (truncate cache-capacity threads)
+                                                (truncate eof block-size))
+                                 :initial-bindings initial-bindings))
+                        work-lists))))))))
+
diff --git a/tests/decode-tests.lisp b/tests/decode-tests.lisp
index af3edb0..ab9b56a 100644
--- a/tests/decode-tests.lisp
+++ b/tests/decode-tests.lisp
@@ -33,9 +33,9 @@
           (read-capability (eris-encode array ,block-size #'hashtable-encode))
           (decoded-array (make-array (length array) :element-type '(unsigned-byte 8)))
           (stream (eris-decode read-capability #'hashtable-decode)))
-     (stream-read-sequence stream decoded-array)
+     (stream-read-sequence stream decoded-array 0 (length decoded-array))
      (is (equalp decoded-array array))
-     (file-position stream 0)
+     (setf (stream-file-position stream) 0)
      (is (equalp array
                  (alexandria:read-stream-content-into-byte-vector stream)))))
 
@@ -124,8 +124,8 @@
           (read-capability (eris-encode array ,block-size #'hashtable-encode))
           (buf (make-array 24 :element-type '(unsigned-byte 8)))
           (stream (eris-decode read-capability #'hashtable-decode)))
-     (stream-file-position stream ,pos)
-     (stream-read-sequence stream buf)
+     (setf (stream-file-position stream) ,pos)
+     (stream-read-sequence stream buf 0 (length buf))
      ;; (print (pos (buffer stream)))
      ;; (print (+ 24 ,buffer-pos))
      ;; (print (pos stream))
@@ -146,7 +146,7 @@
           (read-capability (eris-encode array ,block-size #'hashtable-encode))
           (stream (eris-decode read-capability #'hashtable-decode)))
      (signals ,condition
-      (stream-file-position stream ,pos))))
+      (setf (stream-file-position stream) ,pos))))
 
 (test random-access-eof-1kib
   (assert-random-access-condition (make-octets 512 :element 1) 1024 512 eof)
@@ -189,7 +189,7 @@
           (read-capability (eris-encode array ,block-size #'hashtable-encode))
           (decoded-array (make-array (length array) :element-type '(unsigned-byte 8)))
           (stream (eris-decode read-capability #'hashtable-decode)))
-     (stream-read-sequence stream decoded-array)
+     (stream-read-sequence stream decoded-array 0 (length decoded-array))
      (is (equalp (length array)
                  (eof stream)))))
 
diff --git a/tests/encode-tests.lisp b/tests/encode-tests.lisp
index c6b50d4..6c04444 100644
--- a/tests/encode-tests.lisp
+++ b/tests/encode-tests.lisp
@@ -37,11 +37,11 @@
              "urn:eris:BIAD77QDJMFAKZYH2DXBUZYAP3MXZ3DJZVFYQ5DFWC6T65WSFCU5S2IT4YZGJ7AC4SYQMP2DM2ANS2ZTCP3DJJIRV733CRAAHOSWIYZM3M"))
 
 ;; simple gray stream class for this particular construction.
-(defclass null-stream (fundamental-binary-stream)
+(defclass null-stream (fundamental-binary-input-stream)
   ((counter :initform 0 :accessor counter)
    (max-counter :initarg :max-counter)))
 
-(defmethod stream-read-sequence ((stream null-stream) seq &optional start end)
+(defmethod stream-read-sequence ((stream null-stream) seq start end &key)
   (with-slots (counter max-counter) stream
     (if (eql counter max-counter)
         0
diff --git a/tests/package.lisp b/tests/package.lisp
index 71d6c82..98fdd81 100644
--- a/tests/package.lisp
+++ b/tests/package.lisp
@@ -1,5 +1,5 @@
 (defpackage eris/test
-  (:use common-lisp eris fiveam sb-gray ironclad))
+  (:use common-lisp eris fiveam trivial-gray-streams ironclad))
 
 (in-package :eris/test)
 
diff --git a/tests/parallel-tests.lisp b/tests/parallel-tests.lisp
new file mode 100644
index 0000000..c139b31
--- /dev/null
+++ b/tests/parallel-tests.lisp
@@ -0,0 +1,38 @@
+;; This file is part of eris-cl.
+;; Copyright (C) 2022 Piotr Szarmański
+
+;; eris-cl is free software: you can redistribute it and/or modify it under the
+;; terms of the GNU Lesser General Public License as published by the Free
+;; Software Foundation, either version 3 of the License, or (at your option) any
+;; later version.
+
+;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY
+;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+;; A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+;; You should have received a copy of the GNU General Public License along with
+;; eris-cl. If not, see <https://www.gnu.org/licenses/>.
+
+(in-package :eris/test)
+
+(def-suite* parallel-tests :in eris-tests)
+
+(defmacro assert-parallel-decode (array block-size) ; encode ARRAY, decode it in parallel into a temporary file, compare the file with ARRAY
+  `(uiop:with-temporary-file (:stream output-file :pathname pathname :direction :io)
+     (let* ((*table* (make-hash-table :test #'equalp)) ; presumably the store used by HASHTABLE-ENCODE / HASHTABLE-DECODE -- confirm
+            (array ,array)
+            (read-capability (eris-encode array ,block-size #'hashtable-encode)))
+       (eris-decode-parallel read-capability #'hashtable-decode pathname
+                                  :initial-bindings (acons '*table* *table* bordeaux-threads:*default-special-bindings*) ; give the created threads this binding of *TABLE*
+                                  :threads 4)
+       (is (equalp array
+                   (alexandria:read-stream-content-into-byte-vector output-file)))))) ; read the decoded file back through the temporary file's stream
+
+(test simple-parallel-decode ; round-trips inputs at and around multiples of the block size
+  (assert-parallel-decode (make-octets 4096 :element 101) 1024) ; exactly 4 * 1024
+  (assert-parallel-decode (make-octets 4095 :element 102) 1024) ; 4 * 1024 - 1
+  (assert-parallel-decode (make-octets 18000 :element 103) 1024) ; not a multiple of 1024
+  (assert-parallel-decode (make-octets 128000 :element 104) 32768) ; not a multiple of 32768
+  (assert-parallel-decode (make-octets 131071 :element 104) 32768) ; 4 * 32768 - 1
+  (assert-parallel-decode (make-octets 131072 :element 104) 32768) ; exactly 4 * 32768
+  (assert-parallel-decode (make-octets 131073 :element 104) 32768)) ; 4 * 32768 + 1
-- 
cgit v1.2.3