如何在快应用开发中使用 RxJS?

快应用 Sep 13, 2022

快应用 是基于手机硬件平台的新型应用形态,具备传统 APP 完整的应用体验,无需安装、即点即用;它跟小程序一样,基于前端技术栈开发;理论上,没有使用浏览器 API 的工具库,在快应用开发中同样适用。 RxJS 是 JavaScript 的反应式(响应式)编程库,它通过使用可观察(Observable)序列,来编写异步和基于事件的程序;本文旨在与朋友们分享:如何在快应用开发中使用 RxJS

RxJS 介绍

Rx(ReactiveX)是响应式(反应式)编程原则的一种实现,通过使用可观察序列,组成异步和基于事件的程序。它的设计思想组合了观察者模式,迭代器模式和函数式编程。响应式编程在各个编程语言中都有对应的实现,应用较为广泛的是 RxJava 以及 RxJS

RxJS 是基于 ReactiveX 实现的 JavaScript 版本的库,它使编写异步或基于回调的代码更容易。你可以把它看成是一个用于处理事件的 Lodash。RxJS 也是 Angular 强烈推荐的事件处理库。

要使用 RxJS ,先要了解其中的几个核心概念:

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他。

上面的描述可能比较抽象,举一个类比现实生活的例子,来帮助理解这几个概念:购房者一直在密切的关注房价,而房价随时间波动,购房者可能会根据波动的房价而采取一系列的行动,比如购入或者继续观望。购房者与房价的这样一种关系其实就构成了一种观察者关系。这里,购房者担任观察者的角色,房价是被观察的角色,当房价信息发生变化,则自动推送信息给购房者。

  • 房价即为 Observable 对象;
  • 购房者即为 Observer 对象;
  • 而购房者观察房价即为 Subscribe(订阅)关系;

如果理解了这个场景,那么就大概理解了 RxJS 的基础概念,如果你没接触过需要更详细了解,可以看看这篇文章 响应式编程入门 。这里就不做过多展开了,文章后面会列举一些 RxJS 的相关文档和工具,有兴趣的可以自行探索和学习。下面就直接进入结合快应用的使用方法了。

注意,本文示例均使用 RxJS 6.5 版本编写。

简单示例

安装

# yarn安装
yarn add rxjs  
# OR pnpm 安装
pnpm insatll rxjs 
# OR npm 安装
npm install rxjs --save

导入

import { Observable } from 'rxjs'

使用

const foo = new Observable(subscriber => {
     console.log('Hello');
         subscriber.next('World 2022');
    })
    
foo.subscribe(x => {
    console.log(x);
})
// output:  Hello World 2022

实践示例

节流(throttle)和防抖(debounce)

节流的处理

在开发快应用时,会遇到一些情况,比如:点击一个按钮或,请求一个网络接口(或者一些其他异步操作),由于有些网络接口对请求频率有限制(或者有些异步操作很消耗性能),如果用户快速多次点击按钮,会短时间触发多个请求,很可能导致接口拒绝返回数据(或者降低设备运行效率),这不是我们期望的行为,就需要对按钮的点击,做限流或是防抖处理。

下面是没有做处理的代码:

<template>
  <div class="demo-page">
    <text class="title">按钮点击次数{{count}}</text>
    <input class="btn" type="button" value="按钮" onclick="clickHandler" />
  </div>
</template>

<script>
import fetch from '@system.fetch'
export default {
  data() {
    return {
      count: 0
    }
  },
  
  requestData() {
    return fetch.fetch({url:'https://api.github.com/users?per_page=5'})
  },
  
  clickHandler () {
    this.requestData().then(res=> {
      if (res) {
        this.count++;
      }
    })
  }
}
</script>

<style>
  .demo-page {
    flex-direction: column;
    justify-content: center;
    align-items: center;
  }

  .title {
    font-size: 40px;
    text-align: center;
  }

  .btn {
    width: 550px;
    height: 86px;
    margin-top: 75px;
    border-radius: 43px;
    background-color: #09ba07;
    font-size: 30px;
    color: #ffffff;
  }
</style>

很显然,由于没有对点击事件做限制,每次点击都会触发一次请求,这不是我预期的效果,通常我们的做法一般是增加一个参数,用于保存上次点击时间,再根据这个参数来判断当前点击事件时间是否小于一定间隔,从而判断对应的逻辑是否执行。这种方式增加了额外的判断逻辑,也不是那么优雅,如果采用 RxJS 的方式,可以怎么做呢?下面是修改后的代码:

<template>
  <div class="demo-page">
    <text class="title">按钮点击次数{{count}}</text>
    <input id="button" class="btn" type="button" value="按钮"/>
  </div>
</template>

<script>
  import fetch from '@system.fetch'
  import {fromEvent} from 'rxjs'
  import {throttleTime} from 'rxjs/operators'
  export default {
    data() {
      return {
        count: 0
      }
    },
    onShow() {
      // 获取按钮的 DOM
      const button = this.$element('button')
      // 根据按钮点击事件创建可订阅流
      const observable = fromEvent(button, 'click')
      // 为可订阅流增加限制1秒的触发间隔
      const throttleButton = observable.pipe(throttleTime(1000))
      const subscribe = throttleButton.subscribe(e => {
        // 订阅流执行对应的逻辑
        console.log(`button clicked: ${JSON.stringify(e)}`)
        this.requestData().then(res => {
          if (res) {
            this.count++
          }
        })
      })
    },
    requestData() {
      return fetch.fetch({url: 'https://api.github.com/users?per_page=5'})
    },
  }
</script>

可以看到,不管我们以多快的速度点击按钮,现在按钮点击事件被节流到每秒只能触发一次了。

如何在快应用开发中使用 RxJS - 节流效果

防抖的处理

在开发应用的时候,会遇到搜索框联想的需求;一般来说,会采取监听输入框的 change 事件,来执行请求接口等逻辑;但是如果每次 change 都触发一次请求,会出现用户还没输入完成就开始提示,请求一般都是异步,会出现联想提示频繁变更,不是用户想要得情况。

更好处理方式是:在一段时间内,用户的输入不再继续了,我们就触发对应的数据请求,及联想更新逻辑。这里就需要用到事件的防抖了。下面是没有做防抖处理的代码:

<template>
  <div class="wrap">
    <input id="input" class="input" placeholder="请输入搜索词" type="text" onchange="changeHandler"/>
    <text for="text">{{$item}}</text>
  </div>
</template>

<script>
  export default {
    data() {
      return  {
        text: []
      }
    },
    changeHandler(e) {
      this.text.push(e.value)
    },
  }
</script>

<style>
  .wrap {
    flex-direction: column;
    align-items: center;
  }
  .input {
    height: 80px;
    width: 700px;
    border: 1px solid #8d8d8d;
    border-radius: 80px;
    padding: 0 40px;
    margin-top: 40px;
    background-color: #f0f0f0;
  }
</style>

运行的效果是这样的:

无防抖效果

显然效果是不符合我们预期的,下面用 RxJS 的方式,为它加上防抖

<template>
  <div class="wrap">
    <input id="input" class="input" placeholder="请输入搜索词" type="text"/>
    <text for="text">{{$item}}</text>
  </div>
</template>

<script>
  import {fromEvent} from 'rxjs'
  import {debounceTime} from 'rxjs/operators'
  export default {
    data() {
      return  {
        text: []
      }
    },
    onShow() {
      // 获取 input 的 DOM
      const input = this.$element('input')
      // 根据输入框的 change 事件创建可订阅流
      const observable = fromEvent(input, 'change')
      // 为可订阅流增加防抖 2 秒的时间间隔,2 秒后没有变化则触发对应了处理逻辑
      const debouncedInput = observable.pipe(debounceTime(2000))
      // 订阅流执行对应的逻辑
      const subscribe = debouncedInput.subscribe(e => {
        console.log(`input changed: ${JSON.stringify(e)}`)
        this.text.push(e.value);
      })
    },
  }
</script>

现在的效果是这样的:

防抖效果

可以看到,防抖的效果已经达到,👏。

请求失败自动重试

在开发 快应用 的时候,发送请求是通过 fetch 接口,但这个接口并没有提供超时和重试的机制,往往需要我们自行开发适配;这里我们采用 RxJS 来实现封装 fetch 接口,使其能够支持自动重试。

import fetch from '@system.fetch'
import {throwError, of, defer} from 'rxjs'
import {retry, mergeMap} from 'rxjs/operators'

export function myFetch(params) {
  const retryNum = params.retry || 1 // 出错后重试的次数
  return new Promise((resolve) => { // 用promise封装使其支持常规async/await调用
    defer(() => fetch.fetch({...params})) // 使用defer操作符,确保每次重试都是新的请求
      .pipe(
        mergeMap((res) => {
          if (res.data.code !== 200) { // 判断接口状态码,不为200时重试,这里可以根据业务自定义
            return throwError(res.data)
          }
          return of(res)
        }),
        retry(retryNum),
      ).subscribe({
        next: val => resolve(val),
        error: val => resolve(val)
      })
  })
}

通过上面的封装, 快应用 的原生接口就实现了失败重试的能力。

请求超时

通常,处理请求超时,会采用 setTimeout 的方式来实现;这里我们来试试如何用 RxJS 的方式来封装一个支持超时机制的请求接口。

import fetch from '@system.fetch'
import { defer } from 'rxjs'
import { timeout } from 'rxjs/operators'

export function myFetch(params) {
  const TIMEOUT = params.timeout || 2000
  return new Promise((resolve) => {
    defer(() => fetch.fetch({ ...params }))
      .pipe(
        timeout(TIMEOUT), // 超过设定时间未返回值抛出超时错误
      ).subscribe({
        next: val => resolve(val),
        error: val => resolve(val)
      })
  })
}

可以看出,超时机制的封装十分简洁,代码也更加优雅。

技术总结

RxJS 作为一个擅长处理事件的库,函数式编程使得代码更加优雅,在需要处理多个事件并发的时候,能够显现出其强大的优势;本文中只介绍部分部分操作符,就能将繁琐的操作变得更加简洁。接下来的将会分享:如何在快应用开发中使用 RxJS 做状态管理

参考文档

您可能会感兴趣的文章

Tags

Great! You've successfully subscribed.
Great! Next, complete checkout for full access.
Welcome back! You've successfully signed in.
Success! Your account is fully activated, you now have access to all content.