Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file Tiny_httpd_io.ml
(** IO abstraction.

    We abstract IO so we can support classic unix blocking IOs
    with threads, and modern async IO with Eio.

    {b NOTE}: experimental.

    @since 0.14
*)

module Buf = Tiny_httpd_buf

(** Input channel (byte source) *)
module Input = struct
  type t = {
    input: bytes -> int -> int -> int;
        (** Read into the slice. Returns [0] only if the
            channel is closed. *)
    close: unit -> unit;  (** Close the input. Must be idempotent. *)
  }
  (** An input channel, i.e an incoming stream of bytes.

      This can be a [string], an [in_channel], an [Unix.file_descr], a
      decompression wrapper around another input channel, etc. *)

  (** [of_in_channel ic] wraps the standard channel [ic] into an input channel.
      @param close_noerr if true, then closing the result uses [close_in_noerr]
      instead of [close_in] to close [ic] *)
  let of_in_channel ?(close_noerr = false) (ic : in_channel) : t =
    {
      (* NOTE: [input] here is still [Stdlib.input]; the module-level [input]
         is only defined further down. *)
      input = (fun buf i len -> input ic buf i len);
      close =
        (fun () ->
          if close_noerr then
            close_in_noerr ic
          else
            close_in ic);
    }

  (** [of_unix_fd fd] is an input channel that reads from [fd] using
      [Unix.read]. Closing the result closes [fd] at most once: further calls
      to [close] do nothing.
      @param close_noerr if true, a [Unix.Unix_error] raised while closing
      [fd] is ignored. *)
  let of_unix_fd ?(close_noerr = false) (fd : Unix.file_descr) : t =
    (* Guard against double close: closing a descriptor twice either fails
       or, worse, closes an unrelated descriptor that reused the number. *)
    let closed = ref false in
    {
      input = (fun buf i len -> Unix.read fd buf i len);
      close =
        (fun () ->
          if not !closed then (
            (* mark as closed first, so that [Unix.close] is never retried
               on this descriptor even if it raises *)
            closed := true;
            if close_noerr then (
              try Unix.close fd with Unix.Unix_error _ -> ()
            ) else
              Unix.close fd
          ));
    }

  (** Read into the given slice.
      @return the number of bytes read, [0] means end of input. *)
  let[@inline] input (self : t) buf i len = self.input buf i len

  (** Close the channel. *)
  let[@inline] close self : unit = self.close ()
end

(** Output channel (byte sink) *)
module Output = struct
  type t = {
    output_char: char -> unit;  (** Output a single char *)
    output: bytes -> int -> int -> unit;  (** Output slice *)
    flush: unit -> unit;  (** Flush underlying buffer *)
    close: unit -> unit;  (** Close the output. Must be idempotent. *)
  }
  (** An output channel, ie. a place into which we can write bytes.

      This can be a [Buffer.t], an [out_channel], a [Unix.file_descr], etc. *)

  (** [of_out_channel oc] wraps the channel into a {!Output.t}.
      @param close_noerr if true, then closing the result uses [close_out_noerr]
      instead of [close_out] to close [oc] *)
  let of_out_channel ?(close_noerr = false) (oc : out_channel) : t =
    {
      (* NOTE: [output_char], [output] and [flush] below are still the
         [Stdlib] functions; the module-level ones are defined further down. *)
      output_char = (fun c -> output_char oc c);
      output = (fun buf i len -> output oc buf i len);
      flush = (fun () -> flush oc);
      close =
        (fun () ->
          if close_noerr then
            close_out_noerr oc
          else
            close_out oc);
    }

  (** [of_buffer buf] is an output channel that writes directly into [buf].
      [flush] and [close] have no effect. *)
  let of_buffer (buf : Buffer.t) : t =
    {
      output_char = Buffer.add_char buf;
      output = Buffer.add_subbytes buf;
      flush = ignore;
      close = ignore;
    }

  (** Output a single char into this channel *)
  let[@inline] output_char (self : t) c : unit = self.output_char c

  (** Output the buffer slice into this channel *)
  let[@inline] output (self : t) buf i len : unit = self.output buf i len

  (** Output the whole string into this channel.
      The string is handed to the channel's [output] function without being
      copied, so that function must not mutate the slice it receives. *)
  let[@inline] output_string (self : t) (str : string) : unit =
    self.output (Bytes.unsafe_of_string str) 0 (String.length str)

  (** Close the channel. *)
  let[@inline] close self : unit = self.close ()

  (** Flush (ie. force write) any buffered bytes. *)
  let[@inline] flush self : unit = self.flush ()

  (** [output_buf oc buf] outputs the first [Buf.size buf] bytes of
      [Buf.bytes_slice buf] into [oc]. *)
  let output_buf (self : t) (buf : Buf.t) : unit =
    let b = Buf.bytes_slice buf in
    output self b 0 (Buf.size buf)

  (** [chunk_encoding oc] makes a new channel that outputs its content into [oc]
      in chunk encoding form.
      Closing the result is idempotent: the terminating chunk is written
      (and [oc] closed, if [close_rec] is true) at most once.
      @param close_rec if true, closing the result will also close [oc]
      @param buf a buffer used to accumulate data into chunks.
        Chunks are emitted when [buf]'s size gets over a certain threshold,
        or when [flush] is called.
  *)
  let chunk_encoding ?(buf = Buf.create ()) ~close_rec (self : t) : t =
    (* write content of [buf] as a chunk if it's big enough.
       If [force=true] then write content of [buf] if it's simply non empty. *)
    let write_buf ~force () =
      let n = Buf.size buf in
      if (force && n > 0) || n > 4_096 then (
        (* chunk header: size in hexadecimal, then CRLF *)
        output_string self (Printf.sprintf "%x\r\n" n);
        self.output (Buf.bytes_slice buf) 0 n;
        output_string self "\r\n";
        Buf.clear buf
      )
    in

    let flush () =
      write_buf ~force:true ();
      self.flush ()
    in

    (* [close] must be idempotent: writing the final chunk a second time
       would put stray bytes on the underlying channel. *)
    let closed = ref false in
    let close () =
      if not !closed then (
        closed := true;
        write_buf ~force:true ();
        (* write an empty chunk to close the stream *)
        output_string self "0\r\n";
        (* write another crlf after the stream (see #56) *)
        output_string self "\r\n";
        self.flush ();
        if close_rec then self.close ()
      )
    in

    let output b i n =
      Buf.add_bytes buf b i n;
      write_buf ~force:false ()
    in

    let output_char c =
      Buf.add_char buf c;
      write_buf ~force:false ()
    in
    { output_char; flush; close; output }
end

(** A writer abstraction. *)
module Writer = struct
  type t = { write: Output.t -> unit } [@@unboxed]
  (** Writer.

      A writer is a push-based stream of bytes.
      Give it an output channel and it will write the bytes in it.

      This is useful for responses: an http endpoint can return a writer
      as its response's body; the writer is given access to the connection
      to the client and can write into it as if it were a regular
      [out_channel], including controlling calls to [flush].
      Tiny_httpd will convert these writes into valid HTTP chunks.
      @since 0.14
  *)

  (** [make ~write ()] is the writer that calls [write] on the output channel
      it is given. *)
  let[@inline] make ~write () : t = { write }

  (** Write into the channel. *)
  let[@inline] write (oc : Output.t) (self : t) : unit = self.write oc

  (** Empty writer, will output 0 bytes. *)
  let empty : t = { write = ignore }

  (** A writer that just emits the bytes from the given string. *)
  let[@inline] of_string (str : string) : t =
    let write oc = Output.output_string oc str in
    { write }
end

(** A TCP server abstraction. *)
module TCP_server = struct
  type conn_handler = {
    handle: client_addr:Unix.sockaddr -> Input.t -> Output.t -> unit;
        (** Handle client connection *)
  }

  type t = {
    endpoint: unit -> string * int;
        (** Endpoint we listen on. This can only be called from within [serve]. *)
    active_connections: unit -> int;
        (** Number of connections currently active *)
    running: unit -> bool;  (** Is the server currently running? *)
    stop: unit -> unit;
        (** Ask the server to stop. This might not take effect immediately,
            and is idempotent. After this [server.running()] must return [false]. *)
  }
  (** A running TCP server.

      This contains some functions that provide information about the running
      server, including whether it's active (as opposed to stopped), a function
      to stop it, and statistics about the number of connections. *)

  type builder = {
    serve: after_init:(t -> unit) -> handle:conn_handler -> unit -> unit;
        (** Blocking call to listen for incoming connections and handle them.
            Uses the connection handler [handle] to handle individual client
            connections in individual threads/fibers/tasks.
            @param after_init is called once with the server after the server
            has started. *)
  }
  (** A TCP server builder implementation.

      Calling [builder.serve ~after_init ~handle ()] starts a new TCP server on
      an unspecified endpoint
      (most likely coming from the function returning this builder).
      The call is blocking; the running server is not returned but passed
      to [after_init]. *)
end