418 lines
15 KiB
OCaml
418 lines
15 KiB
OCaml
open Lwt.Infix
|
|
|
|
(** [read fd] reads one length-prefixed frame from [fd].

    A frame is an 8-byte big-endian length followed by that many payload
    bytes.  Returns [Ok payload], or [Error (`Msg _)] when the peer closes
    the connection early, when the announced length is negative (as a signed
    int64) or larger than [Sys.max_string_length], or when an exception is
    raised while reading (the exception is logged). *)
let read fd =
  Lwt.catch (fun () ->
      (* [r b ~off l] fills [l] bytes of [b] starting at [off], looping over
         short reads; a read of 0 bytes means end of file. *)
      let rec r b ?(off = 0) l =
        if l = 0 then
          Lwt.return (Ok ())
        else
          Lwt_unix.read fd b off l >>= fun read ->
          if read = 0 then
            Lwt.return (Error (`Msg "end of file"))
          else
            r b ~off:(read + off) (l - read)
      in
      let open Lwt_result.Infix in
      let bl = Bytes.create 8 in
      r bl 8 >>= fun () ->
      let l = Cstruct.BE.get_uint64 (Cstruct.of_bytes bl) 0 in
      (* The length is chosen by the peer: reject it before [Int64.to_int]
         can wrap it and before [Bytes.create] fails or exhausts memory. *)
      if Int64.compare l 0L < 0
      || Int64.compare l (Int64.of_int Sys.max_string_length) > 0 then
        Lwt.return (Error (`Msg (Printf.sprintf "invalid frame length %Lu" l)))
      else
        let l_int = Int64.to_int l in
        let b = Bytes.create l_int in
        r b l_int >|= fun () ->
        Cstruct.of_bytes b)
    (fun e ->
       Logs.err (fun m -> m "Error while reading: %s" (Printexc.to_string e));
       Lwt.return (Error (`Msg "error in read")))
|
|
|
|
(** [read_cmd fd] reads one frame from [fd] and decodes it as a
    [Builder] command; read and decoding errors are both returned. *)
let read_cmd fd =
  Lwt_result.bind (read fd) (fun cs -> Lwt.return (Builder.Asn.cmd_of_cs cs))
|
|
|
|
(** [write fd data] sends [data] on [fd] as one frame: an 8-byte big-endian
    length followed by the payload.  Any exception is logged and turned into
    [Error (`Msg "unix error in write")]. *)
let write fd data =
  let len = Cstruct.len data in
  (* Write the first [count] bytes of [buf], retrying after short writes. *)
  let send_all buf count =
    let rec go pos remaining =
      if remaining = 0 then Lwt.return_unit
      else
        Lwt_unix.write fd buf pos remaining >>= fun n ->
        go (pos + n) (remaining - n)
    in
    go 0 count
  in
  Lwt.catch
    (fun () ->
       let header = Cstruct.create 8 in
       Cstruct.BE.set_uint64 header 0 (Int64.of_int len);
       send_all (Cstruct.to_bytes header) 8 >>= fun () ->
       send_all (Cstruct.to_bytes data) len >|= fun () ->
       Ok ())
    (fun exn ->
       Logs.err (fun m -> m "Error while writing: %s" (Printexc.to_string exn));
       Lwt.return (Error (`Msg "unix error in write")))
|
|
|
|
(** [write_cmd fd cmd] encodes [cmd] and sends it on [fd] as one frame. *)
let write_cmd fd cmd = write fd (Builder.Asn.cmd_to_cs cmd)
|
|
|
|
(** [pp_sockaddr] prints an Internet address as [ip:port] and a Unix domain
    socket as [unix://path]. *)
let pp_sockaddr ppf sa =
  match sa with
  | Lwt_unix.ADDR_INET (inet, p) ->
    Fmt.pf ppf "%s:%d" (Unix.string_of_inet_addr inet) p
  | Lwt_unix.ADDR_UNIX path -> Fmt.pf ppf "unix://%s" path
|
|
|
|
(* Maps keyed by job UUID. *)
module UM = Map.Make (struct
    type t = Uuidm.t
    let compare = Uuidm.compare
  end)
|
|
|
|
(* Heap of schedule items ordered by their [next] execution time, so that
   [S.minimum] is the item due soonest. *)
module S = Binary_heap.Make (struct
    type t = Builder.schedule_item
    let compare a b = Ptime.compare a.Builder.next b.Builder.next
  end)
|
|
|
|
(* Placeholder schedule item: [S.create] needs a dummy element, and
   [job_finished] uses its job for results of unknown jobs. *)
let dummy =
  Builder.{
    next = Ptime.epoch ;
    period = Daily ;
    job = { name = "dummy" ; script = "#nothing" ; files = [] } ;
  }
|
|
|
|
(* A job handed to a worker: when it was handed out, the job, the condition
   used to forward its output to observers, and the output received so far
   (newest first), each chunk paired with the elapsed time since hand-out as
   produced by [Duration.of_f]. *)
type running_job =
  Ptime.t * Builder.job * string Lwt_condition.t * (int64 * string) list

(** Server state. *)
type t = {
  mutable queue : Builder.job Queue.t ;   (* jobs waiting for a worker *)
  mutable schedule : S.t ;                (* periodic jobs, due soonest first *)
  mutable running : running_job UM.t ;    (* jobs handed out, by UUID *)
  waiter : unit Lwt_condition.t ;         (* broadcast when a job is queued *)
  dir : Fpath.t ;                         (* state file and job results *)
}
|
|
|
|
(** [p_to_span p] is the length of period [p] as a time span. *)
let p_to_span p =
  let seconds_per_hour = 3600 in
  let hours =
    match p with
    | Builder.Hourly -> 1
    | Builder.Daily -> 24
    | Builder.Weekly -> 168
  in
  Ptime.Span.of_int_s (hours * seconds_per_hour)
|
|
|
|
(** [schedule_job t now period job] queues [job] now, unless a job with the
    same name is already queued, waking up waiting workers.  It then adds a
    schedule entry for [now + period] (logging an error if that time cannot
    be computed). *)
let schedule_job t now period job =
  let same_name j = String.equal j.Builder.name job.Builder.name in
  let already_queued =
    Queue.fold (fun found j -> found || same_name j) false t.queue
  in
  if not already_queued then begin
    Queue.add job t.queue;
    Lwt_condition.broadcast t.waiter ()
  end;
  match Ptime.add_span now (p_to_span period) with
  | Some next -> S.add t.schedule Builder.{ next ; period ; job }
  | None -> Logs.err (fun m -> m "ptime add span failed when scheduling job")
|
|
|
|
(** [schedule t] moves every schedule entry whose time is strictly before
    now into the queue (rescheduling it one period from now).  Returns
    whether anything was changed. *)
let schedule t =
  let now = Ptime_clock.now () in
  (* The earliest schedule entry, if it is due. *)
  let due () =
    match S.minimum t.schedule with
    | exception Binary_heap.Empty -> None
    | item when Ptime.is_later ~than:item.Builder.next now -> Some item
    | _ -> None
  in
  let rec loop changed =
    match due () with
    | None -> changed
    | Some { Builder.period ; job ; _ } ->
      S.remove t.schedule;
      schedule_job t now period job;
      loop true
  in
  loop false
|
|
|
|
(* [dump t] writes the queue and the schedule of [t] to the file "state" in
   [t.dir], via "state.tmp" which is then renamed over it.  Running jobs are
   not persisted.
   [restore dir] creates [dir] if needed and rebuilds the server state from
   its "state" file, or starts empty (with a warning) if there is none. *)
let dump, restore =
  let open Rresult.R.Infix in
  let state_file dir = Fpath.(dir / "state") in
  let dump t =
    let jobs =
      List.rev (Queue.fold (fun acc j -> Builder.Job j :: acc) [] t.queue)
    in
    let items =
      S.fold (fun s acc -> Builder.Schedule s :: acc) t.schedule jobs
    in
    let data = Builder.Asn.state_to_cs items in
    let final = state_file t.dir in
    let tmp = Fpath.add_ext "tmp" final in
    Bos.OS.File.write tmp (Cstruct.to_string data) >>= fun () ->
    Bos.OS.U.error_to_msg (Bos.OS.U.rename tmp final)
  in
  let restore dir =
    Bos.OS.Dir.create dir >>= fun _ ->
    let path = state_file dir in
    let queue = Queue.create ()
    and schedule = S.create ~dummy 13
    and waiter = Lwt_condition.create ()
    in
    let make_state () = { queue ; schedule ; running = UM.empty ; waiter ; dir } in
    Bos.OS.File.exists path >>= function
    | false ->
      Logs.warn (fun m -> m "state file does not exist, using empty");
      Ok (make_state ())
    | true ->
      Bos.OS.File.read path >>= fun data ->
      Builder.Asn.state_of_cs (Cstruct.of_string data) >>= fun items ->
      List.iter (function
          | Builder.Job j -> Queue.add j queue
          | Builder.Schedule s -> S.add schedule s)
        items;
      Ok (make_state ())
  in
  dump, restore
|
|
|
|
(* Generator of random (version 4) job UUIDs, seeded from the system. *)
let uuid_gen =
  let seed = Random.State.make_self_init () in
  Uuidm.v4_gen seed
|
|
|
|
(* [write_files root files] stores every [(path, contents)] pair of [files]
   below [root], creating intermediate directories.  The paths are supplied
   by network peers, and [Fpath.append] returns an absolute [path] unchanged,
   so absolute paths and paths with ".." segments are refused instead of
   being written outside [root].  Failures are logged; remaining files are
   still written. *)
let write_files root files =
  let open Rresult.R.Infix in
  List.iter (fun (path, value) ->
      if Fpath.is_abs path || List.mem ".." (Fpath.segs path) then
        Logs.err (fun m -> m "refusing to write %a outside of %a"
                     Fpath.pp path Fpath.pp root)
      else
        let p = Fpath.append root path in
        match
          Bos.OS.Dir.create (Fpath.parent p) >>= fun _ ->
          Bos.OS.File.write p value
        with
        | Ok () -> ()
        | Error (`Msg e) ->
          Logs.err (fun m -> m "failed to write %a: %s" Fpath.pp p e))
    files

(** [job_finished state uuid res data] records the completion of job [uuid]
    with result [res] and output files [data].

    Observers of the job are sent the printed result.  The job is removed
    from [state.running], and its results are written below
    [state.dir/<uuid>]:
    - "full": the encoded execution record.
    - "console.log": the start time, each output chunk, the result and the
      stop time.
    - "output/": the files in [data].
    - "input/": the job's files plus "script.sh".
    An unknown [uuid] is recorded with a dummy job and no output. *)
let job_finished state uuid res data =
  let open Rresult.R.Infix in
  let now = Ptime_clock.now () in
  let started, job, out =
    match UM.find_opt uuid state.running with
    | None -> Ptime.epoch, dummy.Builder.job, []
    | Some (c, j, cond, o) ->
      let res_str = Fmt.to_to_string Builder.pp_execution_result res in
      Lwt_condition.broadcast cond res_str;
      c, j, o
  in
  state.running <- UM.remove uuid state.running;
  let out_dir = Fpath.(state.dir / Uuidm.to_string uuid) in
  Bos.OS.Dir.create out_dir >>= fun _ ->
  let full =
    let out = List.map (fun (d, d') -> Int64.to_int d, d') out in
    let v = job, uuid, out, started, now, res, data in
    Builder.Asn.exec_to_cs v
  in
  Bos.OS.File.write Fpath.(out_dir / "full") (Cstruct.to_string full) >>= fun () ->
  (* [out] is newest first; the console log is chronological *)
  let console_out =
    List.map (fun (delta, txt) ->
        Printf.sprintf "%dms: %S" (Duration.to_ms delta) txt)
      (List.rev out)
  in
  let console = Fpath.(out_dir / "console.log") in
  let started_line = "started at " ^ Ptime.to_rfc3339 started
  and stopped_line = "stopped at " ^ Ptime.to_rfc3339 now
  and exited_line = Fmt.to_to_string Builder.pp_execution_result res
  in
  Bos.OS.File.write_lines console
    (started_line :: console_out @ [ exited_line ; stopped_line ]) >>= fun () ->
  write_files Fpath.(out_dir / "output") data;
  let in_dir = Fpath.(out_dir / "input") in
  Bos.OS.Dir.create in_dir >>= fun _ ->
  write_files in_dir job.Builder.files;
  Bos.OS.File.write Fpath.(in_dir / "script.sh") job.Builder.script
|
|
|
|
(* Take the next queued job, blocking until one is queued. *)
let rec next_job t =
  match Queue.take_opt t.queue with
  | Some job -> Lwt.return job
  | None -> Lwt.bind (Lwt_condition.wait t.waiter) (fun () -> next_job t)

(* Put [job] back at the front of the queue, unless a job with the same name
   is already queued, and wake up workers waiting for a job. *)
let requeue_job t job =
  let queued =
    Queue.fold (fun found j ->
        found || String.equal j.Builder.name job.Builder.name)
      false t.queue
  in
  if not queued then begin
    let q = Queue.create () in
    Queue.add job q;
    Queue.transfer t.queue q;
    t.queue <- q;
    Lwt_condition.broadcast t.waiter ()
  end

(* Forward a chunk of output of running job [uuid] to its observers and
   prepend it, timestamped relative to the job's start, to its output. *)
let record_output t uuid data =
  match UM.find_opt uuid t.running with
  | None ->
    Logs.err (fun m -> m "unknown %a, discarding %S" Uuidm.pp uuid data)
  | Some (created, job, cond, out) ->
    Lwt_condition.broadcast cond data;
    let ts =
      let delta = Ptime.diff (Ptime_clock.now ()) created in
      Duration.of_f (Ptime.Span.to_float_s delta)
    in
    t.running <- UM.add uuid (created, job, cond, (ts, data) :: out) t.running

(* Record the completion of job [uuid], logging failures to store results. *)
let finish_job t uuid r data =
  match job_finished t uuid r data with
  | Ok () -> ()
  | Error (`Msg e) ->
    Logs.err (fun m -> m "failed to store result of %a: %s" Uuidm.pp uuid e)

(* Serve a worker requesting a job: hand out the next queued job, then record
   its output until it reports completion.  If the job cannot be sent (e.g.
   the worker disconnected while waiting), it is re-queued, not lost. *)
let handle_job_request t fd =
  let open Lwt_result.Infix in
  Logs.app (fun m -> m "job requested");
  Lwt.bind (next_job t) (fun job ->
      (* TODO set a timer / timeout and re-queue the same job if timeout expired *)
      ignore (dump t);
      let uuid = uuid_gen () in
      Lwt.bind (write_cmd fd (Builder.Job_schedule (uuid, job))) (function
          | Error e ->
            Logs.err (fun m -> m "failed to send job %a, re-queueing it"
                         Builder.pp_job job);
            requeue_job t job;
            ignore (dump t);
            Lwt.return (Error e)
          | Ok () ->
            Logs.app (fun m -> m "job %a scheduled %a"
                         Uuidm.pp uuid Builder.pp_job job);
            t.running <-
              UM.add uuid (Ptime_clock.now (), job, Lwt_condition.create (), [])
                t.running;
            (* await output *)
            let rec read () =
              read_cmd fd >>= function
              | Builder.Output (uuid, data) ->
                Logs.app (fun m -> m "job %a output %S" Uuidm.pp uuid data);
                record_output t uuid data;
                read ()
              | Builder.Job_finished (uuid, r, data) ->
                Logs.app (fun m -> m "job %a finished with %a" Uuidm.pp uuid
                             Builder.pp_execution_result r);
                finish_job t uuid r data;
                Lwt.return (Ok ())
              | cmd ->
                Logs.err (fun m -> m "expected output of job finished, got %a"
                             Builder.pp_cmd cmd);
                Lwt.return (Ok ())
            in
            read ()))

(* Stream the output of job [id] to an observer.  For a running job, the
   output recorded so far is replayed oldest first.  New output is then
   forwarded until the job has finished or a write to the observer fails.
   For a job no longer running, its console log (if readable) is sent. *)
let handle_observe t fd id =
  let open Lwt.Infix in
  (* Send [lines] in order, stopping at the first failed write. *)
  let send_lines lines =
    Lwt_list.fold_left_s (fun acc l ->
        match acc with
        | Error _ -> Lwt.return acc
        | Ok () -> write_cmd fd (Builder.Output (id, l)))
      (Ok ()) lines
  in
  match UM.find_opt id t.running with
  | Some (_, _, cond, out) ->
    let rec more () =
      (* Stop once the job is finished: nothing will signal [cond] again. *)
      if not (UM.mem id t.running) then
        Lwt.return (Ok ())
      else
        Lwt_condition.wait cond >>= fun data ->
        write_cmd fd (Builder.Output (id, data)) >>= function
        | Error _ as e -> Lwt.return e
        | Ok () ->
          (* Waiters may be resumed from within the broadcast in
             [job_finished], before it removes the job from [t.running];
             yield so that removal is visible to the check above. *)
          Lwt.pause () >>= more
    in
    (* [out] is kept newest first *)
    send_lines (List.rev_map snd out) >>= (function
        | Error _ as e -> Lwt.return e
        | Ok () -> more ())
  | None ->
    let console_file = Fpath.(t.dir / Uuidm.to_string id / "console.log") in
    match Bos.OS.File.read_lines console_file with
    | Error _ -> Lwt.return (Ok ())
    | Ok data -> send_lines data

(** [handle t fd addr] serves one client connection on [fd].

    -- client connection:
    (1) read client hello
    (2) send server hello
    -- now there are different paths:
    (3) read request job
    (4) send job
    (5) await job done
    -- or a leftover client connection from previous run:
    (3) read job done
    -- or scheduling a job
    (3) read schedule
    -- or unscheduling a job
    (3) read unschedule
    -- or info
    (3) read info
    (4) send info_reply
    -- or observing
    (3) read observe
    (4) send outputs and job done *)
let handle t fd addr =
  let open Lwt_result.Infix in
  Logs.app (fun m -> m "client connection from %a" pp_sockaddr addr);
  read_cmd fd >>= function
  | Builder.Client_hello n when n = Builder.cmds ->
    write_cmd fd Builder.(Server_hello cmds) >>= fun () ->
    begin
      read_cmd fd >>= function
      | Builder.Job_requested -> handle_job_request t fd
      | Builder.Job_finished (uuid, r, data) ->
        Logs.app (fun m -> m "job %a immediately finished with %a" Uuidm.pp uuid
                     Builder.pp_execution_result r);
        finish_job t uuid r data;
        Lwt.return (Ok ())
      | Builder.Schedule (p, j) ->
        Logs.app (fun m -> m "%a schedule %a" Builder.pp_period p
                     Builder.pp_job j);
        let name_taken =
          S.fold (fun { Builder.job ; _ } acc ->
              acc || String.equal job.Builder.name j.Builder.name)
            t.schedule false
        in
        if name_taken then begin
          Logs.err (fun m -> m "job with same name already in schedule");
          Lwt.return (Error (`Msg "job name already used"))
        end else begin
          schedule_job t (Ptime_clock.now ()) p j;
          ignore (dump t);
          Lwt.return (Ok ())
        end
      | Builder.Unschedule name ->
        Logs.app (fun m -> m "unschedule %s" name);
        let keep n = not (String.equal n name) in
        let schedule = S.create ~dummy 13 in
        S.iter (fun ({ Builder.job ; _ } as si) ->
            if keep job.Builder.name then S.add schedule si)
          t.schedule;
        let queue = Queue.create () in
        Queue.iter (fun job ->
            if keep job.Builder.name then Queue.add job queue)
          t.queue;
        t.schedule <- schedule;
        t.queue <- queue;
        ignore (dump t);
        Lwt.return (Ok ())
      | Builder.Info ->
        Logs.app (fun m -> m "info");
        let reply =
          let schedule = S.fold (fun s acc -> s :: acc) t.schedule []
          and queue = List.rev (Queue.fold (fun acc j -> j :: acc) [] t.queue)
          and running =
            UM.fold (fun uuid (started, job, _, _) acc ->
                (started, uuid, job) :: acc)
              t.running []
          in
          Builder.{ schedule ; queue ; running }
        in
        write_cmd fd (Builder.Info_reply reply)
      | Builder.Observe id -> handle_observe t fd id
      | cmd ->
        Logs.err (fun m -> m "unexpected %a" Builder.pp_cmd cmd);
        Lwt_result.lift (Error (`Msg "bad communication"))
    end
  | cmd ->
    Logs.err (fun m -> m "expected client hello with matching version, got %a"
                 Builder.pp_cmd cmd);
    Lwt_result.lift (Error (`Msg "bad communication"))
|
|
|
|
(** [jump () ip port dir] runs the server.

    It restores the state from [dir], moves due scheduled jobs into the
    queue, and serves clients on TCP [ip:port], re-checking the schedule
    every 60 seconds.  A state that cannot be restored, or a fatal
    socket/accept exception, is returned as [Error (`Msg _)]. *)
let jump () ip port dir =
  Sys.(set_signal sigpipe Signal_ignore);
  match restore (Fpath.v dir) with
  | Error (`Msg m) -> Error (`Msg m)
  | Ok state ->
    let save () =
      match dump state with
      | Ok () -> ()
      | Error (`Msg e) -> Logs.err (fun m -> m "failed to dump state: %s" e)
    in
    let reschedule () = if schedule state then save () in
    (* Serve one connection.  Exceptions are caught and logged here, so they
       do not reach [Lwt.async]'s exception hook, and the fd is always
       closed. *)
    let serve fd addr =
      Lwt.finalize
        (fun () ->
           Lwt.catch
             (fun () -> handle state fd addr >|= ignore)
             (fun e ->
                Logs.err (fun m -> m "exception %s while handling %a"
                             (Printexc.to_string e) pp_sockaddr addr);
                Lwt.return_unit))
        (fun () ->
           Lwt.catch (fun () -> Lwt_unix.close fd) (fun _ -> Lwt.return_unit))
    in
    let run () =
      let s = Lwt_unix.(socket PF_INET SOCK_STREAM 0) in
      Lwt_unix.(setsockopt s SO_REUSEADDR true);
      Lwt_unix.(bind s (ADDR_INET (Unix.inet_addr_of_string ip, port))) >>= fun () ->
      Lwt_unix.listen s 10;
      let _ev = Lwt_engine.on_timer 60. true (fun _ev -> reschedule ()) in
      let rec loop () =
        Lwt_unix.accept s >>= fun (fd, addr) ->
        Lwt.async (fun () -> serve fd addr);
        loop ()
      in
      loop ()
    in
    reschedule ();
    Lwt_main.run
      (Lwt.catch run (fun e ->
           Logs.err (fun m -> m "exception %s, shutting down"
                        (Printexc.to_string e));
           Lwt.return (Error (`Msg (Printexc.to_string e)))))
|
|
|
|
(** Configure terminal styling and the log level, and report log messages
    on standard output. *)
let setup_log style_renderer level =
  let () = Fmt_tty.setup_std_outputs ?style_renderer () in
  let () = Logs.set_level level in
  let reporter = Logs_fmt.reporter ~dst:Format.std_formatter () in
  Logs.set_reporter reporter
|
|
|
|
open Cmdliner
|
|
|
|
(* Command-line term for the style renderer and log level options. *)
let setup_log =
  let renderer = Fmt_cli.style_renderer ()
  and level = Logs_cli.level () in
  Term.(const setup_log $ renderer $ level)
|
|
|
|
(* --port: TCP port to listen on, 1234 by default. *)
let port =
  let doc = "TCP listen port" in
  let i = Arg.info [ "port" ] ~doc in
  Arg.value (Arg.opt Arg.int 1234 i)
|
|
|
|
(* --ip: address to listen on, 127.0.0.1 by default. *)
let ip =
  let doc = "Listen IP" in
  let i = Arg.info [ "ip" ] ~doc in
  Arg.value (Arg.opt Arg.string "127.0.0.1" i)
|
|
|
|
(* --dir: directory holding the state file and job results. *)
let dir =
  let doc = "Directory for persistent data (defaults to /var/db/builder)" in
  let i = Arg.info [ "dir" ] ~doc in
  Arg.value (Arg.opt Arg.dir "/var/db/builder" i)
|
|
|
|
(* The builder-server command: runs [jump] with the parsed options. *)
let cmd =
  let term = Term.(term_result (const jump $ setup_log $ ip $ port $ dir)) in
  let info = Term.info "builder-server" ~version:Builder.version in
  term, info
|
|
|
|
(* Entry point.  Exit 0 on success and after printing --help or --version
   (these previously fell into the catch-all and exited 1); exit 1 on
   parse, term or exception errors. *)
let () =
  match Term.eval cmd with
  | `Ok () | `Version | `Help -> exit 0
  | `Error _ -> exit 1
|