Source file prevalidator.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
(** Minimal delay between two mempool advertisements *)
let advertisement_delay = 0.1
(** Argument that will be provided to {!Worker.MakeGroup} to create
    the prevalidator worker. *)
module Name = struct
  type t = Chain_id.t * Protocol_hash.t
  let encoding = Data_encoding.tup2 Chain_id.encoding Protocol_hash.encoding
  let base = ["prevalidator"]
  let pp fmt (chain_id, proto_hash) =
    Format.fprintf
      fmt
      "%a:%a"
      Chain_id.pp_short
      chain_id
      Protocol_hash.pp_short
      proto_hash
  let equal (c1, p1) (c2, p2) =
    Chain_id.equal c1 c2 && Protocol_hash.equal p1 p2
end
open Prevalidator_worker_state
(** A prevalidator instance, tailored to a specific protocol (even if
    it is not visible in this module type). *)
module type T = sig
  type types_state
  val get_rpc_directory :
    types_state -> types_state Tezos_rpc.Directory.t lazy_t
  val name : Name.t
  module Types : Worker_intf.TYPES with type state = types_state
  module Worker :
    Worker.T
      with type ('a, 'b) Request.t = ('a, 'b) Request.t
       and type Request.view = Request.view
       and type Types.state = types_state
  type worker = Worker.infinite Worker.queue Worker.t
  val worker : worker Lazy.t
end
open Shell_operation
module Events = Prevalidator_events
module Classification = Prevalidator_classification
(** This module encapsulates pending operations to maintain them in two
    different data structure and avoid coslty repetitive convertions when
    handling batches in [classify_pending_operations]. *)
module Pending_ops = Prevalidator_pending_operations
(** Module encapsulating some types that are used both in production
    and in tests. Having them in a module makes it possible to
    [include] this module in {!Internal_for_tests} below and avoid
    code duplication.
    The raison d'etre of these records of functions is to be able to use
    alternative implementations of all functions in tests.
    The purpose of the {!Tools.tools} record is to abstract away from {!Store.chain_store}.
    Under the hood [Store.chain_store] requires an Irmin store on disk,
    which makes it impractical for fast testing: every test would need
    to create a temporary folder on disk which doesn't scale well.
    The purpose of the {!Tools.worker_tools} record is to abstract away
    from the {!Worker} implementation. This implementation is overkill
    for testing: we don't need asynchronicity and concurrency in our
    pretty basic existing tests. Having this abstraction allows to get
    away with a much simpler state machine model of execution and
    to have simpler test setup. *)
module Tools = struct
  (** Functions provided by {!Distributed_db} and {!Store.chain_store}
      that are used in various places of the mempool. Gathered here so that we can test
      the mempool without requiring a full-fledged [Distributed_db]/[Store.Chain_store]. *)
  type 'prevalidation_t tools = {
    advertise_current_head : mempool:Mempool.t -> Store.Block.t -> unit;
        (** [advertise_current_head mempool head] sends a
            [Current_head (chain_id, head_header, mempool)] message to all known
            active peers for the chain being considered. *)
    chain_tools : Store.Block.t Classification.chain_tools;
        (** Lower-level tools provided by {!Prevalidator_classification} *)
    flush :
      head:Store.Block.t ->
      timestamp:Time.Protocol.t ->
      'prevalidation_t ->
      'prevalidation_t tzresult Lwt.t;
        (** Create a new empty prevalidation state, recycling some elements
            of the provided previous prevalidation state. *)
    fetch :
      ?peer:P2p_peer.Id.t ->
      ?timeout:Time.System.Span.t ->
      Operation_hash.t ->
      Operation.t tzresult Lwt.t;
        (** [fetch ?peer ?timeout oph] returns the value when it is known.
            It can fail with [Requester.Timeout] if [timeout] is provided and the value
            isn't known before the timeout expires. It can fail with [Requester.Cancel] if
            the request is canceled. *)
    read_block : Block_hash.t -> Store.Block.t tzresult Lwt.t;
        (** [read_block bh] tries to read the block [bh] from the chain store. *)
    send_get_current_head : ?peer:P2p_peer_id.t -> unit -> unit;
        (** [send_get_current_head ?peer ()] sends a [Get_Current_head]
            to a given peer, or to all known active peers for the chain considered.
            Expected answer is a [Get_current_head] message *)
    set_mempool : head:Block_hash.t -> Mempool.t -> unit tzresult Lwt.t;
        (** [set_mempool ~head mempool] sets the [mempool] of
            the [chain_store] of the chain considered. Does nothing if [head] differs
            from current_head which might happen when a new head concurrently arrives just
            before this operation is being called. *)
  }
  (** Abstraction over services implemented in production by {!Worker}
      but implemented differently in tests.
      Also see the enclosing module documentation as to why we have this record. *)
  type worker_tools = {
    push_request :
      (unit, Empty.t) Prevalidator_worker_state.Request.t -> bool Lwt.t;
        (** Adds a message to the queue. *)
    push_request_now :
      (unit, Empty.t) Prevalidator_worker_state.Request.t -> unit;
        (** Adds a message to the queue immediately. *)
  }
end
type 'a parameters = {
  limits : Shell_limits.prevalidator_limits;
  tools : 'a Tools.tools;
}
(** The type needed for the implementation of [Make] below, but
 *  which is independent from the protocol. *)
type ('protocol_data, 'a) types_state_shell = {
  classification : 'protocol_data Classification.t;
  parameters : 'a parameters;
  mutable predecessor : Store.Block.t;
  mutable timestamp : Time.System.t;
  mutable live_blocks : Block_hash.Set.t;
  mutable live_operations : Operation_hash.Set.t;
  mutable fetching : Operation_hash.Set.t;
      (** An operation is in [fetching] while the ddb is actively
          requesting it from peers. It is removed from it when the
          operation arrives or if the request fails (e.g. timeout). *)
  mutable pending : 'protocol_data Pending_ops.t;
  mutable mempool : Mempool.t;
  mutable advertisement : [`Pending of Mempool.t | `None];
  mutable banned_operations : Operation_hash.Set.t;
  worker : Tools.worker_tools;
}
let metrics = Shell_metrics.Mempool.init ["mempool"]
(** The concrete production instance of {!block_tools} *)
let block_tools : Store.Block.t Classification.block_tools =
  {
    bhash = Store.Block.hash;
    operations = Store.Block.operations;
    all_operation_hashes = Store.Block.all_operation_hashes;
  }
(** How to create an instance of {!chain_tools} from a {!Distributed_db.chain_db}. *)
let mk_chain_tools (chain_db : Distributed_db.chain_db) :
    Store.Block.t Classification.chain_tools =
  let open Lwt_syntax in
  let new_blocks ~from_block ~to_block =
    let chain_store = Distributed_db.chain_store chain_db in
    Store.Chain_traversal.new_blocks chain_store ~from_block ~to_block
  in
  let read_predecessor_opt block =
    let chain_store = Distributed_db.chain_store chain_db in
    Store.Block.read_predecessor_opt chain_store block
  in
  let inject_operation oph op =
    let* _ = Distributed_db.inject_operation chain_db oph op in
    Lwt.return_unit
  in
  {
    clear_or_cancel = Distributed_db.Operation.clear_or_cancel chain_db;
    inject_operation;
    new_blocks;
    read_predecessor_opt;
  }
(** Module type used both in production and in tests. *)
module type S = sig
  (** Type instantiated by {!Prevalidation.T.config}. *)
  type config
  (** Similar to the type [operation] from the protocol,
      see {!Tezos_protocol_environment.PROTOCOL} *)
  type protocol_operation
  (** Type instantiated by {!Prevalidation.t} *)
  type prevalidation_t
  type types_state = {
    shell : (protocol_operation, prevalidation_t) types_state_shell;
    mutable validation_state : prevalidation_t;
        (** Internal prevalidation state. Among others, this contains the
            internal states of the protocol mempool and of the plugin. *)
    mutable operation_stream :
      (Classification.classification * protocol_operation operation)
      Lwt_watcher.input;
    mutable rpc_directory : types_state Tezos_rpc.Directory.t lazy_t;
    mutable config : config;
    lock : Lwt_mutex.t;
  }
  (** This function fetches an operation if it is not already handled
      as defined by [already_handled] below. The implementation makes
      sure to fetch an operation at most once, modulo operations
      lost because of bounded buffers becoming full.
      This function is an intruder to this module type. It just happens
      that it is needed both by internals of the implementation of {!S}
      and by the internals of the implementation of {!T}; so it needs
      to be exposed here. *)
  val may_fetch_operation :
    (protocol_operation, prevalidation_t) types_state_shell ->
    P2p_peer_id.t option ->
    Operation_hash.t ->
    unit
  (** The function called after every call to a function of {!API}. *)
  val handle_unprocessed : types_state -> unit Lwt.t
  (** The inner API of the mempool i.e. functions called by the worker
      when an individual request arrives. These functions are the
      most high-level ones that we test. All these [on_*] functions
      correspond to a single event. Possible
      sequences of calls to this API are always of the form:
      on_*; handle_unprocessed; on_*; handle_unprocessed; ... *)
  module Requests : sig
    val on_advertise : _ types_state_shell -> unit
    val on_arrived :
      types_state ->
      Operation_hash.t ->
      Operation.t ->
      (unit, Empty.t) result Lwt.t
    val on_ban : types_state -> Operation_hash.t -> unit tzresult Lwt.t
    val on_flush :
      handle_branch_refused:bool ->
      types_state ->
      Store.Block.t ->
      Block_hash.Set.t ->
      Operation_hash.Set.t ->
      unit tzresult Lwt.t
    val on_inject :
      types_state -> force:bool -> Operation.t -> unit tzresult Lwt.t
    val on_notify : _ types_state_shell -> P2p_peer_id.t -> Mempool.t -> unit
  end
end
(** A functor for obtaining the testable part of this file (see
    the instantiation of this functor in {!Internal_for_tests} at the
    end of this file). Contrary to the production-only functor {!Make} below,
    this functor doesn't assume a specific chain store implementation,
    which is the crux for having it easily unit-testable. *)
module Make_s
    (Proto : Protocol_plugin.T)
    (Prevalidation_t : Prevalidation.T
                         with type protocol_operation = Proto.operation) :
  S
    with type config = Prevalidation_t.config
     and type protocol_operation = Proto.operation
     and type prevalidation_t = Prevalidation_t.t = struct
  type config = Prevalidation_t.config
  type protocol_operation = Proto.operation
  type prevalidation_t = Prevalidation_t.t
  type types_state = {
    shell : (protocol_operation, prevalidation_t) types_state_shell;
    mutable validation_state : prevalidation_t;
    mutable operation_stream :
      (Classification.classification * protocol_operation operation)
      Lwt_watcher.input;
    mutable rpc_directory : types_state Tezos_rpc.Directory.t lazy_t;
    mutable config : config;
    lock : Lwt_mutex.t;
  }
  let already_handled ~origin shell oph =
    if Operation_hash.Set.mem oph shell.banned_operations then (
      
      ignore
        (Unit.catch_s (fun () ->
             Events.(emit ban_operation_encountered) (origin, oph))) ;
      true)
    else
      Pending_ops.mem oph shell.pending
      || Operation_hash.Set.mem oph shell.live_operations
      || Classification.is_in_mempool oph shell.classification <> None
      || Classification.is_known_unparsable oph shell.classification
  let advertise (shell : ('operation_data, _) types_state_shell) mempool =
    let open Lwt_syntax in
    match shell.advertisement with
    | `Pending {Mempool.known_valid; pending} ->
        shell.advertisement <-
          `Pending
            {
              known_valid =
                Operation_hash.Set.union known_valid mempool.Mempool.known_valid;
              pending = Operation_hash.Set.union pending mempool.pending;
            }
    | `None ->
        shell.advertisement <- `Pending mempool ;
        Lwt.dont_wait
          (fun () ->
            let* () = Lwt_unix.sleep advertisement_delay in
            shell.worker.push_request_now Advertise ;
            Lwt.return_unit)
          (fun exc ->
            Format.eprintf "Uncaught exception: %s\n%!" (Printexc.to_string exc))
  
  let handle_classification
      ~(notifier :
         Classification.classification -> protocol_operation operation -> unit)
      shell (op, kind) =
    Classification.add kind op shell.classification ;
    notifier kind op
  let mk_notifier operation_stream classification op =
    
    Lwt_watcher.notify operation_stream (classification, op)
  let pre_filter pv ~notifier parsed_op : [Pending_ops.priority | `Drop] Lwt.t =
    let open Lwt_syntax in
    let+ v =
      Prevalidation_t.pre_filter pv.validation_state pv.config parsed_op
    in
    match v with
    | (`Branch_delayed _ | `Branch_refused _ | `Refused _ | `Outdated _) as errs
      ->
        handle_classification ~notifier pv.shell (parsed_op, errs) ;
        `Drop
    | `Passed_prefilter priority -> (priority :> [Pending_ops.priority | `Drop])
  let set_mempool shell mempool =
    shell.mempool <- mempool ;
    shell.parameters.tools.set_mempool
      ~head:(Store.Block.hash shell.predecessor)
      shell.mempool
  let remove_from_advertisement oph = function
    | `Pending mempool -> `Pending (Mempool.remove oph mempool)
    | `None -> `None
  
  let reclassify_replaced_manager_op old_hash shell
      (replacement_classification : [< Classification.error_classification]) =
    shell.advertisement <-
      remove_from_advertisement old_hash shell.advertisement ;
    match Classification.remove old_hash shell.classification with
    | Some (op, _class) ->
        Some (op, (replacement_classification :> Classification.classification))
    | None ->
        
        shell.parameters.tools.chain_tools.clear_or_cancel old_hash ;
        None
  
  let classify_operation shell ~config ~validation_state mempool op :
      (prevalidation_t
      * Mempool.t
      * (protocol_operation operation * Classification.classification) trace)
      Lwt.t =
    let open Lwt_syntax in
    let* v_state, op, classification, replacements =
      Prevalidation_t.add_operation validation_state config op
    in
    let to_replace =
      List.filter_map
        (fun (replaced_oph, new_classification) ->
          reclassify_replaced_manager_op replaced_oph shell new_classification)
        replacements
    in
    let to_handle = (op, classification) :: to_replace in
    let mempool =
      match classification with
      | `Validated -> Mempool.cons_valid op.hash mempool
      | `Branch_refused _ | `Branch_delayed _ | `Refused _ | `Outdated _ ->
          mempool
    in
    return (v_state, mempool, to_handle)
  
  let classify_pending_operations ~notifier shell config state =
    let open Lwt_syntax in
    let* r =
      Pending_ops.fold_es
        (fun _prio oph op (acc_validation_state, acc_mempool, limit) ->
          if limit <= 0 then
            
            Lwt.return_error (acc_validation_state, acc_mempool)
          else (
            shell.pending <- Pending_ops.remove oph shell.pending ;
            let* new_validation_state, new_mempool, to_handle =
              classify_operation
                shell
                ~config
                ~validation_state:acc_validation_state
                acc_mempool
                op
            in
            let+ () = Events.(emit operation_reclassified) oph in
            List.iter (handle_classification ~notifier shell) to_handle ;
            Ok (new_validation_state, new_mempool, limit - 1)))
        shell.pending
        (state, Mempool.empty, shell.parameters.limits.operations_batch_size)
    in
    match r with
    | Error (state, advertised_mempool) ->
        
        let* (_was_pushed : bool) =
          shell.worker.push_request Request.Leftover
        in
        Lwt.return (state, advertised_mempool)
    | Ok (state, advertised_mempool, _) -> Lwt.return (state, advertised_mempool)
  let update_advertised_mempool_fields pv_shell delta_mempool =
    let open Lwt_syntax in
    if Mempool.is_empty delta_mempool then Lwt.return_unit
    else
      
      let mempool_to_advertise =
        Mempool.{delta_mempool with known_valid = delta_mempool.known_valid}
      in
      advertise pv_shell mempool_to_advertise ;
      let our_mempool =
        let validated_hashes =
          Classification.Sized_map.fold
            (fun x _ acc -> Operation_hash.Set.add x acc)
            pv_shell.classification.validated
            Operation_hash.Set.empty
        in
        {
          Mempool.known_valid = validated_hashes;
          pending = Pending_ops.hashes pv_shell.pending;
        }
      in
      let* _res = set_mempool pv_shell our_mempool in
      Lwt.pause ()
  let handle_unprocessed pv =
    let open Lwt_syntax in
    let notifier = mk_notifier pv.operation_stream in
    if Pending_ops.is_empty pv.shell.pending then Lwt.return_unit
    else
      let* () = Events.(emit processing_operations) () in
      let* validation_state, delta_mempool =
        classify_pending_operations
          ~notifier
          pv.shell
          pv.config
          pv.validation_state
      in
      pv.validation_state <- validation_state ;
      update_advertised_mempool_fields pv.shell delta_mempool
  
  let fetch_operation ~notify_arrival
      (shell : ('operation_data, _) types_state_shell) ?peer oph =
    let open Lwt_syntax in
    let* () = Events.(emit fetching_operation) oph in
    let* r =
      protect @@ fun () ->
      shell.parameters.tools.fetch
        ~timeout:shell.parameters.limits.operation_timeout
        ?peer
        oph
    in
    match r with
    | Ok op ->
        if notify_arrival then shell.worker.push_request_now (Arrived (oph, op)) ;
        Lwt.return_unit
    | Error err -> (
        
        if notify_arrival then
          shell.fetching <- Operation_hash.Set.remove oph shell.fetching ;
        match err with
        | Distributed_db.Operation.Canceled _ :: _ ->
            Events.(emit operation_included) oph
        | _ ->
            
            Events.(emit operation_not_fetched) oph)
  
  let may_fetch_operation (shell : ('operation_data, _) types_state_shell) peer
      oph =
    let origin =
      match peer with Some peer -> Events.Peer peer | None -> Leftover
    in
    let spawn_fetch_operation ~notify_arrival =
      ignore
        (Unit.catch_s (fun () ->
             fetch_operation ~notify_arrival shell ?peer oph))
    in
    if Operation_hash.Set.mem oph shell.fetching then
      
      spawn_fetch_operation ~notify_arrival:false
    else if not (already_handled ~origin shell oph) then (
      shell.fetching <- Operation_hash.Set.add oph shell.fetching ;
      spawn_fetch_operation ~notify_arrival:true)
  (** Module containing functions that are the internal transitions
      of the mempool. These functions are called by the {!Worker} when
      an event arrives. *)
  module Requests = struct
    module Parser = MakeParser (Proto)
    let on_arrived (pv : types_state) oph op : (unit, Empty.t) result Lwt.t =
      let open Lwt_syntax in
      pv.shell.fetching <- Operation_hash.Set.remove oph pv.shell.fetching ;
      if already_handled ~origin:Events.Arrived pv.shell oph then return_ok_unit
      else
        match Parser.parse oph op with
        | Error _ ->
            let* () = Events.(emit unparsable_operation) oph in
            Prevalidator_classification.add_unparsable
              oph
              pv.shell.classification ;
            return_ok_unit
        | Ok parsed_op -> (
            let* v =
              pre_filter
                pv
                ~notifier:(mk_notifier pv.operation_stream)
                parsed_op
            in
            match v with
            | `Drop -> return_ok_unit
            | (`High | `Medium | `Low _) as prio ->
                if
                  not
                    (Block_hash.Set.mem
                       op.Operation.shell.branch
                       pv.shell.live_blocks)
                then (
                  pv.shell.parameters.tools.chain_tools.clear_or_cancel oph ;
                  return_ok_unit)
                else (
                  
                  pv.shell.pending <-
                    Pending_ops.add parsed_op prio pv.shell.pending ;
                  return_ok_unit))
    let on_inject (pv : types_state) ~force op =
      let open Lwt_result_syntax in
      let oph = Operation.hash op in
      
      let prio = `High in
      if already_handled ~origin:Events.Injected pv.shell oph then
        
        return_unit
      else
        match Parser.parse oph op with
        | Error err ->
            failwith
              "Invalid operation %a: %a."
              Operation_hash.pp
              oph
              Error_monad.pp_print_trace
              err
        | Ok parsed_op -> (
            if force then (
              let*! () =
                pv.shell.parameters.tools.chain_tools.inject_operation oph op
              in
              pv.shell.pending <-
                Pending_ops.add parsed_op prio pv.shell.pending ;
              let*! () = Events.(emit operation_injected) oph in
              return_unit)
            else if
              not
                (Block_hash.Set.mem
                   op.Operation.shell.branch
                   pv.shell.live_blocks)
            then
              failwith
                "Operation %a is branched on a block %a which is too old"
                Operation_hash.pp
                oph
                Block_hash.pp
                op.Operation.shell.branch
            else
              let notifier = mk_notifier pv.operation_stream in
              let*! validation_state, delta_mempool, to_handle =
                classify_operation
                  pv.shell
                  ~config:pv.config
                  ~validation_state:pv.validation_state
                  Mempool.empty
                  parsed_op
              in
              let op_status =
                
                List.find_opt
                  (function
                    | ({hash; _} : protocol_operation operation), _ ->
                        Operation_hash.equal hash oph)
                  to_handle
              in
              match op_status with
              | Some (_h, `Validated) ->
                  
                  let*! () =
                    pv.shell.parameters.tools.chain_tools.inject_operation
                      oph
                      op
                  in
                  
                  List.iter (handle_classification ~notifier pv.shell) to_handle ;
                  pv.validation_state <- validation_state ;
                  
                  let*! v =
                    update_advertised_mempool_fields pv.shell delta_mempool
                  in
                  let*! () = Events.(emit operation_injected) oph in
                  return v
              | Some
                  ( _h,
                    ( `Branch_delayed e
                    | `Branch_refused e
                    | `Refused e
                    | `Outdated e ) ) ->
                  Lwt.return
                  @@ error_with
                       "Error while validating injected operation %a:@ %a"
                       Operation_hash.pp
                       oph
                       pp_print_trace
                       e
              | None ->
                  
                  failwith
                    "Unexpected error while injecting operation %a. Operation \
                     not found after classifying it."
                    Operation_hash.pp
                    oph)
    let on_notify (shell : ('operation_data, _) types_state_shell) peer mempool
        =
      let may_fetch_operation = may_fetch_operation shell (Some peer) in
      let () =
        Operation_hash.Set.iter may_fetch_operation mempool.Mempool.known_valid
      in
      Seq.iter
        may_fetch_operation
        (Operation_hash.Set.to_seq mempool.Mempool.pending)
    let on_flush ~handle_branch_refused pv new_predecessor new_live_blocks
        new_live_operations =
      let open Lwt_result_syntax in
      let old_predecessor = pv.shell.predecessor in
      pv.shell.predecessor <- new_predecessor ;
      pv.shell.live_blocks <- new_live_blocks ;
      pv.shell.live_operations <- new_live_operations ;
      Lwt_watcher.shutdown_input pv.operation_stream ;
      pv.operation_stream <- Lwt_watcher.create_input () ;
      let timestamp_system = Tezos_base.Time.System.now () in
      pv.shell.timestamp <- timestamp_system ;
      let timestamp = Time.System.to_protocol timestamp_system in
      let* validation_state =
        pv.shell.parameters.tools.flush
          ~head:new_predecessor
          ~timestamp
          pv.validation_state
      in
      pv.validation_state <- validation_state ;
      let*! new_pending_operations =
        Classification.recycle_operations
          ~from_branch:old_predecessor
          ~to_branch:new_predecessor
          ~live_blocks:new_live_blocks
          ~parse:(fun oph op -> Result.to_option (Parser.parse oph op))
          ~classes:pv.shell.classification
          ~pending:(Pending_ops.operations pv.shell.pending)
          ~block_store:block_tools
          ~chain:pv.shell.parameters.tools.chain_tools
          ~handle_branch_refused
      in
      
      let*! new_pending_operations, nb_pending =
        Operation_hash.Map.fold_s
          (fun _oph op (pending, nb_pending) ->
            let*! v =
              pre_filter pv ~notifier:(mk_notifier pv.operation_stream) op
            in
            match v with
            | `Drop -> Lwt.return (pending, nb_pending)
            | (`High | `Medium | `Low _) as prio ->
                
                Lwt.return (Pending_ops.add op prio pending, nb_pending + 1))
          new_pending_operations
          (Pending_ops.empty, 0)
      in
      let*! () = Events.(emit operations_to_reclassify) nb_pending in
      pv.shell.pending <- new_pending_operations ;
      set_mempool pv.shell Mempool.empty
    let on_advertise (shell : ('protocol_data, _) types_state_shell) =
      match shell.advertisement with
      | `None ->
          () 
      | `Pending mempool ->
          shell.advertisement <- `None ;
          
          if not (Mempool.is_empty mempool) then
            shell.parameters.tools.advertise_current_head
              ~mempool
              shell.predecessor
    
    let remove ~flush_if_validated pv oph =
      let open Lwt_result_syntax in
      pv.shell.parameters.tools.chain_tools.clear_or_cancel oph ;
      pv.shell.advertisement <-
        remove_from_advertisement oph pv.shell.advertisement ;
      pv.shell.banned_operations <-
        Operation_hash.Set.add oph pv.shell.banned_operations ;
      match Classification.remove oph pv.shell.classification with
      | None ->
          pv.shell.pending <- Pending_ops.remove oph pv.shell.pending ;
          pv.shell.fetching <- Operation_hash.Set.remove oph pv.shell.fetching ;
          return_unit
      | Some (_op, classification) -> (
          match (classification, flush_if_validated) with
          | `Validated, true ->
              let+ () =
                on_flush
                  ~handle_branch_refused:false
                  pv
                  pv.shell.predecessor
                  pv.shell.live_blocks
                  pv.shell.live_operations
              in
              pv.shell.pending <- Pending_ops.remove oph pv.shell.pending
          | `Branch_delayed _, _
          | `Branch_refused _, _
          | `Refused _, _
          | `Outdated _, _
          | `Validated, false ->
              pv.validation_state <-
                Prevalidation_t.remove_operation pv.validation_state oph ;
              return_unit)
    let on_ban pv oph_to_ban =
      let open Lwt_result_syntax in
      pv.shell.banned_operations <-
        Operation_hash.Set.add oph_to_ban pv.shell.banned_operations ;
      let* res = remove ~flush_if_validated:true pv oph_to_ban in
      let*! () = Events.(emit operation_banned) oph_to_ban in
      return res
  end
end
module type ARG = sig
  val limits : Shell_limits.prevalidator_limits
  val chain_db : Distributed_db.chain_db
  val chain_id : Chain_id.t
end
module WorkerGroup = Worker.MakeGroup (Name) (Prevalidator_worker_state.Request)
(** The functor that is not tested, in other words used only in production.
    This functor's code is not tested (contrary to functor {!Make_s} above),
    because it hardcodes a dependency to [Store.chain_store] in its instantiation
    of type [chain_store]. This is what makes the code of this functor
    not testable for the moment, because [Store.chain_store] has poor
    testing capabilities.
    Note that, because this functor [include]s {!Make_s}, it is a
    strict extension of [Make_s]. *)
module Make
    (Proto : Protocol_plugin.T)
    (Arg : ARG)
    (Prevalidation_t : Prevalidation.T
                         with type protocol_operation = Proto.operation
                          and type chain_store = Store.chain_store) : T = struct
  module S = Make_s (Proto) (Prevalidation_t)
  open S
  type types_state = S.types_state
  let get_rpc_directory pv = pv.rpc_directory
  let name = (Arg.chain_id, Proto.hash)
  module Types = struct
    type state = types_state
    type parameters = Shell_limits.prevalidator_limits * Distributed_db.chain_db
  end
  module Worker :
    Worker.T
      with type Name.t = Name.t
       and type ('a, 'b) Request.t = ('a, 'b) Request.t
       and type Request.view = Request.view
       and type Types.state = Types.state
       and type Types.parameters = Types.parameters =
    WorkerGroup.MakeWorker (Types)
  open Types
  type worker = Worker.infinite Worker.queue Worker.t
  (** Return a json describing the prevalidator's [config].
      The boolean [include_default] ([true] by default) indicates
      whether the json should include the fields which have a value
      equal to their default value. *)
  let get_config_json ?(include_default = true) pv =
    let include_default_fields = if include_default then `Always else `Never in
    Data_encoding.Json.construct
      ~include_default_fields
      Prevalidation_t.config_encoding
      pv.config
  let filter_validation_passes allowed_validation_passes
      (op : protocol_operation) =
    match allowed_validation_passes with
    | [] -> true
    | validation_passes -> (
        match Proto.acceptable_pass op with
        | None -> false
        | Some validation_pass ->
            List.mem ~equal:Compare.Int.equal validation_pass validation_passes)
  let build_rpc_directory w =
    lazy
      (let open Lwt_result_syntax in
      let dir : state Tezos_rpc.Directory.t ref =
        ref Tezos_rpc.Directory.empty
      in
      let module Proto_services = Block_services.Make (Proto) (Proto) in
      dir :=
        Tezos_rpc.Directory.register
          !dir
          (Proto_services.S.Mempool.get_filter Tezos_rpc.Path.open_root)
          (fun pv params () ->
            return (get_config_json ~include_default:params#include_default pv)) ;
      dir :=
        Tezos_rpc.Directory.register
          !dir
          (Proto_services.S.Mempool.set_filter Tezos_rpc.Path.open_root)
          (fun pv () obj ->
            let open Lwt_syntax in
            let* () =
              try
                let config =
                  Data_encoding.Json.destruct
                    Prevalidation_t.config_encoding
                    obj
                in
                pv.config <- config ;
                Lwt.return_unit
              with _ -> Events.(emit invalid_mempool_filter_configuration) ()
            in
            
            return_ok (get_config_json pv)) ;
      
      dir :=
        Tezos_rpc.Directory.register
          !dir
          (Proto_services.S.Mempool.ban_operation Tezos_rpc.Path.open_root)
          (fun _pv () oph ->
            let open Lwt_result_syntax in
            let*! r = Worker.Queue.push_request_and_wait w (Request.Ban oph) in
            match r with
            | Error (Closed None) -> fail [Worker_types.Terminated]
            | Error (Closed (Some errs)) -> fail errs
            | Error (Request_error err) -> fail err
            | Error (Any exn) -> fail [Exn exn]
            | Ok () -> return_unit) ;
      
      dir :=
        Tezos_rpc.Directory.register
          !dir
          (Proto_services.S.Mempool.unban_operation Tezos_rpc.Path.open_root)
          (fun pv () oph ->
            pv.shell.banned_operations <-
              Operation_hash.Set.remove oph pv.shell.banned_operations ;
            return_unit) ;
      
      dir :=
        Tezos_rpc.Directory.register
          !dir
          (Proto_services.S.Mempool.unban_all_operations
             Tezos_rpc.Path.open_root)
          (fun pv () () ->
            pv.shell.banned_operations <- Operation_hash.Set.empty ;
            return_unit) ;
      dir :=
        Tezos_rpc.Directory.gen_register
          !dir
          (Proto_services.S.Mempool.pending_operations Tezos_rpc.Path.open_root)
          (fun pv params () ->
            let validated =
              if
                params#validated && Option.value ~default:true params#applied
                
              then
                Classification.Sized_map.to_map
                  pv.shell.classification.validated
                |> Operation_hash.Map.to_seq
                |> Seq.filter_map (fun (oph, op) ->
                       if
                         filter_validation_passes
                           params#validation_passes
                           op.protocol
                       then Some (oph, op.protocol)
                       else None)
                |> List.of_seq
              else []
            in
            let process_map map =
              let open Operation_hash in
              Map.filter_map
                (fun _oph (op, error) ->
                  if
                    filter_validation_passes
                      params#validation_passes
                      op.protocol
                  then Some (op.protocol, error)
                  else None)
                map
            in
            let refused =
              if params#refused then
                process_map (Classification.map pv.shell.classification.refused)
              else Operation_hash.Map.empty
            in
            let outdated =
              if params#outdated then
                process_map
                  (Classification.map pv.shell.classification.outdated)
              else Operation_hash.Map.empty
            in
            let branch_refused =
              if params#branch_refused then
                process_map
                  (Classification.map pv.shell.classification.branch_refused)
              else Operation_hash.Map.empty
            in
            let branch_delayed =
              if params#branch_delayed then
                process_map
                  (Classification.map pv.shell.classification.branch_delayed)
              else Operation_hash.Map.empty
            in
            let unprocessed =
              Operation_hash.Map.filter_map
                (fun _ {protocol; _} ->
                  if filter_validation_passes params#validation_passes protocol
                  then Some protocol
                  else None)
                (Pending_ops.operations pv.shell.pending)
            in
            let pending_operations =
              {
                Proto_services.Mempool.validated;
                refused;
                outdated;
                branch_refused;
                branch_delayed;
                unprocessed;
              }
            in
            Tezos_rpc.Answer.return (params#version, pending_operations)) ;
      dir :=
        Tezos_rpc.Directory.register
          !dir
          (Proto_services.S.Mempool.request_operations Tezos_rpc.Path.open_root)
          (fun pv t () ->
            pv.shell.parameters.tools.send_get_current_head ?peer:t#peer_id () ;
            return_unit) ;
      dir :=
        Tezos_rpc.Directory.gen_register
          !dir
          (Proto_services.S.Mempool.monitor_operations Tezos_rpc.Path.open_root)
          (fun pv params () ->
            Lwt_mutex.with_lock pv.lock @@ fun () ->
            let op_stream, stopper =
              Lwt_watcher.create_stream pv.operation_stream
            in
            
            let validated_seq =
              if
                params#validated && Option.value ~default:true params#applied
                
              then
                Classification.Sized_map.to_map
                  pv.shell.classification.validated
                |> Operation_hash.Map.to_seq
                |> Seq.map (fun (hash, {protocol; _}) ->
                       ((hash, protocol), None))
              else Seq.empty
            in
            let process_error_map map =
              let open Operation_hash in
              map |> Map.to_seq
              |> Seq.map (fun (hash, (op, error)) ->
                     ((hash, op.protocol), Some error))
            in
            let refused_seq =
              if params#refused then
                process_error_map
                  (Classification.map pv.shell.classification.refused)
              else Seq.empty
            in
            let branch_refused_seq =
              if params#branch_refused then
                process_error_map
                  (Classification.map pv.shell.classification.branch_refused)
              else Seq.empty
            in
            let branch_delayed_seq =
              if params#branch_delayed then
                process_error_map
                  (Classification.map pv.shell.classification.branch_delayed)
              else Seq.empty
            in
            let outdated_seq =
              if params#outdated then
                process_error_map
                  (Classification.map pv.shell.classification.outdated)
              else Seq.empty
            in
            let filter ((_, op), _) =
              filter_validation_passes params#validation_passes op
            in
            let current_mempool =
              Seq.append outdated_seq branch_delayed_seq
              |> Seq.append branch_refused_seq
              |> Seq.append refused_seq |> Seq.append validated_seq
              |> Seq.filter filter |> List.of_seq
            in
            let current_mempool = ref (Some current_mempool) in
            let filter_result = function
              | `Validated ->
                  params#validated && Option.value ~default:true params#applied
              | `Refused _ -> params#refused
              | `Outdated _ -> params#outdated
              | `Branch_refused _ -> params#branch_refused
              | `Branch_delayed _ -> params#branch_delayed
            in
            let rec next () =
              let open Lwt_syntax in
              match !current_mempool with
              | Some mempool ->
                  current_mempool := None ;
                  Lwt.return_some (params#version, mempool)
              | None -> (
                  let* o = Lwt_stream.get op_stream in
                  match o with
                  | Some (kind, op)
                    when filter_result kind
                         && filter_validation_passes
                              params#validation_passes
                              op.protocol ->
                      let errors =
                        match kind with
                        | `Validated -> None
                        | `Branch_delayed errors
                        | `Branch_refused errors
                        | `Refused errors
                        | `Outdated errors ->
                            Some errors
                      in
                      Lwt.return_some
                        (params#version, [((op.hash, op.protocol), errors)])
                  | Some _ -> next ()
                  | None -> Lwt.return_none)
            in
            let shutdown () = Lwt_watcher.shutdown stopper in
            Tezos_rpc.Answer.return_stream {next; shutdown}) ;
      !dir)
  (** Module implementing the events at the {!Worker} level. Contrary
      to {!Requests}, these functions depend on [Worker]. *)
  module Handlers = struct
    type self = worker
    let on_request :
        type r request_error.
        worker ->
        (r, request_error) Request.t ->
        (r, request_error) result Lwt.t =
     fun w request ->
      let open Lwt_result_syntax in
      Prometheus.Counter.inc_one metrics.worker_counters.worker_request_count ;
      let pv = Worker.state w in
      let post_processing :
          (r, request_error) result Lwt.t -> (r, request_error) result Lwt.t =
       fun r ->
        let open Lwt_syntax in
        let* () = handle_unprocessed pv in
        r
      in
      post_processing
      @@
      match request with
      | Request.Flush (hash, event, live_blocks, live_operations) ->
          Requests.on_advertise pv.shell ;
          
          let* block = pv.shell.parameters.tools.read_block hash in
          let handle_branch_refused =
            Chain_validator_worker_state.(
              match event with
              | Head_increment | Ignored_head -> false
              | Branch_switch -> true)
          in
          Lwt_mutex.with_lock pv.lock
          @@ fun () : (r, error trace) result Lwt.t ->
          Requests.on_flush
            ~handle_branch_refused
            pv
            block
            live_blocks
            live_operations
      | Request.Notify (peer, mempool) ->
          Requests.on_notify pv.shell peer mempool ;
          return_unit
      | Request.Leftover ->
          
          return_unit
      | Request.Inject {op; force} -> Requests.on_inject pv ~force op
      | Request.Arrived (oph, op) -> Requests.on_arrived pv oph op
      | Request.Advertise ->
          Requests.on_advertise pv.shell ;
          return_unit
      | Request.Ban oph -> Requests.on_ban pv oph
    let on_close w =
      let pv = Worker.state w in
      Lwt_watcher.shutdown_input pv.operation_stream ;
      Operation_hash.Set.iter
        pv.shell.parameters.tools.chain_tools.clear_or_cancel
        pv.shell.fetching ;
      Lwt.return_unit
    let mk_tools (chain_db : Distributed_db.chain_db) : _ Tools.tools =
      let advertise_current_head ~mempool bh =
        Distributed_db.Advertise.current_head chain_db ~mempool bh
      in
      let chain_tools = mk_chain_tools chain_db in
      let flush = Prevalidation_t.flush (Distributed_db.chain_store chain_db) in
      let fetch ?peer ?timeout oph =
        Distributed_db.Operation.fetch chain_db ?timeout ?peer oph ()
      in
      let read_block bh =
        let chain_store = Distributed_db.chain_store chain_db in
        Store.Block.read_block chain_store bh
      in
      let send_get_current_head ?peer () =
        match peer with
        | None -> Distributed_db.Request.current_head_from_all chain_db
        | Some peer ->
            Distributed_db.Request.current_head_from_peer chain_db peer
      in
      let set_mempool ~head mempool =
        let chain_store = Distributed_db.chain_store chain_db in
        Store.Chain.set_mempool chain_store ~head mempool
      in
      {
        advertise_current_head;
        chain_tools;
        flush;
        fetch;
        read_block;
        send_get_current_head;
        set_mempool;
      }
    let mk_worker_tools w : Tools.worker_tools =
      let push_request r = Worker.Queue.push_request w r in
      let push_request_now r = Worker.Queue.push_request_now w r in
      {push_request; push_request_now}
    type launch_error = error trace
    let on_launch w _ (limits, chain_db) : (state, launch_error) result Lwt.t =
      let open Lwt_result_syntax in
      let chain_store = Distributed_db.chain_store chain_db in
      let*! head = Store.Chain.current_head chain_store in
      let*! mempool = Store.Chain.mempool chain_store in
      let*! live_blocks, live_operations =
        Store.Chain.live_blocks chain_store
      in
      let timestamp_system = Tezos_base.Time.System.now () in
      let timestamp = Time.System.to_protocol timestamp_system in
      let* validation_state =
        Prevalidation_t.create chain_store ~head ~timestamp
      in
      let fetching = mempool.known_valid in
      let classification_parameters =
        Classification.
          {
            map_size_limit = limits.Shell_limits.max_refused_operations;
            on_discarded_operation =
              Distributed_db.Operation.clear_or_cancel chain_db;
          }
      in
      let classification = Classification.create classification_parameters in
      let parameters = {limits; tools = mk_tools chain_db} in
      let shell =
        {
          classification;
          parameters;
          predecessor = head;
          timestamp = timestamp_system;
          live_blocks;
          live_operations;
          mempool = Mempool.empty;
          fetching;
          pending = Pending_ops.empty;
          advertisement = `None;
          banned_operations = Operation_hash.Set.empty;
          worker = mk_worker_tools w;
        }
      in
      Shell_metrics.Mempool.set_validated_collector (fun () ->
          Prevalidator_classification.Sized_map.cardinal
            shell.classification.validated
          |> float_of_int) ;
      Shell_metrics.Mempool.set_refused_collector (fun () ->
          Prevalidator_classification.cardinal shell.classification.refused
          |> float_of_int) ;
      Shell_metrics.Mempool.set_branch_refused_collector (fun () ->
          Prevalidator_classification.cardinal
            shell.classification.branch_refused
          |> float_of_int) ;
      Shell_metrics.Mempool.set_branch_delayed_collector (fun () ->
          Prevalidator_classification.cardinal
            shell.classification.branch_delayed
          |> float_of_int) ;
      Shell_metrics.Mempool.set_outdated_collector (fun () ->
          Prevalidator_classification.cardinal shell.classification.outdated
          |> float_of_int) ;
      Shell_metrics.Mempool.set_unprocessed_collector (fun () ->
          Prevalidator_pending_operations.cardinal shell.pending |> float_of_int) ;
      let pv =
        {
          shell;
          validation_state;
          operation_stream = Lwt_watcher.create_input ();
          rpc_directory = build_rpc_directory w;
          config =
            
            Prevalidation_t.default_config;
          lock = Lwt_mutex.create ();
        }
      in
      Seq.iter
        (may_fetch_operation pv.shell None)
        (Operation_hash.Set.to_seq fetching) ;
      return pv
    let on_error (type a b) _w st (request : (a, b) Request.t) (errs : b) :
        unit tzresult Lwt.t =
      Prometheus.Counter.inc_one metrics.worker_counters.worker_error_count ;
      let open Lwt_syntax in
      match request with
      | Request.(Inject _) as r ->
          let* () = Events.(emit request_failed) (Request.view r, st, errs) in
          return_ok_unit
      | Request.Notify _ -> ( match errs with _ -> .)
      | Request.Leftover -> ( match errs with _ -> .)
      | Request.Arrived _ -> ( match errs with _ -> .)
      | Request.Advertise -> ( match errs with _ -> .)
      | Request.Flush _ ->
          let request_view = Request.view request in
          let* () = Events.(emit request_failed) (request_view, st, errs) in
          Lwt.return_error errs
      | Request.Ban _ ->
          let request_view = Request.view request in
          let* () = Events.(emit request_failed) (request_view, st, errs) in
          Lwt.return_error errs
    let on_completion _w r _ st =
      Prometheus.Counter.inc_one metrics.worker_counters.worker_completion_count ;
      match Request.view r with
      | View (Inject _) | View (Ban _) | Request.View (Flush _) ->
          Events.(emit request_completed_info) (Request.view r, st)
      | View (Notify _) | View Leftover | View (Arrived _) | View Advertise ->
          Events.(emit request_completed_debug) (Request.view r, st)
    let on_no_request _ = Lwt.return_unit
  end
  let table = Worker.create_table Queue
  
  let worker_promise =
    Worker.launch table name (Arg.limits, Arg.chain_db) (module Handlers)
  let worker =
    lazy
      (match Lwt.state worker_promise with
      | Lwt.Return (Ok worker) -> worker
      | Lwt.Return (Error _) | Lwt.Fail _ | Lwt.Sleep -> assert false)
end
let make limits chain_db chain_id (module Proto : Protocol_plugin.T) =
  let module Prevalidation_t = Prevalidation.Make (Proto) in
  let module Prevalidator =
    Make
      (Proto)
      (struct
        let limits = limits
        let chain_db = chain_db
        let chain_id = chain_id
      end)
      (Prevalidation_t)
  in
  (module Prevalidator : T)
module ChainProto_registry = Map.Make (struct
  type t = Chain_id.t * Protocol_hash.t
  let compare (c1, p1) (c2, p2) =
    let pc = Protocol_hash.compare p1 p2 in
    if pc = 0 then Chain_id.compare c1 c2 else pc
end)
(** {2 Public interface} *)
type t = (module T)
let chain_proto_registry : t ChainProto_registry.t ref =
  ref ChainProto_registry.empty
let create limits (module Proto : Protocol_plugin.T) chain_db =
  let open Lwt_result_syntax in
  let chain_store = Distributed_db.chain_store chain_db in
  let chain_id = Store.Chain.chain_id chain_store in
  match
    ChainProto_registry.find (chain_id, Proto.hash) !chain_proto_registry
  with
  | None ->
      let prevalidator = make limits chain_db chain_id (module Proto) in
      let (module Prevalidator : T) = prevalidator in
      chain_proto_registry :=
        ChainProto_registry.add
          Prevalidator.name
          prevalidator
          !chain_proto_registry ;
      return prevalidator
  | Some p -> return p
let shutdown (t : t) =
  let module Prevalidator : T = (val t) in
  let w = Lazy.force Prevalidator.worker in
  chain_proto_registry :=
    ChainProto_registry.remove Prevalidator.name !chain_proto_registry ;
  Prevalidator.Worker.shutdown w
let flush (t : t) event head live_blocks live_operations =
  let open Lwt_result_syntax in
  let module Prevalidator : T = (val t) in
  let w = Lazy.force Prevalidator.worker in
  let*! r =
    Prevalidator.Worker.Queue.push_request_and_wait
      w
      (Request.Flush (head, event, live_blocks, live_operations))
  in
  match r with
  | Ok r -> Lwt.return_ok r
  | Error (Closed None) -> fail [Worker_types.Terminated]
  | Error (Closed (Some errs)) -> fail errs
  | Error (Any exn) -> fail [Exn exn]
  | Error (Request_error error_trace) -> fail error_trace
let notify_operations (t : t) peer mempool =
  let module Prevalidator : T = (val t) in
  let w = Lazy.force Prevalidator.worker in
  let open Lwt_result_syntax in
  let*! (_was_pushed : bool) =
    Prevalidator.Worker.Queue.push_request w (Request.Notify (peer, mempool))
  in
  Lwt.return_unit
let inject_operation (t : t) ~force op =
  let module Prevalidator : T = (val t) in
  let open Lwt_result_syntax in
  let w = Lazy.force Prevalidator.worker in
  let*! r =
    Prevalidator.Worker.Queue.push_request_and_wait w (Inject {op; force})
  in
  match r with
  | Ok r -> Lwt.return_ok r
  | Error (Closed None) -> fail [Worker_types.Terminated]
  | Error (Closed (Some errs)) -> fail errs
  | Error (Any exn) -> fail [Exn exn]
  | Error (Request_error error_trace) -> fail error_trace
let status (t : t) =
  let module Prevalidator : T = (val t) in
  let w = Lazy.force Prevalidator.worker in
  Prevalidator.Worker.status w
let running_workers () =
  ChainProto_registry.fold
    (fun (id, proto) t acc -> (id, proto, t) :: acc)
    !chain_proto_registry
    []
let pending_requests (t : t) =
  let module Prevalidator : T = (val t) in
  let w = Lazy.force Prevalidator.worker in
  Prevalidator.Worker.Queue.pending_requests w
let current_request (t : t) =
  let module Prevalidator : T = (val t) in
  let w = Lazy.force Prevalidator.worker in
  Prevalidator.Worker.current_request w
let information (t : t) =
  let module Prevalidator : T = (val t) in
  let w = Lazy.force Prevalidator.worker in
  Prevalidator.Worker.information w
let pipeline_length (t : t) =
  let module Prevalidator : T = (val t) in
  let w = Lazy.force Prevalidator.worker in
  Prevalidator.Worker.Queue.pending_requests_length w
let empty_rpc_directory : unit Tezos_rpc.Directory.t =
  Tezos_rpc.Directory.gen_register
    Tezos_rpc.Directory.empty
    (Block_services.Empty.S.Mempool.pending_operations Tezos_rpc.Path.open_root)
    (fun _pv params () ->
      let pending_operations =
        {
          Block_services.Empty.Mempool.validated = [];
          refused = Operation_hash.Map.empty;
          outdated = Operation_hash.Map.empty;
          branch_refused = Operation_hash.Map.empty;
          branch_delayed = Operation_hash.Map.empty;
          unprocessed = Operation_hash.Map.empty;
        }
      in
      Tezos_rpc.Answer.return (params#version, pending_operations))
let rpc_directory : t option Tezos_rpc.Directory.t =
  Tezos_rpc.Directory.register_dynamic_directory
    Tezos_rpc.Directory.empty
    (Block_services.mempool_path Tezos_rpc.Path.open_root)
    (function
      | None ->
          Lwt.return
            (Tezos_rpc.Directory.map
               (fun _ -> Lwt.return_unit)
               empty_rpc_directory)
      | Some t ->
          let module Prevalidator : T = (val t : T) in
          let w = Lazy.force Prevalidator.worker in
          let pv = Prevalidator.Worker.state w in
          let pv_rpc_dir = Lazy.force (Prevalidator.get_rpc_directory pv) in
          Lwt.return
            (Tezos_rpc.Directory.map (fun _ -> Lwt.return pv) pv_rpc_dir))