builder/lib/builder.ml

367 lines
11 KiB
OCaml

let src = Logs.Src.create "builder" ~doc:"Builder"
module Log = (val Logs.src_log src : Logs.LOG)
open Rresult.R.Infix
type data = (Fpath.t * string) list
let pp_data ppf xs =
Fmt.(list ~sep:(unit "@.")
(pair ~sep:(unit ": ") Fpath.pp int))
ppf
(List.map (fun (f, s) -> f, String.length s) xs)
type job = {
name : string ;
script : string ;
files : data ;
}
let pp_job ppf { name ; script ; files } =
Fmt.pf ppf "name %s, script %d, files %a" name (String.length script)
pp_data files
type execution_result =
| Exited of int
| Signalled of int
| Stopped of int
| Msg of string
let pp_execution_result ppf = function
| Exited i -> Fmt.pf ppf "exited %d" i
| Signalled i -> Fmt.pf ppf "signalled %d" i
| Stopped i -> Fmt.pf ppf "stopped %d" i
| Msg m -> Fmt.pf ppf "execution aborted: %s" m
type period = Hourly | Daily | Weekly
let pp_period ppf = function
| Hourly -> Fmt.string ppf "hourly"
| Daily -> Fmt.string ppf "daily"
| Weekly -> Fmt.string ppf "weekly"
type schedule_item = {
next : Ptime.t ;
period : period ;
job : job ;
}
let pp_schedule_item ppf { next ; period ; job } =
Fmt.pf ppf "next %a, %a: %a" (Ptime.pp_rfc3339 ()) next
pp_period period pp_job job
type info = {
schedule : schedule_item list ;
queue : job list ;
running : (Ptime.t * Uuidm.t * job) list ;
}
let triple ~sep pa pb pc ppf (va, vb, vc)=
Fmt.pair ~sep pa (Fmt.pair ~sep pb pc) ppf
(va, (vb, vc))
let pp_info ppf { schedule ; queue ; running } =
let pp_time = Ptime.pp_rfc3339 () in
Fmt.pf ppf "schedule: %a@.queue: %a@.running: %a@."
Fmt.(list ~sep:(unit ";@ ") pp_schedule_item) schedule
Fmt.(list ~sep:(unit ";@ ") pp_job) queue
Fmt.(list ~sep:(unit ";@ ")
(triple ~sep:(unit ",@,") pp_time Uuidm.pp pp_job)) running
type cmd =
| Client_hello of int
| Server_hello of int
| Job_requested
| Job_schedule of Uuidm.t * job
| Job_finished of Uuidm.t * execution_result * data
| Output of Uuidm.t * string
| Schedule of period * job
| Unschedule of string
| Info
| Info_reply of info
| Observe of Uuidm.t
let cmds = 11
let version =
Fmt.strf "version %%VERSION%% protocol version %d" cmds
let pp_cmd ppf = function
| Client_hello max -> Fmt.pf ppf "client hello (max %d)" max
| Server_hello max -> Fmt.pf ppf "server hello (max %d)" max
| Job_requested -> Fmt.string ppf "job request"
| Job_schedule (uuid, job) ->
Fmt.pf ppf "[%a] job schedule %a" Uuidm.pp uuid pp_job job
| Job_finished (uuid, result, data) ->
Fmt.pf ppf "[%a] job finished with %a: %a" Uuidm.pp uuid
pp_execution_result result pp_data data
| Output (uuid, data) -> Fmt.pf ppf "[%a] %S" Uuidm.pp uuid data
| Schedule (period, job) ->
Fmt.pf ppf "schedule at %a: %a" pp_period period pp_job job
| Unschedule job_name -> Fmt.pf ppf "unschedule %s" job_name
| Info -> Fmt.string ppf "info"
| Info_reply info -> Fmt.pf ppf "info: %a" pp_info info
| Observe id -> Fmt.pf ppf "observe %a" Uuidm.pp id
type state_item =
| Job of job
| Schedule of schedule_item
let pp_state_item ppf = function
| Job j -> Fmt.pf ppf "job %a" pp_job j
| Schedule s -> Fmt.pf ppf "schedule %a" pp_schedule_item s
type state = state_item list
let pp_state = Fmt.(list ~sep:(unit ";@ ") pp_state_item)
module Asn = struct
let guard p err = if p then Ok () else Error err
let decode_strict codec cs =
match Asn.decode codec cs with
| Ok (a, cs) ->
guard (Cstruct.len cs = 0) (`Msg "trailing bytes") >>= fun () ->
Ok a
| Error (`Parse msg) -> Error (`Msg msg)
let projections_of asn =
let c = Asn.codec Asn.der asn in
(decode_strict c, Asn.encode c)
let data =
let f (path, value) =
match Fpath.of_string path with
| Ok p -> p, value
| Error `Msg msg -> Asn.S.error (`Parse msg)
and g (path, value) =
Fpath.to_string path, value
in
Asn.S.(sequence_of
(map f g
(sequence2
(required ~label:"path" utf8_string)
(required ~label:"data" utf8_string))))
let job =
let f (name, script, files) =
{ name ; script ; files }
and g { name ; script ; files } =
name, script, files
in
Asn.S.(map f g (sequence3
(required ~label:"name" utf8_string)
(required ~label:"script" utf8_string)
(required ~label:"files" data)))
let period =
let f = function
| `C1 () -> Hourly
| `C2 () -> Daily
| `C3 () -> Weekly
and g = function
| Hourly -> `C1 ()
| Daily -> `C2 ()
| Weekly -> `C3 ()
in
Asn.S.(map f g
(choice3 (explicit 0 null) (explicit 1 null) (explicit 2 null)))
let schedule =
let f (next, period, job) = {next; period; job}
and g {next; period; job} = (next, period, job)
in
Asn.S.(map f g (sequence3
(required ~label:"next" utc_time)
(required ~label:"period" period)
(required ~label:"job" job)))
let state_item =
let f = function
| `C1 j -> Job j
| `C2 s -> Schedule s
and g = function
| Job j -> `C1 j
| Schedule s -> `C2 s
in
Asn.S.(map f g (choice2
(explicit 0 job)
(explicit 1 schedule)))
let state_of_cs, state_to_cs = projections_of (Asn.S.sequence_of state_item)
let uuid =
let f s =
match Uuidm.of_bytes s with
| None -> Asn.S.error (`Parse "couldn't decode UUID")
| Some s -> s
and g uuid = Uuidm.to_bytes uuid
in
Asn.S.(map f g utf8_string)
let res =
let f = function
| `C1 i -> Exited i
| `C2 i -> Signalled i
| `C3 i -> Stopped i
| `C4 s -> Msg s
and g = function
| Exited i -> `C1 i
| Signalled i -> `C2 i
| Stopped i -> `C3 i
| Msg s -> `C4 s
in
Asn.S.(map f g
(choice4
(explicit 0 int)
(explicit 1 int)
(explicit 2 int)
(explicit 3 utf8_string)))
let exec =
let f = function
| `C1 (job, uuid, out, (created, finished), res, data) ->
job, uuid, out, created, finished, res, data
| `C2 () -> assert false
and g (job, uuid, out, created, finished, res, data) =
`C1 (job, uuid, out, (created, finished), res, data)
in
Asn.S.(map f g
(choice2
(explicit 0
(sequence6
(required ~label:"job" job)
(required ~label:"uuid" uuid)
(required ~label:"console"
(sequence_of (sequence2
(required ~label:"delta" int)
(required ~label:"data" utf8_string))))
(required ~label:"timestamps"
(sequence2
(required ~label:"started" utc_time)
(required ~label:"finished" utc_time)))
(required ~label:"result" res)
(required ~label:"output" data)))
(explicit 1 null)))
let exec_of_cs, exec_to_cs = projections_of exec
let cmd =
let f = function
| `C1 `C1 max -> Client_hello max
| `C1 `C2 max -> Server_hello max
| `C1 `C3 (uuid, job) -> Job_schedule (uuid, job)
| `C1 `C4 (uuid, res, data) -> Job_finished (uuid, res, data)
| `C1 `C5 (uuid, out) -> Output (uuid, out)
| `C1 `C6 (period, job) -> Schedule (period, job)
| `C2 `C1 () -> Info
| `C2 `C2 (schedule, queue, running) ->
Info_reply { schedule ; queue ; running }
| `C2 `C3 () -> Job_requested
| `C2 `C4 jn -> Unschedule jn
| `C2 `C5 id -> Observe id
and g = function
| Client_hello max -> `C1 (`C1 max)
| Server_hello max -> `C1 (`C2 max)
| Job_schedule (uuid, job) -> `C1 (`C3 (uuid, job))
| Job_finished (uuid, res, data) -> `C1 (`C4 (uuid, res, data))
| Output (uuid, out) -> `C1 (`C5 (uuid, out))
| Schedule (period, job) -> `C1 (`C6 (period, job))
| Info -> `C2 (`C1 ())
| Info_reply { schedule ; queue ; running } ->
`C2 (`C2 (schedule, queue, running))
| Job_requested -> `C2 (`C3 ())
| Unschedule jn -> `C2 (`C4 jn)
| Observe id -> `C2 (`C5 id)
in
Asn.S.(map f g
(choice2
(choice6
(explicit 0 int)
(explicit 1 int)
(explicit 2 (sequence2
(required ~label:"uuid" uuid)
(required ~label:"job" job)))
(explicit 3 (sequence3
(required ~label:"uuid" uuid)
(required ~label:"result" res)
(required ~label:"data" data)))
(explicit 4 (sequence2
(required ~label:"uuid" uuid)
(required ~label:"output" utf8_string)))
(explicit 5 (sequence2
(required ~label:"period" period)
(required ~label:"job" job))))
(choice5
(explicit 6 null)
(explicit 7 (sequence3
(required ~label:"schedule" (sequence_of schedule))
(required ~label:"queue" (sequence_of job))
(required ~label:"running"
(sequence_of
(sequence3
(required ~label:"started" utc_time)
(required ~label:"uuid" uuid)
(required ~label:"job" job))))))
(explicit 8 null)
(explicit 9 utf8_string)
(explicit 10 uuid)
)))
let cmd_of_cs, cmd_to_cs = projections_of cmd
end
let rec ign_intr f v =
try f v with Unix.Unix_error (Unix.EINTR, _, _) -> ign_intr f v
let read fd =
try
let rec r b ?(off = 0) l =
if l = 0 then
Ok ()
else
let read = ign_intr (Unix.read fd b off) l in
if read = 0 then
Error (`Msg "end of file")
else
r b ~off:(read + off) (l - read)
in
let bl = Bytes.create 8 in
r bl 8 >>= fun () ->
let l = Cstruct.BE.get_uint64 (Cstruct.of_bytes bl) 0 in
let l_int = Int64.to_int l in (* TODO *)
let b = Bytes.create l_int in
r b l_int >>= fun () ->
Ok (Cstruct.of_bytes b)
with
Unix.Unix_error (err, f, _) ->
Log.err (fun m -> m "Unix error in %s: %s" f (Unix.error_message err));
Error (`Msg "unix error in read")
let read_cmd fd =
read fd >>= fun data ->
Asn.cmd_of_cs data
let write fd data =
try
let rec w b ?(off = 0) l =
if l = 0 then
()
else
let written = ign_intr (Unix.write fd b off) l in
w b ~off:(written + off) (l - written)
in
let csl = Cstruct.create 8 in
Cstruct.BE.set_uint64 csl 0 (Int64.of_int (Cstruct.len data));
w (Cstruct.to_bytes csl) 8;
w (Cstruct.to_bytes data) (Cstruct.len data);
Ok ()
with
Unix.Unix_error (err, f, _) ->
Log.err (fun m -> m "Unix error in %s: %s" f (Unix.error_message err));
Error (`Msg "unix error in write")
let write_cmd fd cmd =
let data = Asn.cmd_to_cs cmd in
write fd data