{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
module LaunchDarkly.Server.Network.Streaming (streamingThread) where
import Control.Applicative (many, (<|>))
import Control.Concurrent (threadDelay)
import Control.Exception (throwIO)
import Control.Monad (mzero, void)
import Control.Monad.Catch (Exception, MonadCatch, MonadMask, try)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (MonadLogger, logDebug, logError, logWarn)
import Control.Monad.Loops (iterateUntilM)
import Data.Aeson (FromJSON, Result (..), eitherDecode, fromJSON, parseJSON, withObject, (.!=), (.:), (.:?))
import Data.Attoparsec.Text as P hiding (Result, try)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import GHC.Generics (Generic)
import GHC.Natural (Natural)
import Network.HTTP.Client (HttpException (..), HttpExceptionContent (..), Manager, Request, Response (..), brRead)
import Network.HTTP.Types.Status (Status (statusCode))
import System.Clock (Clock (Monotonic), TimeSpec (TimeSpec), getTime)
import System.Random (Random (randomR), newStdGen)
import System.Timeout (timeout)
import LaunchDarkly.AesonCompat (KeyMap, emptyObject)
import LaunchDarkly.Server.Config.ClientContext (ClientContext (..))
import LaunchDarkly.Server.Config.HttpConfiguration (HttpConfiguration (..), prepareRequest)
import LaunchDarkly.Server.DataSource.Internal (DataSourceUpdates (..))
import LaunchDarkly.Server.Features (Flag, Segment)
import LaunchDarkly.Server.Network.Common (checkAuthorization, handleUnauthorized, isHttpUnrecoverable, throwIfNot200, tryHTTP, withResponseGeneric)
import LaunchDarkly.Server.Store.Internal (StoreResult)
-- | Body of a @put@ stream event: the complete set of flags and segments
-- used to (re)initialise the data source.
data PutBody = PutBody
    { flags :: !(KeyMap Flag)
    -- ^ All feature flags, keyed by flag key.
    , segments :: !(KeyMap Segment)
    -- ^ All segments, keyed by segment key.
    }
    deriving (Generic, Show)
-- | Envelope shared by @put@ and @patch@ events: a target path together
-- with a payload of type @d@.
data PathData d = PathData
    { path :: !Text
    -- ^ Path the payload applies to.
    , pathData :: !d
    -- ^ The payload itself (read from the JSON @data@ key by the
    -- 'FromJSON' instance).
    }
    deriving (Generic, Show)
-- | Body of a @delete@ stream event: which item to delete and the version
-- of the deletion. The 'FromJSON' instance is derived generically.
data PathVersion = PathVersion
    { path :: !Text
    -- ^ Path of the item being deleted, e.g. @/flags/<key>@ or
    -- @/segments/<key>@.
    , version :: !Natural
    -- ^ Version number passed through to the data source on delete.
    }
    deriving (Generic, Show, FromJSON)
-- | Decode a 'PutBody' from a JSON object. @flags@ is required; a missing
-- @segments@ key is treated as an empty collection.
instance FromJSON PutBody where
    parseJSON = withObject "PutBody" $ \o -> do
        flags <- o .: "flags"
        segments <- o .:? "segments" .!= emptyObject
        pure $ PutBody {flags = flags, segments = segments}
-- | Decode a 'PathData' envelope from a JSON object. The payload is read
-- from the required @data@ key; a missing @path@ defaults to @"/"@.
instance FromJSON a => FromJSON (PathData a) where
    parseJSON = withObject "Put" $ \o -> do
        -- "data" is parsed before "path", so a bad payload is reported first.
        pathData <- o .: "data"
        path <- o .:? "path" .!= "/"
        pure $ PathData {path = path, pathData = pathData}
-- | A single server-sent event assembled from its individual field lines.
data SSE = SSE
    { name :: !Text
    -- ^ Value of the @event@ field; empty if none was seen.
    , buffer :: !Text
    -- ^ Accumulated @data@ field values, joined with newlines.
    , lastEventId :: !(Maybe Text)
    -- ^ Value of the @id@ field, if present.
    , retry :: !(Maybe Text)
    -- ^ Raw, unparsed value of the @retry@ field, if present.
    }
    deriving (Generic, Show, Eq)
-- | True for characters that may appear in an SSE field name: anything
-- except a carriage return, a colon, or a line feed.
nameCharPredicate :: Char -> Bool
nameCharPredicate x = x `notElem` ['\r', ':', '\n']
-- | True for characters that may appear in an SSE field value or comment:
-- anything except a carriage return or a line feed.
anyCharPredicate :: Char -> Bool
anyCharPredicate x = x `notElem` ['\r', '\n']
-- | Match one SSE line terminator: @\\r\\n@, a lone @\\r@, a lone @\\n@, or
-- the end of the input. The two-character form is tried first so that a
-- CRLF pair is consumed as a single terminator.
endOfLineSSE :: Parser ()
endOfLineSSE = choice (map (void . string) ["\r\n", "\r", "\n"] ++ [endOfInput])
-- | Consume an SSE comment line: a leading colon, the rest of the line,
-- and its terminator. The comment text is discarded.
comment :: Parser ()
comment = char ':' *> P.takeWhile anyCharPredicate *> endOfLineSSE
-- | Parse one SSE field line into a @(name, value)@ pair.
--
-- The name is a non-empty run of characters up to a colon or line end.
-- The colon and a single following space are each optional and dropped;
-- the remainder of the line is the value (possibly empty).
parseField :: Parser (Text, Text)
parseField = do
    fieldName <- P.takeWhile1 nameCharPredicate
    -- The separator colon may be absent (field with an empty value).
    void $ option ' ' (char ':')
    -- At most one leading space is stripped from the value.
    void $ option ' ' (char ' ')
    fieldValue <- P.takeWhile anyCharPredicate
    endOfLineSSE
    pure (fieldName, fieldValue)
-- | Apply a single parsed field to an event under construction.
--
-- @event@, @id@ and @retry@ overwrite the corresponding slot; @data@ is
-- appended to the buffer, separated from existing content by a newline.
-- Fields with any other name leave the event unchanged.
processField :: (Text, Text) -> SSE -> SSE
processField (fieldName, fieldValue) event = case fieldName of
    "event" -> event {name = fieldValue}
    "id" -> event {lastEventId = Just fieldValue}
    "retry" -> event {retry = Just fieldValue}
    "data" -> event {buffer = appendedData}
    _ -> event
  where
    existing = buffer event
    -- Only insert the separating newline once there is something to separate.
    appendedData
        | T.null existing = fieldValue
        | otherwise = T.concat [existing, "\n", fieldValue]
-- | Parse the next server-sent event that has both a name and data.
--
-- Reads comment and field lines up to the terminating blank line, folds
-- the fields into an 'SSE', and returns it. Blocks that end up without an
-- event name or without any data are skipped and parsing continues with
-- the following block.
parseEvent :: Parser SSE
parseEvent = do
    -- Comments contribute no fields; each field line contributes one.
    fields <- concat <$> many (([] <$ comment) <|> ((: []) <$> parseField))
    endOfLineSSE
    -- Fields must be applied in the order they appeared on the wire so that
    -- multi-line data is joined in order and the last event/id/retry wins.
    -- 'foldr' applies the final list element first, hence the 'reverse'.
    let event = foldr processField (SSE "" "" mzero mzero) (reverse fields)
    -- NOTE(review): endOfLineSSE also succeeds at end of input without
    -- consuming anything; confirm the caller never lets this recurse forever
    -- once the stream is exhausted.
    if T.null (name event) || T.null (buffer event)
        then parseEvent
        else pure event
-- | Handle the payload of a @put@ event.
--
-- Decodes the payload as a 'PathData' wrapping a 'PutBody' (the path is
-- ignored) and initialises the data source with the flags and segments.
-- Returns 'True' on success and 'False' if decoding or initialisation
-- fails; failures are logged at error level.
processPut :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m Bool
processPut dataSourceUpdates value = case eitherDecode value of
    Left err -> do
        $(logError) $ T.append "failed to parse put body: " (T.pack err)
        pure False
    Right (PathData _ (PutBody flags segments)) -> do
        $(logDebug) "initializing dataSourceUpdates with put"
        liftIO (dataSourceUpdatesInit dataSourceUpdates flags segments) >>= \case
            Left err -> do
                $(logError) $ T.append "dataSourceUpdates failed put: " err
                pure False
            Right _ -> pure True
-- | Handle the payload of a @patch@ event.
--
-- Decodes the payload as a 'PathData' envelope and, depending on whether
-- the path starts with @/flags/@ or @/segments/@, decodes the body as a
-- flag or segment and inserts it into the data source.
--
-- Returns 'False' when the envelope or the item fails to decode, or when
-- the insert fails. An unrecognised path is logged and reported as 'True'.
processPatch :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m Bool
processPatch dataSourceUpdates value = case eitherDecode value of
    Left err -> do
        $(logError) $ T.append "failed to parse patch generic: " (T.pack err)
        pure False
    Right (PathData path body)
        | T.isPrefixOf "/flags/" path ->
            insPatch "flag" path dataSourceUpdatesInsertFlag (fromJSON body)
        | T.isPrefixOf "/segments/" path ->
            insPatch "segment" path dataSourceUpdatesInsertSegment (fromJSON body)
        | otherwise -> do
            $(logError) "unknown patch path"
            pure True
  where
    -- Insert an already-decoded item, or report why it could not be decoded.
    -- The first argument is a human-readable item kind used in log messages.
    insPatch :: Text -> Text -> (DataSourceUpdates -> a -> IO (Either Text ())) -> Result a -> m Bool
    insPatch name _ _ (Error err) = do
        $(logError) $ T.concat ["failed to parse patch ", name, ": ", T.pack err]
        pure False
    insPatch name path insert (Success item) = do
        $(logDebug) $ T.concat ["patching ", name, " with path: ", path]
        liftIO (insert dataSourceUpdates item) >>= \case
            Left err -> do
                $(logError) $ T.concat ["dataSourceUpdates failed ", name, " patch: ", err]
                pure False
            Right _ -> pure True
-- | Handle the payload of a @delete@ event.
--
-- Decodes the payload as a 'PathVersion' and deletes the flag or segment
-- whose key follows the @/flags/@ or @/segments/@ prefix of the path.
--
-- Returns 'True' when the delete succeeds and 'False' when the payload
-- fails to decode, the path is not recognised, or the delete fails.
processDelete :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m Bool
processDelete dataSourceUpdates value = case eitherDecode value :: Either String PathVersion of
    Left err -> do
        $(logError) $ T.append "failed to parse delete: " (T.pack err)
        pure False
    Right (PathVersion path version)
        -- stripPrefix yields exactly the key that follows the prefix, so no
        -- hard-coded prefix lengths are needed.
        | Just key <- T.stripPrefix "/flags/" path ->
            logDelete "flag" path (dataSourceUpdatesDeleteFlag dataSourceUpdates key version)
        | Just key <- T.stripPrefix "/segments/" path ->
            logDelete "segment" path (dataSourceUpdatesDeleteSegment dataSourceUpdates key version)
        | otherwise -> do
            $(logError) "unknown delete path"
            pure False
  where
    -- Run the delete action, logging what is being deleted and any failure.
    -- The first argument is a human-readable item kind used in log messages.
    logDelete :: Text -> Text -> StoreResult () -> m Bool
    logDelete name path action = do
        $(logDebug) $ T.concat ["deleting ", name, " with path: ", path]
        liftIO action >>= \case
            Left err -> do
                $(logError) $ T.concat ["dataSourceUpdates failed ", name, " delete: ", err]
                pure False
            Right _ -> pure True
-- | Route a single server-sent event to the handler matching its event name.
--
-- The event payload is passed through untouched to 'processPut',
-- 'processPatch' or 'processDelete', and that handler's result is returned.
-- An event with any other name is logged at warning level and reported as
-- 'True', so the caller keeps reading the stream.
processEvent :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> Text -> L.ByteString -> m Bool
processEvent dataSourceUpdates name value
    | name == "put" = processPut dataSourceUpdates value
    | name == "patch" = processPatch dataSourceUpdates value
    | name == "delete" = processDelete dataSourceUpdates value
    | otherwise = do
        $(logWarn) "unknown event type"
        pure True
-- | Exceptions raised by 'readWithException' while pulling bytes off the
-- streaming response body.
data ReadE
    = -- | No chunk arrived before the read timeout expired.
      ReadETimeout
    | -- | The body reader returned an empty chunk.
      ReadEClosed
    deriving (Show)

-- All methods use the class defaults, exactly as the derived instance did.
instance Exception ReadE
-- | Run an action, capturing a thrown 'ReadE' as a 'Left' value.
--
-- This is 'try' with the exception type pinned to 'ReadE'; any other
-- exception type is not caught here.
tryReadE :: MonadCatch m => m a -> m (Either ReadE a)
tryReadE action = try action
-- | Read the next chunk of the response body and decode it as UTF-8 text.
--
-- Throws 'ReadETimeout' when no chunk arrives within the read timeout and
-- 'ReadEClosed' when the reader yields an empty chunk.
--
-- NOTE(review): each chunk is decoded on its own with 'decodeUtf8'; if a
-- multi-byte character can be split across two chunks this would throw a
-- decoding exception -- confirm whether the upstream reader can do that.
readWithException :: IO ByteString -> IO Text
readWithException body = do
    chunk <- timeout readTimeoutMicros (brRead body)
    case chunk of
        Nothing -> throwIO ReadETimeout
        Just bytes
            | B.null bytes -> throwIO ReadEClosed
            | otherwise -> pure (decodeUtf8 bytes)
  where
    -- 'timeout' takes microseconds: one million per second, 300 seconds.
    readTimeoutMicros :: Int
    readTimeoutMicros = 1_000_000 * 300
-- | Consume server-sent events from a response body until the stream ends.
--
-- Chunks are pulled with 'readWithException' and fed to the 'parseEvent'
-- parser; every completed event is handed to 'processEvent'. Input left over
-- after a completed event seeds the parse of the next one.
--
-- The result is 'True' when at least one event was processed successfully
-- before reading stopped. Reading stops on a read timeout, on a closed
-- connection, on a parse failure, or when 'processEvent' reports 'False'.
readStream :: (MonadIO m, MonadLogger m, MonadMask m) => IO ByteString -> DataSourceUpdates -> m Bool
readStream body dataSourceUpdates = loop "" False
  where
    -- @initial@ is unparsed input carried over from the previous event;
    -- @processedEvent@ records whether any event has been handled so far.
    loop initial processedEvent =
        tryReadE (parseWith (liftIO $ readWithException body) parseEvent initial) >>= \case
            -- 'ReadETimeout' is raised when the read timeout expires.
            Left ReadETimeout -> do
                $(logError) "timeout waiting for SSE event"
                pure processedEvent
            -- 'ReadEClosed' is raised when the body reader returns no bytes.
            Left ReadEClosed -> do
                $(logError) "streaming connection unexpectedly closed"
                pure processedEvent
            Right (Done remaining event) -> do
                processed <- processEvent dataSourceUpdates (name event) (L.fromStrict $ encodeUtf8 $ buffer event)
                if processed
                    then loop remaining True
                    else pure processedEvent
            Right (Fail _ context err) -> do
                $(logError) $ T.intercalate " " ["failed parsing SSE frame", T.pack $ show context, T.pack err]
                pure processedEvent
            Right (Partial _) -> do
                $(logError) "failed parsing SSE frame unexpected partial"
                pure processedEvent
-- | Run one streaming connection to completion and compute the state for the
-- next attempt.
--
-- The request is issued, the response is checked with 'checkAuthorization'
-- and 'throwIfNot200', and the body is consumed by 'readStream'. Any
-- 'HttpException' raised along the way is captured by 'tryHTTP'. The outcome
-- is folded into the 'StreamingState'; unless the new state is cancelled,
-- this function sleeps for the computed backoff delay before returning it.
startNewConnection :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Request -> DataSourceUpdates -> StreamingState -> m StreamingState
startNewConnection manager request dataSourceUpdates state@(StreamingState {activeSince}) = do
    $(logDebug) "starting new streaming connection"
    status <- tryHTTP $ withResponseGeneric request manager $ \response -> do
        checkAuthorization response
        throwIfNot200 request response
        readStream (responseBody response) dataSourceUpdates
    now <- liftIO $ getTime Monotonic
    handleResponse state now status
  where
    -- Apply the connection outcome to the state, then back off before the
    -- caller retries (skipped when the state has been cancelled).
    handleResponse :: (MonadIO m, MonadLogger m, MonadMask m) => StreamingState -> TimeSpec -> (Either HttpException Bool) -> m StreamingState
    handleResponse current now result =
        let next = updateState current now result
         in if cancel next
                then pure next
                else do
                    delay <- calculateDelay next
                    liftIO $ threadDelay delay
                    pure next

    -- Pure state transition for one finished connection attempt.
    updateState :: StreamingState -> TimeSpec -> (Either HttpException Bool) -> StreamingState
    -- The stream ran without an HTTP exception: restart the attempt counter
    -- and remember when this happened.
    updateState current now (Right _) = current {initialConnection = False, activeSince = Just now, attempt = 1}
    updateState current@(StreamingState {attempt = att}) now (Left (HttpExceptionRequest _ (StatusCodeException response _)))
        -- Unrecoverable status codes stop the streaming loop for good.
        | isHttpUnrecoverable code = current {cancel = True}
        | otherwise =
            -- 'activeSince' here is the value captured from the state that
            -- 'startNewConnection' was called with.
            case activeSince of
                Just time
                    -- More than a minute has passed since the recorded time:
                    -- start the backoff sequence over.
                    | now >= time + TimeSpec 60 0 -> current {attempt = 1, activeSince = Nothing}
                    | otherwise -> current {attempt = att + 1}
                Nothing -> current {attempt = att + 1}
      where
        code = statusCode (responseStatus response)
    -- Every other HTTP exception counts as one more failed attempt.
    updateState current@(StreamingState {attempt = att}) _ _ = current {attempt = att + 1}

    -- Pick the delay, in microseconds, before the next attempt: the capped
    -- exponential backoff minus a random jitter of up to half of it.
    calculateDelay :: (MonadIO m, MonadLogger m, MonadMask m) => StreamingState -> m Int
    calculateDelay StreamingState {initialRetryDelay, attempt = att} = liftIO $ do
        gen <- newStdGen
        let timespan = backoffMicros initialRetryDelay att
            jitter = fst $ randomR (0, timespan `div` 2) gen
        pure (timespan - jitter)

    -- Exponential backoff: the base delay times 1_000, doubled once per
    -- attempt after the first, limited to the range [0, 30 seconds].
    --
    -- The arithmetic is done in 'Integer' with a clamped exponent. In 'Int',
    -- @2 ^ (att - 1)@ wraps around for large attempt counts (it is 0 from
    -- attempt 65 on), which collapsed the delay to zero during long outages.
    backoffMicros :: Int -> Int -> Int
    backoffMicros baseDelay att =
        let maxDelay = 30 * 1_000_000 :: Integer
            -- With any positive base delay, 32 doublings already exceed the
            -- cap, so clamping the exponent does not change the result.
            doublings = max 0 (min 32 (att - 1))
            uncapped = toInteger baseDelay * 1_000 * 2 ^ doublings
         in fromInteger (max 0 (min maxDelay uncapped))
-- | Bookkeeping carried from one streaming connection attempt to the next.
data StreamingState = StreamingState
    { initialConnection :: Bool
    -- ^ 'True' in the starting state; set to 'False' once a connection
    -- finishes without an HTTP exception.
    , initialRetryDelay :: Int
    -- ^ Base value for the reconnect backoff. It is multiplied by 1_000
    -- before being used as a 'threadDelay' argument, so presumably
    -- milliseconds -- confirm against the caller.
    , activeSince :: Maybe TimeSpec
    -- ^ Monotonic time recorded when a connection last finished without an
    -- HTTP exception, if any.
    , attempt :: Int
    -- ^ Attempt counter that drives the exponential backoff.
    , cancel :: Bool
    -- ^ When 'True', the streaming loop stops reconnecting.
    }
    deriving (Show, Generic)
-- | Entry point of the streaming data source.
--
-- Builds the request for @\<streamURI\>/all@ from the client's HTTP
-- configuration and then keeps opening streaming connections, one after
-- another, until the streaming state is marked as cancelled. The whole loop
-- runs under 'handleUnauthorized'.
streamingThread :: (MonadIO m, MonadLogger m, MonadMask m) => Text -> Int -> ClientContext -> DataSourceUpdates -> m ()
streamingThread streamURI initialRetryDelay clientContext dataSourceUpdates = do
    req <- prepareRequest httpConfig (T.unpack streamURI ++ "/all")
    handleUnauthorized dataSourceUpdates (processStream (tlsManager httpConfig) req)
  where
    httpConfig = httpConfiguration clientContext

    -- State before the first connection attempt.
    startingState =
        StreamingState
            { initialConnection = True
            , initialRetryDelay = initialRetryDelay
            , activeSince = Nothing
            , attempt = 0
            , cancel = False
            }

    -- Reconnect repeatedly, threading the state through each attempt, and
    -- discard the final state once 'cancel' is set.
    processStream :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Request -> m ()
    processStream manager req =
        void $ iterateUntilM cancel (startNewConnection manager req dataSourceUpdates) startingState