initial commit

This commit is contained in:
Hannes Mehnert 2020-11-24 22:02:45 +01:00
commit fc4afd870f
10 changed files with 1251 additions and 0 deletions

0
LICENSE.md Normal file
View file

34
README.md Normal file
View file

@ -0,0 +1,34 @@
# builder - scheduling and executing jobs
This consists of two programs, a client and a server. The single server
contains a queue of jobs, which are consumed by a client. Any number of
clients can be connected to the server.
The server keeps persistent state of the job queue (so restarts / crashes are
dealt with). A client connects, provides some information about itself, and
then waits for a job. Once a job is read and accepted, it is executed by the
client. Resulting artifacts can be transferred by the client to the server.
2
The server has the ability to schedule jobs at regular intervals - similar to
crontab.
Handled and unhandled error conditions:
- client execution fails (timeout, restart, killed): not handled
- client execution gets a signal: reported to server
- client can't write job data files -> failure is captured and reported
- client can't read job output -> logged to client's console (without artifacts gathered)
- client errors when submitting console output -> ignored
- client errors when submitting build artifacts -> retry
- there's no explicit ACK for packets
Left to do:
- client should queue up console output on server connection failure (and continue reading output at the same time)
- client should inform when data passed to it and artifacts overlap (name / checksum)
- client could sandbox the executed script a bit more (maybe?)
- client should have a timeout for the script to be executed
- separate stdout and stderr to be sent to the server?
- UI
- retrieve artifacts even if execution failed
- the running jobs are not stored onto disk, which may result in unexpected behaviour
- should instead once a job is scheduled the uuid and job information being dumped to disk already (also avoids dummy job dump)
- directory traversals (server folds over output directory and collects files)

240
app/builder_client.ml Normal file
View file

@ -0,0 +1,240 @@
open Rresult.R.Infix
let sh = "script.sh"
let prng = Random.State.make_self_init ()
let rec tmp_dirname () =
let rnd = Random.State.bits prng land 0xFFFFFF in
let name =
Filename.concat (Filename.get_temp_dir_name ())
(Printf.sprintf "builder-%06x" rnd)
in
try
let _stat = Unix.lstat name
in
tmp_dirname ()
with
_ -> name
let read_console_write_network s fd uuid =
let ch = Unix.in_channel_of_descr fd in
let rec read_write () =
try
let line = input_line ch in
Builder.write_cmd s (Builder.Output (uuid, line)) |> ignore; (* TODO *)
read_write ()
with
End_of_file -> ()
in
read_write ();
exit 0
let prepare_fs job =
let tmpdir = Fpath.v (tmp_dirname ()) in
at_exit (fun () -> ignore (Bos.OS.Dir.delete ~recurse:true tmpdir));
Bos.OS.Dir.create tmpdir >>= fun did_not_exist ->
if not did_not_exist then
Error (`Msg "path already existed")
else
Bos.OS.Dir.set_current tmpdir >>= fun () ->
List.fold_left (fun acc (f, v) ->
acc >>= fun () ->
let path = Fpath.append tmpdir f in
Bos.OS.Dir.create (Fpath.parent path) >>= fun _ ->
Bos.OS.File.write (Fpath.append tmpdir f) v)
(Ok ()) job.Builder.files >>= fun () ->
Bos.OS.File.write ~mode:500 Fpath.(tmpdir / sh) job.Builder.script >>| fun () ->
tmpdir
let collect_output files tmpdir =
let all_files =
let dirs = [ tmpdir ] in
let collect path acc = path :: acc in
match Bos.OS.Path.fold ~elements:`Files collect [] dirs with
| Ok files -> files
| Error `Msg msg ->
Logs.warn (fun m -> m "folding resulted in an error %s" msg);
[]
in
List.fold_left (fun acc f ->
match Fpath.rem_prefix tmpdir f with
| None ->
Logs.warn (fun m -> m "couldn't remove tmpdir prefix from %a"
Fpath.pp f);
acc
| Some name when Fpath.to_string name = sh ->
(* ignoring the script.sh itself *)
acc
| Some name when List.exists (fun (p, _) -> Fpath.equal p name) files ->
(* ignoring all inputs *)
acc
| Some name ->
match Bos.OS.File.read f with
| Ok data -> (name, data) :: acc
| Error `Msg e ->
Logs.err (fun m -> m "error reading %a: %s" Fpath.pp f e);
acc)
[] all_files
let execute_job s uuid job =
match prepare_fs job with
| Error `Msg msg -> Builder.Msg msg, []
| Ok tmpdir ->
let to_read, out = Unix.pipe () in
let f = Unix.fork () in
if f = 0 then
(* child *)
read_console_write_network s to_read uuid
else (* parent *)
let toexec = Fpath.(to_string (tmpdir / sh)) in
let pid =
Unix.create_process "/bin/sh" [| "-e" ; toexec |] Unix.stdin out out
in
let r = Unix.waitpid [] pid in
Unix.kill f 9;
match snd r with
| Unix.WEXITED 0 -> Builder.Exited 0, collect_output job.Builder.files tmpdir
| Unix.WEXITED c -> Builder.Exited c, []
| Unix.WSIGNALED s -> Builder.Signalled s, []
| Unix.WSTOPPED s -> Builder.Stopped s, []
let jump () (host, port) =
(* client semantics:
- 1 connect to server
- 2 send client hello
- 3 await server hello, check version agreement
- 4 send job request
- 5 read from server until job is received
- 6 dump files, execute job (pipe + fork to send output to server)
- 7 send job result to server
if while in 1-5 server communication fails, start from 1
if in 6 server communication fails, drop data (for now) [and retry]
if in 7 server communication fails, retry until publishing result is done
*)
let connect () =
try
let sockaddr = Unix.ADDR_INET (host, port) in
let s = Unix.(socket PF_INET SOCK_STREAM 0) in
Unix.(connect s sockaddr);
Ok s
with
| Unix.Unix_error (err, f, _) ->
Logs.err (fun m -> m "unix error in %s: %s" f (Unix.error_message err));
Error (`Msg "connect failure")
and disconnect s =
Unix.close s
and timeout () = Unix.sleepf 2.
in
let disc_on_err s = function Ok x -> Ok x | Error _ as e -> disconnect s; e in
let init () =
connect () >>= fun s ->
let hello = Builder.(Client_hello cmds) in
disc_on_err s (Builder.write_cmd s hello) >>= fun () ->
disc_on_err s (Builder.read_cmd s) >>| fun cmd ->
s, cmd
in
let rec establish () =
match init () with
| Error `Msg e ->
Logs.warn (fun m -> m "error %s connecting, trying again in a bit" e);
timeout ();
establish ()
| Ok cmd -> Ok cmd
in
let good_server_hello s = function
| Builder.Server_hello x when x = Builder.cmds-> Ok ()
| cmd ->
Logs.err (fun m -> m "expected Server Hello with matching version, got %a"
Builder.pp_cmd cmd);
disconnect s;
Error (`Msg "bad communication")
in
let rec hs () =
establish () >>= fun (s, cmd) ->
good_server_hello s cmd >>= fun () ->
match
disc_on_err s (Builder.write_cmd s Builder.Job_requested >>= fun () ->
Builder.read_cmd s)
with
| Ok cmd -> Ok (s, cmd)
| Error `Msg msg ->
Logs.warn (fun m -> m "received error %s while waiting for job, retry" msg);
hs ()
in
let submit_success s fini =
match Builder.write_cmd s fini with
| Ok () -> Ok ()
| Error `Msg msg ->
Logs.err (fun m -> m "error %s while submitting result" msg);
let rec try_again n =
match
timeout ();
establish ()
with
| Ok (s, cmd) ->
good_server_hello s cmd >>= fun () ->
begin match disc_on_err s (Builder.write_cmd s fini) with
| Ok () -> Ok ()
| Error `Msg msg ->
Logs.warn (fun m -> m "failed to submit result %s (n = %d)" msg n);
try_again (succ n)
end
| Error `Msg msg ->
Logs.warn (fun m -> m "failed to establish connection %s (n = %d)" msg n);
try_again (succ n)
in
try_again 0
in
hs () >>= fun (s, cmd) ->
(match cmd with
| Builder.Job_schedule (uuid, job) ->
Logs.app (fun m -> m "received job uuid %a: %a" Uuidm.pp uuid
Builder.pp_job job);
let r, data = execute_job s uuid job in
let fini = Builder.Job_finished (uuid, r, data) in
submit_success s fini
| cmd ->
Logs.err (fun m -> m "expected Job, got %a" Builder.pp_cmd cmd);
disconnect s;
Error (`Msg "bad communication")) >>= fun () ->
disconnect s;
Ok ()
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ())
open Cmdliner
let host_port : (Unix.inet_addr * int) Arg.converter =
let parse s =
match String.split_on_char ':' s with
| [ hostname ; port ] ->
begin try
`Ok (Unix.inet_addr_of_string hostname, int_of_string port)
with
Not_found -> `Error "failed to parse IP:port"
end
| _ -> `Error "broken: no port specified"
in
parse, fun ppf (h, p) -> Format.fprintf ppf "%s:%d"
(Unix.string_of_inet_addr h) p
let remote =
let doc = "The remote host:port to connect to" in
Arg.(value & opt host_port (Unix.inet_addr_loopback, 1234) &
info [ "r" ; "remote" ] ~doc ~docv:"IP:PORT")
let setup_log =
Term.(const setup_log
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())
let cmd =
Term.(term_result (const jump $ setup_log $ remote)),
Term.info "builder-client" ~version:Builder.version
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1

174
app/client.ml Normal file
View file

@ -0,0 +1,174 @@
open Rresult.R.Infix
let connect (host, port) =
let connect () =
try
let sockaddr = Unix.ADDR_INET (host, port) in
let s = Unix.(socket PF_INET SOCK_STREAM 0) in
Unix.(connect s sockaddr);
Ok s
with
| Unix.Unix_error (err, f, _) ->
Logs.err (fun m -> m "unix error in %s: %s" f (Unix.error_message err));
Error (`Msg "connect failure")
in
connect () >>= fun s ->
let hello = Builder.(Client_hello cmds) in
Builder.write_cmd s hello >>= fun () ->
Builder.read_cmd s >>= function
| Builder.Server_hello x when x = Builder.cmds -> Ok s
| cmd ->
Logs.err (fun m -> m "expected Server Hello with matching version, got %a"
Builder.pp_cmd cmd);
Error (`Msg "bad communication")
let observe () remote id =
match Uuidm.of_string id with
| None -> Error (`Msg "error parsing uuid")
| Some uuid ->
connect remote >>= fun s ->
Builder.write_cmd s (Builder.Observe uuid) >>= fun () ->
let rec read () =
Builder.read_cmd s >>= fun cmd ->
Logs.app (fun m -> m "%a" Builder.pp_cmd cmd);
read ()
in
read ()
let info_ () remote =
connect remote >>= fun s ->
Builder.write_cmd s Builder.Info >>= fun () ->
Builder.read_cmd s >>= fun cmd ->
Logs.app (fun m -> m "%a" Builder.pp_cmd cmd);
Ok ()
let unschedule () remote name =
connect remote >>= fun s ->
Builder.write_cmd s (Builder.Unschedule name)
let schedule () remote name script period dir =
let files =
match dir with
| None -> []
| Some f ->
let dir = Fpath.v f in
let all_files =
let dirs = [ dir ] in
let collect path acc = path :: acc in
match Bos.OS.Path.fold ~elements:`Files collect [] dirs with
| Ok files -> files
| Error `Msg msg ->
Logs.warn (fun m -> m "folding resulted in an error %s" msg);
[]
in
List.fold_left (fun acc f ->
match Fpath.rem_prefix dir f with
| None ->
Logs.warn (fun m -> m "couldn't remove prefix from %a"
Fpath.pp f);
acc
| Some name ->
match Bos.OS.File.read f with
| Ok data -> (name, data) :: acc
| Error `Msg e ->
Logs.err (fun m -> m "error reading %a: %s" Fpath.pp f e);
acc)
[] all_files
in
Bos.OS.File.read (Fpath.v script) >>= fun script ->
let job = Builder.{ name ; script ; files } in
connect remote >>= fun s ->
Builder.write_cmd s (Builder.Schedule (period, job))
let help () man_format cmds = function
| None -> `Help (`Pager, None)
| Some t when List.mem t cmds -> `Help (man_format, Some t)
| Some x ->
print_endline ("unknown command '" ^ x ^ "', available commands:");
List.iter print_endline cmds;
`Ok ()
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ())
open Cmdliner
let host_port : (Unix.inet_addr * int) Arg.converter =
let parse s =
match String.split_on_char ':' s with
| [ hostname ; port ] ->
begin try
`Ok (Unix.inet_addr_of_string hostname, int_of_string port)
with
Not_found -> `Error "failed to parse IP:port"
end
| _ -> `Error "broken: no port specified"
in
parse, fun ppf (h, p) -> Format.fprintf ppf "%s:%d"
(Unix.string_of_inet_addr h) p
let remote =
let doc = "The remote host:port to connect to" in
Arg.(value & opt host_port (Unix.inet_addr_loopback, 1234) &
info [ "r" ; "remote" ] ~doc ~docv:"IP:PORT")
let nam =
let doc = "The job name" in
Arg.(required & pos 0 (some string) None & info [ ] ~doc ~docv:"NAME")
let id =
let doc = "The job ID" in
Arg.(required & pos 0 (some string) None & info [ ] ~doc ~docv:"ID")
let p : Builder.period Arg.converter =
let parse = function
| "hourly" -> `Ok Builder.Hourly
| "daily" -> `Ok Builder.Daily
| "weekly" -> `Ok Builder.Weekly
| s -> `Error ("failed to parse period " ^ s)
in
parse, Builder.pp_period
let period =
let doc = "The periodic execution interval" in
Arg.(value & opt p Builder.Hourly & info [ "period" ] ~doc ~docv:"PERIOD")
let dir =
let doc = "The directory with supplementary material to embed into the job" in
Arg.(value & opt (some dir) None & info [ "dir" ] ~doc ~docv:"DIR")
let script =
let doc = "The script to execute" in
Arg.(required & pos 1 (some file) None & info [ ] ~doc ~docv:"FILE")
let setup_log =
Term.(const setup_log
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())
let observe_cmd =
Term.(term_result (const observe $ setup_log $ remote $ id)),
Term.info "observe"
let info_cmd =
Term.(term_result (const info_ $ setup_log $ remote)),
Term.info "info"
let unschedule_cmd =
Term.(term_result (const unschedule $ setup_log $ remote $ nam)),
Term.info "unschedule"
let schedule_cmd =
Term.(term_result (const schedule $ setup_log $ remote $ nam $ script $ period $ dir)),
Term.info "schedule"
let help_cmd =
let doc = "Builder client" in
Term.(ret (const help $ setup_log $ Term.man_format $ Term.choice_names $ Term.pure None)),
Term.info "builder" ~version:Builder.version ~doc
let cmds = [ help_cmd ; schedule_cmd ; unschedule_cmd ; info_cmd ; observe_cmd ]
let () = match Term.eval_choice help_cmd cmds with `Ok () -> exit 0 | _ -> exit 1

17
app/dune Normal file
View file

@ -0,0 +1,17 @@
(executable
(name builder_client)
(public_name builder-client)
(modules builder_client)
(libraries cmdliner builder logs.fmt fmt.cli logs.cli fmt.tty bos))
(executable
(name client)
(public_name client)
(modules client)
(libraries cmdliner builder logs.fmt fmt.cli logs.cli fmt.tty bos))
(executable
(name server)
(public_name builder-server)
(modules server)
(libraries cmdliner builder logs.fmt fmt.cli logs.cli fmt.tty lwt lwt.unix bos bheap ptime.clock.os duration))

414
app/server.ml Normal file
View file

@ -0,0 +1,414 @@
open Lwt.Infix
let read fd =
Lwt.catch (fun () ->
let rec r b ?(off = 0) l =
if l = 0 then
Lwt.return (Ok ())
else
Lwt_unix.read fd b off l >>= fun read ->
if read = 0 then
Lwt.return (Error (`Msg "end of file"))
else
r b ~off:(read + off) (l - read)
in
let open Lwt_result.Infix in
let bl = Bytes.create 8 in
r bl 8 >>= fun () ->
let l = Cstruct.BE.get_uint64 (Cstruct.of_bytes bl) 0 in
let l_int = Int64.to_int l in (* TODO *)
let b = Bytes.create l_int in
r b l_int >|= fun () ->
Cstruct.of_bytes b)
(fun e ->
Logs.err (fun m -> m "Error while reading: %s" (Printexc.to_string e));
Lwt.return (Error (`Msg "error in read")))
let read_cmd fd =
let open Lwt_result.Infix in
read fd >>= fun data ->
Lwt.return (Builder.Asn.cmd_of_cs data)
let write fd data =
Lwt.catch (fun () ->
let rec w b ?(off = 0) l =
if l = 0 then
Lwt.return_unit
else
Lwt_unix.write fd b off l >>= fun written ->
w b ~off:(written + off) (l - written)
in
let csl = Cstruct.create 8 in
Cstruct.BE.set_uint64 csl 0 (Int64.of_int (Cstruct.len data));
w (Cstruct.to_bytes csl) 8 >>= fun () ->
w (Cstruct.to_bytes data) (Cstruct.len data) >|= fun () ->
Ok ())
(fun e ->
Logs.err (fun m -> m "Error while writing: %s" (Printexc.to_string e));
Lwt.return (Error (`Msg "unix error in write")))
let write_cmd fd cmd =
let data = Builder.Asn.cmd_to_cs cmd in
write fd data
let pp_sockaddr ppf = function
| Lwt_unix.ADDR_UNIX s -> Fmt.pf ppf "unix://%s" s
| Lwt_unix.ADDR_INET (ip, port) ->
Fmt.pf ppf "%s:%d" (Unix.string_of_inet_addr ip) port
module UM = Map.Make(Uuidm)
module S = Binary_heap.Make (struct
type t = Builder.schedule_item
let compare { Builder.next = n1 ; _ } { Builder.next = n2 ; _ } =
Ptime.compare n1 n2
end)
let dummy =
let job = Builder.{ name = "dummy" ; script = "#nothing" ; files = [] } in
Builder.{ next = Ptime.epoch ; period = Daily ; job }
type t = {
mutable queue : Builder.job Queue.t ;
mutable schedule : S.t ;
mutable running : (Ptime.t * Builder.job * string Lwt_condition.t * (int64 * string) list) UM.t ;
waiter : unit Lwt_condition.t ;
dir : Fpath.t ;
}
let p_to_span p =
let one_hour = 60 * 60 in
let s = match p with
| Builder.Hourly -> one_hour
| Builder.Daily -> 24 * one_hour
| Builder.Weekly -> 7 * 24 * one_hour
in
Ptime.Span.of_int_s s
let schedule_job t now period job =
if Queue.fold (fun acc i ->
if acc then not (String.equal i.Builder.name job.Builder.name) else acc)
true t.queue
then begin
Queue.add job t.queue;
Lwt_condition.broadcast t.waiter ();
end;
match Ptime.add_span now (p_to_span period) with
| None -> Logs.err (fun m -> m "ptime add span failed when scheduling job")
| Some next -> S.add t.schedule Builder.{ next ; period ; job }
let schedule t =
let now = Ptime_clock.now () in
let rec s_next modified =
match S.minimum t.schedule with
| exception Binary_heap.Empty -> modified
| Builder.{ next ; period ; job } when Ptime.is_later ~than:next now ->
S.remove t.schedule;
schedule_job t now period job;
s_next true
| _ -> modified
in
s_next false
let dump, restore =
let open Rresult.R.Infix in
let file = "state" in
(fun t ->
let state =
let jobs = Queue.fold (fun acc j -> Builder.Job j :: acc) [] t.queue in
S.fold (fun s acc -> (Builder.Schedule s) :: acc) t.schedule
(List.rev jobs)
in
let data = Builder.Asn.state_to_cs state in
let bak = Fpath.(t.dir / file + "tmp") in
Bos.OS.File.write bak (Cstruct.to_string data) >>= fun () ->
Bos.OS.U.(error_to_msg (rename bak Fpath.(t.dir / file)))),
(fun dir ->
Bos.OS.Dir.create dir >>= fun _ ->
let to_read = Fpath.(dir / file) in
let queue = Queue.create ()
and schedule = S.create ~dummy 13
and waiter = Lwt_condition.create ()
in
Bos.OS.File.exists to_read >>= function
| false ->
Logs.warn (fun m -> m "state file does not exist, using empty");
Ok { queue ; schedule ; running = UM.empty ; waiter ; dir }
| true ->
Bos.OS.File.read to_read >>= fun data ->
Builder.Asn.state_of_cs (Cstruct.of_string data) >>= fun items ->
let queue, schedule =
List.fold_left (fun (queue, schedule) -> function
| Builder.Job j -> Queue.add j queue; (queue, schedule)
| Builder.Schedule s -> S.add schedule s; (queue, schedule))
(queue, schedule) items
in
Ok { queue ; schedule ; running = UM.empty ; waiter ; dir })
let uuid_gen = Uuidm.v4_gen (Random.State.make_self_init ())
let job_finished state uuid res data =
let open Rresult.R.Infix in
let now = Ptime_clock.now () in
let started, job, out =
match UM.find_opt uuid state.running with
| None -> Ptime.epoch, dummy.Builder.job, []
| Some (c, j, cond, o) ->
let res_str = Fmt.to_to_string Builder.pp_execution_result res in
Lwt_condition.broadcast cond res_str;
c, j, o
in
state.running <- UM.remove uuid state.running;
let out_dir = Fpath.(state.dir / Uuidm.to_string uuid) in
Bos.OS.Dir.create out_dir >>= fun _ ->
let full =
let out = List.map (fun (d, d') -> Int64.to_int d, d') out in
let v = job, uuid, out, started, now, res, data in
Builder.Asn.exec_to_cs v
in
Bos.OS.File.write Fpath.(out_dir / "full") (Cstruct.to_string full) >>= fun () ->
let console_out =
List.map (fun (delta, txt) ->
Printf.sprintf "%dms: %S" (Duration.to_ms delta) txt)
(List.rev out)
in
let console = Fpath.(out_dir / "console.log") in
let started = "started at " ^ Ptime.to_rfc3339 started
and stopped = "stopped at " ^ Ptime.to_rfc3339 now
and exited = Fmt.to_to_string Builder.pp_execution_result res
in
Bos.OS.File.write_lines console (started :: console_out @ [ exited ; stopped ]) >>= fun () ->
let out = Fpath.(out_dir / "output") in
List.iter (fun (path, value) ->
let p = Fpath.append out path in
ignore (Bos.OS.Dir.create (Fpath.parent p));
ignore (Bos.OS.File.write p value))
data;
let in_dir = Fpath.(out_dir / "input") in
List.iter (fun (path, value) ->
let p = Fpath.append in_dir path in
ignore (Bos.OS.Dir.create (Fpath.parent p));
ignore (Bos.OS.File.write p value))
job.Builder.files;
Bos.OS.File.write Fpath.(in_dir / "script.sh") job.Builder.script
let handle t fd addr =
(* -- client connection:
(1) read client hello
(2) send server hello
-- now there are different paths:
(3) read request job
(4) send job
(5) await job done
-- or a leftover client connection from previous run:
(3) read job done
-- or scheduling a job
(3) read schedule
-- or unscheduling a job
(3) read unschedule
-- or info
(3) read info
(4) send info_reply
-- or observing
(3) read observe
(4) send outputs and job done
*)
let open Lwt_result.Infix in
Logs.app (fun m -> m "client connection from %a" pp_sockaddr addr);
read_cmd fd >>= function
| Builder.Client_hello n when n = Builder.cmds ->
write_cmd fd Builder.(Server_hello cmds) >>= fun () ->
begin
read_cmd fd >>= function
| Builder.Job_requested ->
Logs.app (fun m -> m "job requested");
let rec find_job () =
match Queue.take_opt t.queue with
| None -> Lwt.bind (Lwt_condition.wait t.waiter) find_job
| Some job -> Lwt.return job
in
Lwt.bind (find_job ()) (fun job ->
ignore (dump t);
let uuid = uuid_gen () in
write_cmd fd (Builder.Job_schedule (uuid, job)) >>= fun () ->
Logs.app (fun m -> m "job %a scheduled %a"
Uuidm.pp uuid Builder.pp_job job);
t.running <- UM.add uuid (Ptime_clock.now (), job, Lwt_condition.create (), []) t.running;
(* await output *)
let rec read () =
read_cmd fd >>= function
| Builder.Output (uuid, data) ->
Logs.app (fun m -> m "job %a output %S" Uuidm.pp uuid data);
(match UM.find_opt uuid t.running with
| None ->
Logs.err (fun m -> m "unknown %a, discarding %S"
Uuidm.pp uuid data)
| Some (created, job, cond, out) ->
Lwt_condition.broadcast cond data;
let ts =
let delta = Ptime.diff (Ptime_clock.now ()) created in
Duration.of_f (Ptime.Span.to_float_s delta)
in
let value = created, job, cond, (ts, data) :: out in
t.running <- UM.add uuid value t.running);
read ()
| Builder.Job_finished (uuid, r, data) ->
Logs.app (fun m -> m "job %a finished with %a" Uuidm.pp uuid
Builder.pp_execution_result r);
ignore (job_finished t uuid r data);
Lwt.return (Ok ())
| cmd ->
Logs.err (fun m -> m "expected output of job finished, got %a"
Builder.pp_cmd cmd);
Lwt.return (Ok ())
in
read ())
| Builder.Job_finished (uuid, r, data) ->
Logs.app (fun m -> m "job %a immediately finished with %a" Uuidm.pp uuid
Builder.pp_execution_result r);
ignore (job_finished t uuid r data);
Lwt.return (Ok ())
| Builder.Schedule (p, j) ->
Logs.app (fun m -> m "%a schedule %a" Builder.pp_period p
Builder.pp_job j);
if S.fold (fun { Builder.job ; _ } acc ->
if acc then not (String.equal job.Builder.name j.Builder.name) else acc)
t.schedule true
then
let now = Ptime_clock.now () in
schedule_job t now p j;
ignore (dump t);
Lwt.return (Ok ())
else begin
Logs.err (fun m -> m "job with same name already in schedule");
Lwt.return (Error (`Msg "job name already used"))
end
| Builder.Unschedule name ->
Logs.app (fun m -> m "unschedule %s" name);
let schedule =
let s = S.create ~dummy 13 in
S.iter (fun ({ Builder.job ; _ } as si) ->
if not (String.equal job.Builder.name name) then
S.add s si
else ()) t.schedule;
s
and queue =
let q = Queue.create () in
Queue.iter (fun job ->
if not (String.equal job.Builder.name name) then
Queue.add job q
else
())
t.queue;
q
in
t.schedule <- schedule;
t.queue <- queue;
ignore (dump t);
Lwt.return (Ok ())
| Builder.Info ->
Logs.app (fun m -> m "info");
let reply =
let schedule = S.fold (fun s acc -> s :: acc) t.schedule []
and queue = List.rev (Queue.fold (fun acc j -> j :: acc) [] t.queue)
and running =
UM.fold (fun uuid (started, job, _, _) acc ->
(started, uuid, job) :: acc)
t.running []
in
Builder.{ schedule ; queue ; running }
in
write_cmd fd (Builder.Info_reply reply)
| Builder.Observe id ->
(* two cases: still running or already done *)
begin match UM.find_opt id t.running with
| Some (_, _, cond, out) ->
let open Lwt.Infix in
Lwt_list.iter_s (fun (_, l) ->
write_cmd fd (Builder.Output (id, l)) >|= ignore) out >>= fun () ->
let rec more () =
Lwt_condition.wait cond >>= fun data ->
write_cmd fd (Builder.Output (id, data)) >>= fun _ ->
more ()
in
more ()
| None ->
let console_file = Fpath.(t.dir / Uuidm.to_string id / "console.log") in
match Bos.OS.File.read_lines console_file with
| Error _ -> Lwt.return (Ok ())
| Ok data ->
let open Lwt.Infix in
Lwt_list.iter_s (fun l ->
write_cmd fd (Builder.Output (id, l)) >|= ignore) data >|= fun () ->
Ok ()
end
| cmd ->
Logs.err (fun m -> m "unexpected %a" Builder.pp_cmd cmd);
Lwt_result.lift (Error (`Msg "bad communication"))
end
| cmd ->
Logs.err (fun m -> m "expected client hello with matching version, got %a"
Builder.pp_cmd cmd);
Lwt_result.lift (Error (`Msg "bad communication"))
let jump () ip port dir =
Lwt_main.run
(Sys.(set_signal sigpipe Signal_ignore);
let d = Fpath.v dir in
Lwt_result.lift (restore d) >>= fun state ->
(match state with
| Ok s -> Lwt.return s | Error `Msg m -> Lwt.fail_with m) >>= fun state ->
let modified = schedule state in
ignore (if modified then dump state else Ok ());
let s = Lwt_unix.(socket PF_INET SOCK_STREAM 0) in
Lwt_unix.(setsockopt s SO_REUSEADDR true);
Lwt_unix.(bind s (ADDR_INET (Unix.inet_addr_of_string ip, port))) >>= fun () ->
Lwt_unix.listen s 10;
let _ev =
Lwt_engine.on_timer 60. true (fun _ev ->
let modified = schedule state in
ignore (if modified then dump state else Ok ()))
in
Lwt.catch (fun () ->
let rec loop () =
Lwt_unix.accept s >>= fun (fd, addr) ->
Lwt.async (fun () ->
handle state fd addr >>= fun _ ->
Lwt_unix.close fd);
loop ()
in
loop ())
(fun e ->
Lwt.return (Logs.err (fun m -> m "exception %s, shutting down"
(Printexc.to_string e)))));
Ok ()
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ())
open Cmdliner
let setup_log =
Term.(const setup_log
$ Fmt_cli.style_renderer ()
$ Logs_cli.level ())
let port =
let doc = "TCP listen port" in
Arg.(value & opt int 1234 & info [ "port" ] ~doc)
let ip =
let doc = "Listen IP" in
Arg.(value & opt string "127.0.0.1" & info [ "ip" ] ~doc)
let dir =
let doc = "Directory for persistent data (defaults to /var/db/builder)" in
Arg.(value & opt dir "/var/db/builder" & info [ "dir" ] ~doc)
let cmd =
Term.(term_result (const jump $ setup_log $ ip $ port $ dir)),
Term.info "builder-server" ~version:Builder.version
let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1

0
builder.opam Normal file
View file

2
dune-project Normal file
View file

@ -0,0 +1,2 @@
(lang dune 2.0)
(name builder)

366
lib/builder.ml Normal file
View file

@ -0,0 +1,366 @@
let src = Logs.Src.create "builder" ~doc:"Builder"
module Log = (val Logs.src_log src : Logs.LOG)
open Rresult.R.Infix
type data = (Fpath.t * string) list
let pp_data ppf xs =
Fmt.(list ~sep:(unit "@.")
(pair ~sep:(unit ": ") Fpath.pp int))
ppf
(List.map (fun (f, s) -> f, String.length s) xs)
type job = {
name : string ;
script : string ;
files : data ;
}
let pp_job ppf { name ; script ; files } =
Fmt.pf ppf "name %s, script %d, files %a" name (String.length script)
pp_data files
type execution_result =
| Exited of int
| Signalled of int
| Stopped of int
| Msg of string
let pp_execution_result ppf = function
| Exited i -> Fmt.pf ppf "exited %d" i
| Signalled i -> Fmt.pf ppf "signalled %d" i
| Stopped i -> Fmt.pf ppf "stopped %d" i
| Msg m -> Fmt.pf ppf "execution aborted: %s" m
type period = Hourly | Daily | Weekly
let pp_period ppf = function
| Hourly -> Fmt.string ppf "hourly"
| Daily -> Fmt.string ppf "daily"
| Weekly -> Fmt.string ppf "weekly"
type schedule_item = {
next : Ptime.t ;
period : period ;
job : job ;
}
let pp_schedule_item ppf { next ; period ; job } =
Fmt.pf ppf "next %a, %a: %a" (Ptime.pp_rfc3339 ()) next
pp_period period pp_job job
type info = {
schedule : schedule_item list ;
queue : job list ;
running : (Ptime.t * Uuidm.t * job) list ;
}
let triple ~sep pa pb pc ppf (va, vb, vc)=
Fmt.pair ~sep pa (Fmt.pair ~sep pb pc) ppf
(va, (vb, vc))
let pp_info ppf { schedule ; queue ; running } =
let pp_time = Ptime.pp_rfc3339 () in
Fmt.pf ppf "schedule: %a@.queue: %a@.running: %a@."
Fmt.(list ~sep:(unit ";@ ") pp_schedule_item) schedule
Fmt.(list ~sep:(unit ";@ ") pp_job) queue
Fmt.(list ~sep:(unit ";@ ")
(triple ~sep:(unit ",@,") pp_time Uuidm.pp pp_job)) running
type cmd =
| Client_hello of int
| Server_hello of int
| Job_requested
| Job_schedule of Uuidm.t * job
| Job_finished of Uuidm.t * execution_result * data
| Output of Uuidm.t * string
| Schedule of period * job
| Unschedule of string
| Info
| Info_reply of info
| Observe of Uuidm.t
let cmds = 11
let version =
Fmt.strf "version %%VERSION%% protocol version %d" cmds
let pp_cmd ppf = function
| Client_hello max -> Fmt.pf ppf "client hello (max %d)" max
| Server_hello max -> Fmt.pf ppf "server hello (max %d)" max
| Job_requested -> Fmt.string ppf "job request"
| Job_schedule (uuid, job) ->
Fmt.pf ppf "[%a] job schedule %a" Uuidm.pp uuid pp_job job
| Job_finished (uuid, result, data) ->
Fmt.pf ppf "[%a] job finished with %a: %a" Uuidm.pp uuid
pp_execution_result result pp_data data
| Output (uuid, data) -> Fmt.pf ppf "[%a] %S" Uuidm.pp uuid data
| Schedule (period, job) ->
Fmt.pf ppf "schedule at %a: %a" pp_period period pp_job job
| Unschedule job_name -> Fmt.pf ppf "unschedule %s" job_name
| Info -> Fmt.string ppf "info"
| Info_reply info -> Fmt.pf ppf "info: %a" pp_info info
| Observe id -> Fmt.pf ppf "observe %a" Uuidm.pp id
type state_item =
| Job of job
| Schedule of schedule_item
let pp_state_item ppf = function
| Job j -> Fmt.pf ppf "job %a" pp_job j
| Schedule s -> Fmt.pf ppf "schedule %a" pp_schedule_item s
type state = state_item list
let pp_state = Fmt.(list ~sep:(unit ";@ ") pp_state_item)
module Asn = struct
let guard p err = if p then Ok () else Error err
let decode_strict codec cs =
match Asn.decode codec cs with
| Ok (a, cs) ->
guard (Cstruct.len cs = 0) (`Msg "trailing bytes") >>= fun () ->
Ok a
| Error (`Parse msg) -> Error (`Msg msg)
let projections_of asn =
let c = Asn.codec Asn.der asn in
(decode_strict c, Asn.encode c)
let data =
let f (path, value) =
match Fpath.of_string path with
| Ok p -> p, value
| Error `Msg msg -> Asn.S.error (`Parse msg)
and g (path, value) =
Fpath.to_string path, value
in
Asn.S.(sequence_of
(map f g
(sequence2
(required ~label:"path" utf8_string)
(required ~label:"data" utf8_string))))
let job =
let f (name, script, files) =
{ name ; script ; files }
and g { name ; script ; files } =
name, script, files
in
Asn.S.(map f g (sequence3
(required ~label:"name" utf8_string)
(required ~label:"script" utf8_string)
(required ~label:"files" data)))
let period =
let f = function
| `C1 () -> Hourly
| `C2 () -> Daily
| `C3 () -> Weekly
and g = function
| Hourly -> `C1 ()
| Daily -> `C2 ()
| Weekly -> `C3 ()
in
Asn.S.(map f g
(choice3 (explicit 0 null) (explicit 1 null) (explicit 2 null)))
let schedule =
let f (next, period, job) = {next; period; job}
and g {next; period; job} = (next, period, job)
in
Asn.S.(map f g (sequence3
(required ~label:"next" utc_time)
(required ~label:"period" period)
(required ~label:"job" job)))
let state_item =
let f = function
| `C1 j -> Job j
| `C2 s -> Schedule s
and g = function
| Job j -> `C1 j
| Schedule s -> `C2 s
in
Asn.S.(map f g (choice2
(explicit 0 job)
(explicit 1 schedule)))
let state_of_cs, state_to_cs = projections_of (Asn.S.sequence_of state_item)
let uuid =
let f s =
match Uuidm.of_bytes s with
| None -> Asn.S.error (`Parse "couldn't decode UUID")
| Some s -> s
and g uuid = Uuidm.to_bytes uuid
in
Asn.S.(map f g utf8_string)
let res =
let f = function
| `C1 i -> Exited i
| `C2 i -> Signalled i
| `C3 i -> Stopped i
| `C4 s -> Msg s
and g = function
| Exited i -> `C1 i
| Signalled i -> `C2 i
| Stopped i -> `C3 i
| Msg s -> `C4 s
in
Asn.S.(map f g
(choice4
(explicit 0 int)
(explicit 1 int)
(explicit 2 int)
(explicit 3 utf8_string)))
let exec =
let f = function
| `C1 (job, uuid, out, (created, finished), res, data) ->
job, uuid, out, created, finished, res, data
| `C2 () -> assert false
and g (job, uuid, out, created, finished, res, data) =
`C1 (job, uuid, out, (created, finished), res, data)
in
Asn.S.(map f g
(choice2
(explicit 0
(sequence6
(required ~label:"job" job)
(required ~label:"uuid" uuid)
(required ~label:"console"
(sequence_of (sequence2
(required ~label:"delta" int)
(required ~label:"data" utf8_string))))
(required ~label:"timestamps"
(sequence2
(required ~label:"started" utc_time)
(required ~label:"finished" utc_time)))
(required ~label:"result" res)
(required ~label:"output" data)))
(explicit 1 null)))
let exec_of_cs, exec_to_cs = projections_of exec
let cmd =
let f = function
| `C1 `C1 max -> Client_hello max
| `C1 `C2 max -> Server_hello max
| `C1 `C3 (uuid, job) -> Job_schedule (uuid, job)
| `C1 `C4 (uuid, res, data) -> Job_finished (uuid, res, data)
| `C1 `C5 (uuid, out) -> Output (uuid, out)
| `C1 `C6 (period, job) -> Schedule (period, job)
| `C2 `C1 () -> Info
| `C2 `C2 (schedule, queue, running) ->
Info_reply { schedule ; queue ; running }
| `C2 `C3 () -> Job_requested
| `C2 `C4 jn -> Unschedule jn
| `C2 `C5 id -> Observe id
and g = function
| Client_hello max -> `C1 (`C1 max)
| Server_hello max -> `C1 (`C2 max)
| Job_schedule (uuid, job) -> `C1 (`C3 (uuid, job))
| Job_finished (uuid, res, data) -> `C1 (`C4 (uuid, res, data))
| Output (uuid, out) -> `C1 (`C5 (uuid, out))
| Schedule (period, job) -> `C1 (`C6 (period, job))
| Info -> `C2 (`C1 ())
| Info_reply { schedule ; queue ; running } ->
`C2 (`C2 (schedule, queue, running))
| Job_requested -> `C2 (`C3 ())
| Unschedule jn -> `C2 (`C4 jn)
| Observe id -> `C2 (`C5 id)
in
Asn.S.(map f g
(choice2
(choice6
(explicit 0 int)
(explicit 1 int)
(explicit 2 (sequence2
(required ~label:"uuid" uuid)
(required ~label:"job" job)))
(explicit 3 (sequence3
(required ~label:"uuid" uuid)
(required ~label:"result" res)
(required ~label:"data" data)))
(explicit 4 (sequence2
(required ~label:"uuid" uuid)
(required ~label:"output" utf8_string)))
(explicit 5 (sequence2
(required ~label:"period" period)
(required ~label:"job" job))))
(choice5
(explicit 6 null)
(explicit 7 (sequence3
(required ~label:"schedule" (sequence_of schedule))
(required ~label:"queue" (sequence_of job))
(required ~label:"running"
(sequence_of
(sequence3
(required ~label:"started" utc_time)
(required ~label:"uuid" uuid)
(required ~label:"job" job))))))
(explicit 8 null)
(explicit 9 utf8_string)
(explicit 10 uuid)
)))
let cmd_of_cs, cmd_to_cs = projections_of cmd
end
let rec ign_intr f v =
try f v with Unix.Unix_error (Unix.EINTR, _, _) -> ign_intr f v
let read fd =
try
let rec r b ?(off = 0) l =
if l = 0 then
Ok ()
else
let read = ign_intr (Unix.read fd b off) l in
if read = 0 then
Error (`Msg "end of file")
else
r b ~off:(read + off) (l - read)
in
let bl = Bytes.create 8 in
r bl 8 >>= fun () ->
let l = Cstruct.BE.get_uint64 (Cstruct.of_bytes bl) 0 in
let l_int = Int64.to_int l in (* TODO *)
let b = Bytes.create l_int in
r b l_int >>= fun () ->
Ok (Cstruct.of_bytes b)
with
Unix.Unix_error (err, f, _) ->
Log.err (fun m -> m "Unix error in %s: %s" f (Unix.error_message err));
Error (`Msg "unix error in read")
let read_cmd fd =
read fd >>= fun data ->
Asn.cmd_of_cs data
let write fd data =
try
let rec w b ?(off = 0) l =
if l = 0 then
()
else
let written = ign_intr (Unix.write fd b off) l in
w b ~off:(written + off) (l - written)
in
let csl = Cstruct.create 8 in
Cstruct.BE.set_uint64 csl 0 (Int64.of_int (Cstruct.len data));
w (Cstruct.to_bytes csl) 8;
w (Cstruct.to_bytes data) (Cstruct.len data);
Ok ()
with
Unix.Unix_error (err, f, _) ->
Log.err (fun m -> m "Unix error in %s: %s" f (Unix.error_message err));
Error (`Msg "unix error in write")
let write_cmd fd cmd =
let data = Asn.cmd_to_cs cmd in
write fd data

4
lib/dune Normal file
View file

@ -0,0 +1,4 @@
(library
(name builder)
(public_name builder)
(libraries unix rresult fpath fmt logs cstruct asn1-combinators uuidm))