Google-Cloud-Platform

Writing csv from CF to bucket: 'with open(filepath, "w") as MY_CSV:' leads to 'FileNotFoundError: [Errno 2] No such file or directory:'

  • February 1, 2022

I get FileNotFoundError: [Errno 2] No such file or directory when I try to write a csv file to the bucket, using a csv writer that loops over batches of data. The full insight into the Cloud Function logs around that error:


File "/workspace/main.py", line 299, in write_to_csv_file with
open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such
file or directory: 'gs://MY_BUCKET/MY_CSV.csv'

Function execution took 52655 ms, finished with status: 'crash' 

OpenBLAS WARNING - could not determine the L2 cache size on this
system, assuming 256k

And, although this bucket_filepath definitely exists: I can upload an empty dummy file and get its "gsutil URI" (right-click on the three dots at the right side of the file), and the bucket_filepath looks exactly the same: 'gs://MY_BUCKET/MY_CSV.csv'.

I also checked saving a dummy pandas dataframe instead, using pd.to_csv, and that works with the same bucket_filepath (!).

Therefore, there must be another reason, likely that the writer is not accepted, or the with statement that opens the file.

The code that throws the error is below. It is the same code that works outside of the Google Cloud Function, in a normal cron job on a local server. I have added two debug prints around the line that throws the error; the print("Right after opening the file ...") does not show up anymore. The subfunction query_execute_batch(), which write_to_csv_file() calls for each batch, is also shown, but it is probably not the problem here, since the error already happens at the very start when write-opening the csv file.

requirements.txt (which are then imported as modules):

SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1

main.py

def query_execute_batch(connection):
   """Function for reading data from the query result into batches
   :yield: each result in a loop is a batch of the query result
   """
   results = execute_select_batch(connection, SQL_QUERY)
   print(f"len(results): {len(results)}")
   for result in results:
       yield result

def write_to_csv_file(connection, filepath):
   """Write the data in a loop over batches into a csv.
   This is done in batches since the query from the database is huge.
   :param connection: mysqldb connection to DB
   :param filepath: path to csv file to write data
   returns: metadata on rows and time
   """
   countrows = 0
   print("Right before opening the file ...")    
   with open(filepath, "w") as outcsv:
       print("Right after opening the file ...")        
       writer = csv.DictWriter(
           outcsv,
           fieldnames=FIELDNAMES,
           extrasaction="ignore",
           delimiter="|",
           lineterminator="\n",
       )
       # write header according to fieldnames
       writer.writeheader()

       for batch in query_execute_batch(connection):
           writer.writerows(batch)
           countrows += len(batch)
       datetime_now_save = datetime.now()
   return countrows, datetime_now_save

Note that for the script above to work, I import gcsfs, which makes the bucket read-and-write-available. Otherwise I would likely need a Google Cloud Storage object, for example:

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

and then work with the file in that bucket using further functions, but that is not the aim here.
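For completeness, a minimal sketch of that storage-client route (not what is used in this question; the object name and payload are placeholders, and uploading one string at once would not help with the batched writer):

from google.cloud import storage

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
# A blob represents the target object in the bucket.
blob = bucket.blob("MY_CSV.csv")
# upload_from_string() writes the whole payload in one go,
# so it is only shown here as the non-gcsfs alternative.
blob.upload_from_string("HEADER_A|HEADER_B\n1|2\n", content_type="text/csv")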

In the pd.to_csv code below, the output of a dummy SQL query SELECT 1 is used as the input of the dataframe. This can be saved to the same bucket_filepath. Of course the reason might not only be pd.to_csv() as such, but also that the dataset is a dummy instead of the result of a huge SELECT query. Or there is another reason, I am just guessing.

if records is not None:
   df = pd.DataFrame(records.fetchall())
   df.columns = records.keys()
   df.to_csv(filepath,
       index=False,
   )
   datetime_now_save = datetime.now()
   countrows = df.shape[0]
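For context, a minimal sketch of how such a records object could be produced with SQLAlchemy; the actual query helpers of the cloud function are not shown, so DB_URI and the query here are placeholders:

from sqlalchemy import create_engine, text

engine = create_engine(DB_URI)  # DB_URI is a placeholder for the real connection string
conn = engine.connect()
# The dummy query mentioned above; the real job runs the huge SELECT query instead.
records = conn.execute(text("SELECT 1"))
# ... the if-block above then builds the dataframe from records ...
conn.close()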

I would like to use the csv writer to have the chance to write unicode with the unicodecsv module and the chance to write in batches.

I might be willing to change to batching in pandas (loop + append mode, or chunksize), like Writing large Pandas Dataframes to CSV file in chunks, to get rid of this bucket filepath problem, but I would rather use the ready code (never touch a running system).

How can the saving of that csv be done with the csv writer, so that it opens a new file in the bucket in write mode = with open(filepath, "w") as outcsv:?

The given function write_to_csv_file() is just a tiny part of the cloud function, which uses a wide range of functions and cascaded functions. I cannot show the whole reproducible case here and hope that it can be answered by experience or by easier examples.

The solution is surprising. You must import and use the gcsfs module if you want to write to a file with open().

If you use pd.to_csv(), import gcsfs is not needed, but gcsfs is still needed in the requirements.txt to make pd.to_csv() work; thus, pandas to_csv() seems to use it automatically.

Surprises aside, here is the code that answers the question (tested):

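# Assumed imports, not shown in the question (the function is only an excerpt):
# import gcsfs
# from datetime import datetime
# import unicodecsv as csv  # presumably unicodecsv (see requirements.txt); a
#                           # bytes-writing csv module explains the 'wb' mode below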
def write_to_csv_file(connection, filepath):
   """Write the QUERY result in a loop over batches into a csv.
   This is done in batches since the query from the database is huge.
   :param connection: mysqldb connection to DB
   :param filepath: path to csv file to write data
   return: metadata on rows and time
   """
   countrows = 0
   print("Right before opening the file ...")
  

   # A gcsfs object is needed to open a file.
   # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
   # https://gcsfs.readthedocs.io/en/latest/index.html#examples
   # Side-note (Exception):
   # pd.to_csv() needs neither the gcsfs object, nor its import.
   # It is not used here, but it has been tested with examples.
   fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
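   # Listing the bucket is presumably just a sanity check that access works;
   # it is not strictly required before fs.open().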
   fs.ls(BUCKET_NAME)


   # wb needed, else "builtins.TypeError: must be str, not bytes"
   # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
   with fs.open(filepath, 'wb') as outcsv:
       print("Right after opening the file ...")

       writer = csv.DictWriter(
           outcsv,
           fieldnames=FIELDNAMES,
           extrasaction="ignore",
           delimiter="|",
           lineterminator="\n",
       )
       # write header according to fieldnames
       print("before writer.writeheader()")
       writer.writeheader()
       print("after writer.writeheader()")

       for batch in query_execute_batch(connection):
           writer.writerows(batch)
           countrows += len(batch)
       datetime_now_save = datetime.now()
   return countrows, datetime_now_save
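
A hypothetical call, assuming the database connection and the gs:// target path from the surrounding cloud function:

countrows, datetime_now_save = write_to_csv_file(connection, "gs://MY_BUCKET/MY_CSV.csv")
print(f"Wrote {countrows} rows, finished at {datetime_now_save}")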

Side note

Do not use the csv writer like this.

It takes far too long. Whereas pd.to_csv() with a chunksize parameter of 5000 needs just 62 seconds for 700k rows to be loaded and stored as a csv in the bucket, the CF with the batch writer takes more than 9 minutes, which is over the timeout limit. Therefore I am forced to use pd.to_csv() and convert my data into a dataframe for that.
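A minimal sketch of that pd.to_csv route, assuming the whole query result fits into one dataframe; sep="|" mirrors the delimiter used above, chunksize=5000 is the value quoted in the timing, and gcsfs still has to be in requirements.txt for the gs:// path to work:

import pandas as pd
from datetime import datetime

def write_df_to_csv_file(records, filepath):
   """Sketch of the pandas route, not the author's exact code."""
   df = pd.DataFrame(records.fetchall())
   df.columns = records.keys()
   # chunksize only controls how many rows pandas writes per internal batch;
   # the result is still a single csv object in the bucket.
   df.to_csv(filepath, sep="|", index=False, chunksize=5000)
   return df.shape[0], datetime.now()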

Quoted from: https://serverfault.com/questions/1091789