vmm_influxdb_stats: proper error handling

This commit is contained in:
Hannes Mehnert 2018-05-02 12:17:14 +02:00
parent 3cec5dd35d
commit e25d15ee1a
2 changed files with 94 additions and 72 deletions

View file

@ -74,8 +74,7 @@ WRITE
module P = struct
let tv (sec, usec) = Printf.sprintf "%Lu.%06d" sec usec
(* TODO: this should use an unsigned to string function *)
let i64 i = Int64.to_string i ^ "i"
let i64 i = Printf.sprintf "%Lui" i
let encode_ru vm ru =
let fields =
@ -113,7 +112,7 @@ module P = struct
(String.concat ~sep:","
(List.map (fun (k, v) -> (escape k) ^ "=" ^ (i64 v)) xs))
let i32 i = Int32.to_string i ^ "i"
let i32 i = Printf.sprintf "%lui" i
let encode_if vm ifd =
let fields =
@ -147,103 +146,127 @@ let command = ref 1
let (req : string IM.t ref) = ref IM.empty
let tcp_sent = ref 0
let socket_read = ref 0
let socket_sent = ref 0
let str_of_e = function
| `Eof -> "end of file"
| `Exception -> "exception"
| `Toomuch -> "too much"
| `Msg m -> m
let rec read_sock db c fd =
let open Vmm_wire in
Logs.debug (fun m -> m "reading now (socket read %d, socket sent %d, tcp sent %d)" !socket_read !socket_sent !tcp_sent);
Vmm_lwt.read_exactly c >>= function
| Error e ->
Logs.err (fun m -> m "error %s while reading" (str_of_e e)) ;
Lwt.return_unit
| Ok (hdr, data) ->
Logs.debug (fun m -> m "%d read %d data" hdr.id (String.length data)) ;
socket_read := 8 (* hdr *) + String.length data + !socket_read ;
if not (version_eq hdr.version my_version) then begin
Logs.err (fun m -> m "unknown wire protocol version") ;
(* how many times did I write this now? *)
let safe_close s = Lwt.catch (fun () -> Lwt_unix.close s) (fun _ -> Lwt.return_unit)
let rec read_sock_write_tcp db c ?fd addr addrtype =
match fd with
| None ->
let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in
Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ;
Lwt.catch (fun () -> Lwt_unix.connect fd addr)
(fun e ->
let addr', port = match addr with
| Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port
| Lwt_unix.ADDR_UNIX addr -> addr, 0
in
Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s"
(Printexc.to_string e) addr' port) ;
safe_close fd >>= fun () ->
Lwt_unix.sleep 5.0 >>= fun () ->
read_sock_write_tcp db c addr addrtype) >>= fun () ->
read_sock_write_tcp db c ~fd addr addrtype
| Some fd ->
let open Vmm_wire in
Vmm_lwt.read_exactly c >>= function
| Error e ->
Logs.err (fun m -> m "error %s while reading vmm socket (return)"
(str_of_e e)) ;
Lwt.return_unit
end else
let name = IM.find hdr.id !req in
Logs.debug (fun m -> m "talking about %s (socket read %d, socket sent %d, tcp sent %d)" name !socket_read !socket_sent !tcp_sent) ;
req := IM.remove hdr.id !req ;
match Stats.int_to_op hdr.tag with
| Some Stats.Stat_reply ->
begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with
| Error (`Msg msg) ->
Logs.warn (fun m -> m "couldn't decode stats for %s: %s" name msg) ;
read_sock db c fd
| Ok (ru, vmm, ifs) ->
let ru = P.encode_ru name ru in
let vmm = P.encode_vmm name vmm in
let taps = List.map (P.encode_if name) ifs in
let out = String.concat ~sep:"\n" (ru :: vmm :: taps @ [ "" ]) in
Logs.info (fun m -> m "%d writing for %s via tcp %d (total read %d, tcp send %d)" hdr.id name (String.length out) !socket_read !tcp_sent) ;
Vmm_lwt.write_raw fd out >>= function
| Ok () ->
tcp_sent := String.length out + !tcp_sent ;
Logs.debug (fun m -> m "%d successfully wrote, tcp_sent now %d" hdr.id !tcp_sent) ;
read_sock db c fd
| Error e ->
Logs.err (fun m -> m "%d error %s while writing (tcp_sent %d)" hdr.id (str_of_e e) !tcp_sent) ;
invalid_arg "failed to write via TCP"
end
| _ when hdr.tag = fail_tag ->
Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ;
read_sock db c fd
| _ ->
Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ;
read_sock db c fd
| Ok (hdr, data) ->
Logs.debug (fun m -> m "%d read %d data" hdr.id (String.length data)) ;
if not (version_eq hdr.version my_version) then begin
Logs.err (fun m -> m "unknown wire protocol version") ;
Lwt.return_unit
end else
let name = IM.find hdr.id !req in
req := IM.remove hdr.id !req ;
match Stats.int_to_op hdr.tag with
| Some Stats.Stat_reply ->
begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with
| Error (`Msg msg) ->
Logs.warn (fun m -> m "error %s while decoding stats %s, ignoring"
msg name) ;
read_sock_write_tcp db c ~fd addr addrtype
| Ok (ru, vmm, ifs) ->
let ru = P.encode_ru name ru in
let vmm = P.encode_vmm name vmm in
let taps = List.map (P.encode_if name) ifs in
let out = (String.concat ~sep:"\n" (ru :: vmm :: taps)) ^ "\n" in
Vmm_lwt.write_raw fd out >>= function
| Ok () -> read_sock_write_tcp db c ~fd addr addrtype
| Error e ->
Logs.err (fun m -> m "error %s while writing to tcp (%s)"
(str_of_e e) name) ;
safe_close fd >>= fun () ->
read_sock_write_tcp db c addr addrtype
end
| _ when hdr.tag = fail_tag ->
Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ;
read_sock_write_tcp db c ~fd addr addrtype
| _ ->
Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ;
read_sock_write_tcp db c ~fd addr addrtype
let rec query_sock prefix db c interval =
Logs.debug (fun m -> m "querying socket (socket_send %d socket_read %d tcp_send %d)"
!socket_sent !socket_read !tcp_sent) ;
(* query c for everyone in db *)
Lwt_list.iter_s (fun (id, name) ->
let id = identifier id in
let id = match prefix with None -> id | Some p -> p ^ "." ^ id in
let request = Vmm_wire.Stats.stat !command my_version id in
Logs.debug (fun m -> m "req for %s (command %d) len %d (total %d)" id !command (String.length request) !socket_sent) ;
req := IM.add !command name !req ;
incr command ;
socket_sent := !socket_sent + String.length request ;
Vmm_lwt.write_raw c request >>= function
| Ok () ->
Logs.debug (fun m -> m "req for %s (command+1 %d) written (total %d)" id !command !socket_sent) ;
Lwt.return_unit
| Ok () -> Lwt.return_unit
| Error e ->
Logs.err (fun m -> m "error while writing %s (command+1 %d): %s" id !command (str_of_e e)) ;
Logs.err (fun m -> m "error while writing to vmm socket %s: %s"
id (str_of_e e)) ;
Lwt.fail_with "exception while writing")
db >>= fun () ->
Lwt_unix.sleep (float_of_int interval) >>= fun () ->
query_sock prefix db c interval
let client stat_socket influxhost influxport db prefix interval =
(* start a socket connection to vmm_stats *)
let maybe_connect stat_socket =
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.set_close_on_exec c ;
Lwt.catch (fun () -> Lwt_unix.(connect c (ADDR_UNIX stat_socket)))
Lwt.catch
(fun () ->
Lwt_unix.(connect c (ADDR_UNIX stat_socket)) >>= fun () ->
Lwt.return c)
(fun e ->
Logs.warn (fun m -> m "error %s connecting to socket %s"
(Printexc.to_string e) stat_socket) ;
invalid_arg "cannot connect to stat socket") >>= fun () ->
Lwt.fail_with "cannot connect to stat socket")
(* setup remote connection to influx *)
let client stat_socket influxhost influxport db prefix interval =
(* start a socket connection to vmm_stats *)
maybe_connect stat_socket >>= fun c ->
(* figure out address of influx *)
Lwt_unix.gethostbyname influxhost >>= fun host_entry ->
let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in
let fd = Lwt_unix.socket host_entry.Lwt_unix.h_addrtype Lwt_unix.SOCK_STREAM 0 in
Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ;
Lwt_unix.connect fd (Lwt_unix.ADDR_INET (host_inet_addr, influxport)) >>= fun () ->
let addr = Lwt_unix.ADDR_INET (host_inet_addr, influxport)
and addrtype = host_entry.Lwt_unix.h_addrtype
in
(* loop *)
Lwt.join [ query_sock prefix db c interval ; read_sock db c fd ]
let rec loop c =
Lwt.catch (fun () ->
Lwt.pick [
query_sock prefix db c interval ;
read_sock_write_tcp db c addr addrtype
])
(fun _ ->
safe_close c >>= fun () ->
maybe_connect stat_socket >>= fun c ->
loop c)
in
loop c
let run_client _ socket (influxhost, influxport) db prefix interval =
Sys.(set_signal sigpipe Signal_ignore) ;
@ -252,8 +275,7 @@ let run_client _ socket (influxhost, influxport) db prefix interval =
match Bos.OS.File.read_lines (Fpath.v db) >>= parse_db with
| Ok [] -> invalid_arg "empty database"
| Ok db -> db
| Error (`Msg m) ->
invalid_arg ("couldn't parse database " ^ m)
| Error (`Msg m) -> invalid_arg ("couldn't parse database " ^ m)
in
Lwt_main.run (client socket influxhost influxport db prefix interval)

View file

@ -38,7 +38,7 @@ module P = struct
[ p "HELP" help ; p "TYPE" (t_s typ) ; name ^ " " ^ value ]
let tv (sec, usec) = Printf.sprintf "%Lu.%06d" sec usec
let i64 = Int64.to_string
let i64 i = Printf.sprintf "%Lu" i
let encode_ru vm ru =
let p = p vm in
@ -87,7 +87,7 @@ module P = struct
String.concat ~sep:"\n"
(List.map (fun (k, v) -> p (massage k) k (i64 v)) xs)
let i32 = Int32.to_string
let i32 i = Printf.sprintf "%lu" i
let encode_if vm ifd =
let p = p (vm ^ "_" ^ ifd.name) in