Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
(* connection.ml *)
open! Core
open! Async

(** [close_channels reader writer] closes [writer] first and, once that close
    has completed, closes [reader]. *)
let close_channels reader writer =
  let%bind () = Output_channel.close writer in
  Input_channel.close reader
;;

(** [collect_errors writer fn] runs [fn] while watching the monitor attached
    to [writer].

    The result is whichever of these becomes determined first:
    - [Error e], where [e] is the next error reported on [writer]'s monitor;
    - the result of [Monitor.try_with fn], i.e. [Ok v] if [fn] succeeds and
      [Error exn] if it raises. *)
let collect_errors writer fn =
  let monitor = Output_channel.monitor writer in
  (* Detach the monitor so that its errors are observed here rather than
     being forwarded to the parent monitor; the returned stream is not
     needed because errors are read through [Monitor.get_next_error]. *)
  ignore (Monitor.detach_and_get_error_stream monitor : _ Stream.t);
  choose
    [ choice (Monitor.get_next_error monitor) (fun e -> Error e)
    ; choice (Monitor.try_with ~run:`Now ~rest:`Log fn) Fn.id
    ]
;;

(** [listen ... ~on_handler_error ~f where_to_listen] starts a TCP server with
    [Tcp.Server.create_sock] and, for every accepted connection, wraps the
    socket's file descriptor in an input channel and an output channel and
    calls [f addr input_channel output_channel].

    Handling of a connection ends as soon as either the handler finishes (or
    an error is collected, see {!collect_errors}) or
    [Output_channel.remote_closed] becomes determined. Both channels are then
    closed. If an error was collected it is re-raised with context naming this
    function.

    The server-level optional arguments ([max_connections],
    [max_accepts_per_batch], [backlog], [socket], [time_source]) are forwarded
    to [Tcp.Server.create_sock]; the buffer-size arguments, [write_timeout]
    and [time_source] are forwarded to the channel constructors. *)
let listen
  ?max_connections
  ?max_accepts_per_batch
  ?backlog
  ?socket
  ?input_buffer_size
  ?max_input_buffer_size
  ?output_buffer_size
  ?max_output_buffer_size
  ?write_timeout
  ?time_source
  ~on_handler_error
  ~f:handler
  where_to_listen
  =
  Tcp.Server.create_sock
    ?max_connections
    ?max_accepts_per_batch
    ?backlog
    ?socket
    ?time_source
    ~on_handler_error
    where_to_listen
    (fun addr socket ->
    let fd = Socket.fd socket in
    let input_channel =
      Input_channel.create
        ?max_buffer_size:max_input_buffer_size
        ?buf_len:input_buffer_size
        ?time_source
        fd
    in
    let output_channel =
      Output_channel.create
        ?max_buffer_size:max_output_buffer_size
        ?buf_len:output_buffer_size
        ?write_timeout
        ?time_source
        fd
    in
    (* Stop as soon as the handler is done (or failed) or the peer has closed
       the connection, whichever comes first. *)
    let%bind res =
      Deferred.any
        [ collect_errors output_channel (fun () ->
            handler addr input_channel output_channel)
        ; Output_channel.remote_closed output_channel |> Deferred.ok
        ]
    in
    (* Always release the channels before reporting the outcome. *)
    let%bind () = close_channels input_channel output_channel in
    match res with
    | Ok () -> Deferred.unit
    | Error exn ->
      Exn.reraise exn "Shuttle.Connection.listen: exception from output_channel")
;;

(** [with_connection ... ~f where_to_connect] connects with
    [Tcp.connect_sock], wraps the socket's file descriptor in an input channel
    and an output channel, and runs [f input_channel output_channel].

    It waits until the first of: [f] finishing (or an error being collected,
    see {!collect_errors}), the output channel finishing its close, or the
    input channel being closed. Both channels are then closed and the result
    of [f] is returned; a collected error is re-raised with context naming
    this function.

    NOTE(review): the final result is read from the same deferred that runs
    [f], so if a channel closes first this still waits for [f] (or a monitor
    error) before returning -- confirm this is the intended contract. *)
let with_connection
  ?interrupt
  ?timeout
  ?input_buffer_size
  ?max_input_buffer_size
  ?output_buffer_size
  ?max_output_buffer_size
  ?time_source
  ~f
  where_to_connect
  =
  let%bind socket =
    Tcp.connect_sock ?interrupt ?timeout ?time_source where_to_connect
  in
  let fd = Socket.fd socket in
  let input_channel =
    Input_channel.create
      ?max_buffer_size:max_input_buffer_size
      ?buf_len:input_buffer_size
      ?time_source
      fd
  in
  let output_channel =
    Output_channel.create
      ?max_buffer_size:max_output_buffer_size
      ?buf_len:output_buffer_size
      ?time_source
      fd
  in
  let res =
    collect_errors output_channel (fun () -> f input_channel output_channel)
  in
  let%bind () =
    Deferred.any_unit
      [ (res >>| fun _ -> ())
      ; Output_channel.close_finished output_channel
      ; Input_channel.closed input_channel
      ]
  in
  let%bind () = close_channels input_channel output_channel in
  match%map res with
  | Ok v -> v
  | Error exn ->
    Exn.reraise
      exn
      "Shuttle.Connection.with_connection: Exception in output channel"
;;