diff --git a/app/vmm_influxdb_stats.ml b/app/vmm_influxdb_stats.ml index 242c1b2..5a4be8b 100644 --- a/app/vmm_influxdb_stats.ml +++ b/app/vmm_influxdb_stats.ml @@ -147,15 +147,32 @@ let command = ref 1 let (req : string IM.t ref) = ref IM.empty +let tcp_sent = ref 0 +let socket_read = ref 0 +let socket_sent = ref 0 + +let str_of_e = function + | `Eof -> "end of file" + | `Exception -> "exception" + | `Toomuch -> "too much" + | `Msg m -> m + let rec read_sock db c fd = let open Vmm_wire in + Logs.debug (fun m -> m "reading now (socket read %d, socket sent %d, tcp sent %d)" !socket_read !socket_sent !tcp_sent); Vmm_lwt.read_exactly c >>= function - | Error _ -> Lwt.return_unit + | Error e -> + Logs.err (fun m -> m "error %s while reading" (str_of_e e)) ; + Lwt.return_unit | Ok (hdr, data) -> + Logs.debug (fun m -> m "%d read %d data" hdr.id (String.length data)) ; + socket_read := 8 (* hdr *) + String.length data + !socket_read ; if not (version_eq hdr.version my_version) then begin - Logs.err (fun m -> m "unknown wire protocol version") ; Lwt.return_unit + Logs.err (fun m -> m "unknown wire protocol version") ; + Lwt.return_unit end else let name = IM.find hdr.id !req in + Logs.debug (fun m -> m "talking about %s (socket read %d, socket sent %d, tcp sent %d)" name !socket_read !socket_sent !tcp_sent) ; req := IM.remove hdr.id !req ; match Stats.int_to_op hdr.tag with | Some Stats.Stat_reply -> @@ -168,10 +185,15 @@ let rec read_sock db c fd = let vmm = P.encode_vmm name vmm in let taps = List.map (P.encode_if name) ifs in let out = String.concat ~sep:"\n" (ru :: vmm :: taps @ [ "" ]) in - Logs.info (fun m -> m "result: %s" out) ; + Logs.info (fun m -> m "%d writing for %s via tcp %d (total read %d, tcp send %d)" hdr.id name (String.length out) !socket_read !tcp_sent) ; Vmm_lwt.write_raw fd out >>= function - | Ok () -> read_sock db c fd - | Error _ -> invalid_arg "failed to write via TCP" + | Ok () -> + tcp_sent := String.length out + !tcp_sent ; + Logs.debug (fun m -> m "%d successfully wrote, tcp_sent now %d" hdr.id !tcp_sent) ; + read_sock db c fd + | Error e -> + Logs.err (fun m -> m "%d error %s while writing (tcp_sent %d)" hdr.id (str_of_e e) !tcp_sent) ; + invalid_arg "failed to write via TCP" end | _ when hdr.tag = fail_tag -> Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ; @@ -181,16 +203,24 @@ let rec read_sock db c fd = read_sock db c fd let rec query_sock prefix db c interval = + Logs.debug (fun m -> m "querying socket (socket_send %d socket_read %d tcp_send %d)" + !socket_sent !socket_read !tcp_sent) ; (* query c for everyone in db *) Lwt_list.iter_s (fun (id, name) -> let id = identifier id in let id = match prefix with None -> id | Some p -> p ^ "." ^ id in let request = Vmm_wire.Stats.stat !command my_version id in + Logs.debug (fun m -> m "req for %s (command %d) len %d (total %d)" id !command (String.length request) !socket_sent) ; req := IM.add !command name !req ; incr command ; + socket_sent := !socket_sent + String.length request ; Vmm_lwt.write_raw c request >>= function - | Ok () -> Lwt.return_unit - | Error _ -> Lwt.fail_with "exception while writing") + | Ok () -> + Logs.debug (fun m -> m "req for %s (command+1 %d) written (total %d)" id !command !socket_sent) ; + Lwt.return_unit + | Error e -> + Logs.err (fun m -> m "error while writing %s (command+1 %d): %s" id !command (str_of_e e)) ; + Lwt.fail_with "exception while writing") db >>= fun () -> Lwt_unix.sleep (float_of_int interval) >>= fun () -> query_sock prefix db c interval