package flux

  1. Overview
  2. Docs

Source file flux.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
(* Part of this code is based on the streaming project and
   Copyright (c) 2020 Rizo I. <rizo@odis.io>
   SPDX-License-Identifier: ISC
   Copyright (c) 2024 Romain Calascibetta <romain.calascibetta@gmail.com>
*)

module Bqueue = Bqueue

external reraise : exn -> 'a = "%reraise"

type +'a source =
  | Source : {
        init: unit -> 's
      ; pull: 's -> ('a * 's) option
      ; stop: 's -> unit
    }
      -> 'a source

module Source = struct
  let unfold seed pull =
    let init = Fun.const seed and stop = Fun.const () in
    Source { init; pull; stop }

  let list lst =
    let pull = function [] -> None | x :: r -> Some (x, r) in
    Source { init= Fun.const lst; pull; stop= ignore }

  let seq seq =
    let pull = Seq.uncons in
    Source { init= Fun.const seq; pull; stop= ignore }

  let array arr =
    let len = Array.length arr in
    let init = Fun.const 0
    and pull idx = if idx >= len then None else Some (arr.(idx), succ idx)
    and stop = Fun.const () in
    Source { init; pull; stop }

  let string str =
    let len = String.length str in
    let init = Fun.const 0
    and pull idx = if idx >= len then None else Some (str.[idx], succ idx)
    and stop = Fun.const () in
    Source { init; pull; stop }

  let queue q =
    let init = Fun.const q in
    let pull q = try Some (Queue.pop q, q) with Queue.Empty -> None
    and stop = Fun.const () in
    Source { init; pull; stop }

  let bqueue : type a.
      ?stop:[ `Ignore | `Halt | `Close ] -> (a, a option) Bqueue.t -> a source =
   fun ?stop:(behavior = `Ignore) q ->
    let init = Fun.const q
    and pull q = Option.map (fun a -> (a, q)) (Bqueue.get q)
    and stop =
      match behavior with
      | `Ignore -> ignore
      | `Halt -> Bqueue.halt
      | `Close -> Bqueue.close
    in
    Source { init; pull; stop }

  let map fn (Source src) =
    let pull s =
      let fn (v, s) = (fn v, s) in
      Option.map fn (src.pull s)
    in
    Source { src with pull }

  let empty =
    let init = Fun.const () and pull = Fun.const None and stop = Fun.const () in
    Source { init; pull; stop }

  let dispose (Source src) = src.stop (src.init ())

  let resource ~finally pull resource =
    let init () =
      let r = Miou.Ownership.create ~finally resource in
      Miou.Ownership.own r; (r, resource)
    (* NOTE(dinosaure): we probably need [Miou.Ownership.with_value] which can
       update the [resource] kept by Miou with a new one. Currently, we
       physically use the same [resource] for all [pull] and for
       [stop]/[finally]. *)
    and pull (r, resource) =
      match pull resource with
      | Some x -> Some (x, (r, resource))
      | None -> None
    and stop (r, _) = Miou.Ownership.release r in
    Source { init; pull; stop }

  type 'a task = ('a, 'a option) Bqueue.t -> unit

  let with_task ?(parallel = false) ?(halt = false) ~size fn =
    let bqueue =
      match halt with
      | true -> Bqueue.(create with_close_and_halt size)
      | false -> Bqueue.(create with_close size)
    in
    let init () =
      let finally = if halt then Bqueue.halt else Bqueue.close in
      let res = Miou.Ownership.create ~finally bqueue in
      Miou.Ownership.own res;
      let spawn ~give fn =
        if parallel then Miou.call ~give fn else Miou.async ~give fn
      in
      spawn ~give:[ res ] @@ fun () -> fn bqueue; Miou.Ownership.disown res
    and pull prm =
      match Option.map (fun a -> (a, prm)) (Bqueue.get bqueue) with
      | None -> Miou.await_exn prm; None
      | Some _ as prm -> prm
    (* NOTE(dinosaure): A task that has completed successfully can be cancelled.
       The idea behind using [Miou.cancel] rather than [Miou.await_exn] is that
       the user may want to force the producer to terminate. Thanks to
       [Miou.Ownership], we can be sure that if [fn] raises an exception or is
       cancelled (with [Miou.cancel]), the queue is closed properly. *)
    and stop = Miou.cancel in
    Source { init; pull; stop }

  let with_formatter ?halt ~size fn =
    with_task ?halt ~size @@ fun q ->
    let out str off len = Bqueue.put q (String.sub str off len) in
    fn (Format.make_formatter out ignore);
    Bqueue.close q

  let each fn (Source src) =
    let rec go acc =
      match src.pull acc with
      | None -> src.stop acc
      | Some (x, acc) -> fn x; go acc
    in
    go (src.init ())

  let file ~filename len =
    if len <= 0 then invalid_arg "Flux.Source.file";
    let init () = (Bytes.create len, lazy (Stdlib.open_in_bin filename))
    and stop (_buf, ic) = if Lazy.is_val ic then close_in (Lazy.force ic)
    and pull ((buf, ic) as state) =
      let len = input (Lazy.force ic) buf 0 (Bytes.length buf) in
      if len = 0 then None else Some (Bytes.sub_string buf 0 len, state)
    in
    Source { init; stop; pull }

  let in_channel ?(close = true) ic =
    let init () = (Bytes.create 0x7ff, ic)
    and stop (_buf, ic) = if ic != stdin && close then close_in ic
    and pull ((buf, ic) as state) =
      try
        let len = Stdlib.input ic buf 0 (Bytes.length buf) in
        if len = 0 then None else Some (Bytes.sub_string buf 0 len, state)
      with End_of_file -> None
    in
    Source { init; stop; pull }

  let next (Source src) =
    let s0 = src.init () in
    try
      match src.pull s0 with
      | Some (x, s1) -> Some (x, Source { src with init= Fun.const s1 })
      | None -> None
    with exn -> src.stop s0; reraise exn
end

type ('a, 'r) sink =
  | Sink : {
        init: unit -> 's
      ; push: 's -> 'a -> 's
      ; full: 's -> bool
      ; stop: 's -> 'r
    }
      -> ('a, 'r) sink

module Sink = struct
  let fill x =
    let init = Fun.const ()
    and push _ = Fun.const ()
    and full = Fun.const true
    and stop = Fun.const x in
    Sink { init; push; full; stop }

  let fold push init =
    let init = Fun.const init and full = Fun.const false and stop = Fun.id in
    Sink { init; push; full; stop }

  let array =
    let init () = ([], 0)
    and push (acc, len) x = (x :: acc, len + 1)
    and full _ = false
    and stop (acc, len) =
      match acc with
      | [] -> [||]
      | [ x ] -> [| x |]
      | x :: r ->
          let arr = Array.make len x in
          let fn idx x = arr.(len - idx - 2) <- x in
          List.iteri fn r; arr
    in
    Sink { init; push; full; stop }

  let string =
    let init () = Buffer.create 0x7ff in
    let push buf str = Buffer.add_string buf str; buf in
    let full = Fun.const false in
    let stop = Buffer.contents in
    Sink { init; push; full; stop }

  let list =
    let init () = []
    and push acc x = x :: acc
    and full _ = false
    and stop acc = List.rev acc in
    Sink { init; push; full; stop }

  let seq init =
    let init = Fun.const init in
    let push acc x = Seq.cons x acc in
    let full = Fun.const false in
    let stop = Fun.id in
    Sink { init; push; full; stop }

  let buffer len =
    if len < 0 then invalid_arg "Flux.Sink.buffer: negative buffer size";
    if len = 0 then fill [||]
    else
      let buf = Array.make len None in
      let init () = 0
      and push idx x = Array.set buf idx (Some x); idx + 1
      and full idx = idx = len
      and stop len = Array.init len (fun idx -> Option.get buf.(idx)) in
      Sink { init; push; full; stop }

  let file ~filename =
    let init () = lazy (Stdlib.open_out_bin filename) in
    let stop oc = if Lazy.is_val oc then close_out (Lazy.force oc) in
    let push oc str =
      let ch = Lazy.force oc in
      Stdlib.output_string ch str;
      Stdlib.flush ch;
      oc
    in
    let full _ = false in
    Sink { init; stop; full; push }

  let out_channel ?(close = true) oc =
    let init () = oc
    and stop oc = if oc != stdout && close then close_out oc
    and push oc str =
      Stdlib.output_string oc str;
      Stdlib.flush oc;
      oc
    and full _ = false in
    Sink { init; stop; full; push }

  let drain =
    let init () = () in
    let push () _ = () in
    let full () = false in
    let stop () = () in
    Sink { init; push; full; stop }

  let each ?(parallel = false) ~init ~merge fn =
    let rec terminate ?exn (acc, orphans) =
      match (Miou.care orphans, exn) with
      | None, None -> Ok (acc, orphans)
      | None, Some exn -> Error exn
      | Some None, _ ->
          Miou.yield ();
          terminate ?exn (acc, orphans)
      | Some (Some prm), _ -> (
          match (Miou.await prm, exn) with
          | Ok x, _ -> terminate ?exn (merge x acc, orphans)
          | Error exn, None -> terminate ~exn (acc, orphans)
          | Error _, _ -> terminate ?exn (acc, orphans))
    in
    let rec clean (acc, orphans) =
      match Miou.care orphans with
      | None | Some None -> Ok (acc, orphans)
      | Some (Some prm) -> (
          match Miou.await prm with
          | Ok x -> clean (merge x acc, orphans)
          | Error exn -> terminate ~exn (acc, orphans))
    in
    let init () = Ok (init, Miou.orphans ()) in
    let push value x =
      match Result.bind value clean with
      | Ok (acc, orphans) ->
          if parallel then ignore (Miou.call ~orphans @@ fun () -> fn x)
          else ignore (Miou.async ~orphans @@ fun () -> fn x);
          Ok (acc, orphans)
      | Error _ as err -> err
    in
    let full = Result.is_error in
    let stop x =
      match Result.(map fst (bind x terminate)) with
      | Ok acc -> acc
      | Error exn -> raise exn
    in
    Sink { init; stop; full; push }

  let sequential ~stop (Sink l) (Sink r) =
    let init () = (l.init (), r.init ())
    and push (l_acc, r_acc) x = (l.push l_acc x, r.push r_acc x)
    and full (l_acc, r_acc) = l.full l_acc || r.full r_acc
    and stop (l_acc, r_acc) = stop (l.stop l_acc, r.stop r_acc) in
    Sink { init; push; full; stop }

  let zip x y = sequential ~stop:Fun.id x y

  let parallel ~stop ~limit l r =
    let init () =
      let is_full = Atomic.make false in
      let lq = Bqueue.create Bqueue.with_close limit in
      let rq = Bqueue.create Bqueue.with_close limit in
      let fn (Sink s, q) =
        let rec go acc () =
          match Bqueue.get q with
          | Some x ->
              if s.full acc then (Atomic.set is_full true; s.stop acc)
              else go (s.push acc x) ()
          | None -> s.stop acc
        in
        go (s.init ())
      in
      let lprm = Miou.call (fn (l, lq)) in
      let rprm = Miou.call (fn (r, rq)) in
      (is_full, lq, lprm, rq, rprm)
    and push ((_, lq, _, rq, _) as state) x =
      Bqueue.put lq x; Bqueue.put rq x; state
    and full (is_full, _, _, _, _) = Atomic.get is_full
    and stop (_, lq, lprm, rq, rprm) =
      Bqueue.close lq;
      Bqueue.close rq;
      let lres = Miou.await_exn lprm in
      let rres = Miou.await_exn rprm in
      stop (lres, rres)
    in
    Sink { init; push; full; stop }

  let both x y = parallel ~stop:Fun.id ~limit:0x7ff x y

  let unzip (Sink l) (Sink r) =
    let init () = (l.init (), r.init ())
    and push (l_acc, r_acc) (x, y) = (l.push l_acc x, r.push r_acc y)
    and full (l_acc, r_acc) = l.full l_acc || r.full r_acc
    and stop (l_acc, r_acc) = (l.stop l_acc, r.stop r_acc) in
    Sink { init; push; full; stop }

  type ('a, 'b) race = Left of 'a | Right of 'b | Both of 'a * 'b

  let race (Sink l) (Sink r) =
    let init () = Both (l.init (), r.init ())
    and push state x =
      match state with
      | Both (l_acc, r_acc) ->
          let l_acc' = l.push l_acc x in
          let r_acc' = r.push r_acc x in
          if l.full l_acc' then Left l_acc'
          else if r.full r_acc' then Right r_acc'
          else Both (l_acc', r_acc')
      | _ -> failwith "Flux.Sink.race: one of the sinks is already filled"
    in
    let full = function Both _ -> false | _ -> true in
    let stop = function
      | Left l_acc -> Left (l.stop l_acc)
      | Right r_acc -> Right (r.stop r_acc)
      | Both (l_acc, r_acc) -> Both (l.stop l_acc, r.stop r_acc)
    in
    Sink { init; push; full; stop }

  let map fn (Sink k) = Sink { k with stop= (fun x -> fn (k.stop x)) }

  let premap fn (Sink k) =
    Sink { k with push= (fun acc x -> k.push acc (fn x)) }

  let length =
    let init _ = 0 and push acc _ = acc + 1 and full _ = false and stop x = x in
    Sink { init; push; full; stop }

  type ('top, 'a, 'b) flat_map =
    | Flat_map_top : 'top -> ('top, 'a, 'b) flat_map
    | Flat_map_sub : {
          init: 'sub
        ; push: 'sub -> 'a -> 'sub
        ; full: 'sub -> bool
        ; stop: 'sub -> 'b
      }
        -> ('top, 'a, 'b) flat_map

  let flat_map fn (Sink top) =
    let init () = Flat_map_top (top.init ()) in
    let push s x =
      match s with
      | Flat_map_top acc ->
          let acc' = top.push acc x in
          if top.full acc' then
            let r = top.stop acc' in
            let (Sink sub) = fn r in
            Flat_map_sub
              {
                init= sub.init ()
              ; push= sub.push
              ; full= sub.full
              ; stop= sub.stop
              }
          else Flat_map_top acc'
      | Flat_map_sub sub -> Flat_map_sub { sub with init= sub.push sub.init x }
    in
    let full = function
      | Flat_map_top acc -> top.full acc
      | Flat_map_sub sub -> sub.full sub.init
    in
    let stop = function
      | Flat_map_top acc ->
          let (Sink sub) = fn (top.stop acc) in
          sub.stop (sub.init ())
      | Flat_map_sub sub -> sub.stop sub.init
    in
    Sink { init; push; full; stop }

  let full =
    let init () = ()
    and push () _ = invalid_arg "Flux.Sink.full: push to full sink"
    and full () = true
    and stop () = () in
    Sink { init; push; full; stop }

  let is_full (Sink k) = k.full (k.init ())

  let is_empty =
    let init () = true
    and push _ _ = false
    and full _ = false
    and stop = Fun.id in
    Sink { init; push; full; stop }

  module Syntax = struct
    let ( let* ) x fn = flat_map fn x
    let ( let+ ) x fn = map fn x
    let ( and+ ) x y = zip x y
  end

  module Infix = struct
    let ( >>= ) x fn = flat_map fn x
    let ( <@> ) x fn = map fn x
    let ( <&> ) x y = zip x y
    let ( <|> ) x y = race x y

    let ( <*> ) l r =
      let stop (fn, x) = fn x in
      sequential ~stop l r
  end
end

type ('a, 'b) flow = { flow: 'r. ('b, 'r) sink -> ('a, 'r) sink } [@@unboxed]

type bstr =
  (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t

module Unsafe = struct
  external bset_uint8 : bstr -> int -> int -> unit = "%caml_ba_set_1"
  external bset_uint32 : bstr -> int -> int32 -> unit = "%caml_bigstring_set32"
  external sget_uint32 : string -> int -> int32 = "%caml_string_get32"
  external sget_uint8 : string -> int -> int = "%string_unsafe_get"

  let blit_from_string src ~src_off dst ~dst_off ~len =
    let len0 = len land 3 in
    let len1 = len asr 2 in
    for i = 0 to len1 - 1 do
      let i = i * 4 in
      let v = sget_uint32 src (src_off + i) in
      bset_uint32 dst (dst_off + i) v
    done;
    for i = 0 to len0 - 1 do
      let i = (len1 * 4) + i in
      let v = sget_uint8 src (src_off + i) in
      bset_uint8 dst (dst_off + i) v
    done
end

module Flow = struct
  let identity = { flow= Fun.id }
  let compose { flow= f } { flow= g } = { flow= (fun sink -> f (g sink)) }
  let ( << ) a b = compose a b
  let ( >> ) b a = compose a b

  let tap fn =
    let flow (Sink k) =
      let push r x = fn x; k.push r x in
      Sink { k with push }
    in
    { flow }

  let bstr ~len =
    let open Bigarray in
    let flow (Sink k) =
      let init () =
        let bstr = Array1.create char c_layout len in
        (k.init (), bstr, 0)
      and push (acc, bstr, dst_off) str =
        let rec go acc src_off dst_off =
          if src_off = String.length str then (acc, bstr, dst_off)
          else
            let rem_bstr = Array1.dim bstr - dst_off
            and rem_str = String.length str - src_off in
            let len = Int.min rem_bstr rem_str in
            Unsafe.blit_from_string str ~src_off bstr ~dst_off ~len;
            if dst_off + len = Array1.dim bstr then
              let acc = k.push acc bstr in
              if k.full acc then (acc, bstr, 0) else go acc (src_off + len) 0
            else (acc, bstr, dst_off + len)
        in
        go acc 0 dst_off
      and full (acc, _, _) = k.full acc
      and stop (acc, bstr, dst_off) =
        if dst_off > 0 && not (k.full acc) then
          let bstr = Array1.sub bstr 0 dst_off in
          k.stop (k.push acc bstr)
        else k.stop acc
      in
      Sink { init; push; full; stop }
    in
    { flow }

  let split_on_char chr =
    let flow (Sink k) =
      let init () = (Buffer.create 0x7ff, k.init ()) in
      let push (buf, acc) str =
        match String.split_on_char chr str with
        | [] -> assert false
        | [ x ] -> Buffer.add_string buf x; (buf, acc)
        | [ x; y ] ->
            Buffer.add_string buf x;
            let acc = k.push acc (Buffer.contents buf) in
            Buffer.clear buf; Buffer.add_string buf y; (buf, acc)
        | x :: rest ->
            Buffer.add_string buf x;
            let acc = k.push acc (Buffer.contents buf) in
            Buffer.clear buf;
            let rec go acc = function
              | [] -> (buf, acc)
              | [ x ] -> Buffer.add_string buf x; (buf, acc)
              | x :: r when not (k.full acc) -> go (k.push acc x) r
              | _ -> (buf, acc)
            in
            go acc rest
      in
      let full (_, acc) = k.full acc in
      let stop (buf, acc) =
        if Buffer.length buf > 0 && not (k.full acc) then
          let acc = k.push acc (Buffer.contents buf) in
          k.stop acc
        else k.stop acc
      in
      Sink { init; push; full; stop }
    in
    { flow }

  let map fn =
    let flow (Sink k) =
      let push r x = k.push r (fn x) in
      Sink { k with push }
    in
    { flow }

  let take n =
    let flow (Sink k) =
      let init () = (k.init (), 0)
      and push (acc, idx) x = (k.push acc x, succ idx)
      and full (acc, idx) = k.full acc || idx = n
      and stop (acc, _) = k.stop acc in
      Sink { init; push; full; stop }
    in
    { flow }

  let transfer bqueue push acc =
    let rec go acc () =
      match Bqueue.get bqueue with None -> acc | Some a -> go (push acc a) ()
    in
    go acc

  let filter fn =
    let flow (Sink k) =
      let push r x = if fn x then k.push r x else r in
      Sink { k with push }
    in
    { flow }

  let filter_map fn =
    let flow (Sink k) =
      let push r x = match fn x with Some x -> k.push r x | None -> r in
      Sink { k with push }
    in
    { flow }

  let bound limit =
    let flow (Sink k) =
      let init () =
        let bqueue = Bqueue.(create with_close limit) in
        let acc = k.init () in
        let prm = Miou.async (transfer bqueue k.push acc) in
        (bqueue, prm)
      in
      let push (bqueue, prm) a = Bqueue.put bqueue a; (bqueue, prm) in
      let full _ = false in
      let stop (bqueue, prm) =
        Bqueue.close bqueue;
        let acc = Miou.await_exn prm in
        k.stop acc
      in
      Sink { init; stop; full; push }
    in
    { flow }
end

external reraise : exn -> 'a = "%reraise"

type 'a stream = { stream: 'r. ('a, 'r) sink -> 'r } [@@unboxed]

module Stream = struct
  let run ~from:(Source src) ~via:{ flow } ~into:snk =
    let (Sink snk) = flow snk in
    let rec loop r s =
      match snk.full r with
      | true ->
          let r' = snk.stop r in
          (* NOTE(dinosaure): it's really important to replace [init] by the
             current source's state. By this way, the user is able to
             [Source.dispose] without re-init the given source [src]. *)
          let leftover = Source { src with init= Fun.const s } in
          (r', Some leftover)
      | false -> begin
          match src.pull s with
          | Some (x, s') -> loop (snk.push r x) s'
          | None ->
              src.stop s;
              let r' = snk.stop r in
              (r', None)
        end
    in
    let r0 = snk.init () in
    match snk.full r0 with
    | true ->
        let r' = snk.stop r0 in
        (r', Some (Source src))
    | false -> (
        let s0' = ref None in
        try
          let s0 = src.init () in
          s0' := Some s0;
          loop r0 s0
        with exn ->
          Option.iter src.stop !s0';
          let _ = snk.stop r0 in
          reraise exn)

  let into sink t = t.stream sink

  let via { flow } t =
    let stream sink = into (flow sink) t in
    { stream }

  let from (Source src) =
    let stream (Sink k) =
      let rec go r s =
        let is_full = k.full r in
        if is_full then k.stop r
        else
          match src.pull s with
          | None -> src.stop s; k.stop r
          | Some (x, s') ->
              let r' = k.push r x in
              go r' s'
      in
      let r0 = k.init () in
      let is_full = k.full r0 in
      if is_full then k.stop r0
      else
        let s0' = ref None in
        try
          let s0 = src.init () in
          s0' := Some s0;
          go r0 s0
        with exn ->
          Option.iter src.stop !s0';
          let _ = k.stop r0 in
          reraise exn
    in
    { stream }

  let map fn t = via (Flow.map fn) t
  let filter fn t = via (Flow.filter fn) t
  let file ~filename = into (Sink.file ~filename)
  let drain t = into Sink.drain t

  let empty =
    let stream (Sink k) = k.stop (k.init ()) in
    { stream }

  let each ?parallel fn t =
    into (Sink.each ?parallel ~init:() ~merge:Fun.const fn) t

  let range ?by:(step = 1) n m =
    if n > m then invalid_arg "Flux.Stream.range: invalid range";
    let stream (Sink k) =
      let rec go i r =
        if k.full r then r else if i >= m then r else go (i + step) (k.push r i)
      in
      k.stop (go n (k.init ()))
    in
    { stream }

  let interpose sep t =
    let stream (Sink k) =
      let started = ref false in
      let push acc x =
        match !started with
        | true ->
            let acc = k.push acc sep in
            if k.full acc then acc else k.push acc x
        | false ->
            started := true;
            k.push acc x
      in
      t.stream (Sink { k with push })
    in
    { stream }

  let bracket : init:(unit -> 's) -> stop:('s -> 'r) -> ('s -> 's) -> 'r =
   fun ~init ~stop fn ->
    let acc = init () in
    try stop (fn acc)
    with exn ->
      ignore (stop acc);
      reraise exn

  let repeat ?times:n x =
    let stream (Sink k) =
      match n with
      | None ->
          let rec go r = if k.full r then r else go (k.push r x) in
          bracket go ~init:k.init ~stop:k.stop
      | Some n ->
          let rec go i r =
            if k.full r || i = n then r else go (succ i) (k.push r x)
          in
          bracket (go 0) ~init:k.init ~stop:k.stop
    in
    { stream }

  let flat_map fn t =
    let stream (Sink k) =
      let push r x =
        (fn x).stream (Sink { k with init= Fun.const r; stop= Fun.id })
      in
      t.stream (Sink { k with push })
    in
    { stream }

  let unfold s0 pull = from (Source.unfold s0 pull)
  let tap fn t = via (Flow.tap fn) t
  let filter_map fn t = via (Flow.filter_map fn) t
  let take n t = via (Flow.take n) t
end