【Android】Fresco图片加载框架(二)————Producer
/** * 本文可以随意转载到任何网站或者App, * BUT * 转载也要按“基本法”, * 请注明原文出处和作者 */
fresco官方高大上介绍(1)(注意:前方有堵墙)
fresco官方高大上介绍(2)(注意:前方有堵墙)
介绍:
上一篇大概介绍了fresco这个lib的整体结构和流程,这篇主要介绍fresco中关键的一部分--Producer。
个人觉得,Producer基本是整个ImagePipeline module的核心,串联了整个图片读取的流程和各个细节(decode,resize等等)的处理,而且感觉整个设计上很有意思,读完感觉收益匪浅。
正文:
(分析主要是代码,相当枯燥~~~sigh)
以一次网络请求的例,进行分析,其他类型的请求,例如从cache中读取图片等,都差不多。
当SimpleDraweeView#setController后,图片拉取的流程就开始了(详细流程可以参看上一篇的流程图)。忽略掉大部分细节,流程会来到(PipelineDraweeControllerBuilder.java):
@Override protected DataSource<CloseableReference<CloseableImage>> getDataSourceForRequest( ImageRequest imageRequest, Object callerContext, boolean bitmapCacheOnly) { if (bitmapCacheOnly) { return mImagePipeline.fetchImageFromBitmapCache(imageRequest, callerContext); } else { return mImagePipeline.fetchDecodedImage(imageRequest, callerContext); } }
这里就是开始图片拉取,转入到ImagePipeline核心流程。这里就是调用的ImagePipeline的fetchDecodeImage方法,从名字看,意思就是“获取一张decode(解码)的图片”,其代码(ImagePipeline.java):
/** * Submits a request for execution and returns a DataSource representing the pending decoded * image(s). * <p>The returned DataSource must be closed once the client has finished with it. * @param imageRequest the request to submit * @return a DataSource representing the pending decoded image(s) */ public DataSource<CloseableReference<CloseableImage>> fetchDecodedImage( ImageRequest imageRequest, Object callerContext) { try { Producer<CloseableReference<CloseableImage>> producerSequence = mProducerSequenceFactory.getDecodedImageProducerSequence(imageRequest); return submitFetchRequest( producerSequence, imageRequest, ImageRequest.RequestLevel.FULL_FETCH, callerContext); } catch (Exception exception) { return DataSources.immediateFailedDataSource(exception); } }
这里有两个主要的函数:getDecodedImageProducerSequence和submitFetchRequest
getDecodedImageProducerSequence的返回值就是一个Producer<CloseableReference<CloseableImage>>。该值作为参数,传到第二个函数submitFetchRequest中,先看submitFetchRequest:
private <T> DataSource<CloseableReference<T>> submitFetchRequest( Producer<CloseableReference<T>> producerSequence, ImageRequest imageRequest, ImageRequest.RequestLevel lowestPermittedRequestLevelOnSubmit, Object callerContext) { try { ImageRequest.RequestLevel lowestPermittedRequestLevel = ImageRequest.RequestLevel.getMax( imageRequest.getLowestPermittedRequestLevel(), lowestPermittedRequestLevelOnSubmit); SettableProducerContext settableProducerContext = new SettableProducerContext( imageRequest, generateUniqueFutureId(), mRequestListener, callerContext, lowestPermittedRequestLevel, /* isPrefetch */ false, imageRequest.getProgressiveRenderingEnabled() || !UriUtil.isNetworkUri(imageRequest.getSourceUri()), imageRequest.getPriority()); return CloseableProducerToDataSourceAdapter.create( producerSequence, settableProducerContext, mRequestListener); } catch (Exception exception) { return DataSources.immediateFailedDataSource(exception); } }
然后上面的create函数,就会一直到(AbstractProducerToDataSourceAdapter.java):
protected AbstractProducerToDataSourceAdapter( Producer<T> producer, SettableProducerContext settableProducerContext, RequestListener requestListener) { mSettableProducerContext = settableProducerContext; mRequestListener = requestListener; mRequestListener.onRequestStart( settableProducerContext.getImageRequest(), mSettableProducerContext.getCallerContext(), mSettableProducerContext.getId(), mSettableProducerContext.isPrefetch()); producer.produceResults(createConsumer(), settableProducerContext); }
到最后,submitFetchRequest会调用到Producer#produceResults方法,而这个producer就是前面那个getDecodedImageProducerSequence方法产生的,所以回头看这个最最关键的地方。
getDecodedImageProducerSequence是ProducerSequenceFactory.java的方法:
/** * Returns a sequence that can be used for a request for a decoded image. * * @param imageRequest the request that will be submitted * @return the sequence that should be used to process the request */ public Producer<CloseableReference<CloseableImage>> getDecodedImageProducerSequence( ImageRequest imageRequest) { Producer<CloseableReference<CloseableImage>> pipelineSequence = getBasicDecodedImageSequence(imageRequest); if (imageRequest.getPostprocessor() != null) { return getPostprocessorSequence(pipelineSequence); } else { return pipelineSequence; } }
从注释看,方法的意思就是返回一个用于请求decoded图片的sequence,而事实上,应该是返回一个Producer才对啊,
那为什么是强调是sequence Producer,而不是,仅仅就是一个Producer?
带着疑问继续看:
private Producer<CloseableReference<CloseableImage>> getBasicDecodedImageSequence( ImageRequest imageRequest) { Preconditions.checkNotNull(imageRequest); Uri uri = imageRequest.getSourceUri(); Preconditions.checkNotNull(uri, "Uri is null."); if (UriUtil.isNetworkUri(uri)) { return getNetworkFetchSequence(); } else if (UriUtil.isLocalFileUri(uri)) { if (MediaUtils.isVideo(MediaUtils.extractMime(uri.getPath()))) { return getLocalVideoFileFetchSequence(); } else { return getLocalImageFileFetchSequence(); } } else if (UriUtil.isLocalContentUri(uri)) { return getLocalContentUriFetchSequence(); } else if (UriUtil.isLocalAssetUri(uri)) { return getLocalAssetFetchSequence(); } else if (UriUtil.isLocalResourceUri(uri)) { return getLocalResourceFetchSequence(); } else if (UriUtil.isDataUri(uri)) { return getDataFetchSequence(); } else { String uriString = uri.toString(); if (uriString.length() > 30) { uriString = uriString.substring(0, 30) + "..."; } throw new RuntimeException("Unsupported uri scheme! Uri is: " + uriString); } }
看代码可知道,就是根据ImageRequest的Uri,选择一个sequence Producer,我们这里假设是网络请求图片,所以选择的是图中红色方法getNetworkFetchSequence,它也是返回一个Producer(可粗略看):
/** * swallow result if prefetch -> bitmap cache get -> * background thread hand-off -> multiplex -> bitmap cache -> decode -> multiplex -> * encoded cache -> disk cache -> (webp transcode) -> network fetch. */ private synchronized Producer<CloseableReference<CloseableImage>> getNetworkFetchSequence() { if (mNetworkFetchSequence == null) { mNetworkFetchSequence = newBitmapCacheGetToDecodeSequence(getCommonNetworkFetchToEncodedMemorySequence()); } return mNetworkFetchSequence; }
先看红色代码,getCommonNetworkFetchToEncodedMemorySequence,它也是返回一个Producer(可粗略看):
/** * multiplex -> encoded cache -> disk cache -> (webp transcode) -> network fetch. */ private synchronized Producer<EncodedImage> getCommonNetworkFetchToEncodedMemorySequence() { if (mCommonNetworkFetchToEncodedMemorySequence == null) { Producer<EncodedImage> inputProducer = newEncodedCacheMultiplexToTranscodeSequence( mProducerFactory.newNetworkFetchProducer(mNetworkFetcher)); mCommonNetworkFetchToEncodedMemorySequence = ProducerFactory.newAddImageTransformMetaDataProducer(inputProducer); if (mResizeAndRotateEnabledForNetwork && !mDownsampleEnabled) { mCommonNetworkFetchToEncodedMemorySequence = mProducerFactory.newResizeAndRotateProducer( mCommonNetworkFetchToEncodedMemorySequence); } } return mCommonNetworkFetchToEncodedMemorySequence; }
再看,newEncodedCacheMultiplexToTranscodeSequence,它也是返回一个Producer(可粗略看):
/** * encoded cache multiplex -> encoded cache -> (disk cache) -> (webp transcode) * @param inputProducer producer providing the input to the transcode * @return encoded cache multiplex to webp transcode sequence */ private Producer<EncodedImage> newEncodedCacheMultiplexToTranscodeSequence( Producer<EncodedImage> inputProducer) { if (Build.VERSION.SDK_INT < Build.VERSION_CODES.JELLY_BEAN_MR2) { inputProducer = mProducerFactory.newWebpTranscodeProducer(inputProducer); } inputProducer = mProducerFactory.newDiskCacheProducer(inputProducer); EncodedMemoryCacheProducer encodedMemoryCacheProducer = mProducerFactory.newEncodedMemoryCacheProducer(inputProducer); return mProducerFactory.newEncodedCacheKeyMultiplexProducer(encodedMemoryCacheProducer); }
选其中一个红色函数看,newDiskCacheProducer,它也是返回一个Producer(可粗略看):
public DiskCacheProducer newDiskCacheProducer( Producer<EncodedImage> inputProducer) { return new DiskCacheProducer( mDefaultBufferedDiskCache, mSmallImageBufferedDiskCache, mCacheKeyFactory, inputProducer); }
好了,到此为止(列出的函数足够多了,晕~~~)
其实上面,一连串几个函数,如果有细心的留意,它们有一个特点,就是,(每一个函数)都以 (上一个函数)产生的Producer作为参数进行传递。
这样的设计,是不是有似曾相识的感觉,看下面的代码应该就能够了解得更深:
FileInputStream fileInputStream = new FileInputStream("/test.txt"); InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream); BufferedReader bufferedReader = new BufferedReader(inputSteamReader);
很熟悉了吧,可以把sequence Producer就看成是上面这样的一个逻辑~~~
那么这样做的作用是什么?我们选一个简单Producer来分析,就以DiskCacheProducer为例:
public DiskCacheProducer( BufferedDiskCache defaultBufferedDiskCache, BufferedDiskCache smallImageBufferedDiskCache, CacheKeyFactory cacheKeyFactory, Producer<EncodedImage> inputProducer) { mDefaultBufferedDiskCache = defaultBufferedDiskCache; mSmallImageBufferedDiskCache = smallImageBufferedDiskCache; mCacheKeyFactory = cacheKeyFactory; mInputProducer = inputProducer; } public void produceResults( final Consumer<EncodedImage> consumer, final ProducerContext producerContext) { ImageRequest imageRequest = producerContext.getImageRequest(); //如果diskcache disabled的话,那么直接执行maybeStartInputProducer if (!imageRequest.isDiskCacheEnabled()) { maybeStartInputProducer(consumer, consumer, producerContext); return; } final ProducerListener listener = producerContext.getListener(); final String requestId = producerContext.getId(); listener.onProducerStart(requestId, PRODUCER_NAME); final CacheKey cacheKey = mCacheKeyFactory.getEncodedCacheKey(imageRequest); final BufferedDiskCache cache = imageRequest.getImageType() == ImageRequest.ImageType.SMALL ? mSmallImageBufferedDiskCache : mDefaultBufferedDiskCache; Continuation<EncodedImage, Void> continuation = new Continuation<EncodedImage, Void>() { //回调 @Override public Void then(Task<EncodedImage> task) throws Exception { //根据task是canceled,fault等状态决定如何执行 if (task.isCancelled() || (task.isFaulted() && task.getError() instanceof CancellationException)) { listener.onProducerFinishWithCancellation(requestId, PRODUCER_NAME, null); consumer.onCancellation(); } else if (task.isFaulted()) { listener.onProducerFinishWithFailure(requestId, PRODUCER_NAME, task.getError(), null); //出错了,就调用maybeStartInputProducer maybeStartInputProducer( consumer, new DiskCacheConsumer(consumer, cache, cacheKey), producerContext); } else { EncodedImage cachedReference = task.getResult(); if (cachedReference != null) { listener.onProducerFinishWithSuccess( requestId, PRODUCER_NAME, getExtraMap(listener, requestId, true)); consumer.onProgressUpdate(1); consumer.onNewResult(cachedReference, true); cachedReference.close(); } else { //没有结果,就调用maybeStartInputProducer listener.onProducerFinishWithSuccess( requestId, PRODUCER_NAME, getExtraMap(listener, requestId, false)); maybeStartInputProducer( consumer, new DiskCacheConsumer(consumer, cache, cacheKey), producerContext); } } return null; } }; AtomicBoolean isCancelled = new AtomicBoolean(false); final Task<EncodedImage> diskCacheLookupTask = cache.get(cacheKey, isCancelled); //执行task,task其实就是从缓存中取结果,执行后,前面的continuation就会被回调 diskCacheLookupTask.continueWith(continuation); subscribeTaskForRequestCancellation(isCancelled, producerContext); } //调用mInputProducer的produceResults private void maybeStartInputProducer( Consumer<EncodedImage> consumerOfDiskCacheProducer, Consumer<EncodedImage> consumerOfInputProducer, ProducerContext producerContext) { if (producerContext.getLowestPermittedRequestLevel().getValue() >= ImageRequest.RequestLevel.DISK_CACHE.getValue()) { consumerOfDiskCacheProducer.onNewResult(null, true); return; } mInputProducer.produceResults(consumerOfInputProducer, producerContext); }
从前面知道,当开始拉取图片的时候,Producer#produceResult开始执行,注释标出了关键的步骤,从这些步骤可以看出,其实DiskCacheProducer拉取图片时,做的任务大概就是:先看Diskcache中是否有缓存的图片,如果有,就直接返回缓存,如果没有,就用inputProducer来处理。
然后,inputProducer处理完结果会怎样呢?它处理的结果会在consumer中接收到,上面的例子代码对应的就是DiskCacheConsumer:
/** * Consumer that consumes results from next producer in the sequence. * * <p>The consumer puts the last result received into disk cache, and passes all results (success * or failure) down to the next consumer. */ private class DiskCacheConsumer extends DelegatingConsumer<EncodedImage, EncodedImage> { private final BufferedDiskCache mCache; private final CacheKey mCacheKey; private DiskCacheConsumer( final Consumer<EncodedImage> consumer, final BufferedDiskCache cache, final CacheKey cacheKey) { super(consumer); mCache = cache; mCacheKey = cacheKey; } //inputProducer的结果会从这里返回,即newResult @Override public void onNewResultImpl(EncodedImage newResult, boolean isLast) { //返回的结果加入cache中 if (newResult != null && isLast) { mCache.put(mCacheKey, newResult); } //回调上一层procducer传进来的consumer getConsumer().onNewResult(newResult, isLast); } }
到这里,应该就大概明白这个sequence Producer的作用了,所谓sequence Producer,其实就是一层层的Producer不断的嵌套连接起来,完成同一个任务,而每一个Producer都相互独立,完成各自任务;同时,Producer间产生的结果,也会相互传递,互为表里。可以称其为“Producer链”,但是“Producer链”本身被抽象成一个Producer,那么对于上层来看,这样一个复杂的处理逻辑就被隐藏起来了,变得更加容易理解。
sequence Producer功能极其强大,不同的Producer的组合,产生了很多不同的效果,对于代码的扩展性,可复用性和灵活性都有很大好处。
例如,MultiplexProducer:
注释:
/** * Producer for combining multiple identical requests into a single request. * * <p>Requests using the same key will be combined into a single request. This request is only * cancelled when all underlying requests are cancelled, and returns values to all underlying * consumers. If the request has already return one or more results but has not finished, then * any requests with the same key will have the most recent result returned to them immediately. * * @param <K> type of the key * @param <T> type of the closeable reference result that is returned to this producer */ @ThreadSafe public abstract class MultiplexProducer<K, T extends Closeable> implements Producer<T>
理解为,多路复用Producer(什么鬼东西?),其实就是将相同的任务合并为一个,例如相同url的重复请求,如何做到的,关键代码:
@Override public void produceResults(Consumer<T> consumer, ProducerContext context) { K key = getKey(context); Multiplexer multiplexer; boolean createdNewMultiplexer; // We do want to limit scope of this lock to guard only accesses to mMultiplexers map. // However what we would like to do here is to atomically lookup mMultiplexers, add new // consumer to consumers set associated with the map's entry and call consumer's callback with // last intermediate result. We should not do all of those things under this lock. do { createdNewMultiplexer = false; synchronized (this) { //根据key获得多路复用器,当缓存没有的时候,才create一个,不然直接忽略 multiplexer = getExistingMultiplexer(key); if (multiplexer == null) { multiplexer = createAndPutNewMultiplexer(key); createdNewMultiplexer = true; } } // addNewConsumer may call consumer's onNewResult method immediately. For this reason // we release "this" lock. If multiplexer is removed from mMultiplexers in the meantime, // which is not very probable, then addNewConsumer will fail and we will be able to retry. } while (!multiplexer.addNewConsumer(consumer, context)); //如果前面没有创建,也就是存在缓存的多路复用器,那么就不会调用startInputProducerIfHasAttachedConsumers,然后inputProducer就不起作用了,这样,就起到合并请求的作用 if (createdNewMultiplexer) { multiplexer.startInputProducerIfHasAttachedConsumers(); } }
不同的功能Producer还有很多,例如对图片进行resize和rotate的ResizeAndRotateProducer, 异步执行任务的ThreadHandoffProducer等等,如此灵活的实现,得益于sequence Factory这种设计。
总结
ImagePipeline的核心Producer,通过sequence的形式,很好的串联了整个图片网络读取,缓存,bitmap处理等流程,通过优秀的设计,保证的代码高扩展性,高可复用性和高灵活性。
这种设计刚好对应就是“pipeline”这个词的含义,就如现代的pipelined CPU一样,把对指令的处理,拆分成多个stage,同时输入输出相互依赖和协作,共同完成一个任务。
~~~文卒~~~