From 9e4cb94884a6c1f16b2e5dc6df181c3f7985f29c Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Sat, 7 Jul 2018 12:38:29 +0200 Subject: [PATCH] vmm_influxdb: improve connection handling (next attempt to not leak fds and reconnect on demand) --- .merlin | 3 +- app/vmm_influxdb_stats.ml | 224 ++++++++++++++++++++++---------------- 2 files changed, 131 insertions(+), 96 deletions(-) diff --git a/.merlin b/.merlin index aaed434..3014a5d 100644 --- a/.merlin +++ b/.merlin @@ -6,4 +6,5 @@ S provision B _build/** PKG topkg logs ipaddr x509 tls rresult bos lwt cmdliner hex cstruct.ppx duration -PKG ptime ptime.clock.os ipaddr.unix decompress \ No newline at end of file +PKG ptime ptime.clock.os ipaddr.unix decompress +PKG lwt.unix \ No newline at end of file diff --git a/app/vmm_influxdb_stats.ml b/app/vmm_influxdb_stats.ml index ecbdd96..8a3c89d 100644 --- a/app/vmm_influxdb_stats.ml +++ b/app/vmm_influxdb_stats.ml @@ -160,94 +160,114 @@ let safe_close s = Logs.err (fun m -> m "exception %s while closing" (Printexc.to_string e)) ; Lwt.return_unit) -let rec read_sock_write_tcp db c ?fd addr addrtype = +let rec read_sock_write_tcp closing db c ?fd addr addrtype = match fd with | None -> - Logs.debug (fun m -> m "new connection to TCP") ; - let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in - Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; - Lwt.catch - (fun () -> - Lwt_unix.connect fd addr >|= fun () -> - Logs.debug (fun m -> m "connected to TCP")) - (fun e -> - let addr', port = match addr with - | Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port - | Lwt_unix.ADDR_UNIX addr -> addr, 0 - in - Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s" - (Printexc.to_string e) addr' port) ; - safe_close fd >>= fun () -> - Lwt_unix.sleep 5.0 >>= fun () -> - read_sock_write_tcp db c addr addrtype) >>= fun () -> - read_sock_write_tcp db c ~fd addr addrtype + if !closing then + Lwt.return_unit + else begin + Logs.debug (fun m -> 
m "new connection to TCP") ; + let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in + Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; + Lwt.catch + (fun () -> + Lwt_unix.connect fd addr >|= fun () -> + Logs.debug (fun m -> m "connected to TCP") ; + Some fd) + (fun e -> + let addr', port = match addr with + | Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port + | Lwt_unix.ADDR_UNIX addr -> addr, 0 + in + Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s" + (Printexc.to_string e) addr' port) ; + safe_close fd >>= fun () -> + Lwt_unix.sleep 5.0 >|= fun () -> + None) >>= fun fd -> + read_sock_write_tcp closing db c ?fd addr addrtype + end | Some fd -> - let open Vmm_wire in - Logs.debug (fun m -> m "reading from unix socket") ; - Vmm_lwt.read_exactly c >>= function - | Error e -> - Logs.err (fun m -> m "error %s while reading vmm socket (return)" - (str_of_e e)) ; + if !closing then safe_close fd - | Ok (hdr, data) -> - if not (version_eq hdr.version my_version) then begin - Logs.err (fun m -> m "unknown wire protocol version") ; - safe_close fd - end else - let name = IM.find hdr.id !req in - req := IM.remove hdr.id !req ; - match Stats.int_to_op hdr.tag with - | Some Stats.Stat_reply -> - begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with - | Error (`Msg msg) -> - Logs.warn (fun m -> m "error %s while decoding stats %s, ignoring" - msg name) ; - read_sock_write_tcp db c ~fd addr addrtype - | Ok (ru, vmm, ifs) -> - let ru = P.encode_ru name ru in - let vmm = P.encode_vmm name vmm in - let taps = List.map (P.encode_if name) ifs in - let out = (String.concat ~sep:"\n" (ru :: vmm :: taps)) ^ "\n" in - Logs.debug (fun m -> m "writing %d via tcp" (String.length out)) ; - Vmm_lwt.write_raw fd out >>= function - | Ok () -> - Logs.debug (fun m -> m "wrote successfully") ; - read_sock_write_tcp db c ~fd addr addrtype - | Error e -> - Logs.err (fun m -> m "error %s while writing to tcp (%s)" - (str_of_e e) 
name) ; - safe_close fd >>= fun () -> - read_sock_write_tcp db c addr addrtype - end - | _ when hdr.tag = fail_tag -> - Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ; - read_sock_write_tcp db c ~fd addr addrtype - | _ -> - Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ; - read_sock_write_tcp db c ~fd addr addrtype - -let rec query_sock prefix db c interval = - (* query c for everyone in db *) - Lwt_list.iter_s (fun (id, name) -> - let id = identifier id in - let id = match prefix with None -> id | Some p -> p ^ "." ^ id in - let request = Vmm_wire.Stats.stat !command my_version id in - req := IM.add !command name !req ; - incr command ; - Logs.debug (fun m -> m "%d requesting %s via socket" !command id) ; - Vmm_lwt.write_raw c request >>= function - | Ok () -> - Logs.debug (fun m -> m "%d done" !command) ; - Lwt.return_unit + else begin + let open Vmm_wire in + Logs.debug (fun m -> m "reading from unix socket") ; + Vmm_lwt.read_exactly c >>= function | Error e -> - Logs.err (fun m -> m "error while writing to vmm socket %s: %s" - id (str_of_e e)) ; - Lwt.fail_with "exception while writing") - db >>= fun () -> - Lwt_unix.sleep (float_of_int interval) >>= fun () -> - query_sock prefix db c interval + Logs.err (fun m -> m "error %s while reading vmm socket (return)" + (str_of_e e)) ; + closing := true ; + safe_close fd + | Ok (hdr, data) -> + if not (version_eq hdr.version my_version) then begin + Logs.err (fun m -> m "unknown wire protocol version") ; + closing := true ; + safe_close fd + end else + let name = + try IM.find hdr.id !req + with Not_found -> "not found" + in + req := IM.remove hdr.id !req ; + begin match Stats.int_to_op hdr.tag with + | Some Stats.Stat_reply -> + begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with + | Error (`Msg msg) -> + Logs.warn (fun m -> m "error %s while decoding stats %s, ignoring" + msg name) ; + Lwt.return (Some fd) + | Ok (ru, vmm, ifs) -> + let ru = P.encode_ru name ru in 
+ let vmm = P.encode_vmm name vmm in + let taps = List.map (P.encode_if name) ifs in + let out = (String.concat ~sep:"\n" (ru :: vmm :: taps)) ^ "\n" in + Logs.debug (fun m -> m "writing %d via tcp" (String.length out)) ; + Vmm_lwt.write_raw fd out >>= function + | Ok () -> + Logs.debug (fun m -> m "wrote successfully") ; + Lwt.return (Some fd) + | Error e -> + Logs.err (fun m -> m "error %s while writing to tcp (%s)" + (str_of_e e) name) ; + safe_close fd >|= fun () -> + None + end + | _ when hdr.tag = fail_tag -> + Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ; + Lwt.return (Some fd) + | _ -> + Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ; + Lwt.return (Some fd) + end >>= fun fd -> + read_sock_write_tcp closing db c ?fd addr addrtype + end -let maybe_connect stat_socket = +let rec query_sock closing prefix db c interval = + (* query c for everyone in db *) + if !closing then + Lwt.return_unit + else + Lwt_list.fold_left_s (fun r (id, name) -> + match r with + | Error e -> Lwt.return (Error e) + | Ok () -> + let id = identifier id in + let id = match prefix with None -> id | Some p -> p ^ "." 
^ id in + let request = Vmm_wire.Stats.stat !command my_version id in + req := IM.add !command name !req ; + incr command ; + Logs.debug (fun m -> m "%d requesting %s via socket" !command id) ; + Vmm_lwt.write_raw c request) + (Ok ()) db >>= function + | Error e -> + Logs.err (fun m -> m "error %s while writing to vmm socket" (str_of_e e)) ; + closing := true ; + Lwt.return_unit + | Ok () -> + Lwt_unix.sleep (float_of_int interval) >>= fun () -> + query_sock closing prefix db c interval + +let rec maybe_connect stat_socket = let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in Lwt.catch (fun () -> @@ -258,7 +278,9 @@ let maybe_connect stat_socket = (fun e -> Logs.warn (fun m -> m "error %s connecting to socket %s" (Printexc.to_string e) stat_socket) ; - Lwt.fail_with "cannot connect to stat socket") + safe_close c >>= fun () -> + Lwt_unix.sleep (float_of_int 5) >>= fun () -> + maybe_connect stat_socket) let client stat_socket influxhost influxport db prefix interval = (* start a socket connection to vmm_stats *) @@ -272,16 +294,27 @@ let client stat_socket influxhost influxport db prefix interval = in (* loop *) + (* the query task queries the stat_socket at each interval + - if this fails, closing is set to true (and unit is returned) + + the read_sock reads the stat_socket, and forwards to a TCP socket + - if closing is true, the TCP socket is closed and unit is returned + - if read on the unix domain socket fails, closing is set to true + (and unit is returned) *) + (* connection to the unix domain socket is managed in this loop only: + - maybe_connect attempts to establish a connection to it + - query_sock/read_sock_write_tcp write to and read from it + - on failure in read or write, the TCP connection is closed, and loop + takes control: safe_close, maybe_connect, rinse, repeat *) let rec loop c = - Lwt.catch (fun () -> - Lwt.pick [ query_sock prefix db c interval ; read_sock_write_tcp db c addr addrtype ] >>= fun () -> - safe_close c >>= fun () -> - maybe_connect stat_socket 
>>= fun c -> - loop c) - (fun _ -> - safe_close c >>= fun () -> - maybe_connect stat_socket >>= fun c -> - loop c) + let closing = ref false in + Lwt.join [ + query_sock closing prefix db c interval ; + read_sock_write_tcp closing db c addr addrtype + ] >>= fun () -> + safe_close c >>= fun () -> + maybe_connect stat_socket >>= fun c -> + loop c in loop c @@ -322,15 +355,16 @@ let host_port : (string * int) Arg.converter = let socket = let doc = "Stat socket to connect onto" in - Arg.(required & pos 0 (some string) None & info [] ~doc) + let sock = Fpath.(to_string (Vmm_core.tmpdir / "stat" + "sock")) in + Arg.(value & opt string sock & info [ "s" ; "socket" ] ~doc) let influx = - Arg.(required & pos 1 (some host_port) None & info [] ~docv:"influx" + Arg.(required & pos 0 (some host_port) None & info [] ~docv:"influx" ~doc:"the influx hostname:port to connect to") let db = let doc = "VMID database" in - Arg.(required & pos 2 (some file) None & info [] ~doc) + Arg.(required & pos 1 (some file) None & info [] ~doc) let prefix = let doc = "prefix" in