Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >SDP(3):ScalikeJDBC- JDBC-Engine:Fetching

SDP(3):ScalikeJDBC- JDBC-Engine:Fetching

作者头像
用户1150956
发布于 2018-03-16 08:38:56
发布于 2018-03-16 08:38:56
1.7K00
代码可运行
举报
运行总次数:0
代码可运行

  ScalikeJDBC在覆盖JDBC基本功能上是比较完整的,而且实现这些功能的方式比较简洁,运算效率方面自然会稍高一筹了。理论上用ScalikeJDBC作为一种JDBC-Engine还是比较理想的:让它处于各种JDBC工具库和数据库实例之间接收JDBC运算指令然后连接目标数据库进行相关运算后返回结果。一般来说,各种JDBC工具库如ORM,FRM软件通过各自的DSL在复杂的数据库表关系环境内进行数据管理编程,最终产生相关的SQL语句即(prepared)statement+parameters传递给指定类型的数据库JDBC驱动程序去运算并产生结果。如果这样描述,那么JDBC-Engine主要的功能就是支持下面这个函数:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
jdbcRunSQL(context: JDBCContext): JDBCResultSet

这个函数的用户提供一个JDBCContext类型值,然后由jdbcRunSQL进行接下来的运算并返回结果。从这个角度分析,JDBCContext最起码需要提供下面的属性: 

1、数据库连接:选择数据库连接池

2、运算参数:fetchSize, queryTimeout,queryTag。这几个参数都针对当前运算的SQL

3、Query参数:

    Query类型:select/execute/update、单条/成批、前置/后置query、generateKey

    SQL语句:statement:Seq[String]、parameters: Seq[Option[Seq[Any]]]

下面就是JDBCContext类型定义:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._

  object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXECUTE = 1
    val SQL_UPDATE = 2

    def returnColumnByIndex(idx: Int) = Some(idx)

    def returnColumnByName(col: String) = Some(col)
  }

  case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String],
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Option[Any] = None,
                          // no return: None, return by index: Some(1), by name: Some("id")
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
重新考虑了一下,觉着把jdbc读写分开两个函数来实现更容易使用,因为这样比较符合编程模式和习性。所以最好把sqlType=SQL_SELECT类型SQL独立一个函数出来运算:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
   def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
      ctx.sqlType match {
        case SQL_SELECT => {
          val params: Seq[Any] = ctx.parameters match {
            case Nil => Nil
            case p@_ => p.head
          }
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor(""))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          ctx.queryTags.foreach(rawSql.tags(_))
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
          sql.collection.apply[C]()
        }
        case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
      }
    }

还需要提供noExtractor函数来符合SQLToCollectionImpl类型的参数款式要求:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
我们来测试用一下jdbcQueryResult:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._
object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  val ctx = JDBCContext(
    dbName = 'h2,
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  println(s"members in vector: $vecMember")

  val ctx1 = ctx.copy(dbName = 'mysql)

  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})

  println(s"selected name: $names")

  val ctx2 = ctx1.copy(dbName = 'postgres)
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})

  println(s"selected id+name: $idname")
}

如果我们使用Slick-DSL进行数据库管理编程后应该如何与JDBC-Engine对接:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._


  val slickCtx = JDBCContext(
    dbName = 'h2,
    statements = Seq(statement),
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
    rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))

输出正确。

下面就是本次示范的源代码:

 build.sbt

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
name := "learn-scalikeJDBC"

version := "0.1"

scalaVersion := "2.12.4"

// Scala 2.10, 2.11, 2.12
libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc"       % "3.1.0",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.1.0"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.1.0",
  "com.h2database"  %  "h2"                % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
)

resources/application.conf 包括H2,MySQL,PostgreSQL

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
  }

  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"

    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }
  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10

HikariConfig.scala  HikariCP连接池实现

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package configdbs
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
  def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default

  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  def toProperties: Properties = {
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
      val props = new Properties(null)
      m.foreach { case (k, cv) =>
        val v =
          if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
          else if(cv.unwrapped eq null) null
          else cv.unwrapped.toString
        if(v ne null) props.put(k, v)
      }
      props
    }
    toProps(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
  def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
  def getStringOpt(path: String) = Option(getStringOr(path))
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}

trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  def getFactoryName(dbName: Symbol): String = {
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }

  def hikariCPConfig(dbName: Symbol): HikariConfig = {

    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
      hconf.setDataSourceClassName(c.getString("dataSourceClass"))
    } else {
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
    }
    hconf.setJdbcUrl(c.getStringOr("url", null))
    c.getStringOpt("user").foreach(hconf.setUsername)
    c.getStringOpt("password").foreach(hconf.setPassword)
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf

  }
}

import scalikejdbc._
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" => {
        val hconf = hikariCPConfig(dbName)
        val hikariCPSource = new HikariDataSource(hconf)
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
          Class.forName(hconf.getDriverClassName)
        }
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
      }
      case _ => {
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        if (driver != null && driver.trim.nonEmpty) {
          Class.forName(driver)
        }
        ConnectionPool.add(dbName, url, user, password, cpSettings)
      }
    }
  }

  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  def closeAll(): Unit = {
    ConnectionPool.closeAll
  }

}


object ConfigDBs extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader

case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env = Option(envValue)
}

JDBCEngine.scala jdbcQueryResult函数实现

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._

  object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXECUTE = 1
    val SQL_UPDATE = 2

    def returnColumnByIndex(idx: Int) = Some(idx)

    def returnColumnByName(col: String) = Some(col)
  }

  case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String],
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Option[Any] = None,
                          // no return: None, return by index: Some(1), by name: Some("id")
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None)

  object JDBCEngine {

    import JDBCContext._

    private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }

    def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
      ctx.sqlType match {
        case SQL_SELECT => {
          val params: Seq[Any] = ctx.parameters match {
            case Nil => Nil
            case p@_ => p.head
          }
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor("boom!"))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          ctx.queryTags.foreach(rawSql.tags(_))
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
          sql.collection.apply[C]()
        }
        case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
      }
    }

  }

JDBCQueryDemo.scala  功能测试代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._
object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  val ctx = JDBCContext(
    dbName = 'h2,
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  println(s"members in vector: $vecMember")

  val ctx1 = ctx.copy(dbName = 'mysql)

  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})

  println(s"selected name: $names")

  val ctx2 = ctx1.copy(dbName = 'postgres)
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})

  println(s"selected id+name: $idname")


  object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._


  val slickCtx = JDBCContext(
    dbName = 'h2,
    statements = Seq(statement),
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
    rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))


}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-01-29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
SDP(5):ScalikeJDBC- JDBC-Engine:Streaming
  作为一种通用的数据库编程引擎,用Streaming来应对海量数据的处理是必备功能。同样,我们还是通过一种Context传递产生流的要求。因为StreamingContext比较简单,而且还涉及到数据抽取函数extractor的传递,所以我们分开来定义: case class JDBCQueryContext[M]( dbName: Symbol, statement: String,
用户1150956
2018/03/16
1K0
PICE(1):Programming In Clustered Environment - 集群环境内编程模式
首先声明:标题上的所谓编程模式是我个人考虑在集群环境下跨节点(jvm)的流程控制编程模式,纯粹按实际需要构想,没什么理论支持。在5月份的深圳scala meetup上我分享了有关集群环境下的编程模式思路。我提供了下面这个示意图:
用户1150956
2018/08/01
1.4K0
PICE(1):Programming In Clustered Environment - 集群环境内编程模式
restapi(3)- MongoDBEngine : MongoDB Scala编程工具库
最近刚好有同事在学习MongoDB,我们讨论过MongoDB应该置于服务器端然后通过web-service为客户端提供数据的上传下载服务。我们可以用上节讨论的respapi框架来实现针对MongoDB的CRUD操作。在谈到restapi之前我在这篇讨论先介绍一下MongoDB数据库操作的scala编程,因为与传统的SQL数据库操作编程有比较大的差别。
用户1150956
2019/08/06
1.4K0
SDP(4):ScalikeJDBC- JDBC-Engine:Updating
    在上一篇博文里我们把JDBC-Engine的读取操作部分分离出来进行了讨论,在这篇准备把更新Update部分功能介绍一下。当然,JDBC-Engine的功能是基于ScalikeJDBC的,所有的操作和属性都包嵌在SQL这个类型中: /** * SQL abstraction. * * @param statement SQL template * @param rawParameters parameters * @param f extractor function * @tpara
用户1150956
2018/03/16
1.5K0
PICE(2):JDBCStreaming - gRPC-JDBC Service
   在一个akka-cluster环境里,从数据调用的角度上,JDBC数据库与集群中其它节点是脱离的。这是因为JDBC数据库不是分布式的,不具备节点位置透明化特性。所以,JDBC数据库服务器必须通过服务方式来向外提供数据操。在这种场景里服务端是JDBC服务,其它节点,包括其它的JDBC数据库节点都是这个JDBC服务的调用客户端。因为我们已经明确选择了在akka-cluster集群环境里实施gRPC服务模式,通过akka-stream的流控制方式实现数据库操作的程序控制,所以在本次讨论里我们将示范说明gRPC-JDBC-Streaming的具体实现和使用方式。
用户1150956
2018/07/31
1.5K0
SDP(12): MongoDB-Engine - Streaming
   在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个MongoDB-connector里包含了MongoSource,MongoFlow,MongoSink。我们只使用MongoSource,其它两个我们直接用mapAsyc来创造。下面是MongoSource的定义: object MongoSource { def apply(query: Observable[Document]): Sour
用户1150956
2018/04/02
1.4K0
SDP(7):Cassandra- Cassandra-Engine:Streaming
  akka在alpakka工具包里提供了对cassandra数据库的streaming功能。简单来讲就是用一个CQL-statement读取cassandra数据并产生akka-stream的Source。这是一个支持reactive-stream协议的流: object CassandraSource { /** * Scala API: creates a [[CassandraSourceStage]] from a given statement. */ def apply(
用户1150956
2018/03/16
3.4K0
PICE(3):CassandraStreaming - gRPC-CQL Service
  在上一篇博文里我们介绍了通过gRPC实现JDBC数据库的streaming,这篇我们介绍关于cassandra的streaming实现方式。如果我们需要从一个未部署cassandra的节点或终端上读取cassandra数据,可以用gRPC来搭建一个数据桥梁来连接这两端。这时cassandra这端就是gRPC-Server端,由它提供cassandra的数据服务。
用户1150956
2018/07/31
1.2K0
SDP(1):ScalikeJDBC-基本操作介绍
  简单来说:JDBC是一种开放标准的跨编程语言、跨数据库类型编程API。各类型数据库产品厂商都会按它的标准要求来提供针对自身产品的JDBC驱动程序。最主要的这是一套成熟的工具,在编程人员中使用很普及。既然我们的目标是开发一套标准的大数据系统集成API,那么采用JDBC系统数据接口可以沿用业内丰富的技术资源、覆盖更多类型用户的编程需要,以及降低使用门槛。对于scala编程语言来讲,ScalikeJDBC是一套最合适满足我们下面开发要求的工具库,因为它可以用最简单易用的方式来实现JDBC的基本功能。
用户1150956
2022/05/10
1.4K0
SDP(2):ScalikeJDBC-Connection Pool Configuration
  scalikeJDBC可以通过配置文件来设置连接池及全局系统参数。对配置文件的解析是通过TypesafeConfig工具库实现的。默认加载classpath下的application.conf,application.json和application.properties文件。作为尝试,我们可以在resource/application.conf文件里进行h2和mysql数据库的JDBC驱动参数定义: # JDBC settings db { h2 { driver="org.h2.Driv
用户1150956
2018/03/16
1.3K0
SDP(6):分布式数据库运算环境- Cassandra-Engine
    现代信息系统应该是避不开大数据处理的。作为一个通用的系统集成工具也必须具备大数据存储和读取能力。cassandra是一种分布式的数据库,具备了分布式数据库高可用性(high-availability)特性,对于一个实时大型分布式集成系统来说是核心支柱。与传统的关系数据库对比,cassandra从数据存储结构、读取方式等可以说是皆然不同的。如:cassandra库表设计是反范式的(denormalized)、表结构设计是反过来根据query要求设计的,等等。幸运的是自版本3.0后cassandra提供
用户1150956
2018/03/16
1.7K0
SDP(10):文本式大数据运算环境-MongoDB-Engine功能设计
    为了让前面规划的互联网+数据平台能有效对电子商务数据进行管理及实现大数据统计功能,必须在平台上再增加一个MongDB-Engine:数据平台用户通过传入一种Context来指示MongoDB-Engine运算。与前面JDBC-Engine和Cassandra-Engine通过文本式传递指令不同的是:MangoDB没有一套像SQL或CQL这样的文本式编程语言。但MangoDB基本上都是通过Bson类型的参数进行运算的,Bson是个java interface: /** * An interface
用户1150956
2018/03/16
1K0
Tomcat配置JNDI数据源
本文将介绍Tomcat配置JNDI数据源的方法,主要分为配置局部数据源(仅供单个应用使用)和全局数据源(所有用该Tomcat的应用都可以使用) 一、思考 从我们学习Web开发以来,我们都是通过程序代码来实现数据库的访问的。从一开始直接将数据库配置信息写在代码中,到后来将配置信息抽取出来写在了Properties文件中,我们访问数据库的代码更加精简。 而通过JNDI方式访问数据库则更为直接,JNDI连接数据库的方式直接将数据库信息放在Tomcat中,而项目代码里直接通过JNDI技术就可以得到数据源。他们之间通
陈树义
2018/04/13
2.1K0
restapi(8)- restapi-sql:用户自主的服务
学习函数式编程初衷是看到自己熟悉的oop编程语言和sql数据库在现代商业社会中前景暗淡,准备完全放弃windows技术栈转到分布式大数据技术领域的。但是在现实中理想总是不如人意,本来想在一个规模较小的公司展展拳脚,以为小公司会少点历史包袱,有利于全面技术改造。但现实是:即使是小公司,一旦有个成熟的产品,那么进行全面的技术更新基本上是不可能的了,因为公司要生存,开发人员很难新旧技术之间随时切换。除非有狂热的热情,员工怠慢甚至抵制情绪不容易解决。只能采取逐步切换方式:保留原有产品的后期维护不动,新产品开发用一些新的技术。在我们这里的情况就是:以前一堆c#、sqlserver的东西必须保留,新的功能比如大数据、ai、识别等必须用新的手段如scala、python、dart、akka、kafka、cassandra、mongodb来开发。好了,新旧两个开发平台之间的软件系统对接又变成了一个问题。
用户1150956
2019/10/30
1.5K0
SDP(8):文本式数据库-MongoDB-Scala基本操作
  MongoDB是一种文本式数据库。与传统的关系式数据库最大不同是MongoDB没有标准的格式要求,即没有schema,合适高效处理当今由互联网+商业产生的多元多态数据。MongoDB也是一种分布式数据库,充分具备大数据处理能力和高可用性。MongoDB提供了scala终端驱动mongo-scala-driver,我们就介绍一下MongoDB数据库和通过scala来进行数据操作编程。    与关系数据库相似,MongoDB结构为Database->Collection->Document。Collec
用户1150956
2018/03/16
1.9K0
SDP(11):MongoDB-Engine功能实现
  根据上篇关于MongoDB-Engine的功能设计方案,我们将在这篇讨论里进行功能实现和测试。下面是具体的功能实现代码:基本上是直接调用Mongo-scala的对应函数,需要注意的是java类型和scala类型之间的相互转换: object MGOEngine { import MGOContext._ import MGOCommands._ import MGOAdmins._ def mgoExecute[T](ctx: MGOContext)(implicit client:
用户1150956
2018/03/16
1.1K0
Akka-Cluster(3)- ClusterClient, 集群客户端
  上篇我们介绍了distributed pub/sub消息传递机制。这是在同一个集群内的消息共享机制:发布者(publisher)和订阅者(subscriber)都在同一个集群的节点上,所有节点上的DistributedPubSubMediator通过集群内部的沟通机制在底层构建了消息流通渠道。在actor pub/sub层面可以实现对象位置透明化。在现实里很多前端都会作为某个集群的客户端但又与集群分离,又或者两个独立的集群之间可能会发生交互关系,这是也会出现客户端与服务端不在同一集群内的情况,ClusterClient就是为集群外部actor与集群内部actor进行沟通的解决方案。
用户1150956
2018/12/25
1.9K0
细谈Slick(6)- Projection:ProvenShape,强类型的Query结果类型
  在Slick官方文档中描述:连接后台数据库后,需要通过定义Projection,即def * 来进行具体库表列column的选择和排序。通过Projection我们可以选择库表中部分列、也可以增加
用户1150956
2018/01/05
1.6K0
restapi(6)- do it the functional way, 重温函数式编程
再次看了看上篇博客的源代码,发现连自己都看不懂了。想是为了赶时间交货不知不觉又回到OOP行令模式了,看看下面这段代码:
用户1150956
2019/09/29
9600
Akka-Cluster(2)- distributed pub/sub mechanism 分布式发布/订阅机制
   上期我们介绍了cluster singleton,它的作用是保证在一个集群环境里永远会有唯一一个singleton实例存在。具体使用方式是在集群所有节点部署ClusterSingletonManager,由集群中的leader节点选定其中一个节点并指示上面的ClusterSingletonManager运行一个cluster singleton实例。与singleton实例交互则通过即时构建ClusterSingletonProxy实例当作沟通目标。从应用场景来说cluster singleton应该是某种pull模式的应用:我们把singleton当作中央操作协调,比如说管理一个任务清单,多个ClusterSingletonProxy从任务清单中获取(pull)自己应该执行的任务。如果需要实现push模式的任务派送:即由singleton主动通知集群里某种类型的actor执行任务,那么通过ClusterSingletonProxy沟通就不适用了,使用pub/sub方式是一个可行的解决方案。
用户1150956
2018/12/10
1.3K0
推荐阅读
相关推荐
SDP(5):ScalikeJDBC- JDBC-Engine:Streaming
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验