Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {

// MetaDriver doesn't expose unlisten, so the meta listener is registered
// once and this flag records that it has been installed.
// Lifecycle: this JVM-global flag is intentionally never reset by
// unlistenChanges() (the underlying gRPC watch is process-wide). The driver
// watch is expected to self-heal across transport reconnects: EtcdMetaDriver
// re-subscribes from its Watch.Listener when the watch is closed, and
// PdMetaDriver is said to recover via KvClient (TODO confirm - not visible
// from this file). With the subscription kept live by the driver, the flag
// staying true is correct.
private static final AtomicBoolean metaEventListenerRegistered =
new AtomicBoolean(false);

Expand Down Expand Up @@ -251,27 +251,6 @@ static <T> void handleSchemaCacheClearEvent(T response) {
}
}

/**
* Manually reset the JVM-global meta listener flag after detecting that
* the MetaManager transport reconnected and dropped the underlying gRPC
* watch. This method is not wired to a MetaManager/MetaDriver reconnect
* callback today; callers must invoke it explicitly after detecting that
* condition. Without such a manual reset {@link #metaEventListenerRegistered}
* would stay {@code true} forever and this JVM would stop receiving
* cross-node schema cache clear events with no error or warning.
*
* <p>TODO: wire this into MetaManager once it exposes a transport
* reconnect callback (e.g. {@code listenReconnect} /
* {@code onTransportReconnect}). Until then it must be invoked
* explicitly by code that detects the reconnect.
*/
public static void resetMetaListenerForReconnect() {
    // Atomically flip the flag from true to false. Only the caller that
    // actually performs the flip logs, so repeated or concurrent resets
    // emit a single warning.
    boolean wasRegistered =
            metaEventListenerRegistered.compareAndSet(true, false);
    if (!wasRegistered) {
        return;
    }
    LOG.warn("Schema cache clear meta listener lost on reconnect - " +
             "will re-register on next schema operation.");
}

public void clearCache(boolean notify) {
// Same TOCTOU ordering as clearSchemaCache(String): clear nameCache
// first, then the array attachment, then idCache last.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.commons.io.FileUtils;
Expand All @@ -33,7 +36,9 @@
import org.apache.hugegraph.meta.lock.LockResult;
import org.apache.hugegraph.type.define.CollectionType;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.util.collection.CollectionFactory;
import org.slf4j.Logger;

import com.google.common.base.Strings;

Expand All @@ -42,6 +47,7 @@
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.options.DeleteOption;
Expand All @@ -57,9 +63,24 @@

public class EtcdMetaDriver implements MetaDriver {

private static final Logger LOG = Log.logger(EtcdMetaDriver.class);

private final Client client;
private final EtcdDistributedLock lock;

// Dedicated scheduler used to re-subscribe a dropped watch away from the
// jetcd callback thread. It owns exactly one daemon thread named
// "etcd-meta-rewatch"; the driver has no close(), so the thread lives for
// the whole process and is reclaimed at JVM shutdown.
private final ScheduledExecutorService reWatchExecutor =
        Executors.newSingleThreadScheduledExecutor(task -> {
            Thread worker = new Thread(task, "etcd-meta-rewatch");
            worker.setDaemon(true);
            return worker;
        });

// Backoff, in milliseconds, applied before re-subscribing a dropped watch;
// it is the delay handed to the re-watch executor when a retry is scheduled.
// Package-private and mutable only so tests can set it to 0; never
// reassigned in production.
long reWatchDelayMs = 1000L;

public EtcdMetaDriver(String trustFile, String clientCertFile,
String clientKeyFile, Object... endpoints) {
ClientBuilder builder = this.etcdMetaDriverBuilder(endpoints);
Expand All @@ -76,6 +97,13 @@ public EtcdMetaDriver(Object... endpoints) {
this.lock = EtcdDistributedLock.getInstance(this.client);
}

// Test-only, package-private constructor: wraps an injected (typically
// mocked) Client. The distributed lock is deliberately left unset because
// the watch tests never exercise it.
EtcdMetaDriver(Client client) {
    this.lock = null;
    this.client = client;
}

private static ByteSequence toByteSequence(String content) {
    // Encodes with the platform default charset, mirroring the
    // Charset.defaultCharset() decoding used when keys are logged.
    byte[] encoded = content.getBytes();
    return ByteSequence.from(encoded);
}
Expand Down Expand Up @@ -303,9 +331,8 @@ public void unlock(String key, LockResult lockResult) {
@SuppressWarnings("unchecked")
@Override
public <T> void listen(String key, Consumer<T> consumer) {
    // Subscribe exactly once, through watchKey(). Registering the bare
    // Consumer watch as well would open a second watch on the same key, so
    // every event would reach the consumer twice, and that extra watch
    // would not be re-established after a terminal close.
    this.watchKey(toByteSequence(key), WatchOption.DEFAULT,
                  (Consumer<WatchResponse>) consumer);
}

/**
Expand All @@ -314,9 +341,63 @@ public <T> void listen(String key, Consumer<T> consumer) {
@SuppressWarnings("unchecked")
@Override
public <T> void listenPrefix(String prefix, Consumer<T> consumer) {
    // Prefix semantics are carried by the option, which watchKey() keeps
    // and reuses when it has to re-subscribe.
    WatchOption option = WatchOption.newBuilder().isPrefix(true).build();
    // Subscribe exactly once, through watchKey(). Registering the bare
    // Consumer watch as well would open a second prefix watch and deliver
    // every event to the consumer twice.
    this.watchKey(toByteSequence(prefix), option,
                  (Consumer<WatchResponse>) consumer);
}

/**
* Subscribe a watch that survives the terminal close jetcd cannot recover
* from. The bare {@code Consumer} overload discards {@code onError} and
* {@code onCompleted}, so a non-retryable failure silently drops the
* listener (issue #3036).
* <p>
* jetcd 0.5.9 ({@code WatchImpl.WatcherImpl.handleError}) already retries
* <em>retryable</em> errors itself: it notifies {@code onError} and then
* reschedules {@code resume()} on the same watcher. Re-subscribing from
* {@code onError} would therefore open a duplicate watch on every transient
* reconnect, so {@code onError} only logs here. A non-retryable error (or an
* explicit cancel) ends in {@code close()}, which removes the watcher and
* invokes {@code onCompleted}; that is the only point where the watch is
* truly gone, so re-subscribe happens there. The old watcher is already
* closed and removed, so the replacement is not a duplicate.
*/
private void watchKey(ByteSequence key, WatchOption option,
                      Consumer<WatchResponse> consumer) {
    // Errors are only logged; re-subscribing is reserved for completion.
    Consumer<Throwable> onError = throwable ->
            LOG.warn("etcd meta watch error for key '{}', " +
                     "jetcd will retry if recoverable",
                     key.toString(Charset.defaultCharset()),
                     throwable);
    // Completion means the watch is gone: schedule a replacement with the
    // same key, option and consumer.
    Runnable onCompleted = () -> this.scheduleReWatch(key, option, consumer);

    Watch.Listener listener = Watch.listener(consumer, onError, onCompleted);
    this.client.getWatchClient().watch(key, option, listener);
}

private void scheduleReWatch(ByteSequence key, WatchOption option,
                             Consumer<WatchResponse> consumer) {
    // The re-subscribe runs on the re-watch executor after the configured
    // backoff instead of inline in the caller.
    Runnable retry = () -> this.reWatch(key, option, consumer);
    long delayMs = this.reWatchDelayMs;
    this.reWatchExecutor.schedule(retry, delayMs, TimeUnit.MILLISECONDS);
}

/**
* Re-establish a watch dropped by a terminal close. If the re-subscribe
* itself fails (e.g. the endpoint is still unreachable), it is retried with
* the same backoff instead of giving up, otherwise a single failed attempt
* would lose the listener permanently.
*/
private void reWatch(ByteSequence key, WatchOption option,
                     Consumer<WatchResponse> consumer) {
    // Decode the key once; it is only needed for the log messages.
    String keyText = key.toString(Charset.defaultCharset());
    try {
        LOG.info("Re-establishing etcd meta watch for key '{}'", keyText);
        this.watchKey(key, option, consumer);
    } catch (Exception e) {
        // A failed attempt must not end recovery: log it and queue another
        // try with the same backoff.
        LOG.warn("Failed to re-establish etcd meta watch for key '{}', " +
                 "retrying in {} ms",
                 keyText, this.reWatchDelayMs, e);
        this.scheduleReWatch(key, option, consumer);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.meta;

import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hugegraph.testutil.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchResponse;

/**
* Unit tests for {@link EtcdMetaDriver}'s watch recovery (issue #3036). jetcd
* retries recoverable errors itself, so the driver must re-subscribe only on the
* terminal {@code onCompleted} close, not on every {@code onError}. A mock jetcd
* {@link Client} drives these paths through the package-private test
* constructor; no live etcd is needed.
*/
public class EtcdMetaDriverTest {

@Test
public void testListenReWatchesOnCompleted() {
Watch watch = Mockito.mock(Watch.class);
EtcdMetaDriver driver = newDriver(watch);

driver.listen("k", response -> { });
// onCompleted is jetcd's terminal-close signal: the watcher is gone, so
// the driver must re-subscribe (a second watch() call).
captureListener(watch).onCompleted();

Mockito.verify(watch, Mockito.timeout(2000).times(2))
.watch(Mockito.any(ByteSequence.class),
Mockito.any(WatchOption.class),
Mockito.any(Watch.Listener.class));
}

@Test
public void testListenDoesNotReWatchOnError() {
Watch watch = Mockito.mock(Watch.class);
EtcdMetaDriver driver = newDriver(watch);

driver.listen("k", response -> { });
// jetcd already reschedules the same watcher for recoverable errors;
// re-subscribing here would create a duplicate watch. Assert it does not.
captureListener(watch).onError(new RuntimeException("recoverable"));

Mockito.verify(watch, Mockito.after(500).times(1))
.watch(Mockito.any(ByteSequence.class),
Mockito.any(WatchOption.class),
Mockito.any(Watch.Listener.class));
}

@Test
public void testListenPrefixReWatchPreservesKeyAndPrefix() {
Watch watch = Mockito.mock(Watch.class);
Comment thread
dpol1 marked this conversation as resolved.
EtcdMetaDriver driver = newDriver(watch);

driver.listenPrefix("prefix", response -> { });
captureListener(watch).onCompleted();

ArgumentCaptor<ByteSequence> keyCaptor =
ArgumentCaptor.forClass(ByteSequence.class);
ArgumentCaptor<WatchOption> optionCaptor =
ArgumentCaptor.forClass(WatchOption.class);
Mockito.verify(watch, Mockito.timeout(2000).times(2))
.watch(keyCaptor.capture(), optionCaptor.capture(),
Mockito.any(Watch.Listener.class));

// The re-created watch must still target "prefix" with prefix semantics,
// not silently downgrade to an exact-key watch.
ByteSequence reWatchKey = keyCaptor.getAllValues().get(1);
WatchOption reWatchOption = optionCaptor.getAllValues().get(1);
Assert.assertEquals("prefix",
reWatchKey.toString(Charset.defaultCharset()));
Assert.assertTrue(reWatchOption.isPrefix());
}

@Test
public void testReWatchRetriesWhenFirstAttemptThrows() {
Watch watch = Mockito.mock(Watch.class);
// Initial watch() succeeds, the first re-watch throws, the retry succeeds.
// A single failed re-watch must not abandon recovery permanently.
Mockito.when(watch.watch(Mockito.any(ByteSequence.class),
Mockito.any(WatchOption.class),
Mockito.any(Watch.Listener.class)))
.thenReturn(null)
.thenThrow(new RuntimeException("etcd still unreachable"))
.thenReturn(null);
EtcdMetaDriver driver = newDriver(watch);

driver.listen("k", response -> { });
captureListener(watch).onCompleted();

Mockito.verify(watch, Mockito.timeout(2000).times(3))
.watch(Mockito.any(ByteSequence.class),
Mockito.any(WatchOption.class),
Mockito.any(Watch.Listener.class));
}

@Test
public void testListenDeliversEventsToConsumer() {
Watch watch = Mockito.mock(Watch.class);
AtomicReference<WatchResponse> received = new AtomicReference<>();
EtcdMetaDriver driver = newDriver(watch);

driver.listen("k", received::set);
WatchResponse response = Mockito.mock(WatchResponse.class);
captureListener(watch).onNext(response);

Assert.assertSame(response, received.get());
}

private static EtcdMetaDriver newDriver(Watch watch) {
Client client = Mockito.mock(Client.class);
Mockito.when(client.getWatchClient()).thenReturn(watch);
EtcdMetaDriver driver = new EtcdMetaDriver(client);
// No backoff in tests so the re-subscribe runs promptly.
driver.reWatchDelayMs = 0L;
return driver;
}

private static Watch.Listener captureListener(Watch watch) {
ArgumentCaptor<Watch.Listener> captor =
ArgumentCaptor.forClass(Watch.Listener.class);
Mockito.verify(watch).watch(Mockito.any(ByteSequence.class),
Mockito.any(WatchOption.class),
captor.capture());
return captor.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hugegraph.unit;

import org.apache.hugegraph.core.RoleElectionStateMachineTest;
import org.apache.hugegraph.meta.EtcdMetaDriverTest;
import org.apache.hugegraph.meta.MetaManagerSchemaCacheClearEventTest;
import org.apache.hugegraph.traversal.optimize.TraversalUtilOptimizeTest;
import org.apache.hugegraph.unit.api.filter.LoadDetectFilterTest;
Expand Down Expand Up @@ -99,6 +100,7 @@
CacheTest.LevelCacheTest.class,
CachedSchemaTransactionTest.class,
MetaManagerSchemaCacheClearEventTest.class,
EtcdMetaDriverTest.class,
CachedGraphTransactionTest.class,
CacheManagerTest.class,
RamTableTest.class,
Expand Down
Loading