package kaun-board

  1. Overview
  2. Docs
Training dashboard and logging for Raven

Install

dune-project
 Dependency

Authors

Maintainers

Sources

raven-1.0.0.alpha3.tbz
sha256=96d35ce03dfbebd2313657273e24c2e2d20f9e6c7825b8518b69bd1d6ed5870f
sha512=90c5053731d4108f37c19430e45456063e872b04b8a1bbad064c356e1b18e69222de8bfcf4ec14757e71f18164ec6e4630ba770dbcb1291665de5418827d1465

doc/src/kaun-board/run.ml.html

Source file run.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
(*---------------------------------------------------------------------------
  Copyright (c) 2026 The Raven authors. All rights reserved.
  SPDX-License-Identifier: ISC
  ---------------------------------------------------------------------------*)

(* JSON helpers *)

(** [json_obj pairs] builds a JSON object whose members are the given
    [(key, value)] pairs, in the order supplied. *)
let json_obj pairs =
  let member_of_pair (key, value) = (Jsont.Json.name key, value) in
  let members = List.map member_of_pair pairs in
  Jsont.Json.object' members

(** [json_mem name json] is the value of member [name] when [json] is an
    object that has such a member, and JSON null in every other case
    (missing member, or [json] not being an object). *)
let json_mem name json =
  let null = Jsont.Null ((), Jsont.Meta.none) in
  match json with
  | Jsont.Object (members, _) ->
      Option.fold ~none:null ~some:snd (Jsont.Json.find_mem name members)
  | _ -> null

(** [json_of_string s] decodes [s] as a generic JSON value.
    @raise Failure with the decoder's message when [s] cannot be decoded. *)
let json_of_string s =
  Result.fold ~ok:Fun.id ~error:failwith
    (Jsont_bytesrw.decode_string Jsont.json s)

(** [json_to_string j] encodes [j] in minified form.
    @raise Failure with the encoder's message when encoding fails. *)
let json_to_string j =
  Result.fold ~ok:Fun.id ~error:failwith
    (Jsont_bytesrw.encode_string ~format:Jsont.Minify Jsont.json j)

(** [json_to_string_pretty j] encodes [j] in indented form.
    @raise Failure with the encoder's message when encoding fails. *)
let json_to_string_pretty j =
  let encoded =
    Jsont_bytesrw.encode_string ~format:Jsont.Indent Jsont.json j
  in
  match encoded with Error msg -> failwith msg | Ok text -> text

(** [json_of_file path] reads the whole file at [path] and decodes it as JSON.

    The file is opened in binary mode: [in_channel_length] counts raw bytes,
    so reading that many characters from a text-mode channel can hit
    [End_of_file] on platforms that translate line endings.

    @raise Sys_error if the file cannot be opened or read.
    @raise End_of_file if the file shrinks while it is being read.
    @raise Failure if the contents are not valid JSON. *)
let json_of_file path =
  let ic = open_in_bin path in
  let contents =
    Fun.protect
      ~finally:(fun () -> close_in_noerr ic)
      (fun () -> really_input_string ic (in_channel_length ic))
  in
  json_of_string contents

(* Run type *)

(** Metadata describing one run, mirroring the contents of its [run.json]
    manifest plus the directory it lives in. *)
type t = {
  run_id : string;  (** Identifier of the run (manifest key ["run_id"]). *)
  created_at : float;
      (** Creation timestamp (manifest key ["created_at"]); [create] fills it
          from [Unix.gettimeofday ()]. *)
  experiment_name : string option;
      (** Optional experiment name (manifest key ["experiment"]). *)
  tags : string list;  (** Free-form tags (manifest key ["tags"]). *)
  config : (string * Jsont.json) list;
      (** Key/value configuration entries (manifest key ["config"]). *)
  total_epochs : int option;
      (** Optional epoch count (manifest key ["total_epochs"]). *)
  dir : string;  (** Directory holding the run's files. *)
}

(** [run_id t] is the identifier of run [t]. *)
let run_id { run_id; _ } = run_id

(** [created_at t] is the creation timestamp of run [t]. *)
let created_at { created_at; _ } = created_at

(** [experiment_name t] is the experiment name of run [t], if any. *)
let experiment_name { experiment_name; _ } = experiment_name

(** [tags t] is the list of tags attached to run [t]. *)
let tags { tags; _ } = tags

(** [total_epochs t] is the epoch count recorded for run [t], if any. *)
let total_epochs { total_epochs; _ } = total_epochs

(** [dir t] is the directory that holds the files of run [t]. *)
let dir { dir; _ } = dir

(* Path helpers *)

(** [manifest_path dir] is the path of the [run.json] manifest inside the run
    directory [dir]. *)
let manifest_path run_dir = Filename.concat run_dir "run.json"

(** [events_path dir] is the path of the [events.jsonl] event log inside the
    run directory [dir]. *)
let events_path run_dir = Filename.concat run_dir "events.jsonl"

(* Directory creation *)

(** [ensure_dir dir] creates [dir] and any missing parent directories
    (mode [0o755]), similar to [mkdir -p]. Empty path components, such as
    those produced by doubled or trailing separators, are ignored.

    @raise Unix.Unix_error if a directory cannot be created for any reason
    other than it already existing. *)
let ensure_dir dir =
  let sep = Filename.dir_sep.[0] in
  let start, parts =
    match String.split_on_char sep dir with
    (* A leading empty component means the path is absolute. *)
    | "" :: rest -> (Filename.dir_sep, rest)
    | parts -> ("", parts)
  in
  let mkdir_if_missing path =
    if not (Sys.file_exists path) then (
      (* Another process may create [path] between the check and the call;
         that outcome is exactly what we want, so it is not an error. *)
      try Unix.mkdir path 0o755
      with Unix.Unix_error (Unix.EEXIST, _, _) -> ())
  in
  let rec loop acc = function
    | [] -> ()
    | "" :: rest -> loop acc rest
    | part :: rest ->
        let next =
          if acc = "" then part
          else if acc = Filename.dir_sep then
            (* Directly under the root: do not produce "//part", whose
               meaning is implementation-defined on POSIX systems. *)
            acc ^ part
          else acc ^ Filename.dir_sep ^ part
        in
        mkdir_if_missing next;
        loop next rest
  in
  loop start parts

(* ID generation *)

(* Shared PRNG state used for run-ID suffixes. It is self-seeded, and wrapped
   in [lazy] so the seeding only happens the first time it is forced. *)
let random_state = lazy (Random.State.make_self_init ())

(** [generate_id ?experiment ()] returns a fresh run identifier of the form
    [YYYY-MM-DD_HH-MM-SS_xxxx], where the timestamp is the current local time
    and [xxxx] is a random 4-digit hexadecimal suffix. When [experiment] is
    given, ["_" ^ experiment] is appended. *)
let generate_id ?experiment () =
  let rng = Lazy.force random_state in
  let { Unix.tm_year; tm_mon; tm_mday; tm_hour; tm_min; tm_sec; _ } =
    Unix.localtime (Unix.gettimeofday ())
  in
  let suffix = Random.State.int rng 0x10000 in
  let base =
    Printf.sprintf "%04d-%02d-%02d_%02d-%02d-%02d_%04x" (tm_year + 1900)
      (tm_mon + 1) tm_mday tm_hour tm_min tm_sec suffix
  in
  match experiment with None -> base | Some name -> base ^ "_" ^ name

(* Manifest I/O *)

(** [load dir] reads the [run.json] manifest in [dir].

    Returns [None] when the manifest does not exist, and also when anything
    goes wrong while reading or interpreting it (including a missing or
    non-string ["run_id"]). Other members are optional: ["created_at"]
    defaults to [0.0], ["tags"] and ["config"] to the empty list, and
    non-string tags are dropped. *)
let load dir =
  let path = manifest_path dir in
  if not (Sys.file_exists path) then None
  else
    let parse () =
      let json = json_of_file path in
      let field name = json_mem name json in
      let string_opt = function Jsont.String (s, _) -> Some s | _ -> None in
      let run_id =
        match string_opt (field "run_id") with
        | Some id -> id
        | None -> failwith "expected string for run_id"
      in
      let created_at =
        match field "created_at" with Jsont.Number (f, _) -> f | _ -> 0.0
      in
      let experiment_name = string_opt (field "experiment") in
      let tags =
        match field "tags" with
        | Jsont.Array (items, _) -> List.filter_map string_opt items
        | _ -> []
      in
      let config =
        match field "config" with
        | Jsont.Object (members, _) ->
            List.map (fun ((key, _), value) -> (key, value)) members
        | _ -> []
      in
      let total_epochs =
        match field "total_epochs" with
        | Jsont.Number (f, _) -> Some (int_of_float f)
        | _ -> None
      in
      { run_id; created_at; experiment_name; tags; config; total_epochs; dir }
    in
    (* Any failure while reading or decoding means "no usable run here". *)
    match parse () with run -> Some run | exception _ -> None

(** [latest base_dir] is the first loadable run found when the entries of
    [base_dir] are visited in descending name order, or [None] if there is
    none.

    Returns [None] as well when [base_dir] cannot be listed: it does not
    exist, is not a directory, is unreadable, or disappeared concurrently. *)
let latest base_dir =
  match Sys.readdir base_dir with
  | exception Sys_error _ -> None
  | entries ->
      (* Descending order: the greatest entry name is tried first. *)
      Array.sort (fun a b -> String.compare b a) entries;
      let count = Array.length entries in
      let rec find i =
        if i >= count then None
        else
          match load (Filename.concat base_dir entries.(i)) with
          | Some _ as found -> found
          | None -> find (i + 1)
      in
      find 0

(** [write_manifest t] writes the manifest of [t] to [run.json] inside
    [t.dir] as indented JSON followed by a newline, replacing any existing
    file.

    ["schema_version"], ["run_id"], ["created_at"] and ["tags"] are always
    written. ["experiment"], ["config"] and ["total_epochs"] are written only
    when present (non-empty for ["config"]), so that every field read back by
    [load] survives a round trip.

    @raise Sys_error if the file cannot be opened or written.
    @raise Failure if the JSON cannot be encoded. *)
let write_manifest t =
  let experiment_field =
    match t.experiment_name with
    | None -> []
    | Some name -> [ ("experiment", Jsont.Json.string name) ]
  in
  let config_field =
    match t.config with [] -> [] | pairs -> [ ("config", json_obj pairs) ]
  in
  let total_epochs_field =
    match t.total_epochs with
    | None -> []
    | Some n -> [ ("total_epochs", Jsont.Json.int n) ]
  in
  let json =
    json_obj
      ([
         ("schema_version", Jsont.Json.int 1);
         ("run_id", Jsont.Json.string t.run_id);
         ("created_at", Jsont.Json.number t.created_at);
         ("tags", Jsont.Json.list (List.map Jsont.Json.string t.tags));
       ]
      @ experiment_field @ config_field @ total_epochs_field)
  in
  let oc = open_out (manifest_path t.dir) in
  Fun.protect
    ~finally:(fun () -> close_out oc)
    (fun () ->
      output_string oc (json_to_string_pretty json);
      output_char oc '\n')

(** [create ?base_dir ?experiment ?tags ?config ()] starts a new run: it
    generates a run ID, creates the directory [base_dir/run_id], writes the
    manifest there and returns the run.

    [base_dir] defaults to [Env.base_dir ()]; [tags] and [config] default to
    the empty list. The new run has no [total_epochs]. *)
let create ?base_dir ?experiment ?(tags = []) ?(config = []) () =
  (* The default is computed unconditionally, as [Option.value] would. *)
  let default_base = Env.base_dir () in
  let base =
    match base_dir with Some chosen -> chosen | None -> default_base
  in
  let run_id = generate_id ?experiment () in
  let dir = Filename.concat base run_id in
  ensure_dir dir;
  let created_at = Unix.gettimeofday () in
  let run =
    {
      run_id;
      created_at;
      experiment_name = experiment;
      tags;
      config;
      total_epochs = None;
      dir;
    }
  in
  write_manifest run;
  run

(* Event writing *)

(** [append_event t event] appends [event], encoded as one line of minified
    JSON, to the [events.jsonl] file of run [t]. The file is created with
    mode [0o644] if it does not exist, and is closed again before
    returning. *)
let append_event t event =
  let oc =
    open_out_gen [ Open_append; Open_creat ] 0o644 (events_path t.dir)
  in
  let write_line () =
    let line = json_to_string (Event.to_json event) in
    output_string oc line;
    output_char oc '\n'
  in
  Fun.protect ~finally:(fun () -> close_out oc) write_line

(* Incremental event reading *)

(** Identity of a file on disk: its [(st_dev, st_ino)] pair. *)
type file_id = int * int

(** Cursor over an events file that is read incrementally. *)
type event_stream = {
  path : string;  (** Path of the events file being followed. *)
  mutable position : int64;
      (** Byte offset at which the next read starts. *)
  mutable last_mtime : float;
      (** Modification time observed by the most recent read; [0.0] before
          any read. *)
  mutable file_id : file_id option;
      (** Identity of the file last opened or seen at [path], if known. *)
  mutable channel : in_channel option;
      (** Input channel kept open between reads, if any. *)
  mutable pending : string;
      (** Trailing bytes of the last chunk that were not yet terminated by a
          newline; prepended to the next chunk. *)
}

(** [open_events t] is a fresh stream over the events file of run [t],
    positioned at the start. No file is opened until the stream is read. *)
let open_events t =
  let path = events_path t.dir in
  {
    path;
    channel = None;
    file_id = None;
    position = 0L;
    last_mtime = 0.0;
    pending = "";
  }

(** [last_mtime stream] is the modification time recorded by the most recent
    read of [stream] ([0.0] for a fresh or reset stream). *)
let last_mtime { last_mtime; _ } = last_mtime

(** [close_events stream] closes the channel held by [stream], if any, and
    forgets it. Errors raised while closing are ignored. The read position
    and other state are left untouched. *)
let close_events stream =
  match stream.channel with
  | None -> ()
  | Some ic ->
      stream.channel <- None;
      close_in_noerr ic

(** [reset_stream stream] closes any open channel and returns [stream] to its
    initial state: position zero, no known file identity, no recorded
    modification time and no pending partial line. *)
let reset_stream stream =
  close_events stream;
  stream.pending <- "";
  stream.file_id <- None;
  stream.last_mtime <- 0.0;
  stream.position <- 0L

(** [ensure_channel stream] is the input channel of [stream], opening the
    events file in binary mode and caching the channel if none is open yet.
    On a fresh open, the file's [(st_dev, st_ino)] identity is recorded when
    it can be obtained; failure to obtain it is ignored.

    @raise Sys_error if the file cannot be opened. *)
let ensure_channel stream =
  match stream.channel with
  | Some ic -> ic
  | None ->
      let ic = open_in_bin stream.path in
      stream.channel <- Some ic;
      (match Unix.fstat (Unix.descr_of_in_channel ic) with
      | { Unix.st_dev; st_ino; _ } -> stream.file_id <- Some (st_dev, st_ino)
      | exception _ -> ());
      ic

(* JSONL chunk parsing *)

(** [is_whitespace c] is [true] iff [c] is a space, tab, carriage return or
    line feed. *)
let is_whitespace c = String.contains " \t\r\n" c

(** [is_blank s] is [true] iff every character of [s] satisfies
    [is_whitespace]; in particular the empty string is blank. *)
let is_blank s =
  let n = String.length s in
  let i = ref 0 in
  (* Advance past leading whitespace; blank means we reached the end. *)
  while !i < n && is_whitespace s.[!i] do
    incr i
  done;
  !i = n

(** [parse_jsonl_chunk chunk] splits [chunk] into newline-terminated lines and
    decodes each one as an event.

    Returns [(events, pending)] where [events] are the successfully decoded
    events in input order and [pending] is the text after the last newline
    (an incomplete line, possibly empty) to be prepended to the next chunk.

    A trailing ['\r'] is stripped from each line. Blank lines, lines that are
    not valid JSON and lines that are not valid events are all skipped, so a
    single corrupt line never aborts processing of the rest of the chunk. *)
let parse_jsonl_chunk chunk =
  let len = String.length chunk in
  (* Decode one complete line, yielding [None] for anything unusable. *)
  let decode_line line =
    if is_blank line then None
    else
      match json_of_string line |> Event.of_json with
      | Ok ev -> Some ev
      | Error _ -> None
      (* [json_of_string] reports malformed JSON by raising [Failure]. *)
      | exception Failure _ -> None
  in
  let rec scan line_start acc =
    match String.index_from_opt chunk line_start '\n' with
    | None ->
        (* No further newline: whatever remains is an unfinished line. *)
        (List.rev acc, String.sub chunk line_start (len - line_start))
    | Some newline ->
        let line_end =
          if newline > line_start && chunk.[newline - 1] = '\r' then
            newline - 1
          else newline
        in
        let line = String.sub chunk line_start (line_end - line_start) in
        let acc =
          match decode_line line with Some ev -> ev :: acc | None -> acc
        in
        scan (newline + 1) acc
  in
  scan 0 []

(** [read_events stream] returns the events appended to the events file since
    the previous call, in file order, and advances [stream] past them.

    - If the file does not exist, the stream is reset and [[]] is returned.
    - If the file at [stream.path] has a different [(st_dev, st_ino)] than the
      one recorded, or is now shorter than the current position, the stream
      is reset and reading restarts from the beginning.
    - If nothing new can be present (position at or past the file size and
      modification time not newer than the recorded one), [[]] is returned
      without touching the file.
    - Otherwise everything from the current position to end of file is read;
      complete lines are decoded and an unterminated trailing line is kept in
      [stream.pending] for the next call.

    [Sys_error] and [Unix.Unix_error] are handled by closing the channel and
    returning [[]]. NOTE(review): any other exception raised while parsing
    propagates after [stream.position] has been advanced but before
    [stream.pending] has been updated. *)
let read_events stream =
  if not (Sys.file_exists stream.path) then (
    reset_stream stream;
    [])
  else
    try
      let st = Unix.LargeFile.stat stream.path in
      (* Identity of whatever file currently lives at [stream.path]. *)
      let path_id : file_id =
        (st.Unix.LargeFile.st_dev, st.Unix.LargeFile.st_ino)
      in
      let file_size = st.Unix.LargeFile.st_size in
      let mtime = st.Unix.LargeFile.st_mtime in
      (* The path now names a different file than the one we were reading. *)
      let rotated =
        match stream.file_id with
        | None -> false
        | Some (dev, ino) -> dev <> fst path_id || ino <> snd path_id
      in
      (* The file is shorter than what we have already consumed. *)
      let truncated = stream.position > file_size in
      if rotated || truncated then (
        reset_stream stream;
        stream.file_id <- Some path_id);
      (* Fast path: no new bytes and no newer modification time. *)
      if stream.position >= file_size && mtime <= stream.last_mtime then []
      else
        let ic = ensure_channel stream in
        LargeFile.seek_in ic stream.position;
        let buf = Bytes.create 65536 in
        let b = Buffer.create 65536 in
        (* Read until end of file, accumulating into [b]; returns the number
           of bytes read. *)
        let rec read_loop total =
          match input ic buf 0 (Bytes.length buf) with
          | 0 -> total
          | n ->
              Buffer.add_subbytes b buf 0 n;
              read_loop (total + n)
        in
        let bytes_read = read_loop 0 in
        stream.last_mtime <- mtime;
        if bytes_read = 0 then []
        else (
          stream.position <- Int64.add stream.position (Int64.of_int bytes_read);
          let data = Buffer.contents b in
          (* Re-attach the partial line left over from the previous read. *)
          let chunk =
            if stream.pending = "" then data else stream.pending ^ data
          in
          let events, pending = parse_jsonl_chunk chunk in
          stream.pending <- pending;
          events)
    with
    (* I/O failures: drop the channel so the next call reopens the file. *)
    | Sys_error _ ->
        close_events stream;
        []
    | Unix.Unix_error _ ->
        close_events stream;
        []