package caqti
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>
Unified interface to relational database libraries
Install
dune-project
Dependency
Authors
Maintainers
Sources
caqti-v1.7.0.tbz
sha256=a363cfcc15f2a3ab9721d08789a65aaa1108d27f974a9b68425a889596e27fb8
sha512=982b9c65fde0405b5d33822ff2743d1c8a8c0611dcd6615dd0af5b32048262f7ddbcae8434193cfd2b7ee845f29c2821d869862b34086099fcffc912b51d61a2
doc/src/caqti/caqti_pool.ml.html
Source file caqti_pool.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140(* Copyright (C) 2014--2021 Petter A. Urkedal <paurkedal@gmail.com> * * This library is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 3 of the License, or (at your * option) any later version, with the LGPL-3.0 Linking Exception. * * This library is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public * License for more details. * * You should have received a copy of the GNU Lesser General Public License * and the LGPL-3.0 Linking Exception along with this library. If not, see * <http://www.gnu.org/licenses/> and <https://spdx.org>, respectively. *) open Caqti_compat [@@warning "-33"] let default_max_size = try int_of_string (Sys.getenv "CAQTI_POOL_MAX_SIZE") with Not_found -> 8 let default_log_src = Logs.Src.create "Caqti_pool" module Make (System : Caqti_driver_sig.System_common) = struct open System let (>>=?) m mf = m >>= (function Ok x -> mf x | Error e -> return (Error e)) module Task = struct type t = {priority: float; mvar: unit Mvar.t} let wake {mvar; _} = Mvar.store () mvar let compare {priority = pA; _} {priority = pB; _} = Float.compare pB pA end module Taskq = Caqti_heap.Make (Task) type ('a, +'e) t = { create: unit -> ('a, 'e) result future; free: 'a -> unit future; check: 'a -> (bool -> unit) -> unit; validate: 'a -> bool future; log_src: Logs.Src.t; max_idle_size: int; max_size: int; mutable cur_size: int; pool: 'a Queue.t; mutable waiting: Taskq.t; } let create ?(max_size = default_max_size) ?(max_idle_size = max_size) ?(check = fun _ f -> f true) ?(validate = fun _ -> return true) ?(log_src = default_log_src) create free = assert (max_size > 0); assert (max_size >= max_idle_size); { create; free; check; validate; log_src; max_idle_size; max_size; cur_size = 0; pool = Queue.create (); waiting = Taskq.empty } let size {cur_size; _} = cur_size let wait ~priority p = let mvar = Mvar.create () in p.waiting <- Taskq.push Task.({priority; mvar}) p.waiting; Mvar.fetch mvar let schedule p = if not (Taskq.is_empty p.waiting) then begin let task, taskq = Taskq.pop_e p.waiting in p.waiting <- taskq; Task.wake task end let realloc p = p.create () >|= (function | Ok e -> Ok e | Error err -> p.cur_size <- p.cur_size - 1; schedule p; Error err) let rec acquire ~priority p = if Queue.is_empty p.pool then begin if p.cur_size < p.max_size then begin p.cur_size <- p.cur_size + 1; realloc p end else wait ~priority p >>= fun () -> acquire ~priority p end else begin let e = Queue.take p.pool in p.validate e >>= fun ok -> if ok then return (Ok e) else begin Log.warn ~src:p.log_src (fun f -> f "Dropped pooled connection due to invalidation.") >>= fun () -> realloc p end end let release p e = if p.cur_size > p.max_idle_size then begin p.cur_size <- p.cur_size - 1; p.free e >|= fun () -> schedule p end else begin p.check e begin fun ok -> if ok then Queue.add e p.pool else begin Logs.warn ~src:p.log_src (fun f -> f "Will not repool connection due to invalidation."); p.cur_size <- p.cur_size - 1 end; schedule p end; return () end let use ?(priority = 0.0) f p = acquire ~priority p >>=? fun e -> finally (fun () -> f e) (fun () -> release p e) let dispose p e = p.free e >|= fun () -> p.cur_size <- p.cur_size - 1 let rec drain p = if p.cur_size = 0 then return () else (if Queue.is_empty p.pool then wait ~priority:0.0 p else dispose p (Queue.take p.pool)) >>= fun () -> drain p end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>