表引擎介绍:
ClickHouse表引擎决定了如下几个方面:
怎样存储数据 -将数据写到哪里, 怎样读取数据.
支持何种查询以及怎样支持.
并发数据访问.
索引的使用.
是否多线程的请求执行是可以的.
数据如何同步.
当读取数据时, 引擎只需要抽取必要的列簇. 然而,在一些场景下,查询可能在表引擎中是半处理状态.
在大多数场景中, 我们所使用的引擎主要是 MergeTree 家族.
AggregatingMergeTree
Buffer
CollapsingMergeTree
Distributed
External data for query processing
File(InputFormat)
Join
Kafka
Log
MaterializedView
Memory
Merge
Virtual columns
MergeTree
Null
ReplacingMergeTree
Data replication
ReplicatedMergeTree
ReplicatedCollapsingMergeTree
ReplicatedAggregatingMergeTree
ReplicatedSummingMergeTree
Creating replicated tables
Recovery after failures
Recovery after complete data loss
Converting from MergeTree to ReplicatedMergeTree
Converting from ReplicatedMergeTree to MergeTree
Recovery when metadata in the ZooKeeper cluster is lost or damaged
Resharding
Set
SummingMergeTree
TinyLog
View
Kafka
表引擎的后台为Kafka,Kafka 作为流数据平台具备三中关键能力:
它让你发布和订阅记录数据流。它类似消息队列或者企业级消息总线。
它让你以容错的方式来存储记录数据流。
它让你以流式的方式处理记录数据流。
Kafka(broker_list, topic_list, group_name, format[, schema])
引擎参数:
broker_list - 一个 brokers 列表((localhost:9092).
topic_list - Kafka队列列表用于消费(my_topic).
group_name -Kafka 消费者群组名称(group1).
每个消费者群组的偏移量被追踪,如果你想要跨集群一次性消费消息,你应该使用相同的群组名称
format - 格式的名称,用于反序列化消息. 它接受了相同的值作为 FORMATSQL 语句, 例如 JSONEachRow.
schema - 可选的 schema 值,需要一个 schema 来中断所消费的消息, 例如 Cap’n Proto 格式需要一个 schema 文件的路径和根对象(root object) -schema.capnp:Message. 自描述格式, 例如 JSON 不需要任何的 Schema.
例如:
CREATE TABLE queue (timestamp UInt64, level String, message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5
已消费的消息自动在后台被追踪,这样每个消息都能够在一个消费者组中被读一次。如果你想要二次消费相同的消息,你能够创建一个带有不同组名的表的拷贝。消费者组是弹性的,跨集群同步的,因此,如果你有10个话题/分区(topic/partitions)和5个表的实例(instances),它将为每个实例自动分配2个话题/分区(topic/partitions)。如果你卸载一个表或者添加新的实例,它将自动重新平衡话题/分区(topic/partitions)分配。详情查看http://kafka.apache.org/intro。
然而直接读取消息并不是太有用,表引擎被用于构建实时摄取Pipeline, 使用MATERIALIZED VIEW。如果一个MATERIALIZED VIEW被挂载到一个 Kafka 表引擎,它将在后台进程中开始消费消息,推送到挂载的视图。它允许你从 Kafka 持续摄取消息,同时使用SELECT语句来转换这些数据到合适的格式。
示例:
CREATE TABLE queue (timestamp UInt64, level String, message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE MATERIALIZED VIEW daily ENGINE = SummingMergeTree(day, (day, level), 8192) AS
SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
当INSERT插入数据时,消息立即流入到挂载的视图中。为了提升处理性能,已消费的消息以批量的方式进行压缩 - max_insert_block_size。如果此批量消息不能被完成,刷新周期为 stream_flush_interval_ms (默认为7500ms),它将刷新来保证插入时间间隔。
Buffer
Buffers 将数据写入到内存中,周期性刷新数据到另外的表中。在读取操作的过程中,数据从 Buffer 和另外的表中同时读取。
Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
引擎参数:database, table - 表刷新数据。与数据库名称不同,你能够使用常量表达式返回字符串。
num_layers - 并行水平。
物理上看,表能够用独立 buffers 的‘num_layers’代表。默认值为16,min_time, max_time, min_rows, max_rows, min_bytes,同时max_bytes是从 buffer 刷新数据的条件。
数据从buffer 中刷新,写入到目标表,如果所有的‘min’条件或者最少一个‘max’条件组合时。
min_time, max_time - 从第一次写入到 buffer 的时间。
min_rows, max_rows - 在 buffer 中的行数。
min_bytes, max_bytes - 在 buffer 中字节的数量。
在写入操作的过程中,数据写入到一个‘num_layers’数的随机 buffers 中。或者,如果插入的数据部分足够大(大于 ‘max_rows’ 或者 ‘max_bytes’),它被直接写入到目标表,忽略此 buffer。
刷新数据被单独计算为每个‘num_layers’buffers。例如,如果num_layers = 16 and max_bytes = 100000000,最大的内存消耗是1.6 GB。
例如:
CREATE TABLE merge.hits_buffer AS merge.hits ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)
CollapsingMergeTree
此引擎不同于其他的MergeTree,可以允许自动化删除,或者‘collapsing’ 行的特定部分,当执行merge时.
Yandex.Metrica 有大量的点击日志 (例如hit logs) 和变更日志. 变更日志用于增量计算数据的统计值. 示例如回话更新日志, 或者用户历史更新日志.在 Yandex.Metrica回话是不断在变化的. 例如, 每个回话点击数的增加. 我们引用任意对象的变化作为一个变更对儿 (?old values, ?new values). 如果对象是创建的, 旧的值可能会丢失. 如果对象被删除,新的值可能被丢失. 如果对象发生变化, 但是之前存在的没有被删除,也则两个值同时存在. 在更新日志中,两份数据都存在. Each entry 每一份都包含对象的所有属性, 加一个特定的属性来区分老值和新值.当对象更新时, 只有新的对象添加到更新日志中, 已经存在的值不发生变化.
更新日志使增量计算任意的数据统计任务成为可能. 为了达到这个目标, 我们需要考虑给一个 “new” rows 添一个+标识位, 一个 “old” rows 添一个-标识位. 换句话说, 增量计算所有统计是可能的, 同时我们也能计算 “idempotent” 统计, 例如唯一访客的数量, 当回话更新时,唯一访客并没有被删除.
这是让Yandex.Metrica实时工作的关键技术点。
CollapsingMergeTree接受一个额外的参数 - Int8-type类型列的名称包含了行的标识。例如:
CollapsingMergeTree(EventDate,(CounterID,EventDate,intHash32(UniqID),VisitID),8192,Sign)
在这里,“Sign”是一个包含-1表示“旧”值和1表示“新”值的列。
当合并时,每组连续相同的主键值(用于排序数据的列)被减少到不超过一行,列值为'sign_column = -1'(“负行”),并且不超过一行 列值'sign_column = 1'(“正行”)。 换句话说,来自更新日志的条目是折叠的。
如果正数行和负数行匹配,则写入第一个负行和最后一个正数行。 如果还有一个正数行比负数行多,则只写入最后一个正数行。 如果负数的行比正数行多,只有第一个负数行被写入。 否则,将会出现逻辑错误,并且不会写入任何行。 (如果日志的同一部分意外插入了多次,则会发生逻辑错误,错误只会记录在服务器日志中,并继续进行合并。
因此,折叠不应该改变计算统计的结果。 更新逐渐折叠,最后只剩下每个对象的最后一个值。 与MergeTree相比,CollapsingMergeTree引擎可以使数据量成倍减少。
有几种方法可以从CollapsingMergeTree表中获取完全“折叠”的数据:
1.使用GROUP BY和聚合函数编写一个查询来解释符号。 例如,要计算数量,请写'sum(Sign)'而不是'count()'。 要计算一些东西的总和,写下'sum(Sign * x)'而不是'sum(x)',等等,并且加上'HAVING sum(Sign)> 0'。 并非所有金额都可以这样计算。 例如,集合函数“min”和“max”不能被重写。
2.如果您必须提取数据而不进行聚合(例如,要检查是否存在行的最新值符合某些条件),则可以对FROM子句使用FINAL修饰符。 这种方法效率显着较低。
分布式引擎
分布式引擎本身不存储数据,但允许在多个服务器上进行分布式查询处理。 读取是自动并行的。 在读取数据期间,使用远程服务器上的表索引(如果有的话)。 分布式引擎接受参数:服务器配置文件中的集群名称,远程数据库的名称,远程表的名称以及(可选)分片键。 例:
Distributed(logs, default, hits[, sharding_key])
从位于群集中每个服务器上的“default.hits”表中的“logs”群集中的所有服务器读取数据。 数据不仅被读取,而且在远程服务器上被部分处理。 例如,对于使用GROUP BY的查询,将在远程服务器上聚合数据,聚合函数的中间状态将发送到请求者服务器。 那么数据将被进一步聚合。
您可以使用返回字符串的常量表达式来代替数据库名称。 例如,currentDatabase()。
logs - 服务器配置文件中的群集名称。
集群被设置如下:
在这里,一个集群被定义为由两个分片组成的名称“logs”,每个分片包含两个副本。 分片是指包含数据不同部分的服务器(为了读取所有数据,您必须访问所有分片)。 副本正在复制服务器(为了读取所有数据,您可以访问任何一个副本上的数据)。
对于每个服务器,有几个参数:强制:“主机”,“端口”和可选:“用户”,“密码”。
host - 远程服务器的地址。 可以指定为域名或IPv4或IPv6地址。 如果指定域,服务器将在启动时执行DNS查找,并且结果将被缓存直到服务器关闭。 如果DNS请求失败,服务器将无法启动。 如果您要更改DNS记录,请重新启动服务器以使新记录生效。
port - TCP-端口 用于服务器之间的通信 (tcp_port 在配置文件中, 通常为 9000). 不要与 http_port混淆.
user - 用户名连接到远程服务器。 默认情况下,用户是“默认”。 该用户必须具有访问权才能连接到远程服务器。 访问权限在users.xml配置文件中进行管理。 有关其他信息,请考虑“访问权限”部分。
password - 以明文登录到远程服务器的密码。 默认是空字符串。
指定副本时,读取时将为每个分片选择一个可用副本。 您可以配置负载均衡算法(副本访问的首选项) - 请参阅“负载平衡”设置。 如果与服务器的连接没有建立,则会尝试一个短超时的连接。 如果连接失败,下一个副本将被选中,以此类推。 如果所有副本的连接尝试失败,尝试将以相同的方式重复几次。 这有利于弹性伸缩,但不提供完整的容错能力:远程服务器可能会接受连接,但可能无法正常工作或工作不正常。
查询外部数据
ClickHouse允许向服务器发送处理查询所需的数据以及SELECT查询。 这些数据放在一个临时表中(请参见“临时表”一节),并可以在查询中使用(例如,在IN操作符中)。
例如,如果您的文本文件包含重要的用户标识符,则可以将其与使用此列表过滤的查询一起上传到服务器。如果您需要使用大量外部数据运行多个查询,请不要使用此功能。 提前将数据上传到数据库最好。
可以使用命令行客户端(非交互模式)或使用HTTP接口上传外部数据。
在命令行客户端中,可以使用格式指定参数部分。
--external --file=...[--name=...][--format=...][--types=...--structure=...]
对于正在传输的表数量,您可能有多个这样的部分。
- external - 标记节的开始。 -file - 使用表转储的文件路径,或 - 引用stdin的路径只能从stdin中查询单个表。
以下参数是可选的:-name - 表的名称。 如果省略,则使用_data。 -format - 文件中的数据格式。 如果省略,则使用TabSeparated。
文件通过file进行指定,通过format解析成特定的格式, 使用类型或结构中指定的数据类型。表将被上传到服务器上,作为临时表来访问.
echo-ne"1\n2\n3\n"clickhouse-client --query="SELECT count() FROM test.visits WHERE TraficSourceID IN _data"--external --file=- --types=Int8
849897
cat /etc/passwdsed's/:/\t/g'clickhouse-client --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC"--external --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String'
/bin/sh20
/bin/false5
/bin/bash4
/usr/sbin/nologin1
/bin/sync1
当使用HTTP接口时,外部数据以multipart / form-data格式传递。 每个表作为一个单独的文件传输。 表名取自文件名。 'query_string'传递参数'name_format','name_types'和'name_structure',其中name是这些参数对应的表的名称。 参数的含义与使用命令行客户端时相同。
cat /etc/passwdsed's/:/\t/g'> passwd.tsv
curl -F'passwd=@passwd.tsv;''http://localhost:8123/?query=SELECT+shell,+count()+AS+c+FROM+passwd+GROUP+BY+shell+ORDER+BY+c+DESC&passwd_structure=login+String,+unused+String,+uid+UInt16,+gid+UInt16,+comment+String,+home+String,+shell+String'
/bin/sh20
/bin/false5
/bin/bash4
/usr/sbin/nologin1
/bin/sync1
对于分布式查询处理, 临时表被发送到所有的远程服务器.
领取专属 10元无门槛券
私享最新 技术干货