Proton.Async_insert
This module provides high-performance asynchronous bulk insert capabilities for Timeplus Proton. It manages buffering, batching, and automatic flushing of data to optimize throughput for streaming data ingestion.
The inserter works in the background: it batches rows automatically and sends a batch to the server when a size threshold is reached or the flush interval elapses.
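The flush decision described above can be sketched as a pure predicate. This is a minimal illustration, not the module's internal code: the `thresholds` record and `should_flush` are hypothetical names, and skipping flushes of an empty buffer is an assumption.

```ocaml
(* Hypothetical subset of the config fields that drive flushing. *)
type thresholds = {
  max_batch_size : int;    (* row-count limit *)
  max_batch_bytes : int;   (* byte-size limit *)
  flush_interval : float;  (* seconds between timed flushes *)
}

(* [rows] and [bytes] describe the current buffer; [elapsed] is the number
   of seconds since the last flush. Any one condition triggers a flush.
   Never flushing an empty buffer is an assumption of this sketch. *)
let should_flush t ~rows ~bytes ~elapsed =
  rows > 0
  && (rows >= t.max_batch_size
      || bytes >= t.max_batch_bytes
      || elapsed >= t.flush_interval)
```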
type config = {
  table_name : string;  (** The name of the target table for inserts. *)
  max_batch_size : int;
      (** Maximum number of rows to accumulate before triggering a flush. Larger values improve throughput but increase memory usage and latency. *)
  max_batch_bytes : int;
      (** Maximum size in bytes of a single batch. Prevents excessive memory usage when rows are large. *)
  flush_interval : float;
      (** Time in seconds between automatic flushes. Ensures data is sent even during low-activity periods. *)
  max_retries : int;
      (** Maximum number of retry attempts when an insert fails. Retries use exponential backoff. *)
  retry_delay : float;
      (** Delay in seconds before the first retry. Subsequent retries use exponential backoff (2x, 4x, 8x, ... the initial delay). *)
  response_timeout : float;
      (** Maximum time in seconds to wait for server acknowledgment. Prevents indefinite blocking on network issues. *)
}
(** Configuration parameters controlling async insert behavior. These settings allow fine-tuning of batching, retry logic, and performance characteristics. *)
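The `max_retries` and `retry_delay` fields describe a doubling backoff. The sketch below computes that schedule and wraps a fallible call with it. It assumes retry number `n` (counting from 0) waits `retry_delay * 2^n`; the helper names are hypothetical, and `sleep` is injected so the policy can be exercised without real waiting.

```ocaml
(* Wait before retry [attempt] (0-based): retry_delay * 2^attempt,
   i.e. 1x, 2x, 4x, 8x, ... the initial delay. *)
let backoff_delay ~retry_delay attempt =
  retry_delay *. (2. ** float_of_int attempt)

(* Every wait for at most [max_retries] retries. *)
let backoff_schedule ~retry_delay ~max_retries =
  List.init max_retries (backoff_delay ~retry_delay)

(* Call [f] until it returns [Ok _] or all [max_retries] retries are used.
   [sleep] would be something like Unix.sleepf in blocking code. *)
let with_retries ~retry_delay ~max_retries ~sleep f =
  let rec go attempt =
    match f () with
    | Ok _ as ok -> ok
    | Error _ as err when attempt >= max_retries -> err
    | Error _ ->
        sleep (backoff_delay ~retry_delay attempt);
        go (attempt + 1)
  in
  go 0
```

With `max_retries = 3`, `f` runs at most four times: the initial attempt plus three retries.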
default_config table_name creates a default configuration for the specified table.
Default values:
Example:
let config = Async_insert.default_config "events" in
let custom_config = { config with max_batch_size = 50000 }
Abstract type representing an async inserter instance. Each inserter manages its own buffer, flush timer, and retry logic.
create config connection creates a new async inserter instance.
The inserter is created in a stopped state. Call start to begin processing.
Example:
let config = Async_insert.default_config "events" in
let inserter = Async_insert.create config client.conn in
Async_insert.start inserter
start inserter starts the background processing tasks.
Begins the flush timer and enables automatic batching. This function returns immediately; processing happens in the background.
stop inserter stops the inserter and flushes any remaining data.
Gracefully shuts down the inserter, ensuring all buffered data is sent to the server before returning. After stopping, the inserter cannot be restarted.
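The lifecycle described for `create`, `start`, and `stop` can be modelled as a small state machine. This is a hypothetical sketch: the `state` type is illustrative, and both the error on starting a running inserter and the no-op second `stop` are assumptions, not documented behaviour.

```ocaml
(* Hypothetical lifecycle: created stopped, started once, stopped for good. *)
type state = Created | Running | Stopped

let start = function
  | Created -> Ok Running
  | Running -> Error "already running"  (* assumption *)
  | Stopped -> Error "a stopped inserter cannot be restarted"

(* Stopping flushes whatever is still buffered. Treating a second [stop]
   as a no-op is an assumption of this sketch. *)
let stop ~flush = function
  | Created | Running ->
      flush ();
      Stopped
  | Stopped -> Stopped
```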
add_row ?columns inserter row adds a single row to the insert buffer.
The row is added to the internal buffer and will be sent to the server when batching thresholds are met or a flush occurs.
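A minimal sketch of the buffering behind `add_row`, assuming only a row-count threshold and a synchronous `send` callback standing in for the network insert. The `buffer` record, `create_buffer`, and `drain` are hypothetical; the real inserter's internals are not documented here.

```ocaml
(* Hypothetical buffer; not the module's actual representation. *)
type 'row buffer = {
  pending : 'row Queue.t;    (* rows in insertion order *)
  max_batch_size : int;      (* row-count threshold *)
  send : 'row list -> unit;  (* stand-in for the network insert *)
}

let create_buffer ~max_batch_size ~send =
  { pending = Queue.create (); max_batch_size; send }

(* Remove and return every buffered row, oldest first. *)
let drain b =
  let batch = List.of_seq (Queue.to_seq b.pending) in
  Queue.clear b.pending;
  batch

(* Append one row; hand a full batch to [send] once the threshold is hit. *)
let add_row b row =
  Queue.push row b.pending;
  if Queue.length b.pending >= b.max_batch_size then b.send (drain b)
```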
Example:
Async_insert.add_row inserter
~columns:[ ("timestamp", "DateTime"); ("value", "Float64") ]
[ Column.DateTime (Unix.time ()); Column.Float 42.0 ]
add_rows ?columns inserter rows adds multiple rows to the insert buffer.
More efficient than calling add_row multiple times. The rows may be split across multiple batches if they exceed configured thresholds.
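Splitting a large list of rows so that each batch respects both `max_batch_size` and `max_batch_bytes` can be sketched as a single left-to-right pass. `split_batches` is a hypothetical helper, and `size_of` is an assumed estimate of a row's encoded size, since the wire encoding is not described on this page. A row larger than `max_batch_bytes` on its own still gets a batch rather than being dropped.

```ocaml
let split_batches ~max_batch_size ~max_batch_bytes ~size_of rows =
  (* Close the batch being built (kept in reverse) and prepend it to [acc]. *)
  let close current acc =
    match current with [] -> acc | _ -> List.rev current :: acc
  in
  (* [n] and [bytes] track the row count and size of [current]. *)
  let rec go current n bytes acc = function
    | [] -> List.rev (close current acc)
    | row :: rest ->
        let sz = size_of row in
        let full = n + 1 > max_batch_size || bytes + sz > max_batch_bytes in
        if n > 0 && full then go [ row ] 1 sz (close current acc) rest
        else go (row :: current) (n + 1) (bytes + sz) acc rest
  in
  go [] 0 0 [] rows
```

For example, with `max_batch_size = 2`, `max_batch_bytes = 5`, and `String.length` as the size estimate, `["aa"; "bb"; "cc"; "dddddd"; "e"]` splits into `[["aa"; "bb"]; ["cc"]; ["dddddd"]; ["e"]]`.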
Example:
let rows =
List.init 1000 (fun i -> [ Column.Int i; Column.String (Printf.sprintf "event_%d" i) ])
in
Async_insert.add_rows inserter rows
flush inserter forces an immediate flush of buffered data.
Sends all currently buffered rows to the server immediately, regardless of batch size or timer settings. Useful when your application needs buffered data to reach the server at a specific point.
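A synchronous sketch of what a forced flush does: send everything pending, ignoring thresholds. The `buffer` record, `add`, and the `send` callback are hypothetical. Clearing the buffer only after `send` returns is a design choice of this sketch, so an exception raised by `send` leaves the rows buffered for a later attempt.

```ocaml
(* Hypothetical buffer holding pending rows, newest first. *)
type 'row buffer = { mutable pending : 'row list }

let add b row = b.pending <- row :: b.pending

(* Send every buffered row, oldest first, regardless of thresholds.
   An empty buffer sends nothing (an assumption of this sketch). *)
let flush ~send b =
  match b.pending with
  | [] -> ()
  | rows ->
      send (List.rev rows);
      b.pending <- []
```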
get_stats inserter returns current buffer statistics.
Provides insight into the current state of the internal buffer, useful for monitoring and debugging.
Example:
let%lwt rows, bytes = Async_insert.get_stats inserter in
Printf.printf "Buffer contains %d rows (%d bytes)\n" rows bytes
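The `(rows, bytes)` pair returned by `get_stats` could be maintained as running counters, so a stats query is O(1) instead of a walk over every buffered row. This is a hypothetical, synchronous sketch (the real `get_stats` is used with `let%lwt` above). `on_flush` subtracts the sent batch rather than resetting to zero, so rows added while a batch is in flight stay counted.

```ocaml
(* Hypothetical counters kept alongside the buffer. *)
type counters = { mutable rows : int; mutable bytes : int }

let create () = { rows = 0; bytes = 0 }

(* Call when a row of [size] encoded bytes enters the buffer. *)
let on_add c ~size =
  c.rows <- c.rows + 1;
  c.bytes <- c.bytes + size

(* Call after a batch of [rows] rows totalling [bytes] bytes was sent. *)
let on_flush c ~rows ~bytes =
  c.rows <- c.rows - rows;
  c.bytes <- c.bytes - bytes

let get_stats c = (c.rows, c.bytes)
```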