commit fc4afd870f759784faf7ea181a84eb203726d3b0 Author: Hannes Mehnert Date: Tue Nov 24 22:02:45 2020 +0100 initial commit diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..e69de29 diff --git a/README.md b/README.md new file mode 100644 index 0000000..618cc0f --- /dev/null +++ b/README.md @@ -0,0 +1,34 @@ +# builder - scheduling and executing jobs + +This consists of two programs, a client and a server. The single server +contains a queue of jobs, which are consumed by a client. Any number of +clients can be connected to the server. + +The server keeps persistent state of the job queue (so restarts / crashes are +dealt with). A client connects, provides some information about itself, and +then waits for a job. Once a job is read and accepted, it is executed by the +client. Resulting artifacts can be transferred by the client to the server. +2 +The server has the ability to schedule jobs at regular intervals - similar to +crontab. + +Handled and unhandled error conditions: +- client execution fails (timeout, restart, killed): not handled +- client execution gets a signal: reported to server +- client can't write job data files -> failure is captured and reported +- client can't read job output -> logged to client's console (without artifacts gathered) +- client errors when submitting console output -> ignored +- client errors when submitting build artifacts -> retry +- there's no explicit ACK for packets + +Left to do: +- client should queue up console output on server connection failure (and continue reading output at the same time) +- client should inform when data passed to it and artifacts overlap (name / checksum) +- client could sandbox the executed script a bit more (maybe?) +- client should have a timeout for the script to be executed +- separate stdout and stderr to be sent to the server? +- UI +- retrieve artifacts even if execution failed +- the running jobs are not stored onto disk, which may result in unexpected behaviour +- should instead once a job is scheduled the uuid and job information being dumped to disk already (also avoids dummy job dump) +- directory traversals (server folds over output directory and collects files) diff --git a/app/builder_client.ml b/app/builder_client.ml new file mode 100644 index 0000000..3118546 --- /dev/null +++ b/app/builder_client.ml @@ -0,0 +1,240 @@ +open Rresult.R.Infix + +let sh = "script.sh" + +let prng = Random.State.make_self_init () + +let rec tmp_dirname () = + let rnd = Random.State.bits prng land 0xFFFFFF in + let name = + Filename.concat (Filename.get_temp_dir_name ()) + (Printf.sprintf "builder-%06x" rnd) + in + try + let _stat = Unix.lstat name + in + tmp_dirname () + with + _ -> name + +let read_console_write_network s fd uuid = + let ch = Unix.in_channel_of_descr fd in + let rec read_write () = + try + let line = input_line ch in + Builder.write_cmd s (Builder.Output (uuid, line)) |> ignore; (* TODO *) + read_write () + with + End_of_file -> () + in + read_write (); + exit 0 + +let prepare_fs job = + let tmpdir = Fpath.v (tmp_dirname ()) in + at_exit (fun () -> ignore (Bos.OS.Dir.delete ~recurse:true tmpdir)); + Bos.OS.Dir.create tmpdir >>= fun did_not_exist -> + if not did_not_exist then + Error (`Msg "path already existed") + else + Bos.OS.Dir.set_current tmpdir >>= fun () -> + List.fold_left (fun acc (f, v) -> + acc >>= fun () -> + let path = Fpath.append tmpdir f in + Bos.OS.Dir.create (Fpath.parent path) >>= fun _ -> + Bos.OS.File.write (Fpath.append tmpdir f) v) + (Ok ()) job.Builder.files >>= fun () -> + Bos.OS.File.write ~mode:500 Fpath.(tmpdir / sh) job.Builder.script >>| fun () -> + tmpdir + +let collect_output files tmpdir = + let all_files = + let dirs = [ tmpdir ] in + let collect path acc = path :: acc in + match Bos.OS.Path.fold ~elements:`Files collect [] dirs with + | Ok files -> files + | Error `Msg msg -> + Logs.warn (fun m -> m "folding resulted in an error %s" msg); + [] + in + List.fold_left (fun acc f -> + match Fpath.rem_prefix tmpdir f with + | None -> + Logs.warn (fun m -> m "couldn't remove tmpdir prefix from %a" + Fpath.pp f); + acc + | Some name when Fpath.to_string name = sh -> + (* ignoring the script.sh itself *) + acc + | Some name when List.exists (fun (p, _) -> Fpath.equal p name) files -> + (* ignoring all inputs *) + acc + | Some name -> + match Bos.OS.File.read f with + | Ok data -> (name, data) :: acc + | Error `Msg e -> + Logs.err (fun m -> m "error reading %a: %s" Fpath.pp f e); + acc) + [] all_files + +let execute_job s uuid job = + match prepare_fs job with + | Error `Msg msg -> Builder.Msg msg, [] + | Ok tmpdir -> + let to_read, out = Unix.pipe () in + let f = Unix.fork () in + if f = 0 then + (* child *) + read_console_write_network s to_read uuid + else (* parent *) + let toexec = Fpath.(to_string (tmpdir / sh)) in + let pid = + Unix.create_process "/bin/sh" [| "-e" ; toexec |] Unix.stdin out out + in + let r = Unix.waitpid [] pid in + Unix.kill f 9; + match snd r with + | Unix.WEXITED 0 -> Builder.Exited 0, collect_output job.Builder.files tmpdir + | Unix.WEXITED c -> Builder.Exited c, [] + | Unix.WSIGNALED s -> Builder.Signalled s, [] + | Unix.WSTOPPED s -> Builder.Stopped s, [] + +let jump () (host, port) = + (* client semantics: + - 1 connect to server + - 2 send client hello + - 3 await server hello, check version agreement + - 4 send job request + - 5 read from server until job is received + - 6 dump files, execute job (pipe + fork to send output to server) + - 7 send job result to server + + if while in 1-5 server communication fails, start from 1 + if in 6 server communication fails, drop data (for now) [and retry] + if in 7 server communication fails, retry until publishing result is done + *) + let connect () = + try + let sockaddr = Unix.ADDR_INET (host, port) in + let s = Unix.(socket PF_INET SOCK_STREAM 0) in + Unix.(connect s sockaddr); + Ok s + with + | Unix.Unix_error (err, f, _) -> + Logs.err (fun m -> m "unix error in %s: %s" f (Unix.error_message err)); + Error (`Msg "connect failure") + and disconnect s = + Unix.close s + and timeout () = Unix.sleepf 2. + in + let disc_on_err s = function Ok x -> Ok x | Error _ as e -> disconnect s; e in + let init () = + connect () >>= fun s -> + let hello = Builder.(Client_hello cmds) in + disc_on_err s (Builder.write_cmd s hello) >>= fun () -> + disc_on_err s (Builder.read_cmd s) >>| fun cmd -> + s, cmd + in + let rec establish () = + match init () with + | Error `Msg e -> + Logs.warn (fun m -> m "error %s connecting, trying again in a bit" e); + timeout (); + establish () + | Ok cmd -> Ok cmd + in + let good_server_hello s = function + | Builder.Server_hello x when x = Builder.cmds-> Ok () + | cmd -> + Logs.err (fun m -> m "expected Server Hello with matching version, got %a" + Builder.pp_cmd cmd); + disconnect s; + Error (`Msg "bad communication") + in + let rec hs () = + establish () >>= fun (s, cmd) -> + good_server_hello s cmd >>= fun () -> + match + disc_on_err s (Builder.write_cmd s Builder.Job_requested >>= fun () -> + Builder.read_cmd s) + with + | Ok cmd -> Ok (s, cmd) + | Error `Msg msg -> + Logs.warn (fun m -> m "received error %s while waiting for job, retry" msg); + hs () + in + let submit_success s fini = + match Builder.write_cmd s fini with + | Ok () -> Ok () + | Error `Msg msg -> + Logs.err (fun m -> m "error %s while submitting result" msg); + let rec try_again n = + match + timeout (); + establish () + with + | Ok (s, cmd) -> + good_server_hello s cmd >>= fun () -> + begin match disc_on_err s (Builder.write_cmd s fini) with + | Ok () -> Ok () + | Error `Msg msg -> + Logs.warn (fun m -> m "failed to submit result %s (n = %d)" msg n); + try_again (succ n) + end + | Error `Msg msg -> + Logs.warn (fun m -> m "failed to establish connection %s (n = %d)" msg n); + try_again (succ n) + in + try_again 0 + in + hs () >>= fun (s, cmd) -> + (match cmd with + | Builder.Job_schedule (uuid, job) -> + Logs.app (fun m -> m "received job uuid %a: %a" Uuidm.pp uuid + Builder.pp_job job); + let r, data = execute_job s uuid job in + let fini = Builder.Job_finished (uuid, r, data) in + submit_success s fini + | cmd -> + Logs.err (fun m -> m "expected Job, got %a" Builder.pp_cmd cmd); + disconnect s; + Error (`Msg "bad communication")) >>= fun () -> + disconnect s; + Ok () + +let setup_log style_renderer level = + Fmt_tty.setup_std_outputs ?style_renderer (); + Logs.set_level level; + Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ()) + +open Cmdliner + +let host_port : (Unix.inet_addr * int) Arg.converter = + let parse s = + match String.split_on_char ':' s with + | [ hostname ; port ] -> + begin try + `Ok (Unix.inet_addr_of_string hostname, int_of_string port) + with + Not_found -> `Error "failed to parse IP:port" + end + | _ -> `Error "broken: no port specified" + in + parse, fun ppf (h, p) -> Format.fprintf ppf "%s:%d" + (Unix.string_of_inet_addr h) p + +let remote = + let doc = "The remote host:port to connect to" in + Arg.(value & opt host_port (Unix.inet_addr_loopback, 1234) & + info [ "r" ; "remote" ] ~doc ~docv:"IP:PORT") + +let setup_log = + Term.(const setup_log + $ Fmt_cli.style_renderer () + $ Logs_cli.level ()) + +let cmd = + Term.(term_result (const jump $ setup_log $ remote)), + Term.info "builder-client" ~version:Builder.version + +let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1 diff --git a/app/client.ml b/app/client.ml new file mode 100644 index 0000000..c3f9daf --- /dev/null +++ b/app/client.ml @@ -0,0 +1,174 @@ +open Rresult.R.Infix + +let connect (host, port) = + let connect () = + try + let sockaddr = Unix.ADDR_INET (host, port) in + let s = Unix.(socket PF_INET SOCK_STREAM 0) in + Unix.(connect s sockaddr); + Ok s + with + | Unix.Unix_error (err, f, _) -> + Logs.err (fun m -> m "unix error in %s: %s" f (Unix.error_message err)); + Error (`Msg "connect failure") + in + connect () >>= fun s -> + let hello = Builder.(Client_hello cmds) in + Builder.write_cmd s hello >>= fun () -> + Builder.read_cmd s >>= function + | Builder.Server_hello x when x = Builder.cmds -> Ok s + | cmd -> + Logs.err (fun m -> m "expected Server Hello with matching version, got %a" + Builder.pp_cmd cmd); + Error (`Msg "bad communication") + +let observe () remote id = + match Uuidm.of_string id with + | None -> Error (`Msg "error parsing uuid") + | Some uuid -> + connect remote >>= fun s -> + Builder.write_cmd s (Builder.Observe uuid) >>= fun () -> + let rec read () = + Builder.read_cmd s >>= fun cmd -> + Logs.app (fun m -> m "%a" Builder.pp_cmd cmd); + read () + in + read () + +let info_ () remote = + connect remote >>= fun s -> + Builder.write_cmd s Builder.Info >>= fun () -> + Builder.read_cmd s >>= fun cmd -> + Logs.app (fun m -> m "%a" Builder.pp_cmd cmd); + Ok () + +let unschedule () remote name = + connect remote >>= fun s -> + Builder.write_cmd s (Builder.Unschedule name) + +let schedule () remote name script period dir = + let files = + match dir with + | None -> [] + | Some f -> + let dir = Fpath.v f in + let all_files = + let dirs = [ dir ] in + let collect path acc = path :: acc in + match Bos.OS.Path.fold ~elements:`Files collect [] dirs with + | Ok files -> files + | Error `Msg msg -> + Logs.warn (fun m -> m "folding resulted in an error %s" msg); + [] + in + List.fold_left (fun acc f -> + match Fpath.rem_prefix dir f with + | None -> + Logs.warn (fun m -> m "couldn't remove prefix from %a" + Fpath.pp f); + acc + | Some name -> + match Bos.OS.File.read f with + | Ok data -> (name, data) :: acc + | Error `Msg e -> + Logs.err (fun m -> m "error reading %a: %s" Fpath.pp f e); + acc) + [] all_files + in + Bos.OS.File.read (Fpath.v script) >>= fun script -> + let job = Builder.{ name ; script ; files } in + connect remote >>= fun s -> + Builder.write_cmd s (Builder.Schedule (period, job)) + +let help () man_format cmds = function + | None -> `Help (`Pager, None) + | Some t when List.mem t cmds -> `Help (man_format, Some t) + | Some x -> + print_endline ("unknown command '" ^ x ^ "', available commands:"); + List.iter print_endline cmds; + `Ok () + +let setup_log style_renderer level = + Fmt_tty.setup_std_outputs ?style_renderer (); + Logs.set_level level; + Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ()) + +open Cmdliner + +let host_port : (Unix.inet_addr * int) Arg.converter = + let parse s = + match String.split_on_char ':' s with + | [ hostname ; port ] -> + begin try + `Ok (Unix.inet_addr_of_string hostname, int_of_string port) + with + Not_found -> `Error "failed to parse IP:port" + end + | _ -> `Error "broken: no port specified" + in + parse, fun ppf (h, p) -> Format.fprintf ppf "%s:%d" + (Unix.string_of_inet_addr h) p + +let remote = + let doc = "The remote host:port to connect to" in + Arg.(value & opt host_port (Unix.inet_addr_loopback, 1234) & + info [ "r" ; "remote" ] ~doc ~docv:"IP:PORT") + +let nam = + let doc = "The job name" in + Arg.(required & pos 0 (some string) None & info [ ] ~doc ~docv:"NAME") + +let id = + let doc = "The job ID" in + Arg.(required & pos 0 (some string) None & info [ ] ~doc ~docv:"ID") + +let p : Builder.period Arg.converter = + let parse = function + | "hourly" -> `Ok Builder.Hourly + | "daily" -> `Ok Builder.Daily + | "weekly" -> `Ok Builder.Weekly + | s -> `Error ("failed to parse period " ^ s) + in + parse, Builder.pp_period + +let period = + let doc = "The periodic execution interval" in + Arg.(value & opt p Builder.Hourly & info [ "period" ] ~doc ~docv:"PERIOD") + +let dir = + let doc = "The directory with supplementary material to embed into the job" in + Arg.(value & opt (some dir) None & info [ "dir" ] ~doc ~docv:"DIR") + +let script = + let doc = "The script to execute" in + Arg.(required & pos 1 (some file) None & info [ ] ~doc ~docv:"FILE") + +let setup_log = + Term.(const setup_log + $ Fmt_cli.style_renderer () + $ Logs_cli.level ()) + +let observe_cmd = + Term.(term_result (const observe $ setup_log $ remote $ id)), + Term.info "observe" + +let info_cmd = + Term.(term_result (const info_ $ setup_log $ remote)), + Term.info "info" + +let unschedule_cmd = + Term.(term_result (const unschedule $ setup_log $ remote $ nam)), + Term.info "unschedule" + +let schedule_cmd = + Term.(term_result (const schedule $ setup_log $ remote $ nam $ script $ period $ dir)), + Term.info "schedule" + +let help_cmd = + let doc = "Builder client" in + Term.(ret (const help $ setup_log $ Term.man_format $ Term.choice_names $ Term.pure None)), + Term.info "builder" ~version:Builder.version ~doc + +let cmds = [ help_cmd ; schedule_cmd ; unschedule_cmd ; info_cmd ; observe_cmd ] + +let () = match Term.eval_choice help_cmd cmds with `Ok () -> exit 0 | _ -> exit 1 diff --git a/app/dune b/app/dune new file mode 100644 index 0000000..36c6b9f --- /dev/null +++ b/app/dune @@ -0,0 +1,17 @@ +(executable + (name builder_client) + (public_name builder-client) + (modules builder_client) + (libraries cmdliner builder logs.fmt fmt.cli logs.cli fmt.tty bos)) + +(executable + (name client) + (public_name client) + (modules client) + (libraries cmdliner builder logs.fmt fmt.cli logs.cli fmt.tty bos)) + +(executable + (name server) + (public_name builder-server) + (modules server) + (libraries cmdliner builder logs.fmt fmt.cli logs.cli fmt.tty lwt lwt.unix bos bheap ptime.clock.os duration)) diff --git a/app/server.ml b/app/server.ml new file mode 100644 index 0000000..7f755da --- /dev/null +++ b/app/server.ml @@ -0,0 +1,414 @@ +open Lwt.Infix + +let read fd = + Lwt.catch (fun () -> + let rec r b ?(off = 0) l = + if l = 0 then + Lwt.return (Ok ()) + else + Lwt_unix.read fd b off l >>= fun read -> + if read = 0 then + Lwt.return (Error (`Msg "end of file")) + else + r b ~off:(read + off) (l - read) + in + let open Lwt_result.Infix in + let bl = Bytes.create 8 in + r bl 8 >>= fun () -> + let l = Cstruct.BE.get_uint64 (Cstruct.of_bytes bl) 0 in + let l_int = Int64.to_int l in (* TODO *) + let b = Bytes.create l_int in + r b l_int >|= fun () -> + Cstruct.of_bytes b) + (fun e -> + Logs.err (fun m -> m "Error while reading: %s" (Printexc.to_string e)); + Lwt.return (Error (`Msg "error in read"))) + +let read_cmd fd = + let open Lwt_result.Infix in + read fd >>= fun data -> + Lwt.return (Builder.Asn.cmd_of_cs data) + +let write fd data = + Lwt.catch (fun () -> + let rec w b ?(off = 0) l = + if l = 0 then + Lwt.return_unit + else + Lwt_unix.write fd b off l >>= fun written -> + w b ~off:(written + off) (l - written) + in + let csl = Cstruct.create 8 in + Cstruct.BE.set_uint64 csl 0 (Int64.of_int (Cstruct.len data)); + w (Cstruct.to_bytes csl) 8 >>= fun () -> + w (Cstruct.to_bytes data) (Cstruct.len data) >|= fun () -> + Ok ()) + (fun e -> + Logs.err (fun m -> m "Error while writing: %s" (Printexc.to_string e)); + Lwt.return (Error (`Msg "unix error in write"))) + +let write_cmd fd cmd = + let data = Builder.Asn.cmd_to_cs cmd in + write fd data + +let pp_sockaddr ppf = function + | Lwt_unix.ADDR_UNIX s -> Fmt.pf ppf "unix://%s" s + | Lwt_unix.ADDR_INET (ip, port) -> + Fmt.pf ppf "%s:%d" (Unix.string_of_inet_addr ip) port + +module UM = Map.Make(Uuidm) + +module S = Binary_heap.Make (struct + type t = Builder.schedule_item + let compare { Builder.next = n1 ; _ } { Builder.next = n2 ; _ } = + Ptime.compare n1 n2 + end) + +let dummy = + let job = Builder.{ name = "dummy" ; script = "#nothing" ; files = [] } in + Builder.{ next = Ptime.epoch ; period = Daily ; job } + +type t = { + mutable queue : Builder.job Queue.t ; + mutable schedule : S.t ; + mutable running : (Ptime.t * Builder.job * string Lwt_condition.t * (int64 * string) list) UM.t ; + waiter : unit Lwt_condition.t ; + dir : Fpath.t ; +} + +let p_to_span p = + let one_hour = 60 * 60 in + let s = match p with + | Builder.Hourly -> one_hour + | Builder.Daily -> 24 * one_hour + | Builder.Weekly -> 7 * 24 * one_hour + in + Ptime.Span.of_int_s s + +let schedule_job t now period job = + if Queue.fold (fun acc i -> + if acc then not (String.equal i.Builder.name job.Builder.name) else acc) + true t.queue + then begin + Queue.add job t.queue; + Lwt_condition.broadcast t.waiter (); + end; + match Ptime.add_span now (p_to_span period) with + | None -> Logs.err (fun m -> m "ptime add span failed when scheduling job") + | Some next -> S.add t.schedule Builder.{ next ; period ; job } + +let schedule t = + let now = Ptime_clock.now () in + let rec s_next modified = + match S.minimum t.schedule with + | exception Binary_heap.Empty -> modified + | Builder.{ next ; period ; job } when Ptime.is_later ~than:next now -> + S.remove t.schedule; + schedule_job t now period job; + s_next true + | _ -> modified + in + s_next false + +let dump, restore = + let open Rresult.R.Infix in + let file = "state" in + (fun t -> + let state = + let jobs = Queue.fold (fun acc j -> Builder.Job j :: acc) [] t.queue in + S.fold (fun s acc -> (Builder.Schedule s) :: acc) t.schedule + (List.rev jobs) + in + let data = Builder.Asn.state_to_cs state in + let bak = Fpath.(t.dir / file + "tmp") in + Bos.OS.File.write bak (Cstruct.to_string data) >>= fun () -> + Bos.OS.U.(error_to_msg (rename bak Fpath.(t.dir / file)))), + (fun dir -> + Bos.OS.Dir.create dir >>= fun _ -> + let to_read = Fpath.(dir / file) in + let queue = Queue.create () + and schedule = S.create ~dummy 13 + and waiter = Lwt_condition.create () + in + Bos.OS.File.exists to_read >>= function + | false -> + Logs.warn (fun m -> m "state file does not exist, using empty"); + Ok { queue ; schedule ; running = UM.empty ; waiter ; dir } + | true -> + Bos.OS.File.read to_read >>= fun data -> + Builder.Asn.state_of_cs (Cstruct.of_string data) >>= fun items -> + let queue, schedule = + List.fold_left (fun (queue, schedule) -> function + | Builder.Job j -> Queue.add j queue; (queue, schedule) + | Builder.Schedule s -> S.add schedule s; (queue, schedule)) + (queue, schedule) items + in + Ok { queue ; schedule ; running = UM.empty ; waiter ; dir }) + +let uuid_gen = Uuidm.v4_gen (Random.State.make_self_init ()) + +let job_finished state uuid res data = + let open Rresult.R.Infix in + let now = Ptime_clock.now () in + let started, job, out = + match UM.find_opt uuid state.running with + | None -> Ptime.epoch, dummy.Builder.job, [] + | Some (c, j, cond, o) -> + let res_str = Fmt.to_to_string Builder.pp_execution_result res in + Lwt_condition.broadcast cond res_str; + c, j, o + in + state.running <- UM.remove uuid state.running; + let out_dir = Fpath.(state.dir / Uuidm.to_string uuid) in + Bos.OS.Dir.create out_dir >>= fun _ -> + let full = + let out = List.map (fun (d, d') -> Int64.to_int d, d') out in + let v = job, uuid, out, started, now, res, data in + Builder.Asn.exec_to_cs v + in + Bos.OS.File.write Fpath.(out_dir / "full") (Cstruct.to_string full) >>= fun () -> + let console_out = + List.map (fun (delta, txt) -> + Printf.sprintf "%dms: %S" (Duration.to_ms delta) txt) + (List.rev out) + in + let console = Fpath.(out_dir / "console.log") in + let started = "started at " ^ Ptime.to_rfc3339 started + and stopped = "stopped at " ^ Ptime.to_rfc3339 now + and exited = Fmt.to_to_string Builder.pp_execution_result res + in + Bos.OS.File.write_lines console (started :: console_out @ [ exited ; stopped ]) >>= fun () -> + let out = Fpath.(out_dir / "output") in + List.iter (fun (path, value) -> + let p = Fpath.append out path in + ignore (Bos.OS.Dir.create (Fpath.parent p)); + ignore (Bos.OS.File.write p value)) + data; + let in_dir = Fpath.(out_dir / "input") in + List.iter (fun (path, value) -> + let p = Fpath.append in_dir path in + ignore (Bos.OS.Dir.create (Fpath.parent p)); + ignore (Bos.OS.File.write p value)) + job.Builder.files; + Bos.OS.File.write Fpath.(in_dir / "script.sh") job.Builder.script + +let handle t fd addr = + (* -- client connection: + (1) read client hello + (2) send server hello + -- now there are different paths: + (3) read request job + (4) send job + (5) await job done + -- or a leftover client connection from previous run: + (3) read job done + -- or scheduling a job + (3) read schedule + -- or unscheduling a job + (3) read unschedule + -- or info + (3) read info + (4) send info_reply + -- or observing + (3) read observe + (4) send outputs and job done + *) + let open Lwt_result.Infix in + Logs.app (fun m -> m "client connection from %a" pp_sockaddr addr); + read_cmd fd >>= function + | Builder.Client_hello n when n = Builder.cmds -> + write_cmd fd Builder.(Server_hello cmds) >>= fun () -> + begin + read_cmd fd >>= function + | Builder.Job_requested -> + Logs.app (fun m -> m "job requested"); + let rec find_job () = + match Queue.take_opt t.queue with + | None -> Lwt.bind (Lwt_condition.wait t.waiter) find_job + | Some job -> Lwt.return job + in + Lwt.bind (find_job ()) (fun job -> + ignore (dump t); + let uuid = uuid_gen () in + write_cmd fd (Builder.Job_schedule (uuid, job)) >>= fun () -> + Logs.app (fun m -> m "job %a scheduled %a" + Uuidm.pp uuid Builder.pp_job job); + t.running <- UM.add uuid (Ptime_clock.now (), job, Lwt_condition.create (), []) t.running; + (* await output *) + let rec read () = + read_cmd fd >>= function + | Builder.Output (uuid, data) -> + Logs.app (fun m -> m "job %a output %S" Uuidm.pp uuid data); + (match UM.find_opt uuid t.running with + | None -> + Logs.err (fun m -> m "unknown %a, discarding %S" + Uuidm.pp uuid data) + | Some (created, job, cond, out) -> + Lwt_condition.broadcast cond data; + let ts = + let delta = Ptime.diff (Ptime_clock.now ()) created in + Duration.of_f (Ptime.Span.to_float_s delta) + in + let value = created, job, cond, (ts, data) :: out in + t.running <- UM.add uuid value t.running); + read () + | Builder.Job_finished (uuid, r, data) -> + Logs.app (fun m -> m "job %a finished with %a" Uuidm.pp uuid + Builder.pp_execution_result r); + ignore (job_finished t uuid r data); + Lwt.return (Ok ()) + | cmd -> + Logs.err (fun m -> m "expected output of job finished, got %a" + Builder.pp_cmd cmd); + Lwt.return (Ok ()) + in + read ()) + | Builder.Job_finished (uuid, r, data) -> + Logs.app (fun m -> m "job %a immediately finished with %a" Uuidm.pp uuid + Builder.pp_execution_result r); + ignore (job_finished t uuid r data); + Lwt.return (Ok ()) + | Builder.Schedule (p, j) -> + Logs.app (fun m -> m "%a schedule %a" Builder.pp_period p + Builder.pp_job j); + if S.fold (fun { Builder.job ; _ } acc -> + if acc then not (String.equal job.Builder.name j.Builder.name) else acc) + t.schedule true + then + let now = Ptime_clock.now () in + schedule_job t now p j; + ignore (dump t); + Lwt.return (Ok ()) + else begin + Logs.err (fun m -> m "job with same name already in schedule"); + Lwt.return (Error (`Msg "job name already used")) + end + | Builder.Unschedule name -> + Logs.app (fun m -> m "unschedule %s" name); + let schedule = + let s = S.create ~dummy 13 in + S.iter (fun ({ Builder.job ; _ } as si) -> + if not (String.equal job.Builder.name name) then + S.add s si + else ()) t.schedule; + s + and queue = + let q = Queue.create () in + Queue.iter (fun job -> + if not (String.equal job.Builder.name name) then + Queue.add job q + else + ()) + t.queue; + q + in + t.schedule <- schedule; + t.queue <- queue; + ignore (dump t); + Lwt.return (Ok ()) + | Builder.Info -> + Logs.app (fun m -> m "info"); + let reply = + let schedule = S.fold (fun s acc -> s :: acc) t.schedule [] + and queue = List.rev (Queue.fold (fun acc j -> j :: acc) [] t.queue) + and running = + UM.fold (fun uuid (started, job, _, _) acc -> + (started, uuid, job) :: acc) + t.running [] + in + Builder.{ schedule ; queue ; running } + in + write_cmd fd (Builder.Info_reply reply) + | Builder.Observe id -> + (* two cases: still running or already done *) + begin match UM.find_opt id t.running with + | Some (_, _, cond, out) -> + let open Lwt.Infix in + Lwt_list.iter_s (fun (_, l) -> + write_cmd fd (Builder.Output (id, l)) >|= ignore) out >>= fun () -> + let rec more () = + Lwt_condition.wait cond >>= fun data -> + write_cmd fd (Builder.Output (id, data)) >>= fun _ -> + more () + in + more () + | None -> + let console_file = Fpath.(t.dir / Uuidm.to_string id / "console.log") in + match Bos.OS.File.read_lines console_file with + | Error _ -> Lwt.return (Ok ()) + | Ok data -> + let open Lwt.Infix in + Lwt_list.iter_s (fun l -> + write_cmd fd (Builder.Output (id, l)) >|= ignore) data >|= fun () -> + Ok () + end + | cmd -> + Logs.err (fun m -> m "unexpected %a" Builder.pp_cmd cmd); + Lwt_result.lift (Error (`Msg "bad communication")) + end + | cmd -> + Logs.err (fun m -> m "expected client hello with matching version, got %a" + Builder.pp_cmd cmd); + Lwt_result.lift (Error (`Msg "bad communication")) + +let jump () ip port dir = + Lwt_main.run + (Sys.(set_signal sigpipe Signal_ignore); + let d = Fpath.v dir in + Lwt_result.lift (restore d) >>= fun state -> + (match state with + | Ok s -> Lwt.return s | Error `Msg m -> Lwt.fail_with m) >>= fun state -> + let modified = schedule state in + ignore (if modified then dump state else Ok ()); + let s = Lwt_unix.(socket PF_INET SOCK_STREAM 0) in + Lwt_unix.(setsockopt s SO_REUSEADDR true); + Lwt_unix.(bind s (ADDR_INET (Unix.inet_addr_of_string ip, port))) >>= fun () -> + Lwt_unix.listen s 10; + let _ev = + Lwt_engine.on_timer 60. true (fun _ev -> + let modified = schedule state in + ignore (if modified then dump state else Ok ())) + in + Lwt.catch (fun () -> + let rec loop () = + Lwt_unix.accept s >>= fun (fd, addr) -> + Lwt.async (fun () -> + handle state fd addr >>= fun _ -> + Lwt_unix.close fd); + loop () + in + loop ()) + (fun e -> + Lwt.return (Logs.err (fun m -> m "exception %s, shutting down" + (Printexc.to_string e))))); + Ok () + +let setup_log style_renderer level = + Fmt_tty.setup_std_outputs ?style_renderer (); + Logs.set_level level; + Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ()) + +open Cmdliner + +let setup_log = + Term.(const setup_log + $ Fmt_cli.style_renderer () + $ Logs_cli.level ()) + +let port = + let doc = "TCP listen port" in + Arg.(value & opt int 1234 & info [ "port" ] ~doc) + +let ip = + let doc = "Listen IP" in + Arg.(value & opt string "127.0.0.1" & info [ "ip" ] ~doc) + +let dir = + let doc = "Directory for persistent data (defaults to /var/db/builder)" in + Arg.(value & opt dir "/var/db/builder" & info [ "dir" ] ~doc) + +let cmd = + Term.(term_result (const jump $ setup_log $ ip $ port $ dir)), + Term.info "builder-server" ~version:Builder.version + +let () = match Term.eval cmd with `Ok () -> exit 0 | _ -> exit 1 diff --git a/builder.opam b/builder.opam new file mode 100644 index 0000000..e69de29 diff --git a/dune-project b/dune-project new file mode 100644 index 0000000..d376ae4 --- /dev/null +++ b/dune-project @@ -0,0 +1,2 @@ +(lang dune 2.0) +(name builder) diff --git a/lib/builder.ml b/lib/builder.ml new file mode 100644 index 0000000..8e11f3e --- /dev/null +++ b/lib/builder.ml @@ -0,0 +1,366 @@ +let src = Logs.Src.create "builder" ~doc:"Builder" +module Log = (val Logs.src_log src : Logs.LOG) + +open Rresult.R.Infix + +type data = (Fpath.t * string) list + +let pp_data ppf xs = + Fmt.(list ~sep:(unit "@.") + (pair ~sep:(unit ": ") Fpath.pp int)) + ppf + (List.map (fun (f, s) -> f, String.length s) xs) + +type job = { + name : string ; + script : string ; + files : data ; +} + +let pp_job ppf { name ; script ; files } = + Fmt.pf ppf "name %s, script %d, files %a" name (String.length script) + pp_data files + +type execution_result = + | Exited of int + | Signalled of int + | Stopped of int + | Msg of string + +let pp_execution_result ppf = function + | Exited i -> Fmt.pf ppf "exited %d" i + | Signalled i -> Fmt.pf ppf "signalled %d" i + | Stopped i -> Fmt.pf ppf "stopped %d" i + | Msg m -> Fmt.pf ppf "execution aborted: %s" m + +type period = Hourly | Daily | Weekly + +let pp_period ppf = function + | Hourly -> Fmt.string ppf "hourly" + | Daily -> Fmt.string ppf "daily" + | Weekly -> Fmt.string ppf "weekly" + +type schedule_item = { + next : Ptime.t ; + period : period ; + job : job ; +} + +let pp_schedule_item ppf { next ; period ; job } = + Fmt.pf ppf "next %a, %a: %a" (Ptime.pp_rfc3339 ()) next + pp_period period pp_job job + +type info = { + schedule : schedule_item list ; + queue : job list ; + running : (Ptime.t * Uuidm.t * job) list ; +} + +let triple ~sep pa pb pc ppf (va, vb, vc)= + Fmt.pair ~sep pa (Fmt.pair ~sep pb pc) ppf + (va, (vb, vc)) + +let pp_info ppf { schedule ; queue ; running } = + let pp_time = Ptime.pp_rfc3339 () in + Fmt.pf ppf "schedule: %a@.queue: %a@.running: %a@." + Fmt.(list ~sep:(unit ";@ ") pp_schedule_item) schedule + Fmt.(list ~sep:(unit ";@ ") pp_job) queue + Fmt.(list ~sep:(unit ";@ ") + (triple ~sep:(unit ",@,") pp_time Uuidm.pp pp_job)) running + +type cmd = + | Client_hello of int + | Server_hello of int + | Job_requested + | Job_schedule of Uuidm.t * job + | Job_finished of Uuidm.t * execution_result * data + | Output of Uuidm.t * string + | Schedule of period * job + | Unschedule of string + | Info + | Info_reply of info + | Observe of Uuidm.t + +let cmds = 11 + +let version = + Fmt.strf "version %%VERSION%% protocol version %d" cmds + +let pp_cmd ppf = function + | Client_hello max -> Fmt.pf ppf "client hello (max %d)" max + | Server_hello max -> Fmt.pf ppf "server hello (max %d)" max + | Job_requested -> Fmt.string ppf "job request" + | Job_schedule (uuid, job) -> + Fmt.pf ppf "[%a] job schedule %a" Uuidm.pp uuid pp_job job + | Job_finished (uuid, result, data) -> + Fmt.pf ppf "[%a] job finished with %a: %a" Uuidm.pp uuid + pp_execution_result result pp_data data + | Output (uuid, data) -> Fmt.pf ppf "[%a] %S" Uuidm.pp uuid data + | Schedule (period, job) -> + Fmt.pf ppf "schedule at %a: %a" pp_period period pp_job job + | Unschedule job_name -> Fmt.pf ppf "unschedule %s" job_name + | Info -> Fmt.string ppf "info" + | Info_reply info -> Fmt.pf ppf "info: %a" pp_info info + | Observe id -> Fmt.pf ppf "observe %a" Uuidm.pp id + +type state_item = + | Job of job + | Schedule of schedule_item + +let pp_state_item ppf = function + | Job j -> Fmt.pf ppf "job %a" pp_job j + | Schedule s -> Fmt.pf ppf "schedule %a" pp_schedule_item s + +type state = state_item list + +let pp_state = Fmt.(list ~sep:(unit ";@ ") pp_state_item) + +module Asn = struct + let guard p err = if p then Ok () else Error err + + let decode_strict codec cs = + match Asn.decode codec cs with + | Ok (a, cs) -> + guard (Cstruct.len cs = 0) (`Msg "trailing bytes") >>= fun () -> + Ok a + | Error (`Parse msg) -> Error (`Msg msg) + + let projections_of asn = + let c = Asn.codec Asn.der asn in + (decode_strict c, Asn.encode c) + + let data = + let f (path, value) = + match Fpath.of_string path with + | Ok p -> p, value + | Error `Msg msg -> Asn.S.error (`Parse msg) + and g (path, value) = + Fpath.to_string path, value + in + Asn.S.(sequence_of + (map f g + (sequence2 + (required ~label:"path" utf8_string) + (required ~label:"data" utf8_string)))) + + let job = + let f (name, script, files) = + { name ; script ; files } + and g { name ; script ; files } = + name, script, files + in + Asn.S.(map f g (sequence3 + (required ~label:"name" utf8_string) + (required ~label:"script" utf8_string) + (required ~label:"files" data))) + + let period = + let f = function + | `C1 () -> Hourly + | `C2 () -> Daily + | `C3 () -> Weekly + and g = function + | Hourly -> `C1 () + | Daily -> `C2 () + | Weekly -> `C3 () + in + Asn.S.(map f g + (choice3 (explicit 0 null) (explicit 1 null) (explicit 2 null))) + + let schedule = + let f (next, period, job) = {next; period; job} + and g {next; period; job} = (next, period, job) + in + Asn.S.(map f g (sequence3 + (required ~label:"next" utc_time) + (required ~label:"period" period) + (required ~label:"job" job))) + + let state_item = + let f = function + | `C1 j -> Job j + | `C2 s -> Schedule s + and g = function + | Job j -> `C1 j + | Schedule s -> `C2 s + in + Asn.S.(map f g (choice2 + (explicit 0 job) + (explicit 1 schedule))) + + let state_of_cs, state_to_cs = projections_of (Asn.S.sequence_of state_item) + + let uuid = + let f s = + match Uuidm.of_bytes s with + | None -> Asn.S.error (`Parse "couldn't decode UUID") + | Some s -> s + and g uuid = Uuidm.to_bytes uuid + in + Asn.S.(map f g utf8_string) + + let res = + let f = function + | `C1 i -> Exited i + | `C2 i -> Signalled i + | `C3 i -> Stopped i + | `C4 s -> Msg s + and g = function + | Exited i -> `C1 i + | Signalled i -> `C2 i + | Stopped i -> `C3 i + | Msg s -> `C4 s + in + Asn.S.(map f g + (choice4 + (explicit 0 int) + (explicit 1 int) + (explicit 2 int) + (explicit 3 utf8_string))) + + let exec = + let f = function + | `C1 (job, uuid, out, (created, finished), res, data) -> + job, uuid, out, created, finished, res, data + | `C2 () -> assert false + and g (job, uuid, out, created, finished, res, data) = + `C1 (job, uuid, out, (created, finished), res, data) + in + Asn.S.(map f g + (choice2 + (explicit 0 + (sequence6 + (required ~label:"job" job) + (required ~label:"uuid" uuid) + (required ~label:"console" + (sequence_of (sequence2 + (required ~label:"delta" int) + (required ~label:"data" utf8_string)))) + (required ~label:"timestamps" + (sequence2 + (required ~label:"started" utc_time) + (required ~label:"finished" utc_time))) + (required ~label:"result" res) + (required ~label:"output" data))) + (explicit 1 null))) + + let exec_of_cs, exec_to_cs = projections_of exec + + let cmd = + let f = function + | `C1 `C1 max -> Client_hello max + | `C1 `C2 max -> Server_hello max + | `C1 `C3 (uuid, job) -> Job_schedule (uuid, job) + | `C1 `C4 (uuid, res, data) -> Job_finished (uuid, res, data) + | `C1 `C5 (uuid, out) -> Output (uuid, out) + | `C1 `C6 (period, job) -> Schedule (period, job) + | `C2 `C1 () -> Info + | `C2 `C2 (schedule, queue, running) -> + Info_reply { schedule ; queue ; running } + | `C2 `C3 () -> Job_requested + | `C2 `C4 jn -> Unschedule jn + | `C2 `C5 id -> Observe id + and g = function + | Client_hello max -> `C1 (`C1 max) + | Server_hello max -> `C1 (`C2 max) + | Job_schedule (uuid, job) -> `C1 (`C3 (uuid, job)) + | Job_finished (uuid, res, data) -> `C1 (`C4 (uuid, res, data)) + | Output (uuid, out) -> `C1 (`C5 (uuid, out)) + | Schedule (period, job) -> `C1 (`C6 (period, job)) + | Info -> `C2 (`C1 ()) + | Info_reply { schedule ; queue ; running } -> + `C2 (`C2 (schedule, queue, running)) + | Job_requested -> `C2 (`C3 ()) + | Unschedule jn -> `C2 (`C4 jn) + | Observe id -> `C2 (`C5 id) + in + Asn.S.(map f g + (choice2 + (choice6 + (explicit 0 int) + (explicit 1 int) + (explicit 2 (sequence2 + (required ~label:"uuid" uuid) + (required ~label:"job" job))) + (explicit 3 (sequence3 + (required ~label:"uuid" uuid) + (required ~label:"result" res) + (required ~label:"data" data))) + (explicit 4 (sequence2 + (required ~label:"uuid" uuid) + (required ~label:"output" utf8_string))) + (explicit 5 (sequence2 + (required ~label:"period" period) + (required ~label:"job" job)))) + (choice5 + (explicit 6 null) + (explicit 7 (sequence3 + (required ~label:"schedule" (sequence_of schedule)) + (required ~label:"queue" (sequence_of job)) + (required ~label:"running" + (sequence_of + (sequence3 + (required ~label:"started" utc_time) + (required ~label:"uuid" uuid) + (required ~label:"job" job)))))) + (explicit 8 null) + (explicit 9 utf8_string) + (explicit 10 uuid) + ))) + + let cmd_of_cs, cmd_to_cs = projections_of cmd +end + +let rec ign_intr f v = + try f v with Unix.Unix_error (Unix.EINTR, _, _) -> ign_intr f v + +let read fd = + try + let rec r b ?(off = 0) l = + if l = 0 then + Ok () + else + let read = ign_intr (Unix.read fd b off) l in + if read = 0 then + Error (`Msg "end of file") + else + r b ~off:(read + off) (l - read) + in + let bl = Bytes.create 8 in + r bl 8 >>= fun () -> + let l = Cstruct.BE.get_uint64 (Cstruct.of_bytes bl) 0 in + let l_int = Int64.to_int l in (* TODO *) + let b = Bytes.create l_int in + r b l_int >>= fun () -> + Ok (Cstruct.of_bytes b) + with + Unix.Unix_error (err, f, _) -> + Log.err (fun m -> m "Unix error in %s: %s" f (Unix.error_message err)); + Error (`Msg "unix error in read") + +let read_cmd fd = + read fd >>= fun data -> + Asn.cmd_of_cs data + +let write fd data = + try + let rec w b ?(off = 0) l = + if l = 0 then + () + else + let written = ign_intr (Unix.write fd b off) l in + w b ~off:(written + off) (l - written) + in + let csl = Cstruct.create 8 in + Cstruct.BE.set_uint64 csl 0 (Int64.of_int (Cstruct.len data)); + w (Cstruct.to_bytes csl) 8; + w (Cstruct.to_bytes data) (Cstruct.len data); + Ok () + with + Unix.Unix_error (err, f, _) -> + Log.err (fun m -> m "Unix error in %s: %s" f (Unix.error_message err)); + Error (`Msg "unix error in write") + +let write_cmd fd cmd = + let data = Asn.cmd_to_cs cmd in + write fd data diff --git a/lib/dune b/lib/dune new file mode 100644 index 0000000..79e2ce0 --- /dev/null +++ b/lib/dune @@ -0,0 +1,4 @@ +(library + (name builder) + (public_name builder) + (libraries unix rresult fpath fmt logs cstruct asn1-combinators uuidm))