```java
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
    this.primaryResult = primaryResult;
    final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
    if (replicaRequest != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] op [{}] completed on primary for request [{}]",
                primary.routingEntry().shardId(), opType, request);
        }
        // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
        // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
        // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
        // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
        // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
        // of the sampled replication group, and advanced further than what the given replication group would allow it to.
        // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
        final long globalCheckpoint = primary.computedGlobalCheckpoint();
        // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
        // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
        // on.
        final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
        assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
        // get the current in-sync (active) shard copies
        // https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java#L70
        final ReplicationGroup replicationGroup = primary.getReplicationGroup();
        final PendingReplicationActions pendingReplicationActions = primary.getPendingReplicationActions();
        markUnavailableShardsAsStale(replicaRequest, replicationGroup);
        // write to all replicas (the copies in the in-sync set) concurrently
        performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup, pendingReplicationActions);
    }
    // ...
}
```
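The sampling-order constraint in the comments above is subtle. Here is a minimal, self-contained sketch of the interleaving it prevents (plain Java, not Elasticsearch code; all names are illustrative). If the replication group were sampled before the global checkpoint, a copy could drop out of the in-sync set in between, letting the checkpoint advance past that copy's local checkpoint before the stale group sample is used:

```java
import java.util.Map;
import java.util.Set;

// Illustrative only: models a primary whose global checkpoint is the minimum
// local checkpoint over the in-sync copies.
final class SamplingOrderSketch {
    static Map<String, Long> localCheckpoints = Map.of("copy-A", 7L, "copy-B", 3L);
    static volatile Set<String> inSyncCopies = Set.of("copy-A", "copy-B");

    static long computedGlobalCheckpoint() {
        return inSyncCopies.stream().mapToLong(localCheckpoints::get).min().orElseThrow();
    }

    public static void main(String[] args) {
        // WRONG order: sample the replication group first ...
        Set<String> sampledGroup = inSyncCopies;   // {copy-A, copy-B}
        // ... concurrently, copy-B falls out of the in-sync set, so the
        // global checkpoint is now computed over a subset of the sample:
        inSyncCopies = Set.of("copy-A");
        long gcp = computedGlobalCheckpoint();     // 7
        // Shipping gcp=7 to every copy in the stale sampledGroup would let
        // copy-B (local checkpoint 3) learn a global checkpoint above its own
        // local checkpoint, which is exactly what sampling the checkpoint
        // first rules out.
        System.out.println("sampledGroup=" + sampledGroup + ", gcp=" + gcp);
    }
}
```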
```java
private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
                               final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup,
                               final PendingReplicationActions pendingReplicationActions) {
    // for total stats, add number of unassigned shards and
    // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
    totalShards.addAndGet(replicationGroup.getSkippedShards().size());

    final ShardRouting primaryRouting = primary.routingEntry();

    for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
        if (shard.isSameAllocation(primaryRouting) == false) {
            performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
        }
    }
}
```
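As a concrete example of the accounting: for a shard with one primary, one in-sync replica, and one initializing copy whose recovery has not yet opened an engine on the target, the initializing copy lands in getSkippedShards() (counted towards totalShards but never written to), the primary itself appears among the replication targets but is filtered out by the isSameAllocation check, and only the in-sync replica reaches performOnReplica below.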
```java
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
                              final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
                              final PendingReplicationActions pendingReplicationActions) {
    if (logger.isTraceEnabled()) {
        logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
    }
    totalShards.incrementAndGet();
    pendingActions.incrementAndGet();
    final ActionListener<ReplicaResponse> replicationListener = new ActionListener<>() {
        @Override
        public void onResponse(ReplicaResponse response) {
            successfulShards.incrementAndGet();
            try {
                updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
            } finally {
                decPendingAndFinishIfNeeded();
            }
        }

        @Override
        public void onFailure(Exception replicaException) {
            logger.trace(() -> new ParameterizedMessage(
                "[{}] failure while performing [{}] on replica {}, request [{}]",
                shard.shardId(), opType, shard, replicaRequest), replicaException);
            // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
            if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                RestStatus restStatus = ExceptionsHelper.status(replicaException);
                shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                    shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
            }
            String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
            // What failShardIfNeeded actually does depends on the concrete class behind replicasProxy:
            // if it is a WriteActionReplicasProxy, the shard failure is reported.
            // In the write path, the concrete class behind replicasProxy is WriteActionReplicasProxy.
            replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException,
                ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
        }

        @Override
        public String toString() {
            return "[" + replicaRequest + "][" + shard + "]";
        }
    };

    final String allocationId = shard.allocationId().getId();
    final RetryableAction<ReplicaResponse> replicationAction =
        new RetryableAction<>(logger, threadPool, initialRetryBackoffBound, retryTimeout, replicationListener) {

            @Override
            public void tryAction(ActionListener<ReplicaResponse> listener) {
                replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener);
            }

            @Override
            public void onFinished() {
                super.onFinished();
                pendingReplicationActions.removeReplicationAction(allocationId, this);
            }

            @Override
            public boolean shouldRetry(Exception e) {
                final Throwable cause = ExceptionsHelper.unwrapCause(e);
                return cause instanceof CircuitBreakingException
                    || cause instanceof EsRejectedExecutionException
                    || cause instanceof ConnectTransportException;
            }
        };

    pendingReplicationActions.addPendingAction(allocationId, replicationAction);
    // this submits the action to the thread pool; the replica requests run concurrently
    replicationAction.run();
}
```
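Note that shouldRetry above retries only transient, back-pressure style failures (circuit breaker trips, rejected executions, connect failures); the surrounding RetryableAction, constructed with initialRetryBackoffBound and retryTimeout, re-runs tryAction with a growing backoff until it succeeds or the timeout expires. As a rough, self-contained sketch of that pattern (plain Java; not the actual RetryableAction implementation, and all names here are made up):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative retry loop: retry only transient failures, with exponential backoff.
final class RetrySketch {
    interface Attempt { void run(Runnable onSuccess, java.util.function.Consumer<Exception> onFailure); }

    static void runWithRetry(ScheduledExecutorService scheduler, Attempt attempt,
                             long backoffMillis, long deadlineNanos) {
        attempt.run(
            () -> System.out.println("succeeded"),
            e -> {
                boolean transientFailure = e instanceof java.net.ConnectException; // stand-in for shouldRetry
                if (transientFailure && System.nanoTime() < deadlineNanos) {
                    // back off, then retry with a doubled backoff bound
                    scheduler.schedule(
                        () -> runWithRetry(scheduler, attempt, backoffMillis * 2, deadlineNanos),
                        backoffMillis, TimeUnit.MILLISECONDS);
                } else {
                    System.out.println("giving up: " + e);
                }
            });
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        // an attempt that always fails with a transient error, to exercise the backoff
        runWithRetry(scheduler, (ok, fail) -> fail.accept(new java.net.ConnectException("simulated")), 50, deadline);
        Thread.sleep(6000);
        scheduler.shutdown();
    }
}
```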
```java
/**
 * A proxy for <b>write</b> operations that need to be performed on the
 * replicas, where a failure to execute the operation should fail
 * the replica shard and/or mark the replica as stale.
 *
 * This extends {@code TransportReplicationAction.ReplicasProxy} to do the
 * failing and stale-ing.
 */
class WriteActionReplicasProxy extends ReplicasProxy {

    // Note:
    // 1. If a write to a replica fails, the primary reports the problem to the master,
    //    which then updates the index's inSyncAllocations in the cluster metadata and
    //    removes the replica. From that point on the replica no longer serves read
    //    requests. Until the metadata update reaches every node, users may still read
    //    from this replica, but not once the update has completed.
    //    This solution is not strict: since ES is a near-real-time system, written data
    //    needs a refresh before it becomes visible anyway, so reading slightly stale
    //    data for a short window is generally considered acceptable.
    // 2. From the code: as soon as a replica write fails, shardStateAction.remoteShardFailed
    //    is triggered, which takes the shard copy offline.
    // 3. https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-replication.html#basic-write-model
    @Override
    public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
                                  ActionListener<Void> listener) {
        if (TransportActions.isShardNotAvailableException(exception) == false) {
            logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
        }
        shardStateAction.remoteShardFailed(
            replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, listener);
    }

    @Override
    public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm,
                                             ActionListener<Void> listener) {
        shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
    }
}
```
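To make note 1 concrete, here is a toy model (not Elasticsearch code; all names are invented) of the master-side effect of remoteShardFailed: the failed or stale copy is dropped from the in-sync allocation set, after which it neither serves reads nor holds back the global checkpoint:

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the master updating an index's in-sync allocations
// when the primary reports a failed or stale copy.
final class InSyncAllocationsSketch {
    private final Set<String> inSyncAllocationIds =
        new HashSet<>(Set.of("alloc-primary", "alloc-replica-1"));

    // invoked when the primary reports a failed or stale copy
    void onRemoteShardFailed(String allocationId) {
        if (inSyncAllocationIds.remove(allocationId)) {
            System.out.println("dropped " + allocationId + ", in-sync set is now " + inSyncAllocationIds);
        }
    }

    public static void main(String[] args) {
        InSyncAllocationsSketch cluster = new InSyncAllocationsSketch();
        cluster.onRemoteShardFailed("alloc-replica-1"); // replica write failed
    }
}
```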