From 933b704317f786357cc669e2317fd82a8ec22b36 Mon Sep 17 00:00:00 2001 From: ·𐑑𐑴𐑕𐑑𐑩𐑤 Date: Tue, 23 Dec 2025 07:31:55 +0000 Subject: foreman: use Semaphore --- lib/input_foreman.ml | 215 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 126 insertions(+), 89 deletions(-) (limited to 'lib') diff --git a/lib/input_foreman.ml b/lib/input_foreman.ml index 696427b..56c46db 100644 --- a/lib/input_foreman.ml +++ b/lib/input_foreman.ml @@ -547,37 +547,50 @@ let lock_one ~env ~sw ~proc_mgr ~force ~name : (unit, error) result = Ok () let lock_many ~env ~sw ~proc_mgr ~domain_count ~force ~(names : Name.t list) : (unit, error) result = - Logs.debug (fun m -> m "Locking many: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names); - let dm = Eio.Stdenv.domain_mgr env in - let pool = Eio.Executor_pool.create ~sw ~domain_count dm in - let any_succeed, errors = - names - |> List.map - (fun name -> - Eio.Executor_pool.submit ~weight: 1.0 pool (fun () -> - lock_one ~env ~sw ~proc_mgr ~force ~name - ) - ) - |> List.fold_left - (fun (suc, errs) -> - function - | Ok (Ok()) -> true, errs - | Ok (Error err) -> suc, err :: errs - | Error exn -> suc, (`Pool_exception exn) :: errs - ) - (false, []) + Logs.debug (fun m -> + m "Locking many: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names + ); + let sem = Eio.Semaphore.make domain_count + and errors = ref [] + and any_succeed = ref false + and result_lock = Eio.Mutex.create () + in + let processes = + List.map + (fun name -> + fun () -> + Eio.Semaphore.acquire sem; + Fun.protect + ~finally: (fun () -> Eio.Semaphore.release sem) + (fun () -> + let lock_result = lock_one ~env ~sw ~proc_mgr ~force ~name in + Eio.Mutex.lock result_lock; + Fun.protect + ~finally: (fun () -> Eio.Mutex.unlock result_lock) + (fun () -> + match lock_result with + | Ok() -> any_succeed := true + | Error err -> errors := err :: !errors + ) + ) + ) + names in - match any_succeed, errors with + Eio.Fiber.all processes; + match !any_succeed, !errors with | true, errs -> - let warn err = - Logs.warn (fun m -> m "Couldn’t lock: %a" Error.pp_input_foreman_error err) - in - List.iter warn errs; + List.iter + (fun err -> Logs.warn (fun m -> m "Couldn’t lock: %a" Error.pp_input_foreman_error err)) + errs; Ok () | false, [err] -> Error err | false, errs -> - let err_str = List.map (fun err -> Fmt.str "%a" Error.pp_input_foreman_error err) errs in + let err_str = + List.map + (fun err -> Fmt.str "%a" Error.pp_input_foreman_error err) + errs + in Error (`Many_errors err_str) let lock ~env ~sw ~proc_mgr ~domain_count ?(force = false) ?names () : (unit, error) result = @@ -593,47 +606,61 @@ let lock ~env ~sw ~proc_mgr ~domain_count ?(force = false) ?names () : (unit, er | Some names -> lock_many ~env ~sw ~proc_mgr ~domain_count ~force ~names -let list_stale ~env ~sw ~proc_mgr ~domain_count ~names : (unit, error) result = - Logs.info (fun m -> m "Listing stale …"); - let (let*) = Result.bind in - let dm = Eio.Stdenv.domain_mgr env in - let pool = Eio.Executor_pool.create ~sw ~domain_count dm in - let any_succeed, stale, errors = - names - |> List.map - (fun name -> - Eio.Executor_pool.submit ~weight: 1.0 pool (fun () -> - let* input = get name in - match get_latest ~sw ~proc_mgr input with - | Error err -> Error err - | Ok None -> Ok None - | Ok (Some new_value) -> Ok (Some (name, new_value)) - ) - ) - |> List.fold_left - (fun (suc, sacc, errs) -> - function - | Ok (Ok None) -> true, sacc, errs - | Ok (Ok (Some stale)) -> true, stale :: sacc, errs - | Ok (Error err) -> suc, sacc, err :: errs - | Error exn -> suc, sacc, (`Pool_exception exn) :: errs - ) - (false, [], []) +let list_stale ~env: _ ~sw ~proc_mgr ~domain_count ~names : (unit, error) result = + Logs.info (fun m -> + m "Listing stale for: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names + ); + let sem = Eio.Semaphore.make domain_count + and errors = ref [] + and stale_results = ref [] + and any_succeed = ref false + and result_lock = Eio.Mutex.create () in - match any_succeed, errors with + let processes = + List.map + (fun name -> + fun () -> + Eio.Semaphore.acquire sem; + Fun.protect + ~finally: (fun () -> Eio.Semaphore.release sem) + (fun () -> + let result = + let (let*) = Result.bind in + let* input = get name in + match get_latest ~sw ~proc_mgr input with + | Error err -> Error err + | Ok None -> Ok None + | Ok (Some new_value) -> Ok (Some (name, new_value)) + in + (* only hold the mutex for shared state updates *) + Eio.Mutex.lock result_lock; + Fun.protect + ~finally: (fun () -> Eio.Mutex.unlock result_lock) + (fun () -> + match result with + | Ok None -> any_succeed := true + | Ok (Some stale) -> + any_succeed := true; + stale_results := stale :: !stale_results + | Error err -> errors := err :: !errors + ) + ) + ) + names + in + Eio.Fiber.all processes; + match !any_succeed, !errors with | true, errs -> - begin - let warn err = - Logs.warn (fun m -> m "Couldn’t refresh: %a" Error.pp_input_foreman_error err) - and prnt (name, latest_value) = + List.iter + (fun err -> Logs.warn (fun m -> m "Couldn’t refresh: %a" Error.pp_input_foreman_error err)) + errs; + List.iter + (fun (name, latest_value) -> Logs.app (fun m -> m "%a: %s" Fmt.(styled `Green string) (Name.take name) latest_value) - in - List.iter warn errs; - List.iter prnt stale; - Ok () - end - | false, [err] -> - Error err + ) + !stale_results; + Ok () + | false, [err] -> Error err | false, errs -> let err_str = List.map (fun err -> Fmt.str "%a" Error.pp_input_foreman_error err) errs in Error (`Many_errors err_str) @@ -671,35 +698,45 @@ let refresh_one ~env ~sw ~proc_mgr ~name : (unit, error) result = prefetch ~env ~proc_mgr ~name () let refresh_many ~env ~sw ~proc_mgr ~domain_count ~(names : Name.t list) : (unit, error) result = - Logs.debug (fun m -> m "Refreshing many: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names); - let dm = Eio.Stdenv.domain_mgr env in - let pool = Eio.Executor_pool.create ~sw ~domain_count dm in - let any_succeed, errors = - names - |> List.map - (fun name -> - Eio.Executor_pool.submit ~weight: 1.0 pool (fun () -> - refresh_one ~env ~sw ~proc_mgr ~name - ) - ) - |> List.fold_left - (fun (suc, errs) -> - function - | Ok (Ok()) -> true, errs - | Ok (Error err) -> suc, err :: errs - | Error exn -> suc, (`Pool_exception exn) :: errs - ) - (false, []) + Logs.debug (fun m -> + m "Refreshing many: %a" Fmt.(brackets (list ~sep: semi Name.pp)) names + ); + let sem = Eio.Semaphore.make domain_count + and errors = ref [] + and any_succeed = ref false + and result_lock = Eio.Mutex.create () in - match any_succeed, errors with + let processes = + List.map + (fun name -> + fun () -> + Eio.Semaphore.acquire sem; + Fun.protect + ~finally: (fun () -> Eio.Semaphore.release sem) + (fun () -> + let refresh_result = refresh_one ~env ~sw ~proc_mgr ~name in + (* only hold the mutex for the shared state update *) + Eio.Mutex.lock result_lock; + Fun.protect + ~finally: (fun () -> Eio.Mutex.unlock result_lock) + (fun () -> + match refresh_result with + | Ok() -> any_succeed := true + | Error err -> errors := err :: !errors + ) + ) + ) + names + in + Eio.Fiber.all + processes; + match !any_succeed, !errors with | true, errs -> - let warn err = - Logs.warn (fun m -> m "Couldn’t refresh: %a" Error.pp_input_foreman_error err) - in - List.iter warn errs; + List.iter + (fun err -> Logs.warn (fun m -> m "Couldn’t refresh: %a" Error.pp_input_foreman_error err)) + errs; Ok () - | false, [err] -> - Error err + | false, [err] -> Error err | false, errs -> let err_str = List.map (fun err -> Fmt.str "%a" Error.pp_input_foreman_error err) errs in Error (`Many_errors err_str) -- cgit v1.2.3