vmmd: fine-grained output handling, gracefully handle Failure from console

This commit is contained in:
Hannes Mehnert 2019-01-27 16:07:53 +01:00
parent e362722da5
commit 16f06216ba
5 changed files with 177 additions and 187 deletions

View file

@ -24,33 +24,40 @@ let version = `AV3
(* Daemon state held in a single mutable cell, seeded from
   [Vmm_vmmd.init version]. The handlers below read it with [!state] and
   store each successor state back with [state := ...]. *)
let state = ref (Vmm_vmmd.init version)
let create process cont =
match cont !state with
let create stat_out log_out cons_out data_out cons succ_cont fail_cont =
cons_out "create" cons >>= function
| Error () ->
let data = fail_cont () in
data_out data
| Ok () -> match succ_cont !state with
| Error (`Msg msg) ->
Logs.err (fun m -> m "create continuation failed %s" msg) ;
Lwt.return_unit
| Ok (state', out, name, vm) ->
| Ok (state', stat, log, data, name, vm) ->
state := state' ;
s := { !s with vm_created = succ !s.vm_created } ;
Lwt.async (fun () ->
Vmm_lwt.wait_and_clear vm.Unikernel.pid >>= fun r ->
let state', out' = Vmm_vmmd.handle_shutdown !state name vm r in
let state', stat', log' = Vmm_vmmd.handle_shutdown !state name vm r in
state := state' ;
s := { !s with vm_destroyed = succ !s.vm_destroyed } ;
process "handle shutdown (stat, log)" out' >|= fun () ->
stat_out "handle shutdown stat" stat' >>= fun () ->
log_out "handle shutdown log" log' >|= fun () ->
let state', waiter_opt = Vmm_vmmd.waiter !state name in
state := state' ;
(match waiter_opt with
| None -> ()
| Some wakeme -> Lwt.wakeup wakeme ())) ;
process "setting up stat, log, reply" out
stat_out "setting up stat" stat >>= fun () ->
log_out "setting up log" log >>= fun () ->
data_out data
let register who header =
match Vmm_vmmd.register !state who Lwt.task with
| None -> Error (`Data (header, `Failure "task already registered"))
| None -> Error (header, `Failure "task already registered")
| Some (state', task) -> state := state' ; Ok task
let handle process fd addr =
let handle log_out cons_out stat_out fd addr =
Logs.debug (fun m -> m "connection from %a" Vmm_lwt.pp_sockaddr addr) ;
(* now we need to read a packet and handle it
(1)
@ -64,6 +71,11 @@ let handle process fd addr =
-- Lwt effects happen (stats, logs, wait_and_clear) --
(2) goto (1)
*)
let out wire =
(* TODO should we terminate the connection on write failure? *)
Vmm_lwt.write_wire fd wire >|= fun _ -> ()
in
let rec loop () =
Logs.debug (fun m -> m "now reading") ;
Vmm_lwt.read_wire fd >>= function
@ -72,30 +84,31 @@ let handle process fd addr =
Lwt.return_unit
| Ok wire ->
Logs.debug (fun m -> m "read %a" Vmm_commands.pp_wire wire) ;
let state', data, next = Vmm_vmmd.handle_command !state wire in
match Vmm_vmmd.handle_command !state wire with
| Error wire -> out wire
| Ok (state', next) ->
state := state' ;
process "handle command" data >>= fun () ->
match next with
| `Loop -> loop ()
| `End -> Lwt.return_unit
| `Create cont -> create process cont
| `Wait (who, out) ->
| `Loop wire -> out wire >>= loop
| `End wire -> out wire
| `Create (cons, succ, fail) ->
create stat_out log_out cons_out out cons succ fail
| `Wait (who, data) ->
(match register who (fst wire) with
| Error out' -> process "wait" [ out' ]
| Error data' -> out data'
| Ok task ->
task >>= fun () ->
process "wait" [ out ])
out data)
| `Wait_and_create (who, next) ->
(match register who (fst wire) with
| Error out' -> process "wait and create" [ out' ]
| Error data -> out data
| Ok task ->
task >>= fun () ->
let state', data, n = next !state in
match next !state with
| Error data -> out data
| Ok (state', `Create (cons, succ, fail)) ->
state := state' ;
process "wait and create" data >>= fun () ->
match n with
| `End -> Lwt.return_unit
| `Create cont -> create process cont)
create stat_out log_out cons_out out cons succ fail)
in
loop () >>= fun () ->
Vmm_lwt.safe_close fd
@ -129,39 +142,10 @@ let rec stats_loop () =
Lwt_unix.sleep 600. >>= fun () ->
stats_loop ()
let jump _ =
Sys.(set_signal sigpipe Signal_ignore);
match Vmm_vmmd.restore_unikernels () with
| Error (`Msg msg) -> Logs.err (fun m -> m "bailing out: %s" msg)
| Ok old_unikernels ->
Lwt_main.run
(server_socket `Vmmd >>= fun ss ->
(connect_client_socket `Log >|= function
| None -> invalid_arg "cannot connect to log socket"
| Some l -> l) >>= fun (l_fd, l_mut) ->
let self_destruct_mutex = Lwt_mutex.create () in
let self_destruct () =
Lwt_mutex.with_lock self_destruct_mutex (fun () ->
(if Vmm_vmmd.killall !state then
(* not too happy about the sleep here, but cleaning up resources
is really important (fifos, vm images, tap devices) - which
is done asynchronously (in the task waitpid() on the pid) *)
Lwt_unix.sleep 1.
else
Lwt.return_unit) >>= fun () ->
Vmm_lwt.safe_close ss)
in
Sys.(set_signal sigterm (Signal_handle (fun _ -> Lwt.async self_destruct)));
(connect_client_socket `Console >|= function
| None -> invalid_arg "cannot connect to console socket"
| Some c -> c) >>= fun (c_fd, c_mut) ->
connect_client_socket `Stats >>= fun s ->
let write_reply txt (header, cmd) name fd mut =
let write_reply name (fd, mut) txt (header, cmd) =
Lwt_mutex.with_lock mut (fun () ->
Vmm_lwt.write_wire fd (header, cmd) >>= function
| Error `Exception -> invalid_arg ("exception while writing to " ^ txt)
| Error `Exception -> invalid_arg ("exception during " ^ txt ^ " while writing to " ^ name)
| Ok () -> Vmm_lwt.read_wire fd) >|= function
| Ok (header', reply) ->
if not Vmm_commands.(version_eq header.version header'.version) then begin
@ -179,15 +163,10 @@ let jump _ =
Logs.debug (fun m -> m "%s: received valid reply from %s %a"
txt name Vmm_commands.pp_wire (header', reply)) ;
match reply with
| `Success _ -> ()
| `Success _ -> Ok ()
| `Failure msg ->
(* can we programmatically recover from such a situation? *)
(* we at least know e.g. when writing to console resulted in an error,
that we can't continue but need to roll back -- and not continue
with execvp()
-> we also should destroy image file, fifo, tap devices (i.e. Vmm_unix.shutdown) *)
Logs.err (fun m -> m "%s: received failure %s from %s" txt msg name)
Logs.err (fun m -> m "%s: received failure %s from %s" txt msg name) ;
Error ()
| _ ->
Logs.err (fun m -> m "%s: unexpected data from %s" txt name) ;
invalid_arg "unexpected data"
@ -195,40 +174,55 @@ let jump _ =
| Error _ ->
Logs.err (fun m -> m "error in read from %s" name) ;
invalid_arg "communication failure"
let jump _ =
Sys.(set_signal sigpipe Signal_ignore);
match Vmm_vmmd.restore_unikernels () with
| Error (`Msg msg) -> Logs.err (fun m -> m "bailing out: %s" msg)
| Ok old_unikernels ->
Lwt_main.run
(server_socket `Vmmd >>= fun ss ->
(connect_client_socket `Log >|= function
| None -> invalid_arg "cannot connect to log socket"
| Some l -> l) >>= fun l ->
let self_destruct_mutex = Lwt_mutex.create () in
let self_destruct () =
Lwt_mutex.with_lock self_destruct_mutex (fun () ->
(if Vmm_vmmd.killall !state then
(* not too happy about the sleep here, but cleaning up resources
is really important (fifos, vm images, tap devices) - which
is done asynchronously (in the task waitpid() on the pid) *)
Lwt_unix.sleep 1.
else
Lwt.return_unit) >>= fun () ->
Vmm_lwt.safe_close ss)
in
let out txt = function
| `Stat wire ->
begin match s with
| None -> Lwt.return_unit
| Some (s_fd, s_mut) -> write_reply txt wire "stats" s_fd s_mut
end
| `Log wire -> write_reply txt wire "log" l_fd l_mut
| `Cons wire -> write_reply txt wire "console" c_fd c_mut
in
let process ?fd txt wires =
Lwt_list.iter_p (function
| (#Vmm_vmmd.service_out as o) -> out txt o
| `Data wire -> match fd with
| None ->
Logs.app (fun m -> m "%s received %a" txt Vmm_commands.pp_wire wire) ;
Lwt.return_unit
| Some fd ->
(* TODO should we terminate the connection on write failure? *)
Vmm_lwt.write_wire fd wire >|= fun _ ->
()) wires
Sys.(set_signal sigterm (Signal_handle (fun _ -> Lwt.async self_destruct)));
(connect_client_socket `Console >|= function
| None -> invalid_arg "cannot connect to console socket"
| Some c -> c) >>= fun c ->
connect_client_socket `Stats >>= fun s ->
let log_out txt wire = write_reply "log" l txt wire >|= fun _ -> ()
and cons_out = write_reply "cons" c
and stat_out txt wire = match s with
| None -> Logs.info (fun m -> m "ignoring stat %s %a" txt Vmm_commands.pp_wire wire) ; Lwt.return_unit
| Some s -> write_reply "stat" s txt wire >|= fun _ -> ()
in
Lwt.async stats_loop ;
let start_unikernel (name, config) =
match Vmm_vmmd.handle_create !state [] name config with
let hdr = Vmm_commands.{ version ; sequence = 0L ; name = Name.root }
and data_out _ = Lwt.return_unit
in
match Vmm_vmmd.handle_create !state hdr name config with
| Error (`Msg msg) ->
Logs.err (fun m -> m "failed to restart %a: %s" Name.pp name msg) ;
Lwt.return_unit
| Ok (state', out, `Create next) ->
| Ok (state', `Create (cons, succ, fail)) ->
state := state' ;
process "create from dump" out >>= fun () ->
create process next
create stat_out log_out cons_out data_out cons succ fail
in
Lwt_list.iter_p start_unikernel (Vmm_trie.all old_unikernels) >>= fun () ->
@ -236,7 +230,7 @@ let jump _ =
let rec loop () =
Lwt_unix.accept ss >>= fun (fd, addr) ->
Lwt_unix.set_close_on_exec fd ;
Lwt.async (fun () -> handle (process ~fd) fd addr) ;
Lwt.async (fun () -> handle log_out cons_out stat_out fd addr) ;
loop ()
in
loop ())

View file

@ -150,6 +150,12 @@ let prepare name vm =
Bos.OS.File.write (Name.image_file name) (Cstruct.to_string image) >>= fun () ->
Ok (List.rev taps)
(* [free_resources name taps] releases the on-disk and network resources
   associated with [name]: it deletes the image file, then the fifo file,
   and finally destroys every tap device listed in [taps], in list order.
   The steps are chained with [>>=] starting from [Ok ()], so a failing step
   is expected to stop the chain and be returned as the overall result. *)
let free_resources name taps =
  (* same order as prepare! *)
  let destroy_next acc tap = acc >>= fun () -> destroy_tap tap in
  let image = Name.image_file name in
  Bos.OS.File.delete image >>= fun () ->
  let fifo = Name.fifo_file name in
  Bos.OS.File.delete fifo >>= fun () ->
  List.fold_left destroy_next (Ok ()) taps
let vm_device vm =
match Lazy.force uname with
| x when x = "FreeBSD" -> Ok ("solo5-" ^ string_of_int vm.Unikernel.pid)
@ -162,10 +168,7 @@ let shutdown name vm =
| x, Ok name when x = "FreeBSD" ->
ignore (Bos.OS.Cmd.run Bos.Cmd.(v "bhyvectl" % "--destroy" % ("--vm=" ^ name)))
| _ -> ()) ;
(* same order as prepare! *)
Bos.OS.File.delete (Name.image_file name) >>= fun () ->
Bos.OS.File.delete (Name.fifo_file name) >>= fun () ->
List.fold_left (fun r n -> r >>= fun () -> destroy_tap n) (Ok ()) vm.Unikernel.taps
free_resources name vm.Unikernel.taps
let cpuset cpu =
let cpustring = string_of_int cpu in

View file

@ -9,6 +9,8 @@ val prepare : Name.t -> Unikernel.config -> (string list, [> R.msg ]) result
val exec : Name.t -> Unikernel.config -> string list -> Name.t option ->
(Unikernel.t, [> R.msg ]) result
(** [free_resources name taps] deletes the image file and the fifo file
    belonging to [name], then destroys each tap device named in [taps].
    The cleanup steps are chained on their results, so an error from one
    step is expected to be returned without running the remaining ones. *)
val free_resources : Name.t -> string list -> (unit, [> R.msg ]) result
val shutdown : Name.t -> Unikernel.t -> (unit, [> R.msg ]) result
val destroy : Unikernel.t -> unit

View file

@ -64,23 +64,17 @@ let init wire_version =
in
{ t with resources }
type service_out = [
| `Stat of Vmm_commands.wire
| `Log of Vmm_commands.wire
| `Cons of Vmm_commands.wire
]
type out = [ service_out | `Data of Vmm_commands.wire ]
type 'a create =
'a t -> ('a t * out list * Name.t * Unikernel.t, [ `Msg of string ]) result
Vmm_commands.wire *
('a t -> ('a t * Vmm_commands.wire * Vmm_commands.wire * Vmm_commands.wire * Name.t * Unikernel.t, [ `Msg of string ]) result) *
(unit -> Vmm_commands.wire)
let log t name event =
let data = (Ptime_clock.now (), event) in
let header = Vmm_commands.{ version = t.wire_version ; sequence = t.log_counter ; name } in
let log_counter = Int64.succ t.log_counter in
Logs.debug (fun m -> m "log %a" Log.pp data) ;
({ t with log_counter }, `Log (header, `Data (`Log_data data)))
({ t with log_counter }, (header, `Data (`Log_data data)))
let restore_unikernels () =
match Vmm_unix.restore () with
@ -117,14 +111,14 @@ let setup_stats t name vm =
in
let header = Vmm_commands.{ version = t.wire_version ; sequence = t.stats_counter ; name } in
let t = { t with stats_counter = Int64.succ t.stats_counter } in
t, `Stat (header, `Command (`Stats_cmd stat_out))
t, (header, `Command (`Stats_cmd stat_out))
let remove_stats t name =
let header = Vmm_commands.{ version = t.wire_version ; sequence = t.stats_counter ; name } in
let t = { t with stats_counter = Int64.succ t.stats_counter } in
(t, `Stat (header, `Command (`Stats_cmd `Stats_remove)))
(t, (header, `Command (`Stats_cmd `Stats_remove)))
let handle_create t reply name vm_config =
let handle_create t hdr name vm_config =
(match Vmm_resources.find_vm t.resources name with
| Some _ -> Error (`Msg "VM with same name is already running")
| None -> Ok ()) >>= fun () ->
@ -137,9 +131,7 @@ let handle_create t reply name vm_config =
let header = Vmm_commands.{ version = t.wire_version ; sequence = t.console_counter ; name } in
(header, `Command (`Console_cmd `Console_add))
in
Ok ({ t with console_counter = Int64.succ t.console_counter },
[ `Cons cons_out ],
`Create (fun t ->
let success t =
(* actually execute the vm *)
let block_device = match vm_config.Unikernel.block_device with
| None -> None
@ -150,9 +142,18 @@ let handle_create t reply name vm_config =
Vmm_resources.insert_vm t.resources name vm >>= fun resources ->
let t = { t with resources } in
dump_unikernels t ;
let t, out = log t name (`Unikernel_start (name, vm.Unikernel.pid, vm.Unikernel.taps, None)) in
let t, log_out = log t name (`Unikernel_start (name, vm.Unikernel.pid, vm.Unikernel.taps, None)) in
let t, stat_out = setup_stats t name vm in
Ok (t, stat_out :: out :: reply, name, vm)))
Ok (t, stat_out, log_out, (hdr, `Success (`String "created VM")), name, vm)
and fail () =
match Vmm_unix.free_resources name taps with
| Ok () -> (hdr, `Failure "could not create VM: console failed")
| Error (`Msg msg) ->
let m = "could not create VM: console failed, and also " ^ msg ^ " while cleaning resources" in
(hdr, `Failure m)
in
Ok ({ t with console_counter = Int64.succ t.console_counter },
`Create (cons_out, success, fail))
let handle_shutdown t name vm r =
(match Vmm_unix.shutdown name vm with
@ -166,15 +167,15 @@ let handle_shutdown t name vm r =
in
let t = { t with resources } in
if not !in_shutdown then dump_unikernels t ;
let t, logout = log t name (`Unikernel_stop (name, vm.Unikernel.pid, r)) in
let t, log_out = log t name (`Unikernel_stop (name, vm.Unikernel.pid, r)) in
let t, stat_out = remove_stats t name in
(t, [ stat_out ; logout ])
(t, stat_out, log_out)
let handle_policy_cmd t reply id = function
| `Policy_remove ->
Logs.debug (fun m -> m "remove policy %a" Name.pp id) ;
Vmm_resources.remove_policy t.resources id >>= fun resources ->
Ok ({ t with resources }, [ reply (`String "removed policy") ], `End)
Ok ({ t with resources }, `End (reply (`String "removed policy")))
| `Policy_add policy ->
Logs.debug (fun m -> m "insert policy %a" Name.pp id) ;
let same_policy = match Vmm_resources.find_policy t.resources id with
@ -182,10 +183,10 @@ let handle_policy_cmd t reply id = function
| Some p' -> Policy.equal policy p'
in
if same_policy then
Ok (t, [ reply (`String "no modification of policy") ], `Loop)
Ok (t, `Loop (reply (`String "no modification of policy")))
else
Vmm_resources.insert_policy t.resources id policy >>= fun resources ->
Ok ({ t with resources }, [ reply (`String "added policy") ], `Loop)
Ok ({ t with resources }, `Loop (reply (`String "added policy")))
| `Policy_info ->
Logs.debug (fun m -> m "policy %a" Name.pp id) ;
let policies =
@ -198,9 +199,9 @@ let handle_policy_cmd t reply id = function
Logs.debug (fun m -> m "policies: couldn't find %a" Name.pp id) ;
Error (`Msg "policy: not found")
| _ ->
Ok (t, [ reply (`Policies policies) ], `End)
Ok (t, `End (reply (`Policies policies)))
let handle_unikernel_cmd t reply id msg_to_err = function
let handle_unikernel_cmd t reply header id msg_to_err = function
| `Unikernel_info ->
Logs.debug (fun m -> m "info %a" Name.pp id) ;
let vms =
@ -213,11 +214,9 @@ let handle_unikernel_cmd t reply id msg_to_err = function
Logs.debug (fun m -> m "info: couldn't find %a" Name.pp id) ;
Error (`Msg "info: no unikernel found")
| _ ->
Ok (t, [ reply (`Unikernels vms) ], `End)
Ok (t, `End (reply (`Unikernels vms)))
end
| `Unikernel_create vm_config ->
let success = reply (`String "created VM") in
handle_create t [ success ] id vm_config
| `Unikernel_create vm_config -> handle_create t header id vm_config
| `Unikernel_force_create vm_config ->
begin
let resources =
@ -227,21 +226,18 @@ let handle_unikernel_cmd t reply id msg_to_err = function
in
Vmm_resources.check_vm resources id vm_config >>= fun () ->
match Vmm_resources.find_vm t.resources id with
| None ->
let success = reply (`String "created VM (didn't exist before)") in
handle_create t [ success ] id vm_config
| None -> handle_create t header id vm_config
| Some vm ->
Vmm_unix.destroy vm ;
let success = reply (`String "destroyed and created VM") in
Ok (t, [], `Wait_and_create
(id, fun t -> msg_to_err @@ handle_create t [ success ] id vm_config))
Ok (t, `Wait_and_create
(id, fun t -> msg_to_err @@ handle_create t header id vm_config))
end
| `Unikernel_destroy ->
match Vmm_resources.find_vm t.resources id with
| Some vm ->
Vmm_unix.destroy vm ;
let s = reply (`String "destroyed unikernel") in
Ok (t, [], `Wait (id, s))
Ok (t, `Wait (id, s))
| None -> Error (`Msg "destroy: not found")
let handle_block_cmd t reply id = function
@ -253,7 +249,7 @@ let handle_block_cmd t reply id = function
| Some (_, false) ->
Vmm_unix.destroy_block id >>= fun () ->
Vmm_resources.remove_block t.resources id >>= fun resources ->
Ok ({ t with resources }, [ reply (`String "removed block") ], `End)
Ok ({ t with resources }, `End (reply (`String "removed block")))
end
| `Block_add size ->
begin
@ -264,7 +260,7 @@ let handle_block_cmd t reply id = function
Vmm_resources.check_block t.resources id size >>= fun () ->
Vmm_unix.create_block id size >>= fun () ->
Vmm_resources.insert_block t.resources id size >>= fun resources ->
Ok ({ t with resources }, [ reply (`String "added block device") ], `Loop)
Ok ({ t with resources }, `Loop (reply (`String "added block device")))
end
| `Block_info ->
Logs.debug (fun m -> m "block %a" Name.pp id) ;
@ -278,21 +274,21 @@ let handle_block_cmd t reply id = function
Logs.debug (fun m -> m "block: couldn't find %a" Name.pp id) ;
Error (`Msg "block: not found")
| _ ->
Ok (t, [ reply (`Block_devices blocks) ], `End)
Ok (t, `End (reply (`Block_devices blocks)))
let handle_command t (header, payload) =
let msg_to_err = function
| Ok x -> x
| Ok x -> Ok x
| Error (`Msg msg) ->
Logs.err (fun m -> m "error while processing command: %s" msg) ;
(t, [ `Data (header, `Failure msg) ], `End)
and reply x = `Data (header, `Success x)
Error (header, `Failure msg)
and reply x = (header, `Success x)
and id = header.Vmm_commands.name
in
msg_to_err (
match payload with
| `Command (`Policy_cmd pc) -> handle_policy_cmd t reply id pc
| `Command (`Unikernel_cmd vc) -> handle_unikernel_cmd t reply id msg_to_err vc
| `Command (`Unikernel_cmd vc) -> handle_unikernel_cmd t reply header id msg_to_err vc
| `Command (`Block_cmd bc) -> handle_block_cmd t reply id bc
| _ ->
Logs.err (fun m -> m "ignoring %a" Vmm_commands.pp_wire (header, payload)) ;

View file

@ -10,31 +10,26 @@ val waiter : 'a t -> Name.t -> 'a t * 'a option
val register : 'a t -> Name.t -> (unit -> 'b * 'a) -> ('a t * 'b) option
type service_out = [
| `Stat of Vmm_commands.wire
| `Log of Vmm_commands.wire
| `Cons of Vmm_commands.wire
]
type out = [ service_out | `Data of Vmm_commands.wire ]
type 'a create =
'a t -> ('a t * out list * Name.t * Unikernel.t, [ `Msg of string ]) result
Vmm_commands.wire *
('a t -> ('a t * Vmm_commands.wire * Vmm_commands.wire * Vmm_commands.wire * Name.t * Unikernel.t, [ `Msg of string ]) result) *
(unit -> Vmm_commands.wire)
val handle_shutdown : 'a t -> Name.t -> Unikernel.t ->
[ `Exit of int | `Signal of int | `Stop of int ] -> 'a t * out list
[ `Exit of int | `Signal of int | `Stop of int ] -> 'a t * Vmm_commands.wire * Vmm_commands.wire
val handle_create : 'a t -> out list ->
val handle_create : 'a t -> Vmm_commands.header ->
Name.t -> Unikernel.config ->
('a t * out list * [ `Create of 'a create ], [> `Msg of string ]) result
('a t * [ `Create of 'a create ], [> `Msg of string ]) result
val handle_command : 'a t -> Vmm_commands.wire ->
'a t * out list *
('a t *
[ `Create of 'a create
| `Loop
| `End
| `Wait of Name.t * out
| `Wait_and_create of Name.t * ('a t -> 'a t * out list * [ `Create of 'a create | `End ]) ]
| `Loop of Vmm_commands.wire
| `End of Vmm_commands.wire
| `Wait of Name.t * Vmm_commands.wire
| `Wait_and_create of Name.t * ('a t -> ('a t * [ `Create of 'a create ], Vmm_commands.wire) result) ],
Vmm_commands.wire) result
val killall : 'a t -> bool