vmmd: revise process, write_reply, and out signatures, move process to jump
This commit is contained in:
parent
d4e31da27f
commit
d84013103c
105
app/vmmd.ml
105
app/vmmd.ml
|
@ -37,20 +37,20 @@ let create process cont =
|
||||||
let state', out' = Vmm_vmmd.handle_shutdown !state name vm r in
|
let state', out' = Vmm_vmmd.handle_shutdown !state name vm r in
|
||||||
state := state' ;
|
state := state' ;
|
||||||
s := { !s with vm_destroyed = succ !s.vm_destroyed } ;
|
s := { !s with vm_destroyed = succ !s.vm_destroyed } ;
|
||||||
(process "handle shutdown" out' >|= fun _ -> ()) >|= fun () ->
|
process "handle shutdown (stat, log)" out' >|= fun () ->
|
||||||
let state', waiter_opt = Vmm_vmmd.waiter !state name in
|
let state', waiter_opt = Vmm_vmmd.waiter !state name in
|
||||||
state := state' ;
|
state := state' ;
|
||||||
(match waiter_opt with
|
(match waiter_opt with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some wakeme -> Lwt.wakeup wakeme ())) ;
|
| Some wakeme -> Lwt.wakeup wakeme ())) ;
|
||||||
(process "setting up statistics, log, reply" out >|= fun _ -> ())
|
process "setting up stat, log, reply" out
|
||||||
|
|
||||||
let register who header =
|
let register who header =
|
||||||
match Vmm_vmmd.register !state who Lwt.task with
|
match Vmm_vmmd.register !state who Lwt.task with
|
||||||
| None -> Error (`Data (header, `Failure "task already registered"))
|
| None -> Error (`Data (header, `Failure "task already registered"))
|
||||||
| Some (state', task) -> state := state' ; Ok task
|
| Some (state', task) -> state := state' ; Ok task
|
||||||
|
|
||||||
let handle out fd addr =
|
let handle process fd addr =
|
||||||
Logs.debug (fun m -> m "connection from %a" Vmm_lwt.pp_sockaddr addr) ;
|
Logs.debug (fun m -> m "connection from %a" Vmm_lwt.pp_sockaddr addr) ;
|
||||||
(* now we need to read a packet and handle it
|
(* now we need to read a packet and handle it
|
||||||
(1)
|
(1)
|
||||||
|
@ -64,21 +64,6 @@ let handle out fd addr =
|
||||||
-- Lwt effects happen (stats, logs, wait_and_clear) --
|
-- Lwt effects happen (stats, logs, wait_and_clear) --
|
||||||
(2) goto (1)
|
(2) goto (1)
|
||||||
*)
|
*)
|
||||||
let process txt wires =
|
|
||||||
Lwt_list.fold_left_s (fun r data ->
|
|
||||||
match r, data with
|
|
||||||
| Ok (), (#Vmm_vmmd.service_out as o) -> out o
|
|
||||||
| Ok (), `Data wire ->
|
|
||||||
(* rather: terminate connection *)
|
|
||||||
Vmm_lwt.write_wire fd wire >|= fun _ ->
|
|
||||||
Ok ()
|
|
||||||
| Error e, _ -> Lwt.return (Error e))
|
|
||||||
(Ok ()) wires >|= function
|
|
||||||
| Ok () -> Ok ()
|
|
||||||
| Error (`Msg msg) ->
|
|
||||||
Logs.err (fun m -> m "error in processing data %s: %s" txt msg) ;
|
|
||||||
Error ()
|
|
||||||
in
|
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
Logs.debug (fun m -> m "now reading") ;
|
Logs.debug (fun m -> m "now reading") ;
|
||||||
Vmm_lwt.read_wire fd >>= function
|
Vmm_lwt.read_wire fd >>= function
|
||||||
|
@ -86,32 +71,31 @@ let handle out fd addr =
|
||||||
Logs.err (fun m -> m "error while reading") ;
|
Logs.err (fun m -> m "error while reading") ;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Ok wire ->
|
| Ok wire ->
|
||||||
Logs.debug (fun m -> m "read sth") ;
|
Logs.debug (fun m -> m "read %a" Vmm_commands.pp_wire wire) ;
|
||||||
let state', data, next = Vmm_vmmd.handle_command !state wire in
|
let state', data, next = Vmm_vmmd.handle_command !state wire in
|
||||||
state := state' ;
|
state := state' ;
|
||||||
process "handle_command" data >>= function
|
process "handle command" data >>= fun () ->
|
||||||
| Error () -> Lwt.return_unit
|
match next with
|
||||||
| Ok () -> match next with
|
|
||||||
| `Loop -> loop ()
|
| `Loop -> loop ()
|
||||||
| `End -> Lwt.return_unit
|
| `End -> Lwt.return_unit
|
||||||
| `Create cont -> create process cont
|
| `Create cont -> create process cont
|
||||||
| `Wait (who, out) ->
|
| `Wait (who, out) ->
|
||||||
(match register who (fst wire) with
|
(match register who (fst wire) with
|
||||||
| Error out' -> process "wait" [ out' ] >|= ignore
|
| Error out' -> process "wait" [ out' ]
|
||||||
| Ok task ->
|
| Ok task ->
|
||||||
task >>= fun () ->
|
task >>= fun () ->
|
||||||
process "wait" [ out ] >|= ignore)
|
process "wait" [ out ])
|
||||||
| `Wait_and_create (who, next) ->
|
| `Wait_and_create (who, next) ->
|
||||||
(match register who (fst wire) with
|
(match register who (fst wire) with
|
||||||
| Error out' -> process "wait and create" [ out' ] >|= ignore
|
| Error out' -> process "wait and create" [ out' ]
|
||||||
| Ok task ->
|
| Ok task ->
|
||||||
task >>= fun () ->
|
task >>= fun () ->
|
||||||
let state', data, n = next !state in
|
let state', data, n = next !state in
|
||||||
state := state' ;
|
state := state' ;
|
||||||
process "wait and create" data >>= fun _ ->
|
process "wait and create" data >>= fun () ->
|
||||||
match n with
|
match n with
|
||||||
| `End -> Lwt.return_unit
|
| `End -> Lwt.return_unit
|
||||||
| `Create cont -> create process cont >|= ignore)
|
| `Create cont -> create process cont)
|
||||||
in
|
in
|
||||||
loop () >>= fun () ->
|
loop () >>= fun () ->
|
||||||
Vmm_lwt.safe_close fd
|
Vmm_lwt.safe_close fd
|
||||||
|
@ -185,39 +169,70 @@ let jump _ =
|
||||||
| Some c -> c) >>= fun (c, c_fd, c_mut) ->
|
| Some c -> c) >>= fun (c, c_fd, c_mut) ->
|
||||||
create_mbox `Stats >>= fun s ->
|
create_mbox `Stats >>= fun s ->
|
||||||
|
|
||||||
let write_reply (header, cmd) name mvar fd mut =
|
let write_reply txt (header, cmd) name mvar fd mut =
|
||||||
Lwt_mutex.with_lock mut (fun () ->
|
Lwt_mutex.with_lock mut (fun () ->
|
||||||
Lwt_mvar.put mvar (header, cmd) >>= fun () ->
|
Lwt_mvar.put mvar (header, cmd) >>= fun () ->
|
||||||
Vmm_lwt.read_wire fd) >|= function
|
Vmm_lwt.read_wire fd) >|= function
|
||||||
| Ok (header', reply) ->
|
| Ok (header', reply) ->
|
||||||
if not Vmm_commands.(version_eq header.version header'.version) then
|
if not Vmm_commands.(version_eq header.version header'.version) then begin
|
||||||
Error (`Msg ("wrong version in reply from " ^ name))
|
Logs.err (fun m -> m "%s: wrong version (got %a, expected %a) in reply from %s"
|
||||||
else if not Vmm_commands.(Int64.equal header.sequence header'.sequence) then
|
txt
|
||||||
Error (`Msg (
|
Vmm_commands.pp_version header'.Vmm_commands.version
|
||||||
Fmt.strf "wrong id %Lu (expected %Lu) in reply from %s"
|
Vmm_commands.pp_version header.Vmm_commands.version
|
||||||
header'.Vmm_commands.sequence header.Vmm_commands.sequence name))
|
name) ;
|
||||||
else begin match reply with
|
invalid_arg "bad version received"
|
||||||
| `Success _ -> Ok ()
|
end else if not Vmm_commands.(Int64.equal header.sequence header'.sequence) then begin
|
||||||
| `Failure msg -> Error (`Msg (msg ^ " from " ^ name))
|
Logs.err (fun m -> m "%s: wrong id %Lu (expected %Lu) in reply from %s"
|
||||||
| _ -> Error (`Msg ("unexpected data from " ^ name))
|
txt header'.Vmm_commands.sequence header.Vmm_commands.sequence name) ;
|
||||||
|
invalid_arg "wrong sequence number received"
|
||||||
|
end else begin
|
||||||
|
Logs.debug (fun m -> m "%s: received valid reply from %s %a"
|
||||||
|
txt name Vmm_commands.pp_wire (header', reply)) ;
|
||||||
|
match reply with
|
||||||
|
| `Success _ -> ()
|
||||||
|
| `Failure msg ->
|
||||||
|
(* can we programatically solve such a situation? *)
|
||||||
|
(* we at least know e.g when writing to console resulted in an error,
|
||||||
|
that we can't continue but need to roll back -- and not continue
|
||||||
|
with execvp() *)
|
||||||
|
Logs.err (fun m -> m "%s: received failure %s from %s" txt msg name)
|
||||||
|
| _ ->
|
||||||
|
Logs.err (fun m -> m "%s: unexpected data from %s" txt name) ;
|
||||||
|
invalid_arg "unexpected data"
|
||||||
end
|
end
|
||||||
| Error _ -> Error (`Msg ("error in read from " ^ name))
|
| Error _ ->
|
||||||
|
Logs.err (fun m -> m "error in read from %s" name) ;
|
||||||
|
invalid_arg "communication failure"
|
||||||
in
|
in
|
||||||
let out = function
|
let out txt = function
|
||||||
| `Stat wire ->
|
| `Stat wire ->
|
||||||
begin match s with
|
begin match s with
|
||||||
| None -> Lwt.return (Ok ())
|
| None -> Lwt.return_unit
|
||||||
| Some (s, s_fd, s_mut) -> write_reply wire "stats" s s_fd s_mut
|
| Some (s, s_fd, s_mut) -> write_reply txt wire "stats" s s_fd s_mut
|
||||||
end
|
end
|
||||||
| `Log wire -> write_reply wire "log" l l_fd l_mut
|
| `Log wire -> write_reply txt wire "log" l l_fd l_mut
|
||||||
| `Cons wire -> write_reply wire "console" c c_fd c_mut
|
| `Cons wire -> write_reply txt wire "console" c c_fd c_mut
|
||||||
in
|
in
|
||||||
|
let process ?fd txt wires =
|
||||||
|
Lwt_list.iter_p (function
|
||||||
|
| (#Vmm_vmmd.service_out as o) -> out txt o
|
||||||
|
| `Data wire -> match fd with
|
||||||
|
| None ->
|
||||||
|
Logs.app (fun m -> m "%s received %a" txt Vmm_commands.pp_wire wire) ;
|
||||||
|
Lwt.return_unit
|
||||||
|
| Some fd ->
|
||||||
|
(* TODO should we terminate the connection on write failure? *)
|
||||||
|
Vmm_lwt.write_wire fd wire >|= fun _ ->
|
||||||
|
())
|
||||||
|
wires
|
||||||
|
in
|
||||||
|
|
||||||
Lwt.async stats_loop ;
|
Lwt.async stats_loop ;
|
||||||
Lwt.catch (fun () ->
|
Lwt.catch (fun () ->
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
Lwt_unix.accept ss >>= fun (fd, addr) ->
|
Lwt_unix.accept ss >>= fun (fd, addr) ->
|
||||||
Lwt_unix.set_close_on_exec fd ;
|
Lwt_unix.set_close_on_exec fd ;
|
||||||
Lwt.async (fun () -> handle out fd addr) ;
|
Lwt.async (fun () -> handle (process ~fd) fd addr) ;
|
||||||
loop ()
|
loop ()
|
||||||
in
|
in
|
||||||
loop ())
|
loop ())
|
||||||
|
|
Loading…
Reference in a new issue