A C# Program for Batch Data Export from Snowflake Using Multiple Labeled SQL Queries

Design a C# program that reads a JSON configuration file containing multiple labeled Snowflake SQL statements and, driven by command-line parameters, automatically exports the query results in batch from a Snowflake database to CSV files in a local directory. Each output file is named after its label plus the .csv extension; if the file already exists it is overwritten. The program must handle large result sets by streaming the data out in batches, use multithreading and asynchronous operations to improve performance, and include exception handling. It should log error messages when failures occur, and for every query it should record the run status, the number of rows exported, the run timestamp, the export duration, and the record count of each output file.

Step 1: Create the Configuration Classes

// AppConfig.cs
using System.Collections.Generic;

public class AppConfig
{
    public SnowflakeConnectionConfig SnowflakeConnection { get; set; }
    public List<QueryConfig> Queries { get; set; }
}

public class SnowflakeConnectionConfig
{
    public string Account { get; set; }
    public string User { get; set; }
    public string Password { get; set; }
    public string Warehouse { get; set; }
    public string Database { get; set; }
    public string Schema { get; set; }
    public string Role { get; set; }
}

public class QueryConfig
{
    public string Label { get; set; }
    public string Sql { get; set; }
}
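Since each label becomes an output file name, it can help to validate the configuration before any export starts. The sketch below is illustrative only; the ConfigLoader class name and the specific validation rules (non-empty, unique labels) are assumptions, not part of the original design.

// ConfigLoader.cs (illustrative sketch -- class name and validation rules are assumptions)
using System;
using System.IO;
using System.Linq;
using System.Text.Json;

public static class ConfigLoader
{
    // Reads config.json and performs basic sanity checks before any export starts.
    public static AppConfig Load(string path)
    {
        var config = JsonSerializer.Deserialize<AppConfig>(
            File.ReadAllText(path),
            new JsonSerializerOptions { PropertyNameCaseInsensitive = true });

        if (config?.Queries == null || config.Queries.Count == 0)
            throw new InvalidOperationException("Configuration must contain at least one query.");

        // Labels become file names, so they must be non-empty and unique.
        var badLabels = config.Queries
            .GroupBy(q => q.Label, StringComparer.OrdinalIgnoreCase)
            .Where(g => string.IsNullOrWhiteSpace(g.Key) || g.Count() > 1)
            .Select(g => g.Key ?? "(empty)")
            .ToList();
        if (badLabels.Count > 0)
            throw new InvalidOperationException("Invalid or duplicate labels: " + string.Join(", ", badLabels));

        return config;
    }
}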

Step 2: Implement the Logger

// Logger.cs
using System;
using System.IO;

public class Logger
{
    private readonly string _logPath;
    private readonly object _lock = new object();

    public Logger(string logPath)
    {
        _logPath = logPath;
        InitializeLogFile();
    }

    private void InitializeLogFile()
    {
        lock (_lock)
        {
            File.AppendAllText(_logPath,
                $"{"Timestamp",-25}|{"Status",-8}|{"Label",-20}|{"StartTime",-20}|{"Duration(s)",-12}|{"Rows",-10}|{"Error"}\n");
        }
    }

    public void LogSuccess(string label, DateTime startTime, long rowCount, TimeSpan duration)
    {
        var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"SUCCESS",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{rowCount,-10}|{"-"}\n";
        WriteLogEntry(entry);
    }

    public void LogError(string label, DateTime startTime, string error, TimeSpan duration)
    {
        var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"ERROR",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{"-",-10}|{error}\n";
        WriteLogEntry(entry);
    }

    private void WriteLogEntry(string entry)
    {
        lock (_lock)
        {
            File.AppendAllText(_logPath, entry);
        }
        Console.Write(entry);
    }
}
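For reference, a minimal usage sketch of the logger; the label, row count and durations below are made-up values chosen to match the log example at the end of this article.

// Illustrative usage only -- the values are assumptions
var logger = new Logger("./exports/export.log");
var start = DateTime.UtcNow;
logger.LogSuccess("customers", start, rowCount: 1000000, duration: TimeSpan.FromSeconds(15.23));
logger.LogError("orders", start, "Timeout expired", TimeSpan.FromSeconds(17.12));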

Step 3: Implement the Export Processor

// ExportProcessor.cs
using CsvHelper;
using Snowflake.Data.Client;
using System;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class ExportProcessor
{
    private readonly string _connectionString;
    private readonly Logger _logger;

    public ExportProcessor(string connectionString, Logger logger)
    {
        _connectionString = connectionString;
        _logger = logger;
    }

    public async Task ExportQueryAsync(QueryConfig query, string outputDir, CancellationToken cancellationToken = default)
    {
        var startTime = DateTime.UtcNow;
        var sw = Stopwatch.StartNew();
        long rowCount = 0;
        string filePath = Path.Combine(outputDir, $"{query.Label}.csv");

        try
        {
            using (var conn = new SnowflakeDbConnection())
            {
                conn.ConnectionString = _connectionString;
                await conn.OpenAsync(cancellationToken);

                using (var cmd = conn.CreateCommand())
                {
                    cmd.CommandText = query.Sql;

                    using (var reader = await cmd.ExecuteReaderAsync(cancellationToken))
                    using (var writer = new StreamWriter(filePath, append: false))
                    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
                    {
                        // Write header
                        for (int i = 0; i < reader.FieldCount; i++)
                        {
                            csv.WriteField(reader.GetName(i));
                        }
                        await csv.NextRecordAsync();

                        // Write rows
                        while (await reader.ReadAsync(cancellationToken))
                        {
                            for (int i = 0; i < reader.FieldCount; i++)
                            {
                                csv.WriteField(reader.GetValue(i));
                            }
                            await csv.NextRecordAsync();
                            rowCount++;
                        }
                    }
                }
            }

            sw.Stop();
            _logger.LogSuccess(query.Label, startTime, rowCount, sw.Elapsed);
        }
        catch (Exception ex)
        {
            sw.Stop();
            _logger.LogError(query.Label, startTime, ex.Message, sw.Elapsed);
            SafeDeleteFile(filePath);
            throw; // Re-throw so a caller can apply retry logic
        }
    }

    private void SafeDeleteFile(string path)
    {
        try { File.Delete(path); } catch { /* Ignore deletion errors */ }
    }
}
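The re-throw at the end of the catch block leaves room for retry logic in the caller. One possible shape for such a retry wrapper is sketched below; the RetryHelper name, attempt count and delay are assumptions (a library such as Polly could be used instead).

// RetryHelper.cs (sketch -- class name, attempt count and delay are assumptions)
using System;
using System.Threading.Tasks;

public static class RetryHelper
{
    // Runs an export delegate and retries it a few times before giving up.
    public static async Task RunWithRetryAsync(Func<Task> export, int maxAttempts = 3, int delaySeconds = 5)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await export();
                return;
            }
            catch when (attempt < maxAttempts)
            {
                // ExportQueryAsync has already logged the failure; wait briefly, then try again.
                await Task.Delay(TimeSpan.FromSeconds(delaySeconds));
            }
        }
    }
}

// Possible usage inside the parallel loop in Program.cs:
// await RetryHelper.RunWithRetryAsync(() => processor.ExportQueryAsync(query, outputDir, token));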

Step 4: Implement the Main Program

// Program.cs
using System;
using System.CommandLine;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
using Snowflake.Data.Client;

class Program
{
    static async Task Main(string[] args)
    {
        var configOption = new Option<FileInfo>(
            name: "--config",
            description: "Path to configuration file");
        var outputOption = new Option<DirectoryInfo>(
            name: "--output",
            description: "Output directory for CSV files");

        var rootCommand = new RootCommand { configOption, outputOption };
        rootCommand.Description = "Snowflake Data Exporter";

        rootCommand.SetHandler(async (config, outputDir) =>
        {
            await RunExport(config, outputDir);
        }, configOption, outputOption);

        await rootCommand.InvokeAsync(args);
    }

    static async Task RunExport(FileInfo configFile, DirectoryInfo outputDir)
    {
        // Read configuration
        var config = JsonSerializer.Deserialize<AppConfig>(
            File.ReadAllText(configFile.FullName),
            new JsonSerializerOptions { PropertyNameCaseInsensitive = true });

        // Create output directory
        Directory.CreateDirectory(outputDir.FullName);

        // Initialize logger
        var logger = new Logger(Path.Combine(outputDir.FullName, "export.log"));

        // Build connection string
        var connBuilder = new SnowflakeDbConnectionStringBuilder
        {
            Account = config.SnowflakeConnection.Account,
            User = config.SnowflakeConnection.User,
            Password = config.SnowflakeConnection.Password,
            Warehouse = config.SnowflakeConnection.Warehouse,
            Db = config.SnowflakeConnection.Database,
            Schema = config.SnowflakeConnection.Schema,
            Role = config.SnowflakeConnection.Role
        };

        // Initialize processor
        var processor = new ExportProcessor(connBuilder.ToString(), logger);

        // Parallel execution with throttling
        var parallelOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        await Parallel.ForEachAsync(
            config.Queries,
            parallelOptions,
            async (query, cancellationToken) =>
            {
                await processor.ExportQueryAsync(query, outputDir.FullName, cancellationToken);
            });
    }
}
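Parallel.ForEachAsync requires .NET 6 or later. On older runtimes, a SemaphoreSlim can provide similar throttled concurrency. The sketch below is an assumed alternative, not part of the original design; the ThrottledExport name and the default limit of 4 are arbitrary.

// ThrottledExport.cs (sketch -- an assumed alternative to Parallel.ForEachAsync for pre-.NET 6 runtimes)
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExport
{
    // Runs all exports concurrently, but never more than maxConcurrency at a time.
    public static async Task RunAsync(
        IEnumerable<QueryConfig> queries,
        ExportProcessor processor,
        string outputDir,
        int maxConcurrency = 4)
    {
        using var throttle = new SemaphoreSlim(maxConcurrency);
        var tasks = queries.Select(async query =>
        {
            await throttle.WaitAsync();
            try
            {
                await processor.ExportQueryAsync(query, outputDir);
            }
            finally
            {
                throttle.Release();
            }
        }).ToList();
        await Task.WhenAll(tasks);
    }
}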

Step 5: Sample Configuration File (config.json)

{
  "snowflakeConnection": {
    "account": "your_account",
    "user": "your_user",
    "password": "your_password",
    "warehouse": "COMPUTE_WH",
    "database": "PROD_DB",
    "schema": "PUBLIC",
    "role": "SYSADMIN"
  },
  "queries": [
    { "label": "customers", "sql": "SELECT * FROM CUSTOMERS" },
    { "label": "orders", "sql": "SELECT * FROM ORDERS" }
  ]
}

Implementation Notes

  1. Parallel processing
  • Uses Parallel.ForEachAsync to run the queries concurrently
  • The degree of parallelism defaults to the number of processor cores
  • Each query uses its own independent database connection
  2. Large result sets
  • CsvHelper writes the CSV as a stream
  • Asynchronous I/O operations (ReadAsync/WriteAsync)
  • Rows are processed one at a time, so memory usage stays flat regardless of result-set size (see the flushing sketch after this list)
  3. Error handling
  • Incomplete output files are deleted automatically
  • Detailed error messages are written to the log
  • Exceptions are propagated per query and isolated, so one failure does not stop the others
  4. Logging
  • Structured, pipe-delimited log format
  • Thread-safe writes
  • Captures the key performance metrics (rows, duration, timestamps)
  5. Performance optimizations
  • Asynchronous database operations
  • Queries executed in parallel
  • Result sets streamed rather than buffered
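As mentioned in item 2 above, the row-writing loop can also flush the CSV writer periodically so that buffered rows reach disk in batches during very large exports. A possible sketch is shown below; the helper name and the 50,000-row flush interval are assumptions.

// LargeResultSetHelper.cs (sketch -- helper name and flush interval are assumptions)
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using CsvHelper;

public static class LargeResultSetHelper
{
    // Streams every row from the reader into the CSV writer, flushing every
    // flushInterval rows so memory stays flat for multi-million-row result sets.
    public static async Task<long> WriteRowsAsync(
        DbDataReader reader,
        CsvWriter csv,
        int flushInterval = 50000,
        CancellationToken cancellationToken = default)
    {
        long rowCount = 0;
        while (await reader.ReadAsync(cancellationToken))
        {
            for (int i = 0; i < reader.FieldCount; i++)
            {
                csv.WriteField(reader.GetValue(i));
            }
            await csv.NextRecordAsync();
            rowCount++;

            if (rowCount % flushInterval == 0)
            {
                await csv.FlushAsync(); // push buffered rows to disk in batches
            }
        }
        return rowCount;
    }
}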

Usage

  1. Install the dependencies:
     dotnet add package Snowflake.Data
     dotnet add package CsvHelper
     dotnet add package System.CommandLine
  2. Build and run:
     dotnet run -- --config ./config.json --output ./exports
  3. Output structure:
     exports/
       customers.csv
       orders.csv
       export.log

Log Example

2023-09-20 14:30:45|SUCCESS |customers           |14:30:30|15.23       |1000000   |-
2023-09-20 14:31:02|ERROR   |orders              |14:30:45|17.12       |-         |Timeout expired

This implementation provides:

  • Thread-safe parallel processing
  • A complete error-handling mechanism
  • Detailed execution logs
  • Efficient handling of large data volumes
  • Configurable Snowflake connection parameters
  • A clear command-line interface