open Lwt.Infix

(* Log the error carried by a result and drop the success value.  Used where
   a failure must not abort the caller but should not vanish silently. *)
let log_failure what = function
  | Ok _ -> ()
  | Error (`Msg msg) -> Logs.err (fun m -> m "%s: %s" what msg)

(* Read one frame from [fd]: an 8 byte big-endian length followed by that many
   payload bytes.  Returns the payload, or an error on end of file, on a
   length that cannot be represented as a buffer size, or when any exception
   is raised while reading. *)
let read fd =
  Lwt.catch (fun () ->
      (* fill [b] with exactly [l] bytes, starting at [off] *)
      let rec r b ?(off = 0) l =
        if l = 0 then
          Lwt.return (Ok ())
        else
          Lwt_unix.read fd b off l >>= fun read ->
          if read = 0 then
            Lwt.return (Error (`Msg "end of file"))
          else
            r b ~off:(read + off) (l - read)
      in
      let open Lwt_result.Infix in
      let bl = Bytes.create 8 in
      r bl 8 >>= fun () ->
      let l = Cstruct.BE.get_uint64 (Cstruct.of_bytes bl) 0 in
      (* Check the range before converting: Int64.to_int may truncate, and
         Bytes.create rejects negative or oversized lengths anyway. *)
      if Int64.compare l 0L < 0
      || Int64.compare l (Int64.of_int Sys.max_string_length) > 0
      then begin
        Logs.err (fun m -> m "refusing frame with invalid length %Lu" l);
        Lwt.return (Error (`Msg "invalid frame length"))
      end else
        let l_int = Int64.to_int l in
        let b = Bytes.create l_int in
        r b l_int >|= fun () ->
        Cstruct.of_bytes b)
    (fun e ->
       Logs.err (fun m -> m "Error while reading: %s" (Printexc.to_string e));
       Lwt.return (Error (`Msg "error in read")))

(* Read one frame from [fd] and decode it as a command. *)
let read_cmd fd =
  let open Lwt_result.Infix in
  read fd >>= fun data ->
  Lwt.return (Builder.Asn.cmd_of_cs data)

(* Write [data] to [fd] as one frame: 8 byte big-endian length, then the
   payload.  Any exception raised while writing is logged and turned into an
   error result. *)
let write fd data =
  Lwt.catch (fun () ->
      (* write exactly [l] bytes of [b], starting at [off] *)
      let rec w b ?(off = 0) l =
        if l = 0 then
          Lwt.return_unit
        else
          Lwt_unix.write fd b off l >>= fun written ->
          w b ~off:(written + off) (l - written)
      in
      let csl = Cstruct.create 8 in
      Cstruct.BE.set_uint64 csl 0 (Int64.of_int (Cstruct.len data));
      w (Cstruct.to_bytes csl) 8 >>= fun () ->
      w (Cstruct.to_bytes data) (Cstruct.len data) >|= fun () ->
      Ok ())
    (fun e ->
       Logs.err (fun m -> m "Error while writing: %s" (Printexc.to_string e));
       Lwt.return (Error (`Msg "unix error in write")))

(* Encode [cmd] and write it to [fd] as one frame. *)
let write_cmd fd cmd =
  let data = Builder.Asn.cmd_to_cs cmd in
  write fd data

(* Pretty-printer for socket addresses. *)
let pp_sockaddr ppf = function
  | Lwt_unix.ADDR_UNIX s -> Fmt.pf ppf "unix://%s" s
  | Lwt_unix.ADDR_INET (ip, port) ->
    Fmt.pf ppf "%s:%d" (Unix.string_of_inet_addr ip) port

module UM = Map.Make(Uuidm)

(* Heap of schedule items, ordered by their next execution time. *)
module S = Binary_heap.Make (struct
    type t = Builder.schedule_item
    let compare { Builder.next = n1 ; _ } { Builder.next = n2 ; _ } =
      Ptime.compare n1 n2
  end)

(* Placeholder item: passed to S.create, and its job is used by job_finished
   when a result arrives for a uuid that is not in the running map. *)
let dummy =
  let job = Builder.{ name = "dummy" ; script = "#nothing" ; files = [] } in
  Builder.{ next = Ptime.epoch ; period = Daily ; job }

(* Server state.
   - queue: jobs waiting to be handed to a worker
   - schedule: periodic jobs
   - running: per uuid the start time, the job, a condition on which output is
     broadcast, and the output collected so far (newest first)
   - waiter: signalled whenever a job is added to the queue
   - dir: directory holding the state file and the per-job results *)
type t = {
  mutable queue : Builder.job Queue.t ;
  mutable schedule : S.t ;
  mutable running : (Ptime.t * Builder.job * string Lwt_condition.t * (int64 * string) list) UM.t ;
  waiter : unit Lwt_condition.t ;
  dir : Fpath.t ;
}

(* Length of a scheduling period as a time span. *)
let p_to_span p =
  let one_hour = 60 * 60 in
  let s = match p with
    | Builder.Hourly -> one_hour
    | Builder.Daily -> 24 * one_hour
    | Builder.Weekly -> 7 * 24 * one_hour
  in
  Ptime.Span.of_int_s s

(* Queue [job] now, unless a job with the same name is already queued, and
   insert it into the schedule for [now] plus [period]. *)
let schedule_job t now period job =
  if Queue.fold (fun acc i ->
      if acc then not (String.equal i.Builder.name job.Builder.name) else acc)
      true t.queue
  then begin
    Queue.add job t.queue;
    Lwt_condition.broadcast t.waiter ();
  end;
  match Ptime.add_span now (p_to_span period) with
  | None -> Logs.err (fun m -> m "ptime add span failed when scheduling job")
  | Some next -> S.add t.schedule Builder.{ next ; period ; job }

(* Move every schedule item that is due into the queue and re-schedule it.
   Returns true if anything changed. *)
let schedule t =
  let now = Ptime_clock.now () in
  let rec s_next modified =
    match S.minimum t.schedule with
    | exception Binary_heap.Empty -> modified
    | Builder.{ next ; period ; job } when Ptime.is_later ~than:next now ->
      S.remove t.schedule;
      schedule_job t now period job;
      s_next true
    | _ -> modified
  in
  s_next false

(* Put [job] back at the head of the queue and wake up waiting workers.
   Nothing is done if a job with the same name has been queued meanwhile. *)
let requeue t job =
  let already_queued =
    Queue.fold (fun acc queued ->
        acc || String.equal queued.Builder.name job.Builder.name)
      false t.queue
  in
  if not already_queued then begin
    let q = Queue.create () in
    Queue.add job q;
    (* Queue.transfer appends the old elements after [job] *)
    Queue.transfer t.queue q;
    t.queue <- q;
    Lwt_condition.broadcast t.waiter ()
  end

(* [dump t] writes queue and schedule to the state file in [t.dir], via a
   temporary file that is renamed into place.
   [restore dir] creates [dir] if needed and loads the state file from it; a
   missing state file yields an empty state. *)
let dump, restore =
  let open Rresult.R.Infix in
  let file = "state" in
  (fun t ->
     let state =
       let jobs = Queue.fold (fun acc j -> Builder.Job j :: acc) [] t.queue in
       S.fold (fun s acc -> (Builder.Schedule s) :: acc) t.schedule (List.rev jobs)
     in
     let data = Builder.Asn.state_to_cs state in
     let bak = Fpath.(t.dir / file + "tmp") in
     Bos.OS.File.write bak (Cstruct.to_string data) >>= fun () ->
     Bos.OS.U.(error_to_msg (rename bak Fpath.(t.dir / file)))),
  (fun dir ->
     Bos.OS.Dir.create dir >>= fun _ ->
     let to_read = Fpath.(dir / file) in
     let queue = Queue.create ()
     and schedule = S.create ~dummy 13
     and waiter = Lwt_condition.create ()
     in
     Bos.OS.File.exists to_read >>= function
     | false ->
       Logs.warn (fun m -> m "state file does not exist, using empty");
       Ok { queue ; schedule ; running = UM.empty ; waiter ; dir }
     | true ->
       Bos.OS.File.read to_read >>= fun data ->
       Builder.Asn.state_of_cs (Cstruct.of_string data) >>= fun items ->
       let queue, schedule =
         List.fold_left (fun (queue, schedule) -> function
             | Builder.Job j -> Queue.add j queue; (queue, schedule)
             | Builder.Schedule s -> S.add schedule s; (queue, schedule))
           (queue, schedule) items
       in
       Ok { queue ; schedule ; running = UM.empty ; waiter ; dir })

let uuid_gen = Uuidm.v4_gen (Random.State.make_self_init ())

(* Record the result of job [uuid]: notify observers, remove the job from the
   running map and store the results below [state.dir] in a directory named
   after the uuid (full record, console.log, output files, input files and
   script.sh).  If [uuid] is not in the running map, the dummy job and the
   epoch are recorded in its place.
   Failing to create the result directory or to write the full record or
   console.log aborts with an error; failures while writing the individual
   output and input files are only logged. *)
let job_finished state uuid res data =
  let open Rresult.R.Infix in
  let now = Ptime_clock.now () in
  let started, job, out =
    match UM.find_opt uuid state.running with
    | None -> Ptime.epoch, dummy.Builder.job, []
    | Some (c, j, cond, o) ->
      let res_str = Fmt.to_to_string Builder.pp_execution_result res in
      Lwt_condition.broadcast cond res_str;
      c, j, o
  in
  state.running <- UM.remove uuid state.running;
  let out_dir = Fpath.(state.dir / Uuidm.to_string uuid) in
  Bos.OS.Dir.create out_dir >>= fun _ ->
  let full =
    let out = List.map (fun (d, d') -> Int64.to_int d, d') out in
    let v = job, uuid, out, started, now, res, data in
    Builder.Asn.exec_to_cs v
  in
  Bos.OS.File.write Fpath.(out_dir / "full") (Cstruct.to_string full) >>= fun () ->
  let console_out =
    (* [out] is newest first, the log is written oldest first *)
    List.map (fun (delta, txt) ->
        Printf.sprintf "%dms: %S" (Duration.to_ms delta) txt)
      (List.rev out)
  in
  let console = Fpath.(out_dir / "console.log") in
  let started = "started at " ^ Ptime.to_rfc3339 started
  and stopped = "stopped at " ^ Ptime.to_rfc3339 now
  and exited = Fmt.to_to_string Builder.pp_execution_result res
  in
  Bos.OS.File.write_lines console (started :: console_out @ [ exited ; stopped ]) >>= fun () ->
  (* NOTE(review): the paths below come from the worker and from the job
     description and are appended without checking for absolute paths or
     parent-directory segments; confirm that both sources are trusted. *)
  let store base (path, value) =
    let p = Fpath.append base path in
    log_failure "creating result directory" (Bos.OS.Dir.create (Fpath.parent p));
    log_failure "writing result file" (Bos.OS.File.write p value)
  in
  let out = Fpath.(out_dir / "output") in
  List.iter (store out) data;
  let in_dir = Fpath.(out_dir / "input") in
  log_failure "creating input directory" (Bos.OS.Dir.create in_dir);
  List.iter (store in_dir) job.Builder.files;
  Bos.OS.File.write Fpath.(in_dir / "script.sh") job.Builder.script

(* Serve one client connection on [fd]; [addr] is only used for logging.
   Returns an error result on failed reads or writes and on protocol
   violations. *)
let handle t fd addr =
  (* -- client connection:
     (1) read client hello
     (2) send server hello
     -- now there are different paths:
     (3) read request job
     (4) send job
     (5) await job done
     -- or a leftover client connection from previous run:
     (3) read job done
     -- or scheduling a job
     (3) read schedule
     -- or unscheduling a job
     (3) read unschedule
     -- or info
     (3) read info
     (4) send info_reply
     -- or observing
     (3) read observe
     (4) send outputs and job done
  *)
  let open Lwt_result.Infix in
  Logs.app (fun m -> m "client connection from %a" pp_sockaddr addr);
  read_cmd fd >>= function
  | Builder.Client_hello n when n = Builder.cmds ->
    write_cmd fd Builder.(Server_hello cmds) >>= fun () ->
    begin
      read_cmd fd >>= function
      | Builder.Job_requested ->
        Logs.app (fun m -> m "job requested");
        (* block until a job is available *)
        let rec find_job () =
          match Queue.take_opt t.queue with
          | None -> Lwt.bind (Lwt_condition.wait t.waiter) find_job
          | Some job -> Lwt.return job
        in
        Lwt.bind (find_job ()) (fun job ->
            (* TODO set a timer / timeout and re-queue the same job if timeout expired *)
            (* NOTE(review): if the connection fails after the job was handed
               out, the job stays in the running map and is not re-queued. *)
            log_failure "dumping state" (dump t);
            let uuid = uuid_gen () in
            Lwt.bind (write_cmd fd (Builder.Job_schedule (uuid, job))) (function
                | Error _ as err ->
                  (* the worker never received the job: hand it to the next one *)
                  Logs.warn (fun m -> m "could not send job %a, putting it back into the queue"
                                Builder.pp_job job);
                  requeue t job;
                  log_failure "dumping state" (dump t);
                  Lwt.return err
                | Ok () ->
                  Logs.app (fun m -> m "job %a scheduled %a"
                               Uuidm.pp uuid Builder.pp_job job);
                  t.running <-
                    UM.add uuid (Ptime_clock.now (), job, Lwt_condition.create (), []) t.running;
                  (* await output *)
                  let rec read () =
                    read_cmd fd >>= function
                    | Builder.Output (uuid, data) ->
                      Logs.app (fun m -> m "job %a output %S" Uuidm.pp uuid data);
                      (match UM.find_opt uuid t.running with
                       | None ->
                         Logs.err (fun m -> m "unknown %a, discarding %S"
                                      Uuidm.pp uuid data)
                       | Some (created, job, cond, out) ->
                         Lwt_condition.broadcast cond data;
                         let ts =
                           let delta = Ptime.diff (Ptime_clock.now ()) created in
                           Duration.of_f (Ptime.Span.to_float_s delta)
                         in
                         let value = created, job, cond, (ts, data) :: out in
                         t.running <- UM.add uuid value t.running);
                      read ()
                    | Builder.Job_finished (uuid, r, data) ->
                      Logs.app (fun m -> m "job %a finished with %a" Uuidm.pp uuid
                                   Builder.pp_execution_result r);
                      log_failure "storing job result" (job_finished t uuid r data);
                      Lwt.return (Ok ())
                    | cmd ->
                      Logs.err (fun m -> m "expected output of job finished, got %a"
                                   Builder.pp_cmd cmd);
                      Lwt.return (Ok ())
                  in
                  read ()))
      | Builder.Job_finished (uuid, r, data) ->
        Logs.app (fun m -> m "job %a immediately finished with %a" Uuidm.pp uuid
                     Builder.pp_execution_result r);
        log_failure "storing job result" (job_finished t uuid r data);
        Lwt.return (Ok ())
      | Builder.Schedule (p, j) ->
        Logs.app (fun m -> m "%a schedule %a" Builder.pp_period p Builder.pp_job j);
        if S.fold (fun { Builder.job ; _ } acc ->
            if acc then not (String.equal job.Builder.name j.Builder.name) else acc)
            t.schedule true
        then begin
          let now = Ptime_clock.now () in
          schedule_job t now p j;
          log_failure "dumping state" (dump t);
          Lwt.return (Ok ())
        end else begin
          Logs.err (fun m -> m "job with same name already in schedule");
          Lwt.return (Error (`Msg "job name already used"))
        end
      | Builder.Unschedule name ->
        Logs.app (fun m -> m "unschedule %s" name);
        (* rebuild schedule and queue without the jobs called [name] *)
        let schedule =
          let s = S.create ~dummy 13 in
          S.iter (fun ({ Builder.job ; _ } as si) ->
              if not (String.equal job.Builder.name name) then
                S.add s si
              else ())
            t.schedule;
          s
        and queue =
          let q = Queue.create () in
          Queue.iter (fun job ->
              if not (String.equal job.Builder.name name) then
                Queue.add job q
              else ())
            t.queue;
          q
        in
        t.schedule <- schedule;
        t.queue <- queue;
        log_failure "dumping state" (dump t);
        Lwt.return (Ok ())
      | Builder.Info ->
        Logs.app (fun m -> m "info");
        let reply =
          let schedule = S.fold (fun s acc -> s :: acc) t.schedule []
          and queue = List.rev (Queue.fold (fun acc j -> j :: acc) [] t.queue)
          and running =
            UM.fold (fun uuid (started, job, _, _) acc ->
                (started, uuid, job) :: acc)
              t.running []
          in
          Builder.{ schedule ; queue ; running }
        in
        write_cmd fd (Builder.Info_reply reply)
      | Builder.Observe id ->
        (* two cases: still running or already done *)
        begin match UM.find_opt id t.running with
          | Some (_, _, cond, out) ->
            let open Lwt.Infix in
            (* [out] is stored newest first: replay it in the order it was
               produced, as job_finished does for console.log *)
            Lwt_list.iter_s (fun (_, l) ->
                write_cmd fd (Builder.Output (id, l)) >|= ignore)
              (List.rev out) >>= fun () ->
            (* forward new output until the observer can no longer be written to *)
            let rec more () =
              Lwt_condition.wait cond >>= fun data ->
              write_cmd fd (Builder.Output (id, data)) >>= function
              | Ok () -> more ()
              | Error _ as err -> Lwt.return err
            in
            more ()
          | None ->
            let console_file = Fpath.(t.dir / Uuidm.to_string id / "console.log") in
            match Bos.OS.File.read_lines console_file with
            | Error _ -> Lwt.return (Ok ())
            | Ok data ->
              let open Lwt.Infix in
              Lwt_list.iter_s (fun l ->
                  write_cmd fd (Builder.Output (id, l)) >|= ignore)
                data >|= fun () ->
              Ok ()
        end
      | cmd ->
        Logs.err (fun m -> m "unexpected %a" Builder.pp_cmd cmd);
        Lwt_result.lift (Error (`Msg "bad communication"))
    end
  | cmd ->
    Logs.err (fun m -> m "expected client hello with matching version, got %a"
                 Builder.pp_cmd cmd);
    Lwt_result.lift (Error (`Msg "bad communication"))

(* Run [handle] for one accepted connection and close [fd] afterwards.
   Exceptions from the handler or from closing are logged here, so they do not
   reach the Lwt.async exception hook. *)
let serve_client state fd addr =
  Lwt.catch
    (fun () -> handle state fd addr >|= fun _ -> ())
    (fun e ->
       Logs.err (fun m -> m "exception %s while handling %a"
                    (Printexc.to_string e) pp_sockaddr addr);
       Lwt.return_unit)
  >>= fun () ->
  Lwt.catch
    (fun () -> Lwt_unix.close fd)
    (fun e ->
       Logs.err (fun m -> m "exception %s while closing connection from %a"
                    (Printexc.to_string e) pp_sockaddr addr);
       Lwt.return_unit)

(* Entry point: restore the state from [dir], listen on [ip]:[port], run the
   scheduler once at startup and then every 60 seconds, and serve clients
   until accepting a connection fails. *)
let jump () ip port dir =
  Lwt_main.run
    (Sys.(set_signal sigpipe Signal_ignore);
     let d = Fpath.v dir in
     Lwt_result.lift (restore d) >>= fun state ->
     (match state with
      | Ok s -> Lwt.return s
      | Error `Msg m -> Lwt.fail_with m) >>= fun state ->
     let run_schedule () =
       let modified = schedule state in
       if modified then log_failure "dumping state" (dump state)
     in
     run_schedule ();
     let s = Lwt_unix.(socket PF_INET SOCK_STREAM 0) in
     Lwt_unix.(setsockopt s SO_REUSEADDR true);
     Lwt_unix.(bind s (ADDR_INET (Unix.inet_addr_of_string ip, port))) >>= fun () ->
     Lwt_unix.listen s 10;
     let _ev = Lwt_engine.on_timer 60. true (fun _ev -> run_schedule ()) in
     Lwt.catch (fun () ->
         let rec loop () =
           Lwt_unix.accept s >>= fun (fd, addr) ->
           Lwt.async (fun () -> serve_client state fd addr);
           loop ()
         in
         loop ())
       (fun e ->
          Lwt.return
            (Logs.err (fun m -> m "exception %s, shutting down"
                          (Printexc.to_string e)))));
  Ok ()

(* Configure terminal styling, the log level and the log reporter. *)
let setup_log style_renderer level =
  Fmt_tty.setup_std_outputs ?style_renderer ();
  Logs.set_level level;
  Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ())

open Cmdliner

let setup_log =
  Term.(const setup_log
        $ Fmt_cli.style_renderer ()
        $ Logs_cli.level ())

let port =
  let doc = "TCP listen port" in
  Arg.(value & opt int 1234 & info [ "port" ] ~doc)

let ip =
  let doc = "Listen IP" in
  Arg.(value & opt string "127.0.0.1" & info [ "ip" ] ~doc)

let dir =
  let doc = "Directory for persistent data (defaults to /var/db/builder)" in
  Arg.(value & opt dir "/var/db/builder" & info [ "dir" ] ~doc)

let cmd =
  Term.(term_result (const jump $ setup_log $ ip $ port $ dir)),
  Term.info "builder-server" ~version:Builder.version

let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1