Refactor socket activation, use for all daemons
This commit is contained in:
parent
0c29e2b90d
commit
0508465bba
|
@ -345,6 +345,10 @@ let retry_connections =
|
||||||
let doc = "Number of retries when connecting to other daemons (log, console, stats etc). 0 aborts after one failure, -1 is unlimited retries." in
|
let doc = "Number of retries when connecting to other daemons (log, console, stats etc). 0 aborts after one failure, -1 is unlimited retries." in
|
||||||
Arg.(value & opt int 0 & info [ "retry-connections" ] ~doc)
|
Arg.(value & opt int 0 & info [ "retry-connections" ] ~doc)
|
||||||
|
|
||||||
|
let systemd_socket_activation =
|
||||||
|
let doc = "Pass this flag when systemd socket activation is being used" in
|
||||||
|
Arg.(value & flag & info [ "systemd-socket-activation" ] ~doc)
|
||||||
|
|
||||||
let exit_status = function
|
let exit_status = function
|
||||||
| Ok () -> Ok Success
|
| Ok () -> Ok Success
|
||||||
| Error e -> Ok e
|
| Error e -> Ok e
|
||||||
|
|
|
@ -158,12 +158,12 @@ let handle s addr =
|
||||||
|
|
||||||
let m = Vmm_core.conn_metrics "unix"
|
let m = Vmm_core.conn_metrics "unix"
|
||||||
|
|
||||||
let jump _ influx tmpdir =
|
let jump _ systemd influx tmpdir =
|
||||||
Sys.(set_signal sigpipe Signal_ignore) ;
|
Sys.(set_signal sigpipe Signal_ignore) ;
|
||||||
Albatross_cli.set_tmpdir tmpdir;
|
Albatross_cli.set_tmpdir tmpdir;
|
||||||
Lwt_main.run
|
Lwt_main.run
|
||||||
(Albatross_cli.init_influx "albatross_console" influx;
|
(Albatross_cli.init_influx "albatross_console" influx;
|
||||||
Vmm_lwt.server_socket `Console >>= fun s ->
|
Vmm_lwt.server_socket systemd `Console >>= fun s ->
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
Lwt_unix.accept s >>= fun (cs, addr) ->
|
Lwt_unix.accept s >>= fun (cs, addr) ->
|
||||||
m `Open;
|
m `Open;
|
||||||
|
@ -177,7 +177,7 @@ open Cmdliner
|
||||||
open Albatross_cli
|
open Albatross_cli
|
||||||
|
|
||||||
let cmd =
|
let cmd =
|
||||||
Term.(term_result (const jump $ setup_log $ influx $ tmpdir)),
|
Term.(term_result (const jump $ setup_log $ systemd_socket_activation $ influx $ tmpdir)),
|
||||||
Term.info "albatross_console" ~version
|
Term.info "albatross_console" ~version
|
||||||
|
|
||||||
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1
|
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1
|
||||||
|
|
|
@ -148,14 +148,6 @@ let handle mvar ring s addr =
|
||||||
|
|
||||||
let m = Vmm_core.conn_metrics "unix"
|
let m = Vmm_core.conn_metrics "unix"
|
||||||
|
|
||||||
let server_socket systemd =
|
|
||||||
if systemd then
|
|
||||||
match Daemon.listen_fds () with
|
|
||||||
| [fd] -> Lwt.return (Lwt_unix.of_unix_file_descr fd)
|
|
||||||
| _ -> failwith "Unexpected number of sockets"
|
|
||||||
else
|
|
||||||
Vmm_lwt.server_socket `Log
|
|
||||||
|
|
||||||
let jump _ systemd file read_only influx tmpdir =
|
let jump _ systemd file read_only influx tmpdir =
|
||||||
Sys.(set_signal sigpipe Signal_ignore) ;
|
Sys.(set_signal sigpipe Signal_ignore) ;
|
||||||
Albatross_cli.set_tmpdir tmpdir;
|
Albatross_cli.set_tmpdir tmpdir;
|
||||||
|
@ -169,7 +161,7 @@ let jump _ systemd file read_only influx tmpdir =
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
end else begin
|
end else begin
|
||||||
Albatross_cli.init_influx "albatross_log" influx;
|
Albatross_cli.init_influx "albatross_log" influx;
|
||||||
server_socket systemd >>= fun s ->
|
Vmm_lwt.server_socket systemd `Log >>= fun s ->
|
||||||
let ring = Vmm_ring.create `Startup () in
|
let ring = Vmm_ring.create `Startup () in
|
||||||
List.iter (Vmm_ring.write ring) entries ;
|
List.iter (Vmm_ring.write ring) entries ;
|
||||||
let mvar = Lwt_mvar.create_empty () in
|
let mvar = Lwt_mvar.create_empty () in
|
||||||
|
@ -199,12 +191,8 @@ let read_only =
|
||||||
let doc = "Only read log file and present entries" in
|
let doc = "Only read log file and present entries" in
|
||||||
Arg.(value & flag & info [ "read-only" ] ~doc)
|
Arg.(value & flag & info [ "read-only" ] ~doc)
|
||||||
|
|
||||||
let systemd =
|
|
||||||
let doc = "Pass this flag when systemd socket activation is being used" in
|
|
||||||
Arg.(value & flag & info [ "systemd-socket-activation" ] ~doc)
|
|
||||||
|
|
||||||
let cmd =
|
let cmd =
|
||||||
Term.(const jump $ setup_log $ systemd $ file $ read_only $ influx $ tmpdir),
|
Term.(const jump $ setup_log $ systemd_socket_activation $ file $ read_only $ influx $ tmpdir),
|
||||||
Term.info "albatross_log" ~version
|
Term.info "albatross_log" ~version
|
||||||
|
|
||||||
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1
|
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1
|
||||||
|
|
|
@ -135,7 +135,7 @@ let write_reply name fd txt (hdr, cmd) =
|
||||||
|
|
||||||
let m = conn_metrics "unix"
|
let m = conn_metrics "unix"
|
||||||
|
|
||||||
let jump _ influx tmpdir dbdir retries enable_stats =
|
let jump _ systemd influx tmpdir dbdir retries enable_stats =
|
||||||
Sys.(set_signal sigpipe Signal_ignore);
|
Sys.(set_signal sigpipe Signal_ignore);
|
||||||
Albatross_cli.set_tmpdir tmpdir;
|
Albatross_cli.set_tmpdir tmpdir;
|
||||||
Albatross_cli.set_dbdir dbdir;
|
Albatross_cli.set_dbdir dbdir;
|
||||||
|
@ -165,7 +165,7 @@ let jump _ influx tmpdir dbdir retries enable_stats =
|
||||||
else
|
else
|
||||||
Lwt.return_none) >>= fun s ->
|
Lwt.return_none) >>= fun s ->
|
||||||
Lwt.catch
|
Lwt.catch
|
||||||
(fun () -> Vmm_lwt.server_socket `Vmmd)
|
(fun () -> Vmm_lwt.server_socket systemd `Vmmd)
|
||||||
(fun e ->
|
(fun e ->
|
||||||
let str =
|
let str =
|
||||||
Fmt.strf "unable to create server socket %a: %s"
|
Fmt.strf "unable to create server socket %a: %s"
|
||||||
|
@ -218,7 +218,7 @@ let jump _ influx tmpdir dbdir retries enable_stats =
|
||||||
open Cmdliner
|
open Cmdliner
|
||||||
|
|
||||||
let cmd =
|
let cmd =
|
||||||
Term.(const jump $ setup_log $ influx $ tmpdir $ dbdir $ retry_connections $ enable_stats),
|
Term.(const jump $ setup_log $ systemd_socket_activation $ influx $ tmpdir $ dbdir $ retry_connections $ enable_stats),
|
||||||
Term.info "albatrossd" ~version:Albatross_cli.version
|
Term.info "albatrossd" ~version:Albatross_cli.version
|
||||||
|
|
||||||
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1
|
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1
|
||||||
|
|
|
@ -24,4 +24,4 @@
|
||||||
(public_name albatross-log)
|
(public_name albatross-log)
|
||||||
(package albatross)
|
(package albatross)
|
||||||
(modules albatross_log)
|
(modules albatross_log)
|
||||||
(libraries albatross.cli albatross systemd))
|
(libraries albatross.cli albatross))
|
||||||
|
|
3
src/dune
3
src/dune
|
@ -3,4 +3,5 @@
|
||||||
(public_name albatross)
|
(public_name albatross)
|
||||||
(wrapped false)
|
(wrapped false)
|
||||||
(libraries rresult logs ipaddr bos hex ptime astring duration cstruct jsonm
|
(libraries rresult logs ipaddr bos hex ptime astring duration cstruct jsonm
|
||||||
decompress lwt lwt.unix ptime.clock.os asn1-combinators metrics))
|
decompress lwt lwt.unix ptime.clock.os asn1-combinators metrics
|
||||||
|
systemd))
|
||||||
|
|
|
@ -12,20 +12,25 @@ let safe_close fd =
|
||||||
(fun () -> Lwt_unix.close fd)
|
(fun () -> Lwt_unix.close fd)
|
||||||
(fun _ -> Lwt.return_unit)
|
(fun _ -> Lwt.return_unit)
|
||||||
|
|
||||||
let server_socket sock =
|
let server_socket systemd sock =
|
||||||
let name = Vmm_core.socket_path sock in
|
if systemd
|
||||||
(Lwt_unix.file_exists name >>= function
|
then match Daemon.listen_fds () with
|
||||||
| true -> Lwt_unix.unlink name
|
| [fd] -> Lwt.return (Lwt_unix.of_unix_file_descr fd)
|
||||||
| false -> Lwt.return_unit) >>= fun () ->
|
| _ -> failwith "Systemd socket activation error" (* FIXME *)
|
||||||
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
|
else
|
||||||
Lwt_unix.set_close_on_exec s ;
|
let name = Vmm_core.socket_path sock in
|
||||||
let old_umask = Unix.umask 0 in
|
(Lwt_unix.file_exists name >>= function
|
||||||
let _ = Unix.umask (old_umask land 0o707) in
|
| true -> Lwt_unix.unlink name
|
||||||
Lwt_unix.(bind s (ADDR_UNIX name)) >|= fun () ->
|
| false -> Lwt.return_unit) >>= fun () ->
|
||||||
Logs.app (fun m -> m "listening on %s" name);
|
let s = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
|
||||||
let _ = Unix.umask old_umask in
|
Lwt_unix.set_close_on_exec s ;
|
||||||
Lwt_unix.listen s 1 ;
|
let old_umask = Unix.umask 0 in
|
||||||
s
|
let _ = Unix.umask (old_umask land 0o707) in
|
||||||
|
Lwt_unix.(bind s (ADDR_UNIX name)) >|= fun () ->
|
||||||
|
Logs.app (fun m -> m "listening on %s" name);
|
||||||
|
let _ = Unix.umask old_umask in
|
||||||
|
Lwt_unix.listen s 1 ;
|
||||||
|
s
|
||||||
|
|
||||||
let connect addrtype sockaddr =
|
let connect addrtype sockaddr =
|
||||||
let c = Lwt_unix.(socket addrtype SOCK_STREAM 0) in
|
let c = Lwt_unix.(socket addrtype SOCK_STREAM 0) in
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
val pp_sockaddr : Format.formatter -> Lwt_unix.sockaddr -> unit
|
val pp_sockaddr : Format.formatter -> Lwt_unix.sockaddr -> unit
|
||||||
|
|
||||||
val server_socket : Vmm_core.service -> Lwt_unix.file_descr Lwt.t
|
val server_socket : bool -> Vmm_core.service -> Lwt_unix.file_descr Lwt.t
|
||||||
|
|
||||||
val connect : Lwt_unix.socket_domain -> Lwt_unix.sockaddr -> Lwt_unix.file_descr option Lwt.t
|
val connect : Lwt_unix.socket_domain -> Lwt_unix.sockaddr -> Lwt_unix.file_descr option Lwt.t
|
||||||
|
|
||||||
|
|
|
@ -66,13 +66,13 @@ let timer () =
|
||||||
|
|
||||||
let m = Vmm_core.conn_metrics "unix"
|
let m = Vmm_core.conn_metrics "unix"
|
||||||
|
|
||||||
let jump _ interval influx tmpdir =
|
let jump _ systemd interval influx tmpdir =
|
||||||
Sys.(set_signal sigpipe Signal_ignore);
|
Sys.(set_signal sigpipe Signal_ignore);
|
||||||
Albatross_cli.set_tmpdir tmpdir;
|
Albatross_cli.set_tmpdir tmpdir;
|
||||||
let interval = Duration.(to_f (of_sec interval)) in
|
let interval = Duration.(to_f (of_sec interval)) in
|
||||||
Lwt_main.run
|
Lwt_main.run
|
||||||
(Albatross_cli.init_influx "albatross_stats" influx;
|
(Albatross_cli.init_influx "albatross_stats" influx;
|
||||||
Vmm_lwt.server_socket `Stats >>= fun s ->
|
Vmm_lwt.server_socket systemd `Stats >>= fun s ->
|
||||||
let _ev = Lwt_engine.on_timer interval true (fun _e -> Lwt.async timer) in
|
let _ev = Lwt_engine.on_timer interval true (fun _e -> Lwt.async timer) in
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
Lwt_unix.accept s >>= fun (cs, addr) ->
|
Lwt_unix.accept s >>= fun (cs, addr) ->
|
||||||
|
@ -90,7 +90,7 @@ let interval =
|
||||||
Arg.(value & opt int 10 & info [ "interval" ] ~doc)
|
Arg.(value & opt int 10 & info [ "interval" ] ~doc)
|
||||||
|
|
||||||
let cmd =
|
let cmd =
|
||||||
Term.(term_result (const jump $ setup_log $ interval $ influx $ tmpdir)),
|
Term.(term_result (const jump $ setup_log $ systemd_socket_activation $ interval $ influx $ tmpdir)),
|
||||||
Term.info "albatross_stats" ~version
|
Term.info "albatross_stats" ~version
|
||||||
|
|
||||||
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1
|
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1
|
||||||
|
|
Loading…
Reference in a new issue