initial metrics

This commit is contained in:
Hannes Mehnert 2019-10-10 22:26:36 +02:00
parent 94912c21e4
commit f81a12bc4d
23 changed files with 422 additions and 282 deletions

View File

@ -26,8 +26,15 @@ depends: [
"duration"
"decompress" {>= "0.9.0" & < "1.0.0"}
"checkseum"
"metrics"
"metrics-lwt"
"metrics-influx"
]
pin-depends: [
["metrics.dev" "git+https://github.com/hannesm/metrics.git#future"]
["metrics-lwt.dev" "git+https://github.com/hannesm/metrics.git#future"]
["metrics-influx.dev" "git+https://github.com/hannesm/metrics.git#future"]
]
build: [
["dune" "subst"] {pinned}
["dune" "build" "-p" name "-j" jobs]

View File

@ -10,7 +10,10 @@ let read fd =
Logs.debug (fun m -> m "reading tls stream") ;
let rec loop () =
Vmm_tls_lwt.read_tls fd >>= function
| Error _ -> Lwt.return ()
| Error `Eof ->
Logs.warn (fun m -> m "eof from server");
Lwt.return (Ok ())
| Error _ -> Lwt.return (Error (`Msg ("read failure")))
| Ok wire ->
Albatross_cli.print_result version wire ;
loop ()
@ -71,19 +74,25 @@ let handle (host, port) cert key ca id (cmd : Vmm_commands.t) =
X509_lwt.authenticator (`Ca_file ca) >>= fun authenticator ->
Lwt_unix.gethostbyname host >>= fun host_entry ->
let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in
let fd = Lwt_unix.(socket host_entry.h_addrtype SOCK_STREAM 0) in
Logs.debug (fun m -> m "connecting to remote host") ;
Lwt_unix.connect fd (Lwt_unix.ADDR_INET (host_inet_addr, port)) >>= fun () ->
(* reneg true to allow re-negotiation over the server-authenticated TLS
channel (to transport client certificate encrypted), once TLS 1.3 is in
(and required) be removed! *)
let client = Tls.Config.client ~reneg:true ~certificates ~authenticator () in
Tls_lwt.Unix.client_of_fd client (* TODO ~host *) fd >>= fun t ->
Logs.debug (fun m -> m "finished tls handshake") ;
read t
let sockaddr = Lwt_unix.ADDR_INET (host_inet_addr, port) in
Vmm_lwt.connect host_entry.h_addrtype sockaddr >>= function
| None ->
let err =
Rresult.R.error_msgf "connection failed to %a" Vmm_lwt.pp_sockaddr sockaddr
in
Lwt.return err
| Some fd ->
Logs.debug (fun m -> m "connecting to remote host") ;
(* reneg true to allow re-negotiation over the server-authenticated TLS
channel (to transport client certificate encrypted), once TLS 1.3 is in
(and required) be removed! *)
let client = Tls.Config.client ~reneg:true ~certificates ~authenticator () in
Tls_lwt.Unix.client_of_fd client (* TODO ~host *) fd >>= fun t ->
Logs.debug (fun m -> m "finished tls handshake") ;
read t
let jump endp cert key ca name cmd =
Ok (Lwt_main.run (handle endp cert key ca name cmd))
Lwt_main.run (handle endp cert key ca name cmd)
let info_policy _ endp cert key ca name =
jump endp cert key ca name (`Policy_cmd `Policy_info)

View File

@ -8,12 +8,6 @@ let socket t = function
| Some x -> x
| None -> Vmm_core.socket_path t
let connect socket_path =
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.set_close_on_exec c ;
Lwt_unix.connect c (Lwt_unix.ADDR_UNIX socket_path) >|= fun () ->
c
let process fd =
Vmm_lwt.read_wire fd >|= function
| Error _ -> Error ()
@ -30,18 +24,26 @@ let read fd =
let handle opt_socket name (cmd : Vmm_commands.t) =
let sock, next = Vmm_commands.endpoint cmd in
connect (socket sock opt_socket) >>= fun fd ->
let header = Vmm_commands.{ version ; sequence = 0L ; name } in
Vmm_lwt.write_wire fd (header, `Command cmd) >>= function
| Error `Exception -> Lwt.return ()
| Ok () ->
(match next with
| `Read -> read fd
| `End -> process fd >|= ignore) >>= fun () ->
Vmm_lwt.safe_close fd
let sockaddr = Lwt_unix.ADDR_UNIX (socket sock opt_socket) in
Vmm_lwt.connect Lwt_unix.PF_UNIX sockaddr >>= function
| None ->
let err =
Rresult.R.error_msgf "couldn't connect to %a" Vmm_lwt.pp_sockaddr sockaddr
in
Lwt.return err
| Some fd ->
let header = Vmm_commands.{ version ; sequence = 0L ; name } in
Vmm_lwt.write_wire fd (header, `Command cmd) >>= function
| Error `Exception -> Lwt.return (Error (`Msg "exception"))
| Ok () ->
(match next with
| `Read -> read fd
| `End -> process fd >|= ignore) >>= fun () ->
Vmm_lwt.safe_close fd >|= fun () ->
Ok ()
let jump opt_socket name cmd =
Ok (Lwt_main.run (handle opt_socket name cmd))
Lwt_main.run (handle opt_socket name cmd)
let info_policy _ opt_socket name =
jump opt_socket name (`Policy_cmd `Policy_info)

View File

@ -6,7 +6,11 @@ let version = `AV3
let rec read_tls_write_cons t =
Vmm_tls_lwt.read_tls t >>= function
| Error _ -> Lwt.return_unit
| Error `Eof ->
Logs.warn (fun m -> m "eof from server");
Lwt.return (Ok ())
| Error _ ->
Lwt.return (Error (`Msg ("read failure")))
| Ok wire ->
Albatross_cli.print_result version wire ;
read_tls_write_cons t
@ -24,17 +28,25 @@ let client cas host port cert priv_key =
- ip: connecto to ip and verify hostname *)
Lwt_unix.gethostbyname host >>= fun host_entry ->
let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in
let fd = Lwt_unix.socket host_entry.Lwt_unix.h_addrtype Lwt_unix.SOCK_STREAM 0 in
Lwt_unix.connect fd (Lwt_unix.ADDR_INET (host_inet_addr, port)) >>= fun _ ->
X509_lwt.private_of_pems ~cert ~priv_key >>= fun cert ->
let certificates = `Single cert in
let client = Tls.Config.client ~reneg:true ~certificates ~authenticator () in
Tls_lwt.Unix.client_of_fd client (* ~host *) fd >>= fun t ->
read_tls_write_cons t)
let sockaddr = Lwt_unix.ADDR_INET (host_inet_addr, port) in
Vmm_lwt.connect host_entry.Lwt_unix.h_addrtype sockaddr >>= function
| None ->
let err =
Rresult.R.error_msgf "couldn't connect to %a" Vmm_lwt.pp_sockaddr sockaddr
in
Lwt.return err
| Some fd ->
X509_lwt.private_of_pems ~cert ~priv_key >>= fun cert ->
let certificates = `Single cert in
let client = Tls.Config.client ~reneg:true ~certificates ~authenticator () in
Tls_lwt.Unix.client_of_fd client (* ~host *) fd >>= fun t ->
read_tls_write_cons t)
(fun exn ->
Logs.err (fun m -> m "failed to establish TLS connection: %s"
(Printexc.to_string exn)) ;
Lwt.return_unit)
let err =
Rresult.R.error_msgf "failed to establish TLS connection: %s"
(Printexc.to_string exn)
in
Lwt.return err)
let run_client _ cas cert key (host, port) =
Printexc.register_printer (function

View File

@ -3,6 +3,68 @@
open Astring
open Vmm_core
let conn_metrics kind =
let s = ref (0, 0) in
let open Metrics in
let doc = "connection statistics" in
let data () =
Data.v [
int "active" (fst !s) ;
int "total" (snd !s) ;
] in
let tags = Tags.string "kind" in
let src = Src.v ~doc ~tags:Tags.[ tags ] ~data "connections" in
(fun action ->
(match action with
| `Open -> s := (succ (fst !s), succ (snd !s))
| `Close -> s := (pred (fst !s), snd !s));
Metrics.add src (fun x -> x kind) (fun d -> d ()))
open Lwt.Infix
let process =
Metrics.field ~doc:"name of the process" "process" Metrics.String
let init_influx name data =
match data with
| None -> ()
| Some (ip, port) ->
Logs.info (fun m -> m "stats connecting to %a:%d" Ipaddr.V4.pp ip port);
Metrics.enable_all ();
Metrics_lwt.init_periodic (fun () -> Lwt_unix.sleep 10.);
let get_cache, reporter = Metrics.cache_reporter () in
Metrics.set_reporter reporter;
let fd = ref None in
let rec report () =
let send () =
(match !fd with
| Some _ -> Lwt.return_unit
| None ->
let addr = Lwt_unix.ADDR_INET (Ipaddr_unix.V4.to_inet_addr ip, port) in
Vmm_lwt.connect Lwt_unix.PF_INET addr >|= function
| None -> Logs.err (fun m -> m "connection failure to stats")
| Some fd' -> fd := Some fd') >>= fun () ->
match !fd with
| None -> Lwt.return_unit
| Some socket ->
let tag = process name in
let datas = Metrics.SM.fold (fun src (tags, data) acc ->
let name = Metrics.Src.name src in
Metrics_influx.encode_line_protocol (tag :: tags) data name :: acc)
(get_cache ()) []
in
let datas = String.concat ~sep:"" datas in
Vmm_lwt.write_raw socket (Bytes.unsafe_of_string datas) >|= function
| Ok () -> ()
| Error `Exception ->
Logs.warn (fun m -> m "error on stats write");
fd := None
and sleep () = Lwt_unix.sleep 10.
in
Lwt.join [ send () ; sleep () ] >>= report
in
Lwt.async report
let print_result version (header, reply) =
if not (Vmm_commands.version_eq header.Vmm_commands.version version) then
Logs.err (fun m -> m "version not equal")
@ -43,9 +105,30 @@ let setup_log =
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())
let ip_port : (Ipaddr.V4.t * int) Arg.converter =
let default_port = 8094 in
let parse s =
match
match String.cut ~sep:":" s with
| None -> Ok (s, default_port)
| Some (ip, port) -> match int_of_string port with
| exception Failure _ -> Error "non-numeric port"
| port -> Ok (ip, port)
with
| Error msg -> `Error msg
| Ok (ip, port) -> match Ipaddr.V4.of_string ip with
| Ok ip -> `Ok (ip, port)
| Error `Msg msg -> `Error msg
in
parse, fun ppf (ip, port) -> Format.fprintf ppf "%a:%d" Ipaddr.V4.pp ip port
let influx =
let doc = "IP address and port (default: 8094) to report metrics to in influx line protocol" in
Arg.(value & opt (some ip_port) None & info [ "influx" ] ~doc ~docv:"INFLUXHOST[:PORT]")
let host_port : (string * int) Arg.converter =
let parse s =
match Astring.String.cut ~sep:":" s with
match String.cut ~sep:":" s with
| None -> `Error "broken: no port specified"
| Some (hostname, port) ->
try
@ -81,7 +164,6 @@ let vmm_dev_req0 =
let doc = "VMM device name" in
Arg.(required & pos 0 (some string) None & info [] ~doc ~docv:"VMMDEV")
let opt_vm_name =
let doc = "name of virtual machine." in
Arg.(value & opt vm_c Name.root & info [ "n" ; "name"] ~doc)

View File

@ -3,4 +3,4 @@
(public_name albatross.cli)
(wrapped false)
(modules albatross_cli)
(libraries checkseum.c albatross lwt.unix cmdliner logs.fmt logs.cli fmt.cli fmt.tty ipaddr.unix))
(libraries checkseum.c albatross lwt.unix cmdliner logs.fmt logs.cli fmt.cli fmt.tty ipaddr.unix metrics metrics-lwt metrics-influx))

View File

@ -20,7 +20,7 @@ let pp_unix_error ppf e = Fmt.string ppf (Unix.error_message e)
let active = ref String.Map.empty
let read_console id name ring channel () =
let read_console id name ring channel =
Lwt.catch (fun () ->
let rec loop () =
Lwt_io.read_line channel >>= fun line ->
@ -66,6 +66,8 @@ let open_fifo name =
let t = ref String.Map.empty
let fifos = Albatross_cli.conn_metrics "fifo"
let add_fifo id =
let name = Vmm_core.Name.to_string id in
open_fifo id >|= function
@ -79,7 +81,8 @@ let add_fifo id =
ring
| Some ring -> ring
in
Lwt.async (read_console id name ring f) ;
fifos `Open;
Lwt.async (fun () -> read_console id name ring f >|= fun () -> fifos `Close) ;
Ok ()
let subscribe s id =
@ -110,7 +113,7 @@ let send_history s r id since =
| Error _ -> Vmm_lwt.safe_close s)
entries
let handle s addr () =
let handle s addr =
Logs.info (fun m -> m "handling connection %a" Vmm_lwt.pp_sockaddr addr) ;
let rec loop () =
Vmm_lwt.read_wire s >>= function
@ -156,18 +159,17 @@ let handle s addr () =
Vmm_lwt.safe_close s >|= fun () ->
Logs.warn (fun m -> m "disconnected")
let jump _ file =
let m = Albatross_cli.conn_metrics "unix"
let jump _ influx =
Sys.(set_signal sigpipe Signal_ignore) ;
Lwt_main.run
((Lwt_unix.file_exists file >>= function
| true -> Lwt_unix.unlink file
| false -> Lwt.return_unit) >>= fun () ->
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.(bind s (ADDR_UNIX file)) >>= fun () ->
Lwt_unix.listen s 1 ;
(Albatross_cli.init_influx "albatross_console" influx;
Vmm_lwt.server_socket `Console >>= fun s ->
let rec loop () =
Lwt_unix.accept s >>= fun (cs, addr) ->
Lwt.async (handle cs addr) ;
m `Open;
Lwt.async (fun () -> handle cs addr >|= fun () -> m `Close) ;
loop ()
in
loop ())
@ -176,12 +178,8 @@ open Cmdliner
open Albatross_cli
let socket =
let doc = "socket to use" in
Arg.(value & opt string (Vmm_core.socket_path `Console) & info [ "socket" ] ~doc)
let cmd =
Term.(term_result (const jump $ setup_log $ socket)),
Term.(term_result (const jump $ setup_log $ influx)),
Term.info "albatross_console" ~version:"%%VERSION_NUM%%"
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1

View File

@ -174,28 +174,21 @@ let safe_close s =
Logs.err (fun m -> m "exception %s while closing" (Printexc.to_string e)) ;
Lwt.return_unit)
let rec read_sock_write_tcp c ?fd addr addrtype =
let rec read_sock_write_tcp c ?fd addr =
match fd with
| None ->
Logs.debug (fun m -> m "new connection to TCP") ;
let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in
Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ;
Lwt.catch
(fun () ->
Lwt_unix.connect fd addr >|= fun () ->
Logs.debug (fun m -> m "connected to TCP") ;
Some fd)
(fun e ->
let addr', port = match addr with
| Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port
| Lwt_unix.ADDR_UNIX addr -> addr, 0
in
Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s"
(Printexc.to_string e) addr' port) ;
safe_close fd >>= fun () ->
Lwt_unix.sleep 5.0 >|= fun () ->
None) >>= fun fd ->
read_sock_write_tcp c ?fd addr addrtype
begin
Logs.debug (fun m -> m "new connection to TCP") ;
Vmm_lwt.connect Lwt_unix.PF_INET addr >>= function
| None ->
Logs.warn (fun m -> m "error connecting to influxd %a, retrying in 5s"
Vmm_lwt.pp_sockaddr addr);
Lwt_unix.sleep 5.0 >>= fun () ->
read_sock_write_tcp c addr
| Some fd ->
Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ;
read_sock_write_tcp c ~fd addr
end
| Some fd ->
Logs.debug (fun m -> m "reading from unix socket") ;
Vmm_lwt.read_wire c >>= function
@ -223,7 +216,7 @@ let rec read_sock_write_tcp c ?fd addr addrtype =
Vmm_lwt.write_raw fd (Bytes.unsafe_of_string out) >>= function
| Ok () ->
Logs.debug (fun m -> m "wrote successfully") ;
read_sock_write_tcp c ~fd addr addrtype
read_sock_write_tcp c ~fd addr
| Error e ->
Logs.err (fun m -> m "error %s while writing to tcp (%s)"
(str_of_e e) name) ;
@ -233,7 +226,7 @@ let rec read_sock_write_tcp c ?fd addr addrtype =
| Ok wire ->
Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire wire) ;
Lwt.return (Some fd) >>= fun fd ->
read_sock_write_tcp c ?fd addr addrtype
read_sock_write_tcp c ?fd addr
let query_sock vm c =
let header = Vmm_commands.{ version = my_version ; sequence = !command ; name = vm } in
@ -241,78 +234,67 @@ let query_sock vm c =
Logs.debug (fun m -> m "%Lu requesting %a via socket" !command Name.pp vm) ;
Vmm_lwt.write_wire c (header, `Command (`Stats_cmd `Stats_subscribe))
let rec maybe_connect stat_socket =
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt.catch
(fun () ->
Logs.debug (fun m -> m "connecting to %s" stat_socket) ;
Lwt_unix.(connect c (ADDR_UNIX stat_socket)) >>= fun () ->
Logs.debug (fun m -> m "connected") ;
Lwt.return c)
(fun e ->
Logs.warn (fun m -> m "error %s connecting to socket %s"
(Printexc.to_string e) stat_socket) ;
safe_close c >>= fun () ->
Lwt_unix.sleep (float_of_int 5) >>= fun () ->
maybe_connect stat_socket)
let rec maybe_connect () =
let sockaddr = Lwt_unix.ADDR_UNIX (socket_path `Stats) in
Logs.debug (fun m -> m "connecting to %a" Vmm_lwt.pp_sockaddr sockaddr);
Vmm_lwt.connect Lwt_unix.PF_UNIX sockaddr >>= function
| None ->
Logs.warn (fun m -> m "error connecting to socket %a" Vmm_lwt.pp_sockaddr sockaddr);
Lwt_unix.sleep 5. >>= fun () ->
maybe_connect ()
| Some c ->
Logs.debug (fun m -> m "connected");
Lwt.return c
let client stat_socket influxhost influxport vm =
(* figure out address of influx *)
Lwt_unix.gethostbyname influxhost >>= fun host_entry ->
let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in
let addr = Lwt_unix.ADDR_INET (host_inet_addr, influxport)
and addrtype = host_entry.Lwt_unix.h_addrtype
in
let client influx vm =
match influx with
| None -> Lwt.return (Error (`Msg "influx host not provided"))
| Some (ip, port) ->
let addr = Lwt_unix.ADDR_INET (Ipaddr_unix.V4.to_inet_addr ip, port) in
(* loop *)
(* the query task queries the stat_socket at each
- if this fails, closing is set to true (and unit is returned)
(* loop *)
(* the query task queries the stat_socket at each
- if this fails, closing is set to true (and unit is returned)
the read_sock reads the stat_socket, and forwards to a TCP socket
- if closing is true, the TCP socket is closed and unit is returned
- if read on the unix domain socket fails, closing is set to true
the read_sock reads the stat_socket, and forwards to a TCP socket
- if closing is true, the TCP socket is closed and unit is returned
- if read on the unix domain socket fails, closing is set to true
(and unit is returned) *)
(* connection to the unix domain socket is managed in this loop only:
- maybe_connect attempts to establishes to it
- query_sock/read_sock_write_tcp write an read from it
- on failure in read or write, the TCP connection is closed, and loop
(* connection to the unix domain socket is managed in this loop only:
- maybe_connect attempts to establishes to it
- query_sock/read_sock_write_tcp write an read from it
- on failure in read or write, the TCP connection is closed, and loop
takes control: safe_close, maybe_connect, rinse, repeat *)
let rec loop () =
(* start a socket connection to vmm_stats *)
maybe_connect stat_socket >>= fun c ->
maybe_connect () >>= fun c ->
query_sock vm c >>= function
| Error e ->
Logs.err (fun m -> m "error %s while writing to stat socket" (str_of_e e)) ;
Lwt.return_unit
let err =
Rresult.R.error_msgf "error %s while writing to stat socket" (str_of_e e)
in
Lwt.return err
| Ok () ->
read_sock_write_tcp c addr addrtype >>= fun restart ->
if restart then loop () else Lwt.return_unit
read_sock_write_tcp c addr >>= fun restart ->
if restart then loop () else Lwt.return (Ok ())
in
loop ()
let run_client _ socket (influxhost, influxport) vm =
let run_client _ influx vm =
Sys.(set_signal sigpipe Signal_ignore) ;
Lwt_main.run (client socket influxhost influxport vm)
Lwt_main.run (client influx vm)
open Cmdliner
open Albatross_cli
let socket =
let doc = "socket to use" in
Arg.(value & opt string (Vmm_core.socket_path `Stats) & info [ "socket" ] ~doc)
let influx =
Arg.(required & pos 0 (some host_port) None & info [] ~docv:"INFLUXHOST:INFLUXPORT"
~doc:"the influx hostname:port to connect to")
let cmd =
let doc = "Albatross Influx connector" in
let man = [
`S "DESCRIPTION" ;
`P "$(tname) connects to a albatross stats socket, pulls statistics and pushes them via TCP to influxdb" ]
in
Term.(pure run_client $ setup_log $ socket $ influx $ opt_vm_name),
Term.(term_result (const run_client $ setup_log $ influx $ opt_vm_name)),
Term.info "albatross_influx" ~version:"%%VERSION_NUM%%" ~doc ~man
let () =

View File

@ -120,7 +120,7 @@ let read_data mvar ring s =
in
loop ()
let handle mvar ring s addr () =
let handle mvar ring s addr =
Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ;
Vmm_lwt.read_wire s >>= begin function
| Error _ ->
@ -159,15 +159,13 @@ let handle mvar ring s addr () =
end >>= fun () ->
Vmm_lwt.safe_close s
let jump _ file sock =
let m = Albatross_cli.conn_metrics "unix"
let jump _ file influx =
Sys.(set_signal sigpipe Signal_ignore) ;
Lwt_main.run
((Lwt_unix.file_exists sock >>= function
| true -> Lwt_unix.unlink sock
| false -> Lwt.return_unit) >>= fun () ->
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.(bind s (ADDR_UNIX sock)) >>= fun () ->
Lwt_unix.listen s 1 ;
(Albatross_cli.init_influx "albatross_log" influx;
Vmm_lwt.server_socket `Log >>= fun s ->
let ring = Vmm_ring.create `Startup () in
read_from_file file >>= fun entries ->
Logs.app (fun m -> m "read %d entries from disk" (List.length entries)) ;
@ -181,7 +179,8 @@ let jump _ file sock =
Vmm_ring.write ring start ;
let rec loop () =
Lwt_unix.accept s >>= fun (cs, addr) ->
Lwt.async (handle mvar ring cs addr) ;
m `Open;
Lwt.async (fun () -> handle mvar ring cs addr >|= fun () -> m `Close) ;
loop ()
in
loop ())
@ -189,16 +188,12 @@ let jump _ file sock =
open Cmdliner
open Albatross_cli
let socket =
let doc = "socket to use" in
Arg.(value & opt string (Vmm_core.socket_path `Log) & info [ "socket" ] ~doc)
let file =
let doc = "File to write the log to" in
Arg.(value & opt string "/var/log/albatross" & info [ "logfile" ] ~doc)
let cmd =
Term.(term_result (const jump $ setup_log $ file $ socket)),
Term.(term_result (const jump $ setup_log $ file $ influx)),
Term.info "albatross_log" ~version:"%%VERSION_NUM%%"
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1

View File

@ -4,20 +4,6 @@ open Albatross_cli
open Vmm_core
type stats = {
start : Ptime.t ;
vm_created : int ;
vm_destroyed : int ;
}
let s = ref { start = Ptime_clock.now () ; vm_created = 0 ; vm_destroyed = 0 }
let pp_stats ppf s =
let diff = Ptime.(diff (Ptime_clock.now ()) s.start) in
Fmt.pf ppf "up %a: %d vms created, %d vms destroyed, %d running"
Ptime.Span.pp diff
s.vm_created s.vm_destroyed (s.vm_created - s.vm_destroyed)
open Lwt.Infix
let version = `AV3
@ -36,12 +22,10 @@ let create stat_out log_out cons_out data_out cons succ_cont fail_cont =
data_out data
| Ok (state', stat, log, data, name, vm) ->
state := state' ;
s := { !s with vm_created = succ !s.vm_created } ;
Lwt.async (fun () ->
Vmm_lwt.wait_and_clear vm.Unikernel.pid >>= fun r ->
let state', stat', log' = Vmm_vmmd.handle_shutdown !state name vm r in
state := state' ;
s := { !s with vm_destroyed = succ !s.vm_destroyed } ;
stat_out "handle shutdown stat" stat' >>= fun () ->
log_out "handle shutdown log" log' >|= fun () ->
let state', waiter_opt = Vmm_vmmd.waiter !state name in
@ -116,32 +100,9 @@ let handle log_out cons_out stat_out fd addr =
let connect_client_socket sock =
let name = socket_path sock in
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.set_close_on_exec c ;
Lwt.catch (fun () ->
Lwt_unix.(connect c (ADDR_UNIX name)) >|= fun () ->
Some (c, Lwt_mutex.create ()))
(fun e ->
Logs.warn (fun m -> m "error %s connecting to socket %s"
(Printexc.to_string e) name) ;
(Lwt.catch (fun () -> Lwt_unix.close c) (fun _ -> Lwt.return_unit)) >|= fun () ->
None)
let server_socket sock =
let name = socket_path sock in
(Lwt_unix.file_exists name >>= function
| true -> Lwt_unix.unlink name
| false -> Lwt.return_unit) >>= fun () ->
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.set_close_on_exec s ;
Lwt_unix.(bind s (ADDR_UNIX name)) >|= fun () ->
Lwt_unix.listen s 1 ;
s
let rec stats_loop () =
Logs.info (fun m -> m "%a" pp_stats !s) ;
Lwt_unix.sleep 600. >>= fun () ->
stats_loop ()
Vmm_lwt.connect Lwt_unix.PF_UNIX (Lwt_unix.ADDR_UNIX name) >|= function
| None -> None
| Some x -> Some (x, Lwt_mutex.create ())
let write_reply name (fd, mut) txt (header, cmd) =
Logs.debug (fun m -> m "locking to write to %s" name) ;
@ -179,13 +140,16 @@ let write_reply name (fd, mut) txt (header, cmd) =
Logs.err (fun m -> m "error in read from %s" name) ;
invalid_arg "communication failure"
let jump _ =
let m = conn_metrics "unix"
let jump _ influx =
Sys.(set_signal sigpipe Signal_ignore);
match Vmm_vmmd.restore_unikernels () with
| Error (`Msg msg) -> Logs.err (fun m -> m "bailing out: %s" msg)
| Ok old_unikernels ->
Lwt_main.run
(server_socket `Vmmd >>= fun ss ->
(init_influx "albatross" influx;
Vmm_lwt.server_socket `Vmmd >>= fun ss ->
(connect_client_socket `Log >|= function
| None -> invalid_arg "cannot connect to log socket"
| Some l -> l) >>= fun l ->
@ -214,8 +178,6 @@ let jump _ =
| Some s -> write_reply "stat" s txt wire >|= fun _ -> ()
in
Lwt.async stats_loop ;
let start_unikernel (name, config) =
let hdr = Vmm_commands.{ version ; sequence = 0L ; name = Name.root }
and data_out _ = Lwt.return_unit
@ -234,7 +196,10 @@ let jump _ =
let rec loop () =
Lwt_unix.accept ss >>= fun (fd, addr) ->
Lwt_unix.set_close_on_exec fd ;
Lwt.async (fun () -> handle log_out cons_out stat_out fd addr) ;
m `Open;
Lwt.async (fun () ->
handle log_out cons_out stat_out fd addr >|= fun () ->
m `Close) ;
loop ()
in
loop ())
@ -245,7 +210,7 @@ let jump _ =
open Cmdliner
let cmd =
Term.(const jump $ setup_log),
Term.(const jump $ setup_log $ influx),
Term.info "albatrossd" ~version:"%%VERSION_NUM%%"
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1

View File

@ -3,7 +3,7 @@
(public_name albatrossd)
(package albatross)
(modules albatrossd)
(libraries albatross.cli albatross))
(libraries albatross.cli albatross metrics-lwt metrics-influx))
(executable
(name albatross_console)

View File

@ -41,8 +41,6 @@ EOD;
mkdir -p /var/run/albatross/util /var/run/albatross/fifo
chown albatross:albatross /var/run/albatross/util /var/run/albatross/fifo
chmod 2760 /var/run/albatross/fifo
chgrp albatross /usr/local/libexec/albatross/albatrossd
chmod 2750 /usr/local/libexec/albatross/albatrossd
EOD;
post-deinstall = <<EOD

View File

@ -3,4 +3,4 @@
(public_name albatross)
(wrapped false)
(libraries rresult logs ipaddr bos hex ptime astring duration cstruct
decompress lwt lwt.unix ptime.clock.os asn1-combinators))
decompress lwt lwt.unix ptime.clock.os asn1-combinators metrics))

View File

@ -9,12 +9,12 @@ type service = [ `Console | `Log | `Stats | `Vmmd ]
let socket_path t =
let path = match t with
| `Console -> Fpath.(sockdir / "console" + "sock")
| `Vmmd -> Fpath.(tmpdir / "vmmd" + "sock")
| `Stats -> Fpath.(sockdir / "stat" + "sock")
| `Log -> Fpath.(sockdir / "log" + "sock")
| `Console -> "console"
| `Vmmd -> "vmmd"
| `Stats -> "stat"
| `Log -> "log"
in
Fpath.to_string path
Fpath.to_string Fpath.(sockdir / path + "sock")
let pp_socket ppf t =
let name = socket_path t in
@ -53,6 +53,10 @@ module Name = struct
let to_list x = x
let drop x = match List.rev x with
| [] -> []
| _::tl -> List.rev tl
let append_exn lbl x =
if valid_label lbl then
x @ [ lbl ]

View File

@ -24,6 +24,7 @@ module Name : sig
val of_list : string list -> (t, [> `Msg of string ]) result
val to_list : t -> string list
val drop : t -> t
val append : string -> t -> (t, [> `Msg of string ]) result
val prepend : string -> t -> (t, [> `Msg of string ]) result
val append_exn : string -> t -> t

View File

@ -7,6 +7,38 @@ let pp_sockaddr ppf = function
| Lwt_unix.ADDR_INET (addr, port) -> Fmt.pf ppf "TCP %s:%d"
(Unix.string_of_inet_addr addr) port
let safe_close fd =
Lwt.catch
(fun () -> Lwt_unix.close fd)
(fun _ -> Lwt.return_unit)
let server_socket sock =
let name = Vmm_core.socket_path sock in
(Lwt_unix.file_exists name >>= function
| true -> Lwt_unix.unlink name
| false -> Lwt.return_unit) >>= fun () ->
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.set_close_on_exec s ;
let old_umask = Unix.umask 0 in
let _ = Unix.umask (old_umask land 0o707) in
Lwt_unix.(bind s (ADDR_UNIX name)) >|= fun () ->
Logs.app (fun m -> m "listening on %s" name);
let _ = Unix.umask old_umask in
Lwt_unix.listen s 1 ;
s
let connect addrtype sockaddr =
let c = Lwt_unix.(socket addrtype SOCK_STREAM 0) in
Lwt_unix.set_close_on_exec c ;
Lwt.catch (fun () ->
Lwt_unix.(connect c sockaddr) >|= fun () ->
Some c)
(fun e ->
Logs.warn (fun m -> m "error %s connecting to socket %a"
(Printexc.to_string e) pp_sockaddr sockaddr);
safe_close c >|= fun () ->
None)
let pp_process_status ppf = function
| Unix.WEXITED c -> Fmt.pf ppf "exited with %d" c
| Unix.WSIGNALED s -> Fmt.pf ppf "killed by signal %a" Fmt.Dump.signal s
@ -101,11 +133,6 @@ let write_wire s wire =
let buf = Cstruct.(to_bytes (append dlen data)) in
write_raw s buf
let safe_close fd =
Lwt.catch
(fun () -> Lwt_unix.close fd)
(fun _ -> Lwt.return_unit)
let read_from_file file =
Lwt.catch (fun () ->
Lwt_unix.stat file >>= fun stat ->

View File

@ -2,6 +2,10 @@
val pp_sockaddr : Format.formatter -> Lwt_unix.sockaddr -> unit
val server_socket : Vmm_core.service -> Lwt_unix.file_descr Lwt.t
val connect : Lwt_unix.socket_domain -> Lwt_unix.sockaddr -> Lwt_unix.file_descr option Lwt.t
val pp_process_status : Format.formatter -> Unix.process_status -> unit
val ret : Unix.process_status -> Vmm_core.process_exit

View File

@ -31,18 +31,62 @@ let empty = {
unikernels = Vmm_trie.empty
}
let policy_metrics =
let open Metrics in
let doc = "VMM resource policies" in
let data policy =
Data.v [
uint "maximum unikernels" policy.Policy.vms ;
uint "maximum memory" policy.Policy.memory ;
uint "maximum block" (match policy.Policy.block with None -> 0 | Some x -> x)
]
in
let tag = Tags.string "domain" in
Src.v ~doc ~tags:Tags.[tag] ~data "vmm-policies"
let no_policy = Policy.{ vms = 0 ; cpuids = IS.empty ; memory = 0 ; block = None ; bridges = Astring.String.Set.empty }
(* we should confirm the following invariant: Vm or Block have no siblings *)
let block_usage t name =
Vmm_trie.fold name t.block_devices
(fun _ (size, _) blockspace -> blockspace + size)
0
(fun _ (size, act) (active, inactive) ->
if act then active + size, inactive else active, inactive + size)
(0, 0)
let total_block_usage t name =
let act, inact = block_usage t name in
act + inact
let vm_usage t name =
Vmm_trie.fold name t.unikernels
(fun _ vm (vms, memory) -> (succ vms, memory + vm.Unikernel.config.Unikernel.memory))
(0, 0)
let unikernel_metrics =
let open Metrics in
let doc = "VMM unikernels" in
let data (t, name) =
let vms, memory = vm_usage t name
and act, inact = block_usage t name
in
Data.v [
uint "attached used block" act ;
uint "unattached used block" inact ;
uint "total used block" (act + inact) ;
uint "running unikernels" vms ;
uint "used memory" memory
]
in
let tag = Tags.string "domain" in
Src.v ~doc ~tags:Tags.[tag] ~data "vmm-unikernels"
let rec report_vms t name =
let name' = Name.drop name in
let str = if Name.is_root name' then "." else Name.to_string name' in
Metrics.add unikernel_metrics (fun x -> x str) (fun d -> d (t, name'));
if Name.is_root name' then () else report_vms t name'
let find_vm t name = Vmm_trie.find name t.unikernels
let find_policy t name = Vmm_trie.find name t.policies
@ -69,12 +113,15 @@ let remove_vm t name = match find_vm t name with
| Some vm ->
let block_devices = use_blocks t.block_devices name vm false in
let unikernels = Vmm_trie.remove name t.unikernels in
Ok { t with block_devices ; unikernels }
let t' = { t with block_devices ; unikernels } in
report_vms t' name;
Ok t'
let remove_policy t name = match find_policy t name with
| None -> Error (`Msg "unknown policy")
| Some _ ->
let policies = Vmm_trie.remove name t.policies in
Metrics.add policy_metrics (fun x -> x (Name.to_string name)) (fun d -> d no_policy);
Ok { t with policies }
let remove_block t name = match find_block t name with
@ -84,7 +131,9 @@ let remove_block t name = match find_block t name with
Error (`Msg "block device in use")
else
let block_devices = Vmm_trie.remove name t.block_devices in
Ok { t with block_devices }
let t' = { t with block_devices } in
report_vms t' name;
Ok t'
let check_policy (p : Policy.t) (running_vms, used_memory) (vm : Unikernel.config) =
if succ running_vms > p.Policy.vms then
@ -129,7 +178,9 @@ let insert_vm t name vm =
let unikernels, old = Vmm_trie.insert name vm t.unikernels in
(match old with None -> () | Some _ -> invalid_arg ("unikernel " ^ Name.to_string name ^ " already exists in trie")) ;
let block_devices = use_blocks t.block_devices name vm true in
{ t with unikernels ; block_devices }
let t' = { t with unikernels ; block_devices } in
report_vms t' name;
t'
let check_block t name size =
let block_ok = match find_block t name with
@ -140,7 +191,7 @@ let check_block t name size =
match find_policy t dom with
| None -> Ok ()
| Some p ->
let used = block_usage t dom in
let used = total_block_usage t dom in
match p.Policy.block with
| None -> Error (`Msg "no block devices are allowed by policy")
| Some limit ->
@ -155,7 +206,9 @@ let check_block t name size =
let insert_block t name size =
check_block t name size >>= fun () ->
let block_devices = fst (Vmm_trie.insert name (size, false) t.block_devices) in
Ok { t with block_devices }
let t' = { t with block_devices } in
report_vms t' name;
Ok t'
let sub_policy ~super ~sub =
let sub_block sub super =
@ -202,7 +255,7 @@ let check_policies_below t curname super =
let check_vms t name p =
let (vms, used_memory) = vm_usage t name
and block = block_usage t name
and block = total_block_usage t name
in
let bridges, cpuids =
Vmm_trie.fold name t.unikernels
@ -231,4 +284,5 @@ let insert_policy t name p =
check_policies_below t name p >>= fun () ->
check_vms t name p >>= fun () ->
let policies = fst (Vmm_trie.insert name p t.policies) in
Metrics.add policy_metrics (fun x -> x (Name.to_string name)) (fun d -> d p);
Ok { t with policies }

View File

@ -86,7 +86,7 @@ let restore_unikernels () =
match Vmm_asn.unikernels_of_cstruct data with
| Error (`Msg msg) -> Error (`Msg ("couldn't parse state: " ^ msg))
| Ok unikernels ->
Logs.info (fun m -> m "restored some unikernels") ;
Logs.info (fun m -> m "restored %d unikernels" (List.length (Vmm_trie.all unikernels))) ;
Ok unikernels
let dump_unikernels t =

View File

@ -23,7 +23,7 @@ let pp_sockaddr ppf = function
| Lwt_unix.ADDR_INET (addr, port) -> Fmt.pf ppf "TCP %s:%d"
(Unix.string_of_inet_addr addr) port
let handle s addr () =
let handle s addr =
Logs.info (fun m -> m "handling stats connection %a" pp_sockaddr addr) ;
let rec loop () =
Vmm_lwt.read_wire s >>= function
@ -64,20 +64,19 @@ let timer () =
Vmm_lwt.safe_close s)
outs
let jump _ file interval =
let m = Albatross_cli.conn_metrics "unix"
let jump _ interval influx =
Sys.(set_signal sigpipe Signal_ignore) ;
let interval = Duration.(to_f (of_sec interval)) in
Lwt_main.run
((Lwt_unix.file_exists file >>= function
| true -> Lwt_unix.unlink file
| false -> Lwt.return_unit) >>= fun () ->
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.(bind s (ADDR_UNIX file)) >>= fun () ->
Lwt_unix.listen s 1 ;
(Albatross_cli.init_influx "albatross_stats" influx;
Vmm_lwt.server_socket `Stats >>= fun s ->
let _ev = Lwt_engine.on_timer interval true (fun _e -> Lwt.async timer) in
let rec loop () =
Lwt_unix.accept s >>= fun (cs, addr) ->
Lwt.async (handle cs addr) ;
m `Open;
Lwt.async (fun () -> handle cs addr >|= fun () -> m `Close);
loop ()
in
loop ())
@ -85,16 +84,12 @@ let jump _ file interval =
open Cmdliner
open Albatross_cli
let socket =
let doc = "socket to use" in
Arg.(value & opt string (Vmm_core.socket_path `Stats) & info [ "socket" ] ~doc)
let interval =
let doc = "Interval between statistics gatherings (in seconds)" in
Arg.(value & opt int 10 & info [ "interval" ] ~doc)
let cmd =
Term.(term_result (const jump $ setup_log $ socket $ interval)),
Term.(term_result (const jump $ setup_log $ interval $ influx)),
Term.info "albatross_stats" ~version:"%%VERSION_NUM%%"
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1

View File

@ -46,6 +46,8 @@ let rec wrap f arg =
Logs.err (fun m -> m "exception %s" (Printexc.to_string e)) ;
None
let vmmapi = Albatross_cli.conn_metrics "vmmapi"
let remove_vmid t vmid =
Logs.info (fun m -> m "removing vmid %a" Vmm_core.Name.pp vmid) ;
match Vmm_trie.find vmid t.vmid_pid with
@ -53,7 +55,7 @@ let remove_vmid t vmid =
| Some pid ->
Logs.info (fun m -> m "removing pid %d" pid) ;
(match IM.find_opt pid t.pid_nic with
| Some (Ok vmctx, _, _) -> ignore (wrap vmmapi_close vmctx)
| Some (Ok vmctx, _, _) -> ignore (wrap vmmapi_close vmctx) ; vmmapi `Close
| _ -> ()) ;
let pid_nic = IM.remove pid t.pid_nic
and vmid_pid = Vmm_trie.remove vmid t.vmid_pid
@ -84,6 +86,7 @@ let open_vmmapi ~retries name =
Logs.warn (fun m -> m "(ignored, %d attempts left) vmmapi_open failed for %s" left name) ;
Error left
| Some vmctx ->
vmmapi `Open;
Logs.info (fun m -> m "vmmapi_open succeeded for %s" name) ;
fill_descr vmctx ;
Ok vmctx

View File

@ -7,7 +7,7 @@ let () =
(library
(name albatross_stats)
(public_name albatross.stats)
(libraries albatross)
(libraries albatross albatross.cli)
(wrapped false)
(c_names albatross_stats_stubs)
(modules albatross_stats_pure))

View File

@ -15,11 +15,6 @@ let tls_config cacert cert priv_key =
~reneg:true ~certificates:(`Single cert) ()),
ca)
let connect socket_path =
let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_unix.connect c (Lwt_unix.ADDR_UNIX socket_path) >|= fun () ->
c
let client_auth ca tls =
let authenticator =
let time = Ptime_clock.now () in
@ -66,56 +61,63 @@ let handle ca tls =
| Error (`Msg m) -> Lwt.fail_with m
| Ok (name, policies, cmd) ->
let sock, next = Vmm_commands.endpoint cmd in
connect (Vmm_core.socket_path sock) >>= fun fd ->
(match sock with
| `Vmmd ->
Lwt_list.fold_left_s (fun r (id, policy) ->
match r with
| Error (`Msg msg) -> Lwt.return (Error (`Msg msg))
| Ok () ->
Logs.debug (fun m -> m "adding policy for %a: %a" Vmm_core.Name.pp id Vmm_core.Policy.pp policy) ;
let header = Vmm_commands.{version = my_version ; sequence = !command ; name = id } in
command := Int64.succ !command ;
Vmm_lwt.write_wire fd (header, `Command (`Policy_cmd (`Policy_add policy))) >>= function
| Error `Exception -> Lwt.return (Error (`Msg "failed to write policy"))
| Ok () ->
Vmm_lwt.read_wire fd >|= function
(* TODO check version *)
| Error _ -> Error (`Msg "read error after writing policy")
| Ok (_, `Success _) -> Ok ()
| Ok wire ->
Rresult.R.error_msgf
"expected success when adding policy, got: %a"
Vmm_commands.pp_wire wire)
(Ok ()) policies
| _ -> Lwt.return (Ok ())) >>= function
| Error (`Msg msg) ->
begin
Logs.warn (fun m -> m "error while applying policies %s" msg) ;
let wire =
let header = Vmm_commands.{version = my_version ; sequence = 0L ; name } in
header, `Failure msg
in
Vmm_tls_lwt.write_tls tls wire >>= fun _ ->
Vmm_lwt.safe_close fd >>= fun () ->
Lwt.fail_with msg
end
| Ok () ->
let wire =
let header = Vmm_commands.{version = my_version ; sequence = !command ; name } in
command := Int64.succ !command ;
(header, `Command cmd)
let sockaddr = Lwt_unix.ADDR_UNIX (Vmm_core.socket_path sock) in
Vmm_lwt.connect Lwt_unix.PF_UNIX sockaddr >>= function
| None ->
let err =
Rresult.R.error_msgf "failed to connect to %a" Vmm_lwt.pp_sockaddr sockaddr
in
Vmm_lwt.write_wire fd wire >>= function
| Error `Exception ->
Vmm_lwt.safe_close fd >>= fun () ->
Lwt.return (Error (`Msg "couldn't write"))
Lwt.return err
| Some fd ->
(match sock with
| `Vmmd ->
Lwt_list.fold_left_s (fun r (id, policy) ->
match r with
| Error (`Msg msg) -> Lwt.return (Error (`Msg msg))
| Ok () ->
Logs.debug (fun m -> m "adding policy for %a: %a" Vmm_core.Name.pp id Vmm_core.Policy.pp policy) ;
let header = Vmm_commands.{version = my_version ; sequence = !command ; name = id } in
command := Int64.succ !command ;
Vmm_lwt.write_wire fd (header, `Command (`Policy_cmd (`Policy_add policy))) >>= function
| Error `Exception -> Lwt.return (Error (`Msg "failed to write policy"))
| Ok () ->
Vmm_lwt.read_wire fd >|= function
(* TODO check version *)
| Error _ -> Error (`Msg "read error after writing policy")
| Ok (_, `Success _) -> Ok ()
| Ok wire ->
Rresult.R.error_msgf
"expected success when adding policy, got: %a"
Vmm_commands.pp_wire wire)
(Ok ()) policies
| _ -> Lwt.return (Ok ())) >>= function
| Error (`Msg msg) ->
begin
Logs.warn (fun m -> m "error while applying policies %s" msg) ;
let wire =
let header = Vmm_commands.{version = my_version ; sequence = 0L ; name } in
header, `Failure msg
in
Vmm_tls_lwt.write_tls tls wire >>= fun _ ->
Vmm_lwt.safe_close fd >>= fun () ->
Lwt.fail_with msg
end
| Ok () ->
(match next with
| `Read -> read fd tls
| `End -> process fd tls) >>= fun res ->
Vmm_lwt.safe_close fd >|= fun () ->
res
let wire =
let header = Vmm_commands.{version = my_version ; sequence = !command ; name } in
command := Int64.succ !command ;
(header, `Command cmd)
in
Vmm_lwt.write_wire fd wire >>= function
| Error `Exception ->
Vmm_lwt.safe_close fd >>= fun () ->
Lwt.return (Error (`Msg "couldn't write"))
| Ok () ->
(match next with
| `Read -> read fd tls
| `End -> process fd tls) >>= fun res ->
Vmm_lwt.safe_close fd >|= fun () ->
res
open Cmdliner