From 992e1b0a2bf52074fdfd228b4f336ed04781c69d Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 25 Oct 2018 16:02:04 +0200 Subject: [PATCH] - Vmm_ring is now polymorph (alows to store log_entry :D) - Vmm_console/log/stats do not read multiple times console_add loops console_subscribe terminates (a stream of messages is sent) log data stream loops log_subscribe terminates (a stream of data is sent) stat_add loops stat_remove loops stat_subscribe terminates (a stream of stats is sent) terminates means: reads once more, and closes socket after second read returned loop processes further incoming data --- app/vmm_console.ml | 87 ++++++++++++++++++++++---------------- app/vmm_log.ml | 95 ++++++++++++++++++++++-------------------- src/vmm_ring.ml | 12 +++--- src/vmm_ring.mli | 10 ++--- stats/vmm_stats.ml | 57 +++++++++++-------------- stats/vmm_stats_lwt.ml | 44 +++++++++++-------- 6 files changed, 163 insertions(+), 142 deletions(-) diff --git a/app/vmm_console.ml b/app/vmm_console.ml index 7bb03ba..16ac1ea 100644 --- a/app/vmm_console.ml +++ b/app/vmm_console.ml @@ -71,38 +71,42 @@ let add_fifo id = let name = Vmm_core.string_of_id id in open_fifo name >|= function | Some f -> - let ring = Vmm_ring.create () in - Logs.debug (fun m -> m "inserting %s" name) ; + let ring = Vmm_ring.create "" () in + Logs.debug (fun m -> m "inserting fifo %s" name) ; let map = String.Map.add name ring !t in t := map ; Lwt.async (read_console name ring f) ; - Ok "reading" + Ok () | None -> Error (`Msg "opening") -let subscribe s id since = +let subscribe s id = let name = Vmm_core.string_of_id id in - Logs.debug (fun m -> m "attempting to attach %a" Vmm_core.pp_id id) ; + Logs.debug (fun m -> m "attempting to subscribe %a" Vmm_core.pp_id id) ; match String.Map.find name !t with | None -> active := String.Map.add name s !active ; - Lwt.return (Ok "waiing for VM") + Lwt.return (None, "waiting for VM") | Some r -> - let entries = - match since with - | None -> Vmm_ring.read r - | Some ts -> Vmm_ring.read_history r ts - in - Logs.debug (fun m -> m "found %d history" (List.length entries)) ; - Lwt_list.iter_s (fun (i, v) -> - let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in - Vmm_lwt.write_wire s (header, `Data (`Console_data (i, v))) >|= fun _ -> ()) - entries >>= fun () -> (match String.Map.find name !active with | None -> Lwt.return_unit | Some s -> Vmm_lwt.safe_close s) >|= fun () -> active := String.Map.add name s !active ; - Ok "attached" + (Some r, "subscribed") + +let send_history s r id since = + let entries = + match since with + | None -> Vmm_ring.read r + | Some ts -> Vmm_ring.read_history r ts + in + Logs.debug (fun m -> m "%a found %d history" Vmm_core.pp_id id (List.length entries)) ; + Lwt_list.iter_s (fun (i, v) -> + let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in + Vmm_lwt.write_wire s (header, `Data (`Console_data (i, v))) >>= function + | Ok () -> Lwt.return_unit + | Error _ -> Vmm_lwt.safe_close s) + entries let handle s addr () = Logs.info (fun m -> m "handling connection %a" Vmm_lwt.pp_sockaddr addr) ; @@ -112,26 +116,39 @@ let handle s addr () = Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit | Ok (header, `Command (`Console_cmd cmd)) -> - begin - (if not (Vmm_commands.version_eq header.Vmm_commands.version my_version) then - Lwt.return (Error (`Msg "ignoring data with bad version")) - else - match cmd with - | `Console_add -> add_fifo header.Vmm_commands.id - | `Console_subscribe ts -> subscribe s header.Vmm_commands.id ts) - >>= (function - | Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg)) - | Error (`Msg msg) -> - Logs.err (fun m -> m "error while processing command: %s" msg) ; - Vmm_lwt.write_wire s (header, `Failure msg)) >>= function - | Ok () -> loop () - | Error _ -> - Logs.err (fun m -> m "exception while writing to socket") ; - Lwt.return_unit + if not (Vmm_commands.version_eq header.Vmm_commands.version my_version) then begin + Logs.err (fun m -> m "ignoring data with bad version") ; + Lwt.return_unit + end else begin + let name = header.Vmm_commands.id in + match cmd with + | `Console_add -> + begin + add_fifo name >>= fun res -> + let reply = match res with + | Ok () -> `Success `Empty + | Error (`Msg msg) -> `Failure msg + in + Vmm_lwt.write_wire s (header, reply) >>= function + | Ok () -> loop () + | Error _ -> + Logs.err (fun m -> m "error while writing") ; + Lwt.return_unit + end + | `Console_subscribe ts -> + subscribe s name >>= fun (ring, res) -> + Vmm_lwt.write_wire s (header, `Success (`String res)) >>= function + | Error _ -> Vmm_lwt.safe_close s + | Ok () -> + (match ring with + | None -> Lwt.return_unit + | Some r -> send_history s r name ts) >>= fun () -> + (* now we wait for the next read and terminate*) + Vmm_lwt.read_wire s >|= fun _ -> () end | Ok wire -> - Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire wire) ; - loop () + Logs.err (fun m -> m "unexpected wire %a" Vmm_commands.pp_wire wire) ; + Lwt.return () in loop () >>= fun () -> Vmm_lwt.safe_close s >|= fun () -> diff --git a/app/vmm_log.ml b/app/vmm_log.ml index 8667225..6bac67f 100644 --- a/app/vmm_log.ml +++ b/app/vmm_log.ml @@ -12,12 +12,9 @@ open Lwt.Infix let my_version = `AV2 -let entry_to_ring (ts, event) = - (ts, Cstruct.to_string (Vmm_asn.log_entry_to_cstruct (ts, event))) - -let broadcast prefix data t = +let broadcast prefix wire t = Lwt_list.fold_left_s (fun t (id, s) -> - Vmm_lwt.write_wire s data >|= function + Vmm_lwt.write_wire s wire >|= function | Ok () -> t | Error `Exception -> Vmm_trie.remove id t) t (Vmm_trie.collect prefix t) @@ -83,14 +80,11 @@ let send_history s ring id ts = | Some since -> Vmm_ring.read_history ring since in let res = - List.fold_left (fun acc (_, x) -> - match Vmm_asn.log_entry_of_cstruct (Cstruct.of_string x) with - | Ok (ts, event) -> - let sub = Vmm_core.Log.name event in - if Vmm_core.is_sub_id ~super:id ~sub - then (ts, event) :: acc - else acc - | _ -> acc) + List.fold_left (fun acc (ts, event) -> + let sub = Vmm_core.Log.name event in + if Vmm_core.is_sub_id ~super:id ~sub + then (ts, event) :: acc + else acc) [] elements in (* just need a wrapper in tag = Log.Data, id = reqid *) @@ -102,31 +96,42 @@ let send_history s ring id ts = | Error e -> Lwt.return (Error e)) (Ok ()) (List.rev res) -let handle mvar ring s addr () = - Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ; +let handle_data mvar ring hdr entry = + if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin + Logs.warn (fun m -> m "unsupported version") ; + Lwt.return_unit + end else begin + Vmm_ring.write ring entry ; + Lwt_mvar.put mvar entry >>= fun () -> + let data' = (hdr, `Data (`Log_data entry)) in + broadcast hdr.Vmm_commands.id data' !tree >|= fun tree' -> + tree := tree' + end + +let read_data mvar ring s = let rec loop () = Vmm_lwt.read_wire s >>= function - | Error (`Msg e) -> - Logs.err (fun m -> m "error while reading %s" e) ; - loop () | Error _ -> - Logs.err (fun m -> m "exception while reading") ; + Logs.err (fun m -> m "error while reading") ; Lwt.return_unit | Ok (hdr, `Data (`Log_data entry)) -> - if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin - Logs.warn (fun m -> m "unsupported version") ; - Lwt.return_unit - end else begin - Vmm_ring.write ring (entry_to_ring entry) ; - Lwt_mvar.put mvar entry >>= fun () -> - let data' = - let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id = hdr.Vmm_commands.id } in - (header, `Data (`Log_data entry)) - in - broadcast hdr.Vmm_commands.id data' !tree >>= fun tree' -> - tree := tree' ; - loop () - end + handle_data mvar ring hdr entry >>= fun () -> + loop () + | Ok wire -> + Logs.warn (fun m -> m "unexpected wire %a" Vmm_commands.pp_wire wire) ; + Lwt.return_unit + in + loop () + +let handle mvar ring s addr () = + Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ; + Vmm_lwt.read_wire s >>= begin function + | Error _ -> + Logs.err (fun m -> m "error while reading") ; + Lwt.return_unit + | Ok (hdr, `Data (`Log_data entry)) -> + handle_data mvar ring hdr entry >>= fun () -> + read_data mvar ring s | Ok (hdr, `Command (`Log_cmd lc)) -> if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin Logs.warn (fun m -> m "unsupported version") ; @@ -141,23 +146,21 @@ let handle mvar ring s addr () = | Some s' -> Vmm_lwt.safe_close s') >>= fun () -> let out = `Success `Empty in Vmm_lwt.write_wire s (hdr, out) >>= function - | Error _ -> - Logs.err (fun m -> m "error while sending reply for subscribe") ; + | Error _ -> Logs.err (fun m -> m "error while sending reply for subscribe") ; Lwt.return_unit | Ok () -> send_history s ring hdr.Vmm_commands.id ts >>= function - | Error _ -> - Logs.err (fun m -> m "error while sending history") ; - Lwt.return_unit - | Ok () -> loop () (* TODO no need to loop ;) *) + | Error _ -> Logs.err (fun m -> m "error while sending history") ; Lwt.return_unit + | Ok () -> + (* command processing is finished, but we leave the socket open + until read returns (either with a message we ignore or a failure from the closed connection) *) + Vmm_lwt.read_wire s >|= fun _ -> () end | Ok wire -> Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire wire) ; - loop () - in - loop () >>= fun () -> + Lwt.return_unit + end >>= fun () -> Vmm_lwt.safe_close s - (* should remove all the s from the tree above *) let jump _ file sock = Sys.(set_signal sigpipe Signal_ignore) ; @@ -168,13 +171,13 @@ let jump _ file sock = let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in Lwt_unix.(bind s (ADDR_UNIX sock)) >>= fun () -> Lwt_unix.listen s 1 ; - let ring = Vmm_ring.create () in + let ring = Vmm_ring.create `Startup () in read_from_file file >>= fun entries -> - List.iter (Vmm_ring.write ring) (List.map entry_to_ring entries) ; + List.iter (Vmm_ring.write ring) entries ; let mvar, writer = write_to_file file in let start = Ptime_clock.now (), `Startup in Lwt_mvar.put mvar start >>= fun () -> - Vmm_ring.write ring (entry_to_ring start) ; + Vmm_ring.write ring start ; let rec loop () = Lwt_unix.accept s >>= fun (cs, addr) -> Lwt.async (handle mvar ring cs addr) ; diff --git a/src/vmm_ring.ml b/src/vmm_ring.ml index f49d6e7..f780e29 100644 --- a/src/vmm_ring.ml +++ b/src/vmm_ring.ml @@ -2,19 +2,19 @@ (* a ring buffer with N strings, dropping old ones *) -type t = { - data : (Ptime.t * string) array ; +type 'a t = { + data : (Ptime.t * 'a) array ; mutable write : int ; size : int ; } -let create ?(size = 1024) () = - { data = Array.make 1024 (Ptime.min, "") ; write = 0 ; size } +let create ?(size = 1024) neutral () = + { data = Array.make 1024 (Ptime.min, neutral) ; write = 0 ; size } let inc t = (succ t.write) mod t.size -let write t v = - Array.set t.data t.write v ; +let write t entry = + Array.set t.data t.write entry ; t.write <- inc t let dec t n = (pred n + t.size) mod t.size diff --git a/src/vmm_ring.mli b/src/vmm_ring.mli index 14dc7ec..4bb8673 100644 --- a/src/vmm_ring.mli +++ b/src/vmm_ring.mli @@ -1,9 +1,9 @@ (* (c) 2018 Hannes Mehnert, all rights reserved *) -type t +type 'a t -val create : ?size:int -> unit -> t +val create : ?size:int -> 'a -> unit -> 'a t -val write : t -> Ptime.t * string -> unit -val read : t -> (Ptime.t * string) list -val read_history : t -> Ptime.t -> (Ptime.t * string) list +val write : 'a t -> Ptime.t * 'a -> unit +val read : 'a t -> (Ptime.t * 'a) list +val read_history : 'a t -> Ptime.t -> (Ptime.t * 'a) list diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index bc21c40..e3fccf5 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -170,36 +170,27 @@ let remove_vmids t vmids = List.fold_left remove_vmid t vmids let handle t socket (header, wire) = - let r = - if not (Vmm_commands.version_eq my_version header.Vmm_commands.version) then - Error (`Msg "cannot handle version") - else - match wire with - | `Command (`Stats_cmd cmd) -> - begin - let id = header.Vmm_commands.id in - match cmd with - | `Stats_add (pid, taps) -> - add_pid t id pid taps >>= fun t -> - Ok (t, `Add id, None, Some "added") - | `Stats_remove -> - let t = remove_vmid t id in - Ok (t, `Remove id, None, Some "removed") - | `Stats_subscribe -> - let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in - Ok ({ t with name_sockets }, `None, close, Some "subscribed") - end - | _ -> - Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire (header, wire)) ; - Ok (t, `None, None, None) - in - match r with - | Ok (t, action, close, out) -> - let out = match out with - | None -> None - | Some str -> Some (header, `Success (`String str)) - in - t, action, close, out - | Error (`Msg msg) -> - Logs.err (fun m -> m "error while processing %s" msg) ; - t, `None, None, Some (header, `Failure msg) + if not (Vmm_commands.version_eq my_version header.Vmm_commands.version) then begin + Logs.err (fun m -> m "invalid version %a (mine is %a)" + Vmm_commands.pp_version header.Vmm_commands.version + Vmm_commands.pp_version my_version) ; + Error (`Msg "cannot handle version") + end else + match wire with + | `Command (`Stats_cmd cmd) -> + begin + let id = header.Vmm_commands.id in + match cmd with + | `Stats_add (pid, taps) -> + add_pid t id pid taps >>= fun t -> + Ok (t, `Add id, "added") + | `Stats_remove -> + let t = remove_vmid t id in + Ok (t, `Remove id, "removed") + | `Stats_subscribe -> + let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in + Ok ({ t with name_sockets }, `Close close, "subscribed") + end + | _ -> + Logs.err (fun m -> m "unexpected wire %a" Vmm_commands.pp_wire (header, wire)) ; + Error (`Msg "unexpected command") diff --git a/stats/vmm_stats_lwt.ml b/stats/vmm_stats_lwt.ml index 532bb11..0763592 100644 --- a/stats/vmm_stats_lwt.ml +++ b/stats/vmm_stats_lwt.ml @@ -23,25 +23,35 @@ let pp_sockaddr ppf = function let handle s addr () = Logs.info (fun m -> m "handling stats connection %a" pp_sockaddr addr) ; - let rec loop acc = + let rec loop pids = Vmm_lwt.read_wire s >>= function - | Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop acc - | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc + | Error _ -> + Logs.err (fun m -> m "exception while reading") ; + Lwt.return pids | Ok wire -> - let t', action, close, out = Vmm_stats.handle !t s wire in - let acc = match action with - | `Add pid -> pid :: acc - | `Remove pid -> List.filter (fun m -> m <> pid) acc - | `None -> acc - in - t := t' ; - (match close with None -> Lwt.return_unit | Some s' -> Vmm_lwt.safe_close s') >>= fun () -> - match out with - | None -> loop acc - | Some out -> - Vmm_lwt.write_wire s out >>= function - | Ok () -> loop acc - | Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc + match Vmm_stats.handle !t s wire with + | Error (`Msg msg) -> + Vmm_lwt.write_wire s (fst wire, `Failure msg) >>= fun _ -> + Lwt.return pids + | Ok (t', action, out) -> + t := t' ; + let pids = match action with + | `Add pid -> pid :: pids + | `Remove pid -> List.filter (fun m -> m <> pid) pids + | `Close _ -> pids + in + t := t' ; + Vmm_lwt.write_wire s (fst wire, `Success (`String out)) >>= function + | Ok () -> + (match action with + | `Close (Some s') -> + Vmm_lwt.safe_close s' >>= fun () -> + (* read the next *) + Vmm_lwt.read_wire s >|= fun _ -> pids + | _ -> loop pids) + | Error _ -> + Logs.err (fun m -> m "error while writing") ; + Lwt.return pids in loop [] >>= fun vmids -> Vmm_lwt.safe_close s >|= fun () ->