当前位置:首页 » 计算机系统

使用ConcurrentHashMap实现高效缓存框架

2017-05-14 06:41 本站整理 浏览(11)

  在项目中我们有的时候需要使用某种形式的缓存,使用缓存能够重用之前的计算结果,降低系统延迟,提高吞吐量,但是其却会消耗更多的内存。就像许多重复发明的轮子一样,缓存框架的设计看上去简单,但系统的性能将会由缓存框架的伸缩性决定。如下是一段使用HashMap实现的缓存框架:

public interface Computable<A, V> {
  V compute(A arg) throws InterruptedException;
}
import java.math.BigInteger;

public class ExpensiveFunction implements Computable<String, BigInteger> {
  @Override
  public BigInteger compute(String arg) throws InterruptedException {
    return new BigInteger(arg);
  }
}
import java.util.HashMap;
import java.util.Map;

public class Memorizer1<A, V> implements Computable<A, V> {
  private final Map<A, V> cache = new HashMap<>();
  private final Computable<A, V> c;

  public Memorizer1(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public synchronized V compute(A arg) throws InterruptedException {
    V result = cache.get(arg);
    if (null == result) {
      result = c.compute(arg);
      cache.put(arg, result);
    }

    return result;
  }
}

  上述代码中,Computable接口定义的是一类用于执行某种类型计算的策略族。ExpensiveFunction实现了Computable接口,该类在概念上是通过传入的参数arg,经过一系列复杂计算而得到结果,这里为了方便起见,只是返回了一个BigInteger对象。Memorizer1类也实现了Computable接口,这里实际上用到了装饰者模式,在构造Memorizer1类时需要传入一个Computable类型对象进来,如ExpensiveFunction,当需要使用ExpensiveFunction类来进行复杂计算时,可以通过Memorizer1类来对其进行装饰,转而调用Memorizer1的compute方法。而在Memorizer1内部,其使用了一个HashMap来对真正的Computable对象(如ExpensiveFunction)的结果进行了缓存,如果传入的参数arg能够在cache中找到结果,那么直接返回,否则调用实际的Computable::compute方法进行计算,通过这种方式达到提高系统新能的目的。

  上述Memorizer1虽然能够实现对计算结果的缓存,但是由于HashMap不是线程安全的,其使用synchronized将整个compute方法包裹起来,当并发量较高时,就会出现多个线程同时竞争执行compute方法的情况,系统的性能可能比不使用缓存更低,此时如何提高Memorizer1的包裹内容的伸缩性将决定了系统的性能。

  观察这段程序会发现,Memorizer1中有两个全局属性,一个是用于缓存的cache,一个是用于计算的c。这里c对象只表示了一组通过传入参数计算结果的策略,其是一个无状态对象(所谓的无状态对象是指其内部没有被多个线程公用的属性);而主要的cache对象才是所有线程需要竞争的对象,因而这里使用锁的时候其实只需要锁住HashMap一个类型的对象即可。在java中对于HashMap,可以使用ConcurrentHashMap来解决多线程竞争map对象的问题,如下是使用ConcurrentHashMap改写Memorizer1的形式:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Memorizer2<A, V> implements Computable<A, V> {
  private final Map<A, V> cache = new ConcurrentHashMap<>();
  private final Computable<A, V> c;

  public Memorizer2(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public V compute(A arg) throws InterruptedException {
    V result = cache.get(arg);
    if (null == result) {
      result = c.compute(arg);
      cache.put(arg, result);
    }

    return result;
  }
}

  上述Memorizer2很好的改善了Memorizer1中存在的不足,并且多线程情况下也能够很好的运行,但是其仍存在一些不足,通过分析Memorizer2::compute方法可以发现,使用cache的位置主要有两个:首先获取cache中键为arg的值,如果不存在则调用Computable::compute方法来计算结果,并将结果保存到cache中。其实在调用cache::get和cache::put方法之间进行了一些计算,当一个线程执行了cache::get方法之后得到了空的result,其会执行高额的Computable::compute方法,此时该线程还未执行到cache::put方法,而另一个线程也调用了cache::get方法,其也将得到一个空的result,这样这两个线程还是有可能执行两次Computable::compute方法,并且由于Computable::compute方法耗费较高,因而重复计算的概率还是比较大的。

  在java类库中提供了另外一个类FutureTask,该类有一个get方法,如果有结果,那么其会立即返回结果,如果还未运行完成,那么其会阻塞,直到得到运行结果为止。这里我们可以通过FutureTask改写上述缓存框架,如下是改写后的代码:

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Memorizer3<A, V> implements Computable<A, V> {
  private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
  private final Computable<A, V> c;

  public Memorizer3(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public V compute(final A arg) throws InterruptedException {
    Future<V> f = cache.get(arg);
    if (null == f) {
      Callable<V> eval = () -> {
          return c.compute(arg);
      };

      FutureTask<V> ft = new FutureTask<V>(eval);
      f = ft;
      cache.put(arg, f);
      ft.run();
    }

    try {
      return f.get();
    } catch (ExecutionException e) {
      throw launderThrowable(e.getCause());
    }
  }

  private InterruptedException launderThrowable(Throwable cause) {
    if (cause instanceof InterruptedException) {
      return (InterruptedException) cause;
    } else {
      return new InterruptedException(cause.getMessage());
    }
  }
}

  这里cache中存储的键还是计算需要的参数,而值则是一个个FutureTask对象,真正的计算放到了一个Callable对象中,在调用FutureTask::run方法时,是实际执行了Computable::compute方法。当一个线程传入一个参数之后,Memorizer3首先在cache中找是否已经有计算结果,如果没有,那么会创建一个FutureTask对象存入到cache中,然后调用FutureTask::run方法计算结果,最后调用FutureTask::get方法返回结果,如果run方法还未计算出结果,那么最后的get方法将会被阻塞,直到产生结果。
      Memorizer3对于缓存的实现几乎是完美的,但是其还是存在重复计算的缺陷,即当一个线程在cache中未找到结果,其创建了FutureTask对象并将其放入cache中,然后执行run方法计算结果的同时,另一个线程也传入同样的参数想要获取计算结果,其也发现cache中没有缓存结果,那么其也会创建一个新的FutureTask对象,并调用FutureTask::run方法计算结果。但是相对于Memorizer2,其出现重复计算的概率已经大大减少了。这里造成这个问题的原因还是在于Map的复合操作“若没有则添加”并不是原子的。在ConcurrentHashMap中提供了一个方法V putIfAbsent(K key, V value),其是一个原子的若不存在则添加方法,并且其会返回当前Map中已经存在的value值,若没有则返回null,如下是通过该方法解决上述重复计算问题的代码:

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Memorizer<A, V> implements Computable<A, V> {
  private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
  private final Computable<A, V> c;

  public Memorizer(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public V compute(final A arg) throws InterruptedException {
    while (true) {
      Future<V> f = cache.get(arg);
      if (null == f) {
        Callable<V> eval = () -> {
          return c.compute(arg);
        };

        FutureTask<V> ft = new FutureTask<V>(eval);
        f = cache.putIfAbsent(arg, ft);
        if (null == f) {
          f = ft;
          ft.run();
        }
      }

      try {
        return f.get();
      } catch (CancellationException e) {
        cache.remove(arg, f);
      } catch (ExecutionException e) {
        throw launderThrowable(e.getCause());
      }
    }
  }

  private InterruptedException launderThrowable(Throwable cause) {
    if (cause instanceof InterruptedException) {
      return (InterruptedException) cause;
    }

    return new InterruptedException(cause.getMessage());
  }
}

  这里通过调用ConcurrentHashMap::putIfAbsent方法,假设两个线程传入了同样的参数,并且都创建了一个FutureTask对象,一个线程获得了cache的执行权限执行了cache::putIfAbsent()方法,并且返回了一个null到局部变量f中,此时另一个线程也会调用cache::putIfAbsent()方法,由于第一个线程已经将相关键值对存入到cache中了,那么第二个线程将获得第一个线程创建的FutureTask对象,并且将其替换给当前线程中的局部变量f,并且其判断f不为null,那么其会调用f::get()方法,而此时第一个线程正在执行FutureTask::run方法,如果其已经计算完成,那么其会返回结果给第一个线程,而第二个线程是直接执行的FutureTask::get方法,如果第一个线程执行完成,那么第二个线程将直接获取结果返回,如果第一个线程没有执行完成,那么第二个线程将等待第一个线程执行完成后再返回结果。

  这里对compute方法使用while循环的目的是,当某个线程在执行结果的时候,其余的线程需要等待该线程执行完成,如果其余的线程由于某些原因等待被打断,那么通过while循环其会继续进入等待状态,从而得到执行结果。而对于某个执行计算结果的线程而言,如果其计算过程被取消或失败,那么对于缓存而言这就造成了缓存污染,因为存入的是FutureTask对象,而不是真正的结果,如果计算的线程被取消,那么实际上FutureTask::get方法将一直无法获取到值,但是cache中对应键的值也不是null的,这就是缓存污染的根源。上述代码中通过调用FutureTask::get方法时捕获CacellationException来捕捉执行计算的线程计算被取消或失败的情况,当出现这种情况的时候就从cache中将相应的FutureTask对象给移除掉,并且通过while循环也可以是后来进入的线程再次执行run方法从而得到计算结果。

  上述的Memorizer基本上是一个完美的缓存类,但是对于缓存而言,其数据如果存在过期问题,那么将需要另外进行设计,从而实现高性能吞吐的目的,当然,如果只是针对一些复杂的计算,只要传入的值不变,其结果永远不会发生变化,那么使用该框架还是非常合适的。