diff --git a/app/vmm_client.ml b/app/vmm_client.ml index bebe90f..f38b58c 100644 --- a/app/vmm_client.ml +++ b/app/vmm_client.ml @@ -6,7 +6,7 @@ let rec read_tls_write_cons t = Vmm_tls.read_tls t >>= function | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit | Ok wire -> - Logs.app (fun m -> m "%a" Vmm_asn.pp_wire wire) ; + Logs.app (fun m -> m "%a" Vmm_commands.pp_wire wire) ; read_tls_write_cons t let client cas host port cert priv_key = diff --git a/app/vmm_console.ml b/app/vmm_console.ml index 7365cca..1fb787d 100644 --- a/app/vmm_console.ml +++ b/app/vmm_console.ml @@ -31,7 +31,7 @@ let read_console name ring channel () = (match String.Map.find name !active with | None -> Lwt.return_unit | Some fd -> - let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in + let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in Vmm_lwt.write_wire fd (header, `Data (`Console_data (t, line))) >>= function | Error _ -> Vmm_lwt.safe_close fd >|= fun () -> @@ -91,7 +91,7 @@ let subscribe s id = let entries = Vmm_ring.read r in Logs.debug (fun m -> m "found %d history" (List.length entries)) ; Lwt_list.iter_s (fun (i, v) -> - let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in + let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in Vmm_lwt.write_wire s (header, `Data (`Console_data (i, v))) >|= fun _ -> ()) entries >>= fun () -> (match String.Map.find name !active with @@ -109,12 +109,12 @@ let handle s addr () = Lwt.return_unit | Ok (header, `Command (`Console_cmd cmd)) -> begin - (if not (Vmm_asn.version_eq header.Vmm_asn.version my_version) then + (if not (Vmm_commands.version_eq header.Vmm_commands.version my_version) then Lwt.return (Error (`Msg "ignoring data with bad version")) else match cmd with - | `Console_add -> add_fifo header.Vmm_asn.id - | `Console_subscribe -> subscribe s header.Vmm_asn.id) >>= (function + | `Console_add -> add_fifo header.Vmm_commands.id + | `Console_subscribe -> subscribe s header.Vmm_commands.id) >>= (function | Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg)) | Error (`Msg msg) -> Logs.err (fun m -> m "error while processing command: %s" msg) ; @@ -125,7 +125,7 @@ let handle s addr () = Lwt.return_unit end | Ok wire -> - Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire wire) ; + Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire wire) ; loop () in loop () >>= fun () -> diff --git a/app/vmm_influxdb_stats.ml b/app/vmm_influxdb_stats.ml index d6b172c..f53ce3c 100644 --- a/app/vmm_influxdb_stats.ml +++ b/app/vmm_influxdb_stats.ml @@ -5,6 +5,7 @@ open Lwt.Infix open Astring open Vmm_core +open Vmm_core.Stats (* @@ -191,13 +192,13 @@ let rec read_sock_write_tcp c ?fd addr addrtype = true | Ok (hdr, `Data (`Stats_data (ru, vmm, ifs))) -> begin - if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin + if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin Logs.err (fun m -> m "unknown wire protocol version") ; safe_close fd >>= fun () -> safe_close c >|= fun () -> false end else - let name = string_of_id hdr.Vmm_asn.id in + let name = string_of_id hdr.Vmm_commands.id in let ru = P.encode_ru name ru in let vmm = match vmm with None -> [] | Some xs -> [ P.encode_vmm name xs ] in let taps = List.map (P.encode_if name) ifs in @@ -214,12 +215,12 @@ let rec read_sock_write_tcp c ?fd addr addrtype = false end | Ok wire -> - Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire wire) ; + Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire wire) ; Lwt.return (Some fd) >>= fun fd -> read_sock_write_tcp c ?fd addr addrtype let query_sock vm c = - let header = Vmm_asn.{ version = my_version ; sequence = !command ; id = vm } in + let header = Vmm_commands.{ version = my_version ; sequence = !command ; id = vm } in command := Int64.succ !command ; Logs.debug (fun m -> m "%Lu requesting %a via socket" !command pp_id vm) ; Vmm_lwt.write_wire c (header, `Command (`Stats_cmd `Stats_subscribe)) diff --git a/app/vmm_log.ml b/app/vmm_log.ml index 089e188..bc6bc73 100644 --- a/app/vmm_log.ml +++ b/app/vmm_log.ml @@ -55,25 +55,26 @@ let write_to_file file = let tree = ref Vmm_trie.empty -let bcast = ref 0L - let send_history s ring id = let elements = Vmm_ring.read ring in let res = List.fold_left (fun acc (_, x) -> let cs = Cstruct.of_string x in match Vmm_asn.log_entry_of_cstruct cs with - | Ok (header, ts, event) -> - if Vmm_core.is_sub_id ~super:id ~sub:header.Vmm_asn.id - then (header, ts, event) :: acc + | Ok (ts, event) -> + let sub = Vmm_core.Log.name event in + if Vmm_core.is_sub_id ~super:id ~sub + then (ts, event) :: acc else acc | _ -> acc) [] elements in (* just need a wrapper in tag = Log.Data, id = reqid *) - Lwt_list.fold_left_s (fun r (header, ts, event) -> + Lwt_list.fold_left_s (fun r (ts, event) -> match r with - | Ok () -> Vmm_lwt.write_wire s (header, `Data (`Log_data (ts, event))) + | Ok () -> + let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in + Vmm_lwt.write_wire s (header, `Data (`Log_data (ts, event))) | Error e -> Lwt.return (Error e)) (Ok ()) res @@ -90,30 +91,29 @@ let handle mvar ring s addr () = Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit | Ok (hdr, `Data (`Log_data (ts, event))) -> - if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin + if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin Logs.warn (fun m -> m "unsupported version") ; Lwt.return_unit end else begin - let data = Vmm_asn.log_entry_to_cstruct (hdr, ts, event) in + let data = Vmm_asn.log_entry_to_cstruct (ts, event) in Vmm_ring.write ring (ts, Cstruct.to_string data) ; Lwt_mvar.put mvar data >>= fun () -> let data' = - let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = hdr.Vmm_asn.id } in + let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id = hdr.Vmm_commands.id } in (header, `Data (`Log_data (ts, event))) in - bcast := Int64.succ !bcast ; - broadcast hdr.Vmm_asn.id data' !tree >>= fun tree' -> + broadcast hdr.Vmm_commands.id data' !tree >>= fun tree' -> tree := tree' ; loop () end | Ok (hdr, `Command (`Log_cmd lc)) -> - if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin + if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin Logs.warn (fun m -> m "unsupported version") ; Lwt.return_unit end else begin match lc with | `Log_subscribe -> - let tree', ret = Vmm_trie.insert hdr.Vmm_asn.id s !tree in + let tree', ret = Vmm_trie.insert hdr.Vmm_commands.id s !tree in tree := tree' ; (match ret with | None -> Lwt.return_unit @@ -124,14 +124,14 @@ let handle mvar ring s addr () = Logs.err (fun m -> m "error while sending reply for subscribe") ; Lwt.return_unit | Ok () -> - send_history s ring hdr.Vmm_asn.id >>= function + send_history s ring hdr.Vmm_commands.id >>= function | Error _ -> Logs.err (fun m -> m "error while sending history") ; Lwt.return_unit | Ok () -> loop () (* TODO no need to loop ;) *) end | Ok wire -> - Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire wire) ; + Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire wire) ; loop () in loop () >>= fun () -> diff --git a/app/vmm_tls_endpoint.ml b/app/vmm_tls_endpoint.ml index 852e090..8c8f8f6 100644 --- a/app/vmm_tls_endpoint.ml +++ b/app/vmm_tls_endpoint.ml @@ -44,7 +44,7 @@ let read fd tls = Vmm_lwt.read_wire fd >>= function | Error _ -> Lwt.return (Error (`Msg "exception while reading")) | Ok wire -> - Logs.debug (fun m -> m "read proxying %a" Vmm_asn.pp_wire wire) ; + Logs.debug (fun m -> m "read proxying %a" Vmm_commands.pp_wire wire) ; Vmm_tls.write_tls tls wire >>= function | Ok () -> loop () | Error `Exception -> Lwt.return (Error (`Msg "exception")) @@ -55,7 +55,7 @@ let process fd tls = Vmm_lwt.read_wire fd >>= function | Error _ -> Lwt.return (Error (`Msg "read error")) | Ok wire -> - Logs.debug (fun m -> m "proxying %a" Vmm_asn.pp_wire wire) ; + Logs.debug (fun m -> m "proxying %a" Vmm_commands.pp_wire wire) ; Vmm_tls.write_tls tls wire >|= function | Ok () -> Ok () | Error `Exception -> Error (`Msg "exception on write") @@ -65,10 +65,10 @@ let handle ca (tls, addr) = match Vmm_x509.handle addr chain with | Error (`Msg m) -> Lwt.fail_with m | Ok (name, cmd) -> - let sock, next = Vmm_commands.handle cmd in + let sock, next = Vmm_commands.endpoint cmd in connect (Vmm_core.socket_path sock) >>= fun fd -> let wire = - let header = Vmm_asn.{version = my_version ; sequence = !command ; id = name } in + let header = Vmm_commands.{version = my_version ; sequence = !command ; id = name } in command := Int64.succ !command ; (header, `Command cmd) in diff --git a/app/vmmc.ml b/app/vmmc.ml index 04ca11f..5904d32 100644 --- a/app/vmmc.ml +++ b/app/vmmc.ml @@ -13,8 +13,8 @@ let process fd = | Error _ -> Error (`Msg "read or parse error") | Ok (header, reply) -> - if Vmm_asn.version_eq header.Vmm_asn.version version then begin - Logs.app (fun m -> m "%a" Vmm_asn.pp_wire (header, reply)) ; + if Vmm_commands.version_eq header.Vmm_commands.version version then begin + Logs.app (fun m -> m "%a" Vmm_commands.pp_wire (header, reply)) ; Ok () end else begin Logs.err (fun m -> m "version not equal") ; @@ -40,10 +40,10 @@ let read fd = in loop () -let handle opt_socket id (cmd : Vmm_asn.wire_command) = - let sock, next = Vmm_commands.handle cmd in +let handle opt_socket id (cmd : Vmm_commands.t) = + let sock, next = Vmm_commands.endpoint cmd in connect (socket sock opt_socket) >>= fun fd -> - let header = Vmm_asn.{ version ; sequence = 0L ; id } in + let header = Vmm_commands.{ version ; sequence = 0L ; id } in Vmm_lwt.write_wire fd (header, `Command cmd) >>= function | Error `Exception -> Lwt.return (Error (`Msg "couldn't write")) | Ok () -> diff --git a/app/vmmd.ml b/app/vmmd.ml index e0238b7..5a22172 100644 --- a/app/vmmd.ml +++ b/app/vmmd.ml @@ -29,7 +29,7 @@ let create c_fd process cont = Logs.err (fun m -> m "error while reading from console") ; Lwt.return_unit | Ok (header, wire) -> - if not (Vmm_asn.version_eq version header.Vmm_asn.version) then begin + if not (Vmm_commands.version_eq version header.Vmm_commands.version) then begin Logs.err (fun m -> m "invalid version while reading from console") ; Lwt.return_unit end else diff --git a/provision/vmm_sign.ml b/provision/vmm_sign.ml index 425c7b2..26ece81 100644 --- a/provision/vmm_sign.ml +++ b/provision/vmm_sign.ml @@ -29,12 +29,12 @@ let sign dbname cacert key csr days = with | [ (_, `Unsupported (_, v)) as ext ] -> Vmm_asn.cert_extension_of_cstruct v >>= fun (version, cmd) -> - (if Vmm_asn.version_eq version asn_version then + (if Vmm_commands.version_eq version asn_version then Ok () else Error (`Msg "unknown version in request")) >>= fun () -> (* TODO l_exts / d_exts trouble *) - Logs.app (fun m -> m "signing %a" Vmm_asn.pp_wire_command cmd) ; + Logs.app (fun m -> m "signing %a" Vmm_commands.pp cmd) ; Ok (ext :: l_exts) >>= fun extensions -> sign ~dbname extensions issuer key csr (Duration.of_day days) | _ -> Error (`Msg "none or multiple albatross extensions found") diff --git a/src/vmm_asn.ml b/src/vmm_asn.ml index 7f2df72..1313e91 100644 --- a/src/vmm_asn.ml +++ b/src/vmm_asn.ml @@ -1,6 +1,7 @@ (* (c) 2017 Hannes Mehnert, all rights reserved *) open Vmm_core +open Vmm_commands open Rresult open Astring @@ -86,32 +87,6 @@ let image = (explicit 1 octet_string) (explicit 2 octet_string)) -type version = [ `AV0 | `AV1 | `AV2 ] - -let pp_version ppf v = - Fmt.int ppf - (match v with - | `AV0 -> 0 - | `AV1 -> 1 - | `AV2 -> 2) - -let version_eq a b = - match a, b with - | `AV0, `AV0 -> true - | `AV1, `AV1 -> true - | `AV2, `AV2 -> true - | _ -> false - -(* communication protocol *) -type console_cmd = [ - | `Console_add - | `Console_subscribe -] - -let pp_console_cmd ppf = function - | `Console_add -> Fmt.string ppf "console add" - | `Console_subscribe -> Fmt.string ppf "console subscribe" - let console_cmd = let f = function | `C1 () -> `Console_add @@ -141,6 +116,7 @@ let timeval = (required ~label:"microseconds" int)) let ru = + let open Stats in let f (utime, (stime, (maxrss, (ixrss, (idrss, (isrss, (minflt, (majflt, (nswap, (inblock, (outblock, (msgsnd, (msgrcv, (nsignals, (nvcsw, nivcsw))))))))))))))) = { utime ; stime ; maxrss ; ixrss ; idrss ; isrss ; minflt ; majflt ; nswap ; inblock ; outblock ; msgsnd ; msgrcv ; nsignals ; nvcsw ; nivcsw } and g ru = @@ -173,6 +149,7 @@ let int32 = Asn.S.map f g Asn.S.int let ifdata = + let open Stats in let f (name, (flags, (send_length, (max_send_length, (send_drops, (mtu, (baudrate, (input_packets, (input_errors, (output_packets, (output_errors, (collisions, (input_bytes, (output_bytes, (input_mcast, (output_mcast, (input_dropped, output_dropped))))))))))))))))) = { name; flags; send_length; max_send_length; send_drops; mtu; baudrate; input_packets; input_errors; output_packets; output_errors; collisions; input_bytes; output_bytes; input_mcast; output_mcast; input_dropped; output_dropped } and g i = @@ -199,17 +176,6 @@ let ifdata = @ (required ~label:"input_dropped" int64) -@ (required ~label:"output_dropped" int64)) -type stats_cmd = [ - | `Stats_add of int * string list - | `Stats_remove - | `Stats_subscribe -] - -let pp_stats_cmd ppf = function - | `Stats_add (pid, taps) -> Fmt.pf ppf "stats add: pid %d taps %a" pid Fmt.(list ~sep:(unit ", ") string) taps - | `Stats_remove -> Fmt.string ppf "stat remove" - | `Stats_subscribe -> Fmt.string ppf "stat subscribe" - let stats_cmd = let f = function | `C1 (pid, taps) -> `Stats_add (pid, taps) @@ -228,60 +194,56 @@ let stats_cmd = (explicit 1 null) (explicit 2 null)) -let addr = - Asn.S.(sequence2 - (required ~label:"ip" ipv4) - (required ~label:"port" int)) - let log_event = let f = function | `C1 () -> `Startup - | `C2 (ip, port) -> `Login (ip, port) - | `C3 (ip, port) -> `Logout (ip, port) - | `C4 (pid, taps, block) -> `VM_start (pid, taps, block) - | `C5 (pid, status) -> + | `C2 (name, ip, port) -> `Login (name, ip, port) + | `C3 (name, ip, port) -> `Logout (name, ip, port) + | `C4 (name, pid, taps, block) -> `Vm_start (name, pid, taps, block) + | `C5 (name, pid, status) -> let status' = match status with | `C1 n -> `Exit n | `C2 n -> `Signal n | `C3 n -> `Stop n in - `VM_stop (pid, status') + `Vm_stop (name, pid, status') and g = function | `Startup -> `C1 () - | `Login (ip, port) -> `C2 (ip, port) - | `Logout (ip, port) -> `C3 (ip, port) - | `VM_start (pid, taps, block) -> `C4 (pid, taps, block) - | `VM_stop (pid, status) -> + | `Login (name, ip, port) -> `C2 (name, ip, port) + | `Logout (name, ip, port) -> `C3 (name, ip, port) + | `Vm_start (name, pid, taps, block) -> `C4 (name, pid, taps, block) + | `Vm_stop (name, pid, status) -> let status' = match status with | `Exit n -> `C1 n | `Signal n -> `C2 n | `Stop n -> `C3 n in - `C5 (pid, status') + `C5 (name, pid, status') + in + let endp = + Asn.S.(sequence3 + (required ~label:"name" (sequence_of utf8_string)) + (required ~label:"ip" ipv4) + (required ~label:"port" int)) in Asn.S.map f g @@ Asn.S.(choice5 (explicit 0 null) - (explicit 1 addr) - (explicit 2 addr) - (explicit 3 (sequence3 - (required ~label:"pid" int) - (required ~label:"taps" (sequence_of utf8_string)) - (optional ~label:"block" utf8_string))) - (explicit 4 (sequence2 + (explicit 1 endp) + (explicit 2 endp) + (explicit 3 (sequence4 + (required ~label:"name" (sequence_of utf8_string)) + (required ~label:"pid" int) + (required ~label:"taps" (sequence_of utf8_string)) + (optional ~label:"block" utf8_string))) + (explicit 4 (sequence3 + (required ~label:"name" (sequence_of utf8_string)) (required ~label:"pid" int) (required ~label:"status" (choice3 (explicit 0 int) (explicit 1 int) (explicit 2 int)))))) -type log_cmd = [ - | `Log_subscribe -] - -let pp_log_cmd ppf = function - | `Log_subscribe -> Fmt.string ppf "log subscribe" - let log_cmd = let f = function | () -> `Log_subscribe @@ -291,19 +253,6 @@ let log_cmd = Asn.S.map f g @@ Asn.S.null -type vm_cmd = [ - | `Vm_info - | `Vm_create of vm_config - | `Vm_force_create of vm_config - | `Vm_destroy -] - -let pp_vm_cmd ppf = function - | `Vm_info -> Fmt.string ppf "vm info" - | `Vm_create vm_config -> Fmt.pf ppf "create %a" pp_vm_config vm_config - | `Vm_force_create vm_config -> Fmt.pf ppf "force create %a" pp_vm_config vm_config - | `Vm_destroy -> Fmt.string ppf "vm destroy" - let vm_config = let f (cpuid, requested_memory, block_device, network, vmimage, argv) = let network = match network with None -> [] | Some xs -> xs in @@ -340,17 +289,6 @@ let vm_cmd = (explicit 2 vm_config) (explicit 3 null)) -type policy_cmd = [ - | `Policy_info - | `Policy_add of policy - | `Policy_remove -] - -let pp_policy_cmd ppf = function - | `Policy_info -> Fmt.string ppf "policy info" - | `Policy_add policy -> Fmt.pf ppf "add policy: %a" pp_policy policy - | `Policy_remove -> Fmt.string ppf "policy remove" - let policy_cmd = let f = function | `C1 () -> `Policy_info @@ -380,22 +318,7 @@ let version = in Asn.S.map f g Asn.S.int -type wire_command = [ - | `Console_cmd of console_cmd - | `Stats_cmd of stats_cmd - | `Log_cmd of log_cmd - | `Vm_cmd of vm_cmd - | `Policy_cmd of policy_cmd - ] - -let pp_wire_command ppf = function - | `Console_cmd c -> pp_console_cmd ppf c - | `Stats_cmd s -> pp_stats_cmd ppf s - | `Log_cmd l -> pp_log_cmd ppf l - | `Vm_cmd v -> pp_vm_cmd ppf v - | `Policy_cmd p -> pp_policy_cmd ppf p - -let wire_command : wire_command Asn.S.t = +let wire_command = let f = function | `C1 console -> `Console_cmd console | `C2 stats -> `Stats_cmd stats @@ -417,18 +340,6 @@ let wire_command : wire_command Asn.S.t = (explicit 3 vm_cmd) (explicit 4 policy_cmd)) -type data = [ - | `Console_data of Ptime.t * string - | `Stats_data of stats - | `Log_data of Ptime.t * Log.event -] - -let pp_data ppf = function - | `Console_data (ts, line) -> Fmt.pf ppf "console data %a: %s" - (Ptime.pp_rfc3339 ()) ts line - | `Stats_data stats -> Fmt.pf ppf "stats data: %a" pp_stats stats - | `Log_data (ts, event) -> Fmt.pf ppf "log data: %a %a" (Ptime.pp_rfc3339 ()) ts Log.pp_event event - let data = let f = function | `C1 (timestamp, data) -> `Console_data (timestamp, data) @@ -455,13 +366,6 @@ let data = (required ~label:"timestamp" utc_time) (required ~label:"event" log_event)))) - -type header = { - version : version ; - sequence : int64 ; - id : id ; -} - let header = let f (version, sequence, id) = { version ; sequence ; id } and g h = h.version, h.sequence, h.id @@ -472,28 +376,6 @@ let header = (required ~label:"sequence" int64) (required ~label:"id" (sequence_of utf8_string))) -type success = [ `Empty | `String of string | `Policies of (id * policy) list | `Vms of (id * vm_config) list ] - -let pp_success ppf = function - | `Empty -> Fmt.string ppf "success" - | `String data -> Fmt.pf ppf "success: %s" data - | `Policies ps -> Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") pp_id pp_policy)) ppf ps - | `Vms vms -> Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") pp_id pp_vm_config)) ppf vms - -type wire = header * [ - | `Command of wire_command - | `Success of success - | `Failure of string - | `Data of data ] - -let pp_wire ppf (header, data) = - let id = header.id in - match data with - | `Command c -> Fmt.pf ppf "host %a: %a" pp_id id pp_wire_command c - | `Failure f -> Fmt.pf ppf "host %a: command failed %s" pp_id id f - | `Success s -> Fmt.pf ppf "host %a: %a" pp_id id pp_success s - | `Data d -> pp_data ppf d - let wire = let f (header, payload) = header, @@ -544,19 +426,16 @@ let wire = (explicit 2 utf8_string) (explicit 3 data)))) -let wire_of_cstruct, wire_to_cstruct = projections_of wire - -type log_entry = header * Ptime.t * Log.event +let wire_of_cstruct, (wire_to_cstruct : Vmm_commands.wire -> Cstruct.t) = projections_of wire let log_entry = - Asn.S.(sequence3 - (required ~label:"headet" header) + Asn.S.(sequence2 (required ~label:"timestamp" utc_time) (required ~label:"event" log_event)) let log_entry_of_cstruct, log_entry_to_cstruct = projections_of log_entry -type cert_extension = version * wire_command +type cert_extension = version * t let cert_extension = Asn.S.(sequence2 diff --git a/src/vmm_asn.mli b/src/vmm_asn.mli index ff5ec93..8fd920d 100644 --- a/src/vmm_asn.mli +++ b/src/vmm_asn.mli @@ -9,89 +9,18 @@ open Vmm_core (** OID in the Mirage namespace (enterprise arc 1.3.6.1.4.1.49836.43) *) val oid : Asn.OID.t -(** {1 Encoding and decoding functions} *) +val wire_to_cstruct : Vmm_commands.wire -> Cstruct.t -(** The type of versions of the ASN.1 grammar defined above. *) -type version = [ `AV0 | `AV1 | `AV2 ] +val wire_of_cstruct : Cstruct.t -> (Vmm_commands.wire, [> `Msg of string ]) result -(** [version_eq a b] is true if [a] and [b] are equal. *) -val version_eq : version -> version -> bool +val log_entry_to_cstruct : Log.t -> Cstruct.t -(** [pp_version ppf version] pretty prints [version] onto [ppf]. *) -val pp_version : version Fmt.t +val log_entry_of_cstruct : Cstruct.t -> (Log.t, [> `Msg of string ]) result -type console_cmd = [ - | `Console_add - | `Console_subscribe -] - -type stats_cmd = [ - | `Stats_add of int * string list - | `Stats_remove - | `Stats_subscribe -] - -type log_cmd = [ - | `Log_subscribe -] - -type vm_cmd = [ - | `Vm_info - | `Vm_create of vm_config - | `Vm_force_create of vm_config - | `Vm_destroy -] - -type policy_cmd = [ - | `Policy_info - | `Policy_add of policy - | `Policy_remove -] - -type wire_command = [ - | `Console_cmd of console_cmd - | `Stats_cmd of stats_cmd - | `Log_cmd of log_cmd - | `Vm_cmd of vm_cmd - | `Policy_cmd of policy_cmd ] - -val pp_wire_command : wire_command Fmt.t - -type data = [ - | `Console_data of Ptime.t * string - | `Stats_data of stats - | `Log_data of Ptime.t * Log.event -] - -val pp_data : data Fmt.t - -type header = { - version : version ; - sequence : int64 ; - id : id ; -} - -type wire = header * [ - | `Command of wire_command - | `Success of [ `Empty | `String of string | `Policies of (id * policy) list | `Vms of (id * vm_config) list ] - | `Failure of string - | `Data of data ] - -val pp_wire : wire Fmt.t - -val wire_to_cstruct : wire -> Cstruct.t - -val wire_of_cstruct : Cstruct.t -> (wire, [> `Msg of string ]) result - -type log_entry = header * Ptime.t * Log.event - -val log_entry_to_cstruct : log_entry -> Cstruct.t - -val log_entry_of_cstruct : Cstruct.t -> (log_entry, [> `Msg of string ]) result - -type cert_extension = version * wire_command +type cert_extension = Vmm_commands.version * Vmm_commands.t val cert_extension_of_cstruct : Cstruct.t -> (cert_extension, [> `Msg of string ]) result val cert_extension_to_cstruct : cert_extension -> Cstruct.t -val wire_command_of_cert : version -> X509.t -> (wire_command, [> `Msg of string ]) result +val wire_command_of_cert : Vmm_commands.version -> X509.t -> + (Vmm_commands.t, [> `Msg of string ]) result diff --git a/src/vmm_commands.ml b/src/vmm_commands.ml index f10f4c9..03b486f 100644 --- a/src/vmm_commands.ml +++ b/src/vmm_commands.ml @@ -1,10 +1,134 @@ (* (c) 2018 Hannes Mehnert, all rights reserved *) +(* the wire protocol *) open Vmm_core -let handle = function +type version = [ `AV0 | `AV1 | `AV2 ] + +let pp_version ppf v = + Fmt.int ppf + (match v with + | `AV0 -> 0 + | `AV1 -> 1 + | `AV2 -> 2) + +let version_eq a b = + match a, b with + | `AV0, `AV0 -> true + | `AV1, `AV1 -> true + | `AV2, `AV2 -> true + | _ -> false + +type console_cmd = [ + | `Console_add + | `Console_subscribe +] + +let pp_console_cmd ppf = function + | `Console_add -> Fmt.string ppf "console add" + | `Console_subscribe -> Fmt.string ppf "console subscribe" + +type stats_cmd = [ + | `Stats_add of int * string list + | `Stats_remove + | `Stats_subscribe +] + +let pp_stats_cmd ppf = function + | `Stats_add (pid, taps) -> Fmt.pf ppf "stats add: pid %d taps %a" pid Fmt.(list ~sep:(unit ", ") string) taps + | `Stats_remove -> Fmt.string ppf "stat remove" + | `Stats_subscribe -> Fmt.string ppf "stat subscribe" + +type log_cmd = [ + | `Log_subscribe +] + +let pp_log_cmd ppf = function + | `Log_subscribe -> Fmt.string ppf "log subscribe" + +type vm_cmd = [ + | `Vm_info + | `Vm_create of vm_config + | `Vm_force_create of vm_config + | `Vm_destroy +] + +let pp_vm_cmd ppf = function + | `Vm_info -> Fmt.string ppf "vm info" + | `Vm_create vm_config -> Fmt.pf ppf "create %a" pp_vm_config vm_config + | `Vm_force_create vm_config -> Fmt.pf ppf "force create %a" pp_vm_config vm_config + | `Vm_destroy -> Fmt.string ppf "vm destroy" + +type policy_cmd = [ + | `Policy_info + | `Policy_add of policy + | `Policy_remove +] + +let pp_policy_cmd ppf = function + | `Policy_info -> Fmt.string ppf "policy info" + | `Policy_add policy -> Fmt.pf ppf "add policy: %a" pp_policy policy + | `Policy_remove -> Fmt.string ppf "policy remove" + +type t = [ + | `Console_cmd of console_cmd + | `Stats_cmd of stats_cmd + | `Log_cmd of log_cmd + | `Vm_cmd of vm_cmd + | `Policy_cmd of policy_cmd + ] + +let pp ppf = function + | `Console_cmd c -> pp_console_cmd ppf c + | `Stats_cmd s -> pp_stats_cmd ppf s + | `Log_cmd l -> pp_log_cmd ppf l + | `Vm_cmd v -> pp_vm_cmd ppf v + | `Policy_cmd p -> pp_policy_cmd ppf p + +type data = [ + | `Console_data of Ptime.t * string + | `Stats_data of Stats.t + | `Log_data of Log.t +] + +let pp_data ppf = function + | `Console_data (ts, line) -> Fmt.pf ppf "console data %a: %s" + (Ptime.pp_rfc3339 ()) ts line + | `Stats_data stats -> Fmt.pf ppf "stats data: %a" Stats.pp stats + | `Log_data log -> Fmt.pf ppf "log data: %a" Log.pp log + +type header = { + version : version ; + sequence : int64 ; + id : id ; +} + +type success = [ `Empty | `String of string | `Policies of (id * policy) list | `Vms of (id * vm_config) list ] + +let pp_success ppf = function + | `Empty -> Fmt.string ppf "success" + | `String data -> Fmt.pf ppf "success: %s" data + | `Policies ps -> Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") pp_id pp_policy)) ppf ps + | `Vms vms -> Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") pp_id pp_vm_config)) ppf vms + +type wire = header * [ + | `Command of t + | `Success of success + | `Failure of string + | `Data of data ] + +let pp_wire ppf (header, data) = + let id = header.id in + match data with + | `Command c -> Fmt.pf ppf "host %a: %a" pp_id id pp c + | `Failure f -> Fmt.pf ppf "host %a: command failed %s" pp_id id f + | `Success s -> Fmt.pf ppf "host %a: %a" pp_id id pp_success s + | `Data d -> pp_data ppf d + +let endpoint = function | `Vm_cmd _ -> `Vmmd, `End | `Policy_cmd _ -> `Vmmd, `End | `Stats_cmd _ -> `Stats, `Read | `Console_cmd _ -> `Console, `Read | `Log_cmd _ -> `Log, `Read + diff --git a/src/vmm_commands.mli b/src/vmm_commands.mli index f242239..430618a 100644 --- a/src/vmm_commands.mli +++ b/src/vmm_commands.mli @@ -1,7 +1,78 @@ -val handle : - [< `Console_cmd of 'a - | `Log_cmd of 'b - | `Policy_cmd of 'c - | `Stats_cmd of 'd - | `Vm_cmd of 'e ] -> - [> `Console | `Log | `Stats | `Vmmd ] * [> `End | `Read ] +open Vmm_core + +(** The type of versions of the grammar defined below. *) +type version = [ `AV0 | `AV1 | `AV2 ] + +(** [version_eq a b] is true if [a] and [b] are equal. *) +val version_eq : version -> version -> bool + +(** [pp_version ppf version] pretty prints [version] onto [ppf]. *) +val pp_version : version Fmt.t + +type console_cmd = [ + | `Console_add + | `Console_subscribe +] + +type stats_cmd = [ + | `Stats_add of int * string list + | `Stats_remove + | `Stats_subscribe +] + +type log_cmd = [ + | `Log_subscribe +] + +type vm_cmd = [ + | `Vm_info + | `Vm_create of vm_config + | `Vm_force_create of vm_config + | `Vm_destroy +] + +type policy_cmd = [ + | `Policy_info + | `Policy_add of policy + | `Policy_remove +] + +type t = [ + | `Console_cmd of console_cmd + | `Stats_cmd of stats_cmd + | `Log_cmd of log_cmd + | `Vm_cmd of vm_cmd + | `Policy_cmd of policy_cmd ] + +val pp : t Fmt.t + +type data = [ + | `Console_data of Ptime.t * string + | `Stats_data of Stats.t + | `Log_data of Log.t +] + +val pp_data : data Fmt.t + +type header = { + version : version ; + sequence : int64 ; + id : id ; +} + +type success = [ + | `Empty + | `String of string + | `Policies of (id * policy) list + | `Vms of (id * vm_config) list +] + +type wire = header * [ + | `Command of t + | `Success of success + | `Failure of string + | `Data of data ] + +val pp_wire : wire Fmt.t + +val endpoint : t -> service * [ `End | `Read ] diff --git a/src/vmm_core.ml b/src/vmm_core.ml index 6f1c4b4..c43a348 100644 --- a/src/vmm_core.ml +++ b/src/vmm_core.ml @@ -7,6 +7,8 @@ open Rresult.R.Infix let tmpdir = Fpath.(v "/var" / "run" / "albatross") let dbdir = Fpath.(v "/var" / "db" / "albatross") +type service = [ `Console | `Log | `Stats | `Vmmd ] + let socket_path t = let path name = Fpath.(tmpdir / "util" / name + "sock") in let path = match t with @@ -185,88 +187,104 @@ let separate_chain = function | [ leaf ] -> Ok (leaf, []) | leaf :: xs -> Ok (leaf, List.rev xs) -type rusage = { - utime : (int64 * int) ; - stime : (int64 * int) ; - maxrss : int64 ; - ixrss : int64 ; - idrss : int64 ; - isrss : int64 ; - minflt : int64 ; - majflt : int64 ; - nswap : int64 ; - inblock : int64 ; - outblock : int64 ; - msgsnd : int64 ; - msgrcv : int64 ; - nsignals : int64 ; - nvcsw : int64 ; - nivcsw : int64 ; -} +module Stats = struct + type rusage = { + utime : (int64 * int) ; + stime : (int64 * int) ; + maxrss : int64 ; + ixrss : int64 ; + idrss : int64 ; + isrss : int64 ; + minflt : int64 ; + majflt : int64 ; + nswap : int64 ; + inblock : int64 ; + outblock : int64 ; + msgsnd : int64 ; + msgrcv : int64 ; + nsignals : int64 ; + nvcsw : int64 ; + nivcsw : int64 ; + } -let pp_rusage ppf r = - Fmt.pf ppf "utime %Lu.%d stime %Lu.%d maxrss %Lu ixrss %Lu idrss %Lu isrss %Lu minflt %Lu majflt %Lu nswap %Lu inblock %Lu outblock %Lu msgsnd %Lu msgrcv %Lu signals %Lu nvcsw %Lu nivcsw %Lu" - (fst r.utime) (snd r.utime) (fst r.stime) (snd r.stime) r.maxrss r.ixrss r.idrss r.isrss r.minflt r.majflt r.nswap r.inblock r.outblock r.msgsnd r.msgrcv r.nsignals r.nvcsw r.nivcsw + let pp_rusage ppf r = + Fmt.pf ppf "utime %Lu.%d stime %Lu.%d maxrss %Lu ixrss %Lu idrss %Lu isrss %Lu minflt %Lu majflt %Lu nswap %Lu inblock %Lu outblock %Lu msgsnd %Lu msgrcv %Lu signals %Lu nvcsw %Lu nivcsw %Lu" + (fst r.utime) (snd r.utime) (fst r.stime) (snd r.stime) r.maxrss r.ixrss r.idrss r.isrss r.minflt r.majflt r.nswap r.inblock r.outblock r.msgsnd r.msgrcv r.nsignals r.nvcsw r.nivcsw -type vmm_stats = (string * int64) list -let pp_vmm_stats ppf vmm = - Fmt.(list ~sep:(unit "@,") (pair ~sep:(unit ": ") string int64)) ppf vmm + type vmm = (string * int64) list + let pp_vmm ppf vmm = + Fmt.(list ~sep:(unit "@,") (pair ~sep:(unit ": ") string int64)) ppf vmm -type ifdata = { - name : string ; - flags : int32 ; - send_length : int32 ; - max_send_length : int32 ; - send_drops : int32 ; - mtu : int32 ; - baudrate : int64 ; - input_packets : int64 ; - input_errors : int64 ; - output_packets : int64 ; - output_errors : int64 ; - collisions : int64 ; - input_bytes : int64 ; - output_bytes : int64 ; - input_mcast : int64 ; - output_mcast : int64 ; - input_dropped : int64 ; - output_dropped : int64 ; -} + type ifdata = { + name : string ; + flags : int32 ; + send_length : int32 ; + max_send_length : int32 ; + send_drops : int32 ; + mtu : int32 ; + baudrate : int64 ; + input_packets : int64 ; + input_errors : int64 ; + output_packets : int64 ; + output_errors : int64 ; + collisions : int64 ; + input_bytes : int64 ; + output_bytes : int64 ; + input_mcast : int64 ; + output_mcast : int64 ; + input_dropped : int64 ; + output_dropped : int64 ; + } -let pp_ifdata ppf i = - Fmt.pf ppf "name %s flags %lX send_length %lu max_send_length %lu send_drops %lu mtu %lu baudrate %Lu input_packets %Lu input_errors %Lu output_packets %Lu output_errors %Lu collisions %Lu input_bytes %Lu output_bytes %Lu input_mcast %Lu output_mcast %Lu input_dropped %Lu output_dropped %Lu" - i.name i.flags i.send_length i.max_send_length i.send_drops i.mtu i.baudrate i.input_packets i.input_errors i.output_packets i.output_errors i.collisions i.input_bytes i.output_bytes i.input_mcast i.output_mcast i.input_dropped i.output_dropped + let pp_ifdata ppf i = + Fmt.pf ppf "name %s flags %lX send_length %lu max_send_length %lu send_drops %lu mtu %lu baudrate %Lu input_packets %Lu input_errors %Lu output_packets %Lu output_errors %Lu collisions %Lu input_bytes %Lu output_bytes %Lu input_mcast %Lu output_mcast %Lu input_dropped %Lu output_dropped %Lu" + i.name i.flags i.send_length i.max_send_length i.send_drops i.mtu i.baudrate i.input_packets i.input_errors i.output_packets i.output_errors i.collisions i.input_bytes i.output_bytes i.input_mcast i.output_mcast i.input_dropped i.output_dropped -type stats = rusage * vmm_stats option * ifdata list -let pp_stats ppf (ru, vmm, ifs) = - Fmt.pf ppf "%a@.%a@.%a" - pp_rusage ru - Fmt.(option ~none:(unit "no vmm stats") pp_vmm_stats) vmm - Fmt.(list ~sep:(unit "@.@.") pp_ifdata) ifs + type t = rusage * vmm option * ifdata list + let pp ppf (ru, vmm, ifs) = + Fmt.pf ppf "%a@.%a@.%a" + pp_rusage ru + Fmt.(option ~none:(unit "no vmm stats") pp_vmm) vmm + Fmt.(list ~sep:(unit "@.@.") pp_ifdata) ifs +end + +type process_exit = [ `Exit of int | `Signal of int | `Stop of int ] + +let pp_process_exit ppf = function + | `Exit n -> Fmt.pf ppf "exit %a (%d)" Fmt.Dump.signal n n + | `Signal n -> Fmt.pf ppf "signal %a (%d)" Fmt.Dump.signal n n + | `Stop n -> Fmt.pf ppf "stop %a (%d)" Fmt.Dump.signal n n module Log = struct - type event = - [ `Startup - | `Login of Ipaddr.V4.t * int - | `Logout of Ipaddr.V4.t * int - | `VM_start of int * string list * string option - | `VM_stop of int * [ `Exit of int | `Signal of int | `Stop of int ] - ] + type log_event = [ + | `Login of id * Ipaddr.V4.t * int + | `Logout of id * Ipaddr.V4.t * int + | `Startup + | `Vm_start of id * int * string list * string option + | `Vm_stop of id * int * process_exit + ] - let pp_event ppf = function - | `Startup -> Fmt.(pf ppf "STARTUP") - | `Login (ip, port) -> Fmt.pf ppf "LOGIN %a:%d" Ipaddr.V4.pp_hum ip port - | `Logout (ip, port) -> Fmt.pf ppf "LOGOUT %a:%d" Ipaddr.V4.pp_hum ip port - | `VM_start (pid, taps, block) -> - Fmt.pf ppf "STARTED %d (tap %a, block %a)" - pid Fmt.(list ~sep:(unit "; ") string) taps + let name = function + | `Startup -> [] + | `Login (name, _, _) -> name + | `Logout (name, _, _) -> name + | `Vm_start (name, _, _ ,_) -> name + | `Vm_stop (name, _, _) -> name + + let pp_log_event ppf = function + | `Startup -> Fmt.(pf ppf "startup") + | `Login (name, ip, port) -> Fmt.pf ppf "%a login %a:%d" pp_id name Ipaddr.V4.pp_hum ip port + | `Logout (name, ip, port) -> Fmt.pf ppf "%a logout %a:%d" pp_id name Ipaddr.V4.pp_hum ip port + | `Vm_start (name, pid, taps, block) -> + Fmt.pf ppf "%a started %d (tap %a, block %a)" + pp_id name pid Fmt.(list ~sep:(unit "; ") string) taps Fmt.(option ~none:(unit "no") string) block - | `VM_stop (pid, code) -> - let s, c = match code with - | `Exit n -> "exit", n - | `Signal n -> "signal", n - | `Stop n -> "stop", n - in - Fmt.pf ppf "STOPPED %d with %s %a" pid s Fmt.Dump.signal c + | `Vm_stop (name, pid, code) -> + Fmt.pf ppf "%a stopped %d with %a" pp_id name pid pp_process_exit code + + type t = Ptime.t * log_event + + let pp ppf (ts, ev) = + Fmt.pf ppf "%a: %a" (Ptime.pp_rfc3339 ()) ts pp_log_event ev end diff --git a/src/vmm_core.mli b/src/vmm_core.mli index 2b1a12b..48c91e5 100644 --- a/src/vmm_core.mli +++ b/src/vmm_core.mli @@ -1,8 +1,11 @@ val tmpdir : Fpath.t val dbdir : Fpath.t -val socket_path : [< `Console | `Log | `Stats | `Vmmd ] -> string -val pp_socket : - Format.formatter -> [< `Console | `Log | `Stats | `Vmmd ] -> unit + +type service = [ `Console | `Log | `Stats | `Vmmd ] + +val socket_path : service -> string +val pp_socket : service Fmt.t + module I : sig type t = int val compare : int -> int -> int end module IS : sig @@ -14,9 +17,6 @@ module IM : sig include Map.S with type key = I.t end -type vmtype = [ `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] -val pp_vmtype : vmtype Fmt.t - type id = string list val string_of_id : string list -> string val id_of_string : string -> string list @@ -45,6 +45,9 @@ val sub_block : 'a option -> 'a option -> bool val sub_cpu : IS.t -> IS.t -> bool val is_sub : super:policy -> sub:policy -> bool +type vmtype = [ `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] +val pp_vmtype : vmtype Fmt.t + type vm_config = { cpuid : int; requested_memory : int; @@ -79,68 +82,73 @@ val name : X509.t -> string val separate_chain : 'a list -> ('a * 'a list, [> `Msg of string ]) result -type rusage = { - utime : int64 * int; - stime : int64 * int; - maxrss : int64; - ixrss : int64; - idrss : int64; - isrss : int64; - minflt : int64; - majflt : int64; - nswap : int64; - inblock : int64; - outblock : int64; - msgsnd : int64; - msgrcv : int64; - nsignals : int64; - nvcsw : int64; - nivcsw : int64; -} -val pp_rusage : rusage Fmt.t +module Stats : sig + type rusage = { + utime : int64 * int; + stime : int64 * int; + maxrss : int64; + ixrss : int64; + idrss : int64; + isrss : int64; + minflt : int64; + majflt : int64; + nswap : int64; + inblock : int64; + outblock : int64; + msgsnd : int64; + msgrcv : int64; + nsignals : int64; + nvcsw : int64; + nivcsw : int64; + } + val pp_rusage : rusage Fmt.t -type vmm_stats = (string * int64) list -val pp_vmm_stats : vmm_stats Fmt.t + type vmm = (string * int64) list + val pp_vmm : vmm Fmt.t -type ifdata = { - name : string; - flags : int32; - send_length : int32; - max_send_length : int32; - send_drops : int32; - mtu : int32; - baudrate : int64; - input_packets : int64; - input_errors : int64; - output_packets : int64; - output_errors : int64; - collisions : int64; - input_bytes : int64; - output_bytes : int64; - input_mcast : int64; - output_mcast : int64; - input_dropped : int64; - output_dropped : int64; -} -val pp_ifdata : ifdata Fmt.t + type ifdata = { + name : string; + flags : int32; + send_length : int32; + max_send_length : int32; + send_drops : int32; + mtu : int32; + baudrate : int64; + input_packets : int64; + input_errors : int64; + output_packets : int64; + output_errors : int64; + collisions : int64; + input_bytes : int64; + output_bytes : int64; + input_mcast : int64; + output_mcast : int64; + input_dropped : int64; + output_dropped : int64; + } + val pp_ifdata : ifdata Fmt.t -type stats = rusage * vmm_stats option * ifdata list -val pp_stats : stats Fmt.t + type t = rusage * vmm option * ifdata list + val pp : t Fmt.t +end -module Log : - sig - type event = - [ `Login of Ipaddr.V4.t * int - | `Logout of Ipaddr.V4.t * int - | `Startup - | `VM_start of int * string list * string option - | `VM_stop of int * [ `Exit of int | `Signal of int | `Stop of int ] ] - val pp_event : - Format.formatter -> - [< `Login of Ipaddr.V4.t * int - | `Logout of Ipaddr.V4.t * int - | `Startup - | `VM_start of int * string list * string option - | `VM_stop of int * [< `Exit of int | `Signal of int | `Stop of int ] ] -> - unit - end +type process_exit = [ `Exit of int | `Signal of int | `Stop of int ] + +val pp_process_exit : process_exit Fmt.t + +module Log : sig + type log_event = [ + | `Login of id * Ipaddr.V4.t * int + | `Logout of id * Ipaddr.V4.t * int + | `Startup + | `Vm_start of id * int * string list * string option + | `Vm_stop of id * int * process_exit ] + + val name : log_event -> id + + val pp_log_event : log_event Fmt.t + + type t = Ptime.t * log_event + + val pp : t Fmt.t +end diff --git a/src/vmm_engine.ml b/src/vmm_engine.ml index 81e2aca..4a089e7 100644 --- a/src/vmm_engine.ml +++ b/src/vmm_engine.ml @@ -8,7 +8,7 @@ open Rresult open R.Infix type 'a t = { - wire_version : Vmm_asn.version ; + wire_version : Vmm_commands.version ; console_counter : int64 ; stats_counter : int64 ; log_counter : int64 ; @@ -26,22 +26,22 @@ let init wire_version = { } type service_out = [ - | `Stat of Vmm_asn.wire - | `Log of Vmm_asn.wire - | `Cons of Vmm_asn.wire + | `Stat of Vmm_commands.wire + | `Log of Vmm_commands.wire + | `Cons of Vmm_commands.wire ] -type out = [ service_out | `Data of Vmm_asn.wire ] +type out = [ service_out | `Data of Vmm_commands.wire ] let log t id event = - let data = `Log_data (Ptime_clock.now (), event) in - let header = Vmm_asn.{ version = t.wire_version ; sequence = t.log_counter ; id } in + let data = (Ptime_clock.now (), event) in + let header = Vmm_commands.{ version = t.wire_version ; sequence = t.log_counter ; id } in let log_counter = Int64.succ t.log_counter in - Logs.debug (fun m -> m "LOG %a" Log.pp_event event) ; - ({ t with log_counter }, `Log (header, `Data data)) + Logs.debug (fun m -> m "log %a" Log.pp data) ; + ({ t with log_counter }, `Log (header, `Data (`Log_data data))) let handle_create t hdr vm_config = - let name = hdr.Vmm_asn.id in + let name = hdr.Vmm_commands.id in (match Vmm_resources.find_vm t.resources name with | Some _ -> Error (`Msg "VM with same name is already running") | None -> Ok ()) >>= fun () -> @@ -54,7 +54,7 @@ let handle_create t hdr vm_config = Vmm_unix.prepare name vm_config >>= fun taps -> Logs.debug (fun m -> m "prepared vm with taps %a" Fmt.(list ~sep:(unit ",@ ") string) taps) ; let cons_out = - let header = Vmm_asn.{ version = t.wire_version ; sequence = t.console_counter ; id = name } in + let header = Vmm_commands.{ version = t.wire_version ; sequence = t.console_counter ; id = name } in (header, `Command (`Console_cmd `Console_add)) in Ok ({ t with console_counter = Int64.succ t.console_counter }, [ `Cons cons_out ], @@ -65,13 +65,13 @@ let handle_create t hdr vm_config = Vmm_resources.insert_vm t.resources name vm >>= fun resources -> let tasks = String.Map.add (string_of_id name) task t.tasks in let t = { t with resources ; tasks } in - let t, out = log t name (`VM_start (vm.pid, vm.taps, None)) in + let t, out = log t name (`Vm_start (name, vm.pid, vm.taps, None)) in let data = `Success (`String "created VM") in Ok (t, [ `Data (hdr, data) ; out ], name, vm))) let setup_stats t name vm = let stat_out = `Stats_add (vm.pid, vm.taps) in - let header = Vmm_asn.{ version = t.wire_version ; sequence = t.stats_counter ; id = name } in + let header = Vmm_commands.{ version = t.wire_version ; sequence = t.stats_counter ; id = name } in let t = { t with stats_counter = Int64.succ t.stats_counter } in t, [ `Stat (header, `Command (`Stats_cmd stat_out)) ] @@ -80,10 +80,10 @@ let handle_shutdown t name vm r = | Ok () -> () | Error (`Msg e) -> Logs.warn (fun m -> m "%s while shutdown vm %a" e pp_vm vm)) ; let resources = Vmm_resources.remove t.resources name in - let header = Vmm_asn.{ version = t.wire_version ; sequence = t.stats_counter ; id = name } in + let header = Vmm_commands.{ version = t.wire_version ; sequence = t.stats_counter ; id = name } in let tasks = String.Map.remove (string_of_id name) t.tasks in let t = { t with stats_counter = Int64.succ t.stats_counter ; resources ; tasks } in - let t, logout = log t name (`VM_stop (vm.pid, r)) + let t, logout = log t name (`Vm_stop (name, vm.pid, r)) in (t, [ `Stat (header, `Command (`Stats_cmd `Stats_remove)) ; logout ]) @@ -96,12 +96,12 @@ let handle_command t (header, payload) = (t, [ `Data (header, out) ], `End) in msg_to_err ( - let id = header.Vmm_asn.id in + let id = header.Vmm_commands.id in match payload with | `Command (`Policy_cmd pc) -> begin match pc with | `Policy_remove -> - Logs.debug (fun m -> m "remove policy %a" pp_id header.Vmm_asn.id) ; + Logs.debug (fun m -> m "remove policy %a" pp_id header.Vmm_commands.id) ; let resources = Vmm_resources.remove t.resources id in Ok ({ t with resources }, [ `Data (header, `Success (`String "removed policy")) ], `End) | `Policy_add policy -> @@ -179,5 +179,5 @@ let handle_command t (header, payload) = end end | _ -> - Logs.err (fun m -> m "ignoring %a" Vmm_asn.pp_wire (header, payload)) ; + Logs.err (fun m -> m "ignoring %a" Vmm_commands.pp_wire (header, payload)) ; Error (`Msg "unknown command")) diff --git a/src/vmm_engine.mli b/src/vmm_engine.mli index bf119a5..3f4fd2f 100644 --- a/src/vmm_engine.mli +++ b/src/vmm_engine.mli @@ -1,20 +1,20 @@ type 'a t -val init : Vmm_asn.version -> 'a t +val init : Vmm_commands.version -> 'a t type service_out = [ - | `Stat of Vmm_asn.wire - | `Log of Vmm_asn.wire - | `Cons of Vmm_asn.wire + | `Stat of Vmm_commands.wire + | `Log of Vmm_commands.wire + | `Cons of Vmm_commands.wire ] -type out = [ service_out | `Data of Vmm_asn.wire ] +type out = [ service_out | `Data of Vmm_commands.wire ] val handle_shutdown : 'a t -> Vmm_core.id -> Vmm_core.vm -> [ `Exit of int | `Signal of int | `Stop of int ] -> 'a t * out list -val handle_command : 'a t -> Vmm_asn.wire -> +val handle_command : 'a t -> Vmm_commands.wire -> 'a t * out list * [ `Create of 'c t -> 'c -> ('c t * out list * Vmm_core.id * Vmm_core.vm, [> Rresult.R.msg ]) result | `End diff --git a/src/vmm_lwt.mli b/src/vmm_lwt.mli index c111b45..f98809b 100644 --- a/src/vmm_lwt.mli +++ b/src/vmm_lwt.mli @@ -1,16 +1,20 @@ val pp_sockaddr : Format.formatter -> Lwt_unix.sockaddr -> unit + val pp_process_status : Format.formatter -> Unix.process_status -> unit -val ret : - Unix.process_status -> [> `Exit of int | `Signal of int | `Stop of int ] + +val ret : Unix.process_status -> Vmm_core.process_exit + val waitpid : int -> (int * Lwt_unix.process_status, unit) result Lwt.t -val wait_and_clear : - int -> - Unix.file_descr -> [> `Exit of int | `Signal of int | `Stop of int ] Lwt.t -val read_wire : - Lwt_unix.file_descr -> - (Vmm_asn.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t + +val wait_and_clear : int -> Unix.file_descr -> Vmm_core.process_exit Lwt.t + +val read_wire : Lwt_unix.file_descr -> + (Vmm_commands.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t + val write_raw : Lwt_unix.file_descr -> bytes -> (unit, [> `Exception ]) result Lwt.t + val write_wire : - Lwt_unix.file_descr -> Vmm_asn.wire -> (unit, [> `Exception ]) result Lwt.t + Lwt_unix.file_descr -> Vmm_commands.wire -> (unit, [> `Exception ]) result Lwt.t + val safe_close : Lwt_unix.file_descr -> unit Lwt.t diff --git a/src/vmm_tls.mli b/src/vmm_tls.mli index c5e6967..b72e093 100644 --- a/src/vmm_tls.mli +++ b/src/vmm_tls.mli @@ -1,5 +1,5 @@ -val read_tls : - Tls_lwt.Unix.t -> - (Vmm_asn.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t +val read_tls : Tls_lwt.Unix.t -> + (Vmm_commands.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t + val write_tls : - Tls_lwt.Unix.t -> Vmm_asn.wire -> (unit, [> `Exception ]) result Lwt.t + Tls_lwt.Unix.t -> Vmm_commands.wire -> (unit, [> `Exception ]) result Lwt.t diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index 02f8424..7363440 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -5,9 +5,9 @@ open Rresult.R.Infix open Vmm_core -external sysctl_rusage : int -> rusage = "vmmanage_sysctl_rusage" +external sysctl_rusage : int -> Stats.rusage = "vmmanage_sysctl_rusage" external sysctl_ifcount : unit -> int = "vmmanage_sysctl_ifcount" -external sysctl_ifdata : int -> ifdata = "vmmanage_sysctl_ifdata" +external sysctl_ifdata : int -> Stats.ifdata = "vmmanage_sysctl_ifdata" type vmctx @@ -18,8 +18,6 @@ external vmmapi_stats : vmctx -> int64 list = "vmmanage_vmmapi_stats" let my_version = `AV2 -let bcast = ref 0L - let descr = ref [] type 'a t = { @@ -119,8 +117,7 @@ let tick t = match Vmm_core.drop_super ~super:id ~sub:vmid with | None -> Logs.err (fun m -> m "couldn't drop super %a from sub %a" Vmm_core.pp_id id Vmm_core.pp_id vmid) ; out | Some real_id -> - let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = real_id } in - bcast := Int64.succ !bcast ; + let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id = real_id } in ((socket, vmid, (header, `Data (`Stats_data stats))) :: out)) out xs) [] (Vmm_trie.all t'.vmid_pid) @@ -174,13 +171,13 @@ let remove_vmids t vmids = let handle t socket (header, wire) = let r = - if not (Vmm_asn.version_eq my_version header.Vmm_asn.version) then + if not (Vmm_commands.version_eq my_version header.Vmm_commands.version) then Error (`Msg "cannot handle version") else match wire with | `Command (`Stats_cmd cmd) -> begin - let id = header.Vmm_asn.id in + let id = header.Vmm_commands.id in match cmd with | `Stats_add (pid, taps) -> add_pid t id pid taps >>= fun t -> @@ -193,7 +190,7 @@ let handle t socket (header, wire) = Ok ({ t with name_sockets }, `None, close, Some "subscribed") end | _ -> - Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire (header, wire)) ; + Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire (header, wire)) ; Ok (t, `None, None, None) in match r with