Kaun.Dataset

Efficient dataset handling for machine learning pipelines. This module provides composable, lazy dataset transformations.
All datasets are unified under the polymorphic 'a t type, with specializations via type aliases where helpful (e.g., for tensors). Text handling uses string t directly for better composability.
A dataset of elements of type 'a. Datasets are lazy, composable, and abstract. Use creation functions to build them and transformations to modify them.
Generalized dataset of tensors, parameterized over element, kind, and device
type element_spec =
  | Unknown
  | Scalar of string              (* e.g., "string" or "int" *)
  | Tensor of int array * string  (* shape * dtype *)
  | Tuple of element_spec list
  | Array of element_spec

Structured description of dataset element types, similar to TF's element_spec. Use for type-safe downstream processing.
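As a minimal sketch, an element_spec can be inspected by pattern matching; the describe helper below is hypothetical and not part of this module:

let rec describe (spec : element_spec) : string =
  match spec with
  | Unknown -> "unknown"
  | Scalar kind -> Printf.sprintf "scalar(%s)" kind
  | Tensor (shape, dtype) ->
      (* render the shape as d0xd1x... followed by the dtype *)
      let dims = Array.to_list shape |> List.map string_of_int in
      Printf.sprintf "tensor[%s](%s)" (String.concat "x" dims) dtype
  | Tuple specs -> "(" ^ String.concat ", " (List.map describe specs) ^ ")"
  | Array spec -> "array of " ^ describe spec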
Function type for pluggable tokenizers.
Built-in whitespace tokenizer.
@warning The tokenizer maintains an internal mutable vocabulary and is not thread-safe. Create a fresh tokenizer when you need an isolated vocabulary.
from_tensor tensor creates a dataset where each element is a slice of the first dimension
val from_tensors :
(('elt, 'kind) Rune.t * ('elt, 'kind) Rune.t) ->
(('elt, 'kind) Rune.t * ('elt, 'kind) Rune.t) t

from_tensors (x, y) creates a dataset of (input, target) pairs
from_file parser path creates a dataset from a file, parsing each line with parser
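A minimal sketch of the in-memory constructors above; x, y, and "scores.txt" are hypothetical, and the parser given to from_file is assumed to be applied to each line as a plain string -> 'a function:

let pairs = from_tensors (x, y) in                       (* (input, target) pairs *)
let scores = from_file float_of_string "scores.txt" in   (* one float per line *)
ignore pairs;
ignore scores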
val from_text_file :
?encoding:[ `UTF8 | `ASCII | `LATIN1 ] ->
?chunk_size:int ->
string ->
string t

from_text_file ?encoding ?chunk_size path creates a memory-mapped text dataset yielding lines as strings.

encoding: Text encoding (default: UTF8). `LATIN1 is rejected with an Invalid_parameter error; use UTF-8 instead. `ASCII is treated as a UTF-8 subset.
chunk_size: Size of chunks to read at once (default: 64KB)

Lines are streamed lazily and Windows-style line endings (\r\n) are normalised to \n.

val from_text_files :
?encoding:[ `UTF8 | `ASCII | `LATIN1 ] ->
?chunk_size:int ->
string list ->
string t

from_text_files paths creates a dataset from multiple text files. Files are processed sequentially without loading all into memory. The resulting dataset supports reset, restarting from the first file.
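A minimal sketch of streaming lines from disk with the functions above; the file names are hypothetical:

let lines = from_text_file ~chunk_size:(128 * 1024) "data/train.txt" in
let shards = from_text_files [ "shard-000.txt"; "shard-001.txt" ] in
iter print_endline lines;
iter print_endline shards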
from_jsonl ?field path reads a JSONL file where each line is a JSON object.
field: Extract text from this field (default: "text")

Example JSONL format:
{"text": "First document", "label": 0}
{"text": "Second document", "label": 1}val sliding_window :
block_size:int ->
tokenize:(string -> int list) ->
string list ->
((float, Rune.float32_elt) Rune.t * (float, Rune.float32_elt) Rune.t) t

sliding_window ~block_size ~tokenize texts creates a dataset of sliding window context/target pairs for language modeling.
Creates all possible sliding windows of size block_size from the input texts, where each window predicts the next token. Automatically handles padding with a special token.
Example:
let dataset =
sliding_window ~block_size:3
~tokenize:(fun s -> encode_chars ~vocab s)
[ "hello"; "world" ]
(* Generates windows like: "...h" -> "e" "..he" -> "l" ".hel" -> "l"
"hell" -> "o" etc. *)from_csv ?separator ?text_column ?has_header path reads a CSV file and returns the text column as a dataset of strings. Rows that do not contain the requested column are skipped.
val from_csv_with_labels :
?separator:char ->
?text_column:int ->
?has_header:bool ->
label_column:int ->
string ->
(string * string) t

from_csv_with_labels ?separator ?text_column ?has_header ~label_column path reads a CSV file and returns a dataset of (text, label) tuples. Rows missing either the text or label column are skipped.
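A minimal sketch of reading a labelled CSV with from_csv_with_labels; reviews.csv and its column layout are hypothetical:

let labelled =
  from_csv_with_labels ~has_header:true ~text_column:0 ~label_column:1
    "reviews.csv"
in
iter (fun (text, label) -> Printf.printf "%s -> %s\n" text label) labelled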
from_text ~tokenizer path reads a text file and returns a dataset of token ID arrays. The entire file is loaded into memory as a single document before tokenization. For streaming inputs prefer from_text_file pipelines.
map ?spec f dataset applies function f to each element. Provide spec to describe the resulting element type when it is known.
filter pred dataset keeps only elements satisfying pred
flat_map f dataset maps and flattens nested datasets
zip ds1 ds2 pairs corresponding elements. Stops at shorter dataset.
interleave datasets alternates between datasets in round-robin fashion
enumerate dataset adds indices to elements, starting from 0
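A minimal sketch chaining the transformations above, assuming lines is an existing string t (hypothetical):

let indexed =
  lines
  |> filter (fun line -> String.length line > 0)     (* drop empty lines *)
  |> map (fun line -> String.lowercase_ascii line)   (* per-element transform *)
  |> enumerate                                       (* attach indices, starting from 0 *)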
val tokenize :
tokenizer ->
?max_length:int ->
?padding:[ `None | `Max of int | `Dynamic ] ->
?truncation:bool ->
?add_special_tokens:bool ->
string t ->
int array t

tokenize tokenizer ?max_length ?padding ?truncation dataset tokenizes text data using the provided tokenizer.

max_length: Maximum sequence length
padding: Padding strategy
truncation: Whether to truncate long sequences
add_special_tokens: Add <bos>, <eos> tokens

val normalize :
?lowercase:bool ->
?remove_punctuation:bool ->
?collapse_whitespace:bool ->
string t ->
string t

normalize ?lowercase ?remove_punctuation ?collapse_whitespace dataset applies text normalization
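A minimal sketch of a cleaning-plus-tokenization stage, assuming lines is a string t (hypothetical):

let token_ids =
  lines
  |> normalize ~lowercase:true ~collapse_whitespace:true
  |> tokenize whitespace_tokenizer ~max_length:128 ~padding:(`Max 128)
       ~truncation:true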
val batch :
?drop_remainder:bool ->
int ->
((float, 'layout) Rune.t * (float, 'layout) Rune.t) t ->
((float, 'layout) Rune.t * (float, 'layout) Rune.t) t

batch ?drop_remainder size dataset groups tensor pairs into batches and automatically stacks them along the batch dimension.

drop_remainder: Drop final batch if incomplete (default: false)

This is the primary batching function for ML workflows where datasets contain (input, target) tensor pairs. The tensors are automatically stacked using Rune.stack ~axis:0.
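A minimal sketch of batching tensor pairs, assuming pairs is a dataset of (input, target) Rune tensors and train_step / model are placeholders:

let batches = pairs |> batch ~drop_remainder:true 32 in
iter (fun (xb, yb) -> train_step model (xb, yb)) batches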
batch_map ?drop_remainder size f dataset groups elements into batches and applies function f to each batch.
This is useful for custom batching logic that can't be handled by batch or batch_array.
val bucket_by_length :
?boundaries:int list ->
?batch_sizes:int list ->
?drop_remainder:bool ->
('a -> int) ->
'a t ->
'a array t

bucket_by_length ?boundaries ?batch_sizes ?drop_remainder length_fn dataset groups elements into buckets by length for efficient padding. Example:
bucket_by_length ~boundaries:[ 10; 20; 30 ] ~batch_sizes:[ 32; 16; 8; 4 ]
(fun text -> String.length text)
  dataset

Creates 4 buckets: <10, 10-20, 20-30, >30 with different batch sizes. Partial batches are dropped when drop_remainder is true.
shuffle ?rng ?buffer_size dataset randomly shuffles elements.
rng: Random state for reproducibility (default: self-init)
buffer_size: Size of shuffle buffer (default: 10000)

Uses a buffer to shuffle without loading entire dataset in memory.

sample ?rng ?replacement n dataset randomly samples n elements
weighted_sample ?rng ~weights n dataset samples with given weights
repeat ?count dataset repeats dataset. Infinite if count not specified.
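A minimal sketch of epoch-style shuffling and repetition, assuming examples is an existing dataset (hypothetical):

let shuffled =
  examples
  |> shuffle ~buffer_size:4096   (* approximate shuffle within a bounded buffer *)
  |> repeat ~count:3             (* the equivalent of three passes over the data *)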
window ?shift ?stride ?drop_remainder size dataset creates sliding windows.
shift: How far to advance between windows (default: size)
stride: Subsample stride within each emitted window (default: 1)

Example: window ~shift:1 3 dataset produces overlapping windows of size 3.

cache ?directory dataset caches dataset elements.

directory: Directory for file cache, in-memory if not specified

prefetch ?buffer_size dataset pre-fetches elements on a background domain.

buffer_size: Number of elements to prefetch (default: 2)

Prefetching stops automatically when the dataset is exhausted or reset.

val parallel_map :
?pool:Domainslib.Task.pool ->
?num_workers:int ->
('a -> 'b) ->
'a t ->
'b t

parallel_map ?pool ?num_workers f dataset applies f using multiple workers.

pool: Reuse an existing Domainslib.Task.pool; when omitted an internal pool is created and torn down automatically.
num_workers: Number of parallel workers (default: CPU count)

Exceptions raised by f are propagated to the consumer immediately.

val parallel_interleave :
?num_workers:int ->
?block_length:int ->
('a -> 'b t) ->
'a t ->
'b t

parallel_interleave ?num_workers ?block_length f dataset applies f in parallel and interleaves results
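A minimal sketch of parallel preprocessing, assuming preprocess is a pure string -> string function and raw_lines a string t (both hypothetical):

let prepared =
  raw_lines
  |> parallel_map ~num_workers:4 preprocess   (* run preprocess on several domains *)
  |> prefetch ~buffer_size:8                  (* keep a few results ready for the consumer *)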
val prepare :
?shuffle_buffer:int ->
?batch_size:int ->
?prefetch:int ->
?cache:bool ->
?drop_remainder:bool ->
((float, 'layout) Rune.t * (float, 'layout) Rune.t) t ->
((float, 'layout) Rune.t * (float, 'layout) Rune.t) t

prepare ?shuffle_buffer ?batch_size ?prefetch ?cache ?drop_remainder dataset applies common preprocessing pipeline for tensor datasets:

1. Cache (if enabled)
2. Shuffle (if buffer size provided)
3. Batch with automatic tensor stacking (if batch size provided)
4. Prefetch (if prefetch count provided)
This is the primary pipeline function for ML training data.
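A minimal sketch of prepare on a tensor-pair dataset; pairs is assumed to come from from_tensors (hypothetical):

let train_data =
  prepare ~shuffle_buffer:10_000 ~batch_size:64 ~prefetch:2 pairs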
iter f dataset applies f to each element for side effects
fold f init dataset folds over dataset elements
to_list dataset materializes dataset as list. Warning: loads all into memory.
to_array dataset materializes dataset as array. Warning: loads all into memory.
cardinality dataset returns the cardinality (finite length, unknown, or infinite)
element_spec dataset returns a structured description of element types
reset dataset resets the dataset to its initial state if supported. This makes it possible to iterate a dataset multiple times (e.g., across training epochs). If the dataset does not support reset, this is a no-op.
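A minimal sketch of multi-epoch training with reset, assuming reset acts in place (returns unit) and train_step / model / train_data are placeholders:

for _epoch = 1 to 10 do
  iter (fun batch -> train_step model batch) train_data;
  reset train_data   (* rewind so the next epoch re-reads the data *)
done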
val text_classification_pipeline :
?tokenizer:tokenizer ->
?max_length:int ->
?batch_size:int ->
?shuffle_buffer:int ->
?num_workers:int ->
string t ->
(int32, Rune.int32_elt) Rune.t t

Pre-configured pipeline for text classification tasks. Returns batched token tensors ready for embedding layers.
val language_model_pipeline :
?tokenizer:tokenizer ->
?sequence_length:int ->
?batch_size:int ->
?shuffle_buffer:int ->
?num_workers:int ->
string t ->
((int32, Rune.int32_elt) Rune.t * (int32, Rune.int32_elt) Rune.t) t

Pre-configured pipeline for language modeling. Returns batched (input, target) tensor pairs ready for training.
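A minimal sketch of the language-modeling pipeline, assuming a corpus file on disk (hypothetical path):

let lm_batches =
  from_text_file "corpus.txt"
  |> language_model_pipeline ~sequence_length:256 ~batch_size:32
       ~shuffle_buffer:10_000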
(* Load and process text data *)
let dataset =
from_text_file "data/corpus.txt"
|> tokenize whitespace_tokenizer ~max_length:512
|> shuffle ~buffer_size:10000
|> batch 32
|> prefetch ~buffer_size:2
(* Iterate through batches *)
dataset
|> iter (fun batch ->
let tensor = process_batch batch in
train_step model tensor)
(* Multi-file dataset with bucketing *)
let dataset =
from_text_files [ "shard1.txt"; "shard2.txt"; "shard3.txt" ]
|> normalize ~lowercase:true
|> tokenize whitespace_tokenizer
|> bucket_by_length ~boundaries:[ 100; 200; 300 ]
~batch_sizes:[ 64; 32; 16; 8 ] Array.length
|> prefetch
(* Parallel processing *)
let dataset =
from_jsonl "data.jsonl"
|> parallel_map ~num_workers:4 preprocess
|> cache ~directory:"/tmp/cache"
|> shuffle ~buffer_size:50000
|> batch 128
(* Custom tokenizer and tensor batching *)
let custom_tok = fun s -> (* ... *) [|1;2;3|] in
let tensor_ds =
from_text_file "texts.txt"
|> tokenize custom_tok
|> batch_map 32 (Rune.stack ~axis:0)