Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
mirror.ml1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378(* * Copyright (C) Citrix Systems Inc. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published * by the Free Software Foundation; version 2.1 only. with the special * exception on linking described in file LICENSE. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. *) open Lwt.Infix open Result module Make (Primary : Mirage_block.S) (Secondary : Mirage_block.S) = struct type error = [ Mirage_block.error | `Primary of Primary.error | `Secondary of Secondary.error ] type write_error = [ Mirage_block.write_error | `Primary of Primary.write_error | `Secondary of Secondary.write_error ] type mirror_error = [`Primary of Primary.error | `Secondary of Secondary.write_error] let pp_error ppf = function | #Mirage_block.error as e -> Mirage_block.pp_error ppf e | `Primary p -> Primary.pp_error ppf p | `Secondary s -> Secondary.pp_error ppf s let pp_write_error ppf = function | #Mirage_block.write_error as e -> Mirage_block.pp_write_error ppf e | `Primary p -> Primary.pp_write_error ppf p | `Secondary s -> Secondary.pp_write_error ppf s let string_of_error x = let b = Buffer.create 32 in let f = Format.formatter_of_buffer b in pp_error f x ; Buffer.contents b module Region_lock = struct (* We need to prevent the background mirror thread racing with an I/O write to a particular region *) type region = int64 * int let overlap (start, length) (start', _length') = start' >= start && start' < Int64.(add start (of_int length)) let before (start, length) (start', _length') = Int64.(add start (of_int length)) < start' type t = { mutable exclusive_lock: region ; (* extent we're currently copying *) mutable active: region list ; (* extents which are being written to *) c: unit Lwt_condition.t ; m: Lwt_mutex.t } (* Exclusively lock up to [offset'] *) let extend_right t offset' = Lwt_mutex.with_lock t.m (fun () -> let rec wait () = let length = Int64.(to_int (sub offset' (fst t.exclusive_lock))) in if List.fold_left ( || ) false (List.map (overlap (fst t.exclusive_lock, length)) t.active) then Lwt_condition.wait ~mutex:t.m t.c >>= fun () -> wait () else Lwt.return length in wait () >>= fun length -> t.exclusive_lock <- (fst t.exclusive_lock, length) ; Lwt_condition.broadcast t.c () ; Lwt.return () ) (* Release lock up to [offset'] *) let release_left t offset' = Lwt_mutex.with_lock t.m (fun () -> let length = Int64.( to_int (sub (add (fst t.exclusive_lock) (of_int (snd t.exclusive_lock))) offset' ) ) in t.exclusive_lock <- (offset', length) ; Lwt_condition.broadcast t.c () ; Lwt.return () ) (* Exclude the background copying thread from [offset:offset+length]. This avoids updating a region while it is being actively mirrored, which could cause the old data to overtake and overwrite the new data. *) let with_lock t offset length f = Lwt_mutex.with_lock t.m (fun () -> let rec loop () = if overlap t.exclusive_lock (offset, length) then Lwt_condition.wait ~mutex:t.m t.c >>= fun () -> loop () else (* if the copy might catch up with us then mark the region as locked *) let unlock = if before t.exclusive_lock (offset, length) then ( t.active <- (offset, length) :: t.active ; fun () -> t.active <- List.filter (fun (o, l) -> o <> offset || l <> length) t.active ; Lwt_condition.broadcast t.c () ) else fun () -> () in Lwt.catch (fun () -> f () >>= fun r -> unlock () ; Lwt.return r) (fun e -> unlock () ; Lwt.fail e) in loop () ) let make () = let exclusive_lock = (0L, 0) in let active = [] in let c = Lwt_condition.create () in let m = Lwt_mutex.create () in {exclusive_lock; active; c; m} end type t = { primary: Primary.t ; secondary: Secondary.t ; primary_block_size: int ; (* number of primary sectors per info.sector_size *) secondary_block_size: int ; (* number of secondary sectors per info.sector_size *) info: Mirage_block.info ; lock: Region_lock.t ; result: (unit, mirror_error) result Lwt.t ; mutable percent_complete: int ; progress_cb: [`Percent of int | `Complete] -> unit ; mutable disconnected: bool } let start_copy t u = let buffer = Io_page.(to_cstruct (get 4096)) in (* round to the nearest sector *) let block = Cstruct.length buffer / t.info.Mirage_block.sector_size in let buffer = Cstruct.sub buffer 0 (block * t.info.Mirage_block.sector_size) in (* split into an array of slots *) let nr_slots = 8 in let block = block / nr_slots in let slots = Array.make nr_slots (Cstruct.create 0) in for i = 0 to nr_slots - 1 do slots.(i) <- Cstruct.sub buffer (i * block * t.info.Mirage_block.sector_size) (block * t.info.Mirage_block.sector_size) done ; (* treat the slots as a circular buffer *) let producer_idx = ref 0 in let consumer_idx = ref 0 in let c = Lwt_condition.create () in let rec reader sector = if t.disconnected || sector = t.info.Mirage_block.size_sectors then Lwt.return_ok () else if !producer_idx - !consumer_idx >= nr_slots then Lwt_condition.wait c >>= fun () -> reader sector else Region_lock.extend_right t.lock Int64.(add sector (of_int block)) >>= fun () -> Primary.read t.primary Int64.(mul sector (of_int t.primary_block_size)) [slots.(!producer_idx mod nr_slots)] >>= function | Error e -> t.disconnected <- true ; Lwt_condition.signal c () ; Lwt.return_error e | Ok () -> incr producer_idx ; Lwt_condition.signal c () ; reader Int64.(add sector (of_int block)) in let rec writer sector = let percent_complete = Int64.(to_int (div (mul sector 100L) t.info.Mirage_block.size_sectors)) in if percent_complete <> t.percent_complete then t.progress_cb ( if percent_complete = 100 then `Complete else `Percent percent_complete ) ; t.percent_complete <- percent_complete ; if t.disconnected || sector = t.info.Mirage_block.size_sectors then Lwt.return_ok () else if !consumer_idx = !producer_idx then Lwt_condition.wait c >>= fun () -> writer sector else Secondary.write t.secondary Int64.(mul sector (of_int t.secondary_block_size)) [slots.(!consumer_idx mod nr_slots)] >>= function | Error e -> t.disconnected <- true ; Lwt_condition.signal c () ; Lwt.return_error e | Ok () -> incr consumer_idx ; Region_lock.release_left t.lock Int64.(add sector (of_int block)) >>= fun () -> Lwt_condition.signal c () ; writer Int64.(add sector (of_int block)) in let read_t = reader 0L in let write_t = writer 0L in read_t >>= fun read_result -> write_t >>= fun write_result -> ( match (read_result, write_result) with | Ok (), Ok () -> Lwt.wakeup u (Ok ()) | Error e, _ -> Lwt.wakeup u (Error (`Primary e)) | Ok (), Error e -> Lwt.wakeup u (Error (`Secondary e)) ) ; Lwt.return () type _id = unit let get_info t = Lwt.return t.info let connect ?(progress_cb = fun _ -> ()) primary secondary = Primary.get_info primary >>= fun primary_info -> Secondary.get_info secondary >>= fun secondary_info -> let sector_size = max primary_info.Mirage_block.sector_size secondary_info.Mirage_block.sector_size in (* We need our chosen sector_size to be an integer multiple of both primary and secondary sector sizes. This should be the very common case e.g. 4096 and 512; 512 and 1 *) let primary_block_size = sector_size / primary_info.Mirage_block.sector_size in let secondary_block_size = sector_size / secondary_info.Mirage_block.sector_size in let primary_bytes = Int64.( mul primary_info.Mirage_block.size_sectors (of_int primary_info.Mirage_block.sector_size) ) in let secondary_bytes = Int64.( mul secondary_info.Mirage_block.size_sectors (of_int secondary_info.Mirage_block.sector_size) ) in (let open Rresult in ( if sector_size mod primary_info.Mirage_block.sector_size <> 0 || sector_size mod secondary_info.Mirage_block.sector_size <> 0 then Error (`Msg (Printf.sprintf "Incompatible sector sizes: either primary (%d) or secondary \ (%d) must be an integer multiple of the other" primary_info.Mirage_block.sector_size secondary_info.Mirage_block.sector_size ) ) else Ok () ) >>= fun () -> ( if primary_bytes <> secondary_bytes then Error (`Msg (Printf.sprintf "Incompatible overall sizes: primary (%Ld bytes) and secondary \ (%Ld bytes) must be the same size" primary_bytes secondary_bytes ) ) else Ok () ) >>= fun () -> if not secondary_info.Mirage_block.read_write then Error (`Msg "Cannot mirror to a read-only secondary device") else Ok () ) |> Lwt.return >>= function | Error (`Msg x) -> Lwt.fail_with x | Ok () -> let disconnected = false in let read_write = primary_info.Mirage_block.read_write in let size_sectors = Int64.(div primary_bytes (of_int sector_size)) in let info = {Mirage_block.read_write; sector_size; size_sectors} in let lock = Region_lock.make () in let result, u = Lwt.task () in let percent_complete = 0 in let t = { progress_cb ; primary ; secondary ; primary_block_size ; secondary_block_size ; info ; lock ; result ; percent_complete ; disconnected } in let (_ : unit Lwt.t) = start_copy t u in Lwt.return t let read t ofs bufs = Primary.read t.primary ofs bufs >>= function | Error e -> Lwt.return_error (`Primary e) | Ok x -> Lwt.return_ok x let write t ofs bufs = let total_length_bytes = List.(fold_left ( + ) 0 (map Cstruct.length bufs)) in let length = total_length_bytes / t.info.Mirage_block.sector_size in let primary_ofs = Int64.(mul ofs (of_int t.primary_block_size)) in let secondary_ofs = Int64.(mul ofs (of_int t.secondary_block_size)) in Region_lock.with_lock t.lock ofs length (fun () -> Primary.write t.primary primary_ofs bufs >>= function | Error e -> Lwt.return_error (`Primary e) | Ok () -> ( Secondary.write t.secondary secondary_ofs bufs >>= function | Error e -> Lwt.return_error (`Secondary e) | Ok () -> Lwt.return_ok () ) ) let disconnect t = t.disconnected <- true ; t.result >>= fun _ -> Lwt.return () end