vmm_influxdb: improve connection handling (next attempt to not leak fds and reconnect on demand)
This commit is contained in:
parent
43ee0cf4e0
commit
9e4cb94884
1
.merlin
1
.merlin
|
@ -7,3 +7,4 @@ B _build/**
|
|||
|
||||
PKG topkg logs ipaddr x509 tls rresult bos lwt cmdliner hex cstruct.ppx duration
|
||||
PKG ptime ptime.clock.os ipaddr.unix decompress
|
||||
PKG lwt.unix
|
|
@ -160,94 +160,114 @@ let safe_close s =
|
|||
Logs.err (fun m -> m "exception %s while closing" (Printexc.to_string e)) ;
|
||||
Lwt.return_unit)
|
||||
|
||||
let rec read_sock_write_tcp db c ?fd addr addrtype =
|
||||
let rec read_sock_write_tcp closing db c ?fd addr addrtype =
|
||||
match fd with
|
||||
| None ->
|
||||
Logs.debug (fun m -> m "new connection to TCP") ;
|
||||
let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in
|
||||
Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ;
|
||||
Lwt.catch
|
||||
(fun () ->
|
||||
Lwt_unix.connect fd addr >|= fun () ->
|
||||
Logs.debug (fun m -> m "connected to TCP"))
|
||||
(fun e ->
|
||||
let addr', port = match addr with
|
||||
| Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port
|
||||
| Lwt_unix.ADDR_UNIX addr -> addr, 0
|
||||
in
|
||||
Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s"
|
||||
(Printexc.to_string e) addr' port) ;
|
||||
safe_close fd >>= fun () ->
|
||||
Lwt_unix.sleep 5.0 >>= fun () ->
|
||||
read_sock_write_tcp db c addr addrtype) >>= fun () ->
|
||||
read_sock_write_tcp db c ~fd addr addrtype
|
||||
if !closing then
|
||||
Lwt.return_unit
|
||||
else begin
|
||||
Logs.debug (fun m -> m "new connection to TCP") ;
|
||||
let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in
|
||||
Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ;
|
||||
Lwt.catch
|
||||
(fun () ->
|
||||
Lwt_unix.connect fd addr >|= fun () ->
|
||||
Logs.debug (fun m -> m "connected to TCP") ;
|
||||
Some fd)
|
||||
(fun e ->
|
||||
let addr', port = match addr with
|
||||
| Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port
|
||||
| Lwt_unix.ADDR_UNIX addr -> addr, 0
|
||||
in
|
||||
Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s"
|
||||
(Printexc.to_string e) addr' port) ;
|
||||
safe_close fd >>= fun () ->
|
||||
Lwt_unix.sleep 5.0 >|= fun () ->
|
||||
None) >>= fun fd ->
|
||||
read_sock_write_tcp closing db c ?fd addr addrtype
|
||||
end
|
||||
| Some fd ->
|
||||
let open Vmm_wire in
|
||||
Logs.debug (fun m -> m "reading from unix socket") ;
|
||||
Vmm_lwt.read_exactly c >>= function
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %s while reading vmm socket (return)"
|
||||
(str_of_e e)) ;
|
||||
if !closing then
|
||||
safe_close fd
|
||||
| Ok (hdr, data) ->
|
||||
if not (version_eq hdr.version my_version) then begin
|
||||
Logs.err (fun m -> m "unknown wire protocol version") ;
|
||||
safe_close fd
|
||||
end else
|
||||
let name = IM.find hdr.id !req in
|
||||
req := IM.remove hdr.id !req ;
|
||||
match Stats.int_to_op hdr.tag with
|
||||
| Some Stats.Stat_reply ->
|
||||
begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with
|
||||
| Error (`Msg msg) ->
|
||||
Logs.warn (fun m -> m "error %s while decoding stats %s, ignoring"
|
||||
msg name) ;
|
||||
read_sock_write_tcp db c ~fd addr addrtype
|
||||
| Ok (ru, vmm, ifs) ->
|
||||
let ru = P.encode_ru name ru in
|
||||
let vmm = P.encode_vmm name vmm in
|
||||
let taps = List.map (P.encode_if name) ifs in
|
||||
let out = (String.concat ~sep:"\n" (ru :: vmm :: taps)) ^ "\n" in
|
||||
Logs.debug (fun m -> m "writing %d via tcp" (String.length out)) ;
|
||||
Vmm_lwt.write_raw fd out >>= function
|
||||
| Ok () ->
|
||||
Logs.debug (fun m -> m "wrote successfully") ;
|
||||
read_sock_write_tcp db c ~fd addr addrtype
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %s while writing to tcp (%s)"
|
||||
(str_of_e e) name) ;
|
||||
safe_close fd >>= fun () ->
|
||||
read_sock_write_tcp db c addr addrtype
|
||||
end
|
||||
| _ when hdr.tag = fail_tag ->
|
||||
Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ;
|
||||
read_sock_write_tcp db c ~fd addr addrtype
|
||||
| _ ->
|
||||
Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ;
|
||||
read_sock_write_tcp db c ~fd addr addrtype
|
||||
|
||||
let rec query_sock prefix db c interval =
|
||||
(* query c for everyone in db *)
|
||||
Lwt_list.iter_s (fun (id, name) ->
|
||||
let id = identifier id in
|
||||
let id = match prefix with None -> id | Some p -> p ^ "." ^ id in
|
||||
let request = Vmm_wire.Stats.stat !command my_version id in
|
||||
req := IM.add !command name !req ;
|
||||
incr command ;
|
||||
Logs.debug (fun m -> m "%d requesting %s via socket" !command id) ;
|
||||
Vmm_lwt.write_raw c request >>= function
|
||||
| Ok () ->
|
||||
Logs.debug (fun m -> m "%d done" !command) ;
|
||||
Lwt.return_unit
|
||||
else begin
|
||||
let open Vmm_wire in
|
||||
Logs.debug (fun m -> m "reading from unix socket") ;
|
||||
Vmm_lwt.read_exactly c >>= function
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error while writing to vmm socket %s: %s"
|
||||
id (str_of_e e)) ;
|
||||
Lwt.fail_with "exception while writing")
|
||||
db >>= fun () ->
|
||||
Lwt_unix.sleep (float_of_int interval) >>= fun () ->
|
||||
query_sock prefix db c interval
|
||||
Logs.err (fun m -> m "error %s while reading vmm socket (return)"
|
||||
(str_of_e e)) ;
|
||||
closing := true ;
|
||||
safe_close fd
|
||||
| Ok (hdr, data) ->
|
||||
if not (version_eq hdr.version my_version) then begin
|
||||
Logs.err (fun m -> m "unknown wire protocol version") ;
|
||||
closing := true ;
|
||||
safe_close fd
|
||||
end else
|
||||
let name =
|
||||
try IM.find hdr.id !req
|
||||
with Not_found -> "not found"
|
||||
in
|
||||
req := IM.remove hdr.id !req ;
|
||||
begin match Stats.int_to_op hdr.tag with
|
||||
| Some Stats.Stat_reply ->
|
||||
begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with
|
||||
| Error (`Msg msg) ->
|
||||
Logs.warn (fun m -> m "error %s while decoding stats %s, ignoring"
|
||||
msg name) ;
|
||||
Lwt.return (Some fd)
|
||||
| Ok (ru, vmm, ifs) ->
|
||||
let ru = P.encode_ru name ru in
|
||||
let vmm = P.encode_vmm name vmm in
|
||||
let taps = List.map (P.encode_if name) ifs in
|
||||
let out = (String.concat ~sep:"\n" (ru :: vmm :: taps)) ^ "\n" in
|
||||
Logs.debug (fun m -> m "writing %d via tcp" (String.length out)) ;
|
||||
Vmm_lwt.write_raw fd out >>= function
|
||||
| Ok () ->
|
||||
Logs.debug (fun m -> m "wrote successfully") ;
|
||||
Lwt.return (Some fd)
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %s while writing to tcp (%s)"
|
||||
(str_of_e e) name) ;
|
||||
safe_close fd >|= fun () ->
|
||||
None
|
||||
end
|
||||
| _ when hdr.tag = fail_tag ->
|
||||
Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ;
|
||||
Lwt.return (Some fd)
|
||||
| _ ->
|
||||
Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ;
|
||||
Lwt.return (Some fd)
|
||||
end >>= fun fd ->
|
||||
read_sock_write_tcp closing db c ?fd addr addrtype
|
||||
end
|
||||
|
||||
let maybe_connect stat_socket =
|
||||
let rec query_sock closing prefix db c interval =
|
||||
(* query c for everyone in db *)
|
||||
if !closing then
|
||||
Lwt.return_unit
|
||||
else
|
||||
Lwt_list.fold_left_s (fun r (id, name) ->
|
||||
match r with
|
||||
| Error e -> Lwt.return (Error e)
|
||||
| Ok () ->
|
||||
let id = identifier id in
|
||||
let id = match prefix with None -> id | Some p -> p ^ "." ^ id in
|
||||
let request = Vmm_wire.Stats.stat !command my_version id in
|
||||
req := IM.add !command name !req ;
|
||||
incr command ;
|
||||
Logs.debug (fun m -> m "%d requesting %s via socket" !command id) ;
|
||||
Vmm_lwt.write_raw c request)
|
||||
(Ok ()) db >>= function
|
||||
| Error e ->
|
||||
Logs.err (fun m -> m "error %s while writing to vmm socket" (str_of_e e)) ;
|
||||
closing := true ;
|
||||
Lwt.return_unit
|
||||
| Ok () ->
|
||||
Lwt_unix.sleep (float_of_int interval) >>= fun () ->
|
||||
query_sock closing prefix db c interval
|
||||
|
||||
let rec maybe_connect stat_socket =
|
||||
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
|
||||
Lwt.catch
|
||||
(fun () ->
|
||||
|
@ -258,7 +278,9 @@ let maybe_connect stat_socket =
|
|||
(fun e ->
|
||||
Logs.warn (fun m -> m "error %s connecting to socket %s"
|
||||
(Printexc.to_string e) stat_socket) ;
|
||||
Lwt.fail_with "cannot connect to stat socket")
|
||||
safe_close c >>= fun () ->
|
||||
Lwt_unix.sleep (float_of_int 5) >>= fun () ->
|
||||
maybe_connect stat_socket)
|
||||
|
||||
let client stat_socket influxhost influxport db prefix interval =
|
||||
(* start a socket connection to vmm_stats *)
|
||||
|
@ -272,16 +294,27 @@ let client stat_socket influxhost influxport db prefix interval =
|
|||
in
|
||||
|
||||
(* loop *)
|
||||
(* the query task queries the stat_socket at each interval
|
||||
- if this fails, closing is set to true (and unit is returned)
|
||||
|
||||
the read_sock reads the stat_socket, and forwards to a TCP socket
|
||||
- if closing is true, the TCP socket is closed and unit is returned
|
||||
- if read on the unix domain socket fails, closing is set to true
|
||||
(and unit is returned) *)
|
||||
(* connection to the unix domain socket is managed in this loop only:
|
||||
- maybe_connect attempts to establishes to it
|
||||
- query_sock/read_sock_write_tcp write an read from it
|
||||
- on failure in read or write, the TCP connection is closed, and loop
|
||||
takes control: safe_close, maybe_connect, rinse, repeat *)
|
||||
let rec loop c =
|
||||
Lwt.catch (fun () ->
|
||||
Lwt.pick [ query_sock prefix db c interval ; read_sock_write_tcp db c addr addrtype ] >>= fun () ->
|
||||
safe_close c >>= fun () ->
|
||||
maybe_connect stat_socket >>= fun c ->
|
||||
loop c)
|
||||
(fun _ ->
|
||||
safe_close c >>= fun () ->
|
||||
maybe_connect stat_socket >>= fun c ->
|
||||
loop c)
|
||||
let closing = ref false in
|
||||
Lwt.join [
|
||||
query_sock closing prefix db c interval ;
|
||||
read_sock_write_tcp closing db c addr addrtype
|
||||
] >>= fun () ->
|
||||
safe_close c >>= fun () ->
|
||||
maybe_connect stat_socket >>= fun c ->
|
||||
loop c
|
||||
in
|
||||
loop c
|
||||
|
||||
|
@ -322,15 +355,16 @@ let host_port : (string * int) Arg.converter =
|
|||
|
||||
let socket =
|
||||
let doc = "Stat socket to connect onto" in
|
||||
Arg.(required & pos 0 (some string) None & info [] ~doc)
|
||||
let sock = Fpath.(to_string (Vmm_core.tmpdir / "stat" + "sock")) in
|
||||
Arg.(value & opt string sock & info [ "s" ; "socket" ] ~doc)
|
||||
|
||||
let influx =
|
||||
Arg.(required & pos 1 (some host_port) None & info [] ~docv:"influx"
|
||||
Arg.(required & pos 0 (some host_port) None & info [] ~docv:"influx"
|
||||
~doc:"the influx hostname:port to connect to")
|
||||
|
||||
let db =
|
||||
let doc = "VMID database" in
|
||||
Arg.(required & pos 2 (some file) None & info [] ~doc)
|
||||
Arg.(required & pos 1 (some file) None & info [] ~doc)
|
||||
|
||||
let prefix =
|
||||
let doc = "prefix" in
|
||||
|
|
Loading…
Reference in a new issue