Job

Source code in geocube/entities/job.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
@dataclass
class Job:
    """
    Client-side representation of a job, built from an `operations_pb2.Job` message.

    The instance keeps the stub it was created with, so that the job can be retried,
    cancelled, continued or refreshed directly from the object.
    """
    # Stub used to issue the RetryJob / CancelJob / ContinueJob / GetJob calls
    _stub:            Stub
    id:               str
    name:             str
    type:             str
    state:            str
    creation_time:    datetime
    last_update_time: datetime
    # Log lines of the job, as returned by the last from_pb()/refresh()
    logs:             List[str]
    active_tasks:     int
    failed_tasks:     int
    execution_level:  ExecutionLevel
    # True when the job waits for a user action (see next())
    waiting:          bool

    @classmethod
    def from_pb(cls, stub: Stub, pb_job: operations_pb2.Job) -> "Job":
        """
        Create a job from its protobuf message

        Args:
            stub: stub that the created job will use for its own calls
            pb_job: protobuf message describing the job

        Returns:
            A new instance of the class the method is called on
        """
        # `cls` (instead of a hard-coded `Job`) lets subclasses reuse this constructor
        return cls(
            _stub=stub,
            id=pb_job.id,
            name=pb_job.name,
            type=pb_job.type,
            state=pb_job.state,
            creation_time=pb_job.creation_time.ToDatetime(),
            last_update_time=pb_job.last_update_time.ToDatetime(),
            # Copy into a plain list: matches the List[str] annotation and detaches the logs from the message
            logs=list(pb_job.logs),
            active_tasks=pb_job.active_tasks,
            failed_tasks=pb_job.failed_tasks,
            execution_level=ExecutionLevel(pb_job.execution_level),
            waiting=pb_job.waiting,
        )

    @utils.catch_rpc_error
    def retry(self, force: bool = False):
        """
        Retry a failed job

        Args:
            force: TO BE USED CAUTIOUSLY: retry the current state of the job, whatever the state.
                It can be unpredictable. Should only be used if the job is stuck in a pending state.

        """
        self._stub.RetryJob(operations_pb2.RetryJobRequest(id=self.id, force_any_state=force))

    @utils.catch_rpc_error
    def cancel(self, force: bool = False):
        """
        Cancel the job if possible

        Args:
            force: TO BE USED CAUTIOUSLY: cancel the current state of the job, whatever the state.
                It can be unpredictable. Should only be used if the job is stuck in a pending state.

        """
        self._stub.CancelJob(operations_pb2.CancelJobRequest(id=self.id, force_any_state=force))

    @utils.catch_rpc_error
    def next(self):
        """
        Start the next step (must be in "waiting" state)

        Raises:
            Exception: if the job is not in waiting state (checked on the local `waiting` attribute)
        """
        if not self.waiting:
            raise Exception("Job must be in waiting state")
        self._stub.ContinueJob(operations_pb2.ContinueJobRequest(id=self.id))

    @utils.catch_rpc_error
    def refresh(self, log_page=0, log_limit=1000):
        """
        Reload a job from server (inplace operation)

        Args:
            log_page: forwarded as `log_page` to the GetJob request
            log_limit: forwarded as `log_limit` to the GetJob request

        Returns:
            self, with all its attributes replaced by the reloaded ones
        """
        res = self._stub.GetJob(operations_pb2.GetJobRequest(id=self.id, log_page=log_page, log_limit=log_limit))
        # Swap the whole attribute dictionary so that every field is updated at once
        self.__dict__ = Job.from_pb(self._stub, res.job).__dict__
        return self

    def tasks_from_logs(self) -> List[Task]:
        """
        Rebuild the tasks of the job by parsing the currently loaded logs

        A task is created for each "Prepare ... container(s)" log line and its status is updated
        by the following "TaskEvt received" log lines that carry the same task id.

        Returns:
            The tasks found, in order of first appearance in the logs
        """
        # The format strings are constant: compile them once instead of at every log line
        task_parser = parse.compile("Prepare {container:d} container(s) with {records:d} record(s) "
                                    "and {datasets:d} dataset(s) (Cell:{cell}, geographic: {coordinates}) (id:{taskid})")
        status_parser = parse.compile("TaskEvt received with status Task{status} (id:{taskid},")
        point_parser = parse.compile("{lon:g} {lat:g}")

        tasks = {}
        for i, log in enumerate(self.logs):
            log_task = task_parser.search(log)
            if log_task is not None:
                # A task declared in the very first loaded lines hints that the loaded logs
                # may start after other task declarations
                if i < 3:
                    warnings.warn("tasks_from_logs might have missed tasks. Please, reload job with more logs")
                # Parse coordinates
                coordinates = point_parser.findall(log_task['coordinates'])
                coordinates = geometry.LinearRing([[p['lon'], p['lat']] for p in coordinates])

                # The status is left empty until a TaskEvt line is found for this task
                tasks[log_task['taskid']] = Task(log_task['cell'], log_task['container'], log_task['records'],
                                                 log_task['datasets'], coordinates, "")
                continue

            log_task_status = status_parser.search(log)
            if log_task_status is None:
                continue
            task_id = log_task_status["taskid"]
            if task_id in tasks:
                tasks[task_id].status = log_task_status["status"]
            else:
                warnings.warn(f"taskEvt for id {task_id} found, but task not found")

        return list(tasks.values())

    def deletion_job_from_logs(self) -> str:
        """
        Find the name of the deletion job announced in the logs

        Returns:
            The name found in the first "Create a deletion job" log line, or "" if there is none
        """
        for log in self.logs:
            deletion_job = parse.search("Create a deletion job to delete {nb_datasets:d} dataset(s): {name:S}", log)
            if deletion_job is not None:
                return deletion_job["name"]
        return ""

    def plot_tasks(self):
        """
        Plot the cells of the tasks found in the logs, colored by status

        Failed tasks are red, successful ones are green and all the others are black.
        Each cell is annotated with its number of records and datasets.

        Returns:
            The object returned by utils.plot_aoi (presumably a matplotlib Axes - to be confirmed)

        Raises:
            ValueError: if no task can be found from the logs
        """
        tasks = self.tasks_from_logs()
        if not tasks:
            raise ValueError("Tasks not found from logs. Cannot display")
        color = ['r' if task.status == 'Failed' else 'g' if task.status == 'Successful' else 'black'
                 for task in tasks]
        base = utils.plot_aoi(gpd.GeoSeries([task.coordinates for task in tasks]), color=color)
        for task, task_color in zip(tasks, color):
            center = task.coordinates.centroid
            base.text(center.x, center.y, f"{task.nb_records} rec\n{task.nb_datasets} ds",
                      ha='center', va='center', color=task_color)
        base.set_title(f"Job '{self.name}'\n"
                       f"{len(tasks)} cells ({self.active_tasks} active tasks - {self.failed_tasks} failed)")
        return base

    def __repr__(self):
        """ Short description: name and id of the job """
        return "Job {} ({})".format(self.name, self.id)

    def __str__(self):
        """
        Detailed multi-line description of the job, including its logs

        When there are more than 20 log lines, only the first one and the last 19 are displayed,
        with the number of hidden lines in between.
        """
        # NOTE(review): this separator appends the spaces at the end of each log line instead of
        # indenting the next one. Kept as is to preserve the output - confirm whether "\n      " was meant.
        separator = "      \n"
        if len(self.logs) > 20:
            logs = f"{self.logs[0]}\n[+{len(self.logs)-20}...]\n" + separator.join(self.logs[-19:])
        else:
            logs = separator.join(self.logs)
        return "Job {} ({})\n" \
               "    type         {}\n" \
               "    state        {} {}\n" \
               "    creation     {}\n" \
               "    last_update  {}\n"\
               "    active_tasks {}\n"\
               "    failed_tasks {}\n"\
               "    execution    {}\n"\
               "    logs\n{}\n".format(self.name, self.id, self.type, self.state,
                                       "(waiting for user action)" if self.waiting else "",
                                       self.creation_time, self.last_update_time, self.active_tasks, self.failed_tasks,
                                       self.execution_level.name, logs)

cancel(force=False)

Cancel the job if possible. `force`: TO BE USED CAUTIOUSLY: cancel the current state of the job, whatever the state. It can be unpredictable. Should only be used if the job is stuck in a pending state.

Source code in geocube/entities/job.py
78
79
80
81
82
83
@utils.catch_rpc_error
def cancel(self, force: bool = False):
    """
    Cancel the job if possible

    Args:
        force: TO BE USED CAUTIOUSLY: cancel the current state of the job, whatever the state.
            It can be unpredictable. Should only be used if the job is stuck in a pending state.

    """
    cancel_job = self._stub.CancelJob
    request = operations_pb2.CancelJobRequest(id=self.id, force_any_state=force)
    cancel_job(request)

next()

Start the next step (must be in "waiting" state)

Source code in geocube/entities/job.py
85
86
87
88
89
90
@utils.catch_rpc_error
def next(self):
    """
    Ask the server to continue the job with its next step

    The job must be in "waiting" state, otherwise an Exception is raised and nothing is sent.
    """
    if self.waiting:
        continue_job = self._stub.ContinueJob
        request = operations_pb2.ContinueJobRequest(id=self.id)
        continue_job(request)
        return
    raise Exception("Job must be in waiting state")

refresh(log_page=0, log_limit=1000)

Reload a job from server (inplace operation)

Source code in geocube/entities/job.py
92
93
94
95
96
97
@utils.catch_rpc_error
def refresh(self, log_page=0, log_limit=1000):
    """
    Fetch the job again from the server and update this instance in place

    `log_page` and `log_limit` are forwarded unchanged to the GetJob request.
    Returns the instance itself.
    """
    get_job = self._stub.GetJob
    response = get_job(operations_pb2.GetJobRequest(id=self.id, log_page=log_page, log_limit=log_limit))
    reloaded = Job.from_pb(self._stub, response.job)
    # Take over every attribute of the freshly built job
    self.__dict__ = reloaded.__dict__
    return self

retry(force=False)

Retry a failed job

Parameters:
  • force (bool, default: False ) –

    TO BE USED CAUTIOUSLY: retry the current state of the job, whatever the state. It can be unpredictable. Should only be used if the job is stuck in a pending state.

Source code in geocube/entities/job.py
66
67
68
69
70
71
72
73
74
75
76
@utils.catch_rpc_error
def retry(self, force: bool = False):
    """
    Ask the server to retry a failed job

    Args:
        force: TO BE USED CAUTIOUSLY: retry the current state of the job, whatever the state.
            It can be unpredictable. Should only be used if the job is stuck in a pending state.

    """
    retry_job = self._stub.RetryJob
    request = operations_pb2.RetryJobRequest(id=self.id, force_any_state=force)
    retry_job(request)