如何在快应用开发中使用 RxJS?

快应用 是基于手机硬件平台的新型应用形态,具备传统 APP 完整的应用体验,无需安装、即点即用;它跟小程序一样,基于前端技术栈开发;理论上,没有使用浏览器 API 的工具库,在快应用开发中同样适用。 RxJS 是 JavaScript 的反应式(响应式)编程库,它通过使用可观察(Observable)序列,来编写异步和基于事件的程序;本文旨在与朋友们分享:如何在快应用开发中使用 RxJS

RxJS 介绍

Rx(ReactiveX)是响应式(反应式)编程原则的一种实现,通过使用可观察序列,组成异步和基于事件的程序。它的设计思想组合了观察者模式,迭代器模式和函数式编程。响应式编程在各个编程语言中都有对应的实现,应用较为广泛的是 RxJava 以及 RxJS

RxJS 是基于 ReactiveX 实现的 JavaScript 版本的库,它使编写异步或基于回调的代码更容易。你可以把它看成是一个用于处理事件的 Lodash。RxJS 也是 Angular 强烈推荐的事件处理库。

要使用 RxJS ,先要了解其中的几个核心概念:

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他。

上面的描述可能比较抽象,举一个类比现实生活的例子,来帮助理解这几个概念:购房者一直在密切的关注房价,而房价随时间波动,购房者可能会根据波动的房价而采取一系列的行动,比如购入或者继续观望。购房者与房价的这样一种关系其实就构成了一种观察者关系。这里,购房者担任观察者的角色,房价是被观察的角色,当房价信息发生变化,则自动推送信息给购房者。

  • 房价即为 Observable 对象;
  • 购房者即为 Observer 对象;
  • 而购房者观察房价即为 Subscribe(订阅)关系;

如果理解了这个场景,那么就大概理解了 RxJS 的基础概念,如果你没接触过需要更详细了解,可以看看这篇文章 响应式编程入门 。这里就不做过多展开了,文章后面会列举一些 RxJS 的相关文档和工具,有兴趣的可以自行探索和学习。下面就直接进入结合快应用的使用方法了。

注意,本文示例均使用 RxJS 6.5 版本编写。

简单示例

安装

# yarn安装
yarn add rxjs  
# OR pnpm 安装
pnpm insatll rxjs 
# OR npm 安装
npm install rxjs --save

导入

import { Observable } from 'rxjs'

使用

const foo = new Observable(subscriber => {
     console.log('Hello');
         subscriber.next('World 2022');
    })
    
foo.subscribe(x => {
    console.log(x);
})
// output:  Hello World 2022

实践示例

节流(throttle)和防抖(debounce)

节流的处理

在开发快应用时,会遇到一些情况,比如:点击一个按钮或,请求一个网络接口(或者一些其他异步操作),由于有些网络接口对请求频率有限制(或者有些异步操作很消耗性能),如果用户快速多次点击按钮,会短时间触发多个请求,很可能导致接口拒绝返回数据(或者降低设备运行效率),这不是我们期望的行为,就需要对按钮的点击,做限流或是防抖处理。

下面是没有做处理的代码:

<template>
  <div class="demo-page">
    <text class="title">按钮点击次数{{count}}</text>
    <input class="btn" type="button" value="按钮" onclick="clickHandler" />
  </div>
</template>

<script>
import fetch from '@system.fetch'
export default {
  data() {
    return {
      count: 0
    }
  },
  
  requestData() {
    return fetch.fetch({url:'https://api.github.com/users?per_page=5'})
  },
  
  clickHandler () {
    this.requestData().then(res=> {
      if (res) {
        this.count++;
      }
    })
  }
}
</script>

<style>
  .demo-page {
    flex-direction: column;
    justify-content: center;
    align-items: center;
  }

  .title {
    font-size: 40px;
    text-align: center;
  }

  .btn {
    width: 550px;
    height: 86px;
    margin-top: 75px;
    border-radius: 43px;
    background-color: #09ba07;
    font-size: 30px;
    color: #ffffff;
  }
</style>

很显然,由于没有对点击事件做限制,每次点击都会触发一次请求,这不是我预期的效果,通常我们的做法一般是增加一个参数,用于保存上次点击时间,再根据这个参数来判断当前点击事件时间是否小于一定间隔,从而判断对应的逻辑是否执行。这种方式增加了额外的判断逻辑,也不是那么优雅,如果采用 RxJS 的方式,可以怎么做呢?下面是修改后的代码:

<template>
  <div class="demo-page">
    <text class="title">按钮点击次数{{count}}</text>
    <input id="button" class="btn" type="button" value="按钮"/>
  </div>
</template>

<script>
  import fetch from '@system.fetch'
  import {fromEvent} from 'rxjs'
  import {throttleTime} from 'rxjs/operators'
  export default {
    data() {
      return {
        count: 0
      }
    },
    onShow() {
      // 获取按钮的 DOM
      const button = this.$element('button')
      // 根据按钮点击事件创建可订阅流
      const observable = fromEvent(button, 'click')
      // 为可订阅流增加限制1秒的触发间隔
      const throttleButton = observable.pipe(throttleTime(1000))
      const subscribe = throttleButton.subscribe(e => {
        // 订阅流执行对应的逻辑
        console.log(`button clicked: ${JSON.stringify(e)}`)
        this.requestData().then(res => {
          if (res) {
            this.count++
          }
        })
      })
    },
    requestData() {
      return fetch.fetch({url: 'https://api.github.com/users?per_page=5'})
    },
  }
</script>

可以看到,不管我们以多快的速度点击按钮,现在按钮点击事件被节流到每秒只能触发一次了。

防抖的处理

在开发应用的时候,会遇到搜索框联想的需求;一般来说,会采取监听输入框的 change 事件,来执行请求接口等逻辑;但是如果每次 change 都触发一次请求,会出现用户还没输入完成就开始提示,请求一般都是异步,会出现联想提示频繁变更,不是用户想要得情况。

更好处理方式是:在一段时间内,用户的输入不再继续了,我们就触发对应的数据请求,及联想更新逻辑。这里就需要用到事件的防抖了。下面是没有做防抖处理的代码:

<template>
  <div class="wrap">
    <input id="input" class="input" placeholder="请输入搜索词" type="text" onchange="changeHandler"/>
    <text for="text">{{$item}}</text>
  </div>
</template>

<script>
  export default {
    data() {
      return  {
        text: []
      }
    },
    changeHandler(e) {
      this.text.push(e.value)
    },
  }
</script>

<style>
  .wrap {
    flex-direction: column;
    align-items: center;
  }
  .input {
    height: 80px;
    width: 700px;
    border: 1px solid #8d8d8d;
    border-radius: 80px;
    padding: 0 40px;
    margin-top: 40px;
    background-color: #f0f0f0;
  }
</style>

运行的效果是这样的:

显然效果是不符合我们预期的,下面用 RxJS 的方式,为它加上防抖

<template>
  <div class="wrap">
    <input id="input" class="input" placeholder="请输入搜索词" type="text"/>
    <text for="text">{{$item}}</text>
  </div>
</template>

<script>
  import {fromEvent} from 'rxjs'
  import {debounceTime} from 'rxjs/operators'
  export default {
    data() {
      return  {
        text: []
      }
    },
    onShow() {
      // 获取 input 的 DOM
      const input = this.$element('input')
      // 根据输入框的 change 事件创建可订阅流
      const observable = fromEvent(input, 'change')
      // 为可订阅流增加防抖 2 秒的时间间隔,2 秒后没有变化则触发对应了处理逻辑
      const debouncedInput = observable.pipe(debounceTime(2000))
      // 订阅流执行对应的逻辑
      const subscribe = debouncedInput.subscribe(e => {
        console.log(`input changed: ${JSON.stringify(e)}`)
        this.text.push(e.value);
      })
    },
  }
</script>

现在的效果是这样的:

可以看到,防抖的效果已经达到,👏。

请求失败自动重试

在开发 快应用 的时候,发送请求是通过 fetch 接口,但这个接口并没有提供超时和重试的机制,往往需要我们自行开发适配;这里我们采用 RxJS 来实现封装 fetch 接口,使其能够支持自动重试。

import fetch from '@system.fetch'
import {throwError, of, defer} from 'rxjs'
import {retry, mergeMap} from 'rxjs/operators'

export function myFetch(params) {
  const retryNum = params.retry || 1 // 出错后重试的次数
  return new Promise((resolve) => { // 用promise封装使其支持常规async/await调用
    defer(() => fetch.fetch({...params})) // 使用defer操作符,确保每次重试都是新的请求
      .pipe(
        mergeMap((res) => {
          if (res.data.code !== 200) { // 判断接口状态码,不为200时重试,这里可以根据业务自定义
            return throwError(res.data)
          }
          return of(res)
        }),
        retry(retryNum),
      ).subscribe({
        next: val => resolve(val),
        error: val => resolve(val)
      })
  })
}

通过上面的封装, 快应用 的原生接口就实现了失败重试的能力。

请求超时

通常,处理请求超时,会采用 setTimeout 的方式来实现;这里我们来试试如何用 RxJS 的方式来封装一个支持超时机制的请求接口。

import fetch from '@system.fetch'
import { defer } from 'rxjs'
import { timeout } from 'rxjs/operators'

export function myFetch(params) {
  const TIMEOUT = params.timeout || 2000
  return new Promise((resolve) => {
    defer(() => fetch.fetch({ ...params }))
      .pipe(
        timeout(TIMEOUT), // 超过设定时间未返回值抛出超时错误
      ).subscribe({
        next: val => resolve(val),
        error: val => resolve(val)
      })
  })
}

可以看出,超时机制的封装十分简洁,代码也更加优雅。

技术总结

RxJS 作为一个擅长处理事件的库,函数式编程使得代码更加优雅,在需要处理多个事件并发的时候,能够显现出其强大的优势;本文中只介绍部分部分操作符,就能将繁琐的操作变得更加简洁。接下来的将会分享:如何在快应用开发中使用 RxJS 做状态管理

参考文档

您可能会感兴趣的文章