vmmd: store waiter, not tasks in Vmm_vmmd.t -- create task and waiter on demand (destroy / create --force), instead of at each unikernel creation
This commit is contained in:
parent
fc63a89429
commit
fdcea94a0b
50
app/vmmd.ml
50
app/vmmd.ml
|
@ -25,26 +25,34 @@ let version = `AV3
|
||||||
let state = ref (Vmm_vmmd.init version)
|
let state = ref (Vmm_vmmd.init version)
|
||||||
|
|
||||||
let create process cont =
|
let create process cont =
|
||||||
let await, wakeme = Lwt.wait () in
|
match cont !state with
|
||||||
match cont !state await with
|
|
||||||
| Error (`Msg msg) ->
|
| Error (`Msg msg) ->
|
||||||
Logs.err (fun m -> m "create continuation failed %s" msg) ;
|
Logs.err (fun m -> m "create continuation failed %s" msg) ;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Ok (state'', out, name, vm) ->
|
| Ok (state', out, name, vm) ->
|
||||||
state := state'' ;
|
state := state' ;
|
||||||
s := { !s with vm_created = succ !s.vm_created } ;
|
s := { !s with vm_created = succ !s.vm_created } ;
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
Vmm_lwt.wait_and_clear vm.Unikernel.pid >>= fun r ->
|
Vmm_lwt.wait_and_clear vm.Unikernel.pid >>= fun r ->
|
||||||
let state', out' = Vmm_vmmd.handle_shutdown !state name vm r in
|
let state', out' = Vmm_vmmd.handle_shutdown !state name vm r in
|
||||||
s := { !s with vm_destroyed = succ !s.vm_destroyed } ;
|
|
||||||
state := state' ;
|
state := state' ;
|
||||||
(process "handle_shutdown" out' >|= fun _ -> ()) >|= fun () ->
|
s := { !s with vm_destroyed = succ !s.vm_destroyed } ;
|
||||||
Lwt.wakeup wakeme ()) ;
|
(process "handle shutdown" out' >|= fun _ -> ()) >|= fun () ->
|
||||||
|
let state', waiter_opt = Vmm_vmmd.waiter !state name in
|
||||||
|
state := state' ;
|
||||||
|
(match waiter_opt with
|
||||||
|
| None -> ()
|
||||||
|
| Some wakeme -> Lwt.wakeup wakeme ())) ;
|
||||||
(process "setting up console" out >|= fun _ -> ()) >>= fun () ->
|
(process "setting up console" out >|= fun _ -> ()) >>= fun () ->
|
||||||
let state', out = Vmm_vmmd.setup_stats !state name vm in
|
let state', out = Vmm_vmmd.setup_stats !state name vm in
|
||||||
state := state' ;
|
state := state' ;
|
||||||
process "setting up statistics" [ out ] >|= fun _ -> ()
|
process "setting up statistics" [ out ] >|= fun _ -> ()
|
||||||
|
|
||||||
|
let register who header =
|
||||||
|
match Vmm_vmmd.register !state who Lwt.task with
|
||||||
|
| None -> Error (`Data (header, `Failure "task already registered"))
|
||||||
|
| Some (state', task) -> state := state' ; Ok task
|
||||||
|
|
||||||
let handle out fd addr =
|
let handle out fd addr =
|
||||||
Logs.debug (fun m -> m "connection from %a" Vmm_lwt.pp_sockaddr addr) ;
|
Logs.debug (fun m -> m "connection from %a" Vmm_lwt.pp_sockaddr addr) ;
|
||||||
(* now we need to read a packet and handle it
|
(* now we need to read a packet and handle it
|
||||||
|
@ -90,17 +98,23 @@ let handle out fd addr =
|
||||||
| `Loop -> loop ()
|
| `Loop -> loop ()
|
||||||
| `End -> Lwt.return_unit
|
| `End -> Lwt.return_unit
|
||||||
| `Create cont -> create process cont
|
| `Create cont -> create process cont
|
||||||
| `Wait (task, out) ->
|
| `Wait (who, out) ->
|
||||||
task >>= fun () ->
|
(match register who (fst wire) with
|
||||||
process "wait" [ out ] >|= ignore
|
| Error out' -> process "wait" [ out' ] >|= ignore
|
||||||
| `Wait_and_create (task, next) ->
|
| Ok task ->
|
||||||
task >>= fun () ->
|
task >>= fun () ->
|
||||||
let state', data, n = next !state in
|
process "wait" [ out ] >|= ignore)
|
||||||
state := state' ;
|
| `Wait_and_create (who, next) ->
|
||||||
process "wait and create" data >>= fun _ ->
|
(match register who (fst wire) with
|
||||||
match n with
|
| Error out' -> process "wait and create" [ out' ] >|= ignore
|
||||||
| `End -> Lwt.return_unit
|
| Ok task ->
|
||||||
| `Create cont -> create process cont >|= ignore
|
task >>= fun () ->
|
||||||
|
let state', data, n = next !state in
|
||||||
|
state := state' ;
|
||||||
|
process "wait and create" data >>= fun _ ->
|
||||||
|
match n with
|
||||||
|
| `End -> Lwt.return_unit
|
||||||
|
| `Create cont -> create process cont >|= ignore)
|
||||||
in
|
in
|
||||||
loop () >>= fun () ->
|
loop () >>= fun () ->
|
||||||
Vmm_lwt.safe_close fd
|
Vmm_lwt.safe_close fd
|
||||||
|
|
|
@ -13,13 +13,29 @@ type 'a t = {
|
||||||
stats_counter : int64 ;
|
stats_counter : int64 ;
|
||||||
log_counter : int64 ;
|
log_counter : int64 ;
|
||||||
resources : Vmm_resources.t ;
|
resources : Vmm_resources.t ;
|
||||||
tasks : 'a String.Map.t ;
|
waiters : 'a String.Map.t ;
|
||||||
}
|
}
|
||||||
|
|
||||||
let kill t =
|
let kill t =
|
||||||
List.iter Vmm_unix.destroy
|
List.iter Vmm_unix.destroy
|
||||||
(List.map snd (Vmm_trie.all t.resources.Vmm_resources.unikernels))
|
(List.map snd (Vmm_trie.all t.resources.Vmm_resources.unikernels))
|
||||||
|
|
||||||
|
let waiter t id =
|
||||||
|
let name = Name.to_string id in
|
||||||
|
match String.Map.find name t.waiters with
|
||||||
|
| None -> t, None
|
||||||
|
| Some waiter ->
|
||||||
|
let waiters = String.Map.remove name t.waiters in
|
||||||
|
{ t with waiters }, Some waiter
|
||||||
|
|
||||||
|
let register t id create =
|
||||||
|
let name = Name.to_string id in
|
||||||
|
match String.Map.find name t.waiters with
|
||||||
|
| None ->
|
||||||
|
let task, waiter = create () in
|
||||||
|
Some ({ t with waiters = String.Map.add name waiter t.waiters }, task)
|
||||||
|
| Some _ -> None
|
||||||
|
|
||||||
let init wire_version =
|
let init wire_version =
|
||||||
let t = {
|
let t = {
|
||||||
wire_version ;
|
wire_version ;
|
||||||
|
@ -27,7 +43,7 @@ let init wire_version =
|
||||||
stats_counter = 1L ;
|
stats_counter = 1L ;
|
||||||
log_counter = 1L ;
|
log_counter = 1L ;
|
||||||
resources = Vmm_resources.empty ;
|
resources = Vmm_resources.empty ;
|
||||||
tasks = String.Map.empty ;
|
waiters = String.Map.empty ;
|
||||||
} in
|
} in
|
||||||
match Vmm_unix.find_block_devices () with
|
match Vmm_unix.find_block_devices () with
|
||||||
| Error (`Msg msg) ->
|
| Error (`Msg msg) ->
|
||||||
|
@ -75,7 +91,7 @@ let handle_create t reply name vm_config =
|
||||||
in
|
in
|
||||||
Ok ({ t with console_counter = Int64.succ t.console_counter },
|
Ok ({ t with console_counter = Int64.succ t.console_counter },
|
||||||
[ `Cons cons_out ],
|
[ `Cons cons_out ],
|
||||||
`Create (fun t task ->
|
`Create (fun t ->
|
||||||
(* actually execute the vm *)
|
(* actually execute the vm *)
|
||||||
let block_device = match vm_config.Unikernel.block_device with
|
let block_device = match vm_config.Unikernel.block_device with
|
||||||
| None -> None
|
| None -> None
|
||||||
|
@ -84,8 +100,7 @@ let handle_create t reply name vm_config =
|
||||||
Vmm_unix.exec name vm_config taps block_device >>= fun vm ->
|
Vmm_unix.exec name vm_config taps block_device >>= fun vm ->
|
||||||
Logs.debug (fun m -> m "exec()ed vm") ;
|
Logs.debug (fun m -> m "exec()ed vm") ;
|
||||||
Vmm_resources.insert_vm t.resources name vm >>= fun resources ->
|
Vmm_resources.insert_vm t.resources name vm >>= fun resources ->
|
||||||
let tasks = String.Map.add (Name.to_string name) task t.tasks in
|
let t = { t with resources } in
|
||||||
let t = { t with resources ; tasks } in
|
|
||||||
let t, out = log t name (`Unikernel_start (name, vm.Unikernel.pid, vm.Unikernel.taps, None)) in
|
let t, out = log t name (`Unikernel_start (name, vm.Unikernel.pid, vm.Unikernel.taps, None)) in
|
||||||
Ok (t, [ reply (`String "created VM") ; out ], name, vm)))
|
Ok (t, [ reply (`String "created VM") ; out ], name, vm)))
|
||||||
|
|
||||||
|
@ -112,8 +127,7 @@ let handle_shutdown t name vm r =
|
||||||
| Ok resources -> resources
|
| Ok resources -> resources
|
||||||
in
|
in
|
||||||
let header = Vmm_commands.{ version = t.wire_version ; sequence = t.stats_counter ; name } in
|
let header = Vmm_commands.{ version = t.wire_version ; sequence = t.stats_counter ; name } in
|
||||||
let tasks = String.Map.remove (Name.to_string name) t.tasks in
|
let t = { t with stats_counter = Int64.succ t.stats_counter ; resources } in
|
||||||
let t = { t with stats_counter = Int64.succ t.stats_counter ; resources ; tasks } in
|
|
||||||
let t, logout = log t name (`Unikernel_stop (name, vm.Unikernel.pid, r))
|
let t, logout = log t name (`Unikernel_stop (name, vm.Unikernel.pid, r))
|
||||||
in
|
in
|
||||||
(t, [ `Stat (header, `Command (`Stats_cmd `Stats_remove)) ; logout ])
|
(t, [ `Stat (header, `Command (`Stats_cmd `Stats_remove)) ; logout ])
|
||||||
|
@ -176,28 +190,15 @@ let handle_unikernel_cmd t reply id msg_to_err = function
|
||||||
| None -> handle_create t reply id vm_config
|
| None -> handle_create t reply id vm_config
|
||||||
| Some vm ->
|
| Some vm ->
|
||||||
Vmm_unix.destroy vm ;
|
Vmm_unix.destroy vm ;
|
||||||
let id_str = Name.to_string id in
|
Ok (t, [], `Wait_and_create
|
||||||
match String.Map.find_opt id_str t.tasks with
|
(id, fun t -> msg_to_err @@ handle_create t reply id vm_config))
|
||||||
| None -> handle_create t reply id vm_config
|
|
||||||
| Some task ->
|
|
||||||
let tasks = String.Map.remove id_str t.tasks in
|
|
||||||
let t = { t with tasks } in
|
|
||||||
Ok (t, [], `Wait_and_create
|
|
||||||
(task, fun t -> msg_to_err @@ handle_create t reply id vm_config))
|
|
||||||
end
|
end
|
||||||
| `Unikernel_destroy ->
|
| `Unikernel_destroy ->
|
||||||
match Vmm_resources.find_vm t.resources id with
|
match Vmm_resources.find_vm t.resources id with
|
||||||
| Some vm ->
|
| Some vm ->
|
||||||
Vmm_unix.destroy vm ;
|
Vmm_unix.destroy vm ;
|
||||||
let id_str = Name.to_string id in
|
let s = reply (`String "destroyed unikernel") in
|
||||||
let out, next =
|
Ok (t, [], `Wait (id, s))
|
||||||
let s = reply (`String "destroyed unikernel") in
|
|
||||||
match String.Map.find_opt id_str t.tasks with
|
|
||||||
| None -> [ s ], `End
|
|
||||||
| Some t -> [], `Wait (t, s)
|
|
||||||
in
|
|
||||||
let tasks = String.Map.remove id_str t.tasks in
|
|
||||||
Ok ({ t with tasks }, out, next)
|
|
||||||
| None -> Error (`Msg "destroy: not found")
|
| None -> Error (`Msg "destroy: not found")
|
||||||
|
|
||||||
let handle_block_cmd t reply id = function
|
let handle_block_cmd t reply id = function
|
||||||
|
|
|
@ -6,6 +6,10 @@ type 'a t
|
||||||
|
|
||||||
val init : Vmm_commands.version -> 'a t
|
val init : Vmm_commands.version -> 'a t
|
||||||
|
|
||||||
|
val waiter : 'a t -> Name.t -> 'a t * 'a option
|
||||||
|
|
||||||
|
val register : 'a t -> Name.t -> (unit -> 'b * 'a) -> ('a t * 'b) option
|
||||||
|
|
||||||
type service_out = [
|
type service_out = [
|
||||||
| `Stat of Vmm_commands.wire
|
| `Stat of Vmm_commands.wire
|
||||||
| `Log of Vmm_commands.wire
|
| `Log of Vmm_commands.wire
|
||||||
|
@ -19,12 +23,12 @@ val handle_shutdown : 'a t -> Name.t -> Unikernel.t ->
|
||||||
|
|
||||||
val handle_command : 'a t -> Vmm_commands.wire ->
|
val handle_command : 'a t -> Vmm_commands.wire ->
|
||||||
'a t * out list *
|
'a t * out list *
|
||||||
[ `Create of 'c t -> 'c -> ('c t * out list * Name.t * Unikernel.t, [> `Msg of string ]) result
|
[ `Create of 'a t -> ('a t * out list * Name.t * Unikernel.t, [> `Msg of string ]) result
|
||||||
| `Loop
|
| `Loop
|
||||||
| `End
|
| `End
|
||||||
| `Wait of 'a * out
|
| `Wait of Name.t * out
|
||||||
| `Wait_and_create of 'a * ('a t -> 'a t * out list *
|
| `Wait_and_create of Name.t * ('a t -> 'a t * out list *
|
||||||
[ `Create of 'd t -> 'd -> ('d t * out list * Name.t * Unikernel.t, [> Rresult.R.msg ]) result
|
[ `Create of 'a t -> ('a t * out list * Name.t * Unikernel.t, [> Rresult.R.msg ]) result
|
||||||
| `End ]) ]
|
| `End ]) ]
|
||||||
|
|
||||||
val setup_stats : 'a t -> Name.t -> Unikernel.t -> 'a t * out
|
val setup_stats : 'a t -> Name.t -> Unikernel.t -> 'a t * out
|
||||||
|
|
Loading…
Reference in a new issue