builder/app/builder_client.ml

241 lines
7.5 KiB
OCaml

open Rresult.R.Infix
let sh = "script.sh"
let prng = Random.State.make_self_init ()
let rec tmp_dirname () =
let rnd = Random.State.bits prng land 0xFFFFFF in
let name =
Filename.concat (Filename.get_temp_dir_name ())
(Printf.sprintf "builder-%06x" rnd)
in
try
let _stat = Unix.lstat name
in
tmp_dirname ()
with
_ -> name
let read_console_write_network s fd uuid =
let ch = Unix.in_channel_of_descr fd in
let rec read_write () =
try
let line = input_line ch in
Builder.write_cmd s (Builder.Output (uuid, line)) |> ignore; (* TODO *)
read_write ()
with
End_of_file -> ()
in
read_write ();
exit 0
let prepare_fs job =
let tmpdir = Fpath.v (tmp_dirname ()) in
at_exit (fun () -> ignore (Bos.OS.Dir.delete ~recurse:true tmpdir));
Bos.OS.Dir.create tmpdir >>= fun did_not_exist ->
if not did_not_exist then
Error (`Msg "path already existed")
else
Bos.OS.Dir.set_current tmpdir >>= fun () ->
List.fold_left (fun acc (f, v) ->
acc >>= fun () ->
let path = Fpath.append tmpdir f in
Bos.OS.Dir.create (Fpath.parent path) >>= fun _ ->
Bos.OS.File.write (Fpath.append tmpdir f) v)
(Ok ()) job.Builder.files >>= fun () ->
Bos.OS.File.write ~mode:500 Fpath.(tmpdir / sh) job.Builder.script >>| fun () ->
tmpdir
let collect_output files tmpdir =
let all_files =
let dirs = [ tmpdir ] in
let collect path acc = path :: acc in
match Bos.OS.Path.fold ~elements:`Files collect [] dirs with
| Ok files -> files
| Error `Msg msg ->
Logs.warn (fun m -> m "folding resulted in an error %s" msg);
[]
in
List.fold_left (fun acc f ->
match Fpath.rem_prefix tmpdir f with
| None ->
Logs.warn (fun m -> m "couldn't remove tmpdir prefix from %a"
Fpath.pp f);
acc
| Some name when Fpath.to_string name = sh ->
(* ignoring the script.sh itself *)
acc
| Some name when List.exists (fun (p, _) -> Fpath.equal p name) files ->
(* ignoring all inputs *)
acc
| Some name ->
match Bos.OS.File.read f with
| Ok data -> (name, data) :: acc
| Error `Msg e ->
Logs.err (fun m -> m "error reading %a: %s" Fpath.pp f e);
acc)
[] all_files
let execute_job s uuid job =
match prepare_fs job with
| Error `Msg msg -> Builder.Msg msg, []
| Ok tmpdir ->
let to_read, out = Unix.pipe () in
let f = Unix.fork () in
if f = 0 then
(* child *)
read_console_write_network s to_read uuid
else (* parent *)
let toexec = Fpath.(to_string (tmpdir / sh)) in
let pid =
Unix.create_process "/bin/sh" [| "-e" ; toexec |] Unix.stdin out out
in
let r = Unix.waitpid [] pid in
Unix.kill f 9;
match snd r with
| Unix.WEXITED 0 -> Builder.Exited 0, collect_output job.Builder.files tmpdir
| Unix.WEXITED c -> Builder.Exited c, []
| Unix.WSIGNALED s -> Builder.Signalled s, []
| Unix.WSTOPPED s -> Builder.Stopped s, []
let jump () (host, port) =
(* client semantics:
- 1 connect to server
- 2 send client hello
- 3 await server hello, check version agreement
- 4 send job request
- 5 read from server until job is received
- 6 dump files, execute job (pipe + fork to send output to server)
- 7 send job result to server
if while in 1-5 server communication fails, start from 1
if in 6 server communication fails, drop data (for now) [and retry]
if in 7 server communication fails, retry until publishing result is done
*)
let connect () =
try
let sockaddr = Unix.ADDR_INET (host, port) in
let s = Unix.(socket PF_INET SOCK_STREAM 0) in
Unix.(connect s sockaddr);
Ok s
with
| Unix.Unix_error (err, f, _) ->
Logs.err (fun m -> m "unix error in %s: %s" f (Unix.error_message err));
Error (`Msg "connect failure")
and disconnect s =
Unix.close s
and timeout () = Unix.sleepf 2.
in
let disc_on_err s = function Ok x -> Ok x | Error _ as e -> disconnect s; e in
let init () =
connect () >>= fun s ->
let hello = Builder.(Client_hello cmds) in
disc_on_err s (Builder.write_cmd s hello) >>= fun () ->
disc_on_err s (Builder.read_cmd s) >>| fun cmd ->
s, cmd
in
let rec establish () =
match init () with
| Error `Msg e ->
Logs.warn (fun m -> m "error %s connecting, trying again in a bit" e);
timeout ();
establish ()
| Ok cmd -> Ok cmd
in
let good_server_hello s = function
| Builder.Server_hello x when x = Builder.cmds-> Ok ()
| cmd ->
Logs.err (fun m -> m "expected Server Hello with matching version, got %a"
Builder.pp_cmd cmd);
disconnect s;
Error (`Msg "bad communication")
in
let rec hs () =
establish () >>= fun (s, cmd) ->
good_server_hello s cmd >>= fun () ->
match
disc_on_err s (Builder.write_cmd s Builder.Job_requested >>= fun () ->
Builder.read_cmd s)
with
| Ok cmd -> Ok (s, cmd)
| Error `Msg msg ->
Logs.warn (fun m -> m "received error %s while waiting for job, retry" msg);
hs ()
in
let submit_success s fini =
match Builder.write_cmd s fini with
| Ok () -> Ok ()
| Error `Msg msg ->
Logs.err (fun m -> m "error %s while submitting result" msg);
let rec try_again n =
match
timeout ();
establish ()
with
| Ok (s, cmd) ->
good_server_hello s cmd >>= fun () ->
begin match disc_on_err s (Builder.write_cmd s fini) with
| Ok () -> Ok ()
| Error `Msg msg ->
Logs.warn (fun m -> m "failed to submit result %s (n = %d)" msg n);
try_again (succ n)
end
| Error `Msg msg ->
Logs.warn (fun m -> m "failed to establish connection %s (n = %d)" msg n);
try_again (succ n)
in
try_again 0
in
hs () >>= fun (s, cmd) ->
(match cmd with
| Builder.Job_schedule (uuid, job) ->
Logs.app (fun m -> m "received job uuid %a: %a" Uuidm.pp uuid
Builder.pp_job job);
let r, data = execute_job s uuid job in
let fini = Builder.Job_finished (uuid, r, data) in
submit_success s fini
| cmd ->
Logs.err (fun m -> m "expected Job, got %a" Builder.pp_cmd cmd);
disconnect s;
Error (`Msg "bad communication")) >>= fun () ->
disconnect s;
Ok ()
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ())
open Cmdliner
let host_port : (Unix.inet_addr * int) Arg.converter =
let parse s =
match String.split_on_char ':' s with
| [ hostname ; port ] ->
begin try
`Ok (Unix.inet_addr_of_string hostname, int_of_string port)
with
Not_found -> `Error "failed to parse IP:port"
end
| _ -> `Error "broken: no port specified"
in
parse, fun ppf (h, p) -> Format.fprintf ppf "%s:%d"
(Unix.string_of_inet_addr h) p
let remote =
let doc = "The remote host:port to connect to" in
Arg.(value & opt host_port (Unix.inet_addr_loopback, 1234) &
info [ "r" ; "remote" ] ~doc ~docv:"IP:PORT")
let setup_log =
Term.(const setup_log
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())
let cmd =
Term.(term_result (const jump $ setup_log $ remote)),
Term.info "builder-client" ~version:Builder.version
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1