Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions obp-api/src/main/protobuf/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,13 @@ message AccountsBalancesV310JsonGrpc {
}

service ObpService {
// Sends a greeting
rpc getBanks(google.protobuf.Empty) returns (BanksJson400Grpc) {}

//Returns the list of accounts at BANK_ID that the user has access to.
//For each account the API returns the account ID and the available views.
rpc getPrivateAccountsAtOneBank(BankIdUserIdGrpc) returns (AccountsGrpc) {}

//Get the Balances for the Accounts of the current User at one bank.
rpc getBankAccountsBalances(BankIdGrpc) returns (AccountsBalancesV310JsonGrpc) {}

//Returns transactions list (Core info) of the account specified by ACCOUNT_ID.
rpc getCoreTransactionsForBankAccount(BankIdAccountIdAndUserIdGrpc) returns (CoreTransactionsJsonV300Grpc) {}
// Temporarily disabled — see ApiProto.scala javaDescriptor filter and
// ObpServiceGrpc.scala bindService. Re-enable by un-commenting here and
// in those files.
//
//rpc getPrivateAccountsAtOneBank(BankIdUserIdGrpc) returns (AccountsGrpc) {}
//rpc getBankAccountsBalances(BankIdGrpc) returns (AccountsBalancesV310JsonGrpc) {}
//rpc getCoreTransactionsForBankAccount(BankIdAccountIdAndUserIdGrpc) returns (CoreTransactionsJsonV300Grpc) {}
}
41 changes: 41 additions & 0 deletions obp-api/src/main/protobuf/log_cache.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
syntax = "proto3";
package code.obp.grpc.logcache.g1;

import "google/protobuf/timestamp.proto";

// Log level. Wire format: varint int32. Mirrors RedisLogger.LogLevel.
// ALL is the aggregate firehose — gated by canGetSystemLogCacheAll entitlement.
enum LogLevel {
LOG_LEVEL_UNSPECIFIED = 0;
TRACE = 1;
DEBUG = 2;
INFO = 3;
WARNING = 4;
ERROR = 5;
ALL = 6;
}

message StreamLogCacheRequest {
LogLevel level = 1;
}

message LogCacheEntry {
LogLevel level = 1;
string message = 2;
google.protobuf.Timestamp timestamp = 3;
// Identifies which OBP instance (pod) emitted this entry. Sourced from
// the `api_instance_id` prop — either a configured value suffixed with a
// per-JVM UUID, or a pure UUID if the prop is unset. See
// code.api.constant.Constant.ApiInstanceId.
string api_instance_id = 4;
}

// Live tail of the Redis-backed log cache.
// History is served by the REST endpoints GET /system/log-cache/{level};
// this service delivers only new entries via Redis pub/sub channels.
//
// Per-level channels: subscribing to TRACE only delivers TRACE; ALL is a
// separate firehose requiring canGetSystemLogCacheAll.
service LogCacheStreamService {
rpc StreamLogCacheEntries(StreamLogCacheRequest) returns (stream LogCacheEntry);
}
52 changes: 52 additions & 0 deletions obp-api/src/main/protobuf/metrics_stream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
syntax = "proto3";
package code.obp.grpc.metricsstream.g1;

// Server-side filters. Empty string = no filter on that field.
// Filters AND together: passing consumer_id + verb = events matching BOTH.
// url_substring matches if the event's url contains the given substring.
message StreamMetricsRequest {
string consumer_id = 1;
string user_id = 2;
string verb = 3;
string url_substring = 4;
string implemented_by_partial_function = 5;
string app_name = 6;
}

// Per-REST-call metric record, mirrors APIMetrics.saveMetric args and
// MetricJsonV600 (REST v6.0.0). Field names track REST v6.0.0 verbatim.
//
// `response_body` is intentionally omitted — can be large and contain PII;
// fetch via the REST /management/metrics endpoint if needed.
message MetricEvent {
string url = 1;
// ISO-8601 UTC, seconds precision — matches REST v6.0.0 (yyyy-MM-dd'T'HH:mm:ss'Z').
string date = 2;
int64 duration = 3;
string user_id = 4;
string username = 5;
string app_name = 6;
string developer_email = 7;
string consumer_id = 8;
string implemented_by_partial_function = 9;
string implemented_in_version = 10;
string verb = 11;
int32 status_code = 12;
string correlation_id = 13;
string source_ip = 14;
string target_ip = 15;
// Identifies which OBP instance (pod) served this request. Sourced from
// the `api_instance_id` prop — either a configured value suffixed with a
// per-JVM UUID, or a pure UUID if the prop is unset. See
// code.api.Constant.ApiInstanceId.
string api_instance_id = 16;
// OBP operation id, e.g. "OBPv6.0.0-getBanks". Matches MetricJsonV600.operation_id.
string operation_id = 17;
}

// Live tail of API metrics as they are written.
// History is served by the REST endpoint GET /management/metrics; this
// service delivers only new metrics via a Redis pub/sub channel.
service MetricsStreamService {
rpc StreamMetrics(StreamMetricsRequest) returns (stream MetricEvent);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3191,7 +3191,9 @@ object SwaggerDefinitionsJSON {
source_ip = ExampleValue.ipAddressExample.value,
target_ip = ExampleValue.ipAddressExample.value,
response_body = json.parse("""{"code":401,"message":"OBP-20001: User not logged in. Authentication is required!"}"""),
operation_id = "OBPv4.0.0-getBanks"
status_code = 401,
operation_id = "OBPv4.0.0-getBanks",
api_instance_id = "obp_node_a"
)
lazy val metricsJsonV600 = MetricsJsonV600(
metrics = List(metricJsonV600)
Expand Down
30 changes: 24 additions & 6 deletions obp-api/src/main/scala/code/api/cache/RedisLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package code.api.cache

import code.api.util.ApiRole._
import code.api.util.{APIUtil, ApiRole}
import code.logcache.LogCacheEventBus

import net.liftweb.common.{Box, Empty, Failure => LiftFailure, Full, Logger}
import net.liftweb.json
import net.liftweb.json.Serialization.write
import redis.clients.jedis.{Jedis, Pipeline}

import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor, TimeUnit}
Expand Down Expand Up @@ -108,10 +111,7 @@ object RedisLogger {
while (attempt < maxRetries) {
try {
withPipeline { pipeline =>
// log to requested level
configs.get(level).foreach(cfg => pushLog(pipeline, cfg, message))
// also log to ALL
configs.get(LogLevel.ALL).foreach(cfg => pushLog(pipeline, cfg, s"[$level] $message"))
writeAndPublish(pipeline, level, message)
pipeline.sync()
}

Expand Down Expand Up @@ -212,8 +212,7 @@ object RedisLogger {
try {
withPipeline { pipeline =>
entriesToFlush.asScala.foreach { logEntry =>
configs.get(logEntry.level).foreach(cfg => pushLog(pipeline, cfg, logEntry.message))
configs.get(LogLevel.ALL).foreach(cfg => pushLog(pipeline, cfg, s"[${logEntry.level}] ${logEntry.message}"))
writeAndPublish(pipeline, logEntry.level, logEntry.message)
}
pipeline.sync()
}
Expand Down Expand Up @@ -269,6 +268,25 @@ object RedisLogger {
}
}

private implicit val streamFormats = json.DefaultFormats

/**
* Write a log entry to the REST-facing Redis lists (level queue + ALL queue)
* and publish it once to the gRPC stream bus. All commands go through the
* caller's pipeline so the whole thing is one Redis round-trip.
*/
private def writeAndPublish(pipeline: Pipeline, level: LogLevel.LogLevel, message: String): Unit = {
configs.get(level).foreach(cfg => pushLog(pipeline, cfg, message))
configs.get(LogLevel.ALL).foreach(cfg => pushLog(pipeline, cfg, s"[$level] $message"))
val payload = write(Map(
"level" -> level.toString,
"message" -> message,
"ts" -> System.currentTimeMillis(),
"api_instance_id" -> code.api.Constant.ApiInstanceId
))
LogCacheEventBus.publishInPipeline(pipeline, level, payload)
}

case class LogTailEntry(level: String, message: String)
case class LogTail(entries: List[LogTailEntry])

Expand Down
79 changes: 75 additions & 4 deletions obp-api/src/main/scala/code/api/util/WriteMetricUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@
import code.api.util.APIUtil.{ResourceDoc, buildOperationId, getCorrelationId, getPropsAsBoolValue, getPropsValue, hasAnOAuthHeader, hasDirectLoginHeader}
import code.api.util.ErrorMessages.attemptedToOpenAnEmptyBox
import code.metrics.APIMetrics
import code.metricsstream.MetricsEventBus
import code.model.Consumer
import code.util.Helper.{MdcLoggable, ObpS}
import com.openbankproject.commons.model.User
import net.liftweb.common.{Box, Empty, Full}
import net.liftweb.http.S
import net.liftweb.util.TimeHelpers.TimeSpan

import java.util.Date
import scala.collection.immutable
import scala.concurrent.Future
import com.openbankproject.commons.ExecutionContext.Implicits.global
import net.liftweb.json.{Extraction, JValue, compactRender}
import net.liftweb.json.Serialization.write

object WriteMetricUtil extends MdcLoggable {

Expand Down Expand Up @@ -61,6 +64,8 @@
val appName = cc.appName.orNull
val developerEmail = cc.developerEmail.orNull

val sourceIp = cc.requestHeaders.find(_.name.toLowerCase() == "x-forwarded-for").map(_.values.mkString(",")).getOrElse("")
val targetIp = cc.requestHeaders.find(_.name.toLowerCase() == "x-forwarded-host").map(_.values.mkString(",")).getOrElse("")
APIMetrics.apiMetrics.vend.saveMetric(
userId,
cc.url,
Expand All @@ -76,9 +81,13 @@
cc.httpCode,
cc.correlationId,
responseBodyToWrite,
cc.requestHeaders.find(_.name.toLowerCase() == "x-forwarded-for").map(_.values.mkString(",")).getOrElse(""),
cc.requestHeaders.find(_.name.toLowerCase() == "x-forwarded-host").map(_.values.mkString(",")).getOrElse("")
sourceIp,
targetIp,
code.api.Constant.ApiInstanceId
)
publishMetricEvent(userId, cc.url, cc.startTime.getOrElse(null), duration, userName, appName,
developerEmail, consumerId, implementedByPartialFunction, cc.implementedInVersion, cc.verb,
cc.httpCode, cc.correlationId, sourceIp, targetIp, cc.operationId.getOrElse(""))
}
}
case _ =>
Expand Down Expand Up @@ -144,6 +153,9 @@
val correlationId = getCorrelationId()
val reqHeaders = S.request.openOrThrowException(attemptedToOpenAnEmptyBox).request.headers

val sourceIp = reqHeaders.find(_.name.toLowerCase() == "x-forwarded-for").map(_.values.mkString(",")).getOrElse("")
val targetIp = reqHeaders.find(_.name.toLowerCase() == "x-forwarded-host").map(_.values.mkString(",")).getOrElse("")

//execute saveMetric in future, as we do not need to know result of operation
Future {
APIMetrics.apiMetrics.vend.saveMetric(
Expand All @@ -161,12 +173,71 @@
None,
correlationId,
"Not enabled for old style endpoints",
reqHeaders.find(_.name.toLowerCase() == "x-forwarded-for").map(_.values.mkString(",")).getOrElse(""),
reqHeaders.find(_.name.toLowerCase() == "x-forwarded-host").map(_.values.mkString(",")).getOrElse("")
sourceIp,
targetIp,
code.api.Constant.ApiInstanceId
)
publishMetricEvent(userId, url, date, duration, userName, appName, developerEmail, consumerId,
implementedByPartialFunction, implementedInVersion, verb, None, correlationId, sourceIp, targetIp,
rd.map(_.operationId).getOrElse(""))
}

}
}

private val metricFormats = net.liftweb.json.DefaultFormats

/**
* Publish a metric event to the gRPC pub/sub channel. No-op when the
* stream service is disabled. Safe to call from the metric-write Future
* — exceptions are swallowed so the REST path is never affected.
*/
private def publishMetricEvent(userId: String,

Check warning on line 195 in obp-api/src/main/scala/code/api/util/WriteMetricUtil.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 16 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ2lPahK3QItPJkAli9v&open=AZ2lPahK3QItPJkAli9v&pullRequest=2762
url: String,
date: Date,
duration: Long,
userName: String,
appName: String,
developerEmail: String,
consumerId: String,
implementedByPartialFunction: String,
implementedInVersion: String,
verb: String,
httpCode: Option[Int],
correlationId: String,
sourceIp: String,
targetIp: String,
operationId: String): Unit = {
if (!MetricsEventBus.isEnabled) return
try {
implicit val fmts = metricFormats
// Use Lift's date format (same one REST v6.0.0 uses when serializing
// MetricJsonV600.date) so the stream string matches byte-for-byte.
val dateStr = if (date != null) metricFormats.dateFormat.format(date) else ""
val payload = write(Map(
"url" -> Option(url).getOrElse(""),
"date" -> dateStr,
"duration" -> duration,
"user_id" -> Option(userId).getOrElse(""),
"username" -> Option(userName).getOrElse(""),
"app_name" -> Option(appName).getOrElse(""),
"developer_email" -> Option(developerEmail).getOrElse(""),
"consumer_id" -> Option(consumerId).getOrElse(""),
"implemented_by_partial_function" -> Option(implementedByPartialFunction).getOrElse(""),
"implemented_in_version" -> Option(implementedInVersion).getOrElse(""),
"verb" -> Option(verb).getOrElse(""),
"status_code" -> httpCode.getOrElse(0),
"correlation_id" -> Option(correlationId).getOrElse(""),
"source_ip" -> Option(sourceIp).getOrElse(""),
"target_ip" -> Option(targetIp).getOrElse(""),
"api_instance_id" -> code.api.Constant.ApiInstanceId,
"operation_id" -> Option(operationId).getOrElse("")
))
MetricsEventBus.publish(payload)
} catch {
case e: Throwable =>
logger.warn(s"WriteMetricUtil says: publishMetricEvent failed: ${e.getMessage}")
}
}

}
8 changes: 6 additions & 2 deletions obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@
source_ip: String,
target_ip: String,
response_body: net.liftweb.json.JValue,
operation_id: String
status_code: Int,
operation_id: String,

Check warning on line 449 in obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ2lPaoc3QItPJkAli9w&open=AZ2lPaoc3QItPJkAli9w&pullRequest=2762
api_instance_id: String
)
case class MetricsJsonV600(metrics: List[MetricJsonV600])

Expand Down Expand Up @@ -1644,7 +1646,9 @@
source_ip = metric.getSourceIp(),
target_ip = metric.getTargetIp(),
response_body = net.liftweb.json.parseOpt(metric.getResponseBody()).getOrElse(net.liftweb.json.JString("Not enabled")),
operation_id = operationId
status_code = metric.getHttpCode(),
operation_id = operationId,
api_instance_id = metric.getApiInstanceId()
)
}

Expand Down
Loading
Loading