type data for streamed thingies

This commit is contained in:
Hannes Mehnert 2018-10-23 23:11:22 +02:00
parent ce0c42fa77
commit 6f18f1bfff
8 changed files with 92 additions and 67 deletions

View file

@ -32,7 +32,7 @@ let read_console name ring channel () =
| None -> Lwt.return_unit
| Some fd ->
let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in
Vmm_lwt.write_wire fd (header, `Command (`Console_cmd (`Console_data (t, line)))) >>= function
Vmm_lwt.write_wire fd (header, `Data (`Console_data (t, line))) >>= function
| Error _ ->
Vmm_lwt.safe_close fd >|= fun () ->
active := String.Map.remove name !active
@ -92,7 +92,7 @@ let subscribe s id =
Logs.debug (fun m -> m "found %d history" (List.length entries)) ;
Lwt_list.iter_s (fun (i, v) ->
let header = Vmm_asn.{ version = my_version ; sequence = 0L ; id } in
Vmm_lwt.write_wire s (header, `Command (`Console_cmd (`Console_data (i, v)))) >|= fun _ -> ())
Vmm_lwt.write_wire s (header, `Data (`Console_data (i, v))) >|= fun _ -> ())
entries >>= fun () ->
(match String.Map.find name !active with
| None -> Lwt.return_unit
@ -114,8 +114,7 @@ let handle s addr () =
else
match cmd with
| `Console_add -> add_fifo header.Vmm_asn.id
| `Console_subscribe -> subscribe s header.Vmm_asn.id
| `Console_data _ -> Lwt.return (Error (`Msg "unexpected command"))) >>= (function
| `Console_subscribe -> subscribe s header.Vmm_asn.id) >>= (function
| Ok msg -> Vmm_lwt.write_wire s (header, `Success (`String msg))
| Error (`Msg msg) ->
Logs.err (fun m -> m "error while processing command: %s" msg) ;

View file

@ -189,7 +189,7 @@ let rec read_sock_write_tcp c ?fd addr addrtype =
safe_close fd >>= fun () ->
safe_close c >|= fun () ->
true
| Ok (hdr, `Command (`Stats_cmd (`Stats_data (ru, vmm, ifs)))) ->
| Ok (hdr, `Data (`Stats_data (ru, vmm, ifs))) ->
begin
if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin
Logs.err (fun m -> m "unknown wire protocol version") ;

View file

@ -73,9 +73,7 @@ let send_history s ring id =
(* just need a wrapper in tag = Log.Data, id = reqid *)
Lwt_list.fold_left_s (fun r (header, ts, event) ->
match r with
| Ok () ->
let data = header, `Command (`Log_cmd (`Log_data (ts, event))) in
Vmm_lwt.write_wire s data
| Ok () -> Vmm_lwt.write_wire s (header, `Data (`Log_data (ts, event)))
| Error e -> Lwt.return (Error e))
(Ok ()) res
@ -91,24 +89,29 @@ let handle mvar ring s addr () =
| Error _ ->
Logs.err (fun m -> m "exception while reading") ;
Lwt.return_unit
| Ok (hdr, `Data (`Log_data (ts, event))) ->
if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin
Logs.warn (fun m -> m "unsupported version") ;
Lwt.return_unit
end else begin
let data = Vmm_asn.log_entry_to_cstruct (hdr, ts, event) in
Vmm_ring.write ring (ts, Cstruct.to_string data) ;
Lwt_mvar.put mvar data >>= fun () ->
let data' =
let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = hdr.Vmm_asn.id } in
(header, `Data (`Log_data (ts, event)))
in
bcast := Int64.succ !bcast ;
broadcast hdr.Vmm_asn.id data' !tree >>= fun tree' ->
tree := tree' ;
loop ()
end
| Ok (hdr, `Command (`Log_cmd lc)) ->
if not (Vmm_asn.version_eq hdr.Vmm_asn.version my_version) then begin
Logs.warn (fun m -> m "unsupported version") ;
Lwt.return_unit
end else begin
match lc with
| `Log_data (ts, event) ->
let data = Vmm_asn.log_entry_to_cstruct (hdr, ts, event) in
Vmm_ring.write ring (ts, Cstruct.to_string data) ;
Lwt_mvar.put mvar data >>= fun () ->
let data' =
let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = hdr.Vmm_asn.id } in
(header, `Command (`Log_cmd (`Log_data (ts, event))))
in
bcast := Int64.succ !bcast ;
broadcast hdr.Vmm_asn.id data' !tree >>= fun tree' ->
tree := tree' ;
loop ()
| `Log_subscribe ->
let tree', ret = Vmm_trie.insert hdr.Vmm_asn.id s !tree in
tree := tree' ;

View file

@ -40,6 +40,9 @@ let create c_fd process cont =
| `Failure f ->
Logs.err (fun m -> m "console failed with %s" f) ;
Lwt.return_unit
| `Data _ ->
Logs.err (fun m -> m "console replied with data") ;
Lwt.return_unit
| `Success _msg ->
(* assert hdr.id = id! *)
let await, wakeme = Lwt.wait () in

View file

@ -106,32 +106,24 @@ let version_eq a b =
type console_cmd = [
| `Console_add
| `Console_subscribe
| `Console_data of Ptime.t * string
]
let pp_console_cmd ppf = function
| `Console_add -> Fmt.string ppf "console add"
| `Console_subscribe -> Fmt.string ppf "console subscribe"
| `Console_data (ts, line) -> Fmt.pf ppf "console data %a: %s"
(Ptime.pp_rfc3339 ()) ts line
let console_cmd =
let f = function
| `C1 () -> `Console_add
| `C2 () -> `Console_subscribe
| `C3 (timestamp, data) -> `Console_data (timestamp, data)
and g = function
| `Console_add -> `C1 ()
| `Console_subscribe -> `C2 ()
| `Console_data (timestamp, data) -> `C3 (timestamp, data)
in
Asn.S.map f g @@
Asn.S.(choice3
Asn.S.(choice2
(explicit 0 null)
(explicit 1 null)
(explicit 2 (sequence2
(required ~label:"timestamp" utc_time)
(required ~label:"data" utf8_string))))
(explicit 1 null))
(* TODO is this good? *)
let int64 =
@ -211,41 +203,30 @@ type stats_cmd = [
| `Stats_add of int * string list
| `Stats_remove
| `Stats_subscribe
| `Stats_data of stats
]
let pp_stats_cmd ppf = function
| `Stats_add (pid, taps) -> Fmt.pf ppf "stats add: pid %d taps %a" pid Fmt.(list ~sep:(unit ", ") string) taps
| `Stats_remove -> Fmt.string ppf "stat remove"
| `Stats_subscribe -> Fmt.string ppf "stat subscribe"
| `Stats_data stats -> Fmt.pf ppf "stats data: %a" pp_stats stats
let stats_cmd =
let f = function
| `C1 (pid, taps) -> `Stats_add (pid, taps)
| `C2 () -> `Stats_remove
| `C3 () -> `Stats_subscribe
| `C4 (ru, ifs, vmm) -> `Stats_data (ru, vmm, ifs)
and g = function
| `Stats_add (pid, taps) -> `C1 (pid, taps)
| `Stats_remove -> `C2 ()
| `Stats_subscribe -> `C3 ()
| `Stats_data (ru, ifs, vmm) -> `C4 (ru, vmm, ifs)
in
Asn.S.map f g @@
Asn.S.(choice4
Asn.S.(choice3
(explicit 0 (sequence2
(required ~label:"pid" int)
(required ~label:"taps" (sequence_of utf8_string))))
(explicit 1 null)
(explicit 2 null)
(explicit 3 (sequence3
(required ~label:"resource_usage" ru)
(required ~label:"ifdata" (sequence_of ifdata))
(optional ~label:"vmm_stats"
(sequence_of (sequence2
(required ~label:"key" utf8_string)
(required ~label:"value" int64)))))))
(explicit 2 null))
let addr =
Asn.S.(sequence2
@ -295,28 +276,20 @@ let log_event =
(explicit 2 int))))))
type log_cmd = [
| `Log_data of Ptime.t * Log.event
| `Log_subscribe
]
let pp_log_cmd ppf = function
| `Log_data (ts, event) -> Fmt.pf ppf "log data: %a %a" (Ptime.pp_rfc3339 ()) ts Log.pp_event event
| `Log_subscribe -> Fmt.string ppf "log subscribe"
let log_cmd =
let f = function
| `C1 (timestamp, event) -> `Log_data (timestamp, event)
| `C2 () -> `Log_subscribe
| () -> `Log_subscribe
and g = function
| `Log_data (timestamp, event) -> `C1 (timestamp, event)
| `Log_subscribe -> `C2 ()
| `Log_subscribe -> ()
in
Asn.S.map f g @@
Asn.S.(choice2
(explicit 0 (sequence2
(required ~label:"timestamp" utc_time)
(required ~label:"event" log_event)))
(explicit 1 null))
Asn.S.null
type vm_cmd = [
| `Vm_info
@ -444,6 +417,45 @@ let wire_command : wire_command Asn.S.t =
(explicit 3 vm_cmd)
(explicit 4 policy_cmd))
type data = [
| `Console_data of Ptime.t * string
| `Stats_data of stats
| `Log_data of Ptime.t * Log.event
]
let pp_data ppf = function
| `Console_data (ts, line) -> Fmt.pf ppf "console data %a: %s"
(Ptime.pp_rfc3339 ()) ts line
| `Stats_data stats -> Fmt.pf ppf "stats data: %a" pp_stats stats
| `Log_data (ts, event) -> Fmt.pf ppf "log data: %a %a" (Ptime.pp_rfc3339 ()) ts Log.pp_event event
let data =
let f = function
| `C1 (timestamp, data) -> `Console_data (timestamp, data)
| `C2 (ru, ifs, vmm) -> `Stats_data (ru, vmm, ifs)
| `C3 (timestamp, event) -> `Log_data (timestamp, event)
and g = function
| `Console_data (timestamp, data) -> `C1 (timestamp, data)
| `Stats_data (ru, ifs, vmm) -> `C2 (ru, vmm, ifs)
| `Log_data (timestamp, event) -> `C3 (timestamp, event)
in
Asn.S.map f g @@
Asn.S.(choice3
(explicit 0 (sequence2
(required ~label:"timestamp" utc_time)
(required ~label:"data" utf8_string)))
(explicit 1 (sequence3
(required ~label:"resource_usage" ru)
(required ~label:"ifdata" (sequence_of ifdata))
(optional ~label:"vmm_stats"
(sequence_of (sequence2
(required ~label:"key" utf8_string)
(required ~label:"value" int64))))))
(explicit 2 (sequence2
(required ~label:"timestamp" utc_time)
(required ~label:"event" log_event))))
type header = {
version : version ;
sequence : int64 ;
@ -471,7 +483,8 @@ let pp_success ppf = function
type wire = header * [
| `Command of wire_command
| `Success of success
| `Failure of string ]
| `Failure of string
| `Data of data ]
let pp_wire ppf (header, data) =
let id = header.id in
@ -479,6 +492,7 @@ let pp_wire ppf (header, data) =
| `Command c -> Fmt.pf ppf "host %a: %a" pp_id id pp_wire_command c
| `Failure f -> Fmt.pf ppf "host %a: command failed %s" pp_id id f
| `Success s -> Fmt.pf ppf "host %a: %a" pp_id id pp_success s
| `Data d -> pp_data ppf d
let wire =
let f (header, payload) =
@ -494,6 +508,7 @@ let wire =
in
`Success p
| `C3 str -> `Failure str
| `C4 data -> `Data data
and g (header, payload) =
header,
match payload with
@ -507,12 +522,13 @@ let wire =
in
`C2 p
| `Failure str -> `C3 str
| `Data d -> `C4 d
in
Asn.S.map f g @@
Asn.S.(sequence2
(required ~label:"header" header)
(required ~label:"payload"
(choice3
(choice4
(explicit 0 wire_command)
(explicit 1 (choice4
(explicit 0 null)
@ -525,7 +541,8 @@ let wire =
(sequence2
(required ~label:"name" (sequence_of utf8_string))
(required ~label:"vm_config" vm_config))))))
(explicit 2 utf8_string))))
(explicit 2 utf8_string)
(explicit 3 data))))
let wire_of_cstruct, wire_to_cstruct = projections_of wire

View file

@ -23,18 +23,15 @@ val pp_version : version Fmt.t
type console_cmd = [
| `Console_add
| `Console_subscribe
| `Console_data of Ptime.t * string
]
type stats_cmd = [
| `Stats_add of int * string list
| `Stats_remove
| `Stats_subscribe
| `Stats_data of stats
]
type log_cmd = [
| `Log_data of Ptime.t * Log.event
| `Log_subscribe
]
@ -60,6 +57,14 @@ type wire_command = [
val pp_wire_command : wire_command Fmt.t
type data = [
| `Console_data of Ptime.t * string
| `Stats_data of stats
| `Log_data of Ptime.t * Log.event
]
val pp_data : data Fmt.t
type header = {
version : version ;
sequence : int64 ;
@ -69,7 +74,8 @@ type header = {
type wire = header * [
| `Command of wire_command
| `Success of [ `Empty | `String of string | `Policies of (id * policy) list | `Vms of (id * vm_config) list ]
| `Failure of string ]
| `Failure of string
| `Data of data ]
val pp_wire : wire Fmt.t

View file

@ -38,7 +38,7 @@ let log t id event =
let header = Vmm_asn.{ version = t.wire_version ; sequence = t.log_counter ; id } in
let log_counter = Int64.succ t.log_counter in
Logs.debug (fun m -> m "LOG %a" Log.pp_event event) ;
({ t with log_counter }, `Log (header, `Command (`Log_cmd data)))
({ t with log_counter }, `Log (header, `Data data))
let handle_create t hdr vm_config =
let name = hdr.Vmm_asn.id in
@ -80,13 +80,12 @@ let handle_shutdown t name vm r =
| Ok () -> ()
| Error (`Msg e) -> Logs.warn (fun m -> m "%s while shutdown vm %a" e pp_vm vm)) ;
let resources = Vmm_resources.remove t.resources name in
let stat_out = `Stats_remove in
let header = Vmm_asn.{ version = t.wire_version ; sequence = t.stats_counter ; id = name } in
let tasks = String.Map.remove (string_of_id name) t.tasks in
let t = { t with stats_counter = Int64.succ t.stats_counter ; resources ; tasks } in
let t, logout = log t name (`VM_stop (vm.pid, r))
in
(t, [ `Stat (header, `Command (`Stats_cmd stat_out)) ; logout ])
(t, [ `Stat (header, `Command (`Stats_cmd `Stats_remove)) ; logout ])
let handle_command t (header, payload) =
let msg_to_err = function

View file

@ -121,8 +121,7 @@ let tick t =
| Some real_id ->
let header = Vmm_asn.{ version = my_version ; sequence = !bcast ; id = real_id } in
bcast := Int64.succ !bcast ;
let data = `Stats_data stats in
((socket, vmid, (header, `Command (`Stats_cmd data))) :: out))
((socket, vmid, (header, `Data (`Stats_data stats))) :: out))
out xs)
[] (Vmm_trie.all t'.vmid_pid)
in
@ -192,7 +191,6 @@ let handle t socket (header, wire) =
| `Stats_subscribe ->
let name_sockets, close = Vmm_trie.insert id socket t.name_sockets in
Ok ({ t with name_sockets }, `None, close, Some "subscribed")
| _ -> Error (`Msg "unknown command")
end
| _ ->
Logs.warn (fun m -> m "ignoring %a" Vmm_asn.pp_wire (header, wire)) ;