From 99ba1c5e4b9c352b4948fb5b3cf197ce4ae2945b Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 20 Sep 2018 22:53:42 +0200 Subject: [PATCH] stats are back now! no longer two pullers, but now with one pusher :) --- app/vmm_influxdb_stats.ml | 216 ++++++++++++++++---------------------- app/vmmc.ml | 93 ++++++++++++++-- src/vmm_commands.ml | 3 + src/vmm_wire.ml | 8 +- stats/vmm_stats.ml | 98 +++++++---------- stats/vmm_stats_lwt.ml | 12 ++- 6 files changed, 231 insertions(+), 199 deletions(-) diff --git a/app/vmm_influxdb_stats.ml b/app/vmm_influxdb_stats.ml index 62d389d..1ccb359 100644 --- a/app/vmm_influxdb_stats.ml +++ b/app/vmm_influxdb_stats.ml @@ -140,12 +140,10 @@ module P = struct vm ifd.name (String.concat ~sep:"," fields) end -let my_version = `WV1 +let my_version = `WV2 let command = ref 1L -let (req : string IM64.t ref) = ref IM64.empty - let str_of_e = function | `Eof -> "end of file" | `Exception -> "exception" @@ -160,65 +158,67 @@ let safe_close s = Logs.err (fun m -> m "exception %s while closing" (Printexc.to_string e)) ; Lwt.return_unit) -let rec read_sock_write_tcp closing db c ?fd addr addrtype = +let rec read_sock_write_tcp c ?fd addr addrtype = match fd with | None -> - if !closing then - Lwt.return_unit - else begin - Logs.debug (fun m -> m "new connection to TCP") ; - let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in - Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; - Lwt.catch - (fun () -> - Lwt_unix.connect fd addr >|= fun () -> - Logs.debug (fun m -> m "connected to TCP") ; - Some fd) - (fun e -> - let addr', port = match addr with - | Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port - | Lwt_unix.ADDR_UNIX addr -> addr, 0 - in - Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s" - (Printexc.to_string e) addr' port) ; - safe_close fd >>= fun () -> - Lwt_unix.sleep 5.0 >|= fun () -> - None) >>= fun fd -> - read_sock_write_tcp closing db c ?fd addr addrtype - end + Logs.debug (fun m -> m "new connection to TCP") ; + let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in + Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; + Lwt.catch + (fun () -> + Lwt_unix.connect fd addr >|= fun () -> + Logs.debug (fun m -> m "connected to TCP") ; + Some fd) + (fun e -> + let addr', port = match addr with + | Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port + | Lwt_unix.ADDR_UNIX addr -> addr, 0 + in + Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s" + (Printexc.to_string e) addr' port) ; + safe_close fd >>= fun () -> + Lwt_unix.sleep 5.0 >|= fun () -> + None) >>= fun fd -> + read_sock_write_tcp c ?fd addr addrtype | Some fd -> - if !closing then - safe_close fd - else begin - let open Vmm_wire in - Logs.debug (fun m -> m "reading from unix socket") ; - Vmm_lwt.read_wire c >>= function - | Error e -> - Logs.err (fun m -> m "error %s while reading vmm socket (return)" - (str_of_e e)) ; - closing := true ; - safe_close fd - | Ok (hdr, data) -> - let name = - try IM64.find hdr.id !req - with Not_found -> "not found" - in - req := IM64.remove hdr.id !req ; - (if not (version_eq hdr.version my_version) then begin - Logs.err (fun m -> m "unknown wire protocol version") ; - closing := true ; - safe_close fd >|= fun () -> - None - end else if Vmm_wire.is_fail hdr then begin - Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ; - Lwt.return (Some fd) - end else if Vmm_wire.is_reply hdr then - begin match Vmm_wire.Stats.decode_stats data with + let open Vmm_wire in + Logs.debug (fun m -> m "reading from unix socket") ; + Vmm_lwt.read_wire c >>= function + | Error e -> + Logs.err (fun m -> m "error %s while reading vmm socket (return)" + (str_of_e e)) ; + safe_close fd >>= fun () -> + safe_close c >|= fun () -> + true + | Ok (hdr, data) -> + if not (version_eq hdr.version my_version) then begin + Logs.err (fun m -> m "unknown wire protocol version") ; + safe_close fd >>= fun () -> + safe_close c >|= fun () -> + false + end else if Vmm_wire.is_fail hdr then begin + Logs.err (fun m -> m "failed to retrieve statistics") ; + safe_close fd >>= fun () -> + safe_close c >|= fun () -> + false + end else if Vmm_wire.is_reply hdr then begin + Logs.info (fun m -> m "received reply, continuing") ; + read_sock_write_tcp c ~fd addr addrtype + end else + (match Vmm_wire.Stats.int_to_op hdr.Vmm_wire.tag with + | Some Vmm_wire.Stats.Data -> + begin + let r = + let open Rresult.R.Infix in + Vmm_wire.decode_strings data >>= fun (id, off) -> + Vmm_wire.Stats.decode_stats (Cstruct.shift data off) >>| fun stats -> + (Vmm_core.string_of_id id, stats) + in + match r with | Error (`Msg msg) -> - Logs.warn (fun m -> m "error %s while decoding stats %s, ignoring" - msg name) ; + Logs.warn (fun m -> m "error %s while decoding stats, ignoring" msg) ; Lwt.return (Some fd) - | Ok (ru, vmm, ifs) -> + | Ok (name, (ru, vmm, ifs)) -> let ru = P.encode_ru name ru in let vmm = P.encode_vmm name vmm in let taps = List.map (P.encode_if name) ifs in @@ -234,37 +234,23 @@ let rec read_sock_write_tcp closing db c ?fd addr addrtype = safe_close fd >|= fun () -> None end - else begin - Logs.err (fun m -> m "unhandled tag %lu for %s" hdr.tag name) ; - Lwt.return (Some fd) - end) >>= fun fd -> - read_sock_write_tcp closing db c ?fd addr addrtype - end + | _ -> + Logs.err (fun m -> m "unhandled tag %lu" hdr.tag) ; + Lwt.return (Some fd)) >>= fun fd -> + read_sock_write_tcp c ?fd addr addrtype -let rec query_sock closing prefix db c interval = +let query_sock vms c = (* query c for everyone in db *) - if !closing then - Lwt.return_unit - else - Lwt_list.fold_left_s (fun r (id, name) -> - match r with - | Error e -> Lwt.return (Error e) - | Ok () -> - let id = identifier id in - let id = match prefix with None -> [ id ] | Some p -> [ p ; id ] in - let request = Vmm_wire.Stats.stat !command my_version id in - req := IM64.add !command name !req ; - command := Int64.succ !command ; - Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id id) ; - Vmm_lwt.write_wire c request) - (Ok ()) db >>= function - | Error e -> - Logs.err (fun m -> m "error %s while writing to vmm socket" (str_of_e e)) ; - closing := true ; - Lwt.return_unit - | Ok () -> - Lwt_unix.sleep (float_of_int interval) >>= fun () -> - query_sock closing prefix db c interval + Lwt_list.fold_left_s (fun r name -> + match r with + | Error e -> Lwt.return (Error e) + | Ok () -> + let id = Astring.String.cuts ~sep:"." name in + let request = Vmm_wire.Stats.stat !command my_version id in + command := Int64.succ !command ; + Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id id) ; + Vmm_lwt.write_wire c request) + (Ok ()) vms let rec maybe_connect stat_socket = let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in @@ -281,10 +267,7 @@ let rec maybe_connect stat_socket = Lwt_unix.sleep (float_of_int 5) >>= fun () -> maybe_connect stat_socket) -let client stat_socket influxhost influxport db prefix interval = - (* start a socket connection to vmm_stats *) - maybe_connect stat_socket >>= fun c -> - +let client stat_socket influxhost influxport vms = (* figure out address of influx *) Lwt_unix.gethostbyname influxhost >>= fun host_entry -> let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in @@ -293,7 +276,7 @@ let client stat_socket influxhost influxport db prefix interval = in (* loop *) - (* the query task queries the stat_socket at each interval + (* the query task queries the stat_socket at each - if this fails, closing is set to true (and unit is returned) the read_sock reads the stat_socket, and forwards to a TCP socket @@ -305,28 +288,23 @@ let client stat_socket influxhost influxport db prefix interval = - query_sock/read_sock_write_tcp write an read from it - on failure in read or write, the TCP connection is closed, and loop takes control: safe_close, maybe_connect, rinse, repeat *) - let rec loop c = - let closing = ref false in - Lwt.join [ - query_sock closing prefix db c interval ; - read_sock_write_tcp closing db c addr addrtype - ] >>= fun () -> - safe_close c >>= fun () -> - maybe_connect stat_socket >>= fun c -> - loop c - in - loop c -let run_client _ socket (influxhost, influxport) db prefix interval = - Sys.(set_signal sigpipe Signal_ignore) ; - let db = - let open Rresult.R.Infix in - match Bos.OS.File.read_lines (Fpath.v db) >>= parse_db with - | Ok [] -> invalid_arg "empty database" - | Ok db -> db - | Error (`Msg m) -> invalid_arg ("couldn't parse database " ^ m) + let rec loop () = + (* start a socket connection to vmm_stats *) + maybe_connect stat_socket >>= fun c -> + query_sock vms c >>= function + | Error e -> + Logs.err (fun m -> m "error %s while writing to stat socket" (str_of_e e)) ; + Lwt.return_unit + | Ok () -> + read_sock_write_tcp c addr addrtype >>= fun restart -> + if restart then loop () else Lwt.return_unit in - Lwt_main.run (client socket influxhost influxport db prefix interval) + loop () + +let run_client _ socket (influxhost, influxport) vms = + Sys.(set_signal sigpipe Signal_ignore) ; + Lwt_main.run (client socket influxhost influxport vms) let setup_log style_renderer level = Fmt_tty.setup_std_outputs ?style_renderer (); @@ -361,17 +339,9 @@ let influx = Arg.(required & pos 0 (some host_port) None & info [] ~docv:"influx" ~doc:"the influx hostname:port to connect to") -let db = - let doc = "VMID database" in - Arg.(required & pos 1 (some file) None & info [] ~doc) - -let prefix = - let doc = "prefix" in - Arg.(value & opt (some string) None & info [ "prefix" ] ~doc) - -let interval = - let doc = "Poll interval in seconds" in - Arg.(value & opt int 10 & info [ "interval" ] ~doc) +let vms = + let doc = "virtual machine names" in + Arg.(value & opt_all string [] & info [ "n" ; "name" ] ~doc) let cmd = let doc = "VMM InfluxDB connector" in @@ -379,7 +349,7 @@ let cmd = `S "DESCRIPTION" ; `P "$(tname) connects to a vmm stats socket, pulls statistics and pushes them via TCP to influxdb" ] in - Term.(pure run_client $ setup_log $ socket $ influx $ db $ prefix $ interval), + Term.(pure run_client $ setup_log $ socket $ influx $ vms), Term.info "vmm_influxdb_stats" ~version:"%%VERSION_NUM%%" ~doc ~man let () = diff --git a/app/vmmc.ml b/app/vmmc.ml index e300dde..b5c2950 100644 --- a/app/vmmc.ml +++ b/app/vmmc.ml @@ -43,8 +43,7 @@ let connect socket_path = let info_ _ opt_socket name = Lwt_main.run ( connect (socket `Vmmd opt_socket) >>= fun fd -> - let name' = Astring.String.cuts ~empty:false ~sep:"." name in - let info = Vmm_wire.Vm.info my_command my_version name' in + let info = Vmm_wire.Vm.info my_command my_version name in (Vmm_lwt.write_wire fd info >>= function | Ok () -> (process fd >|= function @@ -65,7 +64,7 @@ let info_ _ opt_socket name = let really_destroy opt_socket name = connect (socket `Vmmd opt_socket) >>= fun fd -> - let cmd = Vmm_wire.Vm.destroy my_command my_version (Astring.String.cuts ~empty:false ~sep:"." name) in + let cmd = Vmm_wire.Vm.destroy my_command my_version name in (Vmm_lwt.write_wire fd cmd >>= function | Ok () -> (process fd >|= function @@ -83,7 +82,7 @@ let create _ opt_socket force name image cpuid requested_memory boot_params bloc | Ok data -> data | Error (`Msg s) -> invalid_arg s in - let prefix, vname = match List.rev (Astring.String.cuts ~empty:false ~sep:"." name) with + let prefix, vname = match List.rev name with | [ name ] -> [], name | name::tl -> List.rev tl, name | [] -> assert false @@ -116,7 +115,7 @@ let create _ opt_socket force name image cpuid requested_memory boot_params bloc let console _ opt_socket name = Lwt_main.run ( connect (socket `Console opt_socket) >>= fun fd -> - let cmd = Vmm_wire.Console.attach my_command my_version (Astring.String.cuts ~empty:false ~sep:"." name) in + let cmd = Vmm_wire.Console.attach my_command my_version name in (Vmm_lwt.write_wire fd cmd >>= function | Error `Exception -> Logs.err (fun m -> m "couldn't write to socket") ; @@ -147,7 +146,7 @@ let console _ opt_socket name = let r = let open Rresult.R.Infix in match Vmm_wire.Console.int_to_op hdr.Vmm_wire.tag with - | Some Data -> + | Some Vmm_wire.Console.Data -> Vmm_wire.decode_id_ts data >>= fun ((name, ts), off) -> Vmm_wire.decode_string (Cstruct.shift data off) >>= fun (msg, _) -> Logs.app (fun m -> m "%a %a: %s" Ptime.pp ts Vmm_core.pp_id name msg) ; @@ -165,6 +164,62 @@ let console _ opt_socket name = Vmm_lwt.safe_close fd) ; `Ok () +let stats _ opt_socket vms = + Lwt_main.run ( + connect (socket `Stats opt_socket) >>= fun fd -> + let count = ref 0L in + Lwt_list.iter_s (fun name -> + let cmd = Vmm_wire.Stats.stat !count my_version name in + count := Int64.succ !count ; + Vmm_lwt.write_wire fd cmd >>= function + | Error `Exception -> Lwt.fail_with "write error" + | Ok () -> Lwt.return_unit) vms >>= fun () -> + (* now we busy read and process stat output *) + let rec loop () = + Vmm_lwt.read_wire fd >>= function + | Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop () + | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit + | Ok (hdr, data) -> + if Vmm_wire.is_fail hdr then + let msg = match Vmm_wire.decode_string data with + | Error _ -> None + | Ok (m, _) -> Some m + in + Logs.err (fun m -> m "operation failed: %a" Fmt.(option ~none:(unit "") string) msg) ; + Lwt.return_unit + else if Vmm_wire.is_reply hdr then + let msg = match Vmm_wire.decode_string data with + | Error _ -> None + | Ok (m, _) -> Some m + in + Logs.app (fun m -> m "operation succeeded: %a" Fmt.(option ~none:(unit "") string) msg) ; + loop () + else + let r = + let open Rresult.R.Infix in + match Vmm_wire.Stats.int_to_op hdr.Vmm_wire.tag with + | Some Vmm_wire.Stats.Data -> + Vmm_wire.decode_strings data >>= fun (id, off) -> + Vmm_wire.Stats.decode_stats (Cstruct.shift data off) >>| fun stats -> + (Astring.String.concat ~sep:"." id, stats) + | _ -> + Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.Vmm_wire.tag)) + in + match r with + | Ok (name, (ru, vmm, ifs)) -> + Logs.app (fun m -> m "stats %s: %a %a %a" + name Vmm_core.pp_rusage ru + Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") string int64)) vmm + Fmt.(list ~sep:(unit "@.") Vmm_core.pp_ifdata) ifs) ; + loop () + | Error (`Msg msg) -> + Logs.err (fun m -> m "%s" msg) ; + Lwt.return_unit + in + loop () >>= fun () -> + Vmm_lwt.safe_close fd) ; + `Ok () + let help _ _ man_format cmds = function | None -> `Help (`Pager, None) | Some t when List.mem t cmds -> `Help (man_format, Some t) @@ -194,9 +249,14 @@ let image = let doc = "File of virtual machine image." in Arg.(required & pos 1 (some file) None & info [] ~doc) +let vm_c = + let parse s = `Ok (Vmm_core.id_of_string s) + in + (parse, Vmm_core.pp_id) + let vm_name = let doc = "Name virtual machine." in - Arg.(required & pos 0 (some string) None & info [] ~doc) + Arg.(required & pos 0 (some vm_c) None & info [] ~doc) let destroy_cmd = let doc = "destroys a virtual machine" in @@ -246,14 +306,27 @@ let create_cmd = Term.info "create" ~doc ~man let console_cmd = - let doc = "console of a VMs" in + let doc = "console of a VM" in let man = [`S "DESCRIPTION"; - `P "Shows console output of a VMs."] + `P "Shows console output of a VM."] in Term.(ret (const console $ setup_log $ socket $ vm_name)), Term.info "console" ~doc ~man +let vm_names = + let doc = "Name virtual machine." in + Arg.(value & opt_all vm_c [] & info [ "n" ; "name" ] ~doc) + +let stats_cmd = + let doc = "statistics of VMs" in + let man = + [`S "DESCRIPTION"; + `P "Shows statistics of VMs."] + in + Term.(ret (const stats $ setup_log $ socket $ vm_names)), + Term.info "stats" ~doc ~man + let help_cmd = let topic = let doc = "The topic to get help on. `topics' lists the topics." in @@ -276,7 +349,7 @@ let default_cmd = Term.(ret (const help $ setup_log $ socket $ Term.man_format $ Term.choice_names $ Term.pure None)), Term.info "vmmc" ~version:"%%VERSION_NUM%%" ~doc ~man -let cmds = [ help_cmd ; info_cmd ; destroy_cmd ; create_cmd ; console_cmd ] +let cmds = [ help_cmd ; info_cmd ; destroy_cmd ; create_cmd ; console_cmd ; stats_cmd ] let () = match Term.eval_choice default_cmd cmds diff --git a/src/vmm_commands.ml b/src/vmm_commands.ml index e4bf64b..aad9c0e 100644 --- a/src/vmm_commands.ml +++ b/src/vmm_commands.ml @@ -7,6 +7,9 @@ open Vmm_core open Rresult open R.Infix + + + let handle_command t s prefix perms hdr buf = let res = if not (Vmm_wire.version_eq hdr.Vmm_wire.version t.client_version) then diff --git a/src/vmm_wire.ml b/src/vmm_wire.ml index 2c138bb..8e23452 100644 --- a/src/vmm_wire.ml +++ b/src/vmm_wire.ml @@ -261,16 +261,19 @@ module Stats = struct | Add | Remove | Stats + | Data let op_to_int = function | Add -> 0x0200l | Remove -> 0x0201l | Stats -> 0x0202l + | Data -> 0x0203l let int_to_op = function | 0x0200l -> Some Add | 0x0201l -> Some Remove | 0x0202l -> Some Stats + | 0x0203l -> Some Data | _ -> None let rusage_len = 144l @@ -381,8 +384,9 @@ module Stats = struct let stat id version name = encode ~name version id (op_to_int Stats) - let stat_reply id version body = - reply ~body version id (op_to_int Stats) + let data id version vm body = + let name = Vmm_core.id_of_string vm in + encode ~name ~body version id (op_to_int Data) let encode_int64 i = let cs = Cstruct.create 8 in diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index e8268d7..cbe7328 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -16,22 +16,24 @@ external vmmapi_close : vmctx -> unit = "vmmanage_vmmapi_close" external vmmapi_statnames : vmctx -> string list = "vmmanage_vmmapi_statnames" external vmmapi_stats : vmctx -> int64 list = "vmmanage_vmmapi_stats" -let my_version = `WV1 +let my_version = `WV2 let descr = ref [] -type t = { +type 'a t = { pid_nic : ((vmctx, int) result * (int * string) list) IM.t ; - pid_rusage : rusage IM.t ; - pid_vmmapi : (string * int64) list IM.t ; - nic_ifdata : ifdata String.Map.t ; vmid_pid : int String.Map.t ; + name_sockets : 'a String.Map.t ; } let pp_strings pp taps = Fmt.(list ~sep:(unit ",@ ") string) pp taps let empty () = - { pid_nic = IM.empty ; pid_rusage = IM.empty ; pid_vmmapi = IM.empty ; nic_ifdata = String.Map.empty ; vmid_pid = String.Map.empty } + { pid_nic = IM.empty ; vmid_pid = String.Map.empty ; name_sockets = String.Map.empty } + +let remove_socket t name = + let name_sockets = String.Map.remove name t.name_sockets in + { t with name_sockets } let rec wrap f arg = try Some (f arg) with @@ -91,33 +93,33 @@ let gather pid vmctx nics = ifd | Some data -> Logs.debug (fun m -> m "adding ifdata for %s" nname) ; - String.Map.add data.name data ifd) - String.Map.empty nics + data::ifd) + [] nics let tick t = Logs.debug (fun m -> m "tick with %d vms" (IM.cardinal t.pid_nic)) ; - let pid_rusage, pid_vmmapi, nic_ifdata = - IM.fold (fun pid (vmctx, nics) (rus, vmms, ifds) -> - let ru, vmm, ifd = gather pid vmctx nics in - (match ru with - | None -> - Logs.warn (fun m -> m "failed to get rusage for %d" pid) ; - rus - | Some ru -> - Logs.debug (fun m -> m "adding resource usage for %d" pid) ; - IM.add pid ru rus), - (match vmm with - | None -> - Logs.warn (fun m -> m "failed to get vmmapi_stats for %d" pid) ; - vmms - | Some vmm -> - Logs.debug (fun m -> m "adding vmmapi_stats for %d" pid) ; - IM.add pid (List.combine !descr vmm) vmms), - String.Map.union (fun _k a _b -> Some a) ifd ifds) - t.pid_nic (IM.empty, IM.empty, String.Map.empty) - in let pid_nic = try_open_vmmapi t.pid_nic in - { t with pid_rusage ; pid_vmmapi ; nic_ifdata ; pid_nic } + let t' = { t with pid_nic } in + let outs = + String.Map.fold (fun name socket out -> + match String.Map.find_opt name t.vmid_pid with + | None -> Logs.warn (fun m -> m "couldn't find pid of %s" name) ; out + | Some pid -> match IM.find_opt pid t.pid_nic with + | None -> Logs.warn (fun m -> m "couldn't find nics of %d" pid) ; out + | Some (vmctx, nics) -> + let ru, vmm, ifd = gather pid vmctx nics in + match ru with + | None -> Logs.err (fun m -> m "failed to get rusage for %d" pid) ; out + | Some ru' -> + let stats = + let vmm' = match vmm with None -> [] | Some xs -> List.combine !descr xs in + ru', vmm', ifd + in + let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in + (socket, name, stats_encoded) :: out) + t'.name_sockets [] + in + (t', outs) let add_pid t vmid pid nics = match wrap sysctl_ifcount () with @@ -143,35 +145,6 @@ let add_pid t vmid pid nics = in Ok { t with pid_nic ; vmid_pid } - -let stats t vmid = - Logs.debug (fun m -> m "querying statistics for vmid %s" vmid) ; - match String.Map.find vmid t.vmid_pid with - | None -> Error (`Msg ("unknown vm " ^ vmid)) - | Some pid -> - Logs.debug (fun m -> m "querying statistics for %d" pid) ; - try - let _, nics = IM.find pid t.pid_nic - and ru = IM.find pid t.pid_rusage - and vmm = - try IM.find pid t.pid_vmmapi with - | Not_found -> - Logs.err (fun m -> m "failed to find vmm stats for %d" pid); - [] - in - match - List.fold_left (fun acc nic -> - match String.Map.find nic t.nic_ifdata, acc with - | None, _ -> None - | _, None -> None - | Some ifd, Some acc -> Some (ifd :: acc)) - (Some []) (snd (List.split nics)) - with - | None -> Error (`Msg "failed to find interface statistics") - | Some ifd -> Ok (ru, vmm, ifd) - with - | _ -> Error (`Msg "failed to find resource usage") - let remove_vmid t vmid = Logs.info (fun m -> m "removing vmid %s" vmid) ; match String.Map.find vmid t.vmid_pid with @@ -192,14 +165,15 @@ let remove_vmid t vmid = let remove_vmids t vmids = List.fold_left remove_vmid t vmids -let handle t hdr cs = +let handle t socket hdr cs = let open Vmm_wire in let open Vmm_wire.Stats in let r = if not (version_eq my_version hdr.version) then Error (`Msg "cannot handle version") else - decode_string cs >>= fun (name, off) -> + decode_strings cs >>= fun (id, off) -> + let name = Vmm_core.string_of_id id in match int_to_op hdr.tag with | Some Add -> decode_pid_taps (Cstruct.shift cs off) >>= fun (pid, taps) -> @@ -209,8 +183,8 @@ let handle t hdr cs = let t = remove_vmid t name in Ok (t, `Remove name, success ~msg:"removed" my_version hdr.id (op_to_int Remove)) | Some Stats -> - stats t name >>= fun s -> - Ok (t, `None, stat_reply hdr.id my_version (encode_stats s)) + let name_sockets = String.Map.add name socket t.name_sockets in + Ok ({ t with name_sockets }, `None, success ~msg:"subscribed" my_version hdr.id (op_to_int Stats)) | _ -> Error (`Msg "unknown command") in match r with diff --git a/stats/vmm_stats_lwt.ml b/stats/vmm_stats_lwt.ml index ce12b0f..642d4d0 100644 --- a/stats/vmm_stats_lwt.ml +++ b/stats/vmm_stats_lwt.ml @@ -29,7 +29,7 @@ let handle s addr () = | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc | Ok (hdr, data) -> Logs.debug (fun m -> m "received %a" Cstruct.hexdump_pp data) ; - let t', action, out = Vmm_stats.handle !t hdr data in + let t', action, out = Vmm_stats.handle !t s hdr data in let acc = match action with | `Add pid -> pid :: acc | `Remove pid -> List.filter (fun m -> m <> pid) acc @@ -48,7 +48,15 @@ let handle s addr () = t := t' let rec timer interval () = - t := Vmm_stats.tick !t ; + let t', outs = Vmm_stats.tick !t in + t := t' ; + Lwt_list.iter_p (fun (s, name, stat) -> + Vmm_lwt.write_wire s stat >>= function + | Ok () -> Lwt.return_unit + | Error `Exception -> + t := Vmm_stats.remove_socket !t name ; + Vmm_lwt.safe_close s) + outs >>= fun () -> Lwt_unix.sleep interval >>= fun () -> timer interval ()