remove vmm_wire, use asn.1

This commit is contained in:
Hannes Mehnert 2018-10-22 23:20:00 +02:00
parent 51a0344477
commit 1d4d7509dc
22 changed files with 1170 additions and 1102 deletions

3
.ocamlinit Normal file
View file

@ -0,0 +1,3 @@
#require "cstruct, asn1-combinators, astring, fmt, ipaddr, rresult, lwt, x509, tls, hex, bos, decompress, tls.lwt"
#directory "_build/src"
#load "albatross.cma"

View file

@ -14,7 +14,7 @@ open Lwt.Infix
open Astring open Astring
let my_version = `WV2 let my_version = `AV2
let pp_unix_error ppf e = Fmt.string ppf (Unix.error_message e) let pp_unix_error ppf e = Fmt.string ppf (Unix.error_message e)
@ -31,7 +31,8 @@ let read_console name ring channel () =
(match String.Map.find name !active with (match String.Map.find name !active with
| None -> Lwt.return_unit | None -> Lwt.return_unit
| Some fd -> | Some fd ->
Vmm_lwt.write_wire fd (Vmm_wire.Console.data my_version id t line) >>= function let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in
Vmm_lwt.write_wire fd (header, `Command (`Console_cmd (`Console_data (t, line)))) >>= function
| Error _ -> | Error _ ->
Vmm_lwt.safe_close fd >|= fun () -> Vmm_lwt.safe_close fd >|= fun () ->
active := String.Map.remove name !active active := String.Map.remove name !active
@ -79,7 +80,7 @@ let add_fifo id =
| None -> | None ->
Error (`Msg "opening") Error (`Msg "opening")
let attach s id = let subscribe s id =
let name = Vmm_core.string_of_id id in let name = Vmm_core.string_of_id id in
Logs.debug (fun m -> m "attempting to attach %a" Vmm_core.pp_id id) ; Logs.debug (fun m -> m "attempting to attach %a" Vmm_core.pp_id id) ;
match String.Map.find name !t with match String.Map.find name !t with
@ -90,8 +91,8 @@ let attach s id =
let entries = Vmm_ring.read r in let entries = Vmm_ring.read r in
Logs.debug (fun m -> m "found %d history" (List.length entries)) ; Logs.debug (fun m -> m "found %d history" (List.length entries)) ;
Lwt_list.iter_s (fun (i, v) -> Lwt_list.iter_s (fun (i, v) ->
let msg = Vmm_wire.Console.data my_version id i v in let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in
Vmm_lwt.write_wire s msg >|= fun _ -> ()) Vmm_lwt.write_wire s (header, `Command (`Console_cmd (`Console_data (i, v)))) >|= fun _ -> ())
entries >>= fun () -> entries >>= fun () ->
(match String.Map.find name !active with (match String.Map.find name !active with
| None -> Lwt.return_unit | None -> Lwt.return_unit
@ -109,24 +110,24 @@ let handle s addr () =
| Error _ -> | Error _ ->
Logs.err (fun m -> m "exception while reading") ; Logs.err (fun m -> m "exception while reading") ;
Lwt.return_unit Lwt.return_unit
| Ok (hdr, _) when Vmm_wire.is_reply hdr -> | Ok (_, `Success _) ->
Logs.err (fun m -> m "unexpected reply") ; Logs.err (fun m -> m "unexpected success reply") ;
loop () loop ()
| Ok (hdr, data) -> | Ok (_, `Failure _) ->
(if not (Vmm_wire.version_eq hdr.Vmm_wire.version my_version) then Logs.err (fun m -> m "unexpected failure reply") ;
loop ()
| Ok (header, `Command cmd) ->
(if not (Vmm_asn.version_eq header.Vmm_asn.version my_version) then
Lwt.return (Error (`Msg "ignoring data with bad version")) Lwt.return (Error (`Msg "ignoring data with bad version"))
else else
match Vmm_wire.decode_strings data with match cmd with
| Error e -> Lwt.return (Error e) | `Console_cmd `Console_add -> add_fifo header.Vmm_asn.id
| Ok (id, _) -> match Vmm_wire.Console.int_to_op hdr.Vmm_wire.tag with | `Console_cmd `Console_subscribe -> subscribe s header.Vmm_asn.id
| Some Vmm_wire.Console.Add_console -> add_fifo id | _ -> Lwt.return (Error (`Msg "unexpected command"))) >>= (function
| Some Vmm_wire.Console.Attach_console -> attach s id | Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg))
| Some Vmm_wire.Console.Data -> Lwt.return (Error (`Msg "unexpected Data"))
| None -> Lwt.return (Error (`Msg "unknown command"))) >>= (function
| Ok msg -> Vmm_lwt.write_wire s (Vmm_wire.success ~msg my_version hdr.Vmm_wire.id hdr.Vmm_wire.tag)
| Error (`Msg msg) -> | Error (`Msg msg) ->
Logs.err (fun m -> m "error while processing command: %s" msg) ; Logs.err (fun m -> m "error while processing command: %s" msg) ;
Vmm_lwt.write_wire s (Vmm_wire.fail ~msg my_version hdr.Vmm_wire.id)) >>= function Vmm_lwt.write_wire s (header, `Failure msg)) >>= function
| Ok () -> loop () | Ok () -> loop ()
| Error _ -> | Error _ ->
Logs.err (fun m -> m "exception while writing to socket") ; Logs.err (fun m -> m "exception while writing to socket") ;

View file

@ -12,7 +12,7 @@
open Lwt.Infix open Lwt.Infix
let my_version = `WV2 let my_version = `AV2
let broadcast prefix data t = let broadcast prefix data t =
Lwt_list.fold_left_s (fun t (id, s) -> Lwt_list.fold_left_s (fun t (id, s) ->
@ -64,25 +64,24 @@ let tree = ref Vmm_trie.empty
let bcast = ref 0L let bcast = ref 0L
let send_history s ring id cmd_id = let send_history s ring id =
let elements = Vmm_ring.read ring in let elements = Vmm_ring.read ring in
let res = let res =
List.fold_left (fun acc (_, x) -> List.fold_left (fun acc (_, x) ->
let cs = Cstruct.of_string x in let cs = Cstruct.of_string x in
match Vmm_wire.Log.decode_log_hdr cs with match Vmm_asn.log_entry_of_cstruct cs with
| Ok (hdr, _) -> | Ok (header, ts, event) ->
begin match Vmm_core.drop_super ~super:id ~sub:hdr.Vmm_core.Log.name with if Vmm_core.is_sub_id ~super:id ~sub:header.Vmm_asn.id
| Some [] -> cs :: acc then (header, ts, event) :: acc
| _ -> acc else acc
end
| _ -> acc) | _ -> acc)
[] elements [] elements
in in
(* just need a wrapper in tag = Log.Data, id = reqid *) (* just need a wrapper in tag = Log.Data, id = reqid *)
Lwt_list.fold_left_s (fun r body -> Lwt_list.fold_left_s (fun r (header, ts, event) ->
match r with match r with
| Ok () -> | Ok () ->
let data = Vmm_wire.encode ~body my_version cmd_id (Vmm_wire.Log.op_to_int Vmm_wire.Log.Broadcast) in let data = header, `Command (`Log_cmd (`Log_data (ts, event))) in
Vmm_lwt.write_wire s data Vmm_lwt.write_wire s data
| Error e -> Lwt.return (Error e)) | Error e -> Lwt.return (Error e))
(Ok ()) res (Ok ()) res
@ -99,45 +98,43 @@ let handle mvar ring s addr () =
| Error _ -> | Error _ ->
Logs.err (fun m -> m "exception while reading") ; Logs.err (fun m -> m "exception while reading") ;
Lwt.return_unit Lwt.return_unit
| Ok (hdr, _) when Vmm_wire.is_reply hdr -> | Ok (_, `Failure _) ->
Logs.warn (fun m -> m "ignoring reply") ; Logs.warn (fun m -> m "ignoring failure") ;
loop () loop ()
| Ok (hdr, _) when not (Vmm_wire.version_eq hdr.Vmm_wire.version my_version) -> | Ok (_, `Success _) ->
Logs.warn (fun m -> m "ignoring success") ;
loop ()
| Ok (hdr, `Command (`Log_cmd lc)) ->
if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin
Logs.warn (fun m -> m "unsupported version") ; Logs.warn (fun m -> m "unsupported version") ;
Lwt.return_unit Lwt.return_unit
| Ok (hdr, data) -> match Vmm_wire.Log.int_to_op hdr.Vmm_wire.tag with end else begin
| Some Vmm_wire.Log.Log -> match lc with
begin match Vmm_wire.Log.decode_log_hdr data with | `Log_data (ts, event) ->
| Error (`Msg err) -> let data = Vmm_asn.log_entry_to_cstruct (hdr, ts, event) in
Logs.warn (fun m -> m "ignoring error %s while decoding log" err) ; Vmm_ring.write ring (ts, Cstruct.to_string data) ;
loop ()
| Ok (hdr, _) ->
Vmm_ring.write ring (hdr.Vmm_core.Log.ts, Cstruct.to_string data) ;
Lwt_mvar.put mvar data >>= fun () -> Lwt_mvar.put mvar data >>= fun () ->
let data' = Vmm_wire.encode ~body:data my_version !bcast (Vmm_wire.Log.op_to_int Vmm_wire.Log.Broadcast) in let data' =
let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = hdr.Vmm_asn.id } in
(header, `Command (`Log_cmd (`Log_data (ts, event))))
in
bcast := Int64.succ !bcast ; bcast := Int64.succ !bcast ;
broadcast hdr.Vmm_core.Log.name data' !tree >>= fun tree' -> broadcast hdr.Vmm_asn.id data' !tree >>= fun tree' ->
tree := tree' ; tree := tree' ;
loop () loop ()
end | `Log_subscribe ->
| Some Vmm_wire.Log.Subscribe -> let tree', ret = Vmm_trie.insert hdr.Vmm_asn.id s !tree in
begin match Vmm_wire.decode_strings data with
| Error (`Msg err) ->
Logs.warn (fun m -> m "ignoring error %s while decoding subscribe" err) ;
loop ()
| Ok (id, _) ->
let tree', ret = Vmm_trie.insert id s !tree in
tree := tree' ; tree := tree' ;
(match ret with (match ret with
| None -> Lwt.return_unit | None -> Lwt.return_unit
| Some s' -> Vmm_lwt.safe_close s') >>= fun () -> | Some s' -> Vmm_lwt.safe_close s') >>= fun () ->
let out = Vmm_wire.success my_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in let out = `Success `Empty in
Vmm_lwt.write_wire s out >>= function Vmm_lwt.write_wire s (hdr, out) >>= function
| Error _ -> | Error _ ->
Logs.err (fun m -> m "error while sending reply for subscribe") ; Logs.err (fun m -> m "error while sending reply for subscribe") ;
Lwt.return_unit Lwt.return_unit
| Ok () -> | Ok () ->
send_history s ring id hdr.Vmm_wire.id >>= function send_history s ring hdr.Vmm_asn.id >>= function
| Error _ -> | Error _ ->
Logs.err (fun m -> m "error while sending history") ; Logs.err (fun m -> m "error while sending history") ;
Lwt.return_unit Lwt.return_unit

View file

@ -6,11 +6,20 @@ open Astring
open Vmm_core open Vmm_core
let version = `AV2
let process fd = let process fd =
Vmm_lwt.read_wire fd >|= function Vmm_lwt.read_wire fd >|= function
| Error (`Msg m) -> Error (`Msg m) | Error _ ->
| Error _ -> Error (`Msg "read error") Error (`Msg "read or parse error")
| Ok data -> Vmm_commands.handle_reply data | Ok (header, reply) ->
if Vmm_asn.version_eq header.Vmm_asn.version version then begin
Logs.app (fun m -> m "%a" Vmm_asn.pp_wire (header, reply)) ;
Ok ()
end else begin
Logs.err (fun m -> m "version not equal") ;
Error (`Msg "version not equal")
end
let socket t = function let socket t = function
| Some x -> x | Some x -> x
@ -25,53 +34,38 @@ let connect socket_path =
let read fd = let read fd =
(* now we busy read and process output *) (* now we busy read and process output *)
let rec loop () = let rec loop () =
Vmm_lwt.read_wire fd >>= function process fd >>= function
| Error (`Msg msg) -> Logs.err (fun m -> m "error while reading %s" msg) ; loop () | Error e -> Lwt.return (Error e)
| Error _ -> Lwt.return (Error (`Msg "exception while reading"))
| Ok data -> match Vmm_commands.handle_reply data with
| Error (`Msg msg) -> Lwt.return (Error (`Msg msg))
| Ok (hdr, data) ->
if Vmm_wire.is_reply hdr then
let msg = match Vmm_wire.decode_string data with
| Error _ -> None
| Ok (m, _) -> Some m
in
Logs.app (fun m -> m "operation succeeded: %a" Fmt.(option ~none:(unit "") string) msg) ;
loop ()
else
match Vmm_commands.log_pp_reply (hdr, data) with
| Ok () -> loop () | Ok () -> loop ()
| Error (`Msg msg) -> Lwt.return (Error (`Msg msg))
in in
loop () loop ()
let handle opt_socket (cmd : Vmm_commands.t) = let handle opt_socket id (cmd : Vmm_asn.wire_command) =
let sock, next, cmd = Vmm_commands.handle cmd in let sock, next = Vmm_commands.handle cmd in
connect (socket sock opt_socket) >>= fun fd -> connect (socket sock opt_socket) >>= fun fd ->
Vmm_lwt.write_wire fd cmd >>= function let header = Vmm_asn.{ version ; sequence = 0L ; id } in
Vmm_lwt.write_wire fd (header, `Command cmd) >>= function
| Error `Exception -> Lwt.return (Error (`Msg "couldn't write")) | Error `Exception -> Lwt.return (Error (`Msg "couldn't write"))
| Ok () -> | Ok () ->
(match next with (match next with
| `Read -> read fd | `Read -> read fd
| `End -> | `End -> process fd) >>= fun res ->
process fd >|= function
| Error e -> Error e
| Ok data -> Vmm_commands.log_pp_reply data) >>= fun res ->
Vmm_lwt.safe_close fd >|= fun () -> Vmm_lwt.safe_close fd >|= fun () ->
res res
let jump opt_socket cmd = let jump opt_socket name cmd =
match match
Lwt_main.run (handle opt_socket cmd) Lwt_main.run (handle opt_socket name cmd)
with with
| Ok () -> `Ok () | Ok () -> `Ok ()
| Error (`Msg m) -> `Error (false, m) | Error (`Msg m) -> `Error (false, m)
let info_ _ opt_socket name = jump opt_socket (`Info name) let info_ _ opt_socket name = jump opt_socket name (`Vm_cmd `Vm_info)
let policy _ opt_socket name = jump opt_socket (`Policy name) let policy _ opt_socket name = jump opt_socket name (`Policy_cmd `Policy_info)
let remove_policy _ opt_socket name = jump opt_socket (`Remove_policy name) let remove_policy _ opt_socket name =
jump opt_socket name (`Policy_cmd `Policy_remove)
let add_policy _ opt_socket name vms memory cpus block bridges = let add_policy _ opt_socket name vms memory cpus block bridges =
let bridges = match bridges with let bridges = match bridges with
@ -84,10 +78,10 @@ let add_policy _ opt_socket name vms memory cpus block bridges =
and cpuids = IS.of_list cpus and cpuids = IS.of_list cpus
in in
let policy = { vms ; cpuids ; memory ; block ; bridges } in let policy = { vms ; cpuids ; memory ; block ; bridges } in
jump opt_socket (`Add_policy (name, policy)) jump opt_socket name (`Policy_cmd (`Policy_add policy))
let destroy _ opt_socket name = let destroy _ opt_socket name =
jump opt_socket (`Destroy_vm name) jump opt_socket name (`Vm_cmd `Vm_destroy)
let create _ opt_socket force name image cpuid requested_memory boot_params block_device network = let create _ opt_socket force name image cpuid requested_memory boot_params block_device network =
let image' = match Bos.OS.File.read (Fpath.v image) with let image' = match Bos.OS.File.read (Fpath.v image) with
@ -106,17 +100,17 @@ let create _ opt_socket force name image cpuid requested_memory boot_params bloc
} in } in
let cmd = let cmd =
if force then if force then
`Force_create_vm vm_config `Vm_force_create vm_config
else else
`Create_vm vm_config `Vm_create vm_config
in in
jump opt_socket cmd jump opt_socket name (`Vm_cmd cmd)
let console _ opt_socket name = jump opt_socket (`Console name) let console _ opt_socket name = jump opt_socket name (`Console_cmd `Console_subscribe)
let stats _ opt_socket name = jump opt_socket (`Statistics name) let stats _ opt_socket name = jump opt_socket name (`Stats_cmd `Stats_subscribe)
let event_log _ opt_socket name = jump opt_socket (`Log name) let event_log _ opt_socket name = jump opt_socket name (`Log_cmd `Log_subscribe)
let help _ _ man_format cmds = function let help _ _ man_format cmds = function
| None -> `Help (`Pager, None) | None -> `Help (`Pager, None)

View file

@ -16,24 +16,34 @@ let pp_stats ppf s =
open Lwt.Infix open Lwt.Infix
type out = [ let version = `AV2
| `Cons of Cstruct.t
| `Stat of Cstruct.t
| `Log of Cstruct.t
]
let state = ref (Vmm_engine.init ()) let state = ref (Vmm_engine.init version)
let create c_fd process cont = let create c_fd process cont =
Vmm_lwt.read_wire c_fd >>= function Vmm_lwt.read_wire c_fd >>= function
| Ok (hdr, data) -> | Error (`Msg msg) ->
if Vmm_wire.is_fail hdr then begin Logs.err (fun m -> m "error %s while reading from console" msg) ;
Logs.err (fun m -> m "console failed with %s" (Cstruct.to_string data)) ;
Lwt.return_unit Lwt.return_unit
end else if Vmm_wire.is_reply hdr then begin | Error _ ->
Logs.err (fun m -> m "error while reading from console") ;
Lwt.return_unit
| Ok (header, wire) ->
if not (Vmm_asn.version_eq version header.Vmm_asn.version) then begin
Logs.err (fun m -> m "invalid version while reading from console") ;
Lwt.return_unit
end else
match wire with
| `Command _ ->
Logs.err (fun m -> m "console returned a command") ;
Lwt.return_unit
| `Failure f ->
Logs.err (fun m -> m "console failed with %s" f) ;
Lwt.return_unit
| `Success _msg ->
(* assert hdr.id = id! *) (* assert hdr.id = id! *)
let await, wakeme = Lwt.wait () in let await, wakeme = Lwt.wait () in
begin match cont !state await with match cont !state await with
| Error (`Msg msg) -> | Error (`Msg msg) ->
Logs.err (fun m -> m "create continuation failed %s" msg) ; Logs.err (fun m -> m "create continuation failed %s" msg) ;
Lwt.return_unit Lwt.return_unit
@ -48,25 +58,9 @@ let create c_fd process cont =
process out' >|= fun () -> process out' >|= fun () ->
Lwt.wakeup wakeme ()) ; Lwt.wakeup wakeme ()) ;
process out >>= fun () -> process out >>= fun () ->
begin match Vmm_engine.setup_stats !state vm with let state', out = Vmm_engine.setup_stats !state vm in
| Ok (state', out) ->
state := state' ; state := state' ;
process out (* TODO: need to read from stats socket! *) process out (* TODO: need to read from stats socket! *)
| Error (`Msg e) ->
Logs.warn (fun m -> m "(ignored) error %s while setting up statistics" e) ;
Lwt.return_unit
end
end
end else begin
Logs.err (fun m -> m "reading from console %lx, %a" hdr.Vmm_wire.tag Cstruct.hexdump_pp data) ;
Lwt.return_unit
end
| Error (`Msg msg) ->
Logs.err (fun m -> m "error %s while reading from console" msg) ;
Lwt.return_unit
| Error _ ->
Logs.err (fun m -> m "error while reading from console") ;
Lwt.return_unit
let handle out c_fd fd addr = let handle out c_fd fd addr =
(* out is for `Log | `Stat | `Cons (including reconnect semantics) *) (* out is for `Log | `Stat | `Cons (including reconnect semantics) *)
@ -86,7 +80,7 @@ let handle out c_fd fd addr =
*) *)
let process xs = let process xs =
Lwt_list.iter_p (function Lwt_list.iter_p (function
| #out as o -> out o | #Vmm_engine.service_out as o -> out o
| `Data cs -> | `Data cs ->
(* rather: terminate connection *) (* rather: terminate connection *)
Vmm_lwt.write_wire fd cs >|= fun _ -> ()) xs Vmm_lwt.write_wire fd cs >|= fun _ -> ()) xs
@ -96,16 +90,15 @@ let handle out c_fd fd addr =
| Error _ -> | Error _ ->
Logs.err (fun m -> m "error while reading") ; Logs.err (fun m -> m "error while reading") ;
Lwt.return_unit Lwt.return_unit
| Ok (hdr, buf) -> | Ok wire ->
Logs.debug (fun m -> m "read sth") ; Logs.debug (fun m -> m "read sth") ;
let state', data, next = Vmm_engine.handle_command !state hdr buf in let state', data, next = Vmm_engine.handle_command !state wire in
state := state' ; state := state' ;
process data >>= fun () -> process data >>= fun () ->
match next with match next with
| `End -> Lwt.return_unit | `End -> Lwt.return_unit
| `Wait (task, out) -> task >>= fun () -> process out | `Wait (task, out) -> task >>= fun () -> process out
| `Wait_and_create (state', task, next) -> | `Wait_and_create (task, next) ->
state := state' ;
task >>= fun () -> task >>= fun () ->
let state', data, n = next !state in let state', data, n = next !state in
state := state' ; state := state' ;

View file

@ -6,20 +6,21 @@ open Topkg
let () = let () =
Pkg.describe "albatross" @@ fun _ -> Pkg.describe "albatross" @@ fun _ ->
Ok [ Ok [
Pkg.mllib "src/albatross.mllib" ;
Pkg.bin "app/vmmd" ; Pkg.bin "app/vmmd" ;
Pkg.bin "app/vmm_console" ; Pkg.bin "app/vmm_console" ;
Pkg.bin "app/vmm_log" ; Pkg.bin "app/vmm_log" ;
Pkg.bin "app/vmm_client" ; (* Pkg.bin "app/vmm_client" ;
Pkg.bin "app/vmm_tls_endpoint" ; Pkg.bin "app/vmm_tls_endpoint" ; *)
Pkg.bin "app/vmmc" ; Pkg.bin "app/vmmc" ;
Pkg.bin "provision/vmm_req_command" ; (* Pkg.bin "provision/vmm_req_command" ;
Pkg.bin "provision/vmm_req_delegation" ; Pkg.bin "provision/vmm_req_delegation" ;
Pkg.bin "provision/vmm_req_vm" ; Pkg.bin "provision/vmm_req_vm" ;
Pkg.bin "provision/vmm_sign" ; Pkg.bin "provision/vmm_sign" ;
Pkg.bin "provision/vmm_revoke" ; Pkg.bin "provision/vmm_revoke" ;
Pkg.bin "provision/vmm_gen_ca" ; Pkg.bin "provision/vmm_gen_ca" ; *)
(* Pkg.clib "stats/libvmm_stats_stubs.clib" ; *) (* Pkg.clib "stats/libvmm_stats_stubs.clib" ; *)
Pkg.bin "stats/vmm_stats_lwt" ; (* Pkg.bin "stats/vmm_stats_lwt" ;
(* Pkg.bin "app/vmm_prometheus_stats" ; *) (* Pkg.bin "app/vmm_prometheus_stats" ; *)
Pkg.bin "app/vmm_influxdb_stats" ; Pkg.bin "app/vmm_influxdb_stats" ; *)
] ]

11
src/albatross.mllib Normal file
View file

@ -0,0 +1,11 @@
Vmm_asn
Vmm_lwt
Vmm_tls
Vmm_engine
Vmm_commands
Vmm_core
Vmm_engine
Vmm_resources
Vmm_trie
Vmm_unix
Vmm_compress

View file

@ -134,7 +134,6 @@ let policy_of_cstruct, policy_to_cstruct =
| Error (`Parse msg) -> Error (`Msg msg)), | Error (`Parse msg) -> Error (`Msg msg)),
Asn.encode c) Asn.encode c)
let image = let image =
let f = function let f = function
| `C1 x -> `Hvt_amd64, x | `C1 x -> `Hvt_amd64, x
@ -165,27 +164,31 @@ let opt cert oid f =
| None -> Ok None | None -> Ok None
| Some (_, data) -> f data >>| fun s -> Some s | Some (_, data) -> f data >>| fun s -> Some s
type version = [ `AV0 | `AV1 ] type version = [ `AV0 | `AV1 | `AV2 ]
let version_of_int = function let version_of_int = function
| 0 -> Ok `AV0 | 0 -> Ok `AV0
| 1 -> Ok `AV1 | 1 -> Ok `AV1
| 2 -> Ok `AV2
| _ -> Error (`Msg "couldn't parse version") | _ -> Error (`Msg "couldn't parse version")
let version_to_int = function let version_to_int = function
| `AV0 -> 0 | `AV0 -> 0
| `AV1 -> 1 | `AV1 -> 1
| `AV2 -> 2
let pp_version ppf v = let pp_version ppf v =
Fmt.int ppf Fmt.int ppf
(match v with (match v with
| `AV0 -> 0 | `AV0 -> 0
| `AV1 -> 1) | `AV1 -> 1
| `AV2 -> 2)
let version_eq a b = let version_eq a b =
match a, b with match a, b with
| `AV0, `AV0 -> true | `AV0, `AV0 -> true
| `AV1, `AV1 -> true | `AV1, `AV1 -> true
| `AV2, `AV2 -> true
| _ -> false | _ -> false
let version_to_cstruct v = int_to_cstruct (version_to_int v) let version_to_cstruct v = int_to_cstruct (version_to_int v)
@ -260,3 +263,441 @@ let block_device_of_cert version cert =
let block_size_of_cert version cert = let block_size_of_cert version cert =
version_of_cert version cert >>= fun () -> version_of_cert version cert >>= fun () ->
req "block-size" cert Oid.memory int_of_cstruct req "block-size" cert Oid.memory int_of_cstruct
(* communication protocol *)
type console_cmd = [
| `Console_add
| `Console_subscribe
| `Console_data of Ptime.t * string
]
let pp_console_cmd ppf = function
| `Console_add -> Fmt.string ppf "console add"
| `Console_subscribe -> Fmt.string ppf "console subscribe"
| `Console_data (ts, line) -> Fmt.pf ppf "console data %a: %s"
(Ptime.pp_rfc3339 ()) ts line
let console_cmd =
let f = function
| `C1 () -> `Console_add
| `C2 () -> `Console_subscribe
| `C3 (timestamp, data) -> `Console_data (timestamp, data)
and g = function
| `Console_add -> `C1 ()
| `Console_subscribe -> `C2 ()
| `Console_data (timestamp, data) -> `C3 (timestamp, data)
in
Asn.S.map f g @@
Asn.S.(choice3
(explicit 0 null)
(explicit 1 null)
(explicit 2 (sequence2
(required ~label:"timestamp" utc_time)
(required ~label:"data" utf8_string))))
(* TODO is this good? *)
let int64 =
let f cs = Cstruct.BE.get_uint64 cs 0
and g data =
let buf = Cstruct.create 8 in
Cstruct.BE.set_uint64 buf 0 data ;
buf
in
Asn.S.map f g Asn.S.octet_string
let timeval =
Asn.S.(sequence2
(required ~label:"seconds" int64)
(required ~label:"microseconds" int))
let ru =
let f (utime, (stime, (maxrss, (ixrss, (idrss, (isrss, (minflt, (majflt, (nswap, (inblock, (outblock, (msgsnd, (msgrcv, (nsignals, (nvcsw, nivcsw))))))))))))))) =
{ utime ; stime ; maxrss ; ixrss ; idrss ; isrss ; minflt ; majflt ; nswap ; inblock ; outblock ; msgsnd ; msgrcv ; nsignals ; nvcsw ; nivcsw }
and g ru =
(ru.utime, (ru.stime, (ru.maxrss, (ru.ixrss, (ru.idrss, (ru.isrss, (ru.minflt, (ru.majflt, (ru.nswap, (ru.inblock, (ru.outblock, (ru.msgsnd, (ru.msgrcv, (ru.nsignals, (ru.nvcsw, ru.nivcsw)))))))))))))))
in
Asn.S.map f g @@
Asn.S.(sequence @@
(required ~label:"utime" timeval)
@ (required ~label:"stime" timeval)
@ (required ~label:"maxrss" int64)
@ (required ~label:"ixrss" int64)
@ (required ~label:"idrss" int64)
@ (required ~label:"isrss" int64)
@ (required ~label:"minflt" int64)
@ (required ~label:"majflt" int64)
@ (required ~label:"nswap" int64)
@ (required ~label:"inblock" int64)
@ (required ~label:"outblock" int64)
@ (required ~label:"msgsnd" int64)
@ (required ~label:"msgrcv" int64)
@ (required ~label:"nsignals" int64)
@ (required ~label:"nvcsw" int64)
-@ (required ~label:"nivcsw" int64))
(* TODO is this good? *)
let int32 =
let f i = Int32.of_int i
and g i = Int32.to_int i
in
Asn.S.map f g Asn.S.int
let ifdata =
let f (name, (flags, (send_length, (max_send_length, (send_drops, (mtu, (baudrate, (input_packets, (input_errors, (output_packets, (output_errors, (collisions, (input_bytes, (output_bytes, (input_mcast, (output_mcast, (input_dropped, output_dropped))))))))))))))))) =
{ name; flags; send_length; max_send_length; send_drops; mtu; baudrate; input_packets; input_errors; output_packets; output_errors; collisions; input_bytes; output_bytes; input_mcast; output_mcast; input_dropped; output_dropped }
and g i =
(i.name, (i.flags, (i.send_length, (i.max_send_length, (i.send_drops, (i.mtu, (i.baudrate, (i.input_packets, (i.input_errors, (i.output_packets, (i.output_errors, (i.collisions, (i.input_bytes, (i.output_bytes, (i.input_mcast, (i.output_mcast, (i.input_dropped, i.output_dropped)))))))))))))))))
in
Asn.S.map f g @@
Asn.S.(sequence @@
(required ~label:"name" utf8_string)
@ (required ~label:"flags" int32)
@ (required ~label:"send_length" int32)
@ (required ~label:"max_send_length" int32)
@ (required ~label:"send_drops" int32)
@ (required ~label:"mtu" int32)
@ (required ~label:"baudrate" int64)
@ (required ~label:"input_packets" int64)
@ (required ~label:"input_errors" int64)
@ (required ~label:"output_packets" int64)
@ (required ~label:"output_errors" int64)
@ (required ~label:"collisions" int64)
@ (required ~label:"input_bytes" int64)
@ (required ~label:"output_bytes" int64)
@ (required ~label:"input_mcast" int64)
@ (required ~label:"output_mcast" int64)
@ (required ~label:"input_dropped" int64)
-@ (required ~label:"output_dropped" int64))
type stats_cmd = [
| `Stats_add of int * string list
| `Stats_remove
| `Stats_subscribe
| `Stats_data of rusage * (string * int64) list * ifdata list
]
let pp_stats_cmd ppf = function
| `Stats_add (pid, taps) -> Fmt.pf ppf "stats add: pid %d taps %a" pid Fmt.(list ~sep:(unit ", ") string) taps
| `Stats_remove -> Fmt.string ppf "stat remove"
| `Stats_subscribe -> Fmt.string ppf "stat subscribe"
| `Stats_data (ru, vmm, ifs) -> Fmt.pf ppf "stats data: %a %a %a"
pp_rusage ru
pp_vmm vmm
Fmt.(list ~sep:(unit "@.@.") pp_ifdata) ifs
let stats_cmd =
let f = function
| `C1 (pid, taps) -> `Stats_add (pid, taps)
| `C2 () -> `Stats_remove
| `C3 () -> `Stats_subscribe
| `C4 (ru, vmm, ifdata) ->
let vmm = match vmm with None -> [] | Some vmm -> vmm
and ifdata = match ifdata with None -> [] | Some ifs -> ifs
in
`Stats_data (ru, vmm, ifdata)
and g = function
| `Stats_add (pid, taps) -> `C1 (pid, taps)
| `Stats_remove -> `C2 ()
| `Stats_subscribe -> `C3 ()
| `Stats_data (ru, vmm, ifdata) ->
let vmm = match vmm with [] -> None | xs -> Some xs
and ifs = match ifdata with [] -> None | xs -> Some xs
in
`C4 (ru, vmm, ifs)
in
Asn.S.map f g @@
Asn.S.(choice4
(explicit 0 (sequence2
(required ~label:"pid" int)
(required ~label:"taps" (sequence_of utf8_string))))
(explicit 1 null)
(explicit 2 null)
(explicit 3 (sequence3
(required ~label:"resource_usage" ru)
(optional ~label:"vmm_stats" @@ explicit 0
(sequence_of (sequence2
(required ~label:"key" utf8_string)
(required ~label:"value" int64))))
(optional ~label:"ifdata" @@ explicit 1
(sequence_of ifdata)))))
let addr =
Asn.S.(sequence2
(required ~label:"ip" ipv4)
(required ~label:"port" int))
let log_event =
let f = function
| `C1 () -> `Startup
| `C2 (ip, port) -> `Login (ip, port)
| `C3 (ip, port) -> `Logout (ip, port)
| `C4 (pid, taps, block) -> `VM_start (pid, taps, block)
| `C5 (pid, status) ->
let status' = match status with
| `C1 n -> `Exit n
| `C2 n -> `Signal n
| `C3 n -> `Stop n
in
`VM_stop (pid, status')
and g = function
| `Startup -> `C1 ()
| `Login (ip, port) -> `C2 (ip, port)
| `Logout (ip, port) -> `C3 (ip, port)
| `VM_start (pid, taps, block) -> `C4 (pid, taps, block)
| `VM_stop (pid, status) ->
let status' = match status with
| `Exit n -> `C1 n
| `Signal n -> `C2 n
| `Stop n -> `C3 n
in
`C5 (pid, status')
in
Asn.S.map f g @@
Asn.S.(choice5
(explicit 0 null)
(explicit 1 addr)
(explicit 2 addr)
(explicit 3 (sequence3
(required ~label:"pid" int)
(required ~label:"taps" (sequence_of utf8_string))
(optional ~label:"block" utf8_string)))
(explicit 4 (sequence2
(required ~label:"pid" int)
(required ~label:"status" (choice3
(explicit 0 int)
(explicit 1 int)
(explicit 2 int))))))
type log_cmd = [
| `Log_data of Ptime.t * Log.event
| `Log_subscribe
]
let pp_log_cmd ppf = function
| `Log_data (ts, event) -> Fmt.pf ppf "log data: %a %a" (Ptime.pp_rfc3339 ()) ts Log.pp_event event
| `Log_subscribe -> Fmt.string ppf "log subscribe"
let log_cmd =
let f = function
| `C1 (timestamp, event) -> `Log_data (timestamp, event)
| `C2 () -> `Log_subscribe
and g = function
| `Log_data (timestamp, event) -> `C1 (timestamp, event)
| `Log_subscribe -> `C2 ()
in
Asn.S.map f g @@
Asn.S.(choice2
(explicit 0 (sequence2
(required ~label:"timestamp" utc_time)
(required ~label:"event" log_event)))
(explicit 1 null))
type vm_cmd = [
| `Vm_info
| `Vm_create of vm_config
| `Vm_force_create of vm_config
| `Vm_destroy
]
let pp_vm_cmd ppf = function
| `Vm_info -> Fmt.string ppf "vm info"
| `Vm_create vm_config -> Fmt.pf ppf "create %a" pp_vm_config vm_config
| `Vm_force_create vm_config -> Fmt.pf ppf "force create %a" pp_vm_config vm_config
| `Vm_destroy -> Fmt.string ppf "vm destroy"
let vm_config =
let f (cpuid, requested_memory, block_device, network, vmimage, argv) =
let network = match network with None -> [] | Some xs -> xs in
{ vname = [] ; cpuid ; requested_memory ; block_device ; network ; vmimage ; argv }
and g vm =
let network = match vm.network with [] -> None | xs -> Some xs in
(vm.cpuid, vm.requested_memory, vm.block_device, network, vm.vmimage, vm.argv)
in
Asn.S.map f g @@
Asn.S.(sequence6
(required ~label:"cpu" int)
(required ~label:"memory" int)
(optional ~label:"block" utf8_string)
(optional ~label:"bridges" (sequence_of utf8_string))
(required ~label:"vmimage" image)
(optional ~label:"arguments" (sequence_of utf8_string)))
let vm_cmd =
let f = function
| `C1 () -> `Vm_info
| `C2 vm -> `Vm_create vm
| `C3 vm -> `Vm_force_create vm
| `C4 () -> `Vm_destroy
and g = function
| `Vm_info -> `C1 ()
| `Vm_create vm -> `C2 vm
| `Vm_force_create vm -> `C3 vm
| `Vm_destroy -> `C4 ()
in
Asn.S.map f g @@
Asn.S.(choice4
(explicit 0 null)
(explicit 1 vm_config)
(explicit 2 vm_config)
(explicit 3 null))
type policy_cmd = [
| `Policy_info
| `Policy_add of policy
| `Policy_remove
]
let pp_policy_cmd ppf = function
| `Policy_info -> Fmt.string ppf "policy info"
| `Policy_add policy -> Fmt.pf ppf "add policy: %a" pp_policy policy
| `Policy_remove -> Fmt.string ppf "policy remove"
let policy_cmd =
let f = function
| `C1 () -> `Policy_info
| `C2 policy -> `Policy_add policy
| `C3 () -> `Policy_remove
and g = function
| `Policy_info -> `C1 ()
| `Policy_add policy -> `C2 policy
| `Policy_remove -> `C3 ()
in
Asn.S.map f g @@
Asn.S.(choice3
(explicit 0 null)
(explicit 1 policy_obj)
(explicit 2 null))
let version =
let f data = match version_of_int data with
| Ok v -> v
| Error (`Msg m) -> Asn.S.error (`Parse m)
and g = version_to_int
in
Asn.S.map f g Asn.S.int
type wire_command = [
| `Console_cmd of console_cmd
| `Stats_cmd of stats_cmd
| `Log_cmd of log_cmd
| `Vm_cmd of vm_cmd
| `Policy_cmd of policy_cmd
]
let pp_wire_command ppf = function
| `Console_cmd c -> pp_console_cmd ppf c
| `Stats_cmd s -> pp_stats_cmd ppf s
| `Log_cmd l -> pp_log_cmd ppf l
| `Vm_cmd v -> pp_vm_cmd ppf v
| `Policy_cmd p -> pp_policy_cmd ppf p
let wire_command : wire_command Asn.S.t =
let f = function
| `C1 console -> `Console_cmd console
| `C2 stats -> `Stats_cmd stats
| `C3 log -> `Log_cmd log
| `C4 vm -> `Vm_cmd vm
| `C5 policy -> `Policy_cmd policy
and g = function
| `Console_cmd c -> `C1 c
| `Stats_cmd c -> `C2 c
| `Log_cmd c -> `C3 c
| `Vm_cmd c -> `C4 c
| `Policy_cmd c -> `C5 c
in
Asn.S.map f g @@
Asn.S.(choice5
(explicit 0 console_cmd)
(explicit 1 stats_cmd)
(explicit 2 log_cmd)
(explicit 3 vm_cmd)
(explicit 4 policy_cmd))
type header = {
version : version ;
sequence : int64 ;
id : id ;
}
let header =
let f (version, sequence, id) = { version ; sequence ; id }
and g h = h.version, h.sequence, h.id
in
Asn.S.map f g @@
Asn.S.(sequence3
(required ~label:"version" version)
(required ~label:"sequence" int64)
(required ~label:"id" (sequence_of utf8_string)))
type success = [ `Empty | `String of string | `Policies of policy list | `Vms of vm_config list ]
let pp_success ppf = function
| `Empty -> Fmt.string ppf "success"
| `String data -> Fmt.pf ppf "success: %s" data
| `Policies ps -> Fmt.(list ~sep:(unit "@.") pp_policy) ppf ps
| `Vms vms -> Fmt.(list ~sep:(unit "@.") pp_vm_config) ppf vms
type wire = header * [
| `Command of wire_command
| `Success of success
| `Failure of string ]
let pp_wire ppf (header, data) =
let id = header.id in
match data with
| `Command c -> Fmt.pf ppf "host %a: %a" pp_id id pp_wire_command c
| `Failure f -> Fmt.pf ppf "host %a: command failed %s" pp_id id f
| `Success s -> Fmt.pf ppf "host %a: %a" pp_id id pp_success s
let wire =
let f (header, payload) =
header,
match payload with
| `C1 cmd -> `Command cmd
| `C2 data ->
let p = match data with
| `C1 () -> `Empty
| `C2 str -> `String str
| `C3 policies -> `Policies policies
| `C4 vms -> `Vms vms
in
`Success p
| `C3 str -> `Failure str
and g (header, payload) =
header,
match payload with
| `Command cmd -> `C1 cmd
| `Success data ->
let p = match data with
| `Empty -> `C1 ()
| `String s -> `C2 s
| `Policies ps -> `C3 ps
| `Vms vms -> `C4 vms
in
`C2 p
| `Failure str -> `C3 str
in
Asn.S.map f g @@
Asn.S.(sequence2
(required ~label:"header" header)
(required ~label:"payload"
(choice3
(explicit 0 wire_command)
(explicit 1 (choice4
(explicit 0 null)
(explicit 1 utf8_string)
(explicit 2 (sequence_of policy_obj))
(explicit 3 (sequence_of vm_config))))
(explicit 2 utf8_string))))
let wire_of_cstruct, wire_to_cstruct = projections_of wire
type log_entry = header * Ptime.t * Log.event
let log_entry =
Asn.S.(sequence3
(required ~label:"headet" header)
(required ~label:"timestamp" utc_time)
(required ~label:"event" log_event))
let log_entry_of_cstruct, log_entry_to_cstruct = projections_of log_entry

View file

@ -75,7 +75,7 @@ end
(** {1 Encoding and decoding functions} *) (** {1 Encoding and decoding functions} *)
(** The type of versions of the ASN.1 grammar defined above. *) (** The type of versions of the ASN.1 grammar defined above. *)
type version = [ `AV0 | `AV1 ] type version = [ `AV0 | `AV1 | `AV2 ]
(** [version_eq a b] is true if [a] and [b] are equal. *) (** [version_eq a b] is true if [a] and [b] are equal. *)
val version_eq : version -> version -> bool val version_eq : version -> version -> bool
@ -171,3 +171,66 @@ val block_device_of_cert : version -> X509.t -> (string, [> `Msg of string ]) re
(** [block_size_of_cert version cert] is either the decoded block size, or an error. *) (** [block_size_of_cert version cert] is either the decoded block size, or an error. *)
val block_size_of_cert : version -> X509.t -> (int, [> `Msg of string ]) result val block_size_of_cert : version -> X509.t -> (int, [> `Msg of string ]) result
open Vmm_core
type console_cmd = [
| `Console_add
| `Console_subscribe
| `Console_data of Ptime.t * string
]
type stats_cmd = [
| `Stats_add of int * string list
| `Stats_remove
| `Stats_subscribe
| `Stats_data of rusage * (string * int64) list * ifdata list
]
type log_cmd = [
| `Log_data of Ptime.t * Log.event
| `Log_subscribe
]
type vm_cmd = [
| `Vm_info
| `Vm_create of vm_config
| `Vm_force_create of vm_config
| `Vm_destroy
]
type policy_cmd = [
| `Policy_info
| `Policy_add of policy
| `Policy_remove
]
type wire_command = [
| `Console_cmd of console_cmd
| `Stats_cmd of stats_cmd
| `Log_cmd of log_cmd
| `Vm_cmd of vm_cmd
| `Policy_cmd of policy_cmd ]
type header = {
version : version ;
sequence : int64 ;
id : id ;
}
type wire = header * [
| `Command of wire_command
| `Success of [ `Empty | `String of string | `Policies of policy list | `Vms of vm_config list ]
| `Failure of string ]
val pp_wire : wire Fmt.t
val wire_to_cstruct : wire -> Cstruct.t
val wire_of_cstruct : Cstruct.t -> (wire, [> `Msg of string ]) result
type log_entry = header * Ptime.t * Log.event
val log_entry_to_cstruct : log_entry -> Cstruct.t
val log_entry_of_cstruct : Cstruct.t -> (log_entry, [> `Msg of string ]) result

View file

@ -2,123 +2,9 @@
open Vmm_core open Vmm_core
let c = 0L
let ver = `WV2
type t = [
| `Info of id
| `Policy of id
| `Add_policy of id * policy
| `Remove_policy of id
| `Create_vm of vm_config
| `Force_create_vm of vm_config
| `Destroy_vm of id
| `Statistics of id
| `Console of id
| `Log of id
| `Crl (* TODO *)
| `Create_block of id * int
| `Destroy_block of id
]
let handle = function let handle = function
| `Info name -> | `Vm_cmd _ -> `Vmmd, `End
let cmd = Vmm_wire.Vm.info c ver name in | `Policy_cmd _ -> `Vmmd, `End
`Vmmd, `End, cmd | `Stats_cmd _ -> `Stats, `Read
| `Policy name -> | `Console_cmd _ -> `Console, `Read
let cmd = Vmm_wire.Vm.policy c ver name in | `Log_cmd _ -> `Log, `Read
`Vmmd, `End, cmd
| `Remove_policy name ->
let cmd = Vmm_wire.Vm.remove_policy c ver name in
`Vmmd, `End, cmd
| `Add_policy (name, policy) ->
let cmd = Vmm_wire.Vm.insert_policy c ver name policy in
`Vmmd, `End, cmd
| `Create_vm vm ->
let cmd = Vmm_wire.Vm.create c ver vm in
`Vmmd, `End, cmd
| `Force_create_vm vm ->
let cmd = Vmm_wire.Vm.force_create c ver vm in
`Vmmd, `End, cmd
| `Destroy_vm name ->
let cmd = Vmm_wire.Vm.destroy c ver name in
`Vmmd, `End, cmd
| `Statistics name ->
let cmd = Vmm_wire.Stats.subscribe c ver name in
`Stats, `Read, cmd
| `Console name ->
let cmd = Vmm_wire.Console.attach c ver name in
`Console, `Read, cmd
| `Log name ->
let cmd = Vmm_wire.Log.subscribe c ver name in
`Log, `Read, cmd
| `Crl -> assert false
| `Create_block (_name, _size) -> assert false
| `Destroy_block _name -> assert false
let handle_reply (hdr, data) =
if not (Vmm_wire.version_eq hdr.Vmm_wire.version ver) then
Error (`Msg "unknown wire protocol version")
else
if Vmm_wire.is_fail hdr then
let msg = match Vmm_wire.decode_string data with
| Ok (msg, _) -> msg
| Error _ -> ""
in
Error (`Msg ("command failed " ^ msg))
else if Vmm_wire.is_reply hdr && hdr.Vmm_wire.id = c then
Ok (hdr, data)
else
Error (`Msg "received unexpected data")
let log_pp_reply (hdr, data) =
let open Vmm_wire in
let tag' = Int32.logxor reply_tag hdr.tag in
let open Rresult.R.Infix in
match Vm.int_to_op tag' with
| Some Vm.Info ->
Vm.decode_vms data >>| fun (vms, _) ->
List.iter (fun (id, memory, cmd, pid, taps) ->
Logs.app (fun m -> m "VM %a %dMB command %s pid %d taps %a"
pp_id id memory cmd pid Fmt.(list ~sep:(unit ", ") string) taps))
vms
| Some Vm.Policy ->
Vm.decode_policies data >>| fun (policies, _) ->
List.iter (fun (id, policy) ->
Logs.app (fun m -> m "policy %a: %a" pp_id id pp_policy policy))
policies
| Some Vm.Insert_policy ->
Ok (Logs.app (fun m -> m "added policy"))
| Some Vm.Remove_policy ->
Ok (Logs.app (fun m -> m "removed policy"))
| Some Vm.Destroy ->
Ok (Logs.app (fun m -> m "destroyed VM"))
| Some Vm.Create ->
Ok (Logs.app (fun m -> m "successfully started VM"))
| Some Vm.Force_create ->
Ok (Logs.app (fun m -> m "successfully forcefully started VM"))
| None -> match Console.int_to_op tag' with
| Some Console.Data ->
decode_id_ts data >>= fun ((name, ts), off) ->
decode_string (Cstruct.shift data off) >>| fun (msg, _) ->
Logs.app (fun m -> m "%a %a: %s" Ptime.pp ts pp_id name msg)
| Some _ -> Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.tag))
| None -> match Stats.int_to_op tag' with
| Some Stats.Data ->
decode_strings data >>= fun (name', off) ->
Stats.decode_stats (Cstruct.shift data off) >>| fun (ru, vmm, ifs) ->
Logs.app (fun m -> m "stats %a@.%a@.%a@.%a@."
pp_id name' pp_rusage ru
Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") string int64)) vmm
Fmt.(list ~sep:(unit "@.") pp_ifdata) ifs)
| Some _ -> Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.tag))
| None -> match Log.int_to_op tag' with
| Some Log.Broadcast ->
Log.decode_log_hdr data >>= fun (loghdr, logdata) ->
Log.decode_event logdata >>| fun event ->
Logs.app (fun m -> m "%a" Vmm_core.Log.pp (loghdr, event))
| Some _ -> Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.tag))
| None -> Error (`Msg (Printf.sprintf "unknown operation %lx" hdr.tag))

7
src/vmm_commands.mli Normal file
View file

@ -0,0 +1,7 @@
val handle :
[< `Console_cmd of 'a
| `Log_cmd of 'b
| `Policy_cmd of 'c
| `Stats_cmd of 'd
| `Vm_cmd of 'e ] ->
[> `Console | `Log | `Stats | `Vmmd ] * [> `End | `Read ]

2
src/vmm_compress.mli Normal file
View file

@ -0,0 +1,2 @@
val compress : ?level:int -> string -> string
val uncompress : string -> (string, unit) result

View file

@ -295,6 +295,9 @@ let pp_rusage ppf r =
Fmt.pf ppf "utime %Lu.%d stime %Lu.%d maxrss %Lu ixrss %Lu idrss %Lu isrss %Lu minflt %Lu majflt %Lu nswap %Lu inblock %Lu outblock %Lu msgsnd %Lu msgrcv %Lu signals %Lu nvcsw %Lu nivcsw %Lu" Fmt.pf ppf "utime %Lu.%d stime %Lu.%d maxrss %Lu ixrss %Lu idrss %Lu isrss %Lu minflt %Lu majflt %Lu nswap %Lu inblock %Lu outblock %Lu msgsnd %Lu msgrcv %Lu signals %Lu nvcsw %Lu nivcsw %Lu"
(fst r.utime) (snd r.utime) (fst r.stime) (snd r.stime) r.maxrss r.ixrss r.idrss r.isrss r.minflt r.majflt r.nswap r.inblock r.outblock r.msgsnd r.msgrcv r.nsignals r.nvcsw r.nivcsw (fst r.utime) (snd r.utime) (fst r.stime) (snd r.stime) r.maxrss r.ixrss r.idrss r.isrss r.minflt r.majflt r.nswap r.inblock r.outblock r.msgsnd r.msgrcv r.nsignals r.nvcsw r.nivcsw
let pp_vmm ppf vmm =
Fmt.(list ~sep:(unit "@,") (pair ~sep:(unit ": ") string int64)) ppf vmm
type ifdata = { type ifdata = {
name : string ; name : string ;
flags : int32 ; flags : int32 ;
@ -321,16 +324,6 @@ let pp_ifdata ppf i =
i.name i.flags i.send_length i.max_send_length i.send_drops i.mtu i.baudrate i.input_packets i.input_errors i.output_packets i.output_errors i.collisions i.input_bytes i.output_bytes i.input_mcast i.output_mcast i.input_dropped i.output_dropped i.name i.flags i.send_length i.max_send_length i.send_drops i.mtu i.baudrate i.input_packets i.input_errors i.output_packets i.output_errors i.collisions i.input_bytes i.output_bytes i.input_mcast i.output_mcast i.input_dropped i.output_dropped
module Log = struct module Log = struct
type hdr = {
ts : Ptime.t ;
name : id ;
}
let pp_hdr ppf (hdr : hdr) =
Fmt.pf ppf "%a: %a" (Ptime.pp_rfc3339 ()) hdr.ts pp_id hdr.name
let hdr name = { ts = Ptime_clock.now () ; name }
type event = type event =
[ `Startup [ `Startup
| `Login of Ipaddr.V4.t * int | `Login of Ipaddr.V4.t * int
@ -354,9 +347,4 @@ module Log = struct
| `Stop n -> "stop", n | `Stop n -> "stop", n
in in
Fmt.pf ppf "STOPPED %d with %s %a" pid s Fmt.Dump.signal c Fmt.pf ppf "STOPPED %d with %s %a" pid s Fmt.Dump.signal c
type msg = hdr * event
let pp ppf (hdr, event) =
Fmt.pf ppf "%a %a" pp_hdr hdr pp_event event
end end

304
src/vmm_core.mli Normal file
View file

@ -0,0 +1,304 @@
val tmpdir : Fpath.t
val dbdir : Fpath.t
val socket_path : [< `Console | `Log | `Stats | `Vmmd ] -> string
val pp_socket :
Format.formatter -> [< `Console | `Log | `Stats | `Vmmd ] -> unit
module I : sig type t = int val compare : int -> int -> int end
module IS :
sig
type elt = I.t
type t = Set.Make(I).t
val empty : t
val is_empty : t -> bool
val mem : elt -> t -> bool
val add : elt -> t -> t
val singleton : elt -> t
val remove : elt -> t -> t
val union : t -> t -> t
val inter : t -> t -> t
val diff : t -> t -> t
val compare : t -> t -> int
val equal : t -> t -> bool
val subset : t -> t -> bool
val iter : (elt -> unit) -> t -> unit
val map : (elt -> elt) -> t -> t
val fold : (elt -> 'a -> 'a) -> t -> 'a -> 'a
val for_all : (elt -> bool) -> t -> bool
val exists : (elt -> bool) -> t -> bool
val filter : (elt -> bool) -> t -> t
val partition : (elt -> bool) -> t -> t * t
val cardinal : t -> int
val elements : t -> elt list
val min_elt : t -> elt
val min_elt_opt : t -> elt option
val max_elt : t -> elt
val max_elt_opt : t -> elt option
val choose : t -> elt
val choose_opt : t -> elt option
val split : elt -> t -> t * bool * t
val find : elt -> t -> elt
val find_opt : elt -> t -> elt option
val find_first : (elt -> bool) -> t -> elt
val find_first_opt : (elt -> bool) -> t -> elt option
val find_last : (elt -> bool) -> t -> elt
val find_last_opt : (elt -> bool) -> t -> elt option
val of_list : elt list -> t
end
module IM :
sig
type key = I.t
type 'a t = 'a Map.Make(I).t
val empty : 'a t
val is_empty : 'a t -> bool
val mem : key -> 'a t -> bool
val add : key -> 'a -> 'a t -> 'a t
val update : key -> ('a option -> 'a option) -> 'a t -> 'a t
val singleton : key -> 'a -> 'a t
val remove : key -> 'a t -> 'a t
val merge :
(key -> 'a option -> 'b option -> 'c option) -> 'a t -> 'b t -> 'c t
val union : (key -> 'a -> 'a -> 'a option) -> 'a t -> 'a t -> 'a t
val compare : ('a -> 'a -> int) -> 'a t -> 'a t -> int
val equal : ('a -> 'a -> bool) -> 'a t -> 'a t -> bool
val iter : (key -> 'a -> unit) -> 'a t -> unit
val fold : (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b
val for_all : (key -> 'a -> bool) -> 'a t -> bool
val exists : (key -> 'a -> bool) -> 'a t -> bool
val filter : (key -> 'a -> bool) -> 'a t -> 'a t
val partition : (key -> 'a -> bool) -> 'a t -> 'a t * 'a t
val cardinal : 'a t -> int
val bindings : 'a t -> (key * 'a) list
val min_binding : 'a t -> key * 'a
val min_binding_opt : 'a t -> (key * 'a) option
val max_binding : 'a t -> key * 'a
val max_binding_opt : 'a t -> (key * 'a) option
val choose : 'a t -> key * 'a
val choose_opt : 'a t -> (key * 'a) option
val split : key -> 'a t -> 'a t * 'a option * 'a t
val find : key -> 'a t -> 'a
val find_opt : key -> 'a t -> 'a option
val find_first : (key -> bool) -> 'a t -> key * 'a
val find_first_opt : (key -> bool) -> 'a t -> (key * 'a) option
val find_last : (key -> bool) -> 'a t -> key * 'a
val find_last_opt : (key -> bool) -> 'a t -> (key * 'a) option
val map : ('a -> 'b) -> 'a t -> 'b t
val mapi : (key -> 'a -> 'b) -> 'a t -> 'b t
end
module IM64 :
sig
type key = Int64.t
type 'a t = 'a Map.Make(Int64).t
val empty : 'a t
val is_empty : 'a t -> bool
val mem : key -> 'a t -> bool
val add : key -> 'a -> 'a t -> 'a t
val update : key -> ('a option -> 'a option) -> 'a t -> 'a t
val singleton : key -> 'a -> 'a t
val remove : key -> 'a t -> 'a t
val merge :
(key -> 'a option -> 'b option -> 'c option) -> 'a t -> 'b t -> 'c t
val union : (key -> 'a -> 'a -> 'a option) -> 'a t -> 'a t -> 'a t
val compare : ('a -> 'a -> int) -> 'a t -> 'a t -> int
val equal : ('a -> 'a -> bool) -> 'a t -> 'a t -> bool
val iter : (key -> 'a -> unit) -> 'a t -> unit
val fold : (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b
val for_all : (key -> 'a -> bool) -> 'a t -> bool
val exists : (key -> 'a -> bool) -> 'a t -> bool
val filter : (key -> 'a -> bool) -> 'a t -> 'a t
val partition : (key -> 'a -> bool) -> 'a t -> 'a t * 'a t
val cardinal : 'a t -> int
val bindings : 'a t -> (key * 'a) list
val min_binding : 'a t -> key * 'a
val min_binding_opt : 'a t -> (key * 'a) option
val max_binding : 'a t -> key * 'a
val max_binding_opt : 'a t -> (key * 'a) option
val choose : 'a t -> key * 'a
val choose_opt : 'a t -> (key * 'a) option
val split : key -> 'a t -> 'a t * 'a option * 'a t
val find : key -> 'a t -> 'a
val find_opt : key -> 'a t -> 'a option
val find_first : (key -> bool) -> 'a t -> key * 'a
val find_first_opt : (key -> bool) -> 'a t -> (key * 'a) option
val find_last : (key -> bool) -> 'a t -> key * 'a
val find_last_opt : (key -> bool) -> 'a t -> (key * 'a) option
val map : ('a -> 'b) -> 'a t -> 'b t
val mapi : (key -> 'a -> 'b) -> 'a t -> 'b t
end
type command =
[ `Console
| `Create_block
| `Create_vm
| `Crl
| `Destroy_block
| `Destroy_vm
| `Force_create_vm
| `Info
| `Log
| `Statistics ]
val pp_command :
Format.formatter ->
[< `Console
| `Create_block
| `Create_vm
| `Crl
| `Destroy_block
| `Destroy_vm
| `Force_create_vm
| `Info
| `Log
| `Statistics ] ->
unit
val command_of_string :
string ->
[> `Console
| `Create_block
| `Create_vm
| `Crl
| `Destroy_block
| `Destroy_vm
| `Force_create_vm
| `Info
| `Log
| `Statistics ]
option
type vmtype = [ `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ]
val vmtype_to_int :
[< `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] -> int
val int_to_vmtype :
int -> [> `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] option
val pp_vmtype :
Format.formatter ->
[< `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] -> unit
type id = string list
val string_of_id : string list -> string
val id_of_string : string -> string list
val drop_super : super:string list -> sub:string list -> string list option
val is_sub_id : super:string list -> sub:string list -> bool
val domain : 'a list -> 'a list
val pp_id : Format.formatter -> string list -> unit
val pp_is : Format.formatter -> IS.t -> unit
type bridge =
[ `External of string * Ipaddr.V4.t * Ipaddr.V4.t * Ipaddr.V4.t * int
| `Internal of string ]
val pp_bridge :
Format.formatter ->
[< `External of string * Ipaddr.V4.t * Ipaddr.V4.t * Ipaddr.V4.t * int
| `Internal of string ] ->
unit
type policy = {
vms : int;
cpuids : IS.t;
memory : int;
block : int option;
bridges : bridge Astring.String.Map.t;
}
val pp_policy : Format.formatter -> policy -> unit
val sub_bridges :
[> `External of string * Ipaddr.V4.t * Ipaddr.V4.t * Ipaddr.V4.t * 'a
| `Internal of string ]
Astring.String.map ->
[> `External of string * Ipaddr.V4.t * Ipaddr.V4.t * Ipaddr.V4.t * 'a
| `Internal of string ]
Astring.String.map -> bool
val sub_block : 'a option -> 'a option -> bool
val sub_cpu : IS.t -> IS.t -> bool
val is_sub : super:policy -> sub:policy -> bool
type vm_config = {
vname : id;
cpuid : int;
requested_memory : int;
block_device : string option;
network : string list;
vmimage : vmtype * Cstruct.t;
argv : string list option;
}
val location : vm_config -> string * string
val pp_image :
Format.formatter ->
[< `Hvt_amd64 | `Hvt_amd64_compressed | `Hvt_arm64 ] * Cstruct.t -> unit
val pp_vm_config : Format.formatter -> vm_config -> unit
val good_bridge : string list -> 'a Astring.String.map -> bool
val vm_matches_res : policy -> vm_config -> bool
val check_policies :
vm_config -> policy list -> (unit, [> `Msg of string ]) Result.result
type vm = {
config : vm_config;
cmd : Bos.Cmd.t;
pid : int;
taps : string list;
stdout : Unix.file_descr;
}
val pp_vm : Format.formatter -> vm -> unit
val translate_tap : vm -> string -> string option
val identifier : Nocrypto.Numeric.Z.t -> string
val id : X509.t -> string
val name : X509.t -> string
val parse_db :
string list -> ((Z.t * string) list, [> Rresult.R.msg ]) Result.result
val find_in_db :
string -> 'a list -> ('a -> bool) -> ('a, [> Rresult.R.msg ]) Result.result
val find_name :
('a * string) list -> string -> ('a, [> Rresult.R.msg ]) Result.result
val translate_serial :
(Nocrypto.Numeric.Z.t * string) list -> string -> string
val translate_name : (Nocrypto.Numeric.Z.t * string) list -> string -> string
val separate_chain : 'a list -> ('a * 'a list, [> `Msg of string ]) result
type rusage = {
utime : int64 * int;
stime : int64 * int;
maxrss : int64;
ixrss : int64;
idrss : int64;
isrss : int64;
minflt : int64;
majflt : int64;
nswap : int64;
inblock : int64;
outblock : int64;
msgsnd : int64;
msgrcv : int64;
nsignals : int64;
nvcsw : int64;
nivcsw : int64;
}
val pp_rusage : Format.formatter -> rusage -> unit
val pp_vmm : (string * int64) list Fmt.t
type ifdata = {
name : string;
flags : int32;
send_length : int32;
max_send_length : int32;
send_drops : int32;
mtu : int32;
baudrate : int64;
input_packets : int64;
input_errors : int64;
output_packets : int64;
output_errors : int64;
collisions : int64;
input_bytes : int64;
output_bytes : int64;
input_mcast : int64;
output_mcast : int64;
input_dropped : int64;
output_dropped : int64;
}
val pp_ifdata : Format.formatter -> ifdata -> unit
module Log :
sig
type event =
[ `Login of Ipaddr.V4.t * int
| `Logout of Ipaddr.V4.t * int
| `Startup
| `VM_start of int * string list * string option
| `VM_stop of int * [ `Exit of int | `Signal of int | `Stop of int ] ]
val pp_event :
Format.formatter ->
[< `Login of Ipaddr.V4.t * int
| `Logout of Ipaddr.V4.t * int
| `Startup
| `VM_start of int * string list * string option
| `VM_stop of int * [< `Exit of int | `Signal of int | `Stop of int ] ] ->
unit
end

View file

@ -8,13 +8,10 @@ open Rresult
open R.Infix open R.Infix
type 'a t = { type 'a t = {
wire_version : Vmm_asn.version ;
console_counter : int64 ; console_counter : int64 ;
console_version : Vmm_wire.version ;
stats_counter : int64 ; stats_counter : int64 ;
stats_version : Vmm_wire.version ;
log_counter : int64 ; log_counter : int64 ;
log_version : Vmm_wire.version ;
client_version : Vmm_wire.version ;
(* TODO: refine, maybe: (* TODO: refine, maybe:
bridges : (Macaddr.t String.Map.t * String.Set.t) String.Map.t ; *) bridges : (Macaddr.t String.Map.t * String.Set.t) String.Map.t ; *)
used_bridges : String.Set.t String.Map.t ; used_bridges : String.Set.t String.Map.t ;
@ -23,23 +20,34 @@ type 'a t = {
tasks : 'a String.Map.t ; tasks : 'a String.Map.t ;
} }
let init () = { let init wire_version = {
console_counter = 1L ; console_version = `WV2 ; wire_version ;
stats_counter = 1L ; stats_version = `WV2 ; console_counter = 1L ;
log_counter = 1L ; log_version = `WV2 ; stats_counter = 1L ;
client_version = `WV2 ; log_counter = 1L ;
used_bridges = String.Map.empty ; used_bridges = String.Map.empty ;
resources = Vmm_resources.empty ; resources = Vmm_resources.empty ;
tasks = String.Map.empty ; tasks = String.Map.empty ;
} }
let log state (hdr, event) = type service_out = [
let data = Vmm_wire.Log.log state.log_counter state.log_version hdr event in | `Stat of Vmm_asn.wire
let log_counter = Int64.succ state.log_counter in | `Log of Vmm_asn.wire
Logs.debug (fun m -> m "LOG %a" Log.pp (hdr, event)) ; | `Cons of Vmm_asn.wire
({ state with log_counter }, `Log data) ]
type out = [ service_out | `Data of Vmm_asn.wire ]
let log t id event =
let data = `Log_data (Ptime_clock.now (), event) in
let header = Vmm_asn.{ version = t.wire_version ; sequence = t.log_counter ; id } in
let log_counter = Int64.succ t.log_counter in
Logs.debug (fun m -> m "LOG %a" Log.pp_event event) ;
({ t with log_counter }, `Log (header, `Command (`Log_cmd data)))
let handle_create t hdr vm_config = let handle_create t hdr vm_config =
(* TODO fix (remove field?) *)
let vm_config = { vm_config with vname = hdr.Vmm_asn.id } in
(match Vmm_resources.find_vm t.resources vm_config.vname with (match Vmm_resources.find_vm t.resources vm_config.vname with
| Some _ -> Error (`Msg "VM with same name is already running") | Some _ -> Error (`Msg "VM with same name is already running")
| None -> Ok ()) >>= fun () -> | None -> Ok ()) >>= fun () ->
@ -52,8 +60,9 @@ let handle_create t hdr vm_config =
Vmm_unix.prepare vm_config >>= fun taps -> Vmm_unix.prepare vm_config >>= fun taps ->
Logs.debug (fun m -> m "prepared vm with taps %a" Fmt.(list ~sep:(unit ",@ ") string) taps) ; Logs.debug (fun m -> m "prepared vm with taps %a" Fmt.(list ~sep:(unit ",@ ") string) taps) ;
(* TODO should we pre-reserve sth in t? *) (* TODO should we pre-reserve sth in t? *)
let cons = Vmm_wire.Console.add t.console_counter t.console_version vm_config.vname in let cons = `Console_add in
Ok ({ t with console_counter = Int64.succ t.console_counter }, [ `Cons cons ], let header = Vmm_asn.{ version = t.wire_version ; sequence = t.console_counter ; id = vm_config.vname } in
Ok ({ t with console_counter = Int64.succ t.console_counter }, [ `Cons (header, `Command (`Console_cmd cons)) ],
`Create (fun t task -> `Create (fun t task ->
(* actually execute the vm *) (* actually execute the vm *)
Vmm_unix.exec vm_config taps >>= fun vm -> Vmm_unix.exec vm_config taps >>= fun vm ->
@ -70,14 +79,15 @@ let handle_create t hdr vm_config =
t.used_bridges vm_config.network taps t.used_bridges vm_config.network taps
in in
let t = { t with resources ; tasks ; used_bridges } in let t = { t with resources ; tasks ; used_bridges } in
let t, out = log t (Log.hdr vm_config.vname, `VM_start (vm.pid, vm.taps, None)) in let t, out = log t vm_config.vname (`VM_start (vm.pid, vm.taps, None)) in
let data = Vmm_wire.success t.client_version hdr.Vmm_wire.id Vmm_wire.Vm.(op_to_int Create) in let data = `Success (`String "created VM") in
Ok (t, [ `Data data ; out ], vm))) Ok (t, [ `Data (hdr, data) ; out ], vm)))
let setup_stats t vm = let setup_stats t vm =
let stat_out = Vmm_wire.Stats.add t.stats_counter t.stats_version vm.config.vname vm.pid vm.taps in let stat_out = `Stats_add (vm.pid, vm.taps) in
let header = Vmm_asn.{ version = t.wire_version ; sequence = t.stats_counter ; id = vm.config.vname } in
let t = { t with stats_counter = Int64.succ t.stats_counter } in let t = { t with stats_counter = Int64.succ t.stats_counter } in
Ok (t, [ `Stat stat_out ]) t, [ `Stat (header, `Command (`Stats_cmd stat_out)) ]
let handle_shutdown t vm r = let handle_shutdown t vm r =
(match Vmm_unix.shutdown vm with (match Vmm_unix.shutdown vm with
@ -93,44 +103,40 @@ let handle_shutdown t vm r =
String.Map.add br (String.Set.remove ta old) b) String.Map.add br (String.Set.remove ta old) b)
t.used_bridges vm.config.network vm.taps t.used_bridges vm.config.network vm.taps
in in
let stat_out = Vmm_wire.Stats.remove t.stats_counter t.stats_version vm.config.vname in let stat_out = `Stats_remove in
let header = Vmm_asn.{ version = t.wire_version ; sequence = t.stats_counter ; id = vm.config.vname } in
let tasks = String.Map.remove (string_of_id vm.config.vname) t.tasks in let tasks = String.Map.remove (string_of_id vm.config.vname) t.tasks in
let t = { t with stats_counter = Int64.succ t.stats_counter ; resources ; used_bridges ; tasks } in let t = { t with stats_counter = Int64.succ t.stats_counter ; resources ; used_bridges ; tasks } in
let t, logout = log t (Log.hdr vm.config.vname, `VM_stop (vm.pid, r)) let t, logout = log t vm.config.vname (`VM_stop (vm.pid, r))
in in
(t, [ `Stat stat_out ; logout ]) (t, [ `Stat (header, `Command (`Stats_cmd stat_out)) ; logout ])
let handle_command t hdr buf = let handle_command t (header, payload) =
let msg_to_err = function let msg_to_err = function
| Ok x -> x | Ok x -> x
| Error (`Msg msg) -> | Error (`Msg msg) ->
Logs.debug (fun m -> m "error while processing command: %s" msg) ; Logs.debug (fun m -> m "error while processing command: %s" msg) ;
let out = Vmm_wire.fail ~msg t.client_version hdr.Vmm_wire.id in let out = `Failure msg in
(t, [ `Data out ], `End) (t, [ `Data (header, out) ], `End)
in in
msg_to_err ( msg_to_err (
if Vmm_wire.is_reply hdr then begin let id = header.Vmm_asn.id in
Logs.warn (fun m -> m "ignoring reply") ; match payload with
| `Failure f ->
Logs.warn (fun m -> m "ignoring failure %s" f) ;
Ok (t, [], `End) Ok (t, [], `End)
end else if not (Vmm_wire.version_eq hdr.Vmm_wire.version t.client_version) then | `Success _ ->
Error (`Msg "unknown client version") Logs.warn (fun m -> m "ignoring success") ;
else Vmm_wire.decode_strings buf >>= fun (id, off) -> Ok (t, [], `End)
match Vmm_wire.Vm.int_to_op hdr.Vmm_wire.tag with | `Command (`Policy_cmd `Policy_remove) ->
| None -> Error (`Msg "unknown command") Logs.debug (fun m -> m "remove policy %a" pp_id header.Vmm_asn.id) ;
| Some Vmm_wire.Vm.Remove_policy ->
Logs.debug (fun m -> m "remove policy %a" pp_id id) ;
let resources = Vmm_resources.remove t.resources id in let resources = Vmm_resources.remove t.resources id in
let success = Vmm_wire.success t.client_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in Ok ({ t with resources }, [ `Data (header, `Success (`String "removed policy")) ], `End)
Ok ({ t with resources }, [ `Data success ], `End) | `Command (`Policy_cmd (`Policy_add policy)) ->
| Some Vmm_wire.Vm.Insert_policy ->
begin
Logs.debug (fun m -> m "insert policy %a" pp_id id) ; Logs.debug (fun m -> m "insert policy %a" pp_id id) ;
Vmm_asn.policy_of_cstruct (Cstruct.shift buf off) >>= fun (policy, _) ->
Vmm_resources.insert_policy t.resources id policy >>= fun resources -> Vmm_resources.insert_policy t.resources id policy >>= fun resources ->
let success = Vmm_wire.success t.client_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in Ok ({ t with resources }, [ `Data (header, `Success (`String "added policy")) ], `End)
Ok ({ t with resources }, [ `Data success ], `End) | `Command (`Policy_cmd `Policy_info) ->
end
| Some Vmm_wire.Vm.Policy ->
begin begin
Logs.debug (fun m -> m "policy %a" pp_id id) ; Logs.debug (fun m -> m "policy %a" pp_id id) ;
let policies = let policies =
@ -144,10 +150,9 @@ let handle_command t hdr buf =
Logs.debug (fun m -> m "policies: couldn't find %a" pp_id id) ; Logs.debug (fun m -> m "policies: couldn't find %a" pp_id id) ;
Error (`Msg "policy: not found") Error (`Msg "policy: not found")
| _ -> | _ ->
let out = Vmm_wire.Vm.policy_reply hdr.Vmm_wire.id t.client_version policies in Ok (t, [ `Data (header, `Success (`Policies policies)) ], `End)
Ok (t, [ `Data out ], `End)
end end
| Some Vmm_wire.Vm.Info -> | `Command (`Vm_cmd `Vm_info) ->
begin begin
Logs.debug (fun m -> m "info %a" pp_id id) ; Logs.debug (fun m -> m "info %a" pp_id id) ;
let vms = let vms =
@ -161,44 +166,42 @@ let handle_command t hdr buf =
Logs.debug (fun m -> m "info: couldn't find %a" pp_id id) ; Logs.debug (fun m -> m "info: couldn't find %a" pp_id id) ;
Error (`Msg "info: not found") Error (`Msg "info: not found")
| _ -> | _ ->
let out = Vmm_wire.Vm.info_reply hdr.Vmm_wire.id t.client_version vms in let vm_configs = List.map (fun vm -> vm.config) vms in
Ok (t, [ `Data out ], `End) Ok (t, [ `Data (header, `Success (`Vms vm_configs)) ], `End)
end end
| Some Vmm_wire.Vm.Create -> | `Command (`Vm_cmd (`Vm_create vm_config)) ->
Vmm_wire.Vm.decode_vm_config buf >>= fun vm_config -> handle_create t header vm_config
handle_create t hdr vm_config | `Command (`Vm_cmd (`Vm_force_create vm_config)) ->
| Some Vmm_wire.Vm.Force_create ->
Vmm_wire.Vm.decode_vm_config buf >>= fun vm_config ->
let resources = Vmm_resources.remove t.resources vm_config.vname in let resources = Vmm_resources.remove t.resources vm_config.vname in
if Vmm_resources.check_vm_policy resources vm_config then if Vmm_resources.check_vm_policy resources vm_config then
begin match Vmm_resources.find_vm t.resources id with begin match Vmm_resources.find_vm t.resources id with
| None -> handle_create t hdr vm_config | None -> handle_create t header vm_config
| Some vm -> | Some vm ->
Vmm_unix.destroy vm ; Vmm_unix.destroy vm ;
let id_str = string_of_id id in let id_str = string_of_id id in
match String.Map.find_opt id_str t.tasks with match String.Map.find_opt id_str t.tasks with
| None -> handle_create t hdr vm_config | None -> handle_create t header vm_config
| Some task -> | Some task ->
let tasks = String.Map.remove id_str t.tasks in let tasks = String.Map.remove id_str t.tasks in
let t = { t with tasks } in let t = { t with tasks } in
Ok (t, [], `Wait_and_create Ok (t, [], `Wait_and_create
(t, task, fun t -> (task, fun t -> msg_to_err @@ handle_create t header vm_config))
msg_to_err @@ handle_create t hdr vm_config))
end end
else else
Error (`Msg "wouldn't match policy") Error (`Msg "wouldn't match policy")
| Some Vmm_wire.Vm.Destroy -> | `Command (`Vm_cmd `Vm_destroy) ->
match Vmm_resources.find_vm t.resources id with begin match Vmm_resources.find_vm t.resources id with
| Some vm -> | Some vm ->
Vmm_unix.destroy vm ; Vmm_unix.destroy vm ;
let id_str = string_of_id id in let id_str = string_of_id id in
let out, next = let out, next =
let success = Vmm_wire.success t.client_version hdr.Vmm_wire.id hdr.Vmm_wire.tag in let s = [ `Data (header, `Success (`String "destroyed vm")) ] in
let s = [ `Data success ] in
match String.Map.find_opt id_str t.tasks with match String.Map.find_opt id_str t.tasks with
| None -> s, `End | None -> s, `End
| Some t -> [], `Wait (t, s) | Some t -> [], `Wait (t, s)
in in
let tasks = String.Map.remove id_str t.tasks in let tasks = String.Map.remove id_str t.tasks in
Ok ({ t with tasks }, out, next) Ok ({ t with tasks }, out, next)
| None -> Error (`Msg "destroy: not found")) | None -> Error (`Msg "destroy: not found")
end
| _ -> Error (`Msg "unknown command"))

26
src/vmm_engine.mli Normal file
View file

@ -0,0 +1,26 @@
type 'a t
val init : Vmm_asn.version -> 'a t
type service_out = [
| `Stat of Vmm_asn.wire
| `Log of Vmm_asn.wire
| `Cons of Vmm_asn.wire
]
type out = [ service_out | `Data of Vmm_asn.wire ]
val handle_shutdown : 'a t -> Vmm_core.vm ->
[ `Exit of int | `Signal of int | `Stop of int ] -> 'a t * out list
val handle_command : 'a t -> Vmm_asn.wire ->
'a t * out list *
[ `Create of 'c t -> 'c -> ('c t * out list * Vmm_core.vm, [> Rresult.R.msg ]) result
| `End
| `Wait of 'a * out list
| `Wait_and_create of 'a * ('a t -> 'a t * out list *
[ `Create of 'd t -> 'd -> ('d t * out list * Vmm_core.vm, [> Rresult.R.msg ]) result
| `End ]) ]
val setup_stats : 'a t -> Vmm_core.vm -> 'a t * out list

View file

@ -42,7 +42,7 @@ let wait_and_clear pid stdout =
ret s ret s
let read_wire s = let read_wire s =
let buf = Bytes.create (Int32.to_int Vmm_wire.header_size) in let buf = Bytes.create 4 in
let rec r b i l = let rec r b i l =
Lwt.catch (fun () -> Lwt.catch (fun () ->
Lwt_unix.read s b i l >>= function Lwt_unix.read s b i l >>= function
@ -59,27 +59,31 @@ let read_wire s =
Logs.err (fun m -> m "exception %s while reading" err) ; Logs.err (fun m -> m "exception %s while reading" err) ;
Lwt.return (Error `Exception)) Lwt.return (Error `Exception))
in in
r buf 0 (Int32.to_int Vmm_wire.header_size) >>= function r buf 0 4 >>= function
| Error e -> Lwt.return (Error e) | Error e -> Lwt.return (Error e)
| Ok () -> | Ok () ->
match Vmm_wire.decode_header (Cstruct.of_bytes buf) with let len = Cstruct.BE.get_uint32 (Cstruct.of_bytes buf) 0 in
| Error (`Msg m) -> Lwt.return (Error (`Msg m)) if len > 0l then
| Ok hdr -> let b = Bytes.create (Int32.to_int len) in
let l = Int32.to_int hdr.Vmm_wire.length in r b 0 (Int32.to_int len) >|= function
if l > 0 then
let b = Bytes.create l in
r b 0 l >|= function
| Error e -> Error e | Error e -> Error e
| Ok () -> | Ok () ->
(* Logs.debug (fun m -> m "read hdr %a, body %a" (* Logs.debug (fun m -> m "read hdr %a, body %a"
Cstruct.hexdump_pp (Cstruct.of_bytes buf) Cstruct.hexdump_pp (Cstruct.of_bytes buf)
Cstruct.hexdump_pp (Cstruct.of_bytes b)) ; *) Cstruct.hexdump_pp (Cstruct.of_bytes b)) ; *)
Ok (hdr, Cstruct.of_bytes b) match Vmm_asn.wire_of_cstruct (Cstruct.of_bytes b) with
| Ok w -> Ok w
| Error (`Msg msg) ->
Logs.err (fun m -> m "error %s while parsing data" msg) ;
Error `Exception
else else
Lwt.return (Ok (hdr, Cstruct.empty)) Lwt.return (Error `Eof)
let write_wire s buf = let write_wire s wire =
let buf = Cstruct.to_bytes buf in let data = Vmm_asn.wire_to_cstruct wire in
let dlen = Cstruct.create 4 in
Cstruct.BE.set_uint32 dlen 0 (Int32.of_int (Cstruct.len data)) ;
let buf = Cstruct.(to_bytes (append dlen data)) in
let rec w off l = let rec w off l =
Lwt.catch (fun () -> Lwt.catch (fun () ->
Lwt_unix.send s buf off l [] >>= fun n -> Lwt_unix.send s buf off l [] >>= fun n ->

14
src/vmm_lwt.mli Normal file
View file

@ -0,0 +1,14 @@
val pp_sockaddr : Format.formatter -> Lwt_unix.sockaddr -> unit
val pp_process_status : Format.formatter -> Unix.process_status -> unit
val ret :
Unix.process_status -> [> `Exit of int | `Signal of int | `Stop of int ]
val waitpid : int -> (int * Lwt_unix.process_status, unit) result Lwt.t
val wait_and_clear :
int ->
Unix.file_descr -> [> `Exit of int | `Signal of int | `Stop of int ] Lwt.t
val read_wire :
Lwt_unix.file_descr ->
(Vmm_asn.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t
val write_wire :
Lwt_unix.file_descr -> Vmm_asn.wire -> (unit, [> `Exception ]) result Lwt.t
val safe_close : Lwt_unix.file_descr -> unit Lwt.t

View file

@ -26,27 +26,32 @@ let read_tls t =
Logs.err (fun m -> m "TLS read exception %s" (Printexc.to_string e)) ; Logs.err (fun m -> m "TLS read exception %s" (Printexc.to_string e)) ;
Lwt.return (Error `Exception)) Lwt.return (Error `Exception))
in in
let buf = Cstruct.create (Int32.to_int Vmm_wire.header_size) in let buf = Cstruct.create 4 in
r_n buf 0 (Int32.to_int Vmm_wire.header_size) >>= function r_n buf 0 4 >>= function
| Error e -> Lwt.return (Error e) | Error e -> Lwt.return (Error e)
| Ok () -> | Ok () ->
match Vmm_wire.decode_header buf with let len = Cstruct.BE.get_uint32 buf 0 in
| Error (`Msg m) -> Lwt.return (Error (`Msg m)) if len > 0l then
| Ok hdr -> let b = Cstruct.create (Int32.to_int len) in
let l = Int32.to_int hdr.Vmm_wire.length in r_n b 0 (Int32.to_int len) >|= function
if l > 0 then
let b = Cstruct.create l in
r_n b 0 l >|= function
| Error e -> Error e | Error e -> Error e
| Ok () -> | Ok () ->
(* Logs.debug (fun m -> m "TLS read id %d %a tag %d data %a" (* Logs.debug (fun m -> m "TLS read id %d %a tag %d data %a"
hdr.Vmm_wire.id Vmm_wire.pp_version hdr.Vmm_wire.version hdr.Vmm_wire.tag hdr.Vmm_wire.id Vmm_wire.pp_version hdr.Vmm_wire.version hdr.Vmm_wire.tag
Cstruct.hexdump_pp b) ; *) Cstruct.hexdump_pp b) ; *)
Ok (hdr, b) match Vmm_asn.wire_of_cstruct b with
| Ok w -> Ok w
| Error (`Msg msg) ->
Logs.err (fun m -> m "error %s while parsing data" msg) ;
Error `Exception
else else
Lwt.return (Ok (hdr, Cstruct.empty)) Lwt.return (Error `Eof)
let write_tls s buf = let write_tls s wire =
let data = Vmm_asn.wire_to_cstruct wire in
let dlen = Cstruct.create 4 in
Cstruct.BE.set_uint32 dlen 0 (Int32.of_int (Cstruct.len data)) ;
let buf = Cstruct.(append dlen data) in
(* Logs.debug (fun m -> m "TLS write %a" Cstruct.hexdump_pp (Cstruct.of_string buf)) ; *) (* Logs.debug (fun m -> m "TLS write %a" Cstruct.hexdump_pp (Cstruct.of_string buf)) ; *)
Lwt.catch Lwt.catch
(fun () -> Tls_lwt.Unix.write s buf >|= fun () -> Ok ()) (fun () -> Tls_lwt.Unix.write s buf >|= fun () -> Ok ())

5
src/vmm_tls.mli Normal file
View file

@ -0,0 +1,5 @@
val read_tls :
Tls_lwt.Unix.t ->
(Vmm_asn.wire, [> `Eof | `Exception | `Toomuch ]) result Lwt.t
val write_tls :
Tls_lwt.Unix.t -> Vmm_asn.wire -> (unit, [> `Exception ]) result Lwt.t

View file

@ -1,681 +0,0 @@
(* (c) 2017 Hannes Mehnert, all rights reserved *)
(* the wire protocol - length prepended binary data *)
(* each message (on all channels) is prefixed by a common header:
- tag (32 bit) the type of message
it is only 31 bit, the highest (leftmost) bit indicates query (0) or reply (1)
a failure is reported with the special tag 0xFFFFFFFF (all bits set) - data is a string
every request leads to a reply
WV0 and WV1 used 16 bit only
- version (16 bit) the version used on this channel (used to be byte 4-6)
- padding (16 bit)
- id (64 bit) unique id chosen by sender (for request/reply) - 0 shouldn't be used (reserved for log/console messages which do not correspond to a request)
- length (32 bit) spanning the message (excluding the 20 bytes header)
- full VM name (i.e. foo.bar.baz) encoded as size of list followed by list of strings
- replies do not contain the VM name
Version and tag are protocol-specific - the channel between vmm and console
uses different tags and mayuse a different version than between vmm and
client.
every command issued is replied to with success or failure. broadcast
communication (console data, log events) are not acknowledged by the
recipient.
*)
(* TODO unlikely that this is 32bit clean *)
open Astring
open Vmm_core
type version = [ `WV0 | `WV1 | `WV2 ]
let version_to_int = function
| `WV0 -> 0
| `WV1 -> 1
| `WV2 -> 2
let version_of_int = function
| 0 -> Ok `WV0
| 1 -> Ok `WV1
| 2 -> Ok `WV2
| _ -> Error (`Msg "unknown wire version")
let version_eq a b = match a, b with
| `WV0, `WV0 -> true
| `WV1, `WV1 -> true
| `WV2, `WV2 -> true
| _ -> false
let pp_version ppf v =
Fmt.string ppf (match v with
| `WV0 -> "wire version 0"
| `WV1 -> "wire version 1"
| `WV2 -> "wire version 2")
type header = {
version : version ;
tag : int32 ;
length : int32 ;
id : int64 ;
}
let header_size = 20l
let max_size = 0x7FFFFFFFl
(* Throughout this module, we don't expect any cstruct bigger than the above
max_size (encode checks this!) *)
open Rresult
open R.Infix
let cs_create len = Cstruct.create (Int32.to_int len)
let cs_len cs =
let l = Cstruct.len cs in
assert (l lsr 31 = 0) ;
Int32.of_int l
let check_len cs l =
if Int32.compare (cs_len cs) l = -1 then
Error (`Msg "underflow")
else
Ok ()
let cs_shift cs num =
check_len cs (Int32.of_int num) >>= fun () ->
Ok (Cstruct.shift cs num)
let check_exact cs l =
if cs_len cs = l then
Ok ()
else
Error (`Msg "bad length")
let null cs = if Cstruct.len cs = 0 then Ok () else Error (`Msg "trailing bytes")
let decode_header cs =
check_len cs 8l >>= fun () ->
let version = Cstruct.BE.get_uint16 cs 4 in
version_of_int version >>= function
| `WV0 | `WV1 -> Error (`Msg "unsupported version")
| `WV2 as version ->
check_len cs header_size >>= fun () ->
let tag = Cstruct.BE.get_uint32 cs 0
and id = Cstruct.BE.get_uint64 cs 8
and length = Cstruct.BE.get_uint32 cs 16
in
Ok { length ; id ; version ; tag }
let encode_header { length ; id ; version ; tag } =
match version with
| `WV0 | `WV1 -> invalid_arg "version no longer supported"
| `WV2 ->
let hdr = cs_create header_size in
Cstruct.BE.set_uint32 hdr 0 tag ;
Cstruct.BE.set_uint16 hdr 4 (version_to_int version) ;
Cstruct.BE.set_uint64 hdr 8 id ;
Cstruct.BE.set_uint32 hdr 16 length ;
hdr
let max_str_len = 0xFFFF
let decode_string cs =
check_len cs 2l >>= fun () ->
let l = Cstruct.BE.get_uint16 cs 0 in
check_len cs (Int32.add 2l (Int32.of_int l)) >>= fun () ->
let str = Cstruct.(to_string (sub cs 2 l)) in
Ok (str, l + 2)
let encode_string str =
let l = String.length str in
assert (l < max_str_len) ;
let cs = Cstruct.create (2 + l) in
Cstruct.BE.set_uint16 cs 0 l ;
Cstruct.blit_from_string str 0 cs 2 l ;
cs
let max = Int64.of_int max_int
let min = Int64.of_int min_int
let decode_int ?(off = 0) cs =
check_len cs Int32.(add (of_int off) 8l) >>= fun () ->
let i = Cstruct.BE.get_uint64 cs off in
if i > max then
Error (`Msg "int too big")
else if i < min then
Error (`Msg "int too small")
else
Ok (Int64.to_int i)
let encode_int i =
let cs = Cstruct.create 8 in
Cstruct.BE.set_uint64 cs 0 (Int64.of_int i) ;
cs
let decode_list inner buf =
decode_int buf >>= fun len ->
let rec go acc idx = function
| 0 -> Ok (List.rev acc, idx)
| n ->
cs_shift buf idx >>= fun cs' ->
inner cs' >>= fun (data, len) ->
go (data :: acc) (idx + len) (pred n)
in
go [] 8 len
let encode_list inner data =
let cs = encode_int (List.length data) in
Cstruct.concat (cs :: (List.map inner data))
let decode_strings = decode_list decode_string
let encode_strings = encode_list encode_string
let encode ?name ?body version id tag =
let vm = match name with None -> Cstruct.empty | Some id -> encode_strings id in
let payload = match body with None -> Cstruct.empty | Some x -> x in
let header =
let length = Int32.(add (cs_len payload) (cs_len vm)) in
{ length ; id ; version ; tag }
in
Cstruct.concat [ encode_header header ; vm ; payload ]
let maybe_str = function
| None -> Cstruct.empty
| Some c -> encode_string c
let fail_tag = 0xFFFFFFFFl
let reply_tag = 0x80000000l
let is_tag v tag = Int32.logand v tag = v
let is_reply { tag ; _ } = is_tag reply_tag tag
let is_fail { tag ; _ } = is_tag fail_tag tag
let reply ?body version id tag =
encode ?body version id (Int32.logor reply_tag tag)
let fail ?msg version id =
encode ~body:(maybe_str msg) version id fail_tag
let success ?msg version id tag =
reply ~body:(maybe_str msg) version id tag
let decode_ptime ?(off = 0) cs =
cs_shift cs off >>= fun cs' ->
check_len cs' 16l >>= fun () ->
decode_int cs' >>= fun d ->
let ps = Cstruct.BE.get_uint64 cs' 8 in
Ok (Ptime.v (d, ps))
let encode_ptime ts =
let d, ps = Ptime.(Span.to_d_ps (to_span ts)) in
let cs = Cstruct.create 16 in
Cstruct.BE.set_uint64 cs 0 (Int64.of_int d) ;
Cstruct.BE.set_uint64 cs 8 ps ;
cs
module Console = struct
type op =
| Add_console
| Attach_console
| Data (* is a reply, never acked *)
let op_to_int = function
| Add_console -> 0x0100l
| Attach_console -> 0x0101l
| Data -> 0x0102l
let int_to_op = function
| 0x0100l -> Some Add_console
| 0x0101l -> Some Attach_console
| 0x0102l -> Some Data
| _ -> None
let data version name ts msg =
let body =
let ts = encode_ptime ts
and data = encode_string msg
in
Cstruct.append ts data
in
encode version ~name ~body 0L (op_to_int Data)
let add id version name =
encode ~name version id (op_to_int Add_console)
let attach id version name =
encode ~name version id (op_to_int Attach_console)
end
module Stats = struct
type op =
| Add
| Remove
| Subscribe
| Data
let op_to_int = function
| Add -> 0x0200l
| Remove -> 0x0201l
| Subscribe -> 0x0202l
| Data -> 0x0203l
let int_to_op = function
| 0x0200l -> Some Add
| 0x0201l -> Some Remove
| 0x0202l -> Some Subscribe
| 0x0203l -> Some Data
| _ -> None
let rusage_len = 144l
let encode_rusage ru =
let cs = cs_create rusage_len in
Cstruct.BE.set_uint64 cs 0 (fst ru.utime) ;
Cstruct.BE.set_uint64 cs 8 (Int64.of_int (snd ru.utime)) ;
Cstruct.BE.set_uint64 cs 16 (fst ru.stime) ;
Cstruct.BE.set_uint64 cs 24 (Int64.of_int (snd ru.stime)) ;
Cstruct.BE.set_uint64 cs 32 ru.maxrss ;
Cstruct.BE.set_uint64 cs 40 ru.ixrss ;
Cstruct.BE.set_uint64 cs 48 ru.idrss ;
Cstruct.BE.set_uint64 cs 56 ru.isrss ;
Cstruct.BE.set_uint64 cs 64 ru.minflt ;
Cstruct.BE.set_uint64 cs 72 ru.majflt ;
Cstruct.BE.set_uint64 cs 80 ru.nswap ;
Cstruct.BE.set_uint64 cs 88 ru.inblock ;
Cstruct.BE.set_uint64 cs 96 ru.outblock ;
Cstruct.BE.set_uint64 cs 104 ru.msgsnd ;
Cstruct.BE.set_uint64 cs 112 ru.msgrcv ;
Cstruct.BE.set_uint64 cs 120 ru.nsignals ;
Cstruct.BE.set_uint64 cs 128 ru.nvcsw ;
Cstruct.BE.set_uint64 cs 136 ru.nivcsw ;
cs
let decode_rusage cs =
check_exact cs rusage_len >>= fun () ->
(decode_int ~off:8 cs >>= fun ms ->
Ok (Cstruct.BE.get_uint64 cs 0, ms)) >>= fun utime ->
(decode_int ~off:24 cs >>= fun ms ->
Ok (Cstruct.BE.get_uint64 cs 16, ms)) >>= fun stime ->
let maxrss = Cstruct.BE.get_uint64 cs 32
and ixrss = Cstruct.BE.get_uint64 cs 40
and idrss = Cstruct.BE.get_uint64 cs 48
and isrss = Cstruct.BE.get_uint64 cs 56
and minflt = Cstruct.BE.get_uint64 cs 64
and majflt = Cstruct.BE.get_uint64 cs 72
and nswap = Cstruct.BE.get_uint64 cs 80
and inblock = Cstruct.BE.get_uint64 cs 88
and outblock = Cstruct.BE.get_uint64 cs 96
and msgsnd = Cstruct.BE.get_uint64 cs 104
and msgrcv = Cstruct.BE.get_uint64 cs 112
and nsignals = Cstruct.BE.get_uint64 cs 120
and nvcsw = Cstruct.BE.get_uint64 cs 128
and nivcsw = Cstruct.BE.get_uint64 cs 136
in
Ok { utime ; stime ; maxrss ; ixrss ; idrss ; isrss ; minflt ; majflt ;
nswap ; inblock ; outblock ; msgsnd ; msgrcv ; nsignals ; nvcsw ; nivcsw }
let ifdata_len = 116l
let encode_ifdata i =
let name = encode_string i.name in
let cs = cs_create ifdata_len in
Cstruct.BE.set_uint32 cs 0 i.flags ;
Cstruct.BE.set_uint32 cs 4 i.send_length ;
Cstruct.BE.set_uint32 cs 8 i.max_send_length ;
Cstruct.BE.set_uint32 cs 12 i.send_drops ;
Cstruct.BE.set_uint32 cs 16 i.mtu ;
Cstruct.BE.set_uint64 cs 20 i.baudrate ;
Cstruct.BE.set_uint64 cs 28 i.input_packets ;
Cstruct.BE.set_uint64 cs 36 i.input_errors ;
Cstruct.BE.set_uint64 cs 44 i.output_packets ;
Cstruct.BE.set_uint64 cs 52 i.output_errors ;
Cstruct.BE.set_uint64 cs 60 i.collisions ;
Cstruct.BE.set_uint64 cs 68 i.input_bytes ;
Cstruct.BE.set_uint64 cs 76 i.output_bytes ;
Cstruct.BE.set_uint64 cs 84 i.input_mcast ;
Cstruct.BE.set_uint64 cs 92 i.output_mcast ;
Cstruct.BE.set_uint64 cs 100 i.input_dropped ;
Cstruct.BE.set_uint64 cs 108 i.output_dropped ;
Cstruct.append name cs
let decode_ifdata buf =
decode_string buf >>= fun (name, l) ->
cs_shift buf l >>= fun cs ->
check_len cs ifdata_len >>= fun () ->
let flags = Cstruct.BE.get_uint32 cs 0
and send_length = Cstruct.BE.get_uint32 cs 4
and max_send_length = Cstruct.BE.get_uint32 cs 8
and send_drops = Cstruct.BE.get_uint32 cs 12
and mtu = Cstruct.BE.get_uint32 cs 16
and baudrate = Cstruct.BE.get_uint64 cs 20
and input_packets = Cstruct.BE.get_uint64 cs 28
and input_errors = Cstruct.BE.get_uint64 cs 36
and output_packets = Cstruct.BE.get_uint64 cs 44
and output_errors = Cstruct.BE.get_uint64 cs 52
and collisions = Cstruct.BE.get_uint64 cs 60
and input_bytes = Cstruct.BE.get_uint64 cs 68
and output_bytes = Cstruct.BE.get_uint64 cs 76
and input_mcast = Cstruct.BE.get_uint64 cs 84
and output_mcast = Cstruct.BE.get_uint64 cs 92
and input_dropped = Cstruct.BE.get_uint64 cs 100
and output_dropped = Cstruct.BE.get_uint64 cs 108
in
Ok ({ name ; flags ; send_length ; max_send_length ; send_drops ; mtu ;
baudrate ; input_packets ; input_errors ; output_packets ;
output_errors ; collisions ; input_bytes ; output_bytes ; input_mcast ;
output_mcast ; input_dropped ; output_dropped },
Int32.(to_int ifdata_len) + l)
let add id version name pid taps =
let body = Cstruct.append (encode_int pid) (encode_strings taps) in
encode ~name ~body version id (op_to_int Add)
let remove id version name = encode ~name version id (op_to_int Remove)
let subscribe id version name = encode ~name version id (op_to_int Subscribe)
let data id version vm body =
let name = Vmm_core.id_of_string vm in
encode ~name ~body version id (op_to_int Data)
let encode_int64 i =
let cs = Cstruct.create 8 in
Cstruct.BE.set_uint64 cs 0 i ;
cs
let decode_int64 ?(off = 0) cs =
check_len cs (Int32.add 8l (Int32.of_int off)) >>= fun () ->
Ok (Cstruct.BE.get_uint64 cs off)
let encode_vmm_stats =
encode_list
(fun (k, v) -> Cstruct.append (encode_string k) (encode_int64 v))
let decode_vmm_stats =
decode_list (fun buf ->
decode_string buf >>= fun (str, off) ->
decode_int64 ~off buf >>= fun v ->
Ok ((str, v), off + 8))
let encode_stats (ru, vmm, ifd) =
Cstruct.concat
[ encode_rusage ru ;
encode_vmm_stats vmm ;
encode_list encode_ifdata ifd ]
let decode_stats cs =
check_len cs rusage_len >>= fun () ->
let ru, rest = Cstruct.split cs (Int32.to_int rusage_len) in
decode_rusage ru >>= fun ru ->
decode_vmm_stats rest >>= fun (vmm, off) ->
cs_shift rest off >>= fun rest' ->
decode_list decode_ifdata rest' >>= fun (ifs, _) ->
Ok (ru, vmm, ifs)
let decode_pid_taps data =
decode_int data >>= fun pid ->
decode_strings (Cstruct.shift data 8) >>= fun (taps, _off) ->
Ok (pid, taps)
end
let decode_id_ts cs =
decode_strings cs >>= fun (id, off) ->
decode_ptime ~off cs >>= fun ts ->
Ok ((id, ts), off + 16)
let split_id id = match List.rev id with
| [] -> Error (`Msg "bad header")
| name::rest -> Ok (name, List.rev rest)
module Log = struct
type op =
| Log
| Broadcast
| Subscribe
let op_to_int = function
| Log -> 0x0300l
| Subscribe -> 0x0301l
| Broadcast -> 0x0302l
let int_to_op = function
| 0x0300l -> Some Log
| 0x0301l -> Some Subscribe
| 0x0302l -> Some Broadcast
| _ -> None
let subscribe id version name =
encode ~name version id (op_to_int Subscribe)
let decode_log_hdr cs =
decode_id_ts cs >>= fun ((name, ts), off) ->
Ok ({ Log.ts ; name }, Cstruct.shift cs off)
let encode_addr ip port =
let cs = Cstruct.create 6 in
Cstruct.BE.set_uint32 cs 0 (Ipaddr.V4.to_int32 ip) ;
Cstruct.BE.set_uint16 cs 4 port ;
cs
let decode_addr cs =
check_len cs 6l >>= fun () ->
let ip = Ipaddr.V4.of_int32 (Cstruct.BE.get_uint32 cs 0)
and port = Cstruct.BE.get_uint16 cs 4
in
Ok (ip, port)
let encode_vm (pid, taps, block) =
let cs = encode_int pid in
let bl = encode_string (match block with None -> "" | Some x -> x) in
let taps = encode_strings taps in
Cstruct.concat [ cs ; bl ; taps ]
let decode_vm cs =
decode_int cs >>= fun pid ->
let r = Cstruct.shift cs 8 in
decode_string r >>= fun (block, l) ->
let block = if block = "" then None else Some block in
cs_shift r l >>= fun r' ->
decode_strings r' >>= fun (taps, _) ->
Ok (pid, taps, block)
let encode_pid_exit pid c =
let r, c = match c with
| `Exit n -> 0, n
| `Signal n -> 1, n
| `Stop n -> 2, n
in
let r_cs = encode_int r
and pid_cs = encode_int pid
and c_cs = encode_int c
in
Cstruct.concat [ pid_cs ; r_cs ; c_cs ]
let decode_pid_exit cs =
check_len cs 24l >>= fun () ->
decode_int cs >>= fun pid ->
decode_int ~off:8 cs >>= fun r ->
decode_int ~off:16 cs >>= fun c ->
(match r with
| 0 -> Ok (`Exit c)
| 1 -> Ok (`Signal c)
| 2 -> Ok (`Stop c)
| _ -> Error (`Msg "couldn't parse exit status")) >>= fun r ->
Ok (pid, r)
let encode_event ev =
let tag, data = match ev with
| `Startup -> 0, Cstruct.empty
| `Login (ip, port) -> 1, encode_addr ip port
| `Logout (ip, port) -> 2, encode_addr ip port
| `VM_start vm -> 3, encode_vm vm
| `VM_stop (pid, c) -> 4, encode_pid_exit pid c
in
let cs = Cstruct.create 2 in
Cstruct.BE.set_uint16 cs 0 tag ;
Cstruct.append cs data
let decode_event cs =
check_len cs 2l >>= fun () ->
let data = Cstruct.(shift cs 2) in
match Cstruct.BE.get_uint16 cs 0 with
| 0 -> Ok `Startup
| 1 -> decode_addr data >>= fun addr -> Ok (`Login addr)
| 2 -> decode_addr data >>= fun addr -> Ok (`Logout addr)
| 3 -> decode_vm data >>= fun vm -> Ok (`VM_start vm)
| 4 -> decode_pid_exit data >>= fun ex -> Ok (`VM_stop ex)
| x -> R.error_msgf "couldn't parse event type %d" x
let log id version hdr event =
let body = Cstruct.append (encode_ptime hdr.Log.ts) (encode_event event) in
encode ~name:hdr.Log.name ~body version id (op_to_int Log)
end
module Vm = struct
type op =
| Create
| Destroy
| Info
| Policy
| Insert_policy
| Remove_policy
| Force_create
let op_to_int = function
| Create -> 0x0400l
| Destroy -> 0x0401l
| Info -> 0x0402l
| Policy -> 0x0403l
| Insert_policy -> 0x0404l
| Remove_policy -> 0x0405l
| Force_create -> 0x0406l
let int_to_op = function
| 0x0400l -> Some Create
| 0x0401l -> Some Destroy
| 0x0402l -> Some Info
| 0x0403l -> Some Policy
| 0x0404l -> Some Insert_policy
| 0x0405l -> Some Remove_policy
| 0x0406l -> Some Force_create
| _ -> None
let policy id version name =
encode ~name version id (op_to_int Policy)
let insert_policy id version name policy =
let body = Vmm_asn.policy_to_cstruct policy in
encode ~name ~body version id (op_to_int Insert_policy)
let remove_policy id version name =
encode ~name version id (op_to_int Remove_policy)
let info id version name =
encode ~name version id (op_to_int Info)
let encode_vm vm =
let name = encode_strings vm.config.vname
and memory = encode_int vm.config.requested_memory
and cs = encode_string (Bos.Cmd.to_string vm.cmd)
and pid = encode_int vm.pid
and taps = encode_strings vm.taps
in
Cstruct.concat [ name ; memory ; cs ; pid ; taps ]
let info_reply id version vms =
let body = encode_list encode_vm vms in
reply ~body version id (op_to_int Info)
let policy_reply id version policies =
let body = encode_list
(fun (prefix, policy) ->
let name_cs = encode_strings prefix
and pol_cs = Vmm_asn.policy_to_cstruct policy in
Cstruct.append name_cs pol_cs)
policies
in
reply ~body version id (op_to_int Policy)
let decode_policies buf =
decode_list (fun cs ->
decode_strings cs >>= fun (id, l) ->
cs_shift cs l >>= fun cs' ->
Vmm_asn.policy_of_cstruct cs' >>= fun (policy, cs'') ->
let off = Cstruct.len cs - Cstruct.len cs'' in
Ok ((id, policy), off))
buf
let decode_vm cs =
decode_strings cs >>= fun (id, l) ->
cs_shift cs l >>= fun cs' ->
decode_int cs' >>= fun memory ->
cs_shift cs' 8 >>= fun cs'' ->
decode_string cs'' >>= fun (cmd, l') ->
cs_shift cs'' l' >>= fun cs''' ->
decode_int cs''' >>= fun pid ->
cs_shift cs''' 8 >>= fun cs'''' ->
decode_strings cs'''' >>= fun (taps, l'') ->
Ok ((id, memory, cmd, pid, taps), l + 8 + l' + 8 + l'')
let decode_vms buf = decode_list decode_vm buf
let encode_vm_config vm =
let cpu = encode_int vm.cpuid
and mem = encode_int vm.requested_memory
and block = encode_string (match vm.block_device with None -> "" | Some x -> x)
and network = encode_strings vm.network
and vmimage = Cstruct.concat [ encode_int (vmtype_to_int (fst vm.vmimage)) ;
encode_int (Cstruct.len (snd vm.vmimage)) ;
snd vm.vmimage ]
and args = encode_strings (match vm.argv with None -> [] | Some args -> args)
in
Cstruct.concat [ cpu ; mem ; block ; network ; vmimage ; args ]
let decode_vm_config buf =
decode_strings buf >>= fun (vname, off) ->
Logs.debug (fun m -> m "vm_config name %a" pp_id vname) ;
cs_shift buf off >>= fun buf' ->
decode_int buf' >>= fun cpuid ->
Logs.debug (fun m -> m "cpuid %d" cpuid) ;
decode_int ~off:8 buf' >>= fun requested_memory ->
Logs.debug (fun m -> m "mem %d" requested_memory) ;
cs_shift buf' 16 >>= fun buf'' ->
decode_string buf'' >>= fun (block, off) ->
Logs.debug (fun m -> m "block %s" block) ;
cs_shift buf'' off >>= fun buf''' ->
let block_device = if block = "" then None else Some block in
decode_strings buf''' >>= fun (network, off') ->
cs_shift buf''' off' >>= fun buf'''' ->
decode_int buf'''' >>= fun vmtype ->
(match int_to_vmtype vmtype with
| Some x -> Ok x
| None -> Error (`Msg "unknown vmtype")) >>= fun vmtype ->
decode_int ~off:8 buf'''' >>= fun size ->
check_len buf'''' (Int32.of_int size) >>= fun () ->
let vmimage = (vmtype, Cstruct.sub buf'''' 16 size) in
cs_shift buf'''' (16 + size) >>= fun buf''''' ->
decode_strings buf''''' >>= fun (argv, _) ->
let argv = match argv with [] -> None | xs -> Some xs in
Ok { vname ; cpuid ; requested_memory ; block_device ; network ; vmimage ; argv }
let create id version vm =
let body = encode_vm_config vm in
encode ~name:vm.vname ~body version id (op_to_int Create)
let force_create id version vm =
let body = encode_vm_config vm in
encode ~name:vm.vname ~body version id (op_to_int Force_create)
let destroy id version name =
encode ~name version id (op_to_int Destroy)
end

View file

@ -118,6 +118,7 @@ let tick t =
| None -> Logs.err (fun m -> m "couldn't drop super %a from sub %a" Vmm_core.pp_id id Vmm_core.pp_id vmid) ; out | None -> Logs.err (fun m -> m "couldn't drop super %a from sub %a" Vmm_core.pp_id id Vmm_core.pp_id vmid) ; out
| Some real_id -> | Some real_id ->
let name = Vmm_core.string_of_id real_id in let name = Vmm_core.string_of_id real_id in
let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in let stats_encoded = Vmm_wire.Stats.(data 0L my_version name (encode_stats stats)) in
(socket, vmid, stats_encoded) :: out) (socket, vmid, stats_encoded) :: out)
out xs) out xs)