module LaunchDarkly.Server.Network.Eventing (eventThread) where

import qualified Codec.Compression.GZip as GZip
import Control.Concurrent (killThread, myThreadId)
import Control.Concurrent.MVar (modifyMVar_, readMVar, swapMVar, takeMVar)
import Control.Monad (forever, unless, void, when)
import Control.Monad.Catch (MonadMask, MonadThrow)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (MonadLogger, logDebug, logError, logWarn)
import Data.Aeson (encode)
import qualified Data.ByteString.Lazy as L
import Data.Function ((&))
import Data.Generics.Product (getField)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8)
import Data.Tuple (swap)
import qualified Data.UUID as UUID
import Network.HTTP.Client (Manager, Request (..), RequestBody (..), httpLbs, responseStatus)
import Network.HTTP.Types.Status (Status (statusCode), status400)
import System.Random (newStdGen, random)
import System.Timeout (timeout)

import LaunchDarkly.Server.Client.Internal (Client, Status (ShuttingDown))
import LaunchDarkly.Server.Config.ClientContext
import LaunchDarkly.Server.Config.HttpConfiguration (prepareRequest)
import LaunchDarkly.Server.Events (EventState, processSummary)
import LaunchDarkly.Server.Network.Common (addToAL, checkAuthorization, getServerTime, isHttpUnrecoverable, tryAuthorized, tryHTTP)

-- | POST a prepared events request and classify the outcome.
--
-- The result is a pair @(done, serverTime)@:
--
-- * @done@ is 'True' when a retry does not need to be attempted: either the
--   response status is below 400, or 'isHttpUnrecoverable' reports the status
--   code as unrecoverable (the payload is then dropped). It is 'False' when
--   the request failed with an HTTP exception or the status is an error that
--   is not flagged as unrecoverable.
-- * @serverTime@ is whatever 'getServerTime' extracts from the response, or
--   @0@ when no response was received.
--
-- 'checkAuthorization' is run on every response before it is classified
-- (presumably it throws on an unauthorized status -- confirm in
-- "LaunchDarkly.Server.Network.Common").
processSend :: (MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) => Manager -> Request -> m (Bool, Integer)
processSend manager req =
    liftIO (tryHTTP $ httpLbs req manager) >>= \case
        Left err -> do
            $(logError) (T.pack $ show err)
            -- No response, hence no server time; the caller should retry.
            pure (False, 0)
        Right response -> do
            checkAuthorization response
            let code = responseStatus response
                serverTime = getServerTime response
            -- Emitted for every response, so it is kept at debug level to
            -- avoid filling the warning log during normal operation.
            $(logDebug) (T.append "@@@ server time from LD was determined to be: " $ T.pack $ show serverTime)
            if code < status400
                then pure (True, serverTime)
                else
                    if isHttpUnrecoverable (statusCode code)
                        then do
                            -- Retrying cannot help; report success to the
                            -- caller so the batch is dropped.
                            $(logWarn) (T.append "got non recoverable event post response dropping payload: " $ T.pack $ show code)
                            pure (True, serverTime)
                        else pure (False, serverTime)

-- | Turn a base request into an event-delivery request: the method becomes
-- @POST@ and the @Content-Type@ and @X-LaunchDarkly-Event-Schema@ headers are
-- set through 'addToAL' (presumably replacing any existing entry with the
-- same name -- confirm in "LaunchDarkly.Server.Network.Common").
setEventHeaders :: Request -> Request
setEventHeaders request =
    request
        { requestHeaders =
            requestHeaders request
                & setHeader "Content-Type" "application/json"
                & setHeader "X-LaunchDarkly-Event-Schema" "4"
        , method = "POST"
        }
  where
    -- 'addToAL' with the header list moved last so that calls chain with '&'.
    setHeader name value headers = addToAL headers name value

-- | Record a server timestamp in the event state's @lastKnownServerTime@
-- variable. The stored value only ever increases: the new value is the
-- maximum of the given time and the one already stored.
updateLastKnownServerTime :: EventState -> Integer -> IO ()
updateLastKnownServerTime state serverTime =
    modifyMVar_ (getField @"lastKnownServerTime" state) $ \lastKnown ->
        -- Force the maximum before storing it so the MVar holds a plain
        -- number rather than a growing chain of unevaluated 'max' calls.
        pure $! max serverTime lastKnown

-- | Body of the background thread that delivers analytics events.
--
-- A request for @\<eventsURI\>/bulk@ is prepared once and then reused. The
-- loop, which runs under 'tryAuthorized', repeats the following steps:
--
-- 1. Run 'processSummary', then atomically swap the pending event list for an
--    empty one.
-- 2. If any events were pending, encode them as JSON (gzip-compressed when
--    @compressEvents@ is set), tag the request with a freshly generated
--    payload ID and send it with 'processSend'. If the result says a retry is
--    needed, wait up to one second and send the same request (same payload
--    ID) once more; if that also fails the batch is dropped.
-- 3. Kill this thread if the client status is 'ShuttingDown'.
-- 4. Wait for @flushIntervalSeconds@, or until the @flush@ variable is
--    filled, whichever comes first.
eventThread :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Client -> ClientContext -> m ()
eventThread manager client clientContext = do
    let
        state = getField @"events" client
        config = getField @"config" client
        compressEvents = getField @"compressEvents" config
        httpConfig = httpConfiguration clientContext
        bulkURI = T.unpack (getField @"eventsURI" config) ++ "/bulk"
    rngRef <- liftIO $ newStdGen >>= newIORef
    req <- setEventHeaders <$> liftIO (prepareRequest httpConfig bulkURI)
    void $ tryAuthorized client $ forever $ do
        liftIO $ processSummary config state
        events' <- liftIO $ swapMVar (getField @"events" state) []
        unless (null events') $ do
            payloadId <- liftIO $ atomicModifyIORef' rngRef (swap . random)
            let
                encoded = encode events'
                payload = if compressEvents then GZip.compress encoded else encoded
                identifiedHeaders =
                    addToAL (requestHeaders req) "X-LaunchDarkly-Payload-ID" (UUID.toASCIIBytes payloadId)
                payloadHeaders
                    | compressEvents = addToAL identifiedHeaders "Content-Encoding" "gzip"
                    | otherwise = identifiedHeaders
                thisReq =
                    req
                        { requestBody = RequestBodyLBS payload
                        , requestHeaders = payloadHeaders
                        }
            -- Logged before the request goes out so the log order matches
            -- what actually happens. The uncompressed JSON is what is shown.
            $(logDebug) $ T.append "sending events: " $ decodeUtf8 $ L.toStrict encoded
            (success, serverTime) <- processSend manager thisReq
            if success
                then liftIO $ updateLastKnownServerTime state serverTime
                else do
                    $(logWarn) "retrying event delivery after one second"
                    -- 'readMVar' does not empty the flush variable, so a flush
                    -- requested during this wait both cuts the wait short and
                    -- is still seen by the 'takeMVar' at the end of the loop.
                    liftIO $ void $ timeout (1 * 1000000) $ readMVar $ getField @"flush" state
                    (success', serverTime') <- processSend manager thisReq
                    unless success' ($(logWarn) "failed sending events on retry, dropping event batch")
                    liftIO $ updateLastKnownServerTime state serverTime'
            $(logDebug) "finished send of event batch"
        status <- liftIO $ readIORef $ getField @"status" client
        -- On shutdown the thread ends itself, after the batch above was sent.
        liftIO $ when (status == ShuttingDown) (myThreadId >>= killThread)
        -- Sleep until the next scheduled flush, or wake early when a flush is
        -- requested through the flush variable (which this consumes).
        liftIO $
            void $
                timeout (1000000 * fromIntegral (getField @"flushIntervalSeconds" config)) $
                    takeMVar $
                        getField @"flush" state