
PICE(2):JDBCStreaming - gRPC-JDBC Service

用户1150956 · Published 2018-07-31 (originally posted 2018-06-19)

In an akka-cluster environment, a JDBC database is, from the perspective of data access, detached from the other nodes in the cluster, because a JDBC database is not distributed and offers no location transparency. A JDBC database server must therefore expose its data operations as a service. In this scenario the JDBC service is the server side, and every other node, including other JDBC database nodes, is a client of that service. Since we have already chosen the gRPC service model for the akka-cluster environment, with akka-stream providing flow control over database operations, this post demonstrates the concrete implementation and usage of gRPC-JDBC-Streaming.

In the previous post we demonstrated the simplest JDBC-Streaming pattern, a unary request with a streamed response: the client sends a single JDBCQuery to the JDBC service, the service runs the query and returns a stream of JDBCDataRow to the client. The data and service types are defined in IDL in jdbc.proto as follows:

message JDBCDataRow {
 string year = 1;
 string state = 2;
 string county = 3;
 string value = 4;
}

message JDBCQuery {
  string dbName = 1;
  string statement = 2;
  bytes parameters = 3;
  google.protobuf.Int32Value fetchSize= 4;
  google.protobuf.BoolValue autoCommit = 5;
  google.protobuf.Int32Value queryTimeout = 6;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
}

The JDBCDataRow and JDBCQuery types above correspond, respectively, to the stream element type and the JDBCQueryContext used by the JDBC-Streaming tool:

  val toRow = (rs: WrappedResultSet) => JDBCDataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

   val ctx = JDBCQueryContext[JDBCDataRow](
     dbName = Symbol(q.dbName),
     statement = q.statement,
     parameters = params,
     fetchSize = q.fetchSize.getOrElse(100),
     autoCommit = q.autoCommit.getOrElse(false),
     queryTimeout = q.queryTimeout
   )
   jdbcAkkaStream(ctx, toRow)

Compiling with scalaPB generates the server-side and client-side boilerplate code; we only need to implement the actual JDBC service:

class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
  val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)

  val toRow = (rs: WrappedResultSet) => JDBCDataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )
  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
    logger.info("**** runQuery called on service side ***")
    Flow[JDBCQuery]
      .flatMapConcat { q =>
        //unpack JDBCQuery and construct the context
        val params: Seq[Any] = unmarshal[Seq[Any]](q.parameters)
        logger.info(s"**** query parameters: ${params} ****")
        val ctx = JDBCQueryContext[JDBCDataRow](
          dbName = Symbol(q.dbName),
          statement = q.statement,
          parameters = params,
          fetchSize = q.fetchSize.getOrElse(100),
          autoCommit = q.autoCommit.getOrElse(false),
          queryTimeout = q.queryTimeout
        )
        jdbcAkkaStream(ctx, toRow)
      }
  }
}

Here is how the client calls the service:

  val query = JDBCQuery (
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arizona", 5))
  )
  def queryRows: Source[JDBCDataRow,NotUsed] = {
    logger.info(s"running queryRows ...")
    Source
      .single(query)
      .via(stub.runQuery)
  }

And this is how the program is run:

object QueryRows extends App {
  implicit val system = ActorSystem("QueryRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.queryRows.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

What if the client sends a whole stream of JDBCQuery messages? That is the so-called BiDi-Streaming pattern, described in jdbc.proto as follows:

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
}

Note that the input of batQuery is a stream. The generated service function for batQuery has this signature:

  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = { ... }
  override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery

runQuery and batQuery have identical signatures, which means the server provides the same service pattern for both: in our example each of them returns the relevant data stream for every JDBCQuery received. The actual difference between the two services lies on the client side. Here is the source code generated by scalaPB:

   override def runQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] =
      Flow.fromGraph(
        new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow]({ outputObserver =>
          new StreamObserver[grpc.jdbc.services.JDBCQuery] {
            override def onError(t: Throwable): Unit = ()
            override def onCompleted(): Unit = ()
            override def onNext(request: grpc.jdbc.services.JDBCQuery): Unit =
              ClientCalls.asyncServerStreamingCall(
                channel.newCall(METHOD_RUN_QUERY, options),
                request,
                outputObserver
              )
          }
        })
      )
 ...
      override def batQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] =
        Flow.fromGraph(new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow](outputObserver =>
          ClientCalls.asyncBidiStreamingCall(
            channel.newCall(METHOD_BAT_QUERY, options),
            outputObserver
          )
        ))

So on the client side we call batQuery:

  def batQueryRows: Source[JDBCDataRow,NotUsed] = {
    logger.info(s"running batQueryRows ...")
    Source
      .fromIterator(() => List(query,query2,query3).toIterator)
      .via(stub.batQuery)
  }
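
For completeness, this is how batQueryRows can be materialized on the client, mirroring the QueryRows runner above; the same object appears in the complete client source at the end of the post (same imports as JDBCClient.scala assumed):

object BatQueryRows extends App {
  implicit val system = ActorSystem("BatQueryRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  // the three queries are sent over one bidi stream; every returned row is printed
  client.batQueryRows.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}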

Besides queries, JDBC operations must also cover data updates, including schema DDL and database updates. JDBC updates are passed to the engine through a JDBCContext:

case class JDBCContext(
                        dbName: Symbol,
                        statements: Seq[String] = Nil,
                        parameters: Seq[Seq[Any]] = Nil,
                        fetchSize: Int = 100,
                        queryTimeout: Option[Int] = None,
                        queryTags: Seq[String] = Nil,
                        sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
                        batch: Boolean = false,
                        returnGeneratedKey: Seq[Option[Any]] = Nil,
                        // no return: None, return by index: Some(1), by name: Some("id")
                        preAction: Option[PreparedStatement => Unit] = None,
                        postAction: Option[PreparedStatement => Unit] = None) {... }

The protobuf messages corresponding to this class are defined as follows:

message JDBCResult {
  bytes result = 1;
}

message JDBCUpdate {
  string dbName = 1;
  repeated string statements = 2;
  bytes parameters = 3;
  google.protobuf.Int32Value fetchSize= 4;
  google.protobuf.Int32Value queryTimeout = 5;
  int32 sqlType = 6;
  google.protobuf.Int32Value batch = 7;
  bytes returnGeneratedKey = 8;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
}

The service function runDDL returns a JDBCResult message, which wraps a return value of type Seq[Any]. The following demonstrates how the JDBCContext is packed into and restored from the protobuf message: on the server side the JDBCUpdate is unpacked to construct a JDBCContext, and jdbcExecuteDDL is called:

 override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = {
    logger.info("**** runDDL called on service side ***")
    Flow[JDBCUpdate]
      .flatMapConcat { context =>
        //unpack JDBCUpdate and construct the context

        val ctx = JDBCContext(
          dbName = Symbol(context.dbName),
          statements = context.statements,
          sqlType = JDBCContext.SQL_EXEDDL,
          queryTimeout = context.queryTimeout
        )

        logger.info(s"**** JDBCContext => ${ctx} ***")

        Source
          .fromFuture(jdbcExecuteDDL(ctx))
          .map { r => JDBCResult(marshal(r)) }

      }
  }

jdbcExecuteDDL returns a Future[String]:

  def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    }
    else {
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = WrappedResultSet => {})(
              after = WrappedResultSet => {})

            ddl.apply()
          }
          "SQL_EXEDDL executed succesfully."
        }
      }
    }
  }

We can then use Source.fromFuture(jdbcExecuteDDL(ctx)) to build an akka-stream Source. On the client side we construct a JDBCUpdate and send it to the server for execution:

  val dropSQL: String ="""
      drop table members
    """

  val createSQL: String ="""
    create table members (
      id serial not null primary key,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null,
      picture blob
    )"""
  val ctx = JDBCUpdate (
    dbName = "h2",
    sqlType = JDBCContext.SQL_EXEDDL,
    statements = Seq(dropSQL,createSQL)
  )

  def createTbl: Source[JDBCResult,NotUsed] = {
    logger.info(s"running createTbl ...")
    Source
      .single(ctx)
      .via(stub.runDDL)
  }

Note that statements = Seq(dropSQL,createSQL) contains two independent SQL operations.
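
Running this DDL from the client follows the same pattern as QueryRows; the JDBCResult comes back as marshalled bytes, so the client unmarshals it before printing. This runner is taken from the complete source below (same imports as JDBCClient.scala assumed):

object RunDDL extends App {
  implicit val system = ActorSystem("RunDDL")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  // JDBCResult.result holds the marshalled return value of jdbcExecuteDDL
  client.createTbl.runForeach { r => println(unmarshal[Seq[Any]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}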

Next we demonstrate streaming data (stream MemberRow) from the client, with the server inserting the rows into the database. The message types and service functions are defined as follows:

message JDBCDate {
  int32 yyyy = 1;
  int32 mm   = 2;
  int32 dd   = 3;
}

message JDBCTime {
  int32 hh   = 1;
  int32 mm   = 2;
  int32 ss   = 3;
  int32 nnn  = 4;
}

message JDBCDateTime {
   JDBCDate date = 1;
   JDBCTime time = 2;
}
message MemberRow {
  string name = 1;
  JDBCDate birthday = 2;
  string description = 3;
  JDBCDateTime created_at = 4;
  bytes picture = 5;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
  rpc insertRows(stream MemberRow) returns(JDBCResult) {}
}

The insertRows service function is implemented as follows:

 override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** insertRows called on service side ***")
    val insertSQL = """
      insert into members(
        name,
        birthday,
        description,
        created_at
      ) values (?, ?, ?, ?)
    """
    Flow[MemberRow]
      .flatMapConcat { row =>
        val ctx = JDBCContext('h2)
          .setUpdateCommand(true,insertSQL,
             row.name,
             jdbcSetDate(row.birthday.get.yyyy,row.birthday.get.mm,row.birthday.get.dd),
             row.description,
             jdbcSetNow
          )

        logger.info(s"**** JDBCContext => ${ctx} ***")

        Source
          .fromFuture(jdbcTxUpdates[Vector](ctx))
          .map { r => JDBCResult(marshal(r)) }
      }
  }

Likewise, jdbcTxUpdates returns a Future; its implementation is in the attached JDBCEngine.scala.

The client builds a stream of MemberRow and sends it to the server through stub.insertRows:

  val p1 = MemberRow( "Peter Chan",Some(JDBCDate(1967,5,17)),"new member1",None,_root_.com.google.protobuf.ByteString.EMPTY)
  val p2 = MemberRow( "Alanda Wong",Some(JDBCDate(1980,11,10)),"new member2",None,_root_.com.google.protobuf.ByteString.EMPTY)
  val p3 = MemberRow( "Kate Zhang",Some(JDBCDate(1969,8,13)),"new member3",None,_root_.com.google.protobuf.ByteString.EMPTY)
  val p4 = MemberRow( "Tiger Chan",Some(JDBCDate(1962,5,1)),"new member4",None,_root_.com.google.protobuf.ByteString.EMPTY)

  def insertRows: Source[JDBCResult,NotUsed] = {
    logger.info(s"running insertRows ...")
    Source
      .fromIterator(() => List(p1,p2,p3,p4).toIterator)
      .via(stub.insertRows)
  }
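
To run the insert stream, the client materializes insertRows and unmarshals the returned update counts (jdbcTxUpdates[Vector] yields a Vector[Long]); this runner also appears in the complete source below:

object InsertRows extends App {
  implicit val system = ActorSystem("InsertRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  // each JDBCResult carries the marshalled Vector[Long] returned by jdbcTxUpdates
  client.insertRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}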

Finally, we demonstrate the jdbcBatchUpdate function: the client reads MemberRow from the server and sends the rows back for update. The definitions in jdbc.proto:

message MemberRow {
  string name = 1;
  JDBCDate birthday = 2;
  string description = 3;
  JDBCDateTime created_at = 4;
  bytes picture = 5;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
  rpc insertRows(stream MemberRow) returns(JDBCResult) {}
  rpc updateRows(stream MemberRow) returns(JDBCResult) {}
  rpc getMembers(JDBCQuery) returns (stream MemberRow) {}
}

The server-side functions are defined as follows:

 val toMemberRow = (rs: WrappedResultSet) => MemberRow(
    name = rs.string("name"),
    description = rs.string("description"),
    birthday = None,
    createdAt = None,
    picture = _root_.com.google.protobuf.ByteString.EMPTY
  )
  override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = {
    logger.info("**** getMembers called on service side ***")
    Flow[JDBCQuery]
      .flatMapConcat { q =>
        //unpack JDBCQuery and construct the context
        var params: Seq[Any] =  Nil
        if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
           params = unmarshal[Seq[Any]](q.parameters)
        logger.info(s"**** query parameters: ${params} ****")
        val ctx = JDBCQueryContext[MemberRow](
          dbName = Symbol(q.dbName),
          statement = q.statement,
          parameters = params,
          fetchSize = q.fetchSize.getOrElse(100),
          autoCommit = q.autoCommit.getOrElse(false),
          queryTimeout = q.queryTimeout
        )
        jdbcAkkaStream(ctx, toMemberRow)
      }
  }
override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** updateRows called on service side ***")
    val updateSQL = "update members set description = ?, created_at = ? where name = ?"

    Flow[MemberRow]
      .flatMapConcat { row =>
        val ctx = JDBCContext('h2)
            .setBatchCommand(updateSQL)
            .appendBatchParameters(
              row.name + " updated.",
              jdbcSetNow,
              row.name
            ).setBatchReturnGeneratedKeyOption(true)

        logger.info(s"**** JDBCContext => ${ctx} ***")

        Source
          .fromFuture(jdbcBatchUpdate[Vector](ctx))
          .map { r => JDBCResult(marshal(r)) }
      }
  }

The source code of jdbcBatchUpdate is in the attached JDBCEngine.scala. The client code is as follows:

  val queryMember = JDBCQuery (
    dbName = "h2",
    statement = "select * from members"
  )
  def updateRows: Source[JDBCResult,NotUsed] = {
    logger.info(s"running updateRows ...")
    Source
      .single(queryMember)
      .via(stub.getMembers)
      .via(stub.updateRows)
  }
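
The corresponding runner, again unmarshalling the batch result returned in JDBCResult (taken from the complete source below):

object UpdateRows extends App {
  implicit val system = ActorSystem("UpdateRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  // getMembers streams rows back from the server, updateRows writes them back and returns the batch result
  client.updateRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}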

The following example shows how to use JDBCActionStream to process data in bulk. The server-side source code:

  val params: JDBCDataRow => Seq[Any] = row => {
    Seq((row.value.toInt * 2), row.state, row.county, row.year) }
  val sql = "update AQMRPT set total = ? where statename = ? and countyname = ? and reportyear = ?"

  val jdbcActionStream = JDBCActionStream('h2,sql ,params)
    .setParallelism(4).setProcessOrder(false)
  val jdbcActionFlow = jdbcActionStream.performOnRow

  override def updateBat: Flow[JDBCDataRow, JDBCDataRow, NotUsed] = {
    logger.info("**** updateBat called on service side ***")
    Flow[JDBCDataRow]
         .via(jdbcActionFlow)
  }

jdbcActionFlow is a Flow[R,R,_], so we can attach it directly to the preceding Flow with via. Here is the definition of JDBCActionStream:

  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                 statement: String, prepareParams: R => Seq[Any]) {
    jas =>
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
      import scala.concurrent._
      val params = prepareParams(r)
      Future {
        NamedDB(dbName) autoCommit { session =>
          session.execute(statement, params: _*)
        }
        r
      }
    }
    def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)

  }
  object JDBCActionStream {
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)
  }

performOnRow is a pass-through stage; it uses mapAsync to run the updates on multiple threads. The client calls it like this:

  def updateBatches: Source[JDBCDataRow,NotUsed] = {
    logger.info(s"running updateBatches ...")
      Source
        .fromIterator(() => List(query,query2,query3).toIterator)
        .via(stub.batQuery)
        .via(stub.updateBat)

  }
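
And the runner for this batch-update pipeline, which simply prints each row echoed back after its update (same runner as in the complete source below):

object BatUpdates extends App {
  implicit val system = ActorSystem("BatUpdates")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  // rows flow through batQuery, then through updateBat, and are passed through after being written
  client.updateBatches.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}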

Below is the complete source code for this demonstration:

jdbc.proto

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";


package grpc.jdbc.services;

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demos various customization options provided by ScalaPB.
 */

message JDBCDataRow {
 string year = 1;
 string state = 2;
 string county = 3;
 string value = 4;
}


message JDBCQuery {
  string dbName = 1;
  string statement = 2;
  bytes parameters = 3;
  google.protobuf.Int32Value fetchSize= 4;
  google.protobuf.BoolValue autoCommit = 5;
  google.protobuf.Int32Value queryTimeout = 6;
}

message JDBCResult {
  bytes result = 1;
}

message JDBCUpdate {
  string dbName = 1;
  repeated string statements = 2;
  bytes parameters = 3;
  google.protobuf.Int32Value fetchSize= 4;
  google.protobuf.Int32Value queryTimeout = 5;
  int32 sqlType = 6;
  google.protobuf.Int32Value batch = 7;
  bytes returnGeneratedKey = 8;
}

message JDBCDate {
  int32 yyyy = 1;
  int32 mm   = 2;
  int32 dd   = 3;
}

message JDBCTime {
  int32 hh   = 1;
  int32 mm   = 2;
  int32 ss   = 3;
  int32 nnn  = 4;
}

message JDBCDateTime {
   JDBCDate date = 1;
   JDBCTime time = 2;
}

message MemberRow {
  string name = 1;
  JDBCDate birthday = 2;
  string description = 3;
  JDBCDateTime created_at = 4;
  bytes picture = 5;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
  rpc insertRows(stream MemberRow) returns(JDBCResult) {}
  rpc updateRows(stream MemberRow) returns(JDBCResult) {}
  rpc getMembers(JDBCQuery) returns (stream MemberRow) {}
}

JDBCEngine.scala

package sdp.jdbc.engine
import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.stream._
import java.time._
import scala.concurrent.duration._
import scala.concurrent._
import sdp.jdbc.FileStreaming._

import scalikejdbc.TxBoundary.Try._

import scala.concurrent.ExecutionContextExecutor
import java.io.InputStream

object JDBCContext {
  type SQLTYPE = Int
  val SQL_EXEDDL= 1
  val SQL_UPDATE = 2
  val RETURN_GENERATED_KEYVALUE = true
  val RETURN_UPDATED_COUNT = false

}

case class JDBCQueryContext[M](
                                dbName: Symbol,
                                statement: String,
                                parameters: Seq[Any] = Nil,
                                fetchSize: Int = 100,
                                autoCommit: Boolean = false,
                                queryTimeout: Option[Int] = None)


case class JDBCContext(
                        dbName: Symbol,
                        statements: Seq[String] = Nil,
                        parameters: Seq[Seq[Any]] = Nil,
                        fetchSize: Int = 100,
                        queryTimeout: Option[Int] = None,
                        queryTags: Seq[String] = Nil,
                        sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
                        batch: Boolean = false,
                        returnGeneratedKey: Seq[Option[Any]] = Nil,
                        // no return: None, return by index: Some(1), by name: Some("id")
                        preAction: Option[PreparedStatement => Unit] = None,
                        postAction: Option[PreparedStatement => Unit] = None) {

  ctx =>

  //helper functions

  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(preAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(postAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(Seq(_parameters))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

    var matchParams = true
    if (ctx.parameters != Nil)
      if (ctx.parameters.head.size != _parameters.size)
        matchParams = false
    if (matchParams) {
      ctx.copy(
        parameters = ctx.parameters ++ Seq(_parameters)
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
  }


  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
    ctx.copy(
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
    )
  }

  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false
    )
  }

  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )
  }
  def setBatchCommand(_statement: String): JDBCContext = {
    ctx.copy (
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true
    )
  }

}

object JDBCEngine {

  import JDBCContext._

  type JDBCDate = LocalDate
  type JDBCDateTime = LocalDateTime
  type JDBCTime = LocalTime

  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd)
  def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int) = LocalTime.of(hh,mm,ss,nn)
  def jdbcSetDateTime(date: JDBCDate, time: JDBCTime) =  LocalDateTime.of(date,time)
  def jdbcSetNow = LocalDateTime.now()


  type JDBCBlob = InputStream

  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer) = FileToInputStream(fileName,timeOut)

  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
    implicit mat: Materializer) =  InputStreamToFile(blob,fileName)



  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A)
                       (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
      val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor)

      sql.iterator
        .withDBSessionForceAdjuster(session => {
          session.connection.setAutoCommit(ctx.autoCommit)
          session.fetchSize(ctx.fetchSize)
        })
    }
    Source.fromPublisher[A](publisher)
  }


  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A],
                                                     extractor: WrappedResultSet => A)(
                                                      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    rawSql.fetchSize(ctx.fetchSize)
    implicit val session = NamedAutoSession(ctx.dbName)
    val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
    sql.collection.apply[C]()

  }

  def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    }
    else {
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = WrappedResultSet => {})(
              after = WrappedResultSet => {})

            ddl.apply()
          }
          "SQL_EXEDDL executed succesfully."
        }
      }
    }
  }

  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
             cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    if (ctx.statements == Nil)
      Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
    else if (ctx.sqlType != SQL_UPDATE) {
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    }
    else {
      if (ctx.batch) {
        if (noReturnKey(ctx)) {
          val usql = SQL(ctx.statements.head)
            .tags(ctx.queryTags: _*)
            .batch(ctx.parameters: _*)
          Future {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[Seq]()
              Seq.empty[Long].to[C]
            }
          }
        } else {
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
          Future {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[C]()
            }
          }
        }

      } else {
        Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
      }
    }
  }
  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
          implicit ec: ExecutionContextExecutor,
                    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    val Some(key) :: xs = ctx.returnGeneratedKey
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p@_ => p.head
    }
    val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
    Future {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result).to[C]
      }
    }
  }

  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
          implicit ec: ExecutionContextExecutor,
                   cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p@_ => p.head
    }
    val before = ctx.preAction match {
      case None => pstm: PreparedStatement => {}
      case Some(f) => f
    }
    val after = ctx.postAction match {
      case None => pstm: PreparedStatement => {}
      case Some(f) => f
    }
    val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
    Future {
      NamedDB(ctx.dbName) localTx {implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result.toLong).to[C]
      }
    }

  }

  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
             cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    if (noReturnKey(ctx))
      singleTxUpdateNoReturnKey(ctx)
    else
      singleTxUpdateWithReturnKey(ctx)
  }

  private def noReturnKey(ctx: JDBCContext): Boolean = {
    if (ctx.returnGeneratedKey != Nil) {
      val k :: xs = ctx.returnGeneratedKey
      k match {
        case None => true
        case Some(k) => false
      }
    } else true
  }

  def noActon: PreparedStatement=>Unit = pstm => {}

  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
             cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    Future {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
          case Nil => Seq.fill(ctx.statements.size)(None)
          case k@_ => k
        }
        val sqlcmd = ctx.statements zip ctx.parameters zip keys
        val results = sqlcmd.map { case ((stm, param), key) =>
          key match {
            case None =>
              new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
            case Some(k) =>
              new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
          }
        }
        results.to[C]
      }
    }
  }


  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
             cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    if (ctx.statements == Nil)
      Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
    else if (ctx.sqlType != SQL_UPDATE) {
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    }
    else {
      if (!ctx.batch) {
        if (ctx.statements.size == 1)
          singleTxUpdate(ctx)
        else
          multiTxUpdates(ctx)
      } else
        Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = false !"))

    }
  }

  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                 statement: String, prepareParams: R => Seq[Any]) {
    jas =>
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
      import scala.concurrent._
      val params = prepareParams(r)
      Future {
        NamedDB(dbName) autoCommit { session =>
          session.execute(statement, params: _*)
        }
        r
      }
     // Future.successful(r)
    }
    def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)

  }
  object JDBCActionStream {
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)
  }
}

JDBCService.scala

package demo.grpc.jdbc.services

import akka.NotUsed
import akka.stream.scaladsl.{Source,Flow}
import grpc.jdbc.services._
import java.util.logging.Logger
import protobuf.bytes.Converter._
import sdp.jdbc.engine._
import JDBCEngine._
import scalikejdbc.WrappedResultSet
import scala.concurrent.ExecutionContextExecutor

class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
  val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)

  val toRow = (rs: WrappedResultSet) => JDBCDataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )
  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
    logger.info("**** runQuery called on service side ***")
    Flow[JDBCQuery]
      .flatMapConcat { q =>
        //unpack JDBCQuery and construct the context
        var params: Seq[Any] =  Nil
        if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
          params = unmarshal[Seq[Any]](q.parameters)
        logger.info(s"**** query parameters: ${params} ****")
        val ctx = JDBCQueryContext[JDBCDataRow](
          dbName = Symbol(q.dbName),
          statement = q.statement,
          parameters = params,
          fetchSize = q.fetchSize.getOrElse(100),
          autoCommit = q.autoCommit.getOrElse(false),
          queryTimeout = q.queryTimeout
        )
        jdbcAkkaStream(ctx, toRow)
      }
  }

  override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery

  override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = {
    logger.info("**** runDDL called on service side ***")
    Flow[JDBCUpdate]
      .flatMapConcat { context =>
        //unpack JDBCUpdate and construct the context

        val ctx = JDBCContext(
          dbName = Symbol(context.dbName),
          statements = context.statements,
          sqlType = JDBCContext.SQL_EXEDDL,
          queryTimeout = context.queryTimeout
        )

        logger.info(s"**** JDBCContext => ${ctx} ***")

        Source
          .fromFuture(jdbcExecuteDDL(ctx))
          .map { r => JDBCResult(marshal(r)) }

      }

  }

  override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** insertRows called on service side ***")
    val insertSQL = """
      insert into members(
        name,
        birthday,
        description,
        created_at
      ) values (?, ?, ?, ?)
    """
    Flow[MemberRow]
      .flatMapConcat { row =>
        val ctx = JDBCContext('h2)
          .setUpdateCommand(true,insertSQL,
             row.name,
             jdbcSetDate(row.birthday.get.yyyy,row.birthday.get.mm,row.birthday.get.dd),
             row.description,
             jdbcSetNow
          )

        logger.info(s"**** JDBCContext => ${ctx} ***")

        Source
          .fromFuture(jdbcTxUpdates[Vector](ctx))
          .map { r => JDBCResult(marshal(r)) }
      }
  }

  override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** updateRows called on service side ***")
    val updateSQL = "update members set description = ?, created_at = ? where name = ?"

    Flow[MemberRow]
      .flatMapConcat { row =>
        val ctx = JDBCContext('h2)
            .setBatchCommand(updateSQL)
            .appendBatchParameters(
              row.name + " updated.",
              jdbcSetNow,
              row.name
            ).setBatchReturnGeneratedKeyOption(true)

        logger.info(s"**** JDBCContext => ${ctx} ***")

        Source
          .fromFuture(jdbcBatchUpdate[Vector](ctx))
          .map { r => JDBCResult(marshal(r)) }
      }
  }


  val toMemberRow = (rs: WrappedResultSet) => MemberRow(
    name = rs.string("name"),
    description = rs.string("description"),
    birthday = None,
    createdAt = None,
    picture = _root_.com.google.protobuf.ByteString.EMPTY
  )
  override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = {
    logger.info("**** getMembers called on service side ***")
    Flow[JDBCQuery]
      .flatMapConcat { q =>
        //unpack JDBCQuery and construct the context
        var params: Seq[Any] =  Nil
        if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
           params = unmarshal[Seq[Any]](q.parameters)
        logger.info(s"**** query parameters: ${params} ****")
        val ctx = JDBCQueryContext[MemberRow](
          dbName = Symbol(q.dbName),
          statement = q.statement,
          parameters = params,
          fetchSize = q.fetchSize.getOrElse(100),
          autoCommit = q.autoCommit.getOrElse(false),
          queryTimeout = q.queryTimeout
        )
        jdbcAkkaStream(ctx, toMemberRow)
      }

  }
}

JDBCServer.scala

package demo.grpc.jdbc.server

import java.util.logging.Logger

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import io.grpc.Server
import demo.grpc.jdbc.services._
import io.grpc.ServerBuilder
import grpc.jdbc.services._

class gRPCServer(server: Server) {

  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)

  def start(): Unit = {
    server.start()
    logger.info(s"Server started, listening on ${server.getPort}")
    sys.addShutdownHook {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down")
      stop()
      System.err.println("*** server shut down")
    }
    ()
  }

  def stop(): Unit = {
    server.shutdown()
  }

  /**
    * Await termination on the main thread since the grpc library uses daemon threads.
    */
  def blockUntilShutdown(): Unit = {
    server.awaitTermination()
  }
}

object JDBCServer extends App {
  import sdp.jdbc.config._

  implicit val system = ActorSystem("JDBCServer")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val server = new gRPCServer(
    ServerBuilder
      .forPort(50051)
      .addService(
        JdbcGrpcAkkaStream.bindService(
          new JDBCStreamingServices
        )
      ).build()
  )
  server.start()
  //  server.blockUntilShutdown()
  scala.io.StdIn.readLine()
  ConfigDBsWithEnv("dev").close('h2)
  mat.shutdown()
  system.terminate()
}

JDBCClient.scala

package demo.grpc.jdbc.client
import grpc.jdbc.services._
import java.util.logging.Logger

import protobuf.bytes.Converter._
import akka.stream.scaladsl._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import io.grpc._
import sdp.jdbc.engine._

class JDBCStreamClient(host: String, port: Int) {
  val logger: Logger = Logger.getLogger(classOf[JDBCStreamClient].getName)

  val channel = ManagedChannelBuilder
    .forAddress(host,port)
    .usePlaintext(true)
    .build()


  val stub = JdbcGrpcAkkaStream.stub(channel)
  val query = JDBCQuery (
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arizona", 2))
  )
  val query2 = JDBCQuery (
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Colorado", 3))
  )
  val query3= JDBCQuery (
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arkansas", 8))
  )

  def queryRows: Source[JDBCDataRow,NotUsed] = {
    logger.info(s"running queryRows ...")
    Source
        .single(query)
        .via(stub.runQuery)
  }

  def batQueryRows: Source[JDBCDataRow,NotUsed] = {
    logger.info(s"running batQueryRows ...")
    Source
      .fromIterator(() => List(query,query2,query3).toIterator)
      .via(stub.batQuery)
  }

  val dropSQL: String ="""
      drop table members
    """

  val createSQL: String ="""
    create table members (
      id serial not null primary key,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null,
      picture blob
    )"""
  val ctx = JDBCUpdate (
    dbName = "h2",
    sqlType = JDBCContext.SQL_EXEDDL,
    statements = Seq(dropSQL,createSQL)
  )

  def createTbl: Source[JDBCResult,NotUsed] = {
    logger.info(s"running createTbl ...")
    Source
      .single(ctx)
      .via(stub.runDDL)
  }

  val p1 = MemberRow( "Peter Chan",Some(JDBCDate(1967,5,17)),"new member1",None,_root_.com.google.protobuf.ByteString.EMPTY)
  val p2 = MemberRow( "Alanda Wong",Some(JDBCDate(1980,11,10)),"new member2",None,_root_.com.google.protobuf.ByteString.EMPTY)
  val p3 = MemberRow( "Kate Zhang",Some(JDBCDate(1969,8,13)),"new member3",None,_root_.com.google.protobuf.ByteString.EMPTY)
  val p4 = MemberRow( "Tiger Chan",Some(JDBCDate(1962,5,1)),"new member4",None,_root_.com.google.protobuf.ByteString.EMPTY)

  def insertRows: Source[JDBCResult,NotUsed] = {
    logger.info(s"running insertRows ...")
    Source
      .fromIterator(() => List(p1,p2,p3,p4).toIterator)
      .via(stub.insertRows)
  }

  val queryMember = JDBCQuery (
    dbName = "h2",
    statement = "select * from members"
  )
  def updateRows: Source[JDBCResult,NotUsed] = {
    logger.info(s"running updateRows ...")
    Source
      .single(queryMember)
      .via(stub.getMembers)
      .via(stub.updateRows)
  }
  def updateBatches: Source[JDBCDataRow,NotUsed] = {
    logger.info(s"running updateBatches ...")
      Source
        .fromIterator(() => List(query,query2,query3).toIterator)
        .via(stub.batQuery)
        .via(stub.updateBat)

  }

}


object TestConversion extends App {

  val orgval: Seq[Option[Any]] = Seq(Some(1),Some("id"),None,Some(2))
  println(s"original value: ${orgval}")

  val marval = marshal(orgval)

  println(s"marshal value: ${marval}")

  val unmval = unmarshal[Seq[Option[Any]]](marval)

  println(s"marshal value: ${unmval}")


  val m1 = MemberRow(name = "Peter")
  val m2 = m1.update(
    _.birthday.yyyy := 1989,
    _.birthday.mm := 10,
    _.birthday.dd := 3,
    _.description := "a new member"
  )


}


object QueryRows extends App {
  implicit val system = ActorSystem("QueryRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.queryRows.runForeach { r => println(r) }


  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object BatQueryRows extends App {
  implicit val system = ActorSystem("BatQueryRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.batQueryRows.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object RunDDL extends App {
  implicit val system = ActorSystem("RunDDL")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.createTbl.runForeach{r => println(unmarshal[Seq[Any]](r.result))}

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

object InsertRows extends App {
  implicit val system = ActorSystem("InsertRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.insertRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object UpdateRows extends App {
  implicit val system = ActorSystem("UpdateRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.updateRows.runForeach{ r => println(unmarshal[Vector[Long]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object BatUpdates extends App {
  implicit val system = ActorSystem("BatUpdates")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)

  client.updateBatches.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

ByteConverter.scala

package protobuf.bytes
import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString
object Converter {

  def marshal(value: Any): ByteString = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(value)
    oos.close()
    ByteString.copyFrom(stream.toByteArray())
  }

  def unmarshal[A](bytes: ByteString): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val value = ois.readObject()
    ois.close()
    value.asInstanceOf[A]
  }

}

The remaining source code and system configuration can be found in the previous post.
