diff --git a/app/vmm_prometheus_stats.ml b/app/vmm_prometheus_stats.ml index 98a2df9..f46398e 100644 --- a/app/vmm_prometheus_stats.ml +++ b/app/vmm_prometheus_stats.ml @@ -349,7 +349,7 @@ let cmd = `P "$(tname) connects to a server and initiates a TLS handshake" ] in Term.(pure run_client $ setup_log $ cas $ client_cert $ client_key $ destination $ db $ address $ port), - Term.info "vmm_client" ~version:"%%VERSION_NUM%%" ~doc ~man + Term.info "vmm_prometheus_stats" ~version:"%%VERSION_NUM%%" ~doc ~man let () = match Term.eval cmd diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index 9f406d3..9778df5 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -156,7 +156,8 @@ let remove_pid t pid = let pid_nic = IM.remove pid t.pid_nic in { t with pid_nic } -let remove_all t = IM.iter (fun pid _ -> ignore (remove_pid t pid)) t.pid_nic +let remove_pids t pids = + List.fold_left remove_pid t pids let handle t hdr buf = let open Vmm_wire in @@ -170,19 +171,19 @@ let handle t hdr buf = | Some Add -> decode_pid_taps cs >>= fun (pid, taps) -> add_pid t pid taps >>= fun t -> - Ok (t, success ~msg:"added" hdr.id my_version) + Ok (t, `Add pid, success ~msg:"added" hdr.id my_version) | Some Remove -> decode_pid cs >>= fun pid -> let t = remove_pid t pid in - Ok (t, success ~msg:"removed" hdr.id my_version) + Ok (t, `Remove pid, success ~msg:"removed" hdr.id my_version) | Some Stat_request -> decode_pid cs >>= fun pid -> stats t pid >>= fun s -> - Ok (t, stat_reply hdr.id my_version (encode_stats s)) + Ok (t, `None, stat_reply hdr.id my_version (encode_stats s)) | _ -> Error (`Msg "unknown command") in match r with - | Ok (t, out) -> t, out + | Ok (t, action, out) -> t, action, out | Error (`Msg msg) -> Logs.err (fun m -> m "error while processing %s" msg) ; - t, fail ~msg hdr.id my_version + t, `None, fail ~msg hdr.id my_version diff --git a/stats/vmm_stats_lwt.ml b/stats/vmm_stats_lwt.ml index 1bcadac..ab2aab8 100644 --- a/stats/vmm_stats_lwt.ml +++ b/stats/vmm_stats_lwt.ml @@ -23,24 +23,29 @@ let pp_sockaddr ppf = function let handle s addr () = Logs.info (fun m -> m "handling stats connection %a" pp_sockaddr addr) ; - let rec loop () = + let rec loop acc = Vmm_lwt.read_exactly s >>= function - | Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop () - | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit + | Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop acc + | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc | Ok (hdr, data) -> Logs.debug (fun m -> m "received %a" Cstruct.hexdump_pp (Cstruct.of_string data)) ; - let t', out = Vmm_stats.handle !t hdr data in + let t', action, out = Vmm_stats.handle !t hdr data in + let acc = match action with + | `Add pid -> pid :: acc + | `Remove pid -> List.filter (fun m -> m <> pid) acc + | `None -> acc + in t := t' ; Logs.debug (fun m -> m "sent %a" Cstruct.hexdump_pp (Cstruct.of_string out)) ; Vmm_lwt.write_raw s out >>= function - | Ok () -> loop () - | Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return_unit + | Ok () -> loop acc + | Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc in - loop () >>= fun () -> + loop [] >>= fun pids -> Lwt.catch (fun () -> Lwt_unix.close s) (fun _ -> Lwt.return_unit) >|= fun () -> - Logs.warn (fun m -> m "disconnect, dropping vmm_stats!") ; - Vmm_stats.remove_all !t ; - t := Vmm_stats.empty () + Logs.warn (fun m -> m "disconnect, dropping %d pids!" (List.length pids)) ; + let t' = Vmm_stats.remove_pids !t pids in + t := t' let rec timer interval () = t := Vmm_stats.tick !t ;