diff --git a/app/vmm_influxdb_stats.ml b/app/vmm_influxdb_stats.ml index 5a4be8b..fdd67e4 100644 --- a/app/vmm_influxdb_stats.ml +++ b/app/vmm_influxdb_stats.ml @@ -74,8 +74,7 @@ WRITE module P = struct let tv (sec, usec) = Printf.sprintf "%Lu.%06d" sec usec - (* TODO: this should use an unsigned to string function *) - let i64 i = Int64.to_string i ^ "i" + let i64 i = Printf.sprintf "%Lui" i let encode_ru vm ru = let fields = @@ -113,7 +112,7 @@ module P = struct (String.concat ~sep:"," (List.map (fun (k, v) -> (escape k) ^ "=" ^ (i64 v)) xs)) - let i32 i = Int32.to_string i ^ "i" + let i32 i = Printf.sprintf "%lui" i let encode_if vm ifd = let fields = @@ -147,103 +146,127 @@ let command = ref 1 let (req : string IM.t ref) = ref IM.empty -let tcp_sent = ref 0 -let socket_read = ref 0 -let socket_sent = ref 0 - let str_of_e = function | `Eof -> "end of file" | `Exception -> "exception" | `Toomuch -> "too much" | `Msg m -> m -let rec read_sock db c fd = - let open Vmm_wire in - Logs.debug (fun m -> m "reading now (socket read %d, socket sent %d, tcp sent %d)" !socket_read !socket_sent !tcp_sent); - Vmm_lwt.read_exactly c >>= function - | Error e -> - Logs.err (fun m -> m "error %s while reading" (str_of_e e)) ; - Lwt.return_unit - | Ok (hdr, data) -> - Logs.debug (fun m -> m "%d read %d data" hdr.id (String.length data)) ; - socket_read := 8 (* hdr *) + String.length data + !socket_read ; - if not (version_eq hdr.version my_version) then begin - Logs.err (fun m -> m "unknown wire protocol version") ; +(* how many times did I write this now? *) +let safe_close s = Lwt.catch (fun () -> Lwt_unix.close s) (fun _ -> Lwt.return_unit) + +let rec read_sock_write_tcp db c ?fd addr addrtype = + match fd with + | None -> + let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in + Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; + Lwt.catch (fun () -> Lwt_unix.connect fd addr) + (fun e -> + let addr', port = match addr with + | Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port + | Lwt_unix.ADDR_UNIX addr -> addr, 0 + in + Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s" + (Printexc.to_string e) addr' port) ; + safe_close fd >>= fun () -> + Lwt_unix.sleep 5.0 >>= fun () -> + read_sock_write_tcp db c addr addrtype) >>= fun () -> + read_sock_write_tcp db c ~fd addr addrtype + | Some fd -> + let open Vmm_wire in + Vmm_lwt.read_exactly c >>= function + | Error e -> + Logs.err (fun m -> m "error %s while reading vmm socket (return)" + (str_of_e e)) ; Lwt.return_unit - end else - let name = IM.find hdr.id !req in - Logs.debug (fun m -> m "talking about %s (socket read %d, socket sent %d, tcp sent %d)" name !socket_read !socket_sent !tcp_sent) ; - req := IM.remove hdr.id !req ; - match Stats.int_to_op hdr.tag with - | Some Stats.Stat_reply -> - begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with - | Error (`Msg msg) -> - Logs.warn (fun m -> m "couldn't decode stats for %s: %s" name msg) ; - read_sock db c fd - | Ok (ru, vmm, ifs) -> - let ru = P.encode_ru name ru in - let vmm = P.encode_vmm name vmm in - let taps = List.map (P.encode_if name) ifs in - let out = String.concat ~sep:"\n" (ru :: vmm :: taps @ [ "" ]) in - Logs.info (fun m -> m "%d writing for %s via tcp %d (total read %d, tcp send %d)" hdr.id name (String.length out) !socket_read !tcp_sent) ; - Vmm_lwt.write_raw fd out >>= function - | Ok () -> - tcp_sent := String.length out + !tcp_sent ; - Logs.debug (fun m -> m "%d successfully wrote, tcp_sent now %d" hdr.id !tcp_sent) ; - read_sock db c fd - | Error e -> - Logs.err (fun m -> m "%d error %s while writing (tcp_sent %d)" hdr.id (str_of_e e) !tcp_sent) ; - invalid_arg "failed to write via TCP" - end - | _ when hdr.tag = fail_tag -> - Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ; - read_sock db c fd - | _ -> - Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ; - read_sock db c fd + | Ok (hdr, data) -> + Logs.debug (fun m -> m "%d read %d data" hdr.id (String.length data)) ; + if not (version_eq hdr.version my_version) then begin + Logs.err (fun m -> m "unknown wire protocol version") ; + Lwt.return_unit + end else + let name = IM.find hdr.id !req in + req := IM.remove hdr.id !req ; + match Stats.int_to_op hdr.tag with + | Some Stats.Stat_reply -> + begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with + | Error (`Msg msg) -> + Logs.warn (fun m -> m "error %s while decoding stats %s, ignoring" + msg name) ; + read_sock_write_tcp db c ~fd addr addrtype + | Ok (ru, vmm, ifs) -> + let ru = P.encode_ru name ru in + let vmm = P.encode_vmm name vmm in + let taps = List.map (P.encode_if name) ifs in + let out = (String.concat ~sep:"\n" (ru :: vmm :: taps)) ^ "\n" in + Vmm_lwt.write_raw fd out >>= function + | Ok () -> read_sock_write_tcp db c ~fd addr addrtype + | Error e -> + Logs.err (fun m -> m "error %s while writing to tcp (%s)" + (str_of_e e) name) ; + safe_close fd >>= fun () -> + read_sock_write_tcp db c addr addrtype + end + | _ when hdr.tag = fail_tag -> + Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ; + read_sock_write_tcp db c ~fd addr addrtype + | _ -> + Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ; + read_sock_write_tcp db c ~fd addr addrtype let rec query_sock prefix db c interval = - Logs.debug (fun m -> m "querying socket (socket_send %d socket_read %d tcp_send %d)" - !socket_sent !socket_read !tcp_sent) ; (* query c for everyone in db *) Lwt_list.iter_s (fun (id, name) -> let id = identifier id in let id = match prefix with None -> id | Some p -> p ^ "." ^ id in let request = Vmm_wire.Stats.stat !command my_version id in - Logs.debug (fun m -> m "req for %s (command %d) len %d (total %d)" id !command (String.length request) !socket_sent) ; req := IM.add !command name !req ; incr command ; - socket_sent := !socket_sent + String.length request ; Vmm_lwt.write_raw c request >>= function - | Ok () -> - Logs.debug (fun m -> m "req for %s (command+1 %d) written (total %d)" id !command !socket_sent) ; - Lwt.return_unit + | Ok () -> Lwt.return_unit | Error e -> - Logs.err (fun m -> m "error while writing %s (command+1 %d): %s" id !command (str_of_e e)) ; + Logs.err (fun m -> m "error while writing to vmm socket %s: %s" + id (str_of_e e)) ; Lwt.fail_with "exception while writing") db >>= fun () -> Lwt_unix.sleep (float_of_int interval) >>= fun () -> query_sock prefix db c interval -let client stat_socket influxhost influxport db prefix interval = - (* start a socket connection to vmm_stats *) +let maybe_connect stat_socket = let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in - Lwt_unix.set_close_on_exec c ; - Lwt.catch (fun () -> Lwt_unix.(connect c (ADDR_UNIX stat_socket))) + Lwt.catch + (fun () -> + Lwt_unix.(connect c (ADDR_UNIX stat_socket)) >>= fun () -> + Lwt.return c) (fun e -> Logs.warn (fun m -> m "error %s connecting to socket %s" (Printexc.to_string e) stat_socket) ; - invalid_arg "cannot connect to stat socket") >>= fun () -> + Lwt.fail_with "cannot connect to stat socket") - (* setup remote connection to influx *) +let client stat_socket influxhost influxport db prefix interval = + (* start a socket connection to vmm_stats *) + maybe_connect stat_socket >>= fun c -> + + (* figure out address of influx *) Lwt_unix.gethostbyname influxhost >>= fun host_entry -> let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in - let fd = Lwt_unix.socket host_entry.Lwt_unix.h_addrtype Lwt_unix.SOCK_STREAM 0 in - Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; - Lwt_unix.connect fd (Lwt_unix.ADDR_INET (host_inet_addr, influxport)) >>= fun () -> + let addr = Lwt_unix.ADDR_INET (host_inet_addr, influxport) + and addrtype = host_entry.Lwt_unix.h_addrtype + in (* loop *) - Lwt.join [ query_sock prefix db c interval ; read_sock db c fd ] + let rec loop c = + Lwt.catch (fun () -> + Lwt.pick [ + query_sock prefix db c interval ; + read_sock_write_tcp db c addr addrtype + ]) + (fun _ -> + safe_close c >>= fun () -> + maybe_connect stat_socket >>= fun c -> + loop c) + in + loop c let run_client _ socket (influxhost, influxport) db prefix interval = Sys.(set_signal sigpipe Signal_ignore) ; @@ -252,8 +275,7 @@ let run_client _ socket (influxhost, influxport) db prefix interval = match Bos.OS.File.read_lines (Fpath.v db) >>= parse_db with | Ok [] -> invalid_arg "empty database" | Ok db -> db - | Error (`Msg m) -> - invalid_arg ("couldn't parse database " ^ m) + | Error (`Msg m) -> invalid_arg ("couldn't parse database " ^ m) in Lwt_main.run (client socket influxhost influxport db prefix interval) diff --git a/app/vmm_prometheus_stats.ml b/app/vmm_prometheus_stats.ml index 6e1d2aa..1759eb1 100644 --- a/app/vmm_prometheus_stats.ml +++ b/app/vmm_prometheus_stats.ml @@ -38,7 +38,7 @@ module P = struct [ p "HELP" help ; p "TYPE" (t_s typ) ; name ^ " " ^ value ] let tv (sec, usec) = Printf.sprintf "%Lu.%06d" sec usec - let i64 = Int64.to_string + let i64 i = Printf.sprintf "%Lu" i let encode_ru vm ru = let p = p vm in @@ -87,7 +87,7 @@ module P = struct String.concat ~sep:"\n" (List.map (fun (k, v) -> p (massage k) k (i64 v)) xs) - let i32 = Int32.to_string + let i32 i = Printf.sprintf "%lu" i let encode_if vm ifd = let p = p (vm ^ "_" ^ ifd.name) in