package wayland

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file connection.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
open Lwt.Syntax
open Internal

type 'a t = 'a Internal.connection

(* Dispatch all complete messages in [recv_buffer]. *)
let rec process_recv_buffer t recv_buffer =
  let* () = t.paused in
  match Msg.parse ~fds:t.incoming_fds (Recv_buffer.data recv_buffer) with
  | None -> Lwt.return_unit
  | Some msg ->
    begin
      let obj = Msg.obj msg in
      match Objects.find_opt obj t.objects with
      | None -> Fmt.failwith "No such object %ld" obj
      | Some (Generic proxy) ->
        let msg = Msg.cast msg in
        t.trace.inbound proxy msg;
        if proxy.can_recv then (
          try
            proxy.handler#dispatch proxy msg
          with ex ->
            let bt = Printexc.get_raw_backtrace () in
            Log.err (fun f -> f "Uncaught exception handling incoming message for %a:@,%a"
                        pp_proxy proxy Fmt.exn_backtrace (ex, bt))
        ) else (
          Fmt.failwith "Received message for %a, which was shut down!" pp_proxy proxy
        )
    end;
    Recv_buffer.update_consumer recv_buffer (Msg.length msg);
    (* Unix.sleepf 0.001; *)
    (* Fmt.pr "Buffer after dispatch: %a@." Recv_buffer.dump recv_buffer; *)
    process_recv_buffer t recv_buffer

let listen t =
  let recv_buffer = Recv_buffer.create 4096 in
  let rec aux () =
    let* (got, fds) = t.transport#recv (Recv_buffer.free_buffer recv_buffer) in
    if Lwt.is_sleeping t.closed then (
      List.iter (fun fd -> Queue.add fd t.incoming_fds) fds;
      if got = 0 then (
        Log.info (fun f -> f "Got end-of-file on wayland connection");
        Lwt.return_unit
      ) else (
        Recv_buffer.update_producer recv_buffer got;
        Log.debug (fun f -> f "Ring after adding %d bytes: %a" got Recv_buffer.dump recv_buffer);
        let* () = process_recv_buffer t recv_buffer in
        aux ()
      )
    ) else (
      List.iter Unix.close fds;
      failwith "Connection is closed"
    )
  in
  Lwt.try_bind aux
    (fun () ->
       if Lwt.is_sleeping t.closed then Lwt.wakeup t.set_closed (Ok ());
       Queue.iter Unix.close t.incoming_fds;
       Lwt.return_unit;
    )
    (fun ex ->
       if Lwt.is_sleeping t.closed then
         Lwt.wakeup t.set_closed (Error ex)
       else
         Log.debug (fun f -> f "Listen error (but connection already closed): %a" Fmt.exn ex);
       Queue.iter Unix.close t.incoming_fds;
       Lwt.return_unit;
    )

let clean_up t =
  t.objects |> Objects.iter (fun _ (Generic obj) ->
      obj.on_delete |> Queue.iter (fun f ->
          try f ()
          with ex ->
            Log.warn (fun f -> f "Error from %a's on_delete handler called at end-of-connection: %a"
                         pp_proxy obj
                         Fmt.exn ex)
        );
      Queue.clear obj.on_delete
    );
  Lwt.return_unit

let connect ~trace role transport handler =
  let closed, set_closed = Lwt.wait () in
  let t = {
    transport = (transport :> S.transport);
    paused = Lwt.return_unit;
    unpause = ignore;
    role;
    objects = Objects.empty;
    free_ids = [];
    next_id = (match role with `Client -> 2l | `Server -> 0xff000000l);
    incoming_fds = Queue.create ();
    outbox = Queue.create ();
    closed;
    set_closed;
    trace = Proxy.trace trace;
  } in
  let display_proxy = Proxy.add_root t (handler :> _ Proxy.Handler.t) in
  Lwt.async (fun () ->
      Lwt.finalize
        (fun () -> listen t)
        (fun () -> clean_up t)
    );
  (t, display_proxy)

let closed t = t.closed

let set_paused t = function
  | false ->
    if Lwt.is_sleeping t.paused then t.unpause ()
  | true ->
    if not (Lwt.is_sleeping t.paused) then (
      let paused, set_paused = Lwt.wait () in
      t.paused <- paused;
      t.unpause <- Lwt.wakeup set_paused
    )

let dump f t =
  let pp_item f (_id, Generic proxy) = pp_proxy f proxy in
  Fmt.pf f "@[<v2>Connection on %t with %d objects:@,%a@]"
    t.transport#pp
    (Objects.cardinal t.objects)
    (Fmt.Dump.list pp_item) (Objects.bindings t.objects)