注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

断尘居

温柔的男人像海洋。

 
 
 
 
 

日志

 
 

Lucene索引过程分析(4)  

2011-12-22 16:52:54|  分类: Lucene |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

6、关闭IndexWriter对象

代码:

writer.close();

--> IndexWriter.closeInternal(boolean)

      --> (1) 将索引信息由内存写入磁盘: flush(waitForMerges, true, true); 
      --> (2) 进行段合并: mergeScheduler.merge(this);

对段的合并将在后面的章节进行讨论,此处仅仅讨论将索引信息由写入磁盘的过程。

代码:

IndexWriter.flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes)

--> IndexWriter.doFlush(boolean flushDocStores, boolean flushDeletes)

      --> IndexWriter.doFlushInternal(boolean flushDocStores, boolean flushDeletes)

将索引写入磁盘包括以下几个过程:

  • 得到要写入的段名:String segment = docWriter.getSegment();
  • DocumentsWriter将缓存的信息写入段:docWriter.flush(flushDocStores);
  • 生成新的段信息对象:newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx());
  • 准备删除文档:docWriter.pushDeletes();
  • 生成cfs段:docWriter.createCompoundFile(segment);
  • 删除文档:applyDeletes();

6.1、得到要写入的段名

代码:

SegmentInfo newSegment = null;

final int numDocs = docWriter.getNumDocsInRAM();//文档总数

String docStoreSegment = docWriter.getDocStoreSegment();//存储域和词向量所要要写入的段名,"_0"   

int docStoreOffset = docWriter.getDocStoreOffset();//存储域和词向量要写入的段中的偏移量

String segment = docWriter.getSegment();//段名,"_0"

在Lucene的索引文件结构一章做过详细介绍,存储域和词向量可以和索引域存储在不同的段中。

6.2、将缓存的内容写入段

代码:

flushedDocCount = docWriter.flush(flushDocStores);

此过程又包含以下两个阶段;

  • 按照基本索引链关闭存储域和词向量信息
  • 按照基本索引链的结构将索引结果写入段

6.2.1、按照基本索引链关闭存储域和词向量信息

代码为:

closeDocStore();

flushState.numDocsInStore = 0;

其主要是根据基本索引链结构,关闭存储域和词向量信息:

  • consumer(DocFieldProcessor).closeDocStore(flushState);
    • consumer(DocInverter).closeDocStore(state);
      • consumer(TermsHash).closeDocStore(state);
        • consumer(FreqProxTermsWriter).closeDocStore(state);
        • if (nextTermsHash != null) nextTermsHash.closeDocStore(state);
          • consumer(TermVectorsTermsWriter).closeDocStore(state);
      • endConsumer(NormsWriter).closeDocStore(state);
    • fieldsWriter(StoredFieldsWriter).closeDocStore(state);

其中有实质意义的是以下两个closeDocStore:

  • 词向量的关闭:TermVectorsTermsWriter.closeDocStore(SegmentWriteState)

void closeDocStore(final SegmentWriteState state) throws IOException {

                     if (tvx != null) { 
            //为不保存词向量的文档在tvd文件中写入零。即便不保存词向量,在tvx, tvd中也保留一个位置 
            fill(state.numDocsInStore - docWriter.getDocStoreOffset()); 
            //关闭tvx, tvf, tvd文件的写入流 
            tvx.close(); 
            tvf.close(); 
            tvd.close(); 
            tvx = null; 
            //记录写入的文件名,为以后生成cfs文件的时候,将这些写入的文件生成一个统一的cfs文件。 
            state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION); 
            state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION); 
            state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION); 
            //从DocumentsWriter的成员变量openFiles中删除,未来可能被IndexFileDeleter删除 
            docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION); 
            docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION); 
            docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION); 
            lastDocID = 0; 
        }     
}
  • 存储域的关闭:StoredFieldsWriter.closeDocStore(SegmentWriteState)

public void closeDocStore(SegmentWriteState state) throws IOException {

    //关闭fdx, fdt写入流

    fieldsWriter.close(); 
    --> fieldsStream.close(); 
    --> indexStream.close(); 
    fieldsWriter = null; 
    lastDocID = 0;

    //记录写入的文件名 
    state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_EXTENSION); 
    state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_INDEX_EXTENSION); 
    state.docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_EXTENSION); 
    state.docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_INDEX_EXTENSION); 
}


6.2.2、按照基本索引链的结构将索引结果写入段

代码为:

consumer(DocFieldProcessor).flush(threads, flushState);

    //回收fieldHash,以便用于下一轮的索引,为提高效率,索引链中的对象是被复用的。

    Map> childThreadsAndFields = new HashMap>(); 
    for ( DocConsumerPerThread thread : threads) { 
        DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread; 
        childThreadsAndFields.put(perThread.consumer, perThread.fields()); 
        perThread.trimFields(state); 
    }

    //写入存储域

    --> fieldsWriter(StoredFieldsWriter).flush(state);

    //写入索引域

    --> consumer(DocInverter).flush(childThreadsAndFields, state);

    //写入域元数据信息,并记录写入的文件名,以便以后生成cfs文件

    --> final String fileName = state.segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION);

    --> fieldInfos.write(state.directory, fileName);

    --> state.flushedFiles.add(fileName);

此过程也是按照基本索引链来的:

  • consumer(DocFieldProcessor).flush(…);
    • consumer(DocInverter).flush(…);
      • consumer(TermsHash).flush(…);
        • consumer(FreqProxTermsWriter).flush(…);
        • if (nextTermsHash != null) nextTermsHash.flush(…);
          • consumer(TermVectorsTermsWriter).flush(…);
      • endConsumer(NormsWriter).flush(…);
    • fieldsWriter(StoredFieldsWriter).flush(…);

6.2.2.1、写入存储域

代码为:

StoredFieldsWriter.flush(SegmentWriteState state) { 
    if (state.numDocsInStore > 0) { 
      initFieldsWriter(); 
      fill(state.numDocsInStore - docWriter.getDocStoreOffset()); 
    } 
    if (fieldsWriter != null) 
      fieldsWriter.flush(); 
  }

从代码中可以看出,是写入fdx, fdt两个文件,但是在上述的closeDocStore已经写入了,并且把state.numDocsInStore置零,fieldsWriter设为null,在这里其实什么也不做。

6.2.2.2、写入索引域

代码为:

DocInverter.flush(Map>, SegmentWriteState)

    //写入倒排表及词向量信息

    --> consumer(TermsHash).flush(childThreadsAndFields, state);

    //写入标准化因子

    --> endConsumer(NormsWriter).flush(endChildThreadsAndFields, state);

6.2.2.2.1、写入倒排表及词向量信息

代码为:

TermsHash.flush(Map>, SegmentWriteState)

    //写入倒排表信息

    --> consumer(FreqProxTermsWriter).flush(childThreadsAndFields, state);

   //回收RawPostingList

    --> shrinkFreePostings(threadsAndFields, state);

    //写入词向量信息

    --> if (nextTermsHash != null) nextTermsHash.flush(nextThreadsAndFields, state);

          --> consumer(TermVectorsTermsWriter).flush(childThreadsAndFields, state);

6.2.2.2.1.1、写入倒排表信息

代码为:

FreqProxTermsWriter.flush(Map                                       Collection>, SegmentWriteState)

    (a) 所有域按名称排序,使得同名域能够一起处理

    Collections.sort(allFields);

    final int numAllFields = allFields.size();

    (b) 生成倒排表的写对象

    final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);

    int start = 0;

    (c) 对于每一个域

    while(start < numAllFields) {

        (c-1) 找出所有的同名域

        final FieldInfo fieldInfo = allFields.get(start).fieldInfo;

        final String fieldName = fieldInfo.name;

        int end = start+1;

        while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))

            end++;

        FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];

        for(int i=start;i

            fields[i-start] = allFields.get(i);

            fieldInfo.storePayloads |= fields[i-start].hasPayloads;

        }

        (c-2) 将同名域的倒排表添加到文件

        appendPostings(fields, consumer);

       (c-3) 释放空间

        for(int i=0;i

            TermsHashPerField perField = fields[i].termsHashPerField;

            int numPostings = perField.numPostings;

            perField.reset();

            perField.shrinkHash(numPostings);

            fields[i].reset();

        }

        start = end;

    }

    (d) 关闭倒排表的写对象

    consumer.finish();

(b) 生成倒排表的写对象

代码为:

public FormatPostingsFieldsWriter(SegmentWriteState state, FieldInfos fieldInfos) throws IOException { 
    dir = state.directory; 
    segment = state.segmentName; 
    totalNumDocs = state.numDocs; 
    this.fieldInfos = fieldInfos; 
    //用于写tii,tis 
    termsOut = new TermInfosWriter(dir, segment, fieldInfos, state.termIndexInterval); 
    //用于写freq, prox的跳表  
    skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval, termsOut.maxSkipLevels, totalNumDocs, null, null); 
    //记录写入的文件名, 
    state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_EXTENSION)); 
    state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));  
    //用以上两个写对象,按照一定的格式写入段 
    termsWriter = new FormatPostingsTermsWriter(state, this); 
}

对象结构如下:

consumer    FormatPostingsFieldsWriter  (id=119)  //用于处理一个域 
    dir    SimpleFSDirectory  (id=126)   //目标索引文件夹 
    totalNumDocs    8   //文档总数 
    fieldInfos    FieldInfos  (id=70)  //域元数据信息   
    segment    "_0"   //段名 
    skipListWriter    DefaultSkipListWriter  (id=133)  //freq, prox中跳表的写对象   
    termsOut    TermInfosWriter  (id=125)  //tii, tis文件的写对象 
    termsWriter    FormatPostingsTermsWriter  (id=135)  //用于添加词(Term) 
        currentTerm    null    
        currentTermStart    0    
        fieldInfo    null    
        freqStart    0    
        proxStart    0    
        termBuffer    null    
        termsOut    TermInfosWriter  (id=125)    
        docsWriter    FormatPostingsDocsWriter  (id=139)  //用于写入此词的docid, freq信息 
            df    0    
            fieldInfo    null    
            freqStart    0    
            lastDocID    0    
            omitTermFreqAndPositions    false    
            out    SimpleFSDirectory$SimpleFSIndexOutput  (id=144)    
            skipInterval    16    
            skipListWriter    DefaultSkipListWriter  (id=133)    
            storePayloads    false    
            termInfo    TermInfo  (id=151)    
            totalNumDocs    8     
            posWriter    FormatPostingsPositionsWriter  (id=146)  //用于写入此词在此文档中的位置信息   
                lastPayloadLength    -1    
                lastPosition    0    
                omitTermFreqAndPositions    false    
                out    SimpleFSDirectory$SimpleFSIndexOutput  (id=157)    
                parent    FormatPostingsDocsWriter  (id=139)    
                storePayloads    false   
  • FormatPostingsFieldsWriter.addField(FieldInfo field)用于添加索引域信息,其返回FormatPostingsTermsConsumer用于添加词信息
  • FormatPostingsTermsConsumer.addTerm(char[] text, int start)用于添加词信息,其返回FormatPostingsDocsConsumer用于添加freq信息
  • FormatPostingsDocsConsumer.addDoc(int docID, int termDocFreq)用于添加freq信息,其返回FormatPostingsPositionsConsumer用于添加prox信息
  • FormatPostingsPositionsConsumer.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength)用于添加prox信息

(c-2) 将同名域的倒排表添加到文件

代码为:

 

FreqProxTermsWriter.appendPostings(FreqProxTermsWriterPerField[], FormatPostingsFieldsConsumer) {

    int numFields = fields.length;

    final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];

    for(int i=0;i

      FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i]);

      boolean result = fms.nextTerm(); //对所有的域,取第一个词(Term)

    }

    (1) 添加此域,虽然有多个域,但是由于是同名域,只取第一个域的信息即可。返回的是用于添加此域中的词的对象。

    final FormatPostingsTermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo);

    FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];

    final boolean currentFieldOmitTermFreqAndPositions = fields[0].fieldInfo.omitTermFreqAndPositions;

    (2) 此while循环是遍历每一个尚有未处理的词的域,依次按照词典顺序处理这些域所包含的词。当一个域中的所有的词都被处理过后,则numFields减一,并从mergeStates数组中移除此域。直到所有的域的所有的词都处理完毕,方才退出此循环。

    while(numFields > 0) {

       (2-1) 找出所有域中按字典顺序的下一个词。可能多个同名域中,都包含同一个term,因而要遍历所有的numFields,得到所有的域里的下一个词,numToMerge即为有多少个域包含此词。

      termStates[0] = mergeStates[0];

      int numToMerge = 1;

      for(int i=1;i

        final char[] text = mergeStates[i].text;

        final int textOffset = mergeStates[i].textOffset;

        final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);

        if (cmp < 0) {

          termStates[0] = mergeStates[i];

          numToMerge = 1;

        } else if (cmp == 0)

          termStates[numToMerge++] = mergeStates[i];

      }

      (2-2) 添加此词,返回FormatPostingsDocsConsumer用于添加文档号(doc ID)及词频信息(freq)

      final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);

      (2-3) 由于共numToMerge个域都包含此词,每个词都有一个链表的文档号表示包含这些词的文档。此循环遍历所有的包含此词的域,依次按照从小到大的循序添加包含此词的文档号及词频信息。当一个域中对此词的所有文档号都处理过了,则numToMerge减一,并从termStates数组中移除此域。当所有包含此词的域的所有文档号都处理过了,则结束此循环。

      while(numToMerge > 0) {

        (2-3-1) 找出最小的文档号

        FreqProxFieldMergeState minState = termStates[0];

        for(int i=1;i

          if (termStates[i].docID < minState.docID)

            minState = termStates[i];

        final int termDocFreq = minState.termFreq;

        (2-3-2) 添加文档号及词频信息,并形成跳表,返回FormatPostingsPositionsConsumer用于添加位置(prox)信息

        final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);

        //ByteSliceReader是用于读取bytepool中的prox信息的。

        final ByteSliceReader prox = minState.prox;

        if (!currentFieldOmitTermFreqAndPositions) {

          int position = 0;

          (2-3-3) 此循环对包含此词的文档,添加位置信息

          for(int j=0;j

            final int code = prox.readVInt();

            position += code >> 1;

            final int payloadLength;

            // 如果此位置有payload信息,则从bytepool中读出,否则设为零。

            if ((code & 1) != 0) {

              payloadLength = prox.readVInt();

              if (payloadBuffer == null || payloadBuffer.length < payloadLength)

                payloadBuffer = new byte[payloadLength];

              prox.readBytes(payloadBuffer, 0, payloadLength);

            } else

              payloadLength = 0;

              //添加位置(prox)信息

              posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);

          }

          posConsumer.finish();

        }

       (2-3-4) 判断退出条件,上次选中的域取得下一个文档号,如果没有,则说明此域包含此词的文档已经处理完毕,则从termStates中删除此域,并将numToMerge减一。然后此域取得下一个词,当循环到(2)的时候,表明此域已经开始处理下一个词。如果没有下一个词,说明此域中的所有的词都处理完毕,则从mergeStates中删除此域,并将numFields减一,当numFields为0的时候,循环(2)也就结束了。

        if (!minState.nextDoc()) {//获得下一个docid

          //如果此域包含此词的文档已经没有下一篇docid,则从数组termStates中移除,numToMerge减一。

          int upto = 0;

          for(int i=0;i

            if (termStates[i] != minState)

              termStates[upto++] = termStates[i];

          numToMerge--;

          //此域则取下一个词(term),在循环(2)处来参与下一个词的合并

          if (!minState.nextTerm()) {

            //如果此域没有下一个词了,则此域从数组mergeStates中移除,numFields减一。

            upto = 0;

            for(int i=0;i

              if (mergeStates[i] != minState)

                mergeStates[upto++] = mergeStates[i];

            numFields--;

          }

        }

      }

      (2-4) 经过上面的过程,docid和freq信息虽已经写入段文件,而跳表信息并没有写到文件中,而是写入skip buffer里面了,此处真正写入文件。并且词典(tii, tis)也应该写入文件。

      docConsumer(FormatPostingsDocsWriter).finish();

    }

    termsConsumer.finish();

  }

(2-3-4) 获得下一篇文档号代码如下:

 

public boolean nextDoc() {//如何获取下一个docid

  if (freq.eof()) {//如果bytepool中的freq信息已经读完

    if (p.lastDocCode != -1) {//由上述缓存管理,PostingList里面还存着最后一篇文档的文档号及词频信息,则将最后一篇文档返回

      docID = p.lastDocID;

      if (!field.omitTermFreqAndPositions)

        termFreq = p.docFreq;

      p.lastDocCode = -1;

      return true;

    } else

      return false;//没有下一篇文档

  }

  final int code = freq.readVInt();//如果bytepool中的freq信息尚未读完

  if (field.omitTermFreqAndPositions)

    docID += code;

  else {

    //读出文档号及词频信息。

    docID += code >>> 1;

    if ((code & 1) != 0)

      termFreq = 1;

    else

      termFreq = freq.readVInt();

  }

  return true;

}

(2-3-2) 添加文档号及词频信息代码如下:

 

FormatPostingsPositionsConsumer FormatPostingsDocsWriter.addDoc(int docID, int termDocFreq) {

    final int delta = docID - lastDocID;

    //当文档数量达到skipInterval倍数的时候,添加跳表项。

    if ((++df % skipInterval) == 0) {

      skipListWriter.setSkipData(lastDocID, storePayloads, posWriter.lastPayloadLength);

      skipListWriter.bufferSkip(df);

   }

   lastDocID = docID;

   if (omitTermFreqAndPositions)

     out.writeVInt(delta);

   else if (1 == termDocFreq)

     out.writeVInt((delta<<1) | 1);

   else {

     //写入文档号及词频信息。

     out.writeVInt(delta<<1);

     out.writeVInt(termDocFreq);

   }

   return posWriter;

}

(2-3-3) 添加位置信息:

 

FormatPostingsPositionsWriter.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength) {

    final int delta = position - lastPosition;

    lastPosition = position;

    if (storePayloads) {

        //保存位置及payload信息

        if (payloadLength != lastPayloadLength) {

            lastPayloadLength = payloadLength;

            out.writeVInt((delta<<1)|1);

            out.writeVInt(payloadLength);

        } else

            out.writeVInt(delta << 1);

            if (payloadLength > 0)

                out.writeBytes(payload, payloadLength);

    } else

        out.writeVInt(delta);

}

(2-4) 将跳表和词典(tii, tis)写入文件

FormatPostingsDocsWriter.finish() {

    //将跳表缓存写入文件

    long skipPointer = skipListWriter.writeSkip(out);

    if (df > 0) {

      //将词典(terminfo)写入tii,tis文件

      parent.termsOut(TermInfosWriter).add(fieldInfo.number, utf8.result, utf8.length, termInfo);

    }

  }

将跳表缓存写入文件:

DefaultSkipListWriter(MultiLevelSkipListWriter).writeSkip(IndexOutput)  {

    long skipPointer = output.getFilePointer();

    if (skipBuffer == null || skipBuffer.length == 0) return skipPointer;

    //正如我们在索引文件格式中分析的那样, 高层在前,低层在后,除最低层外,其他的层都有长度保存。

    for (int level = numberOfSkipLevels - 1; level > 0; level--) {

      long length = skipBuffer[level].getFilePointer();

      if (length > 0) {

        output.writeVLong(length);

        skipBuffer[level].writeTo(output);

      }

    }

    //写入最低层

    skipBuffer[0].writeTo(output);

    return skipPointer;

  }

将词典(terminfo)写入tii,tis文件:

  • tii文件是tis文件的类似跳表的东西,是在tis文件中每隔indexInterval个词提取出一个词放在tii文件中,以便很快的查找到词。
  • 因而TermInfosWriter类型中有一个成员变量other也是TermInfosWriter类型的,还有一个成员变量isIndex来表示此对象是用来写tii文件的还是用来写tis文件的。
  • 如果一个TermInfosWriter对象的isIndex=false则,它是用来写tis文件的,它的other指向的是用来写tii文件的TermInfosWriter对象
  • 如果一个TermInfosWriter对象的isIndex=true则,它是用来写tii文件的,它的other指向的是用来写tis文件的TermInfosWriter对象

TermInfosWriter.add (int fieldNumber, byte[] termBytes, int termBytesLength, TermInfo ti) {

    //如果词的总数是indexInterval的倍数,则应该写入tii文件

    if (!isIndex && size % indexInterval == 0)

      other.add(lastFieldNumber, lastTermBytes, lastTermBytesLength, lastTi);

    //将词写入tis文件

    writeTerm(fieldNumber, termBytes, termBytesLength);

    output.writeVInt(ti.docFreq);                       // write doc freq

    output.writeVLong(ti.freqPointer - lastTi.freqPointer); // write pointers

    output.writeVLong(ti.proxPointer - lastTi.proxPointer);

    if (ti.docFreq >= skipInterval) {

      output.writeVInt(ti.skipOffset);

    }

    if (isIndex) {

      output.writeVLong(other.output.getFilePointer() - lastIndexPointer);

      lastIndexPointer = other.output.getFilePointer(); // write pointer

    }

    lastFieldNumber = fieldNumber;

    lastTi.set(ti);

    size++;

  }

6.2.2.2.1.2、写入词向量信息

代码为:

TermVectorsTermsWriter.flush (Map> 
                                              threadsAndFields, final SegmentWriteState state) {

    if (tvx != null) {

      if (state.numDocsInStore > 0)

        fill(state.numDocsInStore - docWriter.getDocStoreOffset());

      tvx.flush();

      tvd.flush();

      tvf.flush();

    }

    for (Map.Entry> entry : 
                                                                                                                                      threadsAndFields.entrySet()) {

      for (final TermsHashConsumerPerField field : entry.getValue() ) {

        TermVectorsTermsWriterPerField perField = (TermVectorsTermsWriterPerField) field;

        perField.termsHashPerField.reset();

        perField.shrinkHash();

      }

      TermVectorsTermsWriterPerThread perThread = (TermVectorsTermsWriterPerThread) entry.getKey();

      perThread.termsHashPerThread.reset(true);

    }

  }

从代码中可以看出,是写入tvx, tvd, tvf三个文件,但是在上述的closeDocStore已经写入了,并且把tvx设为null,在这里其实什么也不做,仅仅是清空postingsHash,以便进行下一轮索引时重用此对象。

6.2.2.2.2、写入标准化因子

代码为:

NormsWriter.flush(Map> threadsAndFields,

                           SegmentWriteState state) {

    final Map> byField = new HashMap>();

    for (final Map.Entry> entry : threadsAndFields.entrySet()) {

     //遍历所有的域,将同名域对应的NormsWriterPerField放到同一个链表中。

      final Collection fields = entry.getValue();

      final Iterator fieldsIt = fields.iterator();

      while (fieldsIt.hasNext()) {

        final NormsWriterPerField perField = (NormsWriterPerField) fieldsIt.next();

        List l = byField.get(perField.fieldInfo);

        if (l == null) {

            l = new ArrayList();

            byField.put(perField.fieldInfo, l);

        }

        l.add(perField);

    }

    //记录写入的文件名,方便以后生成cfs文件。

    final String normsFileName = state.segmentName + "." + IndexFileNames.NORMS_EXTENSION;

    state.flushedFiles.add(normsFileName);

    IndexOutput normsOut = state.directory.createOutput(normsFileName);

    try {

      //写入nrm文件头

      normsOut.writeBytes(SegmentMerger.NORMS_HEADER, 0, SegmentMerger.NORMS_HEADER.length);

      final int numField = fieldInfos.size();

      int normCount = 0;

      //对每一个域进行处理

      for(int fieldNumber=0;fieldNumber

        final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);

        //得到同名域的链表

        List toMerge = byField.get(fieldInfo);

        int upto = 0;

        if (toMerge != null) {

          final int numFields = toMerge.size();

          normCount++;

          final NormsWriterPerField[] fields = new NormsWriterPerField[numFields];

          int[] uptos = new int[numFields];

          for(int j=0;j

            fields[j] = toMerge.get(j);

          int numLeft = numFields;

          //处理同名的多个域

          while(numLeft > 0) {

            //得到所有的同名域中最小的文档号

            int minLoc = 0;

            int minDocID = fields[0].docIDs[uptos[0]];

            for(int j=1;j

              final int docID = fields[j].docIDs[uptos[j]];

              if (docID < minDocID) {

                minDocID = docID;

                minLoc = j;

              }

            }

            // 在nrm文件中,每一个文件都有一个位置,没有设定的,放入默认值

            for (;upto<minDocID;upto++)

              normsOut.writeByte(defaultNorm);

            //写入当前的nrm值

            normsOut.writeByte(fields[minLoc].norms[uptos[minLoc]]);

            (uptos[minLoc])++;

            upto++;

            //如果当前域的文档已经处理完毕,则numLeft减一,归零时推出循环。

            if (uptos[minLoc] == fields[minLoc].upto) {

              fields[minLoc].reset();

              if (minLoc != numLeft-1) {

                fields[minLoc] = fields[numLeft-1];

                uptos[minLoc] = uptos[numLeft-1];

              }

              numLeft--;

            }

          }

          // 对所有的未设定nrm值的文档写入默认值。

          for(;upto

            normsOut.writeByte(defaultNorm);

        } else if (fieldInfo.isIndexed && !fieldInfo.omitNorms) {

          normCount++;

          // Fill entire field with default norm:

          for(;upto

            normsOut.writeByte(defaultNorm);

        }

      }

    } finally {

      normsOut.close();

    }

  }

6.2.2.3、写入域元数据

代码为:

FieldInfos.write(IndexOutput) {

    output.writeVInt(CURRENT_FORMAT);

    output.writeVInt(size());

    for (int i = 0; i < size(); i++) {

      FieldInfo fi = fieldInfo(i);

      byte bits = 0x0;

      if (fi.isIndexed) bits |= IS_INDEXED;

      if (fi.storeTermVector) bits |= STORE_TERMVECTOR;

      if (fi.storePositionWithTermVector) bits |= STORE_POSITIONS_WITH_TERMVECTOR;

      if (fi.storeOffsetWithTermVector) bits |= STORE_OFFSET_WITH_TERMVECTOR;

      if (fi.omitNorms) bits |= OMIT_NORMS;

      if (fi.storePayloads) bits |= STORE_PAYLOADS;

      if (fi.omitTermFreqAndPositions) bits |= OMIT_TERM_FREQ_AND_POSITIONS;

      output.writeString(fi.name);

      output.writeByte(bits);

    }

}

此处基本就是按照fnm文件的格式写入的。

6.3、生成新的段信息对象

代码:

newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx());

segmentInfos.add(newSegment);

 

6.4、准备删除文档

代码:

docWriter.pushDeletes();

    --> deletesFlushed.update(deletesInRAM);

此处将deletesInRAM全部加到deletesFlushed中,并把deletesInRAM清空。原因上面已经阐明。

6.5、生成cfs段

代码:

docWriter.createCompoundFile(segment);

newSegment.setUseCompoundFile(true);

代码为:

DocumentsWriter.createCompoundFile(String segment) {

    CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION);

    //将上述中记录的文档名全部加入cfs段的写对象。

    for (final String flushedFile : flushState.flushedFiles)

      cfsWriter.addFile(flushedFile);

    cfsWriter.close();

  }

6.6、删除文档

代码:

applyDeletes();

代码为:

boolean applyDeletes(SegmentInfos infos) {

  if (!hasDeletes())

    return false;

  final int infosEnd = infos.size();

  int docStart = 0;

  boolean any = false;

  for (int i = 0; i < infosEnd; i++) {

    assert infos.info(i).dir == directory;

    SegmentReader reader = writer.readerPool.get(infos.info(i), false);

    try {

      any |= applyDeletes(reader, docStart);

      docStart += reader.maxDoc();

    } finally {

      writer.readerPool.release(reader);

    }

  }

  deletesFlushed.clear();

  return any;

}

  • Lucene删除文档可以用reader,也可以用writer,但是归根结底还是用reader来删除的。
  • reader的删除有以下三种方式:
    • 按照词删除,删除所有包含此词的文档。
    • 按照文档号删除。
    • 按照查询对象删除,删除所有满足此查询的文档。
  • 但是这三种方式归根结底还是按照文档号删除,也就是写.del文件的过程。

 

private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart)

  throws CorruptIndexException, IOException {

  final int docEnd = docIDStart + reader.maxDoc();

  boolean any = false;

  //按照词删除,删除所有包含此词的文档。

  TermDocs docs = reader.termDocs();

  try {

    for (Entry entry: deletesFlushed.terms.entrySet()) {

      Term term = entry.getKey();

      docs.seek(term);

      int limit = entry.getValue().getNum();

      while (docs.next()) {

        int docID = docs.doc();

        if (docIDStart+docID >= limit)

          break;

        reader.deleteDocument(docID);

        any = true;

      }

    }

  } finally {

    docs.close();

  }

  //按照文档号删除。

  for (Integer docIdInt : deletesFlushed.docIDs) {

    int docID = docIdInt.intValue();

    if (docID >= docIDStart && docID < docEnd) {

      reader.deleteDocument(docID-docIDStart);

      any = true;

    }

  }

  //按照查询对象删除,删除所有满足此查询的文档。

  IndexSearcher searcher = new IndexSearcher(reader);

  for (Entry entry : deletesFlushed.queries.entrySet()) {

    Query query = entry.getKey();

    int limit = entry.getValue().intValue();

    Weight weight = query.weight(searcher);

    Scorer scorer = weight.scorer(reader, true, false);

    if (scorer != null) {

      while(true)  {

        int doc = scorer.nextDoc();

        if (((long) docIDStart) + doc >= limit)

          break;

        reader.deleteDocument(doc);

        any = true;

      }

    }

  }

  searcher.close();

  return any;

}

  评论这张
 
阅读(880)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017