vmm_stats_lwt: track pids for each socket connection to tear them down individually
This commit is contained in:
parent
2bb808105e
commit
c04f062960
|
@ -349,7 +349,7 @@ let cmd =
|
|||
`P "$(tname) connects to a server and initiates a TLS handshake" ]
|
||||
in
|
||||
Term.(pure run_client $ setup_log $ cas $ client_cert $ client_key $ destination $ db $ address $ port),
|
||||
Term.info "vmm_client" ~version:"%%VERSION_NUM%%" ~doc ~man
|
||||
Term.info "vmm_prometheus_stats" ~version:"%%VERSION_NUM%%" ~doc ~man
|
||||
|
||||
let () =
|
||||
match Term.eval cmd
|
||||
|
|
|
@ -156,7 +156,8 @@ let remove_pid t pid =
|
|||
let pid_nic = IM.remove pid t.pid_nic in
|
||||
{ t with pid_nic }
|
||||
|
||||
let remove_all t = IM.iter (fun pid _ -> ignore (remove_pid t pid)) t.pid_nic
|
||||
let remove_pids t pids =
|
||||
List.fold_left remove_pid t pids
|
||||
|
||||
let handle t hdr buf =
|
||||
let open Vmm_wire in
|
||||
|
@ -170,19 +171,19 @@ let handle t hdr buf =
|
|||
| Some Add ->
|
||||
decode_pid_taps cs >>= fun (pid, taps) ->
|
||||
add_pid t pid taps >>= fun t ->
|
||||
Ok (t, success ~msg:"added" hdr.id my_version)
|
||||
Ok (t, `Add pid, success ~msg:"added" hdr.id my_version)
|
||||
| Some Remove ->
|
||||
decode_pid cs >>= fun pid ->
|
||||
let t = remove_pid t pid in
|
||||
Ok (t, success ~msg:"removed" hdr.id my_version)
|
||||
Ok (t, `Remove pid, success ~msg:"removed" hdr.id my_version)
|
||||
| Some Stat_request ->
|
||||
decode_pid cs >>= fun pid ->
|
||||
stats t pid >>= fun s ->
|
||||
Ok (t, stat_reply hdr.id my_version (encode_stats s))
|
||||
Ok (t, `None, stat_reply hdr.id my_version (encode_stats s))
|
||||
| _ -> Error (`Msg "unknown command")
|
||||
in
|
||||
match r with
|
||||
| Ok (t, out) -> t, out
|
||||
| Ok (t, action, out) -> t, action, out
|
||||
| Error (`Msg msg) ->
|
||||
Logs.err (fun m -> m "error while processing %s" msg) ;
|
||||
t, fail ~msg hdr.id my_version
|
||||
t, `None, fail ~msg hdr.id my_version
|
||||
|
|
|
@ -23,24 +23,29 @@ let pp_sockaddr ppf = function
|
|||
|
||||
let handle s addr () =
|
||||
Logs.info (fun m -> m "handling stats connection %a" pp_sockaddr addr) ;
|
||||
let rec loop () =
|
||||
let rec loop acc =
|
||||
Vmm_lwt.read_exactly s >>= function
|
||||
| Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop ()
|
||||
| Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit
|
||||
| Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop acc
|
||||
| Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc
|
||||
| Ok (hdr, data) ->
|
||||
Logs.debug (fun m -> m "received %a" Cstruct.hexdump_pp (Cstruct.of_string data)) ;
|
||||
let t', out = Vmm_stats.handle !t hdr data in
|
||||
let t', action, out = Vmm_stats.handle !t hdr data in
|
||||
let acc = match action with
|
||||
| `Add pid -> pid :: acc
|
||||
| `Remove pid -> List.filter (fun m -> m <> pid) acc
|
||||
| `None -> acc
|
||||
in
|
||||
t := t' ;
|
||||
Logs.debug (fun m -> m "sent %a" Cstruct.hexdump_pp (Cstruct.of_string out)) ;
|
||||
Vmm_lwt.write_raw s out >>= function
|
||||
| Ok () -> loop ()
|
||||
| Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return_unit
|
||||
| Ok () -> loop acc
|
||||
| Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc
|
||||
in
|
||||
loop () >>= fun () ->
|
||||
loop [] >>= fun pids ->
|
||||
Lwt.catch (fun () -> Lwt_unix.close s) (fun _ -> Lwt.return_unit) >|= fun () ->
|
||||
Logs.warn (fun m -> m "disconnect, dropping vmm_stats!") ;
|
||||
Vmm_stats.remove_all !t ;
|
||||
t := Vmm_stats.empty ()
|
||||
Logs.warn (fun m -> m "disconnect, dropping %d pids!" (List.length pids)) ;
|
||||
let t' = Vmm_stats.remove_pids !t pids in
|
||||
t := t'
|
||||
|
||||
let rec timer interval () =
|
||||
t := Vmm_stats.tick !t ;
|
||||
|
|
Loading…
Reference in a new issue