Model asynchronous tasks as publishers and limit concurrent execution
MIT License
Note: This is a proof of concept that hasn't been tested in production. The core functionality should be sound, but the code is primarily being published as a reference and likely will not be maintained. Use at your own discretion!
PublisherQueue is a class that manages the serial or concurrent execution of
Combine publishers. It can be used to serialise asynchronous tasks larger than
a single closure, or to place an upper bound on concurrency between publishers
(by customising the maxConcurrentPublishers argument).
For example, without the limit of one concurrent publisher, the following
code would exhibit a data race on the count variable, and the final count
would be unlikely to match the expected total.
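import Combine
import Dispatch

let total = 1000
let group = DispatchGroup()
let queue = PublisherQueue(size: total) // maxConcurrentPublishers = .max(1)
var count = 0
var subscriptions = Set<AnyCancellable>()

for i in 0 ..< total {
    // Each publisher is subscribed on a global (concurrent) dispatch queue,
    // but the PublisherQueue lets only one of them run at a time.
    let publisher = Just(i)
        .subscribe(on: DispatchQueue.global())
        .enqueue(on: queue)

    group.enter()
    publisher.sink { _ in
        count += 1 // safe only because the publishers are serialised
        group.leave()
    }.store(in: &subscriptions)
}

// Block until every publisher has delivered its value.
group.wait()
print("Expected: \(total), received: \(count)")
// Expected: 1000, received: 1000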
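
For comparison, Combine's built-in flatMap(maxPublishers:) operator can also bound concurrency, provided all of the work flows through a single upstream publisher. The sketch below is not part of PublisherQueue and is only an illustration of the same idea under that assumption. The task(_:) helper is hypothetical and stands in for any unit of asynchronous work.

```swift
import Combine
import Dispatch

// Hypothetical helper (not part of PublisherQueue): wraps one unit of
// asynchronous work as a publisher that runs on a global dispatch queue.
func task(_ i: Int) -> AnyPublisher<Int, Never> {
    Just(i)
        .subscribe(on: DispatchQueue.global())
        .eraseToAnyPublisher()
}

let total = 1000
var count = 0
let done = DispatchSemaphore(value: 0)

// flatMap(maxPublishers: .max(1)) requests one upstream value at a time, so
// at most one inner publisher is subscribed at any moment.
let subscription = (0 ..< total).publisher
    .flatMap(maxPublishers: .max(1)) { task($0) }
    .sink(
        receiveCompletion: { _ in done.signal() },
        receiveValue: { _ in count += 1 }
    )

// Block until the whole pipeline has completed.
done.wait()
print("Expected: \(total), received: \(count)")

// Keep the subscription alive until this point.
withExtendedLifetime(subscription) {}
```

This approach only works when the tasks can be expressed as values of one upstream publisher. A queue object such as PublisherQueue is aimed at the case shown earlier, where independent publishers are created in different places and still need to share one concurrency limit.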
PublisherQueue is Copyright 2020 Adam Sharp. It is free software, and may be redistributed under the terms specified in the LICENSE file.