You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
216 lines
10 KiB
216 lines
10 KiB
import { __extends } from "tslib"; |
|
import { Observable } from "./Observable.js"; |
|
import { iterateObserversSafely } from "./iteration.js"; |
|
import { fixObservableSubclass } from "./subclassing.js"; |
|
/**
 * Reports whether `value` is a thenable, i.e. a truthy value exposing a
 * callable `then` member.
 *
 * Always returns a real boolean; falsy inputs (null, undefined, 0, "") yield
 * `false` rather than being passed through as-is.
 *
 * @param {*} value - Anything; typically a source observable or a promise.
 * @returns {boolean} `true` when `value.then` is a function.
 */
function isPromiseLike(value) {
    return Boolean(value) && typeof value.then === "function";
}
|
// A Concast<T> observable concatenates the given sources into a single
// non-overlapping sequence of Ts, automatically unwrapping any promises,
// and broadcasts the T elements of that sequence to any number of
// subscribers, all without creating a bunch of intermediary Observable
// wrapper objects.
//
// Even though any number of observers can subscribe to the Concast, each
// source observable is guaranteed to receive at most one subscribe call,
// and the results are multicast to all observers.
//
// In addition to broadcasting every next/error message to this.observers,
// the Concast stores the most recent message using this.latest, so any
// new observers can immediately receive the latest message, even if it
// was originally delivered in the past. This behavior means we can assume
// every active observer in this.observers has received the same most
// recent message.
//
// With the exception of this.latest replay, a Concast is a "hot"
// observable in the sense that it does not replay past results from the
// beginning of time for each new observer.
//
// Could we have used some existing RxJS class instead? Concast<T> is
// similar to a BehaviorSubject<T>, because it is multicast and redelivers
// the latest next/error message to new subscribers. Unlike Subject<T>,
// Concast<T> does not expose an Observer<T> interface (this.handlers is
// intentionally private), since Concast<T> gets its inputs from the
// concatenated sources. If we ever switch to RxJS, there may be some
// value in reusing their code, but for now we use zen-observable, which
// does not contain any Subject implementations.
//
// State of this.sub, as used throughout this class:
//   undefined - start() has not run yet (or only promise-wrapped sources
//               have been seen so far);
//   object    - the subscription to the most recently subscribed source;
//   null      - the Concast has terminated (error, completion or cancel).
var Concast = /** @class */ (function (_super) {
    __extends(Concast, _super);
    // Not only can the individual elements of the iterable be promises, but
    // also the iterable itself can be wrapped in a promise.
    function Concast(sources) {
        var _this = _super.call(this, function (observer) {
            _this.addObserver(observer);
            return function () { return _this.removeObserver(observer); };
        }) || this;
        // Active observers receiving broadcast messages. Thanks to this.latest,
        // we can assume all observers in this Set have received the same most
        // recent message, though possibly at different times in the past.
        _this.observers = new Set();
        // Settles when the Concast terminates: resolved with the final "next"
        // value (or undefined if there was none), rejected on error or cancel.
        _this.promise = new Promise(function (resolve, reject) {
            _this.resolve = resolve;
            _this.reject = reject;
        });
        // Bound handler functions that can be reused for every internal
        // subscription.
        _this.handlers = {
            next: function (result) {
                if (_this.sub !== null) {
                    _this.latest = ["next", result];
                    _this.notify("next", result);
                    iterateObserversSafely(_this.observers, "next", result);
                }
            },
            error: function (error) {
                var sub = _this.sub;
                if (sub !== null) {
                    // Delay unsubscribing from the underlying subscription slightly,
                    // so that immediately subscribing another observer can keep the
                    // subscription active.
                    if (sub)
                        setTimeout(function () { return sub.unsubscribe(); });
                    _this.sub = null;
                    _this.latest = ["error", error];
                    _this.reject(error);
                    _this.notify("error", error);
                    iterateObserversSafely(_this.observers, "error", error);
                }
            },
            complete: function () {
                var _a = _this, sub = _a.sub, _b = _a.sources, sources = _b === void 0 ? [] : _b;
                if (sub !== null) {
                    // If complete is called before concast.start, this.sources may be
                    // undefined, so we use a default value of [] for sources. That works
                    // here because it falls into the if (!value) {...} block, which
                    // appropriately terminates the Concast, even if this.sources might
                    // eventually have been initialized to a non-empty array.
                    var value = sources.shift();
                    if (!value) {
                        if (sub)
                            setTimeout(function () { return sub.unsubscribe(); });
                        _this.sub = null;
                        if (_this.latest && _this.latest[0] === "next") {
                            _this.resolve(_this.latest[1]);
                        }
                        else {
                            _this.resolve();
                        }
                        _this.notify("complete");
                        // We do not store this.latest = ["complete"], because doing so
                        // discards useful information about the previous next (or
                        // error) message. Instead, if new observers subscribe after
                        // this Concast has completed, they will receive the final
                        // 'next' message (unless there was an error) immediately
                        // followed by a 'complete' message (see addObserver).
                        iterateObserversSafely(_this.observers, "complete");
                    }
                    else if (isPromiseLike(value)) {
                        value.then(function (obs) {
                            // The Concast may have terminated (for example via cancel)
                            // while this source was still pending. Subscribing now would
                            // overwrite this.sub = null, reviving a finished Concast and
                            // leaking a subscription, so drop the late source instead.
                            if (_this.sub === null)
                                return;
                            _this.sub = obs.subscribe(_this.handlers);
                        }, _this.handlers.error);
                    }
                    else {
                        _this.sub = value.subscribe(_this.handlers);
                    }
                }
            },
        };
        // One-shot callbacks registered through beforeNext; see notify.
        _this.nextResultListeners = new Set();
        // A public way to abort observation and broadcast.
        _this.cancel = function (reason) {
            _this.reject(reason);
            // Emptying the queue makes the complete handler take its terminating
            // branch instead of moving on to another source.
            _this.sources = [];
            _this.handlers.complete();
        };
        // Suppress rejection warnings for this.promise, since it's perfectly
        // acceptable to pay no attention to this.promise if you're consuming
        // the results through the normal observable API.
        _this.promise.catch(function (_) { });
        // If someone accidentally tries to create a Concast using a subscriber
        // function, recover by creating an Observable from that subscriber and
        // using it as the source.
        if (typeof sources === "function") {
            sources = [new Observable(sources)];
        }
        if (isPromiseLike(sources)) {
            sources.then(function (iterable) { return _this.start(iterable); }, _this.handlers.error);
        }
        else {
            _this.start(sources);
        }
        return _this;
    }
    // Copies the sources into this.sources and begins consuming the first one.
    // Does nothing if this.sub has already been assigned (including null).
    Concast.prototype.start = function (sources) {
        if (this.sub !== void 0)
            return;
        // In practice, sources is most often simply an Array of observables.
        // TODO Consider using sources[Symbol.iterator]() to take advantage
        // of the laziness of non-Array iterables.
        this.sources = Array.from(sources);
        // Calling this.handlers.complete() kicks off consumption of the first
        // source observable. It's tempting to do this step lazily in
        // addObserver, but this.promise can be accessed without calling
        // addObserver, so consumption needs to begin eagerly.
        this.handlers.complete();
    };
    // Replays this.latest (if any) to the given observer, followed by a
    // 'complete' call when the Concast already finished on a 'next' message.
    Concast.prototype.deliverLastMessage = function (observer) {
        if (this.latest) {
            var nextOrError = this.latest[0];
            var method = observer[nextOrError];
            if (method) {
                method.call(observer, this.latest[1]);
            }
            // If the subscription is already closed, and the last message was
            // a 'next' message, simulate delivery of the final 'complete'
            // message again.
            if (this.sub === null && nextOrError === "next" && observer.complete) {
                observer.complete();
            }
        }
    };
    // Registers an observer for future broadcasts; adding the same observer
    // object twice is a no-op.
    Concast.prototype.addObserver = function (observer) {
        if (!this.observers.has(observer)) {
            // Immediately deliver the most recent message, so we can always
            // be sure all observers have the latest information.
            this.deliverLastMessage(observer);
            this.observers.add(observer);
        }
    };
    // Unregisters an observer. When the last one leaves, the complete handler
    // is invoked.
    Concast.prototype.removeObserver = function (observer) {
        if (this.observers.delete(observer) && this.observers.size < 1) {
            // In case there are still any listeners in this.nextResultListeners,
            // and no error or completion has been broadcast yet, invoke the
            // complete handler: it either moves on to the next queued source or,
            // when none remain, finishes the Concast and notifies those listeners.
            this.handlers.complete();
        }
    };
    // Invokes every pending beforeNext listener once with (method, arg) and
    // clears the pending set.
    Concast.prototype.notify = function (method, arg) {
        var nextResultListeners = this.nextResultListeners;
        if (nextResultListeners.size) {
            // Replacing this.nextResultListeners first ensures it does not grow while
            // we are iterating over it, potentially leading to infinite loops.
            this.nextResultListeners = new Set();
            nextResultListeners.forEach(function (listener) { return listener(method, arg); });
        }
    };
    // We need a way to run callbacks just *before* the next result (or error or
    // completion) is delivered by this Concast, so we can be sure any code that
    // runs as a result of delivering that result/error observes the effects of
    // running the callback(s). It was tempting to reuse the Observer type instead
    // of introducing NextResultListener, but that messes with the sizing and
    // maintenance of this.observers, and ends up being more code overall.
    Concast.prototype.beforeNext = function (callback) {
        // Guards against the callback running more than once.
        var called = false;
        this.nextResultListeners.add(function (method, arg) {
            if (!called) {
                called = true;
                callback(method, arg);
            }
        });
    };
    return Concast;
}(Observable));
|
export { Concast }; |
|
// Necessary because the Concast constructor has a different signature
// than the Observable constructor (it takes sources, not a subscriber
// function).
fixObservableSubclass(Concast);
|
//# sourceMappingURL=Concast.js.map
|