注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

断尘居

温柔的男人像海洋。

 
 
 
 
 

日志

 
 

Lucene索引过程分析(2)  

2011-12-22 16:48:55|  分类: Lucene |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

3、将文档加入IndexWriter

代码:

writer.addDocument(doc); 
-->IndexWriter.addDocument(Document doc, Analyzer analyzer) 
     -->doFlush = docWriter.addDocument(doc, analyzer); 
          --> DocumentsWriter.updateDocument(Document, Analyzer, Term) 
注:--> 代表一级函数调用

IndexWriter继而调用DocumentsWriter.addDocument,其又调用DocumentsWriter.updateDocument。

4、将文档加入DocumentsWriter

代码:

DocumentsWriter.updateDocument(Document doc, Analyzer analyzer, Term delTerm) 
-->(1) DocumentsWriterThreadState state = getThreadState(doc, delTerm); 
-->(2) DocWriter perDoc = state.consumer.processDocument(); 
-->(3) finishDocument(state, perDoc);

DocumentsWriter对象主要包含以下几部分:

  • 用于写索引文件
    • IndexWriter writer;
    • Directory directory;
    • Similarity similarity:分词器
    • String segment:当前的段名,每当flush的时候,将索引写入以此为名称的段。
IndexWriter.doFlushInternal() 
--> String segment = docWriter.getSegment();//return segment 
--> newSegment = new SegmentInfo(segment,……); 
--> docWriter.createCompoundFile(segment);//根据segment创建cfs文件。
  •  
    • String docStoreSegment:存储域所要写入的目标段。(在索引文件格式一文中已经详细描述)
    • int docStoreOffset:存储域在目标段中的偏移量。
    • int nextDocID:下一篇添加到此索引的文档ID号,对于同一个索引文件夹,此变量唯一,且同步访问。
    • DocConsumer consumer; 这是整个索引过程的核心,是IndexChain整个索引链的源头。

基本索引链:

对于一篇文档的索引过程,不是由一个对象来完成的,而是用对象组合的方式形成的一个处理链,链上的每个对象仅仅处理索引过程的一部分,称为索引链,由于后面还有其他的索引链,所以此处的索引链我称为基本索引链。

DocConsumer consumer 类型为DocFieldProcessor,是整个索引链的源头,包含如下部分:

  • 对索引域的处理
    • DocFieldConsumer consumer 类型为DocInverter,包含如下部分
      • InvertedDocConsumer consumer类型为TermsHash,包含如下部分
        • TermsHashConsumer consumer类型为FreqProxTermsWriter,负责写freq, prox信息
        • TermsHash nextTermsHash
          • TermsHashConsumer consumer类型为TermVectorsTermsWriter,负责写tvx, tvd, tvf信息
      • InvertedDocEndConsumer endConsumer 类型为NormsWriter,负责写nrm信息
  • 对存储域的处理
    • FieldInfos fieldInfos = new FieldInfos();
    • StoredFieldsWriter fieldsWriter负责写fnm, fdt, fdx信息
  • 删除文档
    • BufferedDeletes deletesInRAM = new BufferedDeletes();
    • BufferedDeletes deletesFlushed = new BufferedDeletes();

类BufferedDeletes包含了一下的成员变量:

  • HashMap terms = new HashMap();删除的词(Term)
  • HashMap queries = new HashMap();删除的查询(Query)
  • List docIDs = new ArrayList();删除的文档ID
  • long bytesUsed:用于判断是否应该对删除的文档写入索引文件。

由此可见,文档的删除主要有三种方式:

  • IndexWriter.deleteDocuments(Term term):所有包含此词的文档都会被删除。
  • IndexWriter.deleteDocuments(Query query):所有能满足此查询的文档都会被删除。
  • IndexReader.deleteDocument(int docNum):删除此文档ID

删除文档既可以用reader进行删除,也可以用writer进行删除,不同的是,reader进行删除后,此reader马上能够生效,而用writer删除后,会被缓存在deletesInRAM及deletesFlushed中,只有写入到索引文件中,当reader再次打开的时候,才能够看到。

那deletesInRAM和deletesFlushed各有什么用处呢?

此版本的Lucene对文档的删除是支持多线程的,当用IndexWriter删除文档的时候,都是缓存在deletesInRAM中的,直到flush,才将删除的文档写入到索引文件中去,我们知道flush是需要一段时间的,那么在flush的过程中,另一个线程又有文档删除怎么办呢?

一般过程是这个样子的,当flush的时候,首先在同步(synchornized)的方法pushDeletes中,将deletesInRAM全部加到deletesFlushed中,然后将deletesInRAM清空,退出同步方法,于是flush的线程程就向索引文件写deletesFlushed中的删除文档的过程,而与此同时其他线程新删除的文档则添加到新的deletesInRAM中去,直到下次flush才写入索引文件。

  • 缓存管理
    • 为了提高索引的速度,Lucene对很多的数据进行了缓存,使一起写入磁盘,然而缓存需要进行管理,何时分配,何时回收,何时写入磁盘都需要考虑。
    • ArrayList freeCharBlocks = new ArrayList();将用于缓存词(Term)信息的空闲块
    • ArrayList freeByteBlocks = new ArrayList();将用于缓存文档号(doc id)及词频(freq),位置(prox)信息的空闲块。
    • ArrayList freeIntBlocks = new ArrayList();将存储某词的词频(freq)和位置(prox)分别在byteBlocks中的偏移量
    • boolean bufferIsFull;用来判断缓存是否满了,如果满了,则应该写入磁盘
    • long numBytesAlloc;分配的内存数量
    • long numBytesUsed;使用的内存数量
    • long freeTrigger;应该开始回收内存时的内存用量。
    • long freeLevel;回收内存应该回收到的内存用量。
    • long ramBufferSize;用户设定的内存用量。
缓存用量之间的关系如下: 
DocumentsWriter.setRAMBufferSizeMB(double mb){ 

    ramBufferSize = (long) (mb*1024*1024);//用户设定的内存用量,当使用内存大于此时,开始写入磁盘 
    waitQueuePauseBytes = (long) (ramBufferSize*0.1); 
    waitQueueResumeBytes = (long) (ramBufferSize*0.05); 
    freeTrigger = (long) (1.05 * ramBufferSize);//当分配的内存到达105%的时候开始释放freeBlocks中的内存 
    freeLevel = (long) (0.95 * ramBufferSize);//一直释放到95%



DocumentsWriter.balanceRAM(){ 
    if (numBytesAlloc+deletesRAMUsed > freeTrigger) { 
    //当分配的内存加删除文档所占用的内存大于105%的时候,开始释放内存 
        while(numBytesAlloc+deletesRAMUsed > freeLevel) { 
        //一直进行释放,直到95% 

            //释放free blocks

            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1); 
            numBytesAlloc -= BYTE_BLOCK_SIZE;

            freeCharBlocks.remove(freeCharBlocks.size()-1); 
            numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;

            freeIntBlocks.remove(freeIntBlocks.size()-1); 
            numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE; 
        } 
    } else {

        if (numBytesUsed+deletesRAMUsed > ramBufferSize){

        //当使用的内存加删除文档占有的内存大于用户指定的内存时,可以写入磁盘

              bufferIsFull = true;

        }

    } 
}

当判断是否应该写入磁盘时:

  • 如果使用的内存大于用户指定内存时,bufferIsFull = true
  • 当使用的内存加删除文档所占的内存加正在写入的删除文档所占的内存大于用户指定内存时 deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize
  • 当删除的文档数目大于maxBufferedDeleteTerms时

DocumentsWriter.timeToFlushDeletes(){

    return (bufferIsFull || deletesFull()) && setFlushPending();

}

DocumentsWriter.deletesFull(){

    return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && 
        (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize) || 
        (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && 
        ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));

}

  • 多线程并发索引
    • 为了支持多线程并发索引,对每一个线程都有一个DocumentsWriterThreadState,其为每一个线程根据DocConsumer consumer的索引链来创建每个线程的索引链(XXXPerThread),来进行对文档的并发处理。
    • DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
    • HashMap threadBindings = new HashMap();
    • 虽然对文档的处理过程可以并行,但是将文档写入索引文件却必须串行进行,串行写入的代码在DocumentsWriter.finishDocument中
    • WaitQueue waitQueue = new WaitQueue()
    • long waitQueuePauseBytes
    • long waitQueueResumeBytes

在Lucene中,文档是按添加的顺序编号的,DocumentsWriter中的nextDocID就是记录下一个添加的文档id。 当Lucene支持多线程的时候,就必须要有一个synchornized方法来付给文档id并且将nextDocID加一,这些是在DocumentsWriter.getThreadState这个函数里面做的。

虽然给文档付ID没有问题了。但是由Lucene索引文件格式我们知道,文档是要按照ID的顺序从小到大写到索引文件中去的,然而不同的文档处理速度不同,当一个先来的线程一处理一篇需要很长时间的大文档时,另一个后来的线程二可能已经处理了很多小的文档了,但是这些后来小文档的ID号都大于第一个线程所处理的大文档,因而不能马上写到索引文件中去,而是放到waitQueue中,仅仅当大文档处理完了之后才写入索引文件。

waitQueue中有一个变量nextWriteDocID表示下一个可以写入文件的ID,当付给大文档ID=4时,则nextWriteDocID也设为4,虽然后来的小文档5,6,7,8等都已处理结束,但是如下代码,

WaitQueue.add(){

    if (doc.docID == nextWriteDocID){ 
       ………… 
    } else { 
        waiting[loc] = doc; 
        waitingBytes += doc.sizeInBytes(); 
   }

   doPause()

}

则把5, 6, 7, 8放入waiting队列,并且记录当前等待的文档所占用的内存大小waitingBytes。

当大文档4处理完毕后,不但写入文档4,把原来等待的文档5, 6, 7, 8也一起写入。

WaitQueue.add(){

    if (doc.docID == nextWriteDocID) {

       writeDocument(doc);

       while(true) {

           doc = waiting[nextWriteLoc];

           writeDocument(doc);

       }

   } else {

      …………

   }

   doPause()

}

但是这存在一个问题:当大文档很大很大,处理的很慢很慢的时候,后来的线程二可能已经处理了很多的小文档了,这些文档都是在waitQueue中,则占有了越来越多的内存,长此以往,有内存不够的危险。

因而在finishDocuments里面,在WaitQueue.add最后调用了doPause()函数

DocumentsWriter.finishDocument(){

    doPause = waitQueue.add(docWriter);

    if (doPause) 
        waitForWaitQueue();

    notifyAll();

}

WaitQueue.doPause() { 
    return waitingBytes > waitQueuePauseBytes; 
}

当waitingBytes足够大的时候(为用户指定的内存使用量的10%),doPause返回true,于是后来的线程二会进入wait状态,不再处理另外的文档,而是等待线程一处理大文档结束。

当线程一处理大文档结束的时候,调用notifyAll唤醒等待他的线程。

DocumentsWriter.waitForWaitQueue() { 
  do { 
    try { 
      wait(); 
    } catch (InterruptedException ie) { 
      throw new ThreadInterruptedException(ie); 
    } 
  } while (!waitQueue.doResume()); 
}

WaitQueue.doResume() { 
     return waitingBytes <= waitQueueResumeBytes; 
}

当waitingBytes足够小的时候,doResume返回true, 则线程二不用再wait了,可以继续处理另外的文档。

  • 一些标志位
    • int maxFieldLength:一篇文档中,一个域内可索引的最大的词(Term)数。
    • int maxBufferedDeleteTerms:可缓存的最大的删除词(Term)数。当大于这个数的时候,就要写到文件中了。

此过程又包含如下三个子过程:

4.1、得到当前线程对应的文档集处理对象(DocumentsWriterThreadState)

代码为:

DocumentsWriterThreadState state = getThreadState(doc, delTerm);

在Lucene中,对于同一个索引文件夹,只能够有一个IndexWriter打开它,在打开后,在文件夹中,生成文件write.lock,当其他IndexWriter再试图打开此索引文件夹的时候,则会报org.apache.lucene.store.LockObtainFailedException错误。

这样就出现了这样一个问题,在同一个进程中,对同一个索引文件夹,只能有一个IndexWriter打开它,因而如果想多线程向此索引文件夹中添加文档,则必须共享一个IndexWriter,而且在以往的实现中,addDocument函数是同步的(synchronized),也即多线程的索引并不能起到提高性能的效果。

于是为了支持多线程索引,不使IndexWriter成为瓶颈,对于每一个线程都有一个相应的文档集处理对象(DocumentsWriterThreadState),这样对文档的索引过程可以多线程并行进行,从而增加索引的速度。

getThreadState函数是同步的(synchronized),DocumentsWriter有一个成员变量threadBindings,它是一个HashMap,键为线程对象(Thread.currentThread()),值为此线程对应的DocumentsWriterThreadState对象。

DocumentsWriterThreadState DocumentsWriter.getThreadState(Document doc, Term delTerm)包含如下几个过程:

  • 根据当前线程对象,从HashMap中查找相应的DocumentsWriterThreadState对象,如果没找到,则生成一个新对象,并添加到HashMap中
DocumentsWriterThreadState state = (DocumentsWriterThreadState) threadBindings.get(Thread.currentThread()); 
if (state == null) { 
    …… 
    state = new DocumentsWriterThreadState(this); 
    …… 
    threadBindings.put(Thread.currentThread(), state); 
  • 如果此线程对象正在用于处理上一篇文档,则等待,直到此线程的上一篇文档处理完。
DocumentsWriter.getThreadState() { 
    waitReady(state); 
    state.isIdle = false; 


waitReady(state) { 
    while (!state.isIdle) {wait();} 
}  

显然如果state.isIdle为false,则此线程等待。 
在一篇文档处理之前,state.isIdle = false会被设定,而在一篇文档处理完毕之后,DocumentsWriter.finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)中,会首先设定perThread.isIdle = true; 然后notifyAll()来唤醒等待此文档完成的线程,从而处理下一篇文档。
  • 如果IndexWriter刚刚commit过,则新添加的文档要加入到新的段中(segment),则首先要生成新的段名。
initSegmentName(false); 
--> if (segment == null) segment = writer.newSegmentName();
  • 将此线程的文档处理对象设为忙碌:state.isIdle = false;

4.2、用得到的文档集处理对象(DocumentsWriterThreadState)处理文档

代码为:

DocWriter perDoc = state.consumer.processDocument();

每一个文档集处理对象DocumentsWriterThreadState都有一个文档及域处理对象DocFieldProcessorPerThread,它的成员函数processDocument()被调用来对文档及域进行处理。

线程索引链(XXXPerThread):

由于要多线程进行索引,因而每个线程都要有自己的索引链,称为线程索引链。

线程索引链同基本索引链有相似的树形结构,由基本索引链中每个层次的对象调用addThreads进行创建的,负责每个线程的对文档的处理。

DocFieldProcessorPerThread是线程索引链的源头,由DocFieldProcessor.addThreads(…)创建

DocFieldProcessorPerThread对象结构如下:

  • 对索引域进行处理
    • DocFieldConsumerPerThread consumer 类型为 DocInverterPerThread,由DocInverter.addThreads创建
      • InvertedDocConsumerPerThread consumer 类型为TermsHashPerThread,由TermsHash.addThreads创建
        • TermsHashConsumerPerThread consumer类型为FreqProxTermsWriterPerThread,由FreqProxTermsWriter.addThreads创建,负责每个线程的freq,prox信息处理
        • TermsHashPerThread nextPerThread
          • TermsHashConsumerPerThread consumer类型TermVectorsTermsWriterPerThread,由TermVectorsTermsWriter创建,负责每个线程的tvx,tvd,tvf信息处理
      • InvertedDocEndConsumerPerThread endConsumer 类型为NormsWriterPerThread,由NormsWriter.addThreads创建,负责nrm信息的处理
  • 对存储域进行处理
    • StoredFieldsWriterPerThread fieldsWriter由StoredFieldsWriter.addThreads创建,负责fnm,fdx,fdt的处理。
    • FieldInfos fieldInfos;

DocumentsWriter.DocWriter DocFieldProcessorPerThread.processDocument()包含以下几个过程:

4.2.1、开始处理当前文档

consumer(DocInverterPerThread).startDocument(); 
fieldsWriter(StoredFieldsWriterPerThread).startDocument();

在此版的Lucene中,几乎所有的XXXPerThread的类,都有startDocument和finishDocument两个函数,因为对同一个线程,这些对象都是复用的,而非对每一篇新来的文档都创建一套,这样也提高了效率,也牵扯到数据的清理问题。一般在startDocument函数中,清理处理上篇文档遗留的数据,在finishDocument中,收集本次处理的结果数据,并返回,一直返回到DocumentsWriter.updateDocument(Document, Analyzer, Term) 然后根据条件判断是否将数据刷新到硬盘上。

4.2.2、逐个处理文档的每一个域

由于一个线程可以连续处理多个文档,而在普通的应用中,几乎每篇文档的域都是大致相同的,为每篇文档的每个域都创建一个处理对象非常低效,因而考虑到复用域处理对象DocFieldProcessorPerField,对于每一个域都有一个此对象。

那当来到一个新的域的时候,如何更快的找到此域的处理对象呢?Lucene创建了一个DocFieldProcessorPerField[] fieldHash哈希表来方便更快查找域对应的处理对象。

当处理各个域的时候,按什么顺序呢?其实是按照域名的字典顺序。因而Lucene创建了DocFieldProcessorPerField[] fields的数组来方便按顺序处理域。

因而一个域的处理对象被放在了两个地方。

对于域的处理过程如下:

4.2.2.1、首先:对于每一个域,按照域名,在fieldHash中查找域处理对象DocFieldProcessorPerField,代码如下:

final int hashPos = fieldName.hashCode() & hashMask;//计算哈希值 
DocFieldProcessorPerField fp = fieldHash[hashPos];//找到哈希表中对应的位置 
while(fp != null && !fp.fieldInfo.name.equals(fieldName)) fp = fp.next;//链式哈希表

如果能够找到,则更新DocFieldProcessorPerField中的域信息fp.fieldInfo.update(field.isIndexed()…)

如果没有找到,则添加域到DocFieldProcessorPerThread.fieldInfos中,并创建新的DocFieldProcessorPerField,且将其加入哈希表。代码如下:

fp = new DocFieldProcessorPerField(this, fi); 
fp.next = fieldHash[hashPos]; 
fieldHash[hashPos] = fp;

如果是一个新的field,则将其加入fields数组fields[fieldCount++] = fp;

并且如果是存储域的话,用StoredFieldsWriterPerThread将其写到索引中:

if (field.isStored()) { 
  fieldsWriter.addField(field, fp.fieldInfo); 
}

4.2.2.1.1、处理存储域的过程如下:

StoredFieldsWriterPerThread.addField(Fieldable field, FieldInfo fieldInfo) 
--> localFieldsWriter.writeField(fieldInfo, field);

FieldsWriter.writeField(FieldInfo fi, Fieldable field)代码如下:

请参照fdt文件的格式,则一目了然:

fieldsStream.writeVInt(fi.number);//文档号 
byte bits = 0; 
if (field.isTokenized()) 
    bits |= FieldsWriter.FIELD_IS_TOKENIZED; 
if (field.isBinary()) 
    bits |= FieldsWriter.FIELD_IS_BINARY; 
if (field.isCompressed()) 
    bits |= FieldsWriter.FIELD_IS_COMPRESSED;

fieldsStream.writeByte(bits); //域的属性位

if (field.isCompressed()) {//对于压缩域 
    // compression is enabled for the current field 
    final byte[] data; 
    final int len; 
    final int offset; 
    // check if it is a binary field 
    if (field.isBinary()) { 
        data = CompressionTools.compress(field.getBinaryValue(), field.getBinaryOffset(), field.getBinaryLength()); 
    } else { 
        byte x[] = field.stringValue().getBytes("UTF-8"); 
        data = CompressionTools.compress(x, 0, x.length); 
    } 
    len = data.length; 
    offset = 0; 
    fieldsStream.writeVInt(len);//写长度 
    fieldsStream.writeBytes(data, offset, len);//写二进制内容 
} else {//对于非压缩域 
    // compression is disabled for the current field 
    if (field.isBinary()) {//如果是二进制域 
        final byte[] data; 
        final int len; 
        final int offset; 
        data = field.getBinaryValue(); 
        len = field.getBinaryLength(); 
        offset = field.getBinaryOffset();

        fieldsStream.writeVInt(len);//写长度 
        fieldsStream.writeBytes(data, offset, len);//写二进制内容 
    } else { 
        fieldsStream.writeString(field.stringValue());//写字符内容 
    } 
}

4.2.2.2、然后:对fields数组进行排序,是域按照名称排序。quickSort(fields, 0, fieldCount-1);

4.2.2.3、最后:按照排序号的顺序,对域逐个处理,此处处理的仅仅是索引域,代码如下:

for(int i=0;i      fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);

域处理对象(DocFieldProcessorPerField)结构如下:

域索引链:

每个域也有自己的索引链,称为域索引链,每个域的索引链也有同线程索引链有相似的树形结构,由线程索引链中每个层次的每个层次的对象调用addField进行创建,负责对此域的处理。

和基本索引链及线程索引链不同的是,域索引链仅仅负责处理索引域,而不负责存储域的处理。

DocFieldProcessorPerField是域索引链的源头,对象结构如下:

  • DocFieldConsumerPerField consumer类型为DocInverterPerField,由DocInverterPerThread.addField创建
    • InvertedDocConsumerPerField consumer 类型为TermsHashPerField,由TermsHashPerThread.addField创建
      • TermsHashConsumerPerField consumer 类型为FreqProxTermsWriterPerField,由FreqProxTermsWriterPerThread.addField创建,负责freq, prox信息的处理
      • TermsHashPerField nextPerField
        • TermsHashConsumerPerField consumer 类型为TermVectorsTermsWriterPerField,由TermVectorsTermsWriterPerThread.addField创建,负责tvx, tvd, tvf信息的处理
    • InvertedDocEndConsumerPerField endConsumer 类型为NormsWriterPerField,由NormsWriterPerThread.addField创建,负责nrm信息的处理。

4.2.2.3.1、处理索引域的过程如下:

DocInverterPerField.processFields(Fieldable[], int) 过程如下:

  • 判断是否要形成倒排表,代码如下:
boolean doInvert = consumer.start(fields, count); 
--> TermsHashPerField.start(Fieldable[], int)  
      --> for(int i=0;i             if (fields[i].isIndexed()) 
                 return true; 
            return false;

读到这里,大家可能会发生困惑,既然XXXPerField是对于每一个域有一个处理对象的,那为什么参数传进来的是Fieldable[]数组, 并且还有域的数目count呢?

其实这不经常用到,但必须得提一下,由上面的fieldHash的实现我们可以看到,是根据域名进行哈希的,所以准确的讲,XXXPerField并非对于每一个域有一个处理对象,而是对每一组相同名字的域有相同的处理对象。

对于同一篇文档,相同名称的域可以添加多个,代码如下:

doc.add(new Field("contents", "the content of the file.", Field.Store.NO, Field.Index.NOT_ANALYZED)); 
doc.add(new Field("contents", new FileReader(f)));

则传进来的名为"contents"的域如下:

fields    Fieldable[2]  (id=52)    
    [0]    Field  (id=56)    
        binaryLength    0    
        binaryOffset    0    
        boost    1.0    
        fieldsData    "the content of the file."    
        isBinary    false    
        isCompressed    false    
        isIndexed    true    
        isStored    false    
        isTokenized    false    
        lazy    false    
        name    "contents"    
        omitNorms    false    
        omitTermFreqAndPositions    false    
        storeOffsetWithTermVector    false    
        storePositionWithTermVector    false    
        storeTermVector    false    
        tokenStream    null    
    [1]    Field  (id=58)    
        binaryLength    0    
        binaryOffset    0    
        boost    1.0    
        fieldsData    FileReader  (id=131)    
        isBinary    false    
        isCompressed    false    
        isIndexed    true    
        isStored    false    
        isTokenized    true    
        lazy    false    
        name    "contents"    
        omitNorms    false    
        omitTermFreqAndPositions    false    
        storeOffsetWithTermVector    false    
        storePositionWithTermVector    false    
        storeTermVector    false    
        tokenStream    null   

  • 对传进来的同名域逐一处理,代码如下

for(int i=0;i

    final Fieldable field = fields[i];

    if (field.isIndexed() && doInvert) {

        //仅仅对索引域进行处理

        if (!field.isTokenized()) {

            //如果此域不分词,见(1)对不分词的域的处理

        } else {

            //如果此域分词,见(2)对分词的域的处理

        }

    }

}

(1) 对不分词的域的处理

(1-1) 得到域的内容,并构建单个Token形成的SingleTokenAttributeSource。因为不进行分词,因而整个域的内容算做一个Token.

String stringValue = field.stringValue(); //stringValue    "200910240957"  
final int valueLength = stringValue.length(); 
perThread.singleToken.reinit(stringValue, 0, valueLength);

对于此域唯一的一个Token有以下的属性:

  • Term:文字信息。在处理过程中,此值将保存在TermAttribute的实现类实例化的对象TermAttributeImp里面。
  • Offset:偏移量信息,是按字或字母的起始偏移量和终止偏移量,表明此Token在文章中的位置,多用于加亮。在处理过程中,此值将保存在OffsetAttribute的实现类实例化的对象OffsetAttributeImp里面。

在SingleTokenAttributeSource里面,有一个HashMap来保存可能用于保存属性的类名(Key,准确的讲是接口)以及保存属性信息的对象(Value):

singleToken    DocInverterPerThread$SingleTokenAttributeSource  (id=150)    
    attributeImpls    LinkedHashMap  (id=945)    
    attributes    LinkedHashMap  (id=946)     
        size    2    
        table    HashMap$Entry[16]  (id=988)    
            [0]    LinkedHashMap$Entry  (id=991)     
                key    Class (org.apache.lucene.analysis.tokenattributes.TermAttribute) (id=755)     
                value    TermAttributeImpl  (id=949)    
                    termBuffer    char[19]  (id=954)    //[2, 0, 0, 9, 1, 0, 2, 4, 0, 9, 5, 7] 
                    termLength    12
     
            [7]    LinkedHashMap$Entry  (id=993)     
                key    Class (org.apache.lucene.analysis.tokenattributes.OffsetAttribute) (id=274)     
                value    OffsetAttributeImpl  (id=948)    
                    endOffset    12    
                    startOffset    0
     
    factory    AttributeSource$AttributeFactory$DefaultAttributeFactory  (id=947)    
    offsetAttribute    OffsetAttributeImpl  (id=948)    
    termAttribute    TermAttributeImpl  (id=949)   

(1-2) 得到Token的各种属性信息,为索引做准备。

consumer.start(field)做的主要事情就是根据各种属性的类型来构造保存属性的对象(HashMap中有则取出,无则构造),为索引做准备。

consumer(TermsHashPerField).start(…)

--> termAtt = fieldState.attributeSource.addAttribute(TermAttribute.class);得到的就是上述HashMap中的TermAttributeImpl   

--> consumer(FreqProxTermsWriterPerField).start(f);

      --> if (fieldState.attributeSource.hasAttribute(PayloadAttribute.class)) {

                payloadAttribute = fieldState.attributeSource.getAttribute(PayloadAttribute.class); 
                存储payload信息则得到payload的属}

--> nextPerField(TermsHashPerField).start(f);

      --> termAtt = fieldState.attributeSource.addAttribute(TermAttribute.class);得到的还是上述HashMap中的TermAttributeImpl

      --> consumer(TermVectorsTermsWriterPerField).start(f);

            --> if (doVectorOffsets) {

                      offsetAttribute = fieldState.attributeSource.addAttribute(OffsetAttribute.class); 
                      如果存储词向量则得到的是上述HashMap中的OffsetAttributeImp }

(1-3) 将Token加入倒排表

consumer(TermsHashPerField).add();

加入倒排表的过程,无论对于分词的域和不分词的域,过程是一样的,因而放到对分词的域的解析中一起说明。

(2) 对分词的域的处理

(2-1) 构建域的TokenStream

final TokenStream streamValue = field.tokenStreamValue();

//用户可以在添加域的时候,应用构造函数public Field(String name, TokenStream tokenStream) 直接传进一个TokenStream过来,这样就不用另外构建一个TokenStream了。

if (streamValue != null) 
  stream = streamValue; 
else {

  ……

  stream = docState.analyzer.reusableTokenStream(fieldInfo.name, reader);

}

此时TokenStream的各项属性值还都是空的,等待一个一个被分词后得到,此时的TokenStream对象如下:

stream    StopFilter  (id=112)    
    attributeImpls    LinkedHashMap  (id=121)    
    attributes    LinkedHashMap  (id=122)     
        size    4    
        table    HashMap$Entry[16]  (id=146)     
            [2]    LinkedHashMap$Entry  (id=148)     
                key    Class (org.apache.lucene.analysis.tokenattributes.TypeAttribute) (id=154)     
                value    TypeAttributeImpl  (id=157)    
                    type    "word"     
            [8]    LinkedHashMap$Entry  (id=150)    
                after    LinkedHashMap$Entry  (id=156)     
                    key    Class (org.apache.lucene.analysis.tokenattributes.OffsetAttribute) (id=163)     
                    value    OffsetAttributeImpl  (id=164)    
                        endOffset    0    
                        startOffset    0
     
                key    Class (org.apache.lucene.analysis.tokenattributes.TermAttribute) (id=142)     
                value    TermAttributeImpl  (id=133)    
                    termBuffer    char[17]  (id=173)    
                    termLength    0
     
            [10]    LinkedHashMap$Entry  (id=151)     
                key    Class (org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute) (id=136)     
                value    PositionIncrementAttributeImpl  (id=129)    
                    positionIncrement    1     
    currentState    AttributeSource$State  (id=123)    
    enablePositionIncrements    true    
    factory    AttributeSource$AttributeFactory$DefaultAttributeFactory  (id=125)    
    input    LowerCaseFilter  (id=127)     
        input    StandardFilter  (id=213)     
            input    StandardTokenizer  (id=218)     
                input    FileReader  (id=93)    //从文件中读出来的文本,将经过分词器分词,并一层层的Filter的处理,得到一个个Token 
    stopWords    CharArraySet$UnmodifiableCharArraySet  (id=131)    
    termAtt    TermAttributeImpl  (id=133)   

(2-2) 得到第一个Token,并初始化此Token的各项属性信息,并为索引做准备(start)。

boolean hasMoreTokens = stream.incrementToken();//得到第一个Token

OffsetAttribute offsetAttribute = fieldState.attributeSource.addAttribute(OffsetAttribute.class);//得到偏移量属性

offsetAttribute    OffsetAttributeImpl  (id=164)    
    endOffset    8    
    startOffset    0   

PositionIncrementAttribute posIncrAttribute = fieldState.attributeSource.addAttribute(PositionIncrementAttribute.class);//得到位置属性

posIncrAttribute    PositionIncrementAttributeImpl  (id=129)    
    positionIncrement    1   

consumer.start(field);//其中得到了TermAttribute属性,如果存储payload则得到PayloadAttribute属性,如果存储词向量则得到OffsetAttribute属性。

(2-3) 进行循环,不断的取下一个Token,并添加到倒排表

for(;;) {

    if (!hasMoreTokens) break;

    …… 
    consumer.add();

    …… 
    hasMoreTokens = stream.incrementToken(); 
}

(2-4) 添加Token到倒排表的过程consumer(TermsHashPerField).add()

TermsHashPerField对象主要包括以下部分:

  • CharBlockPool charPool; 用于存储Token的文本信息,如果不足时,从DocumentsWriter中的freeCharBlocks分配
  • ByteBlockPool bytePool;用于存储freq, prox信息,如果不足时,从DocumentsWriter中的freeByteBlocks分配
  • IntBlockPool intPool; 用于存储分别指向每个Token在bytePool中freq和prox信息的偏移量。如果不足时,从DocumentsWriter的freeIntBlocks分配
  • TermsHashConsumerPerField consumer类型为FreqProxTermsWriterPerField,用于写freq, prox信息到缓存中。
  • RawPostingList[] postingsHash = new RawPostingList[postingsHashSize];存储倒排表,每一个Term都有一个RawPostingList (PostingList),其中包含了int textStart,也即文本在charPool中的偏移量,int byteStart,即此Term的freq和prox信息在bytePool中的起始偏移量,int intStart,即此term的在intPool中的起始偏移量。

形成倒排表的过程如下:

//得到token的文本及文本长度

final char[] tokenText = termAtt.termBuffer();//[s, t, u, d, e, n, t, s]

final int tokenTextLen = termAtt.termLength();//tokenTextLen 8

//按照token的文本计算哈希值,以便在postingsHash中找到此token对应的倒排表

int downto = tokenTextLen; 
int code = 0; 
while (downto > 0) { 
  char ch = tokenText[—downto]; 
  code = (code*31) + ch; 
}

int hashPos = code & postingsHashMask;

//在倒排表哈希表中查找此Token,如果找到相应的位置,但是不是此Token,说明此位置存在哈希冲突,采取重新哈希rehash的方法。

p = postingsHash[hashPos];

if (p != null && !postingEquals(tokenText, tokenTextLen)) {  
  final int inc = ((code>>8)+code)|1; 
  do { 
    code += inc; 
    hashPos = code & postingsHashMask; 
    p = postingsHash[hashPos]; 
  } while (p != null && !postingEquals(tokenText, tokenTextLen)); 
}

//如果此Token之前从未出现过

if (p == null) {

    if (textLen1 + charPool.charUpto > DocumentsWriter.CHAR_BLOCK_SIZE) {

        //当charPool不足的时候,在freeCharBlocks中分配新的buffer

        charPool.nextBuffer();

    }

    //从空闲的倒排表中分配新的倒排表

    p = perThread.freePostings[--perThread.freePostingsCount];

    //将文本复制到charPool中

    final char[] text = charPool.buffer; 
    final int textUpto = charPool.charUpto; 
    p.textStart = textUpto + charPool.charOffset; 
    charPool.charUpto += textLen1; 
    System.arraycopy(tokenText, 0, text, textUpto, tokenTextLen); 
    text[textUpto+tokenTextLen] = 0xffff;

    //将倒排表放入哈希表中

    postingsHash[hashPos] = p; 
    numPostings++;

    if (numPostingInt + intPool.intUpto > DocumentsWriter.INT_BLOCK_SIZE) intPool.nextBuffer();

    //当intPool不足的时候,在freeIntBlocks中分配新的buffer。

    if (DocumentsWriter.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE)

        bytePool.nextBuffer();

    //当bytePool不足的时候,在freeByteBlocks中分配新的buffer。

    //此处streamCount为2,表明在intPool中,每两项表示一个词,一个是指向bytePool中freq信息偏移量的,一个是指向bytePool中prox信息偏移量的。

    intUptos = intPool.buffer; 
    intUptoStart = intPool.intUpto; 
    intPool.intUpto += streamCount;

    p.intStart = intUptoStart + intPool.intOffset;

    //在bytePool中分配两个空间,一个放freq信息,一个放prox信息的。  
    for(int i=0;i

        final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE); 
        intUptos[intUptoStart+i] = upto + bytePool.byteOffset; 
    } 
    p.byteStart = intUptos[intUptoStart];

    //当Term原来没有出现过的时候,调用newTerm

    consumer(FreqProxTermsWriterPerField).newTerm(p);

}

//如果此Token之前曾经出现过,则调用addTerm。

else {

    intUptos = intPool.buffers[p.intStart >> DocumentsWriter.INT_BLOCK_SHIFT]; 
    intUptoStart = p.intStart & DocumentsWriter.INT_BLOCK_MASK; 
    consumer(FreqProxTermsWriterPerField).addTerm(p);

}

(2-5) 添加新Term的过程,consumer(FreqProxTermsWriterPerField).newTerm

final void newTerm(RawPostingList p0) { 
  FreqProxTermsWriter.PostingList p = (FreqProxTermsWriter.PostingList) p0; 
  p.lastDocID = docState.docID; //当一个新的term出现的时候,包含此Term的就只有本篇文档,记录其ID 
  p.lastDocCode = docState.docID << 1; //docCode是文档ID左移一位,为什么左移,请参照索引文件格式(1)中的或然跟随规则。 
  p.docFreq = 1; //docFreq这里用词可能容易引起误会,docFreq这里指的是此文档所包含的此Term的次数,并非包含此Term的文档的个数。 
  writeProx(p, fieldState.position); //写入prox信息到bytePool中,此时freq信息还不能写入,因为当前的文档还没有处理完,尚不知道此文档包含此Term的总数。 
}

writeProx(FreqProxTermsWriter.PostingList p, int proxCode) {

  termsHashPerField.writeVInt(1, proxCode<<1);//第一个参数所谓1,也就是写入此文档在intPool中的第1项——prox信息。为什么左移一位呢?是因为后面可能跟着payload信息,参照索引文件格式(1)中或然跟随规则。 
  p.lastPosition = fieldState.position;//总是要记录lastDocID, lastPostion,是因为要计算差值,参照索引文件格式(1)中的差值规则。

}

(2-6) 添加已有Term的过程

final void addTerm(RawPostingList p0) {

  FreqProxTermsWriter.PostingList p = (FreqProxTermsWriter.PostingList) p0;

  if (docState.docID != p.lastDocID) {

      //当文档ID变了的时候,说明上一篇文档已经处理完毕,可以写入freq信息了。

      //第一个参数所谓0,也就是写入上一篇文档在intPool中的第0项——freq信息。至于信息为何这样写,参照索引文件格式(1)中的或然跟随规则,及tis文件格式。

      if (1 == p.docFreq) 
        termsHashPerField.writeVInt(0, p.lastDocCode|1); 
      else { 
        termsHashPerField.writeVInt(0, p.lastDocCode); 
        termsHashPerField.writeVInt(0, p.docFreq); 
      } 
      p.docFreq = 1;//对于新的文档,freq还是为1. 
      p.lastDocCode = (docState.docID - p.lastDocID) << 1;//文档号存储差值 
      p.lastDocID = docState.docID; 
      writeProx(p, fieldState.position);  
    } else {

      //当文档ID不变的时候,说明此文档中这个词又出现了一次,从而freq加一,写入再次出现的位置信息,用差值。 
      p.docFreq++; 
      writeProx(p, fieldState.position-p.lastPosition); 
  } 
}

(2-7) 结束处理当前域

consumer(TermsHashPerField).finish();

--> FreqProxTermsWriterPerField.finish()

--> TermVectorsTermsWriterPerField.finish()

endConsumer(NormsWriterPerField).finish();

--> norms[upto] = Similarity.encodeNorm(norm);//计算标准化因子的值。

--> docIDs[upto] = docState.docID;

4.2.3、结束处理当前文档

final DocumentsWriter.DocWriter one = fieldsWriter(StoredFieldsWriterPerThread).finishDocument();

存储域返回结果:一个写成了二进制的存储域缓存。

one    StoredFieldsWriter$PerDoc  (id=322)    
    docID    0    
    fdt    RAMOutputStream  (id=325)    
        bufferLength    1024    
        bufferPosition    40    
        bufferStart    0    
        copyBuffer    null    
        currentBuffer    byte[1024]  (id=332)    
        currentBufferIndex    0    
        file    RAMFile  (id=333)    
        utf8Result    UnicodeUtil$UTF8Result  (id=335)    
    next    null    
    numStoredFields    2    
    this$0    StoredFieldsWriter  (id=327)   

final DocumentsWriter.DocWriter two = consumer(DocInverterPerThread).finishDocument();

--> NormsWriterPerThread.finishDocument()

--> TermsHashPerThread.finishDocument()

索引域的返回结果为null

4.3、用DocumentsWriter.finishDocument结束本次文档添加

代码:

DocumentsWriter.updateDocument(Document, Analyzer, Term)

--> DocumentsWriter.finishDocument(DocumentsWriterThreadState, DocumentsWriter$DocWriter)

      --> doPause = waitQueue.add(docWriter);//有关waitQueue,在DocumentsWriter的缓存管理中已作解释

            --> DocumentsWriter$WaitQueue.writeDocument(DocumentsWriter$DocWriter)

                  --> StoredFieldsWriter$PerDoc.finish()

                        --> fieldsWriter.flushDocument(perDoc.numStoredFields, perDoc.fdt);将存储域信息真正写入文件。

  评论这张
 
阅读(974)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017