package kaun

Module Kaun.DatasetSource

Efficient dataset handling for machine learning pipelines. This module provides composable dataset transformations with support for:

  • Memory-mapped file reading (no OOM on large datasets)
  • Streaming and lazy evaluation
  • Efficient batching and padding
  • Shuffling with configurable buffer sizes
  • Multi-threaded data loading

All datasets are unified under the polymorphic 'a t type, with specializations via type aliases where helpful (e.g., for tensors). Text handling uses string t directly for better composability.

Core Types

Sourcetype 'a t

A dataset of elements of type 'a. Datasets are lazy, composable, and abstract. Use the creation functions to build them and the transformations to modify them.

Sourcetype ('elt, 'kind) tensor_dataset = ('elt, 'kind) Rune.t t

Generalized dataset of tensors, parameterized over element type and kind

Sourcetype cardinality =
  1. | Finite of int
  2. | Unknown
  3. | Infinite

Cardinality of a dataset: known finite length, unknown (but finite), or infinite.

Sourcetype element_spec =
  1. | Unknown
  2. | Scalar of string
     (* e.g., "string" or "int" *)
  3. | Tensor of int array * string
     (* shape * dtype *)
  4. | Tuple of element_spec list
  5. | Array of element_spec

Structured description of dataset element types, similar to TF's element_spec. Use for type-safe downstream processing.
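
For illustration, here is a rough sketch of writing an element_spec by hand for a dataset of (token tensor, label) pairs; the shape and dtype strings are placeholders, not values produced by the library:

  let spec : element_spec =
    Tuple [ Tensor ([| 512 |], "int32"); Scalar "int" ]
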
Sourcetype tokenizer = string -> int array

Function type for pluggable tokenizers

Sourceval whitespace_tokenizer : tokenizer

Built-in whitespace tokenizer.

@warning The tokenizer maintains an internal mutable vocabulary and is not thread-safe. Create a fresh tokenizer when you need an isolated vocabulary.

Dataset Creation

Sourceval from_array : 'a array -> 'a t

from_array arr creates a dataset from an in-memory array

Sourceval from_list : 'a list -> 'a t

from_list lst creates a dataset from a list

Sourceval from_seq : 'a Seq.t -> 'a t

from_seq seq creates a dataset from a sequence

Sourceval from_tensor : ('elt, 'kind) Rune.t -> ('elt, 'kind) Rune.t t

from_tensor tensor creates a dataset where each element is a slice of the first dimension

Sourceval from_tensors : (('elt, 'kind) Rune.t * ('elt, 'kind) Rune.t) -> (('elt, 'kind) Rune.t * ('elt, 'kind) Rune.t) t

from_tensors (x, y) creates a dataset of (input, target) pairs

Sourceval from_file : (string -> 'a) -> string -> 'a t

from_file parser path creates a dataset from a file, parsing each line with parser
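
As a small sketch, assuming a hypothetical file scores.txt that contains one integer per line:

  let scores : int t = from_file int_of_string "scores.txt" in
  scores |> take 5 |> iter (Printf.printf "%d\n")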

Text Data Sources

Sourceval from_text_file : ?encoding:[ `UTF8 | `ASCII | `LATIN1 ] -> ?chunk_size:int -> string -> string t

from_text_file ?encoding ?chunk_size path creates a memory-mapped text dataset yielding lines as strings.

  • encoding: Text encoding (default: UTF8). `LATIN1 is rejected with an Invalid_parameter error; use UTF-8 instead. `ASCII is treated as a UTF-8 subset.
  • chunk_size: Size of chunks to read at once (default: 64KB)

Lines are streamed lazily, and Windows-style line endings (\r\n) are normalized to \n.

Sourceval from_text_files : ?encoding:[ `UTF8 | `ASCII | `LATIN1 ] -> ?chunk_size:int -> string list -> string t

from_text_files paths creates a dataset from multiple text files. Files are processed sequentially without loading them all into memory. The resulting dataset supports reset, restarting from the first file.

Sourceval from_jsonl : ?field:string -> string -> string t

from_jsonl ?field path reads a JSONL file where each line is a JSON object.

  • field: Extract text from this field (default: "text")

Example JSONL format:

    {"text": "First document", "label": 0}
    {"text": "Second document", "label": 1}
Sourceval sliding_window : block_size:int -> tokenize:(string -> int list) -> string list -> ((float, Rune.float32_elt) Rune.t * (float, Rune.float32_elt) Rune.t) t

sliding_window ~block_size ~tokenize texts creates a dataset of sliding window context/target pairs for language modeling.

  • parameter block_size

    Size of the context window

  • parameter tokenize

    Function to convert text to token indices

  • parameter texts

    List of input texts (e.g., names for character-level modeling)

  • returns

    Dataset of (context, target) tensor pairs

Creates all possible sliding windows of size block_size from the input texts, where each window predicts the next token. Automatically handles padding with a special token.

Example:

  let dataset =
    sliding_window ~block_size:3
      ~tokenize:(fun s -> encode_chars ~vocab s)
      [ "hello"; "world" ]
  (* Generates windows like: "...h" -> "e" "..he" -> "l" ".hel" -> "l"
     "hell" -> "o" etc. *)
Sourceval from_csv : ?separator:char -> ?text_column:int -> ?has_header:bool -> string -> string t

from_csv ?separator ?text_column ?has_header path reads a CSV file and returns the text column as a dataset of strings. Rows that do not contain the requested column are skipped.

Sourceval from_csv_with_labels : ?separator:char -> ?text_column:int -> ?has_header:bool -> label_column:int -> string -> (string * string) t

from_csv_with_labels ?separator ?text_column ?has_header ~label_column path reads a CSV file and returns a dataset of (text, label) tuples. Rows missing either the text or label column are skipped.
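
A sketch for a hypothetical reviews.csv whose first column holds the text and whose second column holds the label:

  from_csv_with_labels ~has_header:true ~text_column:0 ~label_column:1
    "reviews.csv"
  |> iter (fun (text, label) -> Printf.printf "[%s] %s\n" label text)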

Sourceval from_text : tokenizer:tokenizer -> string -> int array t

from_text ~tokenizer path reads a text file and returns a dataset of token ID arrays. The entire file is loaded into memory as a single document before tokenization. For streaming inputs prefer from_text_file pipelines.

Transformations

Sourceval map : ?spec:element_spec -> ('a -> 'b) -> 'a t -> 'b t

map ?spec f dataset applies function f to each element. Provide spec to describe the resulting element type when it is known.

Sourceval filter : ('a -> bool) -> 'a t -> 'a t

filter pred dataset keeps only elements satisfying pred

Sourceval flat_map : ('a -> 'b t) -> 'a t -> 'b t

flat_map f dataset maps and flattens nested datasets

Sourceval zip : 'a t -> 'b t -> ('a * 'b) t

zip ds1 ds2 pairs corresponding elements. Stops at shorter dataset.

Sourceval concatenate : 'a t -> 'a t -> 'a t

concatenate ds1 ds2 appends ds2 after ds1

Sourceval interleave : 'a t list -> 'a t

interleave datasets alternates between datasets in round-robin fashion

Sourceval enumerate : 'a t -> (int * 'a) t

enumerate dataset adds indices to elements, starting from 0
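
A small sketch chaining several of these transformations over an in-memory dataset:

  from_list [ 1; 2; 3; 4; 5; 6 ]
  |> filter (fun x -> x mod 2 = 0)
  |> map (fun x -> x * x)
  |> enumerate
  |> iter (fun (i, x) -> Printf.printf "%d: %d\n" i x)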

Text Processing

Sourceval tokenize : tokenizer -> ?max_length:int -> ?padding:[ `None | `Max of int | `Dynamic ] -> ?truncation:bool -> ?add_special_tokens:bool -> string t -> int array t

tokenize tokenizer ?max_length ?padding ?truncation ?add_special_tokens dataset tokenizes text data using the provided tokenizer.

  • max_length: Maximum sequence length
  • padding: Padding strategy
  • truncation: Whether to truncate long sequences
  • add_special_tokens: Add <bos>, <eos> tokens
Sourceval normalize : ?lowercase:bool -> ?remove_punctuation:bool -> ?collapse_whitespace:bool -> string t -> string t

normalize ?lowercase ?remove_punctuation ?collapse_whitespace dataset applies text normalization
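
A sketch combining normalization and tokenization (the corpus path is a placeholder):

  from_text_file "data/corpus.txt"
  |> normalize ~lowercase:true ~collapse_whitespace:true
  |> tokenize whitespace_tokenizer ~max_length:128 ~truncation:true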

Batching

Sourceval batch : ?drop_remainder:bool -> int -> ((float, 'layout) Rune.t * (float, 'layout) Rune.t) t -> ((float, 'layout) Rune.t * (float, 'layout) Rune.t) t

batch ?drop_remainder size dataset groups tensor pairs into batches and automatically stacks them along the batch dimension.

  • drop_remainder: Drop final batch if incomplete (default: false)

This is the primary batching function for ML workflows where datasets contain (input, target) tensor pairs. The tensors are automatically stacked using Rune.stack ~axis:0.

Sourceval batch_map : ?drop_remainder:bool -> int -> ('a array -> 'b) -> 'a t -> 'b t

batch_map ?drop_remainder size f dataset groups elements into batches and applies function f to each batch.

This is useful for custom batching logic that can't be handled by batch or batch_array.
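
For example, a rough sketch of custom batching that pads variable-length token arrays to a common length within each batch (0 is assumed to be the pad ID; the input data is illustrative):

  let tokenized : int array t =
    from_list [ [| 1; 2; 3 |]; [| 4; 5 |]; [| 6 |] ]
  in
  let pad_batch (batch : int array array) =
    (* pad every sequence in the batch to the longest sequence's length *)
    let max_len = Array.fold_left (fun m a -> max m (Array.length a)) 0 batch in
    Array.map
      (fun a -> Array.append a (Array.make (max_len - Array.length a) 0))
      batch
  in
  tokenized |> batch_map 2 pad_batch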

Sourceval bucket_by_length : ?boundaries:int list -> ?batch_sizes:int list -> ?drop_remainder:bool -> ('a -> int) -> 'a t -> 'a array t

bucket_by_length ?boundaries ?batch_sizes ?drop_remainder length_fn dataset groups elements into buckets by length for efficient padding. Example:

  bucket_by_length ~boundaries:[ 10; 20; 30 ] ~batch_sizes:[ 32; 16; 8; 4 ]
    (fun text -> String.length text)
    dataset

Creates 4 buckets: <10, 10-20, 20-30, >30 with different batch sizes. Partial batches are dropped when drop_remainder is true.

Shuffling and Sampling

Sourceval shuffle : ?rng:Rune.Rng.key -> ?buffer_size:int -> 'a t -> 'a t

shuffle ?rng ?buffer_size dataset randomly shuffles elements.

  • rng: Random state for reproducibility (default: self-init)
  • buffer_size: Size of shuffle buffer (default: 10000)

Uses a buffer to shuffle without loading the entire dataset into memory.

Sourceval sample : ?rng:Rune.Rng.key -> ?replacement:bool -> int -> 'a t -> 'a t

sample ?rng ?replacement n dataset randomly samples n elements

Sourceval weighted_sample : ?rng:Rune.Rng.key -> weights:float array -> int -> 'a t -> 'a t

weighted_sample ?rng ~weights n dataset samples with given weights
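
A sketch, assuming weights is a per-element array of sampling weights:

  let docs = from_list [ "a"; "b"; "c" ] in
  docs
  |> weighted_sample ~weights:[| 0.7; 0.2; 0.1 |] 2
  |> iter print_endline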

Iteration Control

Sourceval take : int -> 'a t -> 'a t

take n dataset takes first n elements

Sourceval skip : int -> 'a t -> 'a t

skip n dataset skips first n elements

Sourceval repeat : ?count:int -> 'a t -> 'a t

repeat ?count dataset repeats dataset. Infinite if count not specified.

Sourceval window : ?shift:int -> ?stride:int -> ?drop_remainder:bool -> int -> 'a t -> 'a array t

window ?shift ?stride ?drop_remainder size dataset creates sliding windows.

  • shift: How far to advance between windows (default: size)
  • stride: Subsample stride within each emitted window (default: 1)

Example: window ~shift:1 3 dataset produces overlapping windows of size 3.
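
A concrete sketch of overlapping windows:

  (* Produces [|1; 2; 3|], [|2; 3; 4|] and [|3; 4; 5|]. *)
  from_list [ 1; 2; 3; 4; 5 ]
  |> window ~shift:1 ~drop_remainder:true 3
  |> iter (fun w ->
         w |> Array.to_list |> List.map string_of_int |> String.concat "; "
         |> Printf.printf "[%s]\n")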

Caching and Prefetching

Sourceval cache : ?directory:string -> 'a t -> 'a t

cache ?directory dataset caches dataset elements.

  • directory: Directory for file cache, in-memory if not specified
Sourceval prefetch : ?buffer_size:int -> 'a t -> 'a t

prefetch ?buffer_size dataset pre-fetches elements on a background domain.

  • buffer_size: Number of elements to prefetch (default: 2)

Prefetching stops automatically when the dataset is exhausted or reset.

Parallel Processing

Sourceval parallel_map : ?pool:Domainslib.Task.pool -> ?num_workers:int -> ('a -> 'b) -> 'a t -> 'b t

parallel_map ?pool ?num_workers f dataset applies f using multiple workers.

  • pool: Reuse an existing Domainslib.Task.pool; when omitted an internal pool is created and torn down automatically.
  • num_workers: Number of parallel workers (default: CPU count)

Exceptions raised by f are propagated to the consumer immediately.
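
A usage sketch (the corpus path is a placeholder):

  from_text_file "data/corpus.txt"
  |> parallel_map ~num_workers:4 String.uppercase_ascii
  |> take 10
  |> iter print_endline
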
Sourceval parallel_interleave : ?num_workers:int -> ?block_length:int -> ('a -> 'b t) -> 'a t -> 'b t

parallel_interleave ?num_workers ?block_length f dataset applies f in parallel and interleaves results

High-level Pipeline

Sourceval prepare : ?shuffle_buffer:int -> ?batch_size:int -> ?prefetch:int -> ?cache:bool -> ?drop_remainder:bool -> ((float, 'layout) Rune.t * (float, 'layout) Rune.t) t -> ((float, 'layout) Rune.t * (float, 'layout) Rune.t) t

prepare ?shuffle_buffer ?batch_size ?prefetch ?cache ?drop_remainder dataset applies a common preprocessing pipeline for tensor datasets:

  1. Cache (if enabled)
  2. Shuffle (if a buffer size is provided)
  3. Batch with automatic tensor stacking (if a batch size is provided)
  4. Prefetch (if a prefetch count is provided)

This is the primary pipeline function for ML training data.
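
A sketch of a typical training-input pipeline; inputs and targets here are placeholder float tensors whose first dimension indexes examples:

  (* inputs and targets are placeholders for (float, _) Rune.t values. *)
  let train_ds =
    from_tensors (inputs, targets)
    |> prepare ~shuffle_buffer:10_000 ~batch_size:32 ~prefetch:2
         ~drop_remainder:true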

Iteration

Sourceval iter : ('a -> unit) -> 'a t -> unit

iter f dataset applies f to each element for side effects

Sourceval fold : ('acc -> 'a -> 'acc) -> 'acc -> 'a t -> 'acc

fold f init dataset folds over dataset elements

Sourceval to_seq : 'a t -> 'a Seq.t

to_seq dataset converts to a sequence for lazy iteration

Sourceval to_list : 'a t -> 'a list

to_list dataset materializes the dataset as a list. Warning: loads the entire dataset into memory.

Sourceval to_array : 'a t -> 'a array

to_array dataset materializes the dataset as an array. Warning: loads the entire dataset into memory.
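
A small sketch folding once over a dataset to compute both a count and a sum in a single pass:

  let count, total =
    from_list [ 3; 1; 4; 1; 5 ]
    |> fold (fun (n, sum) x -> (n + 1, sum + x)) (0, 0)
  in
  Printf.printf "%d elements, sum %d\n" count total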

Dataset Information

Sourceval cardinality : 'a t -> cardinality

cardinality dataset returns the cardinality (finite length, unknown, or infinite)

Sourceval element_spec : 'a t -> element_spec

element_spec dataset returns a structured description of element types
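
For example, inspecting the cardinality of an in-memory dataset:

  let () =
    match cardinality (from_list [ 1; 2; 3 ]) with
    | Finite n -> Printf.printf "dataset has %d elements\n" n
    | _ -> print_endline "length not known in advance"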

Dataset Control

Sourceval reset : 'a t -> unit

reset dataset resets the dataset to its initial state if supported. This makes it possible to iterate a dataset multiple times (e.g., across training epochs). If the dataset does not support reset, this is a no-op.
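
A sketch of iterating the same dataset across several epochs, resetting between passes:

  let ds = from_list [ "a"; "b"; "c" ] in
  for epoch = 1 to 3 do
    ds |> iter print_endline;
    reset ds;
    Printf.printf "finished epoch %d\n" epoch
  done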

Common Pipelines

Sourceval text_classification_pipeline : ?tokenizer:tokenizer -> ?max_length:int -> ?batch_size:int -> ?shuffle_buffer:int -> ?num_workers:int -> string t -> (int32, Rune.int32_elt) Rune.t t

Pre-configured pipeline for text classification tasks. Returns batched token tensors ready for embedding layers.

Sourceval language_model_pipeline : ?tokenizer:tokenizer -> ?sequence_length:int -> ?batch_size:int -> ?shuffle_buffer:int -> ?num_workers:int -> string t -> ((int32, Rune.int32_elt) Rune.t * (int32, Rune.int32_elt) Rune.t) t

Pre-configured pipeline for language modeling. Returns batched (input, target) tensor pairs ready for training.

Examples

  (* Load and process text data *)
  let dataset =
    from_text_file "data/corpus.txt"
    |> tokenize whitespace_tokenizer ~max_length:512
    |> shuffle ~buffer_size:10000
    (* batch expects tensor pairs, so group raw token arrays with batch_map *)
    |> batch_map 32 Fun.id
    |> prefetch ~buffer_size:2

  (* Iterate through batches *)
  let () =
    dataset
    |> iter (fun batch ->
           let tensor = process_batch batch in
           train_step model tensor)

  (* Multi-file dataset with bucketing *)
  let dataset =
    from_text_files [ "shard1.txt"; "shard2.txt"; "shard3.txt" ]
    |> normalize ~lowercase:true
    |> tokenize whitespace_tokenizer
    |> bucket_by_length ~boundaries:[ 100; 200; 300 ]
         ~batch_sizes:[ 64; 32; 16; 8 ] Array.length
    |> prefetch

  (* Parallel processing *)
  let dataset =
    from_jsonl "data.jsonl"
    |> parallel_map ~num_workers:4 preprocess
    |> cache ~directory:"/tmp/cache"
    |> shuffle ~buffer_size:50000
    |> batch 128

  (* Custom tokenizer and tensor batching *)
  let custom_tok = fun s -> (* ... *) [| 1; 2; 3 |]
  let tensor_ds =
    from_text_file "texts.txt"
    |> tokenize custom_tok
    |> batch_map 32 (Rune.stack ~axis:0)