Skip to main content
Glama
test_advanced.py (15.7 kB)
"""Tests for the advanced feature modules.

Exercises the effect tools (``tools.effects``), the multi-image tools
(``tools.advanced``) and the performance utilities (``utils.performance``).

Every tool is an async callable taking a single ``arguments`` dict.  The
tests expect it to return a list whose first item exposes a ``text``
attribute holding a JSON document with at least a ``success`` flag.
"""

import asyncio
import base64
import functools
import io
import json
import os
import tempfile
import unittest

from PIL import Image

# Modules under test
from tools.effects import (
    add_border,
    create_silhouette,
    add_shadow,
    add_watermark,
    apply_vignette,
    create_polaroid,
)
from tools.advanced import (
    batch_resize,
    create_collage,
    create_thumbnail_grid,
    blend_images,
    extract_colors,
    create_gif,
)
from utils.performance import (
    PerformanceMonitor,
    ImageCache,
    ResourceManager,
    performance_tracking,
    get_performance_stats,
    reset_performance_stats,
)


def _png_data_uri(image):
    """Return ``image`` encoded as a ``data:image/png;base64,...`` string."""
    buffer = io.BytesIO()
    image.save(buffer, format="PNG")
    encoded = base64.b64encode(buffer.getvalue()).decode()
    return f"data:image/png;base64,{encoded}"


def _run_tool(tool, arguments):
    """Run the async ``tool`` with ``arguments`` and return its raw result."""
    return asyncio.run(tool(arguments))


def _payload(result):
    """Decode the JSON document stored in ``result[0].text``."""
    return json.loads(result[0].text)


class TestEffects(unittest.TestCase):
    """Tests for the single-image effect tools."""

    def setUp(self):
        """Build a solid red source image and a translucent watermark."""
        self.test_image = Image.new("RGB", (200, 200), "red")
        self.test_image_base64 = _png_data_uri(self.test_image)

        # Half-transparent white square used as the watermark overlay.
        self.watermark_image = Image.new("RGBA", (50, 50), (255, 255, 255, 128))
        self.watermark_base64 = _png_data_uri(self.watermark_image)

    def test_add_border(self):
        """add_border returns exactly one successful result item."""
        arguments = {
            "image_source": self.test_image_base64,
            "border_width": 10,
            "border_color": "#000000",
            "border_style": "solid",
        }

        result = _run_tool(add_border, arguments)

        self.assertIsInstance(result, list)
        self.assertEqual(len(result), 1)

        result_data = _payload(result)
        self.assertTrue(result_data["success"])
        self.assertIn("result", result_data)

    def test_create_silhouette(self):
        """create_silhouette reports success for a valid image."""
        arguments = {
            "image_source": self.test_image_base64,
            "threshold": 128,
            "silhouette_color": "#000000",
            "background_color": "#FFFFFF",
        }

        result = _run_tool(create_silhouette, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])

    def test_add_shadow(self):
        """add_shadow reports success for a valid image."""
        arguments = {
            "image_source": self.test_image_base64,
            "offset_x": 5,
            "offset_y": 5,
            "blur_radius": 3,
            "shadow_color": "#808080",
            "shadow_opacity": 0.5,
        }

        result = _run_tool(add_shadow, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])

    def test_add_watermark(self):
        """add_watermark reports success when given image and watermark."""
        arguments = {
            "image_source": self.test_image_base64,
            "watermark_source": self.watermark_base64,
            "position": "bottom_right",
            "opacity": 0.7,
            "scale": 0.2,
        }

        result = _run_tool(add_watermark, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])

    def test_apply_vignette(self):
        """apply_vignette reports success for a valid image."""
        arguments = {
            "image_source": self.test_image_base64,
            "strength": 0.5,
            "radius": 0.8,
            "color": "#000000",
        }

        result = _run_tool(apply_vignette, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])

    def test_create_polaroid(self):
        """create_polaroid reports success for a captioned, rotated frame."""
        arguments = {
            "image_source": self.test_image_base64,
            "border_width": 20,
            "bottom_border": 60,
            "border_color": "#FFFFFF",
            "rotation": 5,
            "caption": "Test Photo",
        }

        result = _run_tool(create_polaroid, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])


class TestAdvanced(unittest.TestCase):
    """Tests for the multi-image (advanced) tools."""

    def setUp(self):
        """Build three 100x100 solid-colour images as data URIs."""
        self.test_images = [
            _png_data_uri(Image.new("RGB", (100, 100), color))
            for color in ("red", "green", "blue")
        ]

    def test_batch_resize(self):
        """batch_resize returns one entry per input image."""
        arguments = {
            "image_sources": self.test_images,
            "width": 50,
            "height": 50,
            "maintain_aspect_ratio": True,
        }

        result = _run_tool(batch_resize, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])
        self.assertIn("results", result_data)
        self.assertEqual(len(result_data["results"]), 3)

    def test_create_collage(self):
        """create_collage reports success and includes a result."""
        arguments = {
            "image_sources": self.test_images,
            "layout": "grid",
            "spacing": 10,
            "background_color": "#FFFFFF",
        }

        result = _run_tool(create_collage, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])
        self.assertIn("result", result_data)

    def test_create_thumbnail_grid(self):
        """create_thumbnail_grid reports success for a two-column grid."""
        arguments = {
            "image_sources": self.test_images,
            "thumbnail_size": 80,
            "columns": 2,
            "spacing": 5,
        }

        result = _run_tool(create_thumbnail_grid, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])

    def test_blend_images(self):
        """blend_images reports success when blending two images."""
        arguments = {
            "image1_source": self.test_images[0],
            "image2_source": self.test_images[1],
            "blend_mode": "normal",
            "opacity": 0.5,
        }

        result = _run_tool(blend_images, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])

    def test_extract_colors(self):
        """extract_colors returns both the colours and a palette."""
        arguments = {
            "image_source": self.test_images[0],
            "color_count": 3,
            "create_palette": True,
        }

        result = _run_tool(extract_colors, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])
        self.assertIn("colors", result_data)
        self.assertIn("palette", result_data)

    def test_create_gif(self):
        """create_gif reports success and includes a result."""
        arguments = {
            "image_sources": self.test_images,
            "duration": 500,
            "loop": True,
            "optimize": True,
        }

        result = _run_tool(create_gif, arguments)

        self.assertIsInstance(result, list)
        result_data = _payload(result)
        self.assertTrue(result_data["success"])
        self.assertIn("result", result_data)


class TestPerformance(unittest.TestCase):
    """Tests for the performance utilities."""

    def setUp(self):
        """Create fresh monitor, cache and resource-manager instances."""
        self.monitor = PerformanceMonitor()
        self.cache = ImageCache(max_size_mb=1)  # 1 MB cache for testing
        self.resource_manager = ResourceManager()

    def test_performance_monitor(self):
        """Aggregated statistics reflect the three recorded operations."""
        self.monitor.record_operation("test_op", 0.1, 10.0, cache_hit=False, error=False)
        self.monitor.record_operation("test_op", 0.2, 15.0, cache_hit=True, error=False)
        self.monitor.record_operation("test_op", 0.15, 12.0, cache_hit=False, error=True)

        stats = self.monitor.get_stats()

        self.assertEqual(stats["total_operations"], 3)
        # (0.1 + 0.2 + 0.15) / 3 == 0.15
        self.assertAlmostEqual(stats["average_processing_time"], 0.15, places=2)
        # (10 + 15 + 12) / 3 ~= 12.33
        self.assertAlmostEqual(stats["average_memory_usage_mb"], 12.33, places=1)
        # NOTE(review): only one of the three operations above is a cache
        # hit; confirm how PerformanceMonitor defines the hit rate so that
        # 0.5 is the intended expectation.
        self.assertAlmostEqual(stats["cache_hit_rate"], 0.5, places=1)
        self.assertAlmostEqual(stats["error_rate"], 1/3, places=2)

    def test_image_cache(self):
        """A stored entry is returned only for identical parameters."""
        image_source = "test_image"
        operation = "resize"
        params = {"width": 100, "height": 100}
        result = "cached_result"

        # Store, then read back with the same key.
        self.cache.put(image_source, operation, params, result)
        cached_result = self.cache.get(image_source, operation, params)
        self.assertEqual(cached_result, result)

        # Different parameters must miss.
        different_params = {"width": 200, "height": 200}
        cached_result = self.cache.get(image_source, operation, different_params)
        self.assertIsNone(cached_result)

        # Cache statistics
        stats = self.cache.get_stats()
        self.assertEqual(stats["items_count"], 1)
        self.assertTrue(stats["enabled"])

    def test_resource_manager(self):
        """Task slots are counted and memory/stat dictionaries are populated."""

        async def run_test():
            # Acquire and release a task slot.
            initial_active = self.resource_manager.active_tasks

            await self.resource_manager.acquire_task_slot()
            try:
                self.assertEqual(
                    self.resource_manager.active_tasks, initial_active + 1
                )
            finally:
                # Release even if the assertion fails so the slot is not
                # left held.
                self.resource_manager.release_task_slot()
            self.assertEqual(self.resource_manager.active_tasks, initial_active)

            # Memory usage report
            memory_usage = self.resource_manager.get_memory_usage()
            self.assertIn("rss_mb", memory_usage)
            self.assertIn("vms_mb", memory_usage)
            self.assertIn("percent", memory_usage)

            # Aggregate statistics
            stats = self.resource_manager.get_stats()
            self.assertIn("active_tasks", stats)
            self.assertIn("max_tasks", stats)
            self.assertIn("memory_usage", stats)

        asyncio.run(run_test())

    def test_performance_tracking_decorator(self):
        """A decorated coroutine still returns its own result."""

        @performance_tracking("test_operation")
        async def test_function(arguments):
            await asyncio.sleep(0.01)  # simulate processing time
            return [{"type": "text", "text": "success"}]

        async def run_test():
            arguments = {"test": "data"}
            result = await test_function(arguments)

            self.assertIsInstance(result, list)
            self.assertEqual(len(result), 1)

        asyncio.run(run_test())

    def test_performance_stats(self):
        """Module-level stats expose all sections and can be reset."""
        stats = get_performance_stats()

        self.assertIn("monitor", stats)
        self.assertIn("cache", stats)
        self.assertIn("resources", stats)
        self.assertIn("timestamp", stats)

        reset_performance_stats()

        # After a reset the operation counter must be back to zero.
        new_stats = get_performance_stats()
        self.assertEqual(new_stats["monitor"]["total_operations"], 0)


class TestErrorHandling(unittest.TestCase):
    """Tests that invalid input is reported as a failure."""

    def test_invalid_image_source(self):
        """A malformed image source yields ``success`` false and an error."""
        arguments = {
            "image_source": "invalid_base64_data",
            "border_width": 10,
            "border_color": "#000000",
        }

        result = _run_tool(add_border, arguments)
        result_data = _payload(result)

        self.assertFalse(result_data["success"])
        self.assertIn("error", result_data)

    def test_invalid_parameters(self):
        """A negative border width is rejected."""
        # A valid image, so only the parameter is at fault.
        test_image = Image.new("RGB", (100, 100), "red")
        test_image_base64 = _png_data_uri(test_image)

        arguments = {
            "image_source": test_image_base64,
            "border_width": -5,  # invalid value
            "border_color": "#000000",
        }

        result = _run_tool(add_border, arguments)
        result_data = _payload(result)

        self.assertFalse(result_data["success"])

    def test_empty_batch_operation(self):
        """An empty batch is rejected."""
        arguments = {
            "image_sources": [],  # empty list
            "width": 100,
            "height": 100,
        }

        result = _run_tool(batch_resize, arguments)
        result_data = _payload(result)

        self.assertFalse(result_data["success"])


def run_async_test(test_func):
    """Wrap an async test method so it runs synchronously.

    The returned wrapper takes only ``self`` and drives ``test_func(self)``
    with ``asyncio.run``.  ``functools.wraps`` keeps the wrapped method's
    name and docstring so test reports stay readable.
    """

    @functools.wraps(test_func)
    def wrapper(self):
        asyncio.run(test_func(self))

    return wrapper


if __name__ == "__main__":
    # Run every test in this module.
    unittest.main(verbosity=2)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/duke0317/ps-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.