use vmm_trie in log and stat, cleanups

This commit is contained in:
Hannes Mehnert 2018-09-28 22:44:38 +02:00
parent 91bda433e8
commit 38094a53e3
12 changed files with 274 additions and 184 deletions

View File

@ -239,18 +239,11 @@ let rec read_sock_write_tcp c ?fd addr addrtype =
Lwt.return (Some fd)) >>= fun fd ->
read_sock_write_tcp c ?fd addr addrtype
let query_sock vms c =
(* query c for everyone in db *)
Lwt_list.fold_left_s (fun r name ->
match r with
| Error e -> Lwt.return (Error e)
| Ok () ->
let id = Astring.String.cuts ~sep:"." name in
let request = Vmm_wire.Stats.stat !command my_version id in
command := Int64.succ !command ;
Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id id) ;
Vmm_lwt.write_wire c request)
(Ok ()) vms
let query_sock vm c =
let request = Vmm_wire.Stats.subscribe !command my_version vm in
command := Int64.succ !command ;
Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id vm) ;
Vmm_lwt.write_wire c request
let rec maybe_connect stat_socket =
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
@ -267,7 +260,7 @@ let rec maybe_connect stat_socket =
Lwt_unix.sleep (float_of_int 5) >>= fun () ->
maybe_connect stat_socket)
let client stat_socket influxhost influxport vms =
let client stat_socket influxhost influxport vm =
(* figure out address of influx *)
Lwt_unix.gethostbyname influxhost >>= fun host_entry ->
let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in
@ -292,7 +285,7 @@ let client stat_socket influxhost influxport vms =
let rec loop () =
(* start a socket connection to vmm_stats *)
maybe_connect stat_socket >>= fun c ->
query_sock vms c >>= function
query_sock vm c >>= function
| Error e ->
Logs.err (fun m -> m "error %s while writing to stat socket" (str_of_e e)) ;
Lwt.return_unit
@ -302,9 +295,9 @@ let client stat_socket influxhost influxport vms =
in
loop ()
let run_client _ socket (influxhost, influxport) vms =
let run_client _ socket (influxhost, influxport) vm =
Sys.(set_signal sigpipe Signal_ignore) ;
Lwt_main.run (client socket influxhost influxport vms)
Lwt_main.run (client socket influxhost influxport vm)
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
@ -339,9 +332,14 @@ let influx =
Arg.(required & pos 0 (some host_port) None & info [] ~docv:"influx"
~doc:"the influx hostname:port to connect to")
let vms =
let doc = "virtual machine names" in
Arg.(value & opt_all string [] & info [ "n" ; "name" ] ~doc)
let vm_c =
let parse s = `Ok (Vmm_core.id_of_string s)
in
(parse, Vmm_core.pp_id)
let opt_vmname =
let doc = "Name virtual machine." in
Arg.(value & opt vm_c [] & info [ "n" ; "name"] ~doc)
let cmd =
let doc = "VMM InfluxDB connector" in
@ -349,7 +347,7 @@ let cmd =
`S "DESCRIPTION" ;
`P "$(tname) connects to a vmm stats socket, pulls statistics and pushes them via TCP to influxdb" ]
in
Term.(pure run_client $ setup_log $ socket $ influx $ vms),
Term.(pure run_client $ setup_log $ socket $ influx $ opt_vmname),
Term.info "vmm_influxdb_stats" ~version:"%%VERSION_NUM%%" ~doc ~man
let () =

View File

@ -16,66 +16,12 @@ open Astring
let my_version = `WV2
type t = N of Lwt_unix.file_descr list * t String.Map.t
let empty = N ([], String.Map.empty)
let insert id fd t =
let rec go (N (fds, m)) = function
| [] -> N ((fd :: fds), m)
| x::xs ->
let n = match String.Map.find_opt x m with
| None -> empty
| Some n -> n
in
let entry = go n xs in
N (fds, String.Map.add x entry m)
in
go t id
let remove id fd t =
let rec go (N (fds, m)) = function
| [] ->
begin match List.filter (fun fd' -> fd <> fd') fds with
| [] -> None
| fds' -> Some (N (fds', m))
end
| x::xs ->
let n' = match String.Map.find_opt x m with
| None -> None
| Some n -> go n xs
in
let m' = match n' with
| None -> String.Map.remove x m
| Some entry -> String.Map.add x entry m
in
if String.Map.is_empty m' && fds = [] then None else Some (N (fds, m'))
in
match go t id with
| None -> empty
| Some n -> n
let collect id t =
let rec go acc prefix (N (fds, m)) =
let acc' =
let here = List.map (fun fd -> (prefix, fd)) fds in
here @ acc
in
function
| [] -> acc'
| x::xs ->
match String.Map.find_opt x m with
| None -> acc'
| Some n -> go acc' (prefix @ [ x ]) n xs
in
go [] [] t id
let broadcast prefix data t =
Lwt_list.fold_left_s (fun t (id, s) ->
Vmm_lwt.write_wire s data >|= function
| Ok () -> t
| Error `Exception -> remove id s t)
t (collect prefix t)
| Error `Exception -> Vmm_trie.remove id t)
t (Vmm_trie.collect prefix t)
let write_complete s cs =
let l = Cstruct.len cs in
@ -116,10 +62,33 @@ let write_to_file file =
- should there be acks for history/datain?
*)
let tree = ref empty
let tree = ref Vmm_trie.empty
let bcast = ref 0L
let send_history s ring id cmd_id =
let elements = Vmm_ring.read ring in
let res =
List.fold_left (fun acc (_, x) ->
let cs = Cstruct.of_string x in
match Vmm_wire.Log.decode_log_hdr cs with
| Ok (hdr, _) ->
begin match Vmm_core.drop_super ~super:id ~sub:hdr.Vmm_core.Log.context with
| Some [] -> cs :: acc
| _ -> acc
end
| _ -> acc)
[] elements
in
(* just need a wrapper in tag = Log.Data, id = reqid *)
Lwt_list.fold_left_s (fun r body ->
match r with
| Ok () ->
let data = Vmm_wire.encode ~body my_version cmd_id (Vmm_wire.Log.op_to_int Vmm_wire.Log.Broadcast) in
Vmm_lwt.write_wire s data
| Error e -> Lwt.return (Error e))
(Ok ()) res
let handle mvar ring s addr () =
Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ;
let str = Fmt.strf "%a: CONNECT\n" (Ptime.pp_human ()) (Ptime_clock.now ()) in
@ -153,48 +122,28 @@ let handle mvar ring s addr () =
tree := tree' ;
loop ()
end
| Some Vmm_wire.Log.History ->
begin match Vmm_wire.decode_id_ts data with
| Error (`Msg err) ->
Logs.warn (fun m -> m "ignoring error %s while decoding history" err) ;
loop ()
| Ok ((sub, ts), _) ->
let elements = Vmm_ring.read_history ring ts in
let res =
List.fold_left (fun acc (_, x) ->
let cs = Cstruct.of_string x in
match Vmm_wire.Log.decode_log_hdr cs with
| Ok (hdr, _) when Vmm_core.is_sub_id ~super:hdr.Vmm_core.Log.context ~sub ->
cs :: acc
| _ -> acc)
[] elements
in
(* just need a wrapper in tag = Log.Data, id = reqid *)
Lwt_list.fold_left_s (fun r body ->
match r with
| Ok () ->
let data = Vmm_wire.encode ~body my_version hdr.Vmm_wire.id (Vmm_wire.Log.op_to_int Vmm_wire.Log.Log) in
Vmm_lwt.write_wire s data
| Error e -> Lwt.return (Error e))
(Ok ()) res >>= function
| Ok () -> loop ()
| Error _ ->
Logs.err (fun m -> m "error while sending data in history") ;
Lwt.return_unit
end
| Some Vmm_wire.Log.Subscribe ->
begin match Vmm_wire.decode_strings data with
| Error (`Msg err) ->
Logs.warn (fun m -> m "ignoring error %s while decoding subscribe" err) ;
loop ()
| Ok (id, _) ->
tree := insert id s !tree ;
let tree', ret = Vmm_trie.insert id s !tree in
tree := tree' ;
(match ret with
| None -> Lwt.return_unit
| Some s' -> Vmm_lwt.safe_close s') >>= fun () ->
let out = Vmm_wire.success my_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in
Vmm_lwt.write_wire s out >>= function
| Ok () -> loop ()
| Error _ ->
Logs.err (fun m -> m "error while sending reply for subscribe") ;
Lwt.return_unit
| Ok () ->
send_history s ring id hdr.Vmm_wire.id >>= function
| Error _ ->
Logs.err (fun m -> m "error while sending history") ;
Lwt.return_unit
| Ok () -> loop () (* TODO no need to loop ;) *)
end
| _ ->
Logs.err (fun m -> m "unknown command") ;

View File

@ -164,16 +164,13 @@ let console _ opt_socket name =
Vmm_lwt.safe_close fd) ;
`Ok ()
let stats _ opt_socket vms =
let stats _ opt_socket vm =
Lwt_main.run (
connect (socket `Stats opt_socket) >>= fun fd ->
let count = ref 0L in
Lwt_list.iter_s (fun name ->
let cmd = Vmm_wire.Stats.stat !count my_version name in
count := Int64.succ !count ;
Vmm_lwt.write_wire fd cmd >>= function
| Error `Exception -> Lwt.fail_with "write error"
| Ok () -> Lwt.return_unit) vms >>= fun () ->
let cmd = Vmm_wire.Stats.subscribe my_command my_version vm in
(Vmm_lwt.write_wire fd cmd >>= function
| Error `Exception -> Lwt.fail_with "write error"
| Ok () -> Lwt.return_unit) >>= fun () ->
(* now we busy read and process stat output *)
let rec loop () =
Vmm_lwt.read_wire fd >>= function
@ -220,6 +217,57 @@ let stats _ opt_socket vms =
Vmm_lwt.safe_close fd) ;
`Ok ()
let event_log _ opt_socket vm =
Lwt_main.run (
connect (socket `Log opt_socket) >>= fun fd ->
let cmd = Vmm_wire.Log.subscribe my_command my_version vm in
(Vmm_lwt.write_wire fd cmd >>= function
| Error `Exception -> Lwt.fail_with "write error"
| Ok () -> Lwt.return_unit) >>= fun () ->
(* now we busy read and process stat output *)
let rec loop () =
Vmm_lwt.read_wire fd >>= function
| Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop ()
| Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit
| Ok (hdr, data) ->
if Vmm_wire.is_fail hdr then
let msg = match Vmm_wire.decode_string data with
| Error _ -> None
| Ok (m, _) -> Some m
in
Logs.err (fun m -> m "operation failed: %a" Fmt.(option ~none:(unit "") string) msg) ;
Lwt.return_unit
else if Vmm_wire.is_reply hdr then
let msg = match Vmm_wire.decode_string data with
| Error _ -> None
| Ok (m, _) -> Some m
in
Logs.app (fun m -> m "operation succeeded: %a" Fmt.(option ~none:(unit "") string) msg) ;
loop ()
else
begin
(match Vmm_wire.Log.int_to_op hdr.Vmm_wire.tag with
| Some Vmm_wire.Log.Broadcast ->
begin match Vmm_wire.Log.decode_log_hdr data with
| Error (`Msg err) ->
Logs.warn (fun m -> m "ignoring error %s while decoding log" err) ;
| Ok (loghdr, logdata) ->
match Vmm_wire.Log.decode_event logdata with
| Error (`Msg err) ->
Logs.warn (fun m -> m "loghdr %a ignoring error %s while decoding logdata"
Vmm_core.Log.pp_hdr loghdr err)
| Ok event ->
Logs.app (fun m -> m "%a" Vmm_core.Log.pp (loghdr, event))
end
| _ ->
Logs.warn (fun m -> m "unknown operation %lx" hdr.Vmm_wire.tag)) ;
loop ()
end
in
loop () >>= fun () ->
Vmm_lwt.safe_close fd) ;
`Ok ()
let help _ _ man_format cmds = function
| None -> `Help (`Pager, None)
| Some t when List.mem t cmds -> `Help (man_format, Some t)
@ -318,19 +366,24 @@ let console_cmd =
Term.(ret (const console $ setup_log $ socket $ vm_name)),
Term.info "console" ~doc ~man
let vm_names =
let doc = "Name virtual machine." in
Arg.(value & opt_all vm_c [] & info [ "n" ; "name" ] ~doc)
let stats_cmd =
let doc = "statistics of VMs" in
let man =
[`S "DESCRIPTION";
`P "Shows statistics of VMs."]
in
Term.(ret (const stats $ setup_log $ socket $ vm_names)),
Term.(ret (const stats $ setup_log $ socket $ opt_vmname)),
Term.info "stats" ~doc ~man
let log_cmd =
let doc = "Event log" in
let man =
[`S "DESCRIPTION";
`P "Shows event log of VM."]
in
Term.(ret (const event_log $ setup_log $ socket $ opt_vmname)),
Term.info "log" ~doc ~man
let help_cmd =
let topic =
let doc = "The topic to get help on. `topics' lists the topics." in
@ -353,7 +406,7 @@ let default_cmd =
Term.(ret (const help $ setup_log $ socket $ Term.man_format $ Term.choice_names $ Term.pure None)),
Term.info "vmmc" ~version:"%%VERSION_NUM%%" ~doc ~man
let cmds = [ help_cmd ; info_cmd ; destroy_cmd ; create_cmd ; console_cmd ; stats_cmd ]
let cmds = [ help_cmd ; info_cmd ; destroy_cmd ; create_cmd ; console_cmd ; stats_cmd ; log_cmd ]
let () =
match Term.eval_choice default_cmd cmds

View File

@ -7,9 +7,6 @@ open Vmm_core
open Rresult
open R.Infix
let handle_command t s prefix perms hdr buf =
let res =
if not (Vmm_wire.version_eq hdr.Vmm_wire.version t.client_version) then

View File

@ -323,9 +323,8 @@ module Log = struct
name : string ;
}
let pp_hdr db ppf (hdr : hdr) =
let name = translate_serial db hdr.name in
Fmt.pf ppf "%a: %s" (Ptime.pp_human ()) hdr.ts name
let pp_hdr ppf (hdr : hdr) =
Fmt.pf ppf "%a: %s" (Ptime.pp_human ()) hdr.ts hdr.name
let hdr context name = { ts = Ptime_clock.now () ; context ; name }
@ -355,6 +354,6 @@ module Log = struct
type msg = hdr * event
let pp db ppf (hdr, event) =
Fmt.pf ppf "%a %a" (pp_hdr db) hdr pp_event event
let pp ppf (hdr, event) =
Fmt.pf ppf "%a %a" pp_hdr hdr pp_event event
end

View File

@ -36,7 +36,7 @@ let init () = {
let log state (hdr, event) =
let data = Vmm_wire.Log.log state.log_counter state.log_version hdr event in
let log_counter = Int64.succ state.log_counter in
Logs.debug (fun m -> m "LOG %a" (Log.pp []) (hdr, event)) ;
Logs.debug (fun m -> m "LOG %a" Log.pp (hdr, event)) ;
({ state with log_counter }, `Log data)
let handle_create t hdr vm_config (* policies *) =

View File

@ -71,9 +71,9 @@ let read_wire s =
r b 0 l >|= function
| Error e -> Error e
| Ok () ->
Logs.debug (fun m -> m "read hdr %a, body %a"
(* Logs.debug (fun m -> m "read hdr %a, body %a"
Cstruct.hexdump_pp (Cstruct.of_bytes buf)
Cstruct.hexdump_pp (Cstruct.of_bytes b)) ;
Cstruct.hexdump_pp (Cstruct.of_bytes b)) ; *)
Ok (hdr, Cstruct.of_bytes b)
else
Lwt.return (Ok (hdr, Cstruct.empty))
@ -91,7 +91,7 @@ let write_wire s buf =
Logs.err (fun m -> m "exception %s while writing" (Printexc.to_string e)) ;
Lwt.return (Error `Exception))
in
Logs.debug (fun m -> m "writing %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf)) ;
(* Logs.debug (fun m -> m "writing %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf)) ; *)
w 0 (Bytes.length buf)
let safe_close fd =

79
src/vmm_trie.ml Normal file
View File

@ -0,0 +1,79 @@
open Astring
type 'a t = N of 'a option * 'a t String.Map.t
let empty = N (None, String.Map.empty)
let insert id e t =
let rec go (N (es, m)) = function
| [] ->
begin match es with
| None -> N (Some e, m), None
| Some es' -> N (Some e, m), Some es'
end
| x::xs ->
let n = match String.Map.find_opt x m with
| None -> empty
| Some n -> n
in
let entry, ret = go n xs in
N (es, String.Map.add x entry m), ret
in
go t id
let remove id t =
let rec go (N (es, m)) = function
| [] -> if String.Map.is_empty m then None else Some (N (None, m))
| x::xs ->
let n' = match String.Map.find_opt x m with
| None -> None
| Some n -> go n xs
in
let m' = match n' with
| None -> String.Map.remove x m
| Some entry -> String.Map.add x entry m
in
if String.Map.is_empty m' && es = None then None else Some (N (es, m'))
in
match go t id with
| None -> empty
| Some n -> n
let find id t =
let rec go (N (es, m)) = function
| [] -> es
| x::xs ->
match String.Map.find_opt x m with
| None -> None
| Some n -> go n xs
in
go t id
let collect id t =
let rec go acc prefix (N (es, m)) =
let acc' =
match es with
| None -> acc
| Some e -> (prefix, e) :: acc
in
function
| [] -> acc'
| x::xs ->
match String.Map.find_opt x m with
| None -> acc'
| Some n -> go acc' (prefix @ [ x ]) n xs
in
go [] [] t id
let all t =
let rec go acc prefix (N (es, m)) =
let acc' =
match es with
| None -> acc
| Some e -> (prefix, e) :: acc
in
List.fold_left (fun acc (name, node) ->
go acc (prefix@[name]) node)
acc' (String.Map.bindings m)
in
go [] [] t

15
src/vmm_trie.mli Normal file
View File

@ -0,0 +1,15 @@
open Vmm_core
type 'a t
val empty : 'a t
val insert : id -> 'a -> 'a t -> 'a t * 'a option
val remove : id -> 'a t -> 'a t
val find : id -> 'a t -> 'a option
val collect : id -> 'a t -> (id * 'a) list
val all : 'a t -> (id * 'a) list

View File

@ -260,19 +260,19 @@ module Stats = struct
type op =
| Add
| Remove
| Stats
| Subscribe
| Data
let op_to_int = function
| Add -> 0x0200l
| Remove -> 0x0201l
| Stats -> 0x0202l
| Subscribe -> 0x0202l
| Data -> 0x0203l
let int_to_op = function
| 0x0200l -> Some Add
| 0x0201l -> Some Remove
| 0x0202l -> Some Stats
| 0x0202l -> Some Subscribe
| 0x0203l -> Some Data
| _ -> None
@ -382,7 +382,7 @@ module Stats = struct
let remove id version name = encode ~name version id (op_to_int Remove)
let stat id version name = encode ~name version id (op_to_int Stats)
let subscribe id version name = encode ~name version id (op_to_int Subscribe)
let data id version vm body =
let name = Vmm_core.id_of_string vm in
@ -440,30 +440,27 @@ let split_id id = match List.rev id with
module Log = struct
type op =
| Log
| History
| Broadcast
| Subscribe
let op_to_int = function
| Log -> 0x0300l
| History -> 0x0301l
| Subscribe -> 0x0301l
| Broadcast -> 0x0302l
| Subscribe -> 0x0303l
let int_to_op = function
| 0x0300l -> Some Log
| 0x0301l -> Some History
| 0x0301l -> Some Subscribe
| 0x0302l -> Some Broadcast
| 0x0303l -> Some Subscribe
| _ -> None
let history id version name ts =
encode ~name ~body:(encode_ptime ts) version id (op_to_int History)
let subscribe id version name =
encode ~name version id (op_to_int Subscribe)
let decode_log_hdr cs =
decode_id_ts cs >>= fun ((id, ts), off) ->
split_id id >>= fun (name, context) ->
Ok ({ Log.ts ; context ; name }, Cstruct.shift cs (16 + off))
Ok ({ Log.ts ; context ; name }, Cstruct.shift cs off)
let encode_addr ip port =
let cs = Cstruct.create 6 in
@ -490,7 +487,7 @@ module Log = struct
decode_string r >>= fun (block, l) ->
let block = if block = "" then None else Some block in
cs_shift r l >>= fun r' ->
decode_strings r' >>= fun taps ->
decode_strings r' >>= fun (taps, _) ->
Ok (pid, taps, block)
let encode_pid_exit pid c =

View File

@ -22,17 +22,17 @@ let descr = ref []
type 'a t = {
pid_nic : ((vmctx, int) result * (int * string) list) IM.t ;
vmid_pid : int String.Map.t ;
name_sockets : 'a String.Map.t ;
vmid_pid : int Vmm_trie.t ;
name_sockets : 'a Vmm_trie.t ;
}
let pp_strings pp taps = Fmt.(list ~sep:(unit ",@ ") string) pp taps
let empty () =
{ pid_nic = IM.empty ; vmid_pid = String.Map.empty ; name_sockets = String.Map.empty }
{ pid_nic = IM.empty ; vmid_pid = Vmm_trie.empty ; name_sockets = Vmm_trie.empty }
let remove_socket t name =
let name_sockets = String.Map.remove name t.name_sockets in
let remove_entry t name =
let name_sockets = Vmm_trie.remove name t.name_sockets in
{ t with name_sockets }
let rec wrap f arg =
@ -50,10 +50,10 @@ let fill_descr ctx =
Logs.err (fun m -> m "vmmapi_statnames failed, shouldn't happen") ;
()
| Some d ->
Logs.info (fun m -> m "descr are %a" pp_strings d) ;
Logs.debug (fun m -> m "descr are %a" pp_strings d) ;
descr := d
end
| ds -> Logs.info (fun m -> m "%d descr are already present" (List.length ds))
| ds -> Logs.debug (fun m -> m "%d descr are already present" (List.length ds))
let open_vmmapi ?(retries = 4) pid =
let name = "solo5-" ^ string_of_int pid in
@ -91,20 +91,18 @@ let gather pid vmctx nics =
| None ->
Logs.warn (fun m -> m "failed to get ifdata for %s" nname) ;
ifd
| Some data ->
Logs.debug (fun m -> m "adding ifdata for %s" nname) ;
data::ifd)
| Some data -> data::ifd)
[] nics
let tick t =
Logs.debug (fun m -> m "tick with %d vms" (IM.cardinal t.pid_nic)) ;
let pid_nic = try_open_vmmapi t.pid_nic in
let t' = { t with pid_nic } in
let outs =
String.Map.fold (fun name socket out ->
match String.Map.find_opt name t.vmid_pid with
| None -> Logs.warn (fun m -> m "couldn't find pid of %s" name) ; out
| Some pid -> match IM.find_opt pid t.pid_nic with
List.fold_left (fun out (vmid, pid) ->
let listeners = Vmm_trie.collect vmid t'.name_sockets in
match listeners with
| [] -> Logs.warn (fun m -> m "nobody is listening") ; out
| xs -> match IM.find_opt pid t.pid_nic with
| None -> Logs.warn (fun m -> m "couldn't find nics of %d" pid) ; out
| Some (vmctx, nics) ->
let ru, vmm, ifd = gather pid vmctx nics in
@ -115,9 +113,15 @@ let tick t =
let vmm' = match vmm with None -> [] | Some xs -> List.combine !descr xs in
ru', vmm', ifd
in
let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in
(socket, name, stats_encoded) :: out)
t'.name_sockets []
List.fold_left (fun out (id, socket) ->
match Vmm_core.drop_super ~super:id ~sub:vmid with
| None -> Logs.err (fun m -> m "couldn't drop super %a from sub %a" Vmm_core.pp_id id Vmm_core.pp_id vmid) ; out
| Some real_id ->
let name = Vmm_core.string_of_id real_id in
let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in
(socket, vmid, stats_encoded) :: out)
out xs)
[] (Vmm_trie.all t'.vmid_pid)
in
(t', outs)
@ -141,14 +145,15 @@ let add_pid t vmid pid nics =
Logs.info (fun m -> m "adding %d %a with vmctx %b" pid pp_strings nics
(match vmctx with Error _ -> false | Ok _ -> true)) ;
let pid_nic = IM.add pid (vmctx, nic_ids) t.pid_nic
and vmid_pid = String.Map.add vmid pid t.vmid_pid
and vmid_pid, ret = Vmm_trie.insert vmid pid t.vmid_pid
in
assert (ret = None) ;
Ok { t with pid_nic ; vmid_pid }
let remove_vmid t vmid =
Logs.info (fun m -> m "removing vmid %s" vmid) ;
match String.Map.find vmid t.vmid_pid with
| None -> Logs.warn (fun m -> m "no pid found for %s" vmid) ; t
Logs.info (fun m -> m "removing vmid %a" Vmm_core.pp_id vmid) ;
match Vmm_trie.find vmid t.vmid_pid with
| None -> Logs.warn (fun m -> m "no pid found for %a" Vmm_core.pp_id vmid) ; t
| Some pid ->
Logs.info (fun m -> m "removing pid %d" pid) ;
(try
@ -158,7 +163,7 @@ let remove_vmid t vmid =
with
_ -> ()) ;
let pid_nic = IM.remove pid t.pid_nic
and vmid_pid = String.Map.remove vmid t.vmid_pid
and vmid_pid = Vmm_trie.remove vmid t.vmid_pid
in
{ t with pid_nic ; vmid_pid }
@ -173,22 +178,21 @@ let handle t socket hdr cs =
Error (`Msg "cannot handle version")
else
decode_strings cs >>= fun (id, off) ->
let name = Vmm_core.string_of_id id in
match int_to_op hdr.tag with
| Some Add ->
decode_pid_taps (Cstruct.shift cs off) >>= fun (pid, taps) ->
add_pid t name pid taps >>= fun t ->
Ok (t, `Add name, success ~msg:"added" my_version hdr.id (op_to_int Add))
add_pid t id pid taps >>= fun t ->
Ok (t, `Add id, None, success ~msg:"added" my_version hdr.id (op_to_int Add))
| Some Remove ->
let t = remove_vmid t name in
Ok (t, `Remove name, success ~msg:"removed" my_version hdr.id (op_to_int Remove))
| Some Stats ->
let name_sockets = String.Map.add name socket t.name_sockets in
Ok ({ t with name_sockets }, `None, success ~msg:"subscribed" my_version hdr.id (op_to_int Stats))
let t = remove_vmid t id in
Ok (t, `Remove id, None, success ~msg:"removed" my_version hdr.id (op_to_int Remove))
| Some Subscribe ->
let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in
Ok ({ t with name_sockets }, `None, close, success ~msg:"subscribed" my_version hdr.id (op_to_int Subscribe))
| _ -> Error (`Msg "unknown command")
in
match r with
| Ok (t, action, out) -> t, action, out
| Ok (t, action, close, out) -> t, action, close, out
| Error (`Msg msg) ->
Logs.err (fun m -> m "error while processing %s" msg) ;
t, `None, fail ~msg my_version hdr.id
t, `None, None, fail ~msg my_version hdr.id

View File

@ -28,15 +28,14 @@ let handle s addr () =
| Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop acc
| Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc
| Ok (hdr, data) ->
Logs.debug (fun m -> m "received %a" Cstruct.hexdump_pp data) ;
let t', action, out = Vmm_stats.handle !t s hdr data in
let t', action, close, out = Vmm_stats.handle !t s hdr data in
let acc = match action with
| `Add pid -> pid :: acc
| `Remove pid -> List.filter (fun m -> m <> pid) acc
| `None -> acc
in
t := t' ;
Logs.debug (fun m -> m "sent %a" Cstruct.hexdump_pp out) ;
(match close with None -> Lwt.return_unit | Some s' -> Vmm_lwt.safe_close s') >>= fun () ->
Vmm_lwt.write_wire s out >>= function
| Ok () -> loop acc
| Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc
@ -54,7 +53,7 @@ let rec timer interval () =
Vmm_lwt.write_wire s stat >>= function
| Ok () -> Lwt.return_unit
| Error `Exception ->
t := Vmm_stats.remove_socket !t name ;
t := Vmm_stats.remove_entry !t name ;
Vmm_lwt.safe_close s)
outs >>= fun () ->
Lwt_unix.sleep interval >>= fun () ->