diff --git a/app/vmm_log.ml b/app/vmm_log.ml index 5578e2e..8667225 100644 --- a/app/vmm_log.ml +++ b/app/vmm_log.ml @@ -12,6 +12,9 @@ open Lwt.Infix let my_version = `AV2 +let entry_to_ring (ts, event) = + (ts, Cstruct.to_string (Vmm_asn.log_entry_to_cstruct (ts, event))) + let broadcast prefix data t = Lwt_list.fold_left_s (fun t (id, s) -> Vmm_lwt.write_wire s data >|= function @@ -29,27 +32,45 @@ let write_complete s cs = in w 0 +let read_from_file file = + Lwt_unix.stat file >>= fun stat -> + let size = stat.Lwt_unix.st_size in + Lwt_unix.openfile file Lwt_unix.[O_RDONLY] 0 >>= fun fd -> + let buf = Bytes.create size in + let rec read off = + Lwt_unix.read fd buf off (size - off) >>= fun bytes -> + if bytes + off = size then + Lwt.return_unit + else + read (bytes + off) + in + read 0 >>= fun () -> + let logs = Vmm_asn.logs_of_disk my_version (Cstruct.of_bytes buf) in + Vmm_lwt.safe_close fd >|= fun () -> + List.rev logs + let write_to_file file = let mvar = Lwt_mvar.create_empty () in - let rec write_loop ?(retry = true) ?data ?fd () = + let rec write_loop ?(retry = true) ?log_entry ?fd () = match fd with | None when retry -> Lwt_unix.openfile file Lwt_unix.[O_APPEND;O_CREAT;O_WRONLY] 0o600 >>= fun fd -> - write_loop ~retry:false ?data ~fd () + write_loop ~retry:false ?log_entry ~fd () | None -> Logs.err (fun m -> m "retry is false, exiting") ; Lwt.return_unit | Some fd -> - (match data with + (match log_entry with | None -> Lwt_mvar.take mvar - | Some d -> Lwt.return d) >>= fun data -> + | Some l -> Lwt.return l) >>= fun log_entry -> + let data = Vmm_asn.log_to_disk my_version log_entry in Lwt.catch (fun () -> write_complete fd data >|= fun () -> (true, None, Some fd)) (fun e -> Logs.err (fun m -> m "exception %s while writing" (Printexc.to_string e)) ; Vmm_lwt.safe_close fd >|= fun () -> - (retry, Some data, None)) >>= fun (retry, data, fd) -> - write_loop ~retry ?data ?fd () + (retry, Some log_entry, None)) >>= fun (retry, log_entry, fd) -> + write_loop ~retry ?log_entry ?fd () in mvar, write_loop @@ -63,8 +84,7 @@ let send_history s ring id ts = in let res = List.fold_left (fun acc (_, x) -> - let cs = Cstruct.of_string x in - match Vmm_asn.log_entry_of_cstruct cs with + match Vmm_asn.log_entry_of_cstruct (Cstruct.of_string x) with | Ok (ts, event) -> let sub = Vmm_core.Log.name event in if Vmm_core.is_sub_id ~super:id ~sub @@ -80,12 +100,10 @@ let send_history s ring id ts = let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in Vmm_lwt.write_wire s (header, `Data (`Log_data (ts, event))) | Error e -> Lwt.return (Error e)) - (Ok ()) res + (Ok ()) (List.rev res) let handle mvar ring s addr () = Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ; - let str = Fmt.strf "%a: CONNECT\n" (Ptime.pp_human ()) (Ptime_clock.now ()) in - Lwt_mvar.put mvar (Cstruct.of_string str) >>= fun () -> let rec loop () = Vmm_lwt.read_wire s >>= function | Error (`Msg e) -> @@ -94,17 +112,16 @@ let handle mvar ring s addr () = | Error _ -> Logs.err (fun m -> m "exception while reading") ; Lwt.return_unit - | Ok (hdr, `Data (`Log_data (ts, event))) -> + | Ok (hdr, `Data (`Log_data entry)) -> if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin Logs.warn (fun m -> m "unsupported version") ; Lwt.return_unit end else begin - let data = Vmm_asn.log_entry_to_cstruct (ts, event) in - Vmm_ring.write ring (ts, Cstruct.to_string data) ; - Lwt_mvar.put mvar data >>= fun () -> + Vmm_ring.write ring (entry_to_ring entry) ; + Lwt_mvar.put mvar entry >>= fun () -> let data' = let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id = hdr.Vmm_commands.id } in - (header, `Data (`Log_data (ts, event))) + (header, `Data (`Log_data entry)) in broadcast hdr.Vmm_commands.id data' !tree >>= fun tree' -> tree := tree' ; @@ -152,7 +169,12 @@ let jump _ file sock = Lwt_unix.(bind s (ADDR_UNIX sock)) >>= fun () -> Lwt_unix.listen s 1 ; let ring = Vmm_ring.create () in + read_from_file file >>= fun entries -> + List.iter (Vmm_ring.write ring) (List.map entry_to_ring entries) ; let mvar, writer = write_to_file file in + let start = Ptime_clock.now (), `Startup in + Lwt_mvar.put mvar start >>= fun () -> + Vmm_ring.write ring (entry_to_ring start) ; let rec loop () = Lwt_unix.accept s >>= fun (cs, addr) -> Lwt.async (handle mvar ring cs addr) ; diff --git a/src/vmm_asn.ml b/src/vmm_asn.ml index c1f6ec2..cde00f6 100644 --- a/src/vmm_asn.ml +++ b/src/vmm_asn.ml @@ -435,6 +435,33 @@ let log_entry = let log_entry_of_cstruct, log_entry_to_cstruct = projections_of log_entry +let log_disk = + Asn.S.(sequence2 + (required ~label:"version" version) + (required ~label:"entry" log_entry)) + +let log_disk_of_cstruct, log_disk_to_cstruct = projections_of log_disk + +let log_to_disk version entry = + log_disk_to_cstruct (version, entry) + +let logs_of_disk version buf = + let rec next acc buf = + match Asn.decode (Asn.codec Asn.der log_disk) buf with + | Ok ((version', entry), cs) -> + let acc' = + if Vmm_commands.version_eq version version' then + entry :: acc + else + acc + in + next acc' cs + | Error (`Parse msg) -> + Logs.warn (fun m -> m "parse error %s while parsing log" msg) ; + acc (* ignore *) + in + next [] buf + type cert_extension = version * t let cert_extension = diff --git a/src/vmm_asn.mli b/src/vmm_asn.mli index 8dc17e1..6310794 100644 --- a/src/vmm_asn.mli +++ b/src/vmm_asn.mli @@ -17,6 +17,10 @@ val log_entry_to_cstruct : Log.t -> Cstruct.t val log_entry_of_cstruct : Cstruct.t -> (Log.t, [> `Msg of string ]) result +val log_to_disk : Vmm_commands.version -> Log.t -> Cstruct.t + +val logs_of_disk : Vmm_commands.version -> Cstruct.t -> Log.t list + type cert_extension = Vmm_commands.version * Vmm_commands.t val cert_extension_of_cstruct : Cstruct.t -> (cert_extension, [> `Msg of string ]) result