-- Copyright 2013 Evan Laforge
-- This program is distributed under the terms of the GNU General Public
-- License 3.0, see COPYING or http://www.gnu.org/licenses/gpl-3.0.txt

module Util.Thread (
    start, startLogged
    , Seconds, delay
    , timeout
    -- * Flag
    , Flag, flag, set, wait, poll
    -- * timing
    , force, timeAction, timeActionText
    , printTimer, printTimer_, printTimerVal
    , currentCpu
    -- * map concurrent
    , forCpu_
    -- * Metric
    , Metric(..), metric, diffMetric, showMetric
) where
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.QSem as QSem
import qualified Control.Concurrent.STM as STM
import qualified Control.DeepSeq as DeepSeq
import qualified Control.Exception as Exception
import qualified Control.Monad.Trans as Trans

import qualified Data.Text as Text
import           Data.Text (Text)
import qualified Data.Text.IO as Text.IO
import qualified Data.Time as Time

import qualified GHC.Conc as Conc
import qualified System.CPUTime as CPUTime
import qualified System.IO as IO
import qualified System.Timeout as Timeout

import qualified Text.Printf as Printf

import qualified Util.Log as Log


-- | Start a noisy thread that will log when it starts and stops, and warn if
-- it dies from an exception.
--
-- The thread label and the thread id in the log messages belong to the newly
-- forked thread, so all of the bookkeeping runs inside the fork rather than
-- in the caller.
startLogged :: String -> IO () -> IO Concurrent.ThreadId
startLogged name thread = Concurrent.forkFinally (begin >> thread) finish
    where
    -- Prefix for the log messages.  It asks for 'Concurrent.myThreadId', so
    -- it reports the id of whichever thread runs it.
    getPrefix :: IO Text
    getPrefix = do
        threadId <- Concurrent.myThreadId
        return $ Text.pack $ show threadId ++ " " ++ name ++ ": "
    -- Runs as the first step of the forked thread.
    begin :: IO ()
    begin = do
        threadId <- Concurrent.myThreadId
        Conc.labelThread threadId name
        prefix <- getPrefix
        Log.debug $ prefix <> "started"
    -- Runs in the forked thread once the action has returned or thrown.
    finish :: Either Exception.SomeException () -> IO ()
    finish result = do
        prefix <- getPrefix
        case result of
            Right () -> Log.debug $ prefix <> "completed"
            Left err -> Log.warn $ prefix <> "died: "
                <> Text.pack (show err)

-- | Fork a thread with 'Concurrent.forkIO', without any of the logging that
-- 'startLogged' does.
start :: IO () -> IO Concurrent.ThreadId
start = Concurrent.forkIO

-- | A duration in seconds.  This is just 'Time.NominalDiffTime', but with a
-- name I might remember.
type Seconds = Time.NominalDiffTime

-- | Delay in seconds.  This is 'Concurrent.threadDelay', except the argument
-- is in 'Seconds' rather than microseconds.
delay :: Seconds -> IO ()
delay = Concurrent.threadDelay . toUsec

-- | This is 'Timeout.timeout', except the limit is in 'Seconds' rather than
-- microseconds.
timeout :: Seconds -> IO a -> IO (Maybe a)
timeout = Timeout.timeout . toUsec

-- | Convert seconds to a whole number of microseconds, via 'round'.
toUsec :: Seconds -> Int
toUsec = round . (*1000000)

-- * Flag

-- | A Flag starts False, and can eventually become True.  It never goes back
-- to False again.
newtype Flag = Flag (STM.TVar Bool)
    deriving (Eq)

-- | Always shows a fixed placeholder; the flag's current value is not read.
instance Show Flag where
    show _ = "((Flag))"

-- | Create a new 'Flag', initially False.
flag :: IO Flag
flag = Flag <$> STM.newTVarIO False

-- | Set the flag to True.
set :: Flag -> IO ()
set (Flag var) = STM.atomically $ STM.writeTVar var True

-- | Wait a finite amount of time for the flag to become true.
--
-- Return True if the flag was set before the time ran out, False if the wait
-- timed out.  A zero or negative time doesn't wait at all, it just reads the
-- flag's current value.
poll :: Seconds -> Flag -> IO Bool
poll time f@(Flag var)
    | time <= 0 = STM.readTVarIO var
    | otherwise = maybe False (const True) <$> timeout time (wait f)

-- | Wait until the flag becomes true.
--
-- The transaction retries for as long as the flag is False, so this returns
-- only once the flag has been set.
wait :: Flag -> IO ()
wait (Flag var) = STM.atomically $ STM.readTVar var >>= STM.check

-- * timing

-- | Fully evaluate the value with 'DeepSeq.rnf', as an IO action.
force :: DeepSeq.NFData a => a -> IO ()
force x = Exception.evaluate (DeepSeq.rnf x)

-- | Time an IO action in CPU and wall seconds.  Technically not thread
-- related, but I don't have a better place at the moment.
--
-- The result is forced to WHNF before the end time is taken, so at least
-- that much evaluation is included in the measurement.
timeAction :: Trans.MonadIO m => m a -> m (a, Metric Seconds)
timeAction action = do
    before <- Trans.liftIO metric
    !val <- action
    after <- Trans.liftIO metric
    return (val, diffMetric before after)

-- | Like 'timeAction', but return a Text msg instead of the values.  The msg
-- is formatted by 'showMetric'.
timeActionText :: Trans.MonadIO m => m a -> m (a, Text)
timeActionText = fmap (fmap showMetric) . timeAction

-- | Convert a CPU time in picoseconds, which is the unit of
-- 'CPUTime.getCPUTime', to seconds.
cpuToSec :: Integer -> Seconds
cpuToSec s = fromIntegral s / fromIntegral (10^12)

-- | Run the action and report its timing on stdout.
--
-- The message is printed and flushed before the action runs, and the same
-- line is completed afterwards with the timing and the result as rendered by
-- the show function.  If the action throws, the line is completed with the
-- exception and then the exception is rethrown.
printTimer :: Text -> (a -> String) -> IO a -> IO a
printTimer msg showVal action = do
    Text.IO.putStr $ msg <> " - "
    IO.hFlush IO.stdout
    result <- Exception.try $ timeActionText $ do
        !val <- action
        return val
    case result of
        Right (val, timing) -> do
            Text.IO.putStrLn $
                "time: " <> timing <> " - " <> Text.pack (showVal val)
            return val
        Left (exc :: Exception.SomeException) -> do
            -- Complete the line so the exception doesn't interrupt it.  This
            -- is important if it's a 'failure' line!
            putStrLn $ "threw exception: " <> show exc
            Exception.throwIO exc

-- | Run the action, then write the message and the action's timing to
-- stderr.  The action's result is returned unchanged.
printTimer_ :: Trans.MonadIO m => Text -> m a -> m a
printTimer_ msg action = do
    (val, elapsed) <- timeAction action
    Trans.liftIO $ Text.IO.hPutStrLn IO.stderr $
        msg <> ": " <> showMetric elapsed
    return val

-- | Like 'printTimer_', but time how long it takes to fully evaluate a pure
-- value.
--
-- The value is forced explicitly with 'Exception.evaluate', so the
-- evaluation happens inside the timed action no matter how the timer binds
-- its result.
printTimerVal :: (DeepSeq.NFData a, Trans.MonadIO m) => Text -> a -> m a
printTimerVal msg val =
    printTimer_ msg $ Trans.liftIO $ Exception.evaluate $ DeepSeq.force val

-- | The value of 'CPUTime.getCPUTime', converted to 'Seconds'.
currentCpu :: IO Seconds
currentCpu = cpuToSec <$> CPUTime.getCPUTime

-- | Convert 'Seconds' to a Double, e.g. to format it with printf.
toSecs :: Seconds -> Double
toSecs = realToFrac

-- * concurrent map

-- | Run the function on each element concurrently and discard the results.
--
-- Each call has to take a unit from a semaphore that starts out with
-- 'Concurrent.getNumCapabilities' units, so at most that many calls run at
-- the same time.  The unit is given back even if the call throws.
forCpu_ :: [a] -> (a -> IO b) -> IO ()
forCpu_ xs f = do
    sem <- QSem.newQSem =<< Concurrent.getNumCapabilities
    Async.forConcurrently_ xs $ \x ->
        Exception.bracket_ (QSem.waitQSem sem) (QSem.signalQSem sem) (f x)

-- * Metric

-- | A CPU time paired with a wall clock time.  The wall clock type is a
-- parameter so the same record can hold either a point in time or a
-- difference between two of them.
data Metric time = Metric {
    -- | CPU time, in seconds.
    metricCpu :: Seconds
    -- | Wall clock time.
    , metricWall :: time
    } deriving (Show)

-- | Sample the current CPU time and the current wall clock time.
metric :: IO (Metric Time.UTCTime)
metric = Metric <$> currentCpu <*> Time.getCurrentTime

-- | The CPU and wall clock time elapsed between two samples.  The first
-- argument is the earlier sample, and it is subtracted from the second.
diffMetric :: Metric Time.UTCTime -> Metric Time.UTCTime -> Metric Seconds
diffMetric (Metric cpu1 time1) (Metric cpu2 time2) =
    Metric (cpu2 - cpu1) (time2 `Time.diffUTCTime` time1)

-- | Format the CPU and wall clock seconds, each with two decimal places.
showMetric :: Metric Seconds -> Text
showMetric (Metric cpu wall) =
    Text.pack $ Printf.printf "%.2f cpu / %.2fs" (toSecs cpu) (toSecs wall)