diff --git a/README.md b/README.md index 698c04289c3..3ceb12bb944 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file: com.google.cloud libraries-bom - 26.50.0 + 26.53.0 pom import diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java index 9337d04e531..f9c91b91833 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java @@ -39,6 +39,7 @@ import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.io.IOException; import java.time.Duration; @@ -49,6 +50,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import javax.annotation.Nullable; /** @@ -66,7 +68,7 @@ class SpannerCloudMonitoringExporter implements MetricExporter { // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas. 
private static final int EXPORT_BATCH_SIZE_LIMIT = 200; private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean(false); - private CompletableResultCode lastExportCode; + private final AtomicBoolean lastExportSkippedData = new AtomicBoolean(false); private final MetricServiceClient client; private final String spannerProjectId; @@ -101,44 +103,49 @@ static SpannerCloudMonitoringExporter create( } @Override - public CompletableResultCode export(Collection collection) { + public CompletableResultCode export(@Nonnull Collection collection) { if (client.isShutdown()) { logger.log(Level.WARNING, "Exporter is shut down"); return CompletableResultCode.ofFailure(); } - this.lastExportCode = exportSpannerClientMetrics(collection); - return lastExportCode; + return exportSpannerClientMetrics(collection); } /** Export client built in metrics */ private CompletableResultCode exportSpannerClientMetrics(Collection collection) { - // Filter spanner metrics + // Filter spanner metrics. Only include metrics that contain a project and instance ID. List spannerMetricData = collection.stream() .filter(md -> SPANNER_METRICS.contains(md.getName())) .collect(Collectors.toList()); - // Skips exporting if there's none - if (spannerMetricData.isEmpty()) { - return CompletableResultCode.ofSuccess(); - } - - // Verifies metrics project id is the same as the spanner project id set on this client - if (!spannerMetricData.stream() + // Log warnings for metrics that will be skipped. + boolean mustFilter = false; + if (spannerMetricData.stream() .flatMap(metricData -> metricData.getData().getPoints().stream()) - .allMatch( - pd -> spannerProjectId.equals(SpannerCloudMonitoringExporterUtils.getProjectId(pd)))) { - logger.log(Level.WARNING, "Metric data has a different projectId. Skipping export."); - return CompletableResultCode.ofFailure(); + .anyMatch(this::shouldSkipPointDataDueToProjectId)) { + logger.log( + Level.WARNING, "Some metric data contain a different projectId. 
These will be skipped."); + mustFilter = true; } - - // Verifies if metrics data has missing instance id. if (spannerMetricData.stream() .flatMap(metricData -> metricData.getData().getPoints().stream()) - .anyMatch(pd -> SpannerCloudMonitoringExporterUtils.getInstanceId(pd) == null)) { - logger.log(Level.WARNING, "Metric data has missing instanceId. Skipping export."); - return CompletableResultCode.ofFailure(); + .anyMatch(this::shouldSkipPointDataDueToMissingInstanceId)) { + logger.log(Level.WARNING, "Some metric data miss instanceId. These will be skipped."); + mustFilter = true; + } + if (mustFilter) { + spannerMetricData = + spannerMetricData.stream() + .filter(metricData -> !shouldSkipMetricData(metricData)) + .collect(Collectors.toList()); + } + lastExportSkippedData.set(mustFilter); + + // Skips exporting if there's none + if (spannerMetricData.isEmpty()) { + return CompletableResultCode.ofSuccess(); } List spannerTimeSeries; @@ -190,6 +197,26 @@ public void onSuccess(List empty) { return spannerExportCode; } + private boolean shouldSkipMetricData(MetricData metricData) { + return metricData.getData().getPoints().stream() + .anyMatch( + pd -> + shouldSkipPointDataDueToProjectId(pd) + || shouldSkipPointDataDueToMissingInstanceId(pd)); + } + + private boolean shouldSkipPointDataDueToProjectId(PointData pointData) { + return !spannerProjectId.equals(SpannerCloudMonitoringExporterUtils.getProjectId(pointData)); + } + + private boolean shouldSkipPointDataDueToMissingInstanceId(PointData pointData) { + return SpannerCloudMonitoringExporterUtils.getInstanceId(pointData) == null; + } + + boolean lastExportSkippedData() { + return this.lastExportSkippedData.get(); + } + private ApiFuture> exportTimeSeriesInBatch( ProjectName projectName, List timeSeries) { List> batchResults = new ArrayList<>(); @@ -233,7 +260,7 @@ public CompletableResultCode shutdown() { * metric over time.
*/ @Override - public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + public AggregationTemporality getAggregationTemporality(@Nonnull InstrumentType instrumentType) { return AggregationTemporality.CUMULATIVE; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java index 026f9b4ca9d..e4eec68b278 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -28,7 +28,6 @@ import com.google.cloud.spanner.CompositeTracer; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerRpcMetrics; -import com.google.common.base.Supplier; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.spanner.admin.database.v1.DatabaseName; @@ -57,6 +56,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; @@ -120,14 +120,14 @@ public void start(Listener responseListener, Metadata headers) { getMetricAttributes(key, method.getFullMethodName(), databaseName); Map builtInMetricsAttributes = getBuiltInMetricAttributes(key, databaseName); + addBuiltInMetricAttributes(compositeTracer, builtInMetricsAttributes); super.start( new SimpleForwardingClientCallListener(responseListener) { @Override public void onHeaders(Metadata metadata) { Boolean isDirectPathUsed = isDirectPathUsed(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); - addBuiltInMetricAttributes( - compositeTracer, builtInMetricsAttributes, isDirectPathUsed); + addDirectPathUsedAttribute(compositeTracer, isDirectPathUsed); processHeader(metadata, 
tagContext, attributes, span); super.onHeaders(metadata); } @@ -241,16 +241,17 @@ private Map getBuiltInMetricAttributes(String key, DatabaseName } private void addBuiltInMetricAttributes( - CompositeTracer compositeTracer, - Map builtInMetricsAttributes, - Boolean isDirectPathUsed) { + CompositeTracer compositeTracer, Map builtInMetricsAttributes) { if (compositeTracer != null) { - // Direct Path used attribute - Map attributes = new HashMap<>(builtInMetricsAttributes); - attributes.put( - BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed)); + compositeTracer.addAttributes(builtInMetricsAttributes); + } + } - compositeTracer.addAttributes(attributes); + private void addDirectPathUsedAttribute( + CompositeTracer compositeTracer, Boolean isDirectPathUsed) { + if (compositeTracer != null) { + compositeTracer.addAttributes( + BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed)); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java index 98ee700a943..1b6d99260fe 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java @@ -18,8 +18,12 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; import com.google.api.gax.retrying.RetrySettings; @@ -33,22 +37,20 @@ import 
com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Range; +import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; -import io.opentelemetry.sdk.metrics.data.HistogramPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.time.Duration; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.junit.After; @@ -173,20 +175,24 @@ public void testMetricsSingleUseQuery() { MetricData operationLatencyMetricData = getMetricData(metricReader, BuiltInMetricsConstant.OPERATION_LATENCIES_NAME); + assertNotNull(operationLatencyMetricData); long operationLatencyValue = getAggregatedValue(operationLatencyMetricData, expectedAttributes); assertThat(operationLatencyValue).isIn(Range.closed(MIN_LATENCY, elapsed)); MetricData attemptLatencyMetricData = getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_LATENCIES_NAME); + assertNotNull(attemptLatencyMetricData); long attemptLatencyValue = getAggregatedValue(attemptLatencyMetricData, expectedAttributes); assertThat(attemptLatencyValue).isIn(Range.closed(MIN_LATENCY, elapsed)); MetricData operationCountMetricData = getMetricData(metricReader, BuiltInMetricsConstant.OPERATION_COUNT_NAME); + assertNotNull(operationCountMetricData); assertThat(getAggregatedValue(operationCountMetricData, expectedAttributes)).isEqualTo(1); MetricData attemptCountMetricData = getMetricData(metricReader, 
BuiltInMetricsConstant.ATTEMPT_COUNT_NAME); + assertNotNull(attemptCountMetricData); assertThat(getAggregatedValue(attemptCountMetricData, expectedAttributes)).isEqualTo(1); } @@ -219,6 +225,7 @@ public void testMetricsWithGaxRetryUnaryRpc() { MetricData attemptCountMetricData = getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME); + assertNotNull(attemptCountMetricData); assertThat(getAggregatedValue(attemptCountMetricData, expectedAttributesBeginTransactionOK)) .isEqualTo(1); // Attempt count should have a failed metric point for Begin Transaction. @@ -227,6 +234,7 @@ public void testMetricsWithGaxRetryUnaryRpc() { MetricData operationCountMetricData = getMetricData(metricReader, BuiltInMetricsConstant.OPERATION_COUNT_NAME); + assertNotNull(operationCountMetricData); assertThat(getAggregatedValue(operationCountMetricData, expectedAttributesBeginTransactionOK)) .isEqualTo(1); // Operation count should not have a failed metric point for Begin Transaction as overall @@ -236,9 +244,101 @@ public void testMetricsWithGaxRetryUnaryRpc() { .isEqualTo(0); } + @Test + public void testNoNetworkConnection() { + // Create a Spanner instance that tries to connect to a server that does not exist. + // This simulates a bad network connection. + SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + + // Set up the client to fail fast. + builder + .getSpannerStubSettingsBuilder() + .applyToAllUnaryMethods( + input -> { + // This tells the Spanner client to fail directly if it gets an UNAVAILABLE exception. + // The 10-second deadline is chosen to ensure that: + // 1. The test fails within a reasonable amount of time if retries for whatever reason + // has + // been re-enabled. + // 2. The timeout is long enough to never be triggered during normal tests. 
+ input.setSimpleTimeoutNoRetriesDuration(Duration.ofSeconds(10L)); + return null; + }); + + ApiTracerFactory metricsTracerFactory = + new MetricsTracerFactory( + new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME), + attributes); + Spanner spanner = + builder + .setProjectId("test-project") + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .setHost("http://localhost:0") + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setMinSessions(0) + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setSkipVerifyingBeginTransactionForMuxRW(true) + .setFailOnSessionLeak() + .build()) + // Setting this to false so that Spanner Options does not register Metrics Tracer + // factory again. + .setBuiltInMetricsEnabled(false) + .setApiTracerFactory(metricsTracerFactory) + .build() + .getService(); + String instance = "test-instance"; + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("test-project", instance, "d")); + + // Using this client will return UNAVAILABLE, as the server is not reachable and we have + // disabled retries. + SpannerException exception = + assertThrows( + SpannerException.class, () -> client.singleUse().executeQuery(SELECT_RANDOM).next()); + assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode()); + + Attributes expectedAttributesCreateSessionOK = + expectedBaseAttributes + .toBuilder() + .put(BuiltInMetricsConstant.STATUS_KEY, "OK") + .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") + // Include the additional attributes that are added by the HeaderInterceptor in the + // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request never + // leaves the client. 
+ .put(BuiltInMetricsConstant.INSTANCE_ID_KEY, instance) + .put(BuiltInMetricsConstant.DATABASE_KEY, "d") + .put(BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY, "false") + .build(); + + Attributes expectedAttributesCreateSessionFailed = + expectedBaseAttributes + .toBuilder() + .put(BuiltInMetricsConstant.STATUS_KEY, "UNAVAILABLE") + .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") + // Include the additional attributes that are added by the HeaderInterceptor in the + // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request never + // leaves the client. + .put(BuiltInMetricsConstant.INSTANCE_ID_KEY, instance) + .put(BuiltInMetricsConstant.DATABASE_KEY, "d") + .put(BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY, "false") + .build(); + + MetricData attemptCountMetricData = + getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME); + assertNotNull(attemptCountMetricData); + + // Attempt count should have a failed metric point for CreateSession. + assertEquals( + 1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed)); + // There should be no OK metric points for CreateSession. 
+ assertEquals(0, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionOK)); + } + private MetricData getMetricData(InMemoryMetricReader reader, String metricName) { String fullMetricName = BuiltInMetricsConstant.METER_NAME + "/" + metricName; - Collection allMetricData = Collections.emptyList(); + Collection allMetricData; // Fetch the MetricData with retries for (int attemptsLeft = 1000; attemptsLeft > 0; attemptsLeft--) { @@ -265,28 +365,24 @@ private MetricData getMetricData(InMemoryMetricReader reader, String metricName) } } - assertTrue(String.format("MetricData is missing for metric {0}", fullMetricName), false); + fail(String.format("MetricData is missing for metric %s", fullMetricName)); return null; } private long getAggregatedValue(MetricData metricData, Attributes attributes) { switch (metricData.getType()) { case HISTOGRAM: - Optional hd = - metricData.getHistogramData().getPoints().stream() - .filter(pd -> pd.getAttributes().equals(attributes)) - .collect(Collectors.toList()) - .stream() - .findFirst(); - return hd.isPresent() ? (long) hd.get().getSum() / hd.get().getCount() : 0; + return metricData.getHistogramData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .map(data -> (long) data.getSum() / data.getCount()) + .findFirst() + .orElse(0L); case LONG_SUM: - Optional ld = - metricData.getLongSumData().getPoints().stream() - .filter(pd -> pd.getAttributes().equals(attributes)) - .collect(Collectors.toList()) - .stream() - .findFirst(); - return ld.isPresent() ? 
ld.get().getValue() : 0; + return metricData.getLongSumData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .map(LongPointData::getValue) + .findFirst() + .orElse(0L); default: return 0; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java index ab30de1ade0..f9a6e9df9a6 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java @@ -31,6 +31,8 @@ import static com.google.cloud.spanner.BuiltInMetricsConstant.OPERATION_LATENCIES_NAME; import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -62,6 +64,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -146,7 +149,8 @@ public void testExportingSumData() { ImmutableSumData.create( true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData))); - exporter.export(Arrays.asList(longData)); + exporter.export(Collections.singletonList(longData)); + assertFalse(exporter.lastExportSkippedData()); CreateTimeSeriesRequest request = argumentCaptor.getValue(); @@ -204,7 +208,7 @@ public void testExportingHistogramData() { 1d, // min true, 2d, // max - Arrays.asList(1.0), + Collections.singletonList(1.0), Arrays.asList(1L, 2L)); MetricData histogramData = @@ -217,7 +221,8 @@ public void testExportingHistogramData() { ImmutableHistogramData.create( AggregationTemporality.CUMULATIVE, 
ImmutableList.of(histogramPointData))); - exporter.export(Arrays.asList(histogramData)); + exporter.export(Collections.singletonList(histogramData)); + assertFalse(exporter.lastExportSkippedData()); CreateTimeSeriesRequest request = argumentCaptor.getValue(); @@ -295,6 +300,7 @@ public void testExportingSumDataInBatches() { assertThat(firstRequest.getTimeSeriesList()).hasSize(200); assertThat(secondRequest.getTimeSeriesList()).hasSize(50); + assertFalse(exporter.lastExportSkippedData()); for (int i = 0; i < 250; i++) { TimeSeries timeSeries; @@ -388,7 +394,8 @@ public void testSkipExportingDataIfMissingInstanceId() throws IOException { CompletableResultCode resultCode = exporter.export(Arrays.asList(operationLongData, attemptLongData)); - assertThat(resultCode).isEqualTo(CompletableResultCode.ofFailure()); + assertTrue(resultCode.isSuccess()); + assertTrue(exporter.lastExportSkippedData()); } private static class FakeMetricServiceClient extends MetricServiceClient {