package async_kernel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file deferred_map.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
open Core
open Deferred_std
module Deferred = Deferred1
module List = Deferred_list
(* Applicative traversals over [Map.t], instantiated with the [Throttled]
   applicative.  Used below by [filter_mapi_max_concurrent] and
   [mapi_max_concurrent] to build a throttled computation over every binding. *)
module Throttled_map = Map.Make_applicative_traversals (Throttled)

(* The maps handled by this module are ordinary [Map.t] values; the type is
   only re-exported under the conventional name [t]. *)
type ('a, 'b, 'c) t = ('a, 'b, 'c) Map.t

(* [change t k ~f] gives the current lookup of [k] in [t] (an option) to the
   deferred function [f].  Once [f]'s result is determined, that option is
   handed to [Map.change] as the new state of the binding for [k]; the returned
   deferred holds the resulting map. *)
let change t k ~f =
  let current = Map.find t k in
  f current >>| fun replacement -> Map.change t k ~f:(fun _ -> replacement)
;;

(* [update t k ~f] calls the deferred function [f] on the current lookup of [k]
   in [t] (an option) and, when the new value is determined, returns [t] with
   [k] bound to it via [Map.set]. *)
let update t k ~f =
  Deferred.map (f (Map.find t k)) ~f:(fun data -> Map.set t ~key:k ~data)
;;

(* [iter_keys ~how t ~f] runs the deferred function [f] on every key of [t],
   delegating scheduling ([how]) to the deferred [List.iter]. *)
let iter_keys ~how t ~f = Map.keys t |> List.iter ~how ~f
(* [iter ~how t ~f] runs the deferred function [f] on every value stored in
   [t], delegating scheduling ([how]) to the deferred [List.iter]. *)
let iter ~how t ~f = Map.data t |> List.iter ~how ~f
(* [iteri ~how t ~f] runs [f ~key ~data] for every binding of [t].  The
   bindings are first materialised with [Map.to_alist] and then traversed with
   the deferred [List.iter] according to [how]. *)
let iteri ~how t ~f =
  let visit (key, data) = f ~key ~data in
  List.iter ~how (Map.to_alist t) ~f:visit
;;

(* [fold t ~init ~f] threads an accumulator through the bindings of [t] with
   the deferred function [f], using the deferred [List.fold].

   The bindings are first collected into an association list by consing during
   a [Map.fold_right]; that list is what the original code calls
   [alist_in_increasing_key_order]. *)
let fold t ~init ~f =
  let cons_binding ~key ~data rest = (key, data) :: rest in
  let step acc (key, data) = f ~key ~data acc in
  Map.fold_right t ~init:[] ~f:cons_binding |> List.fold ~init ~f:step
;;

(* [fold_right t ~init ~f] is the mirror image of [fold]: the bindings are
   collected by consing during a (left) [Map.fold], which reverses the visit
   order, and the resulting list is then folded with the deferred [List.fold],
   threading the accumulator through [f]. *)
let fold_right t ~init ~f =
  let cons_binding ~key ~data rest = (key, data) :: rest in
  let step acc (key, data) = f ~key ~data acc in
  Map.fold t ~init:[] ~f:cons_binding |> List.fold ~init ~f:step
;;

(* Bookkeeping record used by the [`Parallel] branch of [filter_mapi]: one job
   is created per map binding, pairing the binding with a mutable slot for the
   result of running the user function on it.  The derived [Job.result] getter
   is what [filter_mapi] passes to [Map.filter_map] at the end. *)
module Job = struct
  type ('a, 'b, 'c) t =
    { key : 'a (* key of the binding this job was created for *)
    ; data : 'b (* value of that binding, passed to the user function *)
    ; mutable result : 'c option
        (* [None] at creation; overwritten with the user function's result once
           that result is determined *)
    }
  [@@deriving fields ~getters]
end

(* [filter_mapi_sequential t ~f] walks the bindings of [t] in increasing key
   order and calls [f ~key ~data] on each.  The fold only advances (by calling
   the continuation [k]) from inside the [upon] callback, i.e. after the
   deferred returned by [f] for the current binding is determined.

   Bindings for which [f] yields [Some data] are accumulated with
   [Build_increasing.add_exn]; [None] results are dropped.  When the sequence is
   exhausted the accumulator is converted to a map (sharing [t]'s comparator)
   and used to fill the ivar behind the returned deferred. *)
let filter_mapi_sequential t ~f =
  let module Build_increasing = Base.Map.Using_comparator.Tree.Build_increasing in
  let comparator = Map.comparator t in
  let bindings = Map.to_sequence ~order:`Increasing_key t in
  (* One fold step: run [f], fold its answer into [acc], then resume via [k]. *)
  let step acc (key, data) ~k =
    upon (f ~key ~data) (fun outcome ->
      let acc =
        match outcome with
        | None -> acc
        | Some data -> Build_increasing.add_exn acc ~comparator ~key ~data
      in
      k acc)
  in
  (* Final step: turn the builder into a map and publish it through [ivar]. *)
  let finish ivar acc =
    Build_increasing.to_tree acc
    |> Map.Using_comparator.of_tree ~comparator
    |> Ivar.fill_exn ivar
  in
  Deferred.create (fun ivar ->
    Sequence.delayed_fold
      bindings
      ~init:Build_increasing.empty
      ~f:step
      ~finish:(finish ivar))
;;

(* [filter_mapi_max_concurrent t ~f ~max_concurrent_jobs] wraps each call
   [f ~key ~data] in a [Throttled.job], combines the jobs with the throttled
   [filter_mapi] traversal of [t], and executes the combined computation with
   [Throttled.run], passing [max_concurrent_jobs] through. *)
let filter_mapi_max_concurrent t ~f ~max_concurrent_jobs =
  let as_job ~key ~data = Throttled.job (fun () -> f ~key ~data) in
  Throttled_map.filter_mapi t ~f:as_job |> Throttled.run ~max_concurrent_jobs
;;

(* [filter_mapi ~how t ~f] applies the deferred function [f] to every binding
   of [t] and keeps those for which it produces [Some _], with the produced
   value as the new data.  [how] selects the strategy:

   - [`Sequential] delegates to [filter_mapi_sequential];
   - [`Max_concurrent_jobs n] delegates to [filter_mapi_max_concurrent];
   - [`Parallel] is handled here: a [Job.t] is created for every binding (both
     stored in a map of the same shape as [t] and pushed on a list), [f] is run
     over the jobs with the deferred [List.iter], each job's [result] field is
     filled in as its deferred is determined, and finally the job map is
     filtered through [Job.result]. *)
let filter_mapi ~how t ~f =
  match how with
  | `Sequential -> filter_mapi_sequential t ~f
  | `Max_concurrent_jobs max_concurrent_jobs ->
    filter_mapi_max_concurrent t ~f ~max_concurrent_jobs
  | `Parallel ->
    let pending = ref [] in
    (* Creates the job for one binding and records it in [pending]. *)
    let register ~key ~data =
      let job = { Job.key; data; result = None } in
      pending := job :: !pending;
      job
    in
    let job_map = Map.mapi t ~f:register in
    (* Runs [f] for one job and stores the answer in the job itself. *)
    let run ({ Job.key; data; result = _ } as job) =
      f ~key ~data >>| fun outcome -> job.Job.result <- outcome
    in
    (* Jobs were consed in visit order, so reverse to start them in that order. *)
    List.iter ~how (Base.List.rev !pending) ~f:run
    >>| fun () -> Map.filter_map job_map ~f:Job.result
;;

(* [filter_map ~how t ~f] is [filter_mapi] for a function [f] that only looks
   at the data of each binding; the key is ignored. *)
let filter_map ~how t ~f =
  let on_binding ~key:_ ~data = f data in
  filter_mapi ~how t ~f:on_binding
;;

(* [filter_keys ~how t ~f] keeps exactly the bindings of [t] whose key makes
   the deferred predicate [f] resolve to [true]; data is left unchanged.
   Implemented on top of [filter_mapi]. *)
let filter_keys ~how t ~f =
  let keep_if_key_passes ~key ~data =
    f key
    >>| function
    | true -> Some data
    | false -> None
  in
  filter_mapi ~how t ~f:keep_if_key_passes
;;

(* [filter ~how t ~f] keeps exactly the bindings of [t] whose data makes the
   deferred predicate [f] resolve to [true]; data is left unchanged.
   Implemented on top of [filter_mapi]. *)
let filter ~how t ~f =
  let keep_if_data_passes ~key:_ ~data =
    Deferred.map (f data) ~f:(fun keep -> if keep then Some data else None)
  in
  filter_mapi ~how t ~f:keep_if_data_passes
;;

(* [filteri ~how t ~f] keeps exactly the bindings of [t] for which the deferred
   predicate [f ~key ~data] resolves to [true]; data is left unchanged.
   Implemented on top of [filter_mapi]. *)
let filteri ~how t ~f =
  filter_mapi ~how t ~f:(fun ~key ~data ->
    match%map f ~key ~data with
    | true -> Some data
    | false -> None)
;;

(* [mapi_max_concurrent t ~f ~max_concurrent_jobs] wraps each call
   [f ~key ~data] in a [Throttled.job], combines the jobs with the throttled
   [mapi] traversal of [t], and executes the combined computation with
   [Throttled.run], passing [max_concurrent_jobs] through. *)
let mapi_max_concurrent t ~f ~max_concurrent_jobs =
  let as_job ~key ~data = Throttled.job (fun () -> f ~key ~data) in
  Throttled_map.mapi t ~f:as_job |> Throttled.run ~max_concurrent_jobs
;;

(* [mapi ~how t ~f] replaces the data of every binding of [t] with the result
   of the deferred function [f ~key ~data].

   [`Max_concurrent_jobs n] goes through the dedicated throttled traversal
   [mapi_max_concurrent]; the other two strategies reuse [filter_mapi] with
   every result wrapped in [Some], so no binding is dropped. *)
let mapi ~how t ~f =
  match how with
  | `Max_concurrent_jobs max_concurrent_jobs ->
    mapi_max_concurrent t ~f ~max_concurrent_jobs
  | `Sequential | `Parallel ->
    let keep_all ~key ~data = f ~key ~data >>| fun mapped -> Some mapped in
    filter_mapi ~how t ~f:keep_all
;;

(* [map ~how t ~f] is [mapi] for a function [f] that only looks at the data of
   each binding; the key is ignored. *)
let map ~how t ~f =
  let on_binding ~key:_ ~data = f data in
  mapi ~how t ~f:on_binding
;;

(* [merge ~how t1 t2 ~f] merges two maps with a deferred merge function.

   [Map.merge] itself is synchronous, so it is used only to pair every key with
   a thunk that will call [f ~key] on that key's merge element.  The thunks are
   then forced through [filter_map] according to [how], which also drops the
   keys for which [f] resolves to [None]. *)
let merge ~how t1 t2 ~f =
  let thunks = Map.merge t1 t2 ~f:(fun ~key elt -> Some (fun () -> f ~key elt)) in
  filter_map ~how thunks ~f:(fun run -> run ())
;;

(* [all t] turns a map whose data are deferreds into a deferred map of their
   values, by mapping the identity function over [t] with [`Sequential]. *)
let all t = map ~how:`Sequential t ~f:(fun deferred -> deferred)