Source file client.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
open Import
open Irmin_server
open Lwt.Syntax
open Lwt.Infix
include Client_intf
exception Continue
module Conf = struct
include Irmin.Backend.Conf
let spec = Irmin.Backend.Conf.Spec.v "irmin-client"
let uri = Irmin.Type.(map string) Uri.of_string Uri.to_string
let uri =
Irmin.Backend.Conf.key ~spec "uri" uri
(Uri.of_string "tcp://127.0.0.1:9181")
let tls = Irmin.Backend.Conf.key ~spec "tls" Irmin.Type.bool false
let hostname =
Irmin.Backend.Conf.key ~spec "hostname" Irmin.Type.string "127.0.0.1"
end
let config ?(tls = false) ?hostname uri =
let default_host = Uri.host_with_default ~default:"127.0.0.1" uri in
let config =
Irmin.Backend.Conf.add (Irmin.Backend.Conf.empty Conf.spec) Conf.uri uri
in
let config =
Irmin.Backend.Conf.add config Conf.hostname
(Option.value ~default:default_host hostname)
in
Irmin.Backend.Conf.add config Conf.tls tls
module Client (I : IO) (Codec : Conn.Codec.S) (Store : Irmin.Generic_key.S) =
struct
module C = Command.Make (I) (Codec) (Store)
open C
module IO = I
type t = {
ctx : IO.ctx;
config : Conf.t;
mutable conn : Conn.t;
mutable closed : bool;
lock : Lwt_mutex.t;
}
let close t =
t.closed <- true;
IO.close (t.conn.ic, t.conn.oc)
let mk_client conf =
let uri = Conf.get conf Conf.uri in
let hostname = Conf.get conf Conf.hostname in
let tls = Conf.get conf Conf.tls in
let scheme = Uri.scheme uri |> Option.value ~default:"tcp" in
let addr = Uri.host_with_default ~default:"127.0.0.1" uri in
let client =
match String.lowercase_ascii scheme with
| "unix" -> `Unix_domain_socket (`File (Uri.path uri))
| "tcp" ->
let port = Uri.port uri |> Option.value ~default:9181 in
let ip = Ipaddr.of_string_exn addr in
if not tls then `TCP (`IP ip, `Port port)
else `TLS (`Hostname hostname, `IP ip, `Port port)
| "ws" | "wss" -> (
let port = Uri.port uri |> Option.value ~default:9181 in
match Ipaddr.of_string addr with
| Ok ip ->
if not tls then `Ws (Some (`IP ip, `Port port), Uri.to_string uri)
else `TLS (`Hostname hostname, `IP ip, `Port port)
| _ -> `Ws (None, Uri.to_string uri))
| x -> invalid_arg ("Unknown client scheme: " ^ x)
in
client
let lock t f = Lwt_mutex.with_lock t.lock f [@@inline]
let send_command_header t (module Cmd : C.CMD) =
let = Conn.Request.v_header ~command:Cmd.name in
Conn.Request.write_header t.conn header
let recv (t : t) name ty =
let* res = Conn.Response.read_header t.conn in
Conn.Response.get_error t.conn res >>= function
| Some err ->
[%log.err "Request error: command=%s, error=%s" name err];
Lwt.return_error (`Msg err)
| None ->
let+ x = Conn.read t.conn ty in
[%log.debug "Completed request: command=%s" name];
x
let request (t : t) (type x y)
(module Cmd : C.CMD with type res = x and type req = y) (a : y) =
if t.closed then raise Irmin.Closed
else
let name = Cmd.name in
[%log.debug "Starting request: command=%s" name];
lock t (fun () ->
let* () = send_command_header t (module Cmd) in
let* () = Conn.write t.conn Cmd.req_t a in
let* () = IO.flush t.conn.oc in
recv t name Cmd.res_t)
let recv_branch_diff (t : t) =
let* _status = Conn.Response.read_header t.conn in
Conn.read t.conn
(Irmin.Type.pair Store.Branch.t (Irmin.Diff.t Store.commit_key_t))
>|= Error.unwrap "recv_branch_diff"
let recv_branch_key_diff (t : t) =
let* _status = Conn.Response.read_header t.conn in
Conn.read t.conn (Irmin.Diff.t Store.commit_key_t)
>|= Error.unwrap "recv_branch_key_diff"
end
module Make (IO : IO) (Codec : Conn.Codec.S) (Store : Irmin.Generic_key.S) =
struct
module Client = Client (IO) (Codec) (Store)
module Command = Command.Make (IO) (Codec) (Store)
module Conn = Command.Conn
module Commands = Command.Commands
let request = Client.request
let rec connect ?ctx config =
let ctx = Option.value ~default:(Lazy.force IO.default_ctx) ctx in
let client = Client.mk_client config in
let* ic, oc = IO.connect ~ctx client in
let conn = Conn.v ic oc in
let+ ok = Conn.Handshake.V1.send (module Store) conn in
if not ok then Error.raise_error "invalid handshake"
else
let t =
Client.{ config; ctx; conn; closed = false; lock = Lwt_mutex.create () }
in
t
and reconnect t =
let* () = Lwt.catch (fun () -> Client.close t) (fun _ -> Lwt.return_unit) in
let+ conn = connect ~ctx:t.ctx t.Client.config in
t.conn <- conn.conn;
t.closed <- false
let dup client =
let* c = connect ~ctx:client.Client.ctx client.Client.config in
let () = if client.closed then c.closed <- true in
Lwt.return c
let uri t = Conf.get t.Client.config Conf.uri
module X = struct
open Lwt.Infix
module Schema = Store.Schema
module Hash = Store.Hash
module Contents = struct
type nonrec 'a t = Client.t
open Commands.Contents
module Key = Store.Backend.Contents.Key
module Val = Store.Backend.Contents.Val
module Hash = Store.Backend.Contents.Hash
type key = Key.t
type value = Val.t
type hash = Hash.t
let mem t key = request t (module Mem) key >|= Error.unwrap "Contents.mem"
let find t key =
request t (module Find) key >|= Error.unwrap "Contents.find"
let add t value =
request t (module Add) value >|= Error.unwrap "Contents.add"
let unsafe_add t key value =
request t (module Unsafe_add) (key, value)
>|= Error.unwrap "Contents.unsafe_add"
let index t hash =
request t (module Index) hash >|= Error.unwrap "Contents.index"
let batch t f = f t
let close t = Client.close t
let merge t =
let f ~old a b =
let* old = old () in
match old with
| Ok old ->
request t (module Merge) (old, a, b)
>|= Error.unwrap "Contents.merge"
| Error e -> Lwt.return_error e
in
Irmin.Merge.v Irmin.Type.(option Key.t) f
end
module Node = struct
type nonrec 'a t = Client.t
open Commands.Node
module Key = Store.Backend.Node.Key
module Val = Store.Backend.Node.Val
module Hash = Store.Backend.Node.Hash
module Path = Store.Backend.Node.Path
module Metadata = Store.Backend.Node.Metadata
module Contents = Store.Backend.Node.Contents
type key = Key.t
type value = Val.t
type hash = Hash.t
let mem t key = request t (module Mem) key >|= Error.unwrap "Node.mem"
let find t key = request t (module Find) key >|= Error.unwrap "Node.find"
let add t value = request t (module Add) value >|= Error.unwrap "Node.add"
let unsafe_add t key value =
request t (module Unsafe_add) (key, value)
>|= Error.unwrap "Node.unsafe_add"
let index t hash =
request t (module Index) hash >|= Error.unwrap "Node.index"
let batch t f = f t
let close t = Client.close t
let merge t =
let f ~old a b =
let* old = old () in
match old with
| Ok old ->
request t (module Merge) (old, a, b) >|= Error.unwrap "Node.merge"
| Error e -> Lwt.return_error e
in
Irmin.Merge.v Irmin.Type.(option Key.t) f
end
module Node_portable = Store.Backend.Node_portable
module Commit = struct
type nonrec 'a t = Client.t
open Commands.Commit
module Key = Store.Backend.Commit.Key
module Val = Store.Backend.Commit.Val
module Hash = Store.Backend.Commit.Hash
module Info = Store.Backend.Commit.Info
module Node = Node
type key = Key.t
type value = Val.t
type hash = Hash.t
let mem t key = request t (module Mem) key >|= Error.unwrap "Commit.mem"
let find t key =
request t (module Find) key >|= Error.unwrap "Commit.find"
let add t value =
request t (module Add) value >|= Error.unwrap "Commit.add"
let unsafe_add t key value =
request t (module Unsafe_add) (key, value)
>|= Error.unwrap "Commit.unsafe_add"
let index t hash =
request t (module Index) hash >|= Error.unwrap "Commit.index"
let batch t f = f t
let close t = Client.close t
let merge t ~info =
let f ~old a b =
let* old = old () in
match old with
| Ok old ->
request t (module Merge) (info (), (old, a, b))
>|= Error.unwrap "Node.merge"
| Error e -> Lwt.return_error e
in
Irmin.Merge.v Irmin.Type.(option Key.t) f
end
module Commit_portable = Store.Backend.Commit_portable
module Branch = struct
type nonrec t = Client.t
open Commands.Branch
module Key = Store.Backend.Branch.Key
module Val = Store.Backend.Branch.Val
type key = Key.t
type value = Val.t
let mem t key = request t (module Mem) key >|= Error.unwrap "Branch.mem"
let find t key =
request t (module Find) key >|= Error.unwrap "Branch.find"
let set t key value =
request t (module Set) (key, value) >|= Error.unwrap "Branch.set"
let test_and_set t key ~test ~set =
request t (module Test_and_set) (key, test, set)
>|= Error.unwrap "Branch.test_and_set"
let remove t key =
request t (module Remove) key >|= Error.unwrap "Branch.remove"
let list t = request t (module List) () >|= Error.unwrap "Branch.list"
type watch = t
let watch t ?init f =
let* t = dup t in
let* () =
request t (module Watch) init >|= Error.unwrap "Branch.watch"
in
let rec loop () =
if t.closed || Conn.is_closed t.conn then Lwt.return_unit
else
Lwt.catch
(fun () ->
Lwt.catch
(fun () -> Client.recv_branch_diff t)
(fun _ -> raise Continue)
>>= fun (key, diff) -> f key diff >>= loop)
(function _ -> loop ())
in
Lwt.async loop;
Lwt.return t
let watch_key t key ?init f =
let* t = dup t in
let* () =
request t (module Watch_key) (init, key)
>|= Error.unwrap "Branch.watch_key"
in
let rec loop () =
if t.closed || Conn.is_closed t.conn then Lwt.return_unit
else
Lwt.catch
(fun () ->
Lwt.catch
(fun () -> Client.recv_branch_key_diff t)
(fun _ -> raise Continue)
>>= f
>>= loop)
(function _ -> loop ())
in
Lwt.async loop;
Lwt.return t
let unwatch _t watch =
let* () = Conn.write watch.Client.conn Unwatch.req_t () in
Client.close watch
let clear t = request t (module Clear) () >|= Error.unwrap "Branch.clear"
let close t = Client.close t
end
module Slice = Store.Backend.Slice
module Repo = struct
type nonrec t = Client.t
let v config = connect config
let config (t : t) = t.Client.config
let close (t : t) = Client.close t
let contents_t (t : t) = t
let node_t (t : t) = t
let commit_t (t : t) = t
let branch_t (t : t) = t
let batch (t : t) f = f t t t
end
module Remote = Irmin.Backend.Remote.None (Commit.Key) (Store.Branch)
end
include Irmin.Of_backend (X)
let ping t = request t (module Commands.Ping) ()
let export ?depth t =
request t (module Commands.Export) depth >|= Error.unwrap "export"
let import t slice =
request t (module Commands.Import) slice >|= Error.unwrap "import"
let close t = Client.close t
let connect ?tls ?hostname uri =
let conf = config ?tls ?hostname uri in
Repo.v conf
let request_store store =
match status store with
| `Empty -> `Empty
| `Branch b -> `Branch b
| `Commit c -> `Commit (Commit.key c)
module Batch = struct
module Request_tree = Command.Tree
type store = t
type t =
(Store.path
* [ `Contents of
[ `Hash of Store.Hash.t | `Value of Store.contents ]
* Store.metadata option
| `Tree of Request_tree.t
| `Remove ])
list
[@@deriving irmin]
let v () = []
let remove k t = (k, `Remove) :: t
let add_value path ?metadata value t =
(path, `Contents (`Value value, metadata)) :: t
let add_hash path ?metadata hash t =
(path, `Contents (`Hash hash, metadata)) :: t
let add_tree path tree t =
let+ tree =
match Tree.key tree with
| None ->
let+ concrete_tree = Tree.to_concrete tree in
Request_tree.Concrete concrete_tree
| Some key -> Request_tree.Key key |> Lwt.return
in
(path, `Tree tree) :: t
let apply ~info ?(path = Store.Path.empty) store t =
let repo = repo store in
let store = request_store store in
request repo (module Commands.Batch.Apply) ((store, path), info (), t)
>|= Error.unwrap "Batch.apply"
end
module Commit = struct
include Commit
module Cache = struct
module Key = Irmin.Backend.Lru.Make (struct
type t = commit_key
let hash = Hashtbl.hash
let equal = Irmin.Type.(unstage (equal commit_key_t))
end)
module Hash = Irmin.Backend.Lru.Make (struct
type t = hash
let hash = Hashtbl.hash
let equal = Irmin.Type.(unstage (equal hash_t))
end)
let key : commit Key.t = Key.create 32
let hash : commit Hash.t = Hash.create 32
end
let of_key repo key =
if Cache.Key.mem Cache.key key then
Lwt.return_some (Cache.Key.find Cache.key key)
else
let+ x = of_key repo key in
Option.iter (Cache.Key.add Cache.key key) x;
x
let of_hash repo hash =
if Cache.Hash.mem Cache.hash hash then
Lwt.return_some (Cache.Hash.find Cache.hash hash)
else
let+ x = of_hash repo hash in
Option.iter (Cache.Hash.add Cache.hash hash) x;
x
end
module Contents = struct
include Contents
module Cache = struct
module Hash = Irmin.Backend.Lru.Make (struct
type t = hash
let hash = Hashtbl.hash
let equal = Irmin.Type.(unstage (equal hash_t))
end)
let hash : contents Hash.t = Hash.create 32
end
let of_hash repo hash =
if Cache.Hash.mem Cache.hash hash then
Lwt.return_some (Cache.Hash.find Cache.hash hash)
else
let+ x = of_hash repo hash in
Option.iter (Cache.Hash.add Cache.hash hash) x;
x
end
let clone ~src ~dst =
let repo = repo src in
let* () =
Head.find src >>= function
| None -> Branch.remove repo dst
| Some h -> Branch.set repo dst h
in
of_branch repo dst
let request_store store =
match status store with
| `Empty -> `Empty
| `Branch b -> `Branch b
| `Commit c -> `Commit (Commit.key c)
let mem store path =
let repo = repo store in
request repo (module Commands.Store.Mem) (request_store store, path)
>|= Error.unwrap "mem"
let mem_tree store path =
let repo = repo store in
request repo (module Commands.Store.Mem_tree) (request_store store, path)
>|= Error.unwrap "mem_tree"
let find store path =
let repo = repo store in
request repo (module Commands.Store.Find) (request_store store, path)
>|= Error.unwrap "find"
let remove_exn ?clear ?retries ?allow_empty ?parents ~info store path =
let parents = Option.map (List.map (fun c -> Commit.hash c)) parents in
let repo = repo store in
request repo
(module Commands.Store.Remove)
( ((clear, retries), (allow_empty, parents)),
(request_store store, path),
info () )
>|= Error.unwrap "remove"
let remove ?clear ?retries ?allow_empty ?parents ~info store path =
let* x =
remove_exn ?clear ?retries ?allow_empty ?parents ~info store path
in
Lwt.return_ok x
let find_tree store path =
let repo = repo store in
let+ concrete =
request repo (module Commands.Store.Find_tree) (request_store store, path)
>|= Error.unwrap "find_tree"
in
Option.map Tree.of_concrete concrete
end