- Vmm_ring is now polymorph (alows to store log_entry :D)

- Vmm_console/log/stats do not read multiple times
  console_add loops
  console_subscribe terminates (a stream of messages is sent)
  log data stream loops
  log_subscribe terminates (a stream of data is sent)
  stat_add loops
  stat_remove loops
  stat_subscribe terminates (a stream of stats is sent)
terminates means: reads once more, and closes socket after second read returned
loop processes further incoming data
This commit is contained in:
Hannes Mehnert 2018-10-25 16:02:04 +02:00
parent b55281d1e5
commit 992e1b0a2b
6 changed files with 163 additions and 142 deletions

View file

@ -71,38 +71,42 @@ let add_fifo id =
let name = Vmm_core.string_of_id id in
open_fifo name >|= function
| Some f ->
let ring = Vmm_ring.create () in
Logs.debug (fun m -> m "inserting %s" name) ;
let ring = Vmm_ring.create "" () in
Logs.debug (fun m -> m "inserting fifo %s" name) ;
let map = String.Map.add name ring !t in
t := map ;
Lwt.async (read_console name ring f) ;
Ok "reading"
Ok ()
| None ->
Error (`Msg "opening")
let subscribe s id since =
let subscribe s id =
let name = Vmm_core.string_of_id id in
Logs.debug (fun m -> m "attempting to attach %a" Vmm_core.pp_id id) ;
Logs.debug (fun m -> m "attempting to subscribe %a" Vmm_core.pp_id id) ;
match String.Map.find name !t with
| None ->
active := String.Map.add name s !active ;
Lwt.return (Ok "waiing for VM")
Lwt.return (None, "waiting for VM")
| Some r ->
let entries =
match since with
| None -> Vmm_ring.read r
| Some ts -> Vmm_ring.read_history r ts
in
Logs.debug (fun m -> m "found %d history" (List.length entries)) ;
Lwt_list.iter_s (fun (i, v) ->
let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in
Vmm_lwt.write_wire s (header, `Data (`Console_data (i, v))) >|= fun _ -> ())
entries >>= fun () ->
(match String.Map.find name !active with
| None -> Lwt.return_unit
| Some s -> Vmm_lwt.safe_close s) >|= fun () ->
active := String.Map.add name s !active ;
Ok "attached"
(Some r, "subscribed")
let send_history s r id since =
let entries =
match since with
| None -> Vmm_ring.read r
| Some ts -> Vmm_ring.read_history r ts
in
Logs.debug (fun m -> m "%a found %d history" Vmm_core.pp_id id (List.length entries)) ;
Lwt_list.iter_s (fun (i, v) ->
let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in
Vmm_lwt.write_wire s (header, `Data (`Console_data (i, v))) >>= function
| Ok () -> Lwt.return_unit
| Error _ -> Vmm_lwt.safe_close s)
entries
let handle s addr () =
Logs.info (fun m -> m "handling connection %a" Vmm_lwt.pp_sockaddr addr) ;
@ -112,26 +116,39 @@ let handle s addr () =
Logs.err (fun m -> m "exception while reading") ;
Lwt.return_unit
| Ok (header, `Command (`Console_cmd cmd)) ->
begin
(if not (Vmm_commands.version_eq header.Vmm_commands.version my_version) then
Lwt.return (Error (`Msg "ignoring data with bad version"))
else
match cmd with
| `Console_add -> add_fifo header.Vmm_commands.id
| `Console_subscribe ts -> subscribe s header.Vmm_commands.id ts)
>>= (function
| Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg))
| Error (`Msg msg) ->
Logs.err (fun m -> m "error while processing command: %s" msg) ;
Vmm_lwt.write_wire s (header, `Failure msg)) >>= function
| Ok () -> loop ()
| Error _ ->
Logs.err (fun m -> m "exception while writing to socket") ;
Lwt.return_unit
if not (Vmm_commands.version_eq header.Vmm_commands.version my_version) then begin
Logs.err (fun m -> m "ignoring data with bad version") ;
Lwt.return_unit
end else begin
let name = header.Vmm_commands.id in
match cmd with
| `Console_add ->
begin
add_fifo name >>= fun res ->
let reply = match res with
| Ok () -> `Success `Empty
| Error (`Msg msg) -> `Failure msg
in
Vmm_lwt.write_wire s (header, reply) >>= function
| Ok () -> loop ()
| Error _ ->
Logs.err (fun m -> m "error while writing") ;
Lwt.return_unit
end
| `Console_subscribe ts ->
subscribe s name >>= fun (ring, res) ->
Vmm_lwt.write_wire s (header, `Success (`String res)) >>= function
| Error _ -> Vmm_lwt.safe_close s
| Ok () ->
(match ring with
| None -> Lwt.return_unit
| Some r -> send_history s r name ts) >>= fun () ->
(* now we wait for the next read and terminate*)
Vmm_lwt.read_wire s >|= fun _ -> ()
end
| Ok wire ->
Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire wire) ;
loop ()
Logs.err (fun m -> m "unexpected wire %a" Vmm_commands.pp_wire wire) ;
Lwt.return ()
in
loop () >>= fun () ->
Vmm_lwt.safe_close s >|= fun () ->

View file

@ -12,12 +12,9 @@ open Lwt.Infix
let my_version = `AV2
let entry_to_ring (ts, event) =
(ts, Cstruct.to_string (Vmm_asn.log_entry_to_cstruct (ts, event)))
let broadcast prefix data t =
let broadcast prefix wire t =
Lwt_list.fold_left_s (fun t (id, s) ->
Vmm_lwt.write_wire s data >|= function
Vmm_lwt.write_wire s wire >|= function
| Ok () -> t
| Error `Exception -> Vmm_trie.remove id t)
t (Vmm_trie.collect prefix t)
@ -83,14 +80,11 @@ let send_history s ring id ts =
| Some since -> Vmm_ring.read_history ring since
in
let res =
List.fold_left (fun acc (_, x) ->
match Vmm_asn.log_entry_of_cstruct (Cstruct.of_string x) with
| Ok (ts, event) ->
let sub = Vmm_core.Log.name event in
if Vmm_core.is_sub_id ~super:id ~sub
then (ts, event) :: acc
else acc
| _ -> acc)
List.fold_left (fun acc (ts, event) ->
let sub = Vmm_core.Log.name event in
if Vmm_core.is_sub_id ~super:id ~sub
then (ts, event) :: acc
else acc)
[] elements
in
(* just need a wrapper in tag = Log.Data, id = reqid *)
@ -102,31 +96,42 @@ let send_history s ring id ts =
| Error e -> Lwt.return (Error e))
(Ok ()) (List.rev res)
let handle mvar ring s addr () =
Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ;
let handle_data mvar ring hdr entry =
if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin
Logs.warn (fun m -> m "unsupported version") ;
Lwt.return_unit
end else begin
Vmm_ring.write ring entry ;
Lwt_mvar.put mvar entry >>= fun () ->
let data' = (hdr, `Data (`Log_data entry)) in
broadcast hdr.Vmm_commands.id data' !tree >|= fun tree' ->
tree := tree'
end
let read_data mvar ring s =
let rec loop () =
Vmm_lwt.read_wire s >>= function
| Error (`Msg e) ->
Logs.err (fun m -> m "error while reading %s" e) ;
loop ()
| Error _ ->
Logs.err (fun m -> m "exception while reading") ;
Logs.err (fun m -> m "error while reading") ;
Lwt.return_unit
| Ok (hdr, `Data (`Log_data entry)) ->
if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin
Logs.warn (fun m -> m "unsupported version") ;
Lwt.return_unit
end else begin
Vmm_ring.write ring (entry_to_ring entry) ;
Lwt_mvar.put mvar entry >>= fun () ->
let data' =
let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id = hdr.Vmm_commands.id } in
(header, `Data (`Log_data entry))
in
broadcast hdr.Vmm_commands.id data' !tree >>= fun tree' ->
tree := tree' ;
loop ()
end
handle_data mvar ring hdr entry >>= fun () ->
loop ()
| Ok wire ->
Logs.warn (fun m -> m "unexpected wire %a" Vmm_commands.pp_wire wire) ;
Lwt.return_unit
in
loop ()
let handle mvar ring s addr () =
Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ;
Vmm_lwt.read_wire s >>= begin function
| Error _ ->
Logs.err (fun m -> m "error while reading") ;
Lwt.return_unit
| Ok (hdr, `Data (`Log_data entry)) ->
handle_data mvar ring hdr entry >>= fun () ->
read_data mvar ring s
| Ok (hdr, `Command (`Log_cmd lc)) ->
if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin
Logs.warn (fun m -> m "unsupported version") ;
@ -141,23 +146,21 @@ let handle mvar ring s addr () =
| Some s' -> Vmm_lwt.safe_close s') >>= fun () ->
let out = `Success `Empty in
Vmm_lwt.write_wire s (hdr, out) >>= function
| Error _ ->
Logs.err (fun m -> m "error while sending reply for subscribe") ;
| Error _ -> Logs.err (fun m -> m "error while sending reply for subscribe") ;
Lwt.return_unit
| Ok () ->
send_history s ring hdr.Vmm_commands.id ts >>= function
| Error _ ->
Logs.err (fun m -> m "error while sending history") ;
Lwt.return_unit
| Ok () -> loop () (* TODO no need to loop ;) *)
| Error _ -> Logs.err (fun m -> m "error while sending history") ; Lwt.return_unit
| Ok () ->
(* command processing is finished, but we leave the socket open
until read returns (either with a message we ignore or a failure from the closed connection) *)
Vmm_lwt.read_wire s >|= fun _ -> ()
end
| Ok wire ->
Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire wire) ;
loop ()
in
loop () >>= fun () ->
Lwt.return_unit
end >>= fun () ->
Vmm_lwt.safe_close s
(* should remove all the s from the tree above *)
let jump _ file sock =
Sys.(set_signal sigpipe Signal_ignore) ;
@ -168,13 +171,13 @@ let jump _ file sock =
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.(bind s (ADDR_UNIX sock)) >>= fun () ->
Lwt_unix.listen s 1 ;
let ring = Vmm_ring.create () in
let ring = Vmm_ring.create `Startup () in
read_from_file file >>= fun entries ->
List.iter (Vmm_ring.write ring) (List.map entry_to_ring entries) ;
List.iter (Vmm_ring.write ring) entries ;
let mvar, writer = write_to_file file in
let start = Ptime_clock.now (), `Startup in
Lwt_mvar.put mvar start >>= fun () ->
Vmm_ring.write ring (entry_to_ring start) ;
Vmm_ring.write ring start ;
let rec loop () =
Lwt_unix.accept s >>= fun (cs, addr) ->
Lwt.async (handle mvar ring cs addr) ;

View file

@ -2,19 +2,19 @@
(* a ring buffer with N strings, dropping old ones *)
type t = {
data : (Ptime.t * string) array ;
type 'a t = {
data : (Ptime.t * 'a) array ;
mutable write : int ;
size : int ;
}
let create ?(size = 1024) () =
{ data = Array.make 1024 (Ptime.min, "") ; write = 0 ; size }
let create ?(size = 1024) neutral () =
{ data = Array.make 1024 (Ptime.min, neutral) ; write = 0 ; size }
let inc t = (succ t.write) mod t.size
let write t v =
Array.set t.data t.write v ;
let write t entry =
Array.set t.data t.write entry ;
t.write <- inc t
let dec t n = (pred n + t.size) mod t.size

View file

@ -1,9 +1,9 @@
(* (c) 2018 Hannes Mehnert, all rights reserved *)
type t
type 'a t
val create : ?size:int -> unit -> t
val create : ?size:int -> 'a -> unit -> 'a t
val write : t -> Ptime.t * string -> unit
val read : t -> (Ptime.t * string) list
val read_history : t -> Ptime.t -> (Ptime.t * string) list
val write : 'a t -> Ptime.t * 'a -> unit
val read : 'a t -> (Ptime.t * 'a) list
val read_history : 'a t -> Ptime.t -> (Ptime.t * 'a) list

View file

@ -170,36 +170,27 @@ let remove_vmids t vmids =
List.fold_left remove_vmid t vmids
let handle t socket (header, wire) =
let r =
if not (Vmm_commands.version_eq my_version header.Vmm_commands.version) then
Error (`Msg "cannot handle version")
else
match wire with
| `Command (`Stats_cmd cmd) ->
begin
let id = header.Vmm_commands.id in
match cmd with
| `Stats_add (pid, taps) ->
add_pid t id pid taps >>= fun t ->
Ok (t, `Add id, None, Some "added")
| `Stats_remove ->
let t = remove_vmid t id in
Ok (t, `Remove id, None, Some "removed")
| `Stats_subscribe ->
let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in
Ok ({ t with name_sockets }, `None, close, Some "subscribed")
end
| _ ->
Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire (header, wire)) ;
Ok (t, `None, None, None)
in
match r with
| Ok (t, action, close, out) ->
let out = match out with
| None -> None
| Some str -> Some (header, `Success (`String str))
in
t, action, close, out
| Error (`Msg msg) ->
Logs.err (fun m -> m "error while processing %s" msg) ;
t, `None, None, Some (header, `Failure msg)
if not (Vmm_commands.version_eq my_version header.Vmm_commands.version) then begin
Logs.err (fun m -> m "invalid version %a (mine is %a)"
Vmm_commands.pp_version header.Vmm_commands.version
Vmm_commands.pp_version my_version) ;
Error (`Msg "cannot handle version")
end else
match wire with
| `Command (`Stats_cmd cmd) ->
begin
let id = header.Vmm_commands.id in
match cmd with
| `Stats_add (pid, taps) ->
add_pid t id pid taps >>= fun t ->
Ok (t, `Add id, "added")
| `Stats_remove ->
let t = remove_vmid t id in
Ok (t, `Remove id, "removed")
| `Stats_subscribe ->
let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in
Ok ({ t with name_sockets }, `Close close, "subscribed")
end
| _ ->
Logs.err (fun m -> m "unexpected wire %a" Vmm_commands.pp_wire (header, wire)) ;
Error (`Msg "unexpected command")

View file

@ -23,25 +23,35 @@ let pp_sockaddr ppf = function
let handle s addr () =
Logs.info (fun m -> m "handling stats connection %a" pp_sockaddr addr) ;
let rec loop acc =
let rec loop pids =
Vmm_lwt.read_wire s >>= function
| Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop acc
| Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc
| Error _ ->
Logs.err (fun m -> m "exception while reading") ;
Lwt.return pids
| Ok wire ->
let t', action, close, out = Vmm_stats.handle !t s wire in
let acc = match action with
| `Add pid -> pid :: acc
| `Remove pid -> List.filter (fun m -> m <> pid) acc
| `None -> acc
in
t := t' ;
(match close with None -> Lwt.return_unit | Some s' -> Vmm_lwt.safe_close s') >>= fun () ->
match out with
| None -> loop acc
| Some out ->
Vmm_lwt.write_wire s out >>= function
| Ok () -> loop acc
| Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc
match Vmm_stats.handle !t s wire with
| Error (`Msg msg) ->
Vmm_lwt.write_wire s (fst wire, `Failure msg) >>= fun _ ->
Lwt.return pids
| Ok (t', action, out) ->
t := t' ;
let pids = match action with
| `Add pid -> pid :: pids
| `Remove pid -> List.filter (fun m -> m <> pid) pids
| `Close _ -> pids
in
t := t' ;
Vmm_lwt.write_wire s (fst wire, `Success (`String out)) >>= function
| Ok () ->
(match action with
| `Close (Some s') ->
Vmm_lwt.safe_close s' >>= fun () ->
(* read the next *)
Vmm_lwt.read_wire s >|= fun _ -> pids
| _ -> loop pids)
| Error _ ->
Logs.err (fun m -> m "error while writing") ;
Lwt.return pids
in
loop [] >>= fun vmids ->
Vmm_lwt.safe_close s >|= fun () ->