package rizzo
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file signal.ml
(* Signal combinators, clock sources and output sinks.

   NOTE(review): the operators [<<|] and [|>>], and the names [never], [tail],
   [wait], [sync], [trig], [new_channel], [pp_later] together with the
   constructors [Fst], [Snd], [Both] are not defined in this file; they are
   presumably provided by [Types]. Comments below that describe their effect
   are assumptions to be confirmed against that module. *)
open Types

(** [h @: t] builds a signal from a head value [h] and a delayed tail [t] by
    allocating the pair with [Internals.Heap.alloc] and wrapping the result
    with [Internals.MainTypes.signal_of_data]. *)
let ( @: ) = fun h t -> Internals.MainTypes.signal_of_data (Internals.Heap.alloc h t)

(** Signal whose head is [x] and whose tail is [never]. *)
let const x = x @: never

(** Current head value of [s] (the [head] field of its underlying data). *)
let head s = (Internals.MainTypes.signal_get_data s).head

(** Turn a delayed value [d] into a delayed signal: every value delivered by
    [d] becomes the head of a signal whose tail is again [mkSig d]. *)
let rec mkSig d = (fun a -> a @: (mkSig d)) <<| d

(** Delayed signal built from [wait k] for channel [k]. *)
let mkSig_of_channel k = mkSig @@ wait k

(** Signal that starts with [v] and then continues as [mkSig_of_channel k]. *)
let init_signal k v = v @: mkSig_of_channel k

(** Apply [f] to the head of [s] and, recursively, to every later head. *)
let rec map f s = f (head s) @: (map f <<| tail s)

(** [map] lifted to a delayed signal. *)
let mapL f s = map f <<| s

(** Combine the current heads of [xs] and [ys] with [f]; the tail is rebuilt
    from [sync (tail xs) (tail ys)], replacing whichever side(s) it reports. *)
let rec map2 f xs ys =
  let cont = function
    | Fst xs' -> map2 f xs' ys
    | Snd ys' -> map2 f xs ys'
    | Both (xs', ys') -> map2 f xs' ys'
  in
  f (head xs) (head ys) @: (cont <<| sync (tail xs) (tail ys))

(** Follow [s] until the delayed signal [d] is delivered, then continue as the
    delivered signal. If both sides are reported together, [d] wins. *)
let rec switch s d =
  let cont = function
    | Fst s' -> switch s' d
    | Snd d' -> d'
    | Both (_, d') -> d'
  in
  head s @: (cont <<| (sync (tail s) d))

(** Pair up the current heads of [xs] and [ys]; the tail is rebuilt from
    [sync (tail xs) (tail ys)] in the same way as [map2]. *)
let rec zip xs ys =
  let cont = function
    | Fst xs' -> zip xs' ys
    | Snd ys' -> zip xs ys'
    | Both (xs', ys') -> zip xs' ys'
  in
  (head xs, head ys) @: (cont <<| sync (tail xs) (tail ys))

(** Like [switch], but [d] delivers a function that receives the head of [s]
    captured when this step was built ([x]) and returns the signal to continue
    with. Note that in the [Both] case the function is applied to that same
    captured [x], not to the head of the newly delivered tail. *)
let rec switchS s d =
  let x, xs = head s, tail s in
  let cont = function
    | Fst xs' -> switchS xs' d
    | Snd f -> f x
    | Both (_, f) -> f x
  in
  x @: (cont <<| sync xs d)

(** repeatedly switch whenever `d` ticks *)
let rec switchR s d =
  (* Each delivery of [d] is a signal [s'] of functions: its head produces the
     next signal from the current value, its tail drives further switches. *)
  let d' = (fun s' x -> switchR (head s' x) (tail s')) <<| d in
  switchS s d'

(** Print the head of [s] with [pp_a] and its tail with [pp_later]. *)
let pp_signal pp_a out s =
  let hd, tl = head s, tail s in
  Format.fprintf out "%a :: later(%a)" pp_a hd pp_later tl

(** Running fold: the head of the result is [f b (head s)], and that value is
    the accumulator for the rest of the signal. *)
let rec scan f b s =
  let hd, tl = head s, tail s in
  let b' = f b hd in
  b' @: (scan f b' <<| tl)

(** [scan] lifted to a delayed signal. *)
let scanL f b s = scan f b <<| s

(** Pair every value of [xs] with [head ys], read at the time [f] is applied
    by [map]. *)
let sample xs ys = map (fun x -> (x, head ys)) xs

(** [sample] for a delayed [xs]. *)
let sampleL xs ys = xs |>> map (fun x -> (x, head ys))

(** Follow [s]; each time its tail delivers a new signal, apply [f] to that
    signal's head. [Some s'] jumps to [s'], [None] keeps following. The head of
    the initial [s] is emitted without consulting [f]. *)
let rec jump f s =
  let cont s =
    match f (head s) with
    | None -> jump f s
    | Some s' -> s'
  in
  head s @: (cont <<| (tail s))

let interleave : ('a -> 'a -> 'a) -> 'a signal -> 'a signal -> 'a signal =
  fun f xs ys ->
  (* Produce a value whenever either input signal advances.
     If only xs advances emit its new head; if only ys advances emit its new
     head; if both advance simultaneously emit f applied to their new heads.
     The initial element is f applied to the current heads of xs and ys. *)
  let rec build current xs ys =
    let cont = function
      | Fst xs' -> build (head xs') xs' ys
      | Snd ys' -> build (head ys') xs ys'
      | Both (xs', ys') -> build (f (head xs') (head ys')) xs' ys'
    in
    current @: (cont <<| sync (tail xs) (tail ys))
  in
  build (f (head xs) (head ys)) xs ys

(** Delayed signal obtained by passing the option-valued signal
    [p (head s) @: mapL p (tail s)] through [trig] and [mkSig].
    NOTE(review): presumably [trig] fires only on [Some] values - confirm. *)
let filter_map p s = mkSig (trig (p (head s) @: mapL p (tail s)))

(** [filter_map] for a delayed signal; the option signal starts with [None]. *)
let filter_mapL p s = mkSig (trig (None @: mapL p s))

(** [filter_map] specialised to a boolean predicate [p]. *)
let filter p = filter_map (fun x -> if p x then Some x else None)

(** [filter_mapL] specialised to a boolean predicate [p]. *)
let filterL p = filter_mapL (fun x -> if p x then Some x else None)

(** Apply [f] to each value of [s1] together with the head of [s2] obtained
    through [sample]. *)
let trigger f s1 s2 = sample s1 s2 |> map (fun (a, b) -> f a b)

(** [trigger] for a delayed [s1]. *)
let triggerL (f : 'a -> 'b -> 'c) (s1 : 'a signal later) (s2 : 'b signal) : 'c signal later =
  s1 |>> (fun s1' -> trigger f s1' s2)

(* returns a channel that produces a tick every [interval] seconds, starting
   [interval] seconds from now *)
let clock_channel interval =
  let start = Unix.gettimeofday () +. interval in
  let chan = new_channel () in
  let stop_flag = ref false in
  (* One tick: sleep until the scheduled time [next], then step the channel
     with that scheduled time. Any exception is logged and followed by a
     simple backoff to avoid a tight crash loop. *)
  let tick next =
    try
      let now = Unix.gettimeofday () in
      let wait_time = max 0.0 (next -. now) in
      Unix.sleepf wait_time;
      Internals.Heap.step chan next
    with exn ->
      prerr_endline ("clock thread error: " ^ Printexc.to_string exn);
      Unix.sleepf (max 0.1 interval)
  in
  (* The recursive call is kept outside of the [try] so that it is a genuine
     tail call: recursing from inside a handler scope would leave one handler
     frame behind per tick and grow the stack for as long as the clock runs. *)
  let rec aux next =
    if !stop_flag then ()
    else begin
      tick next;
      aux (next +. interval)
    end
  in
  let th = Thread.create (fun () -> aux start) () in
  (* return channel and a stop function so caller can cancel the clock cleanly;
     the stop function blocks until the clock thread has exited *)
  (chan, (fun () -> stop_flag := true; Thread.join th))

(** Clock as a signal whose first value is the current time; also returns the
    stop function of the underlying [clock_channel]. *)
let clock_signal interval =
  let chan, stop = clock_channel interval in
  let signal = init_signal chan (Unix.gettimeofday ()) in
  (signal, stop)

(** Clock as a delayed signal (no initial value); also returns the stop
    function of the underlying [clock_channel]. *)
let clock_signalL interval =
  let chan, stop = clock_channel interval in
  let signal = mkSig_of_channel chan in
  (signal, stop)

(* TODO: Make the output methods return a callback that can clean up the
   output signal from the list *)
(* Output signals registered so far; every output function below prepends to
   this list. *)
let output_signals = ref []

(** Register a signal that prints every value of [s] with [print_endline]. *)
let console_output (s : string signal) : unit =
  output_signals := map print_endline s :: !output_signals

(** [console_output] for a delayed signal. *)
let console_outputL (s : string signal later) : unit =
  output_signals := switch (const ()) (mapL print_endline s) :: !output_signals

(** Register a signal that calls [exit 0] for every value delivered by [s]. *)
let set_quit (s : 'a signal later) : unit =
  output_signals := switch (const ()) (mapL (fun _ -> exit 0) s) :: !output_signals

(** Block the calling thread forever, sleeping one second per iteration. *)
let start_event_loop () : unit =
  while true do
    Thread.delay 1.0 (* adjust as needed; smaller = more responsive, larger = less CPU *)
  done

(** Send newline-terminated strings from a later string signal to a TCP connection.

    Parameters:
    - address: the IPv4 address to connect to (use Unix.inet_addr_loopback for localhost)
    - port: the TCP port number to connect to
    - s: a later string signal to send values from

    Behavior:
    - Opens a TCP (PF_INET, SOCK_STREAM) connection to (address, port) once when
      called and obtains an OCaml out_channel for writing. If the connection
      attempt fails, the socket is closed and the exception is re-raised to the
      caller.
    - Registers a sender that writes each incoming string value followed by "\n"
      and flushes.
    - On write errors the channel is closed and the internal connection is
      marked None; subsequent sends will be no-ops (there is no automatic
      reconnect logic).
    - The sender is stored in output_signals so it will be driven by the
      runtime/event loop. *)
let port_outputL address port (s : string signal later) : unit =
  let connect () =
    let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    (* Do not leak the descriptor when the connection cannot be established. *)
    (try Unix.connect sock (Unix.ADDR_INET (address, port))
     with exn ->
       (try Unix.close sock with _ -> ());
       raise exn);
    Unix.out_channel_of_descr sock
  in
  let out_chan = ref (Some (connect ())) in
  let send v =
    match !out_chan with
    | None -> ()
    | Some ch ->
      try
        output_string ch (v ^ "\n");
        (* print_endline ("sent to port " ^ string_of_int port ^ ": " ^ v); *)
        flush ch
      with exn ->
        prerr_endline ("port_send_output error: " ^ Printexc.to_string exn);
        (* Best-effort close; errors while closing are deliberately ignored. *)
        close_out_noerr ch;
        out_chan := None
  in
  output_signals := switch (const ()) (mapL send s) :: !output_signals