Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: auto trim CheckpointedOffsetStorage #1796

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
kafka: auto trim CheckpointedOffsetStorage
  • Loading branch information
4eUeP committed Apr 19, 2024
commit ca45dfa49123d19d7b9359e6cec56940eafc7985
53 changes: 34 additions & 19 deletions hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
module HStream.Kafka.Group.OffsetsStore
( OffsetStorage(..)
, mkCkpOffsetStorage
)
where

import Control.Monad (unless)
import Data.Bifunctor (bimap)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import Data.Word (Word64)
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)
, deleteCkpOffsetStorage
) where

import Control.Monad (unless)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import Data.Word (Word64)

import HStream.Base.Timer (CompactedWorker, startCompactedWorker,
stopCompactedWorker,
triggerCompactedWorker)
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)

type LogID = Word64
type LSN = Word64
Expand All @@ -24,10 +27,11 @@ class OffsetStorage s where
--------------------------------------------------------------------------------

data CkpOffsetStorage = CkpOffsetStorage
{ ckpStore :: S.LDCheckpointStore
, ckpStoreName :: T.Text
, ckpStoreId :: Word64
{ ckpStore :: !S.LDCheckpointStore
, ckpStoreName :: !T.Text
, ckpStoreId :: !Word64
-- ^ __consumer_offsets logID
, trimCkpWorker :: !CompactedWorker
}

mkCkpOffsetStorage :: S.LDClient -> T.Text -> IO CkpOffsetStorage
Expand All @@ -38,13 +42,24 @@ mkCkpOffsetStorage client ckpStoreName = do
S.initOffsetCheckpointDir client logAttrs
ckpStoreId <- S.allocOffsetCheckpointId client cbGroupName
ckpStore <- S.newRSMBasedCheckpointStore client ckpStoreId 5000
Log.info $ "mkCkpOffsetStorage with name: " <> Log.build ckpStoreName <> ", storeId: " <> Log.build ckpStoreId
Log.info $ "mkCkpOffsetStorage with name: " <> Log.build ckpStoreName
<> ", storeId: " <> Log.build ckpStoreId
trimCkpWorker <- startCompactedWorker (60 * 1000000){- 60s -} $ do
Log.debug $ "Compacting checkpoint store of " <> Log.build ckpStoreName
S.trimLastBefore 1 client ckpStoreId
return CkpOffsetStorage{..}

-- FIXME: there may other resources(in memory or...) need to be released
deleteCkpOffsetStorage :: S.LDClient -> CkpOffsetStorage -> IO ()
deleteCkpOffsetStorage ldclient CkpOffsetStorage{..} = do
stopCompactedWorker trimCkpWorker
S.freeOffsetCheckpointId ldclient (textToCBytes ckpStoreName)

instance OffsetStorage CkpOffsetStorage where
commitOffsets CkpOffsetStorage{..} offsetsKey offsets = do
unless (Map.null offsets) $ do
S.ckpStoreUpdateMultiLSN ckpStore (textToCBytes offsetsKey) offsets
loadOffsets CkpOffsetStorage{..} offsetKey = do
checkpoints <- S.ckpStoreGetAllCheckpoints' ckpStore (textToCBytes offsetKey)
return . Map.fromList $ map (bimap fromIntegral fromIntegral) checkpoints
triggerCompactedWorker trimCkpWorker

loadOffsets CkpOffsetStorage{..} offsetKey =
Map.fromList <$> S.ckpStoreGetAllCheckpoints' ckpStore (textToCBytes offsetKey)
Loading