influx stats

This commit is contained in:
Hannes Mehnert 2018-10-23 00:02:05 +02:00
parent 467debe303
commit f939ff5a58
8 changed files with 107 additions and 119 deletions

View File

@ -104,34 +104,30 @@ let handle s addr () =
Logs.info (fun m -> m "handling connection %a" Vmm_lwt.pp_sockaddr addr) ;
let rec loop () =
Vmm_lwt.read_wire s >>= function
| Error (`Msg msg) ->
Logs.err (fun m -> m "error while reading %s" msg) ;
loop ()
| Error _ ->
Logs.err (fun m -> m "exception while reading") ;
Lwt.return_unit
| Ok (_, `Success _) ->
Logs.err (fun m -> m "unexpected success reply") ;
| Ok (header, `Command (`Console_cmd cmd)) ->
begin
(if not (Vmm_asn.version_eq header.Vmm_asn.version my_version) then
Lwt.return (Error (`Msg "ignoring data with bad version"))
else
match cmd with
| `Console_add -> add_fifo header.Vmm_asn.id
| `Console_subscribe -> subscribe s header.Vmm_asn.id
| `Console_data _ -> Lwt.return (Error (`Msg "unexpected command"))) >>= (function
| Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg))
| Error (`Msg msg) ->
Logs.err (fun m -> m "error while processing command: %s" msg) ;
Vmm_lwt.write_wire s (header, `Failure msg)) >>= function
| Ok () -> loop ()
| Error _ ->
Logs.err (fun m -> m "exception while writing to socket") ;
Lwt.return_unit
end
| Ok wire ->
Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire wire) ;
loop ()
| Ok (_, `Failure _) ->
Logs.err (fun m -> m "unexpected failure reply") ;
loop ()
| Ok (header, `Command cmd) ->
(if not (Vmm_asn.version_eq header.Vmm_asn.version my_version) then
Lwt.return (Error (`Msg "ignoring data with bad version"))
else
match cmd with
| `Console_cmd `Console_add -> add_fifo header.Vmm_asn.id
| `Console_cmd `Console_subscribe -> subscribe s header.Vmm_asn.id
| _ -> Lwt.return (Error (`Msg "unexpected command"))) >>= (function
| Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg))
| Error (`Msg msg) ->
Logs.err (fun m -> m "error while processing command: %s" msg) ;
Vmm_lwt.write_wire s (header, `Failure msg)) >>= function
| Ok () -> loop ()
| Error _ ->
Logs.err (fun m -> m "exception while writing to socket") ;
Lwt.return_unit
in
loop () >>= fun () ->
Vmm_lwt.safe_close s >|= fun () ->

View File

@ -140,7 +140,7 @@ module P = struct
vm ifd.name (String.concat ~sep:"," fields)
end
let my_version = `WV2
let my_version = `AV2
let command = ref 1L
@ -181,7 +181,6 @@ let rec read_sock_write_tcp c ?fd addr addrtype =
None) >>= fun fd ->
read_sock_write_tcp c ?fd addr addrtype
| Some fd ->
let open Vmm_wire in
Logs.debug (fun m -> m "reading from unix socket") ;
Vmm_lwt.read_wire c >>= function
| Error e ->
@ -190,60 +189,40 @@ let rec read_sock_write_tcp c ?fd addr addrtype =
safe_close fd >>= fun () ->
safe_close c >|= fun () ->
true
| Ok (hdr, data) ->
if not (version_eq hdr.version my_version) then begin
Logs.err (fun m -> m "unknown wire protocol version") ;
safe_close fd >>= fun () ->
safe_close c >|= fun () ->
false
end else if Vmm_wire.is_fail hdr then begin
Logs.err (fun m -> m "failed to retrieve statistics") ;
safe_close fd >>= fun () ->
safe_close c >|= fun () ->
false
end else if Vmm_wire.is_reply hdr then begin
Logs.info (fun m -> m "received reply, continuing") ;
read_sock_write_tcp c ~fd addr addrtype
end else
(match Vmm_wire.Stats.int_to_op hdr.Vmm_wire.tag with
| Some Vmm_wire.Stats.Data ->
begin
let r =
let open Rresult.R.Infix in
Vmm_wire.decode_strings data >>= fun (id, off) ->
Vmm_wire.Stats.decode_stats (Cstruct.shift data off) >>| fun stats ->
(Vmm_core.string_of_id id, stats)
in
match r with
| Error (`Msg msg) ->
Logs.warn (fun m -> m "error %s while decoding stats, ignoring" msg) ;
Lwt.return (Some fd)
| Ok (name, (ru, vmm, ifs)) ->
let ru = P.encode_ru name ru in
let vmm = match vmm with [] -> [] | _ -> [ P.encode_vmm name vmm ] in
let taps = List.map (P.encode_if name) ifs in
let out = (String.concat ~sep:"\n" (ru :: vmm @ taps)) ^ "\n" in
Logs.debug (fun m -> m "writing %d via tcp" (String.length out)) ;
Vmm_lwt.write_wire fd (Cstruct.of_string out) >>= function
| Ok () ->
Logs.debug (fun m -> m "wrote successfully") ;
Lwt.return (Some fd)
| Error e ->
Logs.err (fun m -> m "error %s while writing to tcp (%s)"
(str_of_e e) name) ;
safe_close fd >|= fun () ->
None
end
| _ ->
Logs.err (fun m -> m "unhandled tag %lu" hdr.tag) ;
Lwt.return (Some fd)) >>= fun fd ->
| Ok (hdr, `Command (`Stats_cmd (`Stats_data (ru, vmm, ifs)))) ->
begin
if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin
Logs.err (fun m -> m "unknown wire protocol version") ;
safe_close fd >>= fun () ->
safe_close c >|= fun () ->
false
end else
let name = string_of_id hdr.Vmm_asn.id in
let ru = P.encode_ru name ru in
let vmm = match vmm with [] -> [] | _ -> [ P.encode_vmm name vmm ] in
let taps = List.map (P.encode_if name) ifs in
let out = (String.concat ~sep:"\n" (ru :: vmm @ taps)) ^ "\n" in
Logs.debug (fun m -> m "writing %d via tcp" (String.length out)) ;
Vmm_lwt.write_raw fd (Bytes.unsafe_of_string out) >>= function
| Ok () ->
Logs.debug (fun m -> m "wrote successfully") ;
read_sock_write_tcp c ~fd addr addrtype
| Error e ->
Logs.err (fun m -> m "error %s while writing to tcp (%s)"
(str_of_e e) name) ;
safe_close fd >|= fun () ->
false
end
| Ok wire ->
Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire wire) ;
Lwt.return (Some fd) >>= fun fd ->
read_sock_write_tcp c ?fd addr addrtype
let query_sock vm c =
let request = Vmm_wire.Stats.subscribe !command my_version vm in
let header = Vmm_asn.{ version = my_version ; sequence = !command ; id = vm } in
command := Int64.succ !command ;
Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id vm) ;
Vmm_lwt.write_wire c request
Vmm_lwt.write_wire c (header, `Command (`Stats_cmd `Stats_subscribe))
let rec maybe_connect stat_socket =
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in

View File

@ -98,12 +98,6 @@ let handle mvar ring s addr () =
| Error _ ->
Logs.err (fun m -> m "exception while reading") ;
Lwt.return_unit
| Ok (_, `Failure _) ->
Logs.warn (fun m -> m "ignoring failure") ;
loop ()
| Ok (_, `Success _) ->
Logs.warn (fun m -> m "ignoring success") ;
loop ()
| Ok (hdr, `Command (`Log_cmd lc)) ->
if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin
Logs.warn (fun m -> m "unsupported version") ;
@ -140,8 +134,8 @@ let handle mvar ring s addr () =
Lwt.return_unit
| Ok () -> loop () (* TODO no need to loop ;) *)
end
| _ ->
Logs.err (fun m -> m "unknown command") ;
| Ok wire ->
Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire wire) ;
loop ()
in
loop () >>= fun () ->

View File

@ -20,6 +20,6 @@ let () =
Pkg.bin "provision/vmm_revoke" ;
Pkg.bin "provision/vmm_gen_ca" ; *)
(* Pkg.clib "stats/libvmm_stats_stubs.clib" ; *)
(* Pkg.bin "stats/vmm_stats_lwt" ;
Pkg.bin "app/vmm_influxdb_stats" ; *)
Pkg.bin "stats/vmm_stats_lwt" ;
Pkg.bin "app/vmm_influxdb_stats" ;
]

View File

@ -79,11 +79,7 @@ let read_wire s =
else
Lwt.return (Error `Eof)
let write_wire s wire =
let data = Vmm_asn.wire_to_cstruct wire in
let dlen = Cstruct.create 4 in
Cstruct.BE.set_uint32 dlen 0 (Int32.of_int (Cstruct.len data)) ;
let buf = Cstruct.(to_bytes (append dlen data)) in
let write_raw s buf =
let rec w off l =
Lwt.catch (fun () ->
Lwt_unix.send s buf off l [] >>= fun n ->
@ -98,6 +94,13 @@ let write_wire s wire =
(* Logs.debug (fun m -> m "writing %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf)) ; *)
w 0 (Bytes.length buf)
let write_wire s wire =
let data = Vmm_asn.wire_to_cstruct wire in
let dlen = Cstruct.create 4 in
Cstruct.BE.set_uint32 dlen 0 (Int32.of_int (Cstruct.len data)) ;
let buf = Cstruct.(to_bytes (append dlen data)) in
write_raw s buf
let safe_close fd =
Lwt.catch
(fun () -> Lwt_unix.close fd)

View File

@ -9,6 +9,8 @@ val wait_and_clear :
val read_wire :
Lwt_unix.file_descr ->
(Vmm_asn.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t
val write_raw :
Lwt_unix.file_descr -> bytes -> (unit, [> `Exception ]) result Lwt.t
val write_wire :
Lwt_unix.file_descr -> Vmm_asn.wire -> (unit, [> `Exception ]) result Lwt.t
val safe_close : Lwt_unix.file_descr -> unit Lwt.t

View File

@ -16,7 +16,9 @@ external vmmapi_close : vmctx -> unit = "vmmanage_vmmapi_close"
external vmmapi_statnames : vmctx -> string list = "vmmanage_vmmapi_statnames"
external vmmapi_stats : vmctx -> int64 list = "vmmanage_vmmapi_stats"
let my_version = `WV2
let my_version = `AV2
let bcast = ref 0L
let descr = ref []
@ -117,10 +119,10 @@ let tick t =
match Vmm_core.drop_super ~super:id ~sub:vmid with
| None -> Logs.err (fun m -> m "couldn't drop super %a from sub %a" Vmm_core.pp_id id Vmm_core.pp_id vmid) ; out
| Some real_id ->
let name = Vmm_core.string_of_id real_id in
let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in
(socket, vmid, stats_encoded) :: out)
let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = real_id } in
bcast := Int64.succ !bcast ;
let data = `Stats_data stats in
((socket, vmid, (header, `Command (`Stats_cmd data))) :: out))
out xs)
[] (Vmm_trie.all t'.vmid_pid)
in
@ -171,29 +173,38 @@ let remove_vmid t vmid =
let remove_vmids t vmids =
List.fold_left remove_vmid t vmids
let handle t socket hdr cs =
let open Vmm_wire in
let open Vmm_wire.Stats in
let handle t socket (header, wire) =
let r =
if not (version_eq my_version hdr.version) then
if not (Vmm_asn.version_eq my_version header.Vmm_asn.version) then
Error (`Msg "cannot handle version")
else
decode_strings cs >>= fun (id, off) ->
match int_to_op hdr.tag with
| Some Add ->
decode_pid_taps (Cstruct.shift cs off) >>= fun (pid, taps) ->
add_pid t id pid taps >>= fun t ->
Ok (t, `Add id, None, success ~msg:"added" my_version hdr.id (op_to_int Add))
| Some Remove ->
let t = remove_vmid t id in
Ok (t, `Remove id, None, success ~msg:"removed" my_version hdr.id (op_to_int Remove))
| Some Subscribe ->
let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in
Ok ({ t with name_sockets }, `None, close, success ~msg:"subscribed" my_version hdr.id (op_to_int Subscribe))
| _ -> Error (`Msg "unknown command")
match wire with
| `Command (`Stats_cmd cmd) ->
begin
let id = header.Vmm_asn.id in
match cmd with
| `Stats_add (pid, taps) ->
add_pid t id pid taps >>= fun t ->
Ok (t, `Add id, None, Some "added")
| `Stats_remove ->
let t = remove_vmid t id in
Ok (t, `Remove id, None, Some "removed")
| `Stats_subscribe ->
let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in
Ok ({ t with name_sockets }, `None, close, Some "subscribed")
| _ -> Error (`Msg "unknown command")
end
| _ ->
Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire (header, wire)) ;
Ok (t, `None, None, None)
in
match r with
| Ok (t, action, close, out) -> t, action, close, out
| Ok (t, action, close, out) ->
let out = match out with
| None -> None
| Some str -> Some (header, `Success (`String str))
in
t, action, close, out
| Error (`Msg msg) ->
Logs.err (fun m -> m "error while processing %s" msg) ;
t, `None, None, fail ~msg my_version hdr.id
t, `None, None, Some (header, `Failure msg)

View File

@ -27,8 +27,8 @@ let handle s addr () =
Vmm_lwt.read_wire s >>= function
| Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop acc
| Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc
| Ok (hdr, data) ->
let t', action, close, out = Vmm_stats.handle !t s hdr data in
| Ok wire ->
let t', action, close, out = Vmm_stats.handle !t s wire in
let acc = match action with
| `Add pid -> pid :: acc
| `Remove pid -> List.filter (fun m -> m <> pid) acc
@ -36,9 +36,12 @@ let handle s addr () =
in
t := t' ;
(match close with None -> Lwt.return_unit | Some s' -> Vmm_lwt.safe_close s') >>= fun () ->
Vmm_lwt.write_wire s out >>= function
| Ok () -> loop acc
| Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc
match out with
| None -> loop acc
| Some out ->
Vmm_lwt.write_wire s out >>= function
| Ok () -> loop acc
| Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc
in
loop [] >>= fun vmids ->
Vmm_lwt.safe_close s >|= fun () ->