Writing csv from CF to bucket: `with open(filepath, "w") as MY_CSV:` leads to `FileNotFoundError: [Errno 2] No such file or directory`
This error occurs when I try to write a csv file to the bucket using a csv writer that loops over batches of data. The full insight into the Cloud Function logs around that error:

```
File "/workspace/main.py", line 299, in write_to_csv_file
    with open(filepath, "w") as outcsv:
FileNotFoundError: [Errno 2] No such file or directory: 'gs://MY_BUCKET/MY_CSV.csv'
Function execution took 52655 ms, finished with status: 'crash'
OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k
```
And this although the bucket_filepath definitely exists: I can upload an empty dummy file and get its "gsutils URI" (right-click on the three dots at the right side of the file), and the bucket_filepath looks the same: `'gs://MY_BUCKET/MY_CSV.csv'`.
I also checked saving a dummy pandas dataframe instead, using `pd.to_csv`, and it worked with the same bucket_filepath (!). So there must be another reason, likely that the writer is not accepted, or the `with statement` that opens the file.
The code that throws the error is below. It is the same code that works outside of the Google Cloud Function, in a normal cron job on a local server. I have added two debug prints around the line that throws the error; the `print("Right after opening the file ...")` does not show up anymore. The subfunction `query_execute_batch()`, which `write_to_csv_file()` calls for each batch, is also shown, but it is probably not the problem here, since the error already happens at the very start when write-opening the csv file.
`requirements.txt` (which are then imported as modules):

```
SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1
```
And from `main.py`:

```python
def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result

def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    returns: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save
```
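The helper `execute_select_batch()` is called above but not shown in the question. A minimal sketch of what such a helper might look like, assuming a SQLAlchemy-style connection whose result supports `fetchmany()`; the name and signature come from the call site above, while the body and the `batch_size` default are assumptions, not the author's code:

```python
def execute_select_batch(connection, sql_query, batch_size=10000):
    """Hypothetical sketch: run the query and return the rows split into
    batches of dicts, so that query_execute_batch() can yield them."""
    result = connection.execute(sql_query)  # assumes a SQLAlchemy connection
    batches = []
    while True:
        rows = result.fetchmany(batch_size)
        if not rows:
            break
        # Row objects expose a mapping view in SQLAlchemy 1.4+
        batches.append([dict(row._mapping) for row in rows])
    return batches
```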
Mind that for the above script to work, I import `gcsfs`, which makes the bucket read/write-able. Otherwise I would likely need a Google Cloud Storage object, for example:

```python
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
```

and would then create the file in that bucket with further functions, but that is not the aim here.
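For completeness, a hedged sketch of that google-cloud-storage alternative: build the csv in memory and upload it as one blob. The bucket and object names are placeholders, and this is not the approach used in the rest of the question:

```python
import csv
import io

from google.cloud import storage

BUCKET_NAME = "MY_BUCKET"  # placeholder

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

# Build the csv in memory, then upload it in a single request.
buffer = io.StringIO()
writer = csv.writer(buffer, delimiter="|")
writer.writerow(["id", "name"])
writer.writerow(["1", "test"])

blob = bucket.blob("MY_CSV.csv")
blob.upload_from_string(buffer.getvalue(), content_type="text/csv")
```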
Below is the `pd.to_csv` code that works. It uses the output of a dummy SQL query `SELECT 1` as the input of a dataframe. This *can* be saved to the same bucket_filepath. Of course the reason might not just be `pd.to_csv()` as such, but also that the dataset is a dummy instead of complex unicode strings from a huge `SELECT query`. Or there is another reason, I am just guessing.

```python
if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath, index=False)
    datetime_now_save = datetime.now()
    countrows = df.shape[0]
```
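For context, a minimal sketch of how `records` could be produced with that `SELECT 1` dummy query via SQLAlchemy; the connection URI and bucket path are placeholders, and this only illustrates the snippet above, it is not the author's full code:

```python
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("mysql://user:password@host/db")  # placeholder URI

with engine.connect() as connection:
    records = connection.execute(text("SELECT 1"))
    if records is not None:
        df = pd.DataFrame(records.fetchall())
        df.columns = records.keys()
        # Works with a gs:// path as long as gcsfs is installed.
        df.to_csv("gs://MY_BUCKET/MY_CSV.csv", index=False)
```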
I would like to use the csv writer so that I have the chance to write unicode with the unicodecsv module, and the chance to use the batches.
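A minimal sketch of the unicodecsv usage referred to here, with placeholder field names and a local path; note that unicodecsv writers expect a file opened in binary mode, because they write encoded bytes:

```python
import unicodecsv

with open("/tmp/unicode_test.csv", "wb") as f:  # binary mode
    writer = unicodecsv.DictWriter(
        f,
        fieldnames=["id", "name"],
        encoding="utf-8",
        delimiter="|",
    )
    writer.writeheader()
    writer.writerow({"id": 1, "name": "Zoë"})
```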
I might be willing to change to batching in pandas (`loop + append` mode or `chunksize`), for example as in Writing large Pandas Dataframes to CSV file in chunks, to get rid of this bucket filepath problem (see the sketch at the end of this question), but I would rather use the ready-made code (never touch a running system). How can I get the saving of that csv done with the csv writer, so that it can open a new file in the bucket in `write` mode = `with open(filepath, "w") as outcsv:`? The given function `write_to_csv_file()` is just a tiny part of the Cloud Function, which uses a wide range of functions and cascaded functions. I cannot show the whole reproducible case here and hope that it can be answered by experience or easier examples.
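A minimal sketch of the "loop + append" pandas variant mentioned above, assuming the slices are written to a local temporary file first (appending directly to a GCS object may not be supported); the helper name and chunk size are illustrative only:

```python
import pandas as pd

def write_dataframe_in_chunks(df, local_path, chunk_rows=5000):
    """Write df to csv in slices: header with the first slice, append the rest."""
    for start in range(0, len(df), chunk_rows):
        df.iloc[start:start + chunk_rows].to_csv(
            local_path,
            index=False,
            header=(start == 0),
            mode="w" if start == 0 else "a",
        )
```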
The solution is surprising. You *must* import and use the `gcsfs` module if you want to write to a file with `open()`.

If you use `pd.to_csv()`, `import gcsfs` is *not* needed, but `gcsfs` is still needed in the `requirements.txt` to make `pd.to_csv()` work; thus, pandas `to_csv()` seems to use it automatically.

Surprise put aside, here is the code that answers the question (tested):

```python
def write_to_csv_file(connection, filepath):
    """Write the QUERY result in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")

    # A gcsfs object is needed to open a file.
    # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
    # https://gcsfs.readthedocs.io/en/latest/index.html#examples
    # Side-note (Exception):
    # pd.to_csv() needs neither the gcsfs object, nor its import.
    # It is not used here, but it has been tested with examples.
    fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
    fs.ls(BUCKET_NAME)
    # wb needed, else "builtins.TypeError: must be str, not bytes"
    # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
    with fs.open(filepath, 'wb') as outcsv:
        print("Right after opening the file ...")
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        print("before writer.writeheader()")
        writer.writeheader()
        print("after writer.writeheader()")

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save
```
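As a quick way to verify the `gcsfs` approach in isolation, a minimal smoke test; the project and bucket names are placeholders, and it assumes your gcsfs version supports text mode (`'w'`), whereas the answer above uses `'wb'` because of unicodecsv:

```python
import csv

import gcsfs

fs = gcsfs.GCSFileSystem(project="MY_PROJECT")
with fs.open("gs://MY_BUCKET/smoke_test.csv", "w") as f:
    writer = csv.writer(f, delimiter="|")
    writer.writerow(["id", "name"])
    writer.writerow(["1", "test"])
```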
Side-note:

Do not use the csv writer like this. It takes too long: instead of `pd.to_csv()` with a `chunksize` parameter of 5000, which needs just 62 s to load 700k rows and store them as csv in the bucket, the CF with the batch writer takes more than 9 minutes, which is over the timeout limit. I am therefore forced to use `pd.to_csv()` and to convert my data into a dataframe for that.
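For reference, a minimal sketch of the `pd.to_csv()` call with `chunksize` described above; the dataframe here is a stand-in for the real query result, and the bucket path is a placeholder:

```python
import pandas as pd

df = pd.DataFrame({"id": range(10)})  # stand-in for the ~700k-row result
# chunksize controls how many rows pandas writes per batch.
df.to_csv("gs://MY_BUCKET/MY_CSV.csv", index=False, chunksize=5000)
```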