package ocsipersist-dbm

  1. Overview
  2. Docs

Source file ocsipersist.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
(* FIX: the log file is never reopened *)

open Ocsidbmtypes
open Lwt.Infix

(* Signature of persistent tables, re-exported from [Ocsipersist_lib.Sigs]. *)
module type TABLE = Ocsipersist_lib.Sigs.TABLE

(* Log section passed to the [Lwt_log] calls of this file. *)
let section = Lwt_log.Section.make "ocsigen:ocsipersist:dbm"

(* Raised by the [Db] request helpers when the answer received from the
   Ocsidbm process is not one of the answers expected for the request. *)
exception Ocsipersist_error

(* File name of the Unix-domain socket; it is appended to
   [!Config.directory] to build the path the client connects to. *)
let socketname = "socket"

(* Short alias for the settings module. *)
module Config = Ocsipersist_settings

(* Low-level helpers. *)
module Aux = struct
  (* Direct binding to the runtime primitive [caml_sys_exit]: terminates the
     process with the given status and never returns.
     NOTE(review): presumably used instead of [Stdlib.exit] so that [at_exit]
     handlers inherited from the parent are not run in forked children;
     confirm. *)
  external sys_exit : int -> 'a = "caml_sys_exit"
end

module Db = struct
  (* [try_connect sname] returns a socket connected to the Unix-domain socket
     at path [sname].

     A first connection attempt is made directly. If it fails for any reason,
     a new Ocsidbm process is launched (double fork, so that it is detached
     from this process), and after a fixed delay of 1.1 s a second attempt is
     made. A failure of that second attempt rejects the returned promise.

     Every socket whose [connect] fails is closed before the error is
     propagated, so repeated attempts do not leak file descriptors. *)
  let try_connect sname =
    (* Create a stream socket and connect it to [path]; on failure, close the
       descriptor (best effort) and re-raise the original error. *)
    let connect_to path =
      let socket = Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
      Lwt.catch
        (fun () ->
           Lwt_unix.connect socket (Unix.ADDR_UNIX path) >>= fun () ->
           Lwt.return socket)
        (fun e ->
           Lwt.catch
             (fun () -> Lwt_unix.close socket)
             (fun _ -> Lwt.return ())
           >>= fun () -> Lwt.fail e)
    in
    Lwt.catch
      (fun () -> connect_to sname)
      (fun _ ->
         Lwt_log.ign_warning_f ~section
           "Launching a new Ocsidbm process: %s on directory %s."
           !Config.ocsidbm !Config.directory;
         let param = [|!Config.ocsidbm; !Config.directory|] in
         (* Runs in the grandchild: redirect stderr to the error log, stdout
            to /dev/null, close stdin, then exec the Ocsidbm binary. *)
         let child () =
           try
             let log =
               Unix.openfile !Config.error_log_path
                 [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_APPEND]
                 0o640
             in
             Unix.dup2 log Unix.stderr;
             Unix.close log;
             let devnull = Unix.openfile "/dev/null" [Unix.O_WRONLY] 0 in
             Unix.dup2 devnull Unix.stdout;
             Unix.close devnull;
             Unix.close Unix.stdin;
             Unix.execvp !Config.ocsidbm param
           with e ->
             (* Setup or exec failed. The grandchild must not fall back into
                the code of the parent program, so report (best effort) and
                terminate right here. *)
             let msg =
               Printf.sprintf "Cannot start %s: %s\n" !Config.ocsidbm
                 (Printexc.to_string e)
             in
             (try
                ignore
                  (Unix.write_substring Unix.stderr msg 0 (String.length msg))
              with _ -> ());
             Aux.sys_exit 127
         in
         let pid = Lwt_unix.fork () in
         if pid = 0
         then (
           (* double fork *)
           if Lwt_unix.fork () = 0 then child () else Aux.sys_exit 0)
         else
           (* Reap the intermediate child, give the server time to create its
              socket, then try again. *)
           Lwt_unix.waitpid [] pid >>= fun _ ->
           Lwt_unix.sleep 1.1 >>= fun () -> connect_to sname)

  (* [get_indescr i] connects to the socket [socketname] located in
     [!Config.directory], using [try_connect]. After a failure it waits 2.1 s
     and tries again, at most [i] more times. When no retry is left the error
     is logged and the returned promise is rejected with it. *)
  let rec get_indescr i =
    (* Human-readable rendering of the failure for the log message. *)
    let describe = function
      | Unix.Unix_error (a, b, c) ->
          Printf.sprintf "%a in %s(%s)" (fun () -> Unix.error_message) a b c
      | other -> Printexc.to_string other
    in
    let on_failure e =
      if i = 0
      then begin
        Lwt_log.ign_error_f ~section
          "Cannot connect to Ocsidbm. Will continue without persistent session support. Error message is: %s .Have a look at the logs to see if there is an error message from the Ocsidbm process."
          (describe e);
        Lwt.fail e
      end
      else Lwt_unix.sleep 2.1 >>= fun () -> get_indescr (i - 1)
    in
    let socket_path = !Config.directory ^ "/" ^ socketname in
    Lwt.catch (fun () -> try_connect socket_path) on_failure

  (* [send v] writes the request [v] on [!Config.outch], flushes it, and
     returns the answer read from [!Config.inch].

     Requests share one connection, so each round trip must finish before the
     next one starts. [previous] always holds the promise of the most recently
     issued request. It is read and replaced synchronously, before any
     cooperation point, so that concurrent callers form a strict chain: each
     request waits for the one issued just before it, and two callers can
     never wait on the same predecessor and then interleave their writes and
     reads. A failed predecessor does not prevent the next request from being
     sent. *)
  let send =
    let previous = ref (Lwt.return Ok) in
    fun v ->
      let prior = !previous in
      let reply =
        Lwt.catch (fun () -> prior) (fun _ -> Lwt.return Ok) >>= fun _ ->
        !Config.inch >>= fun inch ->
        !Config.outch >>= fun outch ->
        Lwt_io.write_value outch v >>= fun () ->
        Lwt_io.flush outch >>= fun () -> Lwt_io.read_value inch
      in
      previous := reply;
      reply

  (* [get (store, name)] sends a [Get] request and returns the string carried
     by a [Value] answer. Fails with [Not_found] on [Dbm_not_found], with the
     transmitted exception on [Error], and with [Ocsipersist_error] on any
     other answer. *)
  let get (store, name) =
    let interpret = function
      | Value v -> Lwt.return v
      | Dbm_not_found -> Lwt.fail Not_found
      | Error e -> Lwt.fail e
      | _ -> Lwt.fail Ocsipersist_error
    in
    Lwt.bind (send (Get (store, name))) interpret

  (* [remove (store, name)] sends a [Remove] request. Succeeds on [Ok], fails
     with the transmitted exception on [Error], and with [Ocsipersist_error]
     on any other answer. *)
  let remove (store, name) =
    let interpret = function
      | Ok -> Lwt.return ()
      | Error e -> Lwt.fail e
      | _ -> Lwt.fail Ocsipersist_error
    in
    Lwt.bind (send (Remove (store, name))) interpret

  (* [replace (store, name) value] sends a [Replace] request carrying [value].
     Succeeds on [Ok], fails with the transmitted exception on [Error], and
     with [Ocsipersist_error] on any other answer. *)
  let replace (store, name) value =
    let request = Replace (store, name, value) in
    let interpret = function
      | Ok -> Lwt.return ()
      | Error e -> Lwt.fail e
      | _ -> Lwt.fail Ocsipersist_error
    in
    Lwt.bind (send request) interpret

  (* [replace_if_exists (store, name) value] sends a [Replace_if_exists]
     request carrying [value]. Succeeds on [Ok]; fails with [Not_found] on
     [Dbm_not_found], with the transmitted exception on [Error], and with
     [Ocsipersist_error] on any other answer. *)
  let replace_if_exists (store, name) value =
    let request = Replace_if_exists (store, name, value) in
    let interpret = function
      | Ok -> Lwt.return ()
      | Dbm_not_found -> Lwt.fail Not_found
      | Error e -> Lwt.fail e
      | _ -> Lwt.fail Ocsipersist_error
    in
    Lwt.bind (send request) interpret

  (* [firstkey store] sends a [Firstkey] request. Returns [Some k] on a
     [Key k] answer, fails with the transmitted exception on [Error], and
     returns [None] on every other answer. *)
  let firstkey store =
    let interpret = function
      | Key k -> Lwt.return (Some k)
      | Error e -> Lwt.fail e
      | _ -> Lwt.return None
    in
    Lwt.bind (send (Firstkey store)) interpret

  (* [nextkey store] sends a [Nextkey] request. Returns [Some k] on a [Key k]
     answer, fails with the transmitted exception on [Error], and returns
     [None] on every other answer. *)
  let nextkey store =
    let interpret = function
      | Key k -> Lwt.return (Some k)
      | Error e -> Lwt.fail e
      | _ -> Lwt.return None
    in
    Lwt.bind (send (Nextkey store)) interpret

  (* [length store] sends a [Length] request. A [Value] answer carries the
     marshalled count, which is unmarshalled and returned; [Dbm_not_found]
     yields [0]. Fails with the transmitted exception on [Error] and with
     [Ocsipersist_error] on any other answer. *)
  let length store =
    let interpret = function
      | Value v -> Lwt.return (Marshal.from_string v 0)
      | Dbm_not_found -> Lwt.return 0
      | Error e -> Lwt.fail e
      | _ -> Lwt.fail Ocsipersist_error
    in
    Lwt.bind (send (Length store)) interpret
end

(* Persistent variables: single marshalled values addressed by a store name
   and a variable name. *)
module Store = struct
  type store = string

  type 'a t = store * string
  (** Handle on a persistent value: the store name paired with the variable
      name. The type parameter does not appear in the representation; it only
      records the type of the stored value. *)

  (* Opening a store needs no work here: the store is just its name. *)
  let open_store name = Lwt.return name

  (* Returns the handle for [name] in [store]. If the variable has no value
     yet ([Db.get] fails with [Not_found]), [default ()] is evaluated and its
     marshalled result is stored first. Any other error is propagated. *)
  let make_persistent_lazy_lwt ~store ~name ~default =
    let pvname = store, name in
    let probe () = Lwt.bind (Db.get pvname) (fun _ -> Lwt.return ()) in
    let initialise = function
      | Not_found ->
          Lwt.bind (default ()) (fun def ->
            Db.replace pvname (Marshal.to_string def []))
      | e -> Lwt.fail e
    in
    Lwt.bind (Lwt.catch probe initialise) (fun () -> Lwt.return pvname)

  (* Same as [make_persistent_lazy_lwt] with a synchronous [default]; it is
     run through [Lwt.wrap]. *)
  let make_persistent_lazy ~store ~name ~default =
    make_persistent_lazy_lwt ~store ~name ~default:(fun () -> Lwt.wrap default)

  (* Same as [make_persistent_lazy] with an already computed default value. *)
  let make_persistent ~store ~name ~default =
    let default () = default in
    make_persistent_lazy ~store ~name ~default

  (* Reads the stored string and unmarshals it. *)
  let get (pvname : 'a t) : 'a =
    Lwt.bind (Db.get pvname) (fun raw ->
      Lwt.return (Marshal.from_string raw 0))

  (* Marshals [v] and stores it under [pvname]. *)
  let set pvname v = Db.replace pvname (Marshal.to_string v [])
end

(* Top-level aliases for the store and persistent-variable types of [Store]. *)
type store = Store.store
type 'a variable = 'a Store.t

module Functorial = struct
  (* Alias for [string]; the [encode] and [decode] functions of [COLUMN]
     below convert to and from strings. *)
  type internal = string

  (* Description of a column (key or value) of a table: an OCaml type and its
     conversion to and from strings. *)
  module type COLUMN = sig
    type t

    (* Not read anywhere in this file; every column defined in [Column]
       below sets it to "_". *)
    val column_type : string

    (* Conversion of a value to the string that is stored. *)
    val encode : t -> string

    (* Inverse conversion, applied to strings read back. *)
    val decode : string -> t
  end

  (* [Table (T) (Key) (Value)] is the persistent table stored under the name
     [T.name], with keys and values converted to strings by the [Key] and
     [Value] columns. All operations are requests sent through [Db]. *)
  module Table
      (T : sig
         val name : string
       end)
      (Key : COLUMN)
      (Value : COLUMN) :
    Ocsipersist_lib.Sigs.TABLE with type key = Key.t and type value = Value.t =
  struct
    type key = Key.t
    type value = Value.t

    let name = T.name

    (* Fails with [Not_found] when [key] is not bound (see [Db.get]). *)
    let find key = Lwt.map Value.decode @@ Db.get (name, Key.encode key)

    (* Binds [key] to [value], replacing any previous binding. *)
    let add key value = Db.replace (name, Key.encode key) (Value.encode value)

    (* Like [add], but fails with [Not_found] when [key] is not bound. *)
    let replace_if_exists key value =
      Db.replace_if_exists (name, Key.encode key) (Value.encode value)

    let remove key = Db.remove (name, Key.encode key)

    (* [fold ?count ?gt ?geq ?lt ?leq f beg] folds [f] over the bindings whose
       key satisfies every given bound (compared with the polymorphic
       comparison operators), visiting at most [count] bindings.

       Keys are obtained with [Db.firstkey] / [Db.nextkey]. DBM enumerates
       keys in an unspecified order, so a key outside the requested range
       says nothing about the keys that follow: out-of-range keys are skipped
       and the traversal continues until the keys are exhausted or [count]
       bindings have been visited. *)
    let fold ?count ?gt ?geq ?lt ?leq f beg =
      let i = ref 0L in
      let in_range k =
        let holds test = function None -> true | Some bound -> test k bound in
        holds ( > ) gt && holds ( >= ) geq && holds ( < ) lt && holds ( <= ) leq
      in
      let rec aux nextkey beg =
        match count with
        | Some c when !i >= c -> Lwt.return beg
        | _ -> (
            nextkey name >>= function
            | None -> Lwt.return beg
            | Some k ->
                let k = Key.decode k in
                if in_range k
                then (
                  i := Int64.succ !i;
                  find k >>= fun r -> f k r beg >>= aux Db.nextkey)
                else aux Db.nextkey beg)
      in
      aux Db.firstkey beg

    (* [iter] is [fold] with a unit accumulator. *)
    let iter ?count ?gt ?geq ?lt ?leq f =
      fold ?count ?gt ?geq ?lt ?leq (fun k v () -> f k v) ()

    let iter_batch ?count:_ ?gt:_ ?geq:_ ?lt:_ ?leq:_ _ =
      failwith "Ocsipersist.iter_batch not implemented for DBM"

    let iter_block ?count:_ ?gt:_ ?geq:_ ?lt:_ ?leq:_ _ =
      failwith
        "iter_block not implemented for DBM. Please use Ocsipersist with sqlite"

    (* [modify_opt key f] applies [f] to the current binding of [key]
       ([None] when unbound): a [None] result removes the binding, a
       [Some v] result stores [v]. Errors other than [Not_found] raised by
       the lookup are propagated unchanged.
       NOTE(review): the new value is written with [replace_if_exists], so
       when [key] was unbound and [f None] returns [Some _] the operation
       fails with [Not_found]; confirm whether [add] is intended. *)
    let modify_opt key f =
      Lwt.catch
        (fun () -> find key >>= fun v -> Lwt.return_some v)
        (function Not_found -> Lwt.return_none | e -> Lwt.fail e)
      >>= fun old_value ->
      match f old_value with
      | None -> remove key
      | Some new_value -> replace_if_exists key new_value

    let length () =
      (* for DBM the result may be less than the actual length *)
      Db.length name

    module Variable = Ocsipersist_lib.Variable (struct
        type k = key
        type v = value

        let find = find
        let add = add
      end)
  end

  (* Ready-made [COLUMN] implementations. *)
  module Column = struct
    (* Strings are stored as they are. *)
    module String : COLUMN with type t = string = struct
      type t = string

      let column_type = "_"
      let encode (s : string) = s
      let decode (s : string) = s
    end

    (* Floats are stored in the textual form produced by [string_of_float].
       NOTE(review): [string_of_float] prints a limited number of digits, so
       decoding may not give back exactly the encoded float; confirm that
       this is acceptable for the callers. *)
    module Float : COLUMN with type t = float = struct
      type t = float

      let column_type = "_"
      let encode x = string_of_float x
      let decode s = float_of_string s
    end

    (* Any type [C.t], stored in the format of the standard [Marshal]
       module. Decoding is not type-checked. *)
    module Marshal (C : sig
        type t
      end) : COLUMN with type t = C.t = struct
      type t = C.t

      let column_type = "_"
      let encode x = Marshal.to_string x []
      let decode s = Marshal.from_string s 0
    end
  end
end

(* Interfaces obtained by applying the shared library functors to the
   modules defined above. *)
module Polymorphic = Ocsipersist_lib.Polymorphic (Functorial)
module Ref = Ocsipersist_lib.Ref (Store)

(* Alias for the table type of [Polymorphic]. *)
type 'value table = 'value Polymorphic.table

(* iterator: with a separate connexion:
   exception Exn1
   let iter_table f table =
   let first = Marshal.to_string (Firstkey table) [] in
   let firstl = String.length first in
   let next = Marshal.to_string (Nextkey table) [] in
   let nextl = String.length next in
   (Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 >>=
   (fun socket ->
     Lwt_unix.connect
       (Lwt_unix.Plain socket)
       (Unix.ADDR_UNIX (Config.directory^"/"^socketname)) >>=
     (fun () -> return (Lwt_unix.Plain socket)) >>=
     (fun indescr ->
       let inch = Lwt_unix.in_channel_of_descr indescr in
       let nextkey next nextl =
         Lwt_unix.write indescr next 0 nextl >>=
         (fun l2 -> if l2 <> nextl
         then Lwt.fail Ocsipersist_error
         else (Lwt_unix.input_line inch >>=
               fun answ -> return (Marshal.from_string answ 0)))
       in
       let rec aux n l =
         nextkey n l >>=
         (function
           | End -> return ()
           | Key k -> find table k >>= f k
           | Error e -> Lwt.fail e
           | _ -> Lwt.fail Ocsipersist_error) >>=
         (fun () -> aux next nextl)
       in
       catch
         (fun () ->
           aux first firstl >>=
           (fun () -> Unix.close socket; return ()))
         (fun e -> Unix.close socket; Lwt.fail e))))
*)

(* [init ()] starts the connection to the Ocsidbm process (at most two
   retries, see [Db.get_indescr]) and installs the input and output channels
   in [Ocsipersist_settings].

   When [!Ocsipersist_settings.delay_loading] is set, the channels are
   promises derived from the pending connection and [init] returns at once.
   Otherwise [init] runs the Lwt main loop until the connection is
   established (or has failed) before installing the channels. *)
let init () =
  let delayed = !Ocsipersist_settings.delay_loading in
  Lwt_log.ign_warning ~section
    (if delayed
     then "Asynchronuous initialization (may fail later)"
     else "Initializing ...");
  let indescr = Db.get_indescr 2 in
  (* Both channels are built on the same file descriptor. *)
  let input_of fd = Lwt_io.of_fd ~mode:Lwt_io.input fd in
  let output_of fd = Lwt_io.of_fd ~mode:Lwt_io.output fd in
  if delayed
  then begin
    Ocsipersist_settings.inch := Lwt.map input_of indescr;
    Ocsipersist_settings.outch := Lwt.map output_of indescr
  end
  else begin
    let fd = Lwt_main.run indescr in
    Ocsipersist_settings.inch := Lwt.return (input_of fd);
    Ocsipersist_settings.outch := Lwt.return (output_of fd);
    Lwt_log.ign_warning ~section "...Initialization complete"
  end