更新時間:2023-04-14 來源:黑馬程序員 瀏覽量:
Kafka的消息存儲在磁盤中,為了控制磁盤占用空間,Kafka需要不斷地對過去的一些消息進(jìn)行清理工作。Kafka的每個分區(qū)都有很多的日志文件,這樣也是為了方便進(jìn)行日志的清理。在Kafka中,提供日志清理和日志壓縮兩種方式:
日志刪除(Log Deletion):按照指定的策略直接刪除不符合條件的日志。
日志壓縮(Log Compaction):按照消息的key進(jìn)行整合,有相同key的但有不同value值,只保留最后一個版本。
在Kafka的broker或topic配置中:
以段(segment日志)為單位來進(jìn)行定期清理的。本節(jié)我們來詳細(xì)介紹Kafka數(shù)據(jù)處理中的日志刪除和日志壓縮。
定時日志刪除任務(wù)
Kafka日志管理器中會有一個專門的日志刪除任務(wù)來定期檢測和刪除不符合保留條件的日志分段文件,這個周期可以通過broker端參數(shù)log.retention.check.interval.ms來配置,默認(rèn)值為300,000,即5分鐘。當(dāng)前日志分段的保留策略有3種:1. 基于時間的保留策略
2. 基于日志大小的保留策略
3. 基于日志起始偏移量的保留策略
基于時間的保留策略
以下三種配置可以指定如果Kafka中的消息超過指定的閾值,就會將日志進(jìn)行自動清理:
- log.retention.hours
- log.retention.minutes
- log.retention.ms
其中,優(yōu)先級為 log.retention.ms > log.retention.minutes > log.retention.hours。默認(rèn)情況,在broker中,配置如下:
log.retention.hours=168
也就是,默認(rèn)日志的保留時間為168小時,相當(dāng)于保留7天。
刪除日志分段時:
1. 從日志文件對象中所維護(hù)日志分段的跳躍表中移除待刪除的日志分段,以保證沒有線程對這些日志分段進(jìn)行讀取操作
2. 將日志分段文件添加上“.deleted”的后綴(也包括日志分段對應(yīng)的索引文件)
3. Kafka的后臺定時任務(wù)會定期刪除這些“.deleted”為后綴的文件,這個任務(wù)的延遲執(zhí)行時間可以通過file.delete.delay.ms參數(shù)來設(shè)置,默認(rèn)值為60000,即1分鐘。
設(shè)置topic 5秒刪除一次
1. 為了方便觀察,設(shè)置段文件的大小為1M。
key: segment.bytes
value: 1048576
1. 設(shè)置topic的刪除策略
key: retention.ms
value: 5000
嘗試往topic中添加一些數(shù)據(jù),等待一會,觀察日志的刪除情況。我們發(fā)現(xiàn),日志會定期被標(biāo)記為刪除,然后被刪除。
基于日志大小的保留策略
日志刪除任務(wù)會檢查當(dāng)前日志的大小是否超過設(shè)定的閾值來尋找可刪除的日志分段的文件集合??梢酝ㄟ^broker端參數(shù) log.retention.bytes 來配置,默認(rèn)值為-1,表示無窮大。如果超過該大小,會自動將超出部分刪除。
注意:
log.retention.bytes 配置的是日志文件的總大小,而不是單個的日志分段的大小,一個日志文件包含多個日志分段。
基于日志起始偏移量保留策略
每個segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么這些日志文件將會標(biāo)記為刪
日志壓縮(Log Compaction)
Log Compaction是默認(rèn)的日志刪除之外的清理過時數(shù)據(jù)的方式。它會將相同的key對應(yīng)的數(shù)據(jù)只保留一個版本。
Log Compaction執(zhí)行后,offset將不再連續(xù),但依然可以查詢Segment。
Log Compaction執(zhí)行前后,日志分段中的每條消息偏移量保持不變。Log Compaction會生成一個新的Segment文件。
Log Compaction是針對key的,在使用的時候注意每個消息的key不為空。
基于Log Compaction可以保留key的最新更新,可以基于Log Compaction來恢復(fù)消費(fèi)者的最新狀態(tài)。