diff --git a/app/vmmd.ml b/app/vmmd.ml index 7f6bd57..7dd01ca 100644 --- a/app/vmmd.ml +++ b/app/vmmd.ml @@ -25,26 +25,34 @@ let version = `AV3 let state = ref (Vmm_vmmd.init version) let create process cont = - let await, wakeme = Lwt.wait () in - match cont !state await with + match cont !state with | Error (`Msg msg) -> Logs.err (fun m -> m "create continuation failed %s" msg) ; Lwt.return_unit - | Ok (state'', out, name, vm) -> - state := state'' ; + | Ok (state', out, name, vm) -> + state := state' ; s := { !s with vm_created = succ !s.vm_created } ; Lwt.async (fun () -> Vmm_lwt.wait_and_clear vm.Unikernel.pid >>= fun r -> let state', out' = Vmm_vmmd.handle_shutdown !state name vm r in - s := { !s with vm_destroyed = succ !s.vm_destroyed } ; state := state' ; - (process "handle_shutdown" out' >|= fun _ -> ()) >|= fun () -> - Lwt.wakeup wakeme ()) ; + s := { !s with vm_destroyed = succ !s.vm_destroyed } ; + (process "handle shutdown" out' >|= fun _ -> ()) >|= fun () -> + let state', waiter_opt = Vmm_vmmd.waiter !state name in + state := state' ; + (match waiter_opt with + | None -> () + | Some wakeme -> Lwt.wakeup wakeme ())) ; (process "setting up console" out >|= fun _ -> ()) >>= fun () -> let state', out = Vmm_vmmd.setup_stats !state name vm in state := state' ; process "setting up statistics" [ out ] >|= fun _ -> () +let register who header = + match Vmm_vmmd.register !state who Lwt.task with + | None -> Error (`Data (header, `Failure "task already registered")) + | Some (state', task) -> state := state' ; Ok task + let handle out fd addr = Logs.debug (fun m -> m "connection from %a" Vmm_lwt.pp_sockaddr addr) ; (* now we need to read a packet and handle it @@ -90,17 +98,23 @@ let handle out fd addr = | `Loop -> loop () | `End -> Lwt.return_unit | `Create cont -> create process cont - | `Wait (task, out) -> - task >>= fun () -> - process "wait" [ out ] >|= ignore - | `Wait_and_create (task, next) -> - task >>= fun () -> - let state', data, n = next !state in - state := state' ; - process "wait and create" data >>= fun _ -> - match n with - | `End -> Lwt.return_unit - | `Create cont -> create process cont >|= ignore + | `Wait (who, out) -> + (match register who (fst wire) with + | Error out' -> process "wait" [ out' ] >|= ignore + | Ok task -> + task >>= fun () -> + process "wait" [ out ] >|= ignore) + | `Wait_and_create (who, next) -> + (match register who (fst wire) with + | Error out' -> process "wait and create" [ out' ] >|= ignore + | Ok task -> + task >>= fun () -> + let state', data, n = next !state in + state := state' ; + process "wait and create" data >>= fun _ -> + match n with + | `End -> Lwt.return_unit + | `Create cont -> create process cont >|= ignore) in loop () >>= fun () -> Vmm_lwt.safe_close fd diff --git a/src/vmm_vmmd.ml b/src/vmm_vmmd.ml index 22d54a6..08338fa 100644 --- a/src/vmm_vmmd.ml +++ b/src/vmm_vmmd.ml @@ -13,13 +13,29 @@ type 'a t = { stats_counter : int64 ; log_counter : int64 ; resources : Vmm_resources.t ; - tasks : 'a String.Map.t ; + waiters : 'a String.Map.t ; } let kill t = List.iter Vmm_unix.destroy (List.map snd (Vmm_trie.all t.resources.Vmm_resources.unikernels)) +let waiter t id = + let name = Name.to_string id in + match String.Map.find name t.waiters with + | None -> t, None + | Some waiter -> + let waiters = String.Map.remove name t.waiters in + { t with waiters }, Some waiter + +let register t id create = + let name = Name.to_string id in + match String.Map.find name t.waiters with + | None -> + let task, waiter = create () in + Some ({ t with waiters = String.Map.add name waiter t.waiters }, task) + | Some _ -> None + let init wire_version = let t = { wire_version ; @@ -27,7 +43,7 @@ let init wire_version = stats_counter = 1L ; log_counter = 1L ; resources = Vmm_resources.empty ; - tasks = String.Map.empty ; + waiters = String.Map.empty ; } in match Vmm_unix.find_block_devices () with | Error (`Msg msg) -> @@ -75,7 +91,7 @@ let handle_create t reply name vm_config = in Ok ({ t with console_counter = Int64.succ t.console_counter }, [ `Cons cons_out ], - `Create (fun t task -> + `Create (fun t -> (* actually execute the vm *) let block_device = match vm_config.Unikernel.block_device with | None -> None @@ -84,8 +100,7 @@ let handle_create t reply name vm_config = Vmm_unix.exec name vm_config taps block_device >>= fun vm -> Logs.debug (fun m -> m "exec()ed vm") ; Vmm_resources.insert_vm t.resources name vm >>= fun resources -> - let tasks = String.Map.add (Name.to_string name) task t.tasks in - let t = { t with resources ; tasks } in + let t = { t with resources } in let t, out = log t name (`Unikernel_start (name, vm.Unikernel.pid, vm.Unikernel.taps, None)) in Ok (t, [ reply (`String "created VM") ; out ], name, vm))) @@ -112,8 +127,7 @@ let handle_shutdown t name vm r = | Ok resources -> resources in let header = Vmm_commands.{ version = t.wire_version ; sequence = t.stats_counter ; name } in - let tasks = String.Map.remove (Name.to_string name) t.tasks in - let t = { t with stats_counter = Int64.succ t.stats_counter ; resources ; tasks } in + let t = { t with stats_counter = Int64.succ t.stats_counter ; resources } in let t, logout = log t name (`Unikernel_stop (name, vm.Unikernel.pid, r)) in (t, [ `Stat (header, `Command (`Stats_cmd `Stats_remove)) ; logout ]) @@ -176,28 +190,15 @@ let handle_unikernel_cmd t reply id msg_to_err = function | None -> handle_create t reply id vm_config | Some vm -> Vmm_unix.destroy vm ; - let id_str = Name.to_string id in - match String.Map.find_opt id_str t.tasks with - | None -> handle_create t reply id vm_config - | Some task -> - let tasks = String.Map.remove id_str t.tasks in - let t = { t with tasks } in - Ok (t, [], `Wait_and_create - (task, fun t -> msg_to_err @@ handle_create t reply id vm_config)) + Ok (t, [], `Wait_and_create + (id, fun t -> msg_to_err @@ handle_create t reply id vm_config)) end | `Unikernel_destroy -> match Vmm_resources.find_vm t.resources id with | Some vm -> Vmm_unix.destroy vm ; - let id_str = Name.to_string id in - let out, next = - let s = reply (`String "destroyed unikernel") in - match String.Map.find_opt id_str t.tasks with - | None -> [ s ], `End - | Some t -> [], `Wait (t, s) - in - let tasks = String.Map.remove id_str t.tasks in - Ok ({ t with tasks }, out, next) + let s = reply (`String "destroyed unikernel") in + Ok (t, [], `Wait (id, s)) | None -> Error (`Msg "destroy: not found") let handle_block_cmd t reply id = function diff --git a/src/vmm_vmmd.mli b/src/vmm_vmmd.mli index f14d9cb..ef67772 100644 --- a/src/vmm_vmmd.mli +++ b/src/vmm_vmmd.mli @@ -6,6 +6,10 @@ type 'a t val init : Vmm_commands.version -> 'a t +val waiter : 'a t -> Name.t -> 'a t * 'a option + +val register : 'a t -> Name.t -> (unit -> 'b * 'a) -> ('a t * 'b) option + type service_out = [ | `Stat of Vmm_commands.wire | `Log of Vmm_commands.wire @@ -19,12 +23,12 @@ val handle_shutdown : 'a t -> Name.t -> Unikernel.t -> val handle_command : 'a t -> Vmm_commands.wire -> 'a t * out list * - [ `Create of 'c t -> 'c -> ('c t * out list * Name.t * Unikernel.t, [> `Msg of string ]) result + [ `Create of 'a t -> ('a t * out list * Name.t * Unikernel.t, [> `Msg of string ]) result | `Loop | `End - | `Wait of 'a * out - | `Wait_and_create of 'a * ('a t -> 'a t * out list * - [ `Create of 'd t -> 'd -> ('d t * out list * Name.t * Unikernel.t, [> Rresult.R.msg ]) result + | `Wait of Name.t * out + | `Wait_and_create of Name.t * ('a t -> 'a t * out list * + [ `Create of 'a t -> ('a t * out list * Name.t * Unikernel.t, [> Rresult.R.msg ]) result | `End ]) ] val setup_stats : 'a t -> Name.t -> Unikernel.t -> 'a t * out