diff --git a/README.md b/README.md
index 698c04289c3..3ceb12bb944 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file:
com.google.cloud
libraries-bom
- 26.50.0
+ 26.53.0
pom
import
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
index 9337d04e531..f9c91b91833 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
@@ -39,6 +39,7 @@
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.time.Duration;
@@ -49,6 +50,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
@@ -66,7 +68,7 @@ class SpannerCloudMonitoringExporter implements MetricExporter {
// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
private static final int EXPORT_BATCH_SIZE_LIMIT = 200;
private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean(false);
- private CompletableResultCode lastExportCode;
+ private final AtomicBoolean lastExportSkippedData = new AtomicBoolean(false);
private final MetricServiceClient client;
private final String spannerProjectId;
@@ -101,44 +103,49 @@ static SpannerCloudMonitoringExporter create(
}
@Override
- public CompletableResultCode export(Collection collection) {
+ public CompletableResultCode export(@Nonnull Collection collection) {
if (client.isShutdown()) {
logger.log(Level.WARNING, "Exporter is shut down");
return CompletableResultCode.ofFailure();
}
- this.lastExportCode = exportSpannerClientMetrics(collection);
- return lastExportCode;
+ return exportSpannerClientMetrics(collection);
}
/** Export client built in metrics */
private CompletableResultCode exportSpannerClientMetrics(Collection collection) {
- // Filter spanner metrics
+ // Filter spanner metrics. Only include metrics that contain a project and instance ID.
List spannerMetricData =
collection.stream()
.filter(md -> SPANNER_METRICS.contains(md.getName()))
.collect(Collectors.toList());
- // Skips exporting if there's none
- if (spannerMetricData.isEmpty()) {
- return CompletableResultCode.ofSuccess();
- }
-
- // Verifies metrics project id is the same as the spanner project id set on this client
- if (!spannerMetricData.stream()
+ // Log warnings for metrics that will be skipped.
+ boolean mustFilter = false;
+ if (spannerMetricData.stream()
.flatMap(metricData -> metricData.getData().getPoints().stream())
- .allMatch(
- pd -> spannerProjectId.equals(SpannerCloudMonitoringExporterUtils.getProjectId(pd)))) {
- logger.log(Level.WARNING, "Metric data has a different projectId. Skipping export.");
- return CompletableResultCode.ofFailure();
+ .anyMatch(this::shouldSkipPointDataDueToProjectId)) {
+ logger.log(
+ Level.WARNING, "Some metric data contain a different projectId. These will be skipped.");
+ mustFilter = true;
}
-
- // Verifies if metrics data has missing instance id.
if (spannerMetricData.stream()
.flatMap(metricData -> metricData.getData().getPoints().stream())
- .anyMatch(pd -> SpannerCloudMonitoringExporterUtils.getInstanceId(pd) == null)) {
- logger.log(Level.WARNING, "Metric data has missing instanceId. Skipping export.");
- return CompletableResultCode.ofFailure();
+ .anyMatch(this::shouldSkipPointDataDueToMissingInstanceId)) {
+ logger.log(Level.WARNING, "Some metric data miss instanceId. These will be skipped.");
+ mustFilter = true;
+ }
+ if (mustFilter) {
+ spannerMetricData =
+ spannerMetricData.stream()
+ .filter(this::shouldSkipMetricData)
+ .collect(Collectors.toList());
+ }
+ lastExportSkippedData.set(mustFilter);
+
+ // Skips exporting if there's none
+ if (spannerMetricData.isEmpty()) {
+ return CompletableResultCode.ofSuccess();
}
List spannerTimeSeries;
@@ -190,6 +197,26 @@ public void onSuccess(List empty) {
return spannerExportCode;
}
+ private boolean shouldSkipMetricData(MetricData metricData) {
+ return metricData.getData().getPoints().stream()
+ .anyMatch(
+ pd ->
+ shouldSkipPointDataDueToProjectId(pd)
+ || shouldSkipPointDataDueToMissingInstanceId(pd));
+ }
+
+ private boolean shouldSkipPointDataDueToProjectId(PointData pointData) {
+ return !spannerProjectId.equals(SpannerCloudMonitoringExporterUtils.getProjectId(pointData));
+ }
+
+ private boolean shouldSkipPointDataDueToMissingInstanceId(PointData pointData) {
+ return SpannerCloudMonitoringExporterUtils.getInstanceId(pointData) == null;
+ }
+
+ boolean lastExportSkippedData() {
+ return this.lastExportSkippedData.get();
+ }
+
private ApiFuture> exportTimeSeriesInBatch(
ProjectName projectName, List timeSeries) {
List> batchResults = new ArrayList<>();
@@ -233,7 +260,7 @@ public CompletableResultCode shutdown() {
* metric over time.
*/
@Override
- public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
+ public AggregationTemporality getAggregationTemporality(@Nonnull InstrumentType instrumentType) {
return AggregationTemporality.CUMULATIVE;
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java
index 026f9b4ca9d..e4eec68b278 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java
@@ -28,7 +28,6 @@
import com.google.cloud.spanner.CompositeTracer;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerRpcMetrics;
-import com.google.common.base.Supplier;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.spanner.admin.database.v1.DatabaseName;
@@ -57,6 +56,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
@@ -120,14 +120,14 @@ public void start(Listener responseListener, Metadata headers) {
getMetricAttributes(key, method.getFullMethodName(), databaseName);
Map builtInMetricsAttributes =
getBuiltInMetricAttributes(key, databaseName);
+ addBuiltInMetricAttributes(compositeTracer, builtInMetricsAttributes);
super.start(
new SimpleForwardingClientCallListener(responseListener) {
@Override
public void onHeaders(Metadata metadata) {
Boolean isDirectPathUsed =
isDirectPathUsed(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
- addBuiltInMetricAttributes(
- compositeTracer, builtInMetricsAttributes, isDirectPathUsed);
+ addDirectPathUsedAttribute(compositeTracer, isDirectPathUsed);
processHeader(metadata, tagContext, attributes, span);
super.onHeaders(metadata);
}
@@ -241,16 +241,17 @@ private Map getBuiltInMetricAttributes(String key, DatabaseName
}
private void addBuiltInMetricAttributes(
- CompositeTracer compositeTracer,
- Map builtInMetricsAttributes,
- Boolean isDirectPathUsed) {
+ CompositeTracer compositeTracer, Map builtInMetricsAttributes) {
if (compositeTracer != null) {
- // Direct Path used attribute
- Map attributes = new HashMap<>(builtInMetricsAttributes);
- attributes.put(
- BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed));
+ compositeTracer.addAttributes(builtInMetricsAttributes);
+ }
+ }
- compositeTracer.addAttributes(attributes);
+ private void addDirectPathUsedAttribute(
+ CompositeTracer compositeTracer, Boolean isDirectPathUsed) {
+ if (compositeTracer != null) {
+ compositeTracer.addAttributes(
+ BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed));
}
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java
index 98ee700a943..1b6d99260fe 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java
@@ -18,8 +18,12 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
@@ -33,22 +37,20 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
-import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.time.Duration;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.After;
@@ -173,20 +175,24 @@ public void testMetricsSingleUseQuery() {
MetricData operationLatencyMetricData =
getMetricData(metricReader, BuiltInMetricsConstant.OPERATION_LATENCIES_NAME);
+ assertNotNull(operationLatencyMetricData);
long operationLatencyValue = getAggregatedValue(operationLatencyMetricData, expectedAttributes);
assertThat(operationLatencyValue).isIn(Range.closed(MIN_LATENCY, elapsed));
MetricData attemptLatencyMetricData =
getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_LATENCIES_NAME);
+ assertNotNull(attemptLatencyMetricData);
long attemptLatencyValue = getAggregatedValue(attemptLatencyMetricData, expectedAttributes);
assertThat(attemptLatencyValue).isIn(Range.closed(MIN_LATENCY, elapsed));
MetricData operationCountMetricData =
getMetricData(metricReader, BuiltInMetricsConstant.OPERATION_COUNT_NAME);
+ assertNotNull(operationCountMetricData);
assertThat(getAggregatedValue(operationCountMetricData, expectedAttributes)).isEqualTo(1);
MetricData attemptCountMetricData =
getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME);
+ assertNotNull(attemptCountMetricData);
assertThat(getAggregatedValue(attemptCountMetricData, expectedAttributes)).isEqualTo(1);
}
@@ -219,6 +225,7 @@ public void testMetricsWithGaxRetryUnaryRpc() {
MetricData attemptCountMetricData =
getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME);
+ assertNotNull(attemptCountMetricData);
assertThat(getAggregatedValue(attemptCountMetricData, expectedAttributesBeginTransactionOK))
.isEqualTo(1);
// Attempt count should have a failed metric point for Begin Transaction.
@@ -227,6 +234,7 @@ public void testMetricsWithGaxRetryUnaryRpc() {
MetricData operationCountMetricData =
getMetricData(metricReader, BuiltInMetricsConstant.OPERATION_COUNT_NAME);
+ assertNotNull(operationCountMetricData);
assertThat(getAggregatedValue(operationCountMetricData, expectedAttributesBeginTransactionOK))
.isEqualTo(1);
// Operation count should not have a failed metric point for Begin Transaction as overall
@@ -236,9 +244,101 @@ public void testMetricsWithGaxRetryUnaryRpc() {
.isEqualTo(0);
}
+ @Test
+ public void testNoNetworkConnection() {
+ // Create a Spanner instance that tries to connect to a server that does not exist.
+ // This simulates a bad network connection.
+ SpannerOptions.Builder builder = SpannerOptions.newBuilder();
+
+ // Set up the client to fail fast.
+ builder
+ .getSpannerStubSettingsBuilder()
+ .applyToAllUnaryMethods(
+ input -> {
+ // This tells the Spanner client to fail directly if it gets an UNAVAILABLE exception.
+ // The 10-second deadline is chosen to ensure that:
+ // 1. The test fails within a reasonable amount of time if retries for whatever reason
+ // has
+ // been re-enabled.
+ // 2. The timeout is long enough to never be triggered during normal tests.
+ input.setSimpleTimeoutNoRetriesDuration(Duration.ofSeconds(10L));
+ return null;
+ });
+
+ ApiTracerFactory metricsTracerFactory =
+ new MetricsTracerFactory(
+ new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME),
+ attributes);
+ Spanner spanner =
+ builder
+ .setProjectId("test-project")
+ .setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
+ .setHost("http://localhost:0")
+ .setCredentials(NoCredentials.getInstance())
+ .setSessionPoolOption(
+ SessionPoolOptions.newBuilder()
+ .setMinSessions(0)
+ .setUseMultiplexedSession(true)
+ .setUseMultiplexedSessionForRW(true)
+ .setSkipVerifyingBeginTransactionForMuxRW(true)
+ .setFailOnSessionLeak()
+ .build())
+ // Setting this to false so that Spanner Options does not register Metrics Tracer
+ // factory again.
+ .setBuiltInMetricsEnabled(false)
+ .setApiTracerFactory(metricsTracerFactory)
+ .build()
+ .getService();
+ String instance = "test-instance";
+ DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("test-project", instance, "d"));
+
+ // Using this client will return UNAVAILABLE, as the server is not reachable and we have
+ // disabled retries.
+ SpannerException exception =
+ assertThrows(
+ SpannerException.class, () -> client.singleUse().executeQuery(SELECT_RANDOM).next());
+ assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode());
+
+ Attributes expectedAttributesCreateSessionOK =
+ expectedBaseAttributes
+ .toBuilder()
+ .put(BuiltInMetricsConstant.STATUS_KEY, "OK")
+ .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession")
+ // Include the additional attributes that are added by the HeaderInterceptor in the
+ // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request never
+ // leaves the client.
+ .put(BuiltInMetricsConstant.INSTANCE_ID_KEY, instance)
+ .put(BuiltInMetricsConstant.DATABASE_KEY, "d")
+ .put(BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY, "false")
+ .build();
+
+ Attributes expectedAttributesCreateSessionFailed =
+ expectedBaseAttributes
+ .toBuilder()
+ .put(BuiltInMetricsConstant.STATUS_KEY, "UNAVAILABLE")
+ .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession")
+ // Include the additional attributes that are added by the HeaderInterceptor in the
+ // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request never
+ // leaves the client.
+ .put(BuiltInMetricsConstant.INSTANCE_ID_KEY, instance)
+ .put(BuiltInMetricsConstant.DATABASE_KEY, "d")
+ .put(BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY, "false")
+ .build();
+
+ MetricData attemptCountMetricData =
+ getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME);
+ assertNotNull(attemptCountMetricData);
+
+ // Attempt count should have a failed metric point for CreateSession.
+ assertEquals(
+ 1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed));
+ // There should be no OK metric points for CreateSession.
+ assertEquals(0, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionOK));
+ }
+
private MetricData getMetricData(InMemoryMetricReader reader, String metricName) {
String fullMetricName = BuiltInMetricsConstant.METER_NAME + "/" + metricName;
- Collection allMetricData = Collections.emptyList();
+ Collection allMetricData;
// Fetch the MetricData with retries
for (int attemptsLeft = 1000; attemptsLeft > 0; attemptsLeft--) {
@@ -265,28 +365,24 @@ private MetricData getMetricData(InMemoryMetricReader reader, String metricName)
}
}
- assertTrue(String.format("MetricData is missing for metric {0}", fullMetricName), false);
+ fail(String.format("MetricData is missing for metric %s", fullMetricName));
return null;
}
private long getAggregatedValue(MetricData metricData, Attributes attributes) {
switch (metricData.getType()) {
case HISTOGRAM:
- Optional hd =
- metricData.getHistogramData().getPoints().stream()
- .filter(pd -> pd.getAttributes().equals(attributes))
- .collect(Collectors.toList())
- .stream()
- .findFirst();
- return hd.isPresent() ? (long) hd.get().getSum() / hd.get().getCount() : 0;
+ return metricData.getHistogramData().getPoints().stream()
+ .filter(pd -> pd.getAttributes().equals(attributes))
+ .map(data -> (long) data.getSum() / data.getCount())
+ .findFirst()
+ .orElse(0L);
case LONG_SUM:
- Optional ld =
- metricData.getLongSumData().getPoints().stream()
- .filter(pd -> pd.getAttributes().equals(attributes))
- .collect(Collectors.toList())
- .stream()
- .findFirst();
- return ld.isPresent() ? ld.get().getValue() : 0;
+ return metricData.getLongSumData().getPoints().stream()
+ .filter(pd -> pd.getAttributes().equals(attributes))
+ .map(LongPointData::getValue)
+ .findFirst()
+ .orElse(0L);
default:
return 0;
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
index ab30de1ade0..f9a6e9df9a6 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
@@ -31,6 +31,8 @@
import static com.google.cloud.spanner.BuiltInMetricsConstant.OPERATION_LATENCIES_NAME;
import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -62,6 +64,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -146,7 +149,8 @@ public void testExportingSumData() {
ImmutableSumData.create(
true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData)));
- exporter.export(Arrays.asList(longData));
+ exporter.export(Collections.singletonList(longData));
+ assertFalse(exporter.lastExportSkippedData());
CreateTimeSeriesRequest request = argumentCaptor.getValue();
@@ -204,7 +208,7 @@ public void testExportingHistogramData() {
1d, // min
true,
2d, // max
- Arrays.asList(1.0),
+ Collections.singletonList(1.0),
Arrays.asList(1L, 2L));
MetricData histogramData =
@@ -217,7 +221,8 @@ public void testExportingHistogramData() {
ImmutableHistogramData.create(
AggregationTemporality.CUMULATIVE, ImmutableList.of(histogramPointData)));
- exporter.export(Arrays.asList(histogramData));
+ exporter.export(Collections.singletonList(histogramData));
+ assertFalse(exporter.lastExportSkippedData());
CreateTimeSeriesRequest request = argumentCaptor.getValue();
@@ -295,6 +300,7 @@ public void testExportingSumDataInBatches() {
assertThat(firstRequest.getTimeSeriesList()).hasSize(200);
assertThat(secondRequest.getTimeSeriesList()).hasSize(50);
+ assertFalse(exporter.lastExportSkippedData());
for (int i = 0; i < 250; i++) {
TimeSeries timeSeries;
@@ -388,7 +394,8 @@ public void testSkipExportingDataIfMissingInstanceId() throws IOException {
CompletableResultCode resultCode =
exporter.export(Arrays.asList(operationLongData, attemptLongData));
- assertThat(resultCode).isEqualTo(CompletableResultCode.ofFailure());
+ assertTrue(resultCode.isSuccess());
+ assertTrue(exporter.lastExportSkippedData());
}
private static class FakeMetricServiceClient extends MetricServiceClient {