/**
 * Executes bulk item requests and handles request execution exceptions.
 * @return {@code true} if request completed on this thread and the listener was invoked, {@code false} if the request triggered
 *         a mapping update that will finish and invoke the listener on a different thread
 */
static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
                                      MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
                                      ActionListener<Void> itemDoneListener) throws Exception {
    final DocWriteRequest.OpType opType = context.getCurrent().opType();
    final UpdateHelper.Result updateResult;
    if (opType == DocWriteRequest.OpType.UPDATE) {
        final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
        try {
            updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
        } catch (Exception failure) {
            // we may fail translating an update to an index or delete operation
            // we use index result to communicate failure while translating update request
            final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version());
            context.setRequestToExecute(updateRequest);
            context.markOperationAsExecuted(result);
            context.markAsCompleted(context.getExecutionResult());
            return true;
        }
        // execute translated update request
        switch (updateResult.getResponseResult()) {
            case CREATED:
            case UPDATED:
                IndexRequest indexRequest = updateResult.action();
                IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
                MappingMetadata mappingMd = metadata.mapping();
                indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
                context.setRequestToExecute(indexRequest);
                break;
            case DELETED:
                context.setRequestToExecute(updateResult.action());
                break;
            case NOOP:
                context.markOperationAsNoOp(updateResult.action());
                context.markAsCompleted(context.getExecutionResult());
                return true;
            default:
                throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
        }
    } else {
        context.setRequestToExecute(context.getCurrent());
        updateResult = null;
    }
    assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state

    final IndexShard primary = context.getPrimary();
    final long version = context.getRequestToExecute().version();
    final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
    final Engine.Result result;
    if (isDelete) {
        final DeleteRequest request = context.getRequestToExecute();
        result = primary.applyDeleteOperationOnPrimary(version, request.id(), request.versionType(),
            request.ifSeqNo(), request.ifPrimaryTerm());
    } else {
        final IndexRequest request = context.getRequestToExecute();
        result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
                request.index(), request.id(), request.source(), request.getContentType(), request.routing()),
            request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
    }
    if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

        try {
            primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME,
                new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
                MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
        } catch (Exception e) {
            logger.info(() -> new ParameterizedMessage("{} mapping update rejected by primary", primary.shardId()), e);
            onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
            return true;
        }

        mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
            new ActionListener<>() {
                @Override
                public void onResponse(Void v) {
                    context.markAsRequiringMappingUpdate();
                    waitForMappingUpdate.accept(
                        ActionListener.runAfter(new ActionListener<>() {
                            @Override
                            public void onResponse(Void v) {
                                assert context.requiresWaitingForMappingUpdate();
                                context.resetForExecutionForRetry();
                            }

                            @Override
                            public void onFailure(Exception e) {
                                context.failOnMappingUpdate(e);
                            }
                        }, () -> itemDoneListener.onResponse(null))
                    );
                }

                @Override
                public void onFailure(Exception e) {
                    onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
                    // Requesting mapping update failed, so we don't have to wait for a cluster state update
                    assert context.isInitial();
                    itemDoneListener.onResponse(null);
                }
            });
        return false;
    } else {
        onComplete(result, context, updateResult);
    }
    return true;
}
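
The boolean return value of `executeBulkItemRequest` suggests a caller that keeps looping while items finish inline and stops as soon as one hands off to a callback. The following is a minimal, self-contained sketch of that pattern, not the Elasticsearch caller itself. `InlineOrDeferredLoopSketch`, `ItemExecutor`, and `demo` are hypothetical names introduced for illustration; the sketch runs on a single thread, models only the "retry the same item after the callback" path, and omits failure handling.

```java
import java.util.ArrayList;
import java.util.List;

public class InlineOrDeferredLoopSketch {

    /**
     * Hypothetical per-item executor (an assumption, not an Elasticsearch type).
     * Returns true if the item finished on the calling thread, false if it
     * handed off and will invoke onDone later.
     */
    public interface ItemExecutor {
        boolean execute(int item, Runnable onDone);
    }

    private final int itemCount;
    private final ItemExecutor executor;
    private int next = 0;

    public InlineOrDeferredLoopSketch(int itemCount, ItemExecutor executor) {
        this.itemCount = itemCount;
        this.executor = executor;
    }

    /** Runs items until one defers; the deferred item's callback re-enters this method. */
    public void run() {
        while (next < itemCount) {
            if (executor.execute(next, this::run) == false) {
                return; // stop here; the callback resumes the loop and retries the same item
            }
            next++;
        }
    }

    /** Small demonstration: item 1 defers once (think "mapping update pending"), then succeeds on retry. */
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Runnable> deferred = new ArrayList<>();
        boolean[] mappingKnown = {false};

        ItemExecutor exec = (item, onDone) -> {
            if (item == 1 && !mappingKnown[0]) {
                log.add("defer " + item);
                deferred.add(() -> {
                    mappingKnown[0] = true; // the awaited update has arrived
                    onDone.run();
                });
                return false;
            }
            log.add("done " + item);
            return true;
        };

        InlineOrDeferredLoopSketch loop = new InlineOrDeferredLoopSketch(3, exec);
        loop.run();               // processes item 0, then stops at item 1
        deferred.remove(0).run(); // simulate the callback firing later
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the listing above, the same roles are played by `itemDoneListener` (the callback) and `context.resetForExecutionForRetry()` (which arranges for the same item to be executed again).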
/**
 * Prepares an update request by converting it into an index or delete request or an update response (no action).
 */
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
    // This is a realtime get.
    // The lookup eventually reaches InternalEngine:
    // get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper)
    // (that code is attached later)
    final GetResult getResult = indexShard.getService().getForUpdate(
        request.id(), request.ifSeqNo(), request.ifPrimaryTerm());
    return prepare(indexShard.shardId(), request, getResult, nowInMillis);
}
public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) {
    // realtime is true
    return get(id, new String[]{RoutingFieldMapper.NAME}, true,
        Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE);
}
private GetResult get(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                      long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
    currentMetric.inc();
    try {
        long now = System.nanoTime();
        GetResult getResult =
            innerGet(id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);

        if (getResult.isExists()) {
            existsMetric.inc(System.nanoTime() - now);
        } else {
            missingMetric.inc(System.nanoTime() - now);
        }
        return getResult;
    } finally {
        currentMetric.dec();
    }
}
private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                           long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
    fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);

    Engine.GetResult get = indexShard.get(new Engine.Get(realtime, realtime, id)
        .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
    assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
    if (get.exists() == false) {
        get.close();
    }

    if (get == null || get.exists() == false) {
        return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
    }

    try {
        // break between having loaded it from translog (so we only have _source), and having a document to load
        return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get, mapperService);
    } finally {
        get.close();
    }
}
public Engine.GetResult get(Engine.Get get) {
    readAllowed();
    DocumentMapper mapper = mapperService.documentMapper();
    if (mapper == null) {
        return GetResult.NOT_EXISTS;
    }
    return getEngine().get(get, mapper, this::wrapSearcher);
}
/**
 * Prepares an update request by converting it into an index or delete request or an update response (no action, in the event of a
 * noop).
 */
protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) {
    if (getResult.isExists() == false) {
        // If the document didn't exist, execute the update request as an upsert
        return prepareUpsert(shardId, request, getResult, nowInMillis);
    } else if (getResult.internalSourceRef() == null) {
        // no source, we can't do anything, throw a failure...
        throw new DocumentSourceMissingException(shardId, request.id());
    } else if (request.script() == null && request.doc() != null) {
        // The request has no script, it is a new doc that should be merged with the old document
        return prepareUpdateIndexRequest(shardId, request, getResult, request.detectNoop());
    } else {
        // The request has a script (or empty script), execute the script and prepare a new index request
        return prepareUpdateScriptRequest(shardId, request, getResult, nowInMillis);
    }
}
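
The four branches of `prepare(ShardId, ...)` depend on only three facts: whether the document exists, whether its source is available, and whether the request carries a script or a partial doc. The sketch below isolates that decision so it can be run without Elasticsearch on the classpath. `UpdateDispatchSketch`, `Branch`, and `classify` are hypothetical names for illustration only; the parameters are simplified stand-ins for `getResult.isExists()`, `getResult.internalSourceRef() != null`, `request.script()`, and `request.doc()`.

```java
import java.util.Map;

public class UpdateDispatchSketch {

    /** Hypothetical labels for the four branches; these are not Elasticsearch types. */
    public enum Branch { UPSERT, SOURCE_MISSING, MERGE_PARTIAL_DOC, RUN_SCRIPT }

    /**
     * Mirrors the order of checks in the listing above.
     *
     * @param exists    stand-in for getResult.isExists()
     * @param hasSource stand-in for getResult.internalSourceRef() != null
     * @param script    stand-in for request.script(); null when the request has no script
     * @param doc       stand-in for request.doc(); null when the request has no partial doc
     */
    public static Branch classify(boolean exists, boolean hasSource, String script, Map<String, Object> doc) {
        if (!exists) {
            return Branch.UPSERT;            // document missing: treat the update as an upsert
        } else if (!hasSource) {
            return Branch.SOURCE_MISSING;    // the real code throws DocumentSourceMissingException here
        } else if (script == null && doc != null) {
            return Branch.MERGE_PARTIAL_DOC; // merge the partial doc into the existing source
        } else {
            return Branch.RUN_SCRIPT;        // script present (or neither script nor doc)
        }
    }

    public static void main(String[] args) {
        Map<String, Object> partial = Map.of("views", 1);
        System.out.println(classify(false, false, null, partial));
        System.out.println(classify(true, false, null, partial));
        System.out.println(classify(true, true, null, partial));
        System.out.println(classify(true, true, "ctx._source.views += 1", null));
    }
}
```

Note that the order of the checks matters: a missing document is routed to the upsert path before the source check, so a request against a nonexistent document never reaches the source-missing branch.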