summaryrefslogtreecommitdiff
path: root/src/eris-parallel.lisp
blob: 7c2f70238cfadf51ec085a99d9bda9365f528b13 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
;; This file is part of eris-cl.
;; Copyright (C) 2023 Piotr Szarmański

;; eris-cl is free software: you can redistribute it and/or modify it under the
;; terms of the GNU Lesser General Public License as published by the Free
;; Software Foundation, either version 3 of the License, or (at your option) any
;; later version.

;; eris-cl is distributed in the hope that it will be useful, but WITHOUT ANY
;; WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
;; A PARTICULAR PURPOSE. See the GNU General Public License for more details.

;; You should have received a copy of the GNU General Public License along with
;; eris-cl. If not, see <https://www.gnu.org/licenses/>.

(in-package :eris)


(defun split-into-n (i parts size)
  "Splits an integer I into PARTS parts. Returns a vector of length 1- PARTS with
those parts.."
  (declare (type integer i parts size))
  (let* ((k (* size (truncate (/ (/ i size) parts))))
         (vec (make-array
               parts
               :element-type 'integer
               :initial-element k)))
    (nlet rec ((remaining (- i (* k parts)))
               (i 0))
      (if (< remaining size)
          (setf (aref vec (1- parts)) (+ remaining (aref vec (1- parts))))
          (progn (setf (aref vec i) (+ size (aref vec i)))
                 (rec (- remaining size) (1+ i)))))
    vec))

(defgeneric p/eris-encode (input block-size output-function &key secret &allow-other-keys)
  (:documentation "Encode INPUT in parallel using LPARALLEL. Requires lparallel:*kernel* to be bound."))

;; eris-create-tree is not parallelized because it is insignificant on any large
;; input; for 32kib blocks, there's about 512 level 0 blocks for any higher
;; level block, almost three orders of magnitude.
(defmethod p/eris-encode ((input vector) block-size output-function
                          &key (secret null-secret) &allow-other-keys)
  (let ((v (split-into-n (length input) (lparallel:kernel-worker-count) block-size)))
    (eris-create-tree
     (apply #'concatenate 
      'simple-vector
      (map
       'list
       #'lparallel:force
       ;; Recursion to avoid demons
       (nreverse
        (nlet rec ((i 0) (k 0) (l nil))
          (if (eql i (length v)) l
              (rec
               (1+ i)
               (+ k (aref v i))
               (cons (lparallel:future
                       (chunk-array
                        input block-size output-function secret
                        :start k :end (+ k (aref v i))
                        :pad (if (eq i (1- (length v))) t nil)))
                     l)))))))
     block-size output-function)))

(defmethod p/eris-encode ((input pathname) block-size output-function
                          &key (secret null-secret) &allow-other-keys)
  (let ((v (split-into-n (file-size input) (lparallel:kernel-worker-count) block-size)))
    (eris-create-tree
     (apply #'concatenate
      'simple-vector
      (map
       'list
       #'lparallel:force
       (nreverse
        (nlet rec ((i 0) (k 0) (l nil))
          (if (eql i (length v)) l
              (rec
               (1+ i)
               (+ k (aref v i))
               (cons (lparallel:future
                       (with-open-file (f input :element-type 'octet)
                         (file-position f k)
                         (chunk-stream  
                          f block-size output-function (aref v i) secret 
                          :pad (if (eq i (1- (length v))) t nil))))
                     l)))))))
     block-size output-function)))

;; No methods for streams. A stream implementation could be done by allocating n
;; buffers, reading sequentially into each buffer and chunk-array'ing them until
;; eof, but it may be quite unoptimal, depending on the stream, buffer size,
;; etc.