
数字水印是一种将标识信息(水印)嵌入到数字载体中,且不影响原始载体使用价值,也不容易被人的感知系统察觉的技术。与隐写术类似,数字水印也涉及信息隐藏,但两者的目的和应用场景有所不同。
数字水印的主要特点:
虽然数字水印和隐写术都涉及信息隐藏技术,但它们在目标、特性和应用场景上有明显区别:
特性 | 数字水印 | 隐写术 |
|---|---|---|
主要目标 | 版权保护、内容认证 | 秘密通信 |
嵌入信息 | 通常是所有者标识信息 | 任意秘密信息 |
信息量 | 相对较小 | 尽可能大 |
鲁棒性要求 | 通常较高 | 不一定很高 |
检测方式 | 通常需要密钥和原始载体 | 通常不需要原始载体 |
数字水印可以按照不同的标准进行分类:
数字水印技术在多个领域有广泛应用:
了解人类视觉系统(HVS)的特性对于设计不可感知的水印至关重要。人类视觉系统有以下重要特性:
这些特性可以用于设计水印嵌入策略,在人眼不敏感的区域嵌入更多水印信息。
数字水印系统涉及多种信号处理技术,主要包括:
这些变换方法为水印嵌入提供了不同的域选择和特性,下面将详细介绍水印嵌入的基本原理。
水印嵌入的基本原理是根据水印信息和密钥,对载体数据进行适当的修改,使得这些修改既包含了水印信息,又不会对载体的感知质量造成明显影响。
水印嵌入的一般步骤:
水印检测的目的是确定载体中是否存在特定的水印,而水印提取则是从载体中恢复出水印信息。
水印检测的一般步骤:
水印提取的步骤与检测类似,但还需要恢复出具体的水印信息。
评估数字水印系统性能的主要指标包括:
空域数字水印直接修改载体数据的像素值或采样值,是最基本的水印嵌入方法。本节将介绍几种常见的空域水印技术及其实现。
最小有效位(LSB)水印是最简单的空域水印技术,它通过修改像素值的最低有效位来嵌入水印信息。
LSB水印的基本原理是将水印信息嵌入到载体数据的最低有效位。由于最低有效位对视觉感知的影响很小,因此LSB水印具有良好的不可感知性。
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
def embed_lsb_watermark(image_path, watermark_path, output_path, key=0):
"""
使用LSB方法嵌入水印
参数:
image_path: 载体图像路径
watermark_path: 水印图像路径
output_path: 输出图像路径
key: 用于随机置换的密钥
返回:
psnr: 嵌入水印后的峰值信噪比
"""
# 加载载体图像和水印图像
carrier = Image.open(image_path).convert('RGB')
watermark = Image.open(watermark_path).convert('L')
# 调整水印图像大小以匹配载体图像
watermark = watermark.resize(carrier.size, Image.BICUBIC)
# 转换为numpy数组
carrier_array = np.array(carrier)
watermark_array = np.array(watermark)
# 创建随机置换用于分散水印(增加安全性)
np.random.seed(key)
h, w, _ = carrier_array.shape
indices = np.arange(h * w)
np.random.shuffle(indices)
# 将水印数据展平并二值化
watermark_flat = watermark_array.flatten() > 128 # 二值化
# 嵌入水印
carrier_flat = carrier_array.flatten()
for i in range(len(watermark_flat)):
idx = indices[i]
# 清除最低有效位并设置为水印位
carrier_flat[idx] = (carrier_flat[idx] & 0xFE) | watermark_flat[i]
# 重塑为原始形状
watermarked_array = carrier_flat.reshape(h, w, 3)
# 计算PSNR
mse = np.mean((carrier_array - watermarked_array) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
# 保存嵌入水印后的图像
watermarked_image = Image.fromarray(watermarked_array.astype(np.uint8))
watermarked_image.save(output_path)
print(f"LSB水印嵌入完成,PSNR = {psnr:.2f} dB")
return psnr
def extract_lsb_watermark(watermarked_path, output_path, key=0, original_shape=None):
"""
提取LSB水印
参数:
watermarked_path: 含水印图像路径
output_path: 输出水印图像路径
key: 用于随机置换的密钥
original_shape: 原始水印形状,如果不提供则与载体图像相同
返回:
提取的水印图像数组
"""
# 加载含水印图像
watermarked = Image.open(watermarked_path).convert('RGB')
watermarked_array = np.array(watermarked)
# 创建与嵌入时相同的随机置换
np.random.seed(key)
h, w, _ = watermarked_array.shape
indices = np.arange(h * w)
np.random.shuffle(indices)
# 提取最低有效位作为水印
watermarked_flat = watermarked_array.flatten()
watermark_flat = (watermarked_flat[indices[:h*w]] & 0x01) * 255 # 提取LSB并映射到0或255
# 重塑为图像
watermark_array = watermark_flat.reshape(h, w)
# 如果提供了原始形状,则调整水印大小
if original_shape:
watermark_image = Image.fromarray(watermark_array.astype(np.uint8))
watermark_image = watermark_image.resize(original_shape, Image.BICUBIC)
watermark_array = np.array(watermark_image)
# 保存提取的水印
watermark_image = Image.fromarray(watermark_array.astype(np.uint8))
watermark_image.save(output_path)
print("LSB水印提取完成")
return watermark_array
# 测试LSB水印
# embed_lsb_watermark('carrier.jpg', 'watermark.png', 'watermarked_lsb.jpg', key=123)
# extract_lsb_watermark('watermarked_lsb.jpg', 'extracted_watermark.png', key=123)优点:
缺点:
扩频水印技术借鉴了通信领域的扩频通信原理,将水印信息分散到整个载体数据中,从而提高水印的鲁棒性。
扩频水印的基本原理是使用伪随机序列(扩频码)将水印信息扩展到更宽的频带或更大的空间,然后将扩展后的水印嵌入到载体数据中。这种方法可以提高水印对噪声和压缩的抵抗能力。
import numpy as np
from PIL import Image
def embed_spread_spectrum_watermark(image_path, watermark, output_path, key=0, alpha=0.05):
"""
使用扩频方法嵌入水印
参数:
image_path: 载体图像路径
watermark: 水印数据(二进制数组)
output_path: 输出图像路径
key: 用于生成伪随机序列的密钥
alpha: 水印强度因子
返回:
psnr: 嵌入水印后的峰值信噪比
"""
# 加载载体图像
carrier = Image.open(image_path).convert('L') # 转换为灰度图像
carrier_array = np.array(carrier, dtype=np.float32)
# 获取图像尺寸
h, w = carrier_array.shape
# 设置随机种子
np.random.seed(key)
# 生成伪随机序列(扩频码)
spread_spectrum = np.random.normal(0, 1, size=(h, w))
# 将水印数据扩展到与图像相同大小
# 使用重复水印的方法
watermark_size = len(watermark)
expanded_watermark = np.zeros((h, w), dtype=np.float32)
for i in range(h):
for j in range(w):
idx = (i * w + j) % watermark_size
expanded_watermark[i, j] = 2 * watermark[idx] - 1 # 将0/1转换为-1/1
# 嵌入水印
watermarked_array = carrier_array + alpha * expanded_watermark * spread_spectrum
# 确保像素值在有效范围内
watermarked_array = np.clip(watermarked_array, 0, 255)
# 计算PSNR
mse = np.mean((carrier_array - watermarked_array) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
# 保存嵌入水印后的图像
watermarked_image = Image.fromarray(watermarked_array.astype(np.uint8))
watermarked_image.save(output_path)
print(f"扩频水印嵌入完成,PSNR = {psnr:.2f} dB")
return psnr
def detect_spread_spectrum_watermark(original_path, watermarked_path, watermark, key=0, alpha=0.05):
"""
检测扩频水印
参数:
original_path: 原始载体图像路径
watermarked_path: 含水印图像路径
watermark: 用于检测的水印数据(二进制数组)
key: 用于生成伪随机序列的密钥
alpha: 水印强度因子
返回:
correlation: 相关性结果
is_watermarked: 是否检测到水印
"""
# 加载原始图像和含水印图像
original = Image.open(original_path).convert('L')
watermarked = Image.open(watermarked_path).convert('L')
original_array = np.array(original, dtype=np.float32)
watermarked_array = np.array(watermarked, dtype=np.float32)
# 获取图像尺寸
h, w = original_array.shape
# 设置随机种子
np.random.seed(key)
# 生成伪随机序列(扩频码)
spread_spectrum = np.random.normal(0, 1, size=(h, w))
# 计算差值图像
diff = (watermarked_array - original_array) / alpha
# 将水印数据扩展到与图像相同大小
watermark_size = len(watermark)
expanded_watermark = np.zeros((h, w), dtype=np.float32)
for i in range(h):
for j in range(w):
idx = (i * w + j) % watermark_size
expanded_watermark[i, j] = 2 * watermark[idx] - 1 # 将0/1转换为-1/1
# 计算相关性
numerator = np.sum(diff * spread_spectrum * expanded_watermark)
denominator = np.sqrt(np.sum((diff * spread_spectrum) ** 2) * np.sum(expanded_watermark ** 2))
correlation = numerator / denominator if denominator != 0 else 0
# 判断是否存在水印
# 这里使用简单的阈值判断,可以根据实际情况调整
threshold = 0.5
is_watermarked = correlation > threshold
print(f"扩频水印检测完成,相关性 = {correlation:.4f}")
print(f"是否检测到水印: {'是' if is_watermarked else '否'}")
return correlation, is_watermarked
# 测试扩频水印
# watermark = np.array([1, 0, 1, 0, 1, 1, 0, 0]) # 示例水印
# embed_spread_spectrum_watermark('carrier.jpg', watermark, 'watermarked_ss.jpg', key=123, alpha=0.05)
# detect_spread_spectrum_watermark('carrier.jpg', 'watermarked_ss.jpg', watermark, key=123, alpha=0.05)优点:
缺点:
直方图调整水印是通过修改载体图像的直方图分布来嵌入水印信息的方法。
直方图调整水印的基本原理是根据水印信息对载体图像的直方图进行有目的的调整,使得调整后的直方图分布能够编码水印信息。常见的方法包括直方图平移和直方图修改等。
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
def embed_histogram_watermark(image_path, watermark, output_path, key=0):
"""
使用直方图平移方法嵌入水印
参数:
image_path: 载体图像路径
watermark: 水印数据(二进制数组)
output_path: 输出图像路径
key: 用于确定嵌入位置的密钥
返回:
psnr: 嵌入水印后的峰值信噪比
"""
# 加载载体图像
carrier = Image.open(image_path).convert('L') # 转换为灰度图像
carrier_array = np.array(carrier)
# 获取图像尺寸
h, w = carrier_array.shape
# 设置随机种子
np.random.seed(key)
# 计算直方图
hist, bins = np.histogram(carrier_array.flatten(), 256, [0, 256])
# 选择嵌入水印的像素值范围(避开0和255以避免溢出)
start_level = 10
end_level = 245
watermark_size = len(watermark)
# 确保水印大小不超过可用容量
if watermark_size > (end_level - start_level + 1):
print("警告: 水印大小超过可用容量,将只嵌入部分水印")
watermark_size = end_level - start_level + 1
watermark = watermark[:watermark_size]
# 创建水印位置映射
positions = np.random.permutation(np.arange(start_level, end_level + 1))[:watermark_size]
# 复制原始图像数组
watermarked_array = carrier_array.copy()
# 根据水印信息进行直方图平移
for i in range(watermark_size):
pos = positions[i]
if watermark[i] == 1:
# 对于水印位1,将像素值为pos的像素移动到pos+1
watermarked_array[carrier_array == pos] += 1
# 计算PSNR
mse = np.mean((carrier_array.astype(np.float32) - watermarked_array.astype(np.float32)) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
# 保存嵌入水印后的图像
watermarked_image = Image.fromarray(watermarked_array.astype(np.uint8))
watermarked_image.save(output_path)
print(f"直方图水印嵌入完成,PSNR = {psnr:.2f} dB")
return psnr
def extract_histogram_watermark(watermarked_path, watermark_size, key=0):
"""
提取直方图水印
参数:
watermarked_path: 含水印图像路径
watermark_size: 水印大小
key: 用于确定嵌入位置的密钥
返回:
extracted_watermark: 提取的水印数据
"""
# 加载含水印图像
watermarked = Image.open(watermarked_path).convert('L')
watermarked_array = np.array(watermarked)
# 设置随机种子
np.random.seed(key)
# 计算直方图
hist, bins = np.histogram(watermarked_array.flatten(), 256, [0, 256])
# 选择嵌入水印的像素值范围
start_level = 10
end_level = 245
# 确保水印大小不超过可用容量
if watermark_size > (end_level - start_level + 1):
print("警告: 请求的水印大小超过可用容量")
watermark_size = end_level - start_level + 1
# 创建与嵌入时相同的水印位置映射
positions = np.random.permutation(np.arange(start_level, end_level + 1))[:watermark_size]
# 提取水印
extracted_watermark = np.zeros(watermark_size, dtype=np.uint8)
for i in range(watermark_size):
pos = positions[i]
# 通过比较pos和pos+1位置的直方图分布来判断水印位
# 这里使用简单的阈值判断,可以根据实际情况调整
if np.abs(hist[pos] - hist[pos+1]) > 50: # 阈值设置为50
extracted_watermark[i] = 1
print("直方图水印提取完成")
print(f"提取的水印: {extracted_watermark}")
return extracted_watermark
# 测试直方图水印
# watermark = np.array([1, 0, 1, 0, 1, 1, 0, 0]) # 示例水印
# embed_histogram_watermark('carrier.jpg', watermark, 'watermarked_hist.jpg', key=123)
# extracted_watermark = extract_histogram_watermark('watermarked_hist.jpg', len(watermark), key=123)优点:
缺点:
变换域数字水印通过在图像的变换域(如频域)中嵌入水印信息,通常具有更好的鲁棒性。本节将介绍几种常见的变换域水印技术及其实现。
离散余弦变换(DCT)域水印是一种广泛使用的变换域水印技术,特别适合于JPEG等基于DCT的压缩标准。
DCT域水印的基本原理是将图像分解为DCT系数,然后修改特定的DCT系数来嵌入水印信息。通常选择中频系数进行修改,因为这些系数对视觉感知的影响较小,同时又相对稳定,不易受到压缩等操作的影响。
import numpy as np
from PIL import Image
from scipy.fftpack import dct, idct
def perform_dct(block):
"""
对图像块执行DCT变换
参数:
block: 输入图像块
返回:
dct_block: DCT变换后的系数块
"""
return dct(dct(block.T, norm='ortho').T, norm='ortho')
def perform_idct(dct_block):
"""
对DCT系数块执行逆DCT变换
参数:
dct_block: 输入DCT系数块
返回:
block: 逆DCT变换后的图像块
"""
return idct(idct(dct_block.T, norm='ortho').T, norm='ortho')
def embed_dct_watermark(image_path, watermark, output_path, key=0, alpha=0.1):
"""
在DCT域中嵌入水印
参数:
image_path: 载体图像路径
watermark: 水印数据(二维数组)
output_path: 输出图像路径
key: 用于确定嵌入位置的密钥
alpha: 水印强度因子
返回:
psnr: 嵌入水印后的峰值信噪比
"""
# 加载载体图像
carrier = Image.open(image_path).convert('L') # 转换为灰度图像
carrier_array = np.array(carrier, dtype=np.float32)
# 获取图像尺寸
h, w = carrier_array.shape
# 设置随机种子
np.random.seed(key)
# DCT块大小
block_size = 8
# 调整水印大小以适应DCT块数量
num_blocks_h = h // block_size
num_blocks_w = w // block_size
# 创建水印索引(使用密钥确定嵌入位置)
available_positions = []
for i in range(num_blocks_h):
for j in range(num_blocks_w):
available_positions.append((i, j))
# 随机选择水印嵌入位置
np.random.shuffle(available_positions)
watermark_size = watermark.size
positions = available_positions[:watermark_size]
# 复制原始图像数组
watermarked_array = carrier_array.copy()
# 展平水印
watermark_flat = watermark.flatten()
# 对每个图像块执行DCT变换,嵌入水印
for idx, (i, j) in enumerate(positions):
# 提取图像块
block = watermarked_array[i*block_size:(i+1)*block_size, j*block_size:(j+1)*block_size]
# 执行DCT变换
dct_block = perform_dct(block)
# 选择嵌入水印的DCT系数位置(选择中频系数)
# 通常避免DC系数(0,0)和高频系数
coeff_pos = (3, 3) # 选择(3,3)位置的中频系数
# 嵌入水印
if watermark_flat[idx] > 0.5: # 假设水印已经二值化
dct_block[coeff_pos] += alpha * np.abs(dct_block[coeff_pos])
else:
dct_block[coeff_pos] -= alpha * np.abs(dct_block[coeff_pos])
# 执行逆DCT变换
watermarked_block = perform_idct(dct_block)
# 将处理后的块放回图像
watermarked_array[i*block_size:(i+1)*block_size, j*block_size:(j+1)*block_size] = watermarked_block
# 确保像素值在有效范围内
watermarked_array = np.clip(watermarked_array, 0, 255)
# 计算PSNR
mse = np.mean((carrier_array - watermarked_array) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
# 保存嵌入水印后的图像
watermarked_image = Image.fromarray(watermarked_array.astype(np.uint8))
watermarked_image.save(output_path)
print(f"DCT域水印嵌入完成,PSNR = {psnr:.2f} dB")
return psnr
def extract_dct_watermark(watermarked_path, watermark_shape, key=0, alpha=0.1):
"""
从DCT域中提取水印
参数:
watermarked_path: 含水印图像路径
watermark_shape: 水印形状(高度,宽度)
key: 用于确定嵌入位置的密钥
alpha: 水印强度因子
返回:
extracted_watermark: 提取的水印
"""
# 加载含水印图像
watermarked = Image.open(watermarked_path).convert('L')
watermarked_array = np.array(watermarked, dtype=np.float32)
# 获取图像尺寸
h, w = watermarked_array.shape
# 设置随机种子
np.random.seed(key)
# DCT块大小
block_size = 8
# 计算水印大小
watermark_size = watermark_shape[0] * watermark_shape[1]
# 创建与嵌入时相同的水印位置
available_positions = []
for i in range(h // block_size):
for j in range(w // block_size):
available_positions.append((i, j))
# 随机选择水印嵌入位置
np.random.shuffle(available_positions)
positions = available_positions[:watermark_size]
# 提取水印
extracted_watermark = np.zeros(watermark_size)
for idx, (i, j) in enumerate(positions):
# 提取图像块
block = watermarked_array[i*block_size:(i+1)*block_size, j*block_size:(j+1)*block_size]
# 执行DCT变换
dct_block = perform_dct(block)
# 选择嵌入水印的DCT系数位置
coeff_pos = (3, 3)
# 由于没有原始图像作为参考,我们使用简化的提取方法
# 在实际应用中,通常需要使用原始图像或采用更复杂的盲提取算法
# 这里我们假设DCT系数的符号变化表示水印位
# 注意:这种方法在没有原始载体的情况下可能不够准确
if dct_block[coeff_pos] > 0:
extracted_watermark[idx] = 1
else:
extracted_watermark[idx] = 0
# 重塑水印
extracted_watermark = extracted_watermark.reshape(watermark_shape)
print("DCT域水印提取完成")
return extracted_watermark
# 测试DCT域水印
# watermark = np.array([[1, 0, 1], [0, 1, 0], [1, 0, 1]]) # 示例水印
# embed_dct_watermark('carrier.jpg', watermark, 'watermarked_dct.jpg', key=123, alpha=0.1)
# extracted_watermark = extract_dct_watermark('watermarked_dct.jpg', watermark.shape, key=123, alpha=0.1)优点:
缺点:
离散小波变换(DWT)域水印利用小波变换的多分辨率特性,通常具有更好的鲁棒性和不可感知性。
DWT域水印的基本原理是将图像进行小波变换,得到不同分辨率和方向的子带,然后在适当的子带中嵌入水印信息。通常选择中高频子带进行嵌入,因为这些子带包含了图像的纹理细节,对水印的感知影响较小。
import numpy as np
from PIL import Image
import pywt # 需要安装PyWavelets库: pip install PyWavelets
def embed_dwt_watermark(image_path, watermark, output_path, key=0, alpha=0.1):
"""
在DWT域中嵌入水印
参数:
image_path: 载体图像路径
watermark: 水印数据(二维数组)
output_path: 输出图像路径
key: 用于确定嵌入位置的密钥
alpha: 水印强度因子
返回:
psnr: 嵌入水印后的峰值信噪比
"""
# 加载载体图像
carrier = Image.open(image_path).convert('L') # 转换为灰度图像
carrier_array = np.array(carrier, dtype=np.float32)
# 设置随机种子
np.random.seed(key)
# 执行一级离散小波变换
coeffs2 = pywt.dwt2(carrier_array, 'haar')
cA, (cH, cV, cD) = coeffs2 # 近似系数和三个细节系数
# 选择嵌入水印的子带(这里选择水平细节子带cH)
watermark_subband = cH.copy()
# 调整水印大小以适应子带大小
h, w = watermark_subband.shape
watermark_resized = np.resize(watermark, (h, w))
# 嵌入水印
watermarked_subband = watermark_subband + alpha * watermark_resized
# 重建水印图像
watermarked_coeffs = (cA, (watermarked_subband, cV, cD))
watermarked_array = pywt.idwt2(watermarked_coeffs, 'haar')
# 确保像素值在有效范围内
watermarked_array = np.clip(watermarked_array, 0, 255)
# 计算PSNR
mse = np.mean((carrier_array - watermarked_array) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
# 保存嵌入水印后的图像
watermarked_image = Image.fromarray(watermarked_array.astype(np.uint8))
watermarked_image.save(output_path)
print(f"DWT域水印嵌入完成,PSNR = {psnr:.2f} dB")
return psnr
def extract_dwt_watermark(watermarked_path, watermark_shape, key=0, alpha=0.1):
"""
从DWT域中提取水印
参数:
watermarked_path: 含水印图像路径
watermark_shape: 水印形状(高度,宽度)
key: 用于确定嵌入位置的密钥
alpha: 水印强度因子
返回:
extracted_watermark: 提取的水印
"""
# 加载含水印图像
watermarked = Image.open(watermarked_path).convert('L')
watermarked_array = np.array(watermarked, dtype=np.float32)
# 执行一级离散小波变换
coeffs2 = pywt.dwt2(watermarked_array, 'haar')
cA, (cH, cV, cD) = coeffs2
# 从水平细节子带提取水印
h, w = cH.shape
# 提取水印
# 由于没有原始图像作为参考,我们使用简化的提取方法
# 在实际应用中,通常需要使用原始图像或采用更复杂的盲提取算法
extracted_watermark = np.zeros((h, w))
# 根据系数的幅度选择水印位
threshold = np.mean(np.abs(cH)) # 使用子带系数的平均绝对值作为阈值
extracted_watermark[cH > threshold] = 1
extracted_watermark[cH <= threshold] = 0
# 调整水印大小为原始形状
watermark = Image.fromarray(extracted_watermark.astype(np.uint8))
watermark = watermark.resize(watermark_shape, Image.BICUBIC)
extracted_watermark = np.array(watermark)
print("DWT域水印提取完成")
return extracted_watermark
# 测试DWT域水印
# watermark = np.array([[1, 0, 1], [0, 1, 0], [1, 0, 1]]) # 示例水印
# embed_dwt_watermark('carrier.jpg', watermark, 'watermarked_dwt.jpg', key=123, alpha=0.1)
# extracted_watermark = extract_dwt_watermark('watermarked_dwt.jpg', watermark.shape, key=123, alpha=0.1)优点:
缺点:
奇异值分解(SVD)域水印利用矩阵的奇异值分解特性,通常具有非常好的鲁棒性。
SVD域水印的基本原理是对图像块或整个图像进行奇异值分解,然后修改奇异值来嵌入水印信息。由于奇异值代表了图像的主要能量,对奇异值的小幅度修改通常不会明显影响图像质量,同时具有很强的鲁棒性。
import numpy as np
from PIL import Image
def embed_svd_watermark(image_path, watermark, output_path, key=0, alpha=0.05):
"""
在SVD域中嵌入水印
参数:
image_path: 载体图像路径
watermark: 水印数据(二维数组)
output_path: 输出图像路径
key: 用于确定嵌入位置的密钥
alpha: 水印强度因子
返回:
psnr: 嵌入水印后的峰值信噪比
"""
# 加载载体图像
carrier = Image.open(image_path).convert('L') # 转换为灰度图像
carrier_array = np.array(carrier, dtype=np.float32)
# 设置随机种子
np.random.seed(key)
# 对整个图像执行SVD
U, S, V = np.linalg.svd(carrier_array)
# 调整水印大小以适应奇异值数量
watermark_size = len(S)
watermark_flat = np.resize(watermark, watermark_size)
# 选择嵌入水印的奇异值数量(通常选择前几个主要奇异值)
num_singular_values = min(watermark_size, 50) # 这里选择前50个奇异值
# 创建水印强度数组(可以根据奇异值的重要性调整强度)
strengths = alpha * np.ones(num_singular_values)
# 嵌入水印
S_watermarked = S.copy()
for i in range(num_singular_values):
# 嵌入水印到奇异值中
S_watermarked[i] = S[i] * (1 + strengths[i] * watermark_flat[i])
# 重建水印图像
# 创建奇异值矩阵
S_matrix = np.zeros_like(carrier_array)
min_dim = min(carrier_array.shape)
S_matrix[:min_dim, :min_dim] = np.diag(S_watermarked[:min_dim])
# 重建图像
watermarked_array = np.dot(U, np.dot(S_matrix, V))
# 确保像素值在有效范围内
watermarked_array = np.clip(watermarked_array, 0, 255)
# 计算PSNR
mse = np.mean((carrier_array - watermarked_array) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
# 保存嵌入水印后的图像
watermarked_image = Image.fromarray(watermarked_array.astype(np.uint8))
watermarked_image.save(output_path)
print(f"SVD域水印嵌入完成,PSNR = {psnr:.2f} dB")
return psnr
def extract_svd_watermark(original_path, watermarked_path, watermark_shape, key=0, alpha=0.05):
"""
从SVD域中提取水印
参数:
original_path: 原始载体图像路径
watermarked_path: 含水印图像路径
watermark_shape: 水印形状(高度,宽度)
key: 用于确定嵌入位置的密钥
alpha: 水印强度因子
返回:
extracted_watermark: 提取的水印
"""
# 加载原始图像和含水印图像
original = Image.open(original_path).convert('L')
watermarked = Image.open(watermarked_path).convert('L')
original_array = np.array(original, dtype=np.float32)
watermarked_array = np.array(watermarked, dtype=np.float32)
# 对原始图像和含水印图像执行SVD
_, S_original, _ = np.linalg.svd(original_array)
_, S_watermarked, _ = np.linalg.svd(watermarked_array)
# 设置随机种子
np.random.seed(key)
# 计算水印大小
watermark_size = watermark_shape[0] * watermark_shape[1]
# 提取水印
# 计算奇异值的相对变化
relative_changes = (S_watermarked - S_original) / S_original
# 创建水印强度数组
strengths = alpha * np.ones_like(relative_changes)
# 提取水印位
extracted_watermark = np.zeros(watermark_size)
for i in range(min(watermark_size, len(relative_changes))):
# 根据相对变化提取水印位
if relative_changes[i] > 0:
extracted_watermark[i] = 1
else:
extracted_watermark[i] = 0
# 重塑水印
extracted_watermark = extracted_watermark.reshape(watermark_shape)
print("SVD域水印提取完成")
return extracted_watermark
# 测试SVD域水印
# watermark = np.array([[1, 0, 1], [0, 1, 0], [1, 0, 1]]) # 示例水印
# embed_svd_watermark('carrier.jpg', watermark, 'watermarked_svd.jpg', key=123, alpha=0.05)
# extracted_watermark = extract_svd_watermark('carrier.jpg', 'watermarked_svd.jpg', watermark.shape, key=123, alpha=0.05)优点:
缺点:
数字水印技术在数字内容版权保护中有着广泛的应用。本节将介绍数字水印在不同类型数字内容版权保护中的应用方法和实例。
图像水印在以下场景中有着重要应用:
以下是一个完整的图像版权保护水印方案实现:
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import hashlib
import datetime
def generate_watermark_info(author_name, copyright_year=None):
"""
生成版权保护水印信息
参数:
author_name: 作者名称
copyright_year: 版权年份,如果为None则使用当前年份
返回:
watermark_text: 水印文本
watermark_hash: 水印信息的哈希值(用于验证)
"""
if copyright_year is None:
copyright_year = datetime.datetime.now().year
# 生成水印文本
watermark_text = f"© {copyright_year} {author_name}"
# 生成水印信息的哈希值
watermark_hash = hashlib.sha256(watermark_text.encode()).hexdigest()
return watermark_text, watermark_hash
def embed_robust_image_watermark(image_path, author_name, output_path, visible_watermark=True, invisible_watermark=True, key=0, alpha=0.1):
"""
为图像嵌入可见和不可见水印用于版权保护
参数:
image_path: 原始图像路径
author_name: 作者名称
output_path: 输出图像路径
visible_watermark: 是否嵌入可见水印
invisible_watermark: 是否嵌入不可见水印
key: 用于不可见水印的密钥
alpha: 水印强度因子
返回:
watermark_info: 包含水印信息的字典
"""
# 加载图像
image = Image.open(image_path).convert('RGB')
image_array = np.array(image, dtype=np.float32)
# 生成水印信息
watermark_text, watermark_hash = generate_watermark_info(author_name)
# 复制原始图像数组
watermarked_array = image_array.copy()
# 嵌入不可见水印
if invisible_watermark:
# 使用前面介绍的DWT域水印方法
# 首先创建基于哈希值的二进制水印
watermark_bits = np.array([int(bit) for bit in ''.join(format(ord(c), '08b') for c in watermark_hash[:16])])
# 将一维水印转为二维
watermark_size = int(np.ceil(np.sqrt(len(watermark_bits))))
watermark = np.zeros((watermark_size, watermark_size))
watermark_flat = watermark_bits[:watermark_size*watermark_size]
watermark[:len(watermark_flat)//watermark_size, :watermark_size] = watermark_flat.reshape(-1, watermark_size)
# 使用DWT嵌入水印
# 这里简化实现,实际应用中应使用前面完整的DWT水印函数
import pywt
# 转换为灰度用于水印嵌入
gray_image = Image.fromarray(watermarked_array.astype(np.uint8)).convert('L')
gray_array = np.array(gray_image, dtype=np.float32)
# 执行离散小波变换
coeffs2 = pywt.dwt2(gray_array, 'haar')
cA, (cH, cV, cD) = coeffs2
# 调整水印大小
h, w = cH.shape
watermark_resized = np.resize(watermark, (h, w))
# 嵌入水印到水平细节子带
cH_watermarked = cH + alpha * watermark_resized
# 重建水印图像
watermarked_coeffs = (cA, (cH_watermarked, cV, cD))
watermarked_gray = pywt.idwt2(watermarked_coeffs, 'haar')
# 将水印灰度图像合并回彩色图像
# 这里使用一个简单的方法:将灰度水印图像添加到所有三个颜色通道
for i in range(3):
watermarked_array[:, :, i] = np.clip(watermarked_array[:, :, i] + (watermarked_gray - gray_array), 0, 255)
# 嵌入可见水印
if visible_watermark:
# 转换为PIL图像以便添加可见水印
watermarked_pil = Image.fromarray(watermarked_array.astype(np.uint8))
draw = ImageDraw.Draw(watermarked_pil)
# 尝试加载字体,如果失败则使用默认字体
try:
font = ImageFont.truetype("arial.ttf", 36)
except:
font = ImageFont.load_default()
# 获取文本大小
text_width, text_height = draw.textsize(watermark_text, font=font)
# 计算水印位置(右下角)
img_width, img_height = watermarked_pil.size
x = img_width - text_width - 20
y = img_height - text_height - 20
# 添加半透明背景
draw.rectangle([(x-10, y-10), (x+text_width+10, y+text_height+10)], fill=(0, 0, 0, 128))
# 添加文本水印
draw.text((x, y), watermark_text, font=font, fill=(255, 255, 255, 180))
# 转换回numpy数组
watermarked_array = np.array(watermarked_pil, dtype=np.float32)
# 保存水印图像
watermarked_image = Image.fromarray(watermarked_array.astype(np.uint8))
watermarked_image.save(output_path)
# 计算PSNR(仅考虑不可见水印部分)
if invisible_watermark:
mse = np.mean((image_array - watermarked_array) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
else:
psnr = float('inf')
# 返回水印信息
watermark_info = {
'author_name': author_name,
'watermark_text': watermark_text,
'watermark_hash': watermark_hash,
'psnr': psnr,
'visible_watermark': visible_watermark,
'invisible_watermark': invisible_watermark
}
print(f"图像版权保护水印嵌入完成")
print(f"水印文本: {watermark_text}")
print(f"水印哈希: {watermark_hash[:16]}...")
if invisible_watermark:
print(f"不可见水印PSNR: {psnr:.2f} dB")
return watermark_info
def verify_image_watermark(image_path, author_name, key=0, alpha=0.1):
"""
验证图像中的版权保护水印
参数:
image_path: 待验证图像路径
author_name: 作者名称
key: 用于不可见水印的密钥
alpha: 水印强度因子
返回:
verification_result: 验证结果字典
"""
# 加载图像
image = Image.open(image_path).convert('RGB')
image_array = np.array(image, dtype=np.float32)
# 生成预期的水印信息
expected_watermark_text, expected_watermark_hash = generate_watermark_info(author_name)
# 初始化验证结果
verification_result = {
'visible_watermark_detected': False,
'invisible_watermark_detected': False,
'watermark_text': None,
'watermark_hash': None,
'confidence': 0.0
}
# 检测可见水印
# 注意:这是一个简化的实现,实际应用中可能需要更复杂的文本检测算法
# 例如使用OCR技术检测文本
print("可见水印检测:这需要OCR技术,此处省略具体实现")
# 尝试提取不可见水印
# 转换为灰度
gray_image = image.convert('L')
gray_array = np.array(gray_image, dtype=np.float32)
# 执行离散小波变换
import pywt
coeffs2 = pywt.dwt2(gray_array, 'haar')
cA, (cH, cV, cD) = coeffs2
# 计算子带系数的统计特性
# 这里使用一个简化的检测方法:检查子带系数的分布
h, w = cH.shape
# 提取可能的水印
# 在实际应用中,应该使用与嵌入时对应的提取算法
threshold = np.mean(np.abs(cH)) * 0.5
extracted_bits = (cH > threshold).flatten()
# 尝试将提取的比特转换为哈希值
# 这是一个简化的实现,实际应用中需要更复杂的解码过程
print("不可见水印检测:此处使用简化实现,实际应用需要完整的解码算法")
# 设置置信度(示例)
# 在实际应用中,应该基于相关性等指标计算置信度
confidence = np.random.random() # 示例值,实际应用应计算真实的置信度
verification_result['confidence'] = confidence
# 根据置信度判断
if confidence > 0.7: # 示例阈值
verification_result['invisible_watermark_detected'] = True
verification_result['watermark_hash'] = expected_watermark_hash # 示例,实际应从水印中提取
print(f"图像版权保护水印验证完成")
print(f"可见水印检测: {'是' if verification_result['visible_watermark_detected'] else '否'}")
print(f"不可见水印检测: {'是' if verification_result['invisible_watermark_detected'] else '否'}")
print(f"检测置信度: {verification_result['confidence']:.2f}")
return verification_result
# 测试图像版权保护水印
# watermark_info = embed_robust_image_watermark('original_image.jpg', 'Digital Rights Owner', 'watermarked_image.jpg', visible_watermark=True, invisible_watermark=True)
# verification_result = verify_image_watermark('watermarked_image.jpg', 'Digital Rights Owner')