Pipe handles provide an abstraction over local domain sockets on Unix and named pipes on Windows.

type t
include module type of Stream with type t := t
include module type of Handle with type t := t
val close : t -> Int_result.unit

Handles are closed automatically if they are no longer referenced from the OCaml heap. Nevertheless, you should nearly always close them with close, because:

  • if they wrap a file descriptor, you will sooner or later run out of file descriptors. The OCaml garbage collector gives no guarantee about when orphaned memory blocks are collected.
  • you might have registered some repeatedly called action (e.g. timeout, read_start, ...) that prevents all references from being removed from the OCaml heap.

However, it's safe to write code in this manner:

let s = Uwt.Tcp.init () in
let c = Uwt.Tcp.init () in
Uwt.Tcp.nodelay s false;
Uwt.Tcp.simultaneous_accepts s true;
if foobar () then (* no file descriptor yet assigned, no need to worry
                     about exceptions inside foobar,... *)
  Lwt.return_unit (* no need to close *)
else
  ...

If you want, for whatever reason, to keep a file descriptor open for the whole lifetime of your process, remember to keep a reference to its handle.

val close_noerr : t -> unit
val close_wait : t -> unit Lwt.t

Prefer close or close_noerr to close_wait. Both return immediately, and there are no useful error messages to wait for (besides perhaps a notice that you have already closed the handle).

close_wait is only useful if you intend to wait until all concurrent write and read threads related to this handle are canceled.
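The advice above (close explicitly, and prefer close_noerr when the result is of no interest) can be sketched as follows. This is a minimal illustration, not uwt's own code: with_connected is a hypothetical helper name, and the module already offers with_connect for the same purpose.

```ocaml
(* Sketch: release the pipe whether [f] succeeds or fails.
   [with_connected] is a hypothetical helper, not part of uwt. *)
let with_connected ~path f =
  let p = Uwt.Pipe.init () in
  Lwt.finalize
    (fun () -> Lwt.bind (Uwt.Pipe.connect p ~path) (fun () -> f p))
    (fun () ->
       (* close_noerr returns immediately and ignores "already closed" *)
       Uwt.Pipe.close_noerr p;
       Lwt.return_unit)
```

Because close_noerr does not report errors, it is harmless if f has already closed the handle itself.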

val is_active : t -> bool

Returns true if the handle is active, false if it is inactive. What "active" means depends on the type of handle:

  • An Async.t handle is always active and cannot be deactivated, except by closing it (uv_close() in libuv).
  • A Pipe.t, Tcp.t, Udp.t, etc. handle - basically any handle that deals with i/o - is active when it is doing something that involves i/o, like reading, writing, connecting, accepting new connections, etc.

Rule of thumb: if a handle of type Uwt.Foo.t has a start function (uv_foo_start() in libuv), then it's active from the moment that function is called. Likewise, the matching stop function (uv_foo_stop() in libuv) deactivates the handle again.

val ref' : t -> unit

Reference the given handle. References are idempotent, that is, if a handle is already referenced, calling this function again will have no effect.

val unref : t -> unit

Un-reference the given handle. References are idempotent, that is, if a handle is not referenced, calling this function again will have no effect.

val has_ref : t -> bool

Returns true if the handle is referenced, false otherwise.

val to_handle : t -> Handle.t
val is_readable : t -> bool
val is_writable : t -> bool
val read_start : t -> cb:(Bytes.t uv_result -> unit) -> Int_result.unit

Read data from an incoming stream. ~cb will be called repeatedly until there is no more data to read or read_stop is called.

val read_start_exn : t -> cb:(Bytes.t uv_result -> unit) -> unit
val read_stop : t -> Int_result.unit

Stop reading data from the stream.
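A minimal sketch of the callback style described above: the callback accumulates a byte count and stops reading once a limit is reached. count_bytes and its ~limit parameter are hypothetical names, and treating every Error as "stop" is a simplifying assumption of this sketch.

```ocaml
(* Sketch: count incoming bytes until at least [limit] bytes have arrived
   or the stream reports an error condition. [count_bytes] is hypothetical. *)
let count_bytes pipe ~limit =
  let total = ref 0 in
  let finished, wake = Lwt.wait () in
  let stop () =
    (* the result of read_stop is deliberately ignored in this sketch *)
    ignore (Uwt.Pipe.read_stop pipe);
    Lwt.wakeup_later wake !total
  in
  let cb = function
    | Ok chunk ->
      total := !total + Bytes.length chunk;
      if !total >= limit then stop ()
    | Error _ ->
      (* assumption: any error ends the count *)
      stop ()
  in
  Uwt.Pipe.read_start_exn pipe ~cb;
  finished
```

The returned thread resolves with the number of bytes seen. Calling read_stop before waking the thread keeps the callback from firing again.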

val read_stop_exn : t -> unit
val read : ?pos:int -> ?len:int -> t -> buf:bytes -> int Lwt.t

There is currently no uv_read function in libuv, just uv_read_start and uv_read_stop. This is a wrapper for your convenience. It calls read_stop internally if you don't continue reading immediately. A result of zero indicates EOF.

Future libuv versions might add uv_read and uv_try_read functions (this has been discussed several times). If these changes are merged, Stream.read will wrap them, even if there are small semantic differences.

It is currently not possible to start several read threads in parallel; you must serialize the requests manually. In the following example t2 will fail with EBUSY:

let t1 = Uwt.Stream.read t ~buf:buf1 in
let t2 = Uwt.Stream.read t ~buf:buf2 in
(* ... *)

Calling the function with ~len:0 has dubious, system-dependent semantics.
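One way to serialize reads, as required above, is to start each read only after the previous one has completed. The following is a sketch under that assumption; read_all is a hypothetical helper name and the 4096-byte chunk size is arbitrary.

```ocaml
(* Sketch: read until EOF, issuing one read at a time.
   [read_all] is a hypothetical helper, not part of uwt. *)
let read_all pipe =
  let chunk = Bytes.create 4096 in
  let acc = Buffer.create 4096 in
  let rec loop () =
    Lwt.bind (Uwt.Pipe.read pipe ~buf:chunk) (fun n ->
      if n = 0 then
        (* a zero result indicates EOF *)
        Lwt.return (Buffer.contents acc)
      else begin
        Buffer.add_string acc (Bytes.sub_string chunk 0 n);
        loop ()
      end)
  in
  loop ()
```

Since the next read is only requested inside the continuation of the previous one, two reads are never pending on the same handle.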

val read_ba : ?pos:int -> ?len:int -> t -> buf:buf -> int Lwt.t
val write_queue_size : t -> int

Returns the number of queued bytes waiting to be sent.

val try_write : ?pos:int -> ?len:int -> t -> buf:bytes -> Int_result.int

Write data to the stream, but don't queue a write request if the write can't be completed immediately.

val try_write_ba : ?pos:int -> ?len:int -> t -> buf:buf -> Int_result.int
val try_write_string : ?pos:int -> ?len:int -> t -> buf:string -> Int_result.int
val write : ?pos:int -> ?len:int -> t -> buf:bytes -> unit Lwt.t

Write data to the stream.

val write_string : ?pos:int -> ?len:int -> t -> buf:string -> unit Lwt.t
val write_ba : ?pos:int -> ?len:int -> t -> buf:buf -> unit Lwt.t
val write_raw : ?pos:int -> ?len:int -> t -> buf:bytes -> unit Lwt.t

write is eager, like its counterparts inside Lwt_unix. It first calls try_write internally to check whether it can return immediately (without the overhead of creating a sleeping thread and waking it up later). If it can't write everything instantly, it calls write_raw internally. write_raw is exposed here mainly so that unit tests can be written for it, but you can also use it if your ~buf is very large or you know for another reason that try_write will fail.

val write_raw_string : ?pos:int -> ?len:int -> t -> buf:string -> unit Lwt.t
val write_raw_ba : ?pos:int -> ?len:int -> t -> buf:buf -> unit Lwt.t
val try_writev : t -> Iovec_write.t list -> Int_result.int

Windows doesn't support writing multiple buffers with a single syscall for some HANDLEs (e.g. it's supported for tcp handles, but not pipes). uwt then writes the buffers one by one.

If the number of buffers is greater than IOV_MAX, libuv already contains the necessary workarounds.

val writev : t -> Iovec_write.t list -> unit Lwt.t

See the comment on try_writev. This function will fail with Unix.EOPNOTSUPP on Windows for e.g. pipe handles.

val writev_emul : t -> Iovec_write.t list -> unit Lwt.t

Similar to writev, but if passing several buffers at once is not supported by the OS, the buffers will be written one by one. Please note that, as a consequence, you should not start several writev_emul threads in parallel. The writing order would be surprising in this case. If you don't use Windows, this function is identical to writev.

val writev_raw : t -> Iovec_write.t list -> unit Lwt.t
val listen : t -> max:int -> cb:(t -> Int_result.unit -> unit) -> Int_result.unit

Start listening for incoming connections. ~max indicates the number of connections the kernel might queue, same as listen(2). When a new incoming connection is received ~cb is called.

val listen_exn : t -> max:int -> cb:(t -> Int_result.unit -> unit) -> unit
val shutdown : t -> unit Lwt.t

Shutdown the outgoing (write) side of a duplex stream. It waits for pending write requests to complete.
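As a small illustration of how write and shutdown combine, the sketch below sends one message and then closes the write side. send_and_finish is a hypothetical helper name.

```ocaml
(* Sketch: send a final message, then shut down the write side.
   [send_and_finish] is a hypothetical helper, not part of uwt. *)
let send_and_finish pipe msg =
  Lwt.bind (Uwt.Pipe.write_string pipe ~buf:msg) (fun () ->
    (* shutdown waits for pending write requests to complete *)
    Uwt.Pipe.shutdown pipe)
```

The read side stays open, so the peer's reply can still be read afterwards.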

val set_blocking : t -> bool -> Int_result.unit

Just don't use this function. It will only cause trouble.

include module type of Handle_ext with type t := t
val get_send_buffer_size : t -> Int_result.int

Gets the size of the send buffer that the operating system uses for the socket.

val get_send_buffer_size_exn : t -> int
val get_recv_buffer_size : t -> Int_result.int

Gets the size of the receive buffer that the operating system uses for the socket.

val get_recv_buffer_size_exn : t -> int
val set_send_buffer_size : t -> int -> Int_result.unit

Sets the size of the send buffer that the operating system uses for the socket.

val set_send_buffer_size_exn : t -> int -> unit
val set_recv_buffer_size : t -> int -> Int_result.unit

Sets the size of the receive buffer that the operating system uses for the socket.

val set_recv_buffer_size_exn : t -> int -> unit
include module type of Handle_fileno with type t := t
val fileno : t -> Unix.file_descr uv_result
val fileno_exn : t -> Unix.file_descr
val to_stream : t -> Stream.t
val init : ?ipc:bool -> unit -> t

The only thing that can go wrong is memory allocation. In this case the ordinary exception Out_of_memory is raised. The function is not called init_exn because this exception can be raised by nearly all functions.

  • parameter ipc: false by default

val openpipe : ?ipc:bool -> Unix.file_descr -> t uv_result

Be careful with open* functions. They exist so that you can re-use system-dependent libraries. But if you pass a file descriptor to openpipe (or opentcp, ...) that is not really a file descriptor of a pipe (or tcp socket, ...), you can trigger assert failures inside libuv.

  • parameter ipc: false by default

val openpipe_exn : ?ipc:bool -> Unix.file_descr -> t
val bind : t -> path:string -> Int_result.unit

Bind the pipe to a file path (Unix) or a name (Windows).

val bind_exn : t -> path:string -> unit
val getsockname : t -> string uv_result

Get the name of the Unix domain socket or the named pipe.

val getsockname_exn : t -> string
val getpeername : t -> string uv_result

Get the name of the Unix domain socket or the named pipe to which the handle is connected.

val getpeername_exn : t -> string
val pending_instances : t -> int -> Int_result.unit

Set the number of pending pipe instance handles when the pipe server is waiting for connections.

Note: This setting applies to Windows only.

val pending_instances_exn : t -> int -> unit
val accept : t -> t uv_result

Initializes a new client, accepts the pending connection on it, and returns it.

This call is used in conjunction with listen to accept incoming connections. Call this function after receiving a listen callback to accept the connection.

When the listen callback is called it is guaranteed that this function will complete successfully the first time. If you attempt to use it more than once, it may fail. It is suggested to only call this function once per listen callback call.

Don't use this function for pipes that have been initialized with ~ipc:true. It's not portable.
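The interplay of bind, listen and accept described above can be sketched as a small server skeleton. serve and handler are hypothetical names, the backlog of 16 is arbitrary, and Uwt.Int_result.is_ok is assumed to be available as the test for a successful Int_result value.

```ocaml
(* Sketch: accept exactly once per listen callback and hand the client
   to [handler]. [serve] is a hypothetical helper, not part of uwt. *)
let serve ~path handler =
  let server = Uwt.Pipe.init () in
  let cb srv status =
    (* assumption: Int_result.is_ok reports whether the callback
       signals a successful incoming connection *)
    if Uwt.Int_result.is_ok status then
      match Uwt.Pipe.accept srv with
      | Error _ -> ()
      | Ok client ->
        Lwt.async (fun () ->
          Lwt.finalize
            (fun () -> handler client)
            (fun () -> Uwt.Pipe.close_noerr client; Lwt.return_unit))
  in
  try
    Uwt.Pipe.bind_exn server ~path;
    Uwt.Pipe.listen_exn server ~max:16 ~cb;
    server
  with e ->
    (* bind may already have assigned a descriptor, so close explicitly *)
    Uwt.Pipe.close_noerr server;
    raise e
```

The server handle is returned so that the caller can keep a reference to it and close it when the service should stop.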

val accept_exn : t -> t
val accept_raw : server:t -> client:t -> Int_result.unit
val accept_raw_exn : server:t -> client:t -> unit
val pending_count : t -> Int_result.int
val pending_count_exn : t -> int

Returns how many handles are waiting to be accepted with accept_ipc.

type ipc_result =
  | Ipc_error of error  (* internal call to accept failed *)
  | Ipc_none  (* no pending handles *)
  | Ipc_tcp of Tcp.t
  | Ipc_udp of Udp.t
  | Ipc_pipe of t
val accept_ipc : t -> ipc_result

Used to receive handles over IPC pipes.

It calls pending_count. If the result is greater than zero, it initializes a handle of the appropriate type, accepts the pending handle on it, and returns it.
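A caller typically dispatches on the ipc_result constructors. The sketch below assumes caller-supplied handlers; dispatch_ipc, on_tcp, on_udp and on_pipe are hypothetical names.

```ocaml
(* Sketch: receive one pending handle, if any, and pass it to the matching
   handler. Returns true if a handle was received.
   [dispatch_ipc] and its labelled handlers are hypothetical. *)
let dispatch_ipc pipe ~on_tcp ~on_udp ~on_pipe =
  match Uwt.Pipe.accept_ipc pipe with
  | Uwt.Pipe.Ipc_error _ -> false   (* internal accept failed *)
  | Uwt.Pipe.Ipc_none -> false      (* nothing pending *)
  | Uwt.Pipe.Ipc_tcp h -> on_tcp h; true
  | Uwt.Pipe.Ipc_udp h -> on_udp h; true
  | Uwt.Pipe.Ipc_pipe h -> on_pipe h; true
```

Each handler becomes responsible for closing the handle it receives.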

val write2 : ?pos:int -> ?len:int -> buf:bytes -> send:Tcp.t -> t -> unit Lwt.t

Extended write function for sending handles over a pipe. The pipe must be initialized with ~ipc:true.

Note: the handle passed as ~send must be a TCP socket or pipe which is a server or a connection (listening or connected state). Bound sockets or pipes will be assumed to be servers.

val write2_ba : ?pos:int -> ?len:int -> buf:buf -> send:Tcp.t -> t -> unit Lwt.t
val write2_string : ?pos:int -> ?len:int -> buf:string -> send:Tcp.t -> t -> unit Lwt.t
val write2_pipe : ?pos:int -> ?len:int -> buf:bytes -> send:t -> t -> unit Lwt.t

Similar to write2, but the send handle is a pipe. Note: This is not supported on Windows.

val write2_pipe_ba : ?pos:int -> ?len:int -> buf:buf -> send:t -> t -> unit Lwt.t
val write2_pipe_string : ?pos:int -> ?len:int -> buf:string -> send:t -> t -> unit Lwt.t
val write2_udp : ?pos:int -> ?len:int -> buf:bytes -> send:Udp.t -> t -> unit Lwt.t

Similar to write2, but the send handle is a UDP socket. Note: This is not portable at all.

val write2_udp_ba : ?pos:int -> ?len:int -> buf:buf -> send:Udp.t -> t -> unit Lwt.t
val write2_udp_string : ?pos:int -> ?len:int -> buf:string -> send:Udp.t -> t -> unit Lwt.t
val connect : t -> path:string -> unit Lwt.t

Connect to the Unix domain socket or a named pipe.

val with_pipe : ?ipc:bool -> (t -> 'a Lwt.t) -> 'a Lwt.t

with_pipe ?ipc f creates a new handle and passes the pipe to f. It is ensured that the pipe is closed when f t terminates (even if it fails).

You can also close the pipe manually inside f without further consequences.
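A minimal sketch of with_pipe in use: connect, send a request, read a single reply chunk, and rely on with_pipe to close the handle. request is a hypothetical helper name and the 1024-byte reply buffer is arbitrary.

```ocaml
(* Sketch: one request/response exchange over a freshly created pipe.
   [request] is a hypothetical helper, not part of uwt. *)
let request ~path msg =
  Uwt.Pipe.with_pipe (fun p ->
    Lwt.bind (Uwt.Pipe.connect p ~path) (fun () ->
      Lwt.bind (Uwt.Pipe.write_string p ~buf:msg) (fun () ->
        let buf = Bytes.create 1024 in
        (* only the first chunk of the reply is returned;
           an empty string means EOF *)
        Lwt.map (fun n -> Bytes.sub_string buf 0 n)
          (Uwt.Pipe.read p ~buf))))
```

No explicit close is needed here, since with_pipe closes the handle when the inner thread terminates, including on failure.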

val with_connect : path:string -> (t -> 'a Lwt.t) -> 'a Lwt.t
val with_open : ?ipc:bool -> Unix.file_descr -> (t -> 'a Lwt.t) -> 'a Lwt.t
type chmod =
  | Pipe_readable
  | Pipe_writeable
  | Pipe_readable_writeable
val chmod : t -> chmod -> Int_result.unit

Alters pipe permissions, allowing it to be accessed from processes run by different users. Makes the pipe writable or readable by all users. This function is blocking.

val chmod_exn : t -> chmod -> unit
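As an illustration of chmod, the sketch below binds a pipe and then opens it up to other users. bind_shared is a hypothetical helper name. Since chmod is blocking, a setup phase like this is probably the natural place to call it.

```ocaml
(* Sketch: bind a pipe and make it readable and writable by all users.
   [bind_shared] is a hypothetical helper, not part of uwt. *)
let bind_shared ~path =
  let server = Uwt.Pipe.init () in
  try
    Uwt.Pipe.bind_exn server ~path;
    Uwt.Pipe.chmod_exn server Uwt.Pipe.Pipe_readable_writeable;
    server
  with e ->
    (* release the handle before propagating the error *)
    Uwt.Pipe.close_noerr server;
    raise e
```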