From f939ff5a588d1faa7660247ce233444c98133e1b Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Tue, 23 Oct 2018 00:02:05 +0200 Subject: [PATCH] influx stats --- app/vmm_console.ml | 44 ++++++++++----------- app/vmm_influxdb_stats.ml | 81 +++++++++++++++------------------------ app/vmm_log.ml | 10 +---- pkg/pkg.ml | 4 +- src/vmm_lwt.ml | 13 ++++--- src/vmm_lwt.mli | 2 + stats/vmm_stats.ml | 59 ++++++++++++++++------------ stats/vmm_stats_lwt.ml | 13 ++++--- 8 files changed, 107 insertions(+), 119 deletions(-) diff --git a/app/vmm_console.ml b/app/vmm_console.ml index 81f7572..10475c9 100644 --- a/app/vmm_console.ml +++ b/app/vmm_console.ml @@ -104,34 +104,30 @@ let handle s addr () = Logs.info (fun m -> m "handling connection %a" Vmm_lwt.pp_sockaddr addr) ; let rec loop () = Vmm_lwt.read_wire s >>= function - | Error (`Msg msg) -> - Logs.err (fun m -> m "error while reading %s" msg) ; - loop () | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit - | Ok (_, `Success _) -> - Logs.err (fun m -> m "unexpected success reply") ; + | Ok (header, `Command (`Console_cmd cmd)) -> + begin + (if not (Vmm_asn.version_eq header.Vmm_asn.version my_version) then + Lwt.return (Error (`Msg "ignoring data with bad version")) + else + match cmd with + | `Console_add -> add_fifo header.Vmm_asn.id + | `Console_subscribe -> subscribe s header.Vmm_asn.id + | `Console_data _ -> Lwt.return (Error (`Msg "unexpected command"))) >>= (function + | Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg)) + | Error (`Msg msg) -> + Logs.err (fun m -> m "error while processing command: %s" msg) ; + Vmm_lwt.write_wire s (header, `Failure msg)) >>= function + | Ok () -> loop () + | Error _ -> + Logs.err (fun m -> m "exception while writing to socket") ; + Lwt.return_unit + end + | Ok wire -> + Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire wire) ; loop () - | Ok (_, `Failure _) -> - Logs.err (fun m -> m "unexpected failure reply") ; - loop () - | Ok (header, `Command cmd) -> - (if not (Vmm_asn.version_eq header.Vmm_asn.version my_version) then - Lwt.return (Error (`Msg "ignoring data with bad version")) - else - match cmd with - | `Console_cmd `Console_add -> add_fifo header.Vmm_asn.id - | `Console_cmd `Console_subscribe -> subscribe s header.Vmm_asn.id - | _ -> Lwt.return (Error (`Msg "unexpected command"))) >>= (function - | Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg)) - | Error (`Msg msg) -> - Logs.err (fun m -> m "error while processing command: %s" msg) ; - Vmm_lwt.write_wire s (header, `Failure msg)) >>= function - | Ok () -> loop () - | Error _ -> - Logs.err (fun m -> m "exception while writing to socket") ; - Lwt.return_unit in loop () >>= fun () -> Vmm_lwt.safe_close s >|= fun () -> diff --git a/app/vmm_influxdb_stats.ml b/app/vmm_influxdb_stats.ml index 61b6cbd..f2780e7 100644 --- a/app/vmm_influxdb_stats.ml +++ b/app/vmm_influxdb_stats.ml @@ -140,7 +140,7 @@ module P = struct vm ifd.name (String.concat ~sep:"," fields) end -let my_version = `WV2 +let my_version = `AV2 let command = ref 1L @@ -181,7 +181,6 @@ let rec read_sock_write_tcp c ?fd addr addrtype = None) >>= fun fd -> read_sock_write_tcp c ?fd addr addrtype | Some fd -> - let open Vmm_wire in Logs.debug (fun m -> m "reading from unix socket") ; Vmm_lwt.read_wire c >>= function | Error e -> @@ -190,60 +189,40 @@ let rec read_sock_write_tcp c ?fd addr addrtype = safe_close fd >>= fun () -> safe_close c >|= fun () -> true - | Ok (hdr, data) -> - if not (version_eq hdr.version my_version) then begin - Logs.err (fun m -> m "unknown wire protocol version") ; - safe_close fd >>= fun () -> - safe_close c >|= fun () -> - false - end else if Vmm_wire.is_fail hdr then begin - Logs.err (fun m -> m "failed to retrieve statistics") ; - safe_close fd >>= fun () -> - safe_close c >|= fun () -> - false - end else if Vmm_wire.is_reply hdr then begin - Logs.info (fun m -> m "received reply, continuing") ; - read_sock_write_tcp c ~fd addr addrtype - end else - (match Vmm_wire.Stats.int_to_op hdr.Vmm_wire.tag with - | Some Vmm_wire.Stats.Data -> - begin - let r = - let open Rresult.R.Infix in - Vmm_wire.decode_strings data >>= fun (id, off) -> - Vmm_wire.Stats.decode_stats (Cstruct.shift data off) >>| fun stats -> - (Vmm_core.string_of_id id, stats) - in - match r with - | Error (`Msg msg) -> - Logs.warn (fun m -> m "error %s while decoding stats, ignoring" msg) ; - Lwt.return (Some fd) - | Ok (name, (ru, vmm, ifs)) -> - let ru = P.encode_ru name ru in - let vmm = match vmm with [] -> [] | _ -> [ P.encode_vmm name vmm ] in - let taps = List.map (P.encode_if name) ifs in - let out = (String.concat ~sep:"\n" (ru :: vmm @ taps)) ^ "\n" in - Logs.debug (fun m -> m "writing %d via tcp" (String.length out)) ; - Vmm_lwt.write_wire fd (Cstruct.of_string out) >>= function - | Ok () -> - Logs.debug (fun m -> m "wrote successfully") ; - Lwt.return (Some fd) - | Error e -> - Logs.err (fun m -> m "error %s while writing to tcp (%s)" - (str_of_e e) name) ; - safe_close fd >|= fun () -> - None - end - | _ -> - Logs.err (fun m -> m "unhandled tag %lu" hdr.tag) ; - Lwt.return (Some fd)) >>= fun fd -> + | Ok (hdr, `Command (`Stats_cmd (`Stats_data (ru, vmm, ifs)))) -> + begin + if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin + Logs.err (fun m -> m "unknown wire protocol version") ; + safe_close fd >>= fun () -> + safe_close c >|= fun () -> + false + end else + let name = string_of_id hdr.Vmm_asn.id in + let ru = P.encode_ru name ru in + let vmm = match vmm with [] -> [] | _ -> [ P.encode_vmm name vmm ] in + let taps = List.map (P.encode_if name) ifs in + let out = (String.concat ~sep:"\n" (ru :: vmm @ taps)) ^ "\n" in + Logs.debug (fun m -> m "writing %d via tcp" (String.length out)) ; + Vmm_lwt.write_raw fd (Bytes.unsafe_of_string out) >>= function + | Ok () -> + Logs.debug (fun m -> m "wrote successfully") ; + read_sock_write_tcp c ~fd addr addrtype + | Error e -> + Logs.err (fun m -> m "error %s while writing to tcp (%s)" + (str_of_e e) name) ; + safe_close fd >|= fun () -> + false + end + | Ok wire -> + Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire wire) ; + Lwt.return (Some fd) >>= fun fd -> read_sock_write_tcp c ?fd addr addrtype let query_sock vm c = - let request = Vmm_wire.Stats.subscribe !command my_version vm in + let header = Vmm_asn.{ version = my_version ; sequence = !command ; id = vm } in command := Int64.succ !command ; Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id vm) ; - Vmm_lwt.write_wire c request + Vmm_lwt.write_wire c (header, `Command (`Stats_cmd `Stats_subscribe)) let rec maybe_connect stat_socket = let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in diff --git a/app/vmm_log.ml b/app/vmm_log.ml index 26b488c..a0fe782 100644 --- a/app/vmm_log.ml +++ b/app/vmm_log.ml @@ -98,12 +98,6 @@ let handle mvar ring s addr () = | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit - | Ok (_, `Failure _) -> - Logs.warn (fun m -> m "ignoring failure") ; - loop () - | Ok (_, `Success _) -> - Logs.warn (fun m -> m "ignoring success") ; - loop () | Ok (hdr, `Command (`Log_cmd lc)) -> if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin Logs.warn (fun m -> m "unsupported version") ; @@ -140,8 +134,8 @@ let handle mvar ring s addr () = Lwt.return_unit | Ok () -> loop () (* TODO no need to loop ;) *) end - | _ -> - Logs.err (fun m -> m "unknown command") ; + | Ok wire -> + Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire wire) ; loop () in loop () >>= fun () -> diff --git a/pkg/pkg.ml b/pkg/pkg.ml index bd35027..e558c46 100644 --- a/pkg/pkg.ml +++ b/pkg/pkg.ml @@ -20,6 +20,6 @@ let () = Pkg.bin "provision/vmm_revoke" ; Pkg.bin "provision/vmm_gen_ca" ; *) (* Pkg.clib "stats/libvmm_stats_stubs.clib" ; *) -(* Pkg.bin "stats/vmm_stats_lwt" ; - Pkg.bin "app/vmm_influxdb_stats" ; *) + Pkg.bin "stats/vmm_stats_lwt" ; + Pkg.bin "app/vmm_influxdb_stats" ; ] diff --git a/src/vmm_lwt.ml b/src/vmm_lwt.ml index 9017109..fc59ec8 100644 --- a/src/vmm_lwt.ml +++ b/src/vmm_lwt.ml @@ -79,11 +79,7 @@ let read_wire s = else Lwt.return (Error `Eof) -let write_wire s wire = - let data = Vmm_asn.wire_to_cstruct wire in - let dlen = Cstruct.create 4 in - Cstruct.BE.set_uint32 dlen 0 (Int32.of_int (Cstruct.len data)) ; - let buf = Cstruct.(to_bytes (append dlen data)) in +let write_raw s buf = let rec w off l = Lwt.catch (fun () -> Lwt_unix.send s buf off l [] >>= fun n -> @@ -98,6 +94,13 @@ let write_wire s wire = (* Logs.debug (fun m -> m "writing %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf)) ; *) w 0 (Bytes.length buf) +let write_wire s wire = + let data = Vmm_asn.wire_to_cstruct wire in + let dlen = Cstruct.create 4 in + Cstruct.BE.set_uint32 dlen 0 (Int32.of_int (Cstruct.len data)) ; + let buf = Cstruct.(to_bytes (append dlen data)) in + write_raw s buf + let safe_close fd = Lwt.catch (fun () -> Lwt_unix.close fd) diff --git a/src/vmm_lwt.mli b/src/vmm_lwt.mli index ea11a6d..c111b45 100644 --- a/src/vmm_lwt.mli +++ b/src/vmm_lwt.mli @@ -9,6 +9,8 @@ val wait_and_clear : val read_wire : Lwt_unix.file_descr -> (Vmm_asn.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t +val write_raw : + Lwt_unix.file_descr -> bytes -> (unit, [> `Exception ]) result Lwt.t val write_wire : Lwt_unix.file_descr -> Vmm_asn.wire -> (unit, [> `Exception ]) result Lwt.t val safe_close : Lwt_unix.file_descr -> unit Lwt.t diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index 9b34541..4c8e752 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -16,7 +16,9 @@ external vmmapi_close : vmctx -> unit = "vmmanage_vmmapi_close" external vmmapi_statnames : vmctx -> string list = "vmmanage_vmmapi_statnames" external vmmapi_stats : vmctx -> int64 list = "vmmanage_vmmapi_stats" -let my_version = `WV2 +let my_version = `AV2 + +let bcast = ref 0L let descr = ref [] @@ -117,10 +119,10 @@ let tick t = match Vmm_core.drop_super ~super:id ~sub:vmid with | None -> Logs.err (fun m -> m "couldn't drop super %a from sub %a" Vmm_core.pp_id id Vmm_core.pp_id vmid) ; out | Some real_id -> - let name = Vmm_core.string_of_id real_id in - - let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in - (socket, vmid, stats_encoded) :: out) + let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = real_id } in + bcast := Int64.succ !bcast ; + let data = `Stats_data stats in + ((socket, vmid, (header, `Command (`Stats_cmd data))) :: out)) out xs) [] (Vmm_trie.all t'.vmid_pid) in @@ -171,29 +173,38 @@ let remove_vmid t vmid = let remove_vmids t vmids = List.fold_left remove_vmid t vmids -let handle t socket hdr cs = - let open Vmm_wire in - let open Vmm_wire.Stats in +let handle t socket (header, wire) = let r = - if not (version_eq my_version hdr.version) then + if not (Vmm_asn.version_eq my_version header.Vmm_asn.version) then Error (`Msg "cannot handle version") else - decode_strings cs >>= fun (id, off) -> - match int_to_op hdr.tag with - | Some Add -> - decode_pid_taps (Cstruct.shift cs off) >>= fun (pid, taps) -> - add_pid t id pid taps >>= fun t -> - Ok (t, `Add id, None, success ~msg:"added" my_version hdr.id (op_to_int Add)) - | Some Remove -> - let t = remove_vmid t id in - Ok (t, `Remove id, None, success ~msg:"removed" my_version hdr.id (op_to_int Remove)) - | Some Subscribe -> - let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in - Ok ({ t with name_sockets }, `None, close, success ~msg:"subscribed" my_version hdr.id (op_to_int Subscribe)) - | _ -> Error (`Msg "unknown command") + match wire with + | `Command (`Stats_cmd cmd) -> + begin + let id = header.Vmm_asn.id in + match cmd with + | `Stats_add (pid, taps) -> + add_pid t id pid taps >>= fun t -> + Ok (t, `Add id, None, Some "added") + | `Stats_remove -> + let t = remove_vmid t id in + Ok (t, `Remove id, None, Some "removed") + | `Stats_subscribe -> + let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in + Ok ({ t with name_sockets }, `None, close, Some "subscribed") + | _ -> Error (`Msg "unknown command") + end + | _ -> + Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire (header, wire)) ; + Ok (t, `None, None, None) in match r with - | Ok (t, action, close, out) -> t, action, close, out + | Ok (t, action, close, out) -> + let out = match out with + | None -> None + | Some str -> Some (header, `Success (`String str)) + in + t, action, close, out | Error (`Msg msg) -> Logs.err (fun m -> m "error while processing %s" msg) ; - t, `None, None, fail ~msg my_version hdr.id + t, `None, None, Some (header, `Failure msg) diff --git a/stats/vmm_stats_lwt.ml b/stats/vmm_stats_lwt.ml index 0300e4d..532bb11 100644 --- a/stats/vmm_stats_lwt.ml +++ b/stats/vmm_stats_lwt.ml @@ -27,8 +27,8 @@ let handle s addr () = Vmm_lwt.read_wire s >>= function | Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop acc | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc - | Ok (hdr, data) -> - let t', action, close, out = Vmm_stats.handle !t s hdr data in + | Ok wire -> + let t', action, close, out = Vmm_stats.handle !t s wire in let acc = match action with | `Add pid -> pid :: acc | `Remove pid -> List.filter (fun m -> m <> pid) acc @@ -36,9 +36,12 @@ let handle s addr () = in t := t' ; (match close with None -> Lwt.return_unit | Some s' -> Vmm_lwt.safe_close s') >>= fun () -> - Vmm_lwt.write_wire s out >>= function - | Ok () -> loop acc - | Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc + match out with + | None -> loop acc + | Some out -> + Vmm_lwt.write_wire s out >>= function + | Ok () -> loop acc + | Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc in loop [] >>= fun vmids -> Vmm_lwt.safe_close s >|= fun () ->