stats: transmit vmid in add/remove/stats, pid only in add

don't use /tmp anymore, but /var/run/albatross for fifos + sockets + vm images,
  and /var/db/albatross for ukvm-bin and crls, and /var/log/albatross for logging

vmm_console/vmm_log/vmm_stats_lwt: delete socket on startup if it exists

vmm_influxdb_stats: connects to vmm_stats socket and pushes every interval in
 influxdb line format via tcp to specified host and port
This commit is contained in:
Hannes Mehnert 2018-04-25 13:15:53 +02:00
parent c04f062960
commit 0583fbfaf1
15 changed files with 435 additions and 196 deletions

View file

@ -71,15 +71,15 @@ DEV> mirage configure -t ukvm
DEV> mirage build
DEV> mv ukvm-bin /tmp/ukvm-bin.net
DEV> cd ../../..
DEV> COPY /tmp/ukvm-bin.none /tmp/ukvm-bin.net SRV:
DEV> COPY vmm_console vmm_log vmm_stats_lwt vmmd SRV:
DEV> COPY /tmp/ukvm-bin.none /tmp/ukvm-bin.net SRV:/var/db/albatross
DEV> COPY vmm_console vmm_log vmm_stats_lwt vmmd SRV:/opt/bin/
```
```
SRV> vmm_console -vv cons.sock &
SRV> vmm_log -vv log.out log.sock &
SRV> vmm_stats_lwt -vv stat.sock & #optional
SRV# vmmd -vv . cacert.pem server.pem server.key
SRV> vmm_console -vv &
SRV> vmm_log -vv &
SRV> vmm_stats_lwt -vv & #optional
SRV# vmmd -vv cacert.pem server.pem server.key
```
Some setup for network interfaces is needed, depending on your operating system.

View file

@ -56,7 +56,7 @@ let read_console s name ring channel () =
Lwt_io.close channel)
let open_fifo name =
let fifo = Fpath.(v (Filename.get_temp_dir_name ()) / name + "fifo") in
let fifo = Fpath.(Vmm_core.tmpdir / name + "fifo") in
Lwt.catch (fun () ->
Logs.debug (fun m -> m "opening %a for reading" Fpath.pp fifo) ;
Lwt_io.open_file ~mode:Lwt_io.Input (Fpath.to_string fifo) >>= fun channel ->
@ -152,7 +152,10 @@ let handle s addr () =
let jump _ file =
Sys.(set_signal sigpipe Signal_ignore) ;
Lwt_main.run
(let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
((Lwt_unix.file_exists file >>= function
| true -> Lwt_unix.unlink file
| false -> Lwt.return_unit) >>= fun () ->
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.(bind s (ADDR_UNIX file)) >>= fun () ->
Lwt_unix.listen s 1 ;
let rec loop () =
@ -175,8 +178,9 @@ let setup_log =
$ Logs_cli.level ())
let socket =
let doc = "Socket to listen onto" in
Arg.(required & pos 0 (some string) None & info [] ~doc)
let doc = "Socket to listen on" in
let sock = Fpath.(to_string (Vmm_core.tmpdir / "cons" + "sock")) in
Arg.(value & opt string sock & info [ "s" ; "socket" ] ~doc)
let cmd =
Term.(ret (const jump $ setup_log $ socket)),

285
app/vmm_influxdb_stats.ml Normal file
View file

@ -0,0 +1,285 @@
(* (c) 2018 Hannes Mehnert, all rights reserved *)
open Lwt.Infix
open Astring
open Vmm_core
(*
line protocol:
```
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] \
<field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
```
(measurement, tag_key, tag_value, field_key are all strings, index over tags)
* Quoting
Element Double quotes Single quotes
---------------------------------------------
Timestamp Never Never
Measurements, tag keys, tag values, field keys Never* Never*
Field values Double quote string field values. Do not double quote floats,
integers, or Booleans. Never
*=Line Protocol allows users to double and single quote measurement names, tag
keys, tag values, and field keys. It will, however, assume that the double or
single quotes are part of the name, key, or value. This can complicate query
syntax (see the example below).
Float IEEE-754 64-bit floating-point numbers. This is the default numerical
type. Examples: 1, 1.0, 1.e+78, 1.E+78.
Integer Signed 64-bit integers (-9223372036854775808 to 9223372036854775807).
Specify an integer with a trailing i on the number. Example: 1i.
For tag keys, tag values, and field keys always use a backslash character \ to
escape:
commas ,
equal signs =
spaces
For measurements always use a backslash character \ to escape:
commas ,
spaces
For string field values use a backslash character \ to escape:
double quotes ""
Line Protocol does not require users to escape the backslash character \. Users
do not need to escape all other special characters.
do not use any keywords:
ALL ALTER ANY AS ASC BEGIN
BY CREATE CONTINUOUS DATABASE DATABASES DEFAULT
DELETE DESC DESTINATIONS DIAGNOSTICS DISTINCT DROP
DURATION END EVERY EXPLAIN FIELD FOR
FROM GRANT GRANTS GROUP GROUPS IN
INF INSERT INTO KEY KEYS KILL
LIMIT SHOW MEASUREMENT MEASUREMENTS NAME OFFSET
ON ORDER PASSWORD POLICY POLICIES PRIVILEGES
QUERIES QUERY READ REPLICATION RESAMPLE RETENTION
REVOKE SELECT SERIES SET SHARD SHARDS
SLIMIT SOFFSET STATS SUBSCRIPTION SUBSCRIPTIONS TAG
TO USER USERS VALUES WHERE WITH
WRITE
*)
module P = struct
let tv (sec, usec) = Printf.sprintf "%Lu.%06d" sec usec
(* TODO: this should use an unsigned to string function *)
let i64 i = Int64.to_string i ^ "i"
let encode_ru vm ru =
let fields =
[ "utime", tv ru.utime ;
"stime", tv ru.stime ;
"maxrss", i64 ru.maxrss ;
"ixrss", i64 ru.ixrss ;
"idrss", i64 ru.idrss ;
"isrss", i64 ru.isrss ;
"minflt", i64 ru.minflt ;
"maxflt", i64 ru.majflt ;
"nswap", i64 ru.nswap ;
"inblock", i64 ru.inblock ;
"outblock", i64 ru.outblock ;
"msgsnd", i64 ru.msgsnd ;
"msgrcv", i64 ru.msgrcv ;
"nsignals", i64 ru.nsignals ;
"nvcsw", i64 ru.nvcsw ;
"nivcsw", i64 ru.nivcsw
]
in
let fields = List.map (fun (k, v) -> k ^ "=" ^ v) fields in
Printf.sprintf "resource_usage,vm=%s %s" vm (String.concat ~sep:"," fields)
let encode_vmm vm xs =
let escape s =
let cutted = String.cuts ~sep:"," s in
let cutted = String.concat ~sep:"\\," cutted in
let cutted = String.cuts ~sep:" " cutted in
let cutted = String.concat ~sep:"\\ " cutted in
let cutted = String.cuts ~sep:"=" cutted in
String.concat ~sep:"\\=" cutted
in
Printf.sprintf "vmm,vm=%s %s" vm
(String.concat ~sep:","
(List.map (fun (k, v) -> (escape k) ^ "=" ^ (i64 v)) xs))
let i32 i = Int32.to_string i ^ "i"
let encode_if vm ifd =
let fields =
(* TODO: flags *)
[ "send_queue_length", i32 ifd.send_length ;
"max_send_queue_length", i32 ifd.max_send_length ;
"send_queue_drops", i32 ifd.send_drops ;
"mtu", i32 ifd.mtu ;
"baudrate", i64 ifd.baudrate ;
"vm_to_host_packets", i64 ifd.input_packets ;
"vm_to_host_errors", i64 ifd.input_errors ;
"vm_to_host_bytes", i64 ifd.input_bytes ;
"vm_to_host_mcast", i64 ifd.input_mcast ;
"vm_to_host_dropped", i64 ifd.input_dropped ;
"collisions", i64 ifd.collisions ;
"host_to_vm_packets", i64 ifd.output_packets ;
"host_to_vm_errors", i64 ifd.output_errors ;
"host_to_vm_bytes", i64 ifd.output_bytes ;
"host_to_vm_mcast", i64 ifd.output_mcast ;
"host_to_vm_dropped", i64 ifd.output_dropped
]
in
let fields = List.map (fun (k, v) -> k ^ "=" ^ v) fields in
Printf.sprintf "interface,vm=%s,ifname=%s %s"
vm ifd.name (String.concat ~sep:"," fields)
end
let my_version = `WV1
let command = ref 1
let (req : string IM.t ref) = ref IM.empty
let rec read_sock db c fd =
let open Vmm_wire in
Vmm_lwt.read_exactly c >>= function
| Error _ -> Lwt.return_unit
| Ok (hdr, data) ->
if not (version_eq hdr.version my_version) then begin
Logs.err (fun m -> m "unknown wire protocol version") ; Lwt.return_unit
end else
let name = IM.find hdr.id !req in
req := IM.remove hdr.id !req ;
match Stats.int_to_op hdr.tag with
| Some Stats.Stat_reply ->
begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with
| Error (`Msg msg) ->
Logs.warn (fun m -> m "couldn't decode stats for %s: %s" name msg) ;
read_sock db c fd
| Ok (ru, vmm, ifs) ->
let ru = P.encode_ru name ru in
let vmm = P.encode_vmm name vmm in
let taps = List.map (P.encode_if name) ifs in
let out = String.concat ~sep:"\n" (ru :: vmm :: taps @ [ "" ]) in
Logs.info (fun m -> m "result: %s" out) ;
Vmm_lwt.write_raw fd out >>= function
| Ok () -> read_sock db c fd
| Error _ -> invalid_arg "failed to write via TCP"
end
| _ when hdr.tag = fail_tag ->
Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ;
read_sock db c fd
| _ ->
Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ;
read_sock db c fd
let rec query_sock prefix db c interval =
(* query c for everyone in db *)
Lwt_list.iter_s (fun (id, name) ->
let id = identifier id in
let id = match prefix with None -> id | Some p -> p ^ "." ^ id in
let request = Vmm_wire.Stats.stat !command my_version id in
req := IM.add !command name !req ;
incr command ;
Vmm_lwt.write_raw c request >>= function
| Ok () -> Lwt.return_unit
| Error _ -> Lwt.fail_with "exception while writing")
db >>= fun () ->
Lwt_unix.sleep (float_of_int interval) >>= fun () ->
query_sock prefix db c interval
let client stat_socket influxhost influxport db prefix interval =
(* start a socket connection to vmm_stats *)
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.set_close_on_exec c ;
Lwt.catch (fun () -> Lwt_unix.(connect c (ADDR_UNIX stat_socket)))
(fun e ->
Logs.warn (fun m -> m "error %s connecting to socket %s"
(Printexc.to_string e) stat_socket) ;
invalid_arg "cannot connect to stat socket") >>= fun () ->
(* setup remote connection to influx *)
Lwt_unix.gethostbyname influxhost >>= fun host_entry ->
let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in
let fd = Lwt_unix.socket host_entry.Lwt_unix.h_addrtype Lwt_unix.SOCK_STREAM 0 in
Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ;
Lwt_unix.connect fd (Lwt_unix.ADDR_INET (host_inet_addr, influxport)) >>= fun () ->
(* loop *)
Lwt.join [ query_sock prefix db c interval ; read_sock db c fd ]
let run_client _ socket (influxhost, influxport) db prefix interval =
Sys.(set_signal sigpipe Signal_ignore) ;
let db =
let open Rresult.R.Infix in
match Bos.OS.File.read_lines (Fpath.v db) >>= parse_db with
| Ok [] -> invalid_arg "empty database"
| Ok db -> db
| Error (`Msg m) ->
invalid_arg ("couldn't parse database " ^ m)
in
Lwt_main.run (client socket influxhost influxport db prefix interval)
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ())
open Cmdliner
let setup_log =
Term.(const setup_log
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())
let host_port : (string * int) Arg.converter =
let parse s =
match String.cut ~sep:":" s with
| None -> `Error "broken: no port specified"
| Some (hostname, port) ->
try
`Ok (hostname, int_of_string port)
with
Not_found -> `Error "failed to parse port"
in
parse, fun ppf (h, p) -> Format.fprintf ppf "%s:%d" h p
let socket =
let doc = "Stat socket to connect onto" in
Arg.(required & pos 0 (some string) None & info [] ~doc)
let influx =
Arg.(required & pos 1 (some host_port) None & info [] ~docv:"influx"
~doc:"the influx hostname:port to connect to")
let db =
let doc = "VMID database" in
Arg.(required & pos 2 (some file) None & info [] ~doc)
let prefix =
let doc = "prefix" in
Arg.(value & opt (some string) None & info [ "prefix" ] ~doc)
let interval =
let doc = "Poll interval in seconds" in
Arg.(value & opt int 10 & info [ "interval" ] ~doc)
let cmd =
let doc = "VMM InfluxDB connector" in
let man = [
`S "DESCRIPTION" ;
`P "$(tname) connects to a vmm stats socket, pulls statistics and pushes them via TCP to influxdb" ]
in
Term.(pure run_client $ setup_log $ socket $ influx $ db $ prefix $ interval),
Term.info "vmm_influxdb_stats" ~version:"%%VERSION_NUM%%" ~doc ~man
let () =
match Term.eval cmd
with `Error _ -> exit 1 | _ -> exit 0

View file

@ -120,6 +120,9 @@ let jump _ file sock =
Sys.(set_signal sigpipe Signal_ignore) ;
Lwt_main.run
(Lwt_unix.openfile file Lwt_unix.[O_APPEND;O_CREAT;O_WRONLY] 0o600 >>= fun fd ->
(Lwt_unix.file_exists sock >>= function
| true -> Lwt_unix.unlink sock
| false -> Lwt.return_unit) >>= fun () ->
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.(bind s (ADDR_UNIX sock)) >>= fun () ->
Lwt_unix.listen s 1 ;
@ -144,12 +147,13 @@ let setup_log =
$ Logs_cli.level ())
let socket =
let doc = "Socket to listen onto" in
Arg.(required & pos 1 (some string) None & info [] ~doc)
let doc = "Socket to listen on" in
let sock = Fpath.(to_string (Vmm_core.tmpdir / "log" + "sock")) in
Arg.(value & opt string sock & info [ "s" ; "socket" ] ~doc)
let file =
let doc = "File to write to" in
Arg.(required & pos 0 (some string) None & info [] ~doc)
let doc = "File to write the log to" in
Arg.(value & opt string "/var/log/albatross" & info [ "logfile" ] ~doc)
let cmd =
Term.(ret (const jump $ setup_log $ file $ socket)),

View file

@ -343,10 +343,10 @@ let db =
Arg.(value & opt (some file) None & info [ "db" ] ~doc)
let cmd =
let doc = "VMM TLS client" in
let doc = "VMM Prometheus connector" in
let man = [
`S "DESCRIPTION" ;
`P "$(tname) connects to a server and initiates a TLS handshake" ]
`P "$(tname) connects to a VMMD to gather statistics and serves them for Prometheus via HTTP" ]
in
Term.(pure run_client $ setup_log $ cas $ client_cert $ client_key $ destination $ db $ address $ port),
Term.info "vmm_prometheus_stats" ~version:"%%VERSION_NUM%%" ~doc ~man

View file

@ -216,16 +216,15 @@ let rec stats_loop () =
Lwt_unix.sleep 600. >>= fun () ->
stats_loop ()
let jump _ dir cacert cert priv_key port =
let jump _ cacert cert priv_key port =
Sys.(set_signal sigpipe Signal_ignore) ;
let dir = Fpath.v dir in
Lwt_main.run
(Nocrypto_entropy_lwt.initialize () >>= fun () ->
(init_sock dir "cons" >|= function
(init_sock Vmm_core.tmpdir "cons" >|= function
| None -> invalid_arg "cannot connect to console socket"
| Some c -> c) >>= fun c ->
init_sock dir "stat" >>= fun s ->
(init_sock dir "log" >|= function
init_sock Vmm_core.tmpdir "stat" >>= fun s ->
(init_sock Vmm_core.tmpdir "log" >|= function
| None -> invalid_arg "cannot connect to log socket"
| Some l -> l) >>= fun l ->
server_socket port >>= fun socket ->
@ -237,7 +236,7 @@ let jump _ dir cacert cert priv_key port =
Tls.(Config.server ~version:(Core.TLS_1_2, Core.TLS_1_2)
~reneg:true ~certificates:(`Single cert) ())
in
(match Vmm_engine.init dir cmp_s c s l with
(match Vmm_engine.init cmp_s c s l with
| Ok s -> Lwt.return s
| Error (`Msg m) -> Lwt.fail_with m) >>= fun t ->
let state = ref t in
@ -289,28 +288,24 @@ let setup_log =
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())
let wdir =
let doc = "Working directory (unix domain sockets, etc.)" in
Arg.(required & pos 0 (some dir) None & info [] ~doc)
let cacert =
let doc = "CA certificate" in
Arg.(required & pos 1 (some file) None & info [] ~doc)
Arg.(required & pos 0 (some file) None & info [] ~doc)
let cert =
let doc = "Certificate" in
Arg.(required & pos 2 (some file) None & info [] ~doc)
Arg.(required & pos 1 (some file) None & info [] ~doc)
let key =
let doc = "Private key" in
Arg.(required & pos 3 (some file) None & info [] ~doc)
Arg.(required & pos 2 (some file) None & info [] ~doc)
let port =
let doc = "TCP listen port" in
Arg.(value & opt int 1025 & info [ "port" ] ~doc)
let cmd =
Term.(ret (const jump $ setup_log $ wdir $ cacert $ cert $ key $ port)),
Term.(ret (const jump $ setup_log $ cacert $ cert $ key $ port)),
Term.info "vmmd" ~version:"%%VERSION_NUM%%"
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1

View file

@ -18,6 +18,6 @@ let () =
Pkg.bin "provision/vmm_gen_ca" ;
Pkg.clib "stats/libvmm_stats_stubs.clib" ;
Pkg.bin "stats/vmm_stats_lwt" ;
Pkg.bin "stats/vmm_stats_once" ;
Pkg.bin "app/vmm_prometheus_stats" ;
Pkg.bin "app/vmm_influxdb_stats" ;
]

View file

@ -49,7 +49,7 @@ let rec close fd =
let close_no_err fd = try close fd with _ -> ()
(* own code starts here
(c) 2017 Hannes Mehnert, all rights reserved *)
(c) 2017, 2018 Hannes Mehnert, all rights reserved *)
open Vmm_core
@ -58,9 +58,8 @@ let rec mkfifo name =
| Unix.Unix_error (Unix.EINTR, _, _) -> mkfifo name
let image_file, fifo_file =
let tmp = Fpath.v (Filename.get_temp_dir_name ()) in
((fun vm -> Fpath.(tmp / (vm_id vm) + "img")),
(fun vm -> Fpath.(tmp / (vm_id vm) + "fifo")))
((fun vm -> Fpath.(tmpdir / (vm_id vm) + "img")),
(fun vm -> Fpath.(tmpdir / (vm_id vm) + "fifo")))
let rec fifo_exists file =
try Ok (Unix.((stat @@ Fpath.to_string file).st_kind = S_FIFO)) with
@ -157,13 +156,13 @@ let cpuset cpu =
Ok ([ "taskset" ; "-c" ; cpustring ])
| x -> Error (`Msg ("unsupported operating system " ^ x))
let exec dir vm taps =
let exec vm taps =
(* TODO: --net-mac=xx *)
let net = List.map (fun t -> "--net=" ^ t) taps in
let argv = match vm.argv with None -> [] | Some xs -> xs in
(match taps with
| [] -> Ok Fpath.(dir / "ukvm-bin.none")
| [_] -> Ok Fpath.(dir / "ukvm-bin.net")
| [] -> Ok Fpath.(dbdir / "ukvm-bin.none")
| [_] -> Ok Fpath.(dbdir / "ukvm-bin.net")
| _ -> Error (`Msg "cannot handle multiple network interfaces")) >>= fun bin ->
cpuset vm.cpuid >>= fun cpuset ->
let mem = "--mem=" ^ string_of_int vm.requested_memory in

View file

@ -8,7 +8,7 @@ val prepare : vm_config -> (string list, [> R.msg ]) result
val shutdown : vm -> (unit, [> R.msg ]) result
val exec : Fpath.t -> vm_config -> string list -> (vm, [> R.msg ]) result
val exec : vm_config -> string list -> (vm, [> R.msg ]) result
val destroy : vm -> unit

View file

@ -4,6 +4,9 @@ open Astring
open Rresult.R.Infix
let tmpdir = Fpath.(v "/var" / "run" / "albatross")
let dbdir = Fpath.(v "/var" / "db" / "albatross")
module I = struct
type t = int
let compare : int -> int -> int = compare

View file

@ -8,7 +8,6 @@ open Rresult
open R.Infix
type ('a, 'b, 'c) t = {
dir : Fpath.t ;
cmp : 'b -> 'b -> bool ;
console_socket : 'a ;
console_counter : int ;
@ -34,9 +33,9 @@ type ('a, 'b, 'c) t = {
crls : X509.CRL.c list ;
}
let init dir cmp console_socket stats_socket log_socket =
let init cmp console_socket stats_socket log_socket =
(* error hard on permission denied etc. *)
let crls = Fpath.(dir / "crls") in
let crls = Fpath.(dbdir / "crls") in
(Bos.OS.Dir.exists crls >>= function
| true -> Ok true
| false -> Bos.OS.Dir.create crls) >>= fun _ ->
@ -49,14 +48,14 @@ let init dir cmp console_socket stats_socket log_socket =
| None -> R.error_msgf "couldn't parse CRL %a" Fpath.pp f
| Some crl -> Ok (crl :: acc))
(Ok [])
Fpath.(dir / "crls") >>= fun crls ->
crls >>= fun crls ->
crls >>= fun crls ->
Ok {
dir ; cmp ;
cmp ;
console_socket ; console_counter = 1 ; console_requests = IM.empty ;
console_attached = String.Map.empty ; console_version = `WV0 ;
stats_socket ; stats_counter = 1 ; stats_requests = IM.empty ;
stats_version = `WV0 ;
stats_version = `WV1 ;
log_socket ; log_counter = 1 ; log_attached = String.Map.empty ;
log_version = `WV0 ; log_requests = IM.empty ;
client_version = `WV0 ;
@ -124,7 +123,7 @@ let handle_create t vm_config policies =
Logs.debug (fun m -> m "prepared vm with taps %a" Fmt.(list ~sep:(unit ",@ ") string) taps) ;
Ok (fun t s ->
(* actually execute the vm *)
Vmm_commands.exec t.dir vm_config taps >>= fun vm ->
Vmm_commands.exec vm_config taps >>= fun vm ->
Logs.debug (fun m -> m "exec()ed vm") ;
Vmm_resources.insert t.resources full vm >>= fun resources ->
let used_bridges =
@ -142,8 +141,8 @@ let handle_create t vm_config policies =
Ok (t, `Tls (s, tls_out) :: out, vm))
let setup_stats t vm =
Vmm_commands.setup_freebsd_kludge vm.Vmm_core.pid >>= fun () ->
let stat_out = Vmm_wire.Stats.add t.stats_counter t.stats_version vm.pid vm.taps in
Vmm_commands.setup_freebsd_kludge vm.pid >>= fun () ->
let stat_out = Vmm_wire.Stats.add t.stats_counter t.stats_version (vm_id vm.config) vm.pid vm.taps in
let t = { t with stats_counter = succ t.stats_counter } in
Ok (t, stat t stat_out)
@ -167,7 +166,7 @@ let handle_shutdown t vm r =
String.Map.add br (String.Set.remove ta old) b)
t.used_bridges vm.config.network vm.taps
in
let stat_out = Vmm_wire.Stats.remove t.stats_counter t.stats_version vm.pid in
let stat_out = Vmm_wire.Stats.remove t.stats_counter t.stats_version (vm_id vm.config) in
let tasks = String.Map.remove (vm_id vm.config) t.tasks in
let t = { t with stats_counter = succ t.stats_counter ; resources ; used_bridges ; tasks } in
let t, outs = log t (Log.hdr vm.config.prefix vm.config.vname,
@ -185,6 +184,7 @@ let handle_command t s prefix perms hdr buf =
begin
Vmm_wire.decode_str buf >>= fun (buf, _l) ->
let arg = if String.length buf = 0 then prefix else prefix @ [buf] in
let vmid = string_of_id arg in
match x with
| Info ->
begin match Vmm_resources.find t.resources arg with
@ -211,29 +211,27 @@ let handle_command t s prefix perms hdr buf =
end
| Attach ->
(* TODO: get (optionally) <since> from client, instead of hardcoding Ptime.epoch below *)
let name = String.concat ~sep:"." arg in
let on_success t =
let cons = Vmm_wire.Console.history t.console_counter t.console_version name Ptime.epoch in
let old = match String.Map.find name t.console_attached with
let cons = Vmm_wire.Console.history t.console_counter t.console_version vmid Ptime.epoch in
let old = match String.Map.find vmid t.console_attached with
| None -> []
| Some s ->
let out = Vmm_wire.success hdr.Vmm_wire.id t.client_version in
[ `Tls (s, out) ]
in
let console_attached = String.Map.add name s t.console_attached in
let console_attached = String.Map.add vmid s t.console_attached in
{ t with console_counter = succ t.console_counter ; console_attached },
`Raw (t.console_socket, cons) :: old
in
let cons = Vmm_wire.Console.attach t.console_counter t.console_version name in
let cons = Vmm_wire.Console.attach t.console_counter t.console_version vmid in
let console_requests = IM.add t.console_counter on_success t.console_requests in
Ok ({ t with console_counter = succ t.console_counter ; console_requests },
[ `Raw (t.console_socket, cons) ])
| Detach ->
let name = String.concat ~sep:"." arg in
let cons = Vmm_wire.Console.detach t.console_counter t.console_version name in
(match String.Map.find name t.console_attached with
let cons = Vmm_wire.Console.detach t.console_counter t.console_version vmid in
(match String.Map.find vmid t.console_attached with
| None -> Error (`Msg "not attached")
| Some x when t.cmp x s -> Ok (String.Map.remove name t.console_attached)
| Some x when t.cmp x s -> Ok (String.Map.remove vmid t.console_attached)
| Some _ -> Error (`Msg "this socket is not attached")) >>= fun console_attached ->
let out = Vmm_wire.success hdr.Vmm_wire.id t.client_version in
Ok ({ t with console_counter = succ t.console_counter ; console_attached },
@ -243,7 +241,7 @@ let handle_command t s prefix perms hdr buf =
| None -> Error (`Msg "no statistics available")
| Some _ -> match Vmm_resources.find_vm t.resources arg with
| Some vm ->
let stat_out = Vmm_wire.Stats.stat t.stats_counter t.stats_version vm.pid in
let stat_out = Vmm_wire.Stats.stat t.stats_counter t.stats_version vmid in
let d = (s, hdr.Vmm_wire.id, translate_tap vm) in
let stats_requests = IM.add t.stats_counter d t.stats_requests in
Ok ({ t with stats_counter = succ t.stats_counter ; stats_requests },
@ -326,7 +324,7 @@ let handle_revocation t s leaf chain ca prefix =
| Some _, None -> Error (`Msg "CRL number not present")
| Some x, Some y -> if y > x then Ok () else Error (`Msg "CRL number not increased")) >>= fun () ->
(* filename should be whatever_dir / crls / <id> *)
let filename = Fpath.(t.dir / "crls" / string_of_id prefix) in
let filename = Fpath.(dbdir / "crls" / string_of_id prefix) in
Bos.OS.File.delete filename >>= fun () ->
Bos.OS.File.write filename (Cstruct.to_string (X509.Encoding.crl_to_cstruct crl)) >>= fun () ->
(* remove crl with same issuer from crls, and inject this one into state *)

View file

@ -16,21 +16,26 @@ open Astring
open Vmm_core
type version = [ `WV0 ]
type version = [ `WV0 | `WV1 ]
let version_to_int = function
| `WV0 -> 0
| `WV1 -> 1
let version_of_int = function
| 0 -> Ok `WV0
| 1 -> Ok `WV1
| _ -> Error (`Msg "unknown wire version")
let version_eq a b = match a, b with
| `WV0, `WV0 -> true
| `WV1, `WV1 -> true
| _ -> false
let pp_version ppf v =
Fmt.string ppf (match v with
| `WV0 -> "wire version 0")
| `WV0 -> "wire version 0"
| `WV1 -> "wire version 1")
type header = {
length : int ;
@ -243,16 +248,16 @@ module Stats = struct
[@@uint16_t]
]
let encode id version op ?payload pid =
let pid = encode_pid pid in
let encode id version op ?payload nam =
let data, l = encode_string nam in
let length, p =
match payload with
| None -> 4, empty
| Some x -> 4 + Cstruct.len x, x
| None -> l, empty
| Some x -> l + Cstruct.len x, x
and tag = op_to_int op
in
let r =
Cstruct.concat [ create_header { length ; version ; id ; tag } ; pid ; p ]
Cstruct.concat [ create_header { length ; version ; id ; tag } ; data ; p ]
in
Cstruct.to_string r
@ -352,13 +357,13 @@ module Stats = struct
output_mcast ; input_dropped ; output_dropped },
l + 116)
let add id v pid taps =
let payload = encode_strings taps in
encode id v Add ~payload pid
let add id v nam pid taps =
let payload = Cstruct.append (encode_pid pid) (encode_strings taps) in
encode id v Add ~payload nam
let remove id v pid = encode id v Remove pid
let remove id v nam = encode id v Remove nam
let stat id v pid = encode id v Stat_request pid
let stat id v nam = encode id v Stat_request nam
let stat_reply id version payload =
let length = Cstruct.len payload

View file

@ -16,7 +16,7 @@ external vmmapi_close : vmctx -> unit = "vmmanage_vmmapi_close"
external vmmapi_statnames : vmctx -> string list = "vmmanage_vmmapi_statnames"
external vmmapi_stats : vmctx -> int64 list = "vmmanage_vmmapi_stats"
let my_version = `WV0
let my_version = `WV1
let descr = ref []
@ -25,12 +25,13 @@ type t = {
pid_rusage : rusage IM.t ;
pid_vmmapi : (string * int64) list IM.t ;
nic_ifdata : ifdata String.Map.t ;
vmid_pid : int String.Map.t ;
}
let pp_strings pp taps = Fmt.(list ~sep:(unit ",@ ") string) pp taps
let empty () =
{ pid_nic = IM.empty ; pid_rusage = IM.empty ; pid_vmmapi = IM.empty ; nic_ifdata = String.Map.empty }
{ pid_nic = IM.empty ; pid_rusage = IM.empty ; pid_vmmapi = IM.empty ; nic_ifdata = String.Map.empty ; vmid_pid = String.Map.empty }
let rec wrap f arg =
try Some (f arg) with
@ -91,7 +92,7 @@ let fill_descr ctx =
end
| ds -> Logs.info (fun m -> m "%d descr are already present" (List.length ds))
let add_pid t pid nics =
let add_pid t vmid pid nics =
let name = "ukvm" ^ string_of_int pid in
match wrap sysctl_ifcount () with
| None ->
@ -117,47 +118,59 @@ let add_pid t pid nics =
Ok (Some vmctx)) >>= fun vmctx ->
Logs.info (fun m -> m "adding %d %a with vmctx %b" pid pp_strings nics
(match vmctx with None -> false | Some _ -> true)) ;
let pid_nic = IM.add pid (vmctx, nic_ids) t.pid_nic in
Ok { t with pid_nic }
let stats t pid =
Logs.debug (fun m -> m "querying statistics for %d" pid) ;
try
let _, nics = IM.find pid t.pid_nic
and ru = IM.find pid t.pid_rusage
and vmm =
try IM.find pid t.pid_vmmapi with
| Not_found ->
Logs.err (fun m -> m "failed to find vmm stats for %d" pid);
[]
let pid_nic = IM.add pid (vmctx, nic_ids) t.pid_nic
and vmid_pid = String.Map.add vmid pid t.vmid_pid
in
match
List.fold_left (fun acc nic ->
match String.Map.find nic t.nic_ifdata, acc with
| None, _ -> None
| _, None -> None
| Some ifd, Some acc -> Some (ifd :: acc))
(Some []) (snd (List.split nics))
Ok { t with pid_nic ; vmid_pid }
let stats t vmid =
Logs.debug (fun m -> m "querying statistics for vmid %s" vmid) ;
match String.Map.find vmid t.vmid_pid with
| None -> Error (`Msg ("unknown vm " ^ vmid))
| Some pid ->
Logs.debug (fun m -> m "querying statistics for %d" pid) ;
try
let _, nics = IM.find pid t.pid_nic
and ru = IM.find pid t.pid_rusage
and vmm =
try IM.find pid t.pid_vmmapi with
| Not_found ->
Logs.err (fun m -> m "failed to find vmm stats for %d" pid);
[]
in
match
List.fold_left (fun acc nic ->
match String.Map.find nic t.nic_ifdata, acc with
| None, _ -> None
| _, None -> None
| Some ifd, Some acc -> Some (ifd :: acc))
(Some []) (snd (List.split nics))
with
| None -> Error (`Msg "failed to find interface statistics")
| Some ifd -> Ok (ru, vmm, ifd)
with
| None -> Error (`Msg "failed to find interface statistics")
| Some ifd -> Ok (ru, vmm, ifd)
with
| _ -> Error (`Msg "failed to find resource usage")
| _ -> Error (`Msg "failed to find resource usage")
let remove_pid t pid =
Logs.info (fun m -> m "removing pid %d" pid) ;
(try
match IM.find pid t.pid_nic with
| Some vmctx, _ -> ignore (wrap vmmapi_close vmctx)
| None, _ -> ()
with
_ -> ()) ;
let pid_nic = IM.remove pid t.pid_nic in
{ t with pid_nic }
let remove_vmid t vmid =
Logs.info (fun m -> m "removing vmid %s" vmid) ;
match String.Map.find vmid t.vmid_pid with
| None -> Logs.warn (fun m -> m "no pid found for %s" vmid) ; t
| Some pid ->
Logs.info (fun m -> m "removing pid %d" pid) ;
(try
match IM.find pid t.pid_nic with
| Some vmctx, _ -> ignore (wrap vmmapi_close vmctx)
| None, _ -> ()
with
_ -> ()) ;
let pid_nic = IM.remove pid t.pid_nic
and vmid_pid = String.Map.remove vmid t.vmid_pid
in
{ t with pid_nic ; vmid_pid }
let remove_pids t pids =
List.fold_left remove_pid t pids
let remove_vmids t vmids =
List.fold_left remove_vmid t vmids
let handle t hdr buf =
let open Vmm_wire in
@ -167,18 +180,17 @@ let handle t hdr buf =
if not (version_eq my_version hdr.version) then
Error (`Msg "cannot handle version")
else
decode_string cs >>= fun (name, off) ->
match int_to_op hdr.tag with
| Some Add ->
decode_pid_taps cs >>= fun (pid, taps) ->
add_pid t pid taps >>= fun t ->
Ok (t, `Add pid, success ~msg:"added" hdr.id my_version)
decode_pid_taps (Cstruct.shift cs off) >>= fun (pid, taps) ->
add_pid t name pid taps >>= fun t ->
Ok (t, `Add name, success ~msg:"added" hdr.id my_version)
| Some Remove ->
decode_pid cs >>= fun pid ->
let t = remove_pid t pid in
Ok (t, `Remove pid, success ~msg:"removed" hdr.id my_version)
let t = remove_vmid t name in
Ok (t, `Remove name, success ~msg:"removed" hdr.id my_version)
| Some Stat_request ->
decode_pid cs >>= fun pid ->
stats t pid >>= fun s ->
stats t name >>= fun s ->
Ok (t, `None, stat_reply hdr.id my_version (encode_stats s))
| _ -> Error (`Msg "unknown command")
in

View file

@ -41,10 +41,10 @@ let handle s addr () =
| Ok () -> loop acc
| Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc
in
loop [] >>= fun pids ->
loop [] >>= fun vmids ->
Lwt.catch (fun () -> Lwt_unix.close s) (fun _ -> Lwt.return_unit) >|= fun () ->
Logs.warn (fun m -> m "disconnect, dropping %d pids!" (List.length pids)) ;
let t' = Vmm_stats.remove_pids !t pids in
Logs.warn (fun m -> m "disconnect, dropping %d vms!" (List.length vmids)) ;
let t' = Vmm_stats.remove_vmids !t vmids in
t := t'
let rec timer interval () =
@ -56,7 +56,10 @@ let jump _ file interval =
Sys.(set_signal sigpipe Signal_ignore) ;
let interval = Duration.(to_f (of_sec interval)) in
Lwt_main.run
(let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
((Lwt_unix.file_exists file >>= function
| true -> Lwt_unix.unlink file
| false -> Lwt.return_unit) >>= fun () ->
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.(bind s (ADDR_UNIX file)) >>= fun () ->
Lwt_unix.listen s 1 ;
Lwt.async (timer interval) ;
@ -80,8 +83,9 @@ let setup_log =
$ Logs_cli.level ())
let socket =
let doc = "Socket to listen onto" in
Arg.(value & pos 0 string "" & info [] ~doc)
let doc = "Socket to listen on" in
let sock = Fpath.(to_string (Vmm_core.tmpdir / "stat" + "sock")) in
Arg.(value & opt string sock & info [ "s" ; "socket" ] ~doc)
let interval =
let doc = "Interval between statistics gatherings (in seconds)" in

View file

@ -1,70 +0,0 @@
(* (c) 2017, 2018 Hannes Mehnert, all rights reserved *)
(* the process responsible for gathering statistics (CPU + mem + network) *)
open Lwt.Infix
let t = ref (Vmm_stats.empty ())
let rec timer pids () =
t := Vmm_stats.tick !t ;
List.iter (fun pid ->
match Vmm_stats.stats !t pid with
| Ok (ru, vmm, ifd) ->
Logs.info (fun m -> m "stats %d@.%a@.%a@.%a@."
pid Vmm_core.pp_rusage ru
Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") string int64)) vmm
Fmt.(list ~sep:(unit "@.") Vmm_core.pp_ifdata) ifd)
| Error (`Msg e) ->
Logs.err (fun m -> m "error %s while getting stats of %d" e pid))
pids ;
Lwt_unix.sleep Duration.(to_f (of_sec 1)) >>= fun () ->
timer pids ()
let split_pid xs =
List.fold_left (fun acc str ->
match Astring.String.cuts ~sep:":" str with
| pid :: taps -> (int_of_string pid, taps) :: acc
| [] -> invalid_arg "invalid pid") [] xs
let jump _ pids =
Sys.(set_signal sigpipe Signal_ignore) ;
let pid_taps = split_pid pids in
let st =
List.fold_left (fun t (pid, taps) ->
match Vmm_stats.add_pid t pid taps with
| Ok t ->
Logs.info (fun m -> m "added pid %d taps %a"
pid Fmt.(list ~sep:(unit ", ") string) taps) ;
t
| Error (`Msg ms) ->
Logs.err (fun m -> m "error %s while adding pid %d taps %a"
ms pid Fmt.(list ~sep:(unit ", ") string) taps);
invalid_arg "broken")
!t pid_taps
in
t := st ;
let pids = fst (List.split pid_taps) in
`Ok (Lwt_main.run (timer pids ()))
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ())
open Cmdliner
let setup_log =
Term.(const setup_log
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())
let pids =
let doc = "pids" in
Arg.(value & opt_all string [] & info [ "pid" ] ~doc)
let cmd =
Term.(ret (const jump $ setup_log $ pids)),
Term.info "vmm_stats_once" ~version:"%%VERSION_NUM%%"
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1