include version in log_entries on disk, read log file on startup (and write events to ring store)
This commit is contained in:
parent
cdae37b0bf
commit
b55281d1e5
|
@ -12,6 +12,9 @@ open Lwt.Infix
|
||||||
|
|
||||||
let my_version = `AV2
|
let my_version = `AV2
|
||||||
|
|
||||||
|
let entry_to_ring (ts, event) =
|
||||||
|
(ts, Cstruct.to_string (Vmm_asn.log_entry_to_cstruct (ts, event)))
|
||||||
|
|
||||||
let broadcast prefix data t =
|
let broadcast prefix data t =
|
||||||
Lwt_list.fold_left_s (fun t (id, s) ->
|
Lwt_list.fold_left_s (fun t (id, s) ->
|
||||||
Vmm_lwt.write_wire s data >|= function
|
Vmm_lwt.write_wire s data >|= function
|
||||||
|
@ -29,27 +32,45 @@ let write_complete s cs =
|
||||||
in
|
in
|
||||||
w 0
|
w 0
|
||||||
|
|
||||||
|
let read_from_file file =
|
||||||
|
Lwt_unix.stat file >>= fun stat ->
|
||||||
|
let size = stat.Lwt_unix.st_size in
|
||||||
|
Lwt_unix.openfile file Lwt_unix.[O_RDONLY] 0 >>= fun fd ->
|
||||||
|
let buf = Bytes.create size in
|
||||||
|
let rec read off =
|
||||||
|
Lwt_unix.read fd buf off (size - off) >>= fun bytes ->
|
||||||
|
if bytes + off = size then
|
||||||
|
Lwt.return_unit
|
||||||
|
else
|
||||||
|
read (bytes + off)
|
||||||
|
in
|
||||||
|
read 0 >>= fun () ->
|
||||||
|
let logs = Vmm_asn.logs_of_disk my_version (Cstruct.of_bytes buf) in
|
||||||
|
Vmm_lwt.safe_close fd >|= fun () ->
|
||||||
|
List.rev logs
|
||||||
|
|
||||||
let write_to_file file =
|
let write_to_file file =
|
||||||
let mvar = Lwt_mvar.create_empty () in
|
let mvar = Lwt_mvar.create_empty () in
|
||||||
let rec write_loop ?(retry = true) ?data ?fd () =
|
let rec write_loop ?(retry = true) ?log_entry ?fd () =
|
||||||
match fd with
|
match fd with
|
||||||
| None when retry ->
|
| None when retry ->
|
||||||
Lwt_unix.openfile file Lwt_unix.[O_APPEND;O_CREAT;O_WRONLY] 0o600 >>= fun fd ->
|
Lwt_unix.openfile file Lwt_unix.[O_APPEND;O_CREAT;O_WRONLY] 0o600 >>= fun fd ->
|
||||||
write_loop ~retry:false ?data ~fd ()
|
write_loop ~retry:false ?log_entry ~fd ()
|
||||||
| None ->
|
| None ->
|
||||||
Logs.err (fun m -> m "retry is false, exiting") ;
|
Logs.err (fun m -> m "retry is false, exiting") ;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Some fd ->
|
| Some fd ->
|
||||||
(match data with
|
(match log_entry with
|
||||||
| None -> Lwt_mvar.take mvar
|
| None -> Lwt_mvar.take mvar
|
||||||
| Some d -> Lwt.return d) >>= fun data ->
|
| Some l -> Lwt.return l) >>= fun log_entry ->
|
||||||
|
let data = Vmm_asn.log_to_disk my_version log_entry in
|
||||||
Lwt.catch
|
Lwt.catch
|
||||||
(fun () -> write_complete fd data >|= fun () -> (true, None, Some fd))
|
(fun () -> write_complete fd data >|= fun () -> (true, None, Some fd))
|
||||||
(fun e ->
|
(fun e ->
|
||||||
Logs.err (fun m -> m "exception %s while writing" (Printexc.to_string e)) ;
|
Logs.err (fun m -> m "exception %s while writing" (Printexc.to_string e)) ;
|
||||||
Vmm_lwt.safe_close fd >|= fun () ->
|
Vmm_lwt.safe_close fd >|= fun () ->
|
||||||
(retry, Some data, None)) >>= fun (retry, data, fd) ->
|
(retry, Some log_entry, None)) >>= fun (retry, log_entry, fd) ->
|
||||||
write_loop ~retry ?data ?fd ()
|
write_loop ~retry ?log_entry ?fd ()
|
||||||
in
|
in
|
||||||
mvar, write_loop
|
mvar, write_loop
|
||||||
|
|
||||||
|
@ -63,8 +84,7 @@ let send_history s ring id ts =
|
||||||
in
|
in
|
||||||
let res =
|
let res =
|
||||||
List.fold_left (fun acc (_, x) ->
|
List.fold_left (fun acc (_, x) ->
|
||||||
let cs = Cstruct.of_string x in
|
match Vmm_asn.log_entry_of_cstruct (Cstruct.of_string x) with
|
||||||
match Vmm_asn.log_entry_of_cstruct cs with
|
|
||||||
| Ok (ts, event) ->
|
| Ok (ts, event) ->
|
||||||
let sub = Vmm_core.Log.name event in
|
let sub = Vmm_core.Log.name event in
|
||||||
if Vmm_core.is_sub_id ~super:id ~sub
|
if Vmm_core.is_sub_id ~super:id ~sub
|
||||||
|
@ -80,12 +100,10 @@ let send_history s ring id ts =
|
||||||
let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in
|
let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id } in
|
||||||
Vmm_lwt.write_wire s (header, `Data (`Log_data (ts, event)))
|
Vmm_lwt.write_wire s (header, `Data (`Log_data (ts, event)))
|
||||||
| Error e -> Lwt.return (Error e))
|
| Error e -> Lwt.return (Error e))
|
||||||
(Ok ()) res
|
(Ok ()) (List.rev res)
|
||||||
|
|
||||||
let handle mvar ring s addr () =
|
let handle mvar ring s addr () =
|
||||||
Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ;
|
Logs.info (fun m -> m "handling connection from %a" Vmm_lwt.pp_sockaddr addr) ;
|
||||||
let str = Fmt.strf "%a: CONNECT\n" (Ptime.pp_human ()) (Ptime_clock.now ()) in
|
|
||||||
Lwt_mvar.put mvar (Cstruct.of_string str) >>= fun () ->
|
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
Vmm_lwt.read_wire s >>= function
|
Vmm_lwt.read_wire s >>= function
|
||||||
| Error (`Msg e) ->
|
| Error (`Msg e) ->
|
||||||
|
@ -94,17 +112,16 @@ let handle mvar ring s addr () =
|
||||||
| Error _ ->
|
| Error _ ->
|
||||||
Logs.err (fun m -> m "exception while reading") ;
|
Logs.err (fun m -> m "exception while reading") ;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
| Ok (hdr, `Data (`Log_data (ts, event))) ->
|
| Ok (hdr, `Data (`Log_data entry)) ->
|
||||||
if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin
|
if not (Vmm_commands.version_eq hdr.Vmm_commands.version my_version) then begin
|
||||||
Logs.warn (fun m -> m "unsupported version") ;
|
Logs.warn (fun m -> m "unsupported version") ;
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
end else begin
|
end else begin
|
||||||
let data = Vmm_asn.log_entry_to_cstruct (ts, event) in
|
Vmm_ring.write ring (entry_to_ring entry) ;
|
||||||
Vmm_ring.write ring (ts, Cstruct.to_string data) ;
|
Lwt_mvar.put mvar entry >>= fun () ->
|
||||||
Lwt_mvar.put mvar data >>= fun () ->
|
|
||||||
let data' =
|
let data' =
|
||||||
let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id = hdr.Vmm_commands.id } in
|
let header = Vmm_commands.{ version = my_version ; sequence = 0L ; id = hdr.Vmm_commands.id } in
|
||||||
(header, `Data (`Log_data (ts, event)))
|
(header, `Data (`Log_data entry))
|
||||||
in
|
in
|
||||||
broadcast hdr.Vmm_commands.id data' !tree >>= fun tree' ->
|
broadcast hdr.Vmm_commands.id data' !tree >>= fun tree' ->
|
||||||
tree := tree' ;
|
tree := tree' ;
|
||||||
|
@ -152,7 +169,12 @@ let jump _ file sock =
|
||||||
Lwt_unix.(bind s (ADDR_UNIX sock)) >>= fun () ->
|
Lwt_unix.(bind s (ADDR_UNIX sock)) >>= fun () ->
|
||||||
Lwt_unix.listen s 1 ;
|
Lwt_unix.listen s 1 ;
|
||||||
let ring = Vmm_ring.create () in
|
let ring = Vmm_ring.create () in
|
||||||
|
read_from_file file >>= fun entries ->
|
||||||
|
List.iter (Vmm_ring.write ring) (List.map entry_to_ring entries) ;
|
||||||
let mvar, writer = write_to_file file in
|
let mvar, writer = write_to_file file in
|
||||||
|
let start = Ptime_clock.now (), `Startup in
|
||||||
|
Lwt_mvar.put mvar start >>= fun () ->
|
||||||
|
Vmm_ring.write ring (entry_to_ring start) ;
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
Lwt_unix.accept s >>= fun (cs, addr) ->
|
Lwt_unix.accept s >>= fun (cs, addr) ->
|
||||||
Lwt.async (handle mvar ring cs addr) ;
|
Lwt.async (handle mvar ring cs addr) ;
|
||||||
|
|
|
@ -435,6 +435,33 @@ let log_entry =
|
||||||
|
|
||||||
let log_entry_of_cstruct, log_entry_to_cstruct = projections_of log_entry
|
let log_entry_of_cstruct, log_entry_to_cstruct = projections_of log_entry
|
||||||
|
|
||||||
|
let log_disk =
|
||||||
|
Asn.S.(sequence2
|
||||||
|
(required ~label:"version" version)
|
||||||
|
(required ~label:"entry" log_entry))
|
||||||
|
|
||||||
|
let log_disk_of_cstruct, log_disk_to_cstruct = projections_of log_disk
|
||||||
|
|
||||||
|
let log_to_disk version entry =
|
||||||
|
log_disk_to_cstruct (version, entry)
|
||||||
|
|
||||||
|
let logs_of_disk version buf =
|
||||||
|
let rec next acc buf =
|
||||||
|
match Asn.decode (Asn.codec Asn.der log_disk) buf with
|
||||||
|
| Ok ((version', entry), cs) ->
|
||||||
|
let acc' =
|
||||||
|
if Vmm_commands.version_eq version version' then
|
||||||
|
entry :: acc
|
||||||
|
else
|
||||||
|
acc
|
||||||
|
in
|
||||||
|
next acc' cs
|
||||||
|
| Error (`Parse msg) ->
|
||||||
|
Logs.warn (fun m -> m "parse error %s while parsing log" msg) ;
|
||||||
|
acc (* ignore *)
|
||||||
|
in
|
||||||
|
next [] buf
|
||||||
|
|
||||||
type cert_extension = version * t
|
type cert_extension = version * t
|
||||||
|
|
||||||
let cert_extension =
|
let cert_extension =
|
||||||
|
|
|
@ -17,6 +17,10 @@ val log_entry_to_cstruct : Log.t -> Cstruct.t
|
||||||
|
|
||||||
val log_entry_of_cstruct : Cstruct.t -> (Log.t, [> `Msg of string ]) result
|
val log_entry_of_cstruct : Cstruct.t -> (Log.t, [> `Msg of string ]) result
|
||||||
|
|
||||||
|
val log_to_disk : Vmm_commands.version -> Log.t -> Cstruct.t
|
||||||
|
|
||||||
|
val logs_of_disk : Vmm_commands.version -> Cstruct.t -> Log.t list
|
||||||
|
|
||||||
type cert_extension = Vmm_commands.version * Vmm_commands.t
|
type cert_extension = Vmm_commands.version * Vmm_commands.t
|
||||||
|
|
||||||
val cert_extension_of_cstruct : Cstruct.t -> (cert_extension, [> `Msg of string ]) result
|
val cert_extension_of_cstruct : Cstruct.t -> (cert_extension, [> `Msg of string ]) result
|
||||||
|
|
Loading…
Reference in a new issue