wangzengdi's Blog

Functional Programming

0%

本文是 Transducer Explained 教程的第二篇。在第一篇中,从 reducer 开始讲起,到 transformers,再到在 transduce 中使用 map transducers。本文将介绍四个新的 transducersfilterremovedroptake。我们将展示如何将 transducers 组合成 流(pipelines),并讨论转换的顺序。我们还将改变 reduce 的实现,使其能够借助 reduced 提前终止迭代操作。

那么,上次讲到哪了呢?

Transformer

第一篇文章中,通过定义 transformer 协议来规范了 reduce 的步骤。

所有 transformers 都包含3个方法:

  1. 使用初始值初始化初始化转换,init
  2. 使用 reducer 来组合每个元素,step
  3. 将最后累积值转换为输出,result

在第一篇文章中,我们使用下面的 reducer 来转换输入源: multaddappend

1
2
3
4
const append = (value, item) => {
value.push(item);
return value;
};

本文只会用到 append,将元素拼接到数组的尾部。

Reduce

我们定义了自己的 reduce 实现,它接受以下参数:

  1. 一个 transformer 或者将被封装成 transformer 的 reducer
  2. 一个初始值(例如[])。
  3. 一个输入源(例如一个将要被归并的数组)

目前的实现使用 transformer 的 step 函数作为对原生数组进行归并的 reducer,我们稍后将修改这一实现。

在迭代过程中,step 函数接受两个参数:resultitem。初始的 result 值 由调用者提供,后续的每个 result 使用当前 step 函数的返回值。

item 由迭代过程内部提供。在第一篇文章中展示了两个过程:1. 归并数组的每个元素; 2. 手动调用 step 函数处理每个元素。(在后续文章中将看到更多例子)。

Transducer

我们创建了 mapping transducer

1
2
3
4
5
const map = f => xf => ({
init: () => xf.init(),
step: (result, item) => xf.step(result, f(item)),
result: result => xf.result(result),
});

map transducer 接受一个映射函数:f,并返回一个真正的 transducer:

  1. 接受一个已有 transformer
  2. 返回一个新的 transformer,用来通过 f 转换 items
  3. 对封装的 transformer 进行代理,添加一些额外的处理过程(如添加映射操作)

函数 f,可以是对数据进行映射(转换)的任意函数。

1
2
3
const plus1 = input => input + 1;

const plus2 = input => input + 2;

在本文的例子中将继续使用这两个简单的映射函数。

Transduce

我们定义了一个新函数,transduce, 其接受:

  1. 一个 transducer: 定义转换的过程
  2. 一个 stepper(reducer)函数,或 transformer (如 append)
  3. 一个 stepper 函数 的初始值 (如 [])
  4. 输入源 (如一个待转换的数组)

通过 transducer 和 stepper(reducer) 函数,可以创建一个 transformer, 然后将该 transformer、初始值和输入源一同传入 reduce

我们已经展示过,同一 transducer 可以生成不同的结果,只要改变传入 transduce 的初始值和 stepper 函数。

Composition

最后,我们展示了可以通过组合已有 transducers 来创建新的 transducers

1
2
3
4
5
6
7
8
9
10
const compose2 = (fn1, fn2) => item => fn1(fn2(item));

const transducer = compose2(
compose2(map(plus1), map(plus2)),
map(plus1));
const stepper = append;
const init = [];
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// [6,7,8]

被组合的 transducer 从左向右对输入源进行转换。

本文从这里开始讲起。如果对上面这个例子不太熟悉,可以参考第一篇文章的讲解

Pipelines (流水线)

首先,定义一个新的函数,用它可以组合任意数量的函数。

1
2
3
4
5
const compose = (...fns) => xf => {
let i = fns.length - 1;
for(; i >=0; i--) { xf = fns[i](xf); }
return xf;
};

compose 是对 compose2 的扩展。compose 从右向左进行函数组合:组合出来的函数调用时,右侧函数的返回值,作为左侧函数的输入,不断重复该过程,知道最后一个函数的输出,作为整个函数的结果输出。

可以通过组合 plus1plus2 来创建 plus4

1
2
3
4
5
6
7
8
// 手动调用 
const value = plus1(plus1(plus2(5)));
// 9

// 使用组合函数 (允许函数复用)
const plus4 = compose(plus1, plus1, plus2);
const value = plus4(5);
// 9

我们来通过组合 map transducers 来创建一个 plus5 transducer。

1
2
3
4
5
6
7
8
9
10
const transducer = compose(
map(plus1), // [3,4,5]
map(plus2), // [5,6,7]
map(plus1), // [6,7,8]
map(plus1)); // [7,8,9]
const stepper = append;
const init = [];
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// [7,8,9]

每个 transducer 旁边的注释将每个转换通过数组展示出来,这样每次可以完整的查看整个转换。

然而,需要注意的是,在整个转换过程中,transducers 每次只顺序转换一个 item,并且不会产生中间结果。注释仅仅表示 pipeline 的每步的转换结果。我们将在下面继续讨论这个问题。

实际上,虽然组合是从右向左,转换的顺序实际上是从左向右的(在示例中是从上向下)。(需要将 transducers 的组合顺序 和 组合后 transducers 对输入源的转换顺序区分开)。在本例中顺序并不重要,但下一个例子中,便需要考虑调用顺序了。

Filter

我们来定义 filter transducer

1
2
3
4
5
6
7
8
9
10
11
const filter = predicate => xf => ({
init: () => xf.init(),
step: (value, item) => {
const allow = predicate(item);
if(allow){
value = xf.step(value, item);
}
return value;
},
result: (value) => xf.result(value),
};

注意,通过 filter 创建的 transformer,只有在判断函数 predicate 返回 真值 时,才会调用给下一个 transformer。当 predicate 返回 false 时,当前元素会被忽略,并返回上次的迭代结果。

我们创建一个 transducer ,来过滤出所有奇数。

1
2
3
4
5
6
7
8
const isOdd = num => num % 2 === 1;

const transducer = filter(isOdd);
const stepper = append;
const init = [];
const input = [1,2,3,4,5];
const output = transduce(transducer, stepper, init, input);
// [1,3,5]

我们使用 isOdd 的判断函数,来创建一个过滤元素的 transducer。然后使用 transduce 将其作用于一个整数数组,输出数组中只包含奇数。

我们再创建一个函数,该函数调用后,返回一个检查当前输入值是否等于之前输入值的判断函数。

1
2
3
4
5
6
7
8
9
// another predicate 
const isEqual = y => x => x === y;

const transducer = filter(isEqual(2));
const stepper = append;
const init = [];
const input = [1,2,3,4,5];
const output = transduce(transducer, stepper, init, input);
// [2]

可以看到,创建出来的判断函数 isEqual(2) 只允许输出数组中包含数字 2。

再来一个辅助函数,其接受一个判断函数,对其结果取反。

1
2
3
4
5
6
7
8
const not = predicate => x => !predicate(x);

const transducer = filter(not(isEqual(2)));
const stepper = append;
const init = [];
const input = [1,2,3,4,5];
const output = transduce(transducer, stepper, init, input);
// [1,3,4,5]

我们修改了前面的例子:对判断函数 isEqual(2) 取反,由此创建了一个移除输入源中所有数字 2 的 transducer。

现在在我们的 pipeline 库中添加了另一件武器(filter),一起来玩一下吧。

Pipeline 顺序

我们来对每个元素加 1,然后过滤出奇数。

1
2
3
4
5
6
7
8
const transducer = compose(
map(plus1), // [2,3,4,5,6]
filter(isOdd)); // [3,5]
const stepper = append;
const init = [];
const input = [1,2,3,4,5];
const output = transduce(transducer, stepper, init, input);
// [3,5]

首先调用 map(plus1) transducer 对每个元素加 1,然后调用下一步转换:过滤出所有奇数。

我们改变一下 transducers 的顺序,看看会发生什么。

1
2
3
4
5
6
7
8
const transducer = compose(
filter(isOdd), // [1,3,5]
map(plus1)); // [2,4,6]
const stepper = append;
const init = [];
const input = [1,2,3,4,5];
const output = transduce(transducer, stepper, init, input);
// [2,4,6]

我们首先过滤出所有奇数。filter(isOdd) transformer 只将奇数传给下一个 transformer。所有传到下一步的元素(奇数)会使用 plus1 进行映射。

这展示了 组合 transducers 的两个重要性质:

  1. 虽然 transducers组合 是从右向左,但数据转换是从左向右。
  2. 越早使用 transducers 减少 pipeline 中元素的数量,效率可能会越高。

注意到,在最后一个例子中,map(plus1) 仅仅使用所有元素的子集调用。同样的,并未创建中间数组,仅仅作为注释便于理解而已。

Remove

准备好了吗?现在开始讲另一个 transducer。

1
const remove = predicate => filter(not(predicate));

很简单吧。实际上,我们可以通过对 predicate 取反 和 复用 filter 来创建 remove transducer。

现在来实践一下。

1
2
3
4
5
6
7
8
9
const transducer = compose(
filter(isOdd), // [1,3,5]
map(plus1), // [2,4,6]
remove(isEqual(4))); // [2,6]
const stepper = append;
const init = [];
const input = [1,2,3,4,5];
const output = transduce(transducer, stepper, init, input);
// [2,6]

首先过滤出奇数,然后对每项加1,然后删除 4

Drop

如果想在迭代开始时跳过前 n 个元素,该做么做呢?这正是 drop transducer 的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const drop = n => xf => {
var left = n;
return {
init: () => xf.init(),
step: function(value, item){
if(left > 0){
left--;
} else {
value = xf.step(value, item);
}
return value;
},
result: result => xf.result(result),
}
}

可以这样使用 drop

1
2
3
4
5
6
var transducer = drop(2);
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [3,4,5]

drop 接受丢弃元素的个数。这是第一个创建有状态变换的 transducer 的示例。每次调用 drop transducer 创建一个转换时,便会创建一个变量 left,用来指示当前还剩多少个元素需要被丢弃。left 被初始化为 n

注意,我们使用一个无状态的 transducer 创建了一个有状态的 transformer。这是一个重要的区别。这意味着我们可以重用 drop(2) transducer 任意多次,而不必担心任何状态。状态是在 transformer 中创建的,而不是 transducer 中。

假如我们不想丢弃,而是获取前 n 个元素并丢弃剩余元素,该怎么办呢?为方便实现,假设 n > 0

我们来尝试一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const take = n => xf => {
var left = n;
return {
init: () => xf.init(),
step: function(value, item){
value = xf.step(value, item);
if(--left <= 0){
// 如何停止???
}
return value;
},
result: result => xf.result(result),
}
};

哦噢,麻烦来了。我们知道如何逐步处理每个元素,并且通过 transformer 的状态来保持剩余元素的计数。但是,如何停止对剩余元素的迭代呢?

为什么需要表明转换已经完成,不再接受任何额外元素呢?不仅因为继续接受元素是一种浪费,还因为无法保证迭代能够完成。有可能迭代是无限的。如果可以,我们当然想终止无限迭代。

那么如何表示提前终止呢?我们需要在看一下迭代的源代码:transduce

Reduce redux (终极 Reduce)

下面是来自第一篇文章transducereduce 的定义,:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const transduce = (transducer, stepper, init, input) => {
if(typeof stepper === 'function'){
stepper = wrap(stepper);
}

var xf = transducer(stepper);
return reduce(xf, init, input);
};

const reduce(xf, init, input) => {
if(typeof xf === 'function'){
xf = wrap(xf);
}
// 怎样才能结束呢??
var value = input.reduce(xf.step, init);
return xf.result(value);
};

const wrap = stepper => ({
init: () => throw new Error('init not supported'),
step: stepper,
result: value => value,
};

从上面的实现可知,我们正在使用原生数组 reduce 方法进行迭代,reducereducer 来自 transformer。后续文章中,我们将删除输入源是数组的假设,但现在还需继续使用该假设。我们来定义自己的 arrayReduce 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const reduce = (xf, init, input) => {
if(typeof xf === 'function'){
xf = wrap(xf);
}

return arrayReduce(xf, init, input);
};

const arrayReduce(xf, init, array) => {
var value = init;
var idx = 0;
var length = array.length;
for(; idx < length; idx++){
value = xf.step(value, array[idx]);
// We need to break here, but how do we know?
}
return xf.result(value);
};

arrayReduce 的实现接受一个 transformer 、一个初始值和输入数组。然后使用 for 循环遍历每个元素,并使用累加值 value 和数组元素来调用 transformer 的 step 函数。

我们需要一个方法来打破这个循环,打破循环需要依赖某些标记值。幸运的是,我们可以采用已有的 transducer 协议

为了在调用 transformer 中的 step 之后发出提前终止信号,我们可以将 reduced 值封装在包含两个属性的对象中:

  1. value: 存储实际要封装的值。
  2. __transducers_reduced__: bool 类型值,为true时,表示该对象已经 reduced (归并过)了,迭代需要被终止。

实现如下:

1
2
3
4
const reduced = value => ({
value: value,
__transducers_reduced__: true,
});

我们还将添加一个 predicate 判断函数来确定值是否是 reduced 。

1
const iReduced = value => value && value.__transducers_reduced__;

此外,我还需要一个方法来提取,或 deref(解引用) reduced 的值。

现在我们可以调整一下 arrayReduce 的实现,来来处理提前终止的 reduced 的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
const arrayReduce => (xf, init, array) => {
var value = init;
var idx = 0;
var length = array.length;
for(; idx < length; idx++){
value = xf.step(value, array[idx]);
if(isReduced(value)){
value = deref(value);
break;
}
}
return xf.result(value);
}

现在可以像以前一样对每个元素进行迭代,但每次调用 step 之后,会先检查是否当前值是否 reduced。如果是,则提取当前值并终止迭代。我们仍然对最终值调用 result 方法,不管它来自 reduced value 还是完整的迭代。

Take 2

现在可以完成 take 的实现了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const take = n => xf => {
var left = n;
return {
init: () => xf.init(),
step: function(value, item){
value = xf.step(value, item);
if(--left <= 0){
// 任务完成,发出 reduced 信号
value = reduced(value);
}
return value;
},
result: value => xf.result(value),
}
};

我们之前唯一缺失的是:当检测到转换完成后使用 reduced 对值进行封装。(现在已经补上了)

让我们看看它是否能工作:

1
2
3
4
5
6
const transducer = take(3);
const stepper = append;
const init = [];
const input = [1,2,3,4,5];
const output = transduce(transducer, stepper, init, input);
// [1,2,3]

工作正常!

就像任何其他的 transducer,你可以将 droptake 组合成一个 pipeline

1
2
3
4
5
6
7
8
9
const transducer = compose(
drop(1), // [2,3,4,5]
take(3), // [2,3,4]
drop(1)); // [3,4]
const stepper = append;
const init = [];
const input = [1,2,3,4,5];
const output = transduce(transducer, stepper, init, input);
// [3,4]

第一个 drop 跳过第一个元素,然后将剩余元素传给下一个 transformer。take transformer 获取从第一个 drop 传过来的数组中的前三个元素,然后停止迭代。最后一个 drop 删除从 take 传过来的数组中的首个元素,并且在终止之前逐一发送剩余的两个元素。

第二部分总结

我们首先总结了在第一篇文章中学到的内容,然后添加了 4 个新的 transducers:filterremovetakedrop。我们通过组合 transducers 来创建 transformer pipelines,并看到数据变换的顺序是从左到右。

我们看到,除了在转换期间改变元素,transformer 还可以决定跳过任意元素,通过不调用下一个 transformer 的 step 来实现。每一个 transformer 的实现决定了什么会传递到下个 transformer。有些情况下, transformer 可能会发送多个值,例如 cattransduce-string

我们还看到了可以创建有状态变换的 transducer 的一些例子。状态由 transformer 管理,而不是 transducer。这允许无状态 transducer 的重用,即时它们创建的 transformers 管理状态。

当实现 take 时,我们意识到需要添加一个用于提前终止迭代的方法。我们改变了 reduce 的实现来处理和解包 reduced 的值,并且实现 take 用于在取完数据时,终止迭代。

还有别的吗?

在入门教程的最后一篇文章中还有一些需要解释的相关问题。我们仍未解释 transformer 的 initreduce 的作用。我们将添加 into 并一般化 reduce 的实现来支持迭代器。

我们还看到输入元素可以是产生 sequence 值的任意东西:惰性列表、无限序列生成器、CSPNode.js streams、迭代器、生成器、immutable-js 数据结构等。

想要获取新文章的通知吗?可以关注 获取 simplectic 的 Twitter。

我现在已经准备好了!

已经准备使用 transducers 了吗?如果通读了这篇文章,你应该已经具备良好的知识体系: transducers-jstransducers.js。我们主要参考 transducers-js 的实现,但这些概念同样适用于 transducers.js。

如果你喜欢 Underscore.js,可以查看 underarm。它是基于 transduce 的库,允许针对 transducers.js 和 transducer-js 支持的公共协议定义 API。

译者注:本文翻译自 Kevin Beaty 的《Transducers Explained: Part 1》, 转载请与原作者本人联系。
对原文中出现的专业术语,在不妨碍理解的情况下采用原文单词。

Transduce 相较于 Reduce 的改进,用一句话概括:在使用 Reduce 对每个元素归并之前,先对取出的每个元素进行转换。

Transduce 的时间复杂度为 O(n), 传统 compose + reduce 的为O(mn),m:compose 中包含 转变函数的个数,n:输入“数组”的长度。

名词解释:

  • reduce:归并、折叠。

  • reducer:用来进行 reduce 的二元函数。

  • result:reducer 的首个参数,累积值。

  • item:reducer 的第二个参数,reduce 数组遍历过程中的当前元素。

  • transduce:transform + reduce。

  • transducer:传入一个transformer,返回一个新的transformer。

  • transformer:在 reduce 过程中,对当前处理元素进行转换的对象,其中包含三个方法: { init, step, result }

  • xf:reduce 函数的首个参数,可以是 reducer,也可以是 transformer。

  • stepper:等同于 reducer。

下面开始正文。


本文使用 JavaScript 对 transducers 原理进行剖析。首先介绍数组的 reducing(归并) 过程,并定义用于 数据转换 的 transformers;然后逐步引入 transducers,并将其用于 transduce 中。文末会有总结、展望、一些补充链接,以及当前涉及 transducer 的一些库。

Transducers…

什么是 Transducers ?

原文解释如下

Transducers 是可组合的算法变换。它们独立于输入和输出的上下文,并且作为一个独立的单元提供最基本的 转换(transformation)。由于 transducers 与输入和输出相解耦,所以它们可以用于许多不同的处理场景:collections, streams, channels, observables(集合、流、管道、观察者模式)等。Transducers 可以直接组合,与输入无关,且不会产生额外的中间变量。

嗯…

还是不太理解

我们来找一些相关的代码看看。当使用 transducers 时,所谓的 “算法变换” 就已经以函数的形式被定义好了(或至少部分定义好了), transducer 类似于传入 reduce 的 reducerClojure 文档 将这些 “算法变换” 称为 reducing function。这又是什么东西?好吧…… ,让我们从 Array#reduce 函数开始讲解。首先来看看 MDN 的定义

Reduce

reduce() 方法将一个二元函数,作用于一个累积值、以及数组中的每个元素(按从左到右的顺序), 最终将数组 reduce(归并、折叠)成一个单值。

更多解释可以参考 MDN 文档(译者注:reduce 在某些语言中称为 foldl 左折叠,如 Haskell)。由于大家可能对 reduce 已经比较熟悉,这里将举几个例子来快速说明一下。

1
2
3
4
5
6
7
8
9
const sum = (result, item) => result + item;

const mult = (result, item) => result * item;

// 10 (=1+2+3+4)
const summed = [2,3,4].reduce(sum, 1);

// 24 (=1*2*3*4)
const multed = [2,3,4].reduce(mult, 1);

上述代码中的 reducers 是 summultreducers 连同初始值:1,一起传入 reduce 中。“输入源” 是数组 [2, 3, 4],“输出源” 是通过 reduce 内部实现创建的新数组。

关于 reduce, 有几个非常重要的点需要注意:

  1. 归并从输入的初始值开始。
  2. reducer 一次处理一个元素,操作过程如下:
    • 使用传入 reduce 的初始值作为第一步的累积值
    • 当次操作的返回值作为下次操作的累积值
  3. 将最后一次计算结果作为整体的返回值。

注意,在上述两例中,reducer 是一个二元函数。第一个参数为累积值,其值为由外部传入的初始值,或上次 reducer 的计算结果;第二个参数是在迭代过程中传入的单个元素。在本例中,reduce 对数组中的每个元素进行迭代,并进行归并处理。我们稍后会看到其他的迭代方式。我们使用 reducer 函数来描述 “转换的本质”。

Transformer

接下来,我们来明确 transformer 的归并过程:

1
2
3
4
5
6
7
8
9
10
11
const createTransformer = reducer => ({
// 1. 作为 reduce 的初始值
init: () => 1,

// 2. 每次输入一个元素,并将本次计算结果
// 传给下次迭代的 reducer
step: reducer,

// 3. 将最后一次的计算结果作为最终输出
result: result => result,
});

我们创建一个对象来封装 reducer,并将其命名为 step;该对象还包含另外两个函数:1. init 函数用于初始化 transformer,result 函数用于将最后一次计算结果转换为最终输出。注意,本文将只关注 step 函数,initresult 函数将在后续文章中做深入分析。现在,你可以把它们当作管理 “转换” 生命周期的方法:init 用于初始化,step 用于迭代,result 用于输出整个结果。

现在,我们来将刚定义的 transformer 运用到 reduce 中。

1
2
3
4
5
6
7
8
9
const input = [2,3,4];

const xf = createTransformer(sum);
const output = input.reduce(xf.step, xf.init());
// output = 10 (=1+2+3+4)

const xf = createTransformer(mult);
const output = input.reduce(xf.step, xf.init());
// output = 24 (=1*2*3*4)

我们的最终目标是将 转换 与输入、输出解耦,所以我们将 reduce 定义为函数的形式。

1
2
3
4
const reduce = (xf, init, input) => {
let result = input.reduce(xf.step, init);
return xf.result(result);
};

为了使用 reduce ,我们向 reduce 传入 transformer、初始值和输入源。上述实现结合了 transformer 的 step 函数和数组的 reduce 函数,并将 step 函数的结果作为输出。这里的 reduce 内部实现仍然假设输入类型为数组。 稍后将去掉这个假设。

我们显式地向 reduce 传入一个初始值,其实本可以使用 transformer 的 init 函数提供初始值,但考虑到 reduce 函数的灵活性,需要能够自定义初始值。在实践中,transformer 的 init 函数仅在 reduce 未提供初始值的情况下使用。

新 reduce 函数的使用类似于之前的 reduce 。

1
2
3
4
5
6
7
8
9
const input = [2,3,4];
const xf = createTransformer(sum);
const output = reduce(xf, xf.init(), input);
// output = 10 (=1+2+3+4)

const input = [2,3,4];
const xf = createTransformer(mult);
const output = reduce(xf, xf.init(), input);
// output = 24 (=1*2*3*4)

若想要改变初始值,还可以显式地向 reduce 传入初始值。

1
2
3
4
5
6
7
8
9
const input = [2,3,4];
const xf = createTransformer(sum);
const output = reduce(xf, 2, input);
// output = 11 (=2+2+3+4)

const input = [2,3,4];
const xf = createTransformer(mult);
const output = reduce(xf, 2, input);
// output = 48 (=2*2*3*4)

reduce 函数现在需要一个 transformer。由于 transformer 的 init 函数未在 reduce 内部用到,且 result 通常是个恒等函数:一个直接返回输入本身的函数,我们将定义一个辅助函数,先将 reducer 转换为 transformer,然后将 transformer 传入 reduce 使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const reduce = (xf, init, input) => {
if(typeof xf === 'function'){
// 确保含有 transformer
xf = wrap(xf);
}
const result = input.reduce(xf.step, init);
return xf.result(result);
}

const wrap = xf => ({
// 1. 我们会显式的传入一个 reduce 的初始值,
// 所以这里不再需要内部的 init 函数
init: () => {
throw new Error('init not supported');
},

// 2. 每次输入一个元素,并将本次计算结果
// 传给下次迭代
step: xf,

// 3. 将最后一次的计算结果作为输出
result: result => result,
}

首先我们检查参数 xf 的类型是否为 function。若是,我们假定它是一个 reducer (step) 函数, 并调用 wrap 函数将其转换为 transformer。然后像之前一样调用 reduce 。

现在已经可以直接向 reduce 传递 reducer 了。

1
2
3
4
5
6
7
const input = [2,3,4];
const output = reduce(sum, 1, input);
// output = 10 (=1+2+3+4)

const input = [2,3,4];
const output = reduce(mult, 2, input);
// output = 48 (=2*2*3*4)

reduce

但我们仍然可以向 reduce 传 transformers。

1
2
3
4
5
6
7
8
9
const input = [2,3,4];
const xf = wrap(sum);
const output = reduce(xf, 2, input);
// output = 11 (=2+2+3+4)

const input = [2,3,4];
const xf = wrap(mult);
const output = reduce(xf, 1, input);
// output = 24 (=1*2*3*4)

注意,这里在外部使用 wrap 将已有的 reducer 转换为 transformer。这在使用 transducers 会经常碰到:开发者将转换定义为简单的函数,transducers 库会自动将将其转换为 transformer。

不一样的数组拷贝

目前,我们一直使用数字作为初始值和加法作为 reducer 。其实不一定非要这样,reduce 也可以将数组作为初始值。

1
2
3
4
5
const append = (result, item) => result.push(item);

const input = [2,3,4];
const output = reduce(append, [], input);
// output = [2, 3, 4]

我们定义了一个步进函数(stepper/reducer)append,用于将每个元素拷贝到新数组中,并返回该数组。借助 append, reduce 便可以创建一份数组的拷贝。

上述操作是否够酷?或许算不上…。当你在将元素拷贝到数组前,先对它变换一下,情况开始变得有趣起来。

最孤单的数字

(注:One is the loneliest number,一句英文歌词,引出 plus1 转换)

假设我们想让每个元素加1,定义一个加1函数。

1
const plus1 = item => item + 1;

使用上面的函数创建一个 transformer,在 transformer 的 step 中,会对每个元素进行转换。

1
2
3
4
5
6
7
8
9
10
11
const xfplus1 = {
init: () => { throw new Error('init not needed'); },

step: (result, item) => {
const plus1ed = plus1(item);
return append(result, plus1ed);
},
// step: (result, item) => append(result, plus1(item)),

result: result => result,
}

我们来使用 transformer 输出每步的计算结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
const xf = xfplus1;
const init = [];
let result = xf.step(init, 2);
// [3] (=append([], 2+1)))

result = xf.step(result, 3);
// [3,4] (=append([3], 3+1)))

result = xf.step(result, 4);
// [3,4,5] (=append([3,4], 4+1)))

const output = xf.result(result);
// [3,4,5]

我们使用一个 transformer 来遍历元素:将每个元素加 1 后,添加到输出数组中。

如果想计算数组元素加 1 后的总和,该怎么办呢?可以使用 reduce 。

1
2
const output = reduce(sum, 0, output);
// 12 (=0+3+4+5)

上述方案虽然可行,但不幸的是,我们在获得最终答案的过程中,不得不创建一个中间数组。有更好的方案吗?

答案是有的。再来看下上面的 xfplus1 ,如果将 append 替换为 sum ,并且以 0 作为初始值,就可以定义一个直接对元素求和,但不会生成中间结果(数组)的 transformer。

但是,有时我们想立即查看替换 reducer 后的结果,因为涉及到的改变仅仅只是将 append 替换成了 sum。因此我们希望有一个能够创建 转换 的函数,该函数不依赖于用于组合结果的 transformer。

1
2
3
4
5
6
7
8
const transducerPlus1 = (xf) => ({
init: () => xf.init(),
step: (result, item) => {
const plus1ed = plus1(item);
return xf.step(result, plus1ed);
},
result: result => xf.result(result),
});

该函数接受一个 transformer:xf,返回一个基于 xf 的新 transformer。新 transformer 将经过 plus1 转换的值传给 xf,相当于对 xf 进行代理。由于使用 step 函数可以完全定义这个转换,所以新 transformer 可以复用旧 transformer – xfinitresult 函数。新 transformer 每次对元素进行 plus1 转换后,利用转换的值对封装的 transformer 的 step 函数进行调用。

Transducer

我们刚刚创建了第一个 transducer:一个接受已有 transformer,返回新 transformer 的函数。transducer 会将一些额外的处理行为委托给新 transformer。

我们来尝试一下,使用刚才的 transducer 来重新实现前面的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const stepper = wrap(append);
const init = [];
const transducer = transducerPlus1;
const xf = transducer(stepper);
let result = xf.step(init, 2);
// [3] (=append([], 2+1)))

result = xf.step(result, 3);
// [3,4] (=append([3], 3+1)))

result = xf.step(result, 4);
// [3,4,5] (=append([3,4], 4+1)))

const output = xf.result(result);
// [3,4,5]

运行过程和结果与之前相同,很好。唯一的区别是 transformer:xf 的创建。我们使用 wrapappend 转换成名为 stepper 的 transformer,然后使用 transducer 封装这个 stepper 并返回一个 plus1 转换。然后我们就可以像从前一样使用转换函数:xf 对每个元素逐步操作,并得到结果。

中间辅助元素

从现在开始,事情变得有趣起来:我们可以用同一 transducer,仅在改变 stepper 和初始值的情况下,获得最终的累加和,而不需要中间辅助数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const stepper = wrap(sum);
const init = 0;
const transducer = transducerPlus1;
const xf = transducer(stepper);
let result = xf.step(init, 2);
// 3 (=sum(0, 2+1)))

result = xf.step(result, 3);
// 7 (=sum(3, 3+1)))

result = xf.step(result, 4);
// 12 (=sum(7, 4+1)))

const output = xf.result(result);
// 12

不需要计算中间数组,一次遍历即可得到结果。sumappend 例子只有两处不同:

  • 创建 stepper 时,用 sum 代替 append 进行封装。
  • 初始值使用 0 代替 []。

仅此两处差异,其他完全一样。

需要注意的是,只有 stepper 转换知道 result 的数据类型。当封装 sum 时,result 的类型为数字,封装 append 时,result 的类型是数组。初始值类型与 stepper 的 result 参数类型相同。每次迭代元素的类型不限,因为 stepper 知道如何组合上次输出的结果和当前的元素,并返回一个新的组合的结果;本次输出结果可能会用于下次迭代中的组合,如此迭代循环。

这些特性允许我们定义独立于输出的转换。

可能会像 1 一样糟糕

(注:第二句歌词,Can be as bad as one,作者意思应该是,如果 plus2 还跟 plus1 一样从头重新实现一遍,就比较坑了)

假如我们想要 plus2,需要做哪些改变呢?我们可以像定义 transducerPlus1 那样,定义一个新的 transducerPlus2 。看看 transducerPlus1 是如何实现的,并决定做哪些改变。但这样做违反了 DRY 原则。

有更好的方案吗?

实际上,除了将 transformer 中 step 函数的值由 plus1 替换为 plus2 之外,其他的没变。

因此,可以将 plus1 提取出来,并将其作为 f 传进去。

1
2
3
4
5
6
7
8
const map = f => xf => ({
init: () => xf.init(),
step: (result, item) => {
const mapped = f(item);
return xf.step(result, mapped);
},
result: result => xf.result(result),
});

我们定义了 mapping transducer,我们使用它来单步执行转换过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const plus2 = input => input + 2;
const transducer = map(plus2);
const stepper = wrap(append);
const xf = transducer(stepper);
const init = [];
let result = xf.step(init, 2);
// [4] (=append([], 2+2)))

result = xf.step(result, 3);
// [4,5] (=append([4], 3+2)))

result = xf.step(result, 4);
// [4,5,6] (=append([4,5], 4+1)))

const output = xf.result(result);
// [4,5,6]

本例相较于之前 plus1append 的例子,唯一的区别在于使用 map 创建 transducer。我们可以类似地使用 map(plus1) 来创建 plus1 transducer。transducerPlus1 虽然只是短暂的出现便被 map(plus1) 代替,但它对我们理解 transduce 的内部原理帮助很大。

Transduce

上面的例子讲解了如何使用 transducers 对一系列输入进行转换。我们来分解一下其中的步骤。

首先调用一个包含 stepper 转换的 transducer 来初始化转换,并定义 transduce 的初始值。

1
2
3
4
const transducer = map(plus1);
const stepper = wrap(append);
const xf = transducer(stepper);
const init = [];

然后使用 xf.step 来单步处理每个元素。我们使用初始值作为传入 step 函数的第一个 result 参数,上一个 step 函数的返回值供后面的 step 函数迭代使用。

1
2
3
4
5
6
7
8
let result = xf.step(init, 2);
// [3] (=append([], 2+1)))

result = xf.step(result, 3);
// [3,4] (=append([3], 3+1)))

result = xf.step(result, 4);
// [3,4,5] (=append([3,4], 4+1)))

我们使用 xf.result 输出最终结果。

1
2
const output = xf.result(result);
// [3,4,5]

可能你已经注意到了,这与上面定义的 reduce 实现非常相似。事实上确实是这样。我们可以将这个过程封装成一个新的函数 transduce

1
2
3
4
5
6
7
8
9
10
11
12
const transduce = (transducer, stepper, init, input) => {
if(typeof stepper === 'function'){
// 确保迭代时,存在可用的 transformer
stepper = wrap(stepper);
}

// 传入 stepper 来创建 transformer
const xf = transducer(stepper);
// xf 是一个 transformer
// 现在可以使用上面定义的 reduce 来迭代并变换输入
return reduce(xf, init, input);
};

就像 reduce,我们需要确保 stepper 是一个 transformer。然后通过向 transducer 传入 stepper 来创建新的 transformer。 最后,我们使用包含新的 transformer 的 reduce 来进行迭代和转换数据。也就是说 transducer 的函数类型为:transformer -> transformer。

我们来实践一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
const transducer = map(plus1);
const stepper = append;
const init = [];
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// [3,4,5]

const transducer = map(plus2);
const stepper = append;
const init = [];
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// [4,5,6]

上述两例的唯一区别是传递给 map 的函数不同。

我们来尝试一下不同的步进函数(step function)和初始值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const transducer = map(plus1);
const stepper = sum;
const init = 0;
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// 12 (=3+4+5)

const transducer = map(plus2);
const stepper = sum;
const init = 0;
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// 15 (=4+5+6)

const transducer = map(plus1);
const stepper = mult;
const init = 1;
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// 60 (=3*4*5)

const transducer = map(plus2);
const stepper = mult;
const init = 1;
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// 120 (=4*5*6)

transduce

这里我们只是改变了 stepper 和初始值,便可以得到不同的结果。我们可以在不依赖中间结果的情况下,遍历一次便可求得累加和或乘积。

组合

如果我们想加3,该怎么办呢?我们可以定义 plus3 并且使用 map,但利用 transducers 的一个特性来进行优化。

事实上,可以通过已有的两个函数:plus1plus2,来定义 plus3

1
const plus3 = item => puls2(plus1(item));

或许你已经看出来,其实这就是函数组合。让我们通过函数组合来重新定义 plus3

1
2
3
4
5
6
const compose2 = (fn1, fn2) => item => fn1(fn2(item));

const plus3 = compose2(plus1, plus2);

const output = [plus3(2), plus3(3), plus3(4)];
// [5,6,7]

compose2 用于组合两个函数,调用顺序从右向左,看一下 compose2 的实现就可以知道为什么调用顺序是从右向左的了。最后一个 function 接受传入参数,返回结果作为下个 function 的输入。如此迭代,直到输出结果。

让我们使用 compose2 来定义一个对每个迭代元素加 3 的 transducer,该 transducer 由 plus1plus2 组合而成。

1
2
3
4
5
6
7
const transducerPlus3 = map(compose2(plus1, plus2));
const transducer = transducerPlus3;
const stepper = append;
const init = [];
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// [5,6,7]

我们对已有的函数:plus1plus2 进行组合,生成传入 map 的函数,而不是从头重新实现一遍 plus3

为什么要告诉你上面这些呢?实际上,我们还可以通过组合其他的 transducers 来创建新的 transducers。

1
2
3
4
5
6
7
8
9
const transducerPlus1 = map(plus1);
const transducerPlus2 = map(plus2);
const transducerPlus3 = compose2(transducerPlus1, transducerPlus2);
const transducer = transducerPlus3;
const stepper = append;
const init = [];
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);
// [5,6,7]

新组合出来的 transducer 又可以用于组合其他的 transducer 。

1
2
3
4
5
6
7
8
9
const transducerPlus1 = map(plus1);
const transducerPlus2 = map(plus2);
const transducerPlus3 = compose2(transducerPlus1, transducerPlus2);
const transducerPlus4 = compose2(transducerPlus3, transducerPlus1);
const transducer = transducerPlus4;
const stepper = append;
const init = [];
const input = [2,3,4];
const output = transduce(transducer, stepper, init, input);

compose_transducers

再次注意,这里与本节前面的例子的唯一区别仅仅在于 transducer 的创建,其它都一样。

之所以可以使用 组合,是因为 transducers 可以接受已有 transformer,返回新的 transformer。也即 transducer 的输入与输出类型相同,且为单输入单输出的函数。只要符合上述条件,便可以使用函数组合来创建接受相同类型参数的新函数。

由上可得,transducers 是一种 “可组合的算法变换”。这在实际开发中非常有用:可以将新的变换分解为一系列较小的变换,然后将它们通过 composepipe 以流水线( pipeline)的组合起来。后面后有更多的例子进行演示。

事实上,虽然函数组合的顺序从右向左,而转换本身是自左向右的(译者注:这也是理解 transduce 的难点之一,理解了这个,也就基本理解了 transduce。可以通过单个 transducer 和 transformer 的组合,来理解转换的调用顺序。transduce 本质上做的事情是 在对每个元素进行归并之前先对其进行转换 ,将这句话在自己心里默念三遍:),这也是 transduce 区别于 reduce 的“唯一”不同点)。

在上面的 transducersPlus4 示例中,每个元素先进行 plus3 转换,然后进行 plus1 转换。

虽然在本例中 transducers 的调用顺序对结果没有影响,但是从左向右的变换顺序还是需要牢记在心。变换的调用顺序让你在阅读代码时更容易理解,因为它与你的阅读顺序是一致的。

part 1 总结

Transducers 将 “可组合的算法转换” 抽象出来,使其独立于输入、输出,甚至迭代的整个过程。

本文演示了如何使用 transducers 来抽象算法转换,transducer 将一个 transformer 转换为另一个 transformer。transformer 可以用于 transduce,进行迭代和转换输入源。

Underscore.jsLo-Dash 在进行数组或对象计算时,往往会产生中间结果。而 transducers 以一系列函数的方式定义转换,类似于 reduce 的处理方式:将初始值作为首次迭代的累积值,使用累积值和当前元素进行计算,返回转换后的累积值,并将其作为下次迭代的累积值参数。一旦将这些转换从数据中抽象出来,就可以将相同的转换应用于不同的处理过程,这些处理过程以某个初始的累积值开始,不断对变换后的元素累积。

我们已经展示了同一 transducer 可以应用于不同的 “输出源”,只需改变初始累积值和 reducer 函数。这种抽象的好处之一为:可以只通过一轮计算便得到结果,且不会产生中间辅助数组。

尽管没有明确说明,我们还是展示了 transducer 是与迭代过程及输入源解耦的。在迭代过程中,我们使用相同的 transducer 对元素进行转换,并将转换结果传给 step 函数,并且使用数组的 reduce 从数组中拉取待处理的数据元素。

还想了解更多!

看这里 ,之后的文章中将会在本文基础上探讨更多主题,例如:transducers 并不一定需要每步都输出元素;并且迭代过程可以提前终止,并返回终止前已经归并的结果。本文只讨论了 step,未讨论 init 和 result,将来也会进行补充。

我们还将会学习到,输入源可以是任意产生一系列值的模型:惰性列表,不确定的序列生成器,CSP(通信顺序进程),Node.js streams
lazy lists, indefinite sequence generation, CSP[http://phuu.net/2014/08/31/csp-and-transducers.html), [push streams][12], Node.js streams, iterators, generators, immutable-js data structures, 等等。

等不及了!

在此期间,可以看看 Clojure文档, 或者看看 这个视频这篇文章。其他更多更好的介绍,可以自行 Google 。

想要立刻实践一下?已经有三个库实现了相似的API:transducers-jstransducers.jsramda(译者注:ramda 中 transducer 部分也是本文作者写的)。本文是按照 transducers-js 的实现进行讲解和演示的,但这些概念同样适用于 transducers.js。

Underscore.js 的爱好者?可以查看 underarm,基于 transduce 库(译者注:本文作者写的库)写的。

怎样将 transducer 应用到 Node.js streams 中呢?我们还在探索。

希望得到新文章的通知?可以在 Twitter 上关注 simplectic

map、reduce 和 filter 是函数式编程中用到频率非常高的三个函数,对类Array 类型操作非常方便,被称为 FP 中的三板斧。还有几个相关的稍微复杂的函数也一并介绍: chain(flattenMap)、transduce、into、reduced。

图解 map/reduce/filter

虽然这是 Underscore 方式的写法,但不影响对三个概念的理解.

map/reduce/filter

源码分析

  • map

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    module.exports = _curry2(_dispatchable('map', _xmap, function map(fn, functor) {
    switch (Object.prototype.toString.call(functor)) {
    case '[object Function]':
    return curryN(functor.length, function() {
    return fn.call(this, functor.apply(this, arguments));
    });
    case '[object Object]':
    return _reduce(function(acc, key) {
    acc[key] = fn(functor[key]);
    return acc;
    }, {}, keys(functor));
    default:
    return _map(fn, functor);
    }
    }));

Ramda 目录说明

Ramda API 的源码都在 src/ 文件夹中,src/ 下包含一个 internal/ 文件夹和若干外部模块。直接在 src/ 下编写的模块(函数)供外部调用,在 internal/ 下编写的模块仅供内部使用,属于帮助函数,内部函数以下划线 “_” 开头

本次源码分析使用版本号为:Ramda v0.22.1。

源码分析

  • __ 占位符

    占位符是一个 “普通的” object ,key 为 @@functional/placeholder ,value 为 true。

    1
    module.exports = {'@@functional/placeholder': true};

    配合柯里化函数使用,可以使柯里化函数传入实参不在限于从左往右依次传入,大大增强了柯里化函数的能力。

    举例如下,g 是一个柯里化的 ternary(三元)函数,_ 代表 R.__ ,下面的写法是等价的。

    1
    2
    3
    4
    5
    6
    7
    8
    g(1, 2, 3)
    g(_, 2, 3)(1)
    g(_, _, 3)(1)(2)
    g(_, _, 3)(1, 2)
    g(_, 2, _)(1, 3)
    g(_, 2)(1)(3)
    g(_, 2)(1, 3)
    g(_, 2)(_, 3)(1)
  • _isPlaceholder

    判断实参是否为占位符(R.__),在柯里化函数中使用。

    1
    2
    3
    4
    5
    module.exports = function _isPlaceholder(a) {
    return a != null &&
    typeof a === 'object' &&
    a['@@functional/placeholder'] === true;
    };
  • _curry1

    优化的内部单参数柯里化函数:对单参数函数fn 进行柯里化,返回柯里化了的 fn’ → f1。

    当传入参数为空或者传入的是占位符,返回 f1;否则执行 f1 ,并返回执行结果。

    柯里化用到了闭包。_curry1/_curry2/_curry3 是为了柯里化 ramda API 优化用的。因为 ramda API 都是原生柯里化的,且参数一般不超过3个,所以用到上述3个内部优化的柯里化函数,以提高效率。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    module.exports = function _curry1(fn) {
    return function f1(a) {
    if (arguments.length === 0 || _isPlaceholder(a)) {
    return f1;
    } else {
    return fn.apply(this, arguments);
    }
    };
    };
  • _curry2

    优化的内部双参数柯里化函数:对双参数函数 fn 进行柯里化,返回柯里化了的 fn’ → f2。

    原理讲解:

    1. 当无参数传入时,返回 f2;
    2. 当有一个参数(a)传入时,判断该参数是否为 R.__ :是则返回 f2,否则返回加持一个参数(a)的单参数柯里化函数;
    3. 当传入两个参数时,若都是占位符,返回 f2,若有一个占位符则返回加持一个参数的单参数柯里化函数,若无占位符,则执行 fn。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    module.exports = function _curry2(fn) {
    return function f2(a, b) {
    switch (arguments.length) {
    case 0:
    return f2;
    case 1:
    return _isPlaceholder(a) ? f2
    : _curry1(function(_b) { return fn(a, _b); });
    default:
    return _isPlaceholder(a) && _isPlaceholder(b) ? f2
    : _isPlaceholder(a) ? _curry1(function(_a) { return fn(_a, b); })
    : _isPlaceholder(b) ? _curry1(function(_b) { return fn(a, _b); })
    : fn(a, b);
    }
    };
    };
  • _arity

    没有看出来 _arity 是干嘛用的,控制(限制)函数参数的个数?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    module.exports = function _arity(n, fn) {
    /* eslint-disable no-unused-vars */
    switch (n) {
    case 0: return function() { return fn.apply(this, arguments); };
    case 1: return function(a0) { return fn.apply(this, arguments); };
    case 2: return function(a0, a1) { return fn.apply(this, arguments); };
    case 3: return function(a0, a1, a2) { return fn.apply(this, arguments); };
    case 4: return function(a0, a1, a2, a3) { return fn.apply(this, arguments); };
    case 5: return function(a0, a1, a2, a3, a4) { return fn.apply(this, arguments); };
    case 6: return function(a0, a1, a2, a3, a4, a5) { return fn.apply(this, arguments); };
    case 7: return function(a0, a1, a2, a3, a4, a5, a6) { return fn.apply(this, arguments); };
    case 8: return function(a0, a1, a2, a3, a4, a5, a6, a7) { return fn.apply(this, arguments); };
    case 9: return function(a0, a1, a2, a3, a4, a5, a6, a7, a8) { return fn.apply(this, arguments); };
    case 10: return function(a0, a1, a2, a3, a4, a5, a6, a7, a8, a9) { return fn.apply(this, arguments); };
    default: throw new Error('First argument to _arity must be a non-negative integer no greater than ten');
    }
    };
  • curryN

    length:待柯里化函数参数的个数,fn:带柯里化函数

    参数个数为1,用 _curry1 对 fn 柯里化;参数个数范围为 (1,10] ,用 _curryN。

    1
    2
    3
    4
    5
    6
    module.exports = _curry2(function curryN(length, fn) {
    if (length === 1) {
    return _curry1(fn);
    }
    return _arity(length, _curryN(length, [], fn));
    });
  • curry

    平时实际用到的柯里化函数

    1
    2
    3
    module.exports = _curry1(function curry(fn) {
    return curryN(fn.length, fn);
    });