package resource_cache

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file resource_cache_intf.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
open! Core
open! Async_kernel
open! Import

module type S = sig
  type key
  type common_args
  type resource

  module Status : Status.S with type Key.t = key

  type t

  val init : config:Config.t -> log_error:(string -> unit) -> common_args -> t
  val status : t -> Status.t
  val config : t -> Config.t

  (** [with_ t key ~f] calls [f resource] where [resource] is either:

      1) An existing cached resource that was opened with key' such that
      [R.Key.equal key key']
      2) A newly opened resource created by [R.open_ key common_args], respecting
      the limits of [t.config]

      Returns an error if:
      - the cache is closed
      - [R.open_] returned an error
      - no resource is obtained before [give_up] is determined

      If [f] raises, the exception is not caught, but the [resource] will be
      closed and the [Cache] will remain in a working state (no resources are lost). *)
  val with_
    :  ?open_timeout:Time_ns.Span.t (** default [None] *)
    -> ?give_up:unit Deferred.t (** default [Deferred.never] *)
    -> t
    -> key
    -> f:(resource -> 'a Deferred.t)
    -> 'a Deferred.Or_error.t

  (** Like [with_] but classify the different errors *)
  val with_'
    :  ?open_timeout:Time_ns.Span.t
    -> ?give_up:unit Deferred.t
    -> t
    -> key
    -> f:(resource -> 'a Deferred.t)
    -> [ `Ok of 'a
       | `Gave_up_waiting_for_resource
       | `Error_opening_resource of Error.t
       | `Cache_is_closed
       ]
       Deferred.t

  (** Like [with_] and [with_'] except [f] is run on the first matching available resource
      (or the first resource that has availability to be opened).

      Preference is given towards resources earlier in the list, unless
      [~load_balance:true] has been specified, in which case preference is given to ensure
      that load is approximately balanced. The key with the least number of open
      connections will be favored. *)
  val with_any
    :  ?open_timeout:Time_ns.Span.t
    -> ?give_up:unit Deferred.t
    -> ?load_balance:bool
    -> t
    -> key list
    -> f:(resource -> 'a Deferred.t)
    -> (key * 'a) Deferred.Or_error.t

  val with_any'
    :  ?open_timeout:Time_ns.Span.t
    -> ?give_up:unit Deferred.t
    -> ?load_balance:bool
    -> t
    -> key list
    -> f:(resource -> 'a Deferred.t)
    -> [ `Ok of key * 'a
       | `Error_opening_resource of key * Error.t
       | `Gave_up_waiting_for_resource
       | `Cache_is_closed
       ]
       Deferred.t

  (** Tries [with_any'] in a loop (removing args that have open errors) until receiving an
      [`Ok], or until it has failed to open all resources in [args_list]. *)
  val with_any_loop
    :  ?open_timeout:Time_ns.Span.t
    -> ?give_up:unit Deferred.t
    -> ?load_balance:bool
    -> t
    -> key list
    -> f:(resource -> 'a Deferred.t)
    -> [ `Ok of key * 'a
       | `Error_opening_all_resources of (key * Error.t) list
       | `Gave_up_waiting_for_resource
       | `Cache_is_closed
       ]
       Deferred.t

  val close_started : t -> bool
  val close_finished : t -> unit Deferred.t

  (** Close all currently open resources and prevent the creation of new ones. All
      subsequent calls to [with_] and [immediate] fail with [`Cache_is_closed]. Any jobs
      that are waiting for a connection will return with [`Cache_is_closed]. The
      returned [Deferred.t] is determined when all jobs have finished running and all
      resources have been closed. *)
  val close_and_flush : t -> unit Deferred.t
end

module type Resource_cache = sig
  module type S = S

  (** [Cache.Make] creates a cache module that exposes a simple [with_] interface over its
      resources. The cache has the following properties:

      Resource reuse: When a resource [r] is opened, it will remain open until one of the
      following:
      - [f r] raised an exception where [f] was a function passed to [with_]
      - [r] has been idle for [idle_cleanup_after]
      - [r] has been used [max_resource_reuse] times
      - [close_and_flush] has been called on the cache

      When a resource is closed, either because of one of the above conditions, or because
      it was closed by other means, it no longer counts towards the limits.

      Limits: The cache respects the following limits:
      - No more than [max_resources] are open simultaneously
      - No more than [max_resources_per_id] are open simultaneously for a given id (args)
  *)
  module Make (R : Resource.S) () :
    S
      with type key := R.Key.t
       and type common_args := R.Common_args.t
       and type resource := R.t

  (** Wrap a resource that does not natively support a [has_close_started] operation
      in a simple record to add such tracking. *)
  module Make_simple (R : Resource.Simple) () :
    S
      with type key := R.Key.t
       and type common_args := R.Common_args.t
       and type resource := R.t

  (** Make a cache from a resource where the type clients wish to operate on is
      derived from, but not necessarily equal to, the type held by the cache. *)
  module Make_wrapped (R : Resource.S_wrapped) () :
    S
      with type key := R.Key.t
       and type common_args := R.Common_args.t
       and type resource := R.resource
end