useReactiveContext:一种新方法订阅Angular组件

10/23/2022 Angular

# 引言

我们都知道ObservablesRxJS奠定了Angular框架的基础。RxJS允许我们用一种反应性的方式创建我们自己的应用。然而,Observables有一个很大的缺点:每次当你订阅一个Observable时,最后都必须要解除订阅。否则,这很可能会引起应用的内存泄露。处理每一次的订阅并在最后解除订阅,这对于大多数人来说是一件比较麻烦且头痛的事情。

这些年,开发者们创建了无数的方法去管理这些订阅。尽快如此,Angular社区目前尚未有一个统一的解决方案在这件事上达成一致。我个人倾向于选择其中的一种实践去应用,而不是多种。这样的好处在于代码的统一性和规范性,以及后续的可维护性。

Angular14在前段时间发布了很多新的特性。其中就包含以ReactiveContext的方式去创建和管理订阅。

# 常见的订阅管理

接下来让我们看一下Observable的使用示例,以及在组件中最常用的管理订阅的方法:

@Component({
  // ...
})
export class UserComponent implements OnDestroy {

  private readonly unsubscribe$ = new Subject();
  
  constructor() {
    fromEvent(document, 'click')
      .pipe(takeUntil(this.unsubscribe$))
      .subscribe(() => {

      });
  }

  ngOnDestroy() {
    this.unsubscribe$.next();
    this.unsubscribe$.complete();
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

在这个组件中,我们在document上订阅了一个点击事件。为了代码能正确的运行,我们需要在组件销毁时去取消这个订阅。否则,将会导致潜在的内存泄露风险。我们最常见的方法是使用takeUnit:

  • 创建一个Subject并将其分配给unsubscribe$
  • 在组件销毁的生命周期ngOnDestroy中调用unsubscribe$next()complete()方法
  • subscribe被调用之前,调用takeUntil运算符

从上面的代码片段中,我们不难看出这里面仍然存在很多的工作需要去做,这就是为什么我们已经拥有了如此多的订阅管理的库却依然尝试找到一个最佳的解决方案。

# ReactiveContext

现在让我们来看看用ReactiveContext来对上面的代码进行重构:

@Component({
  // ...
})
export class UserComponent {
  
  constructor() {
    useReactiveContext(
      fromEvent(document, 'click')
    )
    .subscribe(() => {
      // show tooltip
    })
  }

  ngOnDestroy() {
    this.unsubscribe$.next();
    this.unsubscribe$.complete();
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

从上面的代码不难看出,我们只需要用useReactiveContext包裹observable就能完成之前繁琐的操作。useReactiveContext在组件销毁时会自动解除订阅。

与之前的takeUntil方法相比,你不必:

  • 记住创建一个专门的SubjectngOnDestroy方法
  • 使用takeUntil操作符

# 实现原理

useReactiveContext函数将observable包装在一个上下文中,让我们不用去在意订阅管理。开发者可以轻松地订阅自己的流,而不必考虑组件被销毁时会发生了什么。接下来让我们看一下基本实现,看看它是如何工作的:

export function useReactiveContext<T>(stream$: Observable<T>) {
  
  const changeDetector = inject(ChangeDetectorRef),
    unsubscribe$ = new Subject<void>();
	
  (changeDetector as ViewRef).onDestroy(() => {
    unsubscribe$.next();
    unsubscribe$.complete();
  });
	
  return {
    subscribe(
      next?: (value: T) => void,
      error?: (e: any) => void,
      complete?: () => void
    ): Subscription {
      return stream$
        .pipe(takeUntil(unsubscribe$))
        .subscribe(next, error, complete);
    },
  };
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

我们可以看到useReactiveContextobservable作为一个参数传入,并用takeUntil操作符进行包裹,在Angular 14ChangeDetectorRef允许执行一个新的onDestroy方法,它将在在组件被销毁时运行代码。在上面的代码中,我们运用了这个原理,并用它自动清理了订阅。useReactiveContext使用了inject函数创建了对ChangeDetectorRef的引用。因此不必将ChangeDetectorRef的引用作为函数参数进行传递。

但是,inject存在一些缺点。那就是只能在构造函数或类属性中使用。这就意味着useReactiveContext也只能在这些地方使用。

注入机制缩小了useReactiveContext可以使用的范围,幸运的是有两种解决方法可以忽略这个限制

# 构造函数之外

# 将ReactiveContext分配给类

您可以创建一个类属性,该属性包含对useReactiveContext创建的上下文的引用。这个解决方案允许我们在构造函数范围之外的类方法中使用ReactiveContext:

@Component({
  // ...
})
class ContextComponent {

  reactiveContext = useReactiveContext();
  
  showTooltipOnClick() {
    const click$ = fromEvent(document, 'click');

    this.reactiveContext
        .connect(click$)
        .subscribe(() => {
          // show tooltip
        });
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

为了代码能按预期执行,我们需要添加一个connect方法, 用reactiveContext连接observable

export function useReactiveContext<T>(stream$: Observable<T>) {
  
  const changeDetector = inject(ChangeDetectorRef),
    unsubscribe$ = new Subject<void>(),
    innerStream$ = stream$;

  let innerStream$: Observable<T> | undefined;

  if (stream$) {
    innerStream$ = stream$.pipe(takeUntil(unsub$));
  }

  (changeDetector as ViewRef).onDestroy(() => {
    unsubscribe$.next();
    unsubscribe$.complete();
  });
  
  return {
    connect: (stream$: Observable<T>) => {
      innerStream$ = stream$.pipe(takeUntil(unsub$));
      return context;
    },
    subscribe(
      next?: (value: T) => void,
      error?: (e: any) => void,
      complete?: () => void
    ): Subscription {
      return innerStream$
        .pipe(takeUntil(unsubscribe$))
        .subscribe(next, error, complete);
    },
  };
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 传递ChangeDetectorRef的引用

另一个解决方案是将ChangeDetectorRef作为useReactiveContext的参数:

@Component({
  // ...
})
class ContextComponent {

  constructor(private readonly cd: ChangeDetectorRef) {}

  showTooltipOnClick() {

    const click$ = fromEvent(document, 'click');

    useReactiveContext(click$, this.cd)
      .connect(click$)
      .subscribe(() => {
        // show tooltip
      })
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

这样,我们就不必依赖注入函数,并且没有任何的缺点:

export function useReactiveContext<T>(
  stream$: Observable<T>,
  cd?: ChangeDetectorRef
) {

  const changeDetector = cd ? cd : inject(ChangeDetectorRef),
    unsubscribe$ = new Subject<void>(),
    innerStream$ = stream$;

  let innerStream$: Observable<T> | undefined;

  if (stream$) {
    innerStream$ = stream$.pipe(takeUntil(unsubscribe$));
  }

  (changeDetector as ViewRef).onDestroy(() => {
    unsubscribe$.next();
    unsubscribe$.complete();
  });

  const context = {
    connect: (stream$: Observable<T>) => {
      innerStream$ = stream$.pipe(takeUntil(unsubscribe$));
      return context;
    },
    subscribe(
      next?: (value: T) => void,
      error?: (e: any) => void,
      complete?: () => void
    ): Subscription {
      return innerStream$
        .pipe(takeUntil(unsubscribe$))
        .subscribe(next, error, complete);
    },
  };
  return context;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 在Template中使用Observable

到目前为止,我们主要讨论怎样使用Observable监听浏览器的一系列事件。而Observable通常与组件的Template一起使用。比较流行的是:

除了以上的解决方案,在其他一些情况下,必须订阅组件中Observable并手动运行检测变更:

@Component({
  template: `<div *ngFor="let user of users">{{user}}</div>`,
  changeDetection: ChangeDetectionStrategy.Onpush,
})
export class UserComponent implements OnInit, OnDestroy {

  private users: User[];
  private readonly unsubscribe$ = new Subject();

  constructor(
    private readonly changeDetectorRef: ChangeDetectorRef,
    private readonly userService: UserService
  ) {}

  ngOnInit() {
    this.userService
        .onUsers()
        .pipe(takeUntil(this.unsubscribe$))
        .subscribe((users) => {
          this.users = users;
          this.changeDetectorRef.detectChanges();
        })
  }

  ngOnDestroy() {
    this.unsubscribe$.next();
    this.unsubscribe$.complete();
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

我们可以使用useReactiveContext来简化上面的代码

@Component({
  template: `<div *ngFor="let user of users">{{user}}</div>`,
  changeDetection: ChangeDetectionStrategy.Onpush,
})
export class UserComponent implements OnInit {

  private users: User[];
  private readonly unsubscribe$ = new Subject();

  constructor(private readonly userService: UserService) {}

  ngOnInit() {
    this.context
        .connect(this.userService.onUsers())
        .subscribeAndRender((users) => {
          this.users = users;
        })
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

useReactiveContext已经使用了ChangeDetectorRef,因此我们可以利用当值出现在Observable中时手动触发detectChanges。并用subscribeAndRender代替subscribe

export function useReactiveContext<T>(stream$: Observable<T>) {

  const changeDetector = inject(ChangeDetectorRef),
    unsubscribe$ = new Subject<void>();

    // ...

  return {
    // ...
    subscribeAndRender(
      next?: (value: T) => void,
      error?: (e: any) => void,
      complete?: () => void
    ): Subscription {
      return innerStream$.subscribe(
        (v) => {
          next(v);
          changeDetector.detectChanges();
        },
        error,
        complete
      );
    },
  };
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

这个解决方案在使用ChangeDetectionStrategy.onPushngZones被禁用时也能完美运行

# 总结

reactiveContext是一种在组件中处理订阅的新方式。useReactiveContext是一个灵活的API。与每种解决方案一样,它也有一些优点和缺点。

# 优点

  • 自动取消订阅,无需记住使用takeUntil操作符
  • 函数式编程,易读性和整洁性
  • 性能优秀
  • 与onPush策略配合使用,也可以用于ngZones被禁用的情况

# 缺点

  • 在构造函数或类属性之外使用reactiveContext需要传递ChangeDetectorRef的引用
  • 只能用于组件、指令或管道
  • 当组件使用@ViewChild时将不会生效(希望Angular团队将来能解决这个问题)

# 最后的一点想法

当涉及到订阅管理时,Angular社区无法达成一致。Angular的最新版本引入了一些新功能,这些功能激励我们为常见问题找到更好的解决方案。

在使用RxJS和Angular时,订阅管理是一个麻烦。很难想象开发人员花了多少时间来确保他们的应用没有内存泄漏。希望将来能有一个通用的机制,让大家不必为RxJS订阅而烦恼。

Last Updated: 11/21/2022, 6:50:20 AM