By: Rajiv Shringi, Oleksii Tkachuk, Kartik Sathyanarayanan
In our previous blog post, we introduced Netflix’s TimeSeries Abstraction, a distributed service designed to store and query large volumes of temporal event data with low millisecond latencies. Today, we’re excited to present the Distributed Counter Abstraction. This counting service, built on top of the TimeSeries Abstraction, enables distributed counting at scale while maintaining similar low-latency performance. As with all our abstractions, we use our Data Gateway Control Plane to shard, configure, and deploy this service globally.
Distributed counting is a challenging problem in computer science. In this blog post, we’ll explore the diverse counting requirements at Netflix, the challenges of achieving accurate counts in near real-time, and the rationale behind our chosen approach, including the necessary trade-offs.
Note: When it comes to distributed counters, terms such as ‘accurate’ or ‘precise’ should be taken with a grain of salt. In this context, they refer to a count very close to accurate, presented with minimal delays.
At Netflix, our counting use cases include tracking millions of user interactions, monitoring how often specific features or experiences are shown to users, and counting multiple facets of data across A/B test experiments, among others.
At Netflix, these use cases can be classified into two broad categories:
- Best-Effort: For this category, the count does not need to be very accurate or durable. However, this category requires near-immediate access to the current count at low latencies, all while keeping infrastructure costs to a minimum.
- Eventually Consistent: This category needs accurate and durable counts, and is willing to tolerate a slight delay in accuracy and a slightly higher infrastructure cost as a trade-off.
Both categories share common requirements, such as high throughput and high availability. The table below provides a detailed overview of the diverse requirements across these two categories.
To meet the outlined requirements, the Counter Abstraction was designed to be highly configurable. It allows users to choose between different counting modes, such as Best-Effort or Eventually Consistent, while considering the documented trade-offs of each option. After selecting a mode, users can interact with APIs without worrying about the underlying storage mechanisms and counting methods.
Let’s take a closer look at the structure and functionality of the API.
Counters are organized into separate namespaces that users set up for each of their specific use cases. Each namespace can be configured with different parameters, such as Type of Counter, Time-To-Live (TTL), and Counter Cardinality, using the service’s Control Plane.
The Counter Abstraction API resembles Java’s AtomicInteger interface:
AddCount/AddAndGetCount: Adjusts the count for the specified counter by the given delta value within a dataset. The delta value can be positive or negative. The AddAndGetCount counterpart also returns the count after performing the add operation.
{
"namespace": "my_dataset",
"counter_name": "counter123",
"delta": 2,
"idempotency_token": {
"token": "some_event_id",
"generation_time": "2024-10-05T14:48:00Z"
}
}
The idempotency token can be used for counter types that support it. Clients can use this token to safely retry or hedge their requests. Failures in a distributed system are a given, and the ability to safely retry requests improves the reliability of the service. A sketch of such a retry follows.
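For illustration, here is a minimal sketch of how a client might retry an AddCount call with a stable idempotency token. The CounterClient interface and its method names are hypothetical stand-ins, not the published API:

import java.time.Instant;

public class AddCountRetryExample {
    // Hypothetical client interface; the real API shape is not shown in this post.
    interface CounterClient {
        void addCount(String namespace, String counter, long delta,
                      String token, Instant generationTime) throws Exception;
    }

    static void addWithRetries(CounterClient client, int maxAttempts) throws Exception {
        // The token and its generation time are fixed up front, so every
        // retry is recognized by the server as the same logical increment.
        String token = "some_event_id";
        Instant generationTime = Instant.now();
        for (int attempt = 1; ; attempt++) {
            try {
                client.addCount("my_dataset", "counter123", 2, token, generationTime);
                return; // durably acknowledged; no further retries needed
            } catch (Exception e) {
                if (attempt >= maxAttempts) throw e;
                Thread.sleep(100L * attempt); // simple linear backoff before the safe retry
            }
        }
    }
}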
GetCount: Retrieves the count value of the specified counter within a dataset.
{
"namespace": "my_dataset",
"counter_name": "counter123"
}
ClearCount: Effectively resets the count to 0 for the specified counter within a dataset.
{
"namespace": "my_dataset",
"counter_name": "counter456",
"idempotency_token": {...}
}
Now, let’s take a look at the different types of counters supported within the Abstraction.
The service primarily supports two types of counters: Best-Effort and Eventually Consistent, along with a third experimental type: Accurate. In the following sections, we’ll describe the different approaches for these types of counters and the trade-offs associated with each.
This type of counter is powered by EVCache, Netflix’s distributed caching solution built on the widely popular Memcached. It is suitable for use cases like A/B experiments, where many concurrent experiments are run for relatively short durations and an approximate count is sufficient. Setting aside the complexities of provisioning, resource allocation, and control plane management, the core of this solution is remarkably simple:
// counter cache key
counterCacheKey = <namespace>:<counter_name>

// add operation
return delta > 0
    ? cache.incr(counterCacheKey, delta, TTL)
    : cache.decr(counterCacheKey, Math.abs(delta), TTL);

// get operation
cache.get(counterCacheKey);

// clear counts from all replicas
cache.delete(counterCacheKey, ReplicaPolicy.ALL);
EVCache delivers extremely high throughput at low millisecond latency or better within a single region, and a multi-tenant setup within a shared cluster keeps infrastructure costs down. However, there are some trade-offs: it lacks cross-region replication for the increment operation and offers no consistency guarantees, which may be necessary for an accurate count. Additionally, idempotency is not natively supported, making it unsafe to retry or hedge requests.
While some users may accept the limitations of a Best-Effort counter, others require precise counts, durability, and global availability. In the following sections, we’ll explore various strategies for achieving durable and accurate counts. Our objective is to highlight the challenges inherent in global distributed counting and explain the reasoning behind our chosen approach.
Approach 1: Storing a Single Row per Counter
Let’s start simple by using a single row per counter key within a table in a globally replicated datastore.
Let’s examine some of the drawbacks of this approach:
- Lack of Idempotency: No idempotency key is baked into the storage data model, preventing users from safely retrying requests. Implementing idempotency would likely require an external system for such keys, which can further degrade performance or cause race conditions.
- Heavy Contention: To update counts reliably, every writer must perform a Compare-And-Swap operation for a given counter using locks or transactions. Depending on the throughput and concurrency of operations, this can lead to significant contention, heavily impacting performance.
Secondary Keys: One way to reduce contention in this approach would be to use a secondary key, such as a bucket_id, which allows writes to be distributed by splitting a given counter into buckets, while reads aggregate across those buckets; a sketch of this idea follows. The challenge lies in determining the appropriate number of buckets. A static number may still lead to contention on hot keys, while dynamically assigning the number of buckets per counter across millions of counters presents an even more complex problem.
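As a rough illustration, here is a minimal sketch of the bucketing idea, assuming a simple key-value style datastore with atomic per-row increments. The Store interface and the bucket count are hypothetical:

import java.util.concurrent.ThreadLocalRandom;

public class BucketedCounterSketch {
    // Hypothetical datastore interface with atomic per-row increments.
    interface Store {
        void increment(String rowKey, long delta);
        long read(String rowKey);
    }

    private static final int NUM_BUCKETS = 16; // static choice; hot keys may still contend

    // Writes fan out across buckets to spread contention.
    static void add(Store store, String counter, long delta) {
        int bucketId = ThreadLocalRandom.current().nextInt(NUM_BUCKETS);
        store.increment(counter + ":" + bucketId, delta);
    }

    // Reads must aggregate across all buckets to recover the total.
    static long get(Store store, String counter) {
        long total = 0;
        for (int b = 0; b < NUM_BUCKETS; b++) {
            total += store.read(counter + ":" + b);
        }
        return total;
    }
}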
Let’s see if we can iterate on our solution to overcome these drawbacks.
Approach 2: Per Instance Aggregation
To address hot keys and the contention of writing to the same row in real time, we could have each instance aggregate the counts in memory and then flush them to disk at regular intervals, as sketched below. Introducing sufficient jitter to the flush process can further reduce contention.
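A minimal sketch of such per-instance aggregation, assuming a hypothetical flush target and using a scheduled executor with jittered flushes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class PerInstanceAggregator {
    // Hypothetical sink representing the durable datastore write.
    interface FlushSink { void addToCounter(String counter, long delta); }

    private final ConcurrentHashMap<String, LongAdder> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    PerInstanceAggregator(FlushSink sink, long baseIntervalMs) {
        scheduler.scheduleWithFixedDelay(() -> {
            // Jitter the actual flush moment so instances don't all hit the store at once.
            long jitterMs = ThreadLocalRandom.current().nextLong(Math.max(1, baseIntervalMs / 4));
            try { Thread.sleep(jitterMs); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            flush(sink);
        }, baseIntervalMs, baseIntervalMs, TimeUnit.MILLISECONDS);
    }

    void add(String counter, long delta) {
        pending.computeIfAbsent(counter, k -> new LongAdder()).add(delta);
    }

    private void flush(FlushSink sink) {
        for (Map.Entry<String, LongAdder> e : pending.entrySet()) {
            long delta = e.getValue().sumThenReset(); // in-memory deltas are lost on a crash
            if (delta != 0) sink.addToCounter(e.getKey(), delta);
        }
    }
}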
However, this solution introduces a new set of issues:
- Vulnerability to Data Loss: The solution can lose all in-memory data during instance failures, restarts, or deployments.
- Inability to Reliably Reset Counts: Because counting requests are distributed across multiple machines, it is challenging to establish consensus on the exact point in time when a counter reset occurred.
- Lack of Idempotency: Like the previous approach, this method does not natively guarantee idempotency. One way to achieve it is by consistently routing the same set of counters to the same instance. However, that may introduce additional complexities, such as leader election, and potential challenges with availability and latency in the write path.
That said, this approach may still be suitable in scenarios where these trade-offs are acceptable. However, let’s see if we can address some of these issues with a different, event-based approach.
Approach 3: Using Durable Queues
In this approach, we log counter events into a durable queuing system like Apache Kafka to prevent any potential data loss. By creating multiple topic partitions and hashing the counter key to a specific partition, we ensure that the same set of counters is processed by the same set of consumers; a sketch of the producer side follows. This setup simplifies idempotency checks and resetting counts. Furthermore, by leveraging stream processing frameworks such as Kafka Streams or Apache Flink, we can implement windowed aggregations.
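For illustration, a minimal sketch of the producer side using the standard Kafka client. Keying each record by the counter name lets Kafka’s default partitioner route all events for one counter to the same partition; the topic name, broker address, and payload shape are assumptions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CounterEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The record key is the counter; the default partitioner hashes it,
            // so the same counter always lands on the same partition/consumer.
            String counterKey = "my_dataset:counter123";
            String event = "{\"delta\": 2, \"event_id\": \"some_event_id\"}";
            producer.send(new ProducerRecord<>("counter-events", counterKey, event));
        }
    }
}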
However, this approach comes with some challenges:
- Potential Delays: Having the same consumer process all the counts from a given partition can lead to backups and delays, resulting in stale counts.
- Rebalancing Partitions: This approach requires auto-scaling and rebalancing of topic partitions as the cardinality of counters and the throughput increase.
Furthermore, all approaches that pre-aggregate counts make it difficult to support two of our requirements for accurate counters:
- Auditing of Counts: Auditing involves extracting data to an offline system for analysis to ensure that increments were applied correctly to reach the final value. This process can also be used to track the provenance of increments. However, auditing becomes infeasible when counts are aggregated without storing the individual increments.
- Potential Recounting: Similarly, if adjustments to increments are necessary and events within a time window must be recounted, pre-aggregated counts make this infeasible.
Barring these few requirements, this approach can still be effective if we determine the right way to scale our queue partitions and consumers while maintaining idempotency. However, let’s explore how we can adjust this approach to meet the auditing and recounting requirements.
Approach 4: Event Log of Individual Increments
In this approach, we log each individual counter increment along with its event_time and event_id. The event_id can include the source information of where the increment originated. The combination of event_time and event_id can also serve as the idempotency key for the write; a sketch of such an event record follows.
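As a sketch, each logged increment might look like the following record, with the idempotency key derived from event_time plus event_id. The record shape is illustrative, not the service’s actual schema:

import java.time.Instant;

// Illustrative shape of a single increment event; field names are assumptions.
public record CounterIncrementEvent(
        String namespace,
        String counterName,
        long delta,          // positive or negative adjustment
        Instant eventTime,   // when the increment happened
        String eventId) {    // source-provided id, e.g. an upstream event id

    // event_time + event_id uniquely identify the increment, so writing the
    // same event twice can be detected and ignored by the store.
    String idempotencyKey() {
        return eventTime.toEpochMilli() + ":" + eventId;
    }
}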
However, in its simplest form, this approach has several drawbacks:
- Read Latency: Each read request requires scanning all increments for a given counter, potentially degrading performance.
- Duplicate Work: Multiple threads might duplicate the effort of aggregating the same set of counters during read operations, leading to wasted effort and subpar resource utilization.
- Wide Partitions: If using a datastore like Apache Cassandra, storing many increments for the same counter could lead to a wide partition, affecting read performance.
- Large Data Footprint: Storing each increment individually could also result in a substantial data footprint over time. Without an efficient data retention strategy, this approach may struggle to scale effectively.
The combined impact of these issues can lead to increased infrastructure costs that may be difficult to justify. However, adopting an event-driven approach seems like a significant step forward in addressing some of the challenges we’ve encountered and meeting our requirements.
How can we improve this solution further?
We use a combination of the previous approaches, where we log each counting activity as an event and continuously aggregate these events in the background using queues and a sliding time window. Additionally, we employ a bucketing strategy to prevent wide partitions. In the following sections, we’ll explore how this approach addresses the previously mentioned drawbacks and meets all our requirements.
Note: From here on, we’ll use the terms “rollup” and “aggregate” interchangeably. They essentially mean the same thing, i.e., collecting individual counter increments/decrements and arriving at the final value.
TimeSeries Event Store:
We chose the TimeSeries Data Abstraction as our event store, where counter mutations are ingested as event records. Some of the benefits of storing events in TimeSeries include:
High-Performance: The TimeSeries abstraction already addresses many of our requirements, including high availability and throughput and reliable, fast performance, among others.
Reducing Code Complexity: We remove a lot of code complexity from the Counter Abstraction by delegating a major portion of the functionality to an existing service.
TimeSeries Abstraction uses Cassandra as the underlying event store, but it can be configured to work with any persistent store. Here is what it looks like:
Handling Wide Partitions: The time_bucket and event_bucket columns play a crucial role in breaking up a wide partition, preventing high-throughput counter events from overwhelming a given partition. For more information on this, refer to our previous blog.
No Over-Counting: The event_time, event_id, and event_item_key columns form the idempotency key for the events of a given counter, enabling clients to retry safely without the risk of over-counting.
Event Ordering: TimeSeries orders all events in descending order of time, allowing us to leverage this property for events like count resets.
Event Retention: The TimeSeries Abstraction includes retention policies to ensure that events are not stored indefinitely, saving disk space and reducing infrastructure costs. Once events have been aggregated and moved to a cheaper store for audits, there’s no need to retain them in the primary storage.
Now, let’s see how these events are aggregated for a given counter.
Aggregating Count Events:
As mentioned earlier, collecting all individual increments for every read request would be cost-prohibitive in terms of read performance. Therefore, a background aggregation process is necessary to continually converge counts and ensure optimal read performance.
But how do we safely aggregate count events amidst ongoing write operations?
This is where the concept of Eventually Consistent counts becomes crucial. By intentionally lagging behind the current time by a safe margin, we ensure that aggregation always occurs within an immutable window.
Let’s see what that looks like:
Let’s break this down:
- finalRollupTs: This represents the most recent time when the counter value was last aggregated. For a counter being operated on for the first time, this timestamp defaults to a reasonable time in the past.
- Immutable Window and Lag: Aggregation can only occur safely within an immutable window that is no longer receiving counter events. The “acceptLimit” parameter of the TimeSeries Abstraction plays a crucial role here, as it rejects incoming events with timestamps beyond this limit. During aggregations, this window is pushed slightly further back to account for clock skews.
This does mean that the counter value will lag behind its most recent update by some margin (typically on the order of seconds). This approach does leave the door open for missed events due to cross-region replication issues; see the “Future Work” section at the end.
- Aggregation Process: The rollup process aggregates all events in the aggregation window since the last rollup to arrive at the new value (see the sketch after this list).
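Under stated assumptions, here is a minimal sketch of how the immutable aggregation window could be computed; the acceptLimit and clock-skew values, and the EventStore interface, are illustrative:

import java.time.Duration;
import java.time.Instant;

public class RollupWindow {
    // Hypothetical event-store interface for range aggregation.
    interface EventStore {
        long sumDeltas(String counter, Instant fromInclusive, Instant toExclusive);
    }

    static final Duration ACCEPT_LIMIT = Duration.ofSeconds(5); // events newer than this are rejected
    static final Duration SKEW_MARGIN  = Duration.ofSeconds(2); // extra buffer for clock skew

    // The window [lastRollupTs, windowEnd) is immutable: the event store no
    // longer accepts events with timestamps inside it, so aggregating over it
    // is safe even while newer writes continue to arrive.
    static Instant windowEnd(Instant now) {
        return now.minus(ACCEPT_LIMIT).minus(SKEW_MARGIN);
    }

    static long rollup(EventStore store, String counter, Instant lastRollupTs, long lastCount) {
        Instant end = windowEnd(Instant.now());
        long delta = store.sumDeltas(counter, lastRollupTs, end); // only immutable events
        return lastCount + delta; // new checkpointed value as of 'end'
    }
}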
Rollup Store:
We save the results of this aggregation in a persistent store. The next aggregation simply continues from this checkpoint.
We create one such Rollup table per dataset and use Cassandra as our persistent store. However, as you will soon see in the Control Plane section, the Counter service can be configured to work with any persistent store.
LastWriteTs: Every time a given counter receives a write, we also log a last-write-timestamp as a columnar update in this table. This is done using Cassandra’s USING TIMESTAMP feature to predictably apply Last-Write-Wins (LWW) semantics; an illustrative update is sketched below. This timestamp is the same as the event_time for the event. In later sections, we’ll see how this timestamp is used to keep some counters in active rollup circulation until they have caught up to their latest value.
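As an illustration, the last-write-timestamp update might look like the following CQL, built here as a Java string. The table and column names are assumptions; USING TIMESTAMP makes Cassandra resolve concurrent updates by the supplied event time rather than by arrival order:

public class LastWriteTsUpdate {
    // Illustrative CQL only: the write's own event_time serves both as the
    // cell value and as the write timestamp, so under Last-Write-Wins the row
    // deterministically retains the latest event_time, regardless of the
    // order in which replicas receive the updates.
    static String cql(String dataset, String counterName, long eventTimeMicros) {
        return "UPDATE " + dataset + "_rollup"
                + " USING TIMESTAMP " + eventTimeMicros
                + " SET last_write_ts = " + eventTimeMicros
                + " WHERE counter_name = '" + counterName + "'";
    }
}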
Rollup Cache:
To optimize read performance, these values are cached in EVCache for each counter. We combine the finalRollupCount and finalRollupTs into a single cached value per counter to prevent potential mismatches between the count and its corresponding checkpoint timestamp.
But how do we know which counters to trigger rollups for? Let’s explore our write and read paths to understand this better.
Add/Clear Count:
An add or clear count request durably writes to the TimeSeries Abstraction and updates the last-write-timestamp in the Rollup store. If the durability acknowledgement fails, clients can retry their requests with the same idempotency token without the risk of overcounting. Once the write is durable, we send a fire-and-forget request to trigger the rollup for the requested counter.
GetCount:
We return the last rolled-up count as a fast point-read operation, accepting the trade-off of potentially serving a slightly stale count. We also trigger a rollup during the read operation to advance the last-rollup-timestamp, enhancing the performance of subsequent aggregations. This process also self-remediates a stale count if any previous rollups had failed.
With this approach, the counts continually converge to their latest value. Now, let’s see how we scale this approach to millions of counters and thousands of concurrent operations using our Rollup Pipeline.
Rollup Pipeline:
Each Counter-Rollup server operates a rollup pipeline to efficiently aggregate counts across millions of counters. This is where most of the complexity in the Counter Abstraction comes in. In the following sections, we’ll share key details on how efficient aggregations are achieved.
Light-Weight Roll-Up Event: As seen in our write and read paths above, every operation on a counter sends a lightweight event to the Rollup server:
rollupEvent: {
"namespace": "my_dataset",
"counter": "counter123"
}
Note that this event does not include the increment. It is only a signal to the Rollup server that this counter has been accessed and now needs to be aggregated. Knowing exactly which counters need to be aggregated prevents scanning the entire event dataset for the purpose of aggregation.
In-Memory Rollup Queues: A given Rollup server instance runs a set of in-memory queues to receive rollup events and parallelize aggregations. In the first version of this service, we settled on in-memory queues to reduce provisioning complexity, save on infrastructure costs, and make rebalancing the number of queues fairly straightforward. However, this comes with the trade-off of potentially missing rollup events in case of an instance crash. For more details, see the “Stale Counts” section in “Future Work.”
Minimize Duplicate Effort: We use a fast non-cryptographic hash like XXHash to ensure that the same set of counters ends up on the same queue, as sketched below. Further, we try to minimize duplicate aggregation work by having a separate rollup stack that chooses to run fewer, beefier instances.
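A minimal sketch of this stable routing; the post doesn’t name a specific library, so xxHash64 is stubbed out here (any xxHash implementation, e.g. from lz4-java or zero-allocation-hashing, would do):

public class RollupQueueRouter {
    private final int numQueues;

    RollupQueueRouter(int numQueues) {
        this.numQueues = numQueues;
    }

    // Stable routing: the same counter key always hashes to the same queue,
    // so duplicate rollup signals for one counter all land in one place.
    int queueIndexFor(String namespace, String counterName) {
        long hash = xxHash64(namespace + ":" + counterName);
        return (int) Math.floorMod(hash, (long) numQueues);
    }

    // Placeholder standing in for a real xxHash64 implementation.
    private static long xxHash64(String key) {
        return key.hashCode();
    }
}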
Availability and Race Conditions: Having a single Rollup server instance would minimize duplicate aggregation work but would create availability challenges for triggering rollups. If we choose to horizontally scale the Rollup servers, we allow threads to overwrite rollup values while avoiding any form of distributed locking mechanisms, maintaining high availability and performance. This approach remains safe because aggregation occurs within an immutable window. Although the notion of now() may differ between threads, causing rollup values to occasionally fluctuate, the counts eventually converge to an accurate value within each immutable aggregation window.
Rebalancing Queues: If we need to scale the number of queues, a simple Control Plane configuration update followed by a re-deploy is enough to rebalance them.
"eventual_counter_config": {
"queue_config": {
"num_queues" : 8, // change to 16 and re-deploy
...
Handling Deployments: During deployments, these queues shut down gracefully, draining all existing events first, while the new Rollup server instance starts up with potentially new queue configurations. There may be a brief period when both the old and new Rollup servers are active, but as mentioned before, this race condition is managed because aggregations occur within immutable windows.
Minimize Rollup Effort: Receiving multiple events for the same counter doesn’t mean rolling it up multiple times. We drain these rollup events into a Set, ensuring a given counter is rolled up only once during a rollup window, as sketched below.
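A sketch of that dedup step, assuming a standard in-memory queue; the queue type and batch size are illustrative:

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

public class RollupDrainer {
    // Drains pending rollup signals and collapses duplicates: however many
    // events arrived for "my_dataset:counter123", it appears once in the set.
    static Set<String> drainDistinctCounters(BlockingQueue<String> queue, int maxBatch) {
        Set<String> distinct = new LinkedHashSet<>();
        String counterKey;
        while (distinct.size() < maxBatch && (counterKey = queue.poll()) != null) {
            distinct.add(counterKey);
        }
        return distinct; // each counter here is rolled up exactly once this window
    }
}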
Efficient Aggregation: Each rollup consumer processes a batch of counters simultaneously. Within each batch, it queries the underlying TimeSeries abstraction in parallel to aggregate events within the specified time boundaries. The TimeSeries abstraction optimizes these range scans to achieve low millisecond latencies.
Dynamic Batching: The Rollup server dynamically adjusts the number of time partitions that need to be scanned based on the cardinality of the counters, in order to avoid overwhelming the underlying store with many parallel read requests.
Adaptive Back-Pressure: Each consumer waits for one batch to complete before issuing the rollups for the next batch. It adjusts the wait time between batches based on the performance of the previous batch. This approach provides back-pressure during rollups to prevent overwhelming the underlying TimeSeries store.
Handling Convergence:
To prevent low-cardinality counters from lagging too far behind, and subsequently scanning too many time partitions, they are kept in constant rollup circulation. For high-cardinality counters, continuously circulating them would consume excessive memory in our Rollup queues. This is where the previously mentioned last-write-timestamp plays a crucial role. The Rollup server inspects this timestamp to determine whether a given counter needs to be re-queued, ensuring that we continue aggregating until it has fully caught up with the writes; a sketch of this decision follows.
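Under stated assumptions, the re-queue decision could look like the following; the RollupStore interface is hypothetical:

import java.time.Instant;
import java.util.Queue;

public class ConvergenceCheck {
    // Hypothetical view of the Rollup store's checkpoint row for a counter.
    interface RollupStore {
        Instant lastWriteTs(String counterKey);   // most recent write seen
        Instant finalRollupTs(String counterKey); // checkpoint of the last rollup
    }

    // After rolling a counter up, re-queue it if writes have landed past the
    // checkpoint, so the count keeps converging toward its latest value.
    static void maybeRequeue(RollupStore store, Queue<String> rollupQueue, String counterKey) {
        Instant lastWrite = store.lastWriteTs(counterKey);
        Instant lastRollup = store.finalRollupTs(counterKey);
        if (lastWrite.isAfter(lastRollup)) {
            rollupQueue.offer(counterKey); // not caught up yet; keep it circulating
        }
    }
}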
Now, let’s see how we leverage this counter type to provide an up-to-date current count in near real-time.
We are experimenting with a slightly modified version of the Eventually Consistent counter. Again, take the term ‘Accurate’ with a grain of salt. The key difference between this type of counter and its counterpart is that the delta, representing the counts since the last-rolled-up timestamp, is computed in real time.
Aggregating this delta in real time can impact the performance of the operation, depending on the number of events and partitions that must be scanned to retrieve it. The same principle of rolling up in batches applies here, to avoid scanning too many partitions in parallel.
Conversely, if the counters in this dataset are accessed frequently, the time gap for the delta remains narrow, making this approach of fetching current counts quite effective; the read path is sketched below.
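A minimal sketch of that read path, reusing the hypothetical interfaces from the earlier sketches; none of these names are the service’s real API:

import java.time.Instant;

public class AccurateCounterRead {
    // Hypothetical interfaces mirroring the earlier sketches.
    interface RollupStore {
        long finalRollupCount(String counterKey);
        Instant finalRollupTs(String counterKey);
    }
    interface EventStore {
        long sumDeltas(String counterKey, Instant fromInclusive, Instant toExclusive);
    }

    // Accurate read = last checkpointed value + increments logged since the
    // checkpoint, aggregated at read time. Frequently accessed counters keep
    // this delta window narrow, so the real-time scan stays cheap.
    static long getAccurateCount(RollupStore rollups, EventStore events, String counterKey) {
        long checkpointCount = rollups.finalRollupCount(counterKey);
        Instant checkpointTs = rollups.finalRollupTs(counterKey);
        long liveDelta = events.sumDeltas(counterKey, checkpointTs, Instant.now());
        return checkpointCount + liveDelta;
    }
}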
Now, let’s see how all this complexity is managed through a unified Control Plane configuration.
The Data Gateway Platform Control Plane manages configuration for all abstractions and namespaces, including the Counter Abstraction. Below is an example of a control plane configuration for a namespace that supports eventually consistent counters with low cardinality:
"persistence_configuration": [
{
"id": "CACHE", // Counter cache config
"scope": "dal=counter",
"physical_storage": {
"type": "EVCACHE", // type of cache storage
"cluster": "evcache_dgw_counter_tier1" // Shared EVCache cluster
}
},
{
"id": "COUNTER_ROLLUP",
"scope": "dal=counter", // Counter abstraction config
"physical_storage": {
"type": "CASSANDRA", // type of Rollup store
"cluster": "cass_dgw_counter_uc1", // physical cluster name
"dataset": "my_dataset_1" // namespace/dataset
},
"counter_cardinality": "LOW", // supported counter cardinality
"config": {
"counter_type": "EVENTUAL", // Type of counter
"eventual_counter_config": { // eventual counter type
"internal_config": {
"queue_config": { // adjust w.r.t cardinality
"num_queues" : 8, // Rollup queues per instance
"coalesce_ms": 10000, // coalesce duration for rollups
"capacity_bytes": 16777216 // allocated memory per queue
},
"rollup_batch_count": 32 // parallelization factor
}
}
}
},
{
"id": "EVENT_STORAGE",
"scope": "dal=ts", // TimeSeries Event store
"physical_storage": {
"type": "CASSANDRA", // persistent store type
"cluster": "cass_dgw_counter_uc1", // physical cluster name
"dataset": "my_dataset_1", // keyspace name
},
"config": {
"time_partition": { // time-partitioning for events
"buckets_per_id": 4, // event buckets within
"seconds_per_bucket": "600", // smaller width for LOW card
"seconds_per_slice": "86400", // width of a time slice table
},
"accept_limit": "5s", // boundary for immutability
},
"lifecycleConfigs": {
"lifecycleConfig": [
{
"type": "retention", // Event retention
"config": {
"close_after": "518400s",
"delete_after": "604800s" // 7 day count event retention
}
}
]
}
}
]
Using such a control plane configuration, we compose multiple abstraction layers using containers deployed on the same host, with each container fetching the configuration specific to its scope.
As with the TimeSeries abstraction, our automation uses a range of user inputs about workload and cardinality to arrive at the right set of infrastructure and the related control plane configuration. You can learn more about this process in a talk given by one of our stunning colleagues, Joey Lynch: How Netflix optimally provisions infrastructure in the cloud.
At the time of writing this blog, the service was processing close to 75K count requests/second globally across the different API endpoints and datasets:
while providing single-digit millisecond latencies for all its endpoints:
While our system is robust, we still have work to do in making it more reliable and enhancing its features. Some of that work includes:
- Regional Rollups: Cross-region replication issues can result in missed events from other regions. An alternative strategy involves maintaining a rollup table per region and then tallying them into a global rollup table. A key challenge in this design would be effectively communicating the clearing of a counter across regions.
- Error Detection and Stale Counts: Excessively stale counts can occur if rollup events are lost or if a rollup fails and is not retried. This isn’t an issue for frequently accessed counters, as they remain in rollup circulation. It is more pronounced for counters that are accessed infrequently. Typically, the initial read of such a counter will trigger a rollup, self-remediating the issue. However, for use cases that cannot accept potentially stale initial reads, we plan to implement improved error detection, rollup handoffs, and durable queues for resilient retries.
Distributed counting remains a challenging problem in computer science. In this blog, we explored multiple approaches to implementing and deploying a counting service at scale. While other methods for distributed counting may exist, our goal has been to deliver blazing fast performance at low infrastructure cost while maintaining high availability and providing idempotency guarantees. Along the way, we made various trade-offs to meet the diverse counting requirements at Netflix. We hope you found this blog post insightful.
Stay tuned for Part 3 of Composite Abstractions at Netflix, where we’ll introduce our Graph Abstraction, a new service built on top of the Key-Value Abstraction and the TimeSeries Abstraction to handle high-throughput, low-latency graphs.
Special thanks to our stunning colleagues who contributed to the Counter Abstraction’s success: Joey Lynch, Vinay Chella, Kaidan Fullerton, Tom DeVoe, Mengqing Wang