module LaunchDarkly.Server.Network.Eventing (eventThread) where
import qualified Codec.Compression.GZip as GZip
import Control.Concurrent (killThread, myThreadId)
import Control.Concurrent.MVar (modifyMVar_, readMVar, swapMVar, takeMVar)
import Control.Monad (forever, unless, void, when)
import Control.Monad.Catch (MonadMask, MonadThrow)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (MonadLogger, logDebug, logError, logWarn)
import Data.Aeson (encode)
import qualified Data.ByteString.Lazy as L
import Data.Function ((&))
import Data.Generics.Product (getField)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8)
import Data.Tuple (swap)
import qualified Data.UUID as UUID
import Network.HTTP.Client (Manager, Request (..), RequestBody (..), httpLbs, responseStatus)
import Network.HTTP.Types.Status (Status (statusCode), status400)
import System.Random (newStdGen, random)
import System.Timeout (timeout)
import LaunchDarkly.Server.Client.Internal (Client, Status (ShuttingDown))
import LaunchDarkly.Server.Config.ClientContext
import LaunchDarkly.Server.Config.HttpConfiguration (prepareRequest)
import LaunchDarkly.Server.Events (EventState, processSummary)
import LaunchDarkly.Server.Network.Common (addToAL, checkAuthorization, getServerTime, isHttpUnrecoverable, tryAuthorized, tryHTTP)
-- | POST a prepared event request and classify the outcome.
--
-- The result is @(done, serverTime)@:
--
-- * @done@ is 'True' when no retry should be attempted: either the response
--   status was below 400, or 'isHttpUnrecoverable' reported the status code
--   as unrecoverable (in which case the payload is dropped). It is 'False'
--   when the request raised an 'HttpException' or the response carried any
--   other status of 400 or above.
-- * @serverTime@ is whatever 'getServerTime' extracted from the response, or
--   @0@ when no response was received.
--
-- 'checkAuthorization' runs on every response before it is classified; it
-- carries a 'MonadThrow' constraint, so it may throw instead of returning
-- (presumably on an authorization failure -- confirm against its definition).
processSend :: (MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) => Manager -> Request -> m (Bool, Integer)
processSend manager req =
    liftIO (tryHTTP $ httpLbs req manager) >>= \case
        Left err -> do
            $(logError) (T.pack $ show err)
            pure (False, 0)
        Right response -> do
            checkAuthorization response
            let code = responseStatus response
                serverTime = getServerTime response
            -- Diagnostic trace only: emitted for every response, so it is kept
            -- at debug level rather than flooding the warning log.
            $(logDebug) (T.append "@@@ server time from LD was determined to be: " $ T.pack $ show serverTime)
            if code < status400
                then pure (True, serverTime)
                else
                    if isHttpUnrecoverable (statusCode code)
                        then do
                            -- Retrying cannot help; report "done" so the caller drops the batch.
                            $(logWarn) (T.append "got non recoverable event post response dropping payload: " $ T.pack $ show code)
                            pure (True, serverTime)
                        else pure (False, serverTime)
-- | Turn a base request into an event-delivery request: the method becomes
-- @POST@ and the @Content-Type@ and @X-LaunchDarkly-Event-Schema@ headers are
-- set through 'addToAL', in that order.
setEventHeaders :: Request -> Request
setEventHeaders request =
    request
        { requestHeaders = requestHeaders request & withJsonContentType & withEventSchema
        , method = "POST"
        }
  where
    withJsonContentType headers = addToAL headers "Content-Type" "application/json"
    withEventSchema headers = addToAL headers "X-LaunchDarkly-Event-Schema" "4"
-- | Raise the @lastKnownServerTime@ stored in the event state to
-- @serverTime@ if that is larger; a smaller or equal value leaves the stored
-- time as it was.
updateLastKnownServerTime :: EventState -> Integer -> IO ()
updateLastKnownServerTime state serverTime =
    modifyMVar_ (getField @"lastKnownServerTime" state) $ \lastKnown ->
        -- MVar contents are lazy: force the maximum before storing it so
        -- repeated updates do not pile up a chain of unevaluated 'max' thunks.
        pure $! max serverTime lastKnown
-- | Background loop that delivers buffered analytics events to the @\/bulk@
-- path under the configured @eventsURI@.
--
-- The request template (URI plus the headers from 'setEventHeaders') and a
-- random generator for payload IDs are prepared once. The loop itself runs
-- inside 'tryAuthorized' and on every iteration:
--
-- 1. runs 'processSummary' and swaps the pending event list for an empty one;
-- 2. if the batch is non-empty, POSTs it via 'processSend' (gzip-compressed
--    with a @Content-Encoding@ header when @compressEvents@ is set), retrying
--    once when the first attempt is reported as retryable;
-- 3. kills its own thread when the client status is 'ShuttingDown';
-- 4. otherwise waits until the @flush@ 'MVar' can be taken or
--    @flushIntervalSeconds@ have elapsed.
eventThread :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Client -> ClientContext -> m ()
eventThread manager client clientContext = do
    let
        state = getField @"events" client
        config = getField @"config" client
        compressEvents = getField @"compressEvents" config
        httpConfig = httpConfiguration clientContext
        bulkUri = T.unpack (getField @"eventsURI" config) ++ "/bulk"
    rngRef <- liftIO $ newStdGen >>= newIORef
    req <- setEventHeaders <$> liftIO (prepareRequest httpConfig bulkUri)
    void $ tryAuthorized client $ forever $ do
        liftIO $ processSummary config state
        events' <- liftIO $ swapMVar (getField @"events" state) []
        unless (null events') $ do
            payloadId <- liftIO $ atomicModifyIORef' rngRef (swap . random)
            let
                encoded = encode events'
                payload = if compressEvents then GZip.compress encoded else encoded
                payloadHeaders =
                    addToAL (requestHeaders req) "X-LaunchDarkly-Payload-ID" (UUID.toASCIIBytes payloadId)
                        & \headers -> if compressEvents then addToAL headers "Content-Encoding" "gzip" else headers
                -- The same request (and therefore the same payload ID) is
                -- reused for the retry below.
                thisReq =
                    req
                        { requestBody = RequestBodyLBS payload
                        , requestHeaders = payloadHeaders
                        }
            -- Logged before the request goes out so the message matches what
            -- is about to happen; the uncompressed JSON is what gets logged.
            $(logDebug) $ T.append "sending events: " $ decodeUtf8 $ L.toStrict encoded
            (success, serverTime) <- processSend manager thisReq
            if success
                then liftIO $ updateLastKnownServerTime state serverTime
                else do
                    $(logWarn) "retrying event delivery after one second"
                    -- Wait up to one second (1000000 microseconds); 'readMVar'
                    -- returns early, without consuming the signal, if a flush
                    -- has been requested in the meantime.
                    liftIO $ void $ timeout 1000000 $ readMVar $ getField @"flush" state
                    (success', serverTime') <- processSend manager thisReq
                    unless success' $ do
                        $(logWarn) "failed sending events on retry, dropping event batch"
                    -- Recorded whatever the retry outcome: the update keeps
                    -- the maximum, so the 0 reported for a failed request
                    -- cannot move the stored time backwards.
                    liftIO $ updateLastKnownServerTime state serverTime'
            $(logDebug) "finished send of event batch"
        status <- liftIO $ readIORef $ getField @"status" client
        -- On shutdown the loop ends by killing its own thread.
        liftIO $ when (status == ShuttingDown) (myThreadId >>= killThread)
        -- Sleep for the flush interval (seconds converted to microseconds),
        -- waking early if a flush is signalled; 'takeMVar' consumes the signal.
        liftIO $ void $ timeout (1000000 * fromIntegral (getField @"flushIntervalSeconds" config)) $ takeMVar $ getField @"flush" state