package plebeia

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file node_storage.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2019,2020 DaiLambda, Inc. <contact@dailambda.jp>            *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Node_type
open Storage
open Result.Syntax
open Result.Infix

module B = Mmap.Buffer

(** node storage.
    See Layout.md for the format *)

exception ReadFailure of Error.t

(* tag must be larger than [- Storage.hard_limit_max_tag] *)
let tag_empty_bud = (1 lsl 32) -256
let tag_large_leaf = (1 lsl 32) -255
let tag_link = (1 lsl 32) -254

let max_small_leaf_size = 253 (* max_small_leaf_size < - tag_link *)
let least_small_leaf_tag = 1 lsl 32 - max_small_leaf_size

let zero_sized_leaf_index = Index.zero

let offset_of_index_part (t : Context.t) = t.bytes_per_cell - 4

let rec read_hash_prefix context i =
  let get_hash_prefix buf =
    Hash.Prefix.of_string @@ B.copy buf 0 context.Context.hash.bytes
  in
  let storage = Context.get_storage context in
  let offset_of_index_part = offset_of_index_part context in
  let bytes_per_cell = context.bytes_per_cell in
  let buf = get_cell storage i in
  let tag = B.get_index buf offset_of_index_part in
  let tag_int = Index.to_int tag in
  if i = zero_sized_leaf_index then
    let v = Value.of_string "" in
    let hp = Node_hash.of_leaf context.hash v in
    hp
  else begin
    match tag_int with
    | x when x = tag_empty_bud ->
        (* empty bud *)
        (* XXX defined as a constant
           Probably no, since empty bud is normally vanishes
        *)
        Node_hash.of_bud context.hash None

    | x when x = tag_link ->
        (* linked *)
        let i' = B.get_index buf (offset_of_index_part - 4) (*XXX*) in
        read_hash_prefix context i'

    | x when x = tag_large_leaf ->
        (* leaf whose value is in Plebeia *)
        get_hash_prefix buf

    | x when least_small_leaf_tag <= x ->
        (* leaf whose value is in the previous cell
           or, leaf whose value is in the 2 previous cells *)
        get_hash_prefix buf

    | _ when not context.hash.flags_combined ->
        let last_byte = B.get_uint8 buf (bytes_per_cell - 5) in
        begin match last_byte land 0b11 with
        | 0b01 ->
            (* extender  |<- segment ->0..0|6bits|01| |<- the index of the child ->| *)
            let i' = B.get_index buf offset_of_index_part in
            (* require to read the subnode to get the hash *)
            read_hash_prefix context i'

        | _ -> (* non empty bud / internal *)
            get_hash_prefix buf
        end

    | _ ->
        let bytes_per_hash = context.hash.bytes in
        let s_bytes_per_hash = B.copy buf 0 bytes_per_hash in
        let last_byte = Char.code @@ String.unsafe_get s_bytes_per_hash (bytes_per_hash - 1) in
        match last_byte land 0x03 with
        | 0b01 -> (* extender *)
            (* extender  |<- segment ->0..0|6bits|01| |<- the index of the child ->| *)
            let i' = B.get_index buf offset_of_index_part in
            read_hash_prefix context i'
        | 0b11 -> (* non empty bud *)
            get_hash_prefix buf

        | 0b00 | 0b10 -> (* internal *)
            (* the last 2 bits of the hash prefix must be reset *)
            let s_0_215 = B.copy buf 0 (bytes_per_hash - 1) in
            let c_216_223 =
              let c = Char.code @@ B.get_char buf (bytes_per_hash - 1) in
              Char.chr (c land 0xfc)
            in
            Hash.Prefix.of_string (s_0_215 ^ String.make 1 c_216_223)
        | _ -> assert false
  end

(* If [keep_hash=false], hashes are not read into the memory except trivial ones,
   but it remains in the disk cache *)
let rec parse_cell ~keep_hash context i =
  let storage = Context.get_storage context in
  let offset_of_index_part = offset_of_index_part context in
  let bytes_per_cell = context.bytes_per_cell in
  let buf = get_cell storage i in
  let get_hash_prefix () =
    Hash.Prefix.of_string @@ B.copy buf 0 context.Context.hash.bytes
  in
  let tag = B.get_index buf offset_of_index_part in
  let tag_int = Index.to_int tag in (* easier to match *)
  assert (tag_int < 1 lsl 32);
  if i = zero_sized_leaf_index then
    let v = Value.of_string "" in
    (* XXX h is precomputable *)
    let h = Hashed (Node_hash.of_leaf context.hash v) in
    _Leaf (v, Indexed i, h)
  else begin
    (* XXX Branching order is arbitrary.  We can have the best decision tree by taking
           the stats of the tags this function sees.
    *)
    if tag_int = tag_empty_bud then
      (* empty bud *)
      let h = Hashed context.hash.Hash.Hasher.zero in
      _Bud (None, Indexed i, h)
    else if tag_int = tag_large_leaf then
      (* leaf whose value is in Plebeia *)
      let v = Value.of_string @@ Chunk.read storage @@ Index.pred i in
      let h = if keep_hash then Hashed (get_hash_prefix ()) else Not_Hashed in
      _Leaf (v, Indexed i, h)
    else if tag_int = tag_link then
      (* linked *)
      let i' = B.get_index buf (offset_of_index_part - 4) (*XXX should be precomputed *) in
      parse_cell ~keep_hash context i'
    else if  1 lsl 32 -bytes_per_cell <= tag_int && tag_int <= 1 lsl 32 -1 then
      (* leaf whose value is in the previous cell *)
      let l = 1 lsl 32 - tag_int in (* 1 to bytes_per_cell *)
      let buf = get_cell storage (Index.pred i) in
      let v = Value.of_string @@ B.copy buf 0 l in
      let h = if keep_hash then Hashed (get_hash_prefix ()) else Not_Hashed in
      _Leaf (v, Indexed i, h)
    else if (1 lsl 32 - (min max_small_leaf_size (bytes_per_cell * 2))) <= tag_int
         && tag_int < 1 lsl 32 - bytes_per_cell then
      (* size is limited < -(the largest tag, -254) *)
      (* leaf whose value is in the 2 previous cells *)
      let l = 1 lsl 32 - tag_int in (* bytes_per_cell + 1 to bytes_per_cell * 2 *)
      assert (bytes_per_cell + 1 <= l && l <= bytes_per_cell * 2);
      let buf = get_cell2 storage (Index.pred @@ Index.pred i) in
      let v = Value.of_string @@ B.copy buf 0 l in
      let h = if keep_hash then Hashed (get_hash_prefix ()) else Not_Hashed in
      _Leaf (v, Indexed i, h)

    else if 1 lsl 32 -256 <= tag_int && tag_int <= 1 lsl 32 -1 then begin
      Format.eprintf "tag_int= %d@." (tag_int - 1 lsl 32);
      assert false
    end

    else if not context.hash.flags_combined then
      let last_byte = B.get_uint8 buf (bytes_per_cell - 5) in
      begin match last_byte land 0b11 with
      | 0b01 ->
          (* extender  |<- segment ->0..0|6bits|01| |<- the index of the child ->| *)
          let i' = B.get_index buf offset_of_index_part in
          let cells_extra = last_byte lsr 2 in
          assert (cells_extra <= 63);
          let nbytes = cells_extra * bytes_per_cell + bytes_per_cell - 5 in
          let buf = get_bytes storage Index.(i - Unsafe.of_int cells_extra) nbytes in
          let s = B.copy buf 0 nbytes in
          let seg = Segment.Serialization.decode_exn s in
          let v = parse_cell ~keep_hash context i' in (* XXX We can delay the read *)
          let h =
            match v with
            | Leaf (_, _, h)
            | Internal (_, _, _, h)
            | Bud (_ ,_, h) -> h
            | Extender _ -> assert false
          in
          _Extender (seg, View v, Indexed i, h)
      | 0b11-> (* non empty bud *)
          let i' = B.get_index buf offset_of_index_part in
          let h = if keep_hash then Hashed (get_hash_prefix ()) else Not_Hashed in
          _Bud (Some (Disk (i', Maybe_Extender)), Indexed i, h)

      | 0b00 | 0b10 -> (* internal *)
          let refer_to_right = last_byte land 0x03 = 2 in
          let i' = B.get_index buf offset_of_index_part in
          let j = Index.pred i in
          let h = if keep_hash then Hashed (get_hash_prefix ()) else Not_Hashed in
          if refer_to_right then
            _Internal (Disk (j, Maybe_Extender), Disk (i', Maybe_Extender), Indexed i, h)
          else
            _Internal (Disk (i', Maybe_Extender), Disk(j, Maybe_Extender), Indexed i, h)
      | _ -> assert false
      end

    else
      let bytes_per_hash = context.hash.bytes in
      let s_bytes_per_hash = B.copy buf 0 bytes_per_hash in
      let last_byte = Char.code @@ String.unsafe_get s_bytes_per_hash (bytes_per_hash - 1) in
      match last_byte land 0x03 with
      | 0b01 -> (* extender *)
          (* extender  |<- segment ->0..0|6bits|01| |<- the index of the child ->| *)
          let i' = B.get_index buf offset_of_index_part in
          let cells_extra = last_byte lsr 2 in
          let nbytes = cells_extra * bytes_per_cell + bytes_per_cell - 5 in
          let buf = get_bytes storage Index.(i - Unsafe.of_int cells_extra) nbytes in
          let s = B.copy buf 0 nbytes in
          let seg = Segment.Serialization.decode_exn s in
          let v = parse_cell ~keep_hash context i' in
          let h =
            match v with
            | Leaf (_, _, h)
            | Internal (_, _, _, h)
            | Bud (_ ,_, h) -> h
            | Extender _ -> assert false
          in
          _Extender (seg, View v, Indexed i, h)
      | 0b11 -> (* non empty bud *)
          let i' = B.get_index buf offset_of_index_part in
          let h = if keep_hash then Hashed (get_hash_prefix ()) else Not_Hashed in
          _Bud (Some (Disk (i', Maybe_Extender)), Indexed i, h)

      | 0b00 | 0b10 -> (* internal *)
          (* the last 2 bits of the hash prefix must be reset *)
          let refer_to_right =
            let c = Char.code @@ B.get_char buf (bytes_per_hash - 1) in
            (c land 2) = 2
          in
          let i' = B.get_index buf offset_of_index_part in
          (* Because of the link, Index.pred i may not the index of the child!
          *)
          let j =
            let i_pred = Index.pred i in
            let buf = get_cell storage i_pred in
            let tag = B.get_index buf offset_of_index_part in
            let tag_int = Index.to_int tag in (* easier to match *)
            if tag_int = tag_link (* it is a link *) then B.get_index buf (offset_of_index_part - 4) (*XXX*)
            else i_pred
          in
          let h =
            if keep_hash then
              (* the last 2 bits of the hash prefix must be reset *)
              let s_0_215 = B.copy buf 0 (bytes_per_hash - 1) in
              let c_216_223 =
                let c = Char.code @@ B.get_char buf (bytes_per_hash - 1) in
                Char.chr (c land 0xfc)
              in
              Hashed (Hash.Prefix.of_string (s_0_215 ^ String.make 1 c_216_223))
            else Not_Hashed
          in
          if refer_to_right then
            _Internal (Disk (j, Maybe_Extender), Disk (i', Maybe_Extender), Indexed i, h)
          else
            _Internal (Disk (i', Maybe_Extender), Disk(j, Maybe_Extender), Indexed i, h)
      | _ -> assert false
      end

(* Hash may not be not loaded *)
let read_node context (index : Index.t) (ewit:extender_witness) : view =
  let keep_hash = context.Context.keep_hash in
  let v = parse_cell ~keep_hash context index in
  Stat.incr_loaded_nodes context.Context.stat;
  match ewit, v with
  | Is_Extender, Extender _ -> v
  | Is_Extender, _ -> assert false (* better report *)
  | Maybe_Extender, Extender _ -> v
  | Not_Extender, Extender _ -> assert false (* better report *)
  | Not_Extender, _ -> v
  | Maybe_Extender, _ -> v

(* Hash are assured to be loaded *)
let read_node_with_hash context index ewit =
  let v = parse_cell ~keep_hash:true context index in
  Stat.incr_loaded_nodes context.Context.stat;
  match ewit, v with
  | Is_Extender, Extender _ -> v
  | Is_Extender, _ -> assert false (* better report *)
  | Maybe_Extender, Extender _ -> v
  | Not_Extender, Extender _ -> assert false (* better report *)
  | Not_Extender, _ -> v
  | Maybe_Extender, _ -> v

(* if the hash of the node is not loaded, load it if possible *)
let read_hash context n = match n with
  | Disk (i, ewit) ->
      let v = read_node_with_hash context i ewit in
      begin match v with
        | Leaf (_, _, Hashed hp) -> `Hashed ((hp, ""), View v)
        | Bud (_, _, Hashed hp) -> `Hashed ((hp, ""), View v)
        | Internal (_, _, _, Hashed hp) -> `Hashed ((hp, ""), View v)
        | Extender (seg, _, _, Hashed hp) -> `Hashed (Node_hash.of_extender seg hp, View v)
        | _ -> assert false
      end

  | Hash nh -> `Hashed (nh, n)

  | View (Leaf (_, _, Hashed hp)) -> `Hashed ((hp, ""), n)
  | View (Bud (_, _, Hashed hp)) -> `Hashed ((hp, ""), n)
  | View (Internal (_, _, _, Hashed hp)) -> `Hashed ((hp, ""), n)
  | View (Extender (seg, _, _, Hashed hp)) -> `Hashed (Node_hash.of_extender seg hp, n)

  | View (Leaf (a, Indexed i, Not_Hashed)) ->
      let hp = read_hash_prefix context i in
      let nh = hp, "" in
      `Hashed (nh, View (_Leaf (a, Indexed i, Hashed hp)))
  | View (Bud (a, Indexed i, Not_Hashed)) ->
      let hp = read_hash_prefix context i in
      let nh = hp, "" in
      `Hashed (nh, View (_Bud (a, Indexed i, Hashed hp)))
  | View (Internal (a, b, Indexed i, Not_Hashed)) ->
      let hp = read_hash_prefix context i in
      let nh = hp, "" in
      `Hashed (nh, View (_Internal (a, b, Indexed i, Hashed hp)))
  | View (Extender (seg, a, Indexed i, Not_Hashed)) ->
      let hp = read_hash_prefix context i in
      let nh = Node_hash.of_extender seg hp in
      `Hashed (nh, View (_Extender (seg, a, Indexed i, Hashed hp)))

  | View v ->
      (* No hash nor index available *)
      `Not_Hashed v

exception HashOnly of Hash.t

let view c = function
  | Hash h -> raise (HashOnly h)
  | View v -> v
  | Disk (i, ewit) -> read_node c i ewit

let read_node_fully ~reset_index context n =
  Traverse.Map.map
    ~enter:(fun n -> `Continue (view context n))
    ~leave:(fun ~org v ->
        let index = if reset_index then (fun _ -> Not_Indexed) else (fun i -> i) in
        View
          (match org, v with
           | Leaf (v, i, h), Leaf _ -> _Leaf (v, index i, h)
           | Internal (_, _, i, h), Internal(nl, nr, _, _) -> _Internal (nl, nr, index i, h)
           | Bud (_, i, h), Bud (no, _, _) -> _Bud (no, index i, h)
           | Extender (_, _, i, h), Extender (seg, n, _, _) -> _Extender (seg, n, index i, h)
           | _ -> assert false))
    n

let leaf _context (v, ir, hit) = View (_Leaf (v, ir, hit))

let internal _context (nl, nr, ir, hit) = View (_Internal (nl, nr, ir, hit))

let bud context (nopt, ir, hit) =
  View (match nopt with
      | None -> _Bud (None, ir, hit)
      | Some n ->
          (* Bud cannot have Disk to avoid having potential Bud-Bud/Bud-Leaf *)
          _Bud (Some (View (view context n)), ir, hit))

let extender context (seg, n, ir, hit) =
  (* Extender cannot have Disk to avoid having potential Extender-Extender *)
  View (_Extender (seg, View (view context n), ir, hit))

let change_context ~src:context ~dst:context' n =
  let node_cache' = context'.Context.node_cache in
  let enter n =
    let v = view context n in
    match Node_type.hash_of_view v with
    | None -> `Continue v
    | Some nh ->
        match Node_cache.find_opt node_cache' nh with
        | None -> `Continue v
        | Some i ->
            `Return (Disk (i, match v with Extender _ -> Is_Extender | _ -> Not_Extender))
  in
  (* We must reset the index of [context], since it is invalid for [context'] *)
  let leave =
    Fold.
      { leaf= (fun v _i h -> leaf context' (v, Not_Indexed, h))
      ; bud= (fun xop _i h -> bud context' (xop, Not_Indexed, h))
      ; extender= (fun seg n _i h -> extender context' (seg, n , Not_Indexed, h))
      ; internal= (fun nl nr _i h -> internal context' (nl, nr, Not_Indexed, h))
      }
  in
  Fold.fold ~enter ~leave n

(* index 32 bits (4294967296)
   block 32 bytes
   max size of the storage 137_438_953_472 =~ 128GB
*)
let index n = match index n with
  | Some i -> i
  | None -> assert false

let zero_link (t : Context.t) = String.make (t.bytes_per_cell - 8) '\000'

(* The code assumes the only one writer. *)

let write_small_value storage v =
  let+ i = Storage.new_index storage in
  let buf = get_cell storage i in
  let s = Value.to_string v in
  B.write_string s buf 0

let write_medium_value storage v =
  let+ i = Storage.new_indices storage 2 in
  let buf = get_cell2 storage i in
  B.write_string (Value.to_string v) buf 0

let write_large_value storage v =
  Storage.Chunk.write storage (Value.to_string v) >|= fun _ -> ()

let write_leaf context v hp =
  (* contents are ALREADY written *)
  let storage = Context.get_storage context in
  let bytes_per_hash = context.Context.hash.bytes in
  let offset_of_index_part = offset_of_index_part context in
  let bytes_per_cell = context.bytes_per_cell in
  let+ i = Storage.new_index storage in
  let len = Value.length v in
  if len <= bytes_per_cell * 2 then begin
    let buf = get_cell storage i in
    let s = Hash.Prefix.to_string hp in
    assert (String.length s = bytes_per_hash);
    B.write_string s buf 0;
    B.set_index buf offset_of_index_part (Index.Unsafe.of_int (-len)) (* 1 => -1  64 -> -64 *)
  end else begin
    let buf = get_cell storage i in
    let hp = Hash.Prefix.to_string hp in
    assert (String.length hp = bytes_per_hash);
    B.write_string hp buf 0;
    B.set_index buf offset_of_index_part (Index.of_int tag_large_leaf)
  end;
  Stat.incr_written_leaves context.Context.stat;
  Stat.incr_written_leaf_sizes context.Context.stat len;
  _Leaf (v, Indexed i, Hashed hp), i

let write_link context storage i index =
  (* |<- 0's ->|<-   child index  ->| |<- 2^32 - 254 ->| *)
  let zero_link = zero_link context in
  let offset_of_index_part = offset_of_index_part context in
  let buf = get_cell storage i in
  B.write_string zero_link buf 0;
  B.set_index buf (offset_of_index_part - 4)(*XXX*) index;
  B.set_index buf offset_of_index_part (Index.of_int tag_link);
  Stat.incr_written_links context.Context.stat

let with_node_cache { Context.node_cache ; _ } nh f =
  match Node_cache.find_opt node_cache nh with
  | Some i -> Ok i
  | None ->
      let+ i = f () in
      Node_cache.add node_cache nh i;
      i

let write_internal context nl nr nh =
  (* internal  |<- hash -------->|D|0| |<- the index of one of the child ----->|
       if combine_flags

     internal  |<- hash -------->|..0..|D|0| |<- the index of one of the child ----->|
       if not combine_flags
  *)
  let storage = Context.get_storage context in
  let offset_of_index_part = offset_of_index_part context in
  let bytes_per_cell = context.bytes_per_cell in
  let hp = Hash.prefix nh in
  let+ i = with_node_cache context nh @@ fun () ->
    let hpstr = Hash.Prefix.to_string hp in
    let il = index nl in
    let ir = index nr in

    let* i = Storage.new_index storage in
    let i' = Index.pred i in

    let+ must_refer_to, i =
      if i' = il then Ok (`Right, i)
      else if i' = ir then Ok (`Left, i)
      else begin
        (* Fat internal *)
        (* Write the link to the right at i *)
        write_link context storage i ir;
        let+ i = Storage.new_index storage in
        (`Left, i)
      end
    in

    let buf = get_cell storage i in
    let bytes_per_hash = context.hash.bytes in

    if context.hash.flags_combined then begin
      (* 0 to 215 bits *)
      B.blit_from_string hpstr 0 buf 0 (bytes_per_hash - 1);

      (* fix for the 223rd and 224th bits (pos 222, 223) *)
      B.set_char buf (bytes_per_hash - 1)
        (let c = Char.code @@ String.unsafe_get hpstr (bytes_per_hash - 1) in
         let c = c land 0xfc in
         Char.chr (if must_refer_to = `Left then c else c lor 2));

      (* next 32bits *)
      B.set_index buf offset_of_index_part (if must_refer_to = `Left then il else ir);
    end else begin
      B.blit_from_string hpstr 0 buf 0 bytes_per_hash;

      B.set_uint8 buf (bytes_per_cell - 5) (if must_refer_to = `Left then 0 else 2);

      (* next 32bits *)
      B.set_index buf offset_of_index_part (if must_refer_to = `Left then il else ir);
    end;
    Stat.incr_written_internals context.Context.stat;
    i
  in
  (_Internal (nl, nr, Indexed i, Hashed hp), i, nh)

let write_empty_bud context =
  let hash = context.Context.hash in
  let storage = Context.get_storage context in
  let offset_of_index_part = offset_of_index_part context in
  let bytes_per_cell = context.bytes_per_cell in
  let bud_first = String.make (bytes_per_cell - 4) '\255' in
  (* XXX No point to store the empty bud more than once... *)
  (* empty bud |<- 1111111111111111111111111111 ->| |<- 2^32 - 256 ->| *)
  let+ i = Storage.new_index storage in
  let buf = get_cell storage i in
  B.write_string bud_first buf 0;
  B.set_index buf offset_of_index_part (Index.of_int tag_empty_bud);
  Stat.incr_written_buds context.Context.stat;
  Stat.incr_written_empty_buds context.Context.stat;
  (* XXX hash is precomputable *)
  let hp = Node_hash.of_bud hash None in
  (_Bud (None, Indexed i, Hashed hp), i, (hp, ""))

let write_bud context n nh =
  (* bud       |<- hash -------->|1|1| |<- the index of the child ->| *)
  let hash = context.Context.hash in
  let storage = Context.get_storage context in
  let offset_of_index_part = offset_of_index_part context in
  let bytes_per_cell = context.bytes_per_cell in
  let hp = Hash.prefix nh in
  let+ i = with_node_cache context nh @@ fun () ->
    let+ i = Storage.new_index storage in
    let buf = get_cell storage i in
    if hash.flags_combined then begin
      (* flags are already in [hp] *)
      let s = Hash.Prefix.to_string hp in
      B.write_string s buf 0;
      B.set_index buf offset_of_index_part @@ index n;
    end else begin
      let s = Hash.Prefix.to_string hp in
      B.write_string s buf 0;
      B.set_uint8 buf (bytes_per_cell - 5) 0b11;
      B.set_index buf offset_of_index_part @@ index n;
    end;
    Stat.incr_written_buds context.Context.stat;
    i
  in
  (_Bud (Some n, Indexed i, Hashed hp), i, nh)

let write_extender context seg n nh =
  (* extender  |<- segment --|..|---------->|6bits|01| |<- the index of the child ->| *)
  let storage = Context.get_storage context in
  let offset_of_index_part = offset_of_index_part context in
  let bytes_per_cell = context.bytes_per_cell in
  let hp = Hash.prefix nh in
  let+ i' = with_node_cache context nh @@ fun () ->
    let sseg = Segment.Serialization.encode seg in
    let extra_cells =
      (String.length sseg - (bytes_per_cell - 5) + bytes_per_cell - 1) / bytes_per_cell
    in
    if extra_cells > 63 then assert false;
    let+ i = Storage.new_indices storage (extra_cells + 1) in
    let i' = Index.(+) i (Index.Unsafe.of_int extra_cells) in
    let buf = get_cell storage i' in
    B.set_index buf offset_of_index_part @@ index n;
    B.set_uint8 buf (bytes_per_cell - 5) (extra_cells lsl 2 + 0b01);

    let buf = get_bytes storage i ((extra_cells + 1) * bytes_per_cell) in
    B.write_string sseg buf 0;
    let zeros = extra_cells * bytes_per_cell + (bytes_per_cell - 5) - String.length sseg in
    B.write_string (String.make zeros '\x00') buf (String.length sseg);
    Stat.incr_written_extenders context.Context.stat;
    i'
  in
  (_Extender (seg, n, Indexed i', Hashed hp), i', nh)

let write_node ?(clear=true) context node =
  if Context.is_memory_only context then
    failwith "MemoryOnly context cannot write a node"
  else
    let storage = context.Context.storage in
    let hash = context.Context.hash in
    if not (Context.mode context = Writer) then
      failwith "Non Writer context cannot write a node";
    let bytes_per_cell = context.bytes_per_cell in
    let hashcons = context.Context.hashcons in
    (* XXX It is not tail recursive.  May crash if the depth of
       the tree exceeds the stack limit *)
    let rec write_aux : node -> (node * Index.t * Hash.t, _) result = function
      | Hash _ -> assert false (* XXX cannot write *)
      | Disk (index, wit) ->
          (* Need to get the hash from the disk *)
          let v = read_node_with_hash context index wit in
          let nh = Option.from_Some @@ Node_type.hash_of_view v in
          (* XXX Do we need to return the view? *)
          Ok ((if clear then Disk (index, wit) else View v), index, nh)
      | View v ->
          let+ v', i, nh = write_aux' v in
          let wit = match v with
            | Extender _ -> Is_Extender
            | _ -> Not_Extender
          in
          ((if clear then Disk (i, wit) else View v'), i, nh)

    and write_aux' : view -> (view * Index.t * Hash.t, _) result = fun v ->
      match v with
      | Leaf (_, Indexed i, h)
      | Bud (_, Indexed i, h)
      | Internal (_, _, Indexed i, h) ->
          let hp = match h with
          | Hashed hp -> hp
          | Not_Hashed -> read_hash_prefix context i in
          let nh = Hash.of_prefix hp in
          Ok (v, i, nh)
      | Extender (seg, _, Indexed i, h) ->
          let hp = match h with
          | Hashed hp -> hp
          | Not_Hashed -> read_hash_prefix context i in
          let nh = Node_hash.of_extender seg hp in
          Ok (v, i, nh)

      (* indexing is necessary below.  If required, the hash is also computed *)
      | Leaf (value, Not_Indexed, h) ->
          (* if the size of the value is 1 <= size <= bytes_per_cell, the contents are
              written to the previous index of the leaf

              XXX we may write the value at the node bytes_per_cell > 32 and
              the value is very small
          *)
          let hp = match h with
          | Hashed hp -> hp
          | Not_Hashed -> Node_hash.of_leaf hash value in
          let nh = Hash.of_prefix hp in
          let len = Value.length value in
          let create_new () =
            if len <> 0 then Stat.incr_committed_leaf_sizes context.Context.stat len;
            let* () =
              if len <= bytes_per_cell
              then
                write_small_value storage value
              else if
                (* cannot exceed the minimum tag -254 *)
                bytes_per_cell + 1 <= len && len <= min (bytes_per_cell * 2) max_small_leaf_size
              then
                write_medium_value storage value
              else
                write_large_value storage value
            in
            let+ v, i = write_leaf context value hp in
            (v, i, nh)
          in
          if len = 0 then
            (* We don't store 0 size leaves *)
            let index = zero_sized_leaf_index in
            Ok (_Leaf (value, Indexed index, Hashed hp), index, nh)
          else if 1 <= len && len <= (Hashcons.config context.Context.hashcons).max_leaf_size (* XXX should have an API *) then begin
            (* try hashcons *)
            match Hashcons.find hashcons value with
            | Error e -> Error.raise e
            | Ok (Some index) ->
                Ok (_Leaf (value, Indexed index, Hashed hp), index, nh)
            | Ok None ->
                let+ v, i, nh = create_new () in
                begin match Hashcons.add hashcons value i with
                  | Ok () -> ()
                  | Error e -> Error.raise e
                end;
                (v, i, nh)
          end else create_new ()

      | Bud (None, Not_Indexed, _) -> write_empty_bud context

      | Bud (Some underneath, Not_Indexed, h) ->
          let* (node, _, nh') = write_aux underneath in
          let hp = match h with
          | Hashed hp -> hp
          | Not_Hashed -> Node_hash.of_bud hash (Some nh') in
          let nh = Hash.of_prefix hp in
          write_bud context node nh

      | Internal (left, right, Not_Indexed, h) ->
          let* (left, _il, nhl) = write_aux left in
          let* (right, _ir, nhr) = write_aux right in
          let hp = match h with
          | Hashed hp -> hp
          | Not_Hashed -> Node_hash.of_internal hash nhl nhr in
          let nh = Hash.of_prefix hp in
          write_internal context left right nh

      | Extender (segment, underneath, Not_Indexed, _) ->
          let* (underneath, _i, nh') = write_aux underneath in
          let hp' = Hash.prefix nh' in
          let nh = Node_hash.of_extender segment hp' in
          write_extender context segment underneath nh
    in
    let+ (node, i, nh) = write_aux node in
    (* shrink the hashcons buckets *)
    Hashcons.shrink context.hashcons;
    Node_cache.shrink context.node_cache;
    (node, i, Hash.prefix nh)

module Internal = struct
  let parse_cell = parse_cell

  let read_node_fully_for_test context n =
    let rec aux n =
      let v = match n with
        | Hash _ -> assert false
        | Disk (i, ewit) -> read_node context i ewit
        | View v -> v
      in
      match v with
      | Leaf _ -> View v
      | Bud (None, _, _) -> View v
      | Bud (Some n, i, h) ->
          let n = aux n in
          View (_Bud (Some n, i, h))
      | Internal (n1, n2, i, h) ->
          let n1 = aux n1 in
          let n2 = aux n2 in
          View (_Internal (n1, n2, i, h))
      | Extender (seg, n, i, h) ->
          let n = aux n in
          View (_Extender (seg, n, i, h))
    in
    aux n

  let equal_for_test context n1 n2 =
    let rec aux = function
      | [] -> Ok ()
      | (n1,n2)::rest ->
          match n1, n2 with
          | Disk (i1, ew1), Disk (i2, ew2) when i1 = i2 && ew1 = ew2 -> aux rest
          | Disk _, Disk _ -> Error (n1,n2)
          | Disk (i, ew), n2 ->
              let n1 = View (read_node context i ew) in
              aux @@ (n1,n2)::rest
          | n1, Disk (i, ew) ->
              let n2 = View (read_node context i ew) in
              aux @@ (n1,n2)::rest
          | View v1, View v2 ->
              begin match v1, v2 with
              | Internal (n11, n12, _, _), Internal (n21, n22, _, _) ->
                  aux @@ (n11,n21)::(n12,n22)::rest
              | Bud (None, _, _), Bud (None, _, _) -> aux rest
              | Bud (Some n1, _, _), Bud (Some n2, _, _) -> aux @@ (n1,n2) :: rest
              | Leaf (v1, _, _), Leaf (v2, _, _) when v1 = v2 -> aux rest
              | Extender (seg1, n1, _, _), Extender (seg2, n2, _, _) when Segment.equal seg1 seg2 ->
                  aux @@ (n1,n2)::rest
              | _ -> Error (n1,n2)
              end
          | Hash h1, Hash h2 when h1 = h2 -> Ok ()
          | Hash _, _ | _, Hash _ -> Error (n1, n2)
    in
    aux [(n1, n2)]
end