revise log and console subscription protocol, require either since or count

This commit is contained in:
Hannes Mehnert 2019-10-29 19:42:55 +01:00
parent 90d1fd9d7d
commit 8a113e5ce0
11 changed files with 93 additions and 71 deletions

View File

@ -115,14 +115,14 @@ let create _ endp cert key ca force name image cpuid memory argv block network c
| Ok cmd -> jump endp cert key ca name (`Unikernel_cmd cmd) | Ok cmd -> jump endp cert key ca name (`Unikernel_cmd cmd)
| Error (`Msg msg) -> Error (`Msg msg) | Error (`Msg msg) -> Error (`Msg msg)
let console _ endp cert key ca name since = let console _ endp cert key ca name since count =
jump endp cert key ca name (`Console_cmd (`Console_subscribe since)) jump endp cert key ca name (`Console_cmd (`Console_subscribe (Albatross_cli.since_count since count)))
let stats _ endp cert key ca name = let stats _ endp cert key ca name =
jump endp cert key ca name (`Stats_cmd `Stats_subscribe) jump endp cert key ca name (`Stats_cmd `Stats_subscribe)
let event_log _ endp cert key ca name since = let event_log _ endp cert key ca name since count =
jump endp cert key ca name (`Log_cmd (`Log_subscribe since)) jump endp cert key ca name (`Log_cmd (`Log_subscribe (Albatross_cli.since_count since count)))
let block_info _ endp cert key ca block_name = let block_info _ endp cert key ca block_name =
jump endp cert key ca block_name (`Block_cmd `Block_info) jump endp cert key ca block_name (`Block_cmd `Block_info)
@ -217,7 +217,7 @@ let console_cmd =
[`S "DESCRIPTION"; [`S "DESCRIPTION";
`P "Shows console output of a VM."] `P "Shows console output of a VM."]
in in
Term.(term_result (const console $ setup_log $ destination $ ca_cert $ ca_key $ server_ca $ vm_name $ since)), Term.(term_result (const console $ setup_log $ destination $ ca_cert $ ca_key $ server_ca $ vm_name $ since $ count)),
Term.info "console" ~doc ~man Term.info "console" ~doc ~man
let stats_cmd = let stats_cmd =
@ -235,7 +235,7 @@ let log_cmd =
[`S "DESCRIPTION"; [`S "DESCRIPTION";
`P "Shows event log of VM."] `P "Shows event log of VM."]
in in
Term.(term_result (const event_log $ setup_log $ destination $ ca_cert $ ca_key $ server_ca $ opt_vm_name $ since)), Term.(term_result (const event_log $ setup_log $ destination $ ca_cert $ ca_key $ server_ca $ opt_vm_name $ since $ count)),
Term.info "log" ~doc ~man Term.info "log" ~doc ~man
let block_info_cmd = let block_info_cmd =

View File

@ -66,8 +66,8 @@ let create _ opt_socket force name image cpuid memory argv block network compres
| Ok cmd -> jump opt_socket name (`Unikernel_cmd cmd) | Ok cmd -> jump opt_socket name (`Unikernel_cmd cmd)
| Error (`Msg msg) -> Error (`Msg msg) | Error (`Msg msg) -> Error (`Msg msg)
let console _ opt_socket name since = let console _ opt_socket name since count =
jump opt_socket name (`Console_cmd (`Console_subscribe since)) jump opt_socket name (`Console_cmd (`Console_subscribe (Albatross_cli.since_count since count)))
let stats_add _ opt_socket name vmmdev pid bridge_taps = let stats_add _ opt_socket name vmmdev pid bridge_taps =
jump opt_socket name (`Stats_cmd (`Stats_add (vmmdev, pid, bridge_taps))) jump opt_socket name (`Stats_cmd (`Stats_add (vmmdev, pid, bridge_taps)))
@ -78,8 +78,8 @@ let stats_remove _ opt_socket name =
let stats_subscribe _ opt_socket name = let stats_subscribe _ opt_socket name =
jump opt_socket name (`Stats_cmd `Stats_subscribe) jump opt_socket name (`Stats_cmd `Stats_subscribe)
let event_log _ opt_socket name since = let event_log _ opt_socket name since count =
jump opt_socket name (`Log_cmd (`Log_subscribe since)) jump opt_socket name (`Log_cmd (`Log_subscribe (Albatross_cli.since_count since count)))
let block_info _ opt_socket block_name = let block_info _ opt_socket block_name =
jump opt_socket block_name (`Block_cmd `Block_info) jump opt_socket block_name (`Block_cmd `Block_info)
@ -162,7 +162,7 @@ let console_cmd =
[`S "DESCRIPTION"; [`S "DESCRIPTION";
`P "Shows console output of a VM."] `P "Shows console output of a VM."]
in in
Term.(term_result (const console $ setup_log $ socket $ vm_name $ since)), Term.(term_result (const console $ setup_log $ socket $ vm_name $ since $ count)),
Term.info "console" ~doc ~man Term.info "console" ~doc ~man
let stats_subscribe_cmd = let stats_subscribe_cmd =
@ -198,7 +198,7 @@ let log_cmd =
[`S "DESCRIPTION"; [`S "DESCRIPTION";
`P "Shows event log of VM."] `P "Shows event log of VM."]
in in
Term.(term_result (const event_log $ setup_log $ socket $ opt_vm_name $ since)), Term.(term_result (const event_log $ setup_log $ socket $ opt_vm_name $ since $ count)),
Term.info "log" ~doc ~man Term.info "log" ~doc ~man
let block_info_cmd = let block_info_cmd =

View File

@ -233,10 +233,22 @@ let exit_code =
let timestamp_c = let timestamp_c =
let parse s = match Ptime.of_rfc3339 s with let parse s = match Ptime.of_rfc3339 s with
| Ok (t, _, _) -> `Ok t | Ok (t, _, _) -> `Ok t
| Error _ -> `Error "couldn't parse timestamp" | Error _ ->
(* let's try to add T00:00:00-00:00 *)
match Ptime.of_rfc3339 (s ^ "T00:00:00-00:00") with
| Ok (t, _, _) -> `Ok t
| Error _ -> `Error "couldn't parse timestamp"
in in
(parse, Ptime.pp_rfc3339 ()) (parse, Ptime.pp_rfc3339 ())
let since = let since =
let doc = "Receive data since a specified timestamp (RFC 3339 encoded)" in let doc = "Receive data since a specified timestamp (RFC 3339 encoded)" in
Arg.(value & opt (some timestamp_c) None & info [ "since" ] ~doc) Arg.(value & opt (some timestamp_c) None & info [ "since" ] ~doc)
let count =
let doc = "Receive N data records" in
Arg.(value & opt int 20 & info [ "count" ] ~doc)
let since_count since count = match since with
| None -> `Count count
| Some since -> `Since since

View File

@ -104,8 +104,8 @@ let subscribe s id =
let send_history s r id since = let send_history s r id since =
let entries = let entries =
match since with match since with
| None -> Vmm_ring.read r | `Count n -> Vmm_ring.read_last r n
| Some ts -> Vmm_ring.read_history r ts | `Since ts -> Vmm_ring.read_history r ts
in in
Logs.debug (fun m -> m "%a found %d history" Vmm_core.Name.pp id (List.length entries)) ; Logs.debug (fun m -> m "%a found %d history" Vmm_core.Name.pp id (List.length entries)) ;
Lwt_list.iter_s (fun (i, v) -> Lwt_list.iter_s (fun (i, v) ->

View File

@ -67,19 +67,15 @@ let write_to_file mvar file =
loop fd >|= fun _ -> loop fd >|= fun _ ->
() ()
let send_history s ring id ts = let send_history s ring id what =
let elements = let tst event =
match ts with let sub = Vmm_core.Log.name event in
| None -> Vmm_ring.read ring Vmm_core.Name.is_sub ~super:id ~sub
| Some since -> Vmm_ring.read_history ring since
in in
let res = let elements =
List.fold_left (fun acc (ts, event) -> match what with
let sub = Vmm_core.Log.name event in | `Since since -> Vmm_ring.read_history ~tst ring since
if Vmm_core.Name.is_sub ~super:id ~sub | `Count n -> Vmm_ring.read_last ~tst ring n
then (ts, event) :: acc
else acc)
[] elements
in in
(* just need a wrapper in tag = Log.Data, id = reqid *) (* just need a wrapper in tag = Log.Data, id = reqid *)
Lwt_list.fold_left_s (fun r (ts, event) -> Lwt_list.fold_left_s (fun r (ts, event) ->
@ -88,7 +84,7 @@ let send_history s ring id ts =
let header = Vmm_commands.{ version = my_version ; sequence = 0L ; name = id } in let header = Vmm_commands.{ version = my_version ; sequence = 0L ; name = id } in
Vmm_lwt.write_wire s (header, `Data (`Log_data (ts, event))) Vmm_lwt.write_wire s (header, `Data (`Log_data (ts, event)))
| Error e -> Lwt.return (Error e)) | Error e -> Lwt.return (Error e))
(Ok ()) (List.rev res) (Ok ()) elements
let tree = ref Vmm_trie.empty let tree = ref Vmm_trie.empty

View File

@ -45,14 +45,14 @@ let create _ force name image cpuid memory argv block network compression restar
| Ok cmd -> jump name (`Unikernel_cmd cmd) | Ok cmd -> jump name (`Unikernel_cmd cmd)
| Error (`Msg msg) -> Error (`Msg msg) | Error (`Msg msg) -> Error (`Msg msg)
let console _ name since = let console _ name since count =
jump name (`Console_cmd (`Console_subscribe since)) jump name (`Console_cmd (`Console_subscribe (Albatross_cli.since_count since count)))
let stats _ name = let stats _ name =
jump name (`Stats_cmd `Stats_subscribe) jump name (`Stats_cmd `Stats_subscribe)
let event_log _ name since = let event_log _ name since count =
jump name (`Log_cmd (`Log_subscribe since)) jump name (`Log_cmd (`Log_subscribe (Albatross_cli.since_count since count)))
let block_info _ block_name = let block_info _ block_name =
jump block_name (`Block_cmd `Block_info) jump block_name (`Block_cmd `Block_info)
@ -131,7 +131,7 @@ let console_cmd =
[`S "DESCRIPTION"; [`S "DESCRIPTION";
`P "Shows console output of a VM."] `P "Shows console output of a VM."]
in in
Term.(term_result (const console $ setup_log $ vm_name $ since)), Term.(term_result (const console $ setup_log $ vm_name $ since $ count)),
Term.info "console" ~doc ~man Term.info "console" ~doc ~man
let stats_cmd = let stats_cmd =
@ -149,7 +149,7 @@ let log_cmd =
[`S "DESCRIPTION"; [`S "DESCRIPTION";
`P "Shows event log of VM."] `P "Shows event log of VM."]
in in
Term.(term_result (const event_log $ setup_log $ opt_vm_name $ since)), Term.(term_result (const event_log $ setup_log $ opt_vm_name $ since $ count)),
Term.info "log" ~doc ~man Term.info "log" ~doc ~man
let block_info_cmd = let block_info_cmd =

View File

@ -53,15 +53,17 @@ let policy =
let console_cmd = let console_cmd =
let f = function let f = function
| `C1 () -> `Console_add | `C1 () -> `Console_add
| `C2 ts -> `Console_subscribe ts | `C2 `C1 ts -> `Console_subscribe (`Since ts)
| `C2 `C2 c -> `Console_subscribe (`Count c)
and g = function and g = function
| `Console_add -> `C1 () | `Console_add -> `C1 ()
| `Console_subscribe ts -> `C2 ts | `Console_subscribe `Since ts -> `C2 (`C1 ts)
| `Console_subscribe `Count c -> `C2 (`C2 c)
in in
Asn.S.map f g @@ Asn.S.map f g @@
Asn.S.(choice2 Asn.S.(choice2
(explicit 0 null) (explicit 0 null)
(explicit 1 (sequence (single (optional ~label:"since" utc_time))))) (explicit 1 (choice2 (explicit 0 utc_time) (explicit 1 int))))
(* TODO is this good? *) (* TODO is this good? *)
let int64 = let int64 =
@ -283,12 +285,14 @@ let log_event =
let log_cmd = let log_cmd =
let f = function let f = function
| ts -> `Log_subscribe ts | `C1 since -> `Log_subscribe (`Since since)
| `C2 n -> `Log_subscribe (`Count n)
and g = function and g = function
| `Log_subscribe ts -> ts | `Log_subscribe `Since since -> `C1 since
| `Log_subscribe `Count n -> `C2 n
in in
Asn.S.map f g @@ Asn.S.map f g @@
Asn.S.(sequence (single (optional ~label:"since" utc_time))) Asn.S.(choice2 (explicit 0 utc_time) (explicit 1 int))
let typ = let typ =
let f = function let f = function

View File

@ -19,16 +19,20 @@ let version_eq a b =
| `AV2, `AV2 -> true | `AV2, `AV2 -> true
| _ -> false | _ -> false
type since_count = [ `Since of Ptime.t | `Count of int ]
let pp_since_count ppf = function
| `Since since -> Fmt.pf ppf "since %a" (Ptime.pp_rfc3339 ()) since
| `Count n -> Fmt.pf ppf "number %d" n
type console_cmd = [ type console_cmd = [
| `Console_add | `Console_add
| `Console_subscribe of Ptime.t option | `Console_subscribe of since_count
] ]
let pp_console_cmd ppf = function let pp_console_cmd ppf = function
| `Console_add -> Fmt.string ppf "console add" | `Console_add -> Fmt.string ppf "console add"
| `Console_subscribe ts -> | `Console_subscribe ts -> Fmt.pf ppf "console subscribe %a" pp_since_count ts
Fmt.pf ppf "console subscribe since %a"
Fmt.(option ~none:(unit "epoch") (Ptime.pp_rfc3339 ())) ts
type stats_cmd = [ type stats_cmd = [
| `Stats_add of string * int * (string * string) list | `Stats_add of string * int * (string * string) list
@ -44,13 +48,11 @@ let pp_stats_cmd ppf = function
| `Stats_subscribe -> Fmt.string ppf "stat subscribe" | `Stats_subscribe -> Fmt.string ppf "stat subscribe"
type log_cmd = [ type log_cmd = [
| `Log_subscribe of Ptime.t option | `Log_subscribe of since_count
] ]
let pp_log_cmd ppf = function let pp_log_cmd ppf = function
| `Log_subscribe ts -> | `Log_subscribe x -> Fmt.pf ppf "log subscribe since %a" pp_since_count x
Fmt.pf ppf "log subscribe since %a"
Fmt.(option ~none:(unit "epoch") (Ptime.pp_rfc3339 ())) ts
type unikernel_cmd = [ type unikernel_cmd = [
| `Unikernel_info | `Unikernel_info

View File

@ -11,9 +11,11 @@ val version_eq : version -> version -> bool
(** [pp_version ppf version] pretty prints [version] onto [ppf]. *) (** [pp_version ppf version] pretty prints [version] onto [ppf]. *)
val pp_version : version Fmt.t val pp_version : version Fmt.t
type since_count = [ `Since of Ptime.t | `Count of int ]
type console_cmd = [ type console_cmd = [
| `Console_add | `Console_add
| `Console_subscribe of Ptime.t option | `Console_subscribe of since_count
] ]
type stats_cmd = [ type stats_cmd = [
@ -23,7 +25,7 @@ type stats_cmd = [
] ]
type log_cmd = [ type log_cmd = [
| `Log_subscribe of Ptime.t option | `Log_subscribe of since_count
] ]
type unikernel_cmd = [ type unikernel_cmd = [

View File

@ -19,25 +19,30 @@ let write t entry =
let dec t n = (pred n + t.size) mod t.size let dec t n = (pred n + t.size) mod t.size
let not_written ts = Ptime.equal ts Ptime.min let read_last t ?(tst = fun _ -> true) n =
let rec one idx count acc =
let entry_not_written (ts, _) = not_written ts let our = Array.get t.data idx in
if tst (snd our) then
let earlier than (ts, _) = if pred count = 0 then
if not_written ts then true else Ptime.is_earlier ts ~than our :: acc
else
let read_some tst t = one (dec t idx) (pred count) (our :: acc)
let rec go s acc idx =
if idx = s then (* don't read it twice *)
acc
else else
let entry = Array.get t.data idx in one (dec t idx) count acc
if tst entry then acc else go s (entry :: acc) (dec t idx)
in in
let idx = dec t t.write in one (dec t t.write) n []
let entry = Array.get t.data idx in
if tst entry then [] else go idx [entry] (dec t idx)
let read t = read_some entry_not_written t let read_history t ?(tst = fun _ -> true) since =
let rec go acc idx =
let read_history t than = read_some (earlier than) t let entry = Array.get t.data idx in
if Ptime.equal (fst entry) Ptime.min then
acc
else if tst (snd entry) then
if Ptime.is_earlier (fst entry) ~than:since then
acc
else
go (entry :: acc) (dec t idx)
else
go acc (dec t idx)
in
go [] (dec t t.write)

View File

@ -5,5 +5,6 @@ type 'a t
val create : ?size:int -> 'a -> unit -> 'a t val create : ?size:int -> 'a -> unit -> 'a t
val write : 'a t -> Ptime.t * 'a -> unit val write : 'a t -> Ptime.t * 'a -> unit
val read : 'a t -> (Ptime.t * 'a) list
val read_history : 'a t -> Ptime.t -> (Ptime.t * 'a) list val read_last : 'a t -> ?tst:('a -> bool) -> int -> (Ptime.t * 'a) list
val read_history : 'a t -> ?tst:('a -> bool) -> Ptime.t -> (Ptime.t * 'a) list