(* Source file: caqti_blocking.ml *)
open Caqti_platform
(* Error payload for failures reported by the Unix library.  The three
   components are the ones carried by [Unix.Unix_error]: the error code and
   two strings, which the printer below renders as function name and
   argument. *)
type Caqti_error.msg += Msg_unix of Unix.error * string * string

(* Register the pretty-printer for [Msg_unix] with [Caqti_error]. *)
let () =
  let pp_unix_msg ppf msg =
    match msg with
    | Msg_unix (code, fn_name, param) ->
      let description = Unix.error_message code in
      Format.fprintf ppf "%s in %s(%S)" description fn_name param
    | _ ->
      (* The printer is registered for [Msg_unix] only, so any other
         constructor is treated as impossible here. *)
      assert false
  in
  Caqti_error.define_msg ~pp:pp_unix_msg [%extension_constructor Msg_unix]
(* Direct-style fiber: ['a t] is simply ['a], so every operation below runs
   immediately in the calling thread. *)
module Fiber = struct
  type 'a t = 'a

  module Infix = struct
    (* Bind and map coincide because a fiber is its own result. *)
    let (>>=) x f = f x
    let (>|=) x f = f x
  end

  let return x = x

  (* [catch f g] evaluates [f ()] and hands any exception it raises to [g]. *)
  let catch f g = try f () with exn -> g exn

  (* [finally f g] evaluates [f ()], then always runs [g ()].  The result of
     [f ()] is returned, or its exception is re-raised once [g ()] is done.
     If [g ()] itself raises, that exception propagates instead. *)
  let finally f g =
    match f () with
    | y -> g (); y
    | exception exn ->
      (* Capture the backtrace before running [g]: any exception raised and
         caught inside [g] would otherwise overwrite it. *)
      let bt = Printexc.get_raw_backtrace () in
      g ();
      Printexc.raise_with_backtrace exn bt

  (* [cleanup f g] evaluates [f ()]; [g ()] runs only if [f ()] raises, after
     which the original exception is re-raised with its original
     backtrace. *)
  let cleanup f g =
    try f () with exn ->
      let bt = Printexc.get_raw_backtrace () in
      g ();
      Printexc.raise_with_backtrace exn bt
end
(* Stream implementation obtained by applying the platform functor to the
   direct-style [Fiber] module defined in this file. *)
module Stream = Caqti_platform.Stream.Make (Fiber)
(* Networking-free part of the system interface; it is the argument of the
   pool functor and is included by the full system module.  Everything is
   evaluated eagerly in the calling thread. *)
module System_core = struct
  module Fiber = Fiber
  module Stream = Stream
  module Switch = Caqti_platform.Switch.Make (Fiber)
  module Mutex = Mutex
  module Condition = Condition

  (* No environment value is needed by this backend. *)
  type stdenv = unit

  (* The job is run to completion immediately; the switch is ignored. *)
  let async ~sw:_ job = job ()

  (* Thin wrappers around [Logs] which default the source to
     [Logging.default_log_src]. *)
  module Log = struct
    type 'a log = 'a Logs.log

    let err ?(src = Logging.default_log_src) msgf = Logs.err ~src msgf
    let warn ?(src = Logging.default_log_src) msgf = Logs.warn ~src msgf
    let info ?(src = Logging.default_log_src) msgf = Logs.info ~src msgf
    let debug ?(src = Logging.default_log_src) msgf = Logs.debug ~src msgf
  end

  (* A sequencer is just the wrapped value: enqueued tasks are applied to it
     right away. *)
  module Sequencer = struct
    type 'a t = 'a
    let create state = state
    let enqueue state task = task state
  end
end
(* Connection pool built from [System_core] with the [Make_without_alarm]
   variant of the platform pool functor. *)
module Pool = Caqti_platform.Pool.Make_without_alarm (System_core)
(* Full system implementation: [System_core] extended with blocking network
   primitives based on the [Unix] library and stdlib channels. *)
module System = struct
  include System_core

  module Net = struct

    module Sockaddr = struct
      type t = Unix.sockaddr

      let unix path = Unix.ADDR_UNIX path

      (* The address goes through its textual form to become a
         [Unix.inet_addr]. *)
      let tcp (ip, port) =
        let inet_addr = ip |> Ipaddr.to_string |> Unix.inet_addr_of_string in
        Unix.ADDR_INET (inet_addr, port)
    end

    (* Resolve [host] and [port] to stream socket addresses.  [Not_found]
       yields an empty list; a [Unix.Unix_error] becomes an [Error] carrying
       a [`Msg] with the system error text. *)
    let getaddrinfo ~stdenv:() host port =
      let hints = [Unix.AI_SOCKTYPE Unix.SOCK_STREAM] in
      match
        Unix.getaddrinfo
          (Domain_name.to_string host) (string_of_int port) hints
      with
      | entries -> Ok (List.map (fun entry -> entry.Unix.ai_addr) entries)
      | exception Not_found -> Ok []
      | exception Unix.Unix_error (code, _, _) ->
        Error (`Msg ("Cannot resolve host name: " ^ Unix.error_message code))

    (* Map an exception to an error message; only [Unix.Unix_error] is
       recognised. *)
    let convert_io_exception exn =
      match exn with
      | Unix.Unix_error (err, fn, arg) -> Some (Msg_unix (err, fn, arg))
      | _ -> None

    (* A connection is the pair of channels returned by
       [Unix.open_connection]; reads use the input channel, writes and
       [close] use the output channel. *)
    module Socket = struct
      type t = Tcp of in_channel * out_channel

      let output_char (Tcp (_, oc)) c = output_char oc c
      let output_string (Tcp (_, oc)) s = output_string oc s
      let flush (Tcp (_, oc)) = flush oc
      let input_char (Tcp (ic, _)) = input_char ic
      let really_input (Tcp (ic, _)) buf pos len = really_input ic buf pos len
      let close (Tcp (_, oc)) = close_out oc
    end

    type tcp_flow = Socket.t
    type tls_flow = Socket.t

    (* Open a connection to [sockaddr]; a [Unix.Unix_error] is returned as
       [Error (Msg_unix _)].  The switch is ignored. *)
    let connect_tcp ~sw:_ ~stdenv:() sockaddr =
      match Unix.open_connection sockaddr with
      | (ic, oc) -> Ok (Socket.Tcp (ic, oc))
      | exception Unix.Unix_error (err, func, arg) ->
        Error (Msg_unix (err, func, arg))

    let tcp_flow_of_socket _ = None

    let socket_of_tls_flow ~sw:_ flow = flow

    module type TLS_PROVIDER = System_sig.TLS_PROVIDER
      with type 'a fiber := 'a
       and type tcp_flow := Socket.t
       and type tls_flow := Socket.t

    (* Registry of TLS providers, most recently registered first. *)
    let tls_providers_r : (module TLS_PROVIDER) list ref = ref []

    let register_tls_provider provider =
      tls_providers_r := provider :: !tls_providers_r

    (* The argument is ignored; the current registry content is returned. *)
    let tls_providers _ = !tls_providers_r
  end
end
(* Unix-specific services for the blocking backend, implemented by direct
   calls into the [Unix] library. *)
module System_unix = struct
  module Unix = struct
    type file_descr = Unix.file_descr

    (* File descriptors need no wrapping here: [f] is applied to [fd] as
       is. *)
    let wrap_fd f fd = f fd

    (* [poll ~stdenv:() ?read ?write ?timeout fd] waits with [Unix.select]
       until [fd] is ready for the requested operations or [timeout] seconds
       have passed; a negative [timeout] (the default) waits indefinitely.

       Returns [(readable, writable, timed_out)], where [timed_out] is true
       when neither readiness was reported.

       A [select] call interrupted by a signal (EINTR) is restarted instead
       of being reported as a failure.  For a finite timeout the restart only
       waits for the time remaining until the original deadline.  Other
       [Unix.Unix_error]s propagate unchanged. *)
    let poll ~stdenv:()
             ?(read = false) ?(write = false) ?(timeout = -1.0) fd =
      let read_fds = if read then [fd] else [] in
      let write_fds = if write then [fd] else [] in
      (* Deadline on the wall clock, which is the only clock the [Unix]
         library offers; [None] means wait forever. *)
      let deadline =
        if timeout < 0.0 then None
        else Some (Unix.gettimeofday () +. timeout)
      in
      let rec select_retrying timeout =
        match Unix.select read_fds write_fds [] timeout with
        | ready -> ready
        | exception Unix.Unix_error (Unix.EINTR, _, _) ->
          let remaining =
            match deadline with
            | None -> timeout
            | Some t -> Float.max 0.0 (t -. Unix.gettimeofday ())
          in
          select_retrying remaining
      in
      let read_fds, write_fds, _ = select_retrying timeout in
      (read_fds <> [], write_fds <> [], read_fds = [] && write_fds = [])
  end

  (* There is no separate main thread to hand work to: both functions call
     their argument directly. *)
  module Preemptive = struct
    let detach f x = f x
    let run_in_main f = f ()
  end
end
(* Driver loader instantiated for the blocking [System] and [System_unix]
   modules defined in this file. *)
module Loader = Caqti_platform_unix.Driver_loader.Make (System) (System_unix)
(* Pull in the connector API built from [System], [Pool] and [Loader];
   presumably this is where the [connect], [with_connection] and
   [connect_pool] wrapped further down come from. *)
include Connector.Make (System) (Pool) (Loader)
open System
(* Connection signature specialised to this backend: a fiber is a plain
   value and streams are [Stream.t]. *)
module type CONNECTION = Caqti_connection_sig.S
  with type 'a fiber := 'a
   and type ('a, 'e) stream := ('a, 'e) Stream.t
(* A connection packed as a first-class module. *)
type connection = (module CONNECTION)
(* Wrapper around the previously bound [connect] which supplies the two
   arguments callers of this module do not provide: a freshly created switch
   and the unit environment.  Nothing in this function releases the
   switch. *)
let connect ?subst ?env ?config ?tweaks_version uri =
  connect ?subst ?env ?config ?tweaks_version
    ~sw:(Switch.create ()) ~stdenv:() uri
(* The previously bound [with_connection] (presumably from the
   [Connector.Make] include) with [~stdenv] fixed to [()], so callers of this
   module pass no environment. *)
let with_connection = with_connection ~stdenv:()
(* Wrapper around the previously bound [connect_pool] which supplies a
   freshly created switch and the unit environment; all optional arguments
   are forwarded unchanged.  Nothing in this function releases the switch. *)
let connect_pool
      ?pool_config ?post_connect ?subst ?env ?config ?tweaks_version uri =
  connect_pool
    ?pool_config ?post_connect ?subst ?env ?config ?tweaks_version
    ~sw:(Switch.create ()) ~stdenv:() uri
(* [or_fail r] unwraps [Ok v] to [v]; an [Error] holding a [Caqti_error.t]
   is raised as [Caqti_error.Exn]. *)
let or_fail outcome =
  match outcome with
  | Error (#Caqti_error.t as failure) -> raise (Caqti_error.Exn failure)
  | Ok value -> value