fewer lists, read replies (to sockets) in vmmd

This commit is contained in:
Hannes Mehnert 2018-10-26 01:11:41 +02:00
parent aa051d62cd
commit a60f866f70
4 changed files with 97 additions and 84 deletions

11
app/vmm_cli.ml Normal file
View File

@ -0,0 +1,11 @@
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ())
open Cmdliner
let setup_log =
Term.(const setup_log
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())

View File

@ -1,5 +1,7 @@
(* (c) 2017 Hannes Mehnert, all rights reserved *) (* (c) 2017 Hannes Mehnert, all rights reserved *)
open Vmm_cli
type stats = { type stats = {
start : Ptime.t ; start : Ptime.t ;
vm_created : int ; vm_created : int ;
@ -20,54 +22,36 @@ let version = `AV2
let state = ref (Vmm_vmmd.init version) let state = ref (Vmm_vmmd.init version)
let create c_fd process cont = let create process cont =
Vmm_lwt.read_wire c_fd >>= function let await, wakeme = Lwt.wait () in
match cont !state await with
| Error (`Msg msg) -> | Error (`Msg msg) ->
Logs.err (fun m -> m "error %s while reading from console" msg) ; Logs.err (fun m -> m "create continuation failed %s" msg) ;
Lwt.return_unit Lwt.return_unit
| Error _ -> | Ok (state'', out, name, vm) ->
Logs.err (fun m -> m "error while reading from console") ; state := state'' ;
Lwt.return_unit s := { !s with vm_created = succ !s.vm_created } ;
| Ok (header, wire) -> Lwt.async (fun () ->
if not (Vmm_commands.version_eq version header.Vmm_commands.version) then begin Vmm_lwt.wait_and_clear vm.Vmm_core.pid vm.Vmm_core.stdout >>= fun r ->
Logs.err (fun m -> m "invalid version while reading from console") ; let state', out' = Vmm_vmmd.handle_shutdown !state name vm r in
Lwt.return_unit s := { !s with vm_destroyed = succ !s.vm_destroyed } ;
end else state := state' ;
match wire with (process out' >|= function
| `Command _ -> | Error (`Msg msg) ->
Logs.err (fun m -> m "console returned a command") ; Logs.err (fun m -> m "error %s on handling shutdown" msg)
Lwt.return_unit | Ok () -> ()) >|= fun () ->
| `Failure f -> Lwt.wakeup wakeme ()) ;
Logs.err (fun m -> m "console failed with %s" f) ; (process out >|= function
Lwt.return_unit | Error (`Msg msg) -> Logs.err (fun m -> m "error %s while setting up stats and logging" msg)
| `Data _ -> | Ok () -> ()) >>= fun () ->
Logs.err (fun m -> m "console replied with data") ; let state', out = Vmm_vmmd.setup_stats !state name vm in
Lwt.return_unit state := state' ;
| `Success _msg -> process [ out ] >|= function
(* assert hdr.id = id! *) | Error (`Msg msg) -> Logs.err (fun m -> m "error %s sending information to stats" msg)
let await, wakeme = Lwt.wait () in | Ok () -> ()
match cont !state await with
| Error (`Msg msg) ->
Logs.err (fun m -> m "create continuation failed %s" msg) ;
Lwt.return_unit
| Ok (state'', out, name, vm) ->
state := state'' ;
s := { !s with vm_created = succ !s.vm_created } ;
Lwt.async (fun () ->
Vmm_lwt.wait_and_clear vm.Vmm_core.pid vm.Vmm_core.stdout >>= fun r ->
let state', out' = Vmm_vmmd.handle_shutdown !state name vm r in
s := { !s with vm_destroyed = succ !s.vm_destroyed } ;
state := state' ;
process out' >|= fun () ->
Lwt.wakeup wakeme ()) ;
process out >>= fun () ->
let state', out = Vmm_vmmd.setup_stats !state name vm in
state := state' ;
process out (* TODO: need to read from stats socket! *)
let handle out c_fd fd addr = let handle out fd addr =
(* out is for `Log | `Stat | `Cons (including reconnect semantics) *) (* out is for `Log | `Stat | `Cons (including reconnect semantics) *)
(* need to handle data out (+ die on write failure) *)
Logs.debug (fun m -> m "connection from %a" Vmm_lwt.pp_sockaddr addr) ; Logs.debug (fun m -> m "connection from %a" Vmm_lwt.pp_sockaddr addr) ;
(* now we need to read a packet and handle it (* now we need to read a packet and handle it
(1) (1)
@ -81,12 +65,16 @@ let handle out c_fd fd addr =
-- Lwt effects happen (stats, logs, wait_and_clear) -- -- Lwt effects happen (stats, logs, wait_and_clear) --
(2) goto (1) (2) goto (1)
*) *)
let process xs = let process wires =
Lwt_list.iter_p (function Lwt_list.fold_left_s (fun r data ->
| #Vmm_vmmd.service_out as o -> out o match r, data with
| `Data cs -> | Ok (), (#Vmm_vmmd.service_out as o) -> out o
| Ok (), `Data wire ->
(* rather: terminate connection *) (* rather: terminate connection *)
Vmm_lwt.write_wire fd cs >|= fun _ -> ()) xs Vmm_lwt.write_wire fd wire >|= fun _ ->
Ok ()
| Error e, _ -> Lwt.return (Error e))
(Ok ()) wires
in in
Logs.debug (fun m -> m "now reading") ; Logs.debug (fun m -> m "now reading") ;
(Vmm_lwt.read_wire fd >>= function (Vmm_lwt.read_wire fd >>= function
@ -97,20 +85,24 @@ let handle out c_fd fd addr =
Logs.debug (fun m -> m "read sth") ; Logs.debug (fun m -> m "read sth") ;
let state', data, next = Vmm_vmmd.handle_command !state wire in let state', data, next = Vmm_vmmd.handle_command !state wire in
state := state' ; state := state' ;
process data >>= fun () -> process data >>= function
match next with | Error (`Msg msg) -> Logs.err (fun m -> m "received error %s" msg) ; Lwt.return_unit
| `End -> Lwt.return_unit | Ok () -> match next with
| `Wait (task, out) -> task >>= fun () -> process out | `End -> Lwt.return_unit
| `Wait (task, out) ->
task >>= fun () ->
process [ out ] >|= fun _ ->
()
| `Wait_and_create (task, next) -> | `Wait_and_create (task, next) ->
task >>= fun () -> task >>= fun () ->
let state', data, n = next !state in let state', data, n = next !state in
state := state' ; state := state' ;
process data >>= fun () -> process data >>= fun _ ->
(match n with (match n with
| `End -> Lwt.return_unit | `End -> Lwt.return_unit
| `Create cont -> create c_fd process cont) | `Create cont -> create process cont)
| `Create cont -> | `Create cont ->
create c_fd process cont create process cont
(* data contained a write to console, we need to wait for its reply first *) (* data contained a write to console, we need to wait for its reply first *)
) >>= fun () -> ) >>= fun () ->
Vmm_lwt.safe_close fd Vmm_lwt.safe_close fd
@ -172,33 +164,42 @@ let jump _ =
create_mbox `Stats >>= fun s -> create_mbox `Stats >>= fun s ->
(create_mbox `Log >|= function (create_mbox `Log >|= function
| None -> invalid_arg "cannot connect to log socket" | None -> invalid_arg "cannot connect to log socket"
| Some l -> l) >>= fun (l, _l_fd) -> | Some l -> l) >>= fun (l, l_fd) ->
let write_reply (header, cmd) mvar fd =
Lwt_mvar.put mvar (header, cmd) >>= fun () ->
Vmm_lwt.read_wire fd >|= function
| Ok (header', reply) ->
if not Vmm_commands.(version_eq header.version header'.version) then
Error (`Msg "wrong version in reply")
else if not Vmm_commands.(Int64.equal header.sequence header'.sequence) then
Error (`Msg "wrong id in reply")
else begin match reply with
| `Success _ -> Ok ()
| `Failure msg -> Error (`Msg msg)
| _ -> Error (`Msg "unexpected data")
end
| Error _ -> Error (`Msg "error in read")
in
let out = function let out = function
| `Stat data -> (match s with None -> Lwt.return_unit | Some (s, _s_fd) -> Lwt_mvar.put s data) | `Stat wire ->
| `Log data -> Lwt_mvar.put l data begin match s with
| `Cons data -> Lwt_mvar.put c data | None -> Lwt.return (Ok ())
| Some (s, s_fd) -> write_reply wire s s_fd
end
| `Log wire -> write_reply wire l l_fd
| `Cons wire -> write_reply wire c c_fd
in in
Lwt.async stats_loop ; Lwt.async stats_loop ;
let rec loop () = let rec loop () =
Lwt_unix.accept ss >>= fun (fd, addr) -> Lwt_unix.accept ss >>= fun (fd, addr) ->
Lwt_unix.set_close_on_exec fd ; Lwt_unix.set_close_on_exec fd ;
Lwt.async (fun () -> handle out c_fd fd addr) ; Lwt.async (fun () -> handle out fd addr) ;
loop () loop ()
in in
loop ()) loop ())
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ())
open Cmdliner open Cmdliner
let setup_log =
Term.(const setup_log
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())
let cmd = let cmd =
Term.(ret (const jump $ setup_log)), Term.(ret (const jump $ setup_log)),
Term.info "vmmd" ~version:"%%VERSION_NUM%%" Term.info "vmmd" ~version:"%%VERSION_NUM%%"

View File

@ -57,7 +57,8 @@ let handle_create t hdr vm_config =
let header = Vmm_commands.{ version = t.wire_version ; sequence = t.console_counter ; id = name } in let header = Vmm_commands.{ version = t.wire_version ; sequence = t.console_counter ; id = name } in
(header, `Command (`Console_cmd `Console_add)) (header, `Command (`Console_cmd `Console_add))
in in
Ok ({ t with console_counter = Int64.succ t.console_counter }, [ `Cons cons_out ], Ok ({ t with console_counter = Int64.succ t.console_counter },
[ `Cons cons_out ],
`Create (fun t task -> `Create (fun t task ->
(* actually execute the vm *) (* actually execute the vm *)
Vmm_unix.exec name vm_config taps >>= fun vm -> Vmm_unix.exec name vm_config taps >>= fun vm ->
@ -73,7 +74,7 @@ let setup_stats t name vm =
let stat_out = `Stats_add (vm.pid, vm.taps) in let stat_out = `Stats_add (vm.pid, vm.taps) in
let header = Vmm_commands.{ version = t.wire_version ; sequence = t.stats_counter ; id = name } in let header = Vmm_commands.{ version = t.wire_version ; sequence = t.stats_counter ; id = name } in
let t = { t with stats_counter = Int64.succ t.stats_counter } in let t = { t with stats_counter = Int64.succ t.stats_counter } in
t, [ `Stat (header, `Command (`Stats_cmd stat_out)) ] t, `Stat (header, `Command (`Stats_cmd stat_out))
let handle_shutdown t name vm r = let handle_shutdown t name vm r =
(match Vmm_unix.shutdown name vm with (match Vmm_unix.shutdown name vm with
@ -92,9 +93,9 @@ let handle_command t (header, payload) =
| Ok x -> x | Ok x -> x
| Error (`Msg msg) -> | Error (`Msg msg) ->
Logs.debug (fun m -> m "error while processing command: %s" msg) ; Logs.debug (fun m -> m "error while processing command: %s" msg) ;
let out = `Failure msg in (t, [ `Data (header, `Failure msg) ], `End)
(t, [ `Data (header, out) ], `End)
in in
let reply x = `Data (header, `Success x) in
msg_to_err ( msg_to_err (
let id = header.Vmm_commands.id in let id = header.Vmm_commands.id in
match payload with match payload with
@ -103,11 +104,11 @@ let handle_command t (header, payload) =
| `Policy_remove -> | `Policy_remove ->
Logs.debug (fun m -> m "remove policy %a" pp_id header.Vmm_commands.id) ; Logs.debug (fun m -> m "remove policy %a" pp_id header.Vmm_commands.id) ;
let resources = Vmm_resources.remove t.resources id in let resources = Vmm_resources.remove t.resources id in
Ok ({ t with resources }, [ `Data (header, `Success (`String "removed policy")) ], `End) Ok ({ t with resources }, [ reply (`String "removed policy") ], `End)
| `Policy_add policy -> | `Policy_add policy ->
Logs.debug (fun m -> m "insert policy %a" pp_id id) ; Logs.debug (fun m -> m "insert policy %a" pp_id id) ;
Vmm_resources.insert_policy t.resources id policy >>= fun resources -> Vmm_resources.insert_policy t.resources id policy >>= fun resources ->
Ok ({ t with resources }, [ `Data (header, `Success (`String "added policy")) ], `End) Ok ({ t with resources }, [ reply (`String "added policy") ], `End)
| `Policy_info -> | `Policy_info ->
begin begin
Logs.debug (fun m -> m "policy %a" pp_id id) ; Logs.debug (fun m -> m "policy %a" pp_id id) ;
@ -122,7 +123,7 @@ let handle_command t (header, payload) =
Logs.debug (fun m -> m "policies: couldn't find %a" pp_id id) ; Logs.debug (fun m -> m "policies: couldn't find %a" pp_id id) ;
Error (`Msg "policy: not found") Error (`Msg "policy: not found")
| _ -> | _ ->
Ok (t, [ `Data (header, `Success (`Policies policies)) ], `End) Ok (t, [ reply (`Policies policies) ], `End)
end end
end end
| `Command (`Vm_cmd vc) -> | `Command (`Vm_cmd vc) ->
@ -140,7 +141,7 @@ let handle_command t (header, payload) =
Logs.debug (fun m -> m "info: couldn't find %a" pp_id id) ; Logs.debug (fun m -> m "info: couldn't find %a" pp_id id) ;
Error (`Msg "info: not found") Error (`Msg "info: not found")
| _ -> | _ ->
Ok (t, [ `Data (header, `Success (`Vms vms)) ], `End) Ok (t, [ reply (`Vms vms) ], `End)
end end
| `Vm_create vm_config -> | `Vm_create vm_config ->
handle_create t header vm_config handle_create t header vm_config
@ -168,9 +169,9 @@ let handle_command t (header, payload) =
Vmm_unix.destroy vm ; Vmm_unix.destroy vm ;
let id_str = string_of_id id in let id_str = string_of_id id in
let out, next = let out, next =
let s = [ `Data (header, `Success (`String "destroyed vm")) ] in let s = reply (`String "destroyed vm") in
match String.Map.find_opt id_str t.tasks with match String.Map.find_opt id_str t.tasks with
| None -> s, `End | None -> [ s ], `End
| Some t -> [], `Wait (t, s) | Some t -> [], `Wait (t, s)
in in
let tasks = String.Map.remove id_str t.tasks in let tasks = String.Map.remove id_str t.tasks in

View File

@ -17,11 +17,11 @@ val handle_shutdown : 'a t -> Vmm_core.id -> Vmm_core.vm ->
val handle_command : 'a t -> Vmm_commands.wire -> val handle_command : 'a t -> Vmm_commands.wire ->
'a t * out list * 'a t * out list *
[ `Create of 'c t -> 'c -> ('c t * out list * Vmm_core.id * Vmm_core.vm, [> Rresult.R.msg ]) result [ `Create of 'c t -> 'c -> ('c t * out list * Vmm_core.id * Vmm_core.vm, [> `Msg of string ]) result
| `End | `End
| `Wait of 'a * out list | `Wait of 'a * out
| `Wait_and_create of 'a * ('a t -> 'a t * out list * | `Wait_and_create of 'a * ('a t -> 'a t * out list *
[ `Create of 'd t -> 'd -> ('d t * out list * Vmm_core.id * Vmm_core.vm, [> Rresult.R.msg ]) result [ `Create of 'd t -> 'd -> ('d t * out list * Vmm_core.id * Vmm_core.vm, [> Rresult.R.msg ]) result
| `End ]) ] | `End ]) ]
val setup_stats : 'a t -> Vmm_core.id -> Vmm_core.vm -> 'a t * out list val setup_stats : 'a t -> Vmm_core.id -> Vmm_core.vm -> 'a t * out