From f81a12bc4db90f156b2ee59a3079f7717add10c8 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 10 Oct 2019 22:26:36 +0200 Subject: [PATCH] initial metrics --- albatross.opam | 9 +- client/albatross_client_bistro.ml | 33 ++++--- client/albatross_client_local.ml | 34 ++++---- client/albatross_client_remote_tls.ml | 34 +++++--- command-line/albatross_cli.ml | 86 +++++++++++++++++- command-line/dune | 2 +- daemon/albatross_console.ml | 30 +++---- daemon/albatross_influx.ml | 120 +++++++++++--------------- daemon/albatross_log.ml | 23 ++--- daemon/albatrossd.ml | 61 +++---------- daemon/dune | 2 +- packaging/MANIFEST | 2 - src/dune | 2 +- src/vmm_core.ml | 14 +-- src/vmm_core.mli | 1 + src/vmm_lwt.ml | 37 ++++++-- src/vmm_lwt.mli | 4 + src/vmm_resources.ml | 70 +++++++++++++-- src/vmm_vmmd.ml | 2 +- stats/albatross_stats.ml | 23 ++--- stats/albatross_stats_pure.ml | 5 +- stats/dune | 2 +- tls/albatross_tls_common.ml | 108 +++++++++++------------ 23 files changed, 422 insertions(+), 282 deletions(-) diff --git a/albatross.opam b/albatross.opam index 10cd322..7965076 100644 --- a/albatross.opam +++ b/albatross.opam @@ -26,8 +26,15 @@ depends: [ "duration" "decompress" {>= "0.9.0" & < "1.0.0"} "checkseum" + "metrics" + "metrics-lwt" + "metrics-influx" +] +pin-depends: [ + ["metrics.dev" "git+https://github.com/hannesm/metrics.git#future"] + ["metrics-lwt.dev" "git+https://github.com/hannesm/metrics.git#future"] + ["metrics-influx.dev" "git+https://github.com/hannesm/metrics.git#future"] ] - build: [ ["dune" "subst"] {pinned} ["dune" "build" "-p" name "-j" jobs] diff --git a/client/albatross_client_bistro.ml b/client/albatross_client_bistro.ml index b095205..dda33dd 100644 --- a/client/albatross_client_bistro.ml +++ b/client/albatross_client_bistro.ml @@ -10,7 +10,10 @@ let read fd = Logs.debug (fun m -> m "reading tls stream") ; let rec loop () = Vmm_tls_lwt.read_tls fd >>= function - | Error _ -> Lwt.return () + | Error `Eof -> + Logs.warn (fun m -> m "eof from server"); + Lwt.return (Ok ()) + | Error _ -> Lwt.return (Error (`Msg ("read failure"))) | Ok wire -> Albatross_cli.print_result version wire ; loop () @@ -71,19 +74,25 @@ let handle (host, port) cert key ca id (cmd : Vmm_commands.t) = X509_lwt.authenticator (`Ca_file ca) >>= fun authenticator -> Lwt_unix.gethostbyname host >>= fun host_entry -> let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in - let fd = Lwt_unix.(socket host_entry.h_addrtype SOCK_STREAM 0) in - Logs.debug (fun m -> m "connecting to remote host") ; - Lwt_unix.connect fd (Lwt_unix.ADDR_INET (host_inet_addr, port)) >>= fun () -> - (* reneg true to allow re-negotiation over the server-authenticated TLS - channel (to transport client certificate encrypted), once TLS 1.3 is in - (and required) be removed! *) - let client = Tls.Config.client ~reneg:true ~certificates ~authenticator () in - Tls_lwt.Unix.client_of_fd client (* TODO ~host *) fd >>= fun t -> - Logs.debug (fun m -> m "finished tls handshake") ; - read t + let sockaddr = Lwt_unix.ADDR_INET (host_inet_addr, port) in + Vmm_lwt.connect host_entry.h_addrtype sockaddr >>= function + | None -> + let err = + Rresult.R.error_msgf "connection failed to %a" Vmm_lwt.pp_sockaddr sockaddr + in + Lwt.return err + | Some fd -> + Logs.debug (fun m -> m "connecting to remote host") ; + (* reneg true to allow re-negotiation over the server-authenticated TLS + channel (to transport client certificate encrypted), once TLS 1.3 is in + (and required) be removed! *) + let client = Tls.Config.client ~reneg:true ~certificates ~authenticator () in + Tls_lwt.Unix.client_of_fd client (* TODO ~host *) fd >>= fun t -> + Logs.debug (fun m -> m "finished tls handshake") ; + read t let jump endp cert key ca name cmd = - Ok (Lwt_main.run (handle endp cert key ca name cmd)) + Lwt_main.run (handle endp cert key ca name cmd) let info_policy _ endp cert key ca name = jump endp cert key ca name (`Policy_cmd `Policy_info) diff --git a/client/albatross_client_local.ml b/client/albatross_client_local.ml index 58fc177..29218ea 100644 --- a/client/albatross_client_local.ml +++ b/client/albatross_client_local.ml @@ -8,12 +8,6 @@ let socket t = function | Some x -> x | None -> Vmm_core.socket_path t -let connect socket_path = - let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in - Lwt_unix.set_close_on_exec c ; - Lwt_unix.connect c (Lwt_unix.ADDR_UNIX socket_path) >|= fun () -> - c - let process fd = Vmm_lwt.read_wire fd >|= function | Error _ -> Error () @@ -30,18 +24,26 @@ let read fd = let handle opt_socket name (cmd : Vmm_commands.t) = let sock, next = Vmm_commands.endpoint cmd in - connect (socket sock opt_socket) >>= fun fd -> - let header = Vmm_commands.{ version ; sequence = 0L ; name } in - Vmm_lwt.write_wire fd (header, `Command cmd) >>= function - | Error `Exception -> Lwt.return () - | Ok () -> - (match next with - | `Read -> read fd - | `End -> process fd >|= ignore) >>= fun () -> - Vmm_lwt.safe_close fd + let sockaddr = Lwt_unix.ADDR_UNIX (socket sock opt_socket) in + Vmm_lwt.connect Lwt_unix.PF_UNIX sockaddr >>= function + | None -> + let err = + Rresult.R.error_msgf "couldn't connect to %a" Vmm_lwt.pp_sockaddr sockaddr + in + Lwt.return err + | Some fd -> + let header = Vmm_commands.{ version ; sequence = 0L ; name } in + Vmm_lwt.write_wire fd (header, `Command cmd) >>= function + | Error `Exception -> Lwt.return (Error (`Msg "exception")) + | Ok () -> + (match next with + | `Read -> read fd + | `End -> process fd >|= ignore) >>= fun () -> + Vmm_lwt.safe_close fd >|= fun () -> + Ok () let jump opt_socket name cmd = - Ok (Lwt_main.run (handle opt_socket name cmd)) + Lwt_main.run (handle opt_socket name cmd) let info_policy _ opt_socket name = jump opt_socket name (`Policy_cmd `Policy_info) diff --git a/client/albatross_client_remote_tls.ml b/client/albatross_client_remote_tls.ml index 26353a8..989910a 100644 --- a/client/albatross_client_remote_tls.ml +++ b/client/albatross_client_remote_tls.ml @@ -6,7 +6,11 @@ let version = `AV3 let rec read_tls_write_cons t = Vmm_tls_lwt.read_tls t >>= function - | Error _ -> Lwt.return_unit + | Error `Eof -> + Logs.warn (fun m -> m "eof from server"); + Lwt.return (Ok ()) + | Error _ -> + Lwt.return (Error (`Msg ("read failure"))) | Ok wire -> Albatross_cli.print_result version wire ; read_tls_write_cons t @@ -24,17 +28,25 @@ let client cas host port cert priv_key = - ip: connecto to ip and verify hostname *) Lwt_unix.gethostbyname host >>= fun host_entry -> let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in - let fd = Lwt_unix.socket host_entry.Lwt_unix.h_addrtype Lwt_unix.SOCK_STREAM 0 in - Lwt_unix.connect fd (Lwt_unix.ADDR_INET (host_inet_addr, port)) >>= fun _ -> - X509_lwt.private_of_pems ~cert ~priv_key >>= fun cert -> - let certificates = `Single cert in - let client = Tls.Config.client ~reneg:true ~certificates ~authenticator () in - Tls_lwt.Unix.client_of_fd client (* ~host *) fd >>= fun t -> - read_tls_write_cons t) + let sockaddr = Lwt_unix.ADDR_INET (host_inet_addr, port) in + Vmm_lwt.connect host_entry.Lwt_unix.h_addrtype sockaddr >>= function + | None -> + let err = + Rresult.R.error_msgf "couldn't connect to %a" Vmm_lwt.pp_sockaddr sockaddr + in + Lwt.return err + | Some fd -> + X509_lwt.private_of_pems ~cert ~priv_key >>= fun cert -> + let certificates = `Single cert in + let client = Tls.Config.client ~reneg:true ~certificates ~authenticator () in + Tls_lwt.Unix.client_of_fd client (* ~host *) fd >>= fun t -> + read_tls_write_cons t) (fun exn -> - Logs.err (fun m -> m "failed to establish TLS connection: %s" - (Printexc.to_string exn)) ; - Lwt.return_unit) + let err = + Rresult.R.error_msgf "failed to establish TLS connection: %s" + (Printexc.to_string exn) + in + Lwt.return err) let run_client _ cas cert key (host, port) = Printexc.register_printer (function diff --git a/command-line/albatross_cli.ml b/command-line/albatross_cli.ml index 54db9d6..1d40ee3 100644 --- a/command-line/albatross_cli.ml +++ b/command-line/albatross_cli.ml @@ -3,6 +3,68 @@ open Astring open Vmm_core +let conn_metrics kind = + let s = ref (0, 0) in + let open Metrics in + let doc = "connection statistics" in + let data () = + Data.v [ + int "active" (fst !s) ; + int "total" (snd !s) ; + ] in + let tags = Tags.string "kind" in + let src = Src.v ~doc ~tags:Tags.[ tags ] ~data "connections" in + (fun action -> + (match action with + | `Open -> s := (succ (fst !s), succ (snd !s)) + | `Close -> s := (pred (fst !s), snd !s)); + Metrics.add src (fun x -> x kind) (fun d -> d ())) + +open Lwt.Infix + +let process = + Metrics.field ~doc:"name of the process" "process" Metrics.String + +let init_influx name data = + match data with + | None -> () + | Some (ip, port) -> + Logs.info (fun m -> m "stats connecting to %a:%d" Ipaddr.V4.pp ip port); + Metrics.enable_all (); + Metrics_lwt.init_periodic (fun () -> Lwt_unix.sleep 10.); + let get_cache, reporter = Metrics.cache_reporter () in + Metrics.set_reporter reporter; + let fd = ref None in + let rec report () = + let send () = + (match !fd with + | Some _ -> Lwt.return_unit + | None -> + let addr = Lwt_unix.ADDR_INET (Ipaddr_unix.V4.to_inet_addr ip, port) in + Vmm_lwt.connect Lwt_unix.PF_INET addr >|= function + | None -> Logs.err (fun m -> m "connection failure to stats") + | Some fd' -> fd := Some fd') >>= fun () -> + match !fd with + | None -> Lwt.return_unit + | Some socket -> + let tag = process name in + let datas = Metrics.SM.fold (fun src (tags, data) acc -> + let name = Metrics.Src.name src in + Metrics_influx.encode_line_protocol (tag :: tags) data name :: acc) + (get_cache ()) [] + in + let datas = String.concat ~sep:"" datas in + Vmm_lwt.write_raw socket (Bytes.unsafe_of_string datas) >|= function + | Ok () -> () + | Error `Exception -> + Logs.warn (fun m -> m "error on stats write"); + fd := None + and sleep () = Lwt_unix.sleep 10. + in + Lwt.join [ send () ; sleep () ] >>= report + in + Lwt.async report + let print_result version (header, reply) = if not (Vmm_commands.version_eq header.Vmm_commands.version version) then Logs.err (fun m -> m "version not equal") @@ -43,9 +105,30 @@ let setup_log = $ Fmt_cli.style_renderer () $ Logs_cli.level ()) +let ip_port : (Ipaddr.V4.t * int) Arg.converter = + let default_port = 8094 in + let parse s = + match + match String.cut ~sep:":" s with + | None -> Ok (s, default_port) + | Some (ip, port) -> match int_of_string port with + | exception Failure _ -> Error "non-numeric port" + | port -> Ok (ip, port) + with + | Error msg -> `Error msg + | Ok (ip, port) -> match Ipaddr.V4.of_string ip with + | Ok ip -> `Ok (ip, port) + | Error `Msg msg -> `Error msg + in + parse, fun ppf (ip, port) -> Format.fprintf ppf "%a:%d" Ipaddr.V4.pp ip port + +let influx = + let doc = "IP address and port (default: 8094) to report metrics to in influx line protocol" in + Arg.(value & opt (some ip_port) None & info [ "influx" ] ~doc ~docv:"INFLUXHOST[:PORT]") + let host_port : (string * int) Arg.converter = let parse s = - match Astring.String.cut ~sep:":" s with + match String.cut ~sep:":" s with | None -> `Error "broken: no port specified" | Some (hostname, port) -> try @@ -81,7 +164,6 @@ let vmm_dev_req0 = let doc = "VMM device name" in Arg.(required & pos 0 (some string) None & info [] ~doc ~docv:"VMMDEV") - let opt_vm_name = let doc = "name of virtual machine." in Arg.(value & opt vm_c Name.root & info [ "n" ; "name"] ~doc) diff --git a/command-line/dune b/command-line/dune index d02cf35..222ab7b 100644 --- a/command-line/dune +++ b/command-line/dune @@ -3,4 +3,4 @@ (public_name albatross.cli) (wrapped false) (modules albatross_cli) - (libraries checkseum.c albatross lwt.unix cmdliner logs.fmt logs.cli fmt.cli fmt.tty ipaddr.unix)) + (libraries checkseum.c albatross lwt.unix cmdliner logs.fmt logs.cli fmt.cli fmt.tty ipaddr.unix metrics metrics-lwt metrics-influx)) diff --git a/daemon/albatross_console.ml b/daemon/albatross_console.ml index c37d6f2..9d20ca7 100644 --- a/daemon/albatross_console.ml +++ b/daemon/albatross_console.ml @@ -20,7 +20,7 @@ let pp_unix_error ppf e = Fmt.string ppf (Unix.error_message e) let active = ref String.Map.empty -let read_console id name ring channel () = +let read_console id name ring channel = Lwt.catch (fun () -> let rec loop () = Lwt_io.read_line channel >>= fun line -> @@ -66,6 +66,8 @@ let open_fifo name = let t = ref String.Map.empty +let fifos = Albatross_cli.conn_metrics "fifo" + let add_fifo id = let name = Vmm_core.Name.to_string id in open_fifo id >|= function @@ -79,7 +81,8 @@ let add_fifo id = ring | Some ring -> ring in - Lwt.async (read_console id name ring f) ; + fifos `Open; + Lwt.async (fun () -> read_console id name ring f >|= fun () -> fifos `Close) ; Ok () let subscribe s id = @@ -110,7 +113,7 @@ let send_history s r id since = | Error _ -> Vmm_lwt.safe_close s) entries -let handle s addr () = +let handle s addr = Logs.info (fun m -> m "handling connection %a" Vmm_lwt.pp_sockaddr addr) ; let rec loop () = Vmm_lwt.read_wire s >>= function @@ -156,18 +159,17 @@ let handle s addr () = Vmm_lwt.safe_close s >|= fun () -> Logs.warn (fun m -> m "disconnected") -let jump _ file = +let m = Albatross_cli.conn_metrics "unix" + +let jump _ influx = Sys.(set_signal sigpipe Signal_ignore) ; Lwt_main.run - ((Lwt_unix.file_exists file >>= function - | true -> Lwt_unix.unlink file - | false -> Lwt.return_unit) >>= fun () -> - let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in - Lwt_unix.(bind s (ADDR_UNIX file)) >>= fun () -> - Lwt_unix.listen s 1 ; + (Albatross_cli.init_influx "albatross_console" influx; + Vmm_lwt.server_socket `Console >>= fun s -> let rec loop () = Lwt_unix.accept s >>= fun (cs, addr) -> - Lwt.async (handle cs addr) ; + m `Open; + Lwt.async (fun () -> handle cs addr >|= fun () -> m `Close) ; loop () in loop ()) @@ -176,12 +178,8 @@ open Cmdliner open Albatross_cli -let socket = - let doc = "socket to use" in - Arg.(value & opt string (Vmm_core.socket_path `Console) & info [ "socket" ] ~doc) - let cmd = - Term.(term_result (const jump $ setup_log $ socket)), + Term.(term_result (const jump $ setup_log $ influx)), Term.info "albatross_console" ~version:"%%VERSION_NUM%%" let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1 diff --git a/daemon/albatross_influx.ml b/daemon/albatross_influx.ml index 9823db9..4f1351c 100644 --- a/daemon/albatross_influx.ml +++ b/daemon/albatross_influx.ml @@ -174,28 +174,21 @@ let safe_close s = Logs.err (fun m -> m "exception %s while closing" (Printexc.to_string e)) ; Lwt.return_unit) -let rec read_sock_write_tcp c ?fd addr addrtype = +let rec read_sock_write_tcp c ?fd addr = match fd with | None -> - Logs.debug (fun m -> m "new connection to TCP") ; - let fd = Lwt_unix.socket addrtype Lwt_unix.SOCK_STREAM 0 in - Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; - Lwt.catch - (fun () -> - Lwt_unix.connect fd addr >|= fun () -> - Logs.debug (fun m -> m "connected to TCP") ; - Some fd) - (fun e -> - let addr', port = match addr with - | Lwt_unix.ADDR_INET (ip, port) -> Unix.string_of_inet_addr ip, port - | Lwt_unix.ADDR_UNIX addr -> addr, 0 - in - Logs.warn (fun m -> m "error %s connecting to influxd %s:%d, retrying in 5s" - (Printexc.to_string e) addr' port) ; - safe_close fd >>= fun () -> - Lwt_unix.sleep 5.0 >|= fun () -> - None) >>= fun fd -> - read_sock_write_tcp c ?fd addr addrtype + begin + Logs.debug (fun m -> m "new connection to TCP") ; + Vmm_lwt.connect Lwt_unix.PF_INET addr >>= function + | None -> + Logs.warn (fun m -> m "error connecting to influxd %a, retrying in 5s" + Vmm_lwt.pp_sockaddr addr); + Lwt_unix.sleep 5.0 >>= fun () -> + read_sock_write_tcp c addr + | Some fd -> + Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; + read_sock_write_tcp c ~fd addr + end | Some fd -> Logs.debug (fun m -> m "reading from unix socket") ; Vmm_lwt.read_wire c >>= function @@ -223,7 +216,7 @@ let rec read_sock_write_tcp c ?fd addr addrtype = Vmm_lwt.write_raw fd (Bytes.unsafe_of_string out) >>= function | Ok () -> Logs.debug (fun m -> m "wrote successfully") ; - read_sock_write_tcp c ~fd addr addrtype + read_sock_write_tcp c ~fd addr | Error e -> Logs.err (fun m -> m "error %s while writing to tcp (%s)" (str_of_e e) name) ; @@ -233,7 +226,7 @@ let rec read_sock_write_tcp c ?fd addr addrtype = | Ok wire -> Logs.warn (fun m -> m "ignoring %a" Vmm_commands.pp_wire wire) ; Lwt.return (Some fd) >>= fun fd -> - read_sock_write_tcp c ?fd addr addrtype + read_sock_write_tcp c ?fd addr let query_sock vm c = let header = Vmm_commands.{ version = my_version ; sequence = !command ; name = vm } in @@ -241,78 +234,67 @@ let query_sock vm c = Logs.debug (fun m -> m "%Lu requesting %a via socket" !command Name.pp vm) ; Vmm_lwt.write_wire c (header, `Command (`Stats_cmd `Stats_subscribe)) -let rec maybe_connect stat_socket = - let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in - Lwt.catch - (fun () -> - Logs.debug (fun m -> m "connecting to %s" stat_socket) ; - Lwt_unix.(connect c (ADDR_UNIX stat_socket)) >>= fun () -> - Logs.debug (fun m -> m "connected") ; - Lwt.return c) - (fun e -> - Logs.warn (fun m -> m "error %s connecting to socket %s" - (Printexc.to_string e) stat_socket) ; - safe_close c >>= fun () -> - Lwt_unix.sleep (float_of_int 5) >>= fun () -> - maybe_connect stat_socket) +let rec maybe_connect () = + let sockaddr = Lwt_unix.ADDR_UNIX (socket_path `Stats) in + Logs.debug (fun m -> m "connecting to %a" Vmm_lwt.pp_sockaddr sockaddr); + Vmm_lwt.connect Lwt_unix.PF_UNIX sockaddr >>= function + | None -> + Logs.warn (fun m -> m "error connecting to socket %a" Vmm_lwt.pp_sockaddr sockaddr); + Lwt_unix.sleep 5. >>= fun () -> + maybe_connect () + | Some c -> + Logs.debug (fun m -> m "connected"); + Lwt.return c -let client stat_socket influxhost influxport vm = - (* figure out address of influx *) - Lwt_unix.gethostbyname influxhost >>= fun host_entry -> - let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in - let addr = Lwt_unix.ADDR_INET (host_inet_addr, influxport) - and addrtype = host_entry.Lwt_unix.h_addrtype - in +let client influx vm = + match influx with + | None -> Lwt.return (Error (`Msg "influx host not provided")) + | Some (ip, port) -> + let addr = Lwt_unix.ADDR_INET (Ipaddr_unix.V4.to_inet_addr ip, port) in - (* loop *) - (* the query task queries the stat_socket at each - - if this fails, closing is set to true (and unit is returned) + (* loop *) + (* the query task queries the stat_socket at each + - if this fails, closing is set to true (and unit is returned) - the read_sock reads the stat_socket, and forwards to a TCP socket - - if closing is true, the TCP socket is closed and unit is returned - - if read on the unix domain socket fails, closing is set to true + the read_sock reads the stat_socket, and forwards to a TCP socket + - if closing is true, the TCP socket is closed and unit is returned + - if read on the unix domain socket fails, closing is set to true (and unit is returned) *) - (* connection to the unix domain socket is managed in this loop only: - - maybe_connect attempts to establishes to it - - query_sock/read_sock_write_tcp write an read from it - - on failure in read or write, the TCP connection is closed, and loop + (* connection to the unix domain socket is managed in this loop only: + - maybe_connect attempts to establishes to it + - query_sock/read_sock_write_tcp write an read from it + - on failure in read or write, the TCP connection is closed, and loop takes control: safe_close, maybe_connect, rinse, repeat *) let rec loop () = (* start a socket connection to vmm_stats *) - maybe_connect stat_socket >>= fun c -> + maybe_connect () >>= fun c -> query_sock vm c >>= function | Error e -> - Logs.err (fun m -> m "error %s while writing to stat socket" (str_of_e e)) ; - Lwt.return_unit + let err = + Rresult.R.error_msgf "error %s while writing to stat socket" (str_of_e e) + in + Lwt.return err | Ok () -> - read_sock_write_tcp c addr addrtype >>= fun restart -> - if restart then loop () else Lwt.return_unit + read_sock_write_tcp c addr >>= fun restart -> + if restart then loop () else Lwt.return (Ok ()) in loop () -let run_client _ socket (influxhost, influxport) vm = +let run_client _ influx vm = Sys.(set_signal sigpipe Signal_ignore) ; - Lwt_main.run (client socket influxhost influxport vm) + Lwt_main.run (client influx vm) open Cmdliner open Albatross_cli -let socket = - let doc = "socket to use" in - Arg.(value & opt string (Vmm_core.socket_path `Stats) & info [ "socket" ] ~doc) - -let influx = - Arg.(required & pos 0 (some host_port) None & info [] ~docv:"INFLUXHOST:INFLUXPORT" - ~doc:"the influx hostname:port to connect to") - let cmd = let doc = "Albatross Influx connector" in let man = [ `S "DESCRIPTION" ; `P "$(tname) connects to a albatross stats socket, pulls statistics and pushes them via TCP to influxdb" ] in - Term.(pure run_client $ setup_log $ socket $ influx $ opt_vm_name), + Term.(term_result (const run_client $ setup_log $ influx $ opt_vm_name)), Term.info "albatross_influx" ~version:"%%VERSION_NUM%%" ~doc ~man let () = diff --git a/daemon/albatross_log.ml b/daemon/albatross_log.ml index f108dec..84a134f 100644 --- a/daemon/albatross_log.ml +++ b/daemon/albatross_log.ml @@ -120,7 +120,7 @@ let read_data mvar ring s = in loop () -let handle mvar ring s addr () = +let handle mvar ring s addr = Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ; Vmm_lwt.read_wire s >>= begin function | Error _ -> @@ -159,15 +159,13 @@ let handle mvar ring s addr () = end >>= fun () -> Vmm_lwt.safe_close s -let jump _ file sock = +let m = Albatross_cli.conn_metrics "unix" + +let jump _ file influx = Sys.(set_signal sigpipe Signal_ignore) ; Lwt_main.run - ((Lwt_unix.file_exists sock >>= function - | true -> Lwt_unix.unlink sock - | false -> Lwt.return_unit) >>= fun () -> - let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in - Lwt_unix.(bind s (ADDR_UNIX sock)) >>= fun () -> - Lwt_unix.listen s 1 ; + (Albatross_cli.init_influx "albatross_log" influx; + Vmm_lwt.server_socket `Log >>= fun s -> let ring = Vmm_ring.create `Startup () in read_from_file file >>= fun entries -> Logs.app (fun m -> m "read %d entries from disk" (List.length entries)) ; @@ -181,7 +179,8 @@ let jump _ file sock = Vmm_ring.write ring start ; let rec loop () = Lwt_unix.accept s >>= fun (cs, addr) -> - Lwt.async (handle mvar ring cs addr) ; + m `Open; + Lwt.async (fun () -> handle mvar ring cs addr >|= fun () -> m `Close) ; loop () in loop ()) @@ -189,16 +188,12 @@ let jump _ file sock = open Cmdliner open Albatross_cli -let socket = - let doc = "socket to use" in - Arg.(value & opt string (Vmm_core.socket_path `Log) & info [ "socket" ] ~doc) - let file = let doc = "File to write the log to" in Arg.(value & opt string "/var/log/albatross" & info [ "logfile" ] ~doc) let cmd = - Term.(term_result (const jump $ setup_log $ file $ socket)), + Term.(term_result (const jump $ setup_log $ file $ influx)), Term.info "albatross_log" ~version:"%%VERSION_NUM%%" let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1 diff --git a/daemon/albatrossd.ml b/daemon/albatrossd.ml index e834ff6..56f5c40 100644 --- a/daemon/albatrossd.ml +++ b/daemon/albatrossd.ml @@ -4,20 +4,6 @@ open Albatross_cli open Vmm_core -type stats = { - start : Ptime.t ; - vm_created : int ; - vm_destroyed : int ; -} - -let s = ref { start = Ptime_clock.now () ; vm_created = 0 ; vm_destroyed = 0 } - -let pp_stats ppf s = - let diff = Ptime.(diff (Ptime_clock.now ()) s.start) in - Fmt.pf ppf "up %a: %d vms created, %d vms destroyed, %d running" - Ptime.Span.pp diff - s.vm_created s.vm_destroyed (s.vm_created - s.vm_destroyed) - open Lwt.Infix let version = `AV3 @@ -36,12 +22,10 @@ let create stat_out log_out cons_out data_out cons succ_cont fail_cont = data_out data | Ok (state', stat, log, data, name, vm) -> state := state' ; - s := { !s with vm_created = succ !s.vm_created } ; Lwt.async (fun () -> Vmm_lwt.wait_and_clear vm.Unikernel.pid >>= fun r -> let state', stat', log' = Vmm_vmmd.handle_shutdown !state name vm r in state := state' ; - s := { !s with vm_destroyed = succ !s.vm_destroyed } ; stat_out "handle shutdown stat" stat' >>= fun () -> log_out "handle shutdown log" log' >|= fun () -> let state', waiter_opt = Vmm_vmmd.waiter !state name in @@ -116,32 +100,9 @@ let handle log_out cons_out stat_out fd addr = let connect_client_socket sock = let name = socket_path sock in - let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in - Lwt_unix.set_close_on_exec c ; - Lwt.catch (fun () -> - Lwt_unix.(connect c (ADDR_UNIX name)) >|= fun () -> - Some (c, Lwt_mutex.create ())) - (fun e -> - Logs.warn (fun m -> m "error %s connecting to socket %s" - (Printexc.to_string e) name) ; - (Lwt.catch (fun () -> Lwt_unix.close c) (fun _ -> Lwt.return_unit)) >|= fun () -> - None) - -let server_socket sock = - let name = socket_path sock in - (Lwt_unix.file_exists name >>= function - | true -> Lwt_unix.unlink name - | false -> Lwt.return_unit) >>= fun () -> - let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in - Lwt_unix.set_close_on_exec s ; - Lwt_unix.(bind s (ADDR_UNIX name)) >|= fun () -> - Lwt_unix.listen s 1 ; - s - -let rec stats_loop () = - Logs.info (fun m -> m "%a" pp_stats !s) ; - Lwt_unix.sleep 600. >>= fun () -> - stats_loop () + Vmm_lwt.connect Lwt_unix.PF_UNIX (Lwt_unix.ADDR_UNIX name) >|= function + | None -> None + | Some x -> Some (x, Lwt_mutex.create ()) let write_reply name (fd, mut) txt (header, cmd) = Logs.debug (fun m -> m "locking to write to %s" name) ; @@ -179,13 +140,16 @@ let write_reply name (fd, mut) txt (header, cmd) = Logs.err (fun m -> m "error in read from %s" name) ; invalid_arg "communication failure" -let jump _ = +let m = conn_metrics "unix" + +let jump _ influx = Sys.(set_signal sigpipe Signal_ignore); match Vmm_vmmd.restore_unikernels () with | Error (`Msg msg) -> Logs.err (fun m -> m "bailing out: %s" msg) | Ok old_unikernels -> Lwt_main.run - (server_socket `Vmmd >>= fun ss -> + (init_influx "albatross" influx; + Vmm_lwt.server_socket `Vmmd >>= fun ss -> (connect_client_socket `Log >|= function | None -> invalid_arg "cannot connect to log socket" | Some l -> l) >>= fun l -> @@ -214,8 +178,6 @@ let jump _ = | Some s -> write_reply "stat" s txt wire >|= fun _ -> () in - Lwt.async stats_loop ; - let start_unikernel (name, config) = let hdr = Vmm_commands.{ version ; sequence = 0L ; name = Name.root } and data_out _ = Lwt.return_unit @@ -234,7 +196,10 @@ let jump _ = let rec loop () = Lwt_unix.accept ss >>= fun (fd, addr) -> Lwt_unix.set_close_on_exec fd ; - Lwt.async (fun () -> handle log_out cons_out stat_out fd addr) ; + m `Open; + Lwt.async (fun () -> + handle log_out cons_out stat_out fd addr >|= fun () -> + m `Close) ; loop () in loop ()) @@ -245,7 +210,7 @@ let jump _ = open Cmdliner let cmd = - Term.(const jump $ setup_log), + Term.(const jump $ setup_log $ influx), Term.info "albatrossd" ~version:"%%VERSION_NUM%%" let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1 diff --git a/daemon/dune b/daemon/dune index 3f94d9f..832d3c6 100644 --- a/daemon/dune +++ b/daemon/dune @@ -3,7 +3,7 @@ (public_name albatrossd) (package albatross) (modules albatrossd) - (libraries albatross.cli albatross)) + (libraries albatross.cli albatross metrics-lwt metrics-influx)) (executable (name albatross_console) diff --git a/packaging/MANIFEST b/packaging/MANIFEST index fb2a4d8..9ff5205 100644 --- a/packaging/MANIFEST +++ b/packaging/MANIFEST @@ -41,8 +41,6 @@ EOD; mkdir -p /var/run/albatross/util /var/run/albatross/fifo chown albatross:albatross /var/run/albatross/util /var/run/albatross/fifo chmod 2760 /var/run/albatross/fifo -chgrp albatross /usr/local/libexec/albatross/albatrossd -chmod 2750 /usr/local/libexec/albatross/albatrossd EOD; post-deinstall = < Fpath.(sockdir / "console" + "sock") - | `Vmmd -> Fpath.(tmpdir / "vmmd" + "sock") - | `Stats -> Fpath.(sockdir / "stat" + "sock") - | `Log -> Fpath.(sockdir / "log" + "sock") + | `Console -> "console" + | `Vmmd -> "vmmd" + | `Stats -> "stat" + | `Log -> "log" in - Fpath.to_string path + Fpath.to_string Fpath.(sockdir / path + "sock") let pp_socket ppf t = let name = socket_path t in @@ -53,6 +53,10 @@ module Name = struct let to_list x = x + let drop x = match List.rev x with + | [] -> [] + | _::tl -> List.rev tl + let append_exn lbl x = if valid_label lbl then x @ [ lbl ] diff --git a/src/vmm_core.mli b/src/vmm_core.mli index ad68498..fe14bd8 100644 --- a/src/vmm_core.mli +++ b/src/vmm_core.mli @@ -24,6 +24,7 @@ module Name : sig val of_list : string list -> (t, [> `Msg of string ]) result val to_list : t -> string list + val drop : t -> t val append : string -> t -> (t, [> `Msg of string ]) result val prepend : string -> t -> (t, [> `Msg of string ]) result val append_exn : string -> t -> t diff --git a/src/vmm_lwt.ml b/src/vmm_lwt.ml index 094a9a5..049db75 100644 --- a/src/vmm_lwt.ml +++ b/src/vmm_lwt.ml @@ -7,6 +7,38 @@ let pp_sockaddr ppf = function | Lwt_unix.ADDR_INET (addr, port) -> Fmt.pf ppf "TCP %s:%d" (Unix.string_of_inet_addr addr) port +let safe_close fd = + Lwt.catch + (fun () -> Lwt_unix.close fd) + (fun _ -> Lwt.return_unit) + +let server_socket sock = + let name = Vmm_core.socket_path sock in + (Lwt_unix.file_exists name >>= function + | true -> Lwt_unix.unlink name + | false -> Lwt.return_unit) >>= fun () -> + let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in + Lwt_unix.set_close_on_exec s ; + let old_umask = Unix.umask 0 in + let _ = Unix.umask (old_umask land 0o707) in + Lwt_unix.(bind s (ADDR_UNIX name)) >|= fun () -> + Logs.app (fun m -> m "listening on %s" name); + let _ = Unix.umask old_umask in + Lwt_unix.listen s 1 ; + s + +let connect addrtype sockaddr = + let c = Lwt_unix.(socket addrtype SOCK_STREAM 0) in + Lwt_unix.set_close_on_exec c ; + Lwt.catch (fun () -> + Lwt_unix.(connect c sockaddr) >|= fun () -> + Some c) + (fun e -> + Logs.warn (fun m -> m "error %s connecting to socket %a" + (Printexc.to_string e) pp_sockaddr sockaddr); + safe_close c >|= fun () -> + None) + let pp_process_status ppf = function | Unix.WEXITED c -> Fmt.pf ppf "exited with %d" c | Unix.WSIGNALED s -> Fmt.pf ppf "killed by signal %a" Fmt.Dump.signal s @@ -101,11 +133,6 @@ let write_wire s wire = let buf = Cstruct.(to_bytes (append dlen data)) in write_raw s buf -let safe_close fd = - Lwt.catch - (fun () -> Lwt_unix.close fd) - (fun _ -> Lwt.return_unit) - let read_from_file file = Lwt.catch (fun () -> Lwt_unix.stat file >>= fun stat -> diff --git a/src/vmm_lwt.mli b/src/vmm_lwt.mli index f47b169..9cb67a7 100644 --- a/src/vmm_lwt.mli +++ b/src/vmm_lwt.mli @@ -2,6 +2,10 @@ val pp_sockaddr : Format.formatter -> Lwt_unix.sockaddr -> unit +val server_socket : Vmm_core.service -> Lwt_unix.file_descr Lwt.t + +val connect : Lwt_unix.socket_domain -> Lwt_unix.sockaddr -> Lwt_unix.file_descr option Lwt.t + val pp_process_status : Format.formatter -> Unix.process_status -> unit val ret : Unix.process_status -> Vmm_core.process_exit diff --git a/src/vmm_resources.ml b/src/vmm_resources.ml index dbd3e97..bfb7695 100644 --- a/src/vmm_resources.ml +++ b/src/vmm_resources.ml @@ -31,18 +31,62 @@ let empty = { unikernels = Vmm_trie.empty } +let policy_metrics = + let open Metrics in + let doc = "VMM resource policies" in + let data policy = + Data.v [ + uint "maximum unikernels" policy.Policy.vms ; + uint "maximum memory" policy.Policy.memory ; + uint "maximum block" (match policy.Policy.block with None -> 0 | Some x -> x) + ] + in + let tag = Tags.string "domain" in + Src.v ~doc ~tags:Tags.[tag] ~data "vmm-policies" + +let no_policy = Policy.{ vms = 0 ; cpuids = IS.empty ; memory = 0 ; block = None ; bridges = Astring.String.Set.empty } + (* we should confirm the following invariant: Vm or Block have no siblings *) let block_usage t name = Vmm_trie.fold name t.block_devices - (fun _ (size, _) blockspace -> blockspace + size) - 0 + (fun _ (size, act) (active, inactive) -> + if act then active + size, inactive else active, inactive + size) + (0, 0) + +let total_block_usage t name = + let act, inact = block_usage t name in + act + inact let vm_usage t name = Vmm_trie.fold name t.unikernels (fun _ vm (vms, memory) -> (succ vms, memory + vm.Unikernel.config.Unikernel.memory)) (0, 0) +let unikernel_metrics = + let open Metrics in + let doc = "VMM unikernels" in + let data (t, name) = + let vms, memory = vm_usage t name + and act, inact = block_usage t name + in + Data.v [ + uint "attached used block" act ; + uint "unattached used block" inact ; + uint "total used block" (act + inact) ; + uint "running unikernels" vms ; + uint "used memory" memory + ] + in + let tag = Tags.string "domain" in + Src.v ~doc ~tags:Tags.[tag] ~data "vmm-unikernels" + +let rec report_vms t name = + let name' = Name.drop name in + let str = if Name.is_root name' then "." else Name.to_string name' in + Metrics.add unikernel_metrics (fun x -> x str) (fun d -> d (t, name')); + if Name.is_root name' then () else report_vms t name' + let find_vm t name = Vmm_trie.find name t.unikernels let find_policy t name = Vmm_trie.find name t.policies @@ -69,12 +113,15 @@ let remove_vm t name = match find_vm t name with | Some vm -> let block_devices = use_blocks t.block_devices name vm false in let unikernels = Vmm_trie.remove name t.unikernels in - Ok { t with block_devices ; unikernels } + let t' = { t with block_devices ; unikernels } in + report_vms t' name; + Ok t' let remove_policy t name = match find_policy t name with | None -> Error (`Msg "unknown policy") | Some _ -> let policies = Vmm_trie.remove name t.policies in + Metrics.add policy_metrics (fun x -> x (Name.to_string name)) (fun d -> d no_policy); Ok { t with policies } let remove_block t name = match find_block t name with @@ -84,7 +131,9 @@ let remove_block t name = match find_block t name with Error (`Msg "block device in use") else let block_devices = Vmm_trie.remove name t.block_devices in - Ok { t with block_devices } + let t' = { t with block_devices } in + report_vms t' name; + Ok t' let check_policy (p : Policy.t) (running_vms, used_memory) (vm : Unikernel.config) = if succ running_vms > p.Policy.vms then @@ -129,7 +178,9 @@ let insert_vm t name vm = let unikernels, old = Vmm_trie.insert name vm t.unikernels in (match old with None -> () | Some _ -> invalid_arg ("unikernel " ^ Name.to_string name ^ " already exists in trie")) ; let block_devices = use_blocks t.block_devices name vm true in - { t with unikernels ; block_devices } + let t' = { t with unikernels ; block_devices } in + report_vms t' name; + t' let check_block t name size = let block_ok = match find_block t name with @@ -140,7 +191,7 @@ let check_block t name size = match find_policy t dom with | None -> Ok () | Some p -> - let used = block_usage t dom in + let used = total_block_usage t dom in match p.Policy.block with | None -> Error (`Msg "no block devices are allowed by policy") | Some limit -> @@ -155,7 +206,9 @@ let check_block t name size = let insert_block t name size = check_block t name size >>= fun () -> let block_devices = fst (Vmm_trie.insert name (size, false) t.block_devices) in - Ok { t with block_devices } + let t' = { t with block_devices } in + report_vms t' name; + Ok t' let sub_policy ~super ~sub = let sub_block sub super = @@ -202,7 +255,7 @@ let check_policies_below t curname super = let check_vms t name p = let (vms, used_memory) = vm_usage t name - and block = block_usage t name + and block = total_block_usage t name in let bridges, cpuids = Vmm_trie.fold name t.unikernels @@ -231,4 +284,5 @@ let insert_policy t name p = check_policies_below t name p >>= fun () -> check_vms t name p >>= fun () -> let policies = fst (Vmm_trie.insert name p t.policies) in + Metrics.add policy_metrics (fun x -> x (Name.to_string name)) (fun d -> d p); Ok { t with policies } diff --git a/src/vmm_vmmd.ml b/src/vmm_vmmd.ml index da20171..74a7c19 100644 --- a/src/vmm_vmmd.ml +++ b/src/vmm_vmmd.ml @@ -86,7 +86,7 @@ let restore_unikernels () = match Vmm_asn.unikernels_of_cstruct data with | Error (`Msg msg) -> Error (`Msg ("couldn't parse state: " ^ msg)) | Ok unikernels -> - Logs.info (fun m -> m "restored some unikernels") ; + Logs.info (fun m -> m "restored %d unikernels" (List.length (Vmm_trie.all unikernels))) ; Ok unikernels let dump_unikernels t = diff --git a/stats/albatross_stats.ml b/stats/albatross_stats.ml index 28cf8fc..a940c42 100644 --- a/stats/albatross_stats.ml +++ b/stats/albatross_stats.ml @@ -23,7 +23,7 @@ let pp_sockaddr ppf = function | Lwt_unix.ADDR_INET (addr, port) -> Fmt.pf ppf "TCP %s:%d" (Unix.string_of_inet_addr addr) port -let handle s addr () = +let handle s addr = Logs.info (fun m -> m "handling stats connection %a" pp_sockaddr addr) ; let rec loop () = Vmm_lwt.read_wire s >>= function @@ -64,20 +64,19 @@ let timer () = Vmm_lwt.safe_close s) outs -let jump _ file interval = +let m = Albatross_cli.conn_metrics "unix" + +let jump _ interval influx = Sys.(set_signal sigpipe Signal_ignore) ; let interval = Duration.(to_f (of_sec interval)) in Lwt_main.run - ((Lwt_unix.file_exists file >>= function - | true -> Lwt_unix.unlink file - | false -> Lwt.return_unit) >>= fun () -> - let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in - Lwt_unix.(bind s (ADDR_UNIX file)) >>= fun () -> - Lwt_unix.listen s 1 ; + (Albatross_cli.init_influx "albatross_stats" influx; + Vmm_lwt.server_socket `Stats >>= fun s -> let _ev = Lwt_engine.on_timer interval true (fun _e -> Lwt.async timer) in let rec loop () = Lwt_unix.accept s >>= fun (cs, addr) -> - Lwt.async (handle cs addr) ; + m `Open; + Lwt.async (fun () -> handle cs addr >|= fun () -> m `Close); loop () in loop ()) @@ -85,16 +84,12 @@ let jump _ file interval = open Cmdliner open Albatross_cli -let socket = - let doc = "socket to use" in - Arg.(value & opt string (Vmm_core.socket_path `Stats) & info [ "socket" ] ~doc) - let interval = let doc = "Interval between statistics gatherings (in seconds)" in Arg.(value & opt int 10 & info [ "interval" ] ~doc) let cmd = - Term.(term_result (const jump $ setup_log $ socket $ interval)), + Term.(term_result (const jump $ setup_log $ interval $ influx)), Term.info "albatross_stats" ~version:"%%VERSION_NUM%%" let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1 diff --git a/stats/albatross_stats_pure.ml b/stats/albatross_stats_pure.ml index c90ec11..6b680be 100644 --- a/stats/albatross_stats_pure.ml +++ b/stats/albatross_stats_pure.ml @@ -46,6 +46,8 @@ let rec wrap f arg = Logs.err (fun m -> m "exception %s" (Printexc.to_string e)) ; None +let vmmapi = Albatross_cli.conn_metrics "vmmapi" + let remove_vmid t vmid = Logs.info (fun m -> m "removing vmid %a" Vmm_core.Name.pp vmid) ; match Vmm_trie.find vmid t.vmid_pid with @@ -53,7 +55,7 @@ let remove_vmid t vmid = | Some pid -> Logs.info (fun m -> m "removing pid %d" pid) ; (match IM.find_opt pid t.pid_nic with - | Some (Ok vmctx, _, _) -> ignore (wrap vmmapi_close vmctx) + | Some (Ok vmctx, _, _) -> ignore (wrap vmmapi_close vmctx) ; vmmapi `Close | _ -> ()) ; let pid_nic = IM.remove pid t.pid_nic and vmid_pid = Vmm_trie.remove vmid t.vmid_pid @@ -84,6 +86,7 @@ let open_vmmapi ~retries name = Logs.warn (fun m -> m "(ignored, %d attempts left) vmmapi_open failed for %s" left name) ; Error left | Some vmctx -> + vmmapi `Open; Logs.info (fun m -> m "vmmapi_open succeeded for %s" name) ; fill_descr vmctx ; Ok vmctx diff --git a/stats/dune b/stats/dune index 4879890..d11d535 100644 --- a/stats/dune +++ b/stats/dune @@ -7,7 +7,7 @@ let () = (library (name albatross_stats) (public_name albatross.stats) - (libraries albatross) + (libraries albatross albatross.cli) (wrapped false) (c_names albatross_stats_stubs) (modules albatross_stats_pure)) diff --git a/tls/albatross_tls_common.ml b/tls/albatross_tls_common.ml index 7abc9a3..0844736 100644 --- a/tls/albatross_tls_common.ml +++ b/tls/albatross_tls_common.ml @@ -15,11 +15,6 @@ let tls_config cacert cert priv_key = ~reneg:true ~certificates:(`Single cert) ()), ca) -let connect socket_path = - let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in - Lwt_unix.connect c (Lwt_unix.ADDR_UNIX socket_path) >|= fun () -> - c - let client_auth ca tls = let authenticator = let time = Ptime_clock.now () in @@ -66,56 +61,63 @@ let handle ca tls = | Error (`Msg m) -> Lwt.fail_with m | Ok (name, policies, cmd) -> let sock, next = Vmm_commands.endpoint cmd in - connect (Vmm_core.socket_path sock) >>= fun fd -> - (match sock with - | `Vmmd -> - Lwt_list.fold_left_s (fun r (id, policy) -> - match r with - | Error (`Msg msg) -> Lwt.return (Error (`Msg msg)) - | Ok () -> - Logs.debug (fun m -> m "adding policy for %a: %a" Vmm_core.Name.pp id Vmm_core.Policy.pp policy) ; - let header = Vmm_commands.{version = my_version ; sequence = !command ; name = id } in - command := Int64.succ !command ; - Vmm_lwt.write_wire fd (header, `Command (`Policy_cmd (`Policy_add policy))) >>= function - | Error `Exception -> Lwt.return (Error (`Msg "failed to write policy")) - | Ok () -> - Vmm_lwt.read_wire fd >|= function - (* TODO check version *) - | Error _ -> Error (`Msg "read error after writing policy") - | Ok (_, `Success _) -> Ok () - | Ok wire -> - Rresult.R.error_msgf - "expected success when adding policy, got: %a" - Vmm_commands.pp_wire wire) - (Ok ()) policies - | _ -> Lwt.return (Ok ())) >>= function - | Error (`Msg msg) -> - begin - Logs.warn (fun m -> m "error while applying policies %s" msg) ; - let wire = - let header = Vmm_commands.{version = my_version ; sequence = 0L ; name } in - header, `Failure msg - in - Vmm_tls_lwt.write_tls tls wire >>= fun _ -> - Vmm_lwt.safe_close fd >>= fun () -> - Lwt.fail_with msg - end - | Ok () -> - let wire = - let header = Vmm_commands.{version = my_version ; sequence = !command ; name } in - command := Int64.succ !command ; - (header, `Command cmd) + let sockaddr = Lwt_unix.ADDR_UNIX (Vmm_core.socket_path sock) in + Vmm_lwt.connect Lwt_unix.PF_UNIX sockaddr >>= function + | None -> + let err = + Rresult.R.error_msgf "failed to connect to %a" Vmm_lwt.pp_sockaddr sockaddr in - Vmm_lwt.write_wire fd wire >>= function - | Error `Exception -> - Vmm_lwt.safe_close fd >>= fun () -> - Lwt.return (Error (`Msg "couldn't write")) + Lwt.return err + | Some fd -> + (match sock with + | `Vmmd -> + Lwt_list.fold_left_s (fun r (id, policy) -> + match r with + | Error (`Msg msg) -> Lwt.return (Error (`Msg msg)) + | Ok () -> + Logs.debug (fun m -> m "adding policy for %a: %a" Vmm_core.Name.pp id Vmm_core.Policy.pp policy) ; + let header = Vmm_commands.{version = my_version ; sequence = !command ; name = id } in + command := Int64.succ !command ; + Vmm_lwt.write_wire fd (header, `Command (`Policy_cmd (`Policy_add policy))) >>= function + | Error `Exception -> Lwt.return (Error (`Msg "failed to write policy")) + | Ok () -> + Vmm_lwt.read_wire fd >|= function + (* TODO check version *) + | Error _ -> Error (`Msg "read error after writing policy") + | Ok (_, `Success _) -> Ok () + | Ok wire -> + Rresult.R.error_msgf + "expected success when adding policy, got: %a" + Vmm_commands.pp_wire wire) + (Ok ()) policies + | _ -> Lwt.return (Ok ())) >>= function + | Error (`Msg msg) -> + begin + Logs.warn (fun m -> m "error while applying policies %s" msg) ; + let wire = + let header = Vmm_commands.{version = my_version ; sequence = 0L ; name } in + header, `Failure msg + in + Vmm_tls_lwt.write_tls tls wire >>= fun _ -> + Vmm_lwt.safe_close fd >>= fun () -> + Lwt.fail_with msg + end | Ok () -> - (match next with - | `Read -> read fd tls - | `End -> process fd tls) >>= fun res -> - Vmm_lwt.safe_close fd >|= fun () -> - res + let wire = + let header = Vmm_commands.{version = my_version ; sequence = !command ; name } in + command := Int64.succ !command ; + (header, `Command cmd) + in + Vmm_lwt.write_wire fd wire >>= function + | Error `Exception -> + Vmm_lwt.safe_close fd >>= fun () -> + Lwt.return (Error (`Msg "couldn't write")) + | Ok () -> + (match next with + | `Read -> read fd tls + | `End -> process fd tls) >>= fun res -> + Vmm_lwt.safe_close fd >|= fun () -> + res open Cmdliner