From 1d4d7509dcc7bd7786f28298b95f30d2ad1cd648 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Mon, 22 Oct 2018 23:20:00 +0200 Subject: [PATCH] remove vmm_wire, use asn.1 --- .ocamlinit | 3 + app/vmm_console.ml | 37 +-- app/vmm_log.ml | 105 ++++--- app/vmmc.ml | 74 +++-- app/vmmd.ml | 67 ++--- pkg/pkg.ml | 13 +- src/albatross.mllib | 11 + src/vmm_asn.ml | 447 +++++++++++++++++++++++++++- src/vmm_asn.mli | 65 ++++- src/vmm_commands.ml | 124 +------- src/vmm_commands.mli | 7 + src/vmm_compress.mli | 2 + src/vmm_core.ml | 18 +- src/vmm_core.mli | 304 +++++++++++++++++++ src/vmm_engine.ml | 195 +++++++------ src/vmm_engine.mli | 26 ++ src/vmm_lwt.ml | 38 +-- src/vmm_lwt.mli | 14 + src/vmm_tls.ml | 35 ++- src/vmm_tls.mli | 5 + src/vmm_wire.ml | 681 ------------------------------------------- stats/vmm_stats.ml | 1 + 22 files changed, 1170 insertions(+), 1102 deletions(-) create mode 100644 .ocamlinit create mode 100644 src/albatross.mllib create mode 100644 src/vmm_commands.mli create mode 100644 src/vmm_compress.mli create mode 100644 src/vmm_core.mli create mode 100644 src/vmm_engine.mli create mode 100644 src/vmm_lwt.mli create mode 100644 src/vmm_tls.mli delete mode 100644 src/vmm_wire.ml diff --git a/.ocamlinit b/.ocamlinit new file mode 100644 index 0000000..6702b21 --- /dev/null +++ b/.ocamlinit @@ -0,0 +1,3 @@ +#require "cstruct, asn1-combinators, astring, fmt, ipaddr, rresult, lwt, x509, tls, hex, bos, decompress, tls.lwt" +#directory "_build/src" +#load "albatross.cma" diff --git a/app/vmm_console.ml b/app/vmm_console.ml index 3ec0cc6..81f7572 100644 --- a/app/vmm_console.ml +++ b/app/vmm_console.ml @@ -14,7 +14,7 @@ open Lwt.Infix open Astring -let my_version = `WV2 +let my_version = `AV2 let pp_unix_error ppf e = Fmt.string ppf (Unix.error_message e) @@ -31,7 +31,8 @@ let read_console name ring channel () = (match String.Map.find name !active with | None -> Lwt.return_unit | Some fd -> - Vmm_lwt.write_wire fd (Vmm_wire.Console.data my_version id t line) >>= function + let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in + Vmm_lwt.write_wire fd (header, `Command (`Console_cmd (`Console_data (t, line)))) >>= function | Error _ -> Vmm_lwt.safe_close fd >|= fun () -> active := String.Map.remove name !active @@ -79,7 +80,7 @@ let add_fifo id = | None -> Error (`Msg "opening") -let attach s id = +let subscribe s id = let name = Vmm_core.string_of_id id in Logs.debug (fun m -> m "attempting to attach %a" Vmm_core.pp_id id) ; match String.Map.find name !t with @@ -90,8 +91,8 @@ let attach s id = let entries = Vmm_ring.read r in Logs.debug (fun m -> m "found %d history" (List.length entries)) ; Lwt_list.iter_s (fun (i, v) -> - let msg = Vmm_wire.Console.data my_version id i v in - Vmm_lwt.write_wire s msg >|= fun _ -> ()) + let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in + Vmm_lwt.write_wire s (header, `Command (`Console_cmd (`Console_data (i, v)))) >|= fun _ -> ()) entries >>= fun () -> (match String.Map.find name !active with | None -> Lwt.return_unit @@ -109,24 +110,24 @@ let handle s addr () = | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit - | Ok (hdr, _) when Vmm_wire.is_reply hdr -> - Logs.err (fun m -> m "unexpected reply") ; + | Ok (_, `Success _) -> + Logs.err (fun m -> m "unexpected success reply") ; loop () - | Ok (hdr, data) -> - (if not (Vmm_wire.version_eq hdr.Vmm_wire.version my_version) then + | Ok (_, `Failure _) -> + Logs.err (fun m -> m "unexpected failure reply") ; + loop () + | Ok (header, `Command cmd) -> + (if not (Vmm_asn.version_eq header.Vmm_asn.version my_version) then Lwt.return (Error (`Msg "ignoring data with bad version")) else - match Vmm_wire.decode_strings data with - | Error e -> Lwt.return (Error e) - | Ok (id, _) -> match Vmm_wire.Console.int_to_op hdr.Vmm_wire.tag with - | Some Vmm_wire.Console.Add_console -> add_fifo id - | Some Vmm_wire.Console.Attach_console -> attach s id - | Some Vmm_wire.Console.Data -> Lwt.return (Error (`Msg "unexpected Data")) - | None -> Lwt.return (Error (`Msg "unknown command"))) >>= (function - | Ok msg -> Vmm_lwt.write_wire s (Vmm_wire.success ~msg my_version hdr.Vmm_wire.id hdr.Vmm_wire.tag) + match cmd with + | `Console_cmd `Console_add -> add_fifo header.Vmm_asn.id + | `Console_cmd `Console_subscribe -> subscribe s header.Vmm_asn.id + | _ -> Lwt.return (Error (`Msg "unexpected command"))) >>= (function + | Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg)) | Error (`Msg msg) -> Logs.err (fun m -> m "error while processing command: %s" msg) ; - Vmm_lwt.write_wire s (Vmm_wire.fail ~msg my_version hdr.Vmm_wire.id)) >>= function + Vmm_lwt.write_wire s (header, `Failure msg)) >>= function | Ok () -> loop () | Error _ -> Logs.err (fun m -> m "exception while writing to socket") ; diff --git a/app/vmm_log.ml b/app/vmm_log.ml index 5d2969d..26b488c 100644 --- a/app/vmm_log.ml +++ b/app/vmm_log.ml @@ -12,7 +12,7 @@ open Lwt.Infix -let my_version = `WV2 +let my_version = `AV2 let broadcast prefix data t = Lwt_list.fold_left_s (fun t (id, s) -> @@ -64,25 +64,24 @@ let tree = ref Vmm_trie.empty let bcast = ref 0L -let send_history s ring id cmd_id = +let send_history s ring id = let elements = Vmm_ring.read ring in let res = List.fold_left (fun acc (_, x) -> let cs = Cstruct.of_string x in - match Vmm_wire.Log.decode_log_hdr cs with - | Ok (hdr, _) -> - begin match Vmm_core.drop_super ~super:id ~sub:hdr.Vmm_core.Log.name with - | Some [] -> cs :: acc - | _ -> acc - end + match Vmm_asn.log_entry_of_cstruct cs with + | Ok (header, ts, event) -> + if Vmm_core.is_sub_id ~super:id ~sub:header.Vmm_asn.id + then (header, ts, event) :: acc + else acc | _ -> acc) [] elements in (* just need a wrapper in tag = Log.Data, id = reqid *) - Lwt_list.fold_left_s (fun r body -> + Lwt_list.fold_left_s (fun r (header, ts, event) -> match r with | Ok () -> - let data = Vmm_wire.encode ~body my_version cmd_id (Vmm_wire.Log.op_to_int Vmm_wire.Log.Broadcast) in + let data = header, `Command (`Log_cmd (`Log_data (ts, event))) in Vmm_lwt.write_wire s data | Error e -> Lwt.return (Error e)) (Ok ()) res @@ -99,53 +98,51 @@ let handle mvar ring s addr () = | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit - | Ok (hdr, _) when Vmm_wire.is_reply hdr -> - Logs.warn (fun m -> m "ignoring reply") ; + | Ok (_, `Failure _) -> + Logs.warn (fun m -> m "ignoring failure") ; loop () - | Ok (hdr, _) when not (Vmm_wire.version_eq hdr.Vmm_wire.version my_version) -> - Logs.warn (fun m -> m "unsupported version") ; - Lwt.return_unit - | Ok (hdr, data) -> match Vmm_wire.Log.int_to_op hdr.Vmm_wire.tag with - | Some Vmm_wire.Log.Log -> - begin match Vmm_wire.Log.decode_log_hdr data with - | Error (`Msg err) -> - Logs.warn (fun m -> m "ignoring error %s while decoding log" err) ; - loop () - | Ok (hdr, _) -> - Vmm_ring.write ring (hdr.Vmm_core.Log.ts, Cstruct.to_string data) ; - Lwt_mvar.put mvar data >>= fun () -> - let data' = Vmm_wire.encode ~body:data my_version !bcast (Vmm_wire.Log.op_to_int Vmm_wire.Log.Broadcast) in - bcast := Int64.succ !bcast ; - broadcast hdr.Vmm_core.Log.name data' !tree >>= fun tree' -> - tree := tree' ; - loop () - end - | Some Vmm_wire.Log.Subscribe -> - begin match Vmm_wire.decode_strings data with - | Error (`Msg err) -> - Logs.warn (fun m -> m "ignoring error %s while decoding subscribe" err) ; - loop () - | Ok (id, _) -> - let tree', ret = Vmm_trie.insert id s !tree in - tree := tree' ; - (match ret with - | None -> Lwt.return_unit - | Some s' -> Vmm_lwt.safe_close s') >>= fun () -> - let out = Vmm_wire.success my_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in - Vmm_lwt.write_wire s out >>= function + | Ok (_, `Success _) -> + Logs.warn (fun m -> m "ignoring success") ; + loop () + | Ok (hdr, `Command (`Log_cmd lc)) -> + if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin + Logs.warn (fun m -> m "unsupported version") ; + Lwt.return_unit + end else begin + match lc with + | `Log_data (ts, event) -> + let data = Vmm_asn.log_entry_to_cstruct (hdr, ts, event) in + Vmm_ring.write ring (ts, Cstruct.to_string data) ; + Lwt_mvar.put mvar data >>= fun () -> + let data' = + let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = hdr.Vmm_asn.id } in + (header, `Command (`Log_cmd (`Log_data (ts, event)))) + in + bcast := Int64.succ !bcast ; + broadcast hdr.Vmm_asn.id data' !tree >>= fun tree' -> + tree := tree' ; + loop () + | `Log_subscribe -> + let tree', ret = Vmm_trie.insert hdr.Vmm_asn.id s !tree in + tree := tree' ; + (match ret with + | None -> Lwt.return_unit + | Some s' -> Vmm_lwt.safe_close s') >>= fun () -> + let out = `Success `Empty in + Vmm_lwt.write_wire s (hdr, out) >>= function + | Error _ -> + Logs.err (fun m -> m "error while sending reply for subscribe") ; + Lwt.return_unit + | Ok () -> + send_history s ring hdr.Vmm_asn.id >>= function | Error _ -> - Logs.err (fun m -> m "error while sending reply for subscribe") ; + Logs.err (fun m -> m "error while sending history") ; Lwt.return_unit - | Ok () -> - send_history s ring id hdr.Vmm_wire.id >>= function - | Error _ -> - Logs.err (fun m -> m "error while sending history") ; - Lwt.return_unit - | Ok () -> loop () (* TODO no need to loop ;) *) - end - | _ -> - Logs.err (fun m -> m "unknown command") ; - loop () + | Ok () -> loop () (* TODO no need to loop ;) *) + end + | _ -> + Logs.err (fun m -> m "unknown command") ; + loop () in loop () >>= fun () -> Vmm_lwt.safe_close s diff --git a/app/vmmc.ml b/app/vmmc.ml index 1ce632c..d66ee7a 100644 --- a/app/vmmc.ml +++ b/app/vmmc.ml @@ -6,11 +6,20 @@ open Astring open Vmm_core +let version = `AV2 + let process fd = Vmm_lwt.read_wire fd >|= function - | Error (`Msg m) -> Error (`Msg m) - | Error _ -> Error (`Msg "read error") - | Ok data -> Vmm_commands.handle_reply data + | Error _ -> + Error (`Msg "read or parse error") + | Ok (header, reply) -> + if Vmm_asn.version_eq header.Vmm_asn.version version then begin + Logs.app (fun m -> m "%a" Vmm_asn.pp_wire (header, reply)) ; + Ok () + end else begin + Logs.err (fun m -> m "version not equal") ; + Error (`Msg "version not equal") + end let socket t = function | Some x -> x @@ -25,53 +34,38 @@ let connect socket_path = let read fd = (* now we busy read and process output *) let rec loop () = - Vmm_lwt.read_wire fd >>= function - | Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop () - | Error _ -> Lwt.return (Error (`Msg "exception while reading")) - | Ok data -> match Vmm_commands.handle_reply data with - | Error (`Msg msg) -> Lwt.return (Error (`Msg msg)) - | Ok (hdr, data) -> - if Vmm_wire.is_reply hdr then - let msg = match Vmm_wire.decode_string data with - | Error _ -> None - | Ok (m, _) -> Some m - in - Logs.app (fun m -> m "operation succeeded: %a" Fmt.(option ~none:(unit "") string) msg) ; - loop () - else - match Vmm_commands.log_pp_reply (hdr, data) with - | Ok () -> loop () - | Error (`Msg msg) -> Lwt.return (Error (`Msg msg)) + process fd >>= function + | Error e -> Lwt.return (Error e) + | Ok () -> loop () in loop () -let handle opt_socket (cmd : Vmm_commands.t) = - let sock, next, cmd = Vmm_commands.handle cmd in +let handle opt_socket id (cmd : Vmm_asn.wire_command) = + let sock, next = Vmm_commands.handle cmd in connect (socket sock opt_socket) >>= fun fd -> - Vmm_lwt.write_wire fd cmd >>= function + let header = Vmm_asn.{ version ; sequence = 0L ; id } in + Vmm_lwt.write_wire fd (header, `Command cmd) >>= function | Error `Exception -> Lwt.return (Error (`Msg "couldn't write")) | Ok () -> (match next with | `Read -> read fd - | `End -> - process fd >|= function - | Error e -> Error e - | Ok data -> Vmm_commands.log_pp_reply data) >>= fun res -> + | `End -> process fd) >>= fun res -> Vmm_lwt.safe_close fd >|= fun () -> res -let jump opt_socket cmd = +let jump opt_socket name cmd = match - Lwt_main.run (handle opt_socket cmd) + Lwt_main.run (handle opt_socket name cmd) with | Ok () -> `Ok () | Error (`Msg m) -> `Error (false, m) -let info_ _ opt_socket name = jump opt_socket (`Info name) +let info_ _ opt_socket name = jump opt_socket name (`Vm_cmd `Vm_info) -let policy _ opt_socket name = jump opt_socket (`Policy name) +let policy _ opt_socket name = jump opt_socket name (`Policy_cmd `Policy_info) -let remove_policy _ opt_socket name = jump opt_socket (`Remove_policy name) +let remove_policy _ opt_socket name = + jump opt_socket name (`Policy_cmd `Policy_remove) let add_policy _ opt_socket name vms memory cpus block bridges = let bridges = match bridges with @@ -84,10 +78,10 @@ let add_policy _ opt_socket name vms memory cpus block bridges = and cpuids = IS.of_list cpus in let policy = { vms ; cpuids ; memory ; block ; bridges } in - jump opt_socket (`Add_policy (name, policy)) + jump opt_socket name (`Policy_cmd (`Policy_add policy)) let destroy _ opt_socket name = - jump opt_socket (`Destroy_vm name) + jump opt_socket name (`Vm_cmd `Vm_destroy) let create _ opt_socket force name image cpuid requested_memory boot_params block_device network = let image' = match Bos.OS.File.read (Fpath.v image) with @@ -106,17 +100,17 @@ let create _ opt_socket force name image cpuid requested_memory boot_params bloc } in let cmd = if force then - `Force_create_vm vm_config + `Vm_force_create vm_config else - `Create_vm vm_config + `Vm_create vm_config in - jump opt_socket cmd + jump opt_socket name (`Vm_cmd cmd) -let console _ opt_socket name = jump opt_socket (`Console name) +let console _ opt_socket name = jump opt_socket name (`Console_cmd `Console_subscribe) -let stats _ opt_socket name = jump opt_socket (`Statistics name) +let stats _ opt_socket name = jump opt_socket name (`Stats_cmd `Stats_subscribe) -let event_log _ opt_socket name = jump opt_socket (`Log name) +let event_log _ opt_socket name = jump opt_socket name (`Log_cmd `Log_subscribe) let help _ _ man_format cmds = function | None -> `Help (`Pager, None) diff --git a/app/vmmd.ml b/app/vmmd.ml index fe5380e..1f89fba 100644 --- a/app/vmmd.ml +++ b/app/vmmd.ml @@ -16,24 +16,34 @@ let pp_stats ppf s = open Lwt.Infix -type out = [ - | `Cons of Cstruct.t - | `Stat of Cstruct.t - | `Log of Cstruct.t -] +let version = `AV2 -let state = ref (Vmm_engine.init ()) +let state = ref (Vmm_engine.init version) let create c_fd process cont = Vmm_lwt.read_wire c_fd >>= function - | Ok (hdr, data) -> - if Vmm_wire.is_fail hdr then begin - Logs.err (fun m -> m "console failed with %s" (Cstruct.to_string data)) ; + | Error (`Msg msg) -> + Logs.err (fun m -> m "error %s while reading from console" msg) ; + Lwt.return_unit + | Error _ -> + Logs.err (fun m -> m "error while reading from console") ; + Lwt.return_unit + | Ok (header, wire) -> + if not (Vmm_asn.version_eq version header.Vmm_asn.version) then begin + Logs.err (fun m -> m "invalid version while reading from console") ; Lwt.return_unit - end else if Vmm_wire.is_reply hdr then begin - (* assert hdr.id = id! *) - let await, wakeme = Lwt.wait () in - begin match cont !state await with + end else + match wire with + | `Command _ -> + Logs.err (fun m -> m "console returned a command") ; + Lwt.return_unit + | `Failure f -> + Logs.err (fun m -> m "console failed with %s" f) ; + Lwt.return_unit + | `Success _msg -> + (* assert hdr.id = id! *) + let await, wakeme = Lwt.wait () in + match cont !state await with | Error (`Msg msg) -> Logs.err (fun m -> m "create continuation failed %s" msg) ; Lwt.return_unit @@ -48,25 +58,9 @@ let create c_fd process cont = process out' >|= fun () -> Lwt.wakeup wakeme ()) ; process out >>= fun () -> - begin match Vmm_engine.setup_stats !state vm with - | Ok (state', out) -> - state := state' ; - process out (* TODO: need to read from stats socket! *) - | Error (`Msg e) -> - Logs.warn (fun m -> m "(ignored) error %s while setting up statistics" e) ; - Lwt.return_unit - end - end - end else begin - Logs.err (fun m -> m "reading from console %lx, %a" hdr.Vmm_wire.tag Cstruct.hexdump_pp data) ; - Lwt.return_unit - end - | Error (`Msg msg) -> - Logs.err (fun m -> m "error %s while reading from console" msg) ; - Lwt.return_unit - | Error _ -> - Logs.err (fun m -> m "error while reading from console") ; - Lwt.return_unit + let state', out = Vmm_engine.setup_stats !state vm in + state := state' ; + process out (* TODO: need to read from stats socket! *) let handle out c_fd fd addr = (* out is for `Log | `Stat | `Cons (including reconnect semantics) *) @@ -86,7 +80,7 @@ let handle out c_fd fd addr = *) let process xs = Lwt_list.iter_p (function - | #out as o -> out o + | #Vmm_engine.service_out as o -> out o | `Data cs -> (* rather: terminate connection *) Vmm_lwt.write_wire fd cs >|= fun _ -> ()) xs @@ -96,16 +90,15 @@ let handle out c_fd fd addr = | Error _ -> Logs.err (fun m -> m "error while reading") ; Lwt.return_unit - | Ok (hdr, buf) -> + | Ok wire -> Logs.debug (fun m -> m "read sth") ; - let state', data, next = Vmm_engine.handle_command !state hdr buf in + let state', data, next = Vmm_engine.handle_command !state wire in state := state' ; process data >>= fun () -> match next with | `End -> Lwt.return_unit | `Wait (task, out) -> task >>= fun () -> process out - | `Wait_and_create (state', task, next) -> - state := state' ; + | `Wait_and_create (task, next) -> task >>= fun () -> let state', data, n = next !state in state := state' ; diff --git a/pkg/pkg.ml b/pkg/pkg.ml index 0b280e7..ae465e3 100644 --- a/pkg/pkg.ml +++ b/pkg/pkg.ml @@ -6,20 +6,21 @@ open Topkg let () = Pkg.describe "albatross" @@ fun _ -> Ok [ + Pkg.mllib "src/albatross.mllib" ; Pkg.bin "app/vmmd" ; Pkg.bin "app/vmm_console" ; Pkg.bin "app/vmm_log" ; - Pkg.bin "app/vmm_client" ; - Pkg.bin "app/vmm_tls_endpoint" ; +(* Pkg.bin "app/vmm_client" ; + Pkg.bin "app/vmm_tls_endpoint" ; *) Pkg.bin "app/vmmc" ; - Pkg.bin "provision/vmm_req_command" ; +(* Pkg.bin "provision/vmm_req_command" ; Pkg.bin "provision/vmm_req_delegation" ; Pkg.bin "provision/vmm_req_vm" ; Pkg.bin "provision/vmm_sign" ; Pkg.bin "provision/vmm_revoke" ; - Pkg.bin "provision/vmm_gen_ca" ; + Pkg.bin "provision/vmm_gen_ca" ; *) (* Pkg.clib "stats/libvmm_stats_stubs.clib" ; *) - Pkg.bin "stats/vmm_stats_lwt" ; +(* Pkg.bin "stats/vmm_stats_lwt" ; (* Pkg.bin "app/vmm_prometheus_stats" ; *) - Pkg.bin "app/vmm_influxdb_stats" ; + Pkg.bin "app/vmm_influxdb_stats" ; *) ] diff --git a/src/albatross.mllib b/src/albatross.mllib new file mode 100644 index 0000000..5aebb70 --- /dev/null +++ b/src/albatross.mllib @@ -0,0 +1,11 @@ +Vmm_asn +Vmm_lwt +Vmm_tls +Vmm_engine +Vmm_commands +Vmm_core +Vmm_engine +Vmm_resources +Vmm_trie +Vmm_unix +Vmm_compress \ No newline at end of file diff --git a/src/vmm_asn.ml b/src/vmm_asn.ml index 46b623e..bd421f5 100644 --- a/src/vmm_asn.ml +++ b/src/vmm_asn.ml @@ -134,7 +134,6 @@ let policy_of_cstruct, policy_to_cstruct = | Error (`Parse msg) -> Error (`Msg msg)), Asn.encode c) - let image = let f = function | `C1 x -> `Hvt_amd64, x @@ -165,27 +164,31 @@ let opt cert oid f = | None -> Ok None | Some (_, data) -> f data >>| fun s -> Some s -type version = [ `AV0 | `AV1 ] +type version = [ `AV0 | `AV1 | `AV2 ] let version_of_int = function | 0 -> Ok `AV0 | 1 -> Ok `AV1 + | 2 -> Ok `AV2 | _ -> Error (`Msg "couldn't parse version") let version_to_int = function | `AV0 -> 0 | `AV1 -> 1 + | `AV2 -> 2 let pp_version ppf v = Fmt.int ppf (match v with | `AV0 -> 0 - | `AV1 -> 1) + | `AV1 -> 1 + | `AV2 -> 2) let version_eq a b = match a, b with | `AV0, `AV0 -> true | `AV1, `AV1 -> true + | `AV2, `AV2 -> true | _ -> false let version_to_cstruct v = int_to_cstruct (version_to_int v) @@ -260,3 +263,441 @@ let block_device_of_cert version cert = let block_size_of_cert version cert = version_of_cert version cert >>= fun () -> req "block-size" cert Oid.memory int_of_cstruct + +(* communication protocol *) +type console_cmd = [ + | `Console_add + | `Console_subscribe + | `Console_data of Ptime.t * string +] + +let pp_console_cmd ppf = function + | `Console_add -> Fmt.string ppf "console add" + | `Console_subscribe -> Fmt.string ppf "console subscribe" + | `Console_data (ts, line) -> Fmt.pf ppf "console data %a: %s" + (Ptime.pp_rfc3339 ()) ts line + +let console_cmd = + let f = function + | `C1 () -> `Console_add + | `C2 () -> `Console_subscribe + | `C3 (timestamp, data) -> `Console_data (timestamp, data) + and g = function + | `Console_add -> `C1 () + | `Console_subscribe -> `C2 () + | `Console_data (timestamp, data) -> `C3 (timestamp, data) + in + Asn.S.map f g @@ + Asn.S.(choice3 + (explicit 0 null) + (explicit 1 null) + (explicit 2 (sequence2 + (required ~label:"timestamp" utc_time) + (required ~label:"data" utf8_string)))) + +(* TODO is this good? *) +let int64 = + let f cs = Cstruct.BE.get_uint64 cs 0 + and g data = + let buf = Cstruct.create 8 in + Cstruct.BE.set_uint64 buf 0 data ; + buf + in + Asn.S.map f g Asn.S.octet_string + +let timeval = + Asn.S.(sequence2 + (required ~label:"seconds" int64) + (required ~label:"microseconds" int)) + +let ru = + let f (utime, (stime, (maxrss, (ixrss, (idrss, (isrss, (minflt, (majflt, (nswap, (inblock, (outblock, (msgsnd, (msgrcv, (nsignals, (nvcsw, nivcsw))))))))))))))) = + { utime ; stime ; maxrss ; ixrss ; idrss ; isrss ; minflt ; majflt ; nswap ; inblock ; outblock ; msgsnd ; msgrcv ; nsignals ; nvcsw ; nivcsw } + and g ru = + (ru.utime, (ru.stime, (ru.maxrss, (ru.ixrss, (ru.idrss, (ru.isrss, (ru.minflt, (ru.majflt, (ru.nswap, (ru.inblock, (ru.outblock, (ru.msgsnd, (ru.msgrcv, (ru.nsignals, (ru.nvcsw, ru.nivcsw))))))))))))))) + in + Asn.S.map f g @@ + Asn.S.(sequence @@ + (required ~label:"utime" timeval) + @ (required ~label:"stime" timeval) + @ (required ~label:"maxrss" int64) + @ (required ~label:"ixrss" int64) + @ (required ~label:"idrss" int64) + @ (required ~label:"isrss" int64) + @ (required ~label:"minflt" int64) + @ (required ~label:"majflt" int64) + @ (required ~label:"nswap" int64) + @ (required ~label:"inblock" int64) + @ (required ~label:"outblock" int64) + @ (required ~label:"msgsnd" int64) + @ (required ~label:"msgrcv" int64) + @ (required ~label:"nsignals" int64) + @ (required ~label:"nvcsw" int64) + -@ (required ~label:"nivcsw" int64)) + +(* TODO is this good? *) +let int32 = + let f i = Int32.of_int i + and g i = Int32.to_int i + in + Asn.S.map f g Asn.S.int + +let ifdata = + let f (name, (flags, (send_length, (max_send_length, (send_drops, (mtu, (baudrate, (input_packets, (input_errors, (output_packets, (output_errors, (collisions, (input_bytes, (output_bytes, (input_mcast, (output_mcast, (input_dropped, output_dropped))))))))))))))))) = + { name; flags; send_length; max_send_length; send_drops; mtu; baudrate; input_packets; input_errors; output_packets; output_errors; collisions; input_bytes; output_bytes; input_mcast; output_mcast; input_dropped; output_dropped } + and g i = + (i.name, (i.flags, (i.send_length, (i.max_send_length, (i.send_drops, (i.mtu, (i.baudrate, (i.input_packets, (i.input_errors, (i.output_packets, (i.output_errors, (i.collisions, (i.input_bytes, (i.output_bytes, (i.input_mcast, (i.output_mcast, (i.input_dropped, i.output_dropped))))))))))))))))) + in + Asn.S.map f g @@ + Asn.S.(sequence @@ + (required ~label:"name" utf8_string) + @ (required ~label:"flags" int32) + @ (required ~label:"send_length" int32) + @ (required ~label:"max_send_length" int32) + @ (required ~label:"send_drops" int32) + @ (required ~label:"mtu" int32) + @ (required ~label:"baudrate" int64) + @ (required ~label:"input_packets" int64) + @ (required ~label:"input_errors" int64) + @ (required ~label:"output_packets" int64) + @ (required ~label:"output_errors" int64) + @ (required ~label:"collisions" int64) + @ (required ~label:"input_bytes" int64) + @ (required ~label:"output_bytes" int64) + @ (required ~label:"input_mcast" int64) + @ (required ~label:"output_mcast" int64) + @ (required ~label:"input_dropped" int64) + -@ (required ~label:"output_dropped" int64)) + +type stats_cmd = [ + | `Stats_add of int * string list + | `Stats_remove + | `Stats_subscribe + | `Stats_data of rusage * (string * int64) list * ifdata list +] + +let pp_stats_cmd ppf = function + | `Stats_add (pid, taps) -> Fmt.pf ppf "stats add: pid %d taps %a" pid Fmt.(list ~sep:(unit ", ") string) taps + | `Stats_remove -> Fmt.string ppf "stat remove" + | `Stats_subscribe -> Fmt.string ppf "stat subscribe" + | `Stats_data (ru, vmm, ifs) -> Fmt.pf ppf "stats data: %a %a %a" + pp_rusage ru + pp_vmm vmm + Fmt.(list ~sep:(unit "@.@.") pp_ifdata) ifs + +let stats_cmd = + let f = function + | `C1 (pid, taps) -> `Stats_add (pid, taps) + | `C2 () -> `Stats_remove + | `C3 () -> `Stats_subscribe + | `C4 (ru, vmm, ifdata) -> + let vmm = match vmm with None -> [] | Some vmm -> vmm + and ifdata = match ifdata with None -> [] | Some ifs -> ifs + in + `Stats_data (ru, vmm, ifdata) + and g = function + | `Stats_add (pid, taps) -> `C1 (pid, taps) + | `Stats_remove -> `C2 () + | `Stats_subscribe -> `C3 () + | `Stats_data (ru, vmm, ifdata) -> + let vmm = match vmm with [] -> None | xs -> Some xs + and ifs = match ifdata with [] -> None | xs -> Some xs + in + `C4 (ru, vmm, ifs) + in + Asn.S.map f g @@ + Asn.S.(choice4 + (explicit 0 (sequence2 + (required ~label:"pid" int) + (required ~label:"taps" (sequence_of utf8_string)))) + (explicit 1 null) + (explicit 2 null) + (explicit 3 (sequence3 + (required ~label:"resource_usage" ru) + (optional ~label:"vmm_stats" @@ explicit 0 + (sequence_of (sequence2 + (required ~label:"key" utf8_string) + (required ~label:"value" int64)))) + (optional ~label:"ifdata" @@ explicit 1 + (sequence_of ifdata))))) + +let addr = + Asn.S.(sequence2 + (required ~label:"ip" ipv4) + (required ~label:"port" int)) + +let log_event = + let f = function + | `C1 () -> `Startup + | `C2 (ip, port) -> `Login (ip, port) + | `C3 (ip, port) -> `Logout (ip, port) + | `C4 (pid, taps, block) -> `VM_start (pid, taps, block) + | `C5 (pid, status) -> + let status' = match status with + | `C1 n -> `Exit n + | `C2 n -> `Signal n + | `C3 n -> `Stop n + in + `VM_stop (pid, status') + and g = function + | `Startup -> `C1 () + | `Login (ip, port) -> `C2 (ip, port) + | `Logout (ip, port) -> `C3 (ip, port) + | `VM_start (pid, taps, block) -> `C4 (pid, taps, block) + | `VM_stop (pid, status) -> + let status' = match status with + | `Exit n -> `C1 n + | `Signal n -> `C2 n + | `Stop n -> `C3 n + in + `C5 (pid, status') + in + Asn.S.map f g @@ + Asn.S.(choice5 + (explicit 0 null) + (explicit 1 addr) + (explicit 2 addr) + (explicit 3 (sequence3 + (required ~label:"pid" int) + (required ~label:"taps" (sequence_of utf8_string)) + (optional ~label:"block" utf8_string))) + (explicit 4 (sequence2 + (required ~label:"pid" int) + (required ~label:"status" (choice3 + (explicit 0 int) + (explicit 1 int) + (explicit 2 int)))))) + +type log_cmd = [ + | `Log_data of Ptime.t * Log.event + | `Log_subscribe +] + +let pp_log_cmd ppf = function + | `Log_data (ts, event) -> Fmt.pf ppf "log data: %a %a" (Ptime.pp_rfc3339 ()) ts Log.pp_event event + | `Log_subscribe -> Fmt.string ppf "log subscribe" + +let log_cmd = + let f = function + | `C1 (timestamp, event) -> `Log_data (timestamp, event) + | `C2 () -> `Log_subscribe + and g = function + | `Log_data (timestamp, event) -> `C1 (timestamp, event) + | `Log_subscribe -> `C2 () + in + Asn.S.map f g @@ + Asn.S.(choice2 + (explicit 0 (sequence2 + (required ~label:"timestamp" utc_time) + (required ~label:"event" log_event))) + (explicit 1 null)) + +type vm_cmd = [ + | `Vm_info + | `Vm_create of vm_config + | `Vm_force_create of vm_config + | `Vm_destroy +] + +let pp_vm_cmd ppf = function + | `Vm_info -> Fmt.string ppf "vm info" + | `Vm_create vm_config -> Fmt.pf ppf "create %a" pp_vm_config vm_config + | `Vm_force_create vm_config -> Fmt.pf ppf "force create %a" pp_vm_config vm_config + | `Vm_destroy -> Fmt.string ppf "vm destroy" + +let vm_config = + let f (cpuid, requested_memory, block_device, network, vmimage, argv) = + let network = match network with None -> [] | Some xs -> xs in + { vname = [] ; cpuid ; requested_memory ; block_device ; network ; vmimage ; argv } + and g vm = + let network = match vm.network with [] -> None | xs -> Some xs in + (vm.cpuid, vm.requested_memory, vm.block_device, network, vm.vmimage, vm.argv) + in + Asn.S.map f g @@ + Asn.S.(sequence6 + (required ~label:"cpu" int) + (required ~label:"memory" int) + (optional ~label:"block" utf8_string) + (optional ~label:"bridges" (sequence_of utf8_string)) + (required ~label:"vmimage" image) + (optional ~label:"arguments" (sequence_of utf8_string))) + +let vm_cmd = + let f = function + | `C1 () -> `Vm_info + | `C2 vm -> `Vm_create vm + | `C3 vm -> `Vm_force_create vm + | `C4 () -> `Vm_destroy + and g = function + | `Vm_info -> `C1 () + | `Vm_create vm -> `C2 vm + | `Vm_force_create vm -> `C3 vm + | `Vm_destroy -> `C4 () + in + Asn.S.map f g @@ + Asn.S.(choice4 + (explicit 0 null) + (explicit 1 vm_config) + (explicit 2 vm_config) + (explicit 3 null)) + +type policy_cmd = [ + | `Policy_info + | `Policy_add of policy + | `Policy_remove +] + +let pp_policy_cmd ppf = function + | `Policy_info -> Fmt.string ppf "policy info" + | `Policy_add policy -> Fmt.pf ppf "add policy: %a" pp_policy policy + | `Policy_remove -> Fmt.string ppf "policy remove" + +let policy_cmd = + let f = function + | `C1 () -> `Policy_info + | `C2 policy -> `Policy_add policy + | `C3 () -> `Policy_remove + and g = function + | `Policy_info -> `C1 () + | `Policy_add policy -> `C2 policy + | `Policy_remove -> `C3 () + in + Asn.S.map f g @@ + Asn.S.(choice3 + (explicit 0 null) + (explicit 1 policy_obj) + (explicit 2 null)) + +let version = + let f data = match version_of_int data with + | Ok v -> v + | Error (`Msg m) -> Asn.S.error (`Parse m) + and g = version_to_int + in + Asn.S.map f g Asn.S.int + +type wire_command = [ + | `Console_cmd of console_cmd + | `Stats_cmd of stats_cmd + | `Log_cmd of log_cmd + | `Vm_cmd of vm_cmd + | `Policy_cmd of policy_cmd + ] + +let pp_wire_command ppf = function + | `Console_cmd c -> pp_console_cmd ppf c + | `Stats_cmd s -> pp_stats_cmd ppf s + | `Log_cmd l -> pp_log_cmd ppf l + | `Vm_cmd v -> pp_vm_cmd ppf v + | `Policy_cmd p -> pp_policy_cmd ppf p + +let wire_command : wire_command Asn.S.t = + let f = function + | `C1 console -> `Console_cmd console + | `C2 stats -> `Stats_cmd stats + | `C3 log -> `Log_cmd log + | `C4 vm -> `Vm_cmd vm + | `C5 policy -> `Policy_cmd policy + and g = function + | `Console_cmd c -> `C1 c + | `Stats_cmd c -> `C2 c + | `Log_cmd c -> `C3 c + | `Vm_cmd c -> `C4 c + | `Policy_cmd c -> `C5 c + in + Asn.S.map f g @@ + Asn.S.(choice5 + (explicit 0 console_cmd) + (explicit 1 stats_cmd) + (explicit 2 log_cmd) + (explicit 3 vm_cmd) + (explicit 4 policy_cmd)) + +type header = { + version : version ; + sequence : int64 ; + id : id ; +} + +let header = + let f (version, sequence, id) = { version ; sequence ; id } + and g h = h.version, h.sequence, h.id + in + Asn.S.map f g @@ + Asn.S.(sequence3 + (required ~label:"version" version) + (required ~label:"sequence" int64) + (required ~label:"id" (sequence_of utf8_string))) + +type success = [ `Empty | `String of string | `Policies of policy list | `Vms of vm_config list ] + +let pp_success ppf = function + | `Empty -> Fmt.string ppf "success" + | `String data -> Fmt.pf ppf "success: %s" data + | `Policies ps -> Fmt.(list ~sep:(unit "@.") pp_policy) ppf ps + | `Vms vms -> Fmt.(list ~sep:(unit "@.") pp_vm_config) ppf vms + +type wire = header * [ + | `Command of wire_command + | `Success of success + | `Failure of string ] + +let pp_wire ppf (header, data) = + let id = header.id in + match data with + | `Command c -> Fmt.pf ppf "host %a: %a" pp_id id pp_wire_command c + | `Failure f -> Fmt.pf ppf "host %a: command failed %s" pp_id id f + | `Success s -> Fmt.pf ppf "host %a: %a" pp_id id pp_success s + +let wire = + let f (header, payload) = + header, + match payload with + | `C1 cmd -> `Command cmd + | `C2 data -> + let p = match data with + | `C1 () -> `Empty + | `C2 str -> `String str + | `C3 policies -> `Policies policies + | `C4 vms -> `Vms vms + in + `Success p + | `C3 str -> `Failure str + and g (header, payload) = + header, + match payload with + | `Command cmd -> `C1 cmd + | `Success data -> + let p = match data with + | `Empty -> `C1 () + | `String s -> `C2 s + | `Policies ps -> `C3 ps + | `Vms vms -> `C4 vms + in + `C2 p + | `Failure str -> `C3 str + in + Asn.S.map f g @@ + Asn.S.(sequence2 + (required ~label:"header" header) + (required ~label:"payload" + (choice3 + (explicit 0 wire_command) + (explicit 1 (choice4 + (explicit 0 null) + (explicit 1 utf8_string) + (explicit 2 (sequence_of policy_obj)) + (explicit 3 (sequence_of vm_config)))) + (explicit 2 utf8_string)))) + +let wire_of_cstruct, wire_to_cstruct = projections_of wire + +type log_entry = header * Ptime.t * Log.event + +let log_entry = + Asn.S.(sequence3 + (required ~label:"headet" header) + (required ~label:"timestamp" utc_time) + (required ~label:"event" log_event)) + +let log_entry_of_cstruct, log_entry_to_cstruct = projections_of log_entry diff --git a/src/vmm_asn.mli b/src/vmm_asn.mli index f44b3e1..9094932 100644 --- a/src/vmm_asn.mli +++ b/src/vmm_asn.mli @@ -75,7 +75,7 @@ end (** {1 Encoding and decoding functions} *) (** The type of versions of the ASN.1 grammar defined above. *) -type version = [ `AV0 | `AV1 ] +type version = [ `AV0 | `AV1 | `AV2 ] (** [version_eq a b] is true if [a] and [b] are equal. *) val version_eq : version -> version -> bool @@ -171,3 +171,66 @@ val block_device_of_cert : version -> X509.t -> (string, [> `Msg of string ]) re (** [block_size_of_cert version cert] is either the decoded block size, or an error. *) val block_size_of_cert : version -> X509.t -> (int, [> `Msg of string ]) result + +open Vmm_core +type console_cmd = [ + | `Console_add + | `Console_subscribe + | `Console_data of Ptime.t * string +] + +type stats_cmd = [ + | `Stats_add of int * string list + | `Stats_remove + | `Stats_subscribe + | `Stats_data of rusage * (string * int64) list * ifdata list +] + +type log_cmd = [ + | `Log_data of Ptime.t * Log.event + | `Log_subscribe +] + +type vm_cmd = [ + | `Vm_info + | `Vm_create of vm_config + | `Vm_force_create of vm_config + | `Vm_destroy +] + +type policy_cmd = [ + | `Policy_info + | `Policy_add of policy + | `Policy_remove +] + +type wire_command = [ + | `Console_cmd of console_cmd + | `Stats_cmd of stats_cmd + | `Log_cmd of log_cmd + | `Vm_cmd of vm_cmd + | `Policy_cmd of policy_cmd ] + +type header = { + version : version ; + sequence : int64 ; + id : id ; +} + +type wire = header * [ + | `Command of wire_command + | `Success of [ `Empty | `String of string | `Policies of policy list | `Vms of vm_config list ] + | `Failure of string ] + +val pp_wire : wire Fmt.t + +val wire_to_cstruct : wire -> Cstruct.t + +val wire_of_cstruct : Cstruct.t -> (wire, [> `Msg of string ]) result + +type log_entry = header * Ptime.t * Log.event + +val log_entry_to_cstruct : log_entry -> Cstruct.t + +val log_entry_of_cstruct : Cstruct.t -> (log_entry, [> `Msg of string ]) result + diff --git a/src/vmm_commands.ml b/src/vmm_commands.ml index f8bedc3..f10f4c9 100644 --- a/src/vmm_commands.ml +++ b/src/vmm_commands.ml @@ -2,123 +2,9 @@ open Vmm_core -let c = 0L -let ver = `WV2 - -type t = [ - | `Info of id - | `Policy of id - | `Add_policy of id * policy - | `Remove_policy of id - | `Create_vm of vm_config - | `Force_create_vm of vm_config - | `Destroy_vm of id - | `Statistics of id - | `Console of id - | `Log of id - | `Crl (* TODO *) - | `Create_block of id * int - | `Destroy_block of id -] - let handle = function - | `Info name -> - let cmd = Vmm_wire.Vm.info c ver name in - `Vmmd, `End, cmd - | `Policy name -> - let cmd = Vmm_wire.Vm.policy c ver name in - `Vmmd, `End, cmd - | `Remove_policy name -> - let cmd = Vmm_wire.Vm.remove_policy c ver name in - `Vmmd, `End, cmd - | `Add_policy (name, policy) -> - let cmd = Vmm_wire.Vm.insert_policy c ver name policy in - `Vmmd, `End, cmd - | `Create_vm vm -> - let cmd = Vmm_wire.Vm.create c ver vm in - `Vmmd, `End, cmd - | `Force_create_vm vm -> - let cmd = Vmm_wire.Vm.force_create c ver vm in - `Vmmd, `End, cmd - | `Destroy_vm name -> - let cmd = Vmm_wire.Vm.destroy c ver name in - `Vmmd, `End, cmd - | `Statistics name -> - let cmd = Vmm_wire.Stats.subscribe c ver name in - `Stats, `Read, cmd - | `Console name -> - let cmd = Vmm_wire.Console.attach c ver name in - `Console, `Read, cmd - | `Log name -> - let cmd = Vmm_wire.Log.subscribe c ver name in - `Log, `Read, cmd - | `Crl -> assert false - | `Create_block (_name, _size) -> assert false - | `Destroy_block _name -> assert false - -let handle_reply (hdr, data) = - if not (Vmm_wire.version_eq hdr.Vmm_wire.version ver) then - Error (`Msg "unknown wire protocol version") - else - if Vmm_wire.is_fail hdr then - let msg = match Vmm_wire.decode_string data with - | Ok (msg, _) -> msg - | Error _ -> "" - in - Error (`Msg ("command failed " ^ msg)) - else if Vmm_wire.is_reply hdr && hdr.Vmm_wire.id = c then - Ok (hdr, data) - else - Error (`Msg "received unexpected data") - -let log_pp_reply (hdr, data) = - let open Vmm_wire in - let tag' = Int32.logxor reply_tag hdr.tag in - let open Rresult.R.Infix in - match Vm.int_to_op tag' with - | Some Vm.Info -> - Vm.decode_vms data >>| fun (vms, _) -> - List.iter (fun (id, memory, cmd, pid, taps) -> - Logs.app (fun m -> m "VM %a %dMB command %s pid %d taps %a" - pp_id id memory cmd pid Fmt.(list ~sep:(unit ", ") string) taps)) - vms - | Some Vm.Policy -> - Vm.decode_policies data >>| fun (policies, _) -> - List.iter (fun (id, policy) -> - Logs.app (fun m -> m "policy %a: %a" pp_id id pp_policy policy)) - policies - | Some Vm.Insert_policy -> - Ok (Logs.app (fun m -> m "added policy")) - | Some Vm.Remove_policy -> - Ok (Logs.app (fun m -> m "removed policy")) - | Some Vm.Destroy -> - Ok (Logs.app (fun m -> m "destroyed VM")) - | Some Vm.Create -> - Ok (Logs.app (fun m -> m "successfully started VM")) - | Some Vm.Force_create -> - Ok (Logs.app (fun m -> m "successfully forcefully started VM")) - | None -> match Console.int_to_op tag' with - | Some Console.Data -> - decode_id_ts data >>= fun ((name, ts), off) -> - decode_string (Cstruct.shift data off) >>| fun (msg, _) -> - Logs.app (fun m -> m "%a %a: %s" Ptime.pp ts pp_id name msg) - | Some _ -> Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.tag)) - | None -> match Stats.int_to_op tag' with - | Some Stats.Data -> - decode_strings data >>= fun (name', off) -> - Stats.decode_stats (Cstruct.shift data off) >>| fun (ru, vmm, ifs) -> - Logs.app (fun m -> m "stats %a@.%a@.%a@.%a@." - pp_id name' pp_rusage ru - Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") string int64)) vmm - Fmt.(list ~sep:(unit "@.") pp_ifdata) ifs) - | Some _ -> Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.tag)) - | None -> match Log.int_to_op tag' with - | Some Log.Broadcast -> - Log.decode_log_hdr data >>= fun (loghdr, logdata) -> - Log.decode_event logdata >>| fun event -> - Logs.app (fun m -> m "%a" Vmm_core.Log.pp (loghdr, event)) - | Some _ -> Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.tag)) - | None -> Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.tag)) - - - + | `Vm_cmd _ -> `Vmmd, `End + | `Policy_cmd _ -> `Vmmd, `End + | `Stats_cmd _ -> `Stats, `Read + | `Console_cmd _ -> `Console, `Read + | `Log_cmd _ -> `Log, `Read diff --git a/src/vmm_commands.mli b/src/vmm_commands.mli new file mode 100644 index 0000000..f242239 --- /dev/null +++ b/src/vmm_commands.mli @@ -0,0 +1,7 @@ +val handle : + [< `Console_cmd of 'a + | `Log_cmd of 'b + | `Policy_cmd of 'c + | `Stats_cmd of 'd + | `Vm_cmd of 'e ] -> + [> `Console | `Log | `Stats | `Vmmd ] * [> `End | `Read ] diff --git a/src/vmm_compress.mli b/src/vmm_compress.mli new file mode 100644 index 0000000..cfceea6 --- /dev/null +++ b/src/vmm_compress.mli @@ -0,0 +1,2 @@ +val compress : ?level:int -> string -> string +val uncompress : string -> (string, unit) result diff --git a/src/vmm_core.ml b/src/vmm_core.ml index c4d2ae4..e58e95f 100644 --- a/src/vmm_core.ml +++ b/src/vmm_core.ml @@ -295,6 +295,9 @@ let pp_rusage ppf r = Fmt.pf ppf "utime %Lu.%d stime %Lu.%d maxrss %Lu ixrss %Lu idrss %Lu isrss %Lu minflt %Lu majflt %Lu nswap %Lu inblock %Lu outblock %Lu msgsnd %Lu msgrcv %Lu signals %Lu nvcsw %Lu nivcsw %Lu" (fst r.utime) (snd r.utime) (fst r.stime) (snd r.stime) r.maxrss r.ixrss r.idrss r.isrss r.minflt r.majflt r.nswap r.inblock r.outblock r.msgsnd r.msgrcv r.nsignals r.nvcsw r.nivcsw +let pp_vmm ppf vmm = + Fmt.(list ~sep:(unit "@,") (pair ~sep:(unit ": ") string int64)) ppf vmm + type ifdata = { name : string ; flags : int32 ; @@ -321,16 +324,6 @@ let pp_ifdata ppf i = i.name i.flags i.send_length i.max_send_length i.send_drops i.mtu i.baudrate i.input_packets i.input_errors i.output_packets i.output_errors i.collisions i.input_bytes i.output_bytes i.input_mcast i.output_mcast i.input_dropped i.output_dropped module Log = struct - type hdr = { - ts : Ptime.t ; - name : id ; - } - - let pp_hdr ppf (hdr : hdr) = - Fmt.pf ppf "%a: %a" (Ptime.pp_rfc3339 ()) hdr.ts pp_id hdr.name - - let hdr name = { ts = Ptime_clock.now () ; name } - type event = [ `Startup | `Login of Ipaddr.V4.t * int @@ -354,9 +347,4 @@ module Log = struct | `Stop n -> "stop", n in Fmt.pf ppf "STOPPED %d with %s %a" pid s Fmt.Dump.signal c - - type msg = hdr * event - - let pp ppf (hdr, event) = - Fmt.pf ppf "%a %a" pp_hdr hdr pp_event event end diff --git a/src/vmm_core.mli b/src/vmm_core.mli new file mode 100644 index 0000000..6c0bd83 --- /dev/null +++ b/src/vmm_core.mli @@ -0,0 +1,304 @@ +val tmpdir : Fpath.t +val dbdir : Fpath.t +val socket_path : [< `Console | `Log | `Stats | `Vmmd ] -> string +val pp_socket : + Format.formatter -> [< `Console | `Log | `Stats | `Vmmd ] -> unit +module I : sig type t = int val compare : int -> int -> int end +module IS : + sig + type elt = I.t + type t = Set.Make(I).t + val empty : t + val is_empty : t -> bool + val mem : elt -> t -> bool + val add : elt -> t -> t + val singleton : elt -> t + val remove : elt -> t -> t + val union : t -> t -> t + val inter : t -> t -> t + val diff : t -> t -> t + val compare : t -> t -> int + val equal : t -> t -> bool + val subset : t -> t -> bool + val iter : (elt -> unit) -> t -> unit + val map : (elt -> elt) -> t -> t + val fold : (elt -> 'a -> 'a) -> t -> 'a -> 'a + val for_all : (elt -> bool) -> t -> bool + val exists : (elt -> bool) -> t -> bool + val filter : (elt -> bool) -> t -> t + val partition : (elt -> bool) -> t -> t * t + val cardinal : t -> int + val elements : t -> elt list + val min_elt : t -> elt + val min_elt_opt : t -> elt option + val max_elt : t -> elt + val max_elt_opt : t -> elt option + val choose : t -> elt + val choose_opt : t -> elt option + val split : elt -> t -> t * bool * t + val find : elt -> t -> elt + val find_opt : elt -> t -> elt option + val find_first : (elt -> bool) -> t -> elt + val find_first_opt : (elt -> bool) -> t -> elt option + val find_last : (elt -> bool) -> t -> elt + val find_last_opt : (elt -> bool) -> t -> elt option + val of_list : elt list -> t + end +module IM : + sig + type key = I.t + type 'a t = 'a Map.Make(I).t + val empty : 'a t + val is_empty : 'a t -> bool + val mem : key -> 'a t -> bool + val add : key -> 'a -> 'a t -> 'a t + val update : key -> ('a option -> 'a option) -> 'a t -> 'a t + val singleton : key -> 'a -> 'a t + val remove : key -> 'a t -> 'a t + val merge : + (key -> 'a option -> 'b option -> 'c option) -> 'a t -> 'b t -> 'c t + val union : (key -> 'a -> 'a -> 'a option) -> 'a t -> 'a t -> 'a t + val compare : ('a -> 'a -> int) -> 'a t -> 'a t -> int + val equal : ('a -> 'a -> bool) -> 'a t -> 'a t -> bool + val iter : (key -> 'a -> unit) -> 'a t -> unit + val fold : (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b + val for_all : (key -> 'a -> bool) -> 'a t -> bool + val exists : (key -> 'a -> bool) -> 'a t -> bool + val filter : (key -> 'a -> bool) -> 'a t -> 'a t + val partition : (key -> 'a -> bool) -> 'a t -> 'a t * 'a t + val cardinal : 'a t -> int + val bindings : 'a t -> (key * 'a) list + val min_binding : 'a t -> key * 'a + val min_binding_opt : 'a t -> (key * 'a) option + val max_binding : 'a t -> key * 'a + val max_binding_opt : 'a t -> (key * 'a) option + val choose : 'a t -> key * 'a + val choose_opt : 'a t -> (key * 'a) option + val split : key -> 'a t -> 'a t * 'a option * 'a t + val find : key -> 'a t -> 'a + val find_opt : key -> 'a t -> 'a option + val find_first : (key -> bool) -> 'a t -> key * 'a + val find_first_opt : (key -> bool) -> 'a t -> (key * 'a) option + val find_last : (key -> bool) -> 'a t -> key * 'a + val find_last_opt : (key -> bool) -> 'a t -> (key * 'a) option + val map : ('a -> 'b) -> 'a t -> 'b t + val mapi : (key -> 'a -> 'b) -> 'a t -> 'b t + end +module IM64 : + sig + type key = Int64.t + type 'a t = 'a Map.Make(Int64).t + val empty : 'a t + val is_empty : 'a t -> bool + val mem : key -> 'a t -> bool + val add : key -> 'a -> 'a t -> 'a t + val update : key -> ('a option -> 'a option) -> 'a t -> 'a t + val singleton : key -> 'a -> 'a t + val remove : key -> 'a t -> 'a t + val merge : + (key -> 'a option -> 'b option -> 'c option) -> 'a t -> 'b t -> 'c t + val union : (key -> 'a -> 'a -> 'a option) -> 'a t -> 'a t -> 'a t + val compare : ('a -> 'a -> int) -> 'a t -> 'a t -> int + val equal : ('a -> 'a -> bool) -> 'a t -> 'a t -> bool + val iter : (key -> 'a -> unit) -> 'a t -> unit + val fold : (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b + val for_all : (key -> 'a -> bool) -> 'a t -> bool + val exists : (key -> 'a -> bool) -> 'a t -> bool + val filter : (key -> 'a -> bool) -> 'a t -> 'a t + val partition : (key -> 'a -> bool) -> 'a t -> 'a t * 'a t + val cardinal : 'a t -> int + val bindings : 'a t -> (key * 'a) list + val min_binding : 'a t -> key * 'a + val min_binding_opt : 'a t -> (key * 'a) option + val max_binding : 'a t -> key * 'a + val max_binding_opt : 'a t -> (key * 'a) option + val choose : 'a t -> key * 'a + val choose_opt : 'a t -> (key * 'a) option + val split : key -> 'a t -> 'a t * 'a option * 'a t + val find : key -> 'a t -> 'a + val find_opt : key -> 'a t -> 'a option + val find_first : (key -> bool) -> 'a t -> key * 'a + val find_first_opt : (key -> bool) -> 'a t -> (key * 'a) option + val find_last : (key -> bool) -> 'a t -> key * 'a + val find_last_opt : (key -> bool) -> 'a t -> (key * 'a) option + val map : ('a -> 'b) -> 'a t -> 'b t + val mapi : (key -> 'a -> 'b) -> 'a t -> 'b t + end +type command = + [ `Console + | `Create_block + | `Create_vm + | `Crl + | `Destroy_block + | `Destroy_vm + | `Force_create_vm + | `Info + | `Log + | `Statistics ] +val pp_command : + Format.formatter -> + [< `Console + | `Create_block + | `Create_vm + | `Crl + | `Destroy_block + | `Destroy_vm + | `Force_create_vm + | `Info + | `Log + | `Statistics ] -> + unit +val command_of_string : + string -> + [> `Console + | `Create_block + | `Create_vm + | `Crl + | `Destroy_block + | `Destroy_vm + | `Force_create_vm + | `Info + | `Log + | `Statistics ] + option +type vmtype = [ `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] +val vmtype_to_int : + [< `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] -> int +val int_to_vmtype : + int -> [> `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] option +val pp_vmtype : + Format.formatter -> + [< `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] -> unit +type id = string list +val string_of_id : string list -> string +val id_of_string : string -> string list +val drop_super : super:string list -> sub:string list -> string list option +val is_sub_id : super:string list -> sub:string list -> bool +val domain : 'a list -> 'a list +val pp_id : Format.formatter -> string list -> unit +val pp_is : Format.formatter -> IS.t -> unit +type bridge = + [ `External of string * Ipaddr.V4.t * Ipaddr.V4.t * Ipaddr.V4.t * int + | `Internal of string ] +val pp_bridge : + Format.formatter -> + [< `External of string * Ipaddr.V4.t * Ipaddr.V4.t * Ipaddr.V4.t * int + | `Internal of string ] -> + unit +type policy = { + vms : int; + cpuids : IS.t; + memory : int; + block : int option; + bridges : bridge Astring.String.Map.t; +} +val pp_policy : Format.formatter -> policy -> unit +val sub_bridges : + [> `External of string * Ipaddr.V4.t * Ipaddr.V4.t * Ipaddr.V4.t * 'a + | `Internal of string ] + Astring.String.map -> + [> `External of string * Ipaddr.V4.t * Ipaddr.V4.t * Ipaddr.V4.t * 'a + | `Internal of string ] + Astring.String.map -> bool +val sub_block : 'a option -> 'a option -> bool +val sub_cpu : IS.t -> IS.t -> bool +val is_sub : super:policy -> sub:policy -> bool +type vm_config = { + vname : id; + cpuid : int; + requested_memory : int; + block_device : string option; + network : string list; + vmimage : vmtype * Cstruct.t; + argv : string list option; +} +val location : vm_config -> string * string +val pp_image : + Format.formatter -> + [< `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] * Cstruct.t -> unit +val pp_vm_config : Format.formatter -> vm_config -> unit +val good_bridge : string list -> 'a Astring.String.map -> bool +val vm_matches_res : policy -> vm_config -> bool +val check_policies : + vm_config -> policy list -> (unit, [> `Msg of string ]) Result.result +type vm = { + config : vm_config; + cmd : Bos.Cmd.t; + pid : int; + taps : string list; + stdout : Unix.file_descr; +} +val pp_vm : Format.formatter -> vm -> unit +val translate_tap : vm -> string -> string option +val identifier : Nocrypto.Numeric.Z.t -> string +val id : X509.t -> string +val name : X509.t -> string +val parse_db : + string list -> ((Z.t * string) list, [> Rresult.R.msg ]) Result.result +val find_in_db : + string -> 'a list -> ('a -> bool) -> ('a, [> Rresult.R.msg ]) Result.result +val find_name : + ('a * string) list -> string -> ('a, [> Rresult.R.msg ]) Result.result +val translate_serial : + (Nocrypto.Numeric.Z.t * string) list -> string -> string +val translate_name : (Nocrypto.Numeric.Z.t * string) list -> string -> string +val separate_chain : 'a list -> ('a * 'a list, [> `Msg of string ]) result +type rusage = { + utime : int64 * int; + stime : int64 * int; + maxrss : int64; + ixrss : int64; + idrss : int64; + isrss : int64; + minflt : int64; + majflt : int64; + nswap : int64; + inblock : int64; + outblock : int64; + msgsnd : int64; + msgrcv : int64; + nsignals : int64; + nvcsw : int64; + nivcsw : int64; +} +val pp_rusage : Format.formatter -> rusage -> unit +val pp_vmm : (string * int64) list Fmt.t + +type ifdata = { + name : string; + flags : int32; + send_length : int32; + max_send_length : int32; + send_drops : int32; + mtu : int32; + baudrate : int64; + input_packets : int64; + input_errors : int64; + output_packets : int64; + output_errors : int64; + collisions : int64; + input_bytes : int64; + output_bytes : int64; + input_mcast : int64; + output_mcast : int64; + input_dropped : int64; + output_dropped : int64; +} +val pp_ifdata : Format.formatter -> ifdata -> unit +module Log : + sig + type event = + [ `Login of Ipaddr.V4.t * int + | `Logout of Ipaddr.V4.t * int + | `Startup + | `VM_start of int * string list * string option + | `VM_stop of int * [ `Exit of int | `Signal of int | `Stop of int ] ] + val pp_event : + Format.formatter -> + [< `Login of Ipaddr.V4.t * int + | `Logout of Ipaddr.V4.t * int + | `Startup + | `VM_start of int * string list * string option + | `VM_stop of int * [< `Exit of int | `Signal of int | `Stop of int ] ] -> + unit + end diff --git a/src/vmm_engine.ml b/src/vmm_engine.ml index cf52778..af9c47b 100644 --- a/src/vmm_engine.ml +++ b/src/vmm_engine.ml @@ -8,13 +8,10 @@ open Rresult open R.Infix type 'a t = { + wire_version : Vmm_asn.version ; console_counter : int64 ; - console_version : Vmm_wire.version ; stats_counter : int64 ; - stats_version : Vmm_wire.version ; log_counter : int64 ; - log_version : Vmm_wire.version ; - client_version : Vmm_wire.version ; (* TODO: refine, maybe: bridges : (Macaddr.t String.Map.t * String.Set.t) String.Map.t ; *) used_bridges : String.Set.t String.Map.t ; @@ -23,23 +20,34 @@ type 'a t = { tasks : 'a String.Map.t ; } -let init () = { - console_counter = 1L ; console_version = `WV2 ; - stats_counter = 1L ; stats_version = `WV2 ; - log_counter = 1L ; log_version = `WV2 ; - client_version = `WV2 ; +let init wire_version = { + wire_version ; + console_counter = 1L ; + stats_counter = 1L ; + log_counter = 1L ; used_bridges = String.Map.empty ; resources = Vmm_resources.empty ; tasks = String.Map.empty ; } -let log state (hdr, event) = - let data = Vmm_wire.Log.log state.log_counter state.log_version hdr event in - let log_counter = Int64.succ state.log_counter in - Logs.debug (fun m -> m "LOG %a" Log.pp (hdr, event)) ; - ({ state with log_counter }, `Log data) +type service_out = [ + | `Stat of Vmm_asn.wire + | `Log of Vmm_asn.wire + | `Cons of Vmm_asn.wire +] + +type out = [ service_out | `Data of Vmm_asn.wire ] + +let log t id event = + let data = `Log_data (Ptime_clock.now (), event) in + let header = Vmm_asn.{ version = t.wire_version ; sequence = t.log_counter ; id } in + let log_counter = Int64.succ t.log_counter in + Logs.debug (fun m -> m "LOG %a" Log.pp_event event) ; + ({ t with log_counter }, `Log (header, `Command (`Log_cmd data))) let handle_create t hdr vm_config = + (* TODO fix (remove field?) *) + let vm_config = { vm_config with vname = hdr.Vmm_asn.id } in (match Vmm_resources.find_vm t.resources vm_config.vname with | Some _ -> Error (`Msg "VM with same name is already running") | None -> Ok ()) >>= fun () -> @@ -52,8 +60,9 @@ let handle_create t hdr vm_config = Vmm_unix.prepare vm_config >>= fun taps -> Logs.debug (fun m -> m "prepared vm with taps %a" Fmt.(list ~sep:(unit ",@ ") string) taps) ; (* TODO should we pre-reserve sth in t? *) - let cons = Vmm_wire.Console.add t.console_counter t.console_version vm_config.vname in - Ok ({ t with console_counter = Int64.succ t.console_counter }, [ `Cons cons ], + let cons = `Console_add in + let header = Vmm_asn.{ version = t.wire_version ; sequence = t.console_counter ; id = vm_config.vname } in + Ok ({ t with console_counter = Int64.succ t.console_counter }, [ `Cons (header, `Command (`Console_cmd cons)) ], `Create (fun t task -> (* actually execute the vm *) Vmm_unix.exec vm_config taps >>= fun vm -> @@ -70,14 +79,15 @@ let handle_create t hdr vm_config = t.used_bridges vm_config.network taps in let t = { t with resources ; tasks ; used_bridges } in - let t, out = log t (Log.hdr vm_config.vname, `VM_start (vm.pid, vm.taps, None)) in - let data = Vmm_wire.success t.client_version hdr.Vmm_wire.id Vmm_wire.Vm.(op_to_int Create) in - Ok (t, [ `Data data ; out ], vm))) + let t, out = log t vm_config.vname (`VM_start (vm.pid, vm.taps, None)) in + let data = `Success (`String "created VM") in + Ok (t, [ `Data (hdr, data) ; out ], vm))) let setup_stats t vm = - let stat_out = Vmm_wire.Stats.add t.stats_counter t.stats_version vm.config.vname vm.pid vm.taps in + let stat_out = `Stats_add (vm.pid, vm.taps) in + let header = Vmm_asn.{ version = t.wire_version ; sequence = t.stats_counter ; id = vm.config.vname } in let t = { t with stats_counter = Int64.succ t.stats_counter } in - Ok (t, [ `Stat stat_out ]) + t, [ `Stat (header, `Command (`Stats_cmd stat_out)) ] let handle_shutdown t vm r = (match Vmm_unix.shutdown vm with @@ -93,61 +103,56 @@ let handle_shutdown t vm r = String.Map.add br (String.Set.remove ta old) b) t.used_bridges vm.config.network vm.taps in - let stat_out = Vmm_wire.Stats.remove t.stats_counter t.stats_version vm.config.vname in + let stat_out = `Stats_remove in + let header = Vmm_asn.{ version = t.wire_version ; sequence = t.stats_counter ; id = vm.config.vname } in let tasks = String.Map.remove (string_of_id vm.config.vname) t.tasks in let t = { t with stats_counter = Int64.succ t.stats_counter ; resources ; used_bridges ; tasks } in - let t, logout = log t (Log.hdr vm.config.vname, `VM_stop (vm.pid, r)) + let t, logout = log t vm.config.vname (`VM_stop (vm.pid, r)) in - (t, [ `Stat stat_out ; logout ]) + (t, [ `Stat (header, `Command (`Stats_cmd stat_out)) ; logout ]) -let handle_command t hdr buf = +let handle_command t (header, payload) = let msg_to_err = function | Ok x -> x | Error (`Msg msg) -> Logs.debug (fun m -> m "error while processing command: %s" msg) ; - let out = Vmm_wire.fail ~msg t.client_version hdr.Vmm_wire.id in - (t, [ `Data out ], `End) + let out = `Failure msg in + (t, [ `Data (header, out) ], `End) in msg_to_err ( - if Vmm_wire.is_reply hdr then begin - Logs.warn (fun m -> m "ignoring reply") ; + let id = header.Vmm_asn.id in + match payload with + | `Failure f -> + Logs.warn (fun m -> m "ignoring failure %s" f) ; Ok (t, [], `End) - end else if not (Vmm_wire.version_eq hdr.Vmm_wire.version t.client_version) then - Error (`Msg "unknown client version") - else Vmm_wire.decode_strings buf >>= fun (id, off) -> - match Vmm_wire.Vm.int_to_op hdr.Vmm_wire.tag with - | None -> Error (`Msg "unknown command") - | Some Vmm_wire.Vm.Remove_policy -> - Logs.debug (fun m -> m "remove policy %a" pp_id id) ; - let resources = Vmm_resources.remove t.resources id in - let success = Vmm_wire.success t.client_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in - Ok ({ t with resources }, [ `Data success ], `End) - | Some Vmm_wire.Vm.Insert_policy -> - begin - Logs.debug (fun m -> m "insert policy %a" pp_id id) ; - Vmm_asn.policy_of_cstruct (Cstruct.shift buf off) >>= fun (policy, _) -> - Vmm_resources.insert_policy t.resources id policy >>= fun resources -> - let success = Vmm_wire.success t.client_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in - Ok ({ t with resources }, [ `Data success ], `End) - end - | Some Vmm_wire.Vm.Policy -> - begin - Logs.debug (fun m -> m "policy %a" pp_id id) ; - let policies = - Vmm_resources.fold t.resources id - (fun _ policies -> policies) - (fun prefix policy policies-> (prefix, policy) :: policies) - [] - in - match policies with - | [] -> - Logs.debug (fun m -> m "policies: couldn't find %a" pp_id id) ; - Error (`Msg "policy: not found") - | _ -> - let out = Vmm_wire.Vm.policy_reply hdr.Vmm_wire.id t.client_version policies in - Ok (t, [ `Data out ], `End) - end - | Some Vmm_wire.Vm.Info -> + | `Success _ -> + Logs.warn (fun m -> m "ignoring success") ; + Ok (t, [], `End) + | `Command (`Policy_cmd `Policy_remove) -> + Logs.debug (fun m -> m "remove policy %a" pp_id header.Vmm_asn.id) ; + let resources = Vmm_resources.remove t.resources id in + Ok ({ t with resources }, [ `Data (header, `Success (`String "removed policy")) ], `End) + | `Command (`Policy_cmd (`Policy_add policy)) -> + Logs.debug (fun m -> m "insert policy %a" pp_id id) ; + Vmm_resources.insert_policy t.resources id policy >>= fun resources -> + Ok ({ t with resources }, [ `Data (header, `Success (`String "added policy")) ], `End) + | `Command (`Policy_cmd `Policy_info) -> + begin + Logs.debug (fun m -> m "policy %a" pp_id id) ; + let policies = + Vmm_resources.fold t.resources id + (fun _ policies -> policies) + (fun prefix policy policies-> (prefix, policy) :: policies) + [] + in + match policies with + | [] -> + Logs.debug (fun m -> m "policies: couldn't find %a" pp_id id) ; + Error (`Msg "policy: not found") + | _ -> + Ok (t, [ `Data (header, `Success (`Policies policies)) ], `End) + end + | `Command (`Vm_cmd `Vm_info) -> begin Logs.debug (fun m -> m "info %a" pp_id id) ; let vms = @@ -161,44 +166,42 @@ let handle_command t hdr buf = Logs.debug (fun m -> m "info: couldn't find %a" pp_id id) ; Error (`Msg "info: not found") | _ -> - let out = Vmm_wire.Vm.info_reply hdr.Vmm_wire.id t.client_version vms in - Ok (t, [ `Data out ], `End) + let vm_configs = List.map (fun vm -> vm.config) vms in + Ok (t, [ `Data (header, `Success (`Vms vm_configs)) ], `End) end - | Some Vmm_wire.Vm.Create -> - Vmm_wire.Vm.decode_vm_config buf >>= fun vm_config -> - handle_create t hdr vm_config - | Some Vmm_wire.Vm.Force_create -> - Vmm_wire.Vm.decode_vm_config buf >>= fun vm_config -> - let resources = Vmm_resources.remove t.resources vm_config.vname in - if Vmm_resources.check_vm_policy resources vm_config then - begin match Vmm_resources.find_vm t.resources id with - | None -> handle_create t hdr vm_config - | Some vm -> - Vmm_unix.destroy vm ; - let id_str = string_of_id id in - match String.Map.find_opt id_str t.tasks with - | None -> handle_create t hdr vm_config - | Some task -> - let tasks = String.Map.remove id_str t.tasks in - let t = { t with tasks } in - Ok (t, [], `Wait_and_create - (t, task, fun t -> - msg_to_err @@ handle_create t hdr vm_config)) - end - else - Error (`Msg "wouldn't match policy") - | Some Vmm_wire.Vm.Destroy -> - match Vmm_resources.find_vm t.resources id with + | `Command (`Vm_cmd (`Vm_create vm_config)) -> + handle_create t header vm_config + | `Command (`Vm_cmd (`Vm_force_create vm_config)) -> + let resources = Vmm_resources.remove t.resources vm_config.vname in + if Vmm_resources.check_vm_policy resources vm_config then + begin match Vmm_resources.find_vm t.resources id with + | None -> handle_create t header vm_config + | Some vm -> + Vmm_unix.destroy vm ; + let id_str = string_of_id id in + match String.Map.find_opt id_str t.tasks with + | None -> handle_create t header vm_config + | Some task -> + let tasks = String.Map.remove id_str t.tasks in + let t = { t with tasks } in + Ok (t, [], `Wait_and_create + (task, fun t -> msg_to_err @@ handle_create t header vm_config)) + end + else + Error (`Msg "wouldn't match policy") + | `Command (`Vm_cmd `Vm_destroy) -> + begin match Vmm_resources.find_vm t.resources id with | Some vm -> Vmm_unix.destroy vm ; let id_str = string_of_id id in let out, next = - let success = Vmm_wire.success t.client_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in - let s = [ `Data success ] in + let s = [ `Data (header, `Success (`String "destroyed vm")) ] in match String.Map.find_opt id_str t.tasks with | None -> s, `End - | Some t -> [], `Wait (t, s) + | Some t -> [], `Wait (t, s) in let tasks = String.Map.remove id_str t.tasks in Ok ({ t with tasks }, out, next) - | None -> Error (`Msg "destroy: not found")) + | None -> Error (`Msg "destroy: not found") + end + | _ -> Error (`Msg "unknown command")) diff --git a/src/vmm_engine.mli b/src/vmm_engine.mli new file mode 100644 index 0000000..af6d787 --- /dev/null +++ b/src/vmm_engine.mli @@ -0,0 +1,26 @@ + +type 'a t + +val init : Vmm_asn.version -> 'a t + +type service_out = [ + | `Stat of Vmm_asn.wire + | `Log of Vmm_asn.wire + | `Cons of Vmm_asn.wire +] + +type out = [ service_out | `Data of Vmm_asn.wire ] + +val handle_shutdown : 'a t -> Vmm_core.vm -> + [ `Exit of int | `Signal of int | `Stop of int ] -> 'a t * out list + +val handle_command : 'a t -> Vmm_asn.wire -> + 'a t * out list * + [ `Create of 'c t -> 'c -> ('c t * out list * Vmm_core.vm, [> Rresult.R.msg ]) result + | `End + | `Wait of 'a * out list + | `Wait_and_create of 'a * ('a t -> 'a t * out list * + [ `Create of 'd t -> 'd -> ('d t * out list * Vmm_core.vm, [> Rresult.R.msg ]) result + | `End ]) ] + +val setup_stats : 'a t -> Vmm_core.vm -> 'a t * out list diff --git a/src/vmm_lwt.ml b/src/vmm_lwt.ml index bfe0382..9017109 100644 --- a/src/vmm_lwt.ml +++ b/src/vmm_lwt.ml @@ -42,7 +42,7 @@ let wait_and_clear pid stdout = ret s let read_wire s = - let buf = Bytes.create (Int32.to_int Vmm_wire.header_size) in + let buf = Bytes.create 4 in let rec r b i l = Lwt.catch (fun () -> Lwt_unix.read s b i l >>= function @@ -59,27 +59,31 @@ let read_wire s = Logs.err (fun m -> m "exception %s while reading" err) ; Lwt.return (Error `Exception)) in - r buf 0 (Int32.to_int Vmm_wire.header_size) >>= function + r buf 0 4 >>= function | Error e -> Lwt.return (Error e) | Ok () -> - match Vmm_wire.decode_header (Cstruct.of_bytes buf) with - | Error (`Msg m) -> Lwt.return (Error (`Msg m)) - | Ok hdr -> - let l = Int32.to_int hdr.Vmm_wire.length in - if l > 0 then - let b = Bytes.create l in - r b 0 l >|= function - | Error e -> Error e - | Ok () -> -(* Logs.debug (fun m -> m "read hdr %a, body %a" + let len = Cstruct.BE.get_uint32 (Cstruct.of_bytes buf) 0 in + if len > 0l then + let b = Bytes.create (Int32.to_int len) in + r b 0 (Int32.to_int len) >|= function + | Error e -> Error e + | Ok () -> + (* Logs.debug (fun m -> m "read hdr %a, body %a" Cstruct.hexdump_pp (Cstruct.of_bytes buf) Cstruct.hexdump_pp (Cstruct.of_bytes b)) ; *) - Ok (hdr, Cstruct.of_bytes b) - else - Lwt.return (Ok (hdr, Cstruct.empty)) + match Vmm_asn.wire_of_cstruct (Cstruct.of_bytes b) with + | Ok w -> Ok w + | Error (`Msg msg) -> + Logs.err (fun m -> m "error %s while parsing data" msg) ; + Error `Exception + else + Lwt.return (Error `Eof) -let write_wire s buf = - let buf = Cstruct.to_bytes buf in +let write_wire s wire = + let data = Vmm_asn.wire_to_cstruct wire in + let dlen = Cstruct.create 4 in + Cstruct.BE.set_uint32 dlen 0 (Int32.of_int (Cstruct.len data)) ; + let buf = Cstruct.(to_bytes (append dlen data)) in let rec w off l = Lwt.catch (fun () -> Lwt_unix.send s buf off l [] >>= fun n -> diff --git a/src/vmm_lwt.mli b/src/vmm_lwt.mli new file mode 100644 index 0000000..ea11a6d --- /dev/null +++ b/src/vmm_lwt.mli @@ -0,0 +1,14 @@ +val pp_sockaddr : Format.formatter -> Lwt_unix.sockaddr -> unit +val pp_process_status : Format.formatter -> Unix.process_status -> unit +val ret : + Unix.process_status -> [> `Exit of int | `Signal of int | `Stop of int ] +val waitpid : int -> (int * Lwt_unix.process_status, unit) result Lwt.t +val wait_and_clear : + int -> + Unix.file_descr -> [> `Exit of int | `Signal of int | `Stop of int ] Lwt.t +val read_wire : + Lwt_unix.file_descr -> + (Vmm_asn.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t +val write_wire : + Lwt_unix.file_descr -> Vmm_asn.wire -> (unit, [> `Exception ]) result Lwt.t +val safe_close : Lwt_unix.file_descr -> unit Lwt.t diff --git a/src/vmm_tls.ml b/src/vmm_tls.ml index e532841..4bd3daf 100644 --- a/src/vmm_tls.ml +++ b/src/vmm_tls.ml @@ -26,27 +26,32 @@ let read_tls t = Logs.err (fun m -> m "TLS read exception %s" (Printexc.to_string e)) ; Lwt.return (Error `Exception)) in - let buf = Cstruct.create (Int32.to_int Vmm_wire.header_size) in - r_n buf 0 (Int32.to_int Vmm_wire.header_size) >>= function + let buf = Cstruct.create 4 in + r_n buf 0 4 >>= function | Error e -> Lwt.return (Error e) | Ok () -> - match Vmm_wire.decode_header buf with - | Error (`Msg m) -> Lwt.return (Error (`Msg m)) - | Ok hdr -> - let l = Int32.to_int hdr.Vmm_wire.length in - if l > 0 then - let b = Cstruct.create l in - r_n b 0 l >|= function - | Error e -> Error e - | Ok () -> -(* Logs.debug (fun m -> m "TLS read id %d %a tag %d data %a" + let len = Cstruct.BE.get_uint32 buf 0 in + if len > 0l then + let b = Cstruct.create (Int32.to_int len) in + r_n b 0 (Int32.to_int len) >|= function + | Error e -> Error e + | Ok () -> + (* Logs.debug (fun m -> m "TLS read id %d %a tag %d data %a" hdr.Vmm_wire.id Vmm_wire.pp_version hdr.Vmm_wire.version hdr.Vmm_wire.tag Cstruct.hexdump_pp b) ; *) - Ok (hdr, b) + match Vmm_asn.wire_of_cstruct b with + | Ok w -> Ok w + | Error (`Msg msg) -> + Logs.err (fun m -> m "error %s while parsing data" msg) ; + Error `Exception else - Lwt.return (Ok (hdr, Cstruct.empty)) + Lwt.return (Error `Eof) -let write_tls s buf = +let write_tls s wire = + let data = Vmm_asn.wire_to_cstruct wire in + let dlen = Cstruct.create 4 in + Cstruct.BE.set_uint32 dlen 0 (Int32.of_int (Cstruct.len data)) ; + let buf = Cstruct.(append dlen data) in (* Logs.debug (fun m -> m "TLS write %a" Cstruct.hexdump_pp (Cstruct.of_string buf)) ; *) Lwt.catch (fun () -> Tls_lwt.Unix.write s buf >|= fun () -> Ok ()) diff --git a/src/vmm_tls.mli b/src/vmm_tls.mli new file mode 100644 index 0000000..c5e6967 --- /dev/null +++ b/src/vmm_tls.mli @@ -0,0 +1,5 @@ +val read_tls : + Tls_lwt.Unix.t -> + (Vmm_asn.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t +val write_tls : + Tls_lwt.Unix.t -> Vmm_asn.wire -> (unit, [> `Exception ]) result Lwt.t diff --git a/src/vmm_wire.ml b/src/vmm_wire.ml deleted file mode 100644 index d06ff00..0000000 --- a/src/vmm_wire.ml +++ /dev/null @@ -1,681 +0,0 @@ -(* (c) 2017 Hannes Mehnert, all rights reserved *) - -(* the wire protocol - length prepended binary data *) - -(* each message (on all channels) is prefixed by a common header: - - tag (32 bit) the type of message - it is only 31 bit, the highest (leftmost) bit indicates query (0) or reply (1) - a failure is reported with the special tag 0xFFFFFFFF (all bits set) - data is a string - every request leads to a reply - WV0 and WV1 used 16 bit only - - version (16 bit) the version used on this channel (used to be byte 4-6) - - padding (16 bit) - - id (64 bit) unique id chosen by sender (for request/reply) - 0 shouldn't be used (reserved for log/console messages which do not correspond to a request) - - length (32 bit) spanning the message (excluding the 20 bytes header) - - full VM name (i.e. foo.bar.baz) encoded as size of list followed by list of strings - - replies do not contain the VM name - - Version and tag are protocol-specific - the channel between vmm and console - uses different tags and mayuse a different version than between vmm and - client. - - every command issued is replied to with success or failure. broadcast - communication (console data, log events) are not acknowledged by the - recipient. - *) - - -(* TODO unlikely that this is 32bit clean *) - -open Astring - -open Vmm_core - -type version = [ `WV0 | `WV1 | `WV2 ] - -let version_to_int = function - | `WV0 -> 0 - | `WV1 -> 1 - | `WV2 -> 2 - -let version_of_int = function - | 0 -> Ok `WV0 - | 1 -> Ok `WV1 - | 2 -> Ok `WV2 - | _ -> Error (`Msg "unknown wire version") - -let version_eq a b = match a, b with - | `WV0, `WV0 -> true - | `WV1, `WV1 -> true - | `WV2, `WV2 -> true - | _ -> false - -let pp_version ppf v = - Fmt.string ppf (match v with - | `WV0 -> "wire version 0" - | `WV1 -> "wire version 1" - | `WV2 -> "wire version 2") - -type header = { - version : version ; - tag : int32 ; - length : int32 ; - id : int64 ; -} - -let header_size = 20l - -let max_size = 0x7FFFFFFFl - -(* Throughout this module, we don't expect any cstruct bigger than the above - max_size (encode checks this!) *) - -open Rresult -open R.Infix - - -let cs_create len = Cstruct.create (Int32.to_int len) - -let cs_len cs = - let l = Cstruct.len cs in - assert (l lsr 31 = 0) ; - Int32.of_int l - -let check_len cs l = - if Int32.compare (cs_len cs) l = -1 then - Error (`Msg "underflow") - else - Ok () - -let cs_shift cs num = - check_len cs (Int32.of_int num) >>= fun () -> - Ok (Cstruct.shift cs num) - -let check_exact cs l = - if cs_len cs = l then - Ok () - else - Error (`Msg "bad length") - -let null cs = if Cstruct.len cs = 0 then Ok () else Error (`Msg "trailing bytes") - -let decode_header cs = - check_len cs 8l >>= fun () -> - let version = Cstruct.BE.get_uint16 cs 4 in - version_of_int version >>= function - | `WV0 | `WV1 -> Error (`Msg "unsupported version") - | `WV2 as version -> - check_len cs header_size >>= fun () -> - let tag = Cstruct.BE.get_uint32 cs 0 - and id = Cstruct.BE.get_uint64 cs 8 - and length = Cstruct.BE.get_uint32 cs 16 - in - Ok { length ; id ; version ; tag } - -let encode_header { length ; id ; version ; tag } = - match version with - | `WV0 | `WV1 -> invalid_arg "version no longer supported" - | `WV2 -> - let hdr = cs_create header_size in - Cstruct.BE.set_uint32 hdr 0 tag ; - Cstruct.BE.set_uint16 hdr 4 (version_to_int version) ; - Cstruct.BE.set_uint64 hdr 8 id ; - Cstruct.BE.set_uint32 hdr 16 length ; - hdr - -let max_str_len = 0xFFFF - -let decode_string cs = - check_len cs 2l >>= fun () -> - let l = Cstruct.BE.get_uint16 cs 0 in - check_len cs (Int32.add 2l (Int32.of_int l)) >>= fun () -> - let str = Cstruct.(to_string (sub cs 2 l)) in - Ok (str, l + 2) - -let encode_string str = - let l = String.length str in - assert (l < max_str_len) ; - let cs = Cstruct.create (2 + l) in - Cstruct.BE.set_uint16 cs 0 l ; - Cstruct.blit_from_string str 0 cs 2 l ; - cs - -let max = Int64.of_int max_int -let min = Int64.of_int min_int - -let decode_int ?(off = 0) cs = - check_len cs Int32.(add (of_int off) 8l) >>= fun () -> - let i = Cstruct.BE.get_uint64 cs off in - if i > max then - Error (`Msg "int too big") - else if i < min then - Error (`Msg "int too small") - else - Ok (Int64.to_int i) - -let encode_int i = - let cs = Cstruct.create 8 in - Cstruct.BE.set_uint64 cs 0 (Int64.of_int i) ; - cs - -let decode_list inner buf = - decode_int buf >>= fun len -> - let rec go acc idx = function - | 0 -> Ok (List.rev acc, idx) - | n -> - cs_shift buf idx >>= fun cs' -> - inner cs' >>= fun (data, len) -> - go (data :: acc) (idx + len) (pred n) - in - go [] 8 len - -let encode_list inner data = - let cs = encode_int (List.length data) in - Cstruct.concat (cs :: (List.map inner data)) - -let decode_strings = decode_list decode_string - -let encode_strings = encode_list encode_string - -let encode ?name ?body version id tag = - let vm = match name with None -> Cstruct.empty | Some id -> encode_strings id in - let payload = match body with None -> Cstruct.empty | Some x -> x in - let header = - let length = Int32.(add (cs_len payload) (cs_len vm)) in - { length ; id ; version ; tag } - in - Cstruct.concat [ encode_header header ; vm ; payload ] - -let maybe_str = function - | None -> Cstruct.empty - | Some c -> encode_string c - -let fail_tag = 0xFFFFFFFFl - -let reply_tag = 0x80000000l - -let is_tag v tag = Int32.logand v tag = v - -let is_reply { tag ; _ } = is_tag reply_tag tag - -let is_fail { tag ; _ } = is_tag fail_tag tag - -let reply ?body version id tag = - encode ?body version id (Int32.logor reply_tag tag) - -let fail ?msg version id = - encode ~body:(maybe_str msg) version id fail_tag - -let success ?msg version id tag = - reply ~body:(maybe_str msg) version id tag - -let decode_ptime ?(off = 0) cs = - cs_shift cs off >>= fun cs' -> - check_len cs' 16l >>= fun () -> - decode_int cs' >>= fun d -> - let ps = Cstruct.BE.get_uint64 cs' 8 in - Ok (Ptime.v (d, ps)) - -let encode_ptime ts = - let d, ps = Ptime.(Span.to_d_ps (to_span ts)) in - let cs = Cstruct.create 16 in - Cstruct.BE.set_uint64 cs 0 (Int64.of_int d) ; - Cstruct.BE.set_uint64 cs 8 ps ; - cs - -module Console = struct - type op = - | Add_console - | Attach_console - | Data (* is a reply, never acked *) - - let op_to_int = function - | Add_console -> 0x0100l - | Attach_console -> 0x0101l - | Data -> 0x0102l - - let int_to_op = function - | 0x0100l -> Some Add_console - | 0x0101l -> Some Attach_console - | 0x0102l -> Some Data - | _ -> None - - let data version name ts msg = - let body = - let ts = encode_ptime ts - and data = encode_string msg - in - Cstruct.append ts data - in - encode version ~name ~body 0L (op_to_int Data) - - let add id version name = - encode ~name version id (op_to_int Add_console) - - let attach id version name = - encode ~name version id (op_to_int Attach_console) -end - -module Stats = struct - type op = - | Add - | Remove - | Subscribe - | Data - - let op_to_int = function - | Add -> 0x0200l - | Remove -> 0x0201l - | Subscribe -> 0x0202l - | Data -> 0x0203l - - let int_to_op = function - | 0x0200l -> Some Add - | 0x0201l -> Some Remove - | 0x0202l -> Some Subscribe - | 0x0203l -> Some Data - | _ -> None - - let rusage_len = 144l - - let encode_rusage ru = - let cs = cs_create rusage_len in - Cstruct.BE.set_uint64 cs 0 (fst ru.utime) ; - Cstruct.BE.set_uint64 cs 8 (Int64.of_int (snd ru.utime)) ; - Cstruct.BE.set_uint64 cs 16 (fst ru.stime) ; - Cstruct.BE.set_uint64 cs 24 (Int64.of_int (snd ru.stime)) ; - Cstruct.BE.set_uint64 cs 32 ru.maxrss ; - Cstruct.BE.set_uint64 cs 40 ru.ixrss ; - Cstruct.BE.set_uint64 cs 48 ru.idrss ; - Cstruct.BE.set_uint64 cs 56 ru.isrss ; - Cstruct.BE.set_uint64 cs 64 ru.minflt ; - Cstruct.BE.set_uint64 cs 72 ru.majflt ; - Cstruct.BE.set_uint64 cs 80 ru.nswap ; - Cstruct.BE.set_uint64 cs 88 ru.inblock ; - Cstruct.BE.set_uint64 cs 96 ru.outblock ; - Cstruct.BE.set_uint64 cs 104 ru.msgsnd ; - Cstruct.BE.set_uint64 cs 112 ru.msgrcv ; - Cstruct.BE.set_uint64 cs 120 ru.nsignals ; - Cstruct.BE.set_uint64 cs 128 ru.nvcsw ; - Cstruct.BE.set_uint64 cs 136 ru.nivcsw ; - cs - - let decode_rusage cs = - check_exact cs rusage_len >>= fun () -> - (decode_int ~off:8 cs >>= fun ms -> - Ok (Cstruct.BE.get_uint64 cs 0, ms)) >>= fun utime -> - (decode_int ~off:24 cs >>= fun ms -> - Ok (Cstruct.BE.get_uint64 cs 16, ms)) >>= fun stime -> - let maxrss = Cstruct.BE.get_uint64 cs 32 - and ixrss = Cstruct.BE.get_uint64 cs 40 - and idrss = Cstruct.BE.get_uint64 cs 48 - and isrss = Cstruct.BE.get_uint64 cs 56 - and minflt = Cstruct.BE.get_uint64 cs 64 - and majflt = Cstruct.BE.get_uint64 cs 72 - and nswap = Cstruct.BE.get_uint64 cs 80 - and inblock = Cstruct.BE.get_uint64 cs 88 - and outblock = Cstruct.BE.get_uint64 cs 96 - and msgsnd = Cstruct.BE.get_uint64 cs 104 - and msgrcv = Cstruct.BE.get_uint64 cs 112 - and nsignals = Cstruct.BE.get_uint64 cs 120 - and nvcsw = Cstruct.BE.get_uint64 cs 128 - and nivcsw = Cstruct.BE.get_uint64 cs 136 - in - Ok { utime ; stime ; maxrss ; ixrss ; idrss ; isrss ; minflt ; majflt ; - nswap ; inblock ; outblock ; msgsnd ; msgrcv ; nsignals ; nvcsw ; nivcsw } - - let ifdata_len = 116l - - let encode_ifdata i = - let name = encode_string i.name in - let cs = cs_create ifdata_len in - Cstruct.BE.set_uint32 cs 0 i.flags ; - Cstruct.BE.set_uint32 cs 4 i.send_length ; - Cstruct.BE.set_uint32 cs 8 i.max_send_length ; - Cstruct.BE.set_uint32 cs 12 i.send_drops ; - Cstruct.BE.set_uint32 cs 16 i.mtu ; - Cstruct.BE.set_uint64 cs 20 i.baudrate ; - Cstruct.BE.set_uint64 cs 28 i.input_packets ; - Cstruct.BE.set_uint64 cs 36 i.input_errors ; - Cstruct.BE.set_uint64 cs 44 i.output_packets ; - Cstruct.BE.set_uint64 cs 52 i.output_errors ; - Cstruct.BE.set_uint64 cs 60 i.collisions ; - Cstruct.BE.set_uint64 cs 68 i.input_bytes ; - Cstruct.BE.set_uint64 cs 76 i.output_bytes ; - Cstruct.BE.set_uint64 cs 84 i.input_mcast ; - Cstruct.BE.set_uint64 cs 92 i.output_mcast ; - Cstruct.BE.set_uint64 cs 100 i.input_dropped ; - Cstruct.BE.set_uint64 cs 108 i.output_dropped ; - Cstruct.append name cs - - let decode_ifdata buf = - decode_string buf >>= fun (name, l) -> - cs_shift buf l >>= fun cs -> - check_len cs ifdata_len >>= fun () -> - let flags = Cstruct.BE.get_uint32 cs 0 - and send_length = Cstruct.BE.get_uint32 cs 4 - and max_send_length = Cstruct.BE.get_uint32 cs 8 - and send_drops = Cstruct.BE.get_uint32 cs 12 - and mtu = Cstruct.BE.get_uint32 cs 16 - and baudrate = Cstruct.BE.get_uint64 cs 20 - and input_packets = Cstruct.BE.get_uint64 cs 28 - and input_errors = Cstruct.BE.get_uint64 cs 36 - and output_packets = Cstruct.BE.get_uint64 cs 44 - and output_errors = Cstruct.BE.get_uint64 cs 52 - and collisions = Cstruct.BE.get_uint64 cs 60 - and input_bytes = Cstruct.BE.get_uint64 cs 68 - and output_bytes = Cstruct.BE.get_uint64 cs 76 - and input_mcast = Cstruct.BE.get_uint64 cs 84 - and output_mcast = Cstruct.BE.get_uint64 cs 92 - and input_dropped = Cstruct.BE.get_uint64 cs 100 - and output_dropped = Cstruct.BE.get_uint64 cs 108 - in - Ok ({ name ; flags ; send_length ; max_send_length ; send_drops ; mtu ; - baudrate ; input_packets ; input_errors ; output_packets ; - output_errors ; collisions ; input_bytes ; output_bytes ; input_mcast ; - output_mcast ; input_dropped ; output_dropped }, - Int32.(to_int ifdata_len) + l) - - let add id version name pid taps = - let body = Cstruct.append (encode_int pid) (encode_strings taps) in - encode ~name ~body version id (op_to_int Add) - - let remove id version name = encode ~name version id (op_to_int Remove) - - let subscribe id version name = encode ~name version id (op_to_int Subscribe) - - let data id version vm body = - let name = Vmm_core.id_of_string vm in - encode ~name ~body version id (op_to_int Data) - - let encode_int64 i = - let cs = Cstruct.create 8 in - Cstruct.BE.set_uint64 cs 0 i ; - cs - - let decode_int64 ?(off = 0) cs = - check_len cs (Int32.add 8l (Int32.of_int off)) >>= fun () -> - Ok (Cstruct.BE.get_uint64 cs off) - - let encode_vmm_stats = - encode_list - (fun (k, v) -> Cstruct.append (encode_string k) (encode_int64 v)) - - let decode_vmm_stats = - decode_list (fun buf -> - decode_string buf >>= fun (str, off) -> - decode_int64 ~off buf >>= fun v -> - Ok ((str, v), off + 8)) - - let encode_stats (ru, vmm, ifd) = - Cstruct.concat - [ encode_rusage ru ; - encode_vmm_stats vmm ; - encode_list encode_ifdata ifd ] - - let decode_stats cs = - check_len cs rusage_len >>= fun () -> - let ru, rest = Cstruct.split cs (Int32.to_int rusage_len) in - decode_rusage ru >>= fun ru -> - decode_vmm_stats rest >>= fun (vmm, off) -> - cs_shift rest off >>= fun rest' -> - decode_list decode_ifdata rest' >>= fun (ifs, _) -> - Ok (ru, vmm, ifs) - - let decode_pid_taps data = - decode_int data >>= fun pid -> - decode_strings (Cstruct.shift data 8) >>= fun (taps, _off) -> - Ok (pid, taps) -end - -let decode_id_ts cs = - decode_strings cs >>= fun (id, off) -> - decode_ptime ~off cs >>= fun ts -> - Ok ((id, ts), off + 16) - -let split_id id = match List.rev id with - | [] -> Error (`Msg "bad header") - | name::rest -> Ok (name, List.rev rest) - -module Log = struct - type op = - | Log - | Broadcast - | Subscribe - - let op_to_int = function - | Log -> 0x0300l - | Subscribe -> 0x0301l - | Broadcast -> 0x0302l - - let int_to_op = function - | 0x0300l -> Some Log - | 0x0301l -> Some Subscribe - | 0x0302l -> Some Broadcast - | _ -> None - - let subscribe id version name = - encode ~name version id (op_to_int Subscribe) - - let decode_log_hdr cs = - decode_id_ts cs >>= fun ((name, ts), off) -> - Ok ({ Log.ts ; name }, Cstruct.shift cs off) - - let encode_addr ip port = - let cs = Cstruct.create 6 in - Cstruct.BE.set_uint32 cs 0 (Ipaddr.V4.to_int32 ip) ; - Cstruct.BE.set_uint16 cs 4 port ; - cs - - let decode_addr cs = - check_len cs 6l >>= fun () -> - let ip = Ipaddr.V4.of_int32 (Cstruct.BE.get_uint32 cs 0) - and port = Cstruct.BE.get_uint16 cs 4 - in - Ok (ip, port) - - let encode_vm (pid, taps, block) = - let cs = encode_int pid in - let bl = encode_string (match block with None -> "" | Some x -> x) in - let taps = encode_strings taps in - Cstruct.concat [ cs ; bl ; taps ] - - let decode_vm cs = - decode_int cs >>= fun pid -> - let r = Cstruct.shift cs 8 in - decode_string r >>= fun (block, l) -> - let block = if block = "" then None else Some block in - cs_shift r l >>= fun r' -> - decode_strings r' >>= fun (taps, _) -> - Ok (pid, taps, block) - - let encode_pid_exit pid c = - let r, c = match c with - | `Exit n -> 0, n - | `Signal n -> 1, n - | `Stop n -> 2, n - in - let r_cs = encode_int r - and pid_cs = encode_int pid - and c_cs = encode_int c - in - Cstruct.concat [ pid_cs ; r_cs ; c_cs ] - - let decode_pid_exit cs = - check_len cs 24l >>= fun () -> - decode_int cs >>= fun pid -> - decode_int ~off:8 cs >>= fun r -> - decode_int ~off:16 cs >>= fun c -> - (match r with - | 0 -> Ok (`Exit c) - | 1 -> Ok (`Signal c) - | 2 -> Ok (`Stop c) - | _ -> Error (`Msg "couldn't parse exit status")) >>= fun r -> - Ok (pid, r) - - let encode_event ev = - let tag, data = match ev with - | `Startup -> 0, Cstruct.empty - | `Login (ip, port) -> 1, encode_addr ip port - | `Logout (ip, port) -> 2, encode_addr ip port - | `VM_start vm -> 3, encode_vm vm - | `VM_stop (pid, c) -> 4, encode_pid_exit pid c - in - let cs = Cstruct.create 2 in - Cstruct.BE.set_uint16 cs 0 tag ; - Cstruct.append cs data - - let decode_event cs = - check_len cs 2l >>= fun () -> - let data = Cstruct.(shift cs 2) in - match Cstruct.BE.get_uint16 cs 0 with - | 0 -> Ok `Startup - | 1 -> decode_addr data >>= fun addr -> Ok (`Login addr) - | 2 -> decode_addr data >>= fun addr -> Ok (`Logout addr) - | 3 -> decode_vm data >>= fun vm -> Ok (`VM_start vm) - | 4 -> decode_pid_exit data >>= fun ex -> Ok (`VM_stop ex) - | x -> R.error_msgf "couldn't parse event type %d" x - - let log id version hdr event = - let body = Cstruct.append (encode_ptime hdr.Log.ts) (encode_event event) in - encode ~name:hdr.Log.name ~body version id (op_to_int Log) -end - -module Vm = struct - type op = - | Create - | Destroy - | Info - | Policy - | Insert_policy - | Remove_policy - | Force_create - - let op_to_int = function - | Create -> 0x0400l - | Destroy -> 0x0401l - | Info -> 0x0402l - | Policy -> 0x0403l - | Insert_policy -> 0x0404l - | Remove_policy -> 0x0405l - | Force_create -> 0x0406l - - let int_to_op = function - | 0x0400l -> Some Create - | 0x0401l -> Some Destroy - | 0x0402l -> Some Info - | 0x0403l -> Some Policy - | 0x0404l -> Some Insert_policy - | 0x0405l -> Some Remove_policy - | 0x0406l -> Some Force_create - | _ -> None - - let policy id version name = - encode ~name version id (op_to_int Policy) - - let insert_policy id version name policy = - let body = Vmm_asn.policy_to_cstruct policy in - encode ~name ~body version id (op_to_int Insert_policy) - - let remove_policy id version name = - encode ~name version id (op_to_int Remove_policy) - - let info id version name = - encode ~name version id (op_to_int Info) - - let encode_vm vm = - let name = encode_strings vm.config.vname - and memory = encode_int vm.config.requested_memory - and cs = encode_string (Bos.Cmd.to_string vm.cmd) - and pid = encode_int vm.pid - and taps = encode_strings vm.taps - in - Cstruct.concat [ name ; memory ; cs ; pid ; taps ] - - let info_reply id version vms = - let body = encode_list encode_vm vms in - reply ~body version id (op_to_int Info) - - let policy_reply id version policies = - let body = encode_list - (fun (prefix, policy) -> - let name_cs = encode_strings prefix - and pol_cs = Vmm_asn.policy_to_cstruct policy in - Cstruct.append name_cs pol_cs) - policies - in - reply ~body version id (op_to_int Policy) - - let decode_policies buf = - decode_list (fun cs -> - decode_strings cs >>= fun (id, l) -> - cs_shift cs l >>= fun cs' -> - Vmm_asn.policy_of_cstruct cs' >>= fun (policy, cs'') -> - let off = Cstruct.len cs - Cstruct.len cs'' in - Ok ((id, policy), off)) - buf - - let decode_vm cs = - decode_strings cs >>= fun (id, l) -> - cs_shift cs l >>= fun cs' -> - decode_int cs' >>= fun memory -> - cs_shift cs' 8 >>= fun cs'' -> - decode_string cs'' >>= fun (cmd, l') -> - cs_shift cs'' l' >>= fun cs''' -> - decode_int cs''' >>= fun pid -> - cs_shift cs''' 8 >>= fun cs'''' -> - decode_strings cs'''' >>= fun (taps, l'') -> - Ok ((id, memory, cmd, pid, taps), l + 8 + l' + 8 + l'') - - let decode_vms buf = decode_list decode_vm buf - - let encode_vm_config vm = - let cpu = encode_int vm.cpuid - and mem = encode_int vm.requested_memory - and block = encode_string (match vm.block_device with None -> "" | Some x -> x) - and network = encode_strings vm.network - and vmimage = Cstruct.concat [ encode_int (vmtype_to_int (fst vm.vmimage)) ; - encode_int (Cstruct.len (snd vm.vmimage)) ; - snd vm.vmimage ] - and args = encode_strings (match vm.argv with None -> [] | Some args -> args) - in - Cstruct.concat [ cpu ; mem ; block ; network ; vmimage ; args ] - - let decode_vm_config buf = - decode_strings buf >>= fun (vname, off) -> - Logs.debug (fun m -> m "vm_config name %a" pp_id vname) ; - cs_shift buf off >>= fun buf' -> - decode_int buf' >>= fun cpuid -> - Logs.debug (fun m -> m "cpuid %d" cpuid) ; - decode_int ~off:8 buf' >>= fun requested_memory -> - Logs.debug (fun m -> m "mem %d" requested_memory) ; - cs_shift buf' 16 >>= fun buf'' -> - decode_string buf'' >>= fun (block, off) -> - Logs.debug (fun m -> m "block %s" block) ; - cs_shift buf'' off >>= fun buf''' -> - let block_device = if block = "" then None else Some block in - decode_strings buf''' >>= fun (network, off') -> - cs_shift buf''' off' >>= fun buf'''' -> - decode_int buf'''' >>= fun vmtype -> - (match int_to_vmtype vmtype with - | Some x -> Ok x - | None -> Error (`Msg "unknown vmtype")) >>= fun vmtype -> - decode_int ~off:8 buf'''' >>= fun size -> - check_len buf'''' (Int32.of_int size) >>= fun () -> - let vmimage = (vmtype, Cstruct.sub buf'''' 16 size) in - cs_shift buf'''' (16 + size) >>= fun buf''''' -> - decode_strings buf''''' >>= fun (argv, _) -> - let argv = match argv with [] -> None | xs -> Some xs in - Ok { vname ; cpuid ; requested_memory ; block_device ; network ; vmimage ; argv } - - let create id version vm = - let body = encode_vm_config vm in - encode ~name:vm.vname ~body version id (op_to_int Create) - - let force_create id version vm = - let body = encode_vm_config vm in - encode ~name:vm.vname ~body version id (op_to_int Force_create) - - let destroy id version name = - encode ~name version id (op_to_int Destroy) -end diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index 9571eee..9b34541 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -118,6 +118,7 @@ let tick t = | None -> Logs.err (fun m -> m "couldn't drop super %a from sub %a" Vmm_core.pp_id id Vmm_core.pp_id vmid) ; out | Some real_id -> let name = Vmm_core.string_of_id real_id in + let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in (socket, vmid, stats_encoded) :: out) out xs)