package MlFront_Exec

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file BuildTaskDistribution.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
open BuildCore

(** [parallel_fetch keys fetch] starts one [fetch O.distribution key] for each
    element of [keys] and combines the resulting continuations with
    [parallel]. *)
let parallel_fetch keys fetch =
  let open BuildInstance.Syntax in
  let open Alacarte_3_2_apparatus in
  (* Partially apply once; every key shares the same fetcher. *)
  let fetch_one = fetch O.distribution in
  let (pending : V.t cont list) = List.map fetch_one keys in
  parallel pending

(** [get_files_from_valuestore ~valuestore ~value_ids] looks up each id of
    [value_ids] in [valuestore], one after the other.

    The promise resolves to [Ok paths], with [paths] in the same order as
    [value_ids], when every lookup yields a file. It resolves to [Error msg] at
    the first id whose lookup yields [None]; the remaining ids are not looked
    up. *)
let get_files_from_valuestore ~valuestore ~value_ids =
  let module P = BuildCore.Alacarte_xpromise_apparatus.Promise in
  let ( let* ) = P.bind in
  let missing value_id =
    Error
      (Printf.sprintf
         "Value file not found for `%s`. It may have been deleted \
          from the valuestore."
         value_id)
  in
  (* [found_rev] holds the paths located so far, most recent first. *)
  let rec collect found_rev remaining =
    match remaining with
    | [] -> P.return (Ok (List.rev found_rev))
    | value_id :: remaining -> (
        let* located =
          BuildInstance.ValueStore.get_value_file ~valuestore ~value_id ()
        in
        match located with
        | Some value_fp -> collect (value_fp :: found_rev) remaining
        | None -> P.return (missing value_id))
  in
  collect [] value_ids

(** [get_asset_value_ids ~error_locations ~what values] extracts the value id
    of every [V.Asset] in [values].

    - [Ok ids]: all values were assets. Ids are accumulated by prepending, so
      [ids] is in the {e reverse} order of [values].
    - [Error v]: [v] is the first [V.Failure_is_pending] or
      [V.Input_not_found _] encountered.
    - [Error V.Failure_is_pending]: a value of any other non-asset kind was
      encountered; the problem is reported through [fail] (error code
      [8719a1de]) before returning.

    Once an error has been produced the remaining values are not inspected. *)
let get_asset_value_ids ~error_locations ~what values =
  let open BuildInstance.Syntax in
  let open Alacarte_3_2_apparatus in
  (* Report a value that is neither an asset nor an already-failed value. *)
  let not_an_asset v =
    let* () =
      fail ~error_code:"8719a1de"
        ~cant_do:(Printf.sprintf "import distribution %s" what)
        ~because:
          (Printf.sprintf "expected %s is an asset value but got %s" what
             (V.show v))
        ~error_locations ()
    in
    return (Error V.Failure_is_pending)
  in
  (* Fold one value into the ids gathered so far (most recent first). *)
  let step ids_rev = function
    | (V.Failure_is_pending | V.Input_not_found _) as failed ->
        return (Error failed)
    | ( V.ValuesFile _ | V.Values _ | V.Distribution _ | V.Form _ | V.Bundle _
      | V.Object _ | V.Constant _ ) as other ->
        not_an_asset other
    | V.Asset { value_id; value_sha256 = _; value = _ } ->
        return (Ok (value_id :: ids_rev))
  in
  List.fold_left
    (fun pending v ->
      let* so_far = pending in
      match so_far with
      | Error _ as failed -> return failed
      | Ok ids_rev -> step ids_rev v)
    (return (Ok []))
    values

(** [get_asset_filepaths ~valuestore ~error_locations ~what values] resolves
    asset values to their files in [valuestore].

    The value ids come from [get_asset_value_ids]; an [Error] from that step is
    passed through unchanged. If a file is missing from the value store the
    reason is reported through [fail] (error code [99416a41]) and the result is
    [Error V.Failure_is_pending]. Otherwise the result is [Ok filepaths]. *)
let get_asset_filepaths ~valuestore ~error_locations ~what values =
  let open BuildInstance.Syntax in
  let open Alacarte_3_2_apparatus in
  (* Report a value file that could not be found in the value store. *)
  let report_missing reason =
    let* () =
      fail ~error_code:"99416a41"
        ~cant_do:(Printf.sprintf "import distribution %s" what)
        ~because:reason ~error_locations ()
    in
    return (Error V.Failure_is_pending)
  in
  let* ids_result = get_asset_value_ids ~what ~error_locations values in
  match ids_result with
  | Error pending -> return (Error pending)
  | Ok value_ids -> (
      (* Locate the file behind each asset id. *)
      let* located =
        lift_promise (get_files_from_valuestore ~valuestore ~value_ids)
      in
      match located with
      | Ok filepaths -> return (Ok filepaths)
      | Error reason -> report_missing reason)

(** [make_task ... distribution distribution_json fetch] is the build task that
    imports [distribution].

    Steps, in order:
    + fetch the reserved version key;
    + fetch every trace store asset listed in the distribution's
      [build_traces] and resolve each one to a file;
    + fetch every value store asset listed in [build_values] and resolve each
      one to a file;
    + run [import_distribution] over those files against the current state.

    The result is a [V.Distribution] carrying [(package_id, package_semver)],
    [distribution_json] and [distribution] on success. If any step fails, the
    result is the failure value produced by that step (for example
    [V.Failure_is_pending]).

    [values_file] is only used as the source for error locations;
    [apply_aliases] is forwarded to [K.create_for_asset]. *)
let rec make_task ~valuestore ~importtrace ~importtrace2 ~build_pubkey
    ~build_seckey ~values_file ~package_id ~package_semver ~apply_aliases
    distribution distribution_json fetch =
  let open BuildInstance.Syntax in
  let open Alacarte_3_2_apparatus in
  (* Depend on MlFront_Std.Version. The fetched value itself is not used. *)
  let* _v_version = fetch O.not_relevant K.reserved_version_key in
  (* Depend on the assets. [distresult] is [Ok value] or [Error failure_value];
     both sides carry a [V.t]. *)
  let* distresult =
    match (distribution : MlFront_Thunk.ThunkDist.t) with
    | {
     id = idrange, _, _;
     producer = _;
     license = _;
     continuations = _;
     build =
       {
         build_attestation = _;
         build_to_sign =
           {
             build_bundle_id = _range, module_id, module_semver;
             build_modules = _;
             build_producer_accepts = _;
             build_bundle_canonical = _;
             build_traces = first_trace, rest_traces;
             build_values;
           };
       };
    } -> (
        (* Every error reported below points at the distribution id's range
           inside [values_file]. *)
        let* error_locations =
          BuildTaskForm.range_into_problem_location ~source:values_file
            (MlFront_Thunk.ThunkLexers.Ranges.display_range idrange)
        in

        (* Key for an asset of the bundle module named by [build_bundle_id]. *)
        let mk_k_asset asset_path =
          K.create_for_asset ~apply_aliases ~debug_reference:None ~module_id
            ~module_semver ~asset_path ()
        in
        (* TRACE STORES: ignore the compatibility tag; do them all which is the
           only guarantee today that we get all the traces for all the slots.
           TODO: If the `distribute` command exercised all of the slots, then
           we may not have that problem ... but then `distribute` on N machines
           would exercise N*N slots. *)
        let k_tracestore_assets =
          List.map
            (fun ({ trace_tag = _; trace_path = _range, trace_path } :
                   MlFront_Thunk.ThunkDist.build_trace) ->
              mk_k_asset trace_path)
            (first_trace :: rest_traces)
        in
        let* v_tracestores = parallel_fetch k_tracestore_assets fetch in
        (* Get error or get the trace store files *)
        let* tracestore_fps_result =
          get_asset_filepaths ~valuestore ~what:"trace store" ~error_locations
            v_tracestores
        in
        match tracestore_fps_result with
        | Error v_error -> return (Error v_error)
        | Ok tracestore_fps -> (
            (* VALUE STORES: We need all of them (all the slots). *)
            let k_valuestore_assets =
              List.map
                (fun ({ value_path = _range, path } :
                       MlFront_Thunk.ThunkDist.build_value) -> mk_k_asset path)
                build_values
            in
            let* v_valuestores = parallel_fetch k_valuestore_assets fetch in
            (* Get error or get the value store files *)
            let* valuestore_fps_result =
              get_asset_filepaths ~valuestore ~what:"value store"
                ~error_locations v_valuestores
            in
            match valuestore_fps_result with
            | Error v_error -> return (Error v_error)
            | Ok valuestore_fps -> (
                (* Import the distribution into the current state *)
                let* state = get in
                let* v =
                  import_distribution ~valuestore ~importtrace ~importtrace2
                    ~build_pubkey ~build_seckey ~tracestore_fps ~valuestore_fps
                    ~package_id state
                in
                match v with
                | `AlreadyFailed -> return (Error V.Failure_is_pending)
                | `Success ->
                    (* Done *)
                    return
                      (Ok
                         (V.Distribution
                            {
                              distribution_id = (package_id, package_semver);
                              distribution_json;
                              distribution;
                            })))))
  in
  (* Done. Success and failure are both plain values from here on. *)
  match distresult with
  | Error v_error -> return v_error
  | Ok v_ok -> return v_ok

(** [import_distribution ~valuestore ~importtrace ~importtrace2 ~build_pubkey
    ~build_seckey ~tracestore_fps ~valuestore_fps ~package_id state] visits
    every trace store file in [tracestore_fps], in list order, with
    [BuildTraceStore.visit_traces_in_file]. Each trace is handed to
    [visit_trace] together with [state].

    Values requested while visiting are served by
    [distributed_valuestore_get], which can pull them out of the value store
    zips in [valuestore_fps]. Values are stored through
    [BuildInstance.ValueStore.put_value_file].

    [importtrace] and [importtrace2] enable diagnostics on stderr.

    Each trace store is opened read-only and its descriptor is closed even if
    visiting raises. The result is [`Success] once all trace stores have been
    visited. *)
and import_distribution ~valuestore ~importtrace ~importtrace2 ~build_pubkey
    ~build_seckey ~tracestore_fps ~valuestore_fps ~package_id state =
  let open BuildInstance.Syntax in
  let open Alacarte_3_2_apparatus in
  let valuestore_put key value_id =
    if importtrace2 then
      Printf.eprintf "[import2] putting %s value %s into valuestore\n%!"
        (K.show key) value_id;
    BuildInstance.ValueStore.put_value_file ~valuestore ~value_id
  in
  (* Applying [distributed_valuestore_get] to its labelled arguments scans
     every zip in [valuestore_fps] to build an index. That index depends only
     on [valuestore_fps], so build it once and share it across all trace
     stores instead of re-scanning the zips for each one. It is lazy so that
     nothing is scanned when there are no trace stores, and so that the scan
     still happens while the first trace store is being imported. *)
  let valuestore_get =
    lazy
      (distributed_valuestore_get ~valuestore ~valuestore_fps ~importtrace
         ~importtrace2 ~package_id)
  in
  let import_from_tracestore tracestore_fp =
    let tracefd =
      Unix.openfile
        (MlFront_Core.FilePath.to_string tracestore_fp)
        [ Unix.O_RDONLY ] 0
    in
    Fun.protect ~finally:(fun () -> Unix.close tracefd) @@ fun () ->
    let valuestore_get = Lazy.force valuestore_get in
    BuildTraceStore.visit_traces_in_file ~build_pubkey ~build_seckey
      ~valuestore_get ~valuestore_put tracefd
      (visit_trace ~importtrace ~package_id state);
    return `Success
  in
  (* Import all trace stores, one after the other *)
  let rec aux = function
    | [] -> return `Success
    | tracestore_fp :: rest -> (
        let* result = import_from_tracestore tracestore_fp in
        match result with
        | `Success -> aux rest)
  in
  aux tracestore_fps

(** [is_key_part_of_distribution ~package_id key] tells whether [key] belongs
    to the distribution rooted at [package_id]:

    - module keys and package keys answer [`Yes] when their package is a
      strict subpackage of [package_id], and [`No] otherwise;
    - checksum keys always answer [`Maybe]. *)
and is_key_part_of_distribution ~package_id key =
  let open BuildCore.Alacarte_3_2_apparatus in
  (* Answer for a key whose owning package is [child]. *)
  let strictly_below child =
    if MlFront_Core.PackageId.is_strict_subpackage ~parent:package_id ~child
    then `Yes
    else `No
  in
  let ({ key_datum; debug_reference = _ } : K.t) = key in
  match key_datum with
  | K.ModuleKey { module_kind = _; module_id; module_semver = _ } ->
      strictly_below
        (MlFront_Core.StandardModuleId.cast_as_package_id module_id)
  | K.PackageKey { package_kind = _; package_id = child; package_semver = _ }
    ->
      strictly_below child
  | K.ChecksumKey _ -> `Maybe

(** [visit_trace ~importtrace ~package_id state trace_size key dependencies
    value] handles one trace read from a trace store.

    Traces whose key is definitely outside the distribution
    ([is_key_part_of_distribution] answers [`No]) are skipped. Every other
    trace is recorded into [state] with
    [Alacarte_6_4_test.StateSuspending.post_record_trace]; the state it returns
    is discarded. When [importtrace] is set, a line describing the decision is
    written to stderr. *)
and visit_trace ~importtrace ~package_id state trace_size key dependencies value
    =
  let module A = Alacarte_3_2_apparatus in
  match is_key_part_of_distribution ~package_id key with
  | `No ->
      if importtrace then
        Printf.eprintf "[import] skipping trace outside distribution %s\n%!"
          (A.K.show key)
  | `Yes | `Maybe ->
      if importtrace then
        Printf.eprintf
          "[import] adding trace of %s (%d bytes) with %d deps and value %s\n%!"
          (A.K.show key) trace_size
          (List.length dependencies)
          (A.V.show value);
      let (_ : Alacarte_6_4_test.StateSuspending.state) =
        Alacarte_6_4_test.StateSuspending.post_record_trace state key
          dependencies value
      in
      ()

(** [distributed_valuestore_get ~valuestore ~valuestore_fps ~importtrace
    ~importtrace2 ~package_id] indexes the distributed value stores in
    [valuestore_fps] and returns a lookup function. [lookup key value_id]
    copies the value with id [value_id] from one of the distributed value
    stores into the local valuestore [valuestore] if it doesn't already exist
    there, and resolves to the value's file if one is available.

    {b Indexing} happens once, when the labelled arguments are applied: every
    zip in [valuestore_fps] is scanned, and each non-directory entry whose name
    (after dropping a leading ["./"]) is a value id is recorded. When several
    zips hold the same value id, the last zip in [valuestore_fps] wins. Callers
    that perform many lookups should keep the returned function rather than
    re-applying the labelled arguments.

    {b Lookup} only ever consults the local valuestore for:
    - [Constant] values, which are {b never} distributed.
      {!Assumptions.imports_from_distributions_skip_constant_values}
    - keys for which [is_key_part_of_distribution] answers [`No].

    For every other key the local valuestore is tried first. If the value is
    missing there but present in the index, it is extracted from its zip,
    checksummed with SHA-256 and uploaded to the local valuestore. The lookup
    resolves to [None] when the value is in neither place, or when extraction
    or checksumming fails (the partially written file is deleted).

    [importtrace] is accepted for symmetry with the other import functions and
    is ignored; [importtrace2] enables diagnostics on stderr. *)
and distributed_valuestore_get ~valuestore ~valuestore_fps ~importtrace:_
    ~importtrace2 ~package_id =
  (* memoize the value stores. the map key is value_id *)
  let module ValueIdMap = Map.Make (String) in
  let open struct
    type entry = { srczip : string; zip_entry : string }
  end in
  let distributed_valueid_map =
    List.fold_left
      (fun acc fp ->
        let srczip = MlFront_Core.FilePath.to_string fp in
        MlFront_ZipFile.ZipFile.zip_fold_entries ~srczip
          (fun acc name is_dir _size ->
            (* Entry names may or may not carry a leading "./". The original
               [name] is kept for extraction. *)
            let value_id_candidate =
              if String.starts_with ~prefix:"./" name then
                String.sub name 2 (String.length name - 2)
              else name
            in
            if (not is_dir) && BuildPaths.is_value_id value_id_candidate then begin
              if importtrace2 then
                Printf.eprintf "[import2] indexing value store %s value %s\n%!"
                  (MlFront_Core.FilePath.to_string fp)
                  value_id_candidate;
              ValueIdMap.add value_id_candidate { srczip; zip_entry = name } acc
            end
            else acc)
          acc)
      ValueIdMap.empty valuestore_fps
  in
  (* serve the key + value_id *)
  fun key value_id ->
    let ( let* ), return =
      BuildCore.Alacarte_xpromise_apparatus.Promise.(bind, return)
    in
    let local_valuestore_get () =
      BuildInstance.ValueStore.get_value_file ~valuestore ~value_id ()
    in
    (* Skip constant values and keys not part of the distribution *)
    if String.starts_with ~prefix:BuildPaths.prefix_constant value_id then begin
      (* constant value *)
      Assumptions.imports_from_distributions_skip_constant_values ();
      (* Flush like every other diagnostic so stderr output is not delayed. *)
      if importtrace2 then
        Printf.eprintf "[import2] skipping constant value %s\n%!"
          (BuildCore.Alacarte_3_2_apparatus.K.show key);
      local_valuestore_get ()
    end
    else if is_key_part_of_distribution ~package_id key = `No then begin
      (* not part of the distribution.
         optimization: remove it from ShellDistribute.create_valuestore_zip *)
      if importtrace2 then
        Printf.eprintf "[import2] [perf warn] skipping non-distribution %s\n%!"
          (BuildCore.Alacarte_3_2_apparatus.K.show key);
      local_valuestore_get ()
    end
    else
      (* Non-constant values: prefer the local value store. *)
      let* value_fp_opt = local_valuestore_get () in
      match value_fp_opt with
      | None -> begin
          (* Not part of the local value store. Search in distribution. *)
          match ValueIdMap.find_opt value_id distributed_valueid_map with
          | None ->
              (* Not in distribution. *)
              return None
          | Some { srczip; zip_entry } -> (
              (* In distribution. *)
              if importtrace2 then
                Printf.eprintf "[import2] extracting %s for %s\n%!" value_id
                  (BuildCore.Alacarte_3_2_apparatus.K.show key);
              (* Find where we need to place the zip extracted file *)
              let* destfile =
                BuildInstance.ValueStore.prepare_value_for_upload ~valuestore
                  ~value_id ()
              in
              let destfile_s = MlFront_Core.FilePath.to_string destfile in
              (* Do extraction + SHA256. Any failure removes the extracted
                 file and reports the value as unavailable. *)
              try
                MlFront_ZipFile.ZipFile.extract_file_exn ~srczip
                  ~destfile:destfile_s zip_entry;
                let sha256_result =
                  MlFront_Thunk_IoDisk.ThunkIoDisk.checksum_local_file
                    ~algo:`Sha256 ~return:Fun.id destfile_s
                in
                match sha256_result with
                | `Error _ ->
                    ignore
                      (MlFront_Thunk_IoDisk.ThunkIoDisk.delete_local_file
                         ~return:Fun.id destfile_s);
                    return None
                | `Checksum (value_sha256, _value_sz) ->
                    let* () =
                      BuildInstance.ValueStore.upload_value ~valuestore
                        ~value_id ~value_sha256 destfile
                    in
                    return (Some destfile)
              with MlFront_ZipFile.ZipFile.ZipError _ ->
                ignore
                  (MlFront_Thunk_IoDisk.ThunkIoDisk.delete_local_file
                     ~return:Fun.id destfile_s);
                return None)
        end
      | Some value_fp ->
          if importtrace2 then
            Printf.eprintf "[import2] skipping existing %s for %s\n%!" value_id
              (BuildCore.Alacarte_3_2_apparatus.K.show key);
          return (Some value_fp)