RxJS - Reactive Extension for Javascript
to handle asynchrouns data stream easiliy
npm install rxjs
represent async object to be function , http stream , port , time data stream.
function subcrib the data stream or lisen or recive the data stream
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next('First value'); subscriber.next('Second value');
setTimeout(() => {
subscriber.next('Third value');
subscriber.complete(); // Indicates the end of the stream
}, 2000);
});
const observer = {
next: (value) => console.log('Received:', value),
error: (err) => console.error('Error:', err),
complete: () => console.log('Stream completed')
};
const subscription = observable.subscribe(observer);
`setTimeout(() => {
subscription.unsubscribe();
console.log('Unsubscribed');
}, 1000);
const subject = new Subject();
subject.subscribe(value => console.log('Subject:', value));
subject.next('Subject Value');
const behaviorSubject = new BehaviorSubject('Initial Value');
behaviorSubject.subscribe(value => console.log('BehaviorSubject:', value));
behaviorSubject.next('BehaviorSubject Value');
Operator | Description | Example |
---|---|---|
from |
Converts arrays, promises,iterables into observables | from([1, 2, 3]) |
of |
Emits a set of values as an observable | of(1, 2, 3) |
fromEvent |
Creates an observable from DOM events | fromEvent(document, 'click') |
Operator | Description | Example |
---|---|---|
Merge |
Combines multiple observables | |
Concat |
Starts the next observable after one completes | |
forkJoin |
Waits for multiple observables to complete | forkJoin([of(1), of(2)]) |
combineLatest |
Emits the latest values from multiple observables | combineLatest([of(1), of(2)]) |
Operator | Description | Example |
---|---|---|
Map |
Transforms the data | of(1, 2, 3).pipe(map(x => x * 10)) |
concatMap |
Subscribes to observables sequentially | of(1, 2, 3).pipe(concatMap(x => of(x * 10))) |
switchMap |
Unsubscribes previous observable if new value emitted | of(1, 2).pipe(switchMap(x => of(x * 10))) |
Pluck |
Selects a property from emitted objects | |
MergeMap |
Merges all the inner observables into one | of(1, 2).pipe(mergeMap(x => of(x * 10))) |
Operator | Description | Example |
---|---|---|
Filter |
Filters data based on a condition | of(1, 2, 3, 4).pipe(filter(x => x % 2 === 0)) |
distinctUntilChanged |
Emits if current value differs from last | of(1, 1, 2).pipe(distinctUntilChanged()) |
take |
Emits first N values | of(1, 2, 3).pipe(take(2)) |
debounceTime |
Emits last value if specified time has passed | fromEvent(document, 'click').pipe(debounceTime(500)) |
Operator | Description | Example |
---|---|---|
Delay |
Delays the emission of values | of(1, 2, 3).pipe(delay(1000)) |
tap |
Allows logging or executing logic | of(1, 2, 3).pipe(tap(x => console.log('test', x))) |
catchError |
Catches errors and recover gracefully | throwError('Error!').pipe(catchError(err => of('recovered'))) |