package tezos-p2p

Scheduling of I/O operations over file descriptors.

This module defines the scheduler type t and the connection type connection. A connection is a wrapper over a P2p_fd.t. R/W functions over connections behave like regular R/W over file descriptors, but the scheduler ensures a fair allocation of bandwidth between them.

Each connection has an associated read (resp. write) queue, which data is copied to (resp. read from) at a rate of max_download_speed / num_connections (resp. max_upload_speed / num_connections).
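The rate formula above can be illustrated with a small stand-alone sketch. It is a model of the arithmetic only, not the scheduler's code: fair_share is a hypothetical helper, and treating an omitted cap as "no limit" is an assumption of this example.

```ocaml
(* Illustrative model only; [fair_share] is not part of the module's API. *)

(* Per-connection rate when a global cap is split evenly between connections.
   [None] models an omitted cap (assumed here to mean "no limit"). *)
let fair_share ~max_speed ~num_connections =
  match max_speed with
  | None -> None
  | Some _ when num_connections <= 0 -> None (* no connection to share with *)
  | Some speed -> Some (speed / num_connections)

let () =
  match fair_share ~max_speed:(Some 1_000_000) ~num_connections:4 with
  | Some rate -> Printf.printf "per-connection rate: %d\n" rate
  | None -> print_endline "no per-connection cap"
```

With a global cap of 1_000_000 and four connections, each connection is allotted a quarter of the cap; registering more connections lowers every connection's share.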

type connection

Type of a connection.

type t

Type of an IO scheduler.

val create : ?max_upload_speed:int -> ?max_download_speed:int -> ?read_queue_size:int -> ?write_queue_size:int -> read_buffer_size:int -> unit -> t

create ~max_upload_speed ~max_download_speed ~read_queue_size ~write_queue_size ~read_buffer_size () is an IO scheduler with the specified (global) maximum upload (resp. download) speed and the specified read (resp. write) queue sizes (in bytes) for connections.

ma_state sched returns the state of the moving average worker.

val register : t -> P2p_fd.t -> connection

register sched fd is a connection managed by sched.

val write : ?canceler:Lwt_canceler.t -> connection -> Stdlib.Bytes.t -> (unit, Tezos_error_monad.TzCore.error list) Stdlib.result Lwt.t

write conn msg returns Ok () once msg has been added to conn's write queue, or fails with an error.

val write_now : connection -> Stdlib.Bytes.t -> bool

write_now conn msg is true iff msg has been (immediately) added to conn's write queue, false if it has been dropped.
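The boolean result of write_now can be pictured with a bounded queue. The sketch below is a stand-alone model under stated assumptions: write_queue and try_push are hypothetical names, and dropping a message when it would exceed the queue's byte capacity is an assumption of this example, not a documented rule of the scheduler.

```ocaml
(* Illustrative model only; these names are not part of the module's API. *)
type write_queue = {
  capacity : int;          (* maximum number of queued bytes *)
  mutable used : int;      (* bytes currently queued *)
  items : Bytes.t Queue.t; (* pending messages, oldest first *)
}

(* Enqueue [msg] immediately if it fits; otherwise drop it and return [false]. *)
let try_push q msg =
  let size = Bytes.length msg in
  if q.used + size > q.capacity then false
  else begin
    Queue.push msg q.items;
    q.used <- q.used + size;
    true
  end

let () =
  let q = { capacity = 8; used = 0; items = Queue.create () } in
  Printf.printf "first: %b\n" (try_push q (Bytes.of_string "hello"));
  Printf.printf "second: %b\n" (try_push q (Bytes.of_string "world"))
```

In this model the caller never blocks: it learns at once whether the message was queued, which is the contrast with write, whose result is only available through an Lwt promise.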

type buffer

Container to write data to when reading bytes from a connection.

val mk_buffer : ?pos:int -> ?len:int -> bytes -> (buffer, Tezos_error_monad.TzCore.error list) Stdlib.result

mk_buffer ?pos ?len buf creates an instance of buffer for copying len bytes starting at pos in buf. If pos is omitted, it defaults to 0. If len is omitted, it defaults to Bytes.length buf - pos.
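The defaulting rules for pos and len can be sketched with a stand-alone model. The record model_buffer and the function mk_model_buffer are hypothetical stand-ins for the abstract buffer type and for mk_buffer; the specific validity check and the string error are assumptions of this example.

```ocaml
(* Illustrative model only; the real [buffer] type is abstract. *)
type model_buffer = { pos : int; len : int; buf : Bytes.t }

(* [pos] defaults to 0 and [len] to [Bytes.length buf - pos].
   The range check is an assumed reason for returning [Error]. *)
let mk_model_buffer ?(pos = 0) ?len buf =
  let len =
    match len with Some l -> l | None -> Bytes.length buf - pos
  in
  if pos < 0 || len < 0 || len > Bytes.length buf - pos then
    Error "invalid range"
  else Ok { pos; len; buf }

let () =
  let buf = Bytes.create 10 in
  match mk_model_buffer ~pos:4 buf with
  | Ok b -> Printf.printf "pos=%d len=%d\n" b.pos b.len
  | Error e -> print_endline e
```

Here omitting len with pos = 4 on a 10-byte buffer yields a window of 6 bytes, the remainder of buf from pos onward.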

val mk_buffer_safe : bytes -> buffer

mk_buffer_safe buf creates an instance of buffer, that uses the entirety of buf; i.e. it will read at most Bytes.length buf bytes from the connection, and will write starting at position 0.

val read_now : connection -> buffer -> (int, Tezos_error_monad.TzCore.error list) Stdlib.result option

read_now conn buffer blits at most buffer.len bytes from conn's read queue and returns the number of bytes written in buffer.buf starting at buffer.pos.

val read : ?canceler:Lwt_canceler.t -> connection -> buffer -> (int, Tezos_error_monad.TzCore.error list) Stdlib.result Lwt.t

Like read_now, but waits until conn's read queue has at least one element instead of failing.

val read_full : ?canceler:Lwt_canceler.t -> connection -> buffer -> (unit, Tezos_error_monad.TzCore.error list) Stdlib.result Lwt.t

Like read, but blits exactly buffer.len bytes into buffer.buf.
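The difference between a single read, which may return fewer bytes than requested, and a full read can be sketched as a loop. This is a synchronous, stand-alone model: source, read_some and read_exactly are hypothetical names, there is no Lwt or canceler involved, and the "source exhausted" error is an assumption of this example.

```ocaml
(* Illustrative model only; these names are not part of the module's API. *)
type source = {
  data : Bytes.t;       (* bytes that will ever be available *)
  mutable offset : int; (* how much has already been consumed *)
  chunk : int;          (* most bytes a single read hands out *)
}

(* Copy at most [len] bytes into [dst] at [pos]; return the count copied. *)
let read_some src dst ~pos ~len =
  let available = Bytes.length src.data - src.offset in
  let n = min len (min src.chunk available) in
  Bytes.blit src.data src.offset dst pos n;
  src.offset <- src.offset + n;
  n

(* Repeat partial reads until exactly [len] bytes have been copied. *)
let read_exactly src dst ~pos ~len =
  let rec loop pos remaining =
    if remaining = 0 then Ok ()
    else
      match read_some src dst ~pos ~len:remaining with
      | 0 -> Error "source exhausted"
      | n -> loop (pos + n) (remaining - n)
  in
  loop pos len

let () =
  let src = { data = Bytes.of_string "hello world"; offset = 0; chunk = 4 } in
  let dst = Bytes.make 11 '_' in
  let n = read_some src dst ~pos:0 ~len:11 in
  Printf.printf "single read: %d bytes\n" n;
  match read_exactly src dst ~pos:n ~len:(11 - n) with
  | Ok () -> print_endline (Bytes.to_string dst)
  | Error e -> print_endline e
```

The single read returns after one chunk, while the loop keeps advancing the write position until the requested window is full.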

stat conn is a snapshot of current bandwidth usage for conn.

val global_stat : t -> Tezos_base.P2p_stat.t

global_stat sched is a snapshot of sched's bandwidth usage (sum of stat conn for each conn in sched).

val iter_connection : t -> (connection -> unit) -> unit

iter_connection sched f applies f on each connection managed by sched.

val close : ?timeout:float -> connection -> (unit, Tezos_error_monad.TzCore.error list) Stdlib.result Lwt.t

close conn returns after any pending data has been sent and the canceler of conn has been triggered.

It does not wait for the canceler callbacks, so there is no guarantee that the file descriptor is already closed, but it will eventually be closed.

If timeout is set, the canceler will be triggered after the timeout, even if pending data remains to be sent.

val shutdown : ?timeout:float -> t -> unit Lwt.t

shutdown sched returns after all connections managed by sched have been closed and sched's inner worker has been successfully canceled.

val id : connection -> int

id connection returns the identifier of the underlying P2p_fd.t file descriptor. This uniquely identifies a connection.

module Internal : sig ... end

Module for testing only, do not use in production.
