* Interface for an object that can be incremented, breaking after some
* configured limit has been reached.
public interface CircuitBreaker {
* The parent breaker is a sum of all the following breakers combined. With
* this we allow a single breaker to have a significant amount of memory
* available while still having a "total" limit for all breakers. Note that
* it's not a "real" breaker in that it cannot be added to or subtracted
* from by itself.
String PARENT = "parent";
* The fielddata breaker tracks data used for fielddata (on fields) as well
* as the id cached used for parent/child queries.
String FIELDDATA = "fielddata";
* The request breaker tracks memory used for particular requests. This
* includes allocations for things like the cardinality aggregation, and
* accounting for the number of buckets used in an aggregation request.
* Generally the amounts added to this breaker are released after a request
* is finished.
String REQUEST = "request";
* The in-flight request breaker tracks bytes allocated for reading and
* writing requests on the network layer.
String IN_FLIGHT_REQUESTS = "in_flight_requests";
* The accounting breaker tracks things held in memory that is independent
* of the request lifecycle. This includes memory used by Lucene for
* segments.
String ACCOUNTING = "accounting";
enum Type {
// A regular or ChildMemoryCircuitBreaker
// A special parent-type for the hierarchy breaker service
// A breaker where every action is a noop, it never breaks
public static Type parseValue(String value) {
switch(value.toLowerCase(Locale.ROOT)) {
case "noop":
return Type.NOOP;
case "parent":
return Type.PARENT;
case "memory":
return Type.MEMORY;
throw new IllegalArgumentException("No CircuitBreaker with type: " + value);
enum Durability {
// The condition that tripped the circuit breaker fixes itself eventually.
// The condition that tripped the circuit breaker requires manual intervention.
* Trip the circuit breaker
* @param fieldName name of the field responsible for tripping the breaker
* @param bytesNeeded bytes asked for but unable to be allocated
void circuitBreak(String fieldName, long bytesNeeded);
* add bytes to the breaker and maybe trip
* @param bytes number of bytes to add
* @param label string label describing the bytes being added
* @return the number of "used" bytes for the circuit breaker
double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;
* Adjust the circuit breaker without tripping
long addWithoutBreaking(long bytes);
* @return the currently used bytes the breaker is tracking
long getUsed();
* @return maximum number of bytes the circuit breaker can track before tripping
long getLimit();
* @return overhead of circuit breaker
double getOverhead();
* @return the number of times the circuit breaker has been tripped
long getTrippedCount();
* @return the name of the breaker
String getName();
* @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
Durability getDurability();
public class NoopCircuitBreaker implements CircuitBreaker {
public static final int LIMIT = -1;
private final String name;
public NoopCircuitBreaker(String name) {
this.name = name;
public void circuitBreak(String fieldName, long bytesNeeded) {
// noop
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
return 0;
public long addWithoutBreaking(long bytes) {
return 0;
public long getUsed() {
return 0;
public long getLimit() {
return LIMIT;
public double getOverhead() {
return 0;
public long getTrippedCount() {
return 0;
public String getName() {
return this.name;
public Durability getDurability() {
return Durability.PERMANENT;
public class ChildMemoryCircuitBreaker implements CircuitBreaker {
private final long memoryBytesLimit;
private final double overheadConstant;
private final Durability durability;
private final AtomicLong used;
private final AtomicLong trippedCount;
private final Logger logger;
private final HierarchyCircuitBreakerService parent;
private final String name;
* Create a circuit breaker that will break if the number of estimated
* bytes grows above the limit. All estimations will be multiplied by
* the given overheadConstant. This breaker starts with 0 bytes used.
* @param settings settings to configure this breaker
* @param parent parent circuit breaker service to delegate tripped breakers to
* @param name the name of the breaker
public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger,
HierarchyCircuitBreakerService parent, String name) {
this(settings, null, logger, parent, name);
* Create a circuit breaker that will break if the number of estimated
* bytes grows above the limit. All estimations will be multiplied by
* the given overheadConstant. Uses the given oldBreaker to initialize
* the starting offset.
* @param settings settings to configure this breaker
* @param parent parent circuit breaker service to delegate tripped breakers to
* @param name the name of the breaker
* @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)
public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
Logger logger, HierarchyCircuitBreakerService parent, String name) {
this.name = name;
this.memoryBytesLimit = settings.getLimit();
this.overheadConstant = settings.getOverhead();
this.durability = settings.getDurability();
if (oldBreaker == null) {
this.used = new AtomicLong(0);
this.trippedCount = new AtomicLong(0);
} else {
this.used = oldBreaker.used;
this.trippedCount = oldBreaker.trippedCount;
this.logger = logger;
if (logger.isTraceEnabled()) {
logger.trace("creating ChildCircuitBreaker with settings {}", settings);
this.parent = parent;
* Method used to trip the breaker, delegates to the parent to determine
* whether to trip the breaker or not
public void circuitBreak(String fieldName, long bytesNeeded) {
final String message = "[" + this.name + "] Data too large, data for [" + fieldName + "]" +
" would be [" + bytesNeeded + "/" + new ByteSizeValue(bytesNeeded) + "]" +
", which is larger than the limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";
logger.debug("{}", message);
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability);
* Add a number of bytes, tripping the circuit breaker if the aggregated
* estimates are above the limit. Automatically trips the breaker if the
* memory limit is set to 0. Will never trip the breaker if the limit is
* set < 0, but can still be used to aggregate estimations.
* @param bytes number of bytes to add to the breaker
* @return number of "used" bytes so far
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
// short-circuit on no data allowed, immediately throwing an exception
if (memoryBytesLimit == 0) {
circuitBreak(label, bytes);
long newUsed;
// If there is no limit (-1), we can optimize a bit by using
// .addAndGet() instead of looping (because we don't have to check a
// limit), which makes the RamAccountingTermsEnum case faster.
if (this.memoryBytesLimit == -1) {
newUsed = noLimit(bytes, label);
} else {
newUsed = limit(bytes, label);
// Additionally, we need to check that we haven't exceeded the parent's limit
try {
parent.checkParentLimit((long) (bytes * overheadConstant), label);
} catch (CircuitBreakingException e) {
// If the parent breaker is tripped, this breaker has to be
// adjusted back down because the allocation is "blocked" but the
// breaker has already been incremented
throw e;
return newUsed;
private long noLimit(long bytes, String label) {
long newUsed;
newUsed = this.used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
return newUsed;
private long limit(long bytes, String label) {
long newUsed;// Otherwise, check the addition and commit the addition, looping if
// there are conflicts. May result in additional logging, but it's
// trace logging and shouldn't be counted on for additions.
long currentUsed;
do {
currentUsed = this.used.get();
newUsed = currentUsed + bytes;
long newUsedWithOverhead = (long) (newUsed * overheadConstant);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
logger.warn("[{}] New used memory {} [{}] for data of [{}] would be larger than configured breaker: {} [{}], breaking",
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
circuitBreak(label, newUsedWithOverhead);
// Attempt to set the new used value, but make sure it hasn't changed
// underneath us, if it has, keep trying until we are able to set it
} while (!this.used.compareAndSet(currentUsed, newUsed));
return newUsed;
* Add an <b>exact</b> number of bytes, not checking for tripping the
* circuit breaker. This bypasses the overheadConstant multiplication.
* Also does not check with the parent breaker to see if the parent limit
* has been exceeded.
* @param bytes number of bytes to add to the breaker
* @return number of "used" bytes so far
public long addWithoutBreaking(long bytes) {
long u = used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u);
assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";
return u;
* @return the number of aggregated "used" bytes so far
public long getUsed() {
return this.used.get();
* @return the number of bytes that can be added before the breaker trips
public long getLimit() {
return this.memoryBytesLimit;
* @return the constant multiplier the breaker uses for aggregations
public double getOverhead() {
return this.overheadConstant;
* @return the number of times the breaker has been tripped
public long getTrippedCount() {
return this.trippedCount.get();
* @return the name of the breaker
public String getName() {
return this.name;
* @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
public Durability getDurability() {
return this.durability;
// If this content infringes your rights, please contact cloudcommunity@tencent.com to have it removed.