diff --git a/app/vmm_console.ml b/app/vmm_console.ml index 10475c9..7365cca 100644 --- a/app/vmm_console.ml +++ b/app/vmm_console.ml @@ -32,7 +32,7 @@ let read_console name ring channel () = | None -> Lwt.return_unit | Some fd -> let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in - Vmm_lwt.write_wire fd (header, `Command (`Console_cmd (`Console_data (t, line)))) >>= function + Vmm_lwt.write_wire fd (header, `Data (`Console_data (t, line))) >>= function | Error _ -> Vmm_lwt.safe_close fd >|= fun () -> active := String.Map.remove name !active @@ -92,7 +92,7 @@ let subscribe s id = Logs.debug (fun m -> m "found %d history" (List.length entries)) ; Lwt_list.iter_s (fun (i, v) -> let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in - Vmm_lwt.write_wire s (header, `Command (`Console_cmd (`Console_data (i, v)))) >|= fun _ -> ()) + Vmm_lwt.write_wire s (header, `Data (`Console_data (i, v))) >|= fun _ -> ()) entries >>= fun () -> (match String.Map.find name !active with | None -> Lwt.return_unit @@ -114,8 +114,7 @@ let handle s addr () = else match cmd with | `Console_add -> add_fifo header.Vmm_asn.id - | `Console_subscribe -> subscribe s header.Vmm_asn.id - | `Console_data _ -> Lwt.return (Error (`Msg "unexpected command"))) >>= (function + | `Console_subscribe -> subscribe s header.Vmm_asn.id) >>= (function | Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg)) | Error (`Msg msg) -> Logs.err (fun m -> m "error while processing command: %s" msg) ; diff --git a/app/vmm_influxdb_stats.ml b/app/vmm_influxdb_stats.ml index ef7d9b2..d6b172c 100644 --- a/app/vmm_influxdb_stats.ml +++ b/app/vmm_influxdb_stats.ml @@ -189,7 +189,7 @@ let rec read_sock_write_tcp c ?fd addr addrtype = safe_close fd >>= fun () -> safe_close c >|= fun () -> true - | Ok (hdr, `Command (`Stats_cmd (`Stats_data (ru, vmm, ifs)))) -> + | Ok (hdr, `Data (`Stats_data (ru, vmm, ifs))) -> begin if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin Logs.err (fun m -> m "unknown wire protocol version") ; diff --git a/app/vmm_log.ml b/app/vmm_log.ml index 8d688c1..089e188 100644 --- a/app/vmm_log.ml +++ b/app/vmm_log.ml @@ -73,9 +73,7 @@ let send_history s ring id = (* just need a wrapper in tag = Log.Data, id = reqid *) Lwt_list.fold_left_s (fun r (header, ts, event) -> match r with - | Ok () -> - let data = header, `Command (`Log_cmd (`Log_data (ts, event))) in - Vmm_lwt.write_wire s data + | Ok () -> Vmm_lwt.write_wire s (header, `Data (`Log_data (ts, event))) | Error e -> Lwt.return (Error e)) (Ok ()) res @@ -91,24 +89,29 @@ let handle mvar ring s addr () = | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit + | Ok (hdr, `Data (`Log_data (ts, event))) -> + if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin + Logs.warn (fun m -> m "unsupported version") ; + Lwt.return_unit + end else begin + let data = Vmm_asn.log_entry_to_cstruct (hdr, ts, event) in + Vmm_ring.write ring (ts, Cstruct.to_string data) ; + Lwt_mvar.put mvar data >>= fun () -> + let data' = + let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = hdr.Vmm_asn.id } in + (header, `Data (`Log_data (ts, event))) + in + bcast := Int64.succ !bcast ; + broadcast hdr.Vmm_asn.id data' !tree >>= fun tree' -> + tree := tree' ; + loop () + end | Ok (hdr, `Command (`Log_cmd lc)) -> if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin Logs.warn (fun m -> m "unsupported version") ; Lwt.return_unit end else begin match lc with - | `Log_data (ts, event) -> - let data = Vmm_asn.log_entry_to_cstruct (hdr, ts, event) in - Vmm_ring.write ring (ts, Cstruct.to_string data) ; - Lwt_mvar.put mvar data >>= fun () -> - let data' = - let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = hdr.Vmm_asn.id } in - (header, `Command (`Log_cmd (`Log_data (ts, event)))) - in - bcast := Int64.succ !bcast ; - broadcast hdr.Vmm_asn.id data' !tree >>= fun tree' -> - tree := tree' ; - loop () | `Log_subscribe -> let tree', ret = Vmm_trie.insert hdr.Vmm_asn.id s !tree in tree := tree' ; diff --git a/app/vmmd.ml b/app/vmmd.ml index fea7f31..e0238b7 100644 --- a/app/vmmd.ml +++ b/app/vmmd.ml @@ -40,6 +40,9 @@ let create c_fd process cont = | `Failure f -> Logs.err (fun m -> m "console failed with %s" f) ; Lwt.return_unit + | `Data _ -> + Logs.err (fun m -> m "console replied with data") ; + Lwt.return_unit | `Success _msg -> (* assert hdr.id = id! *) let await, wakeme = Lwt.wait () in diff --git a/src/vmm_asn.ml b/src/vmm_asn.ml index a233928..7f2df72 100644 --- a/src/vmm_asn.ml +++ b/src/vmm_asn.ml @@ -106,32 +106,24 @@ let version_eq a b = type console_cmd = [ | `Console_add | `Console_subscribe - | `Console_data of Ptime.t * string ] let pp_console_cmd ppf = function | `Console_add -> Fmt.string ppf "console add" | `Console_subscribe -> Fmt.string ppf "console subscribe" - | `Console_data (ts, line) -> Fmt.pf ppf "console data %a: %s" - (Ptime.pp_rfc3339 ()) ts line let console_cmd = let f = function | `C1 () -> `Console_add | `C2 () -> `Console_subscribe - | `C3 (timestamp, data) -> `Console_data (timestamp, data) and g = function | `Console_add -> `C1 () | `Console_subscribe -> `C2 () - | `Console_data (timestamp, data) -> `C3 (timestamp, data) in Asn.S.map f g @@ - Asn.S.(choice3 + Asn.S.(choice2 (explicit 0 null) - (explicit 1 null) - (explicit 2 (sequence2 - (required ~label:"timestamp" utc_time) - (required ~label:"data" utf8_string)))) + (explicit 1 null)) (* TODO is this good? *) let int64 = @@ -211,41 +203,30 @@ type stats_cmd = [ | `Stats_add of int * string list | `Stats_remove | `Stats_subscribe - | `Stats_data of stats ] let pp_stats_cmd ppf = function | `Stats_add (pid, taps) -> Fmt.pf ppf "stats add: pid %d taps %a" pid Fmt.(list ~sep:(unit ", ") string) taps | `Stats_remove -> Fmt.string ppf "stat remove" | `Stats_subscribe -> Fmt.string ppf "stat subscribe" - | `Stats_data stats -> Fmt.pf ppf "stats data: %a" pp_stats stats let stats_cmd = let f = function | `C1 (pid, taps) -> `Stats_add (pid, taps) | `C2 () -> `Stats_remove | `C3 () -> `Stats_subscribe - | `C4 (ru, ifs, vmm) -> `Stats_data (ru, vmm, ifs) and g = function | `Stats_add (pid, taps) -> `C1 (pid, taps) | `Stats_remove -> `C2 () | `Stats_subscribe -> `C3 () - | `Stats_data (ru, ifs, vmm) -> `C4 (ru, vmm, ifs) in Asn.S.map f g @@ - Asn.S.(choice4 + Asn.S.(choice3 (explicit 0 (sequence2 (required ~label:"pid" int) (required ~label:"taps" (sequence_of utf8_string)))) (explicit 1 null) - (explicit 2 null) - (explicit 3 (sequence3 - (required ~label:"resource_usage" ru) - (required ~label:"ifdata" (sequence_of ifdata)) - (optional ~label:"vmm_stats" - (sequence_of (sequence2 - (required ~label:"key" utf8_string) - (required ~label:"value" int64))))))) + (explicit 2 null)) let addr = Asn.S.(sequence2 @@ -295,28 +276,20 @@ let log_event = (explicit 2 int)))))) type log_cmd = [ - | `Log_data of Ptime.t * Log.event | `Log_subscribe ] let pp_log_cmd ppf = function - | `Log_data (ts, event) -> Fmt.pf ppf "log data: %a %a" (Ptime.pp_rfc3339 ()) ts Log.pp_event event | `Log_subscribe -> Fmt.string ppf "log subscribe" let log_cmd = let f = function - | `C1 (timestamp, event) -> `Log_data (timestamp, event) - | `C2 () -> `Log_subscribe + | () -> `Log_subscribe and g = function - | `Log_data (timestamp, event) -> `C1 (timestamp, event) - | `Log_subscribe -> `C2 () + | `Log_subscribe -> () in Asn.S.map f g @@ - Asn.S.(choice2 - (explicit 0 (sequence2 - (required ~label:"timestamp" utc_time) - (required ~label:"event" log_event))) - (explicit 1 null)) + Asn.S.null type vm_cmd = [ | `Vm_info @@ -444,6 +417,45 @@ let wire_command : wire_command Asn.S.t = (explicit 3 vm_cmd) (explicit 4 policy_cmd)) +type data = [ + | `Console_data of Ptime.t * string + | `Stats_data of stats + | `Log_data of Ptime.t * Log.event +] + +let pp_data ppf = function + | `Console_data (ts, line) -> Fmt.pf ppf "console data %a: %s" + (Ptime.pp_rfc3339 ()) ts line + | `Stats_data stats -> Fmt.pf ppf "stats data: %a" pp_stats stats + | `Log_data (ts, event) -> Fmt.pf ppf "log data: %a %a" (Ptime.pp_rfc3339 ()) ts Log.pp_event event + +let data = + let f = function + | `C1 (timestamp, data) -> `Console_data (timestamp, data) + | `C2 (ru, ifs, vmm) -> `Stats_data (ru, vmm, ifs) + | `C3 (timestamp, event) -> `Log_data (timestamp, event) + and g = function + | `Console_data (timestamp, data) -> `C1 (timestamp, data) + | `Stats_data (ru, ifs, vmm) -> `C2 (ru, vmm, ifs) + | `Log_data (timestamp, event) -> `C3 (timestamp, event) + in + Asn.S.map f g @@ + Asn.S.(choice3 + (explicit 0 (sequence2 + (required ~label:"timestamp" utc_time) + (required ~label:"data" utf8_string))) + (explicit 1 (sequence3 + (required ~label:"resource_usage" ru) + (required ~label:"ifdata" (sequence_of ifdata)) + (optional ~label:"vmm_stats" + (sequence_of (sequence2 + (required ~label:"key" utf8_string) + (required ~label:"value" int64)))))) + (explicit 2 (sequence2 + (required ~label:"timestamp" utc_time) + (required ~label:"event" log_event)))) + + type header = { version : version ; sequence : int64 ; @@ -471,7 +483,8 @@ let pp_success ppf = function type wire = header * [ | `Command of wire_command | `Success of success - | `Failure of string ] + | `Failure of string + | `Data of data ] let pp_wire ppf (header, data) = let id = header.id in @@ -479,6 +492,7 @@ let pp_wire ppf (header, data) = | `Command c -> Fmt.pf ppf "host %a: %a" pp_id id pp_wire_command c | `Failure f -> Fmt.pf ppf "host %a: command failed %s" pp_id id f | `Success s -> Fmt.pf ppf "host %a: %a" pp_id id pp_success s + | `Data d -> pp_data ppf d let wire = let f (header, payload) = @@ -494,6 +508,7 @@ let wire = in `Success p | `C3 str -> `Failure str + | `C4 data -> `Data data and g (header, payload) = header, match payload with @@ -507,12 +522,13 @@ let wire = in `C2 p | `Failure str -> `C3 str + | `Data d -> `C4 d in Asn.S.map f g @@ Asn.S.(sequence2 (required ~label:"header" header) (required ~label:"payload" - (choice3 + (choice4 (explicit 0 wire_command) (explicit 1 (choice4 (explicit 0 null) @@ -525,7 +541,8 @@ let wire = (sequence2 (required ~label:"name" (sequence_of utf8_string)) (required ~label:"vm_config" vm_config)))))) - (explicit 2 utf8_string)))) + (explicit 2 utf8_string) + (explicit 3 data)))) let wire_of_cstruct, wire_to_cstruct = projections_of wire diff --git a/src/vmm_asn.mli b/src/vmm_asn.mli index ea1a5eb..ff5ec93 100644 --- a/src/vmm_asn.mli +++ b/src/vmm_asn.mli @@ -23,18 +23,15 @@ val pp_version : version Fmt.t type console_cmd = [ | `Console_add | `Console_subscribe - | `Console_data of Ptime.t * string ] type stats_cmd = [ | `Stats_add of int * string list | `Stats_remove | `Stats_subscribe - | `Stats_data of stats ] type log_cmd = [ - | `Log_data of Ptime.t * Log.event | `Log_subscribe ] @@ -60,6 +57,14 @@ type wire_command = [ val pp_wire_command : wire_command Fmt.t +type data = [ + | `Console_data of Ptime.t * string + | `Stats_data of stats + | `Log_data of Ptime.t * Log.event +] + +val pp_data : data Fmt.t + type header = { version : version ; sequence : int64 ; @@ -69,7 +74,8 @@ type header = { type wire = header * [ | `Command of wire_command | `Success of [ `Empty | `String of string | `Policies of (id * policy) list | `Vms of (id * vm_config) list ] - | `Failure of string ] + | `Failure of string + | `Data of data ] val pp_wire : wire Fmt.t diff --git a/src/vmm_engine.ml b/src/vmm_engine.ml index 0df0648..81e2aca 100644 --- a/src/vmm_engine.ml +++ b/src/vmm_engine.ml @@ -38,7 +38,7 @@ let log t id event = let header = Vmm_asn.{ version = t.wire_version ; sequence = t.log_counter ; id } in let log_counter = Int64.succ t.log_counter in Logs.debug (fun m -> m "LOG %a" Log.pp_event event) ; - ({ t with log_counter }, `Log (header, `Command (`Log_cmd data))) + ({ t with log_counter }, `Log (header, `Data data)) let handle_create t hdr vm_config = let name = hdr.Vmm_asn.id in @@ -80,13 +80,12 @@ let handle_shutdown t name vm r = | Ok () -> () | Error (`Msg e) -> Logs.warn (fun m -> m "%s while shutdown vm %a" e pp_vm vm)) ; let resources = Vmm_resources.remove t.resources name in - let stat_out = `Stats_remove in let header = Vmm_asn.{ version = t.wire_version ; sequence = t.stats_counter ; id = name } in let tasks = String.Map.remove (string_of_id name) t.tasks in let t = { t with stats_counter = Int64.succ t.stats_counter ; resources ; tasks } in let t, logout = log t name (`VM_stop (vm.pid, r)) in - (t, [ `Stat (header, `Command (`Stats_cmd stat_out)) ; logout ]) + (t, [ `Stat (header, `Command (`Stats_cmd `Stats_remove)) ; logout ]) let handle_command t (header, payload) = let msg_to_err = function diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index 208434c..02f8424 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -121,8 +121,7 @@ let tick t = | Some real_id -> let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = real_id } in bcast := Int64.succ !bcast ; - let data = `Stats_data stats in - ((socket, vmid, (header, `Command (`Stats_cmd data))) :: out)) + ((socket, vmid, (header, `Data (`Stats_data stats))) :: out)) out xs) [] (Vmm_trie.all t'.vmid_pid) in @@ -192,7 +191,6 @@ let handle t socket (header, wire) = | `Stats_subscribe -> let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in Ok ({ t with name_sockets }, `None, close, Some "subscribed") - | _ -> Error (`Msg "unknown command") end | _ -> Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire (header, wire)) ;