- Export
Time_source
from Async_kernel
. - Name the non-
t
arguments to Error.tag
and similar functions to allow easier partial application. - Name the non-
t
arguments to Error.tag
and similar functions to allow easier partial application. Switched if-then-else style from:
if test then begin E1; E2; end else begin E3; E4; end
to:
if test then ( E1; E2) else ( E3; E4);
This style puts keywords at the start of lines and closing delimiters at the end. And makes it lighter-weight to be more consistent about using the delimiters.
- Switched
Async_kernel
from >>=
and >>|
to let%bind
and let%map
, where appropriate. - Mirroring
Or_error.error_s
, like several of the other Or_error
functions are mirrored - Improved a unit test in
Pipe
. - Removed from
Monitor.try_with
's implementation the use of Deferred.choose
, which is very costly in terms of allocation and memory use. Reworked the implementation of Monitor.try_with
and related functions, using a new underlying concept, Ok_and_exns
, which represents the output of a computation running in a detached monitor, i.e.:
type 'a t = { ok : 'a Deferred.t ; exns : exn Stream.t }
Using this explicit type simplifies reasoning, as compared to implicit monitor error streams.
- Some minor cleanup in
core_kernel/src/scheduler.ml
. - Added
Scheduler.make_async_unusable
. Added Pipe.create_reader
, which is like Pipe.init
, but has different behavior w.r.t. f
raising. It forces the caller to choose the behavior on raise to avoid subtle bugs related to closing the reading end. The comment recommends ~close_on_exception:false
.
val create_reader : close_on_exception:bool -> ('a Writer.t -> unit Deferred.t) -> 'a Reader.t
A child feature deprecates init
and replaces calls with create_reader ~close_on_exception:true
.
Pipe.init_reader
was renamed to Pipe.create_writer
so that the names are consistent. Its behaviour was not changed: it also closes the pipe if f
raises, but here this is probably what you want, so that the writer is notified that the reader is dead.
Changed Pipe.of_sequence
, which is implemented with create_reader
, to use ~close_on_exception:false
. This seems like an improvement, because it won't treat raising as end of sequence.
Changed Pipe.unfold
, which is implemented with create_reader
, to use the ~close_on_exception:false
. This shouldn't be a problem, because unfold
is barely used. And it was never specified what f
raising means. It seems fine to not treat raising as end-of-sequence, because unfold
already has a way to express end-of-sequence.
Here's some more explanation of motivation for the change. In the current world, Pipe.init
looks like this:
let init f = let r, w = create () in don't_wait_for (Monitor.protect (fun () -> f w) ~finally:(fun () -> close w; Deferred.unit)); r ;;
This means that if f
raises, then the pipe is both closed and the exception is sent to the monitor active when init
was called.
If you have something (like Pipe.fold_without_pushback
) consuming the pipe, the exception being delivered can race against the fold finishing, and you could end up missing it. Moreover, the race seems to reliably go in the direction you don't want:
$ cat pipes.ml #!/some/path/jane-script run open Core.Std open Async.Std
let main () = Monitor.try_with_or_error (fun () -> Pipe.init (fun _writer -> assert false) |> Pipe.fold_without_pushback ~init:() ~f:(fun () () -> ()) ) >>= fun res -> printf !"Result: %{sexp:unit Or_error.t}\n" res; exit 0
let () = don't_wait_for (main ()); never_returns (Scheduler.go ())
$ ./pipes.ml Result: (Ok ()) 2015-10-21 15:42:22.494737+01:00 Error Exception raised to Monitor.try_with that already returned (monitor.ml.Error_ ((exn "Assert_failure /home/toto/pipes.ml:7:30") snip
Deprecated Pipe.init
and replaced its uses with the semantically equivalent:
Pipe.create_reader ~close_on_exception:true
See the parent feature for an explanation of why Pipe.init
had the wrong default behavior.
If this is the third time you're reading this feature, please accept my apologies! Review of this feature produced requests for the parent feature.
Adds phantom type to Validated as witness of invariant
The witness prevents types from different applications of the Validated functors from unifying with one another.
- add compare to pipe on the internal id
Added Bvar
, a new basic data structure, to Async.
Bvar
is like an Ivar
that can be filled multiple times, or can be thought of as a condition variable that supports broadcast
but does not support signal
.
- Switch the implementation of
Scheduler.yield
to use a Bvar.t
Adds Scheduler.yield_until_no_jobs_remain
which is determined when the transitive closure of all currently runnable jobs and jobs that become runnable in the process of their execution have been run to completion.
This differs from Scheduler.yield
whose result is determined as soon as the scheduler has completed the current cycle.
Monad.Let_syntax.return
Added to Monad.Syntax.Let_syntax
:
val return : 'a -> 'a t
so that when one does:
open Some_random_monad.Let_syntax
return
is in scope.
- Added
Bvar.invariant
. - Moved the newly introduced expect test from
Async_kernel
to Async_kernel_test
, to avoid having Async_kernel
depend on Unix
, which breaks the 32-bit build. An experiment to make async errors more readable.
Before:
(monitor.ml.Error_ ((exn (Failure "something went wrong")) (backtrace ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "deferred0.ml", line 64, characters 64-69" "Called from file "job_queue.ml", line 160, characters 6-47" "")) (monitor (((name try_with) (here ()) (id 2) (has_seen_error true) (is_detached true))))))
(monitor.ml.Error_ ((exn (Failure "something went wrong")) (backtrace ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "deferred0.ml", line 64, characters 64-69" "Called from file "job_queue.ml", line 160, characters 6-47" "")) (monitor (((name foo) (here (lib/async/a.ml:13:56)) (id 2) (has_seen_error true) (is_detached true))))))
After:
(monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "deferred0.ml", line 64, characters 64-69" "Called from file "job_queue.ml", line 160, characters 6-47")))
(monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "deferred0.ml", line 64, characters 64-69" "Called from file "job_queue.ml", line 160, characters 6-47" "Caught by monitor foo at file "lib/async/a.ml", line 13, characters 56-56"))
So:
- remove the trailing "" in the backtrace
- remove the ugly underscore in the exception name
- remove the monitor when it contains nothing useful
- when the monitor contains something useful, namely name or position, just stick that information in the rest of the backtrace
- remove the fields exception, backtrace, monitor. It's ok in the code, but in the sexp it doesn't add much.
- remove the names for the monitors from
Monitor.try_with, Deferred.Or_error.try_with
and Deferred.Or_error.try_with_join
since they are so common they contain no information
A child feature tries to remove the useless bits of backtraces, since they can represent a significant source of noise (in the examples above, the whole backtraces are noise).
Added module Async.Std.Require_explicit_time_source
, so that one can require code to be explicit about what time source is used and not unintentionally use the wall clock. The idiom is to do:
open! Require_explicit_time_source
or, in an import.ml:
include Require_explicit_time_source
- Added a test demonstrating a space leak in
Monitor.try_with
, in which the Ok
result is held on to by background jobs. Added Async_kernel.Ivar_filler
a new module that is a reference to an ivar that allows one to fill the ivar, but not to read it. This allows the implementation to drop the reference to the ivar once it is full, which can be useful to avoid holding onto unused memory.
This will be used to fix the space leak in Monitor.try_with
.
Fixed the space leak in Monitor.try_with
, using an Ivar_filler
to avoid holding on to memory unnecessarily.
Here are the allocation numbers for lib/async/bench/try_with.ml
at the base and tip of the feature.
| | base | tip |
|----------------+------+-----|
| minor words | 113 | 120 |
| promoted words | 74 | 78 |
| live words | 71 | 75 |
Unsurprisingly, the Ivar_filler
costs a little. But not as much as switching the implementation back to choose
.
Remove more noise from the async errors, namely useless backtraces that say things like "and this is called in the scheduler, in Deferred.bind". We already know the error comes from async from the monitor.ml.Error line. Also this is frequently repeated, when several monitor errors are nested, making things worse.
Here is a synthetic example: before:
((monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "a.ml", line 16, characters 17-48" "Called from file "deferred0.ml", line 64, characters 64-69" "Called from file "job_queue.ml", line 160, characters 6-47")) (monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "a.ml", line 22, characters 26-57" "Called from file "deferred1.ml", line 14, characters 63-68" "Called from file "job_queue.ml", line 160, characters 6-47")) (monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "monitor.ml", line 256, characters 42-51" "Called from file "job_queue.ml", line 160, characters 6-47")) (monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "a.ml", line 29, characters 26-57" "Called from file "result.ml", line 105, characters 9-15")))
after:
((monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "a.ml", line 16, characters 17-48")) (monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "a.ml", line 22, characters 26-57")) (monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33")) (monitor.ml.Error (Failure "something went wrong") ("Raised at file "pervasives.ml", line 30, characters 22-33" "Called from file "a.ml", line 29, characters 26-57" "Called from file "result.ml", line 105, characters 9-15")))
N.B. some interface change in Core (notably to Hashtbl
and Map
) implied some interface change in this package as well, although they are not mentionned in this changelog.
- Switched to ppx.
- Improved the Async scheduler's to allocate a
handle_fired
function once, rather than every time it calls advance_clock
. Removed configurability of Monitor's try_with
-ignored-exception handling, i.e. removed Monitor.try_with_rest_handling
and Monitor.try_with_ignored_exn_handling
.
The behavior of exceptions raised to a monitor after it returns is unchanged -- i.e. they are logged, as they have been since 112.28.
Changed Monitor.try_with
's ?rest
argument from:
?rest :
Ignore | Raise
?rest :
Log | Raise
This naming reflects the fact that subsequent exceptions are logged, not ignored.
In Async_kernel
, moved Scheduler.set_execution_context
from the Types.Scheduler
module to its own file. Because Types
is a module rec
, set_execution_context
hadn't been inlined and was called via caml_apply2
. In its own file, it will be inlined.
This release creates a new scheduler0.ml, and moves the old scheduler0.ml to scheduler1.ml.
Fixed a space leak in Pipe
due to a pipe holding pointers to its upstream_flusheds
after they are closed. The leak shows up in Pipe.transfer
and related functions, e.g. with:
Pipe.transfer temporary_pipe long_lived_pipe
called repeatedly, in which long_lived_pipe
would accumulate a large number of upstream_flusheds
.
The fix is to maintain upstream_flusheds
as a Bag.t
, and to remove an upstream pipe when it is closed.
- Implement
Pipe.of_sequence
- Improved the error message when an exception is raised to a
Monitor.try_with
that has returned before Async has initialized Monitor.try_with_log_exn
. Improved the implementation of Monitor.get_next_error
, by replacing the monitor's list of handlers:
; mutable handlers_for_next_error : (exn -> unit) list
with a single ivar:
; mutable next_error : exn Ivar.t
I think this wasn't done originally because of a dependency cycle. But now that we have types.ml, we can do the clear thing.
- Improved the implementation of Monitor exception handling, i.e.
detach_and_iter_errors
to make it clear that Monitor.send_exn
does not run user code -- it only schedules jobs. - Fix an error message in
Pipe
to match the condition that led to it. Add a new pipe constructor:
val unfold : 'b -> f:('b -> ('a * 'b) option Deferred.t) -> 'a Reader.t
unfold
is more powerful than the combination of
Useful for, e.g., creating a pipe of natural numbers:
Pipe.unfold 0 ~f:(fun n -> return (Some (n, n+1)))
Add Deferred.Map.all
similar to Deferred.List.all
.
This does what you would expect:
val all : ('a, 'b Deferred.t, 'cmp) Map.t -> ('a, 'b, 'cmp) Map.t Deferred.t
Added some simple functions that seem missing from Deferred
and Ivar
.
val Ivar.peek : 'a t -> 'a option val Ivar.value_exn : 'a t -> 'a val Deferred.value_exn : 'a t -> 'a
- Add
Monitor.catch_error
, which provides error handling for processes/subsystems intended to run forever. Added to the Async scheduler a configurable:
min_inter_cycle_timeout : Time_ns.Span.t
When scheduler calls epoll(), it uses a timeout of at least min_inter_cycle_timeout
.
min_inter_cycle_timeout
can be configured via ASYNC_CONFIG
or via
val Scheduler.set_min_inter_cycle_timeout : Time_ns.Span.t -> unit
This allows one to tweak the scheduler to be more fair w.r.t. threads, e.g. with:
Scheduler.set_min_inter_cycle_timeout <- Time_ns.Span.of_us 1.;
- Optimized
Scheduler.schedule'
to avoid a closure allocation. - Removed
Monitor.kill
, which was unused internally. This removes the kill_index
field from Execution_context.t
, which saves us a word everytime we allocate or store an execution context. - Assert that
Deferred.forever
never returns statically, rather than dynamically. - Changed
Async.Std
to not include Deferred.Monad_syntax
, so that one must explicitly opt in (via open Deferred.Monad_syntax
) to use let%bind
syntax with Async. - Add
Pipe.to_sequence
Make Stream.closed s
return immediately when s
is already closed.
Currently the following property holds:
for any s, Deferred.peek (Stream.closed s) = None
For Pipe
functions that deal with batches of elements in a queue, added an optional argument:
?max_queue_length : int (** default is Int.max_value
*)
This limits the size of the queue that is used, which can improve Async fairness.
Affected functions are:
filter_map filter_map' fold' iter' map' read' read_now' transfer' transfer_id
This also obviates read_at_most
and read_now_at_most
, which we will deprecate in a later release.
Removed a couple helper types, iter
and fold
, that had been used to express commonality among functions, but were becoming unwieldy due to differences.
- Changed
Pipe
's default max_queue_length
from Int.max_value
to 100. Added to Pipe.iter_without_pushback
an optional argument:
?max_iterations_per_job : int (** default is Int.max_value
*)
iter_without_pushback
will not make more than max_iterations_per_job
calls to f
in a single Async_job; this can be used to increase Async-scheduling fairness.
Added Pipe.write_if_open
which does exactly what it says. This is a common pattern. Also added a pushback-oblivious variant write_without_pushback_if_open
.
Call sites for these two new functions were introduced wherever I found that doing so would not introduce any side effects (even counting allocation) in the case of a closed pipe.
- Added
Clock.Event.run_at
and run_after
. Eliminated a space leak in Clock.with_timeout
.
Previously Clock.with_timeout span d
created a deferred that waited for span
even if d
(and hence with_timeout
) became determined sooner. Now, with_timeout
aborts the clock alarm if d
becomes determined, which saves space in Async's timing wheel.
Added Clock.Event.fired
, and removed the fired
value that was returned by at
and after
.
val fired : t -> [ `Happened | `Aborted ] Deferred.t
- Added
Clock.Event.reschedule_{at,after}
. Fixed the space leak that caused nested Monitor.try_with
to use linear space rather than constant space.
Changed Monitor.try_with_ignored_exn_handling
so that with Eprintf` or
Run f ``, the error processing runs in
Monitor.mainrather than in the monitor that called
Monitor.try_with`. This avoids space leaks due to chains of monitors, e.g.:
open! Core.Std
open! Async.Std
let () =
Monitor.try_with_ignored_exn_handling := `Run ignore;
let num_monitors = 10_000_000 in
let num_remaining = ref num_monitors in
let rec loop n : unit =
if n > 0
then
upon
(Monitor.try_with
(fun () ->
loop (n - 1);
return ()))
(function
| Error _ -> assert false
| Ok () ->
decr num_remaining;
if !num_remaining = 0 then shutdown 0)
in
loop num_monitors;
never_returns (Scheduler.go ());
;;
Added a unit test to detect if nested monitors leak space.
Removed Lazy_deferred.follow
, which is not used in the tree anymore.
Removing this function allows Lazy_deferred.t
to be implemented as:
type 'a t = 'a Deferred.t Lazy.t
although that's not done yet.
Added hooks to Async_kernel.Scheduler
for js_of_ocaml
.
This hook aims to be called every time one add a job to the scheduler (enqueue + timing_wheel).
- Made
Async_kernel
not depend on thread. - Added
Deferred.Memo
, which wraps functions in Core.Memo
to correctly handle asynchronous exceptions. Renamed Pipe
and Thread_safe_pipe
functions that clear their input queue so that their name starts with transfer_in
, to make it clearer that they side effect their input.
old name | new name |
---|
write'
| transfer_in
|
write_without_pushback'
| transfer_in_without_pushback
|
Added Pipe.init_reader
, symmetric to Pipe.init
.
val init : ('a Writer.t -> unit Deferred.t) -> 'a Reader.t
val init_reader : ('a Reader.t -> unit Deferred.t) -> 'a Writer.t
Changed Async_kernel.Job_queue.run_jobs
to call Exn.backtrace
immediately after catching an unhandled exception
There should be no change in behavior. This change was to make it more clear that there is no intervening code that interferes with the global backtrace state.
- Made
Deferred
functions that take an argument ?how:[
Parallel | Sequential ]
accept `Max_concurrent_jobs of int
, which operates in a sequence in parallel, limited via a throttle. - Made
Deferred.Or_error
match Applicative.S
. - Fixed
Scheduler.run_cycles_until_no_jobs_remain
so that it continues running if one has done Scheduler.yield
. Split the implementation of Deferred
into a number of files, to solve some problems with circularities.
Split into:
- deferred.ml
- deferred_sequence.ml
- deferred_list.ml
- deferred_array.ml
- deferred_queue.ml
- deferred_map.ml
- deferred_result.ml
- deferred_option.ml
For a sequence of multiple modules used to construct a module, switched from the Raw_*
prefix convention to the numeric suffix convention. E.g. we now have Deferred0
, Deferred1
, Deferred
.