本篇文章給大家分享的是有關Flume架構中如何進行MemoryChannel事務實現,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
創新互聯長期為1000多家客戶提供的網站建設服務,團隊從業經驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯網生態環境。為武義企業提供專業的成都網站建設、成都網站設計,武義網站改版等技術服務。擁有十年豐富建站經驗和眾多成功案例,為您定制開發。
Flume提供了可靠地日志采集功能,其高可靠是通過事務機制實現的。而對于Channel的事務我們本部分會介紹MemoryChannel和FileChannel的實現。
首先我們看下BasicChannelSemantics實現:
public abstract class BasicChannelSemantics extends AbstractChannel {
//1、事務使用ThreadLocal存儲,保證事務線程安全
private ThreadLocal<BasicTransactionSemantics> currentTransaction
= new ThreadLocal<BasicTransactionSemantics>();
private boolean initialized = false;
//2、進行一些初始化工作
protected void initialize() {}
//3、提供給實現類的創建事務的回調
protected abstract BasicTransactionSemantics createTransaction();
//4、往Channel放Event,其直接委托給事務的put方法實現
@Override
public void put(Event event) throws ChannelException {
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
transaction.put(event);
}
//5、從Channel獲取Event,也是直接委托給事務的take方法實現
@Override
public Event take() throws ChannelException {
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
return transaction.take();
}
//6、獲取事務,如果本實例沒有初始化則先初始化;否則先從ThreadLocal獲取事務,如果沒有或者關閉了則創建一個并綁定到ThreadLocal。
@Override
public Transaction getTransaction() {
if (!initialized) {
synchronized (this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}
BasicTransactionSemantics transaction = currentTransaction.get();
if (transaction == null || transaction.getState().equals(
BasicTransactionSemantics.State.CLOSED)) {
transaction = createTransaction();
currentTransaction.set(transaction);
}
return transaction;
}
}首先我們來看下MemoryChannel的實現,其是一個純內存的Channel實現,整個事務操作都是在內存中完成。首先看下其內存結構:

1、首先由一個Channel Queue用于存儲整個Channel的Event數據;
2、每個事務都有一個Take Queue和Put Queue分別用于存儲事務相關的取數據和放數據,等事務提交時才完全同步到Channel Queue,或者失敗把取數據回滾到Channel Queue。
MemoryChannel時設計時考慮了兩個容量:Channel Queue容量和事務容量,而這兩個容量涉及到了數量容量和字節數容量。
另外因為多個事務要操作Channel Queue,還要考慮Channel Queue的動態擴容問題,因此MemoryChannel使用了鎖來實現;而容量問題則使用了信號量來實現。
在configure方法中進行了一些參數的初始化,如容量、Channel Queue等。首先看下Channel Queue的容量是如何計算的:
try {
capacity = context.getInteger("capacity", defaultCapacity);
} catch(NumberFormatException e) {
capacity = defaultCapacity;
}
if (capacity <= 0) {
capacity = defaultCapacity;
}即首先從配置文件讀取數量容量,如果沒有配置則是默認容量(默認100),而配置的容量小于等于0,則也是默認容量。
接下來是初始化事務數量容量:
try {
transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
} catch(NumberFormatException e) {
transCapacity = defaultTransCapacity;
}
if (transCapacity <= 0) {
transCapacity = defaultTransCapacity;
}
Preconditions.checkState(transCapacity <= capacity,
"Transaction Capacity of Memory Channel cannot be higher than " +
"the capacity.");整個過程和Channel Queue數量容量初始化類似,但是最后做了前置條件判斷,事務容量必須小于等于Channel Queue容量。
接下來是字節容量限制:
try {
byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage);
} catch(NumberFormatException e) {
byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;
}
try {
byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);
if (byteCapacity < 1) {
byteCapacity = Integer.MAX_VALUE;
}
} catch(NumberFormatException e) {
byteCapacity = (int)((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);
}byteCapacityBufferPercentage:用來確定byteCapacity的一個百分比參數,即我們定義的字節容量和實際事件容量的百分比,因為我們定義的字節容量主要考慮Event body,而忽略Event header,因此需要減去Event header部分的內存占用,可以認為該參數定義了Event header占了實際字節容量的百分比,默認20%;
byteCapacity:首先讀取配置文件定義的byteCapacity,如果沒有定義,則使用默認defaultByteCapacity,而defaultByteCapacity默認是JVM物理內存的80%(Runtime.getRuntime().maxMemory() * .80);那么實際byteCapacity=定義的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize默認100,即計算百分比的一個系數。
接下來定義keepAlive參數:
try {
keepAlive = context.getInteger("keep-alive", defaultKeepAlive);
} catch(NumberFormatException e) {
keepAlive = defaultKeepAlive;
}keepAlive定義了操作Channel Queue的等待超時事件,默認3s。
接著初始化Channel Queue:
if(queue != null) {
try {
resizeQueue(capacity);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
synchronized(queueLock) {
queue = new LinkedBlockingDeque<Event>(capacity);
queueRemaining = new Semaphore(capacity);
queueStored = new Semaphore(0);
}
}首先如果Channel Queue不為null,表示動態擴容;否則進行Channel Queue的創建。
首先看下首次創建Channel Queue,首先使用queueLock鎖定,即在操作Channel Queue時都需要鎖定,因為之前說過Channel Queue可能動態擴容,然后初始化信號量:Channel Queue剩余容量和向Channel Queue申請存儲的容量,用于事務操作中預占Channel Queue容量。
接著是調用resizeQueue動態擴容:
private void resizeQueue(int capacity) throws InterruptedException {
int oldCapacity;
synchronized(queueLock) { //首先計算擴容前的Channel Queue的容量
oldCapacity = queue.size() + queue.remainingCapacity();
}
if(oldCapacity == capacity) {//如果新容量和老容量相等,不需要擴容
return;
} else if (oldCapacity > capacity) {//如果老容量大于新容量,縮容
//首先要預占老容量-新容量的大小,以便縮容容量
if(!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
//如果獲取失敗,默認是記錄日志然后忽略
} else {
//否則,直接縮容,然后復制老Queue的數據,縮容時需要鎖定queueLock,因為這一系列操作要線程安全
synchronized(queueLock) {
LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
newQueue.addAll(queue);
queue = newQueue;
}
}
} else {
//如果不是縮容,則直接擴容即可
synchronized(queueLock) {
LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
newQueue.addAll(queue);
queue = newQueue;
}
//增加/減少Channel Queue的新的容量
queueRemaining.release(capacity - oldCapacity);
}
}
到此,整個Channel Queue相關的數據初始化完畢,接著會調用start方法進行初始化:
public synchronized void start() {
channelCounter.start();
channelCounter.setChannelSize(queue.size());
channelCounter.setChannelCapacity(Long.valueOf(
queue.size() + queue.remainingCapacity()));
super.start();
}此處初始化了一個ChannelCounter,是一個計數器,記錄如當前隊列放入Event數、取出Event數、成功數等。
之前已經分析了大部分Channel會把put和take直接委托給事務去完成,因此接下來看下MemoryTransaction的實現。
首先看下MemoryTransaction的初始化:
private class MemoryTransaction extends BasicTransactionSemantics {
private LinkedBlockingDeque<Event> takeList;
private LinkedBlockingDeque<Event> putList;
private final ChannelCounter channelCounter;
private int putByteCounter = 0;
private int takeByteCounter = 0;
public MemoryTransaction(int transCapacity, ChannelCounter counter) {
putList = new LinkedBlockingDeque<Event>(transCapacity);
takeList = new LinkedBlockingDeque<Event>(transCapacity);
channelCounter = counter;
}可以看出MemoryTransaction涉及到兩個事務容量大小定義的隊列(鏈表阻塞隊列)、隊列字節計數器、另外一個是Channel操作的計數器。
事務中的放入操作如下:
protected void doPut(Event event) throws InterruptedException {
//1、增加放入事件計數器
channelCounter.incrementEventPutAttemptCount();
//2、estimateEventSize計算當前Event body大小
int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
//3、往事務隊列的putList中放入Event,如果滿了,則拋異常回滾事務
if (!putList.offer(event)) {
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
//4、增加放入隊列字節數計數器
putByteCounter += eventByteSize;
}整個doPut操作相對來說比較簡單,就是往事務putList隊列放入Event,如果滿了則直接拋異常回滾事務;否則放入putList暫存,等事務提交時轉移到Channel Queue。另外需要增加放入隊列的字節數計數器,以便之后做字節容量限制。
接下來是事務中的取出操作:
protected Event doTake() throws InterruptedException {
//1、增加取出事件計數器
channelCounter.incrementEventTakeAttemptCount();
//2、如果takeList隊列沒有剩余容量,即當前事務已經消費了最大容量的Event
if(takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for MemoryTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count");
}
//3、queueStored試圖獲取一個信號量,超時直接返回null
if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
return null;
}
//4、從Channel Queue獲取一個Event
Event event;
synchronized(queueLock) {//對Channel Queue的操作必須加queueLock,因為之前說的動態擴容問題
event = queue.poll();
}
//5、因為信號量的保證,Channel Queue不應該返回null,出現了就不正常了
Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
"signalling existence of entry");
//6、暫存到事務的takeList隊列
takeList.put(event);
//7、計算當前Event body大小并增加取出隊列字節數計數器
int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
takeByteCounter += eventByteSize;
return event;
}接下來是提交事務:
protected void doCommit() throws InterruptedException {
//1、計算改變的Event數量,即取出數量-放入數量;如果放入的多,那么改變的Event數量將是負數
int remainingChange = takeList.size() - putList.size();
//2、 如果remainingChange小于0,則需要獲取Channel Queue剩余容量的信號量
if(remainingChange < 0) {
//2.1、首先獲取putByteCounter個字節容量信號量,如果失敗說明超過字節容量限制了,回滾事務
if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
throw new ChannelException("Cannot commit transaction. Byte capacity " +
"allocated to store event body " + byteCapacity * byteCapacitySlotSize +
"reached. Please increase heap space/byte capacity allocated to " +
"the channel as the sinks may not be keeping up with the sources");
}
//2.2、獲取Channel Queue的-remainingChange個信號量用于放入-remainingChange個Event,如果獲取不到,則釋放putByteCounter個字節容量信號量,并拋出異常回滾事務
if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized(queueLock) {//操作Channel Queue時一定要鎖定queueLock
if(puts > 0 ) {
while(!putList.isEmpty()) { //3.1、如果有Event,則循環放入Channel Queue
if(!queue.offer(putList.removeFirst())) {
//3.2、如果放入Channel Queue失敗了,說明信號量控制出問題了,這種情況不應該發生
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
//4、操作成功后,清空putList和takeList隊列
putList.clear();
takeList.clear();
}
//5.1、釋放takeByteCounter個字節容量信號量
bytesRemaining.release(takeByteCounter);
//5.2、重置字節計數器
takeByteCounter = 0;
putByteCounter = 0;
//5.3、釋放puts個queueStored信號量,這樣doTake方法就可以獲取數據了
queueStored.release(puts);
//5.4、釋放remainingChange個queueRemaining信號量
if(remainingChange > 0) {
queueRemaining.release(remainingChange);
}
//6、ChannelCounter一些數據計數
if (puts > 0) {
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) {
channelCounter.addToEventTakeSuccessCount(takes);
}
channelCounter.setChannelSize(queue.size());
}此處涉及到兩個信號量:
queueStored表示Channel Queue已存儲事件容量(已存儲的事件數量),隊列取出事件時-1,放入事件成功時+N,取出失敗時-N,即Channel Queue存儲了多少事件。queueStored信號量默認為0。當doTake取出Event時減少一個queueStored信號量,當doCommit提交事務時需要增加putList 隊列大小的queueStored信號量,當doRollback回滾事務時需要減少takeList隊列大小的queueStored信號量。
queueRemaining表示Channel Queue可存儲事件容量(可存儲的事件數量),取出事件成功時+N,放入事件成功時-N。queueRemaining信號量默認為Channel Queue容量。其在提交事務時首先通過remainingChange = takeList.size() - putList.size()計算獲得需要增加多少變更事件;如果小于0表示放入的事件比取出的多,表示有- remainingChange個事件放入,此時應該減少-queueRemaining信號量;而如果大于0,則表示取出的事件比放入的多,表示有queueRemaining個事件取出,此時應該增加queueRemaining信號量;即消費事件時減少信號量,生產事件時增加信號量。
而bytesRemaining是字節容量信號量,超出容量則回滾事務。
最后看下回滾事務:
protected void doRollback() {
int takes = takeList.size();
synchronized(queueLock) { //操作Channel Queue時一定鎖住queueLock
//1、前置條件判斷,檢查是否有足夠容量回滾事務
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
//2、回滾事務的takeList隊列到Channel Queue
while(!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
putList.clear();
}
//3、釋放putByteCounter個bytesRemaining信號量
bytesRemaining.release(putByteCounter);
//4、計數器重置
putByteCounter = 0;
takeByteCounter = 0;
//5、釋放takeList隊列大小個已存儲事件容量
queueStored.release(takes);
channelCounter.setChannelSize(queue.size());
}
}也就是說在回滾時,需要把takeList中暫存的事件回滾到Channel Queue,并回滾queueStored信號量。
以上就是Flume架構中如何進行MemoryChannel事務實現,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注創新互聯行業資訊頻道。
網頁標題:Flume架構中如何進行MemoryChannel事務實現
文章源于:http://www.yijiale78.com/article32/gjhosc.html
成都網站建設公司_創新互聯,為您提供網站制作、網站排名、網頁設計公司、軟件開發、動態網站、定制開發
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯