From 5194036b423d455a517d06b38fd616a8bbfc4896 Mon Sep 17 00:00:00 2001 From: zachh Date: Tue, 5 Dec 2017 17:42:58 -0800 Subject: Switched CallLogDataSource interface to be Future based. Bug: 34672501 Test: existing PiperOrigin-RevId: 178038086 Change-Id: I1230992ad04bb4415f5a29bd15802d23dff88012 --- .../common/concurrent/DialerFutureSerializer.java | 98 ++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 java/com/android/dialer/common/concurrent/DialerFutureSerializer.java (limited to 'java/com/android/dialer/common') diff --git a/java/com/android/dialer/common/concurrent/DialerFutureSerializer.java b/java/com/android/dialer/common/concurrent/DialerFutureSerializer.java new file mode 100644 index 000000000..2629abbbe --- /dev/null +++ b/java/com/android/dialer/common/concurrent/DialerFutureSerializer.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2017 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package com.android.dialer.common.concurrent; + +import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.common.util.concurrent.AsyncCallable; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Serializes execution of a set of operations. This class guarantees that a submitted callable will + * not be called before previously submitted callables have completed. + */ +public final class DialerFutureSerializer { + /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ + private final AtomicReference> ref = + new AtomicReference<>(immediateFuture(null)); + + /** Enqueues a task to run when the previous task (if any) completes. */ + public ListenableFuture submit(final Callable callable, Executor executor) { + return submitAsync(() -> immediateFuture(callable.call()), executor); + } + + /** + * Enqueues a task to run when the previous task (if any) completes. + * + *

Cancellation does not propagate from the output future to the future returned from {@code + * callable}, but if the output future is cancelled before {@link AsyncCallable#call()} is + * invoked, {@link AsyncCallable#call()} will not be invoked. + */ + public ListenableFuture submitAsync(final AsyncCallable callable, Executor executor) { + AtomicBoolean wasCancelled = new AtomicBoolean(false); + final AsyncCallable task = + () -> { + if (wasCancelled.get()) { + return immediateCancelledFuture(); + } + return callable.call(); + }; + /* + * Three futures are at play here: + * taskFuture is the future that comes from the callable. + * newFuture is the future we use to track the serialization of our task. + * oldFuture is the previous task's newFuture. + * + * newFuture is guaranteed to only complete once all tasks previously submitted to this instance + * once the futures returned from those submissions have completed. + */ + final SettableFuture newFuture = SettableFuture.create(); + + final ListenableFuture oldFuture = ref.getAndSet(newFuture); + + // Invoke our task once the previous future completes. + final ListenableFuture taskFuture = + Futures.nonCancellationPropagating( + Futures.submitAsync(task, runnable -> oldFuture.addListener(runnable, executor))); + // newFuture's lifetime is determined by taskFuture, unless taskFuture is cancelled, in which + // case it falls back to oldFuture's. This is to ensure that if the future we return is + // cancelled, we don't begin execution of the next task until after oldFuture completes. + taskFuture.addListener( + () -> { + if (taskFuture.isCancelled()) { + // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of a + // future that eventually came from immediateFuture(null), this doesn't leak throwables + // or completion values. + wasCancelled.set(true); + newFuture.setFuture(oldFuture); + } else { + newFuture.set(null); + } + }, + directExecutor()); + + return taskFuture; + } +} -- cgit v1.2.3