summaryrefslogtreecommitdiff
path: root/lib/input_foreman.ml
diff options
context:
space:
mode:
Diffstat (limited to 'lib/input_foreman.ml')
-rw-r--r--lib/input_foreman.ml215
1 files changed, 126 insertions, 89 deletions
diff --git a/lib/input_foreman.ml b/lib/input_foreman.ml
index 696427b..56c46db 100644
--- a/lib/input_foreman.ml
+++ b/lib/input_foreman.ml
@@ -547,37 +547,50 @@ let lock_one ~env ~sw ~proc_mgr ~force ~name : (unit, error) result =
Ok ()
let lock_many ~env ~sw ~proc_mgr ~domain_count ~force ~(names : Name.t list) : (unit, error) result =
- Logs.debug (fun m -> m "Locking many: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names);
- let dm = Eio.Stdenv.domain_mgr env in
- let pool = Eio.Executor_pool.create ~sw ~domain_count dm in
- let any_succeed, errors =
- names
- |> List.map
- (fun name ->
- Eio.Executor_pool.submit ~weight: 1.0 pool (fun () ->
- lock_one ~env ~sw ~proc_mgr ~force ~name
- )
- )
- |> List.fold_left
- (fun (suc, errs) ->
- function
- | Ok (Ok()) -> true, errs
- | Ok (Error err) -> suc, err :: errs
- | Error exn -> suc, (`Pool_exception exn) :: errs
- )
- (false, [])
+ Logs.debug (fun m ->
+ m "Locking many: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names
+ );
+ let sem = Eio.Semaphore.make domain_count
+ and errors = ref []
+ and any_succeed = ref false
+ and result_lock = Eio.Mutex.create ()
+ in
+ let processes =
+ List.map
+ (fun name ->
+ fun () ->
+ Eio.Semaphore.acquire sem;
+ Fun.protect
+ ~finally: (fun () -> Eio.Semaphore.release sem)
+ (fun () ->
+ let lock_result = lock_one ~env ~sw ~proc_mgr ~force ~name in
+ Eio.Mutex.lock result_lock;
+ Fun.protect
+ ~finally: (fun () -> Eio.Mutex.unlock result_lock)
+ (fun () ->
+ match lock_result with
+ | Ok() -> any_succeed := true
+ | Error err -> errors := err :: !errors
+ )
+ )
+ )
+ names
in
- match any_succeed, errors with
+ Eio.Fiber.all processes;
+ match !any_succeed, !errors with
| true, errs ->
- let warn err =
- Logs.warn (fun m -> m "Couldn’t lock: %a" Error.pp_input_foreman_error err)
- in
- List.iter warn errs;
+ List.iter
+ (fun err -> Logs.warn (fun m -> m "Couldn’t lock: %a" Error.pp_input_foreman_error err))
+ errs;
Ok ()
| false, [err] ->
Error err
| false, errs ->
- let err_str = List.map (fun err -> Fmt.str "%a" Error.pp_input_foreman_error err) errs in
+ let err_str =
+ List.map
+ (fun err -> Fmt.str "%a" Error.pp_input_foreman_error err)
+ errs
+ in
Error (`Many_errors err_str)
let lock ~env ~sw ~proc_mgr ~domain_count ?(force = false) ?names () : (unit, error) result =
@@ -593,47 +606,61 @@ let lock ~env ~sw ~proc_mgr ~domain_count ?(force = false) ?names () : (unit, er
| Some names ->
lock_many ~env ~sw ~proc_mgr ~domain_count ~force ~names
-let list_stale ~env ~sw ~proc_mgr ~domain_count ~names : (unit, error) result =
- Logs.info (fun m -> m "Listing stale …");
- let (let*) = Result.bind in
- let dm = Eio.Stdenv.domain_mgr env in
- let pool = Eio.Executor_pool.create ~sw ~domain_count dm in
- let any_succeed, stale, errors =
- names
- |> List.map
- (fun name ->
- Eio.Executor_pool.submit ~weight: 1.0 pool (fun () ->
- let* input = get name in
- match get_latest ~sw ~proc_mgr input with
- | Error err -> Error err
- | Ok None -> Ok None
- | Ok (Some new_value) -> Ok (Some (name, new_value))
- )
- )
- |> List.fold_left
- (fun (suc, sacc, errs) ->
- function
- | Ok (Ok None) -> true, sacc, errs
- | Ok (Ok (Some stale)) -> true, stale :: sacc, errs
- | Ok (Error err) -> suc, sacc, err :: errs
- | Error exn -> suc, sacc, (`Pool_exception exn) :: errs
- )
- (false, [], [])
+let list_stale ~env: _ ~sw ~proc_mgr ~domain_count ~names : (unit, error) result =
+ Logs.info (fun m ->
+ m "Listing stale for: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names
+ );
+ let sem = Eio.Semaphore.make domain_count
+ and errors = ref []
+ and stale_results = ref []
+ and any_succeed = ref false
+ and result_lock = Eio.Mutex.create ()
in
- match any_succeed, errors with
+ let processes =
+ List.map
+ (fun name ->
+ fun () ->
+ Eio.Semaphore.acquire sem;
+ Fun.protect
+ ~finally: (fun () -> Eio.Semaphore.release sem)
+ (fun () ->
+ let result =
+ let (let*) = Result.bind in
+ let* input = get name in
+ match get_latest ~sw ~proc_mgr input with
+ | Error err -> Error err
+ | Ok None -> Ok None
+ | Ok (Some new_value) -> Ok (Some (name, new_value))
+ in
+ (* only hold the mutex for shared state updates *)
+ Eio.Mutex.lock result_lock;
+ Fun.protect
+ ~finally: (fun () -> Eio.Mutex.unlock result_lock)
+ (fun () ->
+ match result with
+ | Ok None -> any_succeed := true
+ | Ok (Some stale) ->
+ any_succeed := true;
+ stale_results := stale :: !stale_results
+ | Error err -> errors := err :: !errors
+ )
+ )
+ )
+ names
+ in
+ Eio.Fiber.all processes;
+ match !any_succeed, !errors with
| true, errs ->
- begin
- let warn err =
- Logs.warn (fun m -> m "Couldn’t refresh: %a" Error.pp_input_foreman_error err)
- and prnt (name, latest_value) =
+ List.iter
+ (fun err -> Logs.warn (fun m -> m "Couldn’t refresh: %a" Error.pp_input_foreman_error err))
+ errs;
+ List.iter
+ (fun (name, latest_value) ->
Logs.app (fun m -> m "%a: %s" Fmt.(styled `Green string) (Name.take name) latest_value)
- in
- List.iter warn errs;
- List.iter prnt stale;
- Ok ()
- end
- | false, [err] ->
- Error err
+ )
+ !stale_results;
+ Ok ()
+ | false, [err] -> Error err
| false, errs ->
let err_str = List.map (fun err -> Fmt.str "%a" Error.pp_input_foreman_error err) errs in
Error (`Many_errors err_str)
@@ -671,35 +698,45 @@ let refresh_one ~env ~sw ~proc_mgr ~name : (unit, error) result =
prefetch ~env ~proc_mgr ~name ()
let refresh_many ~env ~sw ~proc_mgr ~domain_count ~(names : Name.t list) : (unit, error) result =
- Logs.debug (fun m -> m "Refreshing many: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names);
- let dm = Eio.Stdenv.domain_mgr env in
- let pool = Eio.Executor_pool.create ~sw ~domain_count dm in
- let any_succeed, errors =
- names
- |> List.map
- (fun name ->
- Eio.Executor_pool.submit ~weight: 1.0 pool (fun () ->
- refresh_one ~env ~sw ~proc_mgr ~name
- )
- )
- |> List.fold_left
- (fun (suc, errs) ->
- function
- | Ok (Ok()) -> true, errs
- | Ok (Error err) -> suc, err :: errs
- | Error exn -> suc, (`Pool_exception exn) :: errs
- )
- (false, [])
+ Logs.debug (fun m ->
+ m "Refreshing many: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names
+ );
+ let sem = Eio.Semaphore.make domain_count
+ and errors = ref []
+ and any_succeed = ref false
+ and result_lock = Eio.Mutex.create ()
in
- match any_succeed, errors with
+ let processes =
+ List.map
+ (fun name ->
+ fun () ->
+ Eio.Semaphore.acquire sem;
+ Fun.protect
+ ~finally: (fun () -> Eio.Semaphore.release sem)
+ (fun () ->
+ let refresh_result = refresh_one ~env ~sw ~proc_mgr ~name in
+ (* only hold the mutex for the shared state update *)
+ Eio.Mutex.lock result_lock;
+ Fun.protect
+ ~finally: (fun () -> Eio.Mutex.unlock result_lock)
+ (fun () ->
+ match refresh_result with
+ | Ok() -> any_succeed := true
+ | Error err -> errors := err :: !errors
+ )
+ )
+ )
+ names
+ in
+ Eio.Fiber.all
+ processes;
+ match !any_succeed, !errors with
| true, errs ->
- let warn err =
- Logs.warn (fun m -> m "Couldn’t refresh: %a" Error.pp_input_foreman_error err)
- in
- List.iter warn errs;
+ List.iter
+ (fun err -> Logs.warn (fun m -> m "Couldn’t refresh: %a" Error.pp_input_foreman_error err))
+ errs;
Ok ()
- | false, [err] ->
- Error err
+ | false, [err] -> Error err
| false, errs ->
let err_str = List.map (fun err -> Fmt.str "%a" Error.pp_input_foreman_error err) errs in
Error (`Many_errors err_str)