데이터베이스 개발자 Tip & 강좌

SQLER의 개발자들이 만들어가는 데이터베이스 사용자 Tip & 강좌 게시판입니다. SQL서버, Oracle, MySQL 등 여러 클라우드/오픈소스 기반 데이터베이스 개발 및 운영 관련 팁과 쿼리 노하우를 이곳에서 가장 먼저 접하실 수 있습니다. 많은 도움 되시길 바랍니다.

이번 포스팅에서는 지난 포스팅에 이어서, BULK INSERT로 900만건 CSV 파일 데이터를 처리하는 과정을 수행.

CSV 파일 BULK INSERT는 아래 과정으로 처리한다.

 

Azure SQL Database로 CSV 파일 BULK INSERT - Python

 

CSV 데이터 리뷰

Blob Storage로 CSV 파일 업로드

BULK INSERT 준비

- SAS 토큰 생성

- 테이블 생성

- SCOPED CREDENTIAL 생성

- EXTERNAL DATA SOURCE 추가

BULK INSERT 실행

 

기존 On-prem의 SQL과 다르게, Azure SQL Database에서 위의 과정이 필요한 이유는, Azure SQL Database는 로컬 파일시스템에 대한 접근이 제한되기 때문이다.

Azure SQL Database only supports reading from Azure Blob Storage.

BULK INSERT (Transact-SQL) - SQL Server | Microsoft Docs

따라서, bulk insert 하려는 파일을 blob에 올려 두고 처리해는 과정이 필요하다.

 

사용된 전체 코드는 아래 github 리포지토리에서 확인 가능.

CloudBreadPaPa/azure-sql-bulk-insert: Bulk insert massive CSV data to Azure SQL Database (github.com)

 

CSV 데이터 리뷰

데이터는 1만건, 100만건으로 테스트 목적으로 split을 했다. 1만건의 데이터만 repo에 추가해 두었으며, 전체 과정 진행이 가능.

# load CSV file
import os
import pandas as pd
df = pd.read_csv("data/train_10000.csv")

data 폴더에 CSV 파일을 올려 두었고, 이렇게 구조에 대해서 확인 가능.

파일 split은 How to split large text file in windows? - Stack Overflow 문서를 참고하면 확인 가능하다.

 

Blob Storage로 CSV 파일 업로드

아래의 방법으로, CSV 파일을 blob storage로 업로드 가능하다.

csv_file_name = "train_10000.csv"
upload_file_path = os.path.join(local_path, csv_file_name)

blob_client = blob_service_client.get_blob_client(container=container_name, blob=csv_file_name)

# Upload the created file
with open(upload_file_path, "rb") as data:
    blob_client.upload_blob(data)

Python으로 blob을 처리하는 예제는 아래 링크 참조.

Quickstart: Azure Blob Storage library v12 - Python | Microsoft Docs

 

BULK INSERT 준비

SQL서비스가 blob에 접근할 경우, 당연히 보안 설정 과정이 필요하다. 이 과정을 미리 처리해 두어야 하며, 이때 blob 접근 시 SAS token을 이용한다. 

SAS 토큰 생성

지난 포스트에서 이미 SAS 토큰 생성 방안들을 리뷰했다. 간단히 코드만 리뷰.

개발자 커뮤니티 SQLER.com - Azure Blob Storage SAS token 생성

 

ef generate_sas_token(file_name):
    sas = generate_blob_sas(account_name=AZURE_ACC_NAME,
                            account_key=AZURE_PRIMARY_KEY,
                            container_name=container_name,
                            blob_name=file_name,
                            permission=BlobSasPermissions(read=True),
                            expiry=datetime.utcnow() + timedelta(hours=1))  # 1 hour expire for test

    sas_url ='https://'+AZURE_ACC_NAME+'.blob.core.windows.net/'+container_name+'/'+file_name+'?'+sas
    secret = sas
    return sas_url, secret

sas_url, secret = generate_sas_token(csv_file_name)

이렇게 SAS 토큰을 가져오는 함수를 생성하고, SAS URL과 secret을 받는다. secret만 따로 이용되기 때문에 추가로 함수에서 리턴 받는다.

 

테이블 생성

bulk insert를 위해서는 미리 테이블이 SQL에 생성되어 있어야 한다. 다음 쿼리로 테이블을 생성한다.

create table malware(
MachineIdentifier varchar(256),
ProductName varchar(256),
EngineVersion varchar(256),
AppVersion varchar(256),
AvSigVersion varchar(256),
IsBeta int,
RtpStateBitfield float(24),
IsSxsPassiveMode int,
DefaultBrowsersIdentifier float(24),
AVProductStatesIdentifier float(24),
AVProductsInstalled float(24),
AVProductsEnabled float(24),
HasTpm int,
CountryIdentifier int,
CityIdentifier float(24),
OrganizationIdentifier float(24),
GeoNameIdentifier float(24),
LocaleEnglishNameIdentifier int,
Platform varchar(256),
Processor varchar(256),
OsVer varchar(256),
OsBuild int,
OsSuite int,
OsPlatformSubRelease varchar(256),
OsBuildLab varchar(256),
SkuEdition varchar(256),
IsProtected float(24),
AutoSampleOptIn int,
PuaMode varchar(256),
SMode float(24),
IeVerIdentifier float(24),
SmartScreen varchar(256),
Firewall float(24),
UacLuaenable float(24),
Census_MDC2FormFactor varchar(256),
Census_DeviceFamily varchar(256),
Census_OEMNameIdentifier float(24),
Census_OEMModelIdentifier float(24),
Census_ProcessorCoreCount float(24),
Census_ProcessorManufacturerIdentifier float(24),
Census_ProcessorModelIdentifier float(24),
Census_ProcessorClass varchar(256),
Census_PrimaryDiskTotalCapacity float(24),
Census_PrimaryDiskTypeName varchar(256),
Census_SystemVolumeTotalCapacity float(24),
Census_HasOpticalDiskDrive int,
Census_TotalPhysicalRAM float(24),
Census_ChassisTypeName varchar(256),
Census_InternalPrimaryDiagonalDisplaySizeInInches float(24),
Census_InternalPrimaryDisplayResolutionHorizontal float(24),
Census_InternalPrimaryDisplayResolutionVertical float(24),
Census_PowerPlatformRoleName varchar(256),
Census_InternalBatteryType varchar(256),
Census_InternalBatteryNumberOfCharges float(24),
Census_OSVersion varchar(256),
Census_OSArchitecture varchar(256),
Census_OSBranch varchar(256),
Census_OSBuildNumber int,
Census_OSBuildRevision bigint,
Census_OSEdition varchar(256),
Census_OSSkuName varchar(256),
Census_OSInstallTypeName varchar(256),
Census_OSInstallLanguageIdentifier float(24),
Census_OSUILocaleIdentifier int,
Census_OSWUAutoUpdateOptionsName varchar(256),
Census_IsPortableOperatingSystem int,
Census_GenuineStateName varchar(256),
Census_ActivationChannel varchar(256),
Census_IsFlightingInternal float(24),
Census_IsFlightsDisabled float(24),
Census_FlightRing varchar(256),
Census_ThresholdOptIn float(24),
Census_FirmwareManufacturerIdentifier float(24),
Census_FirmwareVersionIdentifier float(24),
Census_IsSecureBootEnabled int,
Census_IsWIMBootEnabled float(24),
Census_IsVirtualDevice float(24),
Census_IsTouchEnabled int,
Census_IsPenCapable int,
Census_IsAlwaysOnAlwaysConnectedCapable float(24),
Wdft_IsGamer float(24),
Wdft_RegionIdentifier float(24),
HasDetections int
)
GO

이렇게 미리 쿼리를 통해 테이블을 생성해 두고 데이터를 추가한다. 쿼리 수행은 SSMS를 이용하거나, Azure Portal에서 수행도 가능하다.

Query a SQL Database using the query editor in the Azure portal (preview) - Azure SQL Database | Microsoft Docs

 

테이블 생성이 완료되면, Python으로 데이터베이스 접근을 테스트한다.

# install pyodbc in Ubuntu : https://docs.microsoft.com/en-us/sql/connect/python/pyodbc/step-1-configure-development-environment-for-pyodbc-python-development?view=sql-server-ver15#linux
# pip install pyodbc
import pyodbc

server = os.environ['SQLSERVER']
database = os.environ['DATABASE']
username = os.environ['USERNAME']
password = os.environ['PASSWORD']
driver = '{ODBC Driver 17 for SQL Server}'
db_con_str = 'DRIVER='+driver+';SERVER=tcp:'+server+';PORT=1433;DATABASE='+database+';UID='+username+';PWD='+ password

with pyodbc.connect(db_con_str) as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT TOP 3 name, collation_name FROM sys.databases") # test run
        row = cursor.fetchone()
        while row:
            print (str(row[0]) + " " + str(row[1]))
            row = cursor.fetchone()

DB를 다룰 때는 주로 SQLAlchemy만 이용했는데, bulk insert 과정이고, 조회 처리가 없어 pyodbc를 이용해 쿼리만 처리한다.

 

Use Python to query a database - Azure SQL Database & SQL Managed Instance | Microsoft Docs

문서에서 python 구성과 Azure SQL Database 접속을 위한 여러 정보를 확인 가능하다.

 

SCOPED CREDENTIAL 생성

위에서 언급한 것처럼, SQL 서비스가 blob에 접근할 때 이용하게 되며, SAS token을 사용해 구성한다. 

전반적인 흐름을 아래 문서에서 확인 가능.

Bulk access to data in Azure Blob storage - SQL Server | Microsoft Docs

sec_cred = "scred_bulk_ins"
query = "CREATE DATABASE SCOPED CREDENTIAL " + sec_cred + " WITH IDENTITY = 'SHARED ACCESS SIGNATURE', SECRET = '" + secret + "';"
print(query)

# set "SCOPED CREDENTIAL"
with pyodbc.connect(db_con_str) as conn:
    with conn.cursor() as cursor:
        cursor.execute(query)

 

이 과정에서 다음 오류가 발생할 수 있다.

ProgrammingError: ('42000', '[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Please create a master key in the database or open the master key in the session before performing this operation. (15581) (SQLExecDirectW)')

아래 문서에서 관련 내용을 찾을 수 있다.

Create a Database Master Key - SQL Server | Microsoft Docs

 

CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'YOUR-MASTER-KEY';

이렇게, master key를 추가하면 해결된다.

이렇게 scoped credential을 생성했으면, 다음 과정인 external data source를 추가한다.

 

EXTERNAL DATA SOURCE 추가

일종의 mount 처리 과정이다. blob storage를 위의 credential로 SQL database가 접근 가능하도록 구성한다.

ext_ds = "malware_ds"
query = "CREATE EXTERNAL DATA SOURCE " + ext_ds + " WITH (TYPE = BLOB_STORAGE, LOCATION = 'https://" + AZURE_ACC_NAME + ".blob.core.windows.net', CREDENTIAL = " + sec_cred + ");"
print(query)

# set "EXTERNAL DATA SOURCE"
with pyodbc.connect(db_con_str) as conn:
    with conn.cursor() as cursor:
        cursor.execute(query)

 

credential과 생성하다가 data source를 추가하다가 수정할 일이 생기면 중복 불가라 약간 귀찮다.

오류가 생겨 재생성 할 경우, 아래 쿼리를 적절히 이용해 drop하고 재생성 하는 것을 권장한다.

 

--DROP DATABASE SCOPED CREDENTIAL scred_bulk_ins 
--DROP EXTERNAL DATA SOURCE malware_ds

 

이제, bulk insert를 수행하기 위한 모든 준비가 완료되었다. bulk insert를 수행한다.

 

BULK INSERT 실행

다음과 같은 python 코드로 수행 가능하다.

sql_table_name = "malware"
query = "BULK INSERT " + sql_table_name + " FROM '" + container_name + "/" + csv_file_name + "' WITH (DATA_SOURCE = '" + ext_ds + "', DATAFILETYPE = 'char', FIELDTERMINATOR = ',', ROWTERMINATOR = '0x0a', TABLOCK, CODEPAGE = 'UTF-8', FIRSTROW=2);"
print(query)

# run "BULK INSERT"
with pyodbc.connect(db_con_str) as conn:
    with conn.cursor() as cursor:
        cursor.execute(query)

 

SQL 쿼리를 먼저 살펴보면 다음과 같다.

BULK INSERT malware FROM 'bulk-insert/train_1000000.csv' WITH (DATA_SOURCE = 'malware_ds', DATAFILETYPE = 'char', FIELDTERMINATOR = ',', ROWTERMINATOR = '0x0a', TABLOCK, CODEPAGE = 'UTF-8', FIRSTROW=2);

 

여러 시행착오와 오류가 발생했다. Windows의 CRLF가 아닌 UNIX LF 이슈가 있었으며, encoding 관련 이슈로 판단된다.

 

오류가 발생한 쿼리 #1

BULK INSERT malware FROM 'bulk-insert/train_10000.csv' WITH (DATA_SOURCE = 'malware_ds', FORMAT = 'CSV', FIRSTROW=2);

위의 쿼리를 최초 이용했으나, 다음 오류가 발생했다.

Msg 7301, Level 16, State 2, Line 98
Cannot obtain the required interface ("IID_IColumnsInfo") from OLE DB provider "BULK" for linked server "(null)".

Completion time: 2021-09-27T15:26:53.6117081+09:00

Cannot obtain the required interface ("IID_IColumnsInfo") from OLE DB provider "BULK" for linked server "(null)". · Issue #408 · microsoft/sql-server-samples (github.com)

내용을 리뷰해 보완하였다.

 

오류 발생 쿼리 #2

BULK INSERT malware FROM 'bulk-insert/train_10000.csv' WITH (DATA_SOURCE = 'malware_ds', DATAFILETYPE = 'char', FIELDTERMINATOR = ',', ROWTERMINATOR = '\n', FIRSTROW=2);

Msg 4866, Level 16, State 8, Line 117
The bulk load failed. The column is too long in the data file for row 1, column 83. Verify that the field terminator and row terminator are specified correctly.
Msg 7301, Level 16, State 2, Line 117
Cannot obtain the required interface ("IID_IColumnsInfo") from OLE DB provider "BULK" for linked server "(null)".

아마도, 필드 터미네이터 관련 이슈로, UNIX LF 관련 이슈로 예상했다. 

Bulk insert, SQL Server 2000, unix linebreaks - Stack Overflow 내용을 참조해 ROWTERMINATOR를 수정했고 해결되었다.

참고로, "FIRSTROW=2"는 첫 줄 header를 건너뛰고, 데이터만 가져오는 속성이다.

이렇게 시행착오가 있었으나, 모두 잘 해결되었다.

 

BULK INSERT 실행 결과 및 속도

우선, 1만건의 데이터를 가장 작은 Azure SQL Database인 basic에서 검토했을 때

- 1만건(4.7MByte)에 약 11초가 소요되었다.

 

100만건 테스트는 기존 basic 사이즈에서 DTUs(S3) / 20GB Data 사이즈로 변경 후 수행했으며,
100만건에 267초가 소요되었다.

sp_spaceused
reserved           data               index_size         unused
------------ ------------------ ------------------ ------------------
638048 KB          634448 KB          1784 KB            1816 KB

CSV는 480M이며, 대략적으로 630M 정도의 DB 사이즈가 소요되었다.

 

최종 900만건 테스트는 

reserved           data               index_size         unused
---------- ------------------ ------------------ ------------------
5522608 KB         5518976 KB         1856 KB            1776 KB

 

2219초 = 36분이 소요되었고, DB는 약 5.5G, CSV 파일은 약 4.1G 크기였다.

 

지난 포스팅에서 테스트했던 ADF와 비교할 때 ADF는 약 1시간 20분 이상 소요되어, 절반 이하의 시간에 완료되었으며, Azure SQL Database의 스케일을 조절하거나 일부 퍼포먼스 관련 튜닝을 수행하면 조금 더 속도가 나올 것으로 예상.

아직 ADX에서 CSV로 추출하는 과정은 수행하지 않았으나, 이 과정을 고려해도 충분히 좋은 성능을 제공할 것으로 판단됨.

 

참고링크 

개발자 커뮤니티 SQLER.com - Azure Data Factory를 이용해 ADX에서 SQL로 900만건의 데이터 전송

개발자 커뮤니티 SQLER.com - Azure Data Explorer에서 SQL서버 데이베이스 테이블 조회/삽입 - sql_request plugin

개발자 커뮤니티 SQLER.com - Azure Data Explorer에 대량 CSV 파일 ingest

개발자 커뮤니티 SQLER.com - Azure Blob Storage SAS token 생성

Bulk access to data in Azure Blob storage - SQL Server | Microsoft Docs

 

No. Subject Author Date Views
2193 Azure Synapse - COPY INTO로 대용량 데이터 분산 로드 코난(김대우) 2021.10.21 16
» Azure SQL Database로 CSV 파일 BULK INSERT - Python 코난(김대우) 2021.09.27 58
2191 Azure Synapse - Spark와 SQL Data warehouse 서비스 file 코난(김대우) 2021.09.16 70
2190 Azure에서 제공하는 데이터베이스 서비스 종류, AWS 및 GCP와 제품 비교 코난(김대우) 2020.12.25 513
2189 SQL Server 트랜잭션 로그 복원시 복원 시간이 오래 걸리는 현상 jevida(강성욱) 2020.02.28 1337
2188 SQL Server 2019 temp table을 사용한 워크로드에서 recompile 감소 jevida(강성욱) 2019.09.24 1658
2187 Azure SQL Managed Instance 및 SQL Server 2016 Later에서 대기 통계 분석 jevida(강성욱) 2019.09.24 1250
2186 SQL Server 2019에서 동기 통계 업데이트시 발생하는 쿼리 Blocking 확인 jevida(강성욱) 2019.09.21 1212
2185 SQL Server 2019 Log Writer Workers jevida(강성욱) 2019.09.21 1402
2184 SQL Server Login Timeout 디버깅 jevida(강성욱) 2019.09.19 1810
2183 SQL Server Worker Thread 기본 계산 jevida(강성욱) 2019.09.18 1337
2182 SQL Linux의 fsync 및 버퍼된 IO (버퍼된 쓰기중 오류가 발생하였을때 파일은 유효할까?) jevida(강성욱) 2019.09.17 1212
2181 SQL Server와 SQL Linux에서 인스턴스 파일 초기화 차이점 jevida(강성욱) 2019.09.14 1241
2180 BCP 실행시 동일 세션에서 여러개의 BULK INSERT 문으로 표시되는 이유 jevida(강성욱) 2019.09.13 1322
2179 삭제된 AD 그룹 계정으로 SQL Server 로그인 사례 (로그인 그룹 삭제 후 조치해야할 사항) jevida(강성욱) 2019.09.12 1129
2178 QPI(Query Performance Insights) 라이브러리를 사용하여 Azure SQL Managed Instance의 로그쓰기 사용량 확인 jevida(강성욱) 2019.09.11 1034
2177 ODBC 드라이버를 사용하여 SQL Server에서 Azure CosmosDB 쿼리 실행 jevida(강성욱) 2019.09.10 1086
2176 Azure SQL Managed Instance에서 로컬 스토리지 사용량 모니터링 jevida(강성욱) 2019.09.09 1101
2175 SQL Server Enterprise Edition 에서 CPU를 40 Core 이상 사용하지 못하는 현상 jevida(강성욱) 2019.09.04 1240
2174 SQL Server Agent Job에서 sysploicy_purge_history 작업실패 jevida(강성욱) 2019.09.04 1283





XE Login