diff --git a/README.md b/README.md index 96d2a47..c035f56 100644 --- a/README.md +++ b/README.md @@ -71,15 +71,15 @@ DEV> mirage configure -t ukvm DEV> mirage build DEV> mv ukvm-bin /tmp/ukvm-bin.net DEV> cd ../../.. -DEV> COPY /tmp/ukvm-bin.none /tmp/ukvm-bin.net SRV: -DEV> COPY vmm_console vmm_log vmm_stats_lwt vmmd SRV: +DEV> COPY /tmp/ukvm-bin.none /tmp/ukvm-bin.net SRV:/var/db/albatross +DEV> COPY vmm_console vmm_log vmm_stats_lwt vmmd SRV:/opt/bin/ ``` ``` -SRV> vmm_console -vv cons.sock & -SRV> vmm_log -vv log.out log.sock & -SRV> vmm_stats_lwt -vv stat.sock & #optional -SRV# vmmd -vv . cacert.pem server.pem server.key +SRV> vmm_console -vv & +SRV> vmm_log -vv & +SRV> vmm_stats_lwt -vv & #optional +SRV# vmmd -vv cacert.pem server.pem server.key ``` Some setup for network interfaces is needed, depending on your operating system. diff --git a/app/vmm_console.ml b/app/vmm_console.ml index f6f4fbf..8a3f059 100644 --- a/app/vmm_console.ml +++ b/app/vmm_console.ml @@ -56,7 +56,7 @@ let read_console s name ring channel () = Lwt_io.close channel) let open_fifo name = - let fifo = Fpath.(v (Filename.get_temp_dir_name ()) / name + "fifo") in + let fifo = Fpath.(Vmm_core.tmpdir / name + "fifo") in Lwt.catch (fun () -> Logs.debug (fun m -> m "opening %a for reading" Fpath.pp fifo) ; Lwt_io.open_file ~mode:Lwt_io.Input (Fpath.to_string fifo) >>= fun channel -> @@ -152,7 +152,10 @@ let handle s addr () = let jump _ file = Sys.(set_signal sigpipe Signal_ignore) ; Lwt_main.run - (let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in + ((Lwt_unix.file_exists file >>= function + | true -> Lwt_unix.unlink file + | false -> Lwt.return_unit) >>= fun () -> + let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in Lwt_unix.(bind s (ADDR_UNIX file)) >>= fun () -> Lwt_unix.listen s 1 ; let rec loop () = @@ -175,8 +178,9 @@ let setup_log = $ Logs_cli.level ()) let socket = - let doc = "Socket to listen onto" in - Arg.(required & pos 0 (some string) None & info [] ~doc) + let doc = "Socket to listen on" in + let sock = Fpath.(to_string (Vmm_core.tmpdir / "cons" + "sock")) in + Arg.(value & opt string sock & info [ "s" ; "socket" ] ~doc) let cmd = Term.(ret (const jump $ setup_log $ socket)), diff --git a/app/vmm_influxdb_stats.ml b/app/vmm_influxdb_stats.ml new file mode 100644 index 0000000..242c1b2 --- /dev/null +++ b/app/vmm_influxdb_stats.ml @@ -0,0 +1,285 @@ +(* (c) 2018 Hannes Mehnert, all rights reserved *) + +open Lwt.Infix + +open Astring + +open Vmm_core + + +(* +line protocol: +``` + [,=[,=]] \ + =[,=] [] +``` + +(measurement, tag_key, tag_value, field_key are all strings, index over tags) + +* Quoting + +Element Double quotes Single quotes +--------------------------------------------- +Timestamp Never Never +Measurements, tag keys, tag values, field keys Never* Never* +Field values Double quote string field values. Do not double quote floats, + integers, or Booleans. Never + +*=Line Protocol allows users to double and single quote measurement names, tag + keys, tag values, and field keys. It will, however, assume that the double or + single quotes are part of the name, key, or value. This can complicate query + syntax (see the example below). + + +Float IEEE-754 64-bit floating-point numbers. This is the default numerical + type. Examples: 1, 1.0, 1.e+78, 1.E+78. +Integer Signed 64-bit integers (-9223372036854775808 to 9223372036854775807). + Specify an integer with a trailing i on the number. Example: 1i. + +For tag keys, tag values, and field keys always use a backslash character \ to +escape: + + commas , + equal signs = + spaces + +For measurements always use a backslash character \ to escape: + + commas , + spaces + +For string field values use a backslash character \ to escape: + + double quotes "" + +Line Protocol does not require users to escape the backslash character \. Users +do not need to escape all other special characters. + +do not use any keywords: +ALL ALTER ANY AS ASC BEGIN +BY CREATE CONTINUOUS DATABASE DATABASES DEFAULT +DELETE DESC DESTINATIONS DIAGNOSTICS DISTINCT DROP +DURATION END EVERY EXPLAIN FIELD FOR +FROM GRANT GRANTS GROUP GROUPS IN +INF INSERT INTO KEY KEYS KILL +LIMIT SHOW MEASUREMENT MEASUREMENTS NAME OFFSET +ON ORDER PASSWORD POLICY POLICIES PRIVILEGES +QUERIES QUERY READ REPLICATION RESAMPLE RETENTION +REVOKE SELECT SERIES SET SHARD SHARDS +SLIMIT SOFFSET STATS SUBSCRIPTION SUBSCRIPTIONS TAG +TO USER USERS VALUES WHERE WITH +WRITE +*) + +module P = struct + let tv (sec, usec) = Printf.sprintf "%Lu.%06d" sec usec + + (* TODO: this should use an unsigned to string function *) + let i64 i = Int64.to_string i ^ "i" + + let encode_ru vm ru = + let fields = + [ "utime", tv ru.utime ; + "stime", tv ru.stime ; + "maxrss", i64 ru.maxrss ; + "ixrss", i64 ru.ixrss ; + "idrss", i64 ru.idrss ; + "isrss", i64 ru.isrss ; + "minflt", i64 ru.minflt ; + "maxflt", i64 ru.majflt ; + "nswap", i64 ru.nswap ; + "inblock", i64 ru.inblock ; + "outblock", i64 ru.outblock ; + "msgsnd", i64 ru.msgsnd ; + "msgrcv", i64 ru.msgrcv ; + "nsignals", i64 ru.nsignals ; + "nvcsw", i64 ru.nvcsw ; + "nivcsw", i64 ru.nivcsw + ] + in + let fields = List.map (fun (k, v) -> k ^ "=" ^ v) fields in + Printf.sprintf "resource_usage,vm=%s %s" vm (String.concat ~sep:"," fields) + + let encode_vmm vm xs = + let escape s = + let cutted = String.cuts ~sep:"," s in + let cutted = String.concat ~sep:"\\," cutted in + let cutted = String.cuts ~sep:" " cutted in + let cutted = String.concat ~sep:"\\ " cutted in + let cutted = String.cuts ~sep:"=" cutted in + String.concat ~sep:"\\=" cutted + in + Printf.sprintf "vmm,vm=%s %s" vm + (String.concat ~sep:"," + (List.map (fun (k, v) -> (escape k) ^ "=" ^ (i64 v)) xs)) + + let i32 i = Int32.to_string i ^ "i" + + let encode_if vm ifd = + let fields = + (* TODO: flags *) + [ "send_queue_length", i32 ifd.send_length ; + "max_send_queue_length", i32 ifd.max_send_length ; + "send_queue_drops", i32 ifd.send_drops ; + "mtu", i32 ifd.mtu ; + "baudrate", i64 ifd.baudrate ; + "vm_to_host_packets", i64 ifd.input_packets ; + "vm_to_host_errors", i64 ifd.input_errors ; + "vm_to_host_bytes", i64 ifd.input_bytes ; + "vm_to_host_mcast", i64 ifd.input_mcast ; + "vm_to_host_dropped", i64 ifd.input_dropped ; + "collisions", i64 ifd.collisions ; + "host_to_vm_packets", i64 ifd.output_packets ; + "host_to_vm_errors", i64 ifd.output_errors ; + "host_to_vm_bytes", i64 ifd.output_bytes ; + "host_to_vm_mcast", i64 ifd.output_mcast ; + "host_to_vm_dropped", i64 ifd.output_dropped + ] + in + let fields = List.map (fun (k, v) -> k ^ "=" ^ v) fields in + Printf.sprintf "interface,vm=%s,ifname=%s %s" + vm ifd.name (String.concat ~sep:"," fields) +end + +let my_version = `WV1 + +let command = ref 1 + +let (req : string IM.t ref) = ref IM.empty + +let rec read_sock db c fd = + let open Vmm_wire in + Vmm_lwt.read_exactly c >>= function + | Error _ -> Lwt.return_unit + | Ok (hdr, data) -> + if not (version_eq hdr.version my_version) then begin + Logs.err (fun m -> m "unknown wire protocol version") ; Lwt.return_unit + end else + let name = IM.find hdr.id !req in + req := IM.remove hdr.id !req ; + match Stats.int_to_op hdr.tag with + | Some Stats.Stat_reply -> + begin match Vmm_wire.Stats.decode_stats (Cstruct.of_string data) with + | Error (`Msg msg) -> + Logs.warn (fun m -> m "couldn't decode stats for %s: %s" name msg) ; + read_sock db c fd + | Ok (ru, vmm, ifs) -> + let ru = P.encode_ru name ru in + let vmm = P.encode_vmm name vmm in + let taps = List.map (P.encode_if name) ifs in + let out = String.concat ~sep:"\n" (ru :: vmm :: taps @ [ "" ]) in + Logs.info (fun m -> m "result: %s" out) ; + Vmm_lwt.write_raw fd out >>= function + | Ok () -> read_sock db c fd + | Error _ -> invalid_arg "failed to write via TCP" + end + | _ when hdr.tag = fail_tag -> + Logs.err (fun m -> m "failed to retrieve statistics for %s" name) ; + read_sock db c fd + | _ -> + Logs.err (fun m -> m "unhandled tag %d for %s" hdr.tag name) ; + read_sock db c fd + +let rec query_sock prefix db c interval = + (* query c for everyone in db *) + Lwt_list.iter_s (fun (id, name) -> + let id = identifier id in + let id = match prefix with None -> id | Some p -> p ^ "." ^ id in + let request = Vmm_wire.Stats.stat !command my_version id in + req := IM.add !command name !req ; + incr command ; + Vmm_lwt.write_raw c request >>= function + | Ok () -> Lwt.return_unit + | Error _ -> Lwt.fail_with "exception while writing") + db >>= fun () -> + Lwt_unix.sleep (float_of_int interval) >>= fun () -> + query_sock prefix db c interval + +let client stat_socket influxhost influxport db prefix interval = + (* start a socket connection to vmm_stats *) + let c = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in + Lwt_unix.set_close_on_exec c ; + Lwt.catch (fun () -> Lwt_unix.(connect c (ADDR_UNIX stat_socket))) + (fun e -> + Logs.warn (fun m -> m "error %s connecting to socket %s" + (Printexc.to_string e) stat_socket) ; + invalid_arg "cannot connect to stat socket") >>= fun () -> + + (* setup remote connection to influx *) + Lwt_unix.gethostbyname influxhost >>= fun host_entry -> + let host_inet_addr = Array.get host_entry.Lwt_unix.h_addr_list 0 in + let fd = Lwt_unix.socket host_entry.Lwt_unix.h_addrtype Lwt_unix.SOCK_STREAM 0 in + Lwt_unix.setsockopt fd Lwt_unix.SO_KEEPALIVE true ; + Lwt_unix.connect fd (Lwt_unix.ADDR_INET (host_inet_addr, influxport)) >>= fun () -> + + (* loop *) + Lwt.join [ query_sock prefix db c interval ; read_sock db c fd ] + +let run_client _ socket (influxhost, influxport) db prefix interval = + Sys.(set_signal sigpipe Signal_ignore) ; + let db = + let open Rresult.R.Infix in + match Bos.OS.File.read_lines (Fpath.v db) >>= parse_db with + | Ok [] -> invalid_arg "empty database" + | Ok db -> db + | Error (`Msg m) -> + invalid_arg ("couldn't parse database " ^ m) + in + Lwt_main.run (client socket influxhost influxport db prefix interval) + +let setup_log style_renderer level = + Fmt_tty.setup_std_outputs ?style_renderer (); + Logs.set_level level; + Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ()) + +open Cmdliner + +let setup_log = + Term.(const setup_log + $ Fmt_cli.style_renderer () + $ Logs_cli.level ()) + +let host_port : (string * int) Arg.converter = + let parse s = + match String.cut ~sep:":" s with + | None -> `Error "broken: no port specified" + | Some (hostname, port) -> + try + `Ok (hostname, int_of_string port) + with + Not_found -> `Error "failed to parse port" + in + parse, fun ppf (h, p) -> Format.fprintf ppf "%s:%d" h p + +let socket = + let doc = "Stat socket to connect onto" in + Arg.(required & pos 0 (some string) None & info [] ~doc) + +let influx = + Arg.(required & pos 1 (some host_port) None & info [] ~docv:"influx" + ~doc:"the influx hostname:port to connect to") + +let db = + let doc = "VMID database" in + Arg.(required & pos 2 (some file) None & info [] ~doc) + +let prefix = + let doc = "prefix" in + Arg.(value & opt (some string) None & info [ "prefix" ] ~doc) + +let interval = + let doc = "Poll interval in seconds" in + Arg.(value & opt int 10 & info [ "interval" ] ~doc) + +let cmd = + let doc = "VMM InfluxDB connector" in + let man = [ + `S "DESCRIPTION" ; + `P "$(tname) connects to a vmm stats socket, pulls statistics and pushes them via TCP to influxdb" ] + in + Term.(pure run_client $ setup_log $ socket $ influx $ db $ prefix $ interval), + Term.info "vmm_influxdb_stats" ~version:"%%VERSION_NUM%%" ~doc ~man + +let () = + match Term.eval cmd + with `Error _ -> exit 1 | _ -> exit 0 diff --git a/app/vmm_log.ml b/app/vmm_log.ml index 6e240a0..010e4ac 100644 --- a/app/vmm_log.ml +++ b/app/vmm_log.ml @@ -120,6 +120,9 @@ let jump _ file sock = Sys.(set_signal sigpipe Signal_ignore) ; Lwt_main.run (Lwt_unix.openfile file Lwt_unix.[O_APPEND;O_CREAT;O_WRONLY] 0o600 >>= fun fd -> + (Lwt_unix.file_exists sock >>= function + | true -> Lwt_unix.unlink sock + | false -> Lwt.return_unit) >>= fun () -> let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in Lwt_unix.(bind s (ADDR_UNIX sock)) >>= fun () -> Lwt_unix.listen s 1 ; @@ -144,12 +147,13 @@ let setup_log = $ Logs_cli.level ()) let socket = - let doc = "Socket to listen onto" in - Arg.(required & pos 1 (some string) None & info [] ~doc) + let doc = "Socket to listen on" in + let sock = Fpath.(to_string (Vmm_core.tmpdir / "log" + "sock")) in + Arg.(value & opt string sock & info [ "s" ; "socket" ] ~doc) let file = - let doc = "File to write to" in - Arg.(required & pos 0 (some string) None & info [] ~doc) + let doc = "File to write the log to" in + Arg.(value & opt string "/var/log/albatross" & info [ "logfile" ] ~doc) let cmd = Term.(ret (const jump $ setup_log $ file $ socket)), diff --git a/app/vmm_prometheus_stats.ml b/app/vmm_prometheus_stats.ml index f46398e..6e1d2aa 100644 --- a/app/vmm_prometheus_stats.ml +++ b/app/vmm_prometheus_stats.ml @@ -343,10 +343,10 @@ let db = Arg.(value & opt (some file) None & info [ "db" ] ~doc) let cmd = - let doc = "VMM TLS client" in + let doc = "VMM Prometheus connector" in let man = [ `S "DESCRIPTION" ; - `P "$(tname) connects to a server and initiates a TLS handshake" ] + `P "$(tname) connects to a VMMD to gather statistics and serves them for Prometheus via HTTP" ] in Term.(pure run_client $ setup_log $ cas $ client_cert $ client_key $ destination $ db $ address $ port), Term.info "vmm_prometheus_stats" ~version:"%%VERSION_NUM%%" ~doc ~man diff --git a/app/vmmd.ml b/app/vmmd.ml index b08c74d..2967d8b 100644 --- a/app/vmmd.ml +++ b/app/vmmd.ml @@ -216,16 +216,15 @@ let rec stats_loop () = Lwt_unix.sleep 600. >>= fun () -> stats_loop () -let jump _ dir cacert cert priv_key port = +let jump _ cacert cert priv_key port = Sys.(set_signal sigpipe Signal_ignore) ; - let dir = Fpath.v dir in Lwt_main.run (Nocrypto_entropy_lwt.initialize () >>= fun () -> - (init_sock dir "cons" >|= function + (init_sock Vmm_core.tmpdir "cons" >|= function | None -> invalid_arg "cannot connect to console socket" | Some c -> c) >>= fun c -> - init_sock dir "stat" >>= fun s -> - (init_sock dir "log" >|= function + init_sock Vmm_core.tmpdir "stat" >>= fun s -> + (init_sock Vmm_core.tmpdir "log" >|= function | None -> invalid_arg "cannot connect to log socket" | Some l -> l) >>= fun l -> server_socket port >>= fun socket -> @@ -237,7 +236,7 @@ let jump _ dir cacert cert priv_key port = Tls.(Config.server ~version:(Core.TLS_1_2, Core.TLS_1_2) ~reneg:true ~certificates:(`Single cert) ()) in - (match Vmm_engine.init dir cmp_s c s l with + (match Vmm_engine.init cmp_s c s l with | Ok s -> Lwt.return s | Error (`Msg m) -> Lwt.fail_with m) >>= fun t -> let state = ref t in @@ -289,28 +288,24 @@ let setup_log = $ Fmt_cli.style_renderer () $ Logs_cli.level ()) -let wdir = - let doc = "Working directory (unix domain sockets, etc.)" in - Arg.(required & pos 0 (some dir) None & info [] ~doc) - let cacert = let doc = "CA certificate" in - Arg.(required & pos 1 (some file) None & info [] ~doc) + Arg.(required & pos 0 (some file) None & info [] ~doc) let cert = let doc = "Certificate" in - Arg.(required & pos 2 (some file) None & info [] ~doc) + Arg.(required & pos 1 (some file) None & info [] ~doc) let key = let doc = "Private key" in - Arg.(required & pos 3 (some file) None & info [] ~doc) + Arg.(required & pos 2 (some file) None & info [] ~doc) let port = let doc = "TCP listen port" in Arg.(value & opt int 1025 & info [ "port" ] ~doc) let cmd = - Term.(ret (const jump $ setup_log $ wdir $ cacert $ cert $ key $ port)), + Term.(ret (const jump $ setup_log $ cacert $ cert $ key $ port)), Term.info "vmmd" ~version:"%%VERSION_NUM%%" let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1 diff --git a/pkg/pkg.ml b/pkg/pkg.ml index 2a71d0b..14dd74e 100644 --- a/pkg/pkg.ml +++ b/pkg/pkg.ml @@ -18,6 +18,6 @@ let () = Pkg.bin "provision/vmm_gen_ca" ; Pkg.clib "stats/libvmm_stats_stubs.clib" ; Pkg.bin "stats/vmm_stats_lwt" ; - Pkg.bin "stats/vmm_stats_once" ; Pkg.bin "app/vmm_prometheus_stats" ; + Pkg.bin "app/vmm_influxdb_stats" ; ] diff --git a/src/vmm_commands.ml b/src/vmm_commands.ml index 88dadb6..2a93735 100644 --- a/src/vmm_commands.ml +++ b/src/vmm_commands.ml @@ -49,7 +49,7 @@ let rec close fd = let close_no_err fd = try close fd with _ -> () (* own code starts here - (c) 2017 Hannes Mehnert, all rights reserved *) + (c) 2017, 2018 Hannes Mehnert, all rights reserved *) open Vmm_core @@ -58,9 +58,8 @@ let rec mkfifo name = | Unix.Unix_error (Unix.EINTR, _, _) -> mkfifo name let image_file, fifo_file = - let tmp = Fpath.v (Filename.get_temp_dir_name ()) in - ((fun vm -> Fpath.(tmp / (vm_id vm) + "img")), - (fun vm -> Fpath.(tmp / (vm_id vm) + "fifo"))) + ((fun vm -> Fpath.(tmpdir / (vm_id vm) + "img")), + (fun vm -> Fpath.(tmpdir / (vm_id vm) + "fifo"))) let rec fifo_exists file = try Ok (Unix.((stat @@ Fpath.to_string file).st_kind = S_FIFO)) with @@ -157,13 +156,13 @@ let cpuset cpu = Ok ([ "taskset" ; "-c" ; cpustring ]) | x -> Error (`Msg ("unsupported operating system " ^ x)) -let exec dir vm taps = +let exec vm taps = (* TODO: --net-mac=xx *) let net = List.map (fun t -> "--net=" ^ t) taps in let argv = match vm.argv with None -> [] | Some xs -> xs in (match taps with - | [] -> Ok Fpath.(dir / "ukvm-bin.none") - | [_] -> Ok Fpath.(dir / "ukvm-bin.net") + | [] -> Ok Fpath.(dbdir / "ukvm-bin.none") + | [_] -> Ok Fpath.(dbdir / "ukvm-bin.net") | _ -> Error (`Msg "cannot handle multiple network interfaces")) >>= fun bin -> cpuset vm.cpuid >>= fun cpuset -> let mem = "--mem=" ^ string_of_int vm.requested_memory in diff --git a/src/vmm_commands.mli b/src/vmm_commands.mli index d691cea..6b7e6fb 100644 --- a/src/vmm_commands.mli +++ b/src/vmm_commands.mli @@ -8,7 +8,7 @@ val prepare : vm_config -> (string list, [> R.msg ]) result val shutdown : vm -> (unit, [> R.msg ]) result -val exec : Fpath.t -> vm_config -> string list -> (vm, [> R.msg ]) result +val exec : vm_config -> string list -> (vm, [> R.msg ]) result val destroy : vm -> unit diff --git a/src/vmm_core.ml b/src/vmm_core.ml index 488951e..558d59b 100644 --- a/src/vmm_core.ml +++ b/src/vmm_core.ml @@ -4,6 +4,9 @@ open Astring open Rresult.R.Infix +let tmpdir = Fpath.(v "/var" / "run" / "albatross") +let dbdir = Fpath.(v "/var" / "db" / "albatross") + module I = struct type t = int let compare : int -> int -> int = compare diff --git a/src/vmm_engine.ml b/src/vmm_engine.ml index df70642..50352be 100644 --- a/src/vmm_engine.ml +++ b/src/vmm_engine.ml @@ -8,7 +8,6 @@ open Rresult open R.Infix type ('a, 'b, 'c) t = { - dir : Fpath.t ; cmp : 'b -> 'b -> bool ; console_socket : 'a ; console_counter : int ; @@ -34,9 +33,9 @@ type ('a, 'b, 'c) t = { crls : X509.CRL.c list ; } -let init dir cmp console_socket stats_socket log_socket = +let init cmp console_socket stats_socket log_socket = (* error hard on permission denied etc. *) - let crls = Fpath.(dir / "crls") in + let crls = Fpath.(dbdir / "crls") in (Bos.OS.Dir.exists crls >>= function | true -> Ok true | false -> Bos.OS.Dir.create crls) >>= fun _ -> @@ -49,14 +48,14 @@ let init dir cmp console_socket stats_socket log_socket = | None -> R.error_msgf "couldn't parse CRL %a" Fpath.pp f | Some crl -> Ok (crl :: acc)) (Ok []) - Fpath.(dir / "crls") >>= fun crls -> + crls >>= fun crls -> crls >>= fun crls -> Ok { - dir ; cmp ; + cmp ; console_socket ; console_counter = 1 ; console_requests = IM.empty ; console_attached = String.Map.empty ; console_version = `WV0 ; stats_socket ; stats_counter = 1 ; stats_requests = IM.empty ; - stats_version = `WV0 ; + stats_version = `WV1 ; log_socket ; log_counter = 1 ; log_attached = String.Map.empty ; log_version = `WV0 ; log_requests = IM.empty ; client_version = `WV0 ; @@ -124,7 +123,7 @@ let handle_create t vm_config policies = Logs.debug (fun m -> m "prepared vm with taps %a" Fmt.(list ~sep:(unit ",@ ") string) taps) ; Ok (fun t s -> (* actually execute the vm *) - Vmm_commands.exec t.dir vm_config taps >>= fun vm -> + Vmm_commands.exec vm_config taps >>= fun vm -> Logs.debug (fun m -> m "exec()ed vm") ; Vmm_resources.insert t.resources full vm >>= fun resources -> let used_bridges = @@ -142,8 +141,8 @@ let handle_create t vm_config policies = Ok (t, `Tls (s, tls_out) :: out, vm)) let setup_stats t vm = - Vmm_commands.setup_freebsd_kludge vm.Vmm_core.pid >>= fun () -> - let stat_out = Vmm_wire.Stats.add t.stats_counter t.stats_version vm.pid vm.taps in + Vmm_commands.setup_freebsd_kludge vm.pid >>= fun () -> + let stat_out = Vmm_wire.Stats.add t.stats_counter t.stats_version (vm_id vm.config) vm.pid vm.taps in let t = { t with stats_counter = succ t.stats_counter } in Ok (t, stat t stat_out) @@ -167,7 +166,7 @@ let handle_shutdown t vm r = String.Map.add br (String.Set.remove ta old) b) t.used_bridges vm.config.network vm.taps in - let stat_out = Vmm_wire.Stats.remove t.stats_counter t.stats_version vm.pid in + let stat_out = Vmm_wire.Stats.remove t.stats_counter t.stats_version (vm_id vm.config) in let tasks = String.Map.remove (vm_id vm.config) t.tasks in let t = { t with stats_counter = succ t.stats_counter ; resources ; used_bridges ; tasks } in let t, outs = log t (Log.hdr vm.config.prefix vm.config.vname, @@ -185,6 +184,7 @@ let handle_command t s prefix perms hdr buf = begin Vmm_wire.decode_str buf >>= fun (buf, _l) -> let arg = if String.length buf = 0 then prefix else prefix @ [buf] in + let vmid = string_of_id arg in match x with | Info -> begin match Vmm_resources.find t.resources arg with @@ -211,29 +211,27 @@ let handle_command t s prefix perms hdr buf = end | Attach -> (* TODO: get (optionally) from client, instead of hardcoding Ptime.epoch below *) - let name = String.concat ~sep:"." arg in let on_success t = - let cons = Vmm_wire.Console.history t.console_counter t.console_version name Ptime.epoch in - let old = match String.Map.find name t.console_attached with + let cons = Vmm_wire.Console.history t.console_counter t.console_version vmid Ptime.epoch in + let old = match String.Map.find vmid t.console_attached with | None -> [] | Some s -> let out = Vmm_wire.success hdr.Vmm_wire.id t.client_version in [ `Tls (s, out) ] in - let console_attached = String.Map.add name s t.console_attached in + let console_attached = String.Map.add vmid s t.console_attached in { t with console_counter = succ t.console_counter ; console_attached }, `Raw (t.console_socket, cons) :: old in - let cons = Vmm_wire.Console.attach t.console_counter t.console_version name in + let cons = Vmm_wire.Console.attach t.console_counter t.console_version vmid in let console_requests = IM.add t.console_counter on_success t.console_requests in Ok ({ t with console_counter = succ t.console_counter ; console_requests }, [ `Raw (t.console_socket, cons) ]) | Detach -> - let name = String.concat ~sep:"." arg in - let cons = Vmm_wire.Console.detach t.console_counter t.console_version name in - (match String.Map.find name t.console_attached with + let cons = Vmm_wire.Console.detach t.console_counter t.console_version vmid in + (match String.Map.find vmid t.console_attached with | None -> Error (`Msg "not attached") - | Some x when t.cmp x s -> Ok (String.Map.remove name t.console_attached) + | Some x when t.cmp x s -> Ok (String.Map.remove vmid t.console_attached) | Some _ -> Error (`Msg "this socket is not attached")) >>= fun console_attached -> let out = Vmm_wire.success hdr.Vmm_wire.id t.client_version in Ok ({ t with console_counter = succ t.console_counter ; console_attached }, @@ -243,7 +241,7 @@ let handle_command t s prefix perms hdr buf = | None -> Error (`Msg "no statistics available") | Some _ -> match Vmm_resources.find_vm t.resources arg with | Some vm -> - let stat_out = Vmm_wire.Stats.stat t.stats_counter t.stats_version vm.pid in + let stat_out = Vmm_wire.Stats.stat t.stats_counter t.stats_version vmid in let d = (s, hdr.Vmm_wire.id, translate_tap vm) in let stats_requests = IM.add t.stats_counter d t.stats_requests in Ok ({ t with stats_counter = succ t.stats_counter ; stats_requests }, @@ -326,7 +324,7 @@ let handle_revocation t s leaf chain ca prefix = | Some _, None -> Error (`Msg "CRL number not present") | Some x, Some y -> if y > x then Ok () else Error (`Msg "CRL number not increased")) >>= fun () -> (* filename should be whatever_dir / crls / *) - let filename = Fpath.(t.dir / "crls" / string_of_id prefix) in + let filename = Fpath.(dbdir / "crls" / string_of_id prefix) in Bos.OS.File.delete filename >>= fun () -> Bos.OS.File.write filename (Cstruct.to_string (X509.Encoding.crl_to_cstruct crl)) >>= fun () -> (* remove crl with same issuer from crls, and inject this one into state *) diff --git a/src/vmm_wire.ml b/src/vmm_wire.ml index 57c2815..0fc77d0 100644 --- a/src/vmm_wire.ml +++ b/src/vmm_wire.ml @@ -16,21 +16,26 @@ open Astring open Vmm_core -type version = [ `WV0 ] +type version = [ `WV0 | `WV1 ] let version_to_int = function | `WV0 -> 0 + | `WV1 -> 1 let version_of_int = function | 0 -> Ok `WV0 + | 1 -> Ok `WV1 | _ -> Error (`Msg "unknown wire version") let version_eq a b = match a, b with | `WV0, `WV0 -> true + | `WV1, `WV1 -> true + | _ -> false let pp_version ppf v = Fmt.string ppf (match v with - | `WV0 -> "wire version 0") + | `WV0 -> "wire version 0" + | `WV1 -> "wire version 1") type header = { length : int ; @@ -243,16 +248,16 @@ module Stats = struct [@@uint16_t] ] - let encode id version op ?payload pid = - let pid = encode_pid pid in + let encode id version op ?payload nam = + let data, l = encode_string nam in let length, p = match payload with - | None -> 4, empty - | Some x -> 4 + Cstruct.len x, x + | None -> l, empty + | Some x -> l + Cstruct.len x, x and tag = op_to_int op in let r = - Cstruct.concat [ create_header { length ; version ; id ; tag } ; pid ; p ] + Cstruct.concat [ create_header { length ; version ; id ; tag } ; data ; p ] in Cstruct.to_string r @@ -352,13 +357,13 @@ module Stats = struct output_mcast ; input_dropped ; output_dropped }, l + 116) - let add id v pid taps = - let payload = encode_strings taps in - encode id v Add ~payload pid + let add id v nam pid taps = + let payload = Cstruct.append (encode_pid pid) (encode_strings taps) in + encode id v Add ~payload nam - let remove id v pid = encode id v Remove pid + let remove id v nam = encode id v Remove nam - let stat id v pid = encode id v Stat_request pid + let stat id v nam = encode id v Stat_request nam let stat_reply id version payload = let length = Cstruct.len payload diff --git a/stats/vmm_stats.ml b/stats/vmm_stats.ml index 9778df5..5e1974f 100644 --- a/stats/vmm_stats.ml +++ b/stats/vmm_stats.ml @@ -16,7 +16,7 @@ external vmmapi_close : vmctx -> unit = "vmmanage_vmmapi_close" external vmmapi_statnames : vmctx -> string list = "vmmanage_vmmapi_statnames" external vmmapi_stats : vmctx -> int64 list = "vmmanage_vmmapi_stats" -let my_version = `WV0 +let my_version = `WV1 let descr = ref [] @@ -25,12 +25,13 @@ type t = { pid_rusage : rusage IM.t ; pid_vmmapi : (string * int64) list IM.t ; nic_ifdata : ifdata String.Map.t ; + vmid_pid : int String.Map.t ; } let pp_strings pp taps = Fmt.(list ~sep:(unit ",@ ") string) pp taps let empty () = - { pid_nic = IM.empty ; pid_rusage = IM.empty ; pid_vmmapi = IM.empty ; nic_ifdata = String.Map.empty } + { pid_nic = IM.empty ; pid_rusage = IM.empty ; pid_vmmapi = IM.empty ; nic_ifdata = String.Map.empty ; vmid_pid = String.Map.empty } let rec wrap f arg = try Some (f arg) with @@ -91,7 +92,7 @@ let fill_descr ctx = end | ds -> Logs.info (fun m -> m "%d descr are already present" (List.length ds)) -let add_pid t pid nics = +let add_pid t vmid pid nics = let name = "ukvm" ^ string_of_int pid in match wrap sysctl_ifcount () with | None -> @@ -117,47 +118,59 @@ let add_pid t pid nics = Ok (Some vmctx)) >>= fun vmctx -> Logs.info (fun m -> m "adding %d %a with vmctx %b" pid pp_strings nics (match vmctx with None -> false | Some _ -> true)) ; - let pid_nic = IM.add pid (vmctx, nic_ids) t.pid_nic in - Ok { t with pid_nic } - - -let stats t pid = - Logs.debug (fun m -> m "querying statistics for %d" pid) ; - try - let _, nics = IM.find pid t.pid_nic - and ru = IM.find pid t.pid_rusage - and vmm = - try IM.find pid t.pid_vmmapi with - | Not_found -> - Logs.err (fun m -> m "failed to find vmm stats for %d" pid); - [] + let pid_nic = IM.add pid (vmctx, nic_ids) t.pid_nic + and vmid_pid = String.Map.add vmid pid t.vmid_pid in - match - List.fold_left (fun acc nic -> - match String.Map.find nic t.nic_ifdata, acc with - | None, _ -> None - | _, None -> None - | Some ifd, Some acc -> Some (ifd :: acc)) - (Some []) (snd (List.split nics)) + Ok { t with pid_nic ; vmid_pid } + + +let stats t vmid = + Logs.debug (fun m -> m "querying statistics for vmid %s" vmid) ; + match String.Map.find vmid t.vmid_pid with + | None -> Error (`Msg ("unknown vm " ^ vmid)) + | Some pid -> + Logs.debug (fun m -> m "querying statistics for %d" pid) ; + try + let _, nics = IM.find pid t.pid_nic + and ru = IM.find pid t.pid_rusage + and vmm = + try IM.find pid t.pid_vmmapi with + | Not_found -> + Logs.err (fun m -> m "failed to find vmm stats for %d" pid); + [] + in + match + List.fold_left (fun acc nic -> + match String.Map.find nic t.nic_ifdata, acc with + | None, _ -> None + | _, None -> None + | Some ifd, Some acc -> Some (ifd :: acc)) + (Some []) (snd (List.split nics)) + with + | None -> Error (`Msg "failed to find interface statistics") + | Some ifd -> Ok (ru, vmm, ifd) with - | None -> Error (`Msg "failed to find interface statistics") - | Some ifd -> Ok (ru, vmm, ifd) - with - | _ -> Error (`Msg "failed to find resource usage") + | _ -> Error (`Msg "failed to find resource usage") -let remove_pid t pid = - Logs.info (fun m -> m "removing pid %d" pid) ; - (try - match IM.find pid t.pid_nic with - | Some vmctx, _ -> ignore (wrap vmmapi_close vmctx) - | None, _ -> () - with - _ -> ()) ; - let pid_nic = IM.remove pid t.pid_nic in - { t with pid_nic } +let remove_vmid t vmid = + Logs.info (fun m -> m "removing vmid %s" vmid) ; + match String.Map.find vmid t.vmid_pid with + | None -> Logs.warn (fun m -> m "no pid found for %s" vmid) ; t + | Some pid -> + Logs.info (fun m -> m "removing pid %d" pid) ; + (try + match IM.find pid t.pid_nic with + | Some vmctx, _ -> ignore (wrap vmmapi_close vmctx) + | None, _ -> () + with + _ -> ()) ; + let pid_nic = IM.remove pid t.pid_nic + and vmid_pid = String.Map.remove vmid t.vmid_pid + in + { t with pid_nic ; vmid_pid } -let remove_pids t pids = - List.fold_left remove_pid t pids +let remove_vmids t vmids = + List.fold_left remove_vmid t vmids let handle t hdr buf = let open Vmm_wire in @@ -167,18 +180,17 @@ let handle t hdr buf = if not (version_eq my_version hdr.version) then Error (`Msg "cannot handle version") else + decode_string cs >>= fun (name, off) -> match int_to_op hdr.tag with | Some Add -> - decode_pid_taps cs >>= fun (pid, taps) -> - add_pid t pid taps >>= fun t -> - Ok (t, `Add pid, success ~msg:"added" hdr.id my_version) + decode_pid_taps (Cstruct.shift cs off) >>= fun (pid, taps) -> + add_pid t name pid taps >>= fun t -> + Ok (t, `Add name, success ~msg:"added" hdr.id my_version) | Some Remove -> - decode_pid cs >>= fun pid -> - let t = remove_pid t pid in - Ok (t, `Remove pid, success ~msg:"removed" hdr.id my_version) + let t = remove_vmid t name in + Ok (t, `Remove name, success ~msg:"removed" hdr.id my_version) | Some Stat_request -> - decode_pid cs >>= fun pid -> - stats t pid >>= fun s -> + stats t name >>= fun s -> Ok (t, `None, stat_reply hdr.id my_version (encode_stats s)) | _ -> Error (`Msg "unknown command") in diff --git a/stats/vmm_stats_lwt.ml b/stats/vmm_stats_lwt.ml index ab2aab8..4eb79e1 100644 --- a/stats/vmm_stats_lwt.ml +++ b/stats/vmm_stats_lwt.ml @@ -41,10 +41,10 @@ let handle s addr () = | Ok () -> loop acc | Error _ -> Logs.err (fun m -> m "exception while writing") ; Lwt.return acc in - loop [] >>= fun pids -> + loop [] >>= fun vmids -> Lwt.catch (fun () -> Lwt_unix.close s) (fun _ -> Lwt.return_unit) >|= fun () -> - Logs.warn (fun m -> m "disconnect, dropping %d pids!" (List.length pids)) ; - let t' = Vmm_stats.remove_pids !t pids in + Logs.warn (fun m -> m "disconnect, dropping %d vms!" (List.length vmids)) ; + let t' = Vmm_stats.remove_vmids !t vmids in t := t' let rec timer interval () = @@ -56,7 +56,10 @@ let jump _ file interval = Sys.(set_signal sigpipe Signal_ignore) ; let interval = Duration.(to_f (of_sec interval)) in Lwt_main.run - (let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in + ((Lwt_unix.file_exists file >>= function + | true -> Lwt_unix.unlink file + | false -> Lwt.return_unit) >>= fun () -> + let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in Lwt_unix.(bind s (ADDR_UNIX file)) >>= fun () -> Lwt_unix.listen s 1 ; Lwt.async (timer interval) ; @@ -80,8 +83,9 @@ let setup_log = $ Logs_cli.level ()) let socket = - let doc = "Socket to listen onto" in - Arg.(value & pos 0 string "" & info [] ~doc) + let doc = "Socket to listen on" in + let sock = Fpath.(to_string (Vmm_core.tmpdir / "stat" + "sock")) in + Arg.(value & opt string sock & info [ "s" ; "socket" ] ~doc) let interval = let doc = "Interval between statistics gatherings (in seconds)" in diff --git a/stats/vmm_stats_once.ml b/stats/vmm_stats_once.ml deleted file mode 100644 index ff37245..0000000 --- a/stats/vmm_stats_once.ml +++ /dev/null @@ -1,70 +0,0 @@ -(* (c) 2017, 2018 Hannes Mehnert, all rights reserved *) - -(* the process responsible for gathering statistics (CPU + mem + network) *) - -open Lwt.Infix - -let t = ref (Vmm_stats.empty ()) - -let rec timer pids () = - t := Vmm_stats.tick !t ; - List.iter (fun pid -> - match Vmm_stats.stats !t pid with - | Ok (ru, vmm, ifd) -> - Logs.info (fun m -> m "stats %d@.%a@.%a@.%a@." - pid Vmm_core.pp_rusage ru - Fmt.(list ~sep:(unit "@.") (pair ~sep:(unit ": ") string int64)) vmm - Fmt.(list ~sep:(unit "@.") Vmm_core.pp_ifdata) ifd) - | Error (`Msg e) -> - Logs.err (fun m -> m "error %s while getting stats of %d" e pid)) - pids ; - Lwt_unix.sleep Duration.(to_f (of_sec 1)) >>= fun () -> - timer pids () - -let split_pid xs = - List.fold_left (fun acc str -> - match Astring.String.cuts ~sep:":" str with - | pid :: taps -> (int_of_string pid, taps) :: acc - | [] -> invalid_arg "invalid pid") [] xs - -let jump _ pids = - Sys.(set_signal sigpipe Signal_ignore) ; - let pid_taps = split_pid pids in - let st = - List.fold_left (fun t (pid, taps) -> - match Vmm_stats.add_pid t pid taps with - | Ok t -> - Logs.info (fun m -> m "added pid %d taps %a" - pid Fmt.(list ~sep:(unit ", ") string) taps) ; - t - | Error (`Msg ms) -> - Logs.err (fun m -> m "error %s while adding pid %d taps %a" - ms pid Fmt.(list ~sep:(unit ", ") string) taps); - invalid_arg "broken") - !t pid_taps - in - t := st ; - let pids = fst (List.split pid_taps) in - `Ok (Lwt_main.run (timer pids ())) - -let setup_log style_renderer level = - Fmt_tty.setup_std_outputs ?style_renderer (); - Logs.set_level level; - Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ()) - -open Cmdliner - -let setup_log = - Term.(const setup_log - $ Fmt_cli.style_renderer () - $ Logs_cli.level ()) - -let pids = - let doc = "pids" in - Arg.(value & opt_all string [] & info [ "pid" ] ~doc) - -let cmd = - Term.(ret (const jump $ setup_log $ pids)), - Term.info "vmm_stats_once" ~version:"%%VERSION_NUM%%" - -let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1