From bdedadf6897fd4c8584fcf1d23b4ca707194dbf3 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 17 Aug 2017 19:53:36 +0200 Subject: [PATCH] vmmd: - fix fd leak (always close socket) - send first message (login) after renegotiation vmm_stats: - remove unneeded functionality (keeping old statistics around) - translate internal tap names to bridge names - gather statistics from vmmapi as well vmm_prometheus_stats: - new exporter of statistics to prometheus *: - fix typo in README - style --- README.md | 4 +- _tags | 3 +- app/vmm_client.ml | 20 +-- app/vmm_console.ml | 2 +- app/vmm_prometheus_stats.ml | 344 ++++++++++++++++++++++++++++++++++++ app/vmmd.ml | 22 ++- myocamlbuild.ml | 10 +- pkg/pkg.ml | 1 + src/vmm_commands.ml | 8 + src/vmm_core.ml | 5 + src/vmm_engine.ml | 79 +++++---- src/vmm_lwt.ml | 1 - src/vmm_wire.ml | 50 ++++-- stats/vmm_stats.ml | 39 ++-- stats/vmm_stats_lwt.ml | 2 +- stats/vmm_stats_stubs.c | 49 ++++- 16 files changed, 547 insertions(+), 92 deletions(-) create mode 100644 app/vmm_prometheus_stats.ml diff --git a/README.md b/README.md index f83401f..e9680cb 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ is used on top to (more gracefully) handle multiple connection, and to have a watching thread (in `waitpid(2)`) for every virtual machine started by vmmd. It requires some pinned packages: -- `asn1-combinators https://github.com/hannesm/ocaml-asn-combinators.git#enum` +- `asn1-combinators https://github.com/hannesm/ocaml-asn1-combinators.git#enum` - `x509 https://github.com/hannesm/ocaml-x509.git#crl` - `tls https://github.com/hannesm/ocaml-tls.git#changes` - on FreeBSD, `solo5-kernel-ukvm https://github.com/solo5/solo5.git` @@ -146,7 +146,7 @@ And deploying (watch the output of the processes started on the server above!): ``` DEV> vmm_client cacert.pem hello.bundle hello.key SRV:1025 -DEV> vmm_client cacert.pem admin.bundle hello.key SRV:1025 --db dev.db +DEV> vmm_client cacert.pem admin.bundle admin.key SRV:1025 --db dev.db ``` Commands are at the moment `info`, `statistics`, `destroy`, `attach`, `detach`, diff --git a/_tags b/_tags index 97e0a41..62d1bab 100644 --- a/_tags +++ b/_tags @@ -1,4 +1,4 @@ -true : bin_annot, safe_string, principal +true : bin_annot, safe_string, principal, color(always) true : warn(+A-44) true : package(rresult logs ipaddr x509 tls bos hex ptime ptime.clock.os astring duration) "src" : include @@ -11,6 +11,7 @@ true : package(rresult logs ipaddr x509 tls bos hex ptime ptime.clock.os astring : package(lwt.unix cmdliner logs.fmt fmt.cli logs.cli fmt.tty lwt ipaddr.unix) : package(nocrypto tls.lwt nocrypto.lwt) : package(tls.lwt) +: package(nocrypto tls.lwt nocrypto.lwt) : package(cmdliner logs.fmt fmt.cli logs.cli fmt.tty asn1-combinators nocrypto.unix lwt) diff --git a/app/vmm_client.ml b/app/vmm_client.ml index 1f9bac2..a6bc641 100644 --- a/app/vmm_client.ml +++ b/app/vmm_client.ml @@ -16,9 +16,11 @@ let process db hdr data = let r = match hdr.tag with | x when x = Client.stat_msg_tag -> - Client.decode_stat data >>= fun (ru, ifd) -> - Logs.app (fun m -> m "statistics: %a %a" - pp_rusage ru Fmt.(list ~sep:(unit ", ") pp_ifdata) ifd) ; + Client.decode_stat data >>= fun (ru, vmm, ifd) -> + Logs.app (fun m -> m "statistics: %a %a %a" + pp_rusage ru + Fmt.(list ~sep:(unit ", ") (pair ~sep:(unit ": ") string uint64)) vmm + Fmt.(list ~sep:(unit ", ") pp_ifdata) ifd) ; Ok () | x when x = Client.log_msg_tag -> Client.decode_log data >>= fun log -> @@ -56,12 +58,12 @@ let rec read_tls_write_cons db t = Logs.err (fun m -> m "error while reading %s" msg) ; read_tls_write_cons db t | Ok (hdr, data) -> - Logs.debug (fun m -> m "read from tls id %d %a tag %d data %a" - hdr.Vmm_wire.id Vmm_wire.pp_version hdr.Vmm_wire.version - hdr.Vmm_wire.tag Cstruct.hexdump_pp (Cstruct.of_string data)) ; process db hdr data ; read_tls_write_cons db t) - (fun _ -> Lwt.return_unit) + (fun e -> + Logs.err (fun m -> m "exception reading TLS stream %s" + (Printexc.to_string e)) ; + Tls_lwt.Unix.close t) let rec read_cons_write_tls db t = Lwt.catch (fun () -> @@ -99,9 +101,7 @@ let client cas host port cert priv_key db = Tls_lwt.Unix.client_of_fd client (* ~host *) fd >>= fun t -> if Vmm_asn.contains_vm leaf || Vmm_asn.contains_crl leaf then - Vmm_tls.read_tls t >|= function - | Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) - | Ok (hdr, data) -> process db hdr data + read_tls_write_cons db t else (Logs.debug (fun m -> m "read/write games!") ; Lwt.join [ read_tls_write_cons db t ; read_cons_write_tls db t ])) diff --git a/app/vmm_console.ml b/app/vmm_console.ml index 03e282b..a0c3ff2 100644 --- a/app/vmm_console.ml +++ b/app/vmm_console.ml @@ -170,7 +170,7 @@ let setup_log = let socket = let doc = "Socket to listen onto" in - Arg.(value & pos 0 string "" & info [] ~doc) + Arg.(required & pos 0 (some string) None & info [] ~doc) let cmd = Term.(ret (const jump $ setup_log $ socket)), diff --git a/app/vmm_prometheus_stats.ml b/app/vmm_prometheus_stats.ml new file mode 100644 index 0000000..a95f01d --- /dev/null +++ b/app/vmm_prometheus_stats.ml @@ -0,0 +1,344 @@ +(* (c) 2017 Hannes Mehnert, all rights reserved *) + +open Lwt.Infix + +open Astring + +open Vmm_core + +let my_version = `WV0 + +let command = ref 1 + +let t : (Lwt_unix.file_descr * Lwt_unix.sockaddr * string) IM.t ref = ref IM.empty + +module S = struct + type t = Lwt_unix.sockaddr + let compare : Lwt_unix.sockaddr -> Lwt_unix.sockaddr -> int = compare +end + +module SM = Map.Make(S) + +let count : int SM.t ref = ref SM.empty + +let dec s = + match SM.find s !count with + | exception Not_found -> `Not_found + | 1 -> count := SM.remove s !count ; `Close + | x -> count := SM.add s (pred x) !count ; `Continue + +let known_vms : string list ref = ref [] + +module P = struct + let p vm ?(typ = `Counter) name help value = + let t_s = function `Counter -> "counter" | `Gauge -> "gauge" in + let name = vm ^ "_" ^ name in + let p a v = String.concat ~sep:" " [ "#" ; a ; name ; v ] in + String.concat ~sep:"\n" + [ p "HELP" help ; p "TYPE" (t_s typ) ; name ^ " " ^ value ] + + let tv (sec, usec) = Printf.sprintf "%Lu.%06d" sec usec + let i64 = Int64.to_string + + let encode_ru vm ru = + let p = p vm in + String.concat ~sep:"\n" + [ p "utime" "user time used" (tv ru.utime) ; + p "stime" "system time used" (tv ru.stime) ; + p "maxrss" "maximum resident set" (i64 ru.maxrss) ; + p ~typ:`Gauge "ixrss" "shared memory" (i64 ru.ixrss) ; + p ~typ:`Gauge "idrss" "unshared data" (i64 ru.idrss) ; + p ~typ:`Gauge "isrss" "unshared stack" (i64 ru.isrss) ; + p "minflt" "page reclaims" (i64 ru.minflt) ; + p "maxflt" "page faults" (i64 ru.majflt) ; + p "nswap" "swaps" (i64 ru.nswap) ; + p "inblock" "block input ops" (i64 ru.inblock) ; + p "outblock" "block output ops" (i64 ru.outblock) ; + p "msgsnd" "messages send" (i64 ru.msgsnd) ; + p "msgrcv" "messages received" (i64 ru.msgrcv) ; + p "nsignals" "signals received" (i64 ru.nsignals) ; + p "nvcsw" "voluntary context switches" (i64 ru.nvcsw) ; + p "nivcsw" "involuntary context switches" (i64 ru.nivcsw) + ] + + let encode_vmm vm xs = + let p = p vm in + let massage s = + let cutted = match String.cut ~sep:"umber of " s with + | Some (_, r) -> r + | None -> s + in + let cutted = match String.cut ~sep:"[" cutted with + | None -> cutted + | Some (l, r) -> match String.cut ~sep:"]" r with + | None -> cutted + | Some (l', r) when r = "" -> l ^ "_" ^ l' + | Some (l', r') -> l ^ "_" ^ l' ^ "_" ^ r' + in + let cutted = + List.fold_left (fun str sep -> + match String.cut ~sep str with + | None -> str + | Some (l, r) -> l ^ r) + cutted [ "%" ; "/" ; "-" ] + in + String.concat ~sep:"_" (String.cuts ~sep:" " cutted) + in + String.concat ~sep:"\n" + (List.map (fun (k, v) -> p (massage k) k (i64 v)) xs) + + let i32 = Int32.to_string + + let encode_if vm ifd = + let p = p (vm ^ "_" ^ ifd.name) in + String.concat ~sep:"\n" + (* TODO: flags *) + [ p ~typ:`Gauge "send_length" "length of send queue" (i32 ifd.send_length) ; + p "max_send_length" "maximum length of send queue" (i32 ifd.max_send_length) ; + p "send_drops" "drops in send queue" (i32 ifd.send_drops) ; + p ~typ:`Gauge "mtu" "maximum transmission unit" (i32 ifd.mtu) ; + p ~typ:`Gauge "baudrate" "linespeed" (i64 ifd.baudrate) ; + p "vm_to_host_packets" "packets from vm" (i64 ifd.input_packets) ; + p "vm_to_host_errors" "packet errors from vm" (i64 ifd.input_errors) ; + p "vm_to_host_bytes" "bytes from vm" (i64 ifd.input_bytes) ; + p "vm_to_host_mcast" "packets from vm via multicast" (i64 ifd.input_mcast) ; + p "vm_to_host_dropped" "packets dropped from vm" (i64 ifd.input_dropped) ; + p "collisions" "collisions on csma interface" (i64 ifd.collisions) ; + p "host_to_vm_packets" "packets to vm" (i64 ifd.output_packets) ; + p "host_to_vm_errors" "packet errors to vm" (i64 ifd.output_errors) ; + p "host_to_vm_bytes" "bytes to vm" (i64 ifd.output_bytes) ; + p "host_to_vm_mcast" "packets to vm via multicast" (i64 ifd.output_mcast) ; + p "host_to_vm_dropped" "packets dropped to vm" (i64 ifd.output_dropped) + ] +end + +(* just a reminder whether we already sent the initial "info" or not *) +let f_done = ref false + +let process db tls hdr data = + let open Vmm_wire in + let open Rresult.R.Infix in + if not (version_eq hdr.version my_version) then begin + Logs.err (fun m -> m "unknown wire protocol version") ; Lwt.return_unit + end else + match hdr.tag with + | x when x = Client.log_msg_tag && not !f_done -> + f_done := true ; + (* issue initial "info" to get all the vm names *) + let out = Vmm_wire.Client.cmd `Info !command my_version in + command := succ !command ; + Logs.debug (fun m -> m "writing %a over TLS" Cstruct.hexdump_pp (Cstruct.of_string out)) ; + Vmm_tls.write_tls tls out + | _ -> + let r = + match hdr.tag with + | x when x = Client.log_msg_tag -> + Client.decode_log data >>= fun (hdr, event) -> + let nam = translate_serial db hdr.Vmm_core.Log.name in + begin match event with + | `VM_start _ -> known_vms := nam :: !known_vms + | `VM_stop _ -> known_vms := List.filter (fun m -> m <> nam) !known_vms + | _ -> () + end ; + Ok `None + | x when x = Client.info_msg_tag -> + Client.decode_info data >>= fun vms -> + let vms = List.map (fun (name, _, _, _) -> translate_serial db name) vms in + known_vms := vms ; + Ok `None + | x when x = Client.stat_msg_tag -> + Client.decode_stat data >>= fun (ru, vmm, ifd) -> + begin match IM.find hdr.id !t with + | exception Not_found -> Logs.err (fun m -> m "unexpected reply") ; Ok `None + | (fd, s, vm) -> + t := IM.remove hdr.id !t ; + let out = String.concat ~sep:"\n" (P.encode_ru vm ru :: P.encode_vmm vm vmm :: List.map (P.encode_if vm) ifd @ [""]) in + Ok (`Stat (fd, s, out)) + end + | x when x = fail_tag -> + let res = + match IM.find hdr.id !t with + | exception Not_found -> `None + | (fd, s, _) -> `Sockaddr (fd, s) + in + t := IM.remove hdr.id !t ; + decode_str data >>= fun (msg, _) -> + Logs.err (fun m -> m "failed %s" msg) ; + Ok res + | x -> Rresult.R.error_msgf "ignoring header tag %02X" x + in + let d (fd, s) = match dec s with + | `Continue -> Lwt.return_unit + | `Close -> Lwt_unix.close fd + | `Not_found -> Logs.err (fun m -> m "sockaddr not found") ; Lwt.return_unit + in + let open Lwt.Infix in + match r with + | Ok `None -> Lwt.return_unit + | Ok (`Sockaddr s) -> d s + | Ok (`Stat (fd, s, out)) -> Vmm_lwt.write_raw fd out >>= fun () -> d (fd, s) + | Error (`Msg msg) -> Logs.err (fun m -> m "error while processing: %s" msg) ; Lwt.return_unit + +let rec tls_listener db tls = + Lwt.catch (fun () -> + Vmm_tls.read_tls tls >>= function + | Error (`Msg msg) -> + Logs.err (fun m -> m "error while reading %s" msg) ; + Lwt.return (Ok ()) + | Ok (hdr, data) -> + process db tls hdr data >>= fun () -> + Lwt.return (Ok ())) + (fun e -> + Logs.err (fun m -> m "received exception in read_tls: %s" (Printexc.to_string e)) ; + Lwt.return (Error ())) >>= function + | Ok () -> tls_listener db tls + | Error () -> Lwt.return_unit + +let hdr = + String.concat ~sep:"\r\n" + [ "HTTP/1.1 200 OK" ; + "Content-Type: text/plain; version=0.0.4" ; + "\r\n" ] + +(* wait for TCP connection, once received request stats from vmmd, and loop *) +let rec tcp_listener db tcp tls = + Lwt_unix.accept tcp >>= fun (cs, sockaddr) -> + Vmm_lwt.write_raw cs hdr >>= fun () -> + let l = List.length !known_vms in + let ip, port = match sockaddr with Lwt_unix.ADDR_INET (ip, port) -> ip, port | _ -> invalid_arg "unexpected" in + Logs.info (fun m -> m "connection from %s:%d with %d known" (Unix.string_of_inet_addr ip) port l) ; + (if l = 0 then + Lwt_unix.close cs + else begin + count := SM.add sockaddr (List.length !known_vms) !count ; + Lwt_list.iter_s + (fun vm -> + let vm_id = translate_name db vm in + let out = Vmm_wire.Client.cmd `Statistics ~arg:vm_id !command my_version in + t := IM.add !command (cs, sockaddr, vm) !t ; + command := succ !command ; + Vmm_tls.write_tls tls out) + !known_vms + end) >>= fun () -> + tcp_listener db tcp tls + +let client cas host port cert priv_key db listen_ip listen_port = + Nocrypto_entropy_lwt.initialize () >>= fun () -> + let auth = if Sys.is_directory cas then `Ca_dir cas else `Ca_file cas in + X509_lwt.authenticator auth >>= fun authenticator -> + Lwt.catch (fun () -> + (* start TCP listening socket *) + let tcp = Lwt_unix.(socket PF_INET SOCK_STREAM 0) in + Lwt_unix.(setsockopt tcp SO_REUSEADDR true) ; + let addr = Lwt_unix.ADDR_INET (Ipaddr_unix.V4.to_inet_addr listen_ip, listen_port) in + Lwt_unix.bind tcp addr >>= fun () -> + Lwt_unix.listen tcp 1 ; + + (* setup remote connection to VMMD *) + Lwt_unix.gethostbyname host >>= fun host_entry -> + let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in + let fd = Lwt_unix.socket host_entry.Lwt_unix.h_addrtype Lwt_unix.SOCK_STREAM 0 in + + Lwt_unix.connect fd (Lwt_unix.ADDR_INET (host_inet_addr, port)) >>= fun _ -> + X509_lwt.private_of_pems ~cert ~priv_key >>= fun cert -> + let certificates = `Single cert in + let client = Tls.Config.client ~reneg:true ~certificates ~authenticator () in + Tls_lwt.Unix.client_of_fd client (* ~host *) fd >>= fun tls -> + + (* loop on both tcp and tls connections *) + Lwt.join [ tcp_listener db tcp tls ; tls_listener db tls ]) + (fun exn -> + Logs.err (fun m -> m "failed to establish TLS connection: %s" + (Printexc.to_string exn)) ; + Lwt.return_unit) + +let run_client _ cas cert key (host, port) db listen_ip listen_port = + Printexc.register_printer (function + | Tls_lwt.Tls_alert x -> Some ("TLS alert: " ^ Tls.Packet.alert_type_to_string x) + | Tls_lwt.Tls_failure f -> Some ("TLS failure: " ^ Tls.Engine.string_of_failure f) + | _ -> None) ; + Sys.(set_signal sigpipe Signal_ignore) ; + let db = + let open Rresult.R.Infix in + match db with + | None -> [] + | Some db -> + match Bos.OS.File.read_lines (Fpath.v db) >>= parse_db with + | Ok db -> db + | Error (`Msg m) -> Logs.warn (fun f -> f "couldn't parse database %s" m) ; [] + in + Lwt_main.run (client cas host port cert key db listen_ip listen_port) + +let setup_log style_renderer level = + Fmt_tty.setup_std_outputs ?style_renderer (); + Logs.set_level level; + Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ()) + +open Cmdliner + +let setup_log = + Term.(const setup_log + $ Fmt_cli.style_renderer () + $ Logs_cli.level ()) + +let host_port : (string * int) Arg.converter = + let parse s = + match String.cut ~sep:":" s with + | None -> `Error "broken: no port specified" + | Some (hostname, port) -> + try + `Ok (hostname, int_of_string port) + with + Not_found -> `Error "failed to parse port" + in + parse, fun ppf (h, p) -> Format.fprintf ppf "%s:%d" h p + +let cas = + let doc = "The full path to PEM encoded certificate authorities. Can either be a FILE or a DIRECTORY." in + Arg.(required & pos 0 (some string) None & info [] ~docv:"FILE" ~doc) + +let client_cert = + let doc = "Use a client certificate chain" in + Arg.(required & pos 1 (some file) None & info [] ~doc) + +let client_key = + let doc = "Use a client key" in + Arg.(required & pos 2 (some file) None & info [] ~doc) + +let destination = + Arg.(required & pos 3 (some host_port) None & info [] ~docv:"destination" + ~doc:"the destination hostname:port to connect to") + +let ip : Ipaddr.V4.t Arg.converter = + let parse s = + try + `Ok (Ipaddr.V4.of_string_exn s) + with + Not_found -> `Error "broken" + in + parse, Ipaddr.V4.pp_hum + +let address = + let doc = "Address to listen on" in + Arg.(value & opt ip (Ipaddr.V4.of_string_exn "127.0.0.1") & info [ "address" ] ~doc) + +let port = + let doc = "TCP port to listen on" in + Arg.(value & opt int 9080 & info [ "port" ] ~doc) + +let db = + let doc = "Certificate database" in + Arg.(value & opt (some file) None & info [ "db" ] ~doc) + +let cmd = + let doc = "VMM TLS client" in + let man = [ + `S "DESCRIPTION" ; + `P "$(tname) connects to a server and initiates a TLS handshake" ] + in + Term.(pure run_client $ setup_log $ cas $ client_cert $ client_key $ destination $ db $ address $ port), + Term.info "vmm_client" ~version:"%%VERSION_NUM%%" ~doc ~man + +let () = + match Term.eval cmd + with `Error _ -> exit 1 | _ -> exit 0 diff --git a/app/vmmd.ml b/app/vmmd.ml index 6a30ac6..3ffdc29 100644 --- a/app/vmmd.ml +++ b/app/vmmd.ml @@ -8,6 +8,7 @@ let write_tls state t data = let state', out = Vmm_engine.handle_disconnect !state t in state := state' ; Lwt_list.iter_s (fun (s, data) -> Vmm_lwt.write_raw s data) out >>= fun () -> + Tls_lwt.Unix.close (fst t) >>= fun () -> raise e) let to_ipaddr (_, sa) = match sa with @@ -31,17 +32,20 @@ let handle ca state t = let time = Unix.gettimeofday () in X509.Authenticator.chain_of_trust ~time ~crls:!state.Vmm_engine.crls [ca] in - Lwt.catch (fun () -> - Tls_lwt.Unix.reneg ~authenticator (fst t)) + Lwt.catch + (fun () -> Tls_lwt.Unix.reneg ~authenticator (fst t)) (fun e -> (match e with | Tls_lwt.Tls_alert a -> Logs.err (fun m -> m "TLS ALERT %s" (Tls.Packet.alert_type_to_string a)) | Tls_lwt.Tls_failure f -> Logs.err (fun m -> m "TLS FAILURE %s" (Tls.Engine.string_of_failure f)) | exn -> Logs.err (fun m -> m "%s" (Printexc.to_string exn))) ; + Tls_lwt.Unix.close (fst t) >>= fun () -> Lwt.fail e) >>= fun () -> (match Tls_lwt.Unix.epoch (fst t) with | `Ok epoch -> Lwt.return epoch.Tls.Core.peer_certificate_chain - | `Error -> Lwt.fail_with "error while getting epoch") >>= fun chain -> + | `Error -> + Tls_lwt.Unix.close (fst t) >>= fun () -> + Lwt.fail_with "error while getting epoch") >>= fun chain -> match Vmm_engine.handle_initial !state t (to_ipaddr t) chain ca with | Ok (state', outs, next) -> state := state' ; @@ -75,11 +79,13 @@ let handle ca state t = process state out >>= fun () -> loop () in - Lwt.catch loop (fun e -> - let state', cons = Vmm_engine.handle_disconnect !state t in - state := state' ; - Lwt_list.iter_s (fun (s, data) -> Vmm_lwt.write_raw s data) cons >>= fun () -> - raise e) + Lwt.catch loop + (fun e -> + let state', cons = Vmm_engine.handle_disconnect !state t in + state := state' ; + Lwt_list.iter_s (fun (s, data) -> Vmm_lwt.write_raw s data) cons >>= fun () -> + Tls_lwt.Unix.close (fst t) >>= fun () -> + raise e) | `Close socks -> Logs.debug (fun m -> m "closing session with %d active ones" (List.length socks)) ; Lwt_list.iter_s (fun (t, _) -> Tls_lwt.Unix.close t) socks >>= fun () -> diff --git a/myocamlbuild.ml b/myocamlbuild.ml index 1ff3526..7f9eb76 100644 --- a/myocamlbuild.ml +++ b/myocamlbuild.ml @@ -3,6 +3,13 @@ open Ocamlbuild_plugin let to_opt = List.fold_left (fun acc x -> [A "-ccopt"; A x] @ acc) [] let ccopt = to_opt [ "-O3" ; "-Wall" ] +let os = Ocamlbuild_pack.My_unix.run_and_read "uname -s" + +let vmm_lib = match os with +| "FreeBSD\n" -> [A "-cclib"; A "-lvmmapi"] +| _ -> [] + + let () = dispatch begin function | After_rules -> @@ -11,7 +18,8 @@ let () = (S ([A "-dllib"; A "-lvmm_stats_stubs"])); flag ["link"; "library"; "ocaml"; "native"; "use_vmm_stats"] (S ([A "-cclib"; A "-lvmm_stats_stubs"])); - flag ["link"; "ocaml"; "link_vmm_stats"] (A "stats/libvmm_stats_stubs.a"); + flag ["link"; "ocaml"; "link_vmm_stats"] + (S ([A "stats/libvmm_stats_stubs.a"] @ vmm_lib)); dep ["link"; "ocaml"; "use_vmm_stats"] ["stats/libvmm_stats_stubs.a"]; dep ["link"; "ocaml"; "link_vmm_stats"] ["stats/libvmm_stats_stubs.a"]; | _ -> () diff --git a/pkg/pkg.ml b/pkg/pkg.ml index d786be0..f2ae5a5 100644 --- a/pkg/pkg.ml +++ b/pkg/pkg.ml @@ -18,4 +18,5 @@ let () = Pkg.bin "provision/vmm_gen_ca" ; Pkg.clib "stats/libvmm_stats_stubs.clib" ; Pkg.bin "stats/vmm_stats_lwt" ; + Pkg.bin "app/vmm_prometheus_stats" ; ] diff --git a/src/vmm_commands.ml b/src/vmm_commands.ml index af1683e..328c392 100644 --- a/src/vmm_commands.ml +++ b/src/vmm_commands.ml @@ -177,6 +177,14 @@ let exec dir vm fifo vmimage taps = Logs.debug (fun m -> m "creating process"); let pid = create_process prog line stdout stdout in Logs.debug (fun m -> m "created process %d: %a" pid Bos.Cmd.pp cmd) ; + (* on FreeBSD we need to chmod g+rw /dev/vmm/ukvm$pid to run + bhyvectl --get-stats --vm=ukvm$pid as non-priviliged user *) + (Lazy.force (uname ()) >>= fun (sys, _) -> + match sys with + | x when x = "FreeBSD" -> + let dev = "/dev/vmm/ukvm" ^ string_of_int pid in + Bos.OS.Cmd.run Bos.Cmd.(v "chmod" % "g+rw" % dev) + | _ -> Ok ()) >>= fun () -> Ok { config = vm ; cmd ; pid ; taps ; stdout } with Unix.Unix_error (e, _, _) -> diff --git a/src/vmm_core.ml b/src/vmm_core.ml index 2733db6..88ddd59 100644 --- a/src/vmm_core.ml +++ b/src/vmm_core.ml @@ -227,6 +227,11 @@ let pp_vm ppf vm = vm.pid Fmt.(list ~sep:(unit ", ") string) vm.taps Bos.Cmd.pp vm.cmd +let translate_tap vm tap = + match List.filter (fun (t, b) -> tap = t) (List.combine vm.taps vm.config.network) with + | [ (_, b) ] -> Some b + | _ -> None + let identifier serial = match Hex.of_cstruct @@ Nocrypto.Hash.SHA256.digest @@ Nocrypto.Numeric.Z.to_cstruct_be @@ serial diff --git a/src/vmm_engine.ml b/src/vmm_engine.ml index 691e461..cb7eb84 100644 --- a/src/vmm_engine.ml +++ b/src/vmm_engine.ml @@ -17,7 +17,7 @@ type ('a, 'b) t = { console_version : Vmm_wire.version ; stats_socket : 'a option ; stats_counter : int ; - stats_requests : ('b * int) IM.t ; + stats_requests : ('b * int * (string -> string option)) IM.t ; stats_version : Vmm_wire.version ; log_socket : 'a ; log_counter : int ; @@ -256,8 +256,9 @@ let handle_command t s prefix perms hdr buf = | None -> Error (`Msg "no statistics available") | Some _ -> match Vmm_resources.find_vm t.resources arg with | Some vm -> - let stat_out = Vmm_wire.Stats.stat t.stats_counter t.stats_version vm.Vmm_core.pid in - let stats_requests = IM.add t.stats_counter (s, hdr.Vmm_wire.id) t.stats_requests in + let stat_out = Vmm_wire.Stats.stat t.stats_counter t.stats_version vm.pid in + let d = (s, hdr.Vmm_wire.id, translate_tap vm) in + let stats_requests = IM.add t.stats_counter d t.stats_requests in Ok ({ t with stats_counter = succ t.stats_counter ; stats_requests }, stat t stat_out) | _ -> Error (`Msg ("statistics: not found " ^ buf)) @@ -373,31 +374,33 @@ let handle_initial t s addr chain ca = (* TODO here: inspect top-level-cert of chain. may need to create bridges and/or block device subdirectory (zfs create) *) let prefix = List.map id chain in - let t, out = log t (Log.hdr prefix (id leaf), `Login addr) in + let login_hdr, login_ev = Log.hdr prefix (id leaf), `Login addr in + let t, out = log t (login_hdr, login_ev) in + let initial_out = `Tls (s, Vmm_wire.Client.log login_hdr login_ev t.client_version) in Vmm_asn.permissions_of_cert asn_version leaf >>= fun perms -> - if List.mem `Image perms && Vmm_asn.contains_vm leaf then - handle_create t prefix chain leaf >>= fun (file, cont) -> - let cons = Vmm_wire.Console.add t.console_counter t.console_version file in - Ok ({ t with console_counter = succ t.console_counter }, - `Raw (t.console_socket, cons) :: out, - `Create cont) - else if List.mem `Crl perms && Vmm_asn.contains_crl leaf then - handle_revocation t s leaf chain ca prefix - else - let log_attached = - if cmd_allowed perms `Log then - let pre = string_of_id prefix in - let v = match String.Map.find pre t.log_attached with - | None -> [] - | Some xs -> xs - in - String.Map.add pre ((s, id leaf) :: v) t.log_attached - else - t.log_attached - in - Ok ({ t with log_attached }, - out, - `Loop (prefix, perms)) + (if List.mem `Image perms && Vmm_asn.contains_vm leaf then + handle_create t prefix chain leaf >>= fun (file, cont) -> + let cons = Vmm_wire.Console.add t.console_counter t.console_version file in + Ok ({ t with console_counter = succ t.console_counter }, + [ `Raw (t.console_socket, cons) ], + `Create cont) + else if List.mem `Crl perms && Vmm_asn.contains_crl leaf then + handle_revocation t s leaf chain ca prefix + else + let log_attached = + if cmd_allowed perms `Log then + let pre = string_of_id prefix in + let v = match String.Map.find pre t.log_attached with + | None -> [] + | Some xs -> xs + in + String.Map.add pre ((s, id leaf) :: v) t.log_attached + else + t.log_attached + in + Ok ({ t with log_attached }, [], `Loop (prefix, perms)) + ) >>= fun (t, outs, res) -> + Ok (t, initial_out :: out @ outs, res) let handle_stat state hdr data = let open Vmm_wire in @@ -411,14 +414,30 @@ let handle_stat state hdr data = | exception Not_found -> Logs.err (fun m -> m "couldn't find stat request") ; state, [] - | (s, req_id) -> + | (s, req_id, f) -> let stats_requests = IM.remove hdr.id state.stats_requests in let state = { state with stats_requests } in let out = match Stats.int_to_op hdr.tag with | Some Stats.StatReply -> - let out = Client.stat data req_id state.client_version in - [ `Tls (s, out) ] + begin match Stats.decode_stats (Cstruct.of_string data) with + | Ok (ru, vmm, ifs) -> + let ifs = + List.map + (fun x -> + match f x.name with + | Some name -> { x with name } + | None -> x) + ifs + in + let data = Cstruct.to_string (Stats.encode_stats (ru, vmm, ifs)) in + let out = Client.stat data req_id state.client_version in + [ `Tls (s, out) ] + | Error (`Msg msg) -> + Logs.err (fun m -> m "error %s while decode statistics" msg) ; + let out = fail req_id state.client_version in + [ `Tls (s, out) ] + end | None when hdr.tag = fail_tag -> let out = fail ~msg:data req_id state.client_version in [ `Tls (s, out) ] diff --git a/src/vmm_lwt.ml b/src/vmm_lwt.ml index 81836d1..6d9ea20 100644 --- a/src/vmm_lwt.ml +++ b/src/vmm_lwt.ml @@ -13,7 +13,6 @@ let ret = function | Unix.WSTOPPED s -> `Stop s let wait_and_clear pid stdout = - let open Lwt.Infix in Lwt_unix.waitpid [] pid >>= fun (_, s) -> Logs.debug (fun m -> m "pid %d exited: %a" pid pp_process_status s) ; Vmm_commands.close_no_err stdout ; diff --git a/src/vmm_wire.ml b/src/vmm_wire.ml index 615bcce..8701d0c 100644 --- a/src/vmm_wire.ml +++ b/src/vmm_wire.ml @@ -326,7 +326,7 @@ module Stats = struct let decode_ifdata buf = decode_string buf >>= fun (name, l) -> - check_exact buf (l + 116) >>= fun () -> + check_len buf (l + 116) >>= fun () -> let cs = Cstruct.shift buf l in let flags = Cstruct.BE.get_uint32 cs 0 and send_length = Cstruct.BE.get_uint32 cs 4 @@ -369,23 +369,53 @@ module Stats = struct in Cstruct.to_string r - let encode_stats (ru, ifd) = + let encode_int64 i = + let cs = Cstruct.create 8 in + Cstruct.BE.set_uint64 cs 0 i ; + cs + + let decode_int64 ?(off = 0) cs = + check_len cs (8 + off) >>= fun () -> + Ok (Cstruct.BE.get_uint64 cs off) + + let encode_vmm_stats xs = + encode_int (List.length xs) :: + List.flatten + (List.map (fun (k, v) -> [ fst (encode_string k) ; encode_int64 v ]) xs) + + let decode_vmm_stats cs = + let rec go acc ctr buf = + if ctr = 0 then + Ok (List.rev acc, buf) + else + decode_string buf >>= fun (str, off) -> + decode_int64 ~off buf >>= fun v -> + go ((str, v) :: acc) (pred ctr) (Cstruct.shift buf (off + 8)) + in + decode_int cs >>= fun stat_num -> + go [] stat_num (Cstruct.shift cs 8) + + let encode_stats (ru, vmm, ifd) = Cstruct.concat - (encode_rusage ru :: List.map encode_ifdata ifd) + (encode_rusage ru :: + encode_vmm_stats vmm @ + encode_int (List.length ifd) :: List.map encode_ifdata ifd) let decode_stats cs = check_len cs 144 >>= fun () -> - let ru, ifd = Cstruct.split cs 144 in + let ru, rest = Cstruct.split cs 144 in decode_rusage ru >>= fun ru -> - let rec go acc buf = - if Cstruct.len buf = 0 then - Ok (List.rev acc) + decode_vmm_stats rest >>= fun (vmm, rest) -> + let rec go acc ctr buf = + if ctr = 0 then + Ok (List.rev acc, buf) else decode_ifdata buf >>= fun (this, used) -> - go (this :: acc) (Cstruct.shift buf used) + go (this :: acc) (pred ctr) (Cstruct.shift buf used) in - go [] ifd >>= fun ifs -> - Ok (ru, ifs) + decode_int rest >>= fun num_if -> + go [] num_if (Cstruct.shift rest 8) >>= fun (ifs, _rest) -> + Ok (ru, vmm, ifs) let decode_pid_taps data = decode_pid data >>= fun pid -> diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index a095d3d..77a6859 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -7,29 +7,32 @@ open Vmm_core external sysctl_rusage : int -> rusage = "vmmanage_sysctl_rusage" external sysctl_ifcount : unit -> int = "vmmanage_sysctl_ifcount" external sysctl_ifdata : int -> ifdata = "vmmanage_sysctl_ifdata" +external vmmapi_stats : string -> (string * int64) list = "vmmanage_vmmapi_stats" let my_version = `WV0 type t = { pid_nic : (int * string) list IM.t ; pid_rusage : rusage IM.t ; - old_pid_rusage : rusage IM.t ; + pid_vmmapi : (string * int64) list IM.t ; nic_ifdata : ifdata String.Map.t ; - old_nic_ifdata : ifdata String.Map.t ; } let empty () = - { pid_nic = IM.empty ; - pid_rusage = IM.empty ; nic_ifdata = String.Map.empty ; - old_pid_rusage = IM.empty ; old_nic_ifdata = String.Map.empty } + { pid_nic = IM.empty ; pid_rusage = IM.empty ; pid_vmmapi = IM.empty ; nic_ifdata = String.Map.empty } let rec safe_sysctl f arg = try Some (f arg) with | Unix.Unix_error (Unix.EINTR, _, _) -> safe_sysctl f arg | _ -> None +let vm_vmmapi_stats pid = + let name = "ukvm" ^ string_of_int pid in + vmmapi_stats name + let gather pid nics = safe_sysctl sysctl_rusage pid, + vm_vmmapi_stats pid, List.fold_left (fun ifd (nic, _) -> match safe_sysctl sysctl_ifdata nic with | None -> ifd @@ -37,17 +40,17 @@ let gather pid nics = String.Map.empty nics let tick t = - let pid_rusage, nic_ifdata = - IM.fold (fun pid nics (rus, ifds) -> - let ru, ifd = gather pid nics in + let pid_rusage, pid_vmmapi, nic_ifdata = + IM.fold (fun pid nics (rus, vmms, ifds) -> + let ru, vmm, ifd = gather pid nics in (match ru with | None -> rus | Some ru -> IM.add pid ru rus), + IM.add pid vmm vmms, String.Map.union (fun _k a _b -> Some a) ifd ifds) - t.pid_nic (IM.empty, String.Map.empty) + t.pid_nic (IM.empty, IM.empty, String.Map.empty) in - let old_pid_rusage, old_nic_ifdata = t.pid_rusage, t.nic_ifdata in - { t with pid_rusage ; nic_ifdata ; old_pid_rusage ; old_nic_ifdata } + { t with pid_rusage ; pid_vmmapi ; nic_ifdata } let add_pid t pid nics = match safe_sysctl sysctl_ifcount () with @@ -64,18 +67,14 @@ let add_pid t pid nics = in let nic_ids = go (List.length nics) [] max_nic in let pid_nic = IM.add pid nic_ids t.pid_nic in - let ru, ifd = gather pid nic_ids in - (match ru with - | None -> () - | Some ru -> Logs.info (fun m -> m "RU %a" pp_rusage ru)) ; - Logs.info (fun m -> m "interfaces: %a" Fmt.(list ~sep:(unit ",@ ") pp_ifdata) (snd (List.split (String.Map.bindings ifd)))) ; Ok { t with pid_nic } -(* TODO: we can now compute deltas: t contains also old ru & ifdata *) let stats t pid = try - let nics = IM.find pid t.pid_nic in - let ru = IM.find pid t.pid_rusage in + let nics = IM.find pid t.pid_nic + and ru = IM.find pid t.pid_rusage + and vmm = IM.find pid t.pid_vmmapi + in match List.fold_left (fun acc nic -> match String.Map.find nic t.nic_ifdata, acc with @@ -85,7 +84,7 @@ let stats t pid = (Some []) (snd (List.split nics)) with | None -> Error (`Msg "failed to find interface statistics") - | Some ifd -> Ok (ru, ifd) + | Some ifd -> Ok (ru, vmm, ifd) with | _ -> Error (`Msg "failed to find resource usage") diff --git a/stats/vmm_stats_lwt.ml b/stats/vmm_stats_lwt.ml index 02378c6..a29e041 100644 --- a/stats/vmm_stats_lwt.ml +++ b/stats/vmm_stats_lwt.ml @@ -38,7 +38,7 @@ let handle s addr () = let rec timer () = t := Vmm_stats.tick !t ; - Lwt_unix.sleep Duration.(to_f (of_min 5)) >>= fun () -> + Lwt_unix.sleep Duration.(to_f (of_sec 15)) >>= fun () -> timer () let jump _ file = diff --git a/stats/vmm_stats_stubs.c b/stats/vmm_stats_stubs.c index fc564e3..4db9a63 100644 --- a/stats/vmm_stats_stubs.c +++ b/stats/vmm_stats_stubs.c @@ -12,14 +12,13 @@ #include #include -#ifdef __FreeBSD__ -#include -#endif - #define Val32 caml_copy_int32 #define Val64 caml_copy_int64 #ifdef __FreeBSD__ +#include +#include + CAMLprim value vmmanage_sysctl_rusage (value pid_r) { CAMLparam1(pid_r); CAMLlocal3(res, utime, stime); @@ -71,6 +70,38 @@ CAMLprim value vmmanage_sysctl_rusage (value pid_r) { CAMLreturn(res); } +CAMLprim value vmmanage_vmmapi_stats (value name) { + CAMLparam1(name); + CAMLlocal3(res, tmp, t); + int i, num_stats; + uint64_t *stats; + const char *desc; + struct vmctx *ctx; + const char *devname; + + if (! caml_string_is_c_safe(name)) caml_raise_not_found(); + + devname = String_val(name); + ctx = vm_open(devname); + if (ctx == NULL) uerror("vm_open", Nothing); + + stats = vm_get_stats(ctx, 0, NULL, &num_stats); + if (stats != NULL) { + for (i = 0; i < num_stats; i++) { + tmp = caml_alloc(2, 0); + desc = vm_get_stat_desc(ctx, i); + Store_field (tmp, 0, caml_copy_string(desc)); + Store_field (tmp, 1, Val64(stats[i])); + t = caml_alloc(2, 0); + Store_field (t, 0, tmp); + Store_field (t, 1, res); + res = t; + } + } + CAMLreturn(res); +} + + CAMLprim value vmmanage_sysctl_ifcount (value unit) { CAMLparam1(unit); int data = 0; @@ -136,17 +167,21 @@ CAMLprim value vmmanage_sysctl_ifdata (value num) { CAMLprim value vmmanage_sysctl_rusage (value pid_r) { CAMLparam1(pid_r); - uerror("sysctl", Nothing); + uerror("sysctl_rusage", Nothing); } CAMLprim value vmmanage_sysctl_ifcount (value unit) { CAMLparam1(unit); - uerror("sysctl", Nothing); + uerror("sysctl_ifcount", Nothing); } CAMLprim value vmmanage_sysctl_ifdata (value num) { CAMLparam1(num); - uerror("sysctl", Nothing); + uerror("sysctl_ifdata", Nothing); } +CAMLprim value vmmanage_vmmapi_stats (value name) { + CAMLparam1(name); + uerror("vm_stat", Nothing); +} #endif