diff --git a/app/vmm_influxdb_stats.ml b/app/vmm_influxdb_stats.ml index 1ccb359..cc84072 100644 --- a/app/vmm_influxdb_stats.ml +++ b/app/vmm_influxdb_stats.ml @@ -239,18 +239,11 @@ let rec read_sock_write_tcp c ?fd addr addrtype = Lwt.return (Some fd)) >>= fun fd -> read_sock_write_tcp c ?fd addr addrtype -let query_sock vms c = - (* query c for everyone in db *) - Lwt_list.fold_left_s (fun r name -> - match r with - | Error e -> Lwt.return (Error e) - | Ok () -> - let id = Astring.String.cuts ~sep:"." name in - let request = Vmm_wire.Stats.stat !command my_version id in - command := Int64.succ !command ; - Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id id) ; - Vmm_lwt.write_wire c request) - (Ok ()) vms +let query_sock vm c = + let request = Vmm_wire.Stats.subscribe !command my_version vm in + command := Int64.succ !command ; + Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id vm) ; + Vmm_lwt.write_wire c request let rec maybe_connect stat_socket = let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in @@ -267,7 +260,7 @@ let rec maybe_connect stat_socket = Lwt_unix.sleep (float_of_int 5) >>= fun () -> maybe_connect stat_socket) -let client stat_socket influxhost influxport vms = +let client stat_socket influxhost influxport vm = (* figure out address of influx *) Lwt_unix.gethostbyname influxhost >>= fun host_entry -> let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in @@ -292,7 +285,7 @@ let client stat_socket influxhost influxport vms = let rec loop () = (* start a socket connection to vmm_stats *) maybe_connect stat_socket >>= fun c -> - query_sock vms c >>= function + query_sock vm c >>= function | Error e -> Logs.err (fun m -> m "error %s while writing to stat socket" (str_of_e e)) ; Lwt.return_unit @@ -302,9 +295,9 @@ let client stat_socket influxhost influxport vms = in loop () -let run_client _ socket (influxhost, influxport) vms = +let run_client _ socket (influxhost, influxport) vm = Sys.(set_signal sigpipe Signal_ignore) ; - Lwt_main.run (client socket influxhost influxport vms) + Lwt_main.run (client socket influxhost influxport vm) let setup_log style_renderer level = Fmt_tty.setup_std_outputs ?style_renderer (); @@ -339,9 +332,14 @@ let influx = Arg.(required & pos 0 (some host_port) None & info [] ~docv:"influx" ~doc:"the influx hostname:port to connect to") -let vms = - let doc = "virtual machine names" in - Arg.(value & opt_all string [] & info [ "n" ; "name" ] ~doc) +let vm_c = + let parse s = `Ok (Vmm_core.id_of_string s) + in + (parse, Vmm_core.pp_id) + +let opt_vmname = + let doc = "Name virtual machine." in + Arg.(value & opt vm_c [] & info [ "n" ; "name"] ~doc) let cmd = let doc = "VMM InfluxDB connector" in @@ -349,7 +347,7 @@ let cmd = `S "DESCRIPTION" ; `P "$(tname) connects to a vmm stats socket, pulls statistics and pushes them via TCP to influxdb" ] in - Term.(pure run_client $ setup_log $ socket $ influx $ vms), + Term.(pure run_client $ setup_log $ socket $ influx $ opt_vmname), Term.info "vmm_influxdb_stats" ~version:"%%VERSION_NUM%%" ~doc ~man let () = diff --git a/app/vmm_log.ml b/app/vmm_log.ml index ba5824e..0338ab8 100644 --- a/app/vmm_log.ml +++ b/app/vmm_log.ml @@ -16,66 +16,12 @@ open Astring let my_version = `WV2 -type t = N of Lwt_unix.file_descr list * t String.Map.t - -let empty = N ([], String.Map.empty) - -let insert id fd t = - let rec go (N (fds, m)) = function - | [] -> N ((fd :: fds), m) - | x::xs -> - let n = match String.Map.find_opt x m with - | None -> empty - | Some n -> n - in - let entry = go n xs in - N (fds, String.Map.add x entry m) - in - go t id - -let remove id fd t = - let rec go (N (fds, m)) = function - | [] -> - begin match List.filter (fun fd' -> fd <> fd') fds with - | [] -> None - | fds' -> Some (N (fds', m)) - end - | x::xs -> - let n' = match String.Map.find_opt x m with - | None -> None - | Some n -> go n xs - in - let m' = match n' with - | None -> String.Map.remove x m - | Some entry -> String.Map.add x entry m - in - if String.Map.is_empty m' && fds = [] then None else Some (N (fds, m')) - in - match go t id with - | None -> empty - | Some n -> n - -let collect id t = - let rec go acc prefix (N (fds, m)) = - let acc' = - let here = List.map (fun fd -> (prefix, fd)) fds in - here @ acc - in - function - | [] -> acc' - | x::xs -> - match String.Map.find_opt x m with - | None -> acc' - | Some n -> go acc' (prefix @ [ x ]) n xs - in - go [] [] t id - let broadcast prefix data t = Lwt_list.fold_left_s (fun t (id, s) -> Vmm_lwt.write_wire s data >|= function | Ok () -> t - | Error `Exception -> remove id s t) - t (collect prefix t) + | Error `Exception -> Vmm_trie.remove id t) + t (Vmm_trie.collect prefix t) let write_complete s cs = let l = Cstruct.len cs in @@ -116,10 +62,33 @@ let write_to_file file = - should there be acks for history/datain? *) -let tree = ref empty +let tree = ref Vmm_trie.empty let bcast = ref 0L +let send_history s ring id cmd_id = + let elements = Vmm_ring.read ring in + let res = + List.fold_left (fun acc (_, x) -> + let cs = Cstruct.of_string x in + match Vmm_wire.Log.decode_log_hdr cs with + | Ok (hdr, _) -> + begin match Vmm_core.drop_super ~super:id ~sub:hdr.Vmm_core.Log.context with + | Some [] -> cs :: acc + | _ -> acc + end + | _ -> acc) + [] elements + in + (* just need a wrapper in tag = Log.Data, id = reqid *) + Lwt_list.fold_left_s (fun r body -> + match r with + | Ok () -> + let data = Vmm_wire.encode ~body my_version cmd_id (Vmm_wire.Log.op_to_int Vmm_wire.Log.Broadcast) in + Vmm_lwt.write_wire s data + | Error e -> Lwt.return (Error e)) + (Ok ()) res + let handle mvar ring s addr () = Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ; let str = Fmt.strf "%a: CONNECT\n" (Ptime.pp_human ()) (Ptime_clock.now ()) in @@ -153,48 +122,28 @@ let handle mvar ring s addr () = tree := tree' ; loop () end - | Some Vmm_wire.Log.History -> - begin match Vmm_wire.decode_id_ts data with - | Error (`Msg err) -> - Logs.warn (fun m -> m "ignoring error %s while decoding history" err) ; - loop () - | Ok ((sub, ts), _) -> - let elements = Vmm_ring.read_history ring ts in - let res = - List.fold_left (fun acc (_, x) -> - let cs = Cstruct.of_string x in - match Vmm_wire.Log.decode_log_hdr cs with - | Ok (hdr, _) when Vmm_core.is_sub_id ~super:hdr.Vmm_core.Log.context ~sub -> - cs :: acc - | _ -> acc) - [] elements - in - (* just need a wrapper in tag = Log.Data, id = reqid *) - Lwt_list.fold_left_s (fun r body -> - match r with - | Ok () -> - let data = Vmm_wire.encode ~body my_version hdr.Vmm_wire.id (Vmm_wire.Log.op_to_int Vmm_wire.Log.Log) in - Vmm_lwt.write_wire s data - | Error e -> Lwt.return (Error e)) - (Ok ()) res >>= function - | Ok () -> loop () - | Error _ -> - Logs.err (fun m -> m "error while sending data in history") ; - Lwt.return_unit - end | Some Vmm_wire.Log.Subscribe -> begin match Vmm_wire.decode_strings data with | Error (`Msg err) -> Logs.warn (fun m -> m "ignoring error %s while decoding subscribe" err) ; loop () | Ok (id, _) -> - tree := insert id s !tree ; + let tree', ret = Vmm_trie.insert id s !tree in + tree := tree' ; + (match ret with + | None -> Lwt.return_unit + | Some s' -> Vmm_lwt.safe_close s') >>= fun () -> let out = Vmm_wire.success my_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in Vmm_lwt.write_wire s out >>= function - | Ok () -> loop () | Error _ -> Logs.err (fun m -> m "error while sending reply for subscribe") ; Lwt.return_unit + | Ok () -> + send_history s ring id hdr.Vmm_wire.id >>= function + | Error _ -> + Logs.err (fun m -> m "error while sending history") ; + Lwt.return_unit + | Ok () -> loop () (* TODO no need to loop ;) *) end | _ -> Logs.err (fun m -> m "unknown command") ; diff --git a/app/vmmc.ml b/app/vmmc.ml index 0298f28..4265b34 100644 --- a/app/vmmc.ml +++ b/app/vmmc.ml @@ -164,16 +164,13 @@ let console _ opt_socket name = Vmm_lwt.safe_close fd) ; `Ok () -let stats _ opt_socket vms = +let stats _ opt_socket vm = Lwt_main.run ( connect (socket `Stats opt_socket) >>= fun fd -> - let count = ref 0L in - Lwt_list.iter_s (fun name -> - let cmd = Vmm_wire.Stats.stat !count my_version name in - count := Int64.succ !count ; - Vmm_lwt.write_wire fd cmd >>= function - | Error `Exception -> Lwt.fail_with "write error" - | Ok () -> Lwt.return_unit) vms >>= fun () -> + let cmd = Vmm_wire.Stats.subscribe my_command my_version vm in + (Vmm_lwt.write_wire fd cmd >>= function + | Error `Exception -> Lwt.fail_with "write error" + | Ok () -> Lwt.return_unit) >>= fun () -> (* now we busy read and process stat output *) let rec loop () = Vmm_lwt.read_wire fd >>= function @@ -220,6 +217,57 @@ let stats _ opt_socket vms = Vmm_lwt.safe_close fd) ; `Ok () +let event_log _ opt_socket vm = + Lwt_main.run ( + connect (socket `Log opt_socket) >>= fun fd -> + let cmd = Vmm_wire.Log.subscribe my_command my_version vm in + (Vmm_lwt.write_wire fd cmd >>= function + | Error `Exception -> Lwt.fail_with "write error" + | Ok () -> Lwt.return_unit) >>= fun () -> + (* now we busy read and process stat output *) + let rec loop () = + Vmm_lwt.read_wire fd >>= function + | Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop () + | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit + | Ok (hdr, data) -> + if Vmm_wire.is_fail hdr then + let msg = match Vmm_wire.decode_string data with + | Error _ -> None + | Ok (m, _) -> Some m + in + Logs.err (fun m -> m "operation failed: %a" Fmt.(option ~none:(unit "") string) msg) ; + Lwt.return_unit + else if Vmm_wire.is_reply hdr then + let msg = match Vmm_wire.decode_string data with + | Error _ -> None + | Ok (m, _) -> Some m + in + Logs.app (fun m -> m "operation succeeded: %a" Fmt.(option ~none:(unit "") string) msg) ; + loop () + else + begin + (match Vmm_wire.Log.int_to_op hdr.Vmm_wire.tag with + | Some Vmm_wire.Log.Broadcast -> + begin match Vmm_wire.Log.decode_log_hdr data with + | Error (`Msg err) -> + Logs.warn (fun m -> m "ignoring error %s while decoding log" err) ; + | Ok (loghdr, logdata) -> + match Vmm_wire.Log.decode_event logdata with + | Error (`Msg err) -> + Logs.warn (fun m -> m "loghdr %a ignoring error %s while decoding logdata" + Vmm_core.Log.pp_hdr loghdr err) + | Ok event -> + Logs.app (fun m -> m "%a" Vmm_core.Log.pp (loghdr, event)) + end + | _ -> + Logs.warn (fun m -> m "unknown operation %lx" hdr.Vmm_wire.tag)) ; + loop () + end + in + loop () >>= fun () -> + Vmm_lwt.safe_close fd) ; + `Ok () + let help _ _ man_format cmds = function | None -> `Help (`Pager, None) | Some t when List.mem t cmds -> `Help (man_format, Some t) @@ -318,19 +366,24 @@ let console_cmd = Term.(ret (const console $ setup_log $ socket $ vm_name)), Term.info "console" ~doc ~man -let vm_names = - let doc = "Name virtual machine." in - Arg.(value & opt_all vm_c [] & info [ "n" ; "name" ] ~doc) - let stats_cmd = let doc = "statistics of VMs" in let man = [`S "DESCRIPTION"; `P "Shows statistics of VMs."] in - Term.(ret (const stats $ setup_log $ socket $ vm_names)), + Term.(ret (const stats $ setup_log $ socket $ opt_vmname)), Term.info "stats" ~doc ~man +let log_cmd = + let doc = "Event log" in + let man = + [`S "DESCRIPTION"; + `P "Shows event log of VM."] + in + Term.(ret (const event_log $ setup_log $ socket $ opt_vmname)), + Term.info "log" ~doc ~man + let help_cmd = let topic = let doc = "The topic to get help on. `topics' lists the topics." in @@ -353,7 +406,7 @@ let default_cmd = Term.(ret (const help $ setup_log $ socket $ Term.man_format $ Term.choice_names $ Term.pure None)), Term.info "vmmc" ~version:"%%VERSION_NUM%%" ~doc ~man -let cmds = [ help_cmd ; info_cmd ; destroy_cmd ; create_cmd ; console_cmd ; stats_cmd ] +let cmds = [ help_cmd ; info_cmd ; destroy_cmd ; create_cmd ; console_cmd ; stats_cmd ; log_cmd ] let () = match Term.eval_choice default_cmd cmds diff --git a/src/vmm_commands.ml b/src/vmm_commands.ml index aad9c0e..e4bf64b 100644 --- a/src/vmm_commands.ml +++ b/src/vmm_commands.ml @@ -7,9 +7,6 @@ open Vmm_core open Rresult open R.Infix - - - let handle_command t s prefix perms hdr buf = let res = if not (Vmm_wire.version_eq hdr.Vmm_wire.version t.client_version) then diff --git a/src/vmm_core.ml b/src/vmm_core.ml index 0c10bfb..a567fed 100644 --- a/src/vmm_core.ml +++ b/src/vmm_core.ml @@ -323,9 +323,8 @@ module Log = struct name : string ; } - let pp_hdr db ppf (hdr : hdr) = - let name = translate_serial db hdr.name in - Fmt.pf ppf "%a: %s" (Ptime.pp_human ()) hdr.ts name + let pp_hdr ppf (hdr : hdr) = + Fmt.pf ppf "%a: %s" (Ptime.pp_human ()) hdr.ts hdr.name let hdr context name = { ts = Ptime_clock.now () ; context ; name } @@ -355,6 +354,6 @@ module Log = struct type msg = hdr * event - let pp db ppf (hdr, event) = - Fmt.pf ppf "%a %a" (pp_hdr db) hdr pp_event event + let pp ppf (hdr, event) = + Fmt.pf ppf "%a %a" pp_hdr hdr pp_event event end diff --git a/src/vmm_engine.ml b/src/vmm_engine.ml index a4e1af7..c81aa22 100644 --- a/src/vmm_engine.ml +++ b/src/vmm_engine.ml @@ -36,7 +36,7 @@ let init () = { let log state (hdr, event) = let data = Vmm_wire.Log.log state.log_counter state.log_version hdr event in let log_counter = Int64.succ state.log_counter in - Logs.debug (fun m -> m "LOG %a" (Log.pp []) (hdr, event)) ; + Logs.debug (fun m -> m "LOG %a" Log.pp (hdr, event)) ; ({ state with log_counter }, `Log data) let handle_create t hdr vm_config (* policies *) = diff --git a/src/vmm_lwt.ml b/src/vmm_lwt.ml index 80dfb34..bfe0382 100644 --- a/src/vmm_lwt.ml +++ b/src/vmm_lwt.ml @@ -71,9 +71,9 @@ let read_wire s = r b 0 l >|= function | Error e -> Error e | Ok () -> - Logs.debug (fun m -> m "read hdr %a, body %a" +(* Logs.debug (fun m -> m "read hdr %a, body %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf) - Cstruct.hexdump_pp (Cstruct.of_bytes b)) ; + Cstruct.hexdump_pp (Cstruct.of_bytes b)) ; *) Ok (hdr, Cstruct.of_bytes b) else Lwt.return (Ok (hdr, Cstruct.empty)) @@ -91,7 +91,7 @@ let write_wire s buf = Logs.err (fun m -> m "exception %s while writing" (Printexc.to_string e)) ; Lwt.return (Error `Exception)) in - Logs.debug (fun m -> m "writing %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf)) ; + (* Logs.debug (fun m -> m "writing %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf)) ; *) w 0 (Bytes.length buf) let safe_close fd = diff --git a/src/vmm_trie.ml b/src/vmm_trie.ml new file mode 100644 index 0000000..dc85b0a --- /dev/null +++ b/src/vmm_trie.ml @@ -0,0 +1,79 @@ +open Astring + +type 'a t = N of 'a option * 'a t String.Map.t + +let empty = N (None, String.Map.empty) + +let insert id e t = + let rec go (N (es, m)) = function + | [] -> + begin match es with + | None -> N (Some e, m), None + | Some es' -> N (Some e, m), Some es' + end + | x::xs -> + let n = match String.Map.find_opt x m with + | None -> empty + | Some n -> n + in + let entry, ret = go n xs in + N (es, String.Map.add x entry m), ret + in + go t id + +let remove id t = + let rec go (N (es, m)) = function + | [] -> if String.Map.is_empty m then None else Some (N (None, m)) + | x::xs -> + let n' = match String.Map.find_opt x m with + | None -> None + | Some n -> go n xs + in + let m' = match n' with + | None -> String.Map.remove x m + | Some entry -> String.Map.add x entry m + in + if String.Map.is_empty m' && es = None then None else Some (N (es, m')) + in + match go t id with + | None -> empty + | Some n -> n + +let find id t = + let rec go (N (es, m)) = function + | [] -> es + | x::xs -> + match String.Map.find_opt x m with + | None -> None + | Some n -> go n xs + in + go t id + +let collect id t = + let rec go acc prefix (N (es, m)) = + let acc' = + match es with + | None -> acc + | Some e -> (prefix, e) :: acc + in + function + | [] -> acc' + | x::xs -> + match String.Map.find_opt x m with + | None -> acc' + | Some n -> go acc' (prefix @ [ x ]) n xs + in + go [] [] t id + +let all t = + let rec go acc prefix (N (es, m)) = + let acc' = + match es with + | None -> acc + | Some e -> (prefix, e) :: acc + in + List.fold_left (fun acc (name, node) -> + go acc (prefix@[name]) node) + acc' (String.Map.bindings m) + in + go [] [] t diff --git a/src/vmm_trie.mli b/src/vmm_trie.mli new file mode 100644 index 0000000..5e2bca2 --- /dev/null +++ b/src/vmm_trie.mli @@ -0,0 +1,15 @@ +open Vmm_core + +type 'a t + +val empty : 'a t + +val insert : id -> 'a -> 'a t -> 'a t * 'a option + +val remove : id -> 'a t -> 'a t + +val find : id -> 'a t -> 'a option + +val collect : id -> 'a t -> (id * 'a) list + +val all : 'a t -> (id * 'a) list diff --git a/src/vmm_wire.ml b/src/vmm_wire.ml index 8e23452..267b00a 100644 --- a/src/vmm_wire.ml +++ b/src/vmm_wire.ml @@ -260,19 +260,19 @@ module Stats = struct type op = | Add | Remove - | Stats + | Subscribe | Data let op_to_int = function | Add -> 0x0200l | Remove -> 0x0201l - | Stats -> 0x0202l + | Subscribe -> 0x0202l | Data -> 0x0203l let int_to_op = function | 0x0200l -> Some Add | 0x0201l -> Some Remove - | 0x0202l -> Some Stats + | 0x0202l -> Some Subscribe | 0x0203l -> Some Data | _ -> None @@ -382,7 +382,7 @@ module Stats = struct let remove id version name = encode ~name version id (op_to_int Remove) - let stat id version name = encode ~name version id (op_to_int Stats) + let subscribe id version name = encode ~name version id (op_to_int Subscribe) let data id version vm body = let name = Vmm_core.id_of_string vm in @@ -440,30 +440,27 @@ let split_id id = match List.rev id with module Log = struct type op = | Log - | History | Broadcast | Subscribe let op_to_int = function | Log -> 0x0300l - | History -> 0x0301l + | Subscribe -> 0x0301l | Broadcast -> 0x0302l - | Subscribe -> 0x0303l let int_to_op = function | 0x0300l -> Some Log - | 0x0301l -> Some History + | 0x0301l -> Some Subscribe | 0x0302l -> Some Broadcast - | 0x0303l -> Some Subscribe | _ -> None - let history id version name ts = - encode ~name ~body:(encode_ptime ts) version id (op_to_int History) + let subscribe id version name = + encode ~name version id (op_to_int Subscribe) let decode_log_hdr cs = decode_id_ts cs >>= fun ((id, ts), off) -> split_id id >>= fun (name, context) -> - Ok ({ Log.ts ; context ; name }, Cstruct.shift cs (16 + off)) + Ok ({ Log.ts ; context ; name }, Cstruct.shift cs off) let encode_addr ip port = let cs = Cstruct.create 6 in @@ -490,7 +487,7 @@ module Log = struct decode_string r >>= fun (block, l) -> let block = if block = "" then None else Some block in cs_shift r l >>= fun r' -> - decode_strings r' >>= fun taps -> + decode_strings r' >>= fun (taps, _) -> Ok (pid, taps, block) let encode_pid_exit pid c = diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index ba7c3b4..9571eee 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -22,17 +22,17 @@ let descr = ref [] type 'a t = { pid_nic : ((vmctx, int) result * (int * string) list) IM.t ; - vmid_pid : int String.Map.t ; - name_sockets : 'a String.Map.t ; + vmid_pid : int Vmm_trie.t ; + name_sockets : 'a Vmm_trie.t ; } let pp_strings pp taps = Fmt.(list ~sep:(unit ",@ ") string) pp taps let empty () = - { pid_nic = IM.empty ; vmid_pid = String.Map.empty ; name_sockets = String.Map.empty } + { pid_nic = IM.empty ; vmid_pid = Vmm_trie.empty ; name_sockets = Vmm_trie.empty } -let remove_socket t name = - let name_sockets = String.Map.remove name t.name_sockets in +let remove_entry t name = + let name_sockets = Vmm_trie.remove name t.name_sockets in { t with name_sockets } let rec wrap f arg = @@ -50,10 +50,10 @@ let fill_descr ctx = Logs.err (fun m -> m "vmmapi_statnames failed, shouldn't happen") ; () | Some d -> - Logs.info (fun m -> m "descr are %a" pp_strings d) ; + Logs.debug (fun m -> m "descr are %a" pp_strings d) ; descr := d end - | ds -> Logs.info (fun m -> m "%d descr are already present" (List.length ds)) + | ds -> Logs.debug (fun m -> m "%d descr are already present" (List.length ds)) let open_vmmapi ?(retries = 4) pid = let name = "solo5-" ^ string_of_int pid in @@ -91,20 +91,18 @@ let gather pid vmctx nics = | None -> Logs.warn (fun m -> m "failed to get ifdata for %s" nname) ; ifd - | Some data -> - Logs.debug (fun m -> m "adding ifdata for %s" nname) ; - data::ifd) + | Some data -> data::ifd) [] nics let tick t = - Logs.debug (fun m -> m "tick with %d vms" (IM.cardinal t.pid_nic)) ; let pid_nic = try_open_vmmapi t.pid_nic in let t' = { t with pid_nic } in let outs = - String.Map.fold (fun name socket out -> - match String.Map.find_opt name t.vmid_pid with - | None -> Logs.warn (fun m -> m "couldn't find pid of %s" name) ; out - | Some pid -> match IM.find_opt pid t.pid_nic with + List.fold_left (fun out (vmid, pid) -> + let listeners = Vmm_trie.collect vmid t'.name_sockets in + match listeners with + | [] -> Logs.warn (fun m -> m "nobody is listening") ; out + | xs -> match IM.find_opt pid t.pid_nic with | None -> Logs.warn (fun m -> m "couldn't find nics of %d" pid) ; out | Some (vmctx, nics) -> let ru, vmm, ifd = gather pid vmctx nics in @@ -115,9 +113,15 @@ let tick t = let vmm' = match vmm with None -> [] | Some xs -> List.combine !descr xs in ru', vmm', ifd in - let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in - (socket, name, stats_encoded) :: out) - t'.name_sockets [] + List.fold_left (fun out (id, socket) -> + match Vmm_core.drop_super ~super:id ~sub:vmid with + | None -> Logs.err (fun m -> m "couldn't drop super %a from sub %a" Vmm_core.pp_id id Vmm_core.pp_id vmid) ; out + | Some real_id -> + let name = Vmm_core.string_of_id real_id in + let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in + (socket, vmid, stats_encoded) :: out) + out xs) + [] (Vmm_trie.all t'.vmid_pid) in (t', outs) @@ -141,14 +145,15 @@ let add_pid t vmid pid nics = Logs.info (fun m -> m "adding %d %a with vmctx %b" pid pp_strings nics (match vmctx with Error _ -> false | Ok _ -> true)) ; let pid_nic = IM.add pid (vmctx, nic_ids) t.pid_nic - and vmid_pid = String.Map.add vmid pid t.vmid_pid + and vmid_pid, ret = Vmm_trie.insert vmid pid t.vmid_pid in + assert (ret = None) ; Ok { t with pid_nic ; vmid_pid } let remove_vmid t vmid = - Logs.info (fun m -> m "removing vmid %s" vmid) ; - match String.Map.find vmid t.vmid_pid with - | None -> Logs.warn (fun m -> m "no pid found for %s" vmid) ; t + Logs.info (fun m -> m "removing vmid %a" Vmm_core.pp_id vmid) ; + match Vmm_trie.find vmid t.vmid_pid with + | None -> Logs.warn (fun m -> m "no pid found for %a" Vmm_core.pp_id vmid) ; t | Some pid -> Logs.info (fun m -> m "removing pid %d" pid) ; (try @@ -158,7 +163,7 @@ let remove_vmid t vmid = with _ -> ()) ; let pid_nic = IM.remove pid t.pid_nic - and vmid_pid = String.Map.remove vmid t.vmid_pid + and vmid_pid = Vmm_trie.remove vmid t.vmid_pid in { t with pid_nic ; vmid_pid } @@ -173,22 +178,21 @@ let handle t socket hdr cs = Error (`Msg "cannot handle version") else decode_strings cs >>= fun (id, off) -> - let name = Vmm_core.string_of_id id in match int_to_op hdr.tag with | Some Add -> decode_pid_taps (Cstruct.shift cs off) >>= fun (pid, taps) -> - add_pid t name pid taps >>= fun t -> - Ok (t, `Add name, success ~msg:"added" my_version hdr.id (op_to_int Add)) + add_pid t id pid taps >>= fun t -> + Ok (t, `Add id, None, success ~msg:"added" my_version hdr.id (op_to_int Add)) | Some Remove -> - let t = remove_vmid t name in - Ok (t, `Remove name, success ~msg:"removed" my_version hdr.id (op_to_int Remove)) - | Some Stats -> - let name_sockets = String.Map.add name socket t.name_sockets in - Ok ({ t with name_sockets }, `None, success ~msg:"subscribed" my_version hdr.id (op_to_int Stats)) + let t = remove_vmid t id in + Ok (t, `Remove id, None, success ~msg:"removed" my_version hdr.id (op_to_int Remove)) + | Some Subscribe -> + let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in + Ok ({ t with name_sockets }, `None, close, success ~msg:"subscribed" my_version hdr.id (op_to_int Subscribe)) | _ -> Error (`Msg "unknown command") in match r with - | Ok (t, action, out) -> t, action, out + | Ok (t, action, close, out) -> t, action, close, out | Error (`Msg msg) -> Logs.err (fun m -> m "error while processing %s" msg) ; - t, `None, fail ~msg my_version hdr.id + t, `None, None, fail ~msg my_version hdr.id diff --git a/stats/vmm_stats_lwt.ml b/stats/vmm_stats_lwt.ml index 04d754d..0300e4d 100644 --- a/stats/vmm_stats_lwt.ml +++ b/stats/vmm_stats_lwt.ml @@ -28,15 +28,14 @@ let handle s addr () = | Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop acc | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return acc | Ok (hdr, data) -> - Logs.debug (fun m -> m "received %a" Cstruct.hexdump_pp data) ; - let t', action, out = Vmm_stats.handle !t s hdr data in + let t', action, close, out = Vmm_stats.handle !t s hdr data in let acc = match action with | `Add pid -> pid :: acc | `Remove pid -> List.filter (fun m -> m <> pid) acc | `None -> acc in t := t' ; - Logs.debug (fun m -> m "sent %a" Cstruct.hexdump_pp out) ; + (match close with None -> Lwt.return_unit | Some s' -> Vmm_lwt.safe_close s') >>= fun () -> Vmm_lwt.write_wire s out >>= function | Ok () -> loop acc | Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc @@ -54,7 +53,7 @@ let rec timer interval () = Vmm_lwt.write_wire s stat >>= function | Ok () -> Lwt.return_unit | Error `Exception -> - t := Vmm_stats.remove_socket !t name ; + t := Vmm_stats.remove_entry !t name ; Vmm_lwt.safe_close s) outs >>= fun () -> Lwt_unix.sleep interval >>= fun () ->