stats are back now! no longer two pullers, but now with one pusher :)

This commit is contained in:
Hannes Mehnert 2018-09-20 22:53:42 +02:00
parent e7b4742964
commit 99ba1c5e4b
6 changed files with 231 additions and 199 deletions

View file

@ -140,12 +140,10 @@ module P = struct
vm ifd.name (String.concat ~sep:"," fields) vm ifd.name (String.concat ~sep:"," fields)
end end
let my_version = `WV1 let my_version = `WV2
let command = ref 1L let command = ref 1L
let (req : string IM64.t ref) = ref IM64.empty
let str_of_e = function let str_of_e = function
| `Eof -> "end of file" | `Eof -> "end of file"
| `Exception -> "exception" | `Exception -> "exception"
@ -160,65 +158,67 @@ let safe_close s =
Logs.err (fun m -> m "exception %s while closing" (Printexc.to_string e)) ; Logs.err (fun m -> m "exception %s while closing" (Printexc.to_string e)) ;
Lwt.return_unit) Lwt.return_unit)
let rec read_sock_write_tcp closing db c ?fd addr addrtype = let rec read_sock_write_tcp c ?fd addr addrtype =
match fd with match fd with
| None -> | None ->
if !closing then Logs.debug (fun m -> m "new connection to TCP") ;
Lwt.return_unit let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in
else begin Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ;
Logs.debug (fun m -> m "new connection to TCP") ; Lwt.catch
let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in (fun () ->
Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; Lwt_unix.connect fd addr >|= fun () ->
Lwt.catch Logs.debug (fun m -> m "connected to TCP") ;
(fun () -> Some fd)
Lwt_unix.connect fd addr >|= fun () -> (fun e ->
Logs.debug (fun m -> m "connected to TCP") ; let addr', port = match addr with
Some fd) | Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port
(fun e -> | Lwt_unix.ADDR_UNIX addr -> addr, 0
let addr', port = match addr with in
| Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s"
| Lwt_unix.ADDR_UNIX addr -> addr, 0 (Printexc.to_string e) addr' port) ;
in safe_close fd >>= fun () ->
Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s" Lwt_unix.sleep 5.0 >|= fun () ->
(Printexc.to_string e) addr' port) ; None) >>= fun fd ->
safe_close fd >>= fun () -> read_sock_write_tcp c ?fd addr addrtype
Lwt_unix.sleep 5.0 >|= fun () ->
None) >>= fun fd ->
read_sock_write_tcp closing db c ?fd addr addrtype
end
| Some fd -> | Some fd ->
if !closing then let open Vmm_wire in
safe_close fd Logs.debug (fun m -> m "reading from unix socket") ;
else begin Vmm_lwt.read_wire c >>= function
let open Vmm_wire in | Error e ->
Logs.debug (fun m -> m "reading from unix socket") ; Logs.err (fun m -> m "error %s while reading vmm socket (return)"
Vmm_lwt.read_wire c >>= function (str_of_e e)) ;
| Error e -> safe_close fd >>= fun () ->
Logs.err (fun m -> m "error %s while reading vmm socket (return)" safe_close c >|= fun () ->
(str_of_e e)) ; true
closing := true ; | Ok (hdr, data) ->
safe_close fd if not (version_eq hdr.version my_version) then begin
| Ok (hdr, data) -> Logs.err (fun m -> m "unknown wire protocol version") ;
let name = safe_close fd >>= fun () ->
try IM64.find hdr.id !req safe_close c >|= fun () ->
with Not_found -> "not found" false
in end else if Vmm_wire.is_fail hdr then begin
req := IM64.remove hdr.id !req ; Logs.err (fun m -> m "failed to retrieve statistics") ;
(if not (version_eq hdr.version my_version) then begin safe_close fd >>= fun () ->
Logs.err (fun m -> m "unknown wire protocol version") ; safe_close c >|= fun () ->
closing := true ; false
safe_close fd >|= fun () -> end else if Vmm_wire.is_reply hdr then begin
None Logs.info (fun m -> m "received reply, continuing") ;
end else if Vmm_wire.is_fail hdr then begin read_sock_write_tcp c ~fd addr addrtype
Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ; end else
Lwt.return (Some fd) (match Vmm_wire.Stats.int_to_op hdr.Vmm_wire.tag with
end else if Vmm_wire.is_reply hdr then | Some Vmm_wire.Stats.Data ->
begin match Vmm_wire.Stats.decode_stats data with begin
let r =
let open Rresult.R.Infix in
Vmm_wire.decode_strings data >>= fun (id, off) ->
Vmm_wire.Stats.decode_stats (Cstruct.shift data off) >>| fun stats ->
(Vmm_core.string_of_id id, stats)
in
match r with
| Error (`Msg msg) -> | Error (`Msg msg) ->
Logs.warn (fun m -> m "error %s while decoding stats %s, ignoring" Logs.warn (fun m -> m "error %s while decoding stats, ignoring" msg) ;
msg name) ;
Lwt.return (Some fd) Lwt.return (Some fd)
| Ok (ru, vmm, ifs) -> | Ok (name, (ru, vmm, ifs)) ->
let ru = P.encode_ru name ru in let ru = P.encode_ru name ru in
let vmm = P.encode_vmm name vmm in let vmm = P.encode_vmm name vmm in
let taps = List.map (P.encode_if name) ifs in let taps = List.map (P.encode_if name) ifs in
@ -234,37 +234,23 @@ let rec read_sock_write_tcp closing db c ?fd addr addrtype =
safe_close fd >|= fun () -> safe_close fd >|= fun () ->
None None
end end
else begin | _ ->
Logs.err (fun m -> m "unhandled tag %lu for %s" hdr.tag name) ; Logs.err (fun m -> m "unhandled tag %lu" hdr.tag) ;
Lwt.return (Some fd) Lwt.return (Some fd)) >>= fun fd ->
end) >>= fun fd -> read_sock_write_tcp c ?fd addr addrtype
read_sock_write_tcp closing db c ?fd addr addrtype
end
let rec query_sock closing prefix db c interval = let query_sock vms c =
(* query c for everyone in db *) (* query c for everyone in db *)
if !closing then Lwt_list.fold_left_s (fun r name ->
Lwt.return_unit match r with
else | Error e -> Lwt.return (Error e)
Lwt_list.fold_left_s (fun r (id, name) -> | Ok () ->
match r with let id = Astring.String.cuts ~sep:"." name in
| Error e -> Lwt.return (Error e) let request = Vmm_wire.Stats.stat !command my_version id in
| Ok () -> command := Int64.succ !command ;
let id = identifier id in Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id id) ;
let id = match prefix with None -> [ id ] | Some p -> [ p ; id ] in Vmm_lwt.write_wire c request)
let request = Vmm_wire.Stats.stat !command my_version id in (Ok ()) vms
req := IM64.add !command name !req ;
command := Int64.succ !command ;
Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id id) ;
Vmm_lwt.write_wire c request)
(Ok ()) db >>= function
| Error e ->
Logs.err (fun m -> m "error %s while writing to vmm socket" (str_of_e e)) ;
closing := true ;
Lwt.return_unit
| Ok () ->
Lwt_unix.sleep (float_of_int interval) >>= fun () ->
query_sock closing prefix db c interval
let rec maybe_connect stat_socket = let rec maybe_connect stat_socket =
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
@ -281,10 +267,7 @@ let rec maybe_connect stat_socket =
Lwt_unix.sleep (float_of_int 5) >>= fun () -> Lwt_unix.sleep (float_of_int 5) >>= fun () ->
maybe_connect stat_socket) maybe_connect stat_socket)
let client stat_socket influxhost influxport db prefix interval = let client stat_socket influxhost influxport vms =
(* start a socket connection to vmm_stats *)
maybe_connect stat_socket >>= fun c ->
(* figure out address of influx *) (* figure out address of influx *)
Lwt_unix.gethostbyname influxhost >>= fun host_entry -> Lwt_unix.gethostbyname influxhost >>= fun host_entry ->
let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in
@ -293,7 +276,7 @@ let client stat_socket influxhost influxport db prefix interval =
in in
(* loop *) (* loop *)
(* the query task queries the stat_socket at each interval (* the query task queries the stat_socket at each
- if this fails, closing is set to true (and unit is returned) - if this fails, closing is set to true (and unit is returned)
the read_sock reads the stat_socket, and forwards to a TCP socket the read_sock reads the stat_socket, and forwards to a TCP socket
@ -305,28 +288,23 @@ let client stat_socket influxhost influxport db prefix interval =
- query_sock/read_sock_write_tcp write an read from it - query_sock/read_sock_write_tcp write an read from it
- on failure in read or write, the TCP connection is closed, and loop - on failure in read or write, the TCP connection is closed, and loop
takes control: safe_close, maybe_connect, rinse, repeat *) takes control: safe_close, maybe_connect, rinse, repeat *)
let rec loop c =
let closing = ref false in
Lwt.join [
query_sock closing prefix db c interval ;
read_sock_write_tcp closing db c addr addrtype
] >>= fun () ->
safe_close c >>= fun () ->
maybe_connect stat_socket >>= fun c ->
loop c
in
loop c
let run_client _ socket (influxhost, influxport) db prefix interval = let rec loop () =
Sys.(set_signal sigpipe Signal_ignore) ; (* start a socket connection to vmm_stats *)
let db = maybe_connect stat_socket >>= fun c ->
let open Rresult.R.Infix in query_sock vms c >>= function
match Bos.OS.File.read_lines (Fpath.v db) >>= parse_db with | Error e ->
| Ok [] -> invalid_arg "empty database" Logs.err (fun m -> m "error %s while writing to stat socket" (str_of_e e)) ;
| Ok db -> db Lwt.return_unit
| Error (`Msg m) -> invalid_arg ("couldn't parse database " ^ m) | Ok () ->
read_sock_write_tcp c addr addrtype >>= fun restart ->
if restart then loop () else Lwt.return_unit
in in
Lwt_main.run (client socket influxhost influxport db prefix interval) loop ()
let run_client _ socket (influxhost, influxport) vms =
Sys.(set_signal sigpipe Signal_ignore) ;
Lwt_main.run (client socket influxhost influxport vms)
let setup_log style_renderer level = let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer (); Fmt_tty.setup_std_outputs ?style_renderer ();
@ -361,17 +339,9 @@ let influx =
Arg.(required & pos 0 (some host_port) None & info [] ~docv:"influx" Arg.(required & pos 0 (some host_port) None & info [] ~docv:"influx"
~doc:"the influx hostname:port to connect to") ~doc:"the influx hostname:port to connect to")
let db = let vms =
let doc = "VMID database" in let doc = "virtual machine names" in
Arg.(required & pos 1 (some file) None & info [] ~doc) Arg.(value & opt_all string [] & info [ "n" ; "name" ] ~doc)
let prefix =
let doc = "prefix" in
Arg.(value & opt (some string) None & info [ "prefix" ] ~doc)
let interval =
let doc = "Poll interval in seconds" in
Arg.(value & opt int 10 & info [ "interval" ] ~doc)
let cmd = let cmd =
let doc = "VMM InfluxDB connector" in let doc = "VMM InfluxDB connector" in
@ -379,7 +349,7 @@ let cmd =
`S "DESCRIPTION" ; `S "DESCRIPTION" ;
`P "$(tname) connects to a vmm stats socket, pulls statistics and pushes them via TCP to influxdb" ] `P "$(tname) connects to a vmm stats socket, pulls statistics and pushes them via TCP to influxdb" ]
in in
Term.(pure run_client $ setup_log $ socket $ influx $ db $ prefix $ interval), Term.(pure run_client $ setup_log $ socket $ influx $ vms),
Term.info "vmm_influxdb_stats" ~version:"%%VERSION_NUM%%" ~doc ~man Term.info "vmm_influxdb_stats" ~version:"%%VERSION_NUM%%" ~doc ~man
let () = let () =

View file

@ -43,8 +43,7 @@ let connect socket_path =
let info_ _ opt_socket name = let info_ _ opt_socket name =
Lwt_main.run ( Lwt_main.run (
connect (socket `Vmmd opt_socket) >>= fun fd -> connect (socket `Vmmd opt_socket) >>= fun fd ->
let name' = Astring.String.cuts ~empty:false ~sep:"." name in let info = Vmm_wire.Vm.info my_command my_version name in
let info = Vmm_wire.Vm.info my_command my_version name' in
(Vmm_lwt.write_wire fd info >>= function (Vmm_lwt.write_wire fd info >>= function
| Ok () -> | Ok () ->
(process fd >|= function (process fd >|= function
@ -65,7 +64,7 @@ let info_ _ opt_socket name =
let really_destroy opt_socket name = let really_destroy opt_socket name =
connect (socket `Vmmd opt_socket) >>= fun fd -> connect (socket `Vmmd opt_socket) >>= fun fd ->
let cmd = Vmm_wire.Vm.destroy my_command my_version (Astring.String.cuts ~empty:false ~sep:"." name) in let cmd = Vmm_wire.Vm.destroy my_command my_version name in
(Vmm_lwt.write_wire fd cmd >>= function (Vmm_lwt.write_wire fd cmd >>= function
| Ok () -> | Ok () ->
(process fd >|= function (process fd >|= function
@ -83,7 +82,7 @@ let create _ opt_socket force name image cpuid requested_memory boot_params bloc
| Ok data -> data | Ok data -> data
| Error (`Msg s) -> invalid_arg s | Error (`Msg s) -> invalid_arg s
in in
let prefix, vname = match List.rev (Astring.String.cuts ~empty:false ~sep:"." name) with let prefix, vname = match List.rev name with
| [ name ] -> [], name | [ name ] -> [], name
| name::tl -> List.rev tl, name | name::tl -> List.rev tl, name
| [] -> assert false | [] -> assert false
@ -116,7 +115,7 @@ let create _ opt_socket force name image cpuid requested_memory boot_params bloc
let console _ opt_socket name = let console _ opt_socket name =
Lwt_main.run ( Lwt_main.run (
connect (socket `Console opt_socket) >>= fun fd -> connect (socket `Console opt_socket) >>= fun fd ->
let cmd = Vmm_wire.Console.attach my_command my_version (Astring.String.cuts ~empty:false ~sep:"." name) in let cmd = Vmm_wire.Console.attach my_command my_version name in
(Vmm_lwt.write_wire fd cmd >>= function (Vmm_lwt.write_wire fd cmd >>= function
| Error `Exception -> | Error `Exception ->
Logs.err (fun m -> m "couldn't write to socket") ; Logs.err (fun m -> m "couldn't write to socket") ;
@ -147,7 +146,7 @@ let console _ opt_socket name =
let r = let r =
let open Rresult.R.Infix in let open Rresult.R.Infix in
match Vmm_wire.Console.int_to_op hdr.Vmm_wire.tag with match Vmm_wire.Console.int_to_op hdr.Vmm_wire.tag with
| Some Data -> | Some Vmm_wire.Console.Data ->
Vmm_wire.decode_id_ts data >>= fun ((name, ts), off) -> Vmm_wire.decode_id_ts data >>= fun ((name, ts), off) ->
Vmm_wire.decode_string (Cstruct.shift data off) >>= fun (msg, _) -> Vmm_wire.decode_string (Cstruct.shift data off) >>= fun (msg, _) ->
Logs.app (fun m -> m "%a %a: %s" Ptime.pp ts Vmm_core.pp_id name msg) ; Logs.app (fun m -> m "%a %a: %s" Ptime.pp ts Vmm_core.pp_id name msg) ;
@ -165,6 +164,62 @@ let console _ opt_socket name =
Vmm_lwt.safe_close fd) ; Vmm_lwt.safe_close fd) ;
`Ok () `Ok ()
let stats _ opt_socket vms =
Lwt_main.run (
connect (socket `Stats opt_socket) >>= fun fd ->
let count = ref 0L in
Lwt_list.iter_s (fun name ->
let cmd = Vmm_wire.Stats.stat !count my_version name in
count := Int64.succ !count ;
Vmm_lwt.write_wire fd cmd >>= function
| Error `Exception -> Lwt.fail_with "write error"
| Ok () -> Lwt.return_unit) vms >>= fun () ->
(* now we busy read and process stat output *)
let rec loop () =
Vmm_lwt.read_wire fd >>= function
| Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop ()
| Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit
| Ok (hdr, data) ->
if Vmm_wire.is_fail hdr then
let msg = match Vmm_wire.decode_string data with
| Error _ -> None
| Ok (m, _) -> Some m
in
Logs.err (fun m -> m "operation failed: %a" Fmt.(option ~none:(unit "") string) msg) ;
Lwt.return_unit
else if Vmm_wire.is_reply hdr then
let msg = match Vmm_wire.decode_string data with
| Error _ -> None
| Ok (m, _) -> Some m
in
Logs.app (fun m -> m "operation succeeded: %a" Fmt.(option ~none:(unit "") string) msg) ;
loop ()
else
let r =
let open Rresult.R.Infix in
match Vmm_wire.Stats.int_to_op hdr.Vmm_wire.tag with
| Some Vmm_wire.Stats.Data ->
Vmm_wire.decode_strings data >>= fun (id, off) ->
Vmm_wire.Stats.decode_stats (Cstruct.shift data off) >>| fun stats ->
(Astring.String.concat ~sep:"." id, stats)
| _ ->
Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.Vmm_wire.tag))
in
match r with
| Ok (name, (ru, vmm, ifs)) ->
Logs.app (fun m -> m "stats %s: %a %a %a"
name Vmm_core.pp_rusage ru
Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") string int64)) vmm
Fmt.(list ~sep:(unit "@.") Vmm_core.pp_ifdata) ifs) ;
loop ()
| Error (`Msg msg) ->
Logs.err (fun m -> m "%s" msg) ;
Lwt.return_unit
in
loop () >>= fun () ->
Vmm_lwt.safe_close fd) ;
`Ok ()
let help _ _ man_format cmds = function let help _ _ man_format cmds = function
| None -> `Help (`Pager, None) | None -> `Help (`Pager, None)
| Some t when List.mem t cmds -> `Help (man_format, Some t) | Some t when List.mem t cmds -> `Help (man_format, Some t)
@ -194,9 +249,14 @@ let image =
let doc = "File of virtual machine image." in let doc = "File of virtual machine image." in
Arg.(required & pos 1 (some file) None & info [] ~doc) Arg.(required & pos 1 (some file) None & info [] ~doc)
let vm_c =
let parse s = `Ok (Vmm_core.id_of_string s)
in
(parse, Vmm_core.pp_id)
let vm_name = let vm_name =
let doc = "Name virtual machine." in let doc = "Name virtual machine." in
Arg.(required & pos 0 (some string) None & info [] ~doc) Arg.(required & pos 0 (some vm_c) None & info [] ~doc)
let destroy_cmd = let destroy_cmd =
let doc = "destroys a virtual machine" in let doc = "destroys a virtual machine" in
@ -246,14 +306,27 @@ let create_cmd =
Term.info "create" ~doc ~man Term.info "create" ~doc ~man
let console_cmd = let console_cmd =
let doc = "console of a VMs" in let doc = "console of a VM" in
let man = let man =
[`S "DESCRIPTION"; [`S "DESCRIPTION";
`P "Shows console output of a VMs."] `P "Shows console output of a VM."]
in in
Term.(ret (const console $ setup_log $ socket $ vm_name)), Term.(ret (const console $ setup_log $ socket $ vm_name)),
Term.info "console" ~doc ~man Term.info "console" ~doc ~man
let vm_names =
let doc = "Name virtual machine." in
Arg.(value & opt_all vm_c [] & info [ "n" ; "name" ] ~doc)
let stats_cmd =
let doc = "statistics of VMs" in
let man =
[`S "DESCRIPTION";
`P "Shows statistics of VMs."]
in
Term.(ret (const stats $ setup_log $ socket $ vm_names)),
Term.info "stats" ~doc ~man
let help_cmd = let help_cmd =
let topic = let topic =
let doc = "The topic to get help on. `topics' lists the topics." in let doc = "The topic to get help on. `topics' lists the topics." in
@ -276,7 +349,7 @@ let default_cmd =
Term.(ret (const help $ setup_log $ socket $ Term.man_format $ Term.choice_names $ Term.pure None)), Term.(ret (const help $ setup_log $ socket $ Term.man_format $ Term.choice_names $ Term.pure None)),
Term.info "vmmc" ~version:"%%VERSION_NUM%%" ~doc ~man Term.info "vmmc" ~version:"%%VERSION_NUM%%" ~doc ~man
let cmds = [ help_cmd ; info_cmd ; destroy_cmd ; create_cmd ; console_cmd ] let cmds = [ help_cmd ; info_cmd ; destroy_cmd ; create_cmd ; console_cmd ; stats_cmd ]
let () = let () =
match Term.eval_choice default_cmd cmds match Term.eval_choice default_cmd cmds

View file

@ -7,6 +7,9 @@ open Vmm_core
open Rresult open Rresult
open R.Infix open R.Infix
let handle_command t s prefix perms hdr buf = let handle_command t s prefix perms hdr buf =
let res = let res =
if not (Vmm_wire.version_eq hdr.Vmm_wire.version t.client_version) then if not (Vmm_wire.version_eq hdr.Vmm_wire.version t.client_version) then

View file

@ -261,16 +261,19 @@ module Stats = struct
| Add | Add
| Remove | Remove
| Stats | Stats
| Data
let op_to_int = function let op_to_int = function
| Add -> 0x0200l | Add -> 0x0200l
| Remove -> 0x0201l | Remove -> 0x0201l
| Stats -> 0x0202l | Stats -> 0x0202l
| Data -> 0x0203l
let int_to_op = function let int_to_op = function
| 0x0200l -> Some Add | 0x0200l -> Some Add
| 0x0201l -> Some Remove | 0x0201l -> Some Remove
| 0x0202l -> Some Stats | 0x0202l -> Some Stats
| 0x0203l -> Some Data
| _ -> None | _ -> None
let rusage_len = 144l let rusage_len = 144l
@ -381,8 +384,9 @@ module Stats = struct
let stat id version name = encode ~name version id (op_to_int Stats) let stat id version name = encode ~name version id (op_to_int Stats)
let stat_reply id version body = let data id version vm body =
reply ~body version id (op_to_int Stats) let name = Vmm_core.id_of_string vm in
encode ~name ~body version id (op_to_int Data)
let encode_int64 i = let encode_int64 i =
let cs = Cstruct.create 8 in let cs = Cstruct.create 8 in

View file

@ -16,22 +16,24 @@ external vmmapi_close : vmctx -> unit = "vmmanage_vmmapi_close"
external vmmapi_statnames : vmctx -> string list = "vmmanage_vmmapi_statnames" external vmmapi_statnames : vmctx -> string list = "vmmanage_vmmapi_statnames"
external vmmapi_stats : vmctx -> int64 list = "vmmanage_vmmapi_stats" external vmmapi_stats : vmctx -> int64 list = "vmmanage_vmmapi_stats"
let my_version = `WV1 let my_version = `WV2
let descr = ref [] let descr = ref []
type t = { type 'a t = {
pid_nic : ((vmctx, int) result * (int * string) list) IM.t ; pid_nic : ((vmctx, int) result * (int * string) list) IM.t ;
pid_rusage : rusage IM.t ;
pid_vmmapi : (string * int64) list IM.t ;
nic_ifdata : ifdata String.Map.t ;
vmid_pid : int String.Map.t ; vmid_pid : int String.Map.t ;
name_sockets : 'a String.Map.t ;
} }
let pp_strings pp taps = Fmt.(list ~sep:(unit ",@ ") string) pp taps let pp_strings pp taps = Fmt.(list ~sep:(unit ",@ ") string) pp taps
let empty () = let empty () =
{ pid_nic = IM.empty ; pid_rusage = IM.empty ; pid_vmmapi = IM.empty ; nic_ifdata = String.Map.empty ; vmid_pid = String.Map.empty } { pid_nic = IM.empty ; vmid_pid = String.Map.empty ; name_sockets = String.Map.empty }
let remove_socket t name =
let name_sockets = String.Map.remove name t.name_sockets in
{ t with name_sockets }
let rec wrap f arg = let rec wrap f arg =
try Some (f arg) with try Some (f arg) with
@ -91,33 +93,33 @@ let gather pid vmctx nics =
ifd ifd
| Some data -> | Some data ->
Logs.debug (fun m -> m "adding ifdata for %s" nname) ; Logs.debug (fun m -> m "adding ifdata for %s" nname) ;
String.Map.add data.name data ifd) data::ifd)
String.Map.empty nics [] nics
let tick t = let tick t =
Logs.debug (fun m -> m "tick with %d vms" (IM.cardinal t.pid_nic)) ; Logs.debug (fun m -> m "tick with %d vms" (IM.cardinal t.pid_nic)) ;
let pid_rusage, pid_vmmapi, nic_ifdata =
IM.fold (fun pid (vmctx, nics) (rus, vmms, ifds) ->
let ru, vmm, ifd = gather pid vmctx nics in
(match ru with
| None ->
Logs.warn (fun m -> m "failed to get rusage for %d" pid) ;
rus
| Some ru ->
Logs.debug (fun m -> m "adding resource usage for %d" pid) ;
IM.add pid ru rus),
(match vmm with
| None ->
Logs.warn (fun m -> m "failed to get vmmapi_stats for %d" pid) ;
vmms
| Some vmm ->
Logs.debug (fun m -> m "adding vmmapi_stats for %d" pid) ;
IM.add pid (List.combine !descr vmm) vmms),
String.Map.union (fun _k a _b -> Some a) ifd ifds)
t.pid_nic (IM.empty, IM.empty, String.Map.empty)
in
let pid_nic = try_open_vmmapi t.pid_nic in let pid_nic = try_open_vmmapi t.pid_nic in
{ t with pid_rusage ; pid_vmmapi ; nic_ifdata ; pid_nic } let t' = { t with pid_nic } in
let outs =
String.Map.fold (fun name socket out ->
match String.Map.find_opt name t.vmid_pid with
| None -> Logs.warn (fun m -> m "couldn't find pid of %s" name) ; out
| Some pid -> match IM.find_opt pid t.pid_nic with
| None -> Logs.warn (fun m -> m "couldn't find nics of %d" pid) ; out
| Some (vmctx, nics) ->
let ru, vmm, ifd = gather pid vmctx nics in
match ru with
| None -> Logs.err (fun m -> m "failed to get rusage for %d" pid) ; out
| Some ru' ->
let stats =
let vmm' = match vmm with None -> [] | Some xs -> List.combine !descr xs in
ru', vmm', ifd
in
let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in
(socket, name, stats_encoded) :: out)
t'.name_sockets []
in
(t', outs)
let add_pid t vmid pid nics = let add_pid t vmid pid nics =
match wrap sysctl_ifcount () with match wrap sysctl_ifcount () with
@ -143,35 +145,6 @@ let add_pid t vmid pid nics =
in in
Ok { t with pid_nic ; vmid_pid } Ok { t with pid_nic ; vmid_pid }
let stats t vmid =
Logs.debug (fun m -> m "querying statistics for vmid %s" vmid) ;
match String.Map.find vmid t.vmid_pid with
| None -> Error (`Msg ("unknown vm " ^ vmid))
| Some pid ->
Logs.debug (fun m -> m "querying statistics for %d" pid) ;
try
let _, nics = IM.find pid t.pid_nic
and ru = IM.find pid t.pid_rusage
and vmm =
try IM.find pid t.pid_vmmapi with
| Not_found ->
Logs.err (fun m -> m "failed to find vmm stats for %d" pid);
[]
in
match
List.fold_left (fun acc nic ->
match String.Map.find nic t.nic_ifdata, acc with
| None, _ -> None
| _, None -> None
| Some ifd, Some acc -> Some (ifd :: acc))
(Some []) (snd (List.split nics))
with
| None -> Error (`Msg "failed to find interface statistics")
| Some ifd -> Ok (ru, vmm, ifd)
with
| _ -> Error (`Msg "failed to find resource usage")
let remove_vmid t vmid = let remove_vmid t vmid =
Logs.info (fun m -> m "removing vmid %s" vmid) ; Logs.info (fun m -> m "removing vmid %s" vmid) ;
match String.Map.find vmid t.vmid_pid with match String.Map.find vmid t.vmid_pid with
@ -192,14 +165,15 @@ let remove_vmid t vmid =
let remove_vmids t vmids = let remove_vmids t vmids =
List.fold_left remove_vmid t vmids List.fold_left remove_vmid t vmids
let handle t hdr cs = let handle t socket hdr cs =
let open Vmm_wire in let open Vmm_wire in
let open Vmm_wire.Stats in let open Vmm_wire.Stats in
let r = let r =
if not (version_eq my_version hdr.version) then if not (version_eq my_version hdr.version) then
Error (`Msg "cannot handle version") Error (`Msg "cannot handle version")
else else
decode_string cs >>= fun (name, off) -> decode_strings cs >>= fun (id, off) ->
let name = Vmm_core.string_of_id id in
match int_to_op hdr.tag with match int_to_op hdr.tag with
| Some Add -> | Some Add ->
decode_pid_taps (Cstruct.shift cs off) >>= fun (pid, taps) -> decode_pid_taps (Cstruct.shift cs off) >>= fun (pid, taps) ->
@ -209,8 +183,8 @@ let handle t hdr cs =
let t = remove_vmid t name in let t = remove_vmid t name in
Ok (t, `Remove name, success ~msg:"removed" my_version hdr.id (op_to_int Remove)) Ok (t, `Remove name, success ~msg:"removed" my_version hdr.id (op_to_int Remove))
| Some Stats -> | Some Stats ->
stats t name >>= fun s -> let name_sockets = String.Map.add name socket t.name_sockets in
Ok (t, `None, stat_reply hdr.id my_version (encode_stats s)) Ok ({ t with name_sockets }, `None, success ~msg:"subscribed" my_version hdr.id (op_to_int Stats))
| _ -> Error (`Msg "unknown command") | _ -> Error (`Msg "unknown command")
in in
match r with match r with

View file

@ -29,7 +29,7 @@ let handle s addr () =
| Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc
| Ok (hdr, data) -> | Ok (hdr, data) ->
Logs.debug (fun m -> m "received %a" Cstruct.hexdump_pp data) ; Logs.debug (fun m -> m "received %a" Cstruct.hexdump_pp data) ;
let t', action, out = Vmm_stats.handle !t hdr data in let t', action, out = Vmm_stats.handle !t s hdr data in
let acc = match action with let acc = match action with
| `Add pid -> pid :: acc | `Add pid -> pid :: acc
| `Remove pid -> List.filter (fun m -> m <> pid) acc | `Remove pid -> List.filter (fun m -> m <> pid) acc
@ -48,7 +48,15 @@ let handle s addr () =
t := t' t := t'
let rec timer interval () = let rec timer interval () =
t := Vmm_stats.tick !t ; let t', outs = Vmm_stats.tick !t in
t := t' ;
Lwt_list.iter_p (fun (s, name, stat) ->
Vmm_lwt.write_wire s stat >>= function
| Ok () -> Lwt.return_unit
| Error `Exception ->
t := Vmm_stats.remove_socket !t name ;
Vmm_lwt.safe_close s)
outs >>= fun () ->
Lwt_unix.sleep interval >>= fun () -> Lwt_unix.sleep interval >>= fun () ->
timer interval () timer interval ()